diff --git a/src/Backend/DiunaBI.Core/Database/Context/AppDbContext.cs b/src/Backend/DiunaBI.Core/Database/Context/AppDbContext.cs index ad56609..549d369 100644 --- a/src/Backend/DiunaBI.Core/Database/Context/AppDbContext.cs +++ b/src/Backend/DiunaBI.Core/Database/Context/AppDbContext.cs @@ -21,14 +21,4 @@ public class AppDbContext(DbContextOptions options) : DbContext(op x.SourceId }); } - - private static readonly LoggerFactory MyLoggerFactory = - new(new[] { - new Microsoft.Extensions.Logging.Debug.DebugLoggerProvider() - }); - - protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) - { - optionsBuilder.UseLoggerFactory(MyLoggerFactory); - } } \ No newline at end of file diff --git a/src/Backend/DiunaBI.Core/Database/Context/DesignTimeDbContextFactory.cs b/src/Backend/DiunaBI.Core/Database/Context/DesignTimeDbContextFactory.cs index 378b43f..1da1ba1 100644 --- a/src/Backend/DiunaBI.Core/Database/Context/DesignTimeDbContextFactory.cs +++ b/src/Backend/DiunaBI.Core/Database/Context/DesignTimeDbContextFactory.cs @@ -11,17 +11,17 @@ public class DesignTimeDbContextFactory : IDesignTimeDbContextFactory(); - var connectionString = configuration.GetConnectionString("DefaultConnection"); + var connectionString = configuration.GetConnectionString("SQLDatabase"); if (string.IsNullOrEmpty(connectionString)) { - throw new InvalidOperationException("Connection string 'DefaultConnection' not found in appsettings.json"); + throw new InvalidOperationException("Connection string 'SQLDatabase' not found in appsettings.json"); } optionsBuilder.UseSqlServer(connectionString); diff --git a/src/Backend/DiunaBI.Core/Database/Migrations/20250607084540_QueueJobRefactor.Designer.cs b/src/Backend/DiunaBI.Core/Database/Migrations/20250607084540_QueueJobRefactor.Designer.cs new file mode 100644 index 0000000..aec761a --- /dev/null +++ b/src/Backend/DiunaBI.Core/Database/Migrations/20250607084540_QueueJobRefactor.Designer.cs @@ -0,0 +1,415 @@ +// +using System; +using DiunaBI.Core.Database.Context; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Metadata; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; + +#nullable disable + +namespace DiunaBI.Core.Migrations +{ + [DbContext(typeof(AppDbContext))] + [Migration("20250607084540_QueueJobRefactor")] + partial class QueueJobRefactor + { + /// + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasAnnotation("ProductVersion", "8.0.0") + .HasAnnotation("Relational:MaxIdentifierLength", 128); + + SqlServerModelBuilderExtensions.UseIdentityColumns(modelBuilder); + + modelBuilder.Entity("DiunaBI.Core.Models.DataInbox", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uniqueidentifier"); + + b.Property("CreatedAt") + .HasColumnType("datetime2"); + + b.Property("Data") + .IsRequired() + .HasMaxLength(2147483647) + .HasColumnType("nvarchar(max)"); + + b.Property("Name") + .IsRequired() + .HasMaxLength(50) + .HasColumnType("nvarchar(50)"); + + b.Property("Source") + .IsRequired() + .HasMaxLength(50) + .HasColumnType("nvarchar(50)"); + + b.HasKey("Id"); + + b.ToTable("DataInbox"); + }); + + modelBuilder.Entity("DiunaBI.Core.Models.Layer", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uniqueidentifier"); + + b.Property("CreatedAt") + .HasColumnType("datetime2"); + + b.Property("CreatedById") + .HasColumnType("uniqueidentifier"); + + b.Property("IsCancelled") + .HasColumnType("bit"); + + b.Property("IsDeleted") + .HasColumnType("bit"); + + b.Property("ModifiedAt") + .HasColumnType("datetime2"); + + b.Property("ModifiedById") + .HasColumnType("uniqueidentifier"); + + b.Property("Name") + .IsRequired() + .HasMaxLength(50) + .HasColumnType("nvarchar(50)"); + + b.Property("Number") + .HasColumnType("int"); + + b.Property("ParentId") + .HasColumnType("uniqueidentifier"); + + b.Property("Type") + .HasColumnType("int"); + + b.HasKey("Id"); + + b.HasIndex("CreatedById"); + + b.HasIndex("ModifiedById"); + + b.HasIndex("ParentId"); + + b.ToTable("Layers"); + }); + + modelBuilder.Entity("DiunaBI.Core.Models.ProcessSource", b => + { + b.Property("LayerId") + .HasColumnType("uniqueidentifier"); + + b.Property("SourceId") + .HasColumnType("uniqueidentifier"); + + b.HasKey("LayerId", "SourceId"); + + b.HasIndex("SourceId"); + + b.ToTable("ProcessSources"); + }); + + modelBuilder.Entity("DiunaBI.Core.Models.QueueJob", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uniqueidentifier"); + + b.Property("CompletedAt") + .HasColumnType("datetime2"); + + b.Property("CreatedAt") + .HasColumnType("datetime2"); + + b.Property("CreatedAtUtc") + .HasColumnType("datetime2"); + + b.Property("CreatedById") + .HasColumnType("uniqueidentifier"); + + b.Property("JobType") + .HasColumnType("int"); + + b.Property("LastAttemptAt") + .HasColumnType("datetime2"); + + b.Property("LastError") + .HasMaxLength(1000) + .HasColumnType("nvarchar(1000)"); + + b.Property("LayerId") + .HasColumnType("uniqueidentifier"); + + b.Property("LayerName") + .IsRequired() + .HasMaxLength(200) + .HasColumnType("nvarchar(200)"); + + b.Property("MaxRetries") + .HasColumnType("int"); + + b.Property("ModifiedAtUtc") + .HasColumnType("datetime2"); + + b.Property("ModifiedById") + .HasColumnType("uniqueidentifier"); + + b.Property("PluginName") + .IsRequired() + .HasMaxLength(100) + .HasColumnType("nvarchar(100)"); + + b.Property("Priority") + .HasColumnType("int"); + + b.Property("RetryCount") + .HasColumnType("int"); + + b.Property("Status") + .HasColumnType("int"); + + b.HasKey("Id"); + + b.ToTable("QueueJobs"); + }); + + modelBuilder.Entity("DiunaBI.Core.Models.Record", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uniqueidentifier"); + + b.Property("Code") + .IsRequired() + .HasMaxLength(50) + .HasColumnType("nvarchar(50)"); + + b.Property("CreatedAt") + .HasColumnType("datetime2"); + + b.Property("CreatedById") + .HasColumnType("uniqueidentifier"); + + b.Property("Desc1") + .HasMaxLength(10000) + .HasColumnType("nvarchar(max)"); + + b.Property("IsDeleted") + .HasColumnType("bit"); + + b.Property("LayerId") + .HasColumnType("uniqueidentifier"); + + b.Property("ModifiedAt") + .HasColumnType("datetime2"); + + b.Property("ModifiedById") + .HasColumnType("uniqueidentifier"); + + b.Property("Value1") + .HasColumnType("float"); + + b.Property("Value10") + .HasColumnType("float"); + + b.Property("Value11") + .HasColumnType("float"); + + b.Property("Value12") + .HasColumnType("float"); + + b.Property("Value13") + .HasColumnType("float"); + + b.Property("Value14") + .HasColumnType("float"); + + b.Property("Value15") + .HasColumnType("float"); + + b.Property("Value16") + .HasColumnType("float"); + + b.Property("Value17") + .HasColumnType("float"); + + b.Property("Value18") + .HasColumnType("float"); + + b.Property("Value19") + .HasColumnType("float"); + + b.Property("Value2") + .HasColumnType("float"); + + b.Property("Value20") + .HasColumnType("float"); + + b.Property("Value21") + .HasColumnType("float"); + + b.Property("Value22") + .HasColumnType("float"); + + b.Property("Value23") + .HasColumnType("float"); + + b.Property("Value24") + .HasColumnType("float"); + + b.Property("Value25") + .HasColumnType("float"); + + b.Property("Value26") + .HasColumnType("float"); + + b.Property("Value27") + .HasColumnType("float"); + + b.Property("Value28") + .HasColumnType("float"); + + b.Property("Value29") + .HasColumnType("float"); + + b.Property("Value3") + .HasColumnType("float"); + + b.Property("Value30") + .HasColumnType("float"); + + b.Property("Value31") + .HasColumnType("float"); + + b.Property("Value32") + .HasColumnType("float"); + + b.Property("Value4") + .HasColumnType("float"); + + b.Property("Value5") + .HasColumnType("float"); + + b.Property("Value6") + .HasColumnType("float"); + + b.Property("Value7") + .HasColumnType("float"); + + b.Property("Value8") + .HasColumnType("float"); + + b.Property("Value9") + .HasColumnType("float"); + + b.HasKey("Id"); + + b.HasIndex("CreatedById"); + + b.HasIndex("LayerId"); + + b.HasIndex("ModifiedById"); + + b.ToTable("Records"); + }); + + modelBuilder.Entity("DiunaBI.Core.Models.User", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uniqueidentifier"); + + b.Property("CreatedAt") + .HasColumnType("datetime2"); + + b.Property("Email") + .HasMaxLength(50) + .HasColumnType("nvarchar(50)"); + + b.Property("UserName") + .HasMaxLength(50) + .HasColumnType("nvarchar(50)"); + + b.HasKey("Id"); + + b.ToTable("Users"); + }); + + modelBuilder.Entity("DiunaBI.Core.Models.Layer", b => + { + b.HasOne("DiunaBI.Core.Models.User", "CreatedBy") + .WithMany() + .HasForeignKey("CreatedById") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.HasOne("DiunaBI.Core.Models.User", "ModifiedBy") + .WithMany() + .HasForeignKey("ModifiedById") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.HasOne("DiunaBI.Core.Models.Layer", "Parent") + .WithMany() + .HasForeignKey("ParentId"); + + b.Navigation("CreatedBy"); + + b.Navigation("ModifiedBy"); + + b.Navigation("Parent"); + }); + + modelBuilder.Entity("DiunaBI.Core.Models.ProcessSource", b => + { + b.HasOne("DiunaBI.Core.Models.Layer", "Source") + .WithMany() + .HasForeignKey("SourceId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.Navigation("Source"); + }); + + modelBuilder.Entity("DiunaBI.Core.Models.Record", b => + { + b.HasOne("DiunaBI.Core.Models.User", "CreatedBy") + .WithMany() + .HasForeignKey("CreatedById") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.HasOne("DiunaBI.Core.Models.Layer", null) + .WithMany("Records") + .HasForeignKey("LayerId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.HasOne("DiunaBI.Core.Models.User", "ModifiedBy") + .WithMany() + .HasForeignKey("ModifiedById") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + + b.Navigation("CreatedBy"); + + b.Navigation("ModifiedBy"); + }); + + modelBuilder.Entity("DiunaBI.Core.Models.Layer", b => + { + b.Navigation("Records"); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/src/Backend/DiunaBI.Core/Database/Migrations/20250607084540_QueueJobRefactor.cs b/src/Backend/DiunaBI.Core/Database/Migrations/20250607084540_QueueJobRefactor.cs new file mode 100644 index 0000000..895d2f1 --- /dev/null +++ b/src/Backend/DiunaBI.Core/Database/Migrations/20250607084540_QueueJobRefactor.cs @@ -0,0 +1,170 @@ +using System; +using Microsoft.EntityFrameworkCore.Migrations; + +#nullable disable + +namespace DiunaBI.Core.Migrations +{ + /// + public partial class QueueJobRefactor : Migration + { + /// + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropColumn( + name: "Message", + table: "QueueJobs"); + + migrationBuilder.RenameColumn( + name: "Type", + table: "QueueJobs", + newName: "RetryCount"); + + migrationBuilder.RenameColumn( + name: "ModifiedAt", + table: "QueueJobs", + newName: "ModifiedAtUtc"); + + migrationBuilder.RenameColumn( + name: "Attempts", + table: "QueueJobs", + newName: "Priority"); + + migrationBuilder.AddColumn( + name: "CompletedAt", + table: "QueueJobs", + type: "datetime2", + nullable: true); + + migrationBuilder.AddColumn( + name: "CreatedAtUtc", + table: "QueueJobs", + type: "datetime2", + nullable: false, + defaultValue: new DateTime(1, 1, 1, 0, 0, 0, 0, DateTimeKind.Unspecified)); + + migrationBuilder.AddColumn( + name: "CreatedById", + table: "QueueJobs", + type: "uniqueidentifier", + nullable: false, + defaultValue: new Guid("00000000-0000-0000-0000-000000000000")); + + migrationBuilder.AddColumn( + name: "JobType", + table: "QueueJobs", + type: "int", + nullable: false, + defaultValue: 0); + + migrationBuilder.AddColumn( + name: "LastAttemptAt", + table: "QueueJobs", + type: "datetime2", + nullable: true); + + migrationBuilder.AddColumn( + name: "LastError", + table: "QueueJobs", + type: "nvarchar(1000)", + maxLength: 1000, + nullable: true); + + migrationBuilder.AddColumn( + name: "LayerName", + table: "QueueJobs", + type: "nvarchar(200)", + maxLength: 200, + nullable: false, + defaultValue: ""); + + migrationBuilder.AddColumn( + name: "MaxRetries", + table: "QueueJobs", + type: "int", + nullable: false, + defaultValue: 0); + + migrationBuilder.AddColumn( + name: "ModifiedById", + table: "QueueJobs", + type: "uniqueidentifier", + nullable: false, + defaultValue: new Guid("00000000-0000-0000-0000-000000000000")); + + migrationBuilder.AddColumn( + name: "PluginName", + table: "QueueJobs", + type: "nvarchar(100)", + maxLength: 100, + nullable: false, + defaultValue: ""); + } + + /// + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropColumn( + name: "CompletedAt", + table: "QueueJobs"); + + migrationBuilder.DropColumn( + name: "CreatedAtUtc", + table: "QueueJobs"); + + migrationBuilder.DropColumn( + name: "CreatedById", + table: "QueueJobs"); + + migrationBuilder.DropColumn( + name: "JobType", + table: "QueueJobs"); + + migrationBuilder.DropColumn( + name: "LastAttemptAt", + table: "QueueJobs"); + + migrationBuilder.DropColumn( + name: "LastError", + table: "QueueJobs"); + + migrationBuilder.DropColumn( + name: "LayerName", + table: "QueueJobs"); + + migrationBuilder.DropColumn( + name: "MaxRetries", + table: "QueueJobs"); + + migrationBuilder.DropColumn( + name: "ModifiedById", + table: "QueueJobs"); + + migrationBuilder.DropColumn( + name: "PluginName", + table: "QueueJobs"); + + migrationBuilder.RenameColumn( + name: "RetryCount", + table: "QueueJobs", + newName: "Type"); + + migrationBuilder.RenameColumn( + name: "Priority", + table: "QueueJobs", + newName: "Attempts"); + + migrationBuilder.RenameColumn( + name: "ModifiedAtUtc", + table: "QueueJobs", + newName: "ModifiedAt"); + + migrationBuilder.AddColumn( + name: "Message", + table: "QueueJobs", + type: "nvarchar(max)", + nullable: false, + defaultValue: ""); + } + } +} diff --git a/src/Backend/DiunaBI.Core/Database/Migrations/AppDbContextModelSnapshot.cs b/src/Backend/DiunaBI.Core/Database/Migrations/AppDbContextModelSnapshot.cs index 3bc187d..c325531 100644 --- a/src/Backend/DiunaBI.Core/Database/Migrations/AppDbContextModelSnapshot.cs +++ b/src/Backend/DiunaBI.Core/Database/Migrations/AppDbContextModelSnapshot.cs @@ -1,11 +1,10 @@ // using System; +using DiunaBI.Core.Database.Context; using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Infrastructure; using Microsoft.EntityFrameworkCore.Metadata; using Microsoft.EntityFrameworkCore.Storage.ValueConversion; -using DiunaBI.Core.Models; -using DiunaBI.Core.Database.Context; #nullable disable @@ -18,12 +17,12 @@ namespace DiunaBI.Core.Migrations { #pragma warning disable 612, 618 modelBuilder - .HasAnnotation("ProductVersion", "9.0.0") + .HasAnnotation("ProductVersion", "8.0.0") .HasAnnotation("Relational:MaxIdentifierLength", 128); SqlServerModelBuilderExtensions.UseIdentityColumns(modelBuilder); - modelBuilder.Entity("WebAPI.Models.DataInbox", b => + modelBuilder.Entity("DiunaBI.Core.Models.DataInbox", b => { b.Property("Id") .ValueGeneratedOnAdd() @@ -52,7 +51,7 @@ namespace DiunaBI.Core.Migrations b.ToTable("DataInbox"); }); - modelBuilder.Entity("WebAPI.Models.Layer", b => + modelBuilder.Entity("DiunaBI.Core.Models.Layer", b => { b.Property("Id") .ValueGeneratedOnAdd() @@ -101,7 +100,7 @@ namespace DiunaBI.Core.Migrations b.ToTable("Layers"); }); - modelBuilder.Entity("WebAPI.Models.ProcessSource", b => + modelBuilder.Entity("DiunaBI.Core.Models.ProcessSource", b => { b.Property("LayerId") .HasColumnType("uniqueidentifier"); @@ -116,32 +115,63 @@ namespace DiunaBI.Core.Migrations b.ToTable("ProcessSources"); }); - modelBuilder.Entity("WebAPI.Models.QueueJob", b => + modelBuilder.Entity("DiunaBI.Core.Models.QueueJob", b => { b.Property("Id") .ValueGeneratedOnAdd() .HasColumnType("uniqueidentifier"); - b.Property("Attempts") - .HasColumnType("int"); + b.Property("CompletedAt") + .HasColumnType("datetime2"); b.Property("CreatedAt") .HasColumnType("datetime2"); + b.Property("CreatedAtUtc") + .HasColumnType("datetime2"); + + b.Property("CreatedById") + .HasColumnType("uniqueidentifier"); + + b.Property("JobType") + .HasColumnType("int"); + + b.Property("LastAttemptAt") + .HasColumnType("datetime2"); + + b.Property("LastError") + .HasMaxLength(1000) + .HasColumnType("nvarchar(1000)"); + b.Property("LayerId") .HasColumnType("uniqueidentifier"); - b.Property("Message") + b.Property("LayerName") .IsRequired() - .HasColumnType("nvarchar(max)"); + .HasMaxLength(200) + .HasColumnType("nvarchar(200)"); - b.Property("ModifiedAt") - .HasColumnType("datetime2"); - - b.Property("Status") + b.Property("MaxRetries") .HasColumnType("int"); - b.Property("Type") + b.Property("ModifiedAtUtc") + .HasColumnType("datetime2"); + + b.Property("ModifiedById") + .HasColumnType("uniqueidentifier"); + + b.Property("PluginName") + .IsRequired() + .HasMaxLength(100) + .HasColumnType("nvarchar(100)"); + + b.Property("Priority") + .HasColumnType("int"); + + b.Property("RetryCount") + .HasColumnType("int"); + + b.Property("Status") .HasColumnType("int"); b.HasKey("Id"); @@ -149,7 +179,7 @@ namespace DiunaBI.Core.Migrations b.ToTable("QueueJobs"); }); - modelBuilder.Entity("WebAPI.Models.Record", b => + modelBuilder.Entity("DiunaBI.Core.Models.Record", b => { b.Property("Id") .ValueGeneratedOnAdd() @@ -289,7 +319,7 @@ namespace DiunaBI.Core.Migrations b.ToTable("Records"); }); - modelBuilder.Entity("WebAPI.Models.User", b => + modelBuilder.Entity("DiunaBI.Core.Models.User", b => { b.Property("Id") .ValueGeneratedOnAdd() @@ -311,21 +341,21 @@ namespace DiunaBI.Core.Migrations b.ToTable("Users"); }); - modelBuilder.Entity("WebAPI.Models.Layer", b => + modelBuilder.Entity("DiunaBI.Core.Models.Layer", b => { - b.HasOne("WebAPI.Models.User", "CreatedBy") + b.HasOne("DiunaBI.Core.Models.User", "CreatedBy") .WithMany() .HasForeignKey("CreatedById") .OnDelete(DeleteBehavior.Cascade) .IsRequired(); - b.HasOne("WebAPI.Models.User", "ModifiedBy") + b.HasOne("DiunaBI.Core.Models.User", "ModifiedBy") .WithMany() .HasForeignKey("ModifiedById") .OnDelete(DeleteBehavior.Cascade) .IsRequired(); - b.HasOne("WebAPI.Models.Layer", "Parent") + b.HasOne("DiunaBI.Core.Models.Layer", "Parent") .WithMany() .HasForeignKey("ParentId"); @@ -336,9 +366,9 @@ namespace DiunaBI.Core.Migrations b.Navigation("Parent"); }); - modelBuilder.Entity("WebAPI.Models.ProcessSource", b => + modelBuilder.Entity("DiunaBI.Core.Models.ProcessSource", b => { - b.HasOne("WebAPI.Models.Layer", "Source") + b.HasOne("DiunaBI.Core.Models.Layer", "Source") .WithMany() .HasForeignKey("SourceId") .OnDelete(DeleteBehavior.Cascade) @@ -347,21 +377,21 @@ namespace DiunaBI.Core.Migrations b.Navigation("Source"); }); - modelBuilder.Entity("WebAPI.Models.Record", b => + modelBuilder.Entity("DiunaBI.Core.Models.Record", b => { - b.HasOne("WebAPI.Models.User", "CreatedBy") + b.HasOne("DiunaBI.Core.Models.User", "CreatedBy") .WithMany() .HasForeignKey("CreatedById") .OnDelete(DeleteBehavior.Cascade) .IsRequired(); - b.HasOne("WebAPI.Models.Layer", null) + b.HasOne("DiunaBI.Core.Models.Layer", null) .WithMany("Records") .HasForeignKey("LayerId") .OnDelete(DeleteBehavior.Cascade) .IsRequired(); - b.HasOne("WebAPI.Models.User", "ModifiedBy") + b.HasOne("DiunaBI.Core.Models.User", "ModifiedBy") .WithMany() .HasForeignKey("ModifiedById") .OnDelete(DeleteBehavior.Cascade) @@ -372,7 +402,7 @@ namespace DiunaBI.Core.Migrations b.Navigation("ModifiedBy"); }); - modelBuilder.Entity("WebAPI.Models.Layer", b => + modelBuilder.Entity("DiunaBI.Core.Models.Layer", b => { b.Navigation("Records"); }); diff --git a/src/Backend/DiunaBI.Core/Services/JobQueueProcessor.cs b/src/Backend/DiunaBI.Core/Services/JobQueueProcessor.cs new file mode 100644 index 0000000..66c7387 --- /dev/null +++ b/src/Backend/DiunaBI.Core/Services/JobQueueProcessor.cs @@ -0,0 +1,202 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.EntityFrameworkCore; +using DiunaBI.Core.Models; +using DiunaBI.Core.Interfaces; +using DiunaBI.Core.Database.Context; +using System; +using System.Threading.Tasks; +using System.Threading; +using System.Net.Http; +using System.Linq; + +namespace DiunaBI.Core.Services; + +public class JobQueueProcessor : BackgroundService +{ + private readonly IServiceProvider _serviceProvider; + private readonly ILogger _logger; + + public JobQueueProcessor( + IServiceProvider serviceProvider, + ILogger logger) + { + _serviceProvider = serviceProvider; + _logger = logger; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + _logger.LogInformation("JobQueueProcessor: Started"); + + while (!stoppingToken.IsCancellationRequested) + { + try + { + using var scope = _serviceProvider.CreateScope(); + var queueService = scope.ServiceProvider.GetRequiredService(); + + // First process all imports (they run sequentially due to Google Sheets API limits) + await ProcessJobType(queueService, JobType.Import, maxConcurrency: 1, stoppingToken); + + // Then process processors (can run in parallel within same priority) + await ProcessJobType(queueService, JobType.Process, maxConcurrency: 3, stoppingToken); + + // Wait before next cycle + await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken); + } + catch (OperationCanceledException) + { + _logger.LogInformation("JobQueueProcessor: Cancellation requested"); + break; + } + catch (Exception ex) + { + _logger.LogError(ex, "JobQueueProcessor: Unexpected error in queue processor"); + await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken); + } + } + + _logger.LogInformation("JobQueueProcessor: Stopped"); + } + + private async Task ProcessJobType(IJobQueueService queueService, JobType jobType, int maxConcurrency, CancellationToken cancellationToken) + { + var runningJobs = await queueService.GetRunningJobsCountAsync(jobType); + + // Don't start new jobs if we're at max concurrency + if (runningJobs >= maxConcurrency) + { + return; + } + + var job = await queueService.DequeueJobAsync(jobType); + if (job == null) + { + return; + } + + _logger.LogInformation("JobQueueProcessor: Processing {JobType} job {JobId} for layer {LayerName} (attempt {RetryCount}/{MaxRetries}, priority {Priority})", + job.JobType, job.Id, job.LayerName, job.RetryCount + 1, job.MaxRetries, job.Priority); + + // Process job asynchronously to allow parallel processing of processors + _ = Task.Run(async () => + { + try + { + await ProcessJobAsync(job, cancellationToken); + + // Add delay between imports to respect Google Sheets API limits + if (job.JobType == JobType.Import) + { + await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "JobQueueProcessor: Error in background job processing for {JobType} job {JobId}", + job.JobType, job.Id); + } + }, cancellationToken); + } + + private async Task ProcessJobAsync(QueueJob job, CancellationToken cancellationToken) + { + try + { + using var scope = _serviceProvider.CreateScope(); + var dbContext = scope.ServiceProvider.GetRequiredService(); + var pluginManager = scope.ServiceProvider.GetRequiredService(); + var queueService = scope.ServiceProvider.GetRequiredService(); + + // Get the layer with records + var layer = await dbContext.Layers + .Include(x => x.Records) + .FirstOrDefaultAsync(x => x.Id == job.LayerId && !x.IsDeleted, cancellationToken); + + if (layer == null) + { + _logger.LogWarning("JobQueueProcessor: Layer {LayerId} not found, marking job as failed", job.LayerId); + await queueService.MarkJobFailedAsync(job.Id, "Layer not found"); + return; + } + + // Process based on job type + switch (job.JobType) + { + case JobType.Import: + var importer = pluginManager.GetImporter(job.PluginName); + if (importer == null) + { + _logger.LogWarning("JobQueueProcessor: Importer {PluginName} not found, marking job as failed", job.PluginName); + await queueService.MarkJobFailedAsync(job.Id, $"Importer {job.PluginName} not found"); + return; + } + + _logger.LogInformation("JobQueueProcessor: Executing import for layer {LayerName} with plugin {PluginName}", + layer.Name, job.PluginName); + + importer.Import(layer); + break; + + case JobType.Process: + var processor = pluginManager.GetProcessor(job.PluginName); + if (processor == null) + { + _logger.LogWarning("JobQueueProcessor: Processor {PluginName} not found, marking job as failed", job.PluginName); + await queueService.MarkJobFailedAsync(job.Id, $"Processor {job.PluginName} not found"); + return; + } + + _logger.LogInformation("JobQueueProcessor: Executing process for layer {LayerName} with plugin {PluginName}", + layer.Name, job.PluginName); + + processor.Process(layer); + break; + + default: + throw new ArgumentOutOfRangeException(nameof(job.JobType), job.JobType, "Unknown job type"); + } + + await queueService.MarkJobCompletedAsync(job.Id); + + _logger.LogInformation("JobQueueProcessor: Successfully completed {JobType} for layer {LayerName}", + job.JobType, layer.Name); + } + catch (Exception ex) + { + _logger.LogError(ex, "JobQueueProcessor: Error processing {JobType} job {JobId} for layer {LayerName}", + job.JobType, job.Id, job.LayerName); + + using var scope = _serviceProvider.CreateScope(); + var queueService = scope.ServiceProvider.GetRequiredService(); + + // Check if it's a retriable error + if (IsRetriableError(ex)) + { + await queueService.MarkJobForRetryAsync(job.Id, ex.Message); + } + else + { + await queueService.MarkJobFailedAsync(job.Id, ex.Message); + } + } + } + + private static bool IsRetriableError(Exception ex) + { + var message = ex.Message.ToLowerInvariant(); + + var retriableErrors = new[] + { + "quota", "rate limit", "timeout", "service unavailable", + "internal server error", "bad gateway", "gateway timeout", + "network", "connection" + }; + + return retriableErrors.Any(error => message.Contains(error)) || + ex is HttpRequestException || + ex is TimeoutException; + } +} \ No newline at end of file diff --git a/src/Backend/DiunaBI.WebAPI/Controllers/LayersController.cs b/src/Backend/DiunaBI.WebAPI/Controllers/LayersController.cs index b80d80f..5288bd7 100644 --- a/src/Backend/DiunaBI.WebAPI/Controllers/LayersController.cs +++ b/src/Backend/DiunaBI.WebAPI/Controllers/LayersController.cs @@ -7,7 +7,7 @@ using Microsoft.EntityFrameworkCore; using DiunaBI.Core.Models; using DiunaBI.Core.Database.Context; using DiunaBI.Core.Services; -using Google.Cloud.Firestore; +using DiunaBI.Core.Interfaces; namespace DiunaBI.WebAPI.Controllers; @@ -21,6 +21,7 @@ public class LayersController : Controller private readonly IConfiguration _configuration; private readonly PluginManager _pluginManager; private readonly ILogger _logger; + private readonly IJobQueueService _queueService; public LayersController( AppDbContext db, @@ -28,7 +29,8 @@ public class LayersController : Controller GoogleDriveHelper googleDriveHelper, IConfiguration configuration, PluginManager pluginManager, - ILogger logger + ILogger logger, + IJobQueueService queueService ) { _db = db; @@ -37,6 +39,7 @@ public class LayersController : Controller _configuration = configuration; _pluginManager = pluginManager; _logger = logger; + _queueService = queueService; } [HttpGet] @@ -677,56 +680,81 @@ public class LayersController : Controller [HttpGet] [Route("GetImportWorkers")] [AllowAnonymous] - public IActionResult GetImportWorkers() + public async Task GetImportWorkers() { - var importWorkerLayers = _db.Layers - .Include(x => x.Records) - .Where(x => - x.Records!.Any(y => y.Code == "Type" && y.Desc1 == "ImportWorker") && - x.Records!.Any(y => y.Code == "IsEnabled" && y.Desc1 == "True") - && x.Number == 7270 - ) - .OrderByDescending(x => x.CreatedAt) - .AsNoTracking() - .ToList(); - - foreach (var importWorker in importWorkerLayers) + try { - _logger.LogDebug("GetImportWorkers: Found import worker layer {LayerName} ({LayerId})", - importWorker.Name, importWorker.Id); - var pluginName = importWorker.Records!.FirstOrDefault(x => x.Code == "Plugin")?.Desc1; - if (pluginName != null) + var importWorkerLayers = await _db.Layers + .Include(x => x.Records) + .Where(x => + x.Records!.Any(y => y.Code == "Type" && y.Desc1 == "ImportWorker") && + x.Records!.Any(y => y.Code == "IsEnabled" && y.Desc1 == "True") + && x.Number == 5579 + ) + .OrderBy(x => x.CreatedAt) + .AsNoTracking() + .ToListAsync(); + + _logger.LogInformation("GetImportWorkers: Found {LayerCount} import worker layers to queue", + importWorkerLayers.Count); + + int queuedCount = 0; + + foreach (var importWorker in importWorkerLayers) { + var pluginName = importWorker.Records!.FirstOrDefault(x => x.Code == "Plugin")?.Desc1; + if (string.IsNullOrEmpty(pluginName)) + { + _logger.LogWarning("GetImportWorkers: No plugin name found for layer {LayerName} ({LayerId}), skipping", + importWorker.Name, importWorker.Id); + continue; + } + + // Check if plugin exists var importer = _pluginManager.GetImporter(pluginName); if (importer == null) { - _logger.LogWarning("GetImportWorkers: Importer {PluginName} not found for layer {LayerName} ({LayerId})", + _logger.LogWarning("GetImportWorkers: Importer {PluginName} not found for layer {LayerName} ({LayerId}), skipping", pluginName, importWorker.Name, importWorker.Id); - throw new Exception($"Importer {pluginName} not found for layer {importWorker.Name}"); + continue; } - try + + var job = new QueueJob { - _logger.LogInformation("GetImportWorkers: Starting import for layer {LayerName} ({LayerId}) with plugin {PluginName}", - importWorker.Name, importWorker.Id, pluginName); - importer.Import(importWorker); - _logger.LogInformation("GetImportWorkers: Successfully imported layer {LayerName} ({LayerId})", - importWorker.Name, importWorker.Id); - } - catch (Exception e) - { - _logger.LogError(e, "GetImportWorkers: Error importing layer {LayerName} ({LayerId}) with plugin {PluginName}", - importWorker.Name, importWorker.Id, pluginName); - throw; - } - } - else - { - _logger.LogWarning("GetImportWorkers: No plugin name found for import worker layer {LayerName} ({LayerId})", - importWorker.Name, importWorker.Id); - throw new Exception($"No plugin name found for import worker layer {importWorker.Name}"); + LayerId = importWorker.Id, + LayerName = importWorker.Name ?? "Unknown", + PluginName = pluginName, + JobType = JobType.Import, + Priority = 0, // All imports have same priority + MaxRetries = 5, + CreatedById = Guid.Parse("F392209E-123E-4651-A5A4-0B1D6CF9FF9D"), + ModifiedById = Guid.Parse("F392209E-123E-4651-A5A4-0B1D6CF9FF9D") + }; + + await _queueService.EnqueueJobAsync(job); + queuedCount++; + + _logger.LogDebug("GetImportWorkers: Queued import job for layer {LayerName} ({LayerId}) with plugin {PluginName}", + importWorker.Name, importWorker.Id, pluginName); } + + var totalQueueSize = await _queueService.GetQueueCountAsync(); + + _logger.LogInformation("GetImportWorkers: Successfully queued {QueuedCount} import jobs. Total queue size: {QueueSize}", + queuedCount, totalQueueSize); + + return Ok(new { + Message = $"Queued {queuedCount} import jobs", + QueuedJobs = queuedCount, + TotalQueueSize = totalQueueSize, + SkippedLayers = importWorkerLayers.Count - queuedCount + }); + } + catch (Exception e) + { + _logger.LogError(e, "GetImportWorkers: Error queuing import workers"); + return BadRequest(e.ToString()); } - return Ok(); } [HttpGet] diff --git a/src/Backend/DiunaBI.WebAPI/Program.cs b/src/Backend/DiunaBI.WebAPI/Program.cs index 071e540..2f29ace 100644 --- a/src/Backend/DiunaBI.WebAPI/Program.cs +++ b/src/Backend/DiunaBI.WebAPI/Program.cs @@ -10,20 +10,24 @@ using DiunaBI.Core.Database.Context; using DiunaBI.Core.Services; using Google.Apis.Sheets.v4; using Serilog; +using DiunaBI.Core.Interfaces; var builder = WebApplication.CreateBuilder(args); -// ✅ DODAJ SERILOG CONFIGURATION -builder.Host.UseSerilog((context, configuration) => +// ✅ SERILOG TYLKO DLA PRODUKCJI +if (builder.Environment.IsProduction()) { - configuration - .ReadFrom.Configuration(context.Configuration) - .Enrich.FromLogContext() - .Enrich.WithProperty("Application", "DiunaBI") - .Enrich.WithProperty("Version", Assembly.GetExecutingAssembly().GetName().Version?.ToString() ?? "unknown") - .Enrich.WithEnvironmentName() - .Enrich.WithMachineName(); -}); + builder.Host.UseSerilog((context, configuration) => + { + configuration + .ReadFrom.Configuration(context.Configuration) + .Enrich.FromLogContext() + .Enrich.WithProperty("Application", "DiunaBI") + .Enrich.WithProperty("Version", Assembly.GetExecutingAssembly().GetName().Version?.ToString() ?? "unknown") + .Enrich.WithEnvironmentName() + .Enrich.WithMachineName(); + }); +} var connectionString = builder.Configuration.GetConnectionString("SQLDatabase"); builder.Services.AddDbContext(x => @@ -50,7 +54,6 @@ builder.Services.AddCors(options => builder.Services.AddControllers(); - builder.Services.AddAuthentication(options => { options.DefaultAuthenticateScheme = JwtBearerDefaults.AuthenticationScheme; @@ -66,10 +69,13 @@ builder.Services.AddAuthentication(options => ValidateIssuerSigningKey = true, IssuerSigningKey = new SymmetricSecurityKey(Encoding.UTF8.GetBytes(builder.Configuration["Secret"]!)) }; - }); builder.Services.AddAuthentication(); +// Queue services +builder.Services.AddScoped(); +builder.Services.AddHostedService(); + // Zarejestruj Google Sheets dependencies builder.Services.AddSingleton(); builder.Services.AddSingleton(); @@ -90,32 +96,46 @@ builder.Services.AddSingleton(); var app = builder.Build(); -app.UseSerilogRequestLogging(options => +// ✅ SERILOG REQUEST LOGGING TYLKO DLA PRODUKCJI +if (app.Environment.IsProduction()) { - options.MessageTemplate = "HTTP {RequestMethod} {RequestPath} responded {StatusCode} in {Elapsed:0.0000} ms"; - options.EnrichDiagnosticContext = (diagnosticContext, httpContext) => + app.UseSerilogRequestLogging(options => { - diagnosticContext.Set("RequestHost", httpContext.Request.Host.Value); - diagnosticContext.Set("RequestScheme", httpContext.Request.Scheme); - - var userAgent = httpContext.Request.Headers.UserAgent.FirstOrDefault(); - if (!string.IsNullOrEmpty(userAgent)) + options.MessageTemplate = "HTTP {RequestMethod} {RequestPath} responded {StatusCode} in {Elapsed:0.0000} ms"; + options.EnrichDiagnosticContext = (diagnosticContext, httpContext) => { - diagnosticContext.Set("UserAgent", userAgent); - } - - diagnosticContext.Set("RemoteIP", httpContext.Connection.RemoteIpAddress?.ToString() ?? "unknown"); - diagnosticContext.Set("RequestContentType", httpContext.Request.ContentType ?? "none"); - }; -}); + diagnosticContext.Set("RequestHost", httpContext.Request.Host.Value); + diagnosticContext.Set("RequestScheme", httpContext.Request.Scheme); + + var userAgent = httpContext.Request.Headers.UserAgent.FirstOrDefault(); + if (!string.IsNullOrEmpty(userAgent)) + { + diagnosticContext.Set("UserAgent", userAgent); + } + + diagnosticContext.Set("RemoteIP", httpContext.Connection.RemoteIpAddress?.ToString() ?? "unknown"); + diagnosticContext.Set("RequestContentType", httpContext.Request.ContentType ?? "none"); + }; + }); +} var pluginManager = app.Services.GetRequiredService(); var executablePath = Assembly.GetExecutingAssembly().Location; var executableDir = Path.GetDirectoryName(executablePath)!; var pluginsPath = Path.Combine(executableDir, "Plugins"); -Log.Information("Starting DiunaBI application"); -Log.Information("Loading plugins from: {PluginsPath}", pluginsPath); +// ✅ RÓŻNE LOGGERY W ZALEŻNOŚCI OD ŚRODOWISKA +if (app.Environment.IsProduction()) +{ + Log.Information("Starting DiunaBI application"); + Log.Information("Loading plugins from: {PluginsPath}", pluginsPath); +} +else +{ + var logger = app.Services.GetRequiredService>(); + logger.LogInformation("Starting DiunaBI application (Development)"); + logger.LogInformation("Loading plugins from: {PluginsPath}", pluginsPath); +} pluginManager.LoadPluginsFromDirectory(pluginsPath); @@ -145,5 +165,8 @@ app.MapControllers(); app.Run(); -// ✅ DODAJ CLEANUP -Log.CloseAndFlush(); +// ✅ SERILOG CLEANUP TYLKO DLA PRODUKCJI +if (app.Environment.IsProduction()) +{ + Log.CloseAndFlush(); +}