1 commit

f10dfe629e · WIP: AI Validator · 2025-12-15 20:05:26 +01:00
17 changed files with 1687 additions and 14 deletions

CLAUDE.md (new file, 381 lines)
View File

@@ -0,0 +1,381 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
DiunaBI is a full-stack Business Intelligence platform built on .NET 10.0 with a multi-tier Clean Architecture. It provides a plugin-based system for importing, processing, and exporting business data with real-time updates, job queue management, and comprehensive audit trails.
**Tech Stack:**
- Backend: ASP.NET Core 10.0 Web API
- Frontend: Blazor Server + MAUI Mobile (iOS, Android, Windows, macOS)
- Database: SQL Server + EF Core 10.0
- UI Framework: MudBlazor 8.0
- Real-time: SignalR (EntityChangeHub)
- Authentication: JWT Bearer + Google OAuth
- External APIs: Google Sheets API, Google Drive API
- Logging: Serilog (Console, File)
## Common Commands
### Build and Run
```bash
# Build the entire solution
dotnet build DiunaBI.sln
# Build specific project
dotnet build DiunaBI.API/DiunaBI.API.csproj
# Run API (backend)
dotnet run --project DiunaBI.API/DiunaBI.API.csproj
# Run Web UI (Blazor Server)
dotnet run --project DiunaBI.UI.Web/DiunaBI.UI.Web.csproj
# Clean build artifacts
dotnet clean DiunaBI.sln
```
### Database Migrations
```bash
# Add new migration
dotnet ef migrations add <MigrationName> --project DiunaBI.Infrastructure --startup-project DiunaBI.API
# Apply migrations to database
dotnet ef database update --project DiunaBI.Infrastructure --startup-project DiunaBI.API
# Remove last migration (if not applied)
dotnet ef migrations remove --project DiunaBI.Infrastructure --startup-project DiunaBI.API
# List all migrations
dotnet ef migrations list --project DiunaBI.Infrastructure --startup-project DiunaBI.API
```
### Testing
```bash
# Run all tests
dotnet test DiunaBI.Tests/DiunaBI.Tests.csproj
# Run tests with detailed output
dotnet test DiunaBI.Tests/DiunaBI.Tests.csproj --logger "console;verbosity=detailed"
```
### Plugin Development
```bash
# Build plugin project (automatically copies DLL to API bin/Plugins/)
dotnet build DiunaBI.Plugins.Morska/DiunaBI.Plugins.Morska.csproj
dotnet build DiunaBI.Plugins.PedrolloPL/DiunaBI.Plugins.PedrolloPL.csproj
# Plugins are auto-loaded from bin/Plugins/ at API startup
```
### Docker (Production)
```bash
# Build API container (with Morska plugin)
docker build -f DiunaBI.API/Dockerfile -t diunabi-api --build-arg PLUGIN_PROJECT=DiunaBI.Plugins.Morska .
# Build API container (with PedrolloPL plugin)
docker build -f DiunaBI.API/Dockerfile -t diunabi-api --build-arg PLUGIN_PROJECT=DiunaBI.Plugins.PedrolloPL .
```
## Architecture
### Solution Structure (10 Projects)
**DiunaBI.API** - ASP.NET Core Web API
- Controllers: Auth, Layers, Jobs, DataInbox
- Hubs: EntityChangeHub (SignalR)
- Services: GoogleAuthService, JwtTokenService
- API Key authorization via [ApiKeyAuth] attribute
**DiunaBI.Domain** - Domain entities (9 entities)
- User, Layer, Record, RecordHistory, QueueJob, DataInbox, ProcessSource
**DiunaBI.Application** - DTOs and application models
- LayerDto, RecordDto, UserDto, RecordHistoryDto, JobDto, PagedResult<T>
**DiunaBI.Infrastructure** - Data access and core services
- Data: AppDbContext, 47 EF Core migrations
- Interceptors: EntityChangeInterceptor (auto-broadcasts DB changes via SignalR)
- Services: PluginManager, JobSchedulerService, JobWorkerService
- Helpers: GoogleSheetsHelper, GoogleDriveHelper
- Plugin Base Classes: BaseDataImporter, BaseDataProcessor, BaseDataExporter
**DiunaBI.UI.Web** - Blazor Server web application
**DiunaBI.UI.Mobile** - MAUI mobile application (iOS, Android, Windows, macOS)
**DiunaBI.UI.Shared** - Shared Blazor component library
- Pages/: Feature-based folders (Layers/, Jobs/, DataInbox/)
- Components/: Layout/ (MainLayout, EmptyLayout, Routes), Auth/ (AuthGuard, LoginCard)
- Services/: LayerService, JobService, DataInboxService, EntityChangeHubService, AuthService
**DiunaBI.Plugins.Morska** - Production plugin (17 total)
- 4 Importers: Standard, D1, D3, FK2
- 12 Processors: D6, T1, T3, T4, T5 variants
- 1 Exporter: Google Sheets export
**DiunaBI.Plugins.PedrolloPL** - Production plugin (1 total)
- 1 Importer: B3 (DataInbox → Layer with dictionary mapping)
**DiunaBI.Tests** - Unit and integration tests
### Data Flow Architecture
1. **Import Flow**: External data → DataInbox API → Import Plugin → Import Layer
2. **Process Flow**: Import Layer → Process Plugin → Processed Layer
3. **Export Flow**: Processed Layer → Export Plugin → Google Sheets/Drive
4. **Real-time Updates**: DB Change → EntityChangeInterceptor → SignalR Hub → All Clients
### Plugin System
**Plugin Discovery:**
- Plugins loaded from `bin/Plugins/` directory at startup
- PluginManager scans assemblies for IDataImporter, IDataProcessor, IDataExporter implementations
- Plugins auto-registered in DI container
**Plugin Base Classes** (in DiunaBI.Infrastructure/Plugins/):
- `BaseDataImporter`: Abstract base for importers, access to AppDbContext, GoogleSheetsHelper, GoogleDriveHelper
- `BaseDataProcessor`: Abstract base for processors, access to AppDbContext, PluginManager
- `BaseDataExporter`: Abstract base for exporters, access to AppDbContext, GoogleSheetsHelper, GoogleDriveHelper
**Plugin Execution:**
- Importers: Read external data sources, create Layer + Records
- Processors: Read source Layers, apply transformations, create target Layers + Records
- Exporters: Read Layers, write to Google Sheets/Drive
**Plugin Configuration:**
- Stored in Administration layers (Type = ImportWorker/ProcessWorker)
- Records with Code = "Plugin", "Priority", "MaxRetries" define job configs
- JobSchedulerService reads configs and creates QueueJobs
### Job Queue System
**Components:**
- `QueueJob` entity: LayerId, PluginName, JobType (Import/Process), Status (Pending/Running/Completed/Failed/Retrying), Priority (0 = highest)
- `JobSchedulerService`: Creates jobs from Administration layer configs
- `JobWorkerService`: Background service polling every 5 seconds, executes jobs via PluginManager
- Retry logic: 30s → 2m → 5m delays, max 5 retries (sketched below)
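A sketch of that delay schedule (only three delays are documented, so holding at 5 minutes for later retries is an assumption):
```csharp
// Sketch of the fixed retry delays above. Only three delays are
// documented; capping retries 3+ at 5 minutes is an assumption.
static TimeSpan RetryDelay(int retryCount) => retryCount switch
{
    0 => TimeSpan.FromSeconds(30),  // first retry
    1 => TimeSpan.FromMinutes(2),   // second retry
    _ => TimeSpan.FromMinutes(5),   // third retry and beyond (assumed)
};
```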
**Job Lifecycle:**
1. Creation: JobSchedulerService or manual via `/jobs/create-for-layer/{layerId}`
2. Queued: Status = Pending, sorted by CreatedAt DESC then Priority ASC
3. Execution: JobWorkerService picks up, Status = Running
4. Completion: Status = Completed or Failed
5. Retry: On failure, Status = Retrying with exponential backoff
6. Real-time: All status changes broadcast via SignalR to UI
**Job Scheduling Endpoints:**
- `POST /jobs/ui/schedule` - Schedule all jobs (JWT auth for UI users)
- `POST /jobs/ui/schedule/imports` - Schedule import jobs only (JWT auth)
- `POST /jobs/ui/schedule/processes` - Schedule process jobs only (JWT auth)
- `POST /jobs/schedule/{apiKey}` - Schedule all jobs (API key auth for cron)
- `POST /jobs/schedule/imports/{apiKey}` - Schedule import jobs (API key auth)
- `POST /jobs/schedule/processes/{apiKey}` - Schedule process jobs (API key auth)
### Real-time Updates (SignalR)
**Architecture:**
- Hub: `/hubs/entitychanges` (requires JWT authentication)
- Interceptor: `EntityChangeInterceptor` captures EF Core changes (Added, Modified, Deleted)
- Broadcast: After SaveChanges, sends `EntityChanged(module, id, operation)` to all clients
- Modules: QueueJobs, Layers, Records, RecordHistory
**UI Integration:**
- `EntityChangeHubService`: Singleton service initialized after authentication in MainLayout
- Components subscribe: `HubService.EntityChanged += OnEntityChanged`
- Auto-reconnect enabled
- Pages with real-time updates: Jobs/Index, Jobs/Details, Layers/Index, Layers/Details, DataInbox/Index
**Authentication Flow:**
1. User logs in with Google OAuth → JWT token stored in localStorage
2. `TokenProvider.Token` populated in AuthService
3. `MainLayout` subscribes to `AuthenticationStateChanged` event
4. On auth success, SignalR connection initialized with JWT token
5. Token sent via `accessTokenProvider` in HubConnection options (see the sketch below)
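A minimal client-side sketch of steps 4-5 using the standard `Microsoft.AspNetCore.SignalR.Client` API (host URL, token acquisition, and payload types are illustrative assumptions):
```csharp
using Microsoft.AspNetCore.SignalR.Client;

var jwtToken = "<jwt-from-POST-/auth/apiToken>"; // illustrative

var connection = new HubConnectionBuilder()
    .WithUrl("https://localhost:5001/hubs/entitychanges", options =>
    {
        // Step 5: JWT supplied via accessTokenProvider
        options.AccessTokenProvider = () => Task.FromResult<string?>(jwtToken);
    })
    .WithAutomaticReconnect() // auto-reconnect, as noted above
    .Build();

// Matches the broadcast shape EntityChanged(module, id, operation);
// the Guid payload type is assumed from the entity keys.
connection.On<string, Guid, string>("EntityChanged",
    (module, id, operation) => Console.WriteLine($"{module}: {id} {operation}"));

await connection.StartAsync();
```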
### Authentication & Security
**Google OAuth Flow:**
1. Client exchanges Google ID token → `POST /auth/apiToken`
2. GoogleAuthService validates with Google, maps to internal User entity
3. Returns JWT (7-day expiration, HS256 signing)
4. JWT required on all protected endpoints except `/auth/apiToken`, `/health`
5. UserId extraction middleware sets X-UserId header for audit trails
**API Key Authentication:**
- Custom [ApiKeyAuth] attribute for cron job endpoints
- X-API-Key header with constant-time comparison (sketch below)
- Used for DataInbox external endpoints and job scheduling
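A sketch of the constant-time check using the BCL's `CryptographicOperations.FixedTimeEquals`; whether `[ApiKeyAuth]` uses exactly this helper is not shown here:
```csharp
using System.Security.Cryptography;
using System.Text;

// Hedged sketch: constant-time API key comparison. FixedTimeEquals
// returns false on length mismatch and never short-circuits per byte.
static bool ApiKeyMatches(string provided, string expected) =>
    CryptographicOperations.FixedTimeEquals(
        Encoding.UTF8.GetBytes(provided),
        Encoding.UTF8.GetBytes(expected));
```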
**Security Features:**
- Rate limiting: 100 req/min general, 10 req/min auth
- Security headers: XSS, clickjacking, MIME sniffing protection
- Input validation: Pagination limits (1-1000), Base64 size limits (10MB)
- Stack trace hiding: Generic error messages in production
- CORS: Configured for localhost:4200, diuna.bim-it.pl, morska.diunabi.com
### Database Schema
**47 EF Core Migrations** - All in DiunaBI.Infrastructure/Migrations/
**Key Entities:**
**User**
- Id (Guid), Email, UserName
- Google OAuth identity
- Constant: `User.AutoImportUserId = "f392209e-123e-4651-a5a4-0b1d6cf9ff9d"` (system operations)
**Layer**
- Number, Name, Type (Import/Processed/Administration/Dictionary)
- ParentId (hierarchical relationships)
- IsDeleted (soft delete), IsCancelled (processing control)
- CreatedAt, ModifiedAt, CreatedById, ModifiedById (audit trail)
**Record**
- Code (unique identifier per layer), LayerId
- Value1-Value32 (double?), Desc1 (string, max 10000 chars)
- IsDeleted (soft delete)
- Full audit trail via RecordHistory
**RecordHistory** (Migration 47)
- RecordId, LayerId, ChangedAt, ChangedById
- ChangeType (Created/Updated/Deleted)
- Code, Desc1 (snapshot)
- ChangedFields (comma-separated), ChangesSummary (JSON old/new values)
- Indexes: (RecordId, ChangedAt), (LayerId, ChangedAt)
**QueueJob**
- LayerId, PluginName, JobType, Priority, Status
- RetryCount, MaxRetries (default 5)
- CreatedAt, ModifiedAt, LastAttemptAt, CompletedAt
- CreatedById, ModifiedById (uses User.AutoImportUserId for system jobs)
**DataInbox**
- Name, Source (identifiers), Data (base64-encoded JSON array)
- Used by importers to stage incoming data
**ProcessSource**
- SourceLayerId, TargetLayerId (defines layer processing relationships)
**Audit Trail Patterns:**
- All entities have CreatedAt, ModifiedAt with GETUTCDATE() defaults
- Foreign keys to Users: CreatedById, ModifiedById
- Soft deletes via IsDeleted flag (see the filter sketch below)
- RecordHistory tracks field-level changes with JSON diffs
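Soft deletes of this kind are often enforced centrally with EF Core global query filters; a hedged sketch (this commit does not show whether AppDbContext actually configures them):
```csharp
// Hedged sketch inside AppDbContext: exclude IsDeleted rows by default.
// Not confirmed against the actual AppDbContext configuration.
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
    modelBuilder.Entity<Layer>().HasQueryFilter(l => !l.IsDeleted);
    modelBuilder.Entity<Record>().HasQueryFilter(r => !r.IsDeleted);
    base.OnModelCreating(modelBuilder);
}
```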
### UI Organization
**Feature-Based Structure** (as of Dec 5, 2025):
- `Pages/Layers/` - Index.razor + Details.razor (list, detail, edit, history)
- `Pages/Jobs/` - Index.razor + Details.razor (list, detail, retry, cancel, scheduling)
- `Pages/DataInbox/` - Index.razor + Details.razor (list, detail, base64 decode)
- `Components/Layout/` - MainLayout, EmptyLayout, Routes
- `Components/Auth/` - AuthGuard, LoginCard
**Code-Behind Pattern:**
- Complex pages (50+ lines logic): Separate `.razor.cs` files
- Simple pages: Inline `@code` blocks
- Namespaces: `DiunaBI.UI.Shared.Pages.{Feature}`
**Filter State Persistence:**
- LayerFilterStateService, DataInboxFilterStateService
- Singleton services remember search, type, page selections across navigation
**Timezone Handling:**
- DateTimeHelper service detects browser timezone via JS Interop
- All dates stored as UTC in DB, converted to user's local timezone for display (sketch below)
- Format: "yyyy-MM-dd HH:mm:ss"
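A sketch of the display conversion in the stated format (the zone ID normally comes from DateTimeHelper's JS interop; "Europe/Warsaw" is illustrative):
```csharp
// Store UTC, display local in "yyyy-MM-dd HH:mm:ss".
var tz = TimeZoneInfo.FindSystemTimeZoneById("Europe/Warsaw"); // illustrative
var localTime = TimeZoneInfo.ConvertTimeFromUtc(DateTime.UtcNow, tz);
Console.WriteLine(localTime.ToString("yyyy-MM-dd HH:mm:ss"));
```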
## Configuration
**Environment Variables** (appsettings.Development.json):
- `ConnectionStrings:SQLDatabase` - SQL Server connection (localhost:21433, DB: DiunaBI-PedrolloPL)
- `JwtSettings:SecurityKey`, `JwtSettings:ExpiryDays` (7)
- `GoogleAuth:ClientId`, `GoogleAuth:RedirectUri`
- `apiKey`, `apiUser`, `apiPass` - DataInbox API security
- `exportDirectory` - Google Drive folder ID for exports
- `InstanceName` - DEV/PROD environment identifier
**CORS Origins:**
- http://localhost:4200 (development)
- https://diuna.bim-it.pl (production)
- https://morska.diunabi.com (production)
**Logging:**
- Serilog with Console + File sinks
- Override levels: Microsoft.AspNetCore = Warning, EF Core = Warning, Google.Apis = Warning
- Sensitive data logging only in Development
## Development Patterns
### When Adding a New Plugin
1. Create new project: `DiunaBI.Plugins.{Name}`
2. Reference: DiunaBI.Domain, DiunaBI.Infrastructure
3. Inherit from: BaseDataImporter/BaseDataProcessor/BaseDataExporter
4. Implement abstract methods: ImportAsync/ProcessAsync/ExportAsync, ValidateConfiguration (see the skeleton after this list)
5. Build triggers automatic copy to `bin/Plugins/` via MSBuild target
6. No registration needed - PluginManager auto-discovers at startup
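An illustrative importer skeleton; the exact abstract member signatures of BaseDataImporter are not shown in this commit, so treat them as assumptions:
```csharp
using DiunaBI.Domain.Entities;
using DiunaBI.Infrastructure.Plugins;

// Hypothetical skeleton based on steps 3-4 above; class name and
// signatures are assumptions, not the actual BaseDataImporter contract.
public class AcmeImporter : BaseDataImporter
{
    public override async Task ImportAsync(Layer importWorker)
    {
        // Read the external source, then create an Import layer + Records
        await Task.CompletedTask;
    }

    public override void ValidateConfiguration()
    {
        // Throw if required config records (e.g. "Plugin") are missing
    }
}
```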
### When Adding a New Entity
1. Create entity in DiunaBI.Domain/Entities/
2. Add DbSet to AppDbContext
3. Create migration: `dotnet ef migrations add {Name} --project DiunaBI.Infrastructure --startup-project DiunaBI.API`
4. If entity needs real-time updates, add module name to EntityChangeInterceptor
5. Create DTO in DiunaBI.Application/DTOs/
6. Add controller in DiunaBI.API/Controllers/ with [Authorize] and [RequireRateLimit("api")]
### When Adding Real-time Updates to a UI Page
1. Inject EntityChangeHubService
2. Subscribe to EntityChanged event in OnInitializedAsync
3. Filter by module name
4. Call StateHasChanged() in event handler
5. Implement IDisposable, unsubscribe in Dispose (see the sketch after this list)
6. Test both initial load and SignalR updates
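A code-behind sketch of steps 1-5; the `EntityChanged` delegate shape is assumed from the `EntityChanged(module, id, operation)` broadcast described earlier:
```csharp
using Microsoft.AspNetCore.Components;

// Hedged sketch: EntityChanged is assumed to be Action<string, Guid, string>.
public partial class Index : ComponentBase, IDisposable
{
    [Inject] public EntityChangeHubService HubService { get; set; } = default!;

    protected override Task OnInitializedAsync()
    {
        HubService.EntityChanged += OnEntityChanged;  // step 2: subscribe
        return Task.CompletedTask;
    }

    private void OnEntityChanged(string module, Guid id, string operation)
    {
        if (module != "QueueJobs") return;            // step 3: filter by module
        _ = InvokeAsync(StateHasChanged);             // step 4: re-render on UI thread
    }

    public void Dispose()
    {
        HubService.EntityChanged -= OnEntityChanged;  // step 5: unsubscribe
    }
}
```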
### When Modifying Job System
- JobSchedulerService: Changes to job creation from layer configs
- JobWorkerService: Changes to job execution, retry logic, rate limiting
- QueueJob entity: Changes to job schema require migration
- Jobs UI: Real-time updates required, test SignalR broadcasts
### Security Considerations
- Never log sensitive data (tokens, passwords, API keys) except in Development
- Use generic error messages in production (no stack traces)
- All new endpoints require [Authorize] unless explicitly [AllowAnonymous]
- API key endpoints require [ApiKeyAuth] and [AllowAnonymous]
- Validate pagination parameters (1-1000 range)
- Use constant-time comparison for API keys
- Rate limit all public endpoints
## Known Limitations
- JobWorkerService polls every 5 seconds (not event-driven)
- Google Sheets API quota: 5-second delay after import jobs
- Retry delays fixed: 30s → 2m → 5m (not configurable per job)
- Plugin configuration stored in Records (not strongly typed)
- No plugin versioning or hot-reload support
- SignalR requires JWT authentication (no anonymous connections)
## Important Notes
- **DO NOT** commit `.env` file (contains secrets)
- **DO NOT** modify migration files after applying to production
- **ALWAYS** use User.AutoImportUserId for system-created entities (jobs, automated processes)
- **ALWAYS** implement IDisposable for pages subscribing to SignalR events
- **ALWAYS** test real-time updates when modifying entities with SignalR broadcasts
- **Plugin DLLs** are auto-copied to `bin/Plugins/` on build via MSBuild target
- **Database changes** require migration AND applying to production via deployment scripts
- **Foreign keys** use RESTRICT (not CASCADE) to prevent accidental data loss (Migration 45)
- **Soft deletes** via IsDeleted flag, not physical deletion
- **Timezone handling** - store UTC, display local (via DateTimeHelper)

View File

@@ -28,5 +28,17 @@
],
"Enrich": ["FromLogContext", "WithMachineName", "WithThreadId"]
},
"AllowedHosts": "*"
"AllowedHosts": "*",
"AnomalyDetection": {
"Provider": "OpenAI",
"Model": "gpt-4o-mini",
"ApiKey": "",
"Endpoint": "",
"MaxTokens": 4000,
"Temperature": 0.1,
"MinHistoricalImports": 5,
"RecentImportsWindow": 5,
"MonthlyImportsWindow": 5,
"ConfidenceThreshold": 0.7
}
}

View File

@@ -25,7 +25,8 @@ public enum LayerType
Import,
Processed,
Administration,
Dictionary
Dictionary,
Validation
}
public class LayerFilterRequest

View File

@@ -10,6 +10,7 @@ public enum LayerType
Processed,
Administration,
Dictionary,
Validation,
}
public class Layer
{

View File

@@ -26,7 +26,8 @@ public class QueueJob
public enum JobType
{
Import = 0,
Process = 1
Process = 1,
Validate = 2
}
public enum JobStatus

View File

@@ -22,6 +22,8 @@
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="10.0.0" />
<PackageReference Include="Google.Apis.Sheets.v4" Version="1.68.0.3525" />
<PackageReference Include="Google.Apis.Drive.v3" Version="1.68.0.3490" />
<PackageReference Include="Microsoft.SemanticKernel" Version="1.68.0" />
<PackageReference Include="Microsoft.SemanticKernel.Connectors.Ollama" Version="1.68.0-alpha" />
</ItemGroup>
<ItemGroup>

View File

@@ -0,0 +1,11 @@
using DiunaBI.Domain.Entities;
namespace DiunaBI.Infrastructure.Interfaces;
public interface IDataValidator
{
string ValidatorType { get; }
bool CanValidate(string validatorType);
void Validate(Layer validationWorker);
}

View File

@@ -0,0 +1,489 @@
// <auto-generated />
using System;
using DiunaBI.Infrastructure.Data;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.EntityFrameworkCore.Migrations;
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
#nullable disable
namespace DiunaBI.Infrastructure.Migrations
{
[DbContext(typeof(AppDbContext))]
[Migration("20251214143012_AddValidationLayerAndJobTypes")]
partial class AddValidationLayerAndJobTypes
{
/// <inheritdoc />
protected override void BuildTargetModel(ModelBuilder modelBuilder)
{
#pragma warning disable 612, 618
modelBuilder
.HasAnnotation("ProductVersion", "10.0.0")
.HasAnnotation("Relational:MaxIdentifierLength", 128);
SqlServerModelBuilderExtensions.UseIdentityColumns(modelBuilder);
modelBuilder.Entity("DiunaBI.Domain.Entities.DataInbox", b =>
{
b.Property<Guid>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("uniqueidentifier");
b.Property<DateTime>("CreatedAt")
.ValueGeneratedOnAdd()
.HasColumnType("datetime2")
.HasDefaultValueSql("GETUTCDATE()");
b.Property<string>("Data")
.IsRequired()
.HasColumnType("nvarchar(max)");
b.Property<string>("Name")
.IsRequired()
.HasMaxLength(50)
.HasColumnType("nvarchar(50)");
b.Property<string>("Source")
.IsRequired()
.HasMaxLength(50)
.HasColumnType("nvarchar(50)");
b.HasKey("Id");
b.ToTable("DataInbox");
});
modelBuilder.Entity("DiunaBI.Domain.Entities.Layer", b =>
{
b.Property<Guid>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("uniqueidentifier");
b.Property<DateTime>("CreatedAt")
.ValueGeneratedOnAdd()
.HasColumnType("datetime2")
.HasDefaultValueSql("GETUTCDATE()");
b.Property<Guid>("CreatedById")
.HasColumnType("uniqueidentifier");
b.Property<bool>("IsCancelled")
.ValueGeneratedOnAdd()
.HasColumnType("bit")
.HasDefaultValue(false);
b.Property<bool>("IsDeleted")
.ValueGeneratedOnAdd()
.HasColumnType("bit")
.HasDefaultValue(false);
b.Property<DateTime>("ModifiedAt")
.ValueGeneratedOnAdd()
.HasColumnType("datetime2")
.HasDefaultValueSql("GETUTCDATE()");
b.Property<Guid>("ModifiedById")
.HasColumnType("uniqueidentifier");
b.Property<string>("Name")
.IsRequired()
.HasMaxLength(50)
.HasColumnType("nvarchar(50)");
b.Property<int>("Number")
.HasColumnType("int");
b.Property<Guid?>("ParentId")
.HasColumnType("uniqueidentifier");
b.Property<int>("Type")
.HasColumnType("int");
b.HasKey("Id");
b.HasIndex("CreatedById");
b.HasIndex("ModifiedById");
b.ToTable("Layers");
});
modelBuilder.Entity("DiunaBI.Domain.Entities.ProcessSource", b =>
{
b.Property<Guid>("LayerId")
.HasColumnType("uniqueidentifier");
b.Property<Guid>("SourceId")
.HasColumnType("uniqueidentifier");
b.HasKey("LayerId", "SourceId");
b.HasIndex("SourceId");
b.ToTable("ProcessSources");
});
modelBuilder.Entity("DiunaBI.Domain.Entities.QueueJob", b =>
{
b.Property<Guid>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("uniqueidentifier");
b.Property<DateTime?>("CompletedAt")
.HasColumnType("datetime2");
b.Property<DateTime>("CreatedAt")
.ValueGeneratedOnAdd()
.HasColumnType("datetime2")
.HasDefaultValueSql("GETUTCDATE()");
b.Property<Guid>("CreatedById")
.HasColumnType("uniqueidentifier");
b.Property<int>("JobType")
.HasColumnType("int");
b.Property<DateTime?>("LastAttemptAt")
.HasColumnType("datetime2");
b.Property<string>("LastError")
.HasMaxLength(1000)
.HasColumnType("nvarchar(1000)");
b.Property<Guid>("LayerId")
.HasColumnType("uniqueidentifier");
b.Property<string>("LayerName")
.IsRequired()
.HasMaxLength(200)
.HasColumnType("nvarchar(200)");
b.Property<int>("MaxRetries")
.HasColumnType("int");
b.Property<DateTime>("ModifiedAt")
.ValueGeneratedOnAdd()
.HasColumnType("datetime2")
.HasDefaultValueSql("GETUTCDATE()");
b.Property<Guid>("ModifiedById")
.HasColumnType("uniqueidentifier");
b.Property<string>("PluginName")
.IsRequired()
.HasMaxLength(100)
.HasColumnType("nvarchar(100)");
b.Property<int>("Priority")
.HasColumnType("int");
b.Property<int>("RetryCount")
.HasColumnType("int");
b.Property<int>("Status")
.HasColumnType("int");
b.HasKey("Id");
b.ToTable("QueueJobs");
});
modelBuilder.Entity("DiunaBI.Domain.Entities.Record", b =>
{
b.Property<Guid>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("uniqueidentifier");
b.Property<string>("Code")
.IsRequired()
.HasMaxLength(50)
.HasColumnType("nvarchar(50)");
b.Property<DateTime>("CreatedAt")
.ValueGeneratedOnAdd()
.HasColumnType("datetime2")
.HasDefaultValueSql("GETUTCDATE()");
b.Property<Guid>("CreatedById")
.HasColumnType("uniqueidentifier");
b.Property<string>("Desc1")
.HasMaxLength(10000)
.HasColumnType("nvarchar(max)");
b.Property<bool>("IsDeleted")
.HasColumnType("bit");
b.Property<Guid>("LayerId")
.HasColumnType("uniqueidentifier");
b.Property<DateTime>("ModifiedAt")
.ValueGeneratedOnAdd()
.HasColumnType("datetime2")
.HasDefaultValueSql("GETUTCDATE()");
b.Property<Guid>("ModifiedById")
.HasColumnType("uniqueidentifier");
b.Property<double?>("Value1")
.HasColumnType("float");
b.Property<double?>("Value10")
.HasColumnType("float");
b.Property<double?>("Value11")
.HasColumnType("float");
b.Property<double?>("Value12")
.HasColumnType("float");
b.Property<double?>("Value13")
.HasColumnType("float");
b.Property<double?>("Value14")
.HasColumnType("float");
b.Property<double?>("Value15")
.HasColumnType("float");
b.Property<double?>("Value16")
.HasColumnType("float");
b.Property<double?>("Value17")
.HasColumnType("float");
b.Property<double?>("Value18")
.HasColumnType("float");
b.Property<double?>("Value19")
.HasColumnType("float");
b.Property<double?>("Value2")
.HasColumnType("float");
b.Property<double?>("Value20")
.HasColumnType("float");
b.Property<double?>("Value21")
.HasColumnType("float");
b.Property<double?>("Value22")
.HasColumnType("float");
b.Property<double?>("Value23")
.HasColumnType("float");
b.Property<double?>("Value24")
.HasColumnType("float");
b.Property<double?>("Value25")
.HasColumnType("float");
b.Property<double?>("Value26")
.HasColumnType("float");
b.Property<double?>("Value27")
.HasColumnType("float");
b.Property<double?>("Value28")
.HasColumnType("float");
b.Property<double?>("Value29")
.HasColumnType("float");
b.Property<double?>("Value3")
.HasColumnType("float");
b.Property<double?>("Value30")
.HasColumnType("float");
b.Property<double?>("Value31")
.HasColumnType("float");
b.Property<double?>("Value32")
.HasColumnType("float");
b.Property<double?>("Value4")
.HasColumnType("float");
b.Property<double?>("Value5")
.HasColumnType("float");
b.Property<double?>("Value6")
.HasColumnType("float");
b.Property<double?>("Value7")
.HasColumnType("float");
b.Property<double?>("Value8")
.HasColumnType("float");
b.Property<double?>("Value9")
.HasColumnType("float");
b.HasKey("Id");
b.HasIndex("CreatedById");
b.HasIndex("LayerId");
b.HasIndex("ModifiedById");
b.ToTable("Records");
});
modelBuilder.Entity("DiunaBI.Domain.Entities.RecordHistory", b =>
{
b.Property<Guid>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("uniqueidentifier");
b.Property<int>("ChangeType")
.HasColumnType("int");
b.Property<DateTime>("ChangedAt")
.HasColumnType("datetime2");
b.Property<Guid>("ChangedById")
.HasColumnType("uniqueidentifier");
b.Property<string>("ChangedFields")
.HasMaxLength(200)
.HasColumnType("nvarchar(200)");
b.Property<string>("ChangesSummary")
.HasMaxLength(4000)
.HasColumnType("nvarchar(4000)");
b.Property<string>("Code")
.IsRequired()
.HasMaxLength(50)
.HasColumnType("nvarchar(50)");
b.Property<string>("Desc1")
.HasMaxLength(10000)
.HasColumnType("nvarchar(max)");
b.Property<Guid>("LayerId")
.HasColumnType("uniqueidentifier");
b.Property<Guid>("RecordId")
.HasColumnType("uniqueidentifier");
b.HasKey("Id");
b.HasIndex("ChangedById");
b.HasIndex("LayerId", "ChangedAt");
b.HasIndex("RecordId", "ChangedAt");
b.ToTable("RecordHistory");
});
modelBuilder.Entity("DiunaBI.Domain.Entities.User", b =>
{
b.Property<Guid>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("uniqueidentifier");
b.Property<DateTime>("CreatedAt")
.ValueGeneratedOnAdd()
.HasColumnType("datetime2")
.HasDefaultValueSql("GETUTCDATE()");
b.Property<string>("Email")
.HasMaxLength(50)
.HasColumnType("nvarchar(50)");
b.Property<string>("UserName")
.HasMaxLength(50)
.HasColumnType("nvarchar(50)");
b.HasKey("Id");
b.ToTable("Users");
});
modelBuilder.Entity("DiunaBI.Domain.Entities.Layer", b =>
{
b.HasOne("DiunaBI.Domain.Entities.User", "CreatedBy")
.WithMany()
.HasForeignKey("CreatedById")
.OnDelete(DeleteBehavior.Restrict)
.IsRequired();
b.HasOne("DiunaBI.Domain.Entities.User", "ModifiedBy")
.WithMany()
.HasForeignKey("ModifiedById")
.OnDelete(DeleteBehavior.Restrict)
.IsRequired();
b.Navigation("CreatedBy");
b.Navigation("ModifiedBy");
});
modelBuilder.Entity("DiunaBI.Domain.Entities.ProcessSource", b =>
{
b.HasOne("DiunaBI.Domain.Entities.Layer", null)
.WithMany()
.HasForeignKey("LayerId")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
b.HasOne("DiunaBI.Domain.Entities.Layer", "Source")
.WithMany()
.HasForeignKey("SourceId")
.OnDelete(DeleteBehavior.Restrict)
.IsRequired();
b.Navigation("Source");
});
modelBuilder.Entity("DiunaBI.Domain.Entities.Record", b =>
{
b.HasOne("DiunaBI.Domain.Entities.User", "CreatedBy")
.WithMany()
.HasForeignKey("CreatedById")
.OnDelete(DeleteBehavior.Restrict)
.IsRequired();
b.HasOne("DiunaBI.Domain.Entities.Layer", null)
.WithMany("Records")
.HasForeignKey("LayerId")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired();
b.HasOne("DiunaBI.Domain.Entities.User", "ModifiedBy")
.WithMany()
.HasForeignKey("ModifiedById")
.OnDelete(DeleteBehavior.Restrict)
.IsRequired();
b.Navigation("CreatedBy");
b.Navigation("ModifiedBy");
});
modelBuilder.Entity("DiunaBI.Domain.Entities.RecordHistory", b =>
{
b.HasOne("DiunaBI.Domain.Entities.User", "ChangedBy")
.WithMany()
.HasForeignKey("ChangedById")
.OnDelete(DeleteBehavior.Restrict)
.IsRequired();
b.Navigation("ChangedBy");
});
modelBuilder.Entity("DiunaBI.Domain.Entities.Layer", b =>
{
b.Navigation("Records");
});
#pragma warning restore 612, 618
}
}
}

View File

@@ -0,0 +1,22 @@
using Microsoft.EntityFrameworkCore.Migrations;
#nullable disable
namespace DiunaBI.Infrastructure.Migrations
{
/// <inheritdoc />
public partial class AddValidationLayerAndJobTypes : Migration
{
/// <inheritdoc />
protected override void Up(MigrationBuilder migrationBuilder)
{
// No schema changes: LayerType.Validation and JobType.Validate are new enum values stored in existing int columns.
}
/// <inheritdoc />
protected override void Down(MigrationBuilder migrationBuilder)
{
// Intentionally empty: no schema change to revert.
}
}
}

View File

@@ -0,0 +1,21 @@
using DiunaBI.Domain.Entities;
using DiunaBI.Infrastructure.Interfaces;
namespace DiunaBI.Infrastructure.Plugins;
public abstract class BaseDataValidator : IDataValidator
{
public abstract string ValidatorType { get; }
public virtual bool CanValidate(string validatorType) => ValidatorType == validatorType;
public abstract void Validate(Layer validationWorker);
/// <summary>
/// Helper method to get record value by code from layer records
/// </summary>
protected string? GetRecordValue(ICollection<Record> records, string code)
{
return records.FirstOrDefault(x => x.Code == code)?.Desc1;
}
}
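For contrast with the LLM-backed validator later in this commit, a minimal concrete validator might look like this (class name and the required record are illustrative):
```csharp
using DiunaBI.Domain.Entities;
using DiunaBI.Infrastructure.Plugins;

// Illustrative only: a trivial validator that checks required worker config.
// The real implementation added in this commit is LlmAnomalyValidator.
public class RequiredConfigValidator : BaseDataValidator
{
    public override string ValidatorType => "RequiredConfigValidator";

    public override void Validate(Layer validationWorker)
    {
        var source = GetRecordValue(validationWorker.Records ?? new List<Record>(), "SourceLayer");
        if (string.IsNullOrEmpty(source))
            throw new InvalidOperationException(
                $"Validation worker '{validationWorker.Name}' has no SourceLayer record");
    }
}
```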

View File

@@ -221,14 +221,114 @@ public class JobSchedulerService
return jobsCreated;
}
public async Task<int> ScheduleValidateJobsAsync()
{
_logger.LogInformation("JobScheduler: Starting validation job scheduling");
var validationWorkers = await _db.Layers
.Include(x => x.Records)
.Where(x =>
x.Records!.Any(r => r.Code == "Type" && r.Desc1 == "ValidationWorker") &&
x.Records!.Any(r => r.Code == "IsEnabled" && r.Desc1 == "True")
)
.OrderBy(x => x.CreatedAt)
.AsNoTracking()
.ToListAsync();
_logger.LogInformation("JobScheduler: Found {Count} validation workers to schedule", validationWorkers.Count);
var jobsCreated = 0;
var scheduledLayerIds = new HashSet<Guid>(); // Track LayerIds scheduled in this batch
foreach (var worker in validationWorkers)
{
try
{
var plugin = worker.Records?.FirstOrDefault(r => r.Code == "Plugin")?.Desc1;
if (string.IsNullOrEmpty(plugin))
{
_logger.LogWarning("JobScheduler: Validation worker {LayerName} ({LayerId}) has no Plugin configured, skipping",
worker.Name, worker.Id);
continue;
}
// Get priority from config (default: 200 for validation - lower than processes)
var priorityStr = worker.Records?.FirstOrDefault(r => r.Code == "Priority")?.Desc1;
var priority = int.TryParse(priorityStr, out var p) ? p : 200;
// Get max retries from config (default: 3)
var maxRetriesStr = worker.Records?.FirstOrDefault(r => r.Code == "MaxRetries")?.Desc1;
var maxRetries = int.TryParse(maxRetriesStr, out var mr) ? mr : 3;
// Check in-memory: already scheduled in this batch?
if (scheduledLayerIds.Contains(worker.Id))
{
_logger.LogDebug("JobScheduler: Job already scheduled in this batch for {LayerName} ({LayerId})",
worker.Name, worker.Id);
continue;
}
// Check if there's already a pending/running job for this layer in database
var existingJob = await _db.QueueJobs
.Where(j => j.LayerId == worker.Id &&
(j.Status == JobStatus.Pending || j.Status == JobStatus.Running))
.FirstOrDefaultAsync();
if (existingJob != null)
{
_logger.LogDebug("JobScheduler: Job already exists for {LayerName} ({LayerId}), status: {Status}",
worker.Name, worker.Id, existingJob.Status);
continue;
}
var job = new QueueJob
{
Id = Guid.NewGuid(),
LayerId = worker.Id,
LayerName = worker.Name ?? "Unknown",
PluginName = plugin,
JobType = JobType.Validate,
Priority = priority,
MaxRetries = maxRetries,
Status = JobStatus.Pending,
CreatedAt = DateTime.UtcNow,
ModifiedAt = DateTime.UtcNow,
CreatedById = DiunaBI.Domain.Entities.User.AutoImportUserId,
ModifiedById = DiunaBI.Domain.Entities.User.AutoImportUserId
};
_db.QueueJobs.Add(job);
scheduledLayerIds.Add(worker.Id); // Track that we've scheduled this layer
jobsCreated++;
_logger.LogInformation("JobScheduler: Created validation job for {LayerName} ({LayerId}) with priority {Priority}",
worker.Name, worker.Id, priority);
}
catch (Exception ex)
{
_logger.LogError(ex, "JobScheduler: Failed to create job for {LayerName} ({LayerId})",
worker.Name, worker.Id);
}
}
if (jobsCreated > 0)
{
await _db.SaveChangesAsync();
_logger.LogInformation("JobScheduler: Successfully created {Count} validation jobs", jobsCreated);
}
return jobsCreated;
}
public async Task<int> ScheduleAllJobsAsync(string? nameFilter = null)
{
var importCount = await ScheduleImportJobsAsync(nameFilter);
var processCount = await ScheduleProcessJobsAsync();
var validateCount = await ScheduleValidateJobsAsync();
_logger.LogInformation("JobScheduler: Scheduled {ImportCount} import jobs and {ProcessCount} process jobs",
importCount, processCount);
_logger.LogInformation("JobScheduler: Scheduled {ImportCount} import jobs, {ProcessCount} process jobs, and {ValidateCount} validation jobs",
importCount, processCount, validateCount);
return importCount + processCount;
return importCount + processCount + validateCount;
}
}
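For reference, a ValidationWorker Administration layer that ScheduleValidateJobsAsync picks up needs records like the following (a hedged seeding sketch; names and values are illustrative, and audit fields are omitted):
```csharp
// Records that ScheduleValidateJobsAsync (above) and LlmAnomalyValidator
// (below) look for. All names/values here are illustrative.
var worker = new Layer
{
    Id = Guid.NewGuid(),
    Name = "Validation-D3",              // illustrative
    Type = LayerType.Administration,
};
var config = new[]
{
    new Record { LayerId = worker.Id, Code = "Type",        Desc1 = "ValidationWorker" },
    new Record { LayerId = worker.Id, Code = "IsEnabled",   Desc1 = "True" },
    new Record { LayerId = worker.Id, Code = "Plugin",      Desc1 = "LlmAnomalyValidator" },
    new Record { LayerId = worker.Id, Code = "Priority",    Desc1 = "200" }, // default if omitted
    new Record { LayerId = worker.Id, Code = "MaxRetries",  Desc1 = "3" },   // default if omitted
    new Record { LayerId = worker.Id, Code = "SourceLayer", Desc1 = "D3" },  // ImportWorker layer name
};
```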

View File

@@ -110,6 +110,19 @@ public class JobWorkerService : BackgroundService
processor.Process(layer);
}
else if (job.JobType == JobType.Validate)
{
var validator = pluginManager.GetValidator(job.PluginName);
if (validator == null)
{
throw new Exception($"Validator '{job.PluginName}' not found");
}
_logger.LogInformation("JobWorker: Executing validation for {LayerName} using {PluginName}",
job.LayerName, job.PluginName);
validator.Validate(layer);
}
// Job completed successfully
job.Status = JobStatus.Completed;

View File

@@ -12,6 +12,7 @@ public class PluginManager
private readonly List<Type> _processorTypes = new();
private readonly List<Type> _importerTypes = new();
private readonly List<Type> _exporterTypes = new();
private readonly List<Type> _validatorTypes = new();
private readonly List<IPlugin> _plugins = new();
public PluginManager(ILogger<PluginManager> logger, IServiceProvider serviceProvider)
@@ -42,10 +43,11 @@ public class PluginManager
}
}
_logger.LogInformation("Loaded {ProcessorCount} processors, {ImporterCount} importers, and {ExporterCount} exporters from {AssemblyCount} assemblies",
_logger.LogInformation("Loaded {ProcessorCount} processors, {ImporterCount} importers, {ExporterCount} exporters, and {ValidatorCount} validators from {AssemblyCount} assemblies",
_processorTypes.Count,
_importerTypes.Count,
_exporterTypes.Count,
_validatorTypes.Count,
dllFiles.Length);
}
@@ -77,6 +79,12 @@ public class PluginManager
_exporterTypes.Add(type);
_logger.LogDebug("Registered exporter: {Type}", type.Name);
}
if (typeof(IDataValidator).IsAssignableFrom(type) && !type.IsInterface && !type.IsAbstract)
{
_validatorTypes.Add(type);
_logger.LogDebug("Registered validator: {Type}", type.Name);
}
}
}
catch (Exception ex)
@@ -157,5 +165,29 @@ public class PluginManager
return null;
}
public int GetPluginsCount() => _processorTypes.Count + _importerTypes.Count + _exporterTypes.Count;
public IDataValidator? GetValidator(string validatorType)
{
foreach (var type in _validatorTypes)
{
var scope = _serviceProvider.CreateScope();
try
{
var instance = (IDataValidator)ActivatorUtilities.CreateInstance(scope.ServiceProvider, type);
if (instance.CanValidate(validatorType))
{
// Scope is deliberately kept alive so the validator's scoped
// dependencies (e.g. AppDbContext) survive while the job runs.
return instance;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to create validator instance of type {Type}", type.Name);
}
// Dispose the scope when the instance did not match or creation failed
scope.Dispose();
}
return null;
}
public int GetPluginsCount() => _processorTypes.Count + _importerTypes.Count + _exporterTypes.Count + _validatorTypes.Count;
}

View File

@@ -0,0 +1,496 @@
using System.Globalization;
using System.Text.Json;
using DiunaBI.Domain.Entities;
using DiunaBI.Infrastructure.Data;
using DiunaBI.Infrastructure.Plugins;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.SemanticKernel;
using Microsoft.SemanticKernel.ChatCompletion;
using Microsoft.SemanticKernel.Connectors.OpenAI;
using Microsoft.SemanticKernel.Connectors.Ollama;
namespace DiunaBI.Infrastructure.Validators;
public class LlmAnomalyValidator : BaseDataValidator
{
public override string ValidatorType => "LlmAnomalyValidator";
private readonly AppDbContext _db;
private readonly IConfiguration _config;
private readonly ILogger<LlmAnomalyValidator> _logger;
private readonly Kernel _kernel;
// Configuration loaded from appsettings.json
private readonly string _provider;
private readonly string _model;
private readonly int _minHistoricalImports;
private readonly int _recentImportsWindow;
private readonly int _monthlyImportsWindow;
private readonly double _confidenceThreshold;
// Configuration loaded from ValidationWorker records
private string? SourceLayerName { get; set; }
private Layer? SourceImportWorker { get; set; }
public LlmAnomalyValidator(
AppDbContext db,
IConfiguration config,
ILogger<LlmAnomalyValidator> logger)
{
_db = db;
_config = config;
_logger = logger;
// Load configuration from appsettings.json
_provider = config["AnomalyDetection:Provider"] ?? "OpenAI";
_model = config["AnomalyDetection:Model"] ?? "gpt-4o-mini";
_minHistoricalImports = int.Parse(config["AnomalyDetection:MinHistoricalImports"] ?? "5");
_recentImportsWindow = int.Parse(config["AnomalyDetection:RecentImportsWindow"] ?? "5");
_monthlyImportsWindow = int.Parse(config["AnomalyDetection:MonthlyImportsWindow"] ?? "5");
_confidenceThreshold = double.Parse(config["AnomalyDetection:ConfidenceThreshold"] ?? "0.7", CultureInfo.InvariantCulture); // invariant parse: "0.7" must not break under comma-decimal cultures
// Initialize Semantic Kernel based on provider
_kernel = InitializeKernel();
_logger.LogInformation("LlmAnomalyValidator initialized with provider: {Provider}, model: {Model}",
_provider, _model);
}
private Kernel InitializeKernel()
{
var builder = Kernel.CreateBuilder();
switch (_provider.ToLower())
{
case "openai":
var openAiKey = _config["AnomalyDetection:ApiKey"];
if (string.IsNullOrEmpty(openAiKey))
{
throw new InvalidOperationException("OpenAI API key not configured");
}
builder.AddOpenAIChatCompletion(_model, openAiKey);
break;
case "azureopenai":
var azureEndpoint = _config["AnomalyDetection:Endpoint"];
var azureKey = _config["AnomalyDetection:ApiKey"];
if (string.IsNullOrEmpty(azureEndpoint) || string.IsNullOrEmpty(azureKey))
{
throw new InvalidOperationException("Azure OpenAI endpoint or API key not configured");
}
builder.AddAzureOpenAIChatCompletion(_model, azureEndpoint, azureKey);
break;
case "ollama":
var ollamaEndpoint = _config["AnomalyDetection:Endpoint"] ?? "http://localhost:11434";
builder.AddOllamaChatCompletion(_model, new Uri(ollamaEndpoint));
break;
default:
throw new NotSupportedException($"LLM provider '{_provider}' is not supported");
}
return builder.Build();
}
public override void Validate(Layer validationWorker)
{
try
{
_logger.LogInformation("{ValidatorType}: Starting validation for {ValidationWorkerName} ({ValidationWorkerId})",
ValidatorType, validationWorker.Name, validationWorker.Id);
// Load configuration from layer records
LoadConfiguration(validationWorker);
// Validate configuration
ValidateConfiguration();
// Find latest import layer
var latestImport = GetLatestImportLayer();
// Get historical context
var historicalImports = GetHistoricalImports();
// Check if enough historical data
if (historicalImports.Count < _minHistoricalImports)
{
_logger.LogWarning("{ValidatorType}: Not enough historical imports: {Count} (need {Min}). Skipping validation.",
ValidatorType, historicalImports.Count, _minHistoricalImports);
return;
}
// Perform validation
PerformValidation(validationWorker, latestImport, historicalImports);
_logger.LogInformation("{ValidatorType}: Successfully completed validation for {ValidationWorkerName}",
ValidatorType, validationWorker.Name);
}
catch (Exception e)
{
_logger.LogError(e, "{ValidatorType}: Failed to validate {ValidationWorkerName} ({ValidationWorkerId})",
ValidatorType, validationWorker.Name, validationWorker.Id);
throw;
}
}
private void LoadConfiguration(Layer validationWorker)
{
if (validationWorker.Records == null)
{
throw new InvalidOperationException("ValidationWorker has no records");
}
// Load source layer name (ImportWorker Administration Layer)
SourceLayerName = GetRecordValue(validationWorker.Records, "SourceLayer");
if (string.IsNullOrEmpty(SourceLayerName))
{
throw new InvalidOperationException("SourceLayer record not found");
}
_logger.LogDebug("{ValidatorType}: Configuration loaded - SourceLayer: {SourceLayer}",
ValidatorType, SourceLayerName);
}
private void ValidateConfiguration()
{
var errors = new List<string>();
if (string.IsNullOrEmpty(SourceLayerName)) errors.Add("SourceLayer is required");
// Find source import worker (Administration Layer)
SourceImportWorker = _db.Layers
.SingleOrDefault(x => x.Name == SourceLayerName &&
x.Type == LayerType.Administration &&
!x.IsDeleted &&
!x.IsCancelled);
if (SourceImportWorker == null)
{
errors.Add($"SourceImportWorker layer '{SourceLayerName}' not found");
}
if (errors.Any())
{
throw new InvalidOperationException($"Configuration validation failed: {string.Join(", ", errors)}");
}
_logger.LogDebug("{ValidatorType}: Configuration validation passed", ValidatorType);
}
private Layer GetLatestImportLayer()
{
// Find latest Import layer where ParentId = SourceImportWorker.Id
var latestImport = _db.Layers
.Include(x => x.Records)
.Where(x => x.ParentId == SourceImportWorker!.Id &&
x.Type == LayerType.Import &&
!x.IsDeleted &&
!x.IsCancelled)
.OrderByDescending(x => x.CreatedAt)
.FirstOrDefault();
if (latestImport == null)
{
throw new InvalidOperationException(
$"No import layers found for import worker '{SourceImportWorker!.Name}'");
}
_logger.LogDebug("{ValidatorType}: Found latest import layer: {LayerName} ({LayerId})",
ValidatorType, latestImport.Name, latestImport.Id);
return latestImport;
}
private List<Layer> GetHistoricalImports()
{
// Get last N import layers (ordered by CreatedAt)
var historicalImports = _db.Layers
.Include(x => x.Records)
.Where(x => x.ParentId == SourceImportWorker!.Id &&
x.Type == LayerType.Import &&
!x.IsDeleted &&
!x.IsCancelled)
.OrderByDescending(x => x.CreatedAt)
.Take(_recentImportsWindow)
.AsNoTracking()
.ToList();
_logger.LogDebug("{ValidatorType}: Found {Count} historical imports for recent window",
ValidatorType, historicalImports.Count);
return historicalImports;
}
private List<Layer> GetMonthlyBaselineImports()
{
// Get last N "first-of-month" import layers
var monthlyImports = _db.Layers
.Include(x => x.Records)
.Where(x => x.ParentId == SourceImportWorker!.Id &&
x.Type == LayerType.Import &&
x.CreatedAt.Day == 1 &&
!x.IsDeleted &&
!x.IsCancelled)
.OrderByDescending(x => x.CreatedAt)
.Take(_monthlyImportsWindow)
.AsNoTracking()
.ToList();
_logger.LogDebug("{ValidatorType}: Found {Count} monthly baseline imports",
ValidatorType, monthlyImports.Count);
return monthlyImports;
}
private void PerformValidation(Layer validationWorker, Layer latestImport, List<Layer> historicalImports)
{
_logger.LogDebug("{ValidatorType}: Performing validation for import: {ImportName}",
ValidatorType, latestImport.Name);
// Get monthly baseline if available
var monthlyBaseline = GetMonthlyBaselineImports();
// Build prompt with all data
var prompt = BuildPrompt(latestImport, historicalImports, monthlyBaseline);
// Call LLM
var startTime = DateTime.UtcNow;
var llmResponse = CallLlm(prompt);
var processingTime = DateTime.UtcNow - startTime;
// Create Validation Layer with results
var validationLayer = CreateValidationLayer(validationWorker, latestImport, llmResponse, processingTime);
// Save to database
SaveValidationLayer(validationLayer, latestImport, llmResponse);
_logger.LogInformation("{ValidatorType}: Created validation layer {LayerName} ({LayerId}) in {ProcessingTime}ms",
ValidatorType, validationLayer.Name, validationLayer.Id, processingTime.TotalMilliseconds);
}
private string BuildPrompt(Layer currentImport, List<Layer> recentImports, List<Layer> monthlyBaseline)
{
var currentRecords = currentImport.Records?.OrderBy(r => r.Code).ToList() ?? new List<Record>();
var importType = SourceImportWorker?.Name ?? "Unknown";
var prompt = $@"You are a data quality analyst specializing in anomaly detection for business intelligence imports.
**Import Type:** {importType}
**Import Date:** {currentImport.CreatedAt:yyyy-MM-dd HH:mm:ss}
**Current Import:** {currentImport.Name}
**Current Import Data ({currentRecords.Count} records):**
{JsonSerializer.Serialize(currentRecords.Select(r => new { code = r.Code, value1 = r.Value1 }), new JsonSerializerOptions { WriteIndented = true })}
**Historical Context - Last {recentImports.Count} Imports:**
{string.Join("\n", recentImports.Select((imp, idx) => $"Import {idx + 1} ({imp.CreatedAt:yyyy-MM-dd}): {JsonSerializer.Serialize(imp.Records?.OrderBy(r => r.Code).Select(r => new { code = r.Code, value1 = r.Value1 }) ?? Enumerable.Empty<object>())}"))}
";
if (monthlyBaseline.Any())
{
prompt += $@"
**Monthly Baseline - Last {monthlyBaseline.Count} First-Day Imports:**
{string.Join("\n", monthlyBaseline.Select((imp, idx) => $"Monthly Import {idx + 1} ({imp.CreatedAt:yyyy-MM-dd}): {JsonSerializer.Serialize(imp.Records?.OrderBy(r => r.Code).Select(r => new { code = r.Code, value1 = r.Value1 }) ?? Enumerable.Empty<object>())}"))}
";
}
prompt += @"
**Analysis Tasks:**
1. **Record-level anomalies:** Identify unusual values for specific codes compared to historical patterns
2. **Structural issues:** Detect missing codes, new codes, or unexpected count changes
3. **Pattern breaks:** Find trend reversals, unexpected correlations, or statistical outliers
**Response Format (JSON):**
```json
{
""overallStatus"": ""pass|warning|critical"",
""recordAnomalies"": [
{
""code"": ""string"",
""value1"": number,
""confidence"": 0.0-1.0,
""severity"": ""low|medium|high|critical"",
""reason"": ""brief explanation"",
""recommendation"": ""suggested action""
}
],
""structuralIssues"": [
{
""issueType"": ""missing_codes|new_codes|count_change"",
""description"": ""string"",
""codes"": [""code1"", ""code2""],
""severity"": ""low|medium|high|critical""
}
],
""summary"": ""Brief overall assessment""
}
```
Analyze the data and respond ONLY with the JSON object. Do not include any markdown formatting or additional text.";
return prompt;
}
private AnomalyResponse CallLlm(string prompt)
{
try
{
var chatService = _kernel.GetRequiredService<IChatCompletionService>();
var chatHistory = new ChatHistory();
chatHistory.AddUserMessage(prompt);
// Sync-over-async bridge: IDataValidator.Validate is synchronous and runs
// on a background worker thread, so blocking here is tolerated.
var result = chatService.GetChatMessageContentAsync(
chatHistory,
new OpenAIPromptExecutionSettings
{
Temperature = _config.GetValue<double?>("AnomalyDetection:Temperature") ?? 0.1,
MaxTokens = _config.GetValue<int?>("AnomalyDetection:MaxTokens") ?? 4000
}).GetAwaiter().GetResult();
var jsonResponse = result.Content?.Trim() ?? "{}";
// Try to parse JSON response
try
{
// The prompt requests camelCase keys, so deserialize case-insensitively
// to match the PascalCase properties on AnomalyResponse.
return JsonSerializer.Deserialize<AnomalyResponse>(jsonResponse,
new JsonSerializerOptions { PropertyNameCaseInsensitive = true })
?? throw new InvalidOperationException("LLM returned null response");
}
catch (JsonException)
{
_logger.LogWarning("Failed to parse LLM response as JSON. Raw response: {Response}", jsonResponse);
throw new InvalidOperationException($"LLM did not return valid JSON. Response: {jsonResponse}");
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to call LLM for anomaly detection");
throw;
}
}
private Layer CreateValidationLayer(Layer validationWorker, Layer importLayer, AnomalyResponse response, TimeSpan processingTime)
{
// NOTE: Count()+1 numbering is not safe under concurrent layer creation
var layerNumber = _db.Layers.Count() + 1;
var timestamp = DateTime.UtcNow.ToString("yyyyMMddHHmmss");
var validationLayer = new Layer
{
Id = Guid.NewGuid(),
Type = LayerType.Validation,
ParentId = importLayer.Id, // Links to the import that was validated
Number = layerNumber,
Name = $"L{layerNumber}-V-{timestamp}",
CreatedById = User.AutoImportUserId,
ModifiedById = User.AutoImportUserId,
CreatedAt = DateTime.UtcNow,
ModifiedAt = DateTime.UtcNow
};
_logger.LogDebug("{ValidatorType}: Created validation layer {LayerName}",
ValidatorType, validationLayer.Name);
return validationLayer;
}
private void SaveValidationLayer(Layer validationLayer, Layer importLayer, AnomalyResponse response)
{
// Add the validation layer
_db.Layers.Add(validationLayer);
var records = new List<Record>();
// Add metadata records
records.Add(CreateRecord(validationLayer.Id, "ValidatedAt", DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss")));
records.Add(CreateRecord(validationLayer.Id, "OverallStatus", response.OverallStatus));
records.Add(CreateRecord(validationLayer.Id, "RecordsChecked", value1: response.RecordAnomalies?.Count ?? 0));
records.Add(CreateRecord(validationLayer.Id, "AnomaliesDetected", value1: response.RecordAnomalies?.Count ?? 0));
records.Add(CreateRecord(validationLayer.Id, "StructuralIssuesDetected", value1: response.StructuralIssues?.Count ?? 0));
records.Add(CreateRecord(validationLayer.Id, "LlmProvider", _provider));
records.Add(CreateRecord(validationLayer.Id, "LlmModel", _model));
records.Add(CreateRecord(validationLayer.Id, "Summary", response.Summary));
// Add individual anomaly records
if (response.RecordAnomalies != null)
{
foreach (var anomaly in response.RecordAnomalies)
{
records.Add(CreateRecord(
validationLayer.Id,
$"ANOMALY_{anomaly.Code}",
$"[{anomaly.Severity}] {anomaly.Reason}. Recommendation: {anomaly.Recommendation}",
anomaly.Confidence
));
}
}
// Add structural issue records
if (response.StructuralIssues != null)
{
foreach (var issue in response.StructuralIssues)
{
var codes = issue.Codes != null ? string.Join(", ", issue.Codes) : "";
records.Add(CreateRecord(
validationLayer.Id,
$"STRUCTURAL_{issue.IssueType?.ToUpper()}",
$"[{issue.Severity}] {issue.Description}. Codes: {codes}"
));
}
}
// Store full LLM response as JSON (for debugging)
records.Add(CreateRecord(validationLayer.Id, "LLM_RESPONSE_JSON", JsonSerializer.Serialize(response)));
// Add all records to database
_db.Records.AddRange(records);
_db.SaveChanges();
_logger.LogDebug("{ValidatorType}: Saved {RecordCount} records for validation layer {LayerId}",
ValidatorType, records.Count, validationLayer.Id);
}
private Record CreateRecord(Guid layerId, string code, string? desc1 = null, double? value1 = null)
{
return new Record
{
Id = Guid.NewGuid(),
LayerId = layerId,
Code = code,
Desc1 = desc1,
Value1 = value1,
CreatedById = User.AutoImportUserId,
ModifiedById = User.AutoImportUserId,
CreatedAt = DateTime.UtcNow,
ModifiedAt = DateTime.UtcNow
};
}
}
// Response models for LLM
public class AnomalyResponse
{
public string OverallStatus { get; set; } = "pass";
public List<RecordAnomaly>? RecordAnomalies { get; set; }
public List<StructuralIssue>? StructuralIssues { get; set; }
public string Summary { get; set; } = "";
}
public class RecordAnomaly
{
public string Code { get; set; } = "";
public double? Value1 { get; set; }
public double Confidence { get; set; }
public string Severity { get; set; } = "low";
public string Reason { get; set; } = "";
public string Recommendation { get; set; } = "";
}
public class StructuralIssue
{
public string? IssueType { get; set; }
public string Description { get; set; } = "";
public List<string>? Codes { get; set; }
public string Severity { get; set; } = "low";
}

View File

@@ -58,14 +58,10 @@ public class MorskaD3Importer : BaseDataImporter
// Deserialize data early - right after LoadConfiguration
DeserializeDataInboxData();
if (!ShouldPerformImport(importWorker))
{
_logger.LogInformation("{ImporterType}: Import not needed for {ImportWorkerName}",
ImporterType, importWorker.Name);
// but export to Google Sheets still
// Export to Google Sheets after successful import
ExportToGoogleSheets();
return;
}

View File

@@ -234,6 +234,89 @@
}
</MudTabPanel>
@if (layer?.Type == LayerType.Validation)
{
<MudTabPanel Text="Validation Results" Icon="@Icons.Material.Filled.CheckCircle">
@if (records.Any())
{
var overallStatus = records.FirstOrDefault(r => r.Code == "OverallStatus")?.Desc1;
var confidence = records.FirstOrDefault(r => r.Code == "Confidence")?.Desc1;
var summary = records.FirstOrDefault(r => r.Code == "Summary")?.Desc1;
<MudAlert Severity="@GetValidationSeverity(overallStatus)" Dense="false" Class="mb-4">
<strong>Overall Status:</strong> @(overallStatus?.ToUpper() ?? "UNKNOWN")
@if (!string.IsNullOrEmpty(confidence))
{
<span style="margin-left: 16px;"><strong>Confidence:</strong> @confidence</span>
}
</MudAlert>
@if (!string.IsNullOrEmpty(summary))
{
<MudPaper Class="pa-4 mb-4" Outlined="true">
<MudText Typo="Typo.h6" Class="mb-2">Summary</MudText>
<MudText Typo="Typo.body2">@summary</MudText>
</MudPaper>
}
@if (records.Where(r => r.Code.StartsWith("ANOMALY_")).Any())
{
<MudText Typo="Typo.h6" Class="mb-3">Record Anomalies</MudText>
<MudTable Items="@records.Where(r => r.Code.StartsWith("ANOMALY_")).ToList()"
Dense="true"
Striped="true"
FixedHeader="true"
Elevation="0"
Class="mb-4">
<HeaderContent>
<MudTh>Code</MudTh>
<MudTh>Details</MudTh>
</HeaderContent>
<RowTemplate>
<MudTd DataLabel="Code">
<MudChip T="string" Size="Size.Small" Color="Color.Warning">@context.Code.Replace("ANOMALY_", "")</MudChip>
</MudTd>
<MudTd DataLabel="Details">@context.Desc1</MudTd>
</RowTemplate>
</MudTable>
}
@if (records.Where(r => r.Code.StartsWith("STRUCTURAL_")).Any())
{
<MudText Typo="Typo.h6" Class="mb-3">Structural Issues</MudText>
<MudTable Items="@records.Where(r => r.Code.StartsWith("STRUCTURAL_")).ToList()"
Dense="true"
Striped="true"
FixedHeader="true"
Elevation="0"
Class="mb-4">
<HeaderContent>
<MudTh>Type</MudTh>
<MudTh>Details</MudTh>
</HeaderContent>
<RowTemplate>
<MudTd DataLabel="Type">
<MudChip T="string" Size="Size.Small" Color="Color.Error">@context.Code.Replace("STRUCTURAL_", "")</MudChip>
</MudTd>
<MudTd DataLabel="Details">@context.Desc1</MudTd>
</RowTemplate>
</MudTable>
}
@if (!records.Any(r => r.Code.StartsWith("ANOMALY_")) && !records.Any(r => r.Code.StartsWith("STRUCTURAL_")))
{
<MudAlert Severity="Severity.Success" Dense="true">
No anomalies or structural issues detected. All data appears normal.
</MudAlert>
}
}
else
{
<MudAlert Severity="Severity.Info">No validation results available.</MudAlert>
}
</MudTabPanel>
}
@if (showHistoryTab)
{
<MudTabPanel Text="History" Icon="@Icons.Material.Filled.History">

View File

@@ -503,7 +503,7 @@ public partial class Details : ComponentBase, IDisposable
if (layer?.Records == null) return false;
var typeRecord = layer.Records.FirstOrDefault(x => x.Code == "Type");
return typeRecord?.Desc1 == "ImportWorker" || typeRecord?.Desc1 == "ProcessWorker";
return typeRecord?.Desc1 == "ImportWorker" || typeRecord?.Desc1 == "ProcessWorker" || typeRecord?.Desc1 == "ValidationWorker";
}
private async Task RunNow()
@@ -545,6 +545,18 @@ public partial class Details : ComponentBase, IDisposable
}
}
// Validation tab helper methods
private Severity GetValidationSeverity(string? status)
{
return status?.ToLower() switch
{
"pass" => Severity.Success,
"warning" => Severity.Warning,
"critical" => Severity.Error,
_ => Severity.Info
};
}
public void Dispose()
{
HubService.EntityChanged -= OnEntityChanged;