WIP: AI Validator

This commit is contained in:
2025-12-15 20:05:26 +01:00
parent 096ff5573e
commit f10dfe629e
16 changed files with 1686 additions and 9 deletions

View File

@@ -221,14 +221,114 @@ public class JobSchedulerService
return jobsCreated;
}
public async Task<int> ScheduleValidateJobsAsync()
{
_logger.LogInformation("JobScheduler: Starting validation job scheduling");
var validationWorkers = await _db.Layers
.Include(x => x.Records)
.Where(x =>
x.Records!.Any(r => r.Code == "Type" && r.Desc1 == "ValidationWorker") &&
x.Records!.Any(r => r.Code == "IsEnabled" && r.Desc1 == "True")
)
.OrderBy(x => x.CreatedAt)
.AsNoTracking()
.ToListAsync();
_logger.LogInformation("JobScheduler: Found {Count} validation workers to schedule", validationWorkers.Count);
var jobsCreated = 0;
var scheduledLayerIds = new HashSet<Guid>(); // Track LayerIds scheduled in this batch
foreach (var worker in validationWorkers)
{
try
{
var plugin = worker.Records?.FirstOrDefault(r => r.Code == "Plugin")?.Desc1;
if (string.IsNullOrEmpty(plugin))
{
_logger.LogWarning("JobScheduler: Validation worker {LayerName} ({LayerId}) has no Plugin configured, skipping",
worker.Name, worker.Id);
continue;
}
// Get priority from config (default: 200 for validation - lower than processes)
var priorityStr = worker.Records?.FirstOrDefault(r => r.Code == "Priority")?.Desc1;
var priority = int.TryParse(priorityStr, out var p) ? p : 200;
// Get max retries from config (default: 3)
var maxRetriesStr = worker.Records?.FirstOrDefault(r => r.Code == "MaxRetries")?.Desc1;
var maxRetries = int.TryParse(maxRetriesStr, out var mr) ? mr : 3;
// Check in-memory: already scheduled in this batch?
if (scheduledLayerIds.Contains(worker.Id))
{
_logger.LogDebug("JobScheduler: Job already scheduled in this batch for {LayerName} ({LayerId})",
worker.Name, worker.Id);
continue;
}
// Check if there's already a pending/running job for this layer in database
var existingJob = await _db.QueueJobs
.Where(j => j.LayerId == worker.Id &&
(j.Status == JobStatus.Pending || j.Status == JobStatus.Running))
.FirstOrDefaultAsync();
if (existingJob != null)
{
_logger.LogDebug("JobScheduler: Job already exists for {LayerName} ({LayerId}), status: {Status}",
worker.Name, worker.Id, existingJob.Status);
continue;
}
var job = new QueueJob
{
Id = Guid.NewGuid(),
LayerId = worker.Id,
LayerName = worker.Name ?? "Unknown",
PluginName = plugin,
JobType = JobType.Validate,
Priority = priority,
MaxRetries = maxRetries,
Status = JobStatus.Pending,
CreatedAt = DateTime.UtcNow,
ModifiedAt = DateTime.UtcNow,
CreatedById = DiunaBI.Domain.Entities.User.AutoImportUserId,
ModifiedById = DiunaBI.Domain.Entities.User.AutoImportUserId
};
_db.QueueJobs.Add(job);
scheduledLayerIds.Add(worker.Id); // Track that we've scheduled this layer
jobsCreated++;
_logger.LogInformation("JobScheduler: Created validation job for {LayerName} ({LayerId}) with priority {Priority}",
worker.Name, worker.Id, priority);
}
catch (Exception ex)
{
_logger.LogError(ex, "JobScheduler: Failed to create job for {LayerName} ({LayerId})",
worker.Name, worker.Id);
}
}
if (jobsCreated > 0)
{
await _db.SaveChangesAsync();
_logger.LogInformation("JobScheduler: Successfully created {Count} validation jobs", jobsCreated);
}
return jobsCreated;
}
public async Task<int> ScheduleAllJobsAsync(string? nameFilter = null)
{
var importCount = await ScheduleImportJobsAsync(nameFilter);
var processCount = await ScheduleProcessJobsAsync();
var validateCount = await ScheduleValidateJobsAsync();
_logger.LogInformation("JobScheduler: Scheduled {ImportCount} import jobs and {ProcessCount} process jobs",
importCount, processCount);
_logger.LogInformation("JobScheduler: Scheduled {ImportCount} import jobs, {ProcessCount} process jobs, and {ValidateCount} validation jobs",
importCount, processCount, validateCount);
return importCount + processCount;
return importCount + processCount + validateCount;
}
}