Remove queue

This commit is contained in:
Michał Zieliński
2025-06-08 11:08:48 +02:00
parent 99d8593c49
commit 4fd0b1cd50
5 changed files with 2 additions and 777 deletions

View File

@@ -1,18 +0,0 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using DiunaBI.Core.Models;
namespace DiunaBI.Core.Interfaces;
public interface IJobQueueService
{
Task EnqueueJobAsync(QueueJob job);
Task<QueueJob?> DequeueJobAsync(JobType? jobType = null);
Task MarkJobCompletedAsync(Guid jobId);
Task MarkJobFailedAsync(Guid jobId, string error);
Task MarkJobForRetryAsync(Guid jobId, string error);
Task<IEnumerable<QueueJob>> GetQueueStatusAsync();
Task<int> GetQueueCountAsync(JobType? jobType = null);
Task<int> GetRunningJobsCountAsync(JobType jobType);
}

View File

@@ -1,238 +0,0 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.EntityFrameworkCore;
using DiunaBI.Core.Models;
using DiunaBI.Core.Interfaces;
using DiunaBI.Core.Database.Context;
using System;
using System.Threading.Tasks;
using System.Threading;
using System.Net.Http;
using System.Linq;
namespace DiunaBI.Core.Services;
public class JobQueueProcessor : BackgroundService
{
private readonly IServiceScopeFactory _scopeFactory; // ✅ GOOD - używa scope factory
private readonly ILogger<JobQueueProcessor> _logger;
private readonly ManualResetEventSlim _processSignal = new(false);
// ❌ USUŃ DIRECT INJECTION scoped services:
// private readonly IJobQueueService _queueService;
// private readonly AppDbContext _db;
public JobQueueProcessor(
IServiceScopeFactory scopeFactory, // ✅ GOOD - inject scope factory
ILogger<JobQueueProcessor> logger)
{
_scopeFactory = scopeFactory;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("JobQueueProcessor: Started (manual trigger mode)");
while (!stoppingToken.IsCancellationRequested)
{
try
{
// Wait for manual trigger
_processSignal.Wait(stoppingToken);
_processSignal.Reset();
_logger.LogInformation("JobQueueProcessor: Processing triggered");
// ✅ GOOD - create scope for each processing cycle
using var scope = _scopeFactory.CreateScope();
var queueService = scope.ServiceProvider.GetRequiredService<IJobQueueService>();
var pluginManager = scope.ServiceProvider.GetRequiredService<PluginManager>();
await ProcessQueueAsync(queueService, pluginManager, stoppingToken);
}
catch (OperationCanceledException)
{
_logger.LogInformation("JobQueueProcessor: Cancellation requested");
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "JobQueueProcessor: Error in processing loop");
await Task.Delay(5000, stoppingToken); // Wait before retry
}
}
_logger.LogInformation("JobQueueProcessor: Stopped");
}
public void TriggerProcessing()
{
_logger.LogInformation("JobQueueProcessor: Manual trigger received");
_processSignal.Set();
}
private async Task ProcessQueueAsync(IJobQueueService queueService, PluginManager pluginManager, CancellationToken cancellationToken)
{
var startTime = DateTime.UtcNow;
var initialQueueSize = await queueService.GetQueueCountAsync();
int processedJobs = 0;
int failedJobs = 0;
_logger.LogInformation("JobQueueProcessor: Starting processing of {InitialQueueSize} jobs", initialQueueSize);
while (!cancellationToken.IsCancellationRequested)
{
// First process all imports (they run sequentially due to Google Sheets API limits)
var importJob = await queueService.DequeueJobAsync(JobType.Import);
if (importJob != null)
{
await ProcessJobAsync(importJob, cancellationToken);
if (importJob.Status == JobStatus.Completed) processedJobs++;
else failedJobs++;
// Add delay between imports to respect Google Sheets API limits
await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
continue;
}
// Then process processors (can run in parallel within same priority)
var processJob = await queueService.DequeueJobAsync(JobType.Process);
if (processJob != null)
{
await ProcessJobAsync(processJob, cancellationToken);
if (processJob.Status == JobStatus.Completed) processedJobs++;
else failedJobs++;
continue;
}
// No more jobs in queue
break;
}
var endTime = DateTime.UtcNow;
var duration = endTime - startTime;
var finalQueueSize = await queueService.GetQueueCountAsync();
_logger.LogInformation("JobQueueProcessor: Processing completed. Duration: {Duration:hh\\:mm\\:ss}, " +
"Initial queue: {InitialQueueSize}, Processed: {ProcessedJobs}, Failed: {FailedJobs}, " +
"Final queue size: {FinalQueueSize}",
duration, initialQueueSize, processedJobs, failedJobs, finalQueueSize);
if (failedJobs > 0)
{
_logger.LogWarning("JobQueueProcessor: {FailedJobs} jobs failed during processing. Check logs for details.", failedJobs);
}
}
private async Task ProcessJobAsync(QueueJob job, CancellationToken cancellationToken)
{
try
{
_logger.LogInformation("JobQueueProcessor: Processing {JobType} job {JobId} for layer {LayerName} (attempt {RetryCount}/{MaxRetries}, priority {Priority})",
job.JobType, job.Id, job.LayerName, job.RetryCount + 1, job.MaxRetries, job.Priority);
// POPRAWKA: używaj _scopeFactory zamiast _serviceProvider
using var scope = _scopeFactory.CreateScope();
var dbContext = scope.ServiceProvider.GetRequiredService<AppDbContext>();
var pluginManager = scope.ServiceProvider.GetRequiredService<PluginManager>();
var queueService = scope.ServiceProvider.GetRequiredService<IJobQueueService>();
// Get the layer with records
var layer = await dbContext.Layers
.Include(x => x.Records)
.FirstOrDefaultAsync(x => x.Id == job.LayerId && !x.IsDeleted, cancellationToken);
if (layer == null)
{
_logger.LogWarning("JobQueueProcessor: Layer {LayerId} not found, marking job as failed", job.LayerId);
await queueService.MarkJobFailedAsync(job.Id, "Layer not found");
job.Status = JobStatus.Failed;
return;
}
// Process based on job type
switch (job.JobType)
{
case JobType.Import:
var importer = pluginManager.GetImporter(job.PluginName);
if (importer == null)
{
_logger.LogWarning("JobQueueProcessor: Importer {PluginName} not found, marking job as failed", job.PluginName);
await queueService.MarkJobFailedAsync(job.Id, $"Importer {job.PluginName} not found");
job.Status = JobStatus.Failed;
return;
}
_logger.LogInformation("JobQueueProcessor: Executing import for layer {LayerName} with plugin {PluginName}",
layer.Name, job.PluginName);
importer.Import(layer);
break;
case JobType.Process:
var processor = pluginManager.GetProcessor(job.PluginName);
if (processor == null)
{
_logger.LogWarning("JobQueueProcessor: Processor {PluginName} not found, marking job as failed", job.PluginName);
await queueService.MarkJobFailedAsync(job.Id, $"Processor {job.PluginName} not found");
job.Status = JobStatus.Failed;
return;
}
_logger.LogInformation("JobQueueProcessor: Executing process for layer {LayerName} with processor {PluginName}",
layer.Name, job.PluginName);
processor.Process(layer);
break;
default:
throw new ArgumentOutOfRangeException(nameof(job.JobType), job.JobType, "Unknown job type");
}
await queueService.MarkJobCompletedAsync(job.Id);
job.Status = JobStatus.Completed;
_logger.LogInformation("JobQueueProcessor: Successfully completed {JobType} for layer {LayerName}",
job.JobType, layer.Name);
}
catch (Exception ex)
{
_logger.LogError(ex, "JobQueueProcessor: Error processing {JobType} job {JobId} for layer {LayerName}",
job.JobType, job.Id, job.LayerName);
// POPRAWKA: tutaj też używaj _scopeFactory
using var scope = _scopeFactory.CreateScope();
var queueService = scope.ServiceProvider.GetRequiredService<IJobQueueService>();
// Check if it's a retriable error
if (IsRetriableError(ex))
{
await queueService.MarkJobForRetryAsync(job.Id, ex.Message);
job.Status = JobStatus.Retrying;
}
else
{
await queueService.MarkJobFailedAsync(job.Id, ex.Message);
job.Status = JobStatus.Failed;
}
}
}
private static bool IsRetriableError(Exception ex)
{
var message = ex.Message.ToLowerInvariant();
var retriableErrors = new[]
{
"quota", "rate limit", "timeout", "service unavailable",
"internal server error", "bad gateway", "gateway timeout",
"network", "connection"
};
return retriableErrors.Any(error => message.Contains(error)) ||
ex is HttpRequestException ||
ex is TimeoutException;
}
}

View File

@@ -1,155 +0,0 @@
using Microsoft.Extensions.Logging;
using DiunaBI.Core.Models;
using DiunaBI.Core.Interfaces;
using System.Threading.Tasks;
using DiunaBI.Core.Database.Context;
using System;
using System.Linq;
using Microsoft.EntityFrameworkCore;
using System.Collections.Generic;
namespace DiunaBI.Core.Services;
public class JobQueueService : IJobQueueService
{
private readonly AppDbContext _db;
private readonly ILogger<JobQueueService> _logger;
public JobQueueService(AppDbContext db, ILogger<JobQueueService> logger)
{
_db = db;
_logger = logger;
}
public async Task EnqueueJobAsync(QueueJob job)
{
_db.QueueJobs.Add(job);
await _db.SaveChangesAsync();
_logger.LogInformation("JobQueue: Enqueued {JobType} job {JobId} for layer {LayerName} with plugin {PluginName} (priority {Priority})",
job.JobType, job.Id, job.LayerName, job.PluginName, job.Priority);
}
public async Task<QueueJob?> DequeueJobAsync(JobType? jobType = null)
{
var query = _db.QueueJobs.Where(x => x.Status == JobStatus.Pending);
if (jobType.HasValue)
{
query = query.Where(x => x.JobType == jobType.Value);
}
var job = await query
.OrderBy(x => x.JobType) // Importers first (0), then Processors (1)
.ThenBy(x => x.Priority) // Then by priority
.ThenBy(x => x.CreatedAt) // Then FIFO
.FirstOrDefaultAsync();
if (job != null)
{
job.Status = JobStatus.Running;
job.LastAttemptAt = DateTime.UtcNow;
job.ModifiedAtUtc = DateTime.UtcNow;
await _db.SaveChangesAsync();
_logger.LogDebug("JobQueue: Dequeued {JobType} job {JobId} for layer {LayerName} (priority {Priority})",
job.JobType, job.Id, job.LayerName, job.Priority);
}
return job;
}
public async Task MarkJobCompletedAsync(Guid jobId)
{
var job = await _db.QueueJobs.FindAsync(jobId);
if (job != null)
{
job.Status = JobStatus.Completed;
job.CompletedAt = DateTime.UtcNow;
job.ModifiedAtUtc = DateTime.UtcNow;
await _db.SaveChangesAsync();
_logger.LogInformation("JobQueue: {JobType} job {JobId} completed successfully for layer {LayerName}",
job.JobType, jobId, job.LayerName);
}
}
public async Task MarkJobFailedAsync(Guid jobId, string error)
{
var job = await _db.QueueJobs.FindAsync(jobId);
if (job != null)
{
job.Status = JobStatus.Failed;
job.LastError = error;
job.ModifiedAtUtc = DateTime.UtcNow;
await _db.SaveChangesAsync();
_logger.LogError("JobQueue: {JobType} job {JobId} failed permanently for layer {LayerName}: {Error}",
job.JobType, jobId, job.LayerName, error);
}
}
public async Task MarkJobForRetryAsync(Guid jobId, string error)
{
var job = await _db.QueueJobs.FindAsync(jobId);
if (job != null)
{
job.RetryCount++;
job.LastError = error;
job.ModifiedAtUtc = DateTime.UtcNow;
if (job.RetryCount >= job.MaxRetries)
{
await MarkJobFailedAsync(jobId, $"Max retries ({job.MaxRetries}) exceeded. Last error: {error}");
return;
}
job.Status = JobStatus.Retrying;
await _db.SaveChangesAsync();
// Schedule retry with exponential backoff
var delayMinutes = Math.Pow(2, job.RetryCount);
_ = Task.Delay(TimeSpan.FromMinutes(delayMinutes))
.ContinueWith(async _ =>
{
var retryJob = await _db.QueueJobs.FindAsync(jobId);
if (retryJob?.Status == JobStatus.Retrying)
{
retryJob.Status = JobStatus.Pending;
retryJob.ModifiedAtUtc = DateTime.UtcNow;
await _db.SaveChangesAsync();
_logger.LogWarning("JobQueue: {JobType} job {JobId} re-queued for retry {RetryCount}/{MaxRetries} for layer {LayerName}",
retryJob.JobType, jobId, retryJob.RetryCount, retryJob.MaxRetries, retryJob.LayerName);
}
});
}
}
public async Task<IEnumerable<QueueJob>> GetQueueStatusAsync()
{
return await _db.QueueJobs
.OrderBy(x => x.JobType)
.ThenBy(x => x.Priority)
.ThenBy(x => x.CreatedAt)
.ToListAsync();
}
public async Task<int> GetQueueCountAsync(JobType? jobType = null)
{
var query = _db.QueueJobs.Where(x => x.Status == JobStatus.Pending);
if (jobType.HasValue)
{
query = query.Where(x => x.JobType == jobType.Value);
}
return await query.CountAsync();
}
public async Task<int> GetRunningJobsCountAsync(JobType jobType)
{
return await _db.QueueJobs.CountAsync(x => x.Status == JobStatus.Running && x.JobType == jobType);
}
}