From 248106a2390ea713bee0cd6d8801e1005741c13d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Zieli=C5=84ski?= Date: Tue, 2 Dec 2025 15:21:27 +0100 Subject: [PATCH] Plugins little refactor --- .../Plugins/BaseDataExporter.cs | 24 +- .../Plugins/BaseDataImporter.cs | 26 +-- .../Plugins/BaseDataProcessor.cs | 26 +-- .../Services/JobSchedulerService.cs | 216 ++++++++++++++++++ .../Services/PluginManager.cs | 51 ++++- 5 files changed, 295 insertions(+), 48 deletions(-) rename DiunaBI.Plugins.Morska/Exporters/MorskaBaseExporter.cs => DiunaBI.Infrastructure/Plugins/BaseDataExporter.cs (69%) rename DiunaBI.Plugins.Morska/Importers/MorskaBaseImporter.cs => DiunaBI.Infrastructure/Plugins/BaseDataImporter.cs (56%) rename DiunaBI.Plugins.Morska/Processors/MorskaBaseProcessor.cs => DiunaBI.Infrastructure/Plugins/BaseDataProcessor.cs (70%) create mode 100644 DiunaBI.Infrastructure/Services/JobSchedulerService.cs diff --git a/DiunaBI.Plugins.Morska/Exporters/MorskaBaseExporter.cs b/DiunaBI.Infrastructure/Plugins/BaseDataExporter.cs similarity index 69% rename from DiunaBI.Plugins.Morska/Exporters/MorskaBaseExporter.cs rename to DiunaBI.Infrastructure/Plugins/BaseDataExporter.cs index de10e91..6dca339 100644 --- a/DiunaBI.Plugins.Morska/Exporters/MorskaBaseExporter.cs +++ b/DiunaBI.Infrastructure/Plugins/BaseDataExporter.cs @@ -1,11 +1,13 @@ -using DiunaBI.Domain.Entities; -using DiunaBI.Infrastructure.Interfaces; - -namespace DiunaBI.Plugins.Morska.Exporters; - -public abstract class MorskaBaseExporter : IDataExporter -{ - public abstract string ExporterType { get; } - public virtual bool CanExport(string exporterType) => ExporterType == exporterType; - public abstract void Export(Layer layer); -} \ No newline at end of file +using DiunaBI.Domain.Entities; +using DiunaBI.Infrastructure.Interfaces; + +namespace DiunaBI.Infrastructure.Plugins; + +public abstract class BaseDataExporter : IDataExporter +{ + public abstract string ExporterType { get; } + + public virtual bool CanExport(string exporterType) => ExporterType == exporterType; + + public abstract void Export(Layer layer); +} diff --git a/DiunaBI.Plugins.Morska/Importers/MorskaBaseImporter.cs b/DiunaBI.Infrastructure/Plugins/BaseDataImporter.cs similarity index 56% rename from DiunaBI.Plugins.Morska/Importers/MorskaBaseImporter.cs rename to DiunaBI.Infrastructure/Plugins/BaseDataImporter.cs index 8189177..e0a84af 100644 --- a/DiunaBI.Plugins.Morska/Importers/MorskaBaseImporter.cs +++ b/DiunaBI.Infrastructure/Plugins/BaseDataImporter.cs @@ -1,13 +1,13 @@ -using DiunaBI.Domain.Entities; -using DiunaBI.Infrastructure.Interfaces; - -namespace DiunaBI.Plugins.Morska.Importers; - -public abstract class MorskaBaseImporter : IDataImporter -{ - public abstract string ImporterType { get; } - - public virtual bool CanImport(string importerType) => ImporterType == importerType; - - public abstract void Import(Layer importWorker); -} \ No newline at end of file +using DiunaBI.Domain.Entities; +using DiunaBI.Infrastructure.Interfaces; + +namespace DiunaBI.Infrastructure.Plugins; + +public abstract class BaseDataImporter : IDataImporter +{ + public abstract string ImporterType { get; } + + public virtual bool CanImport(string importerType) => ImporterType == importerType; + + public abstract void Import(Layer importWorker); +} diff --git a/DiunaBI.Plugins.Morska/Processors/MorskaBaseProcessor.cs b/DiunaBI.Infrastructure/Plugins/BaseDataProcessor.cs similarity index 70% rename from DiunaBI.Plugins.Morska/Processors/MorskaBaseProcessor.cs rename to DiunaBI.Infrastructure/Plugins/BaseDataProcessor.cs index 7e01f2c..5fa6074 100644 --- a/DiunaBI.Plugins.Morska/Processors/MorskaBaseProcessor.cs +++ b/DiunaBI.Infrastructure/Plugins/BaseDataProcessor.cs @@ -1,13 +1,13 @@ -using DiunaBI.Domain.Entities; -using DiunaBI.Infrastructure.Interfaces; - -namespace DiunaBI.Plugins.Morska.Processors; - -public abstract class MorskaBaseProcessor : IDataProcessor -{ - public abstract string ProcessorType { get; } - - public virtual bool CanProcess(string processorType) => ProcessorType == processorType; - - public abstract void Process(Layer processWorker); -} \ No newline at end of file +using DiunaBI.Domain.Entities; +using DiunaBI.Infrastructure.Interfaces; + +namespace DiunaBI.Infrastructure.Plugins; + +public abstract class BaseDataProcessor : IDataProcessor +{ + public abstract string ProcessorType { get; } + + public virtual bool CanProcess(string processorType) => ProcessorType == processorType; + + public abstract void Process(Layer processWorker); +} diff --git a/DiunaBI.Infrastructure/Services/JobSchedulerService.cs b/DiunaBI.Infrastructure/Services/JobSchedulerService.cs new file mode 100644 index 0000000..f4dc19b --- /dev/null +++ b/DiunaBI.Infrastructure/Services/JobSchedulerService.cs @@ -0,0 +1,216 @@ +using DiunaBI.Domain.Entities; +using DiunaBI.Infrastructure.Data; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging; + +namespace DiunaBI.Infrastructure.Services; + +public class JobSchedulerService +{ + private readonly AppDbContext _db; + private readonly ILogger _logger; + + public JobSchedulerService(AppDbContext db, ILogger logger) + { + _db = db; + _logger = logger; + } + + public async Task ScheduleImportJobsAsync(string? nameFilter = null) + { + _logger.LogInformation("JobScheduler: Starting import job scheduling with filter: {NameFilter}", nameFilter ?? "none"); + + var query = _db.Layers + .Include(x => x.Records) + .Where(x => + x.Records!.Any(r => r.Code == "Type" && r.Desc1 == "ImportWorker") && + x.Records!.Any(r => r.Code == "IsEnabled" && r.Desc1 == "True") + ); + + if (!string.IsNullOrEmpty(nameFilter)) + { + query = query.Where(x => x.Name != null && x.Name.Contains(nameFilter)); + } + + var importWorkers = await query + .OrderBy(x => x.CreatedAt) + .AsNoTracking() + .ToListAsync(); + + _logger.LogInformation("JobScheduler: Found {Count} import workers to schedule", importWorkers.Count); + + var jobsCreated = 0; + + foreach (var worker in importWorkers) + { + try + { + var plugin = worker.Records?.FirstOrDefault(r => r.Code == "Plugin")?.Desc1; + if (string.IsNullOrEmpty(plugin)) + { + _logger.LogWarning("JobScheduler: Import worker {LayerName} ({LayerId}) has no Plugin configured, skipping", + worker.Name, worker.Id); + continue; + } + + // Get priority from config (default: 50) + var priorityStr = worker.Records?.FirstOrDefault(r => r.Code == "Priority")?.Desc1; + var priority = int.TryParse(priorityStr, out var p) ? p : 50; + + // Get max retries from config (default: 3) + var maxRetriesStr = worker.Records?.FirstOrDefault(r => r.Code == "MaxRetries")?.Desc1; + var maxRetries = int.TryParse(maxRetriesStr, out var mr) ? mr : 3; + + // Check if there's already a pending/running job for this layer + var existingJob = await _db.QueueJobs + .Where(j => j.LayerId == worker.Id && + (j.Status == JobStatus.Pending || j.Status == JobStatus.Running)) + .FirstOrDefaultAsync(); + + if (existingJob != null) + { + _logger.LogDebug("JobScheduler: Job already exists for {LayerName} ({LayerId}), status: {Status}", + worker.Name, worker.Id, existingJob.Status); + continue; + } + + var job = new QueueJob + { + Id = Guid.NewGuid(), + LayerId = worker.Id, + LayerName = worker.Name ?? "Unknown", + PluginName = plugin, + JobType = JobType.Import, + Priority = priority, + MaxRetries = maxRetries, + Status = JobStatus.Pending, + CreatedAt = DateTime.UtcNow, + CreatedAtUtc = DateTime.UtcNow, + ModifiedAtUtc = DateTime.UtcNow, + CreatedById = Guid.Empty, // System user + ModifiedById = Guid.Empty + }; + + _db.QueueJobs.Add(job); + jobsCreated++; + + _logger.LogInformation("JobScheduler: Created import job for {LayerName} ({LayerId}) with priority {Priority}", + worker.Name, worker.Id, priority); + } + catch (Exception ex) + { + _logger.LogError(ex, "JobScheduler: Failed to create job for {LayerName} ({LayerId})", + worker.Name, worker.Id); + } + } + + if (jobsCreated > 0) + { + await _db.SaveChangesAsync(); + _logger.LogInformation("JobScheduler: Successfully created {Count} import jobs", jobsCreated); + } + + return jobsCreated; + } + + public async Task ScheduleProcessJobsAsync() + { + _logger.LogInformation("JobScheduler: Starting process job scheduling"); + + var processWorkers = await _db.Layers + .Include(x => x.Records) + .Where(x => + x.Records!.Any(r => r.Code == "Type" && r.Desc1 == "ProcessWorker") && + x.Records!.Any(r => r.Code == "IsEnabled" && r.Desc1 == "True") + ) + .OrderBy(x => x.CreatedAt) + .AsNoTracking() + .ToListAsync(); + + _logger.LogInformation("JobScheduler: Found {Count} process workers to schedule", processWorkers.Count); + + var jobsCreated = 0; + + foreach (var worker in processWorkers) + { + try + { + var plugin = worker.Records?.FirstOrDefault(r => r.Code == "Plugin")?.Desc1; + if (string.IsNullOrEmpty(plugin)) + { + _logger.LogWarning("JobScheduler: Process worker {LayerName} ({LayerId}) has no Plugin configured, skipping", + worker.Name, worker.Id); + continue; + } + + // Get priority from config (default: 100 for processes - higher than imports) + var priorityStr = worker.Records?.FirstOrDefault(r => r.Code == "Priority")?.Desc1; + var priority = int.TryParse(priorityStr, out var p) ? p : 100; + + // Get max retries from config (default: 3) + var maxRetriesStr = worker.Records?.FirstOrDefault(r => r.Code == "MaxRetries")?.Desc1; + var maxRetries = int.TryParse(maxRetriesStr, out var mr) ? mr : 3; + + // Check if there's already a pending/running job for this layer + var existingJob = await _db.QueueJobs + .Where(j => j.LayerId == worker.Id && + (j.Status == JobStatus.Pending || j.Status == JobStatus.Running)) + .FirstOrDefaultAsync(); + + if (existingJob != null) + { + _logger.LogDebug("JobScheduler: Job already exists for {LayerName} ({LayerId}), status: {Status}", + worker.Name, worker.Id, existingJob.Status); + continue; + } + + var job = new QueueJob + { + Id = Guid.NewGuid(), + LayerId = worker.Id, + LayerName = worker.Name ?? "Unknown", + PluginName = plugin, + JobType = JobType.Process, + Priority = priority, + MaxRetries = maxRetries, + Status = JobStatus.Pending, + CreatedAt = DateTime.UtcNow, + CreatedAtUtc = DateTime.UtcNow, + ModifiedAtUtc = DateTime.UtcNow, + CreatedById = Guid.Empty, + ModifiedById = Guid.Empty + }; + + _db.QueueJobs.Add(job); + jobsCreated++; + + _logger.LogInformation("JobScheduler: Created process job for {LayerName} ({LayerId}) with priority {Priority}", + worker.Name, worker.Id, priority); + } + catch (Exception ex) + { + _logger.LogError(ex, "JobScheduler: Failed to create job for {LayerName} ({LayerId})", + worker.Name, worker.Id); + } + } + + if (jobsCreated > 0) + { + await _db.SaveChangesAsync(); + _logger.LogInformation("JobScheduler: Successfully created {Count} process jobs", jobsCreated); + } + + return jobsCreated; + } + + public async Task ScheduleAllJobsAsync(string? nameFilter = null) + { + var importCount = await ScheduleImportJobsAsync(nameFilter); + var processCount = await ScheduleProcessJobsAsync(); + + _logger.LogInformation("JobScheduler: Scheduled {ImportCount} import jobs and {ProcessCount} process jobs", + importCount, processCount); + + return importCount + processCount; + } +} diff --git a/DiunaBI.Infrastructure/Services/PluginManager.cs b/DiunaBI.Infrastructure/Services/PluginManager.cs index 710c234..5e0f384 100644 --- a/DiunaBI.Infrastructure/Services/PluginManager.cs +++ b/DiunaBI.Infrastructure/Services/PluginManager.cs @@ -11,7 +11,7 @@ public class PluginManager private readonly IServiceProvider _serviceProvider; private readonly List _processorTypes = new(); private readonly List _importerTypes = new(); - private readonly List _exporters = new(); + private readonly List _exporterTypes = new(); private readonly List _plugins = new(); public PluginManager(ILogger logger, IServiceProvider serviceProvider) @@ -42,10 +42,11 @@ public class PluginManager } } - _logger.LogInformation("Loaded {ProcessorCount} processors and {ImporterCount} importers from {AssemblyCount} assemblies", + _logger.LogInformation("Loaded {ProcessorCount} processors, {ImporterCount} importers, and {ExporterCount} exporters from {AssemblyCount} assemblies", _processorTypes.Count, _importerTypes.Count, - dllFiles.Length); // ZmieƄ z _plugins.Count na assemblyFiles.Length + _exporterTypes.Count, + dllFiles.Length); } private void LoadPluginFromAssembly(string assemblyPath) @@ -70,6 +71,12 @@ public class PluginManager _importerTypes.Add(type); _logger.LogDebug("Registered importer: {Type}", type.Name); // Information -> Debug } + + if (typeof(IDataExporter).IsAssignableFrom(type) && !type.IsInterface && !type.IsAbstract) + { + _exporterTypes.Add(type); + _logger.LogDebug("Registered exporter: {Type}", type.Name); + } } } catch (Exception ex) @@ -84,14 +91,15 @@ public class PluginManager { try { - using var scope = _serviceProvider.CreateScope(); + var scope = _serviceProvider.CreateScope(); var instance = (IDataProcessor)ActivatorUtilities.CreateInstance(scope.ServiceProvider, type); if (instance.CanProcess(processorType)) { - var scopedProvider = _serviceProvider.CreateScope().ServiceProvider; - return (IDataProcessor)ActivatorUtilities.CreateInstance(scopedProvider, type); + return instance; } + + scope.Dispose(); } catch (Exception ex) { @@ -107,14 +115,15 @@ public class PluginManager { try { - using var scope = _serviceProvider.CreateScope(); + var scope = _serviceProvider.CreateScope(); var instance = (IDataImporter)ActivatorUtilities.CreateInstance(scope.ServiceProvider, type); if (instance.CanImport(importerType)) { - var scopedProvider = _serviceProvider.CreateScope().ServiceProvider; - return (IDataImporter)ActivatorUtilities.CreateInstance(scopedProvider, type); + return instance; } + + scope.Dispose(); } catch (Exception ex) { @@ -126,7 +135,27 @@ public class PluginManager public IDataExporter? GetExporter(string exporterType) { - return _exporters.FirstOrDefault(e => e.CanExport(exporterType)); + foreach (var type in _exporterTypes) + { + try + { + var scope = _serviceProvider.CreateScope(); + var instance = (IDataExporter)ActivatorUtilities.CreateInstance(scope.ServiceProvider, type); + + if (instance.CanExport(exporterType)) + { + return instance; + } + + scope.Dispose(); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to create exporter instance of type {Type}", type.Name); + } + } + return null; } - public int GetPluginsCount() => _processorTypes.Count + _importerTypes.Count + _exporters.Count; + + public int GetPluginsCount() => _processorTypes.Count + _importerTypes.Count + _exporterTypes.Count; } \ No newline at end of file