diff --git a/DiunaBI.API/Controllers/JobsController.cs b/DiunaBI.API/Controllers/JobsController.cs
new file mode 100644
index 0000000..a4de76d
--- /dev/null
+++ b/DiunaBI.API/Controllers/JobsController.cs
@@ -0,0 +1,323 @@
using System.Security.Cryptography;
using System.Text;
using DiunaBI.Application.DTOModels.Common;
using DiunaBI.Domain.Entities;
using DiunaBI.Infrastructure.Data;
using DiunaBI.Infrastructure.Services;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;

namespace DiunaBI.API.Controllers;

/// <summary>
/// CRUD/management endpoints for the background job queue: listing, inspection,
/// scheduling (API-key protected), retry, cancel and aggregate statistics.
/// NOTE(review): generic type arguments in this patch were stripped by the
/// extraction tooling; they have been restored where unambiguous — confirm
/// against the repository.
/// </summary>
[Authorize]
[ApiController]
[Route("[controller]")]
public class JobsController : Controller
{
    private readonly AppDbContext _db;
    private readonly JobSchedulerService _jobScheduler;
    private readonly IConfiguration _configuration;
    private readonly ILogger<JobsController> _logger;

    public JobsController(
        AppDbContext db,
        JobSchedulerService jobScheduler,
        IConfiguration configuration,
        ILogger<JobsController> logger)
    {
        _db = db;
        _jobScheduler = jobScheduler;
        _configuration = configuration;
        _logger = logger;
    }

    /// <summary>
    /// Compares the supplied key against the configured "apiKey" value in
    /// constant time, so the anonymous schedule endpoints don't leak key
    /// prefixes through timing. Never log the supplied key itself.
    /// </summary>
    private bool IsValidApiKey(string? apiKey)
    {
        var expected = _configuration["apiKey"];
        if (string.IsNullOrEmpty(expected) || string.IsNullOrEmpty(apiKey))
        {
            return false;
        }

        return CryptographicOperations.FixedTimeEquals(
            Encoding.UTF8.GetBytes(apiKey),
            Encoding.UTF8.GetBytes(expected));
    }

    /// <summary>
    /// Returns a page of queue jobs, newest first, optionally filtered by
    /// status, job type and layer.
    /// </summary>
    /// <param name="start">Zero-based offset into the filtered result set.</param>
    /// <param name="limit">Page size; non-positive values fall back to 50.</param>
    [HttpGet]
    [Route("")]
    public async Task<IActionResult> GetAll(
        [FromQuery] int start = 0,
        [FromQuery] int limit = 50,
        [FromQuery] JobStatus? status = null,
        [FromQuery] JobType? jobType = null,
        [FromQuery] Guid? layerId = null)
    {
        // Guard paging inputs: limit == 0 would divide by zero in the Page
        // computation below, and negative values break Skip/Take.
        if (start < 0) start = 0;
        if (limit <= 0) limit = 50;

        try
        {
            var query = _db.QueueJobs.AsQueryable();

            if (status.HasValue)
            {
                query = query.Where(j => j.Status == status.Value);
            }

            if (jobType.HasValue)
            {
                query = query.Where(j => j.JobType == jobType.Value);
            }

            if (layerId.HasValue)
            {
                query = query.Where(j => j.LayerId == layerId.Value);
            }

            var totalCount = await query.CountAsync();

            var items = await query
                .OrderByDescending(j => j.CreatedAt)
                .Skip(start)
                .Take(limit)
                .AsNoTracking()
                .ToListAsync();

            var pagedResult = new PagedResult<QueueJob>
            {
                Items = items,
                TotalCount = totalCount,
                Page = (start / limit) + 1,
                PageSize = limit
            };

            _logger.LogDebug("GetAll: Retrieved {Count} of {TotalCount} jobs", items.Count, totalCount);

            return Ok(pagedResult);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "GetAll: Error retrieving jobs");
            // Do not echo exception details (stack traces, connection strings)
            // back to the client; details are in the server log.
            return StatusCode(StatusCodes.Status500InternalServerError, "Error retrieving jobs");
        }
    }

    /// <summary>Returns a single job by id, or 404 if it does not exist.</summary>
    [HttpGet]
    [Route("{id:guid}")]
    public async Task<IActionResult> Get(Guid id)
    {
        try
        {
            var job = await _db.QueueJobs
                .AsNoTracking()
                .FirstOrDefaultAsync(j => j.Id == id);

            if (job == null)
            {
                _logger.LogWarning("Get: Job {JobId} not found", id);
                return NotFound("Job not found");
            }

            _logger.LogDebug("Get: Retrieved job {JobId}", id);
            return Ok(job);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Get: Error retrieving job {JobId}", id);
            return StatusCode(StatusCodes.Status500InternalServerError, "Error retrieving job");
        }
    }

    /// <summary>
    /// Schedules all jobs (imports and processes). Anonymous but protected by
    /// the shared API key so external cron systems can trigger it.
    /// </summary>
    [HttpPost]
    [Route("schedule/{apiKey}")]
    [AllowAnonymous]
    public async Task<IActionResult> ScheduleJobs(string apiKey, [FromQuery] string? nameFilter = null)
    {
        if (!IsValidApiKey(apiKey))
        {
            // Deliberately do not log the supplied key — it may be a typo'd
            // real secret and would end up in plain text in the logs.
            _logger.LogWarning("ScheduleJobs: Unauthorized schedule request");
            return Unauthorized();
        }

        try
        {
            var jobsCreated = await _jobScheduler.ScheduleAllJobsAsync(nameFilter);

            _logger.LogInformation("ScheduleJobs: Created {Count} jobs", jobsCreated);

            return Ok(new
            {
                success = true,
                jobsCreated,
                message = $"Successfully scheduled {jobsCreated} jobs"
            });
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "ScheduleJobs: Error scheduling jobs");
            return StatusCode(StatusCodes.Status500InternalServerError, "Error scheduling jobs");
        }
    }

    /// <summary>Schedules import jobs only. See <see cref="ScheduleJobs"/>.</summary>
    [HttpPost]
    [Route("schedule/imports/{apiKey}")]
    [AllowAnonymous]
    public async Task<IActionResult> ScheduleImportJobs(string apiKey, [FromQuery] string? nameFilter = null)
    {
        if (!IsValidApiKey(apiKey))
        {
            _logger.LogWarning("ScheduleImportJobs: Unauthorized schedule request");
            return Unauthorized();
        }

        try
        {
            var jobsCreated = await _jobScheduler.ScheduleImportJobsAsync(nameFilter);

            _logger.LogInformation("ScheduleImportJobs: Created {Count} import jobs", jobsCreated);

            return Ok(new
            {
                success = true,
                jobsCreated,
                message = $"Successfully scheduled {jobsCreated} import jobs"
            });
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "ScheduleImportJobs: Error scheduling import jobs");
            return StatusCode(StatusCodes.Status500InternalServerError, "Error scheduling import jobs");
        }
    }

    /// <summary>Schedules process jobs only. See <see cref="ScheduleJobs"/>.</summary>
    [HttpPost]
    [Route("schedule/processes/{apiKey}")]
    [AllowAnonymous]
    public async Task<IActionResult> ScheduleProcessJobs(string apiKey)
    {
        if (!IsValidApiKey(apiKey))
        {
            _logger.LogWarning("ScheduleProcessJobs: Unauthorized schedule request");
            return Unauthorized();
        }

        try
        {
            var jobsCreated = await _jobScheduler.ScheduleProcessJobsAsync();

            _logger.LogInformation("ScheduleProcessJobs: Created {Count} process jobs", jobsCreated);

            return Ok(new
            {
                success = true,
                jobsCreated,
                message = $"Successfully scheduled {jobsCreated} process jobs"
            });
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "ScheduleProcessJobs: Error scheduling process jobs");
            return StatusCode(StatusCodes.Status500InternalServerError, "Error scheduling process jobs");
        }
    }

    /// <summary>
    /// Resets a Failed job back to Pending so the worker picks it up again.
    /// Only jobs in the Failed state can be retried.
    /// </summary>
    [HttpPost]
    [Route("{id:guid}/retry")]
    public async Task<IActionResult> RetryJob(Guid id)
    {
        try
        {
            var job = await _db.QueueJobs.FirstOrDefaultAsync(j => j.Id == id);

            if (job == null)
            {
                _logger.LogWarning("RetryJob: Job {JobId} not found", id);
                return NotFound("Job not found");
            }

            if (job.Status != JobStatus.Failed)
            {
                _logger.LogWarning("RetryJob: Job {JobId} is not in Failed status (current: {Status})", id, job.Status);
                return BadRequest($"Job is not in Failed status (current: {job.Status})");
            }

            job.Status = JobStatus.Pending;
            job.RetryCount = 0;
            job.LastError = null;
            job.ModifiedAtUtc = DateTime.UtcNow;

            await _db.SaveChangesAsync();

            _logger.LogInformation("RetryJob: Job {JobId} reset to Pending status", id);

            return Ok(new
            {
                success = true,
                message = "Job reset to Pending status and will be retried"
            });
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "RetryJob: Error retrying job {JobId}", id);
            return StatusCode(StatusCodes.Status500InternalServerError, "Error retrying job");
        }
    }

    /// <summary>
    /// Cancels a queued job by marking it Failed with a "Cancelled by user"
    /// error. Running and Completed jobs cannot be cancelled.
    /// </summary>
    [HttpDelete]
    [Route("{id:guid}")]
    public async Task<IActionResult> CancelJob(Guid id)
    {
        try
        {
            var job = await _db.QueueJobs.FirstOrDefaultAsync(j => j.Id == id);

            if (job == null)
            {
                _logger.LogWarning("CancelJob: Job {JobId} not found", id);
                return NotFound("Job not found");
            }

            if (job.Status == JobStatus.Running)
            {
                _logger.LogWarning("CancelJob: Cannot cancel running job {JobId}", id);
                return BadRequest("Cannot cancel a job that is currently running");
            }

            if (job.Status == JobStatus.Completed)
            {
                _logger.LogWarning("CancelJob: Cannot cancel completed job {JobId}", id);
                return BadRequest("Cannot cancel a completed job");
            }

            job.Status = JobStatus.Failed;
            job.LastError = "Cancelled by user";
            job.ModifiedAtUtc = DateTime.UtcNow;

            await _db.SaveChangesAsync();

            _logger.LogInformation("CancelJob: Job {JobId} cancelled", id);

            return Ok(new
            {
                success = true,
                message = "Job cancelled successfully"
            });
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "CancelJob: Error cancelling job {JobId}", id);
            return StatusCode(StatusCodes.Status500InternalServerError, "Error cancelling job");
        }
    }

    /// <summary>
    /// Returns job counts per status plus a grand total. Uses a single
    /// GROUP BY round trip instead of the original six sequential COUNT
    /// queries (behavior-identical result shape).
    /// </summary>
    [HttpGet]
    [Route("stats")]
    public async Task<IActionResult> GetStats()
    {
        try
        {
            var byStatus = await _db.QueueJobs
                .GroupBy(j => j.Status)
                .Select(g => new { Status = g.Key, Count = g.Count() })
                .ToDictionaryAsync(x => x.Status, x => x.Count);

            var stats = new
            {
                pending = byStatus.GetValueOrDefault(JobStatus.Pending),
                running = byStatus.GetValueOrDefault(JobStatus.Running),
                completed = byStatus.GetValueOrDefault(JobStatus.Completed),
                failed = byStatus.GetValueOrDefault(JobStatus.Failed),
                retrying = byStatus.GetValueOrDefault(JobStatus.Retrying),
                // Sum over all groups equals COUNT(*) — includes any statuses
                // not broken out above, matching the original "total".
                total = byStatus.Values.Sum()
            };

            _logger.LogDebug("GetStats: Retrieved job statistics");

            return Ok(stats);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "GetStats: Error retrieving job statistics");
            return StatusCode(StatusCodes.Status500InternalServerError, "Error retrieving job statistics");
        }
    }
}
diff --git a/DiunaBI.API/Program.cs b/DiunaBI.API/Program.cs
index 03ae0b7..cef38b5 100644
--- a/DiunaBI.API/Program.cs
+++ b/DiunaBI.API/Program.cs
@@ -97,6 +97,10 @@ builder.Services.AddSingleton(provider =>
 builder.Services.AddSingleton();
 
+// Job Queue Services
+builder.Services.AddScoped<JobSchedulerService>();
+builder.Services.AddHostedService<JobWorkerService>();
+
 var app = builder.Build();
 
 // Auto-apply migrations on startup
diff --git a/DiunaBI.Infrastructure/DiunaBI.Infrastructure.csproj b/DiunaBI.Infrastructure/DiunaBI.Infrastructure.csproj
index 1775919..e4041a6 100644
--- a/DiunaBI.Infrastructure/DiunaBI.Infrastructure.csproj
+++ b/DiunaBI.Infrastructure/DiunaBI.Infrastructure.csproj
@@ -23,6 +23,7 @@
+
\ No newline at end of file
diff --git a/DiunaBI.Infrastructure/Services/JobWorkerService.cs b/DiunaBI.Infrastructure/Services/JobWorkerService.cs
new file mode 100644
index 0000000..413b11a
--- /dev/null
+++
b/DiunaBI.Infrastructure/Services/JobWorkerService.cs
@@ -0,0 +1,160 @@
using DiunaBI.Domain.Entities;
using DiunaBI.Infrastructure.Data;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace DiunaBI.Infrastructure.Services;

/// <summary>
/// Background worker that polls the QueueJobs table every 10 seconds and
/// executes one pending/retrying job per poll via the plugin manager.
/// NOTE(review): jobs are claimed with a plain SELECT + status update and no
/// row locking — this is only safe with a single worker instance. Confirm the
/// deployment runs one instance, or add pessimistic locking / SELECT ... FOR
/// UPDATE SKIP LOCKED before scaling out.
/// </summary>
public class JobWorkerService : BackgroundService
{
    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger<JobWorkerService> _logger;

    // How often to look for the next job.
    private readonly TimeSpan _pollInterval = TimeSpan.FromSeconds(10);

    // Pause after each import to stay inside the Google Sheets API quota.
    private readonly TimeSpan _rateLimitDelay = TimeSpan.FromSeconds(5);

    public JobWorkerService(IServiceProvider serviceProvider, ILogger<JobWorkerService> logger)
    {
        _serviceProvider = serviceProvider;
        _logger = logger;
    }

    /// <summary>
    /// Main loop: process one job, then sleep for the poll interval.
    /// Cancellation (host shutdown) exits the loop cleanly instead of letting
    /// a TaskCanceledException escape — the original code threw out of the
    /// Task.Delay and never reached the "Service stopped" log line.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("JobWorker: Service started");

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await ProcessNextJobAsync(stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                break; // graceful shutdown, not an error
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "JobWorker: Unexpected error in main loop");
            }

            try
            {
                await Task.Delay(_pollInterval, stoppingToken);
            }
            catch (OperationCanceledException)
            {
                break; // shutdown requested while idle
            }
        }

        _logger.LogInformation("JobWorker: Service stopped");
    }

    /// <summary>
    /// Claims the highest-priority pending/retrying job, runs the matching
    /// import or process plugin, and records the outcome (Completed, Retrying
    /// or Failed with retry bookkeeping).
    /// </summary>
    private async Task ProcessNextJobAsync(CancellationToken stoppingToken)
    {
        using var scope = _serviceProvider.CreateScope();
        var db = scope.ServiceProvider.GetRequiredService<AppDbContext>();
        // NOTE(review): the generic argument was stripped from the patch;
        // "PluginManager" is inferred from usage — confirm the actual type.
        var pluginManager = scope.ServiceProvider.GetRequiredService<PluginManager>();

        // Next pending job, ordered by priority then creation time.
        // NOTE(review): OrderBy(Priority) is ascending — assumes lower value
        // means higher priority; confirm against the enum/column semantics.
        var job = await db.QueueJobs
            .Where(j => j.Status == JobStatus.Pending || j.Status == JobStatus.Retrying)
            .OrderBy(j => j.Priority)
            .ThenBy(j => j.CreatedAt)
            .FirstOrDefaultAsync(stoppingToken);

        if (job == null)
        {
            // Nothing to do this cycle.
            return;
        }

        _logger.LogInformation("JobWorker: Processing job {JobId} - {LayerName} ({JobType})",
            job.Id, job.LayerName, job.JobType);

        // Mark the job as claimed before doing any work.
        job.Status = JobStatus.Running;
        job.LastAttemptAt = DateTime.UtcNow;
        job.ModifiedAtUtc = DateTime.UtcNow;
        await db.SaveChangesAsync(stoppingToken);

        try
        {
            // Load the layer with its records for the plugin to operate on.
            var layer = await db.Layers
                .Include(l => l.Records)
                .AsNoTracking()
                .FirstOrDefaultAsync(l => l.Id == job.LayerId, stoppingToken);

            if (layer == null)
            {
                throw new InvalidOperationException($"Layer {job.LayerId} not found");
            }

            if (job.JobType == JobType.Import)
            {
                var importer = pluginManager.GetImporter(job.PluginName);
                if (importer == null)
                {
                    throw new InvalidOperationException($"Importer '{job.PluginName}' not found");
                }

                _logger.LogInformation("JobWorker: Executing import for {LayerName} using {PluginName}",
                    job.LayerName, job.PluginName);

                importer.Import(layer);
            }
            else if (job.JobType == JobType.Process)
            {
                var processor = pluginManager.GetProcessor(job.PluginName);
                if (processor == null)
                {
                    throw new InvalidOperationException($"Processor '{job.PluginName}' not found");
                }

                _logger.LogInformation("JobWorker: Executing process for {LayerName} using {PluginName}",
                    job.LayerName, job.PluginName);

                processor.Process(layer);
            }

            // Work finished — record success before the rate-limit pause so a
            // cancellation during the pause cannot flip the status back.
            job.Status = JobStatus.Completed;
            job.CompletedAt = DateTime.UtcNow;
            job.LastError = null;
            job.ModifiedAtUtc = DateTime.UtcNow;

            _logger.LogInformation("JobWorker: Job {JobId} completed successfully", job.Id);

            if (job.JobType == JobType.Import)
            {
                _logger.LogDebug("JobWorker: Applying rate limit delay of {Delay} seconds", _rateLimitDelay.TotalSeconds);
                await Task.Delay(_rateLimitDelay, stoppingToken);
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Host shutdown mid-job is not a job failure. The original code
            // let catch(Exception) swallow this and mark the job Retrying —
            // even after it had already completed. If the work never finished,
            // hand the job back to the queue for the next startup.
            if (job.Status == JobStatus.Running)
            {
                job.Status = JobStatus.Pending;
                job.ModifiedAtUtc = DateTime.UtcNow;
            }
            throw;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "JobWorker: Job {JobId} failed - {LayerName}", job.Id, job.LayerName);

            job.RetryCount++;
            job.LastError = ex.Message;
            job.ModifiedAtUtc = DateTime.UtcNow;

            if (job.RetryCount >= job.MaxRetries)
            {
                job.Status = JobStatus.Failed;
                _logger.LogWarning("JobWorker: Job {JobId} marked as Failed after {RetryCount} attempts",
                    job.Id, job.RetryCount);
            }
            else
            {
                job.Status = JobStatus.Retrying;
                _logger.LogInformation("JobWorker: Job {JobId} will retry (attempt {RetryCount}/{MaxRetries})",
                    job.Id, job.RetryCount, job.MaxRetries);
            }
        }
        finally
        {
            // Persist the outcome even during shutdown: the stopping token is
            // already cancelled at that point, so saving with it would throw
            // and strand the job in Running.
            await db.SaveChangesAsync(CancellationToken.None);
        }
    }

    /// <summary>Logs shutdown and defers to the base implementation.</summary>
    public override async Task StopAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("JobWorker: Stopping service...");
        await base.StopAsync(stoppingToken);
    }
}
diff --git a/DiunaBI.Plugins.Morska/Exporters/MorskaBaseExporter.cs b/DiunaBI.Plugins.Morska/Exporters/MorskaBaseExporter.cs
new file mode 100644
index 0000000..fb2aac7
--- /dev/null
+++ b/DiunaBI.Plugins.Morska/Exporters/MorskaBaseExporter.cs
@@ -0,0 +1,7 @@
using DiunaBI.Infrastructure.Plugins;

namespace DiunaBI.Plugins.Morska.Exporters;

/// <summary>Marker base class for Morska-specific data exporters.</summary>
public abstract class MorskaBaseExporter : BaseDataExporter
{
}
diff --git a/DiunaBI.Plugins.Morska/Importers/MorskaBaseImporter.cs b/DiunaBI.Plugins.Morska/Importers/MorskaBaseImporter.cs
new file mode 100644
index 0000000..86bb1b5
--- /dev/null
+++ b/DiunaBI.Plugins.Morska/Importers/MorskaBaseImporter.cs
@@ -0,0 +1,7 @@
using DiunaBI.Infrastructure.Plugins;

namespace DiunaBI.Plugins.Morska.Importers;

/// <summary>Marker base class for Morska-specific data importers.</summary>
public abstract class MorskaBaseImporter : BaseDataImporter
{
}
diff --git a/DiunaBI.Plugins.Morska/Processors/MorskaBaseProcessor.cs b/DiunaBI.Plugins.Morska/Processors/MorskaBaseProcessor.cs
new file mode 100644
index 0000000..08b8e63
--- /dev/null
+++ b/DiunaBI.Plugins.Morska/Processors/MorskaBaseProcessor.cs
@@ -0,0 +1,7 @@
using DiunaBI.Infrastructure.Plugins;

namespace DiunaBI.Plugins.Morska.Processors;

/// <summary>Marker base class for Morska-specific data processors.</summary>
public abstract class MorskaBaseProcessor : BaseDataProcessor
{
}