Plugins little refactor
This commit is contained in:
@@ -1,11 +1,13 @@
|
||||
using DiunaBI.Domain.Entities;
|
||||
using DiunaBI.Infrastructure.Interfaces;
|
||||
|
||||
namespace DiunaBI.Plugins.Morska.Exporters;
|
||||
|
||||
public abstract class MorskaBaseExporter : IDataExporter
|
||||
{
|
||||
public abstract string ExporterType { get; }
|
||||
public virtual bool CanExport(string exporterType) => ExporterType == exporterType;
|
||||
public abstract void Export(Layer layer);
|
||||
}
|
||||
using DiunaBI.Domain.Entities;
|
||||
using DiunaBI.Infrastructure.Interfaces;
|
||||
|
||||
namespace DiunaBI.Infrastructure.Plugins;
|
||||
|
||||
public abstract class BaseDataExporter : IDataExporter
|
||||
{
|
||||
public abstract string ExporterType { get; }
|
||||
|
||||
public virtual bool CanExport(string exporterType) => ExporterType == exporterType;
|
||||
|
||||
public abstract void Export(Layer layer);
|
||||
}
|
||||
@@ -1,13 +1,13 @@
|
||||
using DiunaBI.Domain.Entities;
|
||||
using DiunaBI.Infrastructure.Interfaces;
|
||||
|
||||
namespace DiunaBI.Plugins.Morska.Importers;
|
||||
|
||||
public abstract class MorskaBaseImporter : IDataImporter
|
||||
{
|
||||
public abstract string ImporterType { get; }
|
||||
|
||||
public virtual bool CanImport(string importerType) => ImporterType == importerType;
|
||||
|
||||
public abstract void Import(Layer importWorker);
|
||||
}
|
||||
using DiunaBI.Domain.Entities;
|
||||
using DiunaBI.Infrastructure.Interfaces;
|
||||
|
||||
namespace DiunaBI.Infrastructure.Plugins;
|
||||
|
||||
public abstract class BaseDataImporter : IDataImporter
|
||||
{
|
||||
public abstract string ImporterType { get; }
|
||||
|
||||
public virtual bool CanImport(string importerType) => ImporterType == importerType;
|
||||
|
||||
public abstract void Import(Layer importWorker);
|
||||
}
|
||||
@@ -1,13 +1,13 @@
|
||||
using DiunaBI.Domain.Entities;
|
||||
using DiunaBI.Infrastructure.Interfaces;
|
||||
|
||||
namespace DiunaBI.Plugins.Morska.Processors;
|
||||
|
||||
public abstract class MorskaBaseProcessor : IDataProcessor
|
||||
{
|
||||
public abstract string ProcessorType { get; }
|
||||
|
||||
public virtual bool CanProcess(string processorType) => ProcessorType == processorType;
|
||||
|
||||
public abstract void Process(Layer processWorker);
|
||||
}
|
||||
using DiunaBI.Domain.Entities;
|
||||
using DiunaBI.Infrastructure.Interfaces;
|
||||
|
||||
namespace DiunaBI.Infrastructure.Plugins;
|
||||
|
||||
public abstract class BaseDataProcessor : IDataProcessor
|
||||
{
|
||||
public abstract string ProcessorType { get; }
|
||||
|
||||
public virtual bool CanProcess(string processorType) => ProcessorType == processorType;
|
||||
|
||||
public abstract void Process(Layer processWorker);
|
||||
}
|
||||
216
DiunaBI.Infrastructure/Services/JobSchedulerService.cs
Normal file
216
DiunaBI.Infrastructure/Services/JobSchedulerService.cs
Normal file
@@ -0,0 +1,216 @@
|
||||
using DiunaBI.Domain.Entities;
|
||||
using DiunaBI.Infrastructure.Data;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace DiunaBI.Infrastructure.Services;
|
||||
|
||||
public class JobSchedulerService
|
||||
{
|
||||
private readonly AppDbContext _db;
|
||||
private readonly ILogger<JobSchedulerService> _logger;
|
||||
|
||||
public JobSchedulerService(AppDbContext db, ILogger<JobSchedulerService> logger)
|
||||
{
|
||||
_db = db;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
public async Task<int> ScheduleImportJobsAsync(string? nameFilter = null)
|
||||
{
|
||||
_logger.LogInformation("JobScheduler: Starting import job scheduling with filter: {NameFilter}", nameFilter ?? "none");
|
||||
|
||||
var query = _db.Layers
|
||||
.Include(x => x.Records)
|
||||
.Where(x =>
|
||||
x.Records!.Any(r => r.Code == "Type" && r.Desc1 == "ImportWorker") &&
|
||||
x.Records!.Any(r => r.Code == "IsEnabled" && r.Desc1 == "True")
|
||||
);
|
||||
|
||||
if (!string.IsNullOrEmpty(nameFilter))
|
||||
{
|
||||
query = query.Where(x => x.Name != null && x.Name.Contains(nameFilter));
|
||||
}
|
||||
|
||||
var importWorkers = await query
|
||||
.OrderBy(x => x.CreatedAt)
|
||||
.AsNoTracking()
|
||||
.ToListAsync();
|
||||
|
||||
_logger.LogInformation("JobScheduler: Found {Count} import workers to schedule", importWorkers.Count);
|
||||
|
||||
var jobsCreated = 0;
|
||||
|
||||
foreach (var worker in importWorkers)
|
||||
{
|
||||
try
|
||||
{
|
||||
var plugin = worker.Records?.FirstOrDefault(r => r.Code == "Plugin")?.Desc1;
|
||||
if (string.IsNullOrEmpty(plugin))
|
||||
{
|
||||
_logger.LogWarning("JobScheduler: Import worker {LayerName} ({LayerId}) has no Plugin configured, skipping",
|
||||
worker.Name, worker.Id);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Get priority from config (default: 50)
|
||||
var priorityStr = worker.Records?.FirstOrDefault(r => r.Code == "Priority")?.Desc1;
|
||||
var priority = int.TryParse(priorityStr, out var p) ? p : 50;
|
||||
|
||||
// Get max retries from config (default: 3)
|
||||
var maxRetriesStr = worker.Records?.FirstOrDefault(r => r.Code == "MaxRetries")?.Desc1;
|
||||
var maxRetries = int.TryParse(maxRetriesStr, out var mr) ? mr : 3;
|
||||
|
||||
// Check if there's already a pending/running job for this layer
|
||||
var existingJob = await _db.QueueJobs
|
||||
.Where(j => j.LayerId == worker.Id &&
|
||||
(j.Status == JobStatus.Pending || j.Status == JobStatus.Running))
|
||||
.FirstOrDefaultAsync();
|
||||
|
||||
if (existingJob != null)
|
||||
{
|
||||
_logger.LogDebug("JobScheduler: Job already exists for {LayerName} ({LayerId}), status: {Status}",
|
||||
worker.Name, worker.Id, existingJob.Status);
|
||||
continue;
|
||||
}
|
||||
|
||||
var job = new QueueJob
|
||||
{
|
||||
Id = Guid.NewGuid(),
|
||||
LayerId = worker.Id,
|
||||
LayerName = worker.Name ?? "Unknown",
|
||||
PluginName = plugin,
|
||||
JobType = JobType.Import,
|
||||
Priority = priority,
|
||||
MaxRetries = maxRetries,
|
||||
Status = JobStatus.Pending,
|
||||
CreatedAt = DateTime.UtcNow,
|
||||
CreatedAtUtc = DateTime.UtcNow,
|
||||
ModifiedAtUtc = DateTime.UtcNow,
|
||||
CreatedById = Guid.Empty, // System user
|
||||
ModifiedById = Guid.Empty
|
||||
};
|
||||
|
||||
_db.QueueJobs.Add(job);
|
||||
jobsCreated++;
|
||||
|
||||
_logger.LogInformation("JobScheduler: Created import job for {LayerName} ({LayerId}) with priority {Priority}",
|
||||
worker.Name, worker.Id, priority);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "JobScheduler: Failed to create job for {LayerName} ({LayerId})",
|
||||
worker.Name, worker.Id);
|
||||
}
|
||||
}
|
||||
|
||||
if (jobsCreated > 0)
|
||||
{
|
||||
await _db.SaveChangesAsync();
|
||||
_logger.LogInformation("JobScheduler: Successfully created {Count} import jobs", jobsCreated);
|
||||
}
|
||||
|
||||
return jobsCreated;
|
||||
}
|
||||
|
||||
public async Task<int> ScheduleProcessJobsAsync()
|
||||
{
|
||||
_logger.LogInformation("JobScheduler: Starting process job scheduling");
|
||||
|
||||
var processWorkers = await _db.Layers
|
||||
.Include(x => x.Records)
|
||||
.Where(x =>
|
||||
x.Records!.Any(r => r.Code == "Type" && r.Desc1 == "ProcessWorker") &&
|
||||
x.Records!.Any(r => r.Code == "IsEnabled" && r.Desc1 == "True")
|
||||
)
|
||||
.OrderBy(x => x.CreatedAt)
|
||||
.AsNoTracking()
|
||||
.ToListAsync();
|
||||
|
||||
_logger.LogInformation("JobScheduler: Found {Count} process workers to schedule", processWorkers.Count);
|
||||
|
||||
var jobsCreated = 0;
|
||||
|
||||
foreach (var worker in processWorkers)
|
||||
{
|
||||
try
|
||||
{
|
||||
var plugin = worker.Records?.FirstOrDefault(r => r.Code == "Plugin")?.Desc1;
|
||||
if (string.IsNullOrEmpty(plugin))
|
||||
{
|
||||
_logger.LogWarning("JobScheduler: Process worker {LayerName} ({LayerId}) has no Plugin configured, skipping",
|
||||
worker.Name, worker.Id);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Get priority from config (default: 100 for processes - higher than imports)
|
||||
var priorityStr = worker.Records?.FirstOrDefault(r => r.Code == "Priority")?.Desc1;
|
||||
var priority = int.TryParse(priorityStr, out var p) ? p : 100;
|
||||
|
||||
// Get max retries from config (default: 3)
|
||||
var maxRetriesStr = worker.Records?.FirstOrDefault(r => r.Code == "MaxRetries")?.Desc1;
|
||||
var maxRetries = int.TryParse(maxRetriesStr, out var mr) ? mr : 3;
|
||||
|
||||
// Check if there's already a pending/running job for this layer
|
||||
var existingJob = await _db.QueueJobs
|
||||
.Where(j => j.LayerId == worker.Id &&
|
||||
(j.Status == JobStatus.Pending || j.Status == JobStatus.Running))
|
||||
.FirstOrDefaultAsync();
|
||||
|
||||
if (existingJob != null)
|
||||
{
|
||||
_logger.LogDebug("JobScheduler: Job already exists for {LayerName} ({LayerId}), status: {Status}",
|
||||
worker.Name, worker.Id, existingJob.Status);
|
||||
continue;
|
||||
}
|
||||
|
||||
var job = new QueueJob
|
||||
{
|
||||
Id = Guid.NewGuid(),
|
||||
LayerId = worker.Id,
|
||||
LayerName = worker.Name ?? "Unknown",
|
||||
PluginName = plugin,
|
||||
JobType = JobType.Process,
|
||||
Priority = priority,
|
||||
MaxRetries = maxRetries,
|
||||
Status = JobStatus.Pending,
|
||||
CreatedAt = DateTime.UtcNow,
|
||||
CreatedAtUtc = DateTime.UtcNow,
|
||||
ModifiedAtUtc = DateTime.UtcNow,
|
||||
CreatedById = Guid.Empty,
|
||||
ModifiedById = Guid.Empty
|
||||
};
|
||||
|
||||
_db.QueueJobs.Add(job);
|
||||
jobsCreated++;
|
||||
|
||||
_logger.LogInformation("JobScheduler: Created process job for {LayerName} ({LayerId}) with priority {Priority}",
|
||||
worker.Name, worker.Id, priority);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "JobScheduler: Failed to create job for {LayerName} ({LayerId})",
|
||||
worker.Name, worker.Id);
|
||||
}
|
||||
}
|
||||
|
||||
if (jobsCreated > 0)
|
||||
{
|
||||
await _db.SaveChangesAsync();
|
||||
_logger.LogInformation("JobScheduler: Successfully created {Count} process jobs", jobsCreated);
|
||||
}
|
||||
|
||||
return jobsCreated;
|
||||
}
|
||||
|
||||
public async Task<int> ScheduleAllJobsAsync(string? nameFilter = null)
|
||||
{
|
||||
var importCount = await ScheduleImportJobsAsync(nameFilter);
|
||||
var processCount = await ScheduleProcessJobsAsync();
|
||||
|
||||
_logger.LogInformation("JobScheduler: Scheduled {ImportCount} import jobs and {ProcessCount} process jobs",
|
||||
importCount, processCount);
|
||||
|
||||
return importCount + processCount;
|
||||
}
|
||||
}
|
||||
@@ -11,7 +11,7 @@ public class PluginManager
|
||||
private readonly IServiceProvider _serviceProvider;
|
||||
private readonly List<Type> _processorTypes = new();
|
||||
private readonly List<Type> _importerTypes = new();
|
||||
private readonly List<IDataExporter> _exporters = new();
|
||||
private readonly List<Type> _exporterTypes = new();
|
||||
private readonly List<IPlugin> _plugins = new();
|
||||
|
||||
public PluginManager(ILogger<PluginManager> logger, IServiceProvider serviceProvider)
|
||||
@@ -42,10 +42,11 @@ public class PluginManager
|
||||
}
|
||||
}
|
||||
|
||||
_logger.LogInformation("Loaded {ProcessorCount} processors and {ImporterCount} importers from {AssemblyCount} assemblies",
|
||||
_logger.LogInformation("Loaded {ProcessorCount} processors, {ImporterCount} importers, and {ExporterCount} exporters from {AssemblyCount} assemblies",
|
||||
_processorTypes.Count,
|
||||
_importerTypes.Count,
|
||||
dllFiles.Length); // Zmień z _plugins.Count na assemblyFiles.Length
|
||||
_exporterTypes.Count,
|
||||
dllFiles.Length);
|
||||
}
|
||||
|
||||
private void LoadPluginFromAssembly(string assemblyPath)
|
||||
@@ -70,6 +71,12 @@ public class PluginManager
|
||||
_importerTypes.Add(type);
|
||||
_logger.LogDebug("Registered importer: {Type}", type.Name); // Information -> Debug
|
||||
}
|
||||
|
||||
if (typeof(IDataExporter).IsAssignableFrom(type) && !type.IsInterface && !type.IsAbstract)
|
||||
{
|
||||
_exporterTypes.Add(type);
|
||||
_logger.LogDebug("Registered exporter: {Type}", type.Name);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
@@ -84,14 +91,15 @@ public class PluginManager
|
||||
{
|
||||
try
|
||||
{
|
||||
using var scope = _serviceProvider.CreateScope();
|
||||
var scope = _serviceProvider.CreateScope();
|
||||
var instance = (IDataProcessor)ActivatorUtilities.CreateInstance(scope.ServiceProvider, type);
|
||||
|
||||
if (instance.CanProcess(processorType))
|
||||
{
|
||||
var scopedProvider = _serviceProvider.CreateScope().ServiceProvider;
|
||||
return (IDataProcessor)ActivatorUtilities.CreateInstance(scopedProvider, type);
|
||||
return instance;
|
||||
}
|
||||
|
||||
scope.Dispose();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
@@ -107,14 +115,15 @@ public class PluginManager
|
||||
{
|
||||
try
|
||||
{
|
||||
using var scope = _serviceProvider.CreateScope();
|
||||
var scope = _serviceProvider.CreateScope();
|
||||
var instance = (IDataImporter)ActivatorUtilities.CreateInstance(scope.ServiceProvider, type);
|
||||
|
||||
if (instance.CanImport(importerType))
|
||||
{
|
||||
var scopedProvider = _serviceProvider.CreateScope().ServiceProvider;
|
||||
return (IDataImporter)ActivatorUtilities.CreateInstance(scopedProvider, type);
|
||||
return instance;
|
||||
}
|
||||
|
||||
scope.Dispose();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
@@ -126,7 +135,27 @@ public class PluginManager
|
||||
|
||||
public IDataExporter? GetExporter(string exporterType)
|
||||
{
|
||||
return _exporters.FirstOrDefault(e => e.CanExport(exporterType));
|
||||
foreach (var type in _exporterTypes)
|
||||
{
|
||||
try
|
||||
{
|
||||
var scope = _serviceProvider.CreateScope();
|
||||
var instance = (IDataExporter)ActivatorUtilities.CreateInstance(scope.ServiceProvider, type);
|
||||
|
||||
if (instance.CanExport(exporterType))
|
||||
{
|
||||
return instance;
|
||||
}
|
||||
|
||||
scope.Dispose();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Failed to create exporter instance of type {Type}", type.Name);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
public int GetPluginsCount() => _processorTypes.Count + _importerTypes.Count + _exporters.Count;
|
||||
|
||||
public int GetPluginsCount() => _processorTypes.Count + _importerTypes.Count + _exporterTypes.Count;
|
||||
}
|
||||
Reference in New Issue
Block a user