159 lines
6.5 KiB
C#
159 lines
6.5 KiB
C#
using System.Runtime;
|
||
using MesETL.App.Const;
|
||
using MesETL.App.HostedServices.Abstractions;
|
||
using MesETL.App.Options;
|
||
using MesETL.App.Services;
|
||
using MesETL.App.Services.ETL;
|
||
using Microsoft.Extensions.Configuration;
|
||
using Microsoft.Extensions.DependencyInjection;
|
||
using Microsoft.Extensions.Logging;
|
||
using Microsoft.Extensions.Options;
|
||
|
||
namespace MesETL.App.HostedServices;
|
||
|
||
/// <summary>
/// Describes one input file and the table its rows are destined for.
/// </summary>
public record FileInputInfo
{
    /// <summary>
    /// Path of the input file; handed to the data reader factory when the file is opened.
    /// </summary>
    public required string FileName { get; init; }

    /// <summary>
    /// Name of the table the file's records belong to; used for ordering, filtering and progress tracking.
    /// </summary>
    public required string TableName { get; init; }

    /// <summary>
    /// Database associated with the file.
    /// NOTE(review): not read anywhere in this file - presumably the source database name; confirm against the meta builder.
    /// </summary>
    public required string Database { get; init; }

    /// <summary>
    /// Part number of the file.
    /// NOTE(review): not read anywhere in this file - presumably the index of a table dump split across several files; confirm.
    /// </summary>
    public required int Part { get; init; }

    /// <summary>
    /// Header values handed to the data reader factory together with the file and table name.
    /// </summary>
    public required string[] Headers { get; init; }
}
|
||
|
||
/// <summary>
/// Imports files from the input directory: reads every matching file record by record
/// and pushes the records onto the producer queue.
/// </summary>
public class FileInputService : IInputService
{
    private readonly ILogger _logger;
    private readonly DataRecordQueue _producerQueue;
    private readonly IOptions<DataInputOptions> _dataInputOptions;
    private readonly ProcessContext _context;
    private readonly DataReaderFactory _dataReaderFactory;

    // Managed-heap size (in bytes) above which input is paused; configured in GiB via "MemoryThreshold".
    private readonly long _memoryThreshold;

    // Dry-run mode: stop feeding a table once roughly _dryRunCount rows have been read for it.
    private readonly bool _dryRun;
    private readonly int _dryRunCount;

    /// <summary>
    /// Creates the service and reads its tuning values from configuration.
    /// </summary>
    /// <param name="logger">Logger for progress and diagnostics.</param>
    /// <param name="dataInputOptions">Input options (input directory, table order, ignore list, callbacks).</param>
    /// <param name="context">Shared progress context updated as records are read.</param>
    /// <param name="producerQueue">Keyed producer queue that receives every record read.</param>
    /// <param name="dataReaderFactory">Factory used to open a reader for each input file.</param>
    /// <param name="configuration">
    /// Source of "MemoryThreshold" (GiB, default 8), "DryRun" (default false) and "DryRunCount" (default 100000).
    /// </param>
    public FileInputService(ILogger<FileInputService> logger,
        IOptions<DataInputOptions> dataInputOptions,
        ProcessContext context,
        [FromKeyedServices(Const.ConstVar.Producer)] DataRecordQueue producerQueue,
        DataReaderFactory dataReaderFactory,
        IConfiguration configuration)
    {
        _logger = logger;
        _dataInputOptions = dataInputOptions;
        _context = context;
        _producerQueue = producerQueue;
        _dataReaderFactory = dataReaderFactory;
        _memoryThreshold = (long)(configuration.GetValue<double>("MemoryThreshold", 8) * 1024 * 1024 * 1024);
        _dryRun = configuration.GetValue("DryRun", false);
        _dryRunCount = configuration.GetValue("DryRunCount", 100_000);
    }

    /// <summary>
    /// Reads all input files in the configured table order and enqueues their records.
    /// </summary>
    /// <param name="cancellationToken">Token checked before every record and while input is throttled.</param>
    /// <exception cref="ApplicationException">The input directory or the file meta builder is not configured.</exception>
    /// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was cancelled.</exception>
    public async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        var inputDir = _dataInputOptions.Value.InputDir ?? throw new ApplicationException("未配置文件输入目录");
        _logger.LogInformation("***** 输入服务已启动,工作目录为:{InputDir} *****", inputDir);
        if (_dryRun)
        {
            _logger.LogInformation("***** 试运行模式已开启,只读取前 {Count} 行数据 *****", _dryRunCount);
        }

        var orderedInfo = GetOrderedInputInfo(inputDir);

        foreach (var info in orderedInfo)
        {
            var file = Path.GetFileName(info.FileName);
            _logger.LogInformation("正在读取文件:{FileName}, 对应的数据表:{TableName}", file, info.TableName);

            // Decide whether to skip BEFORE opening the file, so that a skipped file is never
            // opened (and immediately disposed) for nothing.
            if (IsDryRunLimitReached(info.TableName))
            {
                continue;
            }

            using var source = _dataReaderFactory.CreateReader(info.FileName, info.TableName, info.Headers);
            var countBuffer = 0;

            while (await source.ReadAsync())
            {
                // Honor cancellation on every record, not only while throttled below.
                cancellationToken.ThrowIfCancellationRequested();

                if (GC.GetTotalMemory(false) > _memoryThreshold)
                {
                    // Back-pressure: compact the LOH, force a collection and pause before reading on.
                    _logger.LogWarning("内存使用率过高,暂缓输入");
                    GCSettings.LargeObjectHeapCompactionMode = GCLargeObjectHeapCompactionMode.CompactOnce;
                    GC.Collect();
                    await Task.Delay(3000, cancellationToken);
                }

                var record = source.Current;
                await _producerQueue.EnqueueAsync(record);
                countBuffer++;
                _context.AddInput();

                // To limit the overhead, the per-table input progress is only updated every 1000 records.
                if (countBuffer >= 1000)
                {
                    _context.AddTableInput(info.TableName, countBuffer);
                    countBuffer = 0;

                    // In dry-run mode, stop reading this file once the table reached the configured row count.
                    if (IsDryRunLimitReached(info.TableName))
                    {
                        break;
                    }
                }
            }

            // Flush the records counted since the last 1000-record checkpoint.
            _context.AddTableInput(info.TableName, countBuffer);
            _logger.LogInformation("文件 {File} 输入完成", file);
            _dataInputOptions.Value.OnTableInputCompleted?.Invoke(info.TableName);
        }

        _context.CompleteInput();
        _logger.LogInformation("***** 输入服务{DryRun}已执行完毕 *****", _dryRun ? " (试运行)" : "");
    }

    /// <summary>
    /// Builds the metadata for every file in <paramref name="dir"/> and returns the usable
    /// entries in the configured table order.
    /// </summary>
    /// <param name="dir">Directory to scan (top level only).</param>
    /// <returns>Metadata of the files to read, in processing order.</returns>
    /// <exception cref="ApplicationException">No file meta builder delegate is configured.</exception>
    public IEnumerable<FileInputInfo> GetOrderedInputInfo(string dir)
    {
        var metaBuilder = _dataInputOptions.Value.FileInputMetaBuilder;
        if (metaBuilder is null)
        {
            throw new ApplicationException("未配置文件名->表名的映射委托函数");
        }

        var files = Directory.GetFiles(dir);

        // Files for which the builder returns null are discarded.
        FileInputInfo[] infoArr = files
            .Select(f => metaBuilder(f))
            .Where(info => info is not null).ToArray()!;

        var orderedInfo = GetFilesInOrder(infoArr).ToArray();

        _logger.LogInformation("***** 输入目录中发现 {Count} 个文件, {InfoCount} 个文件可用,{OrderedCount} 个文件符合当前输入配置 *****",
            files.Length, infoArr.Length, orderedInfo.Length);
        foreach (var info in orderedInfo.GroupBy(i => i.TableName))
        {
            _logger.LogDebug("表 {TableName} 发现 {FileCount} 个对应文件:\n{FileName}",
                info.Key, info.Count(), string.Join('\n', info.Select(f => f.FileName)));
        }

        return orderedInfo;
    }

    /// <summary>
    /// Tells whether dry-run mode is active and the given table already reached the dry-run row count.
    /// </summary>
    private bool IsDryRunLimitReached(string tableName)
    {
        // A table without recorded progress counts as zero rows read.
        return _dryRun &&
               _context.TableProgress.GetValueOrDefault(tableName, (input: 0, output: 0)).input >= _dryRunCount;
    }

    /// <summary>
    /// Returns the input files in the order of the configured table list, leaving out ignored tables.
    /// </summary>
    /// <param name="inputFiles">All usable input files.</param>
    /// <returns>
    /// The files grouped by the table order; when the table order is empty, <paramref name="inputFiles"/> unchanged.
    /// </returns>
    private IEnumerable<FileInputInfo> GetFilesInOrder(FileInputInfo[] inputFiles)
    {
        // Without an explicit order, fall back to the values of the fields declared on TableNames.
        var tableOrder = _dataInputOptions.Value.TableOrder ?? typeof(TableNames).GetFields().Select(f => f.GetValue(null) as string).ToArray();
        var ignoreTable = _dataInputOptions.Value.TableIgnoreList;

        // NOTE(review): on this path the ignore list is not applied - confirm that this is intended.
        if (tableOrder is null or { Length: 0 })
        {
            return inputFiles;
        }

        return Yield();

        IEnumerable<FileInputInfo> Yield()
        {
            foreach (var tableName in tableOrder)
            {
                // Table names are matched case-insensitively against the order list;
                // files whose table is not in the list are never returned.
                var targets = inputFiles.Where(f =>
                    f.TableName.Equals(tableName, StringComparison.OrdinalIgnoreCase) &&
                    !ignoreTable.Contains(f.TableName));

                foreach (var target in targets)
                {
                    yield return target;
                }
            }
        }
    }
}