MES-ETL/MesETL.App/HostedServices/FileInputService.cs

159 lines
6.5 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System.Runtime;
using MesETL.App.Const;
using MesETL.App.HostedServices.Abstractions;
using MesETL.App.Options;
using MesETL.App.Services;
using MesETL.App.Services.ETL;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace MesETL.App.HostedServices;
public record FileInputInfo
{
public required string FileName { get; init; }
public required string TableName { get; init; }
public required string Database { get; init; }
public required int Part { get; init; }
public required string[] Headers { get; init; }
}
/// <summary>
/// 从输入目录中导入文件
/// </summary>
public class FileInputService : IInputService
{
private readonly ILogger _logger;
private readonly DataRecordQueue _producerQueue;
private readonly IOptions<DataInputOptions> _dataInputOptions;
private readonly ProcessContext _context;
private readonly DataReaderFactory _dataReaderFactory;
private readonly long _memoryThreshold;
private readonly bool _dryRun;
private readonly int _dryRunCount;
public FileInputService(ILogger<FileInputService> logger,
IOptions<DataInputOptions> dataInputOptions,
ProcessContext context,
[FromKeyedServices(Const.ConstVar.Producer)] DataRecordQueue producerQueue,
DataReaderFactory dataReaderFactory,
IConfiguration configuration)
{
_logger = logger;
_dataInputOptions = dataInputOptions;
_context = context;
_producerQueue = producerQueue;
_dataReaderFactory = dataReaderFactory;
_memoryThreshold = (long)(configuration.GetValue<double>("MemoryThreshold", 8) * 1024 * 1024 * 1024);
_dryRun = configuration.GetValue("DryRun", false);
_dryRunCount = configuration.GetValue("DryRunCount", 100_000);
}
public async Task ExecuteAsync(CancellationToken cancellationToken)
{
var inputDir = _dataInputOptions.Value.InputDir ?? throw new ApplicationException("未配置文件输入目录");
_logger.LogInformation("***** 输入服务已启动,工作目录为:{InputDir} *****", inputDir);
if (_dryRun)
_logger.LogInformation("***** 试运行模式已开启,只读取前 {Count} 行数据 *****", _dryRunCount);
var orderedInfo = GetOrderedInputInfo(inputDir);
foreach (var info in orderedInfo)
{
var file = Path.GetFileName(info.FileName);
_logger.LogInformation("正在读取文件:{FileName}, 对应的数据表:{TableName}", file, info.TableName);
using var source = _dataReaderFactory.CreateReader(info.FileName, info.TableName, info.Headers);
var countBuffer = 0;
if (_dryRun && _context.TableProgress.GetValueOrDefault(info.TableName, (input: 0, output: 0)).input >= _dryRunCount)
continue;
while (await source.ReadAsync())
{
if (GC.GetTotalMemory(false) > _memoryThreshold)
{
_logger.LogWarning("内存使用率过高,暂缓输入");
GCSettings.LargeObjectHeapCompactionMode = GCLargeObjectHeapCompactionMode.CompactOnce;
GC.Collect();
await Task.Delay(3000, cancellationToken);
}
var record = source.Current;
await _producerQueue.EnqueueAsync(record);
countBuffer++;
_context.AddInput();
// 避免影响性能每1000条更新一次表输入进度
if (countBuffer >= 1000)
{
_context.AddTableInput(info.TableName, countBuffer);
countBuffer = 0;
// 试运行模式下,超出了指定行数则停止输入
if (_dryRun && _context.TableProgress[info.TableName].input >= _dryRunCount)
{
break;
}
}
}
_context.AddTableInput(info.TableName, countBuffer);
_logger.LogInformation("文件 {File} 输入完成", file);
_dataInputOptions.Value.OnTableInputCompleted?.Invoke(info.TableName);
}
_context.CompleteInput();
_logger.LogInformation("***** 输入服务{DryRun}已执行完毕 *****", _dryRun ? " (试运行)" : "");
}
public IEnumerable<FileInputInfo> GetOrderedInputInfo(string dir)
{
var metaBuilder = _dataInputOptions.Value.FileInputMetaBuilder;
if(metaBuilder is null) throw new ApplicationException("未配置文件名->表名的映射委托函数");
var files = Directory.GetFiles(dir);
FileInputInfo[] infoArr = files
.Select(f => metaBuilder(f))
.Where(info => info is not null).ToArray()!;
var orderedInfo = GetFilesInOrder(infoArr).ToArray();
_logger.LogInformation("***** 输入目录中发现 {Count} 个文件, {InfoCount} 个文件可用,{OrderedCount} 个文件符合当前输入配置 *****",
files.Length, infoArr.Length, orderedInfo.Length);
foreach (var info in orderedInfo.GroupBy(i => i.TableName))
{
_logger.LogDebug("表 {TableName} 发现 {FileCount} 个对应文件:\n{FileName}",
info.Key, info.Count(), string.Join('\n', info.Select(f => f.FileName)));
}
return orderedInfo;
}
/// <summary>
/// 读取配置,按照配置的表顺序来返回
/// </summary>
/// <returns></returns>
private IEnumerable<FileInputInfo> GetFilesInOrder(FileInputInfo[] inputFiles)
{
var tableOrder = _dataInputOptions.Value.TableOrder ?? typeof(TableNames).GetFields().Select(f => f.GetValue(null) as string).ToArray();
var ignoreTable = _dataInputOptions.Value.TableIgnoreList;
if (tableOrder is null or { Length: 0 })
return inputFiles;
return Yield();
IEnumerable<FileInputInfo> Yield()
{
foreach (var tableName in tableOrder)
{
var targets = inputFiles.Where(f =>
f.TableName.Equals(tableName, StringComparison.OrdinalIgnoreCase) &&
!ignoreTable.Contains(f.TableName));
foreach (var target in targets)
{
yield return target;
}
}
}
}
}