Final fixes before tests
@@ -17,6 +17,7 @@ public class JobQueueProcessor : BackgroundService
 {
     private readonly IServiceProvider _serviceProvider;
     private readonly ILogger<JobQueueProcessor> _logger;
+    private readonly ManualResetEventSlim _processSignal;
 
     public JobQueueProcessor(
         IServiceProvider serviceProvider,
@@ -24,27 +25,30 @@ public class JobQueueProcessor : BackgroundService
     {
         _serviceProvider = serviceProvider;
         _logger = logger;
+        _processSignal = new ManualResetEventSlim(false);
     }
 
     protected override async Task ExecuteAsync(CancellationToken stoppingToken)
     {
-        _logger.LogInformation("JobQueueProcessor: Started");
+        _logger.LogInformation("JobQueueProcessor: Started (manual trigger mode)");
 
         while (!stoppingToken.IsCancellationRequested)
         {
             try
             {
+                // Wait for manual trigger or cancellation
+                _processSignal.Wait(stoppingToken);
+                _processSignal.Reset();
+
+                _logger.LogInformation("JobQueueProcessor: Processing triggered manually");
+
                 using var scope = _serviceProvider.CreateScope();
                 var queueService = scope.ServiceProvider.GetRequiredService<IJobQueueService>();
 
-                // First process all imports (they run sequentially due to Google Sheets API limits)
-                await ProcessJobType(queueService, JobType.Import, maxConcurrency: 1, stoppingToken);
-
-                // Then process processors (can run in parallel within same priority)
-                await ProcessJobType(queueService, JobType.Process, maxConcurrency: 3, stoppingToken);
-
-                // Wait before next cycle
-                await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
+                // Process all jobs until queue is empty
+                await ProcessAllJobs(queueService, stoppingToken);
+
+                _logger.LogInformation("JobQueueProcessor: Manual processing completed");
             }
             catch (OperationCanceledException)
             {
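
A side note on the new wait/reset pattern in ExecuteAsync: ManualResetEventSlim.Wait(stoppingToken) blocks until someone calls Set() or the host cancels the token, and Reset() re-arms the event so the next iteration blocks again. A minimal, self-contained sketch of that behaviour (plain console program with implicit usings, not code from this repository):

// Minimal sketch: how ManualResetEventSlim + CancellationToken drive a
// "wait until triggered" loop. Wait() blocks until Set() is called elsewhere;
// Reset() re-arms the event so the next iteration blocks again. If the token
// is cancelled while waiting, Wait() throws OperationCanceledException, which
// a catch (OperationCanceledException) block can use to exit cleanly.
using var signal = new ManualResetEventSlim(false);
using var cts = new CancellationTokenSource();

var worker = Task.Run(() =>
{
    try
    {
        while (!cts.Token.IsCancellationRequested)
        {
            signal.Wait(cts.Token);   // block until triggered or cancelled
            signal.Reset();           // re-arm before doing the work
            Console.WriteLine("processing one batch");
        }
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("stopped");
    }
});

signal.Set();            // simulate a manual trigger
await Task.Delay(100);
cts.Cancel();            // simulate host shutdown
await worker;

Because Wait() throws OperationCanceledException when the token is cancelled, the service's existing catch (OperationCanceledException) path also covers shutdown while the processor sits idle.
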
@@ -54,57 +58,79 @@ public class JobQueueProcessor : BackgroundService
             catch (Exception ex)
             {
                 _logger.LogError(ex, "JobQueueProcessor: Unexpected error in queue processor");
-                await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken);
+                await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken);
             }
         }
 
         _logger.LogInformation("JobQueueProcessor: Stopped");
     }
 
-    private async Task ProcessJobType(IJobQueueService queueService, JobType jobType, int maxConcurrency, CancellationToken cancellationToken)
+    public void TriggerProcessing()
     {
-        var runningJobs = await queueService.GetRunningJobsCountAsync(jobType);
-
-        // Don't start new jobs if we're at max concurrency
-        if (runningJobs >= maxConcurrency)
-        {
-            return;
-        }
-
-        var job = await queueService.DequeueJobAsync(jobType);
-        if (job == null)
-        {
-            return;
-        }
-
-        _logger.LogInformation("JobQueueProcessor: Processing {JobType} job {JobId} for layer {LayerName} (attempt {RetryCount}/{MaxRetries}, priority {Priority})",
-            job.JobType, job.Id, job.LayerName, job.RetryCount + 1, job.MaxRetries, job.Priority);
-
-        // Process job asynchronously to allow parallel processing of processors
-        _ = Task.Run(async () =>
-        {
-            try
-            {
-                await ProcessJobAsync(job, cancellationToken);
-
-                // Add delay between imports to respect Google Sheets API limits
-                if (job.JobType == JobType.Import)
-                {
-                    await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
-                }
-            }
-            catch (Exception ex)
-            {
-                _logger.LogError(ex, "JobQueueProcessor: Error in background job processing for {JobType} job {JobId}",
-                    job.JobType, job.Id);
-            }
-        }, cancellationToken);
+        _logger.LogInformation("JobQueueProcessor: Manual trigger received");
+        _processSignal.Set();
+    }
+
+    private async Task ProcessAllJobs(IJobQueueService queueService, CancellationToken cancellationToken)
+    {
+        var startTime = DateTime.UtcNow;
+        var initialQueueSize = await queueService.GetQueueCountAsync();
+        int processedJobs = 0;
+        int failedJobs = 0;
+
+        _logger.LogInformation("JobQueueProcessor: Starting processing of {InitialQueueSize} jobs", initialQueueSize);
+
+        while (!cancellationToken.IsCancellationRequested)
+        {
+            // First process all imports (they run sequentially due to Google Sheets API limits)
+            var importJob = await queueService.DequeueJobAsync(JobType.Import);
+            if (importJob != null)
+            {
+                await ProcessJobAsync(importJob, cancellationToken);
+                if (importJob.Status == JobStatus.Completed) processedJobs++;
+                else failedJobs++;
+
+                // Add delay between imports to respect Google Sheets API limits
+                await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
+                continue;
+            }
+
+            // Then process processors (can run in parallel within same priority)
+            var processJob = await queueService.DequeueJobAsync(JobType.Process);
+            if (processJob != null)
+            {
+                await ProcessJobAsync(processJob, cancellationToken);
+                if (processJob.Status == JobStatus.Completed) processedJobs++;
+                else failedJobs++;
+                continue;
+            }
+
+            // No more jobs in queue
+            break;
+        }
+
+        var endTime = DateTime.UtcNow;
+        var duration = endTime - startTime;
+        var finalQueueSize = await queueService.GetQueueCountAsync();
+
+        _logger.LogInformation("JobQueueProcessor: Processing completed. Duration: {Duration:hh\\:mm\\:ss}, " +
+            "Initial queue: {InitialQueueSize}, Processed: {ProcessedJobs}, Failed: {FailedJobs}, " +
+            "Final queue size: {FinalQueueSize}",
+            duration, initialQueueSize, processedJobs, failedJobs, finalQueueSize);
+
+        if (failedJobs > 0)
+        {
+            _logger.LogWarning("JobQueueProcessor: {FailedJobs} jobs failed during processing. Check logs for details.", failedJobs);
+        }
     }
 
     private async Task ProcessJobAsync(QueueJob job, CancellationToken cancellationToken)
     {
         try
         {
+            _logger.LogInformation("JobQueueProcessor: Processing {JobType} job {JobId} for layer {LayerName} (attempt {RetryCount}/{MaxRetries}, priority {Priority})",
+                job.JobType, job.Id, job.LayerName, job.RetryCount + 1, job.MaxRetries, job.Priority);
+
             using var scope = _serviceProvider.CreateScope();
             var dbContext = scope.ServiceProvider.GetRequiredService<AppDbContext>();
             var pluginManager = scope.ServiceProvider.GetRequiredService<PluginManager>();
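
The commit adds TriggerProcessing() but does not show who calls it. One plausible wiring, sketched here with an assumed route and registration style (only JobQueueProcessor.TriggerProcessing() comes from the code above), is to register the processor as a singleton, expose the same instance as the hosted service, and set the signal from an HTTP endpoint:

// Hypothetical wiring, not part of this commit: a minimal-API Program.cs that
// exposes the manual trigger over HTTP. The route name is an assumption.
var builder = WebApplication.CreateBuilder(args);

// Register the processor once so the hosted service and the endpoint share the instance.
builder.Services.AddSingleton<JobQueueProcessor>();
builder.Services.AddHostedService(sp => sp.GetRequiredService<JobQueueProcessor>());

var app = builder.Build();

// The HTTP call only sets the signal and returns; the work runs on the background loop.
app.MapPost("/api/queue/trigger", (JobQueueProcessor processor) =>
{
    processor.TriggerProcessing();
    return Results.Accepted();
});

app.Run();

The shared registration matters: a separate AddHostedService<JobQueueProcessor>() call would create a second instance whose signal the endpoint never sets.
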
@@ -119,6 +145,7 @@ public class JobQueueProcessor : BackgroundService
             {
                 _logger.LogWarning("JobQueueProcessor: Layer {LayerId} not found, marking job as failed", job.LayerId);
                 await queueService.MarkJobFailedAsync(job.Id, "Layer not found");
+                job.Status = JobStatus.Failed;
                 return;
             }
 
@@ -131,6 +158,7 @@ public class JobQueueProcessor : BackgroundService
             {
                 _logger.LogWarning("JobQueueProcessor: Importer {PluginName} not found, marking job as failed", job.PluginName);
                 await queueService.MarkJobFailedAsync(job.Id, $"Importer {job.PluginName} not found");
+                job.Status = JobStatus.Failed;
                 return;
             }
 
@@ -146,10 +174,11 @@ public class JobQueueProcessor : BackgroundService
             {
                 _logger.LogWarning("JobQueueProcessor: Processor {PluginName} not found, marking job as failed", job.PluginName);
                 await queueService.MarkJobFailedAsync(job.Id, $"Processor {job.PluginName} not found");
+                job.Status = JobStatus.Failed;
                 return;
             }
 
-            _logger.LogInformation("JobQueueProcessor: Executing process for layer {LayerName} with plugin {PluginName}",
+            _logger.LogInformation("JobQueueProcessor: Executing process for layer {LayerName} with processor {PluginName}",
                 layer.Name, job.PluginName);
 
             processor.Process(layer);
@@ -160,6 +189,7 @@ public class JobQueueProcessor : BackgroundService
             }
 
             await queueService.MarkJobCompletedAsync(job.Id);
+            job.Status = JobStatus.Completed;
 
             _logger.LogInformation("JobQueueProcessor: Successfully completed {JobType} for layer {LayerName}",
                 job.JobType, layer.Name);
@@ -176,10 +206,12 @@ public class JobQueueProcessor : BackgroundService
             if (IsRetriableError(ex))
             {
                 await queueService.MarkJobForRetryAsync(job.Id, ex.Message);
+                job.Status = JobStatus.Retrying;
             }
             else
             {
                 await queueService.MarkJobFailedAsync(job.Id, ex.Message);
+                job.Status = JobStatus.Failed;
             }
         }
     }
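
The diff leans on an IJobQueueService and a QueueJob model that are defined elsewhere in the repository. As a reading aid, here is a rough sketch of the shape implied by the calls in this commit; every detail not visible above (the Guid id, property types, the Queued/Running statuses) is an assumption, not the repo's actual definition.

using System;
using System.Threading.Tasks;

// Shape implied by the calls in this diff, written out for reference only.
public enum JobType { Import, Process }

public enum JobStatus { Queued, Running, Completed, Failed, Retrying }   // extra values assumed

public class QueueJob
{
    public Guid Id { get; set; }                  // id type assumed
    public JobType JobType { get; set; }
    public JobStatus Status { get; set; }
    public int LayerId { get; set; }              // type assumed
    public string LayerName { get; set; } = "";
    public string PluginName { get; set; } = "";
    public int RetryCount { get; set; }
    public int MaxRetries { get; set; }
    public int Priority { get; set; }
}

public interface IJobQueueService
{
    Task<int> GetQueueCountAsync();
    Task<QueueJob?> DequeueJobAsync(JobType jobType);
    Task MarkJobCompletedAsync(Guid jobId);
    Task MarkJobFailedAsync(Guid jobId, string error);
    Task MarkJobForRetryAsync(Guid jobId, string error);
}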