From 4fd0b1cd508e1d2b7e258f643bf6af5491816e09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Zieli=C5=84ski?= Date: Sun, 8 Jun 2025 11:08:48 +0200 Subject: [PATCH] Remove queue --- .../Interfaces/IJobQueueService.cs | 18 - .../Services/JobQueueProcessor.cs | 238 ------------ .../DiunaBI.Core/Services/JobQueueService.cs | 155 -------- .../Controllers/LayersController.cs | 360 +----------------- src/Backend/DiunaBI.WebAPI/Program.cs | 8 +- 5 files changed, 2 insertions(+), 777 deletions(-) delete mode 100644 src/Backend/DiunaBI.Core/Interfaces/IJobQueueService.cs delete mode 100644 src/Backend/DiunaBI.Core/Services/JobQueueProcessor.cs delete mode 100644 src/Backend/DiunaBI.Core/Services/JobQueueService.cs diff --git a/src/Backend/DiunaBI.Core/Interfaces/IJobQueueService.cs b/src/Backend/DiunaBI.Core/Interfaces/IJobQueueService.cs deleted file mode 100644 index 25a70be..0000000 --- a/src/Backend/DiunaBI.Core/Interfaces/IJobQueueService.cs +++ /dev/null @@ -1,18 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Threading.Tasks; -using DiunaBI.Core.Models; - -namespace DiunaBI.Core.Interfaces; - -public interface IJobQueueService -{ - Task EnqueueJobAsync(QueueJob job); - Task DequeueJobAsync(JobType? jobType = null); - Task MarkJobCompletedAsync(Guid jobId); - Task MarkJobFailedAsync(Guid jobId, string error); - Task MarkJobForRetryAsync(Guid jobId, string error); - Task> GetQueueStatusAsync(); - Task GetQueueCountAsync(JobType? jobType = null); - Task GetRunningJobsCountAsync(JobType jobType); -} \ No newline at end of file diff --git a/src/Backend/DiunaBI.Core/Services/JobQueueProcessor.cs b/src/Backend/DiunaBI.Core/Services/JobQueueProcessor.cs deleted file mode 100644 index 2ea61c5..0000000 --- a/src/Backend/DiunaBI.Core/Services/JobQueueProcessor.cs +++ /dev/null @@ -1,238 +0,0 @@ -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using Microsoft.EntityFrameworkCore; -using DiunaBI.Core.Models; -using DiunaBI.Core.Interfaces; -using DiunaBI.Core.Database.Context; -using System; -using System.Threading.Tasks; -using System.Threading; -using System.Net.Http; -using System.Linq; - -namespace DiunaBI.Core.Services; - -public class JobQueueProcessor : BackgroundService -{ - private readonly IServiceScopeFactory _scopeFactory; // ✅ GOOD - używa scope factory - private readonly ILogger _logger; - private readonly ManualResetEventSlim _processSignal = new(false); - - // ❌ USUŃ DIRECT INJECTION scoped services: - // private readonly IJobQueueService _queueService; - // private readonly AppDbContext _db; - - public JobQueueProcessor( - IServiceScopeFactory scopeFactory, // ✅ GOOD - inject scope factory - ILogger logger) - { - _scopeFactory = scopeFactory; - _logger = logger; - } - - protected override async Task ExecuteAsync(CancellationToken stoppingToken) - { - _logger.LogInformation("JobQueueProcessor: Started (manual trigger mode)"); - - while (!stoppingToken.IsCancellationRequested) - { - try - { - // Wait for manual trigger - _processSignal.Wait(stoppingToken); - _processSignal.Reset(); - - _logger.LogInformation("JobQueueProcessor: Processing triggered"); - - // ✅ GOOD - create scope for each processing cycle - using var scope = _scopeFactory.CreateScope(); - var queueService = scope.ServiceProvider.GetRequiredService(); - var pluginManager = scope.ServiceProvider.GetRequiredService(); - - await ProcessQueueAsync(queueService, pluginManager, stoppingToken); - } - catch (OperationCanceledException) - { - _logger.LogInformation("JobQueueProcessor: Cancellation requested"); - break; - } - catch (Exception ex) - { - _logger.LogError(ex, "JobQueueProcessor: Error in processing loop"); - await Task.Delay(5000, stoppingToken); // Wait before retry - } - } - - _logger.LogInformation("JobQueueProcessor: Stopped"); - } - - public void TriggerProcessing() - { - _logger.LogInformation("JobQueueProcessor: Manual trigger received"); - _processSignal.Set(); - } - - private async Task ProcessQueueAsync(IJobQueueService queueService, PluginManager pluginManager, CancellationToken cancellationToken) - { - var startTime = DateTime.UtcNow; - var initialQueueSize = await queueService.GetQueueCountAsync(); - int processedJobs = 0; - int failedJobs = 0; - - _logger.LogInformation("JobQueueProcessor: Starting processing of {InitialQueueSize} jobs", initialQueueSize); - - while (!cancellationToken.IsCancellationRequested) - { - // First process all imports (they run sequentially due to Google Sheets API limits) - var importJob = await queueService.DequeueJobAsync(JobType.Import); - if (importJob != null) - { - await ProcessJobAsync(importJob, cancellationToken); - if (importJob.Status == JobStatus.Completed) processedJobs++; - else failedJobs++; - - // Add delay between imports to respect Google Sheets API limits - await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken); - continue; - } - - // Then process processors (can run in parallel within same priority) - var processJob = await queueService.DequeueJobAsync(JobType.Process); - if (processJob != null) - { - await ProcessJobAsync(processJob, cancellationToken); - if (processJob.Status == JobStatus.Completed) processedJobs++; - else failedJobs++; - continue; - } - - // No more jobs in queue - break; - } - - var endTime = DateTime.UtcNow; - var duration = endTime - startTime; - var finalQueueSize = await queueService.GetQueueCountAsync(); - - _logger.LogInformation("JobQueueProcessor: Processing completed. Duration: {Duration:hh\\:mm\\:ss}, " + - "Initial queue: {InitialQueueSize}, Processed: {ProcessedJobs}, Failed: {FailedJobs}, " + - "Final queue size: {FinalQueueSize}", - duration, initialQueueSize, processedJobs, failedJobs, finalQueueSize); - - if (failedJobs > 0) - { - _logger.LogWarning("JobQueueProcessor: {FailedJobs} jobs failed during processing. Check logs for details.", failedJobs); - } - } - - private async Task ProcessJobAsync(QueueJob job, CancellationToken cancellationToken) - { - try - { - _logger.LogInformation("JobQueueProcessor: Processing {JobType} job {JobId} for layer {LayerName} (attempt {RetryCount}/{MaxRetries}, priority {Priority})", - job.JobType, job.Id, job.LayerName, job.RetryCount + 1, job.MaxRetries, job.Priority); - - // POPRAWKA: używaj _scopeFactory zamiast _serviceProvider - using var scope = _scopeFactory.CreateScope(); - var dbContext = scope.ServiceProvider.GetRequiredService(); - var pluginManager = scope.ServiceProvider.GetRequiredService(); - var queueService = scope.ServiceProvider.GetRequiredService(); - - // Get the layer with records - var layer = await dbContext.Layers - .Include(x => x.Records) - .FirstOrDefaultAsync(x => x.Id == job.LayerId && !x.IsDeleted, cancellationToken); - - if (layer == null) - { - _logger.LogWarning("JobQueueProcessor: Layer {LayerId} not found, marking job as failed", job.LayerId); - await queueService.MarkJobFailedAsync(job.Id, "Layer not found"); - job.Status = JobStatus.Failed; - return; - } - - // Process based on job type - switch (job.JobType) - { - case JobType.Import: - var importer = pluginManager.GetImporter(job.PluginName); - if (importer == null) - { - _logger.LogWarning("JobQueueProcessor: Importer {PluginName} not found, marking job as failed", job.PluginName); - await queueService.MarkJobFailedAsync(job.Id, $"Importer {job.PluginName} not found"); - job.Status = JobStatus.Failed; - return; - } - - _logger.LogInformation("JobQueueProcessor: Executing import for layer {LayerName} with plugin {PluginName}", - layer.Name, job.PluginName); - - importer.Import(layer); - break; - - case JobType.Process: - var processor = pluginManager.GetProcessor(job.PluginName); - if (processor == null) - { - _logger.LogWarning("JobQueueProcessor: Processor {PluginName} not found, marking job as failed", job.PluginName); - await queueService.MarkJobFailedAsync(job.Id, $"Processor {job.PluginName} not found"); - job.Status = JobStatus.Failed; - return; - } - - _logger.LogInformation("JobQueueProcessor: Executing process for layer {LayerName} with processor {PluginName}", - layer.Name, job.PluginName); - - processor.Process(layer); - break; - - default: - throw new ArgumentOutOfRangeException(nameof(job.JobType), job.JobType, "Unknown job type"); - } - - await queueService.MarkJobCompletedAsync(job.Id); - job.Status = JobStatus.Completed; - - _logger.LogInformation("JobQueueProcessor: Successfully completed {JobType} for layer {LayerName}", - job.JobType, layer.Name); - } - catch (Exception ex) - { - _logger.LogError(ex, "JobQueueProcessor: Error processing {JobType} job {JobId} for layer {LayerName}", - job.JobType, job.Id, job.LayerName); - - // POPRAWKA: tutaj też używaj _scopeFactory - using var scope = _scopeFactory.CreateScope(); - var queueService = scope.ServiceProvider.GetRequiredService(); - - // Check if it's a retriable error - if (IsRetriableError(ex)) - { - await queueService.MarkJobForRetryAsync(job.Id, ex.Message); - job.Status = JobStatus.Retrying; - } - else - { - await queueService.MarkJobFailedAsync(job.Id, ex.Message); - job.Status = JobStatus.Failed; - } - } - } - - private static bool IsRetriableError(Exception ex) - { - var message = ex.Message.ToLowerInvariant(); - - var retriableErrors = new[] - { - "quota", "rate limit", "timeout", "service unavailable", - "internal server error", "bad gateway", "gateway timeout", - "network", "connection" - }; - - return retriableErrors.Any(error => message.Contains(error)) || - ex is HttpRequestException || - ex is TimeoutException; - } -} \ No newline at end of file diff --git a/src/Backend/DiunaBI.Core/Services/JobQueueService.cs b/src/Backend/DiunaBI.Core/Services/JobQueueService.cs deleted file mode 100644 index c45201c..0000000 --- a/src/Backend/DiunaBI.Core/Services/JobQueueService.cs +++ /dev/null @@ -1,155 +0,0 @@ - -using Microsoft.Extensions.Logging; -using DiunaBI.Core.Models; -using DiunaBI.Core.Interfaces; -using System.Threading.Tasks; -using DiunaBI.Core.Database.Context; -using System; -using System.Linq; -using Microsoft.EntityFrameworkCore; -using System.Collections.Generic; - -namespace DiunaBI.Core.Services; - -public class JobQueueService : IJobQueueService -{ - private readonly AppDbContext _db; - private readonly ILogger _logger; - - public JobQueueService(AppDbContext db, ILogger logger) - { - _db = db; - _logger = logger; - } - - public async Task EnqueueJobAsync(QueueJob job) - { - _db.QueueJobs.Add(job); - await _db.SaveChangesAsync(); - - _logger.LogInformation("JobQueue: Enqueued {JobType} job {JobId} for layer {LayerName} with plugin {PluginName} (priority {Priority})", - job.JobType, job.Id, job.LayerName, job.PluginName, job.Priority); - } - - public async Task DequeueJobAsync(JobType? jobType = null) - { - var query = _db.QueueJobs.Where(x => x.Status == JobStatus.Pending); - - if (jobType.HasValue) - { - query = query.Where(x => x.JobType == jobType.Value); - } - - var job = await query - .OrderBy(x => x.JobType) // Importers first (0), then Processors (1) - .ThenBy(x => x.Priority) // Then by priority - .ThenBy(x => x.CreatedAt) // Then FIFO - .FirstOrDefaultAsync(); - - if (job != null) - { - job.Status = JobStatus.Running; - job.LastAttemptAt = DateTime.UtcNow; - job.ModifiedAtUtc = DateTime.UtcNow; - await _db.SaveChangesAsync(); - - _logger.LogDebug("JobQueue: Dequeued {JobType} job {JobId} for layer {LayerName} (priority {Priority})", - job.JobType, job.Id, job.LayerName, job.Priority); - } - - return job; - } - - public async Task MarkJobCompletedAsync(Guid jobId) - { - var job = await _db.QueueJobs.FindAsync(jobId); - if (job != null) - { - job.Status = JobStatus.Completed; - job.CompletedAt = DateTime.UtcNow; - job.ModifiedAtUtc = DateTime.UtcNow; - await _db.SaveChangesAsync(); - - _logger.LogInformation("JobQueue: {JobType} job {JobId} completed successfully for layer {LayerName}", - job.JobType, jobId, job.LayerName); - } - } - - public async Task MarkJobFailedAsync(Guid jobId, string error) - { - var job = await _db.QueueJobs.FindAsync(jobId); - if (job != null) - { - job.Status = JobStatus.Failed; - job.LastError = error; - job.ModifiedAtUtc = DateTime.UtcNow; - await _db.SaveChangesAsync(); - - _logger.LogError("JobQueue: {JobType} job {JobId} failed permanently for layer {LayerName}: {Error}", - job.JobType, jobId, job.LayerName, error); - } - } - - public async Task MarkJobForRetryAsync(Guid jobId, string error) - { - var job = await _db.QueueJobs.FindAsync(jobId); - if (job != null) - { - job.RetryCount++; - job.LastError = error; - job.ModifiedAtUtc = DateTime.UtcNow; - - if (job.RetryCount >= job.MaxRetries) - { - await MarkJobFailedAsync(jobId, $"Max retries ({job.MaxRetries}) exceeded. Last error: {error}"); - return; - } - - job.Status = JobStatus.Retrying; - await _db.SaveChangesAsync(); - - // Schedule retry with exponential backoff - var delayMinutes = Math.Pow(2, job.RetryCount); - _ = Task.Delay(TimeSpan.FromMinutes(delayMinutes)) - .ContinueWith(async _ => - { - var retryJob = await _db.QueueJobs.FindAsync(jobId); - if (retryJob?.Status == JobStatus.Retrying) - { - retryJob.Status = JobStatus.Pending; - retryJob.ModifiedAtUtc = DateTime.UtcNow; - await _db.SaveChangesAsync(); - - _logger.LogWarning("JobQueue: {JobType} job {JobId} re-queued for retry {RetryCount}/{MaxRetries} for layer {LayerName}", - retryJob.JobType, jobId, retryJob.RetryCount, retryJob.MaxRetries, retryJob.LayerName); - } - }); - } - } - - public async Task> GetQueueStatusAsync() - { - return await _db.QueueJobs - .OrderBy(x => x.JobType) - .ThenBy(x => x.Priority) - .ThenBy(x => x.CreatedAt) - .ToListAsync(); - } - - public async Task GetQueueCountAsync(JobType? jobType = null) - { - var query = _db.QueueJobs.Where(x => x.Status == JobStatus.Pending); - - if (jobType.HasValue) - { - query = query.Where(x => x.JobType == jobType.Value); - } - - return await query.CountAsync(); - } - - public async Task GetRunningJobsCountAsync(JobType jobType) - { - return await _db.QueueJobs.CountAsync(x => x.Status == JobStatus.Running && x.JobType == jobType); - } -} \ No newline at end of file diff --git a/src/Backend/DiunaBI.WebAPI/Controllers/LayersController.cs b/src/Backend/DiunaBI.WebAPI/Controllers/LayersController.cs index 2fca29c..af9bc21 100644 --- a/src/Backend/DiunaBI.WebAPI/Controllers/LayersController.cs +++ b/src/Backend/DiunaBI.WebAPI/Controllers/LayersController.cs @@ -21,7 +21,6 @@ public class LayersController : Controller private readonly IConfiguration _configuration; private readonly PluginManager _pluginManager; private readonly ILogger _logger; - private readonly IJobQueueService _queueService; public LayersController( AppDbContext db, @@ -29,8 +28,7 @@ public class LayersController : Controller GoogleDriveHelper googleDriveHelper, IConfiguration configuration, PluginManager pluginManager, - ILogger logger, - IJobQueueService queueService + ILogger logger ) { _db = db; @@ -39,7 +37,6 @@ public class LayersController : Controller _configuration = configuration; _pluginManager = pluginManager; _logger = logger; - _queueService = queueService; } [HttpGet] @@ -241,51 +238,6 @@ public class LayersController : Controller } } - [HttpGet] - [Route("AutoImportWithQueue/{apiKey}")] - [AllowAnonymous] - public IActionResult AutoImportWithQueue(string apiKey) - { - if (Request.Host.Value != _configuration["apiLocalUrl"] || apiKey != _configuration["apiKey"]) - { - _logger.LogWarning("AutoImportQueue: Unauthorized request with apiKey {ApiKey}", apiKey); - return Unauthorized(); - } - - var importWorkerLayers = _db.Layers - .Include(x => x.Records) - .Where(x => - x.Records!.Any(y => y.Code == "Type" && y.Desc1 == "ImportWorker") && - x.Records!.Any(y => y.Code == "IsEnabled" && y.Desc1 == "True") - ) - .OrderBy(x => x.CreatedAt) - .AsNoTracking() - .ToList(); - - if (importWorkerLayers.Count == 0) - { - _logger.LogInformation("AutoImportQueue: No layers to import"); - return Ok(); - } - - _logger.LogInformation("AutoImportQueue: Found {LayerCount} layers to queue", importWorkerLayers.Count); - - foreach (var importWorker in importWorkerLayers) - { - try - { - // Queue job implementation would go here - _logger.LogDebug("AutoImportQueue: Queued layer {LayerName} ({LayerId})", importWorker.Name, importWorker.Id); - } - catch (Exception e) - { - _logger.LogError(e, "AutoImportQueue: Error while adding job for layer {LayerName} ({LayerId})", - importWorker.Name, importWorker.Id); - } - } - return Ok(); - } - [HttpGet] [Route("ProcessQueue/{apiKey}")] [AllowAnonymous] @@ -679,42 +631,6 @@ public class LayersController : Controller [HttpGet] [Route("CheckProcessors")] [AllowAnonymous] - public IActionResult CheckProcessors() - { - // get - - /* - var importWorkerLayers = _db.Layers - .Include(x => x.Records) - .Where(x => - x.Records!.Any(y => y.Code == "Type" && y.Desc1 == "ImportWorker") && - x.Records!.Any(y => y.Code == "ImportType" && y.Desc1 == "Import-D3") - ) - .OrderByDescending(x => x.CreatedAt) - .AsNoTracking() - .ToList(); - - foreach (var importWorker in importWorkerLayers) - { - var record = new Record - { - Id = Guid.NewGuid(), - LayerId = importWorker.Id, - Code = "Plugin", - Desc1 = "Morska.Import.D3", - CreatedAt = DateTime.UtcNow, - ModifiedAt = DateTime.UtcNow, - CreatedById = Guid.Parse("F392209E-123E-4651-A5A4-0B1D6CF9FF9D"), - ModifiedById = Guid.Parse("F392209E-123E-4651-A5A4-0B1D6CF9FF9D") - }; - //_db.Records.Add(record); - } - - //_db.SaveChanges(); - */ - return Ok(); - } - private static void WriteToConsole(params string[] messages) { @@ -809,278 +725,4 @@ public class LayersController : Controller throw; } } - - [HttpGet] - [Route("EnqueueImportWorkers/{apiKey}")] - [AllowAnonymous] - public async Task EnqueueImportWorkers(string apiKey, [FromQuery] Guid? layerId = null) - { - if (Request.Host.Value != _configuration["apiLocalUrl"] || apiKey != _configuration["apiKey"]) - { - _logger.LogWarning("EnqueueImportWorkers: Unauthorized request with apiKey {ApiKey}", apiKey); - return Unauthorized(); - } - - try - { - var query = _db.Layers - .Include(x => x.Records) - .Where(x => - x.Records!.Any(y => y.Code == "Type" && y.Desc1 == "ImportWorker") && - x.Records!.Any(y => y.Code == "IsEnabled" && y.Desc1 == "True") && - !x.IsDeleted && !x.IsCancelled - ).Take(5); - - // If specific layerId is provided, filter to that layer only - if (layerId.HasValue) - { - query = query.Where(x => x.Id == layerId.Value); - } - - var importWorkerLayers = await query - .OrderBy(x => x.CreatedAt) - .AsNoTracking() - .ToListAsync(); - - _logger.LogInformation("EnqueueImportWorkers: Found {LayerCount} import worker layers to queue{LayerFilter}", - importWorkerLayers.Count, layerId.HasValue ? $" (filtered by LayerId: {layerId})" : ""); - - if (importWorkerLayers.Count == 0) - { - return Ok(new - { - Message = "No import workers found to queue", - QueuedJobs = 0, - TotalQueueSize = await _queueService.GetQueueCountAsync(), - SkippedLayers = 0 - }); - } - - int queuedCount = 0; - - foreach (var importWorker in importWorkerLayers) - { - var pluginName = importWorker.Records!.FirstOrDefault(x => x.Code == "Plugin")?.Desc1; - if (string.IsNullOrEmpty(pluginName)) - { - _logger.LogWarning("EnqueueImportWorkers: No plugin name found for layer {LayerName} ({LayerId}), skipping", - importWorker.Name, importWorker.Id); - continue; - } - - // Check if plugin exists - var importer = _pluginManager.GetImporter(pluginName); - if (importer == null) - { - _logger.LogWarning("EnqueueImportWorkers: Importer {PluginName} not found for layer {LayerName} ({LayerId}), skipping", - pluginName, importWorker.Name, importWorker.Id); - continue; - } - - var job = new QueueJob - { - LayerId = importWorker.Id, - LayerName = importWorker.Name ?? "Unknown", - PluginName = pluginName, - JobType = JobType.Import, - Priority = 0, // All imports have same priority - MaxRetries = 5, - CreatedById = Guid.Parse("F392209E-123E-4651-A5A4-0B1D6CF9FF9D"), - ModifiedById = Guid.Parse("F392209E-123E-4651-A5A4-0B1D6CF9FF9D") - }; - - await _queueService.EnqueueJobAsync(job); - queuedCount++; - - _logger.LogDebug("EnqueueImportWorkers: Queued import job for layer {LayerName} ({LayerId}) with plugin {PluginName}", - importWorker.Name, importWorker.Id, pluginName); - } - - var totalQueueSize = await _queueService.GetQueueCountAsync(); - - _logger.LogInformation("EnqueueImportWorkers: Successfully queued {QueuedCount} import jobs. Total queue size: {QueueSize}", - queuedCount, totalQueueSize); - - return Ok(new - { - Message = $"Queued {queuedCount} import jobs", - QueuedJobs = queuedCount, - TotalQueueSize = totalQueueSize, - SkippedLayers = importWorkerLayers.Count - queuedCount - }); - } - catch (Exception e) - { - _logger.LogError(e, "EnqueueImportWorkers: Error queuing import workers"); - return BadRequest(e.ToString()); - } - } - - [HttpGet] - [Route("EnqueueProcessWorkers/{apiKey}")] - [AllowAnonymous] - public async Task EnqueueProcessWorkers(string apiKey, [FromQuery] Guid? layerId = null) - { - if (Request.Host.Value != _configuration["apiLocalUrl"] || apiKey != _configuration["apiKey"]) - { - _logger.LogWarning("EnqueueProcessWorkers: Unauthorized request with apiKey {ApiKey}", apiKey); - return Unauthorized(); - } - - try - { - var query = _db.Layers - .Include(x => x.Records) - .Where(x => - x.Records!.Any(y => y.Code == "Type" && y.Desc1 == "ProcessWorker") && - x.Records!.Any(y => y.Code == "IsEnabled" && y.Desc1 == "True") && - !x.IsDeleted && !x.IsCancelled - ).Take(5); - - // If specific layerId is provided, filter to that layer only - if (layerId.HasValue) - { - query = query.Where(x => x.Id == layerId.Value); - } - - var processWorkerLayers = await query - .OrderBy(x => x.CreatedAt) - .AsNoTracking() - .ToListAsync(); - - _logger.LogInformation("EnqueueProcessWorkers: Found {LayerCount} process worker layers to queue{LayerFilter}", - processWorkerLayers.Count, layerId.HasValue ? $" (filtered by LayerId: {layerId})" : ""); - - if (processWorkerLayers.Count == 0) - { - return Ok(new - { - Message = "No process workers found to queue", - QueuedJobs = 0, - TotalQueueSize = await _queueService.GetQueueCountAsync(), - SkippedLayers = 0 - }); - } - - int queuedCount = 0; - - foreach (var processWorker in processWorkerLayers) - { - // POPRAWIONE: Używaj Plugin zamiast ProcessorType - var pluginName = processWorker.Records!.FirstOrDefault(x => x.Code == "Plugin")?.Desc1; - if (string.IsNullOrEmpty(pluginName)) - { - _logger.LogWarning("EnqueueProcessWorkers: No plugin name found for layer {LayerName} ({LayerId}), skipping", - processWorker.Name, processWorker.Id); - continue; - } - - // POPRAWIONE: Sprawdź czy processor istnieje przez pluginName - var processor = _pluginManager.GetProcessor(pluginName); - if (processor == null) - { - _logger.LogWarning("EnqueueProcessWorkers: Processor {PluginName} not found for layer {LayerName} ({LayerId}), skipping", - pluginName, processWorker.Name, processWorker.Id); - continue; - } - - // Get priority from ProcessWorker record, default to 10 if not found - var priorityStr = processWorker.Records!.FirstOrDefault(x => x.Code == "Priority")?.Desc1; - var priority = 10; // Default priority - if (!string.IsNullOrEmpty(priorityStr) && int.TryParse(priorityStr, out var parsedPriority)) - { - priority = parsedPriority; - } - - var job = new QueueJob - { - LayerId = processWorker.Id, - LayerName = processWorker.Name ?? "Unknown", - PluginName = pluginName, // POPRAWIONE: Używaj pluginName bezpośrednio - JobType = JobType.Process, - Priority = priority, - MaxRetries = 3, - CreatedById = Guid.Parse("F392209E-123E-4651-A5A4-0B1D6CF9FF9D"), - ModifiedById = Guid.Parse("F392209E-123E-4651-A5A4-0B1D6CF9FF9D") - }; - - await _queueService.EnqueueJobAsync(job); - queuedCount++; - - _logger.LogDebug("EnqueueProcessWorkers: Queued process job for layer {LayerName} ({LayerId}) with plugin {PluginName}, priority {Priority}", - processWorker.Name, processWorker.Id, pluginName, priority); - } - - var totalQueueSize = await _queueService.GetQueueCountAsync(); - - _logger.LogInformation("EnqueueProcessWorkers: Successfully queued {QueuedCount} process jobs. Total queue size: {QueueSize}", - queuedCount, totalQueueSize); - - return Ok(new - { - Message = $"Queued {queuedCount} process jobs", - QueuedJobs = queuedCount, - TotalQueueSize = totalQueueSize, - SkippedLayers = processWorkerLayers.Count - queuedCount - }); - } - catch (Exception e) - { - _logger.LogError(e, "EnqueueProcessWorkers: Error queuing process workers"); - return BadRequest(e.ToString()); - } - } - - [HttpGet] - [Route("RunQueueJobs/{apiKey}")] - [AllowAnonymous] - public async Task RunQueueJobs(string apiKey) - { - if (Request.Host.Value != _configuration["apiLocalUrl"] || apiKey != _configuration["apiKey"]) - { - _logger.LogWarning("RunQueueJobs: Unauthorized request with apiKey {ApiKey}", apiKey); - return Unauthorized(); - } - - try - { - var queueSize = await _queueService.GetQueueCountAsync(); - - if (queueSize == 0) - { - return Ok(new - { - Message = "Queue is empty", - QueueSize = 0, - Status = "No jobs to process" - }); - } - - _logger.LogInformation("RunQueueJobs: Triggering queue processing for {QueueSize} jobs", queueSize); - - // ZMIEŃ NA DOSTĘP PRZEZ IHostedService: - var hostedServices = HttpContext.RequestServices.GetServices(); - var queueProcessor = hostedServices.OfType().FirstOrDefault(); - - if (queueProcessor == null) - { - _logger.LogError("RunQueueJobs: JobQueueProcessor not found"); - return BadRequest("JobQueueProcessor not found"); - } - - queueProcessor.TriggerProcessing(); - - return Ok(new - { - Message = $"Queue processing triggered for {queueSize} jobs", - QueueSize = queueSize, - Status = "Processing started in background" - }); - } - catch (Exception e) - { - _logger.LogError(e, "RunQueueJobs: Error triggering queue processing"); - return BadRequest(e.ToString()); - } - } } \ No newline at end of file diff --git a/src/Backend/DiunaBI.WebAPI/Program.cs b/src/Backend/DiunaBI.WebAPI/Program.cs index bb1c1ff..440e6c5 100644 --- a/src/Backend/DiunaBI.WebAPI/Program.cs +++ b/src/Backend/DiunaBI.WebAPI/Program.cs @@ -69,10 +69,6 @@ builder.Services.AddAuthentication(options => }; }); -// Queue services -builder.Services.AddScoped(); -builder.Services.AddHostedService(); // ✅ GOOD - with proper scope factory - // Google Sheets dependencies Console.WriteLine("Adding Google Sheets dependencies..."); builder.Services.AddSingleton(); @@ -142,9 +138,7 @@ app.Use(async (context, next) => if (token.Length > 0 && !context.Request.Path.ToString().Contains("getForPowerBI") && !context.Request.Path.ToString().Contains("getConfiguration") - && !context.Request.Path.ToString().Contains("DataInbox/Add") - && !context.Request.Path.ToString().Contains("AddPluginName") // TODO: Remove this - && !context.Request.Path.ToString().Contains("GetImportWorkers")) + && !context.Request.Path.ToString().Contains("DataInbox/Add")) { var handler = new JwtSecurityTokenHandler(); var data = handler.ReadJwtToken(token.Split(' ')[1]);