From a169eecec58719dee35ab06307cf883b811cde44 Mon Sep 17 00:00:00 2001 From: lindj <67092759@qq.com> Date: Mon, 22 Jan 2024 17:04:49 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B4=E7=90=86=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../HostedServices/TransformService.cs | 57 +++++++++++-------- ConsoleApp2/Program.cs | 2 +- 2 files changed, 35 insertions(+), 24 deletions(-) diff --git a/ConsoleApp2/HostedServices/TransformService.cs b/ConsoleApp2/HostedServices/TransformService.cs index e450560..976b89a 100644 --- a/ConsoleApp2/HostedServices/TransformService.cs +++ b/ConsoleApp2/HostedServices/TransformService.cs @@ -21,7 +21,7 @@ public class TransformService : ITransformService private readonly IDistributedCache _cache; - public TransformService(ILogger logger, + public TransformService(ILogger logger, IOptions options, ProcessContext context, IDistributedCache cache) @@ -35,30 +35,41 @@ public class TransformService : ITransformService public async Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue producerQueue, DataRecordQueue consumerQueue, ProcessContext context, CancellationToken cancellationToken) { _logger.LogInformation("***** Data transform service started, thread id: {ThreadId} *****", Environment.CurrentManagedThreadId); - while ((!context.IsInputCompleted || producerQueue.Count > 0)) + while ((!context.IsInputCompleted || producerQueue.Count > 0)) + { + if (_context.GetExceptions().Count > 0) { - if (_context.GetExceptions().Count > 0) - { - _logger.LogInformation("***** Csv transform service is canceled *****"); - return; - } - if (!producerQueue.TryDequeue(out var record)) continue; + _logger.LogInformation("***** Csv transform service is canceled *****"); + return; + } + if (!producerQueue.TryDequeue(out var record)) continue; - //过滤不要的record - if (await _options.Value.RecordFilter?.Invoke(record, _cache) == false) continue; - record.Database = _options.Value.DatabaseFilter?.Invoke(record); - //修改record - _options.Value.RecordModify?.Invoke(record); - //缓存record - await _options.Value.RecordCache?.Invoke(record, _cache); - //替换record - var replaceRecord = await _options.Value.RecordReplace?.Invoke(record, _cache); - if (replaceRecord != null) + //过滤不要的record + + if (_options.Value.RecordFilter != null) + { + var result = await _options.Value.RecordFilter.Invoke(record, _cache); + if (result == false) continue; + } + record.Database = _options.Value.DatabaseFilter?.Invoke(record); + //修改record + _options.Value.RecordModify?.Invoke(record); + //缓存record + if (_options.Value.RecordCache != null) + { + await _options.Value.RecordCache.Invoke(record, _cache); + } + //替换record + if (_options.Value.RecordReplace != null) + { + var result = await _options.Value.RecordReplace.Invoke(record, _cache); + if (result != null) { - record = replaceRecord; + record = result; } - consumerQueue.Enqueue(record); - _context.AddTransform(); + } + consumerQueue.Enqueue(record); + _context.AddTransform(); //数据增加 var addRecords = _options.Value.RecordAdd?.Invoke(record); if (addRecords != null && addRecords.Count > 0) @@ -70,8 +81,8 @@ public class TransformService : ITransformService } } } - context.CompleteTransform(); - + context.CompleteTransform(); + _logger.LogInformation("***** Data transformation service completed *****"); } } \ No newline at end of file diff --git a/ConsoleApp2/Program.cs b/ConsoleApp2/Program.cs index 84b5a5e..b15ac5c 100644 --- a/ConsoleApp2/Program.cs +++ b/ConsoleApp2/Program.cs @@ -445,7 +445,7 @@ async Task RunProgram() host.Services.AddHostedService(); host.Services.AddHostedService(); - if(commandOptions.IsMock)host.Services.AddSingleton(); + if(commandOptions.IsMock)host.Services.AddSingleton(); else host.Services.AddSingleton(); host.Services.AddSingleton(); host.Services.AddSingleton();