JobQueue for import layers

This commit is contained in:
Michał Zieliński
2025-06-07 12:33:33 +02:00
parent e56be55274
commit 787df8b838
8 changed files with 972 additions and 114 deletions

View File

@@ -7,7 +7,7 @@ using Microsoft.EntityFrameworkCore;
using DiunaBI.Core.Models;
using DiunaBI.Core.Database.Context;
using DiunaBI.Core.Services;
using Google.Cloud.Firestore;
using DiunaBI.Core.Interfaces;
namespace DiunaBI.WebAPI.Controllers;
@@ -21,6 +21,7 @@ public class LayersController : Controller
private readonly IConfiguration _configuration;
private readonly PluginManager _pluginManager;
private readonly ILogger<LayersController> _logger;
private readonly IJobQueueService _queueService;
public LayersController(
AppDbContext db,
@@ -28,7 +29,8 @@ public class LayersController : Controller
GoogleDriveHelper googleDriveHelper,
IConfiguration configuration,
PluginManager pluginManager,
ILogger<LayersController> logger
ILogger<LayersController> logger,
IJobQueueService queueService
)
{
_db = db;
@@ -37,6 +39,7 @@ public class LayersController : Controller
_configuration = configuration;
_pluginManager = pluginManager;
_logger = logger;
_queueService = queueService;
}
[HttpGet]
@@ -677,56 +680,81 @@ public class LayersController : Controller
[HttpGet]
[Route("GetImportWorkers")]
[AllowAnonymous]
public IActionResult GetImportWorkers()
public async Task<IActionResult> GetImportWorkers()
{
var importWorkerLayers = _db.Layers
.Include(x => x.Records)
.Where(x =>
x.Records!.Any(y => y.Code == "Type" && y.Desc1 == "ImportWorker") &&
x.Records!.Any(y => y.Code == "IsEnabled" && y.Desc1 == "True")
&& x.Number == 7270
)
.OrderByDescending(x => x.CreatedAt)
.AsNoTracking()
.ToList();
foreach (var importWorker in importWorkerLayers)
try
{
_logger.LogDebug("GetImportWorkers: Found import worker layer {LayerName} ({LayerId})",
importWorker.Name, importWorker.Id);
var pluginName = importWorker.Records!.FirstOrDefault(x => x.Code == "Plugin")?.Desc1;
if (pluginName != null)
var importWorkerLayers = await _db.Layers
.Include(x => x.Records)
.Where(x =>
x.Records!.Any(y => y.Code == "Type" && y.Desc1 == "ImportWorker") &&
x.Records!.Any(y => y.Code == "IsEnabled" && y.Desc1 == "True")
&& x.Number == 5579
)
.OrderBy(x => x.CreatedAt)
.AsNoTracking()
.ToListAsync();
_logger.LogInformation("GetImportWorkers: Found {LayerCount} import worker layers to queue",
importWorkerLayers.Count);
int queuedCount = 0;
foreach (var importWorker in importWorkerLayers)
{
var pluginName = importWorker.Records!.FirstOrDefault(x => x.Code == "Plugin")?.Desc1;
if (string.IsNullOrEmpty(pluginName))
{
_logger.LogWarning("GetImportWorkers: No plugin name found for layer {LayerName} ({LayerId}), skipping",
importWorker.Name, importWorker.Id);
continue;
}
// Check if plugin exists
var importer = _pluginManager.GetImporter(pluginName);
if (importer == null)
{
_logger.LogWarning("GetImportWorkers: Importer {PluginName} not found for layer {LayerName} ({LayerId})",
_logger.LogWarning("GetImportWorkers: Importer {PluginName} not found for layer {LayerName} ({LayerId}), skipping",
pluginName, importWorker.Name, importWorker.Id);
throw new Exception($"Importer {pluginName} not found for layer {importWorker.Name}");
continue;
}
try
var job = new QueueJob
{
_logger.LogInformation("GetImportWorkers: Starting import for layer {LayerName} ({LayerId}) with plugin {PluginName}",
importWorker.Name, importWorker.Id, pluginName);
importer.Import(importWorker);
_logger.LogInformation("GetImportWorkers: Successfully imported layer {LayerName} ({LayerId})",
importWorker.Name, importWorker.Id);
}
catch (Exception e)
{
_logger.LogError(e, "GetImportWorkers: Error importing layer {LayerName} ({LayerId}) with plugin {PluginName}",
importWorker.Name, importWorker.Id, pluginName);
throw;
}
}
else
{
_logger.LogWarning("GetImportWorkers: No plugin name found for import worker layer {LayerName} ({LayerId})",
importWorker.Name, importWorker.Id);
throw new Exception($"No plugin name found for import worker layer {importWorker.Name}");
LayerId = importWorker.Id,
LayerName = importWorker.Name ?? "Unknown",
PluginName = pluginName,
JobType = JobType.Import,
Priority = 0, // All imports have same priority
MaxRetries = 5,
CreatedById = Guid.Parse("F392209E-123E-4651-A5A4-0B1D6CF9FF9D"),
ModifiedById = Guid.Parse("F392209E-123E-4651-A5A4-0B1D6CF9FF9D")
};
await _queueService.EnqueueJobAsync(job);
queuedCount++;
_logger.LogDebug("GetImportWorkers: Queued import job for layer {LayerName} ({LayerId}) with plugin {PluginName}",
importWorker.Name, importWorker.Id, pluginName);
}
var totalQueueSize = await _queueService.GetQueueCountAsync();
_logger.LogInformation("GetImportWorkers: Successfully queued {QueuedCount} import jobs. Total queue size: {QueueSize}",
queuedCount, totalQueueSize);
return Ok(new {
Message = $"Queued {queuedCount} import jobs",
QueuedJobs = queuedCount,
TotalQueueSize = totalQueueSize,
SkippedLayers = importWorkerLayers.Count - queuedCount
});
}
catch (Exception e)
{
_logger.LogError(e, "GetImportWorkers: Error queuing import workers");
return BadRequest(e.ToString());
}
return Ok();
}
[HttpGet]