using System.Globalization;
using DiunaBI.Domain.Entities;
using DiunaBI.Infrastructure.Data;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;

namespace DiunaBI.Infrastructure.Services;

/// <summary>
/// Creates pending <see cref="QueueJob"/> rows for layers whose records mark them as
/// enabled import or process workers.
/// </summary>
public class JobSchedulerService
{
    /// <summary>Priority used for import jobs when the worker has no parsable "Priority" record.</summary>
    private const int DefaultImportPriority = 50;

    /// <summary>Priority used for process jobs when the worker has no parsable "Priority" record.</summary>
    private const int DefaultProcessPriority = 100;

    /// <summary>Retry limit used when the worker has no parsable "MaxRetries" record.</summary>
    private const int DefaultMaxRetries = 3;

    private readonly AppDbContext _db;
    private readonly ILogger _logger;

    /// <summary>
    /// Initializes the scheduler with the database context and logger it works with.
    /// </summary>
    /// <param name="db">Context used to read layers and to add queue jobs.</param>
    /// <param name="logger">Logger that receives scheduling diagnostics.</param>
    /// <remarks>
    /// NOTE(review): the non-generic <see cref="ILogger"/> is not registered by the default
    /// logging setup of the DI container; confirm it is registered explicitly, or that
    /// <c>ILogger&lt;JobSchedulerService&gt;</c> was intended here.
    /// </remarks>
    public JobSchedulerService(AppDbContext db, ILogger logger)
    {
        _db = db;
        _logger = logger;
    }

    /// <summary>
    /// Queues one pending import job for every enabled import worker layer that has a plugin
    /// configured and no pending or running job yet.
    /// </summary>
    /// <param name="nameFilter">
    /// Optional substring the layer name must contain; <c>null</c> or empty selects all workers.
    /// </param>
    /// <returns>The number of jobs that were created.</returns>
    public async Task<int> ScheduleImportJobsAsync(string? nameFilter = null)
    {
        _logger.LogInformation("JobScheduler: Starting import job scheduling with filter: {NameFilter}", nameFilter ?? "none");

        var query = _db.Layers
            .Include(x => x.Records)
            .Where(x =>
                x.Records!.Any(r => r.Code == "Type" && r.Desc1 == "ImportWorker") &&
                x.Records!.Any(r => r.Code == "IsEnabled" && r.Desc1 == "True"));

        if (!string.IsNullOrEmpty(nameFilter))
        {
            query = query.Where(x => x.Name != null && x.Name.Contains(nameFilter));
        }

        var importWorkers = await query
            .OrderBy(x => x.CreatedAt)
            .AsNoTracking()
            .ToListAsync();

        _logger.LogInformation("JobScheduler: Found {Count} import workers to schedule", importWorkers.Count);

        var jobsCreated = 0;

        foreach (var worker in importWorkers)
        {
            // Value (Desc1) of the first record with the given code, or null when absent.
            string? Setting(string code) => worker.Records?.FirstOrDefault(r => r.Code == code)?.Desc1;

            try
            {
                var plugin = Setting("Plugin");
                if (string.IsNullOrEmpty(plugin))
                {
                    _logger.LogWarning("JobScheduler: Import worker {LayerName} ({LayerId}) has no Plugin configured, skipping", worker.Name, worker.Id);
                    continue;
                }

                var priority = ParseIntOrDefault(Setting("Priority"), DefaultImportPriority);
                var maxRetries = ParseIntOrDefault(Setting("MaxRetries"), DefaultMaxRetries);

                // Skip layers that already have a pending/running job. Read-only lookup, so the
                // found row is not attached to the context that is saved below.
                var existingJob = await _db.QueueJobs
                    .AsNoTracking()
                    .Where(j => j.LayerId == worker.Id &&
                                (j.Status == JobStatus.Pending || j.Status == JobStatus.Running))
                    .FirstOrDefaultAsync();

                if (existingJob != null)
                {
                    _logger.LogDebug("JobScheduler: Job already exists for {LayerName} ({LayerId}), status: {Status}", worker.Name, worker.Id, existingJob.Status);
                    continue;
                }

                // One timestamp so the created/modified columns of a new job agree exactly.
                var now = DateTime.UtcNow;

                var job = new QueueJob
                {
                    Id = Guid.NewGuid(),
                    LayerId = worker.Id,
                    LayerName = worker.Name ?? "Unknown",
                    PluginName = plugin,
                    JobType = JobType.Import,
                    Priority = priority,
                    MaxRetries = maxRetries,
                    Status = JobStatus.Pending,
                    CreatedAt = now,
                    CreatedAtUtc = now,
                    ModifiedAtUtc = now,
                    CreatedById = Guid.Empty, // System user
                    ModifiedById = Guid.Empty
                };

                _db.QueueJobs.Add(job);
                jobsCreated++;

                _logger.LogInformation("JobScheduler: Created import job for {LayerName} ({LayerId}) with priority {Priority}", worker.Name, worker.Id, priority);
            }
            catch (Exception ex)
            {
                // A failure for one worker is logged and must not stop the remaining workers.
                _logger.LogError(ex, "JobScheduler: Failed to create job for {LayerName} ({LayerId})", worker.Name, worker.Id);
            }
        }

        if (jobsCreated > 0)
        {
            await _db.SaveChangesAsync();
            _logger.LogInformation("JobScheduler: Successfully created {Count} import jobs", jobsCreated);
        }

        return jobsCreated;
    }

    /// <summary>
    /// Queues one pending process job for every enabled process worker layer that has a plugin
    /// configured and no pending or running job yet.
    /// </summary>
    /// <returns>The number of jobs that were created.</returns>
    public async Task<int> ScheduleProcessJobsAsync()
    {
        _logger.LogInformation("JobScheduler: Starting process job scheduling");

        var processWorkers = await _db.Layers
            .Include(x => x.Records)
            .Where(x =>
                x.Records!.Any(r => r.Code == "Type" && r.Desc1 == "ProcessWorker") &&
                x.Records!.Any(r => r.Code == "IsEnabled" && r.Desc1 == "True"))
            .OrderBy(x => x.CreatedAt)
            .AsNoTracking()
            .ToListAsync();

        _logger.LogInformation("JobScheduler: Found {Count} process workers to schedule", processWorkers.Count);

        var jobsCreated = 0;

        foreach (var worker in processWorkers)
        {
            // Value (Desc1) of the first record with the given code, or null when absent.
            string? Setting(string code) => worker.Records?.FirstOrDefault(r => r.Code == code)?.Desc1;

            try
            {
                var plugin = Setting("Plugin");
                if (string.IsNullOrEmpty(plugin))
                {
                    _logger.LogWarning("JobScheduler: Process worker {LayerName} ({LayerId}) has no Plugin configured, skipping", worker.Name, worker.Id);
                    continue;
                }

                // Process jobs default to a larger priority value (100) than import jobs (50).
                var priority = ParseIntOrDefault(Setting("Priority"), DefaultProcessPriority);
                var maxRetries = ParseIntOrDefault(Setting("MaxRetries"), DefaultMaxRetries);

                // Skip layers that already have a pending/running job. Read-only lookup, so the
                // found row is not attached to the context that is saved below.
                var existingJob = await _db.QueueJobs
                    .AsNoTracking()
                    .Where(j => j.LayerId == worker.Id &&
                                (j.Status == JobStatus.Pending || j.Status == JobStatus.Running))
                    .FirstOrDefaultAsync();

                if (existingJob != null)
                {
                    _logger.LogDebug("JobScheduler: Job already exists for {LayerName} ({LayerId}), status: {Status}", worker.Name, worker.Id, existingJob.Status);
                    continue;
                }

                // One timestamp so the created/modified columns of a new job agree exactly.
                var now = DateTime.UtcNow;

                var job = new QueueJob
                {
                    Id = Guid.NewGuid(),
                    LayerId = worker.Id,
                    LayerName = worker.Name ?? "Unknown",
                    PluginName = plugin,
                    JobType = JobType.Process,
                    Priority = priority,
                    MaxRetries = maxRetries,
                    Status = JobStatus.Pending,
                    CreatedAt = now,
                    CreatedAtUtc = now,
                    ModifiedAtUtc = now,
                    CreatedById = Guid.Empty, // System user
                    ModifiedById = Guid.Empty
                };

                _db.QueueJobs.Add(job);
                jobsCreated++;

                _logger.LogInformation("JobScheduler: Created process job for {LayerName} ({LayerId}) with priority {Priority}", worker.Name, worker.Id, priority);
            }
            catch (Exception ex)
            {
                // A failure for one worker is logged and must not stop the remaining workers.
                _logger.LogError(ex, "JobScheduler: Failed to create job for {LayerName} ({LayerId})", worker.Name, worker.Id);
            }
        }

        if (jobsCreated > 0)
        {
            await _db.SaveChangesAsync();
            _logger.LogInformation("JobScheduler: Successfully created {Count} process jobs", jobsCreated);
        }

        return jobsCreated;
    }

    /// <summary>
    /// Schedules import jobs first and process jobs second.
    /// </summary>
    /// <param name="nameFilter">
    /// Optional layer-name substring; it is passed to import scheduling only, process
    /// scheduling is always unfiltered.
    /// </param>
    /// <returns>The total number of import and process jobs that were created.</returns>
    public async Task<int> ScheduleAllJobsAsync(string? nameFilter = null)
    {
        var importCount = await ScheduleImportJobsAsync(nameFilter);
        var processCount = await ScheduleProcessJobsAsync();

        _logger.LogInformation("JobScheduler: Scheduled {ImportCount} import jobs and {ProcessCount} process jobs", importCount, processCount);

        return importCount + processCount;
    }

    /// <summary>
    /// Parses a stored configuration value as an integer, independent of the current culture.
    /// </summary>
    /// <param name="raw">Text to parse; may be <c>null</c>.</param>
    /// <param name="fallback">Value returned when <paramref name="raw"/> is not a valid integer.</param>
    /// <returns>The parsed integer, or <paramref name="fallback"/>.</returns>
    private static int ParseIntOrDefault(string? raw, int fallback) =>
        int.TryParse(raw, NumberStyles.Integer, CultureInfo.InvariantCulture, out var parsed)
            ? parsed
            : fallback;
}