WIP: queue engine
This commit is contained in:
160
DiunaBI.Infrastructure/Services/JobWorkerService.cs
Normal file
160
DiunaBI.Infrastructure/Services/JobWorkerService.cs
Normal file
@@ -0,0 +1,160 @@
|
||||
using DiunaBI.Domain.Entities;
|
||||
using DiunaBI.Infrastructure.Data;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace DiunaBI.Infrastructure.Services;
|
||||
|
||||
public class JobWorkerService : BackgroundService
|
||||
{
|
||||
private readonly IServiceProvider _serviceProvider;
|
||||
private readonly ILogger<JobWorkerService> _logger;
|
||||
private readonly TimeSpan _pollInterval = TimeSpan.FromSeconds(10);
|
||||
private readonly TimeSpan _rateLimitDelay = TimeSpan.FromSeconds(5);
|
||||
|
||||
public JobWorkerService(IServiceProvider serviceProvider, ILogger<JobWorkerService> logger)
|
||||
{
|
||||
_serviceProvider = serviceProvider;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
_logger.LogInformation("JobWorker: Service started");
|
||||
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
await ProcessNextJobAsync(stoppingToken);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "JobWorker: Unexpected error in main loop");
|
||||
}
|
||||
|
||||
await Task.Delay(_pollInterval, stoppingToken);
|
||||
}
|
||||
|
||||
_logger.LogInformation("JobWorker: Service stopped");
|
||||
}
|
||||
|
||||
private async Task ProcessNextJobAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
using var scope = _serviceProvider.CreateScope();
|
||||
var db = scope.ServiceProvider.GetRequiredService<AppDbContext>();
|
||||
var pluginManager = scope.ServiceProvider.GetRequiredService<PluginManager>();
|
||||
|
||||
// Get next pending job (ordered by priority, then creation time)
|
||||
var job = await db.QueueJobs
|
||||
.Where(j => j.Status == JobStatus.Pending || j.Status == JobStatus.Retrying)
|
||||
.OrderBy(j => j.Priority)
|
||||
.ThenBy(j => j.CreatedAt)
|
||||
.FirstOrDefaultAsync(stoppingToken);
|
||||
|
||||
if (job == null)
|
||||
{
|
||||
// No jobs to process
|
||||
return;
|
||||
}
|
||||
|
||||
_logger.LogInformation("JobWorker: Processing job {JobId} - {LayerName} ({JobType})",
|
||||
job.Id, job.LayerName, job.JobType);
|
||||
|
||||
// Mark job as running
|
||||
job.Status = JobStatus.Running;
|
||||
job.LastAttemptAt = DateTime.UtcNow;
|
||||
job.ModifiedAtUtc = DateTime.UtcNow;
|
||||
await db.SaveChangesAsync(stoppingToken);
|
||||
|
||||
try
|
||||
{
|
||||
// Load the layer with its configuration
|
||||
var layer = await db.Layers
|
||||
.Include(l => l.Records)
|
||||
.AsNoTracking()
|
||||
.FirstOrDefaultAsync(l => l.Id == job.LayerId, stoppingToken);
|
||||
|
||||
if (layer == null)
|
||||
{
|
||||
throw new Exception($"Layer {job.LayerId} not found");
|
||||
}
|
||||
|
||||
// Execute the job based on type
|
||||
if (job.JobType == JobType.Import)
|
||||
{
|
||||
var importer = pluginManager.GetImporter(job.PluginName);
|
||||
if (importer == null)
|
||||
{
|
||||
throw new Exception($"Importer '{job.PluginName}' not found");
|
||||
}
|
||||
|
||||
_logger.LogInformation("JobWorker: Executing import for {LayerName} using {PluginName}",
|
||||
job.LayerName, job.PluginName);
|
||||
|
||||
importer.Import(layer);
|
||||
}
|
||||
else if (job.JobType == JobType.Process)
|
||||
{
|
||||
var processor = pluginManager.GetProcessor(job.PluginName);
|
||||
if (processor == null)
|
||||
{
|
||||
throw new Exception($"Processor '{job.PluginName}' not found");
|
||||
}
|
||||
|
||||
_logger.LogInformation("JobWorker: Executing process for {LayerName} using {PluginName}",
|
||||
job.LayerName, job.PluginName);
|
||||
|
||||
processor.Process(layer);
|
||||
}
|
||||
|
||||
// Job completed successfully
|
||||
job.Status = JobStatus.Completed;
|
||||
job.CompletedAt = DateTime.UtcNow;
|
||||
job.LastError = null;
|
||||
job.ModifiedAtUtc = DateTime.UtcNow;
|
||||
|
||||
_logger.LogInformation("JobWorker: Job {JobId} completed successfully", job.Id);
|
||||
|
||||
// Rate limiting delay (for Google Sheets API quota)
|
||||
if (job.JobType == JobType.Import)
|
||||
{
|
||||
_logger.LogDebug("JobWorker: Applying rate limit delay of {Delay} seconds", _rateLimitDelay.TotalSeconds);
|
||||
await Task.Delay(_rateLimitDelay, stoppingToken);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "JobWorker: Job {JobId} failed - {LayerName}", job.Id, job.LayerName);
|
||||
|
||||
job.RetryCount++;
|
||||
job.LastError = ex.Message;
|
||||
job.ModifiedAtUtc = DateTime.UtcNow;
|
||||
|
||||
if (job.RetryCount >= job.MaxRetries)
|
||||
{
|
||||
job.Status = JobStatus.Failed;
|
||||
_logger.LogWarning("JobWorker: Job {JobId} marked as Failed after {RetryCount} attempts",
|
||||
job.Id, job.RetryCount);
|
||||
}
|
||||
else
|
||||
{
|
||||
job.Status = JobStatus.Retrying;
|
||||
_logger.LogInformation("JobWorker: Job {JobId} will retry (attempt {RetryCount}/{MaxRetries})",
|
||||
job.Id, job.RetryCount, job.MaxRetries);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
await db.SaveChangesAsync(stoppingToken);
|
||||
}
|
||||
}
|
||||
|
||||
public override async Task StopAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
_logger.LogInformation("JobWorker: Stopping service...");
|
||||
await base.StopAsync(stoppingToken);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user