using MesETL.App.HostedServices.Abstractions;
using MesETL.App.Options;
using MesETL.App.Services;
using MesETL.App.Services.ETL;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace MesETL.App.HostedServices;
/// <summary>
/// Metadata describing a single input file to be read by <see cref="FileInputService"/>.
/// Instances are produced by the configured file-meta builder delegate, one per file.
/// </summary>
public record FileInputInfo
{
/// <summary>File name handed to the data reader factory (presumably the full path returned by the directory listing; confirm against the meta builder).</summary>
public required string FileName { get; init; }
/// <summary>Name of the table the file belongs to; used for ordering, ignore filtering and per-table input counting.</summary>
public required string TableName { get; init; }
/// <summary>Headers handed to the data reader factory (presumably the column names of the file; TODO confirm).</summary>
public required string[] Headers { get; init; }
}
/// <summary>
/// Kinds of input file. Not referenced elsewhere in this file; presumably consumed by
/// the reader factory or configuration code (verify against callers).
/// </summary>
public enum FileInputType
{
// Presumably a plain CSV file produced by MyDumper (confirm).
MyDumperCsv,
// Presumably a Zstandard-compressed MyDumper file (confirm).
MyDumperZst,
// Presumably an error log written by a previous run (confirm).
ErrorLog,
}
/// <summary>
/// Input service that imports files from the configured input directory and
/// enqueues every record they contain onto the producer queue.
/// </summary>
public class FileInputService : IInputService
{
    private readonly ILogger<FileInputService> _logger;
    private readonly DataRecordQueue _producerQueue;
    private readonly IOptions<DataInputOptions> _dataInputOptions;
    private readonly ProcessContext _context;
    private readonly DataReaderFactory _dataReaderFactory;

    /// <summary>
    /// Creates the service with its collaborators.
    /// </summary>
    /// <param name="logger">Logger used for progress messages.</param>
    /// <param name="dataInputOptions">Input options (input directory, file-meta builder, table order, ignore list, completion callback).</param>
    /// <param name="context">Shared process context that receives input counters and the input-completed signal.</param>
    /// <param name="producerQueue">Keyed producer queue that receives every record read.</param>
    /// <param name="dataReaderFactory">Factory that creates a reader for each input file.</param>
    public FileInputService(ILogger<FileInputService> logger,
        IOptions<DataInputOptions> dataInputOptions,
        ProcessContext context,
        [FromKeyedServices(Const.ConstVar.Producer)] DataRecordQueue producerQueue,
        DataReaderFactory dataReaderFactory)
    {
        _logger = logger;
        _dataInputOptions = dataInputOptions;
        _context = context;
        _producerQueue = producerQueue;
        _dataReaderFactory = dataReaderFactory;
    }

    /// <summary>
    /// Lists the files in the input directory, maps each to a <see cref="FileInputInfo"/> through the
    /// configured meta builder, orders them by the configured table order, then reads every file and
    /// enqueues its records. Marks the input as completed on the process context when all files are read.
    /// </summary>
    /// <param name="cancellationToken">Token checked before each file and before each record is enqueued.</param>
    /// <exception cref="ApplicationException">The input directory or the file-meta builder is not configured.</exception>
    /// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was cancelled.</exception>
    public async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        var inputDir = _dataInputOptions.Value.InputDir ?? throw new ApplicationException("未配置文件输入目录");
        _logger.LogInformation("***** Input service started, working directory: {InputDir} *****", inputDir);

        var trans = _dataInputOptions.Value.FileInputMetaBuilder;
        if (trans is null)
        {
            throw new ApplicationException("未配置文件名-表名映射委托");
        }

        // Null results from the meta builder are dropped; OfType filters them without a null-forgiving operator.
        FileInputInfo[] infoArr = Directory.GetFiles(inputDir)
            .Select(f => trans(f))
            .OfType<FileInputInfo>()
            .ToArray();

        var orderedInfo = GetFilesInOrder(infoArr).ToArray();

        _logger.LogInformation("***** {Count} files founded in directory,{OrderedCount} files is matched with configuration *****", infoArr.Length, orderedInfo.Length);
        foreach (var info in orderedInfo)
        {
            _logger.LogDebug("Table {TableName}: {FileName}", info.TableName, info.FileName);
        }

        foreach (var info in orderedInfo)
        {
            // Stop between files as soon as cancellation is requested.
            cancellationToken.ThrowIfCancellationRequested();
            await ReadFileAsync(info, cancellationToken);
        }

        _context.CompleteInput();
        _logger.LogInformation("***** Input service finished *****");
    }

    /// <summary>
    /// Reads one file to the end, enqueues each record, updates the input counters and
    /// invokes the optional table-completed callback.
    /// </summary>
    private async Task ReadFileAsync(FileInputInfo info, CancellationToken cancellationToken)
    {
        _logger.LogInformation("Reading file: {FileName}, table: {TableName}", info.FileName, info.TableName);

        // The reader stays alive until this method returns, i.e. until after the completion callback has run.
        using var source = _dataReaderFactory.CreateReader(info.FileName, info.TableName, info.Headers);
        var count = 0;

        while (await source.ReadAsync())
        {
            // Checked per record so that a long file cannot delay shutdown.
            cancellationToken.ThrowIfCancellationRequested();

            await _producerQueue.EnqueueAsync(source.Current);
            count++;
            _context.AddInput();
        }

        _context.AddTableInput(info.TableName, count);
        _logger.LogInformation("Input of table: '{TableName}' finished", info.TableName);
        _dataInputOptions.Value.OnTableInputCompleted?.Invoke(info.TableName);
    }

    /// <summary>
    /// Returns the input files in the table order given by configuration.
    /// </summary>
    /// <remarks>
    /// When no table order is configured, <paramref name="inputFiles"/> is returned unchanged and the
    /// ignore list is not consulted. Otherwise, for each configured table name (matched case-insensitively)
    /// the first matching file is yielded unless its table name is in the ignore list.
    /// </remarks>
    /// <param name="inputFiles">Files recognised by the meta builder.</param>
    /// <returns>The files to read, in reading order.</returns>
    private IEnumerable<FileInputInfo> GetFilesInOrder(FileInputInfo[] inputFiles)
    {
        var tableOrder = _dataInputOptions.Value.TableOrder;
        var ignoreTable = _dataInputOptions.Value.TableIgnoreList;

        if (tableOrder is null or { Length: 0 })
        {
            return inputFiles;
        }

        return Yield();

        IEnumerable<FileInputInfo> Yield()
        {
            foreach (var tableName in tableOrder)
            {
                // NOTE(review): only the first file per table is selected; if a table can be split
                // across several files, the remaining files are skipped. Confirm this is intended.
                var target = inputFiles.FirstOrDefault(f =>
                    f.TableName.Equals(tableName, StringComparison.OrdinalIgnoreCase));
                if (target is null)
                {
                    continue;
                }

                // A missing ignore list means nothing is ignored (previously a NullReferenceException).
                if (ignoreTable is null || !ignoreTable.Contains(target.TableName))
                {
                    yield return target;
                }
            }
        }
    }
}