Plugins little refactor
This commit is contained in:
@@ -1,11 +1,13 @@
|
|||||||
using DiunaBI.Domain.Entities;
|
using DiunaBI.Domain.Entities;
|
||||||
using DiunaBI.Infrastructure.Interfaces;
|
using DiunaBI.Infrastructure.Interfaces;
|
||||||
|
|
||||||
namespace DiunaBI.Plugins.Morska.Exporters;
|
namespace DiunaBI.Infrastructure.Plugins;
|
||||||
|
|
||||||
public abstract class MorskaBaseExporter : IDataExporter
|
public abstract class BaseDataExporter : IDataExporter
|
||||||
{
|
{
|
||||||
public abstract string ExporterType { get; }
|
public abstract string ExporterType { get; }
|
||||||
|
|
||||||
public virtual bool CanExport(string exporterType) => ExporterType == exporterType;
|
public virtual bool CanExport(string exporterType) => ExporterType == exporterType;
|
||||||
|
|
||||||
public abstract void Export(Layer layer);
|
public abstract void Export(Layer layer);
|
||||||
}
|
}
|
||||||
@@ -1,9 +1,9 @@
|
|||||||
using DiunaBI.Domain.Entities;
|
using DiunaBI.Domain.Entities;
|
||||||
using DiunaBI.Infrastructure.Interfaces;
|
using DiunaBI.Infrastructure.Interfaces;
|
||||||
|
|
||||||
namespace DiunaBI.Plugins.Morska.Importers;
|
namespace DiunaBI.Infrastructure.Plugins;
|
||||||
|
|
||||||
public abstract class MorskaBaseImporter : IDataImporter
|
public abstract class BaseDataImporter : IDataImporter
|
||||||
{
|
{
|
||||||
public abstract string ImporterType { get; }
|
public abstract string ImporterType { get; }
|
||||||
|
|
||||||
@@ -1,9 +1,9 @@
|
|||||||
using DiunaBI.Domain.Entities;
|
using DiunaBI.Domain.Entities;
|
||||||
using DiunaBI.Infrastructure.Interfaces;
|
using DiunaBI.Infrastructure.Interfaces;
|
||||||
|
|
||||||
namespace DiunaBI.Plugins.Morska.Processors;
|
namespace DiunaBI.Infrastructure.Plugins;
|
||||||
|
|
||||||
public abstract class MorskaBaseProcessor : IDataProcessor
|
public abstract class BaseDataProcessor : IDataProcessor
|
||||||
{
|
{
|
||||||
public abstract string ProcessorType { get; }
|
public abstract string ProcessorType { get; }
|
||||||
|
|
||||||
216
DiunaBI.Infrastructure/Services/JobSchedulerService.cs
Normal file
216
DiunaBI.Infrastructure/Services/JobSchedulerService.cs
Normal file
@@ -0,0 +1,216 @@
|
|||||||
|
using DiunaBI.Domain.Entities;
|
||||||
|
using DiunaBI.Infrastructure.Data;
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
|
namespace DiunaBI.Infrastructure.Services;
|
||||||
|
|
||||||
|
public class JobSchedulerService
|
||||||
|
{
|
||||||
|
private readonly AppDbContext _db;
|
||||||
|
private readonly ILogger<JobSchedulerService> _logger;
|
||||||
|
|
||||||
|
public JobSchedulerService(AppDbContext db, ILogger<JobSchedulerService> logger)
|
||||||
|
{
|
||||||
|
_db = db;
|
||||||
|
_logger = logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<int> ScheduleImportJobsAsync(string? nameFilter = null)
|
||||||
|
{
|
||||||
|
_logger.LogInformation("JobScheduler: Starting import job scheduling with filter: {NameFilter}", nameFilter ?? "none");
|
||||||
|
|
||||||
|
var query = _db.Layers
|
||||||
|
.Include(x => x.Records)
|
||||||
|
.Where(x =>
|
||||||
|
x.Records!.Any(r => r.Code == "Type" && r.Desc1 == "ImportWorker") &&
|
||||||
|
x.Records!.Any(r => r.Code == "IsEnabled" && r.Desc1 == "True")
|
||||||
|
);
|
||||||
|
|
||||||
|
if (!string.IsNullOrEmpty(nameFilter))
|
||||||
|
{
|
||||||
|
query = query.Where(x => x.Name != null && x.Name.Contains(nameFilter));
|
||||||
|
}
|
||||||
|
|
||||||
|
var importWorkers = await query
|
||||||
|
.OrderBy(x => x.CreatedAt)
|
||||||
|
.AsNoTracking()
|
||||||
|
.ToListAsync();
|
||||||
|
|
||||||
|
_logger.LogInformation("JobScheduler: Found {Count} import workers to schedule", importWorkers.Count);
|
||||||
|
|
||||||
|
var jobsCreated = 0;
|
||||||
|
|
||||||
|
foreach (var worker in importWorkers)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var plugin = worker.Records?.FirstOrDefault(r => r.Code == "Plugin")?.Desc1;
|
||||||
|
if (string.IsNullOrEmpty(plugin))
|
||||||
|
{
|
||||||
|
_logger.LogWarning("JobScheduler: Import worker {LayerName} ({LayerId}) has no Plugin configured, skipping",
|
||||||
|
worker.Name, worker.Id);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get priority from config (default: 50)
|
||||||
|
var priorityStr = worker.Records?.FirstOrDefault(r => r.Code == "Priority")?.Desc1;
|
||||||
|
var priority = int.TryParse(priorityStr, out var p) ? p : 50;
|
||||||
|
|
||||||
|
// Get max retries from config (default: 3)
|
||||||
|
var maxRetriesStr = worker.Records?.FirstOrDefault(r => r.Code == "MaxRetries")?.Desc1;
|
||||||
|
var maxRetries = int.TryParse(maxRetriesStr, out var mr) ? mr : 3;
|
||||||
|
|
||||||
|
// Check if there's already a pending/running job for this layer
|
||||||
|
var existingJob = await _db.QueueJobs
|
||||||
|
.Where(j => j.LayerId == worker.Id &&
|
||||||
|
(j.Status == JobStatus.Pending || j.Status == JobStatus.Running))
|
||||||
|
.FirstOrDefaultAsync();
|
||||||
|
|
||||||
|
if (existingJob != null)
|
||||||
|
{
|
||||||
|
_logger.LogDebug("JobScheduler: Job already exists for {LayerName} ({LayerId}), status: {Status}",
|
||||||
|
worker.Name, worker.Id, existingJob.Status);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
var job = new QueueJob
|
||||||
|
{
|
||||||
|
Id = Guid.NewGuid(),
|
||||||
|
LayerId = worker.Id,
|
||||||
|
LayerName = worker.Name ?? "Unknown",
|
||||||
|
PluginName = plugin,
|
||||||
|
JobType = JobType.Import,
|
||||||
|
Priority = priority,
|
||||||
|
MaxRetries = maxRetries,
|
||||||
|
Status = JobStatus.Pending,
|
||||||
|
CreatedAt = DateTime.UtcNow,
|
||||||
|
CreatedAtUtc = DateTime.UtcNow,
|
||||||
|
ModifiedAtUtc = DateTime.UtcNow,
|
||||||
|
CreatedById = Guid.Empty, // System user
|
||||||
|
ModifiedById = Guid.Empty
|
||||||
|
};
|
||||||
|
|
||||||
|
_db.QueueJobs.Add(job);
|
||||||
|
jobsCreated++;
|
||||||
|
|
||||||
|
_logger.LogInformation("JobScheduler: Created import job for {LayerName} ({LayerId}) with priority {Priority}",
|
||||||
|
worker.Name, worker.Id, priority);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogError(ex, "JobScheduler: Failed to create job for {LayerName} ({LayerId})",
|
||||||
|
worker.Name, worker.Id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (jobsCreated > 0)
|
||||||
|
{
|
||||||
|
await _db.SaveChangesAsync();
|
||||||
|
_logger.LogInformation("JobScheduler: Successfully created {Count} import jobs", jobsCreated);
|
||||||
|
}
|
||||||
|
|
||||||
|
return jobsCreated;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<int> ScheduleProcessJobsAsync()
|
||||||
|
{
|
||||||
|
_logger.LogInformation("JobScheduler: Starting process job scheduling");
|
||||||
|
|
||||||
|
var processWorkers = await _db.Layers
|
||||||
|
.Include(x => x.Records)
|
||||||
|
.Where(x =>
|
||||||
|
x.Records!.Any(r => r.Code == "Type" && r.Desc1 == "ProcessWorker") &&
|
||||||
|
x.Records!.Any(r => r.Code == "IsEnabled" && r.Desc1 == "True")
|
||||||
|
)
|
||||||
|
.OrderBy(x => x.CreatedAt)
|
||||||
|
.AsNoTracking()
|
||||||
|
.ToListAsync();
|
||||||
|
|
||||||
|
_logger.LogInformation("JobScheduler: Found {Count} process workers to schedule", processWorkers.Count);
|
||||||
|
|
||||||
|
var jobsCreated = 0;
|
||||||
|
|
||||||
|
foreach (var worker in processWorkers)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var plugin = worker.Records?.FirstOrDefault(r => r.Code == "Plugin")?.Desc1;
|
||||||
|
if (string.IsNullOrEmpty(plugin))
|
||||||
|
{
|
||||||
|
_logger.LogWarning("JobScheduler: Process worker {LayerName} ({LayerId}) has no Plugin configured, skipping",
|
||||||
|
worker.Name, worker.Id);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get priority from config (default: 100 for processes - higher than imports)
|
||||||
|
var priorityStr = worker.Records?.FirstOrDefault(r => r.Code == "Priority")?.Desc1;
|
||||||
|
var priority = int.TryParse(priorityStr, out var p) ? p : 100;
|
||||||
|
|
||||||
|
// Get max retries from config (default: 3)
|
||||||
|
var maxRetriesStr = worker.Records?.FirstOrDefault(r => r.Code == "MaxRetries")?.Desc1;
|
||||||
|
var maxRetries = int.TryParse(maxRetriesStr, out var mr) ? mr : 3;
|
||||||
|
|
||||||
|
// Check if there's already a pending/running job for this layer
|
||||||
|
var existingJob = await _db.QueueJobs
|
||||||
|
.Where(j => j.LayerId == worker.Id &&
|
||||||
|
(j.Status == JobStatus.Pending || j.Status == JobStatus.Running))
|
||||||
|
.FirstOrDefaultAsync();
|
||||||
|
|
||||||
|
if (existingJob != null)
|
||||||
|
{
|
||||||
|
_logger.LogDebug("JobScheduler: Job already exists for {LayerName} ({LayerId}), status: {Status}",
|
||||||
|
worker.Name, worker.Id, existingJob.Status);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
var job = new QueueJob
|
||||||
|
{
|
||||||
|
Id = Guid.NewGuid(),
|
||||||
|
LayerId = worker.Id,
|
||||||
|
LayerName = worker.Name ?? "Unknown",
|
||||||
|
PluginName = plugin,
|
||||||
|
JobType = JobType.Process,
|
||||||
|
Priority = priority,
|
||||||
|
MaxRetries = maxRetries,
|
||||||
|
Status = JobStatus.Pending,
|
||||||
|
CreatedAt = DateTime.UtcNow,
|
||||||
|
CreatedAtUtc = DateTime.UtcNow,
|
||||||
|
ModifiedAtUtc = DateTime.UtcNow,
|
||||||
|
CreatedById = Guid.Empty,
|
||||||
|
ModifiedById = Guid.Empty
|
||||||
|
};
|
||||||
|
|
||||||
|
_db.QueueJobs.Add(job);
|
||||||
|
jobsCreated++;
|
||||||
|
|
||||||
|
_logger.LogInformation("JobScheduler: Created process job for {LayerName} ({LayerId}) with priority {Priority}",
|
||||||
|
worker.Name, worker.Id, priority);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogError(ex, "JobScheduler: Failed to create job for {LayerName} ({LayerId})",
|
||||||
|
worker.Name, worker.Id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (jobsCreated > 0)
|
||||||
|
{
|
||||||
|
await _db.SaveChangesAsync();
|
||||||
|
_logger.LogInformation("JobScheduler: Successfully created {Count} process jobs", jobsCreated);
|
||||||
|
}
|
||||||
|
|
||||||
|
return jobsCreated;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<int> ScheduleAllJobsAsync(string? nameFilter = null)
|
||||||
|
{
|
||||||
|
var importCount = await ScheduleImportJobsAsync(nameFilter);
|
||||||
|
var processCount = await ScheduleProcessJobsAsync();
|
||||||
|
|
||||||
|
_logger.LogInformation("JobScheduler: Scheduled {ImportCount} import jobs and {ProcessCount} process jobs",
|
||||||
|
importCount, processCount);
|
||||||
|
|
||||||
|
return importCount + processCount;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -11,7 +11,7 @@ public class PluginManager
|
|||||||
private readonly IServiceProvider _serviceProvider;
|
private readonly IServiceProvider _serviceProvider;
|
||||||
private readonly List<Type> _processorTypes = new();
|
private readonly List<Type> _processorTypes = new();
|
||||||
private readonly List<Type> _importerTypes = new();
|
private readonly List<Type> _importerTypes = new();
|
||||||
private readonly List<IDataExporter> _exporters = new();
|
private readonly List<Type> _exporterTypes = new();
|
||||||
private readonly List<IPlugin> _plugins = new();
|
private readonly List<IPlugin> _plugins = new();
|
||||||
|
|
||||||
public PluginManager(ILogger<PluginManager> logger, IServiceProvider serviceProvider)
|
public PluginManager(ILogger<PluginManager> logger, IServiceProvider serviceProvider)
|
||||||
@@ -42,10 +42,11 @@ public class PluginManager
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_logger.LogInformation("Loaded {ProcessorCount} processors and {ImporterCount} importers from {AssemblyCount} assemblies",
|
_logger.LogInformation("Loaded {ProcessorCount} processors, {ImporterCount} importers, and {ExporterCount} exporters from {AssemblyCount} assemblies",
|
||||||
_processorTypes.Count,
|
_processorTypes.Count,
|
||||||
_importerTypes.Count,
|
_importerTypes.Count,
|
||||||
dllFiles.Length); // Zmień z _plugins.Count na assemblyFiles.Length
|
_exporterTypes.Count,
|
||||||
|
dllFiles.Length);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void LoadPluginFromAssembly(string assemblyPath)
|
private void LoadPluginFromAssembly(string assemblyPath)
|
||||||
@@ -70,6 +71,12 @@ public class PluginManager
|
|||||||
_importerTypes.Add(type);
|
_importerTypes.Add(type);
|
||||||
_logger.LogDebug("Registered importer: {Type}", type.Name); // Information -> Debug
|
_logger.LogDebug("Registered importer: {Type}", type.Name); // Information -> Debug
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (typeof(IDataExporter).IsAssignableFrom(type) && !type.IsInterface && !type.IsAbstract)
|
||||||
|
{
|
||||||
|
_exporterTypes.Add(type);
|
||||||
|
_logger.LogDebug("Registered exporter: {Type}", type.Name);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
@@ -84,14 +91,15 @@ public class PluginManager
|
|||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
using var scope = _serviceProvider.CreateScope();
|
var scope = _serviceProvider.CreateScope();
|
||||||
var instance = (IDataProcessor)ActivatorUtilities.CreateInstance(scope.ServiceProvider, type);
|
var instance = (IDataProcessor)ActivatorUtilities.CreateInstance(scope.ServiceProvider, type);
|
||||||
|
|
||||||
if (instance.CanProcess(processorType))
|
if (instance.CanProcess(processorType))
|
||||||
{
|
{
|
||||||
var scopedProvider = _serviceProvider.CreateScope().ServiceProvider;
|
return instance;
|
||||||
return (IDataProcessor)ActivatorUtilities.CreateInstance(scopedProvider, type);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
scope.Dispose();
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
@@ -107,14 +115,15 @@ public class PluginManager
|
|||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
using var scope = _serviceProvider.CreateScope();
|
var scope = _serviceProvider.CreateScope();
|
||||||
var instance = (IDataImporter)ActivatorUtilities.CreateInstance(scope.ServiceProvider, type);
|
var instance = (IDataImporter)ActivatorUtilities.CreateInstance(scope.ServiceProvider, type);
|
||||||
|
|
||||||
if (instance.CanImport(importerType))
|
if (instance.CanImport(importerType))
|
||||||
{
|
{
|
||||||
var scopedProvider = _serviceProvider.CreateScope().ServiceProvider;
|
return instance;
|
||||||
return (IDataImporter)ActivatorUtilities.CreateInstance(scopedProvider, type);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
scope.Dispose();
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
@@ -126,7 +135,27 @@ public class PluginManager
|
|||||||
|
|
||||||
public IDataExporter? GetExporter(string exporterType)
|
public IDataExporter? GetExporter(string exporterType)
|
||||||
{
|
{
|
||||||
return _exporters.FirstOrDefault(e => e.CanExport(exporterType));
|
foreach (var type in _exporterTypes)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var scope = _serviceProvider.CreateScope();
|
||||||
|
var instance = (IDataExporter)ActivatorUtilities.CreateInstance(scope.ServiceProvider, type);
|
||||||
|
|
||||||
|
if (instance.CanExport(exporterType))
|
||||||
|
{
|
||||||
|
return instance;
|
||||||
|
}
|
||||||
|
|
||||||
|
scope.Dispose();
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogError(ex, "Failed to create exporter instance of type {Type}", type.Name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
public int GetPluginsCount() => _processorTypes.Count + _importerTypes.Count + _exporters.Count;
|
|
||||||
|
public int GetPluginsCount() => _processorTypes.Count + _importerTypes.Count + _exporterTypes.Count;
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user