From 4986c60416f0a9394967981842e4dfca5f6df74f Mon Sep 17 00:00:00 2001 From: "2817212736@qq.com" <2817212736@qq.com> Date: Fri, 20 Dec 2024 17:04:19 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=B5=81=E6=B0=B4=E5=8F=B7?= =?UTF-8?q?=E7=BC=93=E5=AD=98=E6=9C=8D=E5=8A=A1=E7=9A=84=E5=B9=B6=E8=A1=8C?= =?UTF-8?q?=E9=94=99=E8=AF=AF=EF=BC=9B=20=E4=BF=AE=E5=A4=8D=E8=BE=93?= =?UTF-8?q?=E5=87=BA=E7=BA=BF=E7=A8=8B=E6=B2=A1=E6=9C=89=E6=8D=95=E8=8E=B7?= =?UTF-8?q?=E5=BC=82=E5=B8=B8=E7=9A=84=E4=B8=A5=E9=87=8D=E9=94=99=E8=AF=AF?= =?UTF-8?q?=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- MesETL.App/HostedServices/OutputService.cs | 13 ++- MesETL.App/HostedServices/TransformService.cs | 88 ++++++++++++++++--- MesETL.App/Program.cs | 2 +- MesETL.App/Services/Seq/SeqService.cs | 9 +- MesETL.App/appsettings.json | 6 +- 5 files changed, 98 insertions(+), 20 deletions(-) diff --git a/MesETL.App/HostedServices/OutputService.cs b/MesETL.App/HostedServices/OutputService.cs index 31b8c3e..bd706d4 100644 --- a/MesETL.App/HostedServices/OutputService.cs +++ b/MesETL.App/HostedServices/OutputService.cs @@ -56,7 +56,18 @@ public class OutputService : IOutputService if (!dbTasks.ContainsKey(db)) { dbTasks.Add(db, await dbTaskManager.CreateTaskAsync( - async () => await StartDatabaseWorker(db, queue, ct), ct)); + async () => + { + try + { + await StartDatabaseWorker(db, queue, ct); + } + catch (Exception e) + { + _logger.LogError(e, "输出线程发生错误"); + _queuePool.RemoveQueue(db); + } + }, ct)); } } diff --git a/MesETL.App/HostedServices/TransformService.cs b/MesETL.App/HostedServices/TransformService.cs index 25e8569..a7015ae 100644 --- a/MesETL.App/HostedServices/TransformService.cs +++ b/MesETL.App/HostedServices/TransformService.cs @@ -48,26 +48,92 @@ public class TransformService : ITransformService public async Task ExecuteAsync(CancellationToken cancellationToken) { - _logger.LogInformation("***** 数据转换服务已启动, 当前线程ID: {ThreadId} *****", Environment.CurrentManagedThreadId); - - // var tasks = new List(); - // for (int i = 0; i < 4; i++) - // { - // tasks.Add(Task.Run(TransformWorker, cancellationToken)); - // } - // - // await Task.WhenAll(tasks); - await TransformWorker(); + _logger.LogInformation("***** 数据转换服务已启动 *****"); + await TransformWorker2(); + + _context.CompleteTransform(); _logger.LogInformation("***** 数据转换服务执行完毕 *****"); } - public async Task TransformWorker() + public async Task TransformWorker(DataRecordQueue queue) { while (!_context.IsInputCompleted || _producerQueue.Count > 0) { if (!_producerQueue.TryDequeue(out var record)) { + await Task.Delay(100); + continue; + } + + try + { + var context = new DataTransformContext(record, _cache, _logger, _services); + if (_options.Value.EnableFilter) + { + // 数据过滤 + var filter = _options.Value.RecordFilter; + if (filter is not null && await filter(context) == false) continue; + } + + if (_options.Value.EnableReplacer) + { + // 数据替换 + var replacer = _options.Value.RecordModify; + if (replacer is not null) + { + record = await replacer(context); + } + } + + // 字段缓存 + var cacher = _options.Value.RecordCache; + if(cacher is not null) + await cacher.Invoke(context); + + //计算需要分流的数据库 + var dbFilter = _options.Value.DatabaseFilter + ?? throw new ApplicationException("未配置数据库过滤器"); + record.Database = dbFilter(record); + + if (_options.Value.EnableReBuilder) + { + //数据重建 + var addRecords = _options.Value.RecordReBuild?.Invoke(context); + if (addRecords is { Count: > 0 }) + { + foreach (var rc in addRecords) + { + if(dbFilter is not null) + rc.Database = dbFilter.Invoke(record); + await queue.EnqueueAsync(rc); + _context.AddTransform(); + } + } + } + + await queue.EnqueueAsync(record); + _context.AddTransform(); + } + catch (Exception e) + { + _context.AddException(e); + var errorRecorder = _errorRecorderFactory.CreateTransform(); + await errorRecorder.LogErrorRecordAsync(record, e); + if (!_options.Value.StrictMode) + _logger.LogError(e, "数据转换时发生错误"); + else throw; + } + } + } + + public async Task TransformWorker2() + { + while (!_context.IsInputCompleted || _producerQueue.Count > 0) + { + if (!_producerQueue.TryDequeue(out var record)) + { + await Task.Delay(100); continue; } diff --git a/MesETL.App/Program.cs b/MesETL.App/Program.cs index 2e66937..aaf0043 100644 --- a/MesETL.App/Program.cs +++ b/MesETL.App/Program.cs @@ -349,7 +349,7 @@ async Task RunProgram() { builder.ClearProviders(); builder.AddSerilog(new LoggerConfiguration() - .WriteTo.Console() + .WriteTo.Console().MinimumLevel.Debug() .WriteTo.File(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"./Log/Error/{ErrorRecorder.UID}.log"), restrictedToMinimumLevel:LogEventLevel.Error) // .WriteTo.File("./Log/Info/{ErrorRecorder.UID}.log", restrictedToMinimumLevel:LogEventLevel.Information) //性能考虑暂不使用 diff --git a/MesETL.App/Services/Seq/SeqService.cs b/MesETL.App/Services/Seq/SeqService.cs index 55c1a78..ed5caeb 100644 --- a/MesETL.App/Services/Seq/SeqService.cs +++ b/MesETL.App/Services/Seq/SeqService.cs @@ -1,3 +1,4 @@ +using System.Collections.Concurrent; using System.Diagnostics; using System.Runtime.CompilerServices; using System.Text; @@ -11,7 +12,7 @@ namespace MesETL.App.Services.Seq; public class SeqService { private readonly string _connectionString; - private readonly Dictionary _cachedSequence; + private readonly ConcurrentDictionary _cachedSequence; public IReadOnlyDictionary CachedSequence => _cachedSequence; @@ -24,7 +25,7 @@ public class SeqService }; _connectionString = builder.ConnectionString; - _cachedSequence = new Dictionary(); + _cachedSequence = new ConcurrentDictionary(); } private async Task UpdateSequenceID(string name,int step,long max,bool recycle, int add) @@ -99,9 +100,9 @@ public class SeqService /// 移除一个缓存的流水号 /// /// - public void RemoveCachedSeq(SeqConfig config) + public bool RemoveCachedSeq(SeqConfig config) { - _cachedSequence.Remove(config); + return _cachedSequence.Remove(config, out _); } /// diff --git a/MesETL.App/appsettings.json b/MesETL.App/appsettings.json index 3a15284..89de917 100644 --- a/MesETL.App/appsettings.json +++ b/MesETL.App/appsettings.json @@ -4,7 +4,7 @@ "UnsafeVariable": false, "Logging": { "LogLevel": { - "Default": "Debug" + "Default": "Trace" } }, "Input":{ @@ -33,8 +33,8 @@ } }, "RecordQueue":{ - "ProducerQueueLength": 50000, // 输入队列最大长度 - "ConsumerQueueLength": 10000, // 每个输出队列最大长度 + "ProducerQueueLength": 40000, // 输入队列最大长度 + "ConsumerQueueLength": 20000, // 每个输出队列最大长度 "MaxByteCount": 3221225472 // 队列最大字节数 }, "RedisCache": {