Files
DiunaBI/DiunaBI.API/Controllers/JobsController.cs

324 lines
10 KiB
C#
Raw Normal View History

2025-12-02 15:35:04 +01:00
using DiunaBI.Application.DTOModels.Common;
using DiunaBI.Domain.Entities;
using DiunaBI.Infrastructure.Data;
using DiunaBI.Infrastructure.Services;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
namespace DiunaBI.API.Controllers;
/// <summary>
/// HTTP API for listing, inspecting, scheduling, retrying and cancelling
/// <see cref="QueueJob"/> rows stored in <see cref="AppDbContext.QueueJobs"/>.
/// </summary>
/// <remarks>
/// All actions require an authenticated caller except the <c>schedule/*</c>
/// endpoints, which are anonymous and are instead protected by an API key
/// passed in the route and compared against the <c>apiKey</c> configuration value.
/// Unexpected exceptions are logged in full and reported to the client as
/// HTTP 400 carrying only the exception message.
/// </remarks>
[Authorize]
[ApiController]
[Route("[controller]")]
public class JobsController : Controller
{
    private readonly AppDbContext _db;
    private readonly JobSchedulerService _jobScheduler;
    private readonly IConfiguration _configuration;
    private readonly ILogger<JobsController> _logger;

    /// <summary>
    /// Creates the controller with its injected dependencies.
    /// </summary>
    /// <param name="db">Database context used to query and update queue jobs.</param>
    /// <param name="jobScheduler">Service the schedule endpoints delegate to.</param>
    /// <param name="configuration">Configuration source holding the <c>apiKey</c> value.</param>
    /// <param name="logger">Logger for this controller.</param>
    public JobsController(
        AppDbContext db,
        JobSchedulerService jobScheduler,
        IConfiguration configuration,
        ILogger<JobsController> logger)
    {
        _db = db;
        _jobScheduler = jobScheduler;
        _configuration = configuration;
        _logger = logger;
    }

    /// <summary>
    /// Returns one page of jobs, newest first, optionally filtered.
    /// </summary>
    /// <param name="start">Zero-based offset of the first job to return; must not be negative.</param>
    /// <param name="limit">Maximum number of jobs to return; must be greater than zero.</param>
    /// <param name="status">When set, only jobs with this status are returned.</param>
    /// <param name="jobType">When set, only jobs of this type are returned.</param>
    /// <param name="layerId">When set, only jobs with this layer id are returned.</param>
    /// <returns>
    /// 200 with a <see cref="PagedResult{T}"/> of <see cref="QueueJob"/>;
    /// 400 when the paging arguments are invalid or the query fails.
    /// </returns>
    [HttpGet]
    [Route("")]
    public async Task<IActionResult> GetAll(
        [FromQuery] int start = 0,
        [FromQuery] int limit = 50,
        [FromQuery] JobStatus? status = null,
        [FromQuery] JobType? jobType = null,
        [FromQuery] Guid? layerId = null)
    {
        // Validate paging up front: limit == 0 would otherwise divide by zero when
        // computing the page number (after the queries had already run), and
        // negative values would be passed straight to Skip/Take.
        if (start < 0)
        {
            _logger.LogWarning("GetAll: Rejected negative start {Start}", start);
            return BadRequest("start must be greater than or equal to 0");
        }

        if (limit <= 0)
        {
            _logger.LogWarning("GetAll: Rejected non-positive limit {Limit}", limit);
            return BadRequest("limit must be greater than 0");
        }

        try
        {
            var query = _db.QueueJobs.AsQueryable();

            if (status.HasValue)
            {
                query = query.Where(j => j.Status == status.Value);
            }

            if (jobType.HasValue)
            {
                query = query.Where(j => j.JobType == jobType.Value);
            }

            if (layerId.HasValue)
            {
                query = query.Where(j => j.LayerId == layerId.Value);
            }

            var totalCount = await query.CountAsync();

            // Id is a tiebreaker so that jobs sharing the same CreatedAt keep a
            // stable position and cannot repeat or vanish between pages.
            var items = await query
                .OrderByDescending(j => j.CreatedAt)
                .ThenBy(j => j.Id)
                .Skip(start)
                .Take(limit)
                .AsNoTracking()
                .ToListAsync();

            var pagedResult = new PagedResult<QueueJob>
            {
                Items = items,
                TotalCount = totalCount,
                Page = (start / limit) + 1,
                PageSize = limit
            };

            _logger.LogDebug("GetAll: Retrieved {Count} of {TotalCount} jobs", items.Count, totalCount);
            return Ok(pagedResult);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "GetAll: Error retrieving jobs");
            return ErrorResponse(ex);
        }
    }

    /// <summary>
    /// Returns a single job by id.
    /// </summary>
    /// <param name="id">Id of the job to load.</param>
    /// <returns>200 with the job; 404 when no job has that id; 400 when the query fails.</returns>
    [HttpGet]
    [Route("{id:guid}")]
    public async Task<IActionResult> Get(Guid id)
    {
        try
        {
            var job = await _db.QueueJobs
                .AsNoTracking()
                .FirstOrDefaultAsync(j => j.Id == id);

            if (job is null)
            {
                _logger.LogWarning("Get: Job {JobId} not found", id);
                return NotFound("Job not found");
            }

            _logger.LogDebug("Get: Retrieved job {JobId}", id);
            return Ok(job);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Get: Error retrieving job {JobId}", id);
            return ErrorResponse(ex);
        }
    }

    /// <summary>
    /// Calls <c>JobSchedulerService.ScheduleAllJobsAsync</c> and reports the count it returns.
    /// </summary>
    /// <param name="apiKey">API key from the route; must match the configured <c>apiKey</c>.</param>
    /// <param name="nameFilter">Optional filter passed through to the scheduler unchanged.</param>
    /// <returns>200 with the number of jobs created; 401 on a bad key; 400 when scheduling fails.</returns>
    [HttpPost]
    [Route("schedule/{apiKey}")]
    [AllowAnonymous]
    public async Task<IActionResult> ScheduleJobs(string apiKey, [FromQuery] string? nameFilter = null)
    {
        if (!IsValidApiKey(apiKey))
        {
            // The supplied key is deliberately not logged: it is a credential.
            _logger.LogWarning("ScheduleJobs: Unauthorized request (invalid API key)");
            return Unauthorized();
        }

        try
        {
            var jobsCreated = await _jobScheduler.ScheduleAllJobsAsync(nameFilter);
            _logger.LogInformation("ScheduleJobs: Created {Count} jobs", jobsCreated);

            return Ok(new
            {
                success = true,
                jobsCreated,
                message = $"Successfully scheduled {jobsCreated} jobs"
            });
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "ScheduleJobs: Error scheduling jobs");
            return ErrorResponse(ex);
        }
    }

    /// <summary>
    /// Calls <c>JobSchedulerService.ScheduleImportJobsAsync</c> and reports the count it returns.
    /// </summary>
    /// <param name="apiKey">API key from the route; must match the configured <c>apiKey</c>.</param>
    /// <param name="nameFilter">Optional filter passed through to the scheduler unchanged.</param>
    /// <returns>200 with the number of import jobs created; 401 on a bad key; 400 when scheduling fails.</returns>
    [HttpPost]
    [Route("schedule/imports/{apiKey}")]
    [AllowAnonymous]
    public async Task<IActionResult> ScheduleImportJobs(string apiKey, [FromQuery] string? nameFilter = null)
    {
        if (!IsValidApiKey(apiKey))
        {
            // The supplied key is deliberately not logged: it is a credential.
            _logger.LogWarning("ScheduleImportJobs: Unauthorized request (invalid API key)");
            return Unauthorized();
        }

        try
        {
            var jobsCreated = await _jobScheduler.ScheduleImportJobsAsync(nameFilter);
            _logger.LogInformation("ScheduleImportJobs: Created {Count} import jobs", jobsCreated);

            return Ok(new
            {
                success = true,
                jobsCreated,
                message = $"Successfully scheduled {jobsCreated} import jobs"
            });
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "ScheduleImportJobs: Error scheduling import jobs");
            return ErrorResponse(ex);
        }
    }

    /// <summary>
    /// Calls <c>JobSchedulerService.ScheduleProcessJobsAsync</c> and reports the count it returns.
    /// </summary>
    /// <param name="apiKey">API key from the route; must match the configured <c>apiKey</c>.</param>
    /// <returns>200 with the number of process jobs created; 401 on a bad key; 400 when scheduling fails.</returns>
    [HttpPost]
    [Route("schedule/processes/{apiKey}")]
    [AllowAnonymous]
    public async Task<IActionResult> ScheduleProcessJobs(string apiKey)
    {
        if (!IsValidApiKey(apiKey))
        {
            // The supplied key is deliberately not logged: it is a credential.
            _logger.LogWarning("ScheduleProcessJobs: Unauthorized request (invalid API key)");
            return Unauthorized();
        }

        try
        {
            var jobsCreated = await _jobScheduler.ScheduleProcessJobsAsync();
            _logger.LogInformation("ScheduleProcessJobs: Created {Count} process jobs", jobsCreated);

            return Ok(new
            {
                success = true,
                jobsCreated,
                message = $"Successfully scheduled {jobsCreated} process jobs"
            });
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "ScheduleProcessJobs: Error scheduling process jobs");
            return ErrorResponse(ex);
        }
    }

    /// <summary>
    /// Resets a failed job to <see cref="JobStatus.Pending"/>, clearing its retry
    /// count and last error.
    /// </summary>
    /// <param name="id">Id of the job to retry.</param>
    /// <returns>
    /// 200 on success; 404 when no job has that id; 400 when the job is not in
    /// <see cref="JobStatus.Failed"/> status or the update fails.
    /// </returns>
    [HttpPost]
    [Route("{id:guid}/retry")]
    public async Task<IActionResult> RetryJob(Guid id)
    {
        try
        {
            var job = await _db.QueueJobs.FirstOrDefaultAsync(j => j.Id == id);

            if (job is null)
            {
                _logger.LogWarning("RetryJob: Job {JobId} not found", id);
                return NotFound("Job not found");
            }

            if (job.Status != JobStatus.Failed)
            {
                _logger.LogWarning("RetryJob: Job {JobId} is not in Failed status (current: {Status})", id, job.Status);
                return BadRequest($"Job is not in Failed status (current: {job.Status})");
            }

            job.Status = JobStatus.Pending;
            job.RetryCount = 0;
            job.LastError = null;
            job.ModifiedAtUtc = DateTime.UtcNow;

            await _db.SaveChangesAsync();

            _logger.LogInformation("RetryJob: Job {JobId} reset to Pending status", id);

            return Ok(new
            {
                success = true,
                message = "Job reset to Pending status and will be retried"
            });
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "RetryJob: Error retrying job {JobId}", id);
            return ErrorResponse(ex);
        }
    }

    /// <summary>
    /// Cancels a job by marking it <see cref="JobStatus.Failed"/> with the error
    /// text "Cancelled by user". Running and completed jobs cannot be cancelled.
    /// </summary>
    /// <param name="id">Id of the job to cancel.</param>
    /// <returns>
    /// 200 on success; 404 when no job has that id; 400 when the job is running
    /// or completed, or the update fails.
    /// </returns>
    [HttpDelete]
    [Route("{id:guid}")]
    public async Task<IActionResult> CancelJob(Guid id)
    {
        try
        {
            var job = await _db.QueueJobs.FirstOrDefaultAsync(j => j.Id == id);

            if (job is null)
            {
                _logger.LogWarning("CancelJob: Job {JobId} not found", id);
                return NotFound("Job not found");
            }

            if (job.Status == JobStatus.Running)
            {
                _logger.LogWarning("CancelJob: Cannot cancel running job {JobId}", id);
                return BadRequest("Cannot cancel a job that is currently running");
            }

            if (job.Status == JobStatus.Completed)
            {
                _logger.LogWarning("CancelJob: Cannot cancel completed job {JobId}", id);
                return BadRequest("Cannot cancel a completed job");
            }

            // NOTE(review): a job that is already Failed is also accepted here, which
            // overwrites its original LastError - confirm this is intended.
            job.Status = JobStatus.Failed;
            job.LastError = "Cancelled by user";
            job.ModifiedAtUtc = DateTime.UtcNow;

            await _db.SaveChangesAsync();

            _logger.LogInformation("CancelJob: Job {JobId} cancelled", id);

            return Ok(new
            {
                success = true,
                message = "Job cancelled successfully"
            });
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "CancelJob: Error cancelling job {JobId}", id);
            return ErrorResponse(ex);
        }
    }

    /// <summary>
    /// Returns the number of jobs per status plus the overall total.
    /// </summary>
    /// <returns>
    /// 200 with <c>pending</c>, <c>running</c>, <c>completed</c>, <c>failed</c>,
    /// <c>retrying</c> and <c>total</c> counts; 400 when the query fails.
    /// </returns>
    [HttpGet]
    [Route("stats")]
    public async Task<IActionResult> GetStats()
    {
        try
        {
            // One grouped query instead of six separate counts: a single round trip,
            // and all numbers come from the same read so total always matches them.
            var countsByStatus = await _db.QueueJobs
                .GroupBy(j => j.Status)
                .Select(g => new { Status = g.Key, Count = g.Count() })
                .ToDictionaryAsync(x => x.Status, x => x.Count);

            var stats = new
            {
                pending = countsByStatus.GetValueOrDefault(JobStatus.Pending),
                running = countsByStatus.GetValueOrDefault(JobStatus.Running),
                completed = countsByStatus.GetValueOrDefault(JobStatus.Completed),
                failed = countsByStatus.GetValueOrDefault(JobStatus.Failed),
                retrying = countsByStatus.GetValueOrDefault(JobStatus.Retrying),
                // Summing every group keeps jobs in any other status in the total.
                total = countsByStatus.Values.Sum()
            };

            _logger.LogDebug("GetStats: Retrieved job statistics");
            return Ok(stats);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "GetStats: Error retrieving job statistics");
            return ErrorResponse(ex);
        }
    }

    /// <summary>
    /// Checks the caller-supplied API key against the configured <c>apiKey</c> value.
    /// </summary>
    /// <param name="apiKey">Key taken from the request route.</param>
    /// <returns>
    /// <c>true</c> only when a non-empty key is configured and the supplied key
    /// matches it exactly; <c>false</c> otherwise.
    /// </returns>
    private bool IsValidApiKey(string apiKey)
    {
        var expectedKey = _configuration["apiKey"];

        // Fail closed: with no configured key nothing can be authorised.
        if (string.IsNullOrEmpty(expectedKey) || string.IsNullOrEmpty(apiKey))
        {
            return false;
        }

        // Fixed-time comparison so response timing does not reveal how many
        // leading characters of a guessed key were correct.
        return System.Security.Cryptography.CryptographicOperations.FixedTimeEquals(
            System.Text.Encoding.UTF8.GetBytes(apiKey),
            System.Text.Encoding.UTF8.GetBytes(expectedKey));
    }

    /// <summary>
    /// Builds the response sent to the client for an unexpected exception.
    /// </summary>
    /// <param name="ex">The exception that was caught (and already logged by the caller).</param>
    /// <returns>A 400 result whose body is the exception message only.</returns>
    private IActionResult ErrorResponse(Exception ex)
    {
        // The 400 status is kept for backward compatibility with existing clients.
        // Only the message is returned; stack traces stay in the server log.
        return BadRequest(ex.Message);
    }
}