From ff41a71484d8bd11ee43383cf6f2af781dc1a689 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Zieli=C5=84ski?= Date: Sun, 8 Jun 2025 10:18:52 +0200 Subject: [PATCH] Final fixes before tests --- .../Services/JobQueueProcessor.cs | 110 +++-- .../Processors/T1R1Processor.cs | 4 +- .../T3MultiSourceSummaryProcessor.cs | 4 +- .../T3MultiSourceYearSummaryProcessor.cs | 4 +- .../T3SourceYearSummaryProcessor.cs | 4 +- .../Processors/T4R2Processor.cs | 4 +- .../Processors/T5LastValuesProcessor.cs | 2 +- .../Controllers/LayersController.cs | 418 ++++++++++++------ src/Backend/DiunaBI.WebAPI/Program.cs | 23 +- tools/http-tests/AutoImport.http | 2 +- 10 files changed, 374 insertions(+), 201 deletions(-) diff --git a/src/Backend/DiunaBI.Core/Services/JobQueueProcessor.cs b/src/Backend/DiunaBI.Core/Services/JobQueueProcessor.cs index 66c7387..624441c 100644 --- a/src/Backend/DiunaBI.Core/Services/JobQueueProcessor.cs +++ b/src/Backend/DiunaBI.Core/Services/JobQueueProcessor.cs @@ -17,6 +17,7 @@ public class JobQueueProcessor : BackgroundService { private readonly IServiceProvider _serviceProvider; private readonly ILogger _logger; + private readonly ManualResetEventSlim _processSignal; public JobQueueProcessor( IServiceProvider serviceProvider, @@ -24,27 +25,30 @@ public class JobQueueProcessor : BackgroundService { _serviceProvider = serviceProvider; _logger = logger; + _processSignal = new ManualResetEventSlim(false); } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - _logger.LogInformation("JobQueueProcessor: Started"); + _logger.LogInformation("JobQueueProcessor: Started (manual trigger mode)"); while (!stoppingToken.IsCancellationRequested) { try { + // Wait for manual trigger or cancellation + _processSignal.Wait(stoppingToken); + _processSignal.Reset(); + + _logger.LogInformation("JobQueueProcessor: Processing triggered manually"); + using var scope = _serviceProvider.CreateScope(); var queueService = scope.ServiceProvider.GetRequiredService(); - // First process all imports (they run sequentially due to Google Sheets API limits) - await ProcessJobType(queueService, JobType.Import, maxConcurrency: 1, stoppingToken); + // Process all jobs until queue is empty + await ProcessAllJobs(queueService, stoppingToken); - // Then process processors (can run in parallel within same priority) - await ProcessJobType(queueService, JobType.Process, maxConcurrency: 3, stoppingToken); - - // Wait before next cycle - await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken); + _logger.LogInformation("JobQueueProcessor: Manual processing completed"); } catch (OperationCanceledException) { @@ -54,57 +58,79 @@ public class JobQueueProcessor : BackgroundService catch (Exception ex) { _logger.LogError(ex, "JobQueueProcessor: Unexpected error in queue processor"); - await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken); + await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken); } } _logger.LogInformation("JobQueueProcessor: Stopped"); } - private async Task ProcessJobType(IJobQueueService queueService, JobType jobType, int maxConcurrency, CancellationToken cancellationToken) + public void TriggerProcessing() { - var runningJobs = await queueService.GetRunningJobsCountAsync(jobType); + _logger.LogInformation("JobQueueProcessor: Manual trigger received"); + _processSignal.Set(); + } - // Don't start new jobs if we're at max concurrency - if (runningJobs >= maxConcurrency) + private async Task ProcessAllJobs(IJobQueueService queueService, CancellationToken cancellationToken) + { + var startTime = DateTime.UtcNow; + var initialQueueSize = await queueService.GetQueueCountAsync(); + int processedJobs = 0; + int failedJobs = 0; + + _logger.LogInformation("JobQueueProcessor: Starting processing of {InitialQueueSize} jobs", initialQueueSize); + + while (!cancellationToken.IsCancellationRequested) { - return; - } - - var job = await queueService.DequeueJobAsync(jobType); - if (job == null) - { - return; - } - - _logger.LogInformation("JobQueueProcessor: Processing {JobType} job {JobId} for layer {LayerName} (attempt {RetryCount}/{MaxRetries}, priority {Priority})", - job.JobType, job.Id, job.LayerName, job.RetryCount + 1, job.MaxRetries, job.Priority); - - // Process job asynchronously to allow parallel processing of processors - _ = Task.Run(async () => - { - try + // First process all imports (they run sequentially due to Google Sheets API limits) + var importJob = await queueService.DequeueJobAsync(JobType.Import); + if (importJob != null) { - await ProcessJobAsync(job, cancellationToken); + await ProcessJobAsync(importJob, cancellationToken); + if (importJob.Status == JobStatus.Completed) processedJobs++; + else failedJobs++; // Add delay between imports to respect Google Sheets API limits - if (job.JobType == JobType.Import) - { - await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken); - } + await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken); + continue; } - catch (Exception ex) + + // Then process processors (can run in parallel within same priority) + var processJob = await queueService.DequeueJobAsync(JobType.Process); + if (processJob != null) { - _logger.LogError(ex, "JobQueueProcessor: Error in background job processing for {JobType} job {JobId}", - job.JobType, job.Id); + await ProcessJobAsync(processJob, cancellationToken); + if (processJob.Status == JobStatus.Completed) processedJobs++; + else failedJobs++; + continue; } - }, cancellationToken); + + // No more jobs in queue + break; + } + + var endTime = DateTime.UtcNow; + var duration = endTime - startTime; + var finalQueueSize = await queueService.GetQueueCountAsync(); + + _logger.LogInformation("JobQueueProcessor: Processing completed. Duration: {Duration:hh\\:mm\\:ss}, " + + "Initial queue: {InitialQueueSize}, Processed: {ProcessedJobs}, Failed: {FailedJobs}, " + + "Final queue size: {FinalQueueSize}", + duration, initialQueueSize, processedJobs, failedJobs, finalQueueSize); + + if (failedJobs > 0) + { + _logger.LogWarning("JobQueueProcessor: {FailedJobs} jobs failed during processing. Check logs for details.", failedJobs); + } } private async Task ProcessJobAsync(QueueJob job, CancellationToken cancellationToken) { try { + _logger.LogInformation("JobQueueProcessor: Processing {JobType} job {JobId} for layer {LayerName} (attempt {RetryCount}/{MaxRetries}, priority {Priority})", + job.JobType, job.Id, job.LayerName, job.RetryCount + 1, job.MaxRetries, job.Priority); + using var scope = _serviceProvider.CreateScope(); var dbContext = scope.ServiceProvider.GetRequiredService(); var pluginManager = scope.ServiceProvider.GetRequiredService(); @@ -119,6 +145,7 @@ public class JobQueueProcessor : BackgroundService { _logger.LogWarning("JobQueueProcessor: Layer {LayerId} not found, marking job as failed", job.LayerId); await queueService.MarkJobFailedAsync(job.Id, "Layer not found"); + job.Status = JobStatus.Failed; return; } @@ -131,6 +158,7 @@ public class JobQueueProcessor : BackgroundService { _logger.LogWarning("JobQueueProcessor: Importer {PluginName} not found, marking job as failed", job.PluginName); await queueService.MarkJobFailedAsync(job.Id, $"Importer {job.PluginName} not found"); + job.Status = JobStatus.Failed; return; } @@ -146,10 +174,11 @@ public class JobQueueProcessor : BackgroundService { _logger.LogWarning("JobQueueProcessor: Processor {PluginName} not found, marking job as failed", job.PluginName); await queueService.MarkJobFailedAsync(job.Id, $"Processor {job.PluginName} not found"); + job.Status = JobStatus.Failed; return; } - _logger.LogInformation("JobQueueProcessor: Executing process for layer {LayerName} with plugin {PluginName}", + _logger.LogInformation("JobQueueProcessor: Executing process for layer {LayerName} with processor {PluginName}", layer.Name, job.PluginName); processor.Process(layer); @@ -160,6 +189,7 @@ public class JobQueueProcessor : BackgroundService } await queueService.MarkJobCompletedAsync(job.Id); + job.Status = JobStatus.Completed; _logger.LogInformation("JobQueueProcessor: Successfully completed {JobType} for layer {LayerName}", job.JobType, layer.Name); @@ -176,10 +206,12 @@ public class JobQueueProcessor : BackgroundService if (IsRetriableError(ex)) { await queueService.MarkJobForRetryAsync(job.Id, ex.Message); + job.Status = JobStatus.Retrying; } else { await queueService.MarkJobFailedAsync(job.Id, ex.Message); + job.Status = JobStatus.Failed; } } } diff --git a/src/Backend/DiunaBI.Plugins.Morska/Processors/T1R1Processor.cs b/src/Backend/DiunaBI.Plugins.Morska/Processors/T1R1Processor.cs index f7a2ebd..7b818b3 100644 --- a/src/Backend/DiunaBI.Plugins.Morska/Processors/T1R1Processor.cs +++ b/src/Backend/DiunaBI.Plugins.Morska/Processors/T1R1Processor.cs @@ -237,8 +237,8 @@ public class T1R1Processor : MorskaBaseProcessor { var dataSource = _db.Layers .Where(x => x.Type == LayerType.Processed && - !x.IsDeleted && !x.IsCancelled && - x.Name != null && x.Name.Contains($"{Year}/{month:D2}-{source.Desc1}-T3")) + !x.IsDeleted && !x.IsCancelled && + x.Name != null && x.Name.Contains($"{Year}/{month:D2}-{source.Desc1}-T3")) .Include(x => x.Records) .AsNoTracking() .FirstOrDefault(); diff --git a/src/Backend/DiunaBI.Plugins.Morska/Processors/T3MultiSourceSummaryProcessor.cs b/src/Backend/DiunaBI.Plugins.Morska/Processors/T3MultiSourceSummaryProcessor.cs index ce7247a..ff5198c 100644 --- a/src/Backend/DiunaBI.Plugins.Morska/Processors/T3MultiSourceSummaryProcessor.cs +++ b/src/Backend/DiunaBI.Plugins.Morska/Processors/T3MultiSourceSummaryProcessor.cs @@ -181,8 +181,8 @@ public class T3MultiSourceSummaryProcessor : MorskaBaseProcessor var dataSources = Sources!.Select(source => _db.Layers .Where(x => x.Type == LayerType.Processed && - !x.IsDeleted && !x.IsCancelled && - x.Name != null && x.Name.Contains($"{Year}/{Month:D2}-{source.Desc1}-T3")) + !x.IsDeleted && !x.IsCancelled && + x.Name != null && x.Name.Contains($"{Year}/{Month:D2}-{source.Desc1}-T3")) .Include(x => x.Records) .AsNoTracking() .FirstOrDefault()) diff --git a/src/Backend/DiunaBI.Plugins.Morska/Processors/T3MultiSourceYearSummaryProcessor.cs b/src/Backend/DiunaBI.Plugins.Morska/Processors/T3MultiSourceYearSummaryProcessor.cs index 66d1a13..c50cffe 100644 --- a/src/Backend/DiunaBI.Plugins.Morska/Processors/T3MultiSourceYearSummaryProcessor.cs +++ b/src/Backend/DiunaBI.Plugins.Morska/Processors/T3MultiSourceYearSummaryProcessor.cs @@ -171,8 +171,8 @@ public class T3MultiSourceYearSummaryProcessor : MorskaBaseProcessor var dataSources = Sources!.Select(source => _db.Layers .Where(x => x.Type == LayerType.Processed && - !x.IsDeleted && !x.IsCancelled && - x.Name != null && x.Name.Contains($"{Year}/13-{source.Desc1}-T3")) + !x.IsDeleted && !x.IsCancelled && + x.Name != null && x.Name.Contains($"{Year}/13-{source.Desc1}-T3")) .Include(x => x.Records) .AsNoTracking() .FirstOrDefault()) diff --git a/src/Backend/DiunaBI.Plugins.Morska/Processors/T3SourceYearSummaryProcessor.cs b/src/Backend/DiunaBI.Plugins.Morska/Processors/T3SourceYearSummaryProcessor.cs index 2b1f4da..00c0343 100644 --- a/src/Backend/DiunaBI.Plugins.Morska/Processors/T3SourceYearSummaryProcessor.cs +++ b/src/Backend/DiunaBI.Plugins.Morska/Processors/T3SourceYearSummaryProcessor.cs @@ -163,8 +163,8 @@ public class T3SourceYearSummaryProcessor : MorskaBaseProcessor { var dataSource = _db.Layers .Where(x => x.Type == LayerType.Processed && - !x.IsDeleted && !x.IsCancelled && - x.Name != null && x.Name.Contains($"{Year}/{month:D2}-{Source}-T3")) + !x.IsDeleted && !x.IsCancelled && + x.Name != null && x.Name.Contains($"{Year}/{month:D2}-{Source}-T3")) .Include(x => x.Records) .AsNoTracking() .FirstOrDefault(); diff --git a/src/Backend/DiunaBI.Plugins.Morska/Processors/T4R2Processor.cs b/src/Backend/DiunaBI.Plugins.Morska/Processors/T4R2Processor.cs index 357e877..6f5271d 100644 --- a/src/Backend/DiunaBI.Plugins.Morska/Processors/T4R2Processor.cs +++ b/src/Backend/DiunaBI.Plugins.Morska/Processors/T4R2Processor.cs @@ -287,8 +287,8 @@ public class T4R2Processor : MorskaBaseProcessor { return _db.Layers .Where(x => x.Type == LayerType.Processed && - !x.IsDeleted && !x.IsCancelled && - x.Name != null && x.Name.Contains($"{Year}/{month:D2}-{source.Desc1}-T")) + !x.IsDeleted && !x.IsCancelled && + x.Name != null && x.Name.Contains($"{Year}/{month:D2}-{source.Desc1}-T")) .Include(x => x.Records) .AsNoTracking() .FirstOrDefault(); diff --git a/src/Backend/DiunaBI.Plugins.Morska/Processors/T5LastValuesProcessor.cs b/src/Backend/DiunaBI.Plugins.Morska/Processors/T5LastValuesProcessor.cs index 4f290dd..7354601 100644 --- a/src/Backend/DiunaBI.Plugins.Morska/Processors/T5LastValuesProcessor.cs +++ b/src/Backend/DiunaBI.Plugins.Morska/Processors/T5LastValuesProcessor.cs @@ -195,7 +195,7 @@ public class T5LastValuesProcessor : MorskaBaseProcessor var dataSources = _db.Layers .Include(x => x.Records) .Where(x => x.ParentId == sourceImportWorker.Id && - !x.IsDeleted && !x.IsCancelled) + !x.IsDeleted && !x.IsCancelled) .OrderByDescending(x => x.CreatedAt) .AsNoTracking() .ToList(); diff --git a/src/Backend/DiunaBI.WebAPI/Controllers/LayersController.cs b/src/Backend/DiunaBI.WebAPI/Controllers/LayersController.cs index 0158348..bc60ece 100644 --- a/src/Backend/DiunaBI.WebAPI/Controllers/LayersController.cs +++ b/src/Backend/DiunaBI.WebAPI/Controllers/LayersController.cs @@ -676,147 +676,19 @@ public class LayersController : Controller } } } - - [HttpGet] - [Route("GetImportWorkers")] - [AllowAnonymous] - public async Task GetImportWorkers() - { - try - { - var importWorkerLayers = await _db.Layers - .Include(x => x.Records) - .Where(x => - x.Records!.Any(y => y.Code == "Type" && y.Desc1 == "ImportWorker") && - x.Records!.Any(y => y.Code == "IsEnabled" && y.Desc1 == "True") - && x.Number == 5579 - ) - .OrderBy(x => x.CreatedAt) - .AsNoTracking() - .ToListAsync(); - - _logger.LogInformation("GetImportWorkers: Found {LayerCount} import worker layers to queue", - importWorkerLayers.Count); - - int queuedCount = 0; - - foreach (var importWorker in importWorkerLayers) - { - var pluginName = importWorker.Records!.FirstOrDefault(x => x.Code == "Plugin")?.Desc1; - if (string.IsNullOrEmpty(pluginName)) - { - _logger.LogWarning("GetImportWorkers: No plugin name found for layer {LayerName} ({LayerId}), skipping", - importWorker.Name, importWorker.Id); - continue; - } - - // Check if plugin exists - var importer = _pluginManager.GetImporter(pluginName); - if (importer == null) - { - _logger.LogWarning("GetImportWorkers: Importer {PluginName} not found for layer {LayerName} ({LayerId}), skipping", - pluginName, importWorker.Name, importWorker.Id); - continue; - } - - var job = new QueueJob - { - LayerId = importWorker.Id, - LayerName = importWorker.Name ?? "Unknown", - PluginName = pluginName, - JobType = JobType.Import, - Priority = 0, // All imports have same priority - MaxRetries = 5, - CreatedById = Guid.Parse("F392209E-123E-4651-A5A4-0B1D6CF9FF9D"), - ModifiedById = Guid.Parse("F392209E-123E-4651-A5A4-0B1D6CF9FF9D") - }; - - await _queueService.EnqueueJobAsync(job); - queuedCount++; - - _logger.LogDebug("GetImportWorkers: Queued import job for layer {LayerName} ({LayerId}) with plugin {PluginName}", - importWorker.Name, importWorker.Id, pluginName); - } - - var totalQueueSize = await _queueService.GetQueueCountAsync(); - - _logger.LogInformation("GetImportWorkers: Successfully queued {QueuedCount} import jobs. Total queue size: {QueueSize}", - queuedCount, totalQueueSize); - - return Ok(new { - Message = $"Queued {queuedCount} import jobs", - QueuedJobs = queuedCount, - TotalQueueSize = totalQueueSize, - SkippedLayers = importWorkerLayers.Count - queuedCount - }); - } - catch (Exception e) - { - _logger.LogError(e, "GetImportWorkers: Error queuing import workers"); - return BadRequest(e.ToString()); - } - } - [HttpGet] [Route("CheckProcessors")] [AllowAnonymous] public IActionResult CheckProcessors() { - // get list od all enabled processors and check if they has record 'Plugin' - var enabledProcessors = _db.Layers - .Include(x => x.Records) - .Where(x => - x.Records!.Any(y => y.Code == "Type" && y.Desc1 == "ProcessWorker") && - x.Records!.Any(y => y.Code == "IsEnabled" && y.Desc1 == "True") - ) - .OrderByDescending(x => x.CreatedAt) - .AsNoTracking() - .ToList(); + // get - _logger.LogInformation("CheckProcessors: Found {ProcessorCount} enabled processors", enabledProcessors.Count); - foreach (var processor in enabledProcessors) - { - var pluginRecord = processor.Records!.FirstOrDefault(x => x.Code == "Plugin"); - if (pluginRecord == null) - { - _logger.LogWarning("CheckProcessors: No Plugin record found for processor {ProcessorName} ({ProcessorId}), skipping", - processor.Name, processor.Id); - continue; - } - - var pluginName = pluginRecord.Desc1; - if (string.IsNullOrEmpty(pluginName)) - { - _logger.LogWarning("CheckProcessors: Empty Plugin name for processor {ProcessorName} ({ProcessorId}), skipping", - processor.Name, processor.Id); - continue; - } - - var processorInstance = _pluginManager.GetProcessor(pluginName); - if (processorInstance == null) - { - _logger.LogWarning("CheckProcessors: Processor {PluginName} not found for {ProcessorName} ({ProcessorId}), skipping", - pluginName, processor.Name, processor.Id); - continue; - } - } - _logger.LogInformation("CheckProcessors: Completed checking processors"); - - return Ok(); - } - - [HttpGet] - [Route("AddPluginName")] - [AllowAnonymous] - public IActionResult AddPluginName() - { - + /* var importWorkerLayers = _db.Layers .Include(x => x.Records) .Where(x => - x.Records!.Any(y => y.Code == "Type" && y.Desc1 == "ProcessWorker") && - x.Records!.Any(y => y.Code == "IsEnabled" && y.Desc1 == "True") && - x.Records!.Any(y => y.Code == "ProcessType" && y.Desc1 == "T1-R3") + x.Records!.Any(y => y.Code == "Type" && y.Desc1 == "ImportWorker") && + x.Records!.Any(y => y.Code == "ImportType" && y.Desc1 == "Import-D3") ) .OrderByDescending(x => x.CreatedAt) .AsNoTracking() @@ -829,20 +701,21 @@ public class LayersController : Controller Id = Guid.NewGuid(), LayerId = importWorker.Id, Code = "Plugin", - Desc1 = "Morska.Process.T1.R3", + Desc1 = "Morska.Import.D3", CreatedAt = DateTime.UtcNow, ModifiedAt = DateTime.UtcNow, CreatedById = Guid.Parse("F392209E-123E-4651-A5A4-0B1D6CF9FF9D"), ModifiedById = Guid.Parse("F392209E-123E-4651-A5A4-0B1D6CF9FF9D") }; - _db.Records.Add(record); + //_db.Records.Add(record); } - _db.SaveChanges(); - + //_db.SaveChanges(); + */ return Ok(); } + private static void WriteToConsole(params string[] messages) { foreach (var message in messages) @@ -936,4 +809,277 @@ public class LayersController : Controller throw; } } + + [HttpGet] + [Route("EnqueueImportWorkers/{apiKey}")] + [AllowAnonymous] + public async Task EnqueueImportWorkers(string apiKey, [FromQuery] Guid? layerId = null) + { + if (Request.Host.Value != _configuration["apiLocalUrl"] || apiKey != _configuration["apiKey"]) + { + _logger.LogWarning("EnqueueImportWorkers: Unauthorized request with apiKey {ApiKey}", apiKey); + return Unauthorized(); + } + + try + { + var query = _db.Layers + .Include(x => x.Records) + .Where(x => + x.Records!.Any(y => y.Code == "Type" && y.Desc1 == "ImportWorker") && + x.Records!.Any(y => y.Code == "IsEnabled" && y.Desc1 == "True") && + !x.IsDeleted && !x.IsCancelled + ); + + // If specific layerId is provided, filter to that layer only + if (layerId.HasValue) + { + query = query.Where(x => x.Id == layerId.Value); + } + + var importWorkerLayers = await query + .OrderBy(x => x.CreatedAt) + .AsNoTracking() + .ToListAsync(); + + _logger.LogInformation("EnqueueImportWorkers: Found {LayerCount} import worker layers to queue{LayerFilter}", + importWorkerLayers.Count, layerId.HasValue ? $" (filtered by LayerId: {layerId})" : ""); + + if (importWorkerLayers.Count == 0) + { + return Ok(new + { + Message = "No import workers found to queue", + QueuedJobs = 0, + TotalQueueSize = await _queueService.GetQueueCountAsync(), + SkippedLayers = 0 + }); + } + + int queuedCount = 0; + + foreach (var importWorker in importWorkerLayers) + { + var pluginName = importWorker.Records!.FirstOrDefault(x => x.Code == "Plugin")?.Desc1; + if (string.IsNullOrEmpty(pluginName)) + { + _logger.LogWarning("EnqueueImportWorkers: No plugin name found for layer {LayerName} ({LayerId}), skipping", + importWorker.Name, importWorker.Id); + continue; + } + + // Check if plugin exists + var importer = _pluginManager.GetImporter(pluginName); + if (importer == null) + { + _logger.LogWarning("EnqueueImportWorkers: Importer {PluginName} not found for layer {LayerName} ({LayerId}), skipping", + pluginName, importWorker.Name, importWorker.Id); + continue; + } + + var job = new QueueJob + { + LayerId = importWorker.Id, + LayerName = importWorker.Name ?? "Unknown", + PluginName = pluginName, + JobType = JobType.Import, + Priority = 0, // All imports have same priority + MaxRetries = 5, + CreatedById = Guid.Parse("F392209E-123E-4651-A5A4-0B1D6CF9FF9D"), + ModifiedById = Guid.Parse("F392209E-123E-4651-A5A4-0B1D6CF9FF9D") + }; + + await _queueService.EnqueueJobAsync(job); + queuedCount++; + + _logger.LogDebug("EnqueueImportWorkers: Queued import job for layer {LayerName} ({LayerId}) with plugin {PluginName}", + importWorker.Name, importWorker.Id, pluginName); + } + + var totalQueueSize = await _queueService.GetQueueCountAsync(); + + _logger.LogInformation("EnqueueImportWorkers: Successfully queued {QueuedCount} import jobs. Total queue size: {QueueSize}", + queuedCount, totalQueueSize); + + return Ok(new + { + Message = $"Queued {queuedCount} import jobs", + QueuedJobs = queuedCount, + TotalQueueSize = totalQueueSize, + SkippedLayers = importWorkerLayers.Count - queuedCount + }); + } + catch (Exception e) + { + _logger.LogError(e, "EnqueueImportWorkers: Error queuing import workers"); + return BadRequest(e.ToString()); + } + } + + [HttpGet] + [Route("EnqueueProcessWorkers/{apiKey}")] + [AllowAnonymous] + public async Task EnqueueProcessWorkers(string apiKey, [FromQuery] Guid? layerId = null) + { + if (Request.Host.Value != _configuration["apiLocalUrl"] || apiKey != _configuration["apiKey"]) + { + _logger.LogWarning("EnqueueProcessWorkers: Unauthorized request with apiKey {ApiKey}", apiKey); + return Unauthorized(); + } + + try + { + var query = _db.Layers + .Include(x => x.Records) + .Where(x => + x.Records!.Any(y => y.Code == "Type" && y.Desc1 == "ProcessWorker") && + x.Records!.Any(y => y.Code == "IsEnabled" && y.Desc1 == "True") && + !x.IsDeleted && !x.IsCancelled + ); + + // If specific layerId is provided, filter to that layer only + if (layerId.HasValue) + { + query = query.Where(x => x.Id == layerId.Value); + } + + var processWorkerLayers = await query + .OrderBy(x => x.CreatedAt) + .AsNoTracking() + .ToListAsync(); + + _logger.LogInformation("EnqueueProcessWorkers: Found {LayerCount} process worker layers to queue{LayerFilter}", + processWorkerLayers.Count, layerId.HasValue ? $" (filtered by LayerId: {layerId})" : ""); + + if (processWorkerLayers.Count == 0) + { + return Ok(new + { + Message = "No process workers found to queue", + QueuedJobs = 0, + TotalQueueSize = await _queueService.GetQueueCountAsync(), + SkippedLayers = 0 + }); + } + + int queuedCount = 0; + + foreach (var processWorker in processWorkerLayers) + { + var pluginName = processWorker.Records!.FirstOrDefault(x => x.Code == "Plugin")?.Desc1; + if (string.IsNullOrEmpty(pluginName)) + { + _logger.LogWarning("EnqueueProcessWorkers: No plugin name found for layer {LayerName} ({LayerId}), skipping", + processWorker.Name, processWorker.Id); + continue; + } + + var processorType = processWorker.Records!.FirstOrDefault(x => x.Code == "ProcessorType")?.Desc1; + if (string.IsNullOrEmpty(processorType)) + { + _logger.LogWarning("EnqueueProcessWorkers: No processor type found for layer {LayerName} ({LayerId}), skipping", + processWorker.Name, processWorker.Id); + continue; + } + + // Check if processor exists + var processor = _pluginManager.GetProcessor(processorType); + if (processor == null) + { + _logger.LogWarning("EnqueueProcessWorkers: Processor {ProcessorType} not found for layer {LayerName} ({LayerId}), skipping", + processorType, processWorker.Name, processWorker.Id); + continue; + } + + // Get priority from ProcessWorker record, default to 10 if not found + var priorityStr = processWorker.Records!.FirstOrDefault(x => x.Code == "Priority")?.Desc1; + var priority = 10; // Default priority + if (!string.IsNullOrEmpty(priorityStr) && int.TryParse(priorityStr, out var parsedPriority)) + { + priority = parsedPriority; + } + + var job = new QueueJob + { + LayerId = processWorker.Id, + LayerName = processWorker.Name ?? "Unknown", + PluginName = processorType, // Use processorType as PluginName for process jobs + JobType = JobType.Process, + Priority = priority, + MaxRetries = 3, + CreatedById = Guid.Parse("F392209E-123E-4651-A5A4-0B1D6CF9FF9D"), + ModifiedById = Guid.Parse("F392209E-123E-4651-A5A4-0B1D6CF9FF9D") + }; + + await _queueService.EnqueueJobAsync(job); + queuedCount++; + + _logger.LogDebug("EnqueueProcessWorkers: Queued process job for layer {LayerName} ({LayerId}) with processor {ProcessorType}, priority {Priority}", + processWorker.Name, processWorker.Id, processorType, priority); + } + + var totalQueueSize = await _queueService.GetQueueCountAsync(); + + _logger.LogInformation("EnqueueProcessWorkers: Successfully queued {QueuedCount} process jobs. Total queue size: {QueueSize}", + queuedCount, totalQueueSize); + + return Ok(new + { + Message = $"Queued {queuedCount} process jobs", + QueuedJobs = queuedCount, + TotalQueueSize = totalQueueSize, + SkippedLayers = processWorkerLayers.Count - queuedCount + }); + } + catch (Exception e) + { + _logger.LogError(e, "EnqueueProcessWorkers: Error queuing process workers"); + return BadRequest(e.ToString()); + } + } + + [HttpGet] + [Route("RunQueueJobs/{apiKey}")] + [AllowAnonymous] + public async Task RunQueueJobs(string apiKey) + { + if (Request.Host.Value != _configuration["apiLocalUrl"] || apiKey != _configuration["apiKey"]) + { + _logger.LogWarning("RunQueueJobs: Unauthorized request with apiKey {ApiKey}", apiKey); + return Unauthorized(); + } + + try + { + var queueSize = await _queueService.GetQueueCountAsync(); + + if (queueSize == 0) + { + return Ok(new + { + Message = "Queue is empty", + QueueSize = 0, + Status = "No jobs to process" + }); + } + + _logger.LogInformation("RunQueueJobs: Triggering queue processing for {QueueSize} jobs", queueSize); + + // PRZYWRÓĆ SINGLETON ACCESS: + var queueProcessor = HttpContext.RequestServices.GetRequiredService(); + queueProcessor.TriggerProcessing(); + + return Ok(new + { + Message = $"Queue processing triggered for {queueSize} jobs", + QueueSize = queueSize, + Status = "Processing started in background" + }); + } + catch (Exception e) + { + _logger.LogError(e, "RunQueueJobs: Error triggering queue processing"); + return BadRequest(e.ToString()); + } + } } \ No newline at end of file diff --git a/src/Backend/DiunaBI.WebAPI/Program.cs b/src/Backend/DiunaBI.WebAPI/Program.cs index 2f29ace..99b7654 100644 --- a/src/Backend/DiunaBI.WebAPI/Program.cs +++ b/src/Backend/DiunaBI.WebAPI/Program.cs @@ -1,5 +1,3 @@ -using Google.Apis.Auth.OAuth2; -using Google.Cloud.Firestore; using Microsoft.AspNetCore.Authentication.JwtBearer; using Microsoft.EntityFrameworkCore; using Microsoft.IdentityModel.Tokens; @@ -14,7 +12,6 @@ using DiunaBI.Core.Interfaces; var builder = WebApplication.CreateBuilder(args); -// ✅ SERILOG TYLKO DLA PRODUKCJI if (builder.Environment.IsProduction()) { builder.Host.UseSerilog((context, configuration) => @@ -30,6 +27,7 @@ if (builder.Environment.IsProduction()) } var connectionString = builder.Configuration.GetConnectionString("SQLDatabase"); + builder.Services.AddDbContext(x => { x.UseSqlServer(connectionString); @@ -70,25 +68,24 @@ builder.Services.AddAuthentication(options => IssuerSigningKey = new SymmetricSecurityKey(Encoding.UTF8.GetBytes(builder.Configuration["Secret"]!)) }; }); -builder.Services.AddAuthentication(); // Queue services builder.Services.AddScoped(); -builder.Services.AddHostedService(); +builder.Services.AddSingleton(); -// Zarejestruj Google Sheets dependencies +// Google Sheets dependencies builder.Services.AddSingleton(); builder.Services.AddSingleton(); builder.Services.AddSingleton(provider => { var googleSheetsHelper = provider.GetRequiredService(); var valuesResource = googleSheetsHelper.Service?.Spreadsheets.Values; - + if (valuesResource == null) { throw new InvalidOperationException("Google Sheets Service is not initialized properly"); } - + return valuesResource; }); @@ -96,7 +93,6 @@ builder.Services.AddSingleton(); var app = builder.Build(); -// ✅ SERILOG REQUEST LOGGING TYLKO DLA PRODUKCJI if (app.Environment.IsProduction()) { app.UseSerilogRequestLogging(options => @@ -106,25 +102,25 @@ if (app.Environment.IsProduction()) { diagnosticContext.Set("RequestHost", httpContext.Request.Host.Value); diagnosticContext.Set("RequestScheme", httpContext.Request.Scheme); - + var userAgent = httpContext.Request.Headers.UserAgent.FirstOrDefault(); if (!string.IsNullOrEmpty(userAgent)) { diagnosticContext.Set("UserAgent", userAgent); } - + diagnosticContext.Set("RemoteIP", httpContext.Connection.RemoteIpAddress?.ToString() ?? "unknown"); diagnosticContext.Set("RequestContentType", httpContext.Request.ContentType ?? "none"); }; }); } +// Plugin initialization var pluginManager = app.Services.GetRequiredService(); var executablePath = Assembly.GetExecutingAssembly().Location; var executableDir = Path.GetDirectoryName(executablePath)!; var pluginsPath = Path.Combine(executableDir, "Plugins"); -// ✅ RÓŻNE LOGGERY W ZALEŻNOŚCI OD ŚRODOWISKA if (app.Environment.IsProduction()) { Log.Information("Starting DiunaBI application"); @@ -165,8 +161,7 @@ app.MapControllers(); app.Run(); -// ✅ SERILOG CLEANUP TYLKO DLA PRODUKCJI if (app.Environment.IsProduction()) { Log.CloseAndFlush(); -} +} \ No newline at end of file diff --git a/tools/http-tests/AutoImport.http b/tools/http-tests/AutoImport.http index 53b13da..a5f2d95 100644 --- a/tools/http-tests/AutoImport.http +++ b/tools/http-tests/AutoImport.http @@ -1,3 +1,3 @@ ### -GET http://localhost:5400/api/Layers/GetImportWorkers +GET http://localhost:5400/api/Layers/AddPriorityRecords