From f10dfe629e53a5e53440dbcf31869378cacf866f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Zieli=C5=84ski?= Date: Mon, 15 Dec 2025 20:05:26 +0100 Subject: [PATCH] WIP: AI Validator --- CLAUDE.md | 381 ++++++++++++++ DiunaBI.API/appsettings.json | 14 +- DiunaBI.Application/DTOModels/LayerDto.cs | 3 +- DiunaBI.Domain/Entities/Layer.cs | 1 + DiunaBI.Domain/Entities/QueueJob.cs | 3 +- .../DiunaBI.Infrastructure.csproj | 2 + .../Interfaces/IDataValidator.cs | 11 + ..._AddValidationLayerAndJobTypes.Designer.cs | 489 +++++++++++++++++ ...214143012_AddValidationLayerAndJobTypes.cs | 22 + .../Plugins/BaseDataValidator.cs | 21 + .../Services/JobSchedulerService.cs | 106 +++- .../Services/JobWorkerService.cs | 13 + .../Services/PluginManager.cs | 36 +- .../Validators/LlmAnomalyValidator.cs | 496 ++++++++++++++++++ DiunaBI.UI.Shared/Pages/Layers/Details.razor | 83 +++ .../Pages/Layers/Details.razor.cs | 14 +- 16 files changed, 1686 insertions(+), 9 deletions(-) create mode 100644 CLAUDE.md create mode 100644 DiunaBI.Infrastructure/Interfaces/IDataValidator.cs create mode 100644 DiunaBI.Infrastructure/Migrations/20251214143012_AddValidationLayerAndJobTypes.Designer.cs create mode 100644 DiunaBI.Infrastructure/Migrations/20251214143012_AddValidationLayerAndJobTypes.cs create mode 100644 DiunaBI.Infrastructure/Plugins/BaseDataValidator.cs create mode 100644 DiunaBI.Infrastructure/Validators/LlmAnomalyValidator.cs diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..4bca5d2 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,381 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +DiunaBI is a full-stack Business Intelligence platform built on .NET 10.0 with a multi-tier Clean Architecture. It provides a plugin-based system for importing, processing, and exporting business data with real-time updates, job queue management, and comprehensive audit trails. 
+ +**Tech Stack:** +- Backend: ASP.NET Core 10.0 Web API +- Frontend: Blazor Server + MAUI Mobile (iOS, Android, Windows, macOS) +- Database: SQL Server + EF Core 10.0 +- UI Framework: MudBlazor 8.0 +- Real-time: SignalR (EntityChangeHub) +- Authentication: JWT Bearer + Google OAuth +- External APIs: Google Sheets API, Google Drive API +- Logging: Serilog (Console, File) + +## Common Commands + +### Build and Run + +```bash +# Build the entire solution +dotnet build DiunaBI.sln + +# Build specific project +dotnet build DiunaBI.API/DiunaBI.API.csproj + +# Run API (backend) +dotnet run --project DiunaBI.API/DiunaBI.API.csproj + +# Run Web UI (Blazor Server) +dotnet run --project DiunaBI.UI.Web/DiunaBI.UI.Web.csproj + +# Clean build artifacts +dotnet clean DiunaBI.sln +``` + +### Database Migrations + +```bash +# Add new migration +dotnet ef migrations add --project DiunaBI.Infrastructure --startup-project DiunaBI.API + +# Apply migrations to database +dotnet ef database update --project DiunaBI.Infrastructure --startup-project DiunaBI.API + +# Remove last migration (if not applied) +dotnet ef migrations remove --project DiunaBI.Infrastructure --startup-project DiunaBI.API + +# List all migrations +dotnet ef migrations list --project DiunaBI.Infrastructure --startup-project DiunaBI.API +``` + +### Testing + +```bash +# Run all tests +dotnet test DiunaBI.Tests/DiunaBI.Tests.csproj + +# Run tests with detailed output +dotnet test DiunaBI.Tests/DiunaBI.Tests.csproj --logger "console;verbosity=detailed" +``` + +### Plugin Development + +```bash +# Build plugin project (automatically copies DLL to API bin/Plugins/) +dotnet build DiunaBI.Plugins.Morska/DiunaBI.Plugins.Morska.csproj +dotnet build DiunaBI.Plugins.PedrolloPL/DiunaBI.Plugins.PedrolloPL.csproj + +# Plugins are auto-loaded from bin/Plugins/ at API startup +``` + +### Docker (Production) + +```bash +# Build API container (with Morska plugin) +docker build -f DiunaBI.API/Dockerfile -t diunabi-api --build-arg 
PLUGIN_PROJECT=DiunaBI.Plugins.Morska . + +# Build API container (with PedrolloPL plugin) +docker build -f DiunaBI.API/Dockerfile -t diunabi-api --build-arg PLUGIN_PROJECT=DiunaBI.Plugins.PedrolloPL . +``` + +## Architecture + +### Solution Structure (10 Projects) + +**DiunaBI.API** - ASP.NET Core Web API +- Controllers: Auth, Layers, Jobs, DataInbox +- Hubs: EntityChangeHub (SignalR) +- Services: GoogleAuthService, JwtTokenService +- API Key authorization via [ApiKeyAuth] attribute + +**DiunaBI.Domain** - Domain entities (9 entities) +- User, Layer, Record, RecordHistory, QueueJob, DataInbox, ProcessSource + +**DiunaBI.Application** - DTOs and application models +- LayerDto, RecordDto, UserDto, RecordHistoryDto, JobDto, PagedResult + +**DiunaBI.Infrastructure** - Data access and core services +- Data: AppDbContext, 47 EF Core migrations +- Interceptors: EntityChangeInterceptor (auto-broadcasts DB changes via SignalR) +- Services: PluginManager, JobSchedulerService, JobWorkerService +- Helpers: GoogleSheetsHelper, GoogleDriveHelper +- Plugin Base Classes: BaseDataImporter, BaseDataProcessor, BaseDataExporter + +**DiunaBI.UI.Web** - Blazor Server web application + +**DiunaBI.UI.Mobile** - MAUI mobile application (iOS, Android, Windows, macOS) + +**DiunaBI.UI.Shared** - Shared Blazor component library +- Pages/: Feature-based folders (Layers/, Jobs/, DataInbox/) +- Components/: Layout/ (MainLayout, EmptyLayout, Routes), Auth/ (AuthGuard, LoginCard) +- Services/: LayerService, JobService, DataInboxService, EntityChangeHubService, AuthService + +**DiunaBI.Plugins.Morska** - Production plugin (17 total) +- 4 Importers: Standard, D1, D3, FK2 +- 12 Processors: D6, T1, T3, T4, T5 variants +- 1 Exporter: Google Sheets export + +**DiunaBI.Plugins.PedrolloPL** - Production plugin (1 total) +- 1 Importer: B3 (DataInbox → Layer with dictionary mapping) + +**DiunaBI.Tests** - Unit and integration tests + +### Data Flow Architecture + +1. 
**Import Flow**: External data → DataInbox API → Import Plugin → Import Layer +2. **Process Flow**: Import Layer → Process Plugin → Processed Layer +3. **Export Flow**: Processed Layer → Export Plugin → Google Sheets/Drive +4. **Real-time Updates**: DB Change → EntityChangeInterceptor → SignalR Hub → All Clients + +### Plugin System + +**Plugin Discovery:** +- Plugins loaded from `bin/Plugins/` directory at startup +- PluginManager scans assemblies for IDataImporter, IDataProcessor, IDataExporter implementations +- Plugins auto-registered in DI container + +**Plugin Base Classes** (in DiunaBI.Infrastructure/Plugins/): +- `BaseDataImporter`: Abstract base for importers, access to AppDbContext, GoogleSheetsHelper, GoogleDriveHelper +- `BaseDataProcessor`: Abstract base for processors, access to AppDbContext, PluginManager +- `BaseDataExporter`: Abstract base for exporters, access to AppDbContext, GoogleSheetsHelper, GoogleDriveHelper + +**Plugin Execution:** +- Importers: Read external data sources, create Layer + Records +- Processors: Read source Layers, apply transformations, create target Layers + Records +- Exporters: Read Layers, write to Google Sheets/Drive + +**Plugin Configuration:** +- Stored in Administration layers (Type = ImportWorker/ProcessWorker) +- Records with Code = "Plugin", "Priority", "MaxRetries" define job configs +- JobSchedulerService reads configs and creates QueueJobs + +### Job Queue System + +**Components:** +- `QueueJob` entity: LayerId, PluginName, JobType (Import/Process), Status (Pending/Running/Completed/Failed/Retrying), Priority (0 = highest) +- `JobSchedulerService`: Creates jobs from Administration layer configs +- `JobWorkerService`: Background service polling every 5 seconds, executes jobs via PluginManager +- Retry logic: 30s → 2m → 5m delays, max 5 retries + +**Job Lifecycle:** +1. Creation: JobSchedulerService or manual via `/jobs/create-for-layer/{layerId}` +2. 
Queued: Status = Pending, sorted by CreatedAt DESC then Priority ASC +3. Execution: JobWorkerService picks up, Status = Running +4. Completion: Status = Completed or Failed +5. Retry: On failure, Status = Retrying with exponential backoff +6. Real-time: All status changes broadcast via SignalR to UI + +**Job Scheduling Endpoints:** +- `POST /jobs/ui/schedule` - Schedule all jobs (JWT auth for UI users) +- `POST /jobs/ui/schedule/imports` - Schedule import jobs only (JWT auth) +- `POST /jobs/ui/schedule/processes` - Schedule process jobs only (JWT auth) +- `POST /jobs/schedule/{apiKey}` - Schedule all jobs (API key auth for cron) +- `POST /jobs/schedule/imports/{apiKey}` - Schedule import jobs (API key auth) +- `POST /jobs/schedule/processes/{apiKey}` - Schedule process jobs (API key auth) + +### Real-time Updates (SignalR) + +**Architecture:** +- Hub: `/hubs/entitychanges` (requires JWT authentication) +- Interceptor: `EntityChangeInterceptor` captures EF Core changes (Added, Modified, Deleted) +- Broadcast: After SaveChanges, sends `EntityChanged(module, id, operation)` to all clients +- Modules: QueueJobs, Layers, Records, RecordHistory + +**UI Integration:** +- `EntityChangeHubService`: Singleton service initialized after authentication in MainLayout +- Components subscribe: `HubService.EntityChanged += OnEntityChanged` +- Auto-reconnect enabled +- Pages with real-time updates: Jobs/Index, Jobs/Details, Layers/Index, Layers/Details, DataInbox/Index + +**Authentication Flow:** +1. User logs in with Google OAuth → JWT token stored in localStorage +2. `TokenProvider.Token` populated in AuthService +3. `MainLayout` subscribes to `AuthenticationStateChanged` event +4. On auth success, SignalR connection initialized with JWT token +5. Token sent via `accessTokenProvider` in HubConnection options + +### Authentication & Security + +**Google OAuth Flow:** +1. Client exchanges Google ID token → `POST /auth/apiToken` +2. 
GoogleAuthService validates with Google, maps to internal User entity +3. Returns JWT (7-day expiration, HS256 signing) +4. JWT required on all protected endpoints except `/auth/apiToken`, `/health` +5. UserId extraction middleware sets X-UserId header for audit trails + +**API Key Authentication:** +- Custom [ApiKeyAuth] attribute for cron job endpoints +- X-API-Key header with constant-time comparison +- Used for DataInbox external endpoints and job scheduling + +**Security Features:** +- Rate limiting: 100 req/min general, 10 req/min auth +- Security headers: XSS, clickjacking, MIME sniffing protection +- Input validation: Pagination limits (1-1000), Base64 size limits (10MB) +- Stack trace hiding: Generic error messages in production +- CORS: Configured for localhost:4200, diuna.bim-it.pl, morska.diunabi.com + +### Database Schema + +**47 EF Core Migrations** - All in DiunaBI.Infrastructure/Migrations/ + +**Key Entities:** + +**User** +- Id (Guid), Email, UserName +- Google OAuth identity +- Constant: `User.AutoImportUserId = "f392209e-123e-4651-a5a4-0b1d6cf9ff9d"` (system operations) + +**Layer** +- Number, Name, Type (Import/Processed/Administration/Dictionary) +- ParentId (hierarchical relationships) +- IsDeleted (soft delete), IsCancelled (processing control) +- CreatedAt, ModifiedAt, CreatedById, ModifiedById (audit trail) + +**Record** +- Code (unique identifier per layer), LayerId +- Value1-Value32 (double?), Desc1 (string, max 10000 chars) +- IsDeleted (soft delete) +- Full audit trail via RecordHistory + +**RecordHistory** (Migration 47) +- RecordId, LayerId, ChangedAt, ChangedById +- ChangeType (Created/Updated/Deleted) +- Code, Desc1 (snapshot) +- ChangedFields (comma-separated), ChangesSummary (JSON old/new values) +- Indexes: (RecordId, ChangedAt), (LayerId, ChangedAt) + +**QueueJob** +- LayerId, PluginName, JobType, Priority, Status +- RetryCount, MaxRetries (default 5) +- CreatedAt, ModifiedAt, LastAttemptAt, CompletedAt +- CreatedById, 
ModifiedById (uses User.AutoImportUserId for system jobs) + +**DataInbox** +- Name, Source (identifiers), Data (base64-encoded JSON array) +- Used by importers to stage incoming data + +**ProcessSource** +- SourceLayerId, TargetLayerId (defines layer processing relationships) + +**Audit Trail Patterns:** +- All entities have CreatedAt, ModifiedAt with GETUTCDATE() defaults +- Foreign keys to Users: CreatedById, ModifiedById +- Soft deletes via IsDeleted flag +- RecordHistory tracks field-level changes with JSON diffs + +### UI Organization + +**Feature-Based Structure** (as of Dec 5, 2025): +- `Pages/Layers/` - Index.razor + Details.razor (list, detail, edit, history) +- `Pages/Jobs/` - Index.razor + Details.razor (list, detail, retry, cancel, scheduling) +- `Pages/DataInbox/` - Index.razor + Details.razor (list, detail, base64 decode) +- `Components/Layout/` - MainLayout, EmptyLayout, Routes +- `Components/Auth/` - AuthGuard, LoginCard + +**Code-Behind Pattern:** +- Complex pages (50+ lines logic): Separate `.razor.cs` files +- Simple pages: Inline `@code` blocks +- Namespaces: `DiunaBI.UI.Shared.Pages.{Feature}` + +**Filter State Persistence:** +- LayerFilterStateService, DataInboxFilterStateService +- Singleton services remember search, type, page selections across navigation + +**Timezone Handling:** +- DateTimeHelper service detects browser timezone via JS Interop +- All dates stored as UTC in DB, converted to user's local timezone for display +- Format: "yyyy-MM-dd HH:mm:ss" + +## Configuration + +**Environment Variables** (appsettings.Development.json): +- `ConnectionStrings:SQLDatabase` - SQL Server connection (localhost:21433, DB: DiunaBI-PedrolloPL) +- `JwtSettings:SecurityKey`, `JwtSettings:ExpiryDays` (7) +- `GoogleAuth:ClientId`, `GoogleAuth:RedirectUri` +- `apiKey`, `apiUser`, `apiPass` - DataInbox API security +- `exportDirectory` - Google Drive folder ID for exports +- `InstanceName` - DEV/PROD environment identifier + +**CORS Origins:** +- 
http://localhost:4200 (development) +- https://diuna.bim-it.pl (production) +- https://morska.diunabi.com (production) + +**Logging:** +- Serilog with Console + File sinks +- Override levels: Microsoft.AspNetCore = Warning, EF Core = Warning, Google.Apis = Warning +- Sensitive data logging only in Development + +## Development Patterns + +### When Adding a New Plugin + +1. Create new project: `DiunaBI.Plugins.{Name}` +2. Reference: DiunaBI.Domain, DiunaBI.Infrastructure +3. Inherit from: BaseDataImporter/BaseDataProcessor/BaseDataExporter +4. Implement abstract methods: ImportAsync/ProcessAsync/ExportAsync, ValidateConfiguration +5. Build triggers automatic copy to `bin/Plugins/` via MSBuild target +6. No registration needed - PluginManager auto-discovers at startup + +### When Adding a New Entity + +1. Create entity in DiunaBI.Domain/Entities/ +2. Add DbSet to AppDbContext +3. Create migration: `dotnet ef migrations add {Name} --project DiunaBI.Infrastructure --startup-project DiunaBI.API` +4. If entity needs real-time updates, add module name to EntityChangeInterceptor +5. Create DTO in DiunaBI.Application/DTOs/ +6. Add controller in DiunaBI.API/Controllers/ with [Authorize] and [RequireRateLimit("api")] + +### When Adding Real-time Updates to a UI Page + +1. Inject EntityChangeHubService +2. Subscribe to EntityChanged event in OnInitializedAsync +3. Filter by module name +4. Call StateHasChanged() in event handler +5. Implement IDisposable, unsubscribe in Dispose +6. 
Test both initial load and SignalR updates + +### When Modifying Job System + +- JobSchedulerService: Changes to job creation from layer configs +- JobWorkerService: Changes to job execution, retry logic, rate limiting +- QueueJob entity: Changes to job schema require migration +- Jobs UI: Real-time updates required, test SignalR broadcasts + +### Security Considerations + +- Never log sensitive data (tokens, passwords, API keys) except in Development +- Use generic error messages in production (no stack traces) +- All new endpoints require [Authorize] unless explicitly [AllowAnonymous] +- API key endpoints require [ApiKeyAuth] and [AllowAnonymous] +- Validate pagination parameters (1-1000 range) +- Use constant-time comparison for API keys +- Rate limit all public endpoints + +## Known Limitations + +- JobWorkerService polls every 5 seconds (not event-driven) +- Google Sheets API quota: 5-second delay after import jobs +- Retry delays fixed: 30s → 2m → 5m (not configurable per job) +- Plugin configuration stored in Records (not strongly typed) +- No plugin versioning or hot-reload support +- SignalR requires JWT authentication (no anonymous connections) + +## Important Notes + +- **DO NOT** commit `.env` file (contains secrets) +- **DO NOT** modify migration files after applying to production +- **ALWAYS** use User.AutoImportUserId for system-created entities (jobs, automated processes) +- **ALWAYS** implement IDisposable for pages subscribing to SignalR events +- **ALWAYS** test real-time updates when modifying entities with SignalR broadcasts +- **Plugin DLLs** are auto-copied to `bin/Plugins/` on build via MSBuild target +- **Database changes** require migration AND applying to production via deployment scripts +- **Foreign keys** use RESTRICT (not CASCADE) to prevent accidental data loss (Migration 45) +- **Soft deletes** via IsDeleted flag, not physical deletion +- **Timezone handling** - store UTC, display local (via DateTimeHelper) diff --git 
a/DiunaBI.API/appsettings.json b/DiunaBI.API/appsettings.json index 62d8ffa..12d4317 100644 --- a/DiunaBI.API/appsettings.json +++ b/DiunaBI.API/appsettings.json @@ -28,5 +28,17 @@ ], "Enrich": ["FromLogContext", "WithMachineName", "WithThreadId"] }, - "AllowedHosts": "*" + "AllowedHosts": "*", + "AnomalyDetection": { + "Provider": "OpenAI", + "Model": "gpt-4o-mini", + "ApiKey": "", + "Endpoint": "", + "MaxTokens": 4000, + "Temperature": 0.1, + "MinHistoricalImports": 5, + "RecentImportsWindow": 5, + "MonthlyImportsWindow": 5, + "ConfidenceThreshold": 0.7 + } } \ No newline at end of file diff --git a/DiunaBI.Application/DTOModels/LayerDto.cs b/DiunaBI.Application/DTOModels/LayerDto.cs index 1b21d99..0bcc7af 100644 --- a/DiunaBI.Application/DTOModels/LayerDto.cs +++ b/DiunaBI.Application/DTOModels/LayerDto.cs @@ -25,7 +25,8 @@ public enum LayerType Import, Processed, Administration, - Dictionary + Dictionary, + Validation } public class LayerFilterRequest diff --git a/DiunaBI.Domain/Entities/Layer.cs b/DiunaBI.Domain/Entities/Layer.cs index 441a157..c976368 100644 --- a/DiunaBI.Domain/Entities/Layer.cs +++ b/DiunaBI.Domain/Entities/Layer.cs @@ -10,6 +10,7 @@ public enum LayerType Processed, Administration, Dictionary, + Validation, } public class Layer { diff --git a/DiunaBI.Domain/Entities/QueueJob.cs b/DiunaBI.Domain/Entities/QueueJob.cs index 5545852..920ffbd 100644 --- a/DiunaBI.Domain/Entities/QueueJob.cs +++ b/DiunaBI.Domain/Entities/QueueJob.cs @@ -26,7 +26,8 @@ public class QueueJob public enum JobType { Import = 0, - Process = 1 + Process = 1, + Validate = 2 } public enum JobStatus diff --git a/DiunaBI.Infrastructure/DiunaBI.Infrastructure.csproj b/DiunaBI.Infrastructure/DiunaBI.Infrastructure.csproj index 7b5d59c..324b14e 100644 --- a/DiunaBI.Infrastructure/DiunaBI.Infrastructure.csproj +++ b/DiunaBI.Infrastructure/DiunaBI.Infrastructure.csproj @@ -22,6 +22,8 @@ + + diff --git a/DiunaBI.Infrastructure/Interfaces/IDataValidator.cs 
b/DiunaBI.Infrastructure/Interfaces/IDataValidator.cs new file mode 100644 index 0000000..f311c60 --- /dev/null +++ b/DiunaBI.Infrastructure/Interfaces/IDataValidator.cs @@ -0,0 +1,11 @@ +using DiunaBI.Domain.Entities; + +namespace DiunaBI.Infrastructure.Interfaces; + +public interface IDataValidator +{ + string ValidatorType { get; } + + bool CanValidate(string validatorType); + void Validate(Layer validationWorker); +} diff --git a/DiunaBI.Infrastructure/Migrations/20251214143012_AddValidationLayerAndJobTypes.Designer.cs b/DiunaBI.Infrastructure/Migrations/20251214143012_AddValidationLayerAndJobTypes.Designer.cs new file mode 100644 index 0000000..a1ae750 --- /dev/null +++ b/DiunaBI.Infrastructure/Migrations/20251214143012_AddValidationLayerAndJobTypes.Designer.cs @@ -0,0 +1,489 @@ +// +using System; +using DiunaBI.Infrastructure.Data; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Metadata; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; + +#nullable disable + +namespace DiunaBI.Infrastructure.Migrations +{ + [DbContext(typeof(AppDbContext))] + [Migration("20251214143012_AddValidationLayerAndJobTypes")] + partial class AddValidationLayerAndJobTypes + { + /// + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasAnnotation("ProductVersion", "10.0.0") + .HasAnnotation("Relational:MaxIdentifierLength", 128); + + SqlServerModelBuilderExtensions.UseIdentityColumns(modelBuilder); + + modelBuilder.Entity("DiunaBI.Domain.Entities.DataInbox", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uniqueidentifier"); + + b.Property("CreatedAt") + .ValueGeneratedOnAdd() + .HasColumnType("datetime2") + .HasDefaultValueSql("GETUTCDATE()"); + + b.Property("Data") + .IsRequired() + .HasColumnType("nvarchar(max)"); + + b.Property("Name") + 
.IsRequired() + .HasMaxLength(50) + .HasColumnType("nvarchar(50)"); + + b.Property("Source") + .IsRequired() + .HasMaxLength(50) + .HasColumnType("nvarchar(50)"); + + b.HasKey("Id"); + + b.ToTable("DataInbox"); + }); + + modelBuilder.Entity("DiunaBI.Domain.Entities.Layer", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uniqueidentifier"); + + b.Property("CreatedAt") + .ValueGeneratedOnAdd() + .HasColumnType("datetime2") + .HasDefaultValueSql("GETUTCDATE()"); + + b.Property("CreatedById") + .HasColumnType("uniqueidentifier"); + + b.Property("IsCancelled") + .ValueGeneratedOnAdd() + .HasColumnType("bit") + .HasDefaultValue(false); + + b.Property("IsDeleted") + .ValueGeneratedOnAdd() + .HasColumnType("bit") + .HasDefaultValue(false); + + b.Property("ModifiedAt") + .ValueGeneratedOnAdd() + .HasColumnType("datetime2") + .HasDefaultValueSql("GETUTCDATE()"); + + b.Property("ModifiedById") + .HasColumnType("uniqueidentifier"); + + b.Property("Name") + .IsRequired() + .HasMaxLength(50) + .HasColumnType("nvarchar(50)"); + + b.Property("Number") + .HasColumnType("int"); + + b.Property("ParentId") + .HasColumnType("uniqueidentifier"); + + b.Property("Type") + .HasColumnType("int"); + + b.HasKey("Id"); + + b.HasIndex("CreatedById"); + + b.HasIndex("ModifiedById"); + + b.ToTable("Layers"); + }); + + modelBuilder.Entity("DiunaBI.Domain.Entities.ProcessSource", b => + { + b.Property("LayerId") + .HasColumnType("uniqueidentifier"); + + b.Property("SourceId") + .HasColumnType("uniqueidentifier"); + + b.HasKey("LayerId", "SourceId"); + + b.HasIndex("SourceId"); + + b.ToTable("ProcessSources"); + }); + + modelBuilder.Entity("DiunaBI.Domain.Entities.QueueJob", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uniqueidentifier"); + + b.Property("CompletedAt") + .HasColumnType("datetime2"); + + b.Property("CreatedAt") + .ValueGeneratedOnAdd() + .HasColumnType("datetime2") + .HasDefaultValueSql("GETUTCDATE()"); + + b.Property("CreatedById") + 
.HasColumnType("uniqueidentifier"); + + b.Property("JobType") + .HasColumnType("int"); + + b.Property("LastAttemptAt") + .HasColumnType("datetime2"); + + b.Property("LastError") + .HasMaxLength(1000) + .HasColumnType("nvarchar(1000)"); + + b.Property("LayerId") + .HasColumnType("uniqueidentifier"); + + b.Property("LayerName") + .IsRequired() + .HasMaxLength(200) + .HasColumnType("nvarchar(200)"); + + b.Property("MaxRetries") + .HasColumnType("int"); + + b.Property("ModifiedAt") + .ValueGeneratedOnAdd() + .HasColumnType("datetime2") + .HasDefaultValueSql("GETUTCDATE()"); + + b.Property("ModifiedById") + .HasColumnType("uniqueidentifier"); + + b.Property("PluginName") + .IsRequired() + .HasMaxLength(100) + .HasColumnType("nvarchar(100)"); + + b.Property("Priority") + .HasColumnType("int"); + + b.Property("RetryCount") + .HasColumnType("int"); + + b.Property("Status") + .HasColumnType("int"); + + b.HasKey("Id"); + + b.ToTable("QueueJobs"); + }); + + modelBuilder.Entity("DiunaBI.Domain.Entities.Record", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uniqueidentifier"); + + b.Property("Code") + .IsRequired() + .HasMaxLength(50) + .HasColumnType("nvarchar(50)"); + + b.Property("CreatedAt") + .ValueGeneratedOnAdd() + .HasColumnType("datetime2") + .HasDefaultValueSql("GETUTCDATE()"); + + b.Property("CreatedById") + .HasColumnType("uniqueidentifier"); + + b.Property("Desc1") + .HasMaxLength(10000) + .HasColumnType("nvarchar(max)"); + + b.Property("IsDeleted") + .HasColumnType("bit"); + + b.Property("LayerId") + .HasColumnType("uniqueidentifier"); + + b.Property("ModifiedAt") + .ValueGeneratedOnAdd() + .HasColumnType("datetime2") + .HasDefaultValueSql("GETUTCDATE()"); + + b.Property("ModifiedById") + .HasColumnType("uniqueidentifier"); + + b.Property("Value1") + .HasColumnType("float"); + + b.Property("Value10") + .HasColumnType("float"); + + b.Property("Value11") + .HasColumnType("float"); + + b.Property("Value12") + .HasColumnType("float"); + + 
b.Property("Value13") + .HasColumnType("float"); + + b.Property("Value14") + .HasColumnType("float"); + + b.Property("Value15") + .HasColumnType("float"); + + b.Property("Value16") + .HasColumnType("float"); + + b.Property("Value17") + .HasColumnType("float"); + + b.Property("Value18") + .HasColumnType("float"); + + b.Property("Value19") + .HasColumnType("float"); + + b.Property("Value2") + .HasColumnType("float"); + + b.Property("Value20") + .HasColumnType("float"); + + b.Property("Value21") + .HasColumnType("float"); + + b.Property("Value22") + .HasColumnType("float"); + + b.Property("Value23") + .HasColumnType("float"); + + b.Property("Value24") + .HasColumnType("float"); + + b.Property("Value25") + .HasColumnType("float"); + + b.Property("Value26") + .HasColumnType("float"); + + b.Property("Value27") + .HasColumnType("float"); + + b.Property("Value28") + .HasColumnType("float"); + + b.Property("Value29") + .HasColumnType("float"); + + b.Property("Value3") + .HasColumnType("float"); + + b.Property("Value30") + .HasColumnType("float"); + + b.Property("Value31") + .HasColumnType("float"); + + b.Property("Value32") + .HasColumnType("float"); + + b.Property("Value4") + .HasColumnType("float"); + + b.Property("Value5") + .HasColumnType("float"); + + b.Property("Value6") + .HasColumnType("float"); + + b.Property("Value7") + .HasColumnType("float"); + + b.Property("Value8") + .HasColumnType("float"); + + b.Property("Value9") + .HasColumnType("float"); + + b.HasKey("Id"); + + b.HasIndex("CreatedById"); + + b.HasIndex("LayerId"); + + b.HasIndex("ModifiedById"); + + b.ToTable("Records"); + }); + + modelBuilder.Entity("DiunaBI.Domain.Entities.RecordHistory", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uniqueidentifier"); + + b.Property("ChangeType") + .HasColumnType("int"); + + b.Property("ChangedAt") + .HasColumnType("datetime2"); + + b.Property("ChangedById") + .HasColumnType("uniqueidentifier"); + + b.Property("ChangedFields") + 
.HasMaxLength(200) + .HasColumnType("nvarchar(200)"); + + b.Property("ChangesSummary") + .HasMaxLength(4000) + .HasColumnType("nvarchar(4000)"); + + b.Property("Code") + .IsRequired() + .HasMaxLength(50) + .HasColumnType("nvarchar(50)"); + + b.Property("Desc1") + .HasMaxLength(10000) + .HasColumnType("nvarchar(max)"); + + b.Property("LayerId") + .HasColumnType("uniqueidentifier"); + + b.Property("RecordId") + .HasColumnType("uniqueidentifier"); + + b.HasKey("Id"); + + b.HasIndex("ChangedById"); + + b.HasIndex("LayerId", "ChangedAt"); + + b.HasIndex("RecordId", "ChangedAt"); + + b.ToTable("RecordHistory"); + }); + + modelBuilder.Entity("DiunaBI.Domain.Entities.User", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uniqueidentifier"); + + b.Property("CreatedAt") + .ValueGeneratedOnAdd() + .HasColumnType("datetime2") + .HasDefaultValueSql("GETUTCDATE()"); + + b.Property("Email") + .HasMaxLength(50) + .HasColumnType("nvarchar(50)"); + + b.Property("UserName") + .HasMaxLength(50) + .HasColumnType("nvarchar(50)"); + + b.HasKey("Id"); + + b.ToTable("Users"); + }); + + modelBuilder.Entity("DiunaBI.Domain.Entities.Layer", b => + { + b.HasOne("DiunaBI.Domain.Entities.User", "CreatedBy") + .WithMany() + .HasForeignKey("CreatedById") + .OnDelete(DeleteBehavior.Restrict) + .IsRequired(); + + b.HasOne("DiunaBI.Domain.Entities.User", "ModifiedBy") + .WithMany() + .HasForeignKey("ModifiedById") + .OnDelete(DeleteBehavior.Restrict) + .IsRequired(); + + b.Navigation("CreatedBy"); + + b.Navigation("ModifiedBy"); + }); + + modelBuilder.Entity("DiunaBI.Domain.Entities.ProcessSource", b => + { + b.HasOne("DiunaBI.Domain.Entities.Layer", null) + .WithMany() + .HasForeignKey("LayerId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.HasOne("DiunaBI.Domain.Entities.Layer", "Source") + .WithMany() + .HasForeignKey("SourceId") + .OnDelete(DeleteBehavior.Restrict) + .IsRequired(); + + b.Navigation("Source"); + }); + + 
modelBuilder.Entity("DiunaBI.Domain.Entities.Record", b => + { + b.HasOne("DiunaBI.Domain.Entities.User", "CreatedBy") + .WithMany() + .HasForeignKey("CreatedById") + .OnDelete(DeleteBehavior.Restrict) + .IsRequired(); + + b.HasOne("DiunaBI.Domain.Entities.Layer", null) + .WithMany("Records") + .HasForeignKey("LayerId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.HasOne("DiunaBI.Domain.Entities.User", "ModifiedBy") + .WithMany() + .HasForeignKey("ModifiedById") + .OnDelete(DeleteBehavior.Restrict) + .IsRequired(); + + b.Navigation("CreatedBy"); + + b.Navigation("ModifiedBy"); + }); + + modelBuilder.Entity("DiunaBI.Domain.Entities.RecordHistory", b => + { + b.HasOne("DiunaBI.Domain.Entities.User", "ChangedBy") + .WithMany() + .HasForeignKey("ChangedById") + .OnDelete(DeleteBehavior.Restrict) + .IsRequired(); + + b.Navigation("ChangedBy"); + }); + + modelBuilder.Entity("DiunaBI.Domain.Entities.Layer", b => + { + b.Navigation("Records"); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/DiunaBI.Infrastructure/Migrations/20251214143012_AddValidationLayerAndJobTypes.cs b/DiunaBI.Infrastructure/Migrations/20251214143012_AddValidationLayerAndJobTypes.cs new file mode 100644 index 0000000..696b7fb --- /dev/null +++ b/DiunaBI.Infrastructure/Migrations/20251214143012_AddValidationLayerAndJobTypes.cs @@ -0,0 +1,22 @@ +using Microsoft.EntityFrameworkCore.Migrations; + +#nullable disable + +namespace DiunaBI.Infrastructure.Migrations +{ + /// + public partial class AddValidationLayerAndJobTypes : Migration + { + /// + protected override void Up(MigrationBuilder migrationBuilder) + { + + } + + /// + protected override void Down(MigrationBuilder migrationBuilder) + { + + } + } +} diff --git a/DiunaBI.Infrastructure/Plugins/BaseDataValidator.cs b/DiunaBI.Infrastructure/Plugins/BaseDataValidator.cs new file mode 100644 index 0000000..95b0643 --- /dev/null +++ b/DiunaBI.Infrastructure/Plugins/BaseDataValidator.cs @@ -0,0 +1,21 @@ +using 
DiunaBI.Domain.Entities; +using DiunaBI.Infrastructure.Interfaces; + +namespace DiunaBI.Infrastructure.Plugins; + +public abstract class BaseDataValidator : IDataValidator +{ + public abstract string ValidatorType { get; } + + public virtual bool CanValidate(string validatorType) => ValidatorType == validatorType; + + public abstract void Validate(Layer validationWorker); + + /// + /// Helper method to get record value by code from layer records + /// + protected string? GetRecordValue(ICollection records, string code) + { + return records.FirstOrDefault(x => x.Code == code)?.Desc1; + } +} diff --git a/DiunaBI.Infrastructure/Services/JobSchedulerService.cs b/DiunaBI.Infrastructure/Services/JobSchedulerService.cs index d7efec5..87be7f6 100644 --- a/DiunaBI.Infrastructure/Services/JobSchedulerService.cs +++ b/DiunaBI.Infrastructure/Services/JobSchedulerService.cs @@ -221,14 +221,114 @@ public class JobSchedulerService return jobsCreated; } + public async Task ScheduleValidateJobsAsync() + { + _logger.LogInformation("JobScheduler: Starting validation job scheduling"); + + var validationWorkers = await _db.Layers + .Include(x => x.Records) + .Where(x => + x.Records!.Any(r => r.Code == "Type" && r.Desc1 == "ValidationWorker") && + x.Records!.Any(r => r.Code == "IsEnabled" && r.Desc1 == "True") + ) + .OrderBy(x => x.CreatedAt) + .AsNoTracking() + .ToListAsync(); + + _logger.LogInformation("JobScheduler: Found {Count} validation workers to schedule", validationWorkers.Count); + + var jobsCreated = 0; + var scheduledLayerIds = new HashSet(); // Track LayerIds scheduled in this batch + + foreach (var worker in validationWorkers) + { + try + { + var plugin = worker.Records?.FirstOrDefault(r => r.Code == "Plugin")?.Desc1; + if (string.IsNullOrEmpty(plugin)) + { + _logger.LogWarning("JobScheduler: Validation worker {LayerName} ({LayerId}) has no Plugin configured, skipping", + worker.Name, worker.Id); + continue; + } + + // Get priority from config (default: 200 for validation 
- lower than processes) + var priorityStr = worker.Records?.FirstOrDefault(r => r.Code == "Priority")?.Desc1; + var priority = int.TryParse(priorityStr, out var p) ? p : 200; + + // Get max retries from config (default: 3) + var maxRetriesStr = worker.Records?.FirstOrDefault(r => r.Code == "MaxRetries")?.Desc1; + var maxRetries = int.TryParse(maxRetriesStr, out var mr) ? mr : 3; + + // Check in-memory: already scheduled in this batch? + if (scheduledLayerIds.Contains(worker.Id)) + { + _logger.LogDebug("JobScheduler: Job already scheduled in this batch for {LayerName} ({LayerId})", + worker.Name, worker.Id); + continue; + } + + // Check if there's already a pending/running job for this layer in database + var existingJob = await _db.QueueJobs + .Where(j => j.LayerId == worker.Id && + (j.Status == JobStatus.Pending || j.Status == JobStatus.Running)) + .FirstOrDefaultAsync(); + + if (existingJob != null) + { + _logger.LogDebug("JobScheduler: Job already exists for {LayerName} ({LayerId}), status: {Status}", + worker.Name, worker.Id, existingJob.Status); + continue; + } + + var job = new QueueJob + { + Id = Guid.NewGuid(), + LayerId = worker.Id, + LayerName = worker.Name ?? 
"Unknown", + PluginName = plugin, + JobType = JobType.Validate, + Priority = priority, + MaxRetries = maxRetries, + Status = JobStatus.Pending, + CreatedAt = DateTime.UtcNow, + ModifiedAt = DateTime.UtcNow, + CreatedById = DiunaBI.Domain.Entities.User.AutoImportUserId, + ModifiedById = DiunaBI.Domain.Entities.User.AutoImportUserId + }; + + _db.QueueJobs.Add(job); + scheduledLayerIds.Add(worker.Id); // Track that we've scheduled this layer + jobsCreated++; + + _logger.LogInformation("JobScheduler: Created validation job for {LayerName} ({LayerId}) with priority {Priority}", + worker.Name, worker.Id, priority); + } + catch (Exception ex) + { + _logger.LogError(ex, "JobScheduler: Failed to create job for {LayerName} ({LayerId})", + worker.Name, worker.Id); + } + } + + if (jobsCreated > 0) + { + await _db.SaveChangesAsync(); + _logger.LogInformation("JobScheduler: Successfully created {Count} validation jobs", jobsCreated); + } + + return jobsCreated; + } + public async Task ScheduleAllJobsAsync(string? 
nameFilter = null) { var importCount = await ScheduleImportJobsAsync(nameFilter); var processCount = await ScheduleProcessJobsAsync(); + var validateCount = await ScheduleValidateJobsAsync(); - _logger.LogInformation("JobScheduler: Scheduled {ImportCount} import jobs and {ProcessCount} process jobs", - importCount, processCount); + _logger.LogInformation("JobScheduler: Scheduled {ImportCount} import jobs, {ProcessCount} process jobs, and {ValidateCount} validation jobs", + importCount, processCount, validateCount); - return importCount + processCount; + return importCount + processCount + validateCount; } } diff --git a/DiunaBI.Infrastructure/Services/JobWorkerService.cs b/DiunaBI.Infrastructure/Services/JobWorkerService.cs index 580c08e..572fe32 100644 --- a/DiunaBI.Infrastructure/Services/JobWorkerService.cs +++ b/DiunaBI.Infrastructure/Services/JobWorkerService.cs @@ -110,6 +110,19 @@ public class JobWorkerService : BackgroundService processor.Process(layer); } + else if (job.JobType == JobType.Validate) + { + var validator = pluginManager.GetValidator(job.PluginName); + if (validator == null) + { + throw new Exception($"Validator '{job.PluginName}' not found"); + } + + _logger.LogInformation("JobWorker: Executing validation for {LayerName} using {PluginName}", + job.LayerName, job.PluginName); + + validator.Validate(layer); + } // Job completed successfully job.Status = JobStatus.Completed; diff --git a/DiunaBI.Infrastructure/Services/PluginManager.cs b/DiunaBI.Infrastructure/Services/PluginManager.cs index 5e0f384..6d0ca4b 100644 --- a/DiunaBI.Infrastructure/Services/PluginManager.cs +++ b/DiunaBI.Infrastructure/Services/PluginManager.cs @@ -12,6 +12,7 @@ public class PluginManager private readonly List _processorTypes = new(); private readonly List _importerTypes = new(); private readonly List _exporterTypes = new(); + private readonly List _validatorTypes = new(); private readonly List _plugins = new(); public PluginManager(ILogger logger, IServiceProvider 
serviceProvider) @@ -42,10 +43,11 @@ public class PluginManager } } - _logger.LogInformation("Loaded {ProcessorCount} processors, {ImporterCount} importers, and {ExporterCount} exporters from {AssemblyCount} assemblies", + _logger.LogInformation("Loaded {ProcessorCount} processors, {ImporterCount} importers, {ExporterCount} exporters, and {ValidatorCount} validators from {AssemblyCount} assemblies", _processorTypes.Count, _importerTypes.Count, _exporterTypes.Count, + _validatorTypes.Count, dllFiles.Length); } @@ -77,6 +79,12 @@ public class PluginManager _exporterTypes.Add(type); _logger.LogDebug("Registered exporter: {Type}", type.Name); } + + if (typeof(IDataValidator).IsAssignableFrom(type) && !type.IsInterface && !type.IsAbstract) + { + _validatorTypes.Add(type); + _logger.LogDebug("Registered validator: {Type}", type.Name); + } } } catch (Exception ex) @@ -157,5 +165,29 @@ public class PluginManager return null; } - public int GetPluginsCount() => _processorTypes.Count + _importerTypes.Count + _exporterTypes.Count; + public IDataValidator? 
GetValidator(string validatorType) + { + foreach (var type in _validatorTypes) + { + try + { + var scope = _serviceProvider.CreateScope(); + var instance = (IDataValidator)ActivatorUtilities.CreateInstance(scope.ServiceProvider, type); + + if (instance.CanValidate(validatorType)) + { + return instance; + } + + scope.Dispose(); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to create validator instance of type {Type}", type.Name); + } + } + return null; + } + + public int GetPluginsCount() => _processorTypes.Count + _importerTypes.Count + _exporterTypes.Count + _validatorTypes.Count; } \ No newline at end of file diff --git a/DiunaBI.Infrastructure/Validators/LlmAnomalyValidator.cs b/DiunaBI.Infrastructure/Validators/LlmAnomalyValidator.cs new file mode 100644 index 0000000..d010217 --- /dev/null +++ b/DiunaBI.Infrastructure/Validators/LlmAnomalyValidator.cs @@ -0,0 +1,496 @@ +using System.Text.Json; +using DiunaBI.Domain.Entities; +using DiunaBI.Infrastructure.Data; +using DiunaBI.Infrastructure.Plugins; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; +using Microsoft.SemanticKernel; +using Microsoft.SemanticKernel.ChatCompletion; +using Microsoft.SemanticKernel.Connectors.OpenAI; +using Microsoft.SemanticKernel.Connectors.Ollama; + +namespace DiunaBI.Infrastructure.Validators; + +public class LlmAnomalyValidator : BaseDataValidator +{ + public override string ValidatorType => "LlmAnomalyValidator"; + + private readonly AppDbContext _db; + private readonly IConfiguration _config; + private readonly ILogger _logger; + private readonly Kernel _kernel; + + // Configuration loaded from appsettings.json + private readonly string _provider; + private readonly string _model; + private readonly int _minHistoricalImports; + private readonly int _recentImportsWindow; + private readonly int _monthlyImportsWindow; + private readonly double _confidenceThreshold; + + // Configuration loaded from 
ValidationWorker records + private string? SourceLayerName { get; set; } + private Layer? SourceImportWorker { get; set; } + + public LlmAnomalyValidator( + AppDbContext db, + IConfiguration config, + ILogger logger) + { + _db = db; + _config = config; + _logger = logger; + + // Load configuration from appsettings.json + _provider = config["AnomalyDetection:Provider"] ?? "OpenAI"; + _model = config["AnomalyDetection:Model"] ?? "gpt-4o-mini"; + _minHistoricalImports = int.Parse(config["AnomalyDetection:MinHistoricalImports"] ?? "5"); + _recentImportsWindow = int.Parse(config["AnomalyDetection:RecentImportsWindow"] ?? "5"); + _monthlyImportsWindow = int.Parse(config["AnomalyDetection:MonthlyImportsWindow"] ?? "5"); + _confidenceThreshold = double.Parse(config["AnomalyDetection:ConfidenceThreshold"] ?? "0.7"); + + // Initialize Semantic Kernel based on provider + _kernel = InitializeKernel(); + + _logger.LogInformation("LlmAnomalyValidator initialized with provider: {Provider}, model: {Model}", + _provider, _model); + } + + private Kernel InitializeKernel() + { + var builder = Kernel.CreateBuilder(); + + switch (_provider.ToLower()) + { + case "openai": + var openAiKey = _config["AnomalyDetection:ApiKey"]; + if (string.IsNullOrEmpty(openAiKey)) + { + throw new InvalidOperationException("OpenAI API key not configured"); + } + builder.AddOpenAIChatCompletion(_model, openAiKey); + break; + + case "azureopenai": + var azureEndpoint = _config["AnomalyDetection:Endpoint"]; + var azureKey = _config["AnomalyDetection:ApiKey"]; + if (string.IsNullOrEmpty(azureEndpoint) || string.IsNullOrEmpty(azureKey)) + { + throw new InvalidOperationException("Azure OpenAI endpoint or API key not configured"); + } + builder.AddAzureOpenAIChatCompletion(_model, azureEndpoint, azureKey); + break; + + case "ollama": + var ollamaEndpoint = _config["AnomalyDetection:Endpoint"] ?? 
    /// <summary>
    /// Entry point invoked by the job worker: loads configuration from the
    /// ValidationWorker layer, gathers the latest import plus historical context,
    /// and runs LLM-based anomaly detection. Skips silently (with a warning) when
    /// there is not enough history; rethrows any other failure so the job is
    /// marked failed and retried.
    /// </summary>
    /// <param name="validationWorker">Administration layer whose records configure this run.</param>
    public override void Validate(Layer validationWorker)
    {
        try
        {
            _logger.LogInformation("{ValidatorType}: Starting validation for {ValidationWorkerName} ({ValidationWorkerId})",
                ValidatorType, validationWorker.Name, validationWorker.Id);

            // Load configuration from layer records (sets SourceLayerName).
            LoadConfiguration(validationWorker);

            // Resolves SourceImportWorker; throws listing all config problems.
            ValidateConfiguration();

            // Newest import layer produced by the source import worker.
            var latestImport = GetLatestImportLayer();

            // Recent imports used as context for the LLM comparison.
            var historicalImports = GetHistoricalImports();

            // Without enough history, anomaly judgments would be meaningless —
            // skip rather than fail so the job completes cleanly.
            if (historicalImports.Count < _minHistoricalImports)
            {
                _logger.LogWarning("{ValidatorType}: Not enough historical imports: {Count} (need {Min}). Skipping validation.",
                    ValidatorType, historicalImports.Count, _minHistoricalImports);
                return;
            }

            // Calls the LLM and persists a Validation layer with the results.
            PerformValidation(validationWorker, latestImport, historicalImports);

            _logger.LogInformation("{ValidatorType}: Successfully completed validation for {ValidationWorkerName}",
                ValidatorType, validationWorker.Name);
        }
        catch (Exception e)
        {
            // Log with context, then rethrow so the job queue records the failure.
            _logger.LogError(e, "{ValidatorType}: Failed to validate {ValidationWorkerName} ({ValidationWorkerId})",
                ValidatorType, validationWorker.Name, validationWorker.Id);
            throw;
        }
    }
_logger.LogDebug("{ValidatorType}: Configuration loaded - SourceLayer: {SourceLayer}", + ValidatorType, SourceLayerName); + } + + private void ValidateConfiguration() + { + var errors = new List(); + + if (string.IsNullOrEmpty(SourceLayerName)) errors.Add("SourceLayer is required"); + + // Find source import worker (Administration Layer) + SourceImportWorker = _db.Layers + .SingleOrDefault(x => x.Name == SourceLayerName && + x.Type == LayerType.Administration && + !x.IsDeleted && + !x.IsCancelled); + + if (SourceImportWorker == null) + { + errors.Add($"SourceImportWorker layer '{SourceLayerName}' not found"); + } + + if (errors.Any()) + { + throw new InvalidOperationException($"Configuration validation failed: {string.Join(", ", errors)}"); + } + + _logger.LogDebug("{ValidatorType}: Configuration validation passed", ValidatorType); + } + + private Layer GetLatestImportLayer() + { + // Find latest Import layer where ParentId = SourceImportWorker.Id + var latestImport = _db.Layers + .Include(x => x.Records) + .Where(x => x.ParentId == SourceImportWorker!.Id && + x.Type == LayerType.Import && + !x.IsDeleted && + !x.IsCancelled) + .OrderByDescending(x => x.CreatedAt) + .FirstOrDefault(); + + if (latestImport == null) + { + throw new InvalidOperationException( + $"No import layers found for import worker '{SourceImportWorker!.Name}'"); + } + + _logger.LogDebug("{ValidatorType}: Found latest import layer: {LayerName} ({LayerId})", + ValidatorType, latestImport.Name, latestImport.Id); + + return latestImport; + } + + private List GetHistoricalImports() + { + // Get last N import layers (ordered by CreatedAt) + var historicalImports = _db.Layers + .Include(x => x.Records) + .Where(x => x.ParentId == SourceImportWorker!.Id && + x.Type == LayerType.Import && + !x.IsDeleted && + !x.IsCancelled) + .OrderByDescending(x => x.CreatedAt) + .Take(_recentImportsWindow) + .AsNoTracking() + .ToList(); + + _logger.LogDebug("{ValidatorType}: Found {Count} historical imports for recent 
window", + ValidatorType, historicalImports.Count); + + return historicalImports; + } + + private List GetMonthlyBaselineImports() + { + // Get last N "first-of-month" import layers + var monthlyImports = _db.Layers + .Include(x => x.Records) + .Where(x => x.ParentId == SourceImportWorker!.Id && + x.Type == LayerType.Import && + x.CreatedAt.Day == 1 && + !x.IsDeleted && + !x.IsCancelled) + .OrderByDescending(x => x.CreatedAt) + .Take(_monthlyImportsWindow) + .AsNoTracking() + .ToList(); + + _logger.LogDebug("{ValidatorType}: Found {Count} monthly baseline imports", + ValidatorType, monthlyImports.Count); + + return monthlyImports; + } + + private void PerformValidation(Layer validationWorker, Layer latestImport, List historicalImports) + { + _logger.LogDebug("{ValidatorType}: Performing validation for import: {ImportName}", + ValidatorType, latestImport.Name); + + // Get monthly baseline if available + var monthlyBaseline = GetMonthlyBaselineImports(); + + // Build prompt with all data + var prompt = BuildPrompt(latestImport, historicalImports, monthlyBaseline); + + // Call LLM + var startTime = DateTime.UtcNow; + var llmResponse = CallLlm(prompt); + var processingTime = DateTime.UtcNow - startTime; + + // Create Validation Layer with results + var validationLayer = CreateValidationLayer(validationWorker, latestImport, llmResponse, processingTime); + + // Save to database + SaveValidationLayer(validationLayer, llmResponse); + + _logger.LogInformation("{ValidatorType}: Created validation layer {LayerName} ({LayerId}) in {ProcessingTime}ms", + ValidatorType, validationLayer.Name, validationLayer.Id, processingTime.TotalMilliseconds); + } + + private string BuildPrompt(Layer currentImport, List recentImports, List monthlyBaseline) + { + var currentRecords = currentImport.Records?.OrderBy(r => r.Code).ToList() ?? new List(); + var importType = SourceImportWorker?.Name ?? 
"Unknown"; + + var prompt = $@"You are a data quality analyst specializing in anomaly detection for business intelligence imports. + +**Import Type:** {importType} +**Import Date:** {currentImport.CreatedAt:yyyy-MM-dd HH:mm:ss} +**Current Import:** {currentImport.Name} + +**Current Import Data ({currentRecords.Count} records):** +{JsonSerializer.Serialize(currentRecords.Select(r => new { code = r.Code, value1 = r.Value1 }), new JsonSerializerOptions { WriteIndented = true })} + +**Historical Context - Last {recentImports.Count} Imports:** +{string.Join("\n", recentImports.Select((imp, idx) => $"Import {idx + 1} ({imp.CreatedAt:yyyy-MM-dd}): {JsonSerializer.Serialize(imp.Records?.OrderBy(r => r.Code).Select(r => new { code = r.Code, value1 = r.Value1 }) ?? Enumerable.Empty())}"))} +"; + + if (monthlyBaseline.Any()) + { + prompt += $@" +**Monthly Baseline - Last {monthlyBaseline.Count} First-Day Imports:** +{string.Join("\n", monthlyBaseline.Select((imp, idx) => $"Monthly Import {idx + 1} ({imp.CreatedAt:yyyy-MM-dd}): {JsonSerializer.Serialize(imp.Records?.OrderBy(r => r.Code).Select(r => new { code = r.Code, value1 = r.Value1 }) ?? Enumerable.Empty())}"))} +"; + } + + prompt += @" +**Analysis Tasks:** +1. **Record-level anomalies:** Identify unusual values for specific codes compared to historical patterns +2. **Structural issues:** Detect missing codes, new codes, or unexpected count changes +3. 
**Pattern breaks:** Find trend reversals, unexpected correlations, or statistical outliers + +**Response Format (JSON):** +```json +{ + ""overallStatus"": ""pass|warning|critical"", + ""recordAnomalies"": [ + { + ""code"": ""string"", + ""value1"": number, + ""confidence"": 0.0-1.0, + ""severity"": ""low|medium|high|critical"", + ""reason"": ""brief explanation"", + ""recommendation"": ""suggested action"" + } + ], + ""structuralIssues"": [ + { + ""issueType"": ""missing_codes|new_codes|count_change"", + ""description"": ""string"", + ""codes"": [""code1"", ""code2""], + ""severity"": ""low|medium|high|critical"" + } + ], + ""summary"": ""Brief overall assessment"" +} +``` + +Analyze the data and respond ONLY with the JSON object. Do not include any markdown formatting or additional text."; + + return prompt; + } + + private AnomalyResponse CallLlm(string prompt) + { + try + { + var chatService = _kernel.GetRequiredService(); + + var chatHistory = new ChatHistory(); + chatHistory.AddUserMessage(prompt); + + var result = chatService.GetChatMessageContentAsync( + chatHistory, + new OpenAIPromptExecutionSettings + { + Temperature = _config.GetValue("AnomalyDetection:Temperature") ?? 0.1, + MaxTokens = _config.GetValue("AnomalyDetection:MaxTokens") ?? 4000 + }).GetAwaiter().GetResult(); + + var jsonResponse = result.Content?.Trim() ?? "{}"; + + // Try to parse JSON response + try + { + return JsonSerializer.Deserialize(jsonResponse) + ?? throw new InvalidOperationException("LLM returned null response"); + } + catch (JsonException) + { + _logger.LogWarning("Failed to parse LLM response as JSON. Raw response: {Response}", jsonResponse); + throw new InvalidOperationException($"LLM did not return valid JSON. 
    /// <summary>
    /// Builds (but does not persist) the Validation layer that will hold the
    /// anomaly-detection results, parented to the import layer that was validated.
    /// </summary>
    /// <param name="validationWorker">The worker layer that triggered this run (currently unused here).</param>
    /// <param name="importLayer">The import layer that was analyzed; becomes the parent.</param>
    /// <param name="response">Parsed LLM result (not stored on the layer itself).</param>
    /// <param name="processingTime">LLM round-trip duration (not stored on the layer itself).</param>
    private Layer CreateValidationLayer(Layer validationWorker, Layer importLayer, AnomalyResponse response, TimeSpan processingTime)
    {
        // NOTE(review): Count()+1 numbering counts ALL layers (including deleted ones,
        // presumably) and is racy under concurrent layer creation — two writers can
        // compute the same number. Confirm whether Number must be unique.
        var layerNumber = _db.Layers.Count() + 1;
        var timestamp = DateTime.UtcNow.ToString("yyyyMMddHHmmss");

        var validationLayer = new Layer
        {
            Id = Guid.NewGuid(),
            Type = LayerType.Validation,
            ParentId = importLayer.Id, // Links to the import that was validated
            Number = layerNumber,
            Name = $"L{layerNumber}-V-{timestamp}", // e.g. "L42-V-20251214143012"
            CreatedById = User.AutoImportUserId,
            ModifiedById = User.AutoImportUserId,
            CreatedAt = DateTime.UtcNow,
            ModifiedAt = DateTime.UtcNow
        };

        _logger.LogDebug("{ValidatorType}: Created validation layer {LayerName}",
            ValidatorType, validationLayer.Name);

        return validationLayer;
    }
0)); + records.Add(CreateRecord(validationLayer.Id, "LlmProvider", _provider)); + records.Add(CreateRecord(validationLayer.Id, "LlmModel", _model)); + records.Add(CreateRecord(validationLayer.Id, "Summary", response.Summary)); + + // Add individual anomaly records + if (response.RecordAnomalies != null) + { + foreach (var anomaly in response.RecordAnomalies) + { + records.Add(CreateRecord( + validationLayer.Id, + $"ANOMALY_{anomaly.Code}", + $"[{anomaly.Severity}] {anomaly.Reason}. Recommendation: {anomaly.Recommendation}", + anomaly.Confidence + )); + } + } + + // Add structural issue records + if (response.StructuralIssues != null) + { + foreach (var issue in response.StructuralIssues) + { + var codes = issue.Codes != null ? string.Join(", ", issue.Codes) : ""; + records.Add(CreateRecord( + validationLayer.Id, + $"STRUCTURAL_{issue.IssueType?.ToUpper()}", + $"[{issue.Severity}] {issue.Description}. Codes: {codes}" + )); + } + } + + // Store full LLM response as JSON (for debugging) + records.Add(CreateRecord(validationLayer.Id, "LLM_RESPONSE_JSON", JsonSerializer.Serialize(response))); + + // Add all records to database + _db.Records.AddRange(records); + _db.SaveChanges(); + + _logger.LogDebug("{ValidatorType}: Saved {RecordCount} records for validation layer {LayerId}", + ValidatorType, records.Count, validationLayer.Id); + } + + private Record CreateRecord(Guid layerId, string code, string? desc1 = null, double? value1 = null) + { + return new Record + { + Id = Guid.NewGuid(), + LayerId = layerId, + Code = code, + Desc1 = desc1, + Value1 = value1, + CreatedById = User.AutoImportUserId, + ModifiedById = User.AutoImportUserId, + CreatedAt = DateTime.UtcNow, + ModifiedAt = DateTime.UtcNow + }; + } +} + +// Response models for LLM +public class AnomalyResponse +{ + public string OverallStatus { get; set; } = "pass"; + public List? RecordAnomalies { get; set; } + public List? 
/// <summary>
/// One record-level anomaly reported by the LLM (an element of the
/// "recordAnomalies" array in the response JSON).
/// </summary>
public class RecordAnomaly
{
    public string Code { get; set; } = "";
    // Value reported by the LLM for the anomalous record; may be absent.
    public double? Value1 { get; set; }
    // Model confidence in [0, 1].
    public double Confidence { get; set; }
    // One of: low | medium | high | critical (per the prompt contract).
    public string Severity { get; set; } = "low";
    public string Reason { get; set; } = "";
    public string Recommendation { get; set; } = "";
}

/// <summary>
/// One structural issue reported by the LLM (an element of the
/// "structuralIssues" array in the response JSON).
/// </summary>
public class StructuralIssue
{
    // One of: missing_codes | new_codes | count_change (per the prompt contract).
    public string? IssueType { get; set; }
    public string Description { get; set; } = "";
    // Generic restored (mangled source had bare List?).
    public List<string>? Codes { get; set; }
    // One of: low | medium | high | critical (per the prompt contract).
    public string Severity { get; set; } = "low";
}
+ + } + } + else + { + No validation results available. + } + + } + @if (showHistoryTab) { diff --git a/DiunaBI.UI.Shared/Pages/Layers/Details.razor.cs b/DiunaBI.UI.Shared/Pages/Layers/Details.razor.cs index f8b8da8..e48c934 100644 --- a/DiunaBI.UI.Shared/Pages/Layers/Details.razor.cs +++ b/DiunaBI.UI.Shared/Pages/Layers/Details.razor.cs @@ -503,7 +503,7 @@ public partial class Details : ComponentBase, IDisposable if (layer?.Records == null) return false; var typeRecord = layer.Records.FirstOrDefault(x => x.Code == "Type"); - return typeRecord?.Desc1 == "ImportWorker" || typeRecord?.Desc1 == "ProcessWorker"; + return typeRecord?.Desc1 == "ImportWorker" || typeRecord?.Desc1 == "ProcessWorker" || typeRecord?.Desc1 == "ValidationWorker"; } private async Task RunNow() @@ -545,6 +545,18 @@ public partial class Details : ComponentBase, IDisposable } } + // Validation tab helper methods + private Severity GetValidationSeverity(string? status) + { + return status?.ToLower() switch + { + "pass" => Severity.Success, + "warning" => Severity.Warning, + "critical" => Severity.Error, + _ => Severity.Info + }; + } + public void Dispose() { HubService.EntityChanged -= OnEntityChanged;