203 lines
7.5 KiB
C#
203 lines
7.5 KiB
C#
using DiunaBI.Domain.Entities;
|
|
using DiunaBI.Infrastructure.Data;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using Microsoft.Extensions.DependencyInjection;
|
|
using Microsoft.Extensions.Hosting;
|
|
using Microsoft.Extensions.Logging;
|
|
|
|
namespace DiunaBI.Infrastructure.Services;
|
|
|
|
/// <summary>
/// Hosted background worker that polls the job queue and runs one queued
/// import/process job at a time through the plugin registered for it.
/// </summary>
public class JobWorkerService : BackgroundService
{
    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger<JobWorkerService> _logger;

    // Pause between two consecutive polls of the queue.
    private readonly TimeSpan _pollInterval = TimeSpan.FromSeconds(10);

    // Pause applied after a successful import (Google Sheets API quota).
    private readonly TimeSpan _rateLimitDelay = TimeSpan.FromSeconds(5);

    /// <summary>
    /// Creates the worker.
    /// </summary>
    /// <param name="serviceProvider">Root provider; a fresh scope is created from it for every poll.</param>
    /// <param name="logger">Logger used for all worker diagnostics.</param>
    public JobWorkerService(IServiceProvider serviceProvider, ILogger<JobWorkerService> logger)
    {
        _serviceProvider = serviceProvider;
        _logger = logger;
    }

    /// <summary>
    /// Main loop: process at most one job, wait for the poll interval, repeat
    /// until <paramref name="stoppingToken"/> is signalled.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("JobWorker: Service started");

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await ProcessNextJobAsync(stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Shutdown was requested while a job was in flight; this is not an error.
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "JobWorker: Unexpected error in main loop");
            }

            try
            {
                // Kept outside the try above so the loop still pauses after a failure
                // instead of spinning on a persistent error.
                await Task.Delay(_pollInterval, stoppingToken);
            }
            catch (OperationCanceledException)
            {
                // The delay was cancelled by shutdown; leave the loop so the stop message is logged.
                break;
            }
        }

        _logger.LogInformation("JobWorker: Service stopped");
    }

    /// <summary>
    /// Picks the next queued job (if any), marks it as running, executes it through the
    /// matching importer/processor plugin and persists the outcome.
    /// </summary>
    /// <remarks>
    /// The final job state is always saved before any rate-limit or backoff wait, and that
    /// save does not observe <paramref name="stoppingToken"/>; otherwise a shutdown would
    /// cancel the save as well and leave the row in the Running state.
    /// </remarks>
    /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
    private async Task ProcessNextJobAsync(CancellationToken stoppingToken)
    {
        using var scope = _serviceProvider.CreateScope();
        var db = scope.ServiceProvider.GetRequiredService<AppDbContext>();
        var pluginManager = scope.ServiceProvider.GetRequiredService<PluginManager>();

        // Next candidate: lowest Priority value first, oldest CreatedAt first within a priority.
        // Retrying is accepted here although this worker itself re-queues failures as Pending.
        var job = await db.QueueJobs
            .Where(j => j.Status == JobStatus.Pending || j.Status == JobStatus.Retrying)
            .OrderBy(j => j.Priority)
            .ThenBy(j => j.CreatedAt)
            .FirstOrDefaultAsync(stoppingToken);

        if (job is null)
        {
            // No jobs to process
            return;
        }

        _logger.LogInformation("JobWorker: Processing job {JobId} - {LayerName} ({JobType}) - Current RetryCount: {RetryCount}, MaxRetries: {MaxRetries}, Status: {Status}",
            job.Id, job.LayerName, job.JobType, job.RetryCount, job.MaxRetries, job.Status);

        // Mark job as running
        var startedAt = DateTime.UtcNow;
        job.Status = JobStatus.Running;
        job.LastAttemptAt = startedAt;
        job.ModifiedAt = startedAt;
        job.ModifiedById = DiunaBI.Domain.Entities.User.AutoImportUserId;
        await db.SaveChangesAsync(stoppingToken);

        // Waits are only recorded inside the try block and performed after the final
        // state has been saved, so a cancelled wait can never alter the job's outcome.
        var applyRateLimit = false;
        TimeSpan? retryDelay = null;

        try
        {
            // Load the layer with its records (read-only: the entity is not tracked by this context)
            var layer = await db.Layers
                .Include(l => l.Records)
                .AsNoTracking()
                .FirstOrDefaultAsync(l => l.Id == job.LayerId, stoppingToken);

            if (layer is null)
            {
                throw new InvalidOperationException($"Layer {job.LayerId} not found");
            }

            // Execute the job based on type
            if (job.JobType == JobType.Import)
            {
                var importer = pluginManager.GetImporter(job.PluginName);
                if (importer is null)
                {
                    throw new InvalidOperationException($"Importer '{job.PluginName}' not found");
                }

                _logger.LogInformation("JobWorker: Executing import for {LayerName} using {PluginName}",
                    job.LayerName, job.PluginName);

                importer.Import(layer);
            }
            else if (job.JobType == JobType.Process)
            {
                var processor = pluginManager.GetProcessor(job.PluginName);
                if (processor is null)
                {
                    throw new InvalidOperationException($"Processor '{job.PluginName}' not found");
                }

                _logger.LogInformation("JobWorker: Executing process for {LayerName} using {PluginName}",
                    job.LayerName, job.PluginName);

                processor.Process(layer);
            }
            // NOTE(review): any other JobType value falls through and is marked Completed
            // without running a plugin - confirm this is intended.

            // Job completed successfully
            job.Status = JobStatus.Completed;
            job.CompletedAt = DateTime.UtcNow;
            job.LastError = null;
            job.ModifiedAt = DateTime.UtcNow;
            job.ModifiedById = DiunaBI.Domain.Entities.User.AutoImportUserId;

            _logger.LogInformation("JobWorker: Job {JobId} completed successfully", job.Id);

            // Rate limiting delay (for Google Sheets API quota) applies to imports only
            applyRateLimit = job.JobType == JobType.Import;
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Interrupted by shutdown rather than by a job failure: put the job back in the
            // queue without recording an error or consuming a retry, then let the caller stop.
            job.Status = JobStatus.Pending;
            job.ModifiedAt = DateTime.UtcNow;
            job.ModifiedById = DiunaBI.Domain.Entities.User.AutoImportUserId;

            _logger.LogInformation("JobWorker: Job {JobId} interrupted by shutdown - returned to queue", job.Id);
            throw;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "JobWorker: Job {JobId} failed - {LayerName}", job.Id, job.LayerName);

            // Capture full error details including inner exceptions
            job.LastError = GetFullErrorMessage(ex);
            job.ModifiedAt = DateTime.UtcNow;
            job.ModifiedById = DiunaBI.Domain.Entities.User.AutoImportUserId;

            if (job.RetryCount >= job.MaxRetries)
            {
                job.Status = JobStatus.Failed;
                _logger.LogWarning("JobWorker: Job {JobId} marked as Failed - no more retries available (RetryCount: {RetryCount}, MaxRetries: {MaxRetries})",
                    job.Id, job.RetryCount, job.MaxRetries);
            }
            else
            {
                var nextRetry = job.RetryCount + 1;

                // Exponential backoff: wait before retrying
                var backoffDelay = GetBackoffDelay(nextRetry);

                _logger.LogInformation("JobWorker: Job {JobId} will retry in {Delay} (retry {RetryNumber} of {MaxRetries})",
                    job.Id, backoffDelay, nextRetry, job.MaxRetries);

                // Re-queue and count the retry now, so the increment is persisted together
                // with the error even if the service stops during the backoff wait.
                job.Status = JobStatus.Pending;
                job.RetryCount = nextRetry;
                retryDelay = backoffDelay;
            }
        }
        finally
        {
            // Deliberately not cancellable: the outcome must reach the database during shutdown too.
            await db.SaveChangesAsync(CancellationToken.None);
        }

        if (applyRateLimit)
        {
            _logger.LogDebug("JobWorker: Applying rate limit delay of {Delay} seconds", _rateLimitDelay.TotalSeconds);
            await Task.Delay(_rateLimitDelay, stoppingToken);
        }
        else if (retryDelay.HasValue)
        {
            // Wait before the next attempt; this holds the single worker loop for the backoff period.
            await Task.Delay(retryDelay.Value, stoppingToken);
        }
    }

    /// <summary>
    /// Logs that the service is stopping and delegates to the base implementation.
    /// </summary>
    /// <param name="stoppingToken">Token passed through to the base implementation.</param>
    public override async Task StopAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("JobWorker: Stopping service...");
        await base.StopAsync(stoppingToken);
    }

    /// <summary>
    /// Maps a 1-based retry number to the wait applied before that retry.
    /// </summary>
    /// <param name="retryCount">Number of the upcoming retry (1 for the first retry).</param>
    /// <returns>30 seconds for retry 1, 2 minutes for retry 2, 5 minutes for any other value.</returns>
    private static TimeSpan GetBackoffDelay(int retryCount)
    {
        return retryCount switch
        {
            1 => TimeSpan.FromSeconds(30), // 1st retry: 30 seconds
            2 => TimeSpan.FromMinutes(2),  // 2nd retry: 2 minutes
            _ => TimeSpan.FromMinutes(5)   // 3rd+ retry: 5 minutes
        };
    }

    /// <summary>
    /// Builds a single-line description of an exception by joining its message with the
    /// messages of every nested <see cref="Exception.InnerException"/>, outermost first.
    /// </summary>
    /// <param name="ex">The outermost exception.</param>
    /// <returns>The messages separated by " → ".</returns>
    private static string GetFullErrorMessage(Exception ex)
    {
        var messages = new List<string>();

        for (Exception? current = ex; current is not null; current = current.InnerException)
        {
            messages.Add(current.Message);
        }

        return string.Join(" → ", messages);
    }
}
|