Files
DiunaBI/DiunaBI.Infrastructure/Services/JobWorkerService.cs
Michał Zieliński 0e3b3933f0
Some checks failed
Build Docker Images / test (map[name:Morska plugin_project:DiunaBI.Plugins.Morska]) (push) Failing after 1m14s
Build Docker Images / test (map[name:PedrolloPL plugin_project:DiunaBI.Plugins.PedrolloPL]) (push) Failing after 1m10s
Build Docker Images / build-and-push (map[image_suffix:morska name:Morska plugin_project:DiunaBI.Plugins.Morska]) (push) Failing after 1m12s
Build Docker Images / build-and-push (map[image_suffix:pedrollopl name:PedrolloPL plugin_project:DiunaBI.Plugins.PedrolloPL]) (push) Failing after 1m7s
WIP: p2 plugin
2025-12-03 13:33:38 +01:00

179 lines
6.4 KiB
C#

using DiunaBI.Domain.Entities;
using DiunaBI.Infrastructure.Data;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace DiunaBI.Infrastructure.Services;
/// <summary>
/// Hosted background worker that polls the job queue and executes one queued
/// import/process job per polling cycle through the matching plugin.
/// </summary>
public class JobWorkerService : BackgroundService
{
    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger<JobWorkerService> _logger;

    // Pause between two polling cycles of the main loop.
    private readonly TimeSpan _pollInterval = TimeSpan.FromSeconds(10);

    // Pause applied after a successful import job (rate limiting for the Google Sheets API quota).
    private readonly TimeSpan _rateLimitDelay = TimeSpan.FromSeconds(5);

    /// <summary>
    /// Creates the worker.
    /// </summary>
    /// <param name="serviceProvider">Root provider used to create one DI scope per processed job.</param>
    /// <param name="logger">Logger for worker diagnostics.</param>
    public JobWorkerService(IServiceProvider serviceProvider, ILogger<JobWorkerService> logger)
    {
        _serviceProvider = serviceProvider;
        _logger = logger;
    }

    /// <summary>
    /// Main loop: processes at most one job, waits for the poll interval, and repeats
    /// until <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the service is asked to stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("JobWorker: Service started");

        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    await ProcessNextJobAsync(stoppingToken);
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    // Shutdown was requested while a job was in flight; this is not an error.
                    break;
                }
                catch (Exception ex)
                {
                    // Keep the worker alive: a single bad cycle must not end the loop.
                    _logger.LogError(ex, "JobWorker: Unexpected error in main loop");
                }

                await Task.Delay(_pollInterval, stoppingToken);
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Cancelled while waiting for the next poll; fall through to the "stopped" log.
        }

        _logger.LogInformation("JobWorker: Service stopped");
    }

    /// <summary>
    /// Picks the next pending/retrying job (ordered by priority, then creation time),
    /// marks it as running, executes it, and persists the outcome.
    /// </summary>
    /// <remarks>
    /// The outcome is saved with <see cref="CancellationToken.None"/> and before any
    /// rate-limit or backoff delay, so a shutdown request cannot leave the job stuck
    /// in the running state (the queue query never selects running jobs).
    /// </remarks>
    /// <param name="stoppingToken">Signalled when the service is asked to stop.</param>
    private async Task ProcessNextJobAsync(CancellationToken stoppingToken)
    {
        using var scope = _serviceProvider.CreateScope();
        var db = scope.ServiceProvider.GetRequiredService<AppDbContext>();
        var pluginManager = scope.ServiceProvider.GetRequiredService<PluginManager>();

        // Get next pending job (ordered by priority, then creation time)
        var job = await db.QueueJobs
            .Where(j => j.Status == JobStatus.Pending || j.Status == JobStatus.Retrying)
            .OrderBy(j => j.Priority)
            .ThenBy(j => j.CreatedAt)
            .FirstOrDefaultAsync(stoppingToken);

        if (job == null)
        {
            // No jobs to process
            return;
        }

        _logger.LogInformation("JobWorker: Processing job {JobId} - {LayerName} ({JobType})",
            job.Id, job.LayerName, job.JobType);

        // Mark job as running
        job.Status = JobStatus.Running;
        job.LastAttemptAt = DateTime.UtcNow;
        job.ModifiedAtUtc = DateTime.UtcNow;
        await db.SaveChangesAsync(stoppingToken);

        var applyRateLimit = false;
        TimeSpan? backoffDelay = null;

        try
        {
            // Load the layer with its records (read-only)
            var layer = await db.Layers
                .Include(l => l.Records)
                .AsNoTracking()
                .FirstOrDefaultAsync(l => l.Id == job.LayerId, stoppingToken);

            if (layer == null)
            {
                throw new InvalidOperationException($"Layer {job.LayerId} not found");
            }

            // Execute the job based on type
            if (job.JobType == JobType.Import)
            {
                var importer = pluginManager.GetImporter(job.PluginName);
                if (importer == null)
                {
                    throw new InvalidOperationException($"Importer '{job.PluginName}' not found");
                }

                _logger.LogInformation("JobWorker: Executing import for {LayerName} using {PluginName}",
                    job.LayerName, job.PluginName);
                importer.Import(layer);
            }
            else if (job.JobType == JobType.Process)
            {
                var processor = pluginManager.GetProcessor(job.PluginName);
                if (processor == null)
                {
                    throw new InvalidOperationException($"Processor '{job.PluginName}' not found");
                }

                _logger.LogInformation("JobWorker: Executing process for {LayerName} using {PluginName}",
                    job.LayerName, job.PluginName);
                processor.Process(layer);
            }
            // NOTE(review): any other JobType falls through and is marked Completed
            // without executing anything - confirm this is intended.

            // Job completed successfully
            job.Status = JobStatus.Completed;
            job.CompletedAt = DateTime.UtcNow;
            job.LastError = null;
            job.ModifiedAtUtc = DateTime.UtcNow;
            _logger.LogInformation("JobWorker: Job {JobId} completed successfully", job.Id);

            applyRateLimit = job.JobType == JobType.Import;
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Shutdown interrupted the job before it ran to completion. That is not a
            // failed attempt: hand the job back to the queue without touching RetryCount.
            job.Status = JobStatus.Pending;
            job.ModifiedAtUtc = DateTime.UtcNow;
            await db.SaveChangesAsync(CancellationToken.None);
            throw;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "JobWorker: Job {JobId} failed - {LayerName}", job.Id, job.LayerName);

            job.RetryCount++;
            job.LastError = ex.Message;
            job.ModifiedAtUtc = DateTime.UtcNow;

            if (job.RetryCount >= job.MaxRetries)
            {
                job.Status = JobStatus.Failed;
                _logger.LogWarning("JobWorker: Job {JobId} marked as Failed after {RetryCount} attempts",
                    job.Id, job.RetryCount);
            }
            else
            {
                job.Status = JobStatus.Retrying;

                // Exponential backoff: wait before retrying based on attempt number
                backoffDelay = GetBackoffDelay(job.RetryCount);
                _logger.LogInformation("JobWorker: Job {JobId} will retry in {Delay} (attempt {RetryCount}/{MaxRetries})",
                    job.Id, backoffDelay.Value, job.RetryCount, job.MaxRetries);
            }
        }

        // Persist the outcome before any delay and independently of shutdown, so the
        // row never remains in the Running state once the job has finished or failed.
        await db.SaveChangesAsync(CancellationToken.None);

        // Rate limiting delay (for Google Sheets API quota)
        if (applyRateLimit)
        {
            _logger.LogDebug("JobWorker: Applying rate limit delay of {Delay} seconds", _rateLimitDelay.TotalSeconds);
            await Task.Delay(_rateLimitDelay, stoppingToken);
        }

        if (backoffDelay.HasValue)
        {
            // Wait before marking as pending again. If shutdown cancels this wait, the job
            // stays in the already-saved Retrying state, which the queue query also selects.
            await Task.Delay(backoffDelay.Value, stoppingToken);
            job.Status = JobStatus.Pending;
            await db.SaveChangesAsync(CancellationToken.None);
        }
    }

    /// <summary>
    /// Logs the stop request and delegates to the base implementation.
    /// </summary>
    /// <param name="stoppingToken">Token passed through to <see cref="BackgroundService.StopAsync"/>.</param>
    public override async Task StopAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("JobWorker: Stopping service...");
        await base.StopAsync(stoppingToken);
    }

    /// <summary>
    /// Maps the number of failed attempts to the wait time before the next retry.
    /// </summary>
    /// <param name="retryCount">Number of attempts that have failed so far.</param>
    /// <returns>30 seconds for 1, 2 minutes for 2, and 5 minutes for any other value.</returns>
    private static TimeSpan GetBackoffDelay(int retryCount)
    {
        return retryCount switch
        {
            1 => TimeSpan.FromSeconds(30), // 1st retry: 30 seconds
            2 => TimeSpan.FromMinutes(2),  // 2nd retry: 2 minutes
            _ => TimeSpan.FromMinutes(5)   // 3rd+ retry: 5 minutes
        };
    }
}