2025迁移版本,多项规则修改

This commit is contained in:
2024-12-10 14:03:09 +08:00
parent dc239c776e
commit 0e28d639c1
34 changed files with 1075 additions and 564 deletions

View File

@@ -1,4 +1,5 @@
using MesETL.App.HostedServices.Abstractions;
using MesETL.App.Const;
using MesETL.App.HostedServices.Abstractions;
using MesETL.App.Options;
using MesETL.App.Services;
using MesETL.App.Services.ETL;
@@ -13,16 +14,11 @@ public record FileInputInfo
{
public required string FileName { get; init; }
public required string TableName { get; init; }
public required string Database { get; init; }
public required int Part { get; init; }
public required string[] Headers { get; init; }
}
/// <summary>
/// Kinds of input file distinguished by the importer.
/// NOTE(review): the member descriptions below are inferred from the member names only —
/// confirm against the code that consumes this enum.
/// </summary>
public enum FileInputType
{
/// <summary>Presumably an uncompressed CSV file produced by MyDumper — TODO confirm.</summary>
MyDumperCsv,
/// <summary>Presumably a Zstandard-compressed MyDumper file — TODO confirm.</summary>
MyDumperZst,
/// <summary>Presumably a previously written error log used as input — TODO confirm.</summary>
ErrorLog,
}
/// <summary>
/// 从输入目录中导入文件
/// </summary>
@@ -53,33 +49,23 @@ public class FileInputService : IInputService
public async Task ExecuteAsync(CancellationToken cancellationToken)
{
var inputDir = _dataInputOptions.Value.InputDir ?? throw new ApplicationException("未配置文件输入目录");
_logger.LogInformation("***** Input service started, working directory: {InputDir} *****", inputDir);
var trans = _dataInputOptions.Value.FileInputMetaBuilder;
if(trans is null) throw new ApplicationException("未配置文件名-表名映射委托");
FileInputInfo[] infoArr = Directory.GetFiles(inputDir)
.Select(f => trans(f))
.Where(info => info is not null).ToArray()!;
var orderedInfo = GetFilesInOrder(infoArr).ToArray();
_logger.LogInformation("***** {Count} files founded in directory{OrderedCount} files is matched with configuration *****", infoArr.Length, orderedInfo.Length);
foreach (var info in orderedInfo)
{
_logger.LogDebug("Table {TableName}: {FileName}", info.TableName, info.FileName);
}
_logger.LogInformation("***** 输入服务已启动,工作目录为:{InputDir} *****", inputDir);
var orderedInfo = GetOrderedInputInfo(inputDir);
foreach (var info in orderedInfo)
{
_logger.LogInformation("Reading file: {FileName}, table: {TableName}", info.FileName, info.TableName);
using var source = _dataReaderFactory.CreateReader(info.FileName,info.TableName,info.Headers);
var file = Path.GetFileName(info.FileName);
_logger.LogInformation("正在读取文件:{FileName}, 对应的数据表:{TableName}", file, info.TableName);
using var source = _dataReaderFactory.CreateReader(info.FileName, info.TableName, info.Headers);
var count = 0;
while (await source.ReadAsync())
{
if (GC.GetTotalMemory(false) > _memoryThreshold)
{
_logger.LogWarning("内存过高,暂缓输入");
_logger.LogWarning("内存使用率过高,暂缓输入");
GC.Collect();
GC.Collect();
await Task.Delay(3000, cancellationToken);
}
@@ -90,12 +76,34 @@ public class FileInputService : IInputService
}
_context.AddTableInput(info.TableName, count);
_logger.LogInformation("Input of table: '{TableName}' finished", info.TableName);
_logger.LogInformation("文件 {File} 输入完成", file);
_dataInputOptions.Value.OnTableInputCompleted?.Invoke(info.TableName);
}
_context.CompleteInput();
_logger.LogInformation("***** Input service finished *****");
_logger.LogInformation("***** 输入服务已执行完毕 *****");
}
/// <summary>
/// Collects the input files found in <paramref name="dir"/> and returns their
/// descriptions in import order.
/// </summary>
/// <remarks>
/// Each path returned by <see cref="Directory.GetFiles(string)"/> is passed to the configured
/// <c>FileInputMetaBuilder</c> delegate; paths for which the delegate yields <c>null</c> are
/// dropped. The remaining descriptions are ordered and filtered by <c>GetFilesInOrder</c>.
/// A summary is logged at Information level and a per-table file list at Debug level.
/// </remarks>
/// <param name="dir">Directory whose files are inspected.</param>
/// <returns>The file descriptions, in the order produced by <c>GetFilesInOrder</c>.</returns>
/// <exception cref="ApplicationException">
/// Thrown when no <c>FileInputMetaBuilder</c> delegate has been configured.
/// </exception>
public IEnumerable<FileInputInfo> GetOrderedInputInfo(string dir)
{
    var metaBuilder = _dataInputOptions.Value.FileInputMetaBuilder
                      ?? throw new ApplicationException("未配置文件名->表名的映射委托函数");

    var files = Directory.GetFiles(dir);
    FileInputInfo[] infoArr = files
        .Select(f => metaBuilder(f))
        .Where(info => info is not null).ToArray()!;
    var orderedInfo = GetFilesInOrder(infoArr).ToArray();

    _logger.LogInformation("***** 输入目录中发现 {Count} 个文件, {InfoCount} 个文件可用,{OrderedCount} 个文件符合当前输入配置 *****",
        files.Length, infoArr.Length, orderedInfo.Length);

    // Grouping and joining every file name is only useful for the debug listing,
    // so skip that work entirely when Debug output is disabled.
    if (_logger.IsEnabled(LogLevel.Debug))
    {
        foreach (var tableFiles in orderedInfo.GroupBy(i => i.TableName))
        {
            _logger.LogDebug("表 {TableName} 发现 {FileCount} 个对应文件:\n{FileName}",
                tableFiles.Key, tableFiles.Count(), string.Join('\n', tableFiles.Select(f => f.FileName)));
        }
    }

    return orderedInfo;
}
/// <summary>
@@ -104,7 +112,7 @@ public class FileInputService : IInputService
/// <returns></returns>
private IEnumerable<FileInputInfo> GetFilesInOrder(FileInputInfo[] inputFiles)
{
var tableOrder = _dataInputOptions.Value.TableOrder;
var tableOrder = _dataInputOptions.Value.TableOrder ?? typeof(TableNames).GetFields().Select(f => f.GetValue(null) as string).ToArray();
var ignoreTable = _dataInputOptions.Value.TableIgnoreList;
if (tableOrder is null or { Length: 0 })
return inputFiles;
@@ -115,10 +123,14 @@ public class FileInputService : IInputService
{
foreach (var tableName in tableOrder)
{
var target = inputFiles.FirstOrDefault(f =>
f.TableName.Equals(tableName, StringComparison.OrdinalIgnoreCase));
if (target is not null && !ignoreTable.Contains(target.TableName))
var targets = inputFiles.Where(f =>
f.TableName.Equals(tableName, StringComparison.OrdinalIgnoreCase) &&
!ignoreTable.Contains(f.TableName));
foreach (var target in targets)
{
yield return target;
}
}
}
}

View File

@@ -51,12 +51,13 @@ public class MainHostedService : BackgroundService
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Command argument detected, execute for each database");
var command = _config["Command"];
if (!string.IsNullOrEmpty(command))
{
_logger.LogInformation("***** Running Sql Command *****");
_logger.LogInformation("检测到命令参数传入,将对所有配置的数据库执行输入的命令。。。");
_logger.LogInformation("***** 执行SQL命令 *****");
await ExecuteEachDatabase(command, stoppingToken);
_logger.LogInformation("***** 执行完成 *****");
Environment.Exit(0);
}
@@ -75,8 +76,8 @@ public class MainHostedService : BackgroundService
await Task.WhenAll(inputTask, transformTask, outputTask);
_stopwatch.Stop();
_logger.LogInformation("***** All tasks completed *****");
_logger.LogInformation("***** ElapseTime: {Time}", (_stopwatch.ElapsedMilliseconds / 1000f).ToString("F3"));
_logger.LogInformation("***** 所有传输任务均已完成 *****");
_logger.LogInformation("***** 耗时:{Time}", (_stopwatch.ElapsedMilliseconds / 1000f).ToString("F3"));
await Task.Delay(5000, stoppingToken);
if(enableUnsafeVar)
@@ -84,7 +85,7 @@ public class MainHostedService : BackgroundService
if (!stoppingToken.IsCancellationRequested)
{
await ExportResultAsync();
_logger.LogInformation("The execution result export to {Path}",
_logger.LogInformation("传输结果已保存至 {Path}",
Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"Result-{ErrorRecorder.UID}.md"));
Environment.Exit(0);
@@ -157,32 +158,33 @@ public class MainHostedService : BackgroundService
{
var sb = new StringBuilder();
if (_context.HasException)
sb.AppendLine("# Program Completed With Error");
else sb.AppendLine("# Program Completed Successfully");
sb.AppendLine("## Process Count");
sb.AppendLine("# 程序执行完毕,**但中途发生了异常**");
else sb.AppendLine("# 程序执行完毕,没有发生错误");
sb.AppendLine("## 处理计数");
var processCount = new[]
{
new { State = "Input", Count = _context.InputCount },
new { State = "Transform", Count = _context.TransformCount },
new { State = "Output", Count = _context.OutputCount }
new { = "输入", = _context.InputCount },
new { = "转换", = _context.TransformCount },
new { = "输出", = _context.OutputCount }
};
sb.AppendLine(processCount.ToMarkdownTable());
sb.AppendLine("\n---\n");
sb.AppendLine("## Table Output Progress");
sb.AppendLine("## 表输入/输出计数");
var tableOutputProgress = _context.TableProgress.Select(pair =>
new { Table = pair.Key, Count = pair.Value }).OrderBy(s => s.Table);
new { = pair.Key, = pair.Value }).OrderBy(s => s.);
sb.AppendLine(tableOutputProgress.ToMarkdownTable());
sb.AppendLine("\n---\n");
sb.AppendLine("## Result");
sb.AppendLine("## 总览");
var elapsedTime = (_stopwatch!.ElapsedMilliseconds / 1000f);
var result = new[]
{
new { Field = "ElapsedTime", Value = elapsedTime.ToString("F2") },
new { = "耗时", = elapsedTime.ToString("F2") + " 秒" },
new
{
Field = "Average Output Speed",
Value = (_context.OutputCount / elapsedTime).ToString("F2") + "records/s"
}
= "平均输出速度",
= (_context.OutputCount / elapsedTime).ToString("F2") + " 条记录/秒"
},
new { = "内存占用峰值", = _context.MaxMemoryUsage + " 兆字节" }
};
sb.AppendLine(result.ToMarkdownTable());
await File.WriteAllTextAsync(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"Result-{ErrorRecorder.UID}.md"),

View File

@@ -1,4 +1,5 @@
using System.Buffers;
using MesETL.App.Const;
using MesETL.App.Helpers;
using MesETL.App.HostedServices.Abstractions;
using MesETL.App.Options;
@@ -13,6 +14,8 @@ using TaskExtensions = MesETL.Shared.Helper.TaskExtensions;
namespace MesETL.App.HostedServices;
/// <summary>
/// Context object handed to the configured <c>OutputFinished</c> callback once the
/// output service has completed.
/// </summary>
/// <param name="Serivces">
/// The service provider made available to the callback.
/// NOTE(review): the name looks like a misspelling of "Services"; it is part of the
/// record's public surface, so renaming it requires updating every consumer.
/// </param>
public record DataOutputContext(IServiceProvider Serivces);
/// <summary>
/// 数据导出服务将数据导出至MySql服务
/// </summary>
@@ -23,23 +26,26 @@ public class OutputService : IOutputService
private readonly ProcessContext _context;
private readonly ErrorRecorderFactory _errorRecorderFactory;
private readonly RecordQueuePool _queuePool;
private readonly IServiceProvider _services;
public OutputService(ILogger<OutputService> logger,
IOptions<DatabaseOutputOptions> outputOptions,
ProcessContext context,
RecordQueuePool queuePool,
ErrorRecorderFactory errorRecorderFactory)
ErrorRecorderFactory errorRecorderFactory,
IServiceProvider services)
{
_logger = logger;
_outputOptions = outputOptions;
_context = context;
_queuePool = queuePool;
_errorRecorderFactory = errorRecorderFactory;
_services = services;
}
public async Task ExecuteAsync(CancellationToken ct)
{
_logger.LogInformation("***** Output service started *****");
_logger.LogInformation("***** 输出服务已启动 *****");
var dbTaskManager = new TaskManager(5);
var dbTasks = new Dictionary<string, Task>();
while (!_context.IsTransformCompleted)
@@ -59,7 +65,8 @@ public class OutputService : IOutputService
await TaskExtensions.WaitUntil(() => dbTaskManager.RunningTaskCount == 0, 25, ct);
_context.CompleteOutput();
_logger.LogInformation("***** Output service finished *****");
_outputOptions.Value.OutputFinished?.Invoke(new DataOutputContext(_services));
_logger.LogInformation("***** 输出服务执行完毕 *****");
}
private async Task StartDatabaseWorker(string db, DataRecordQueue queue, CancellationToken ct = default)
@@ -72,8 +79,9 @@ public class OutputService : IOutputService
{
if (ct.IsCancellationRequested)
break;
if (!queue.TryDequeue(out var record) || ignoreOutput.Contains(record.TableName)) continue;
if (!queue.TryDequeue(out var record) || record.Ignore || ignoreOutput.Contains(record.TableName))
continue;
var dbName = record.Database ?? throw new ApplicationException("输出的记录缺少数据库名");
if(dbName != db)
@@ -109,7 +117,7 @@ public class OutputService : IOutputService
await FlushAsync(db, tmp);
}
_logger.LogInformation("*****输出线程结束,数据库: {db} *****", db);
_logger.LogInformation("***** 输出线程结束,数据库: {db} *****", db);
}
private async Task FlushAsync(string dbName, IEnumerable<DataRecord> records)
@@ -142,6 +150,6 @@ public class OutputService : IOutputService
_context.AddOutput(value);
_context.AddTableOutput(key, value);
}
_logger.LogTrace("Flushed {Count} records", tableOutput.Values.Sum(i => i));
_logger.LogTrace("输出任务:刷新了 {Count} 条记录", tableOutput.Values.Sum(i => i));
}
}

View File

@@ -80,27 +80,29 @@ public class TaskMonitorService
// running, error, completed, canceled, outputSpeed);
foreach (var logger in _monitorLoggers)
{
logger.LogStatus("Monitor: Progress status", new Dictionary<string, string>
var memory = GC.GetTotalMemory(false) / 1024 / 1024;
_context.MaxMemoryUsage = Math.Max(_context.MaxMemoryUsage, memory);
logger.LogStatus("系统监控", new Dictionary<string, string>
{
{"Input",_context.IsInputCompleted ? "OK" : $"{inputSpeed:F2}/s" },
{"Transform", _context.IsTransformCompleted ? "OK" : $"{transformSpeed:F2}/s" },
{"Output", _context.IsOutputCompleted ? "OK" : $"{outputSpeed:F2}/s" },
{"输入速度",_context.IsInputCompleted ? "OK" : $"{inputSpeed:F2}/s" },
{"转换速度", _context.IsTransformCompleted ? "OK" : $"{transformSpeed:F2}/s" },
{"输出速度", _context.IsOutputCompleted ? "OK" : $"{outputSpeed:F2}/s" },
{"| Input Queue", _producerQueue.Count.ToString() },
{"Output Queue", _queuePool.Queues.Values.Sum(queue => queue.Count).ToString()},
{"Memory", $"{GC.GetTotalMemory(false) / 1024 / 1024} MiB"},
{"| 输入队列长度", _producerQueue.Count.ToString() },
{"输出队列长度", _queuePool.Queues.Values.Sum(queue => queue.Count).ToString()},
{"内存使用", $"{memory} MiB"},
});
var dict = _context.TableProgress
.ToDictionary(kv => kv.Key, kv => $"{kv.Value.input}/{kv.Value.output}");
logger.LogStatus("Monitor: Table progress", dict, ITaskMonitorLogger.LogLevel.Progress);
var sb = new StringBuilder("Table Progress: \n");
logger.LogStatus("系统监控: 表处理进度", dict, ITaskMonitorLogger.LogLevel.Progress);
var sb = new StringBuilder("表处理进度:\n");
foreach (var kv in dict)
{
sb.Append(kv.Key).AppendLine(kv.Value);
}
sb.AppendLine($"LongestCharCount: {_producerQueue.LongestFieldCharCount}");
sb.AppendLine($"数据记录字段的最大长度:{_producerQueue.LongestFieldCharCount}");
await File.WriteAllTextAsync(_outputPath, sb.ToString(), CancellationToken.None);

View File

@@ -1,4 +1,5 @@
using MesETL.App.Cache;
using MesETL.App.Const;
using MesETL.App.HostedServices.Abstractions;
using MesETL.App.Options;
using MesETL.App.Services;
@@ -9,7 +10,7 @@ using Microsoft.Extensions.Options;
namespace MesETL.App.HostedServices;
public record DataTransformContext(DataRecord Record, ICacher Cacher, ILogger Logger);
public record DataTransformContext(DataRecord Record, ICacher Cacher, ILogger Logger, IServiceProvider Services);
/// <summary>
/// 数据处理服务,对导入后的数据进行处理
@@ -23,6 +24,7 @@ public class TransformService : ITransformService
private readonly ProcessContext _context;
private readonly ICacher _cache;
private readonly ErrorRecorderFactory _errorRecorderFactory;
private readonly IServiceProvider _services;
public TransformService(ILogger<TransformService> logger,
@@ -31,7 +33,8 @@ public class TransformService : ITransformService
RecordQueuePool queuePool,
ProcessContext context,
ICacher cache,
ErrorRecorderFactory errorRecorderFactory)
ErrorRecorderFactory errorRecorderFactory,
IServiceProvider services)
{
_logger = logger;
_options = options;
@@ -40,11 +43,12 @@ public class TransformService : ITransformService
_context = context;
_cache = cache;
_errorRecorderFactory = errorRecorderFactory;
_services = services;
}
public async Task ExecuteAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("***** Data transform service started, thread id: {ThreadId} *****", Environment.CurrentManagedThreadId);
_logger.LogInformation("***** 数据转换服务已启动, 当前线程ID: {ThreadId} *****", Environment.CurrentManagedThreadId);
// var tasks = new List<Task>();
// for (int i = 0; i < 4; i++)
@@ -55,7 +59,7 @@ public class TransformService : ITransformService
// await Task.WhenAll(tasks);
await TransformWorker();
_logger.LogInformation("***** Data transformation service finished *****");
_logger.LogInformation("***** 数据转换服务执行完毕 *****");
}
public async Task TransformWorker()
@@ -66,10 +70,10 @@ public class TransformService : ITransformService
{
continue;
}
try
{
var context = new DataTransformContext(record, _cache, _logger);
var context = new DataTransformContext(record, _cache, _logger, _services);
if (_options.Value.EnableFilter)
{
// 数据过滤
@@ -85,7 +89,7 @@ public class TransformService : ITransformService
{
record = await replacer(context);
}
}
}
// 字段缓存
var cacher = _options.Value.RecordCache;
@@ -96,9 +100,6 @@ public class TransformService : ITransformService
var dbFilter = _options.Value.DatabaseFilter
?? throw new ApplicationException("未配置数据库过滤器");
record.Database = dbFilter(record);
await _queuePool[record.Database].EnqueueAsync(record);
_context.AddTransform();
if (_options.Value.EnableReBuilder)
{
@@ -109,12 +110,15 @@ public class TransformService : ITransformService
foreach (var rc in addRecords)
{
if(dbFilter is not null)
rc.Database =dbFilter.Invoke(record);
rc.Database = dbFilter.Invoke(record);
await _queuePool[record.Database].EnqueueAsync(rc);
_context.AddTransform();
}
}
}
await _queuePool[record.Database].EnqueueAsync(record);
_context.AddTransform();
}
catch (Exception e)
{