Plugins little refactor

This commit is contained in:
2025-12-02 15:21:27 +01:00
parent 587d4d66f8
commit 248106a239
5 changed files with 295 additions and 48 deletions

View File

@@ -1,11 +1,13 @@
using DiunaBI.Domain.Entities;
using DiunaBI.Infrastructure.Interfaces;
namespace DiunaBI.Plugins.Morska.Exporters;
namespace DiunaBI.Infrastructure.Plugins;
public abstract class MorskaBaseExporter : IDataExporter
public abstract class BaseDataExporter : IDataExporter
{
public abstract string ExporterType { get; }
public virtual bool CanExport(string exporterType) => ExporterType == exporterType;
public abstract void Export(Layer layer);
}

View File

@@ -1,9 +1,9 @@
using DiunaBI.Domain.Entities;
using DiunaBI.Infrastructure.Interfaces;
namespace DiunaBI.Plugins.Morska.Importers;
namespace DiunaBI.Infrastructure.Plugins;
public abstract class MorskaBaseImporter : IDataImporter
public abstract class BaseDataImporter : IDataImporter
{
public abstract string ImporterType { get; }

View File

@@ -1,9 +1,9 @@
using DiunaBI.Domain.Entities;
using DiunaBI.Infrastructure.Interfaces;
namespace DiunaBI.Plugins.Morska.Processors;
namespace DiunaBI.Infrastructure.Plugins;
public abstract class MorskaBaseProcessor : IDataProcessor
public abstract class BaseDataProcessor : IDataProcessor
{
public abstract string ProcessorType { get; }

View File

@@ -0,0 +1,216 @@
using DiunaBI.Domain.Entities;
using DiunaBI.Infrastructure.Data;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
namespace DiunaBI.Infrastructure.Services;
public class JobSchedulerService
{
private readonly AppDbContext _db;
private readonly ILogger<JobSchedulerService> _logger;
public JobSchedulerService(AppDbContext db, ILogger<JobSchedulerService> logger)
{
_db = db;
_logger = logger;
}
public async Task<int> ScheduleImportJobsAsync(string? nameFilter = null)
{
_logger.LogInformation("JobScheduler: Starting import job scheduling with filter: {NameFilter}", nameFilter ?? "none");
var query = _db.Layers
.Include(x => x.Records)
.Where(x =>
x.Records!.Any(r => r.Code == "Type" && r.Desc1 == "ImportWorker") &&
x.Records!.Any(r => r.Code == "IsEnabled" && r.Desc1 == "True")
);
if (!string.IsNullOrEmpty(nameFilter))
{
query = query.Where(x => x.Name != null && x.Name.Contains(nameFilter));
}
var importWorkers = await query
.OrderBy(x => x.CreatedAt)
.AsNoTracking()
.ToListAsync();
_logger.LogInformation("JobScheduler: Found {Count} import workers to schedule", importWorkers.Count);
var jobsCreated = 0;
foreach (var worker in importWorkers)
{
try
{
var plugin = worker.Records?.FirstOrDefault(r => r.Code == "Plugin")?.Desc1;
if (string.IsNullOrEmpty(plugin))
{
_logger.LogWarning("JobScheduler: Import worker {LayerName} ({LayerId}) has no Plugin configured, skipping",
worker.Name, worker.Id);
continue;
}
// Get priority from config (default: 50)
var priorityStr = worker.Records?.FirstOrDefault(r => r.Code == "Priority")?.Desc1;
var priority = int.TryParse(priorityStr, out var p) ? p : 50;
// Get max retries from config (default: 3)
var maxRetriesStr = worker.Records?.FirstOrDefault(r => r.Code == "MaxRetries")?.Desc1;
var maxRetries = int.TryParse(maxRetriesStr, out var mr) ? mr : 3;
// Check if there's already a pending/running job for this layer
var existingJob = await _db.QueueJobs
.Where(j => j.LayerId == worker.Id &&
(j.Status == JobStatus.Pending || j.Status == JobStatus.Running))
.FirstOrDefaultAsync();
if (existingJob != null)
{
_logger.LogDebug("JobScheduler: Job already exists for {LayerName} ({LayerId}), status: {Status}",
worker.Name, worker.Id, existingJob.Status);
continue;
}
var job = new QueueJob
{
Id = Guid.NewGuid(),
LayerId = worker.Id,
LayerName = worker.Name ?? "Unknown",
PluginName = plugin,
JobType = JobType.Import,
Priority = priority,
MaxRetries = maxRetries,
Status = JobStatus.Pending,
CreatedAt = DateTime.UtcNow,
CreatedAtUtc = DateTime.UtcNow,
ModifiedAtUtc = DateTime.UtcNow,
CreatedById = Guid.Empty, // System user
ModifiedById = Guid.Empty
};
_db.QueueJobs.Add(job);
jobsCreated++;
_logger.LogInformation("JobScheduler: Created import job for {LayerName} ({LayerId}) with priority {Priority}",
worker.Name, worker.Id, priority);
}
catch (Exception ex)
{
_logger.LogError(ex, "JobScheduler: Failed to create job for {LayerName} ({LayerId})",
worker.Name, worker.Id);
}
}
if (jobsCreated > 0)
{
await _db.SaveChangesAsync();
_logger.LogInformation("JobScheduler: Successfully created {Count} import jobs", jobsCreated);
}
return jobsCreated;
}
public async Task<int> ScheduleProcessJobsAsync()
{
_logger.LogInformation("JobScheduler: Starting process job scheduling");
var processWorkers = await _db.Layers
.Include(x => x.Records)
.Where(x =>
x.Records!.Any(r => r.Code == "Type" && r.Desc1 == "ProcessWorker") &&
x.Records!.Any(r => r.Code == "IsEnabled" && r.Desc1 == "True")
)
.OrderBy(x => x.CreatedAt)
.AsNoTracking()
.ToListAsync();
_logger.LogInformation("JobScheduler: Found {Count} process workers to schedule", processWorkers.Count);
var jobsCreated = 0;
foreach (var worker in processWorkers)
{
try
{
var plugin = worker.Records?.FirstOrDefault(r => r.Code == "Plugin")?.Desc1;
if (string.IsNullOrEmpty(plugin))
{
_logger.LogWarning("JobScheduler: Process worker {LayerName} ({LayerId}) has no Plugin configured, skipping",
worker.Name, worker.Id);
continue;
}
// Get priority from config (default: 100 for processes - higher than imports)
var priorityStr = worker.Records?.FirstOrDefault(r => r.Code == "Priority")?.Desc1;
var priority = int.TryParse(priorityStr, out var p) ? p : 100;
// Get max retries from config (default: 3)
var maxRetriesStr = worker.Records?.FirstOrDefault(r => r.Code == "MaxRetries")?.Desc1;
var maxRetries = int.TryParse(maxRetriesStr, out var mr) ? mr : 3;
// Check if there's already a pending/running job for this layer
var existingJob = await _db.QueueJobs
.Where(j => j.LayerId == worker.Id &&
(j.Status == JobStatus.Pending || j.Status == JobStatus.Running))
.FirstOrDefaultAsync();
if (existingJob != null)
{
_logger.LogDebug("JobScheduler: Job already exists for {LayerName} ({LayerId}), status: {Status}",
worker.Name, worker.Id, existingJob.Status);
continue;
}
var job = new QueueJob
{
Id = Guid.NewGuid(),
LayerId = worker.Id,
LayerName = worker.Name ?? "Unknown",
PluginName = plugin,
JobType = JobType.Process,
Priority = priority,
MaxRetries = maxRetries,
Status = JobStatus.Pending,
CreatedAt = DateTime.UtcNow,
CreatedAtUtc = DateTime.UtcNow,
ModifiedAtUtc = DateTime.UtcNow,
CreatedById = Guid.Empty,
ModifiedById = Guid.Empty
};
_db.QueueJobs.Add(job);
jobsCreated++;
_logger.LogInformation("JobScheduler: Created process job for {LayerName} ({LayerId}) with priority {Priority}",
worker.Name, worker.Id, priority);
}
catch (Exception ex)
{
_logger.LogError(ex, "JobScheduler: Failed to create job for {LayerName} ({LayerId})",
worker.Name, worker.Id);
}
}
if (jobsCreated > 0)
{
await _db.SaveChangesAsync();
_logger.LogInformation("JobScheduler: Successfully created {Count} process jobs", jobsCreated);
}
return jobsCreated;
}
public async Task<int> ScheduleAllJobsAsync(string? nameFilter = null)
{
var importCount = await ScheduleImportJobsAsync(nameFilter);
var processCount = await ScheduleProcessJobsAsync();
_logger.LogInformation("JobScheduler: Scheduled {ImportCount} import jobs and {ProcessCount} process jobs",
importCount, processCount);
return importCount + processCount;
}
}

View File

@@ -11,7 +11,7 @@ public class PluginManager
private readonly IServiceProvider _serviceProvider;
private readonly List<Type> _processorTypes = new();
private readonly List<Type> _importerTypes = new();
private readonly List<IDataExporter> _exporters = new();
private readonly List<Type> _exporterTypes = new();
private readonly List<IPlugin> _plugins = new();
public PluginManager(ILogger<PluginManager> logger, IServiceProvider serviceProvider)
@@ -42,10 +42,11 @@ public class PluginManager
}
}
_logger.LogInformation("Loaded {ProcessorCount} processors and {ImporterCount} importers from {AssemblyCount} assemblies",
_logger.LogInformation("Loaded {ProcessorCount} processors, {ImporterCount} importers, and {ExporterCount} exporters from {AssemblyCount} assemblies",
_processorTypes.Count,
_importerTypes.Count,
dllFiles.Length); // Zmień z _plugins.Count na assemblyFiles.Length
_exporterTypes.Count,
dllFiles.Length);
}
private void LoadPluginFromAssembly(string assemblyPath)
@@ -70,6 +71,12 @@ public class PluginManager
_importerTypes.Add(type);
_logger.LogDebug("Registered importer: {Type}", type.Name); // Information -> Debug
}
if (typeof(IDataExporter).IsAssignableFrom(type) && !type.IsInterface && !type.IsAbstract)
{
_exporterTypes.Add(type);
_logger.LogDebug("Registered exporter: {Type}", type.Name);
}
}
}
catch (Exception ex)
@@ -84,14 +91,15 @@ public class PluginManager
{
try
{
using var scope = _serviceProvider.CreateScope();
var scope = _serviceProvider.CreateScope();
var instance = (IDataProcessor)ActivatorUtilities.CreateInstance(scope.ServiceProvider, type);
if (instance.CanProcess(processorType))
{
var scopedProvider = _serviceProvider.CreateScope().ServiceProvider;
return (IDataProcessor)ActivatorUtilities.CreateInstance(scopedProvider, type);
return instance;
}
scope.Dispose();
}
catch (Exception ex)
{
@@ -107,14 +115,15 @@ public class PluginManager
{
try
{
using var scope = _serviceProvider.CreateScope();
var scope = _serviceProvider.CreateScope();
var instance = (IDataImporter)ActivatorUtilities.CreateInstance(scope.ServiceProvider, type);
if (instance.CanImport(importerType))
{
var scopedProvider = _serviceProvider.CreateScope().ServiceProvider;
return (IDataImporter)ActivatorUtilities.CreateInstance(scopedProvider, type);
return instance;
}
scope.Dispose();
}
catch (Exception ex)
{
@@ -126,7 +135,27 @@ public class PluginManager
public IDataExporter? GetExporter(string exporterType)
{
return _exporters.FirstOrDefault(e => e.CanExport(exporterType));
foreach (var type in _exporterTypes)
{
try
{
var scope = _serviceProvider.CreateScope();
var instance = (IDataExporter)ActivatorUtilities.CreateInstance(scope.ServiceProvider, type);
if (instance.CanExport(exporterType))
{
return instance;
}
scope.Dispose();
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to create exporter instance of type {Type}", type.Name);
}
}
return null;
}
public int GetPluginsCount() => _processorTypes.Count + _importerTypes.Count + _exporters.Count;
public int GetPluginsCount() => _processorTypes.Count + _importerTypes.Count + _exporterTypes.Count;
}