MES-ETL/MesETL.App/HostedServices/TransformService.cs
2024-02-09 23:18:34 +08:00

131 lines
4.6 KiB
C#

using MesETL.App.Cache;
using MesETL.App.HostedServices.Abstractions;
using MesETL.App.Options;
using MesETL.App.Services;
using MesETL.App.Services.ErrorRecorder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace MesETL.App.HostedServices;
/// <summary>
/// Per-record context passed to the transform delegates (filter, replacer, cache, rebuild).
/// </summary>
/// <param name="Record">The record currently being transformed.</param>
/// <param name="Cacher">Cache instance made available to the delegates.</param>
/// <param name="Logger">Logger instance made available to the delegates.</param>
public record DataTransformContext(DataRecord Record, ICacher Cacher, ILogger Logger);
/// <summary>
/// Data transformation service: processes records after they have been read in.
/// Drains the producer queue, applies the configured filter / replacer / cache /
/// rebuild delegates and enqueues each record on the queue of its target database.
/// </summary>
public class TransformService : ITransformService
{
    /// <summary>
    /// Pause between polls while the producer queue is empty, so the worker
    /// does not spin on <c>TryDequeue</c> and monopolise a thread.
    /// </summary>
    private static readonly TimeSpan IdlePollInterval = TimeSpan.FromMilliseconds(10);

    private readonly ILogger _logger;
    private readonly IOptions<DataTransformOptions> _options;
    private readonly DataRecordQueue _producerQueue;
    private readonly RecordQueuePool _queuePool;
    private readonly ProcessContext _context;
    private readonly ICacher _cache;
    private readonly ErrorRecorderFactory _errorRecorderFactory;

    /// <summary>
    /// Creates the service. <paramref name="producerQueue"/> is resolved as the keyed
    /// service registered under <c>Const.ConstVar.Producer</c>.
    /// </summary>
    public TransformService(ILogger<TransformService> logger,
        IOptions<DataTransformOptions> options,
        [FromKeyedServices(Const.ConstVar.Producer)] DataRecordQueue producerQueue,
        RecordQueuePool queuePool,
        ProcessContext context,
        ICacher cache,
        ErrorRecorderFactory errorRecorderFactory)
    {
        _logger = logger;
        _options = options;
        _producerQueue = producerQueue;
        _queuePool = queuePool;
        _context = context;
        _cache = cache;
        _errorRecorderFactory = errorRecorderFactory;
    }

    /// <summary>
    /// Runs a single transform worker until the input is completed and the producer
    /// queue has been drained, logging start and finish.
    /// </summary>
    /// <param name="cancellationToken">Token that aborts the worker loop.</param>
    /// <exception cref="OperationCanceledException">
    /// Thrown when <paramref name="cancellationToken"/> is cancelled while the worker is running.
    /// </exception>
    public async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("***** Data transform service started, thread id: {ThreadId} *****", Environment.CurrentManagedThreadId);

        await TransformWorkerCore(cancellationToken);

        _logger.LogInformation("***** Data transformation service finished *****");
    }

    /// <summary>
    /// Runs the transform loop without cancellation support.
    /// Kept for callers that invoke the worker directly.
    /// </summary>
    public Task TransformWorker() => TransformWorkerCore(CancellationToken.None);

    /// <summary>
    /// Transform loop: dequeues records from the producer queue and, for each one,
    /// applies the enabled delegates from <see cref="DataTransformOptions"/>, then
    /// enqueues the result on the queue selected by the database filter.
    /// </summary>
    /// <param name="cancellationToken">Checked once per iteration and while idling.</param>
    private async Task TransformWorkerCore(CancellationToken cancellationToken)
    {
        var options = _options.Value;

        while (!_context.IsInputCompleted || _producerQueue.Count > 0)
        {
            cancellationToken.ThrowIfCancellationRequested();

            if (!_producerQueue.TryDequeue(out var record))
            {
                // Queue is momentarily empty: wait asynchronously instead of busy-spinning.
                await Task.Delay(IdlePollInterval, cancellationToken);
                continue;
            }

            try
            {
                var context = new DataTransformContext(record, _cache, _logger);

                if (options.EnableFilter)
                {
                    // Data filtering: a false result drops the record.
                    var filter = options.RecordFilter;
                    if (filter is not null && await filter(context) == false)
                    {
                        continue;
                    }
                }

                if (options.EnableReplacer)
                {
                    // Data replacement.
                    var replacer = options.RecordModify;
                    if (replacer is not null)
                    {
                        record = await replacer(context);
                        // Keep the context in sync so the cache and rebuild steps
                        // below see the replaced record, not the original one.
                        context = context with { Record = record };
                    }
                }

                // Field caching.
                var cacher = options.RecordCache;
                if (cacher is not null)
                {
                    await cacher.Invoke(context);
                }

                // Determine which database the record must be routed to.
                var dbFilter = options.DatabaseFilter
                               ?? throw new ApplicationException("未配置数据库过滤器");
                record.Database = dbFilter(record);
                await _queuePool[record.Database].EnqueueAsync(record);
                _context.AddTransform();

                if (options.EnableReBuilder)
                {
                    // Data rebuild: additional records derived from the current one.
                    var addRecords = options.RecordReBuild?.Invoke(context);
                    if (addRecords is { Count: > 0 })
                    {
                        foreach (var rc in addRecords)
                        {
                            // Rebuilt records go to the same database as their source record.
                            // NOTE(review): routing is based on the source record, not on rc
                            // itself — confirm that per-record routing is not intended.
                            rc.Database = record.Database;
                            await _queuePool[record.Database].EnqueueAsync(rc);
                            _context.AddTransform();
                        }
                    }
                }
            }
            catch (Exception e)
            {
                _context.AddException(e);
                var errorRecorder = _errorRecorderFactory.CreateTransform();
                await errorRecorder.LogErrorRecordAsync(record, e);

                // Strict mode aborts on the first failure; otherwise log and move on.
                if (options.StrictMode)
                {
                    throw;
                }

                _logger.LogError(e, "数据转换时发生错误");
            }
        }

        _context.CompleteTransform();
    }
}