MES-ETL/ConsoleApp2/HostedServices/TransformService.cs

85 lines
3.1 KiB
C#
Raw Normal View History

2024-01-04 09:00:44 +08:00
using ConsoleApp2.Const;
using ConsoleApp2.HostedServices.Abstractions;
2023-12-29 16:16:05 +08:00
using ConsoleApp2.Options;
using ConsoleApp2.Services;
using Microsoft.Extensions.Caching.Distributed;
2023-12-29 16:16:05 +08:00
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
2024-01-16 18:00:23 +08:00
using StackExchange.Redis;
2023-12-29 16:16:05 +08:00
namespace ConsoleApp2.HostedServices;
2024-01-04 09:00:44 +08:00
/// <summary>
/// 数据处理服务,对导入后的数据进行处理
/// </summary>
public class TransformService : ITransformService
2023-12-29 16:16:05 +08:00
{
private readonly ILogger _logger;
private readonly IOptions<DataTransformOptions> _options;
private readonly DataRecordQueue _producerQueue;
private readonly DataRecordQueue _consumerQueue;
private readonly ProcessContext _context;
private readonly IDistributedCache _cache;
2023-12-29 16:16:05 +08:00
2024-01-04 09:00:44 +08:00
public TransformService(ILogger<TransformService> logger,
IOptions<DataTransformOptions> options,
2023-12-29 16:16:05 +08:00
[FromKeyedServices(ProcessStep.Producer)]DataRecordQueue producerQueue,
[FromKeyedServices(ProcessStep.Consumer)]DataRecordQueue consumerQueue,
ProcessContext context,
IDistributedCache cache)
2023-12-29 16:16:05 +08:00
{
_logger = logger;
_options = options;
_producerQueue = producerQueue;
_consumerQueue = consumerQueue;
_context = context;
_cache = cache;
2023-12-29 16:16:05 +08:00
}
2024-01-04 09:00:44 +08:00
public async Task ExecuteAsync(CancellationToken cancellationToken)
2023-12-29 16:16:05 +08:00
{
_logger.LogInformation("***** Data transform service started, thread id: {ThreadId} *****", Environment.CurrentManagedThreadId);
2024-01-12 16:50:37 +08:00
while ((!_context.IsInputCompleted || _producerQueue.Count > 0))
2023-12-29 16:16:05 +08:00
{
2024-01-12 16:50:37 +08:00
if (_context.GetExceptions().Count > 0)
{
_logger.LogInformation("***** Csv transform service is canceled *****");
return;
}
2023-12-29 16:16:05 +08:00
// var dbOptions = _options.Value.DatabaseFilter(record);
if (!_producerQueue.TryDequeue(out var record)) continue;
2024-01-12 16:50:37 +08:00
//过滤不要的record
if ( await _options.Value.RecordFilter?.Invoke(record, _cache) == false) continue;
2024-01-12 16:50:37 +08:00
record.Database = _options.Value.DatabaseFilter?.Invoke(record);
//修改record
_options.Value.RecordModify?.Invoke(record);
2024-01-16 18:00:23 +08:00
//缓存record
_options.Value.RecordCache?.Invoke(record, _cache);
2024-01-12 16:50:37 +08:00
//替换record
var replaceRecord =await _options.Value.RecordReplace?.Invoke(record, _cache);
2024-01-12 16:50:37 +08:00
if (replaceRecord != null)
{
record = replaceRecord;
}
2023-12-29 16:16:05 +08:00
_consumerQueue.Enqueue(record);
2024-01-17 11:53:44 +08:00
_context.AddTransform();
2024-01-12 16:50:37 +08:00
//数据增加
var addRecords=_options.Value.RecordAdd?.Invoke(record);
2024-01-15 17:26:44 +08:00
if(addRecords != null&& addRecords.Count>0)
2024-01-12 16:50:37 +08:00
{
foreach(var rc in addRecords)
{
_consumerQueue.Enqueue(rc);
2024-01-17 11:40:16 +08:00
_context.AddTransform();
2024-01-12 16:50:37 +08:00
}
}
2024-01-17 11:53:44 +08:00
2023-12-29 16:16:05 +08:00
}
_context.CompleteTransform();
_logger.LogInformation("***** Data transformation service completed *****");
}
}