Files
DiunaBI/DiunaBI.Infrastructure/Services/JobSchedulerService.cs

215 lines
8.5 KiB
C#

using DiunaBI.Domain.Entities;
using DiunaBI.Infrastructure.Data;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
namespace DiunaBI.Infrastructure.Services;
/// <summary>
/// Creates pending <see cref="QueueJob"/> rows for enabled import and process worker layers.
/// </summary>
/// <remarks>
/// A layer is treated as a worker when its records contain <c>Type</c> = <c>ImportWorker</c>
/// (or <c>ProcessWorker</c>) and <c>IsEnabled</c> = <c>True</c>. Per-worker settings are read
/// from the layer's records by code: <c>Plugin</c> (required), <c>Priority</c> and
/// <c>MaxRetries</c> (optional integers).
/// </remarks>
public class JobSchedulerService
{
    /// <summary>Priority used for import jobs when the layer has no parsable <c>Priority</c> record.</summary>
    private const int DefaultImportPriority = 50;

    /// <summary>Priority used for process jobs when the layer has no parsable <c>Priority</c> record.</summary>
    private const int DefaultProcessPriority = 100;

    /// <summary>Retry limit used when the layer has no parsable <c>MaxRetries</c> record.</summary>
    private const int DefaultMaxRetries = 3;

    private readonly AppDbContext _db;
    private readonly ILogger<JobSchedulerService> _logger;

    /// <summary>
    /// Initializes the scheduler with the database context and logger it works against.
    /// </summary>
    /// <param name="db">Context used to read layers and to add queue jobs.</param>
    /// <param name="logger">Logger for scheduling diagnostics.</param>
    public JobSchedulerService(AppDbContext db, ILogger<JobSchedulerService> logger)
    {
        _db = db;
        _logger = logger;
    }

    /// <summary>
    /// Queues one pending import job for every enabled import worker layer that does not
    /// already have a pending or running job.
    /// </summary>
    /// <param name="nameFilter">
    /// When non-empty, only layers whose name contains this text are considered.
    /// </param>
    /// <returns>The number of jobs created.</returns>
    public async Task<int> ScheduleImportJobsAsync(string? nameFilter = null)
    {
        _logger.LogInformation("JobScheduler: Starting import job scheduling with filter: {NameFilter}", nameFilter ?? "none");

        return await ScheduleJobsAsync("ImportWorker", JobType.Import, DefaultImportPriority, nameFilter);
    }

    /// <summary>
    /// Queues one pending process job for every enabled process worker layer that does not
    /// already have a pending or running job.
    /// </summary>
    /// <returns>The number of jobs created.</returns>
    public async Task<int> ScheduleProcessJobsAsync()
    {
        _logger.LogInformation("JobScheduler: Starting process job scheduling");

        return await ScheduleJobsAsync("ProcessWorker", JobType.Process, DefaultProcessPriority, nameFilter: null);
    }

    /// <summary>
    /// Schedules import jobs first and process jobs second.
    /// </summary>
    /// <param name="nameFilter">
    /// Forwarded to <see cref="ScheduleImportJobsAsync"/> only; process workers are not filtered by name.
    /// </param>
    /// <returns>The total number of jobs created by both passes.</returns>
    public async Task<int> ScheduleAllJobsAsync(string? nameFilter = null)
    {
        var importCount = await ScheduleImportJobsAsync(nameFilter);
        var processCount = await ScheduleProcessJobsAsync();

        _logger.LogInformation("JobScheduler: Scheduled {ImportCount} import jobs and {ProcessCount} process jobs",
            importCount, processCount);

        return importCount + processCount;
    }

    /// <summary>
    /// Shared implementation behind the import and process scheduling entry points.
    /// </summary>
    /// <param name="workerType">Value of the layer's <c>Type</c> record that selects the workers.</param>
    /// <param name="jobType">Job type stamped on every created job; its name is also used in log messages.</param>
    /// <param name="defaultPriority">Priority used when the layer has no parsable <c>Priority</c> record.</param>
    /// <param name="nameFilter">Optional substring the layer name must contain; ignored when null or empty.</param>
    /// <returns>The number of jobs added and saved.</returns>
    private async Task<int> ScheduleJobsAsync(string workerType, JobType jobType, int defaultPriority, string? nameFilter)
    {
        // "Import"/"Process" and "import"/"process": the words the log messages are built around.
        var kindTitle = jobType.ToString();
        var kind = kindTitle.ToLowerInvariant();

        var query = _db.Layers
            .Include(x => x.Records)
            .Where(x =>
                x.Records!.Any(r => r.Code == "Type" && r.Desc1 == workerType) &&
                x.Records!.Any(r => r.Code == "IsEnabled" && r.Desc1 == "True"));

        if (!string.IsNullOrEmpty(nameFilter))
        {
            query = query.Where(x => x.Name != null && x.Name.Contains(nameFilter));
        }

        var workers = await query
            .OrderBy(x => x.CreatedAt)
            .AsNoTracking()
            .ToListAsync();

        _logger.LogInformation("JobScheduler: Found {Count} {JobKind} workers to schedule", workers.Count, kind);

        var jobsCreated = 0;

        foreach (var worker in workers)
        {
            try
            {
                // Reads the Desc1 value of the worker's first record with the given code, if any.
                string? GetConfig(string code) => worker.Records?.FirstOrDefault(r => r.Code == code)?.Desc1;

                var plugin = GetConfig("Plugin");
                if (string.IsNullOrEmpty(plugin))
                {
                    _logger.LogWarning("JobScheduler: {WorkerKind} worker {LayerName} ({LayerId}) has no Plugin configured, skipping",
                        kindTitle, worker.Name, worker.Id);
                    continue;
                }

                var priority = ParseIntOrDefault(GetConfig("Priority"), defaultPriority);
                var maxRetries = ParseIntOrDefault(GetConfig("MaxRetries"), DefaultMaxRetries);

                // Skip layers that already have a pending/running job (of any job type).
                // Read-only lookup, so the existing row is not attached to the context.
                var existingJob = await _db.QueueJobs
                    .AsNoTracking()
                    .Where(j => j.LayerId == worker.Id &&
                                (j.Status == JobStatus.Pending || j.Status == JobStatus.Running))
                    .FirstOrDefaultAsync();

                if (existingJob != null)
                {
                    _logger.LogDebug("JobScheduler: Job already exists for {LayerName} ({LayerId}), status: {Status}",
                        worker.Name, worker.Id, existingJob.Status);
                    continue;
                }

                // Single timestamp so CreatedAt and ModifiedAt of a new job are exactly equal.
                var now = DateTime.UtcNow;

                var job = new QueueJob
                {
                    Id = Guid.NewGuid(),
                    LayerId = worker.Id,
                    LayerName = worker.Name ?? "Unknown",
                    PluginName = plugin,
                    JobType = jobType,
                    Priority = priority,
                    MaxRetries = maxRetries,
                    Status = JobStatus.Pending,
                    CreatedAt = now,
                    ModifiedAt = now,
                    CreatedById = DiunaBI.Domain.Entities.User.AutoImportUserId,
                    ModifiedById = DiunaBI.Domain.Entities.User.AutoImportUserId
                };

                _db.QueueJobs.Add(job);
                jobsCreated++;

                _logger.LogInformation("JobScheduler: Created {JobKind} job for {LayerName} ({LayerId}) with priority {Priority}",
                    kind, worker.Name, worker.Id, priority);
            }
            catch (Exception ex)
            {
                // Best effort: a failure for one worker must not stop the remaining workers.
                _logger.LogError(ex, "JobScheduler: Failed to create job for {LayerName} ({LayerId})",
                    worker.Name, worker.Id);
            }
        }

        // All new jobs are persisted in one batch; a failure here propagates to the caller.
        if (jobsCreated > 0)
        {
            await _db.SaveChangesAsync();
            _logger.LogInformation("JobScheduler: Successfully created {Count} {JobKind} jobs", jobsCreated, kind);
        }

        return jobsCreated;
    }

    /// <summary>
    /// Parses an integer setting using the invariant culture, so stored configuration is
    /// read the same way regardless of the current thread culture.
    /// </summary>
    /// <param name="value">Raw setting text; may be null.</param>
    /// <param name="fallback">Value returned when <paramref name="value"/> is missing or not an integer.</param>
    /// <returns>The parsed integer, or <paramref name="fallback"/>.</returns>
    private static int ParseIntOrDefault(string? value, int fallback) =>
        int.TryParse(
            value,
            System.Globalization.NumberStyles.Integer,
            System.Globalization.CultureInfo.InvariantCulture,
            out var parsed)
            ? parsed
            : fallback;
}