using DiunaBI.API.Attributes; using DiunaBI.Application.DTOModels.Common; using DiunaBI.Domain.Entities; using DiunaBI.Infrastructure.Data; using DiunaBI.Infrastructure.Services; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Mvc; using Microsoft.EntityFrameworkCore; namespace DiunaBI.API.Controllers; [Authorize] [ApiController] [Route("[controller]")] public class JobsController : Controller { private readonly AppDbContext _db; private readonly JobSchedulerService _jobScheduler; private readonly IConfiguration _configuration; private readonly ILogger _logger; public JobsController( AppDbContext db, JobSchedulerService jobScheduler, IConfiguration configuration, ILogger logger) { _db = db; _jobScheduler = jobScheduler; _configuration = configuration; _logger = logger; } [HttpGet] [Route("")] public async Task GetAll( [FromQuery] int start = 0, [FromQuery] int limit = 50, [FromQuery] JobStatus? status = null, [FromQuery] JobType? jobType = null, [FromQuery] Guid? layerId = null) { try { var query = _db.QueueJobs.AsQueryable(); if (status.HasValue) { query = query.Where(j => j.Status == status.Value); } if (jobType.HasValue) { query = query.Where(j => j.JobType == jobType.Value); } if (layerId.HasValue) { query = query.Where(j => j.LayerId == layerId.Value); } var totalCount = await query.CountAsync(); var items = await query .OrderByDescending(j => j.CreatedAt) .Skip(start) .Take(limit) .AsNoTracking() .ToListAsync(); var pagedResult = new PagedResult { Items = items, TotalCount = totalCount, Page = (start / limit) + 1, PageSize = limit }; _logger.LogDebug("GetAll: Retrieved {Count} of {TotalCount} jobs", items.Count, totalCount); return Ok(pagedResult); } catch (Exception ex) { _logger.LogError(ex, "GetAll: Error retrieving jobs"); return BadRequest("An error occurred while retrieving jobs"); } } [HttpGet] [Route("{id:guid}")] public async Task Get(Guid id) { try { var job = await _db.QueueJobs .AsNoTracking() .FirstOrDefaultAsync(j => j.Id == id); if (job == null) { _logger.LogWarning("Get: Job {JobId} not found", id); return NotFound("Job not found"); } _logger.LogDebug("Get: Retrieved job {JobId}", id); return Ok(job); } catch (Exception ex) { _logger.LogError(ex, "Get: Error retrieving job {JobId}", id); return BadRequest("An error occurred processing your request"); } } [HttpPost] [Route("schedule")] [ApiKeyAuth] public async Task ScheduleJobs([FromQuery] string? nameFilter = null) { try { var jobsCreated = await _jobScheduler.ScheduleAllJobsAsync(nameFilter); _logger.LogInformation("ScheduleJobs: Created {Count} jobs", jobsCreated); return Ok(new { success = true, jobsCreated, message = $"Successfully scheduled {jobsCreated} jobs" }); } catch (Exception ex) { _logger.LogError(ex, "ScheduleJobs: Error scheduling jobs"); return BadRequest("An error occurred processing your request"); } } [HttpPost] [Route("schedule/imports")] [ApiKeyAuth] public async Task ScheduleImportJobs([FromQuery] string? nameFilter = null) { try { var jobsCreated = await _jobScheduler.ScheduleImportJobsAsync(nameFilter); _logger.LogInformation("ScheduleImportJobs: Created {Count} import jobs", jobsCreated); return Ok(new { success = true, jobsCreated, message = $"Successfully scheduled {jobsCreated} import jobs" }); } catch (Exception ex) { _logger.LogError(ex, "ScheduleImportJobs: Error scheduling import jobs"); return BadRequest("An error occurred processing your request"); } } [HttpPost] [Route("schedule/processes")] [ApiKeyAuth] public async Task ScheduleProcessJobs() { try { var jobsCreated = await _jobScheduler.ScheduleProcessJobsAsync(); _logger.LogInformation("ScheduleProcessJobs: Created {Count} process jobs", jobsCreated); return Ok(new { success = true, jobsCreated, message = $"Successfully scheduled {jobsCreated} process jobs" }); } catch (Exception ex) { _logger.LogError(ex, "ScheduleProcessJobs: Error scheduling process jobs"); return BadRequest("An error occurred processing your request"); } } [HttpPost] [Route("{id:guid}/retry")] public async Task RetryJob(Guid id) { try { var job = await _db.QueueJobs.FirstOrDefaultAsync(j => j.Id == id); if (job == null) { _logger.LogWarning("RetryJob: Job {JobId} not found", id); return NotFound("Job not found"); } if (job.Status != JobStatus.Failed) { _logger.LogWarning("RetryJob: Job {JobId} is not in Failed status (current: {Status})", id, job.Status); return BadRequest($"Job is not in Failed status (current: {job.Status})"); } job.Status = JobStatus.Pending; job.RetryCount = 0; job.LastError = null; job.ModifiedAtUtc = DateTime.UtcNow; await _db.SaveChangesAsync(); _logger.LogInformation("RetryJob: Job {JobId} reset to Pending status", id); return Ok(new { success = true, message = "Job reset to Pending status and will be retried" }); } catch (Exception ex) { _logger.LogError(ex, "RetryJob: Error retrying job {JobId}", id); return BadRequest("An error occurred processing your request"); } } [HttpDelete] [Route("{id:guid}")] public async Task CancelJob(Guid id) { try { var job = await _db.QueueJobs.FirstOrDefaultAsync(j => j.Id == id); if (job == null) { _logger.LogWarning("CancelJob: Job {JobId} not found", id); return NotFound("Job not found"); } if (job.Status == JobStatus.Running) { _logger.LogWarning("CancelJob: Cannot cancel running job {JobId}", id); return BadRequest("Cannot cancel a job that is currently running"); } if (job.Status == JobStatus.Completed) { _logger.LogWarning("CancelJob: Cannot cancel completed job {JobId}", id); return BadRequest("Cannot cancel a completed job"); } job.Status = JobStatus.Failed; job.LastError = "Cancelled by user"; job.ModifiedAtUtc = DateTime.UtcNow; await _db.SaveChangesAsync(); _logger.LogInformation("CancelJob: Job {JobId} cancelled", id); return Ok(new { success = true, message = "Job cancelled successfully" }); } catch (Exception ex) { _logger.LogError(ex, "CancelJob: Error cancelling job {JobId}", id); return BadRequest("An error occurred processing your request"); } } [HttpGet] [Route("stats")] public async Task GetStats() { try { var stats = new { pending = await _db.QueueJobs.CountAsync(j => j.Status == JobStatus.Pending), running = await _db.QueueJobs.CountAsync(j => j.Status == JobStatus.Running), completed = await _db.QueueJobs.CountAsync(j => j.Status == JobStatus.Completed), failed = await _db.QueueJobs.CountAsync(j => j.Status == JobStatus.Failed), retrying = await _db.QueueJobs.CountAsync(j => j.Status == JobStatus.Retrying), total = await _db.QueueJobs.CountAsync() }; _logger.LogDebug("GetStats: Retrieved job statistics"); return Ok(stats); } catch (Exception ex) { _logger.LogError(ex, "GetStats: Error retrieving job statistics"); return BadRequest("An error occurred processing your request"); } } [HttpPost] [Route("create-for-layer/{layerId:guid}")] public async Task CreateJobForLayer(Guid layerId) { try { var layer = await _db.Layers .Include(x => x.Records) .FirstOrDefaultAsync(l => l.Id == layerId); if (layer == null) { _logger.LogWarning("CreateJobForLayer: Layer {LayerId} not found", layerId); return NotFound($"Layer {layerId} not found"); } if (layer.Type != LayerType.Administration) { _logger.LogWarning("CreateJobForLayer: Layer {LayerId} is not an Administration layer", layerId); return BadRequest("Only Administration layers can be run as jobs"); } // Get the Type record to determine if it's ImportWorker or ProcessWorker var typeRecord = layer.Records?.FirstOrDefault(x => x.Code == "Type"); if (typeRecord?.Desc1 != "ImportWorker" && typeRecord?.Desc1 != "ProcessWorker") { _logger.LogWarning("CreateJobForLayer: Layer {LayerId} is not a valid worker type", layerId); return BadRequest("Layer must be an ImportWorker or ProcessWorker"); } // Check if enabled var isEnabledRecord = layer.Records?.FirstOrDefault(x => x.Code == "IsEnabled"); if (isEnabledRecord?.Desc1 != "True") { _logger.LogWarning("CreateJobForLayer: Layer {LayerId} is not enabled", layerId); return BadRequest("Layer is not enabled"); } // Get plugin name var pluginRecord = layer.Records?.FirstOrDefault(x => x.Code == "Plugin"); if (string.IsNullOrEmpty(pluginRecord?.Desc1)) { _logger.LogWarning("CreateJobForLayer: Layer {LayerId} has no Plugin configured", layerId); return BadRequest("Layer has no Plugin configured"); } // Get priority and max retries var priorityRecord = layer.Records?.FirstOrDefault(x => x.Code == "Priority"); var maxRetriesRecord = layer.Records?.FirstOrDefault(x => x.Code == "MaxRetries"); var priority = int.TryParse(priorityRecord?.Desc1, out var p) ? p : 0; var maxRetries = int.TryParse(maxRetriesRecord?.Desc1, out var m) ? m : 3; var jobType = typeRecord.Desc1 == "ImportWorker" ? JobType.Import : JobType.Process; // Check if there's already a pending/running job for this layer var existingJob = await _db.QueueJobs .Where(j => j.LayerId == layer.Id && (j.Status == JobStatus.Pending || j.Status == JobStatus.Running)) .FirstOrDefaultAsync(); if (existingJob != null) { _logger.LogInformation("CreateJobForLayer: Job already exists for layer {LayerId}, returning existing job", layerId); return Ok(new { success = true, jobId = existingJob.Id, message = "Job already exists for this layer", existing = true }); } // Create the job var job = new QueueJob { Id = Guid.NewGuid(), LayerId = layer.Id, LayerName = layer.Name ?? "Unknown", PluginName = pluginRecord.Desc1, JobType = jobType, Priority = priority, MaxRetries = maxRetries, Status = JobStatus.Pending, CreatedAt = DateTime.UtcNow, CreatedAtUtc = DateTime.UtcNow, ModifiedAtUtc = DateTime.UtcNow, CreatedById = Guid.Empty, ModifiedById = Guid.Empty }; _db.QueueJobs.Add(job); await _db.SaveChangesAsync(); _logger.LogInformation("CreateJobForLayer: Created job {JobId} for layer {LayerName} ({LayerId})", job.Id, layer.Name, layerId); return Ok(new { success = true, jobId = job.Id, message = "Job created successfully", existing = false }); } catch (Exception ex) { _logger.LogError(ex, "CreateJobForLayer: Error creating job for layer {LayerId}", layerId); return BadRequest("An error occurred processing your request"); } } }