From 0e28d639c18c043671d7293e595d0cf0e1e54087 Mon Sep 17 00:00:00 2001 From: "2817212736@qq.com" <2817212736@qq.com> Date: Tue, 10 Dec 2024 14:03:09 +0800 Subject: [PATCH] =?UTF-8?q?2025=E8=BF=81=E7=A7=BB=E7=89=88=E6=9C=AC?= =?UTF-8?q?=EF=BC=8C=E5=A4=9A=E9=A1=B9=E8=A7=84=E5=88=99=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- MesETL.App/Const/TableNames.cs | 3 + MesETL.App/DataRecord.cs | 85 ++- MesETL.App/Helpers/DumpDataHelper.cs | 81 ++- MesETL.App/HostedServices/FileInputService.cs | 76 ++- .../HostedServices/MainHostedService.cs | 38 +- MesETL.App/HostedServices/OutputService.cs | 22 +- .../HostedServices/TaskMonitorService.cs | 22 +- MesETL.App/HostedServices/TransformService.cs | 26 +- MesETL.App/Options/DataInputOptions.cs | 26 +- MesETL.App/Options/DataTransformOptions.cs | 10 +- MesETL.App/Options/DatabaseOutputOptions.cs | 43 +- MesETL.App/Options/RedisCacheOptions.cs | 12 + MesETL.App/Options/TableMockConfig.cs | 3 + MesETL.App/Options/TenantDbOptions.cs | 20 +- MesETL.App/Program.cs | 551 +++++------------- MesETL.App/Services/ETL/MySqlDestination.cs | 41 +- .../Loggers/LoggerTaskMonitorLogger.cs | 2 +- MesETL.App/Services/ProcessContext.cs | 2 + MesETL.App/Services/Seq/SeqConfig.cs | 42 ++ MesETL.App/Services/Seq/SeqService.cs | 134 +++++ MesETL.App/appsettings.json | 22 +- MesETL.Shared/Helper/DatabaseHelper.cs | 33 ++ MesETL.Shared/Helper/Extensions.Lang.cs | 63 ++ MesETL.Shared/Helper/Helper.Compress.cs | 22 + MesETL.Test/DatabaseToolBox.cs | 28 +- MesETL.Test/InputServiceTest.cs | 54 ++ MesETL.Test/MesETL.Test.csproj | 1 + MesETL.Test/Services/SeqServiceTests.cs | 33 ++ .../XUnit/Configuration/XUnitConfiguration.cs | 16 + MesETL.Test/XUnit/Logging/XUnitLogger.cs | 39 ++ .../XUnit/Logging/XUnitLoggerExtensions.cs | 13 + .../XUnit/Logging/XUnitLoggerProvider.cs | 20 + MesETL.Test/XUnit/TestBase.cs | 38 ++ MesETL.Test/XUnit/XUnitConsoleWriter.cs | 18 + 34 files changed, 1075 insertions(+), 564 deletions(-) create mode 100644 MesETL.App/Services/Seq/SeqConfig.cs create mode 100644 MesETL.App/Services/Seq/SeqService.cs create mode 100644 MesETL.Shared/Helper/Extensions.Lang.cs create mode 100644 MesETL.Shared/Helper/Helper.Compress.cs create mode 100644 MesETL.Test/InputServiceTest.cs create mode 100644 MesETL.Test/Services/SeqServiceTests.cs create mode 100644 MesETL.Test/XUnit/Configuration/XUnitConfiguration.cs create mode 100644 MesETL.Test/XUnit/Logging/XUnitLogger.cs create mode 100644 MesETL.Test/XUnit/Logging/XUnitLoggerExtensions.cs create mode 100644 MesETL.Test/XUnit/Logging/XUnitLoggerProvider.cs create mode 100644 MesETL.Test/XUnit/TestBase.cs create mode 100644 MesETL.Test/XUnit/XUnitConsoleWriter.cs diff --git a/MesETL.App/Const/TableNames.cs b/MesETL.App/Const/TableNames.cs index 06a42b2..c68cdf6 100644 --- a/MesETL.App/Const/TableNames.cs +++ b/MesETL.App/Const/TableNames.cs @@ -8,9 +8,11 @@ public static class TableNames public const string OrderBlockPlanItem = "order_block_plan_item"; public const string OrderBlockPlanResult = "order_block_plan_result"; public const string OrderBoxBlock = "order_box_block"; + public const string OrderBoxViewConfig = "order_box_view_config"; public const string OrderDataBlock = "order_data_block"; public const string OrderDataGoods = "order_data_goods"; public const string OrderDataParts = "order_data_parts"; + public const string OrderExtra = "order_extra"; public const string OrderItem = "order_item"; public const string OrderModule = "order_module"; public const string OrderModuleExtra = "order_module_extra"; @@ -23,6 +25,7 @@ public static class TableNames public const string OrderProcessStep = "order_process_step"; public const string OrderProcessStepItem = "order_process_step_item"; public const string OrderScrapBoard = "order_scrap_board"; + public const string OrderWaveGroup = "order_wave_group"; public const string ProcessGroup = "process_group"; public const string ProcessInfo = "process_info"; public const string ProcessItemExp = "process_item_exp"; diff --git a/MesETL.App/DataRecord.cs b/MesETL.App/DataRecord.cs index c4c79c6..6ec74bb 100644 --- a/MesETL.App/DataRecord.cs +++ b/MesETL.App/DataRecord.cs @@ -2,6 +2,14 @@ public class DataRecord : ICloneable { + /// + /// 尝试获取一条记录的某个字段值 + /// + /// + /// + /// + /// + /// public static bool TryGetField(DataRecord record, string columnName, out string value) { value = string.Empty; @@ -14,6 +22,15 @@ public class DataRecord : ICloneable return true; } + /// + /// 获取一条记录的某个字段值 + /// TODO: 最好能优化至O(1) + /// + /// + /// + /// + /// + /// public static string GetField(DataRecord record, string columnName) { if (record.Headers is null) @@ -24,7 +41,7 @@ public class DataRecord : ICloneable $"Column name '{columnName}' not found in this record, table name '{record.TableName}'."); return record.Fields[idx]; } - + private static int IndexOfIgnoreCase(IList list, string value) { var idx = -1; @@ -40,11 +57,30 @@ public class DataRecord : ICloneable return idx; } + /// + /// 字段列表 + /// public IList Fields { get; } + /// + /// 表头列表 + /// public IList Headers { get; } + /// + /// 来源表名 + /// public string TableName { get; } + /// + /// 需要输出的数据库 + /// public string? Database { get; set; } + /// + /// 所有字段的总字符数量 + /// public long FieldCharCount { get; } + /// + /// 忽略这个记录,不会被输出 + /// + public bool Ignore { get; set; } public DataRecord(IEnumerable fields, string tableName, IEnumerable headers, string? database = null) @@ -62,45 +98,64 @@ public class DataRecord : ICloneable FieldCharCount = Fields.Sum(x => (long)x.Length); } + /// + /// 使用索引访问字段 + /// + /// public string this[int index] { get => Fields[index]; set => Fields[index] = value; } + /// + /// 使用列名访问字段,不区分大小写 + /// + /// public string this[string columnName] { get => GetField(this, columnName); set => SetField(columnName, value); } - public int FieldCount => Fields.Count; - + /// + /// 尝试获取字段值 + /// + /// + /// + /// public bool TryGetField(string columnName, out string value) => TryGetField(this, columnName, out value); + /// + /// 为一个字段赋值 + /// + /// + /// + /// public bool SetField(string columnName, string value) => SetField(this, columnName, value); - public bool SetField(DataRecord record, string columnName, string value) + /// + /// + /// + /// + /// + /// + /// + /// + /// + public static bool SetField(DataRecord record, string columnName, string value) { + // 表头检查 if (record.Headers is null) - throw new InvalidOperationException("Headers have not been set."); + throw new InvalidOperationException("记录的表头尚未设置,无法赋值"); var idx = IndexOfIgnoreCase(record.Headers, columnName); if (idx is -1) throw new IndexOutOfRangeException( - $"Column name '{columnName}' not found in this record, table name '{record.TableName}"); + $"列 '{columnName}' 不存在于该纪录中,表名 '{record.TableName}"); record.Fields[idx] = value; return true; } - public void AddField(string columnName, string value) - { - if (IndexOfIgnoreCase(Headers, columnName) != -1) - throw new InvalidOperationException($"{TableName}: 列名 '{columnName}' 已存在"); - - Fields.Add(value); - Headers.Add(columnName); - } - public void RemoveField(string columnName) { var idx = IndexOfIgnoreCase(Headers, columnName); diff --git a/MesETL.App/Helpers/DumpDataHelper.cs b/MesETL.App/Helpers/DumpDataHelper.cs index 148bcbf..26ecb7a 100644 --- a/MesETL.App/Helpers/DumpDataHelper.cs +++ b/MesETL.App/Helpers/DumpDataHelper.cs @@ -1,5 +1,7 @@ using System.Text.Json; using System.Text.RegularExpressions; +using MesETL.App.HostedServices; +using Serilog; using ZstdSharp; namespace MesETL.App.Helpers; @@ -27,16 +29,13 @@ public static partial class DumpDataHelper string[] ParseHeader(ReadOnlySpan headerStr) { headerStr = headerStr[1..^1]; - Span ranges = stackalloc Range[50]; - var count = headerStr.Split(ranges, ','); - var arr = new string[count]; - - for (var i = 0; i < count; i++) + var headers = new List(); + foreach (var range in headerStr.Split(',')) { - arr[i] = headerStr[ranges[i]].Trim("@`").ToString(); // 消除列名的反引号,如果是变量则消除@ + headers.Add(headerStr[range].Trim("@`").ToString()); // 消除列名的反引号,如果是变量则消除@ } - - return arr; + + return headers.ToArray(); } } @@ -45,6 +44,7 @@ public static partial class DumpDataHelper /// /// /// + [Obsolete("用ParseMyDumperFile替代")] public static string GetTableNameFromCsvFileName(ReadOnlySpan filePath) { filePath = filePath[(filePath.LastIndexOf('\\') + 1)..]; @@ -68,6 +68,30 @@ public static partial class DumpDataHelper return filePath[(firstDotIdx+1)..secondDotIdx].ToString(); } + + public enum MyDumperFileType { Dat, Sql } + + public record MyDumperFileMeta(string Path, string Database, string TableName, int Index, MyDumperFileType Type); + + public static MyDumperFileMeta ParseMyDumperFile(ReadOnlySpan path) + { + try + { + var fileName = Path.GetFileName(path).ToString(); + var parts = fileName.Split('.'); + var type = parts[3] switch + { + "dat" => MyDumperFileType.Dat, + "sql" => MyDumperFileType.Sql, + _ => throw new ArgumentException("不支持的MyDumper文件类型", nameof(path)) + }; + return new MyDumperFileMeta(path.ToString(), parts[0], parts[1], int.Parse(parts[2]), type); + } + catch (Exception e) + { + throw new ArgumentException($"此文件不是MyDumper导出的文件 {path}", nameof(path), e); + } + } /// /// 从MyDumper导出的SQL文件内容中读取CSV文件名 @@ -122,17 +146,46 @@ public static partial class DumpDataHelper var reader = new StreamReader(ds); return await reader.ReadToEndAsync(); } - - public static bool IsJson(string str) + + /// + /// 适用于文件输入服务以及MyDumper Zst导出目录的文件元数据构建函数 + /// + /// + /// + /// + public static FileInputInfo? MyDumperFileInputMetaBuilder(string filePath) { + // 只查找后缀为.dat.zst的文件 + if (!filePath.EndsWith(".dat.zst")) return null; + + var fileMeta = ParseMyDumperFile(filePath); + var inputDir = Path.GetDirectoryName(filePath); + string[]? headers; try { - JsonDocument.Parse(str); - return true; + // 查找同目录下同表的SQL文件 + var sqlFile = Directory.GetFiles(inputDir!) + .SingleOrDefault(f => f.Equals(filePath.Replace(".dat.zst", ".sql.zst"))); + if (sqlFile is null) + { + Log.Debug("{TableName}表的SQL文件不存在", fileMeta.TableName); + return null; + } + headers = GetCsvHeadersFromSqlFile( + DecompressZstAsStringAsync(File.OpenRead(sqlFile)).Result); } - catch (JsonException) + catch (InvalidOperationException e) { - return false; + throw new ApplicationException($"目录下不止一个{fileMeta.TableName}表的SQL文件", e); } + + return new FileInputInfo + { + FileName = filePath, + TableName = fileMeta.TableName, + Headers = headers, + Database = fileMeta.Database, + Part = fileMeta.Index + }; } } \ No newline at end of file diff --git a/MesETL.App/HostedServices/FileInputService.cs b/MesETL.App/HostedServices/FileInputService.cs index 4c973b6..e1ce00b 100644 --- a/MesETL.App/HostedServices/FileInputService.cs +++ b/MesETL.App/HostedServices/FileInputService.cs @@ -1,4 +1,5 @@ -using MesETL.App.HostedServices.Abstractions; +using MesETL.App.Const; +using MesETL.App.HostedServices.Abstractions; using MesETL.App.Options; using MesETL.App.Services; using MesETL.App.Services.ETL; @@ -13,16 +14,11 @@ public record FileInputInfo { public required string FileName { get; init; } public required string TableName { get; init; } + public required string Database { get; init; } + public required int Part { get; init; } public required string[] Headers { get; init; } } -public enum FileInputType -{ - MyDumperCsv, - MyDumperZst, - ErrorLog, -} - /// /// 从输入目录中导入文件 /// @@ -53,33 +49,23 @@ public class FileInputService : IInputService public async Task ExecuteAsync(CancellationToken cancellationToken) { var inputDir = _dataInputOptions.Value.InputDir ?? throw new ApplicationException("未配置文件输入目录"); - _logger.LogInformation("***** Input service started, working directory: {InputDir} *****", inputDir); - - var trans = _dataInputOptions.Value.FileInputMetaBuilder; - if(trans is null) throw new ApplicationException("未配置文件名-表名映射委托"); - FileInputInfo[] infoArr = Directory.GetFiles(inputDir) - .Select(f => trans(f)) - .Where(info => info is not null).ToArray()!; - - var orderedInfo = GetFilesInOrder(infoArr).ToArray(); - - _logger.LogInformation("***** {Count} files founded in directory,{OrderedCount} files is matched with configuration *****", infoArr.Length, orderedInfo.Length); - foreach (var info in orderedInfo) - { - _logger.LogDebug("Table {TableName}: {FileName}", info.TableName, info.FileName); - } + _logger.LogInformation("***** 输入服务已启动,工作目录为:{InputDir} *****", inputDir); + var orderedInfo = GetOrderedInputInfo(inputDir); + foreach (var info in orderedInfo) { - _logger.LogInformation("Reading file: {FileName}, table: {TableName}", info.FileName, info.TableName); - using var source = _dataReaderFactory.CreateReader(info.FileName,info.TableName,info.Headers); + var file = Path.GetFileName(info.FileName); + _logger.LogInformation("正在读取文件:{FileName}, 对应的数据表:{TableName}", file, info.TableName); + using var source = _dataReaderFactory.CreateReader(info.FileName, info.TableName, info.Headers); var count = 0; while (await source.ReadAsync()) { if (GC.GetTotalMemory(false) > _memoryThreshold) { - _logger.LogWarning("内存过高,暂缓输入"); + _logger.LogWarning("内存使用率过高,暂缓输入"); + GC.Collect(); GC.Collect(); await Task.Delay(3000, cancellationToken); } @@ -90,12 +76,34 @@ public class FileInputService : IInputService } _context.AddTableInput(info.TableName, count); - _logger.LogInformation("Input of table: '{TableName}' finished", info.TableName); + _logger.LogInformation("文件 {File} 输入完成", file); _dataInputOptions.Value.OnTableInputCompleted?.Invoke(info.TableName); } _context.CompleteInput(); - _logger.LogInformation("***** Input service finished *****"); + _logger.LogInformation("***** 输入服务已执行完毕 *****"); + } + + public IEnumerable GetOrderedInputInfo(string dir) + { + var metaBuilder = _dataInputOptions.Value.FileInputMetaBuilder; + if(metaBuilder is null) throw new ApplicationException("未配置文件名->表名的映射委托函数"); + var files = Directory.GetFiles(dir); + FileInputInfo[] infoArr = files + .Select(f => metaBuilder(f)) + .Where(info => info is not null).ToArray()!; + + var orderedInfo = GetFilesInOrder(infoArr).ToArray(); + + _logger.LogInformation("***** 输入目录中发现 {Count} 个文件, {InfoCount} 个文件可用,{OrderedCount} 个文件符合当前输入配置 *****", + files.Length, infoArr.Length, orderedInfo.Length); + foreach (var info in orderedInfo.GroupBy(i => i.TableName)) + { + _logger.LogDebug("表 {TableName} 发现 {FileCount} 个对应文件:\n{FileName}", + info.Key, info.Count(), string.Join('\n', info.Select(f => f.FileName))); + } + + return orderedInfo; } /// @@ -104,7 +112,7 @@ public class FileInputService : IInputService /// private IEnumerable GetFilesInOrder(FileInputInfo[] inputFiles) { - var tableOrder = _dataInputOptions.Value.TableOrder; + var tableOrder = _dataInputOptions.Value.TableOrder ?? typeof(TableNames).GetFields().Select(f => f.GetValue(null) as string).ToArray(); var ignoreTable = _dataInputOptions.Value.TableIgnoreList; if (tableOrder is null or { Length: 0 }) return inputFiles; @@ -115,10 +123,14 @@ public class FileInputService : IInputService { foreach (var tableName in tableOrder) { - var target = inputFiles.FirstOrDefault(f => - f.TableName.Equals(tableName, StringComparison.OrdinalIgnoreCase)); - if (target is not null && !ignoreTable.Contains(target.TableName)) + var targets = inputFiles.Where(f => + f.TableName.Equals(tableName, StringComparison.OrdinalIgnoreCase) && + !ignoreTable.Contains(f.TableName)); + + foreach (var target in targets) + { yield return target; + } } } } diff --git a/MesETL.App/HostedServices/MainHostedService.cs b/MesETL.App/HostedServices/MainHostedService.cs index 233a68c..2b1f7b2 100644 --- a/MesETL.App/HostedServices/MainHostedService.cs +++ b/MesETL.App/HostedServices/MainHostedService.cs @@ -51,12 +51,13 @@ public class MainHostedService : BackgroundService protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - _logger.LogInformation("Command argument detected, execute for each database"); var command = _config["Command"]; if (!string.IsNullOrEmpty(command)) { - _logger.LogInformation("***** Running Sql Command *****"); + _logger.LogInformation("检测到命令参数传入,将对所有配置的数据库执行输入的命令。。。"); + _logger.LogInformation("***** 执行SQL命令 *****"); await ExecuteEachDatabase(command, stoppingToken); + _logger.LogInformation("***** 执行完成 *****"); Environment.Exit(0); } @@ -75,8 +76,8 @@ public class MainHostedService : BackgroundService await Task.WhenAll(inputTask, transformTask, outputTask); _stopwatch.Stop(); - _logger.LogInformation("***** All tasks completed *****"); - _logger.LogInformation("***** ElapseTime: {Time}", (_stopwatch.ElapsedMilliseconds / 1000f).ToString("F3")); + _logger.LogInformation("***** 所有传输任务均已完成 *****"); + _logger.LogInformation("***** 耗时:{Time}", (_stopwatch.ElapsedMilliseconds / 1000f).ToString("F3")); await Task.Delay(5000, stoppingToken); if(enableUnsafeVar) @@ -84,7 +85,7 @@ public class MainHostedService : BackgroundService if (!stoppingToken.IsCancellationRequested) { await ExportResultAsync(); - _logger.LogInformation("The execution result export to {Path}", + _logger.LogInformation("传输结果已保存至 {Path}", Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"Result-{ErrorRecorder.UID}.md")); Environment.Exit(0); @@ -157,32 +158,33 @@ public class MainHostedService : BackgroundService { var sb = new StringBuilder(); if (_context.HasException) - sb.AppendLine("# Program Completed With Error"); - else sb.AppendLine("# Program Completed Successfully"); - sb.AppendLine("## Process Count"); + sb.AppendLine("# 程序执行完毕,**但中途发生了异常**"); + else sb.AppendLine("# 程序执行完毕,没有发生错误"); + sb.AppendLine("## 处理计数"); var processCount = new[] { - new { State = "Input", Count = _context.InputCount }, - new { State = "Transform", Count = _context.TransformCount }, - new { State = "Output", Count = _context.OutputCount } + new { 操作 = "输入", 数量 = _context.InputCount }, + new { 操作 = "转换", 数量 = _context.TransformCount }, + new { 操作 = "输出", 数量 = _context.OutputCount } }; sb.AppendLine(processCount.ToMarkdownTable()); sb.AppendLine("\n---\n"); - sb.AppendLine("## Table Output Progress"); + sb.AppendLine("## 表输入/输出计数"); var tableOutputProgress = _context.TableProgress.Select(pair => - new { Table = pair.Key, Count = pair.Value }).OrderBy(s => s.Table); + new { 表名 = pair.Key, 计数 = pair.Value }).OrderBy(s => s.表名); sb.AppendLine(tableOutputProgress.ToMarkdownTable()); sb.AppendLine("\n---\n"); - sb.AppendLine("## Result"); + sb.AppendLine("## 总览"); var elapsedTime = (_stopwatch!.ElapsedMilliseconds / 1000f); var result = new[] { - new { Field = "ElapsedTime", Value = elapsedTime.ToString("F2") }, + new { 条目 = "耗时", 值 = elapsedTime.ToString("F2") + " 秒" }, new { - Field = "Average Output Speed", - Value = (_context.OutputCount / elapsedTime).ToString("F2") + "records/s" - } + 条目 = "平均输出速度", + 值 = (_context.OutputCount / elapsedTime).ToString("F2") + " 条记录/秒" + }, + new { 条目 = "内存占用峰值", 值 = _context.MaxMemoryUsage + " 兆字节" } }; sb.AppendLine(result.ToMarkdownTable()); await File.WriteAllTextAsync(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"Result-{ErrorRecorder.UID}.md"), diff --git a/MesETL.App/HostedServices/OutputService.cs b/MesETL.App/HostedServices/OutputService.cs index 101b516..901c762 100644 --- a/MesETL.App/HostedServices/OutputService.cs +++ b/MesETL.App/HostedServices/OutputService.cs @@ -1,4 +1,5 @@ using System.Buffers; +using MesETL.App.Const; using MesETL.App.Helpers; using MesETL.App.HostedServices.Abstractions; using MesETL.App.Options; @@ -13,6 +14,8 @@ using TaskExtensions = MesETL.Shared.Helper.TaskExtensions; namespace MesETL.App.HostedServices; +public record DataOutputContext(IServiceProvider Serivces); + /// /// 数据导出服务,将数据导出至MySql服务 /// @@ -23,23 +26,26 @@ public class OutputService : IOutputService private readonly ProcessContext _context; private readonly ErrorRecorderFactory _errorRecorderFactory; private readonly RecordQueuePool _queuePool; + private readonly IServiceProvider _services; public OutputService(ILogger logger, IOptions outputOptions, ProcessContext context, RecordQueuePool queuePool, - ErrorRecorderFactory errorRecorderFactory) + ErrorRecorderFactory errorRecorderFactory, + IServiceProvider services) { _logger = logger; _outputOptions = outputOptions; _context = context; _queuePool = queuePool; _errorRecorderFactory = errorRecorderFactory; + _services = services; } public async Task ExecuteAsync(CancellationToken ct) { - _logger.LogInformation("***** Output service started *****"); + _logger.LogInformation("***** 输出服务已启动 *****"); var dbTaskManager = new TaskManager(5); var dbTasks = new Dictionary(); while (!_context.IsTransformCompleted) @@ -59,7 +65,8 @@ public class OutputService : IOutputService await TaskExtensions.WaitUntil(() => dbTaskManager.RunningTaskCount == 0, 25, ct); _context.CompleteOutput(); - _logger.LogInformation("***** Output service finished *****"); + _outputOptions.Value.OutputFinished?.Invoke(new DataOutputContext(_services)); + _logger.LogInformation("***** 输出服务执行完毕 *****"); } private async Task StartDatabaseWorker(string db, DataRecordQueue queue, CancellationToken ct = default) @@ -72,8 +79,9 @@ public class OutputService : IOutputService { if (ct.IsCancellationRequested) break; - - if (!queue.TryDequeue(out var record) || ignoreOutput.Contains(record.TableName)) continue; + + if (!queue.TryDequeue(out var record) || record.Ignore || ignoreOutput.Contains(record.TableName)) + continue; var dbName = record.Database ?? throw new ApplicationException("输出的记录缺少数据库名"); if(dbName != db) @@ -109,7 +117,7 @@ public class OutputService : IOutputService await FlushAsync(db, tmp); } - _logger.LogInformation("*****输出线程结束,数据库: {db} *****", db); + _logger.LogInformation("***** 输出线程结束,数据库: {db} *****", db); } private async Task FlushAsync(string dbName, IEnumerable records) @@ -142,6 +150,6 @@ public class OutputService : IOutputService _context.AddOutput(value); _context.AddTableOutput(key, value); } - _logger.LogTrace("Flushed {Count} records", tableOutput.Values.Sum(i => i)); + _logger.LogTrace("输出任务:刷新了 {Count} 条记录", tableOutput.Values.Sum(i => i)); } } \ No newline at end of file diff --git a/MesETL.App/HostedServices/TaskMonitorService.cs b/MesETL.App/HostedServices/TaskMonitorService.cs index 88d99bd..4f331b4 100644 --- a/MesETL.App/HostedServices/TaskMonitorService.cs +++ b/MesETL.App/HostedServices/TaskMonitorService.cs @@ -80,27 +80,29 @@ public class TaskMonitorService // running, error, completed, canceled, outputSpeed); foreach (var logger in _monitorLoggers) { - logger.LogStatus("Monitor: Progress status", new Dictionary + var memory = GC.GetTotalMemory(false) / 1024 / 1024; + _context.MaxMemoryUsage = Math.Max(_context.MaxMemoryUsage, memory); + logger.LogStatus("系统监控", new Dictionary { - {"Input",_context.IsInputCompleted ? "OK" : $"{inputSpeed:F2}/s" }, - {"Transform", _context.IsTransformCompleted ? "OK" : $"{transformSpeed:F2}/s" }, - {"Output", _context.IsOutputCompleted ? "OK" : $"{outputSpeed:F2}/s" }, + {"输入速度",_context.IsInputCompleted ? "OK" : $"{inputSpeed:F2}/s" }, + {"转换速度", _context.IsTransformCompleted ? "OK" : $"{transformSpeed:F2}/s" }, + {"输出速度", _context.IsOutputCompleted ? "OK" : $"{outputSpeed:F2}/s" }, - {"| Input Queue", _producerQueue.Count.ToString() }, - {"Output Queue", _queuePool.Queues.Values.Sum(queue => queue.Count).ToString()}, - {"Memory", $"{GC.GetTotalMemory(false) / 1024 / 1024} MiB"}, + {"| 输入队列长度", _producerQueue.Count.ToString() }, + {"输出队列长度", _queuePool.Queues.Values.Sum(queue => queue.Count).ToString()}, + {"内存使用", $"{memory} MiB"}, }); var dict = _context.TableProgress .ToDictionary(kv => kv.Key, kv => $"{kv.Value.input}/{kv.Value.output}"); - logger.LogStatus("Monitor: Table progress", dict, ITaskMonitorLogger.LogLevel.Progress); - var sb = new StringBuilder("Table Progress: \n"); + logger.LogStatus("系统监控: 表处理进度", dict, ITaskMonitorLogger.LogLevel.Progress); + var sb = new StringBuilder("表处理进度:\n"); foreach (var kv in dict) { sb.Append(kv.Key).AppendLine(kv.Value); } - sb.AppendLine($"LongestCharCount: {_producerQueue.LongestFieldCharCount}"); + sb.AppendLine($"数据记录字段的最大长度:{_producerQueue.LongestFieldCharCount}"); await File.WriteAllTextAsync(_outputPath, sb.ToString(), CancellationToken.None); diff --git a/MesETL.App/HostedServices/TransformService.cs b/MesETL.App/HostedServices/TransformService.cs index 6dcaca6..c2a90d2 100644 --- a/MesETL.App/HostedServices/TransformService.cs +++ b/MesETL.App/HostedServices/TransformService.cs @@ -1,4 +1,5 @@ using MesETL.App.Cache; +using MesETL.App.Const; using MesETL.App.HostedServices.Abstractions; using MesETL.App.Options; using MesETL.App.Services; @@ -9,7 +10,7 @@ using Microsoft.Extensions.Options; namespace MesETL.App.HostedServices; -public record DataTransformContext(DataRecord Record, ICacher Cacher, ILogger Logger); +public record DataTransformContext(DataRecord Record, ICacher Cacher, ILogger Logger, IServiceProvider Services); /// /// 数据处理服务,对导入后的数据进行处理 @@ -23,6 +24,7 @@ public class TransformService : ITransformService private readonly ProcessContext _context; private readonly ICacher _cache; private readonly ErrorRecorderFactory _errorRecorderFactory; + private readonly IServiceProvider _services; public TransformService(ILogger logger, @@ -31,7 +33,8 @@ public class TransformService : ITransformService RecordQueuePool queuePool, ProcessContext context, ICacher cache, - ErrorRecorderFactory errorRecorderFactory) + ErrorRecorderFactory errorRecorderFactory, + IServiceProvider services) { _logger = logger; _options = options; @@ -40,11 +43,12 @@ public class TransformService : ITransformService _context = context; _cache = cache; _errorRecorderFactory = errorRecorderFactory; + _services = services; } public async Task ExecuteAsync(CancellationToken cancellationToken) { - _logger.LogInformation("***** Data transform service started, thread id: {ThreadId} *****", Environment.CurrentManagedThreadId); + _logger.LogInformation("***** 数据转换服务已启动, 当前线程ID: {ThreadId} *****", Environment.CurrentManagedThreadId); // var tasks = new List(); // for (int i = 0; i < 4; i++) @@ -55,7 +59,7 @@ public class TransformService : ITransformService // await Task.WhenAll(tasks); await TransformWorker(); - _logger.LogInformation("***** Data transformation service finished *****"); + _logger.LogInformation("***** 数据转换服务执行完毕 *****"); } public async Task TransformWorker() @@ -66,10 +70,10 @@ public class TransformService : ITransformService { continue; } - + try { - var context = new DataTransformContext(record, _cache, _logger); + var context = new DataTransformContext(record, _cache, _logger, _services); if (_options.Value.EnableFilter) { // 数据过滤 @@ -85,7 +89,7 @@ public class TransformService : ITransformService { record = await replacer(context); } - } + } // 字段缓存 var cacher = _options.Value.RecordCache; @@ -96,9 +100,6 @@ public class TransformService : ITransformService var dbFilter = _options.Value.DatabaseFilter ?? throw new ApplicationException("未配置数据库过滤器"); record.Database = dbFilter(record); - - await _queuePool[record.Database].EnqueueAsync(record); - _context.AddTransform(); if (_options.Value.EnableReBuilder) { @@ -109,12 +110,15 @@ public class TransformService : ITransformService foreach (var rc in addRecords) { if(dbFilter is not null) - rc.Database =dbFilter.Invoke(record); + rc.Database = dbFilter.Invoke(record); await _queuePool[record.Database].EnqueueAsync(rc); _context.AddTransform(); } } } + + await _queuePool[record.Database].EnqueueAsync(record); + _context.AddTransform(); } catch (Exception e) { diff --git a/MesETL.App/Options/DataInputOptions.cs b/MesETL.App/Options/DataInputOptions.cs index b36fdf1..a9f5560 100644 --- a/MesETL.App/Options/DataInputOptions.cs +++ b/MesETL.App/Options/DataInputOptions.cs @@ -4,6 +4,9 @@ namespace MesETL.App.Options { public class DataInputOptions { + /// + /// 文件输入的目录 + /// public string? InputDir { get; set; } #region CSV @@ -22,29 +25,46 @@ namespace MesETL.App.Options #region Mock + /// + /// 生成模拟数据进行测试 + /// 启用后在读取数据时会截取ZST文件中的CSV文件的第一条记录,然后复制成指定数量的数据 + /// public bool UseMock { get; set; } + /// + /// 当开启模拟数据生成时,模拟数据的倍数 + /// public double MockCountMultiplier { get; set; } = 1; /// - /// Table -> Mock Count 暂时为手动配置 + /// 配置每张表生成模拟数据的规则,此属性暂时在程序中配置 /// public Dictionary? TableMockConfig { get; set; } #endregion - #region ManualSet + #region Reader + /// + /// 配置输入表及其顺序,如果为空则按照程序默认的顺序。 + /// 该值如果存在,程序会按照集合中表的顺序来读取数据,不在集合中的表将被忽略! + /// public string[]? TableOrder { get; set; } + /// + /// 忽略集合中配置的表,不进行读取 + /// public string[] TableIgnoreList { get; set; } = []; /// /// 配置如何从文件名转换为表名和表头 /// - public Func? FileInputMetaBuilder { get; set; } //TODO: 抽离 + public Func? FileInputMetaBuilder { get; set; } + /// + /// 表输入完成事件 + /// public Action? OnTableInputCompleted { get; set; } #endregion diff --git a/MesETL.App/Options/DataTransformOptions.cs b/MesETL.App/Options/DataTransformOptions.cs index c80af1b..f1c129e 100644 --- a/MesETL.App/Options/DataTransformOptions.cs +++ b/MesETL.App/Options/DataTransformOptions.cs @@ -20,26 +20,26 @@ public class DataTransformOptions /// /// yyyyMM /// - public string CleanDate { get; set; } = "202301"; + public string CleanDate { get; set; } = "202401"; /// /// Record -> Database name - /// 对记录进行数据库过滤 + /// 决定记录应当被插入到哪一个数据库中 /// public Func? DatabaseFilter { get; set; } /// /// Context -> Should output - /// 配置对数据过滤的条件 + /// 对记录进行过滤,返回false则不输出 /// public Func>? RecordFilter { get; set; }//数据过滤方法 /// /// Context -> New record - /// 对当前记录进行修改或完整替换 + /// 对当前记录进行修改或完整替换,你可以在这里修改记录中的字段,或者新增/删除字段 /// public Func>? RecordModify { get; set; }//数据替换 /// /// Context -> New rebuild records - /// 使用当前记录对某些数据进行重建 + /// 基于当前记录新增多个记录 /// public Func?>? RecordReBuild { get; set; }//新增数据 /// diff --git a/MesETL.App/Options/DatabaseOutputOptions.cs b/MesETL.App/Options/DatabaseOutputOptions.cs index d2a9714..0db426a 100644 --- a/MesETL.App/Options/DatabaseOutputOptions.cs +++ b/MesETL.App/Options/DatabaseOutputOptions.cs @@ -1,29 +1,66 @@ -namespace MesETL.App.Options; +using MesETL.App.HostedServices; + +namespace MesETL.App.Options; public class DatabaseOutputOptions { + /// + /// 输出数据库的连接字符串 + /// public string? ConnectionString { get; set; } + /// + /// MySql max_allowed_packet变量值大小 + /// public int MaxAllowedPacket { get; set; } = 32 * 1024 * 1024; + /// + /// 每次Insert提交的数据量 + /// public int FlushCount { get; set; } = 10000; + /// + /// 每个数据库最大提交任务数 + /// public int MaxDatabaseOutputTask { get; set; } = 4; + /// + /// 将json列作为16进制格式输出(0x前缀) + /// public bool TreatJsonAsHex { get; set; } = true; + /// + /// 不对某些表进行输出 + /// public string[] NoOutput { get; set; } = []; + /// + /// 当某张表的键出现重复时,在输出时使用ON DUPLICATE KEY UPDATE更新该条记录 + /// 表名为键,更新的字段为值 + /// + /// + /// { + /// // 当order_data_parts表的键出现重复时,使用ON DUPLICATE KEY UPDATE更新已存在记录的CompanyID为新插入记录的值 + /// "order_data_parts": "CompanyID = new.CompanyID" + /// } + /// + /// + /// public Dictionary? ForUpdate { get; set; } /// - /// 配置导入数据的特殊列 + /// 配置导入数据的特殊列,请在代码中配置 /// public Dictionary ColumnTypeConfig { get; set; } = new(); // "table.column" -> type + + /// + /// 所有数据都输出完毕时的事件,请在代码中配置 + /// + public Action? OutputFinished { get; set; } public ColumnType GetColumnType(string table, string column) { - return ColumnTypeConfig.GetValueOrDefault($"{table}.{column}", ColumnType.UnDefine); + return ColumnTypeConfig.GetValueOrDefault(string.Concat(table, ".", column), ColumnType.UnDefine); } public bool TryGetForUpdate(string table, out string? forUpdate) diff --git a/MesETL.App/Options/RedisCacheOptions.cs b/MesETL.App/Options/RedisCacheOptions.cs index 10dc669..145099a 100644 --- a/MesETL.App/Options/RedisCacheOptions.cs +++ b/MesETL.App/Options/RedisCacheOptions.cs @@ -1,8 +1,20 @@ namespace MesETL.App.Options; +/// +/// Redis缓存选项 +/// public class RedisCacheOptions { + /// + /// Redis连接字符串 + /// public string? Configuration { get; init; } + /// + /// Redis实例名称 + /// public string InstanceName { get; init; } = ""; + /// + /// 使用的数据库序号 + /// public int Database { get; init; } = 0; } \ No newline at end of file diff --git a/MesETL.App/Options/TableMockConfig.cs b/MesETL.App/Options/TableMockConfig.cs index ab11d63..45d29e1 100644 --- a/MesETL.App/Options/TableMockConfig.cs +++ b/MesETL.App/Options/TableMockConfig.cs @@ -1,5 +1,8 @@ namespace MesETL.App.Options; +/// +/// 表模拟数据生成规则 +/// public struct TableMockConfig { /// diff --git a/MesETL.App/Options/TenantDbOptions.cs b/MesETL.App/Options/TenantDbOptions.cs index f18cf13..0189bc6 100644 --- a/MesETL.App/Options/TenantDbOptions.cs +++ b/MesETL.App/Options/TenantDbOptions.cs @@ -1,5 +1,8 @@ namespace MesETL.App.Options; +/// +/// 多租户分库配置 +/// public class TenantDbOptions { public string? TenantKey { get; set; } @@ -16,8 +19,21 @@ public class TenantDbOptions // DbList.ForEach(pair => dictionary.Add(pair.Value, pair.Key)); // 注意配置顺序 if(DbGroup is null) throw new ApplicationException("分库配置中没有发现任何数据库"); - var dbName = DbGroup.Cast?>() - .FirstOrDefault(pair => pair?.Value != null && pair.Value.Value > tenantKeyValue)!.Value.Key; + + #region 性能较低,不使用 + + // var dbName = DbGroup.Cast?>() + // .FirstOrDefault(pair => pair?.Value != null && pair.Value.Value > tenantKeyValue)!.Value.Key; + + #endregion + + string? dbName = null; + foreach (var (key, value) in DbGroup) + { + if (value > tenantKeyValue) + dbName = key; + } + return dbName ?? throw new ArgumentOutOfRangeException(nameof(tenantKeyValue), $"分库配置中没有任何符合'{nameof(tenantKeyValue)}'值的数据库"); diff --git a/MesETL.App/Program.cs b/MesETL.App/Program.cs index c2299aa..2e66937 100644 --- a/MesETL.App/Program.cs +++ b/MesETL.App/Program.cs @@ -1,5 +1,6 @@ -// #define USE_TEST_DB // 测试库的结构与生产库不一样,如果使用测试库运行,则加上USE_TEST_DB预处理器指令 +#define USE_TEST_DB // 如果使用测试库运行,则加上USE_TEST_DB预处理器指令 +using System.Text; using MesETL.App; using MesETL.App.Services; using MesETL.App.Services.ETL; @@ -10,6 +11,7 @@ using MesETL.App.HostedServices.Abstractions; using MesETL.App.Options; using MesETL.App.Services.ErrorRecorder; using MesETL.App.Services.Loggers; +using MesETL.App.Services.Seq; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; @@ -68,9 +70,10 @@ async Task RunProgram() options.DbGroup = tenantDbOptions.DbGroup; }); host.Services.Configure(redisSection); - + var oldestTime = DateTime.ParseExact(transformOptions.CleanDate, "yyyyMM", System.Globalization.DateTimeFormatInfo.InvariantInfo); - var oldestTimeInt = int.Parse(transformOptions.CleanDate); + var oldestTimeInt_yyyyMM = int.Parse(transformOptions.CleanDate); + var oldestTimeInt_yyMM = int.Parse(transformOptions.CleanDate[2..]); // 输入配置 host.Services.Configure(options => @@ -80,151 +83,17 @@ async Task RunProgram() options.TableMockConfig = inputOptions.TableMockConfig; options.MockCountMultiplier = inputOptions.MockCountMultiplier; options.TableIgnoreList = inputOptions.TableIgnoreList; + options.TableOrder = inputOptions.TableOrder; - // 配置文件输入方法 - options.FileInputMetaBuilder = fileName => - { - if (fileName.EndsWith(".dat.zst")) - { - var tableName = DumpDataHelper.GetTableNameFromCsvFileName( - Path.GetFileNameWithoutExtension(fileName)); // 去除.zst - string[]? headers; - try - { - // 查找同目录下同表的SQL文件 - var sqlFile = Directory.GetFiles(options.InputDir) - .SingleOrDefault(f => f.Equals(fileName.Replace(".dat.zst",".sql.zst"))); - if (sqlFile is null) - return null; - headers = DumpDataHelper.GetCsvHeadersFromSqlFile( - DumpDataHelper.DecompressZstAsStringAsync(File.OpenRead(sqlFile)).Result); - } - catch (InvalidOperationException e) - { - throw new ApplicationException($"目录下不止一个{tableName}表的SQL文件", e); - } - - return new FileInputInfo - { - FileName = fileName, - TableName = tableName, - Headers = headers - }; - } - return null; - }; + // 配置文件元数据构建方法 + options.FileInputMetaBuilder = DumpDataHelper.MyDumperFileInputMetaBuilder; - // 配置表输入完成事件,字典清理 - options.OnTableInputCompleted = table => - { - switch (table) - { - case TableNames.OrderBlockPlan: - MemoryCache.Instance?.Delete(s => s.StartsWith(TableNames.Order + '-')); - break; - case TableNames.OrderItem: - MemoryCache.Instance?.Delete(s => s.StartsWith(TableNames.OrderBlockPlan + '-')); - break; - case TableNames.OrderProcessSchedule: - MemoryCache.Instance?.Delete(s => s.StartsWith(TableNames.OrderProcess + '-')); - break; - } - }; + // 配置表输入完成事件 + options.OnTableInputCompleted = null; - options.TableOrder = inputOptions.TableOrder ?? - [ - TableNames.Machine, - - TableNames.Order, - TableNames.OrderBoxBlock, // 依赖Order.CompanyID - TableNames.OrderDataBlock, // 依赖Order.CompanyID - - TableNames.OrderBlockPlan, - TableNames.OrderBlockPlanResult,// 依赖OrderBlockPlan.CompanyID / 删除 - - TableNames.OrderItem, - TableNames.OrderDataGoods, - TableNames.OrderDataParts, - TableNames.OrderModule, - TableNames.OrderModuleExtra, - TableNames.OrderModuleItem, - TableNames.OrderPackage, - - TableNames.OrderProcess, - TableNames.OrderProcessStep, - TableNames.OrderProcessStepItem,// 依赖OrderProcess.ShardKey / 删除 - - TableNames.OrderProcessSchedule, - TableNames.OrderScrapBoard, - TableNames.ProcessGroup, - TableNames.ProcessInfo, - TableNames.ProcessItemExp, - TableNames.ProcessScheduleCapacity, - TableNames.ProcessStepEfficiency, - TableNames.ReportTemplate, - TableNames.SimplePackage, - TableNames.SimplePlanOrder, - TableNames.SysConfig, - TableNames.WorkCalendar, - TableNames.WorkShift, - TableNames.WorkTime - ]; - - // options.TableMockConfig = new Dictionary - // { - // { TableNames.Machine, new TableMockConfig(true, 14655, ["ID"]) }, - // { TableNames.Order, new TableMockConfig(true, 5019216, ["OrderNo"]) }, - // { TableNames.OrderDataBlock, new TableMockConfig(true, 731800334, ["ID"]) }, - // { TableNames.OrderDataGoods, new TableMockConfig(true, 25803671, ["ID"]) }, - // { TableNames.OrderDataParts, new TableMockConfig(true, 468517543, ["ID"]) }, - // { TableNames.OrderModule, new TableMockConfig(true, 103325385, ["ID"]) }, - // { TableNames.OrderModuleExtra, new TableMockConfig(true, 54361321, ["ID"]) }, - // { TableNames.OrderModuleItem, new TableMockConfig(true, 69173339, ["ID"]) }, - // { TableNames.OrderPackage, new TableMockConfig(true, 16196195, ["ID"]) }, - // { TableNames.OrderProcess, new TableMockConfig(true, 3892685, ["ID"]) }, - // { TableNames.OrderProcessStep, new TableMockConfig(true, 8050349, ["ID"]) }, - // { TableNames.OrderProcessStepItem, new TableMockConfig(true, 14538058, ["ID"]) }, - // { TableNames.OrderScrapBoard, new TableMockConfig(true, 123998, ["ID"]) }, - // { TableNames.ProcessGroup, new TableMockConfig(true, 1253, ["ID"]) }, - // { TableNames.ProcessInfo, new TableMockConfig(true, 7839, ["ID"]) }, - // { TableNames.ProcessItemExp, new TableMockConfig(true, 28, ["ID"]) }, - // { TableNames.ProcessScheduleCapacity, new TableMockConfig(true, 39736, ["ID"]) }, - // { TableNames.ProcessStepEfficiency, new TableMockConfig(true, 8, ["ID"]) }, - // { TableNames.ReportTemplate, new TableMockConfig(true, 7337, ["ID"]) }, - // { TableNames.SimplePackage, new TableMockConfig(true, 130436, ["ID"]) }, - // { TableNames.SysConfig, new TableMockConfig(true, 2296, ["ID"]) }, - // { TableNames.WorkCalendar, new TableMockConfig(true, 11, ["ID"]) }, - // { TableNames.WorkShift, new TableMockConfig(true, 59, ["ID"]) }, - // { TableNames.WorkTime, new TableMockConfig(true, 62, ["ID"]) } - // }; + // 配置表模拟数据 options.TableMockConfig = new Dictionary - { - { TableNames.Machine, new TableMockConfig(true, 14655, ["ID"]) }, - { TableNames.Order, new TableMockConfig(true, 50192, ["OrderNo"]) }, - { TableNames.OrderDataBlock, new TableMockConfig(true, 7318003, ["ID"]) }, - { TableNames.OrderDataGoods, new TableMockConfig(true, 258036, ["ID"]) }, - { TableNames.OrderDataParts, new TableMockConfig(true, 4685175, ["ID"]) }, - { TableNames.OrderItem, new TableMockConfig(true, 13298896, ["ID"])}, - { TableNames.OrderModule, new TableMockConfig(true, 1033253, ["ID"]) }, - { TableNames.OrderModuleExtra, new TableMockConfig(true, 543613, ["ID"]) }, - { TableNames.OrderModuleItem, new TableMockConfig(true, 691733, ["ID"]) }, - { TableNames.OrderPackage, new TableMockConfig(true, 161961, ["ID"]) }, - { TableNames.OrderProcess, new TableMockConfig(true, 38926, ["ID"]) }, - { TableNames.OrderProcessStep, new TableMockConfig(true, 80503, ["ID"]) }, - { TableNames.OrderProcessStepItem, new TableMockConfig(true, 145380, ["ID"]) }, - { TableNames.OrderScrapBoard, new TableMockConfig(true, 1239, ["ID"]) }, - { TableNames.ProcessGroup, new TableMockConfig(true, 125, ["ID"]) }, - { TableNames.ProcessInfo, new TableMockConfig(true, 783, ["ID"]) }, - { TableNames.ProcessItemExp, new TableMockConfig(true, 28, ["ID"]) }, - { TableNames.ProcessScheduleCapacity, new TableMockConfig(true, 39736, ["ID"]) }, - { TableNames.ProcessStepEfficiency, new TableMockConfig(true, 8, ["ID"]) }, - { TableNames.ReportTemplate, new TableMockConfig(true, 7337, ["ID"]) }, - { TableNames.SimplePackage, new TableMockConfig(true, 130436, ["ID"]) }, - { TableNames.SysConfig, new TableMockConfig(true, 2296, ["Key"]) }, - { TableNames.WorkCalendar, new TableMockConfig(true, 11, ["ID"]) }, - { TableNames.WorkShift, new TableMockConfig(true, 59, ["ID"]) }, - { TableNames.WorkTime, new TableMockConfig(true, 62, ["ID"]) } - }; + { }; }); host.Services.Configure(options => @@ -240,109 +109,64 @@ async Task RunProgram() // order_block_plan_item和order_package_item表不导入,根据order_item数据直接重建 // 数据清理 - options.RecordFilter = async context => + options.RecordFilter = async context => // TODO: OPT: oldestTime等外部变量会产生闭包 { var record = context.Record; - var cache = context.Cacher; switch (record.TableName) { - // OrderBoxBlock删除对应Order.OrderNo不存在的对象 - case TableNames.OrderBoxBlock: - { - if (!await cache.ExistsAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"]))) - return false; - break; - } - // OrderDataBlock删除对应Order.OrderNo不存在的对象 - case TableNames.OrderDataBlock: - { - if (!await cache.ExistsAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"]))) - return false; - break; - } - // OrderDataParts删除对应Order.OrderNo不存在的对象 - case TableNames.OrderDataParts: - { - if (!await cache.ExistsAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"]))) - return false; - break; - } - // OrderBlockPlan删除CreateTime < 202301的 + // 清理CreateTime < 202401的 case TableNames.OrderBlockPlan: { - var time = DateTime.Parse(record["CreateTime"].Trim('"','\'')); - if (time < oldestTime) + var creationTime = DateTime.Parse(record["CreateTime"].AsSpan().Trim(['"', '\''])); + if (creationTime < oldestTime) + { return false; - - // if (!DumpDataHelper.IsJson(record["OrderNos"])) return false; //Json列合法检查 + } break; } - // OrderBlockPlanResult删除对应order_block_plan.ID不存在的对象 - case TableNames.OrderBlockPlanResult: + // 清理ShardKey < 24010的 + case TableNames.OrderExtra: { - if (!await cache.ExistsAsync(CacheKeysFunc.OrderBlockPlan_ID_CompanyID(record["ID"]))) + var shardKey = int.Parse(record["ShardKey"].AsSpan()[..4]); + if (shardKey < oldestTimeInt_yyMM) + { return false; + } break; } - // case TableNames.OrderBlockPlanResult: // 用SaveTime过滤 - // { - // if (DateTime.Parse(record["SaveTime"].Trim('"', '\'')) < oldestTime) - // return false; - // break; - // } - // OrderDataGoods Json列合法检查 - case TableNames.OrderDataGoods: + // 清理(Status != 0 || Deleted = 1) && ID前四位 < 2401的 + case TableNames.OrderScrapBoard: { - // if (!DumpDataHelper.IsJson(record["ExtraProp"])) return false; - break; - } - // OrderModule删除OrderNo < 202301的 - case TableNames.OrderModule: - { - var orderNo = record["OrderNo"]; - if(int.Parse(orderNo.AsSpan(0, 6).ToString()) < oldestTimeInt) + var status = record["Status"].AsSpan(); + var deleted = record["Deleted"].AsSpan(); + var idPref = int.Parse(record["ID"].AsSpan()[..4]); + if ((status is not "0" || deleted is "1") && idPref < oldestTimeInt_yyMM) + { return false; + } break; } - // OrderProcess删除OrderNo < 202301的 - case TableNames.OrderProcess: - { - var orderNo = record["OrderNo"]; - if(int.Parse(orderNo.AsSpan(0, 6).ToString()) < oldestTimeInt) - return false; - break; - } - // OrderProcessStep删除OrderNo < 202301的 - case TableNames.OrderProcessStep: - { - var orderNo = record["OrderNo"]; - if(int.Parse(orderNo.AsSpan(0, 6).ToString()) < oldestTimeInt) - return false; - break; - } - // OrderProcessStepStep删除对应OrderProcess.ID不存在的对象 - case TableNames.OrderProcessStepItem: - { - if (!await cache.ExistsAsync(CacheKeysFunc.OrderProcess_ID_ShardKey(record["OrderProcessID"]))) - return false; - break; - } - // SimplePackage删除OrderNo < 202301的 + // 清理OrderNo < 202401的 case TableNames.SimplePackage: { - var orderNo = record["OrderNo"]; - if(int.Parse(orderNo.AsSpan(0, 6).ToString()) < oldestTimeInt) + var orderNo = int.Parse(record["OrderNo"].AsSpan()[..4]); + if (orderNo < oldestTimeInt_yyMM) + { return false; + } break; } - // SimplePlanOrder删除CreateTime < 202301的 + // 清理CreateTime < 202401的 case TableNames.SimplePlanOrder: { - var time = DateTime.Parse(record["CreateTime"].Trim('"', '\'')); - if (time < oldestTime) + var creationTime = DateTime.Parse(record["CreateTime"].AsSpan().Trim(['"', '\''])); + if (creationTime < oldestTime) + { return false; + } break; } + default: break; } return true; @@ -378,94 +202,53 @@ async Task RunProgram() var cache = context.Cacher; switch (record.TableName) { - // Machine处理非空列 - case TableNames.Machine: - ReplaceIfMyDumperNull(record, "Name", DefaultStr); - ReplaceIfMyDumperNull(record, "CreateTime", DefaultDateTime); - ReplaceIfMyDumperNull(record, "CreatorID", DefaultInt); - ReplaceIfMyDumperNull(record, "EditTime", DefaultDateTime); - ReplaceIfMyDumperNull(record, "EditorID", DefaultInt); - ReplaceIfMyDumperNull(record, "Settings", DefaultText); - break; - // Order处理非空列 - case TableNames.Order: - ReplaceIfMyDumperNull(record, "Deleted", DefaultInt); - break; - // OrderBlockPlan处理text->json列 - case TableNames.OrderBlockPlan: - // 将所有值为'[]'(即字符串长度小等于2(16进制长度小于4))的置空 [] = 0x5b5d - if (record["OrderNos"].Length <= 4) - record["OrderNos"] = "NULL"; - break; - // OrderBlockPlanResult,添加CompanyID - case TableNames.OrderBlockPlanResult: - record.AddField("CompanyID", - // 获取OrderBlockPlan.ID -> CompanyID - ThrowIfNoCached(await cache.GetStringAsync(CacheKeysFunc.OrderBlockPlan_ID_CompanyID(record["ID"])), - TableNames.OrderBlockPlanResult, TableNames.OrderBlockPlan, "ID", "无法获取对应的CompanyID")); - break; - // OrderBoxBlock添加CompanyID列 + // 重构Data列二进制数据 case TableNames.OrderBoxBlock: - record.AddField("CompanyID", - // 获取Order.OrderNo -> CompanyID - ThrowIfNoCached(await cache.GetStringAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"])), - TableNames.OrderBoxBlock, TableNames.Order, "OrderNo", "无法获取对应的CompanyID")); + { + var data = record["Data"]; + if (data is not ConstVar.MyDumperNull and ConstVar.Null) + { + var hex = Encoding.UTF8.GetString(Convert.FromHexString(data)); + record["Data"] = hex; + } + break; - // 修正OrderDataBlock.CompanyID - case TableNames.OrderDataBlock: - record["CompanyID"] = - // 获取Order.OrderNo -> CompanyID - ThrowIfNoCached(await cache.GetStringAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"])), - TableNames.OrderDataBlock, TableNames.Order, "OrderNo", "无法获取对应的CompanyID"); - break; - // 修正OrderDataParts.CompanyID: - case TableNames.OrderDataParts: - record["CompanyID"] = - // 获取Order.OrderNo -> CompanyID - ThrowIfNoCached(await cache.GetStringAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"])), - TableNames.OrderDataParts, TableNames.Order, "OrderNo", "无法获取对应的CompanyID"); - break; - // OrderModule添加ShardKey列,移除ViewFileName列 + } + // 移除ViewFileName列 case TableNames.OrderModule: - record.AddField("ShardKey", CalculateShardKeyByOrderNo(record["OrderNo"])); - record.RemoveField("ViewFileName"); + { +#if USE_TEST_DB + if (record.HeaderExists("ViewFileName")) +#endif + record.RemoveField("ViewFileName"); break; - // OrderProcess添加ShardKey列,NextStepID的空值转换为0 - case TableNames.OrderProcess: - record.AddField("ShardKey", CalculateShardKeyByOrderNo(record["OrderNo"])); + } +#if USE_TEST_DB + // 删除ID列,让数据库自行递增 + // TODO: 数据表改进,删除ID列或是替换为流水号 + case TableNames.ProcessStepEfficiency: + { + record.RemoveField("ID"); break; - // OrderProcessStep添加ShardKey - case TableNames.OrderProcessStep: - record.AddField("ShardKey", CalculateShardKeyByOrderNo(record["OrderNo"])); + } + case TableNames.ProcessScheduleCapacity: + { + record.RemoveField("ID"); break; - // OrderProcessStepItem添加ShardKey列,处理非空列 - case TableNames.OrderProcessStepItem: - ReplaceIfMyDumperNull(record, "DataID", DefaultInt); - record.AddField("ShardKey", - // 获取OrderProcess.ID -> ShardKey - ThrowIfNoCached(await cache.GetStringAsync(CacheKeysFunc.OrderProcess_ID_ShardKey(record["OrderProcessID"])), - TableNames.OrderProcessStepItem, TableNames.OrderProcessStep, "OrderProcessID", "无法获取对应的ShardKey")); - break; - // OrderScrapBoard处理非空列 - case TableNames.OrderScrapBoard: - ReplaceIfMyDumperNull(record, "Color", DefaultStr); - ReplaceIfMyDumperNull(record, "GoodsName", DefaultStr); - ReplaceIfMyDumperNull(record, "Material", DefaultStr); - ReplaceIfMyDumperNull(record, "MaterialName", DefaultStr); - break; - // ProcessItemExp处理非空列 - case TableNames.ProcessItemExp: - ReplaceIfMyDumperNull(record, "MaxPartsID", DefaultInt); - ReplaceIfMyDumperNull(record, "ProcessGroupID", DefaultInt); - break; - // SimplePlanOrder处理非空列,添加Deleted + } + // 测试环境忽略PlaceData列,生产环境会提前将其移除 case TableNames.SimplePlanOrder: - ReplaceIfMyDumperNull(record, "CreateTime", DefaultDateTime); - ReplaceIfMyDumperNull(record, "UpdateTime", DefaultDateTime); - ReplaceIfMyDumperNull(record, "CompanyID", DefaultInt); - ReplaceIfMyDumperNull(record, "SingleName", DefaultStr); - record.AddField("Deleted", "0"); + { + record.RemoveField("PlaceData"); break; + } + case TableNames.SysConfig: + { + record.RemoveField("Key"); + break; + } +#endif + default: break; } return record; @@ -480,34 +263,7 @@ async Task RunProgram() }; // 数据缓存 - options.RecordCache = async context => - { - var record = context.Record; - var cache = context.Cacher; - switch (record.TableName) - { - // 缓存Order.OrderNo -> CompanyID - case TableNames.Order: - await cache.SetStringAsync( - CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"]), - record["CompanyID"]); - break; - - // 缓存OrderBlockPlan.ID -> CompanyID - case TableNames.OrderBlockPlan: - await cache.SetStringAsync( - CacheKeysFunc.OrderBlockPlan_ID_CompanyID(record["ID"]), - record["CompanyID"]); - break; - - // 缓存OrderProcess.ID -> ShardKey - case TableNames.OrderProcess: - await cache.SetStringAsync( - CacheKeysFunc.OrderProcess_ID_ShardKey(record["ID"]), - record["ShardKey"]); - break; - } - }; + options.RecordCache = null; // 数据库过滤 options.DatabaseFilter = record => @@ -520,36 +276,24 @@ async Task RunProgram() options.RecordReBuild = context => { var record = context.Record; - var resultList = new List(); - // 分流OrderItem表 - if (record.TableName == TableNames.OrderItem) + + // 将OrderExtra表迁移至OrderWaveGroup表 + if (record.TableName == TableNames.OrderExtra) { - record.TryGetField("ID", out var itemId); - record.TryGetField("ShardKey", out var shardKey); - record.TryGetField("PlanID", out var planId); - record.TryGetField("PackageID", out var packageId); - record.TryGetField("CompanyID", out var companyId); - if(!int.TryParse(planId, out var pid)) - throw new ApplicationException($"数据发生异常:OrderItem.PlanID,值: {planId}"); - if (pid > 0) - { - resultList.Add(new DataRecord(new[] { itemId, shardKey, planId, companyId }, - TableNames.OrderBlockPlanItem, - ["ItemID", "ShardKey", "PlanID", "CompanyID"] - )); - } - if(!int.TryParse(packageId, out var pkid)) - throw new ApplicationException($"数据发生异常:OrderItem.PackageID,值: {packageId}"); - if(pkid > 0) - { - resultList.Add(new DataRecord(new[] { itemId, shardKey, packageId, companyId }, - TableNames.OrderPackageItem, - [ "ItemID", "ShardKey", "PackageID", "CompanyID" ] - )); - } + record.Ignore = true; + var resultList = new List(); + var seq = context.Services.GetRequiredService(); + string[] headers = ["OrderNo", "ShardKey", "ConfigType", "ConfigJson", "CompanyID"]; + var id = seq.AddCachedSeq(SeqConfig.OrderWaveGroupID); + var orderWaveGroup = new DataRecord( + [id.ToString(), ..headers.Select(c => record[c])], + TableNames.OrderWaveGroup, + ["ID", "OrderNo", "ShardKey", "Type", "ConfigJson", "CompanyID"]); + resultList.Add(orderWaveGroup); + return resultList; } - return resultList; + return ArraySegment.Empty; }; }); @@ -562,73 +306,43 @@ async Task RunProgram() options.TreatJsonAsHex = outputOptions.TreatJsonAsHex; options.NoOutput = outputOptions.NoOutput; options.ForUpdate = outputOptions.ForUpdate; - -#if USE_TEST_DB - // Test Server - options.ColumnTypeConfig = new Dictionary - { - { "simple_plan_order.PlaceData", ColumnType.Blob }, - { "order_block_plan_result.PlaceData", ColumnType.Blob }, - { "order_box_block.Data", ColumnType.Blob }, - { "order_data_goods.ExtraProp", ColumnType.Json }, - { "order_module_extra.JsonStr", ColumnType.Text }, - { "process_info.Users", ColumnType.Text }, - { "order_process_schdule.CustomOrderNo", ColumnType.Text }, - { "order_process_schdule.OrderProcessStepName", ColumnType.Text }, - { "order_process_schdule.AreaName", ColumnType.Text }, - { "order_process_schdule.ConsigneeAddress", ColumnType.Text }, - { "order_process_schdule.ConsigneePhone", ColumnType.Text }, - { "report_source.Sql", ColumnType.Text }, - { "report_source.KeyValue", ColumnType.Text }, - { "report_source.Setting", ColumnType.Text }, - { "order_data_block.RemarkJson", ColumnType.Text }, - { "order_patch_detail.BlockDetail", ColumnType.Json }, - { "order_scrap_board.OutLineJson", ColumnType.Text }, - { "simple_package.Items", ColumnType.Json }, - { "order_batch_pack_config.Setting", ColumnType.Text }, - { "machine.Settings", ColumnType.Text }, - { "sys_config.Value", ColumnType.Text }, - { "sys_config.JsonStr", ColumnType.Text }, - { "process_item_exp.ItemJson", ColumnType.Text }, - { "report_template.Template", ColumnType.Text }, - { "report_template.SourceConfig", ColumnType.Text }, - { "order_block_plan.OrderNos", ColumnType.Json }, - { "order_block_plan.BlockInfo", ColumnType.Text }, - }; -#else - // 配置列类型 + + // 配置列的类型以便于在输出时区分二进制内容 // Prod server options.ColumnTypeConfig = new Dictionary { - { "simple_plan_order.PlaceData", ColumnType.Blob }, - { "order_block_plan_result.PlaceData", ColumnType.Blob }, - { "order_box_block.Data", ColumnType.Blob }, - { "order_data_goods.ExtraProp", ColumnType.Text }, - { "order_module_extra.JsonStr", ColumnType.Text }, - { "process_info.Users", ColumnType.Text }, - { "order_process_schdule.CustomOrderNo", ColumnType.Text }, - { "order_process_schdule.OrderProcessStepName", ColumnType.Text }, - { "order_process_schdule.AreaName", ColumnType.Text }, - { "order_process_schdule.ConsigneeAddress", ColumnType.Text }, - { "order_process_schdule.ConsigneePhone", ColumnType.Text }, - { "report_source.Sql", ColumnType.Text }, - { "report_source.KeyValue", ColumnType.Text }, - { "report_source.Setting", ColumnType.Text }, - { "order_data_block.RemarkJson", ColumnType.Text }, - { "order_patch_detail.BlockDetail", ColumnType.Text }, - { "order_scrap_board.OutLineJson", ColumnType.Text }, - { "simple_package.Items", ColumnType.Text }, - { "order_batch_pack_config.Setting", ColumnType.Text }, - { "machine.Settings", ColumnType.Text }, - { "sys_config.Value", ColumnType.Text }, - { "sys_config.JsonStr", ColumnType.Text }, - { "process_item_exp.ItemJson", ColumnType.Text }, - { "report_template.Template", ColumnType.Text }, - { "report_template.SourceConfig", ColumnType.Text }, - { "order_block_plan.OrderNos", ColumnType.Text }, - { "order_block_plan.BlockInfo", ColumnType.Text }, + {"machine.Settings", ColumnType.Text}, + {"order_block_plan.BlockInfo", ColumnType.Text}, + {"order_block_plan.OrderNos", ColumnType.Json}, + {"order_block_plan_result.PlaceData", ColumnType.Blob}, + {"order_box_block.Data", ColumnType.Blob}, + {"order_data_block.RemarkJson", ColumnType.Text}, + {"order_data_goods.ExtraProp", ColumnType.Json}, + {"order_extra.ConfigJson", ColumnType.Json}, + {"order_module_extra.Data", ColumnType.Blob}, + {"order_module_extra.JsonStr", ColumnType.Text}, + {"order_patch_detail.BlockDetail", ColumnType.Json}, + {"order_process_schdule.AreaName", ColumnType.Text}, + {"order_process_schdule.ConsigneeAddress", ColumnType.Text}, + {"order_process_schdule.ConsigneePhone", ColumnType.Text}, + {"order_process_schdule.CustomOrderNo", ColumnType.Text}, + {"order_process_schdule.OrderProcessStepName", ColumnType.Text}, + {"order_scrap_board.OutLineJson", ColumnType.Text}, + {"order_wave_group.ConfigJson", ColumnType.Json}, + {"process_info.Users", ColumnType.Text}, + {"process_item_exp.ItemJson", ColumnType.Text}, + {"report_template.SourceConfig", ColumnType.Text}, + {"report_template.Template", ColumnType.Text}, + {"simple_package.Items", ColumnType.Json}, + {"sys_config.JsonStr", ColumnType.Text}, + {"sys_config.Value", ColumnType.Text} + }; + + options.OutputFinished += ctx => + { + var seq = ctx.Serivces.GetRequiredService(); + seq.ApplyToDatabaseAsync().GetAwaiter().GetResult(); }; -#endif }); host.Services.AddLogging(builder => @@ -646,6 +360,7 @@ async Task RunProgram() host.Services.AddDataSourceFactory(); host.Services.AddErrorRecorderFactory(); host.Services.AddSingleton(); + host.Services.AddSingleton(); var prodLen = host.Configuration.GetRequiredSection("RecordQueue").GetValue("ProducerQueueLength"); var consLen = host.Configuration.GetRequiredSection("RecordQueue").GetValue("ConsumerQueueLength"); var maxCharCount = host.Configuration.GetRequiredSection("RecordQueue").GetValue("MaxByteCount") / 2; diff --git a/MesETL.App/Services/ETL/MySqlDestination.cs b/MesETL.App/Services/ETL/MySqlDestination.cs index 7051949..6972238 100644 --- a/MesETL.App/Services/ETL/MySqlDestination.cs +++ b/MesETL.App/Services/ETL/MySqlDestination.cs @@ -15,6 +15,9 @@ namespace MesETL.App.Services.ETL; /// public partial class MySqlDestination : IDisposable, IAsyncDisposable { + /// + /// table => records + /// private readonly Dictionary> _recordCache; private readonly MySqlConnection _conn; private readonly ILogger _logger; @@ -66,8 +69,8 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable try { - var excuseList = GetExcuseList(_recordCache, maxAllowPacket); - foreach (var insertSql in excuseList) + var executionList = GetExecutionList(_recordCache, maxAllowPacket); + foreach (var insertSql in executionList) { cmd.CommandText = insertSql; try @@ -103,7 +106,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable [GeneratedRegex("INSERT INTO `([^`]+)`")] private static partial Regex MatchTableName(); - public IEnumerable GetExcuseList(IDictionary> tableRecords,int maxAllowPacket) + public IEnumerable GetExecutionList(IDictionary> tableRecords, int maxAllowPacket) { var sb = new StringBuilder("SET AUTOCOMMIT = 1;\n"); var appendCount = 0; @@ -116,13 +119,16 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable StartBuild: var noCommas = true; + // 标准列顺序,插入时的字段需要按照该顺序排列 + var headers = records[0].Headers; + // INSERT INTO ... VALUES >>> sb.Append($"INSERT INTO `{tableName}`("); - for (var i = 0; i < records[0].Headers.Count; i++) + for (var i = 0; i < headers.Count; i++) { var header = records[0].Headers[i]; sb.Append($"`{header}`"); - if (i != records[0].Headers.Count - 1) + if (i != headers.Count - 1) sb.Append(','); } @@ -132,11 +138,20 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable for (;recordIdx < records.Count; recordIdx++) { var record = records[recordIdx]; + + // 数据列校验 + if (record.Headers.Count != headers.Count) + { + throw new InvalidOperationException($"数据异常,数据列数量出现冲突,表名:{tableName}"); + } + var recordSb = new StringBuilder(); recordSb.Append('('); - for (var fieldIdx = 0; fieldIdx < record.Fields.Count; fieldIdx++) + for (var idx = 0; idx < headers.Count; idx++) { - var field = record.Fields[fieldIdx]; + var header = headers[idx]; + // TODO: 可进行性能优化 + var field = record[header]; // 在这里处理特殊列 #region HandleFields @@ -147,7 +162,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable goto Escape; } - switch (_options.Value.GetColumnType(record.TableName, record.Headers[fieldIdx])) + switch (_options.Value.GetColumnType(record.TableName, header)) { case ColumnType.Text: if(string.IsNullOrEmpty(field)) @@ -163,12 +178,12 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable recordSb.Append(ConstVar.Null); else recordSb.Append($"0x{field}"); break; - case ColumnType.Json:// 生产库没有JSON列,仅用于测试库进行测试 - if(string.IsNullOrEmpty(field)) - recordSb.Append("'[]'"); // JObject or JArray? + case ColumnType.Json: // Mydumper v0.16.7-5导出的Json为字符串,且会将逗号转义,需要适配 + if(string.IsNullOrEmpty(field)) + recordSb.Append(ConstVar.Null); else if (_options.Value.TreatJsonAsHex) recordSb.Append($"_utf8mb4 0x{field}"); - else recordSb.AppendLine(field); + else recordSb.AppendLine(field.Replace("\\,", ",")); break; case ColumnType.UnDefine: default: @@ -179,7 +194,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable Escape: #endregion - if (fieldIdx != record.Fields.Count - 1) + if (idx != headers.Count - 1) recordSb.Append(','); } diff --git a/MesETL.App/Services/Loggers/LoggerTaskMonitorLogger.cs b/MesETL.App/Services/Loggers/LoggerTaskMonitorLogger.cs index 9af02f3..e386f0e 100644 --- a/MesETL.App/Services/Loggers/LoggerTaskMonitorLogger.cs +++ b/MesETL.App/Services/Loggers/LoggerTaskMonitorLogger.cs @@ -17,7 +17,7 @@ public class LoggerTaskMonitorLogger : ITaskMonitorLogger var sb = new StringBuilder(); sb.Append($"{name}: {{"); sb.AppendJoin(',', properties.Select((pair, i) => $" {pair.Key}: {pair.Value}")); - sb.Append('}'); + sb.Append([' ', '}']); // var args = new List { name }; // properties.Aggregate(args, (args, pair) => // { diff --git a/MesETL.App/Services/ProcessContext.cs b/MesETL.App/Services/ProcessContext.cs index 5ca8ab6..80cfb34 100644 --- a/MesETL.App/Services/ProcessContext.cs +++ b/MesETL.App/Services/ProcessContext.cs @@ -35,6 +35,8 @@ public class ProcessContext set => Interlocked.Exchange(ref _outputCount, value); } + public long MaxMemoryUsage { get; set; } + // TableName -> Count public IReadOnlyDictionary TableProgress => _tableProgress; diff --git a/MesETL.App/Services/Seq/SeqConfig.cs b/MesETL.App/Services/Seq/SeqConfig.cs new file mode 100644 index 0000000..6ae5d67 --- /dev/null +++ b/MesETL.App/Services/Seq/SeqConfig.cs @@ -0,0 +1,42 @@ +// ReSharper disable InconsistentNaming +namespace MesETL.App.Services.Seq; + +public class SeqConfig(string Name, bool Recycle = true, int Step = 1, long Max = 999_999_999) +{ + public string Name { get; init; } = Name; + public bool Recycle { get; init; } = Recycle; + public int Step { get; init; } = Step; + public long Max { get; init; } = Max; + + public static readonly SeqConfig ItemNo = new("seq_ItemNo", true, 1, 999_999_999); + public static readonly SeqConfig OrderModuleID = new("seq_order_module_id", false); + public static readonly SeqConfig OrderDataID = new("seq_order_data_id", false); + public static readonly SeqConfig OrderItemID = new("seq_order_item", false); + public static readonly SeqConfig ProcessStepID = new("seq_step_id", false); + public static readonly SeqConfig PackageNo = new("seq_pack_no", true, 1, 9_999_999); + public static readonly SeqConfig PlanNo = new("seq_plan_order", true, 1, 999_999); + public static readonly SeqConfig SimplePlanNo = new("seq_simple_plan_order", true, 1, 999_999); + + // 下面这些类型的流水号在BaseService添加实体时进行生成 + public static readonly SeqConfig MachineID = new("seq_machine_id", false); + public static readonly SeqConfig OrderBlockPlanID = new("seq_order_block_plan_id", false); + public static readonly SeqConfig OrderDataGoodsID = new("seq_order_data_goods_id", false); + public static readonly SeqConfig OrderPackageID = new("seq_order_pack_id", false); + public static readonly SeqConfig OrderProcessID = new("seq_order_process_id", false); + public static readonly SeqConfig OrderProcessStepItemID = new("seq_order_process_step_item_id", false); + public static readonly SeqConfig ProcessGroupID = new("seq_process_group_id", false); + public static readonly SeqConfig ProcessInfoID = new("seq_process_info_id", false); + public static readonly SeqConfig ProcessItemExpID = new("seq_process_item_exp_id", false); + public static readonly SeqConfig ProcessScheduleCapacityID = new("seq_process_schedule_capacity_id", false); + public static readonly SeqConfig ProcessStepEfficiencyID = new("seq_process_step_efficiency_id", false); + public static readonly SeqConfig ReportTemplateID = new("seq_report_template_id", false); + public static readonly SeqConfig SysConfigKey = new("seq_sys_config_key", false); + public static readonly SeqConfig WorkCalendarID = new("seq_work_calendar_id", false); + public static readonly SeqConfig WorkShiftID = new("seq_work_shift_id", false); + public static readonly SeqConfig WorkTimeID = new("seq_work_time_id", false); + public static readonly SeqConfig OrderPatchDetailID = new("seq_order_patch_detail_id", false); + public static readonly SeqConfig OrderModuleExtraID = new("seq_order_module_extra_id", false); + public static readonly SeqConfig SimplePackageID = new("seq_simple_pack_id", false); + public static readonly SeqConfig OrderModuleItemID = new("seq_order_module_item_id", false); + public static readonly SeqConfig OrderWaveGroupID = new("seq_order_wave_group_id", false); +} \ No newline at end of file diff --git a/MesETL.App/Services/Seq/SeqService.cs b/MesETL.App/Services/Seq/SeqService.cs new file mode 100644 index 0000000..42da8fe --- /dev/null +++ b/MesETL.App/Services/Seq/SeqService.cs @@ -0,0 +1,134 @@ +using System.Diagnostics; +using System.Runtime.CompilerServices; +using System.Text; +using MesETL.App.Options; +using MesETL.Shared.Helper; +using Microsoft.Extensions.Options; +using MySqlConnector; + +namespace MesETL.App.Services.Seq; + +public class SeqService +{ + private readonly string _connectionString; + private readonly Dictionary _cachedSequence; + + public IReadOnlyDictionary CachedSequence => _cachedSequence; + + public SeqService(IOptions options) + { + var connStr = options.Value.ConnectionString ?? throw new ApplicationException("未配置输出数据库连接字符串"); + var builder = new MySqlConnectionStringBuilder(connStr) + { + Database = "mes_global" + }; + _connectionString = builder.ConnectionString; + + _cachedSequence = new Dictionary(); + } + + private async Task UpdateSequenceID(string name,int step,long max,bool recycle, int add) + { + var sql = new StringBuilder( + $""" + INSERT INTO seq (SeqName,CurrentVal,Increment,MinVal,MaxVal,UpdateTime) + VALUES ({name},{add},{step},1,{max},NOW()) + ON DUPLICATE KEY UPDATE UpdateTime = NOW(), + """); + if (recycle) + { + sql.Append($"CurrentVal = (@updatedVal := IF(CurrentVal + {add} >= MaxVal, {add}, CurrentVal + {add}));"); + } + else + { + sql.Append($"CurrentVal = (@updatedVal := CurrentVal + {add});"); + } + sql.Append("SELECT @updatedVal;"); + var result = await DatabaseHelper.QueryScalarAsync(_connectionString, sql.ToString()); + return Convert.ToInt64(result); + } + + public async Task PeekKey(SeqConfig config) + { + var sql = $"SELECT CurrentVal FROM seq WHERE SeqName = '{config.Name}' LIMIT 1;"; + return Convert.ToInt64(await DatabaseHelper.QueryScalarAsync(_connectionString, sql)); + } + + public async Task GetKeys(SeqConfig config, int count) + { + if (count < 1) return []; + + var list = new long[count]; + var add = config.Step * count; + var lastId = await UpdateSequenceID(config.Name, config.Step, config.Max, config.Recycle, add); + var step = Convert.ToInt64(config.Step); + for (var i = count - 1; i > -1; i--) + { + list[i] = lastId; + lastId -= step; + } + + return list; + } + + /// + /// 添加并取得一个缓存的流水号 + /// + /// + /// + public long AddCachedSeq(SeqConfig config) + { + if (!_cachedSequence.TryGetValue(config, out var val)) + { + var seq = PeekKey(config).GetAwaiter().GetResult(); + val = seq; + _cachedSequence[config] = val; + } + + var step = config.Step; + if (config.Recycle) + { + val = val + step >= config.Max ? val : val + step; + } + else val += step; + _cachedSequence[config] = val; + return val; + } + + /// + /// 移除一个缓存的流水号 + /// + /// + public void RemoveCachedSeq(SeqConfig config) + { + _cachedSequence.Remove(config); + } + + /// + /// 清空所有缓存的流水号 + /// + public void ClearCache() + { + _cachedSequence.Clear(); + } + + /// + /// 将缓存的流水号应用至数据库 + /// + public async Task ApplyToDatabaseAsync() + { + var sql = GenerateCachedSeqSql(); + await DatabaseHelper.NonQueryAsync(_connectionString, sql); + } + + private string GenerateCachedSeqSql() + { + var sb = new StringBuilder(); + foreach (var kv in _cachedSequence) + { + sb.AppendLine($"UPDATE seq SET CurrentVal = {kv.Value} WHERE SeqName = '{kv.Key.Name}';"); + } + + return sb.ToString(); + } +} \ No newline at end of file diff --git a/MesETL.App/appsettings.json b/MesETL.App/appsettings.json index afe2598..40ecb6b 100644 --- a/MesETL.App/appsettings.json +++ b/MesETL.App/appsettings.json @@ -8,29 +8,28 @@ } }, "Input":{ - "InputDir": "D:\\Dump\\NewMockData", // Csv数据输入目录 + "InputDir": "D:\\Data\\DatabaseDump\\MyDumper-ZST 2024-12-3", // Csv数据输入目录 "UseMock": false, // 使用模拟数据进行测试 "MockCountMultiplier": 1, // 模拟数据量级的乘数 - "TableOrder": ["order", "order_data_parts"], // 按顺序输入的表 +// "TableOrder": ["order_extra"], // 按顺序输入的表 "TableIgnoreList": [] // 忽略输入的表 }, "Transform":{ - "StrictMode": false, // 设为true时如果数据转换发生错误,立刻停止程序 + "StrictMode": true, // 设为true时如果数据转换发生错误,立刻停止程序 "EnableFilter": true, // 启用数据过滤 "EnableReplacer": true, // 启用数据修改 "EnableReBuilder": true, // 启用数据重建 - "CleanDate": "202301" // 当数据过滤开启时,删除这个时间之前的数据 + "CleanDate": "202401" // 当数据过滤开启时,删除这个时间之前的数据 }, "Output":{ - "ConnectionString": "Server=127.0.0.1;Port=3306;UserId=root;Password=cfmes123456;", // 要分库,不用加'Database='了 + "ConnectionString": "Server=127.0.0.1;Port=3306;UserId=root;Password=123456;", // 要分库,不用加'Database='了 "MaxAllowedPacket": 67108864, "FlushCount": 10000, // 每次提交记录条数 "MaxDatabaseOutputTask" : 4, // 每个数据库最大提交任务数 "TreatJsonAsHex": false, // 将json列作为16进制格式输出(0x前缀),生产库是没有json列的 - "NoOutput": ["order"], + "NoOutput": [], // 不输出的表 "ForUpdate": { - "order_data_parts": "CompanyID = new.CompanyID" } }, "RecordQueue":{ @@ -54,10 +53,11 @@ }, "prod":{ "mesdb_1": 5000, - "mesdb_2": 10000, - "mesdb_3": 15000, - "mesdb_4": 20000, - "mesdb_5": 2147483647 + "mesdb_2": 7500, + "mesdb_3": 10000, + "mesdb_4": 15000, + "mesdb_5": 20000, + "mesdb_6": 2147483647 } } } diff --git a/MesETL.Shared/Helper/DatabaseHelper.cs b/MesETL.Shared/Helper/DatabaseHelper.cs index 6abf1d4..f23ede9 100644 --- a/MesETL.Shared/Helper/DatabaseHelper.cs +++ b/MesETL.Shared/Helper/DatabaseHelper.cs @@ -5,6 +5,11 @@ namespace MesETL.Shared.Helper; public static class DatabaseHelper { + /// + /// 创建一个MySql连接 + /// + /// + /// public static MySqlConnection CreateConnection(string connStr) { var newConnStr = new MySqlConnectionStringBuilder(connStr) @@ -15,6 +20,13 @@ public static class DatabaseHelper return new MySqlConnection(newConnStr); } + /// + /// 使用语句查询数据库 + /// + /// + /// + /// + /// public static async Task QueryTableAsync(string connStr, string sql, CancellationToken ct = default) { await using var conn = CreateConnection(connStr); @@ -27,6 +39,13 @@ public static class DatabaseHelper return ds; } + /// + /// 使用语句进行标量查询 + /// + /// + /// + /// + /// public static async Task QueryScalarAsync(string connStr, string sql, CancellationToken ct = default) { await using var conn = CreateConnection(connStr); @@ -37,6 +56,13 @@ public static class DatabaseHelper return await cmd.ExecuteScalarAsync(ct); } + /// + /// 执行非查询语句 + /// + /// + /// + /// + /// public static async Task NonQueryAsync(string connStr, string sql, CancellationToken ct = default) { await using var conn = CreateConnection(connStr); @@ -47,6 +73,13 @@ public static class DatabaseHelper return await cmd.ExecuteNonQueryAsync(ct); } + /// + /// 在事务中执行语句 + /// + /// + /// + /// + /// public static async Task TransactionAsync(string connStr, string sql, params MySqlParameter[] parameters) { await using var conn = CreateConnection(connStr); diff --git a/MesETL.Shared/Helper/Extensions.Lang.cs b/MesETL.Shared/Helper/Extensions.Lang.cs new file mode 100644 index 0000000..65e3a8e --- /dev/null +++ b/MesETL.Shared/Helper/Extensions.Lang.cs @@ -0,0 +1,63 @@ +namespace Azusa.Shared.Extensions; + +/// +/// 使用Range作为参数的迭代器方法 +///
+/// 扩展foreach关键字来实现类似foreach (var i in 1..5)的效果 +///
+public static class ForeachExtensions +{ + /// + /// 拓展Range结构实现GetEnumerator方法供foreach读取,实现foreach(var i in x..y) + /// + /// + public static CustomIntEnumerator GetEnumerator(this Range range) + { + return new CustomIntEnumerator(range); + } + + /// + /// 拓展int类实现GetEnumerator方法供foreach读取,实现foreach(var i in x) + /// + /// + /// + public static CustomIntEnumerator GetEnumerator(this int end) + { + return new CustomIntEnumerator(end); + return new CustomIntEnumerator(new Range(0, end));//在执行空函数时性能比上一句低10倍,Why + } +} + +//使用引用结构体增强性能 +public ref struct CustomIntEnumerator +{ + private int _current; + private readonly int _end; + + public CustomIntEnumerator(Range range) + { + //避免某些时候从结尾开始编制 + // x.. 时会产生Range(x,^0) + if (range.End.IsFromEnd) + { + throw new NotSupportedException("不支持从结尾编制索引"); + } + _current = range.Start.Value - 1; + _end = range.End.Value - 1;//迭代器不包含范围的尾部 + } + + public CustomIntEnumerator(int end) + { + _current = -1; + _end = end; + } + + /* 注意,供foeach使用的迭代器不需要实现IEnumerator接口,只需要提供Current属性以及MoveNext方法即可,*/ + public int Current => _current; + + public bool MoveNext() + { + _current++; + return _current <= _end; + } +} \ No newline at end of file diff --git a/MesETL.Shared/Helper/Helper.Compress.cs b/MesETL.Shared/Helper/Helper.Compress.cs new file mode 100644 index 0000000..e3ede29 --- /dev/null +++ b/MesETL.Shared/Helper/Helper.Compress.cs @@ -0,0 +1,22 @@ +using System.IO.Compression; + +namespace MesETL.Shared.Helper; + +public class CompressHelper +{ + /// + /// + /// + /// + /// + public static byte[] CompressDeflate(byte[] data) + { + using var src = new MemoryStream(data); + + using var outStream = new MemoryStream(); + using var gzip = new DeflateStream(outStream, CompressionMode.Compress); + src.CopyTo(gzip); + gzip.Flush(); + return outStream.ToArray(); + } +} \ No newline at end of file diff --git a/MesETL.Test/DatabaseToolBox.cs b/MesETL.Test/DatabaseToolBox.cs index 9803595..b82e2a0 100644 --- a/MesETL.Test/DatabaseToolBox.cs +++ b/MesETL.Test/DatabaseToolBox.cs @@ -12,7 +12,7 @@ namespace TestProject1; public class DatabaseToolBox { private readonly ITestOutputHelper _output; - public const string ConnStr = "Server=127.0.0.1;Port=3306;UserId=root;Password=cfmes123456;"; + public const string ConnStr = "Server=127.0.0.1;Port=3306;UserId=root;Password=123456;"; public DatabaseToolBox(ITestOutputHelper output) { @@ -170,4 +170,30 @@ public class DatabaseToolBox await DatabaseHelper.NonQueryAsync(ConnStr, sb.ToString()); _output.WriteLine($"Dropped {indexes.Length} indexes from {database}"); } + + [Theory] + [InlineData("mesdb_1")] + [InlineData("mesdb_2")] + [InlineData("mesdb_3")] + [InlineData("mesdb_4")] + [InlineData("mesdb_5")] + [InlineData("mesdb_6")] + public async Task TruncateAllTable(string database) + { + var tables = await DatabaseHelper.QueryTableAsync(ConnStr, + $""" + SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = '{database}'; + """); + var sb = new StringBuilder(); + sb.AppendLine($"USE `{database}`;"); + foreach (DataRow row in tables.Tables[0].Rows) + { + var tableName = row["TABLE_NAME"].ToString(); + var sql = $""" + TRUNCATE TABLE `{tableName}`; + """; + sb.AppendLine(sql); + } + await DatabaseHelper.NonQueryAsync(ConnStr, sb.ToString()); + } } \ No newline at end of file diff --git a/MesETL.Test/InputServiceTest.cs b/MesETL.Test/InputServiceTest.cs new file mode 100644 index 0000000..eba60fc --- /dev/null +++ b/MesETL.Test/InputServiceTest.cs @@ -0,0 +1,54 @@ +using MesETL.App.Helpers; +using MesETL.App.HostedServices; +using MesETL.App.Options; +using MesETL.App.Services; +using MesETL.App.Services.ETL; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Options; +using TestProject1.XUnit; +using Xunit.Abstractions; + +namespace TestProject1; + +public class InputServiceTest : TestBase +{ + private readonly ITestOutputHelper _output; + + public InputServiceTest(ITestOutputHelper output) : base(output) + { + _output = output; + } + + /// + /// 测试文件输入服务是否能正确的认到应有的文件 + /// + /// + /// + /// + /// + [Theory] + [InlineData(@"D:\Data\DatabaseDump\MyDumper-ZST 2024-12-3", null, new string[0], 152)] // 没有seq和三个库的efmigration + [InlineData(@"D:\Data\DatabaseDump\MyDumper-ZST 2024-12-3", new[] { "order", "machine" }, new string[0], + 11)] // 只有order和machine + [InlineData(@"D:\Data\DatabaseDump\MyDumper-ZST 2024-12-3", null, new[] { "order", "machine" }, + 152 - 11)] // 忽略order和machine + public void Test_InputInfo_Get_And_Order(string inputDir, string[]? tableOrder, string[] ignored, int assertCount) + { + var options = new OptionsWrapper(new DataInputOptions() + { + InputDir = inputDir, + FileInputMetaBuilder = DumpDataHelper.MyDumperFileInputMetaBuilder, + TableOrder = tableOrder, + TableIgnoreList = ignored + }); + var ctx = new ProcessContext(); + var queue = new DataRecordQueue(); + var dataReaderFactory = new DataReaderFactory(CreateXUnitLogger(), options); + var sut = new FileInputService(CreateXUnitLogger(), options, ctx, queue, dataReaderFactory, + new ConfigurationManager()); + + var result = sut.GetOrderedInputInfo(inputDir).ToArray(); + WriteJson(result); + Assert.True(assertCount == result.Length); + } +} \ No newline at end of file diff --git a/MesETL.Test/MesETL.Test.csproj b/MesETL.Test/MesETL.Test.csproj index 3adba57..383e5f1 100644 --- a/MesETL.Test/MesETL.Test.csproj +++ b/MesETL.Test/MesETL.Test.csproj @@ -12,6 +12,7 @@ + runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/MesETL.Test/Services/SeqServiceTests.cs b/MesETL.Test/Services/SeqServiceTests.cs new file mode 100644 index 0000000..5aa4d93 --- /dev/null +++ b/MesETL.Test/Services/SeqServiceTests.cs @@ -0,0 +1,33 @@ +using System.Reflection; +using Azusa.Shared.Extensions; +using MesETL.App.Options; +using MesETL.App.Services.Seq; +using Microsoft.Extensions.Options; +using TestProject1.XUnit; +using Xunit.Abstractions; + +namespace TestProject1.Services; + +public class SeqServiceTests : TestBase +{ + public SeqServiceTests(ITestOutputHelper output) : base(output) + { + } + + [Fact] + public void Test_Sequence_Sql_Generation() + { + var sut = new SeqService(new OptionsWrapper(new DatabaseOutputOptions() + { + ConnectionString = "Server=127.0.0.1;Port=3306;UserId=root;Password=123456;" + })); + + foreach (var i in 10) + { + Write("Seq: " + sut.AddCachedSeq(SeqConfig.OrderWaveGroupID)); + } + + var sql = typeof(SeqService).GetMethod("GenerateCachedSeqSql", BindingFlags.Instance | BindingFlags.NonPublic)!.Invoke(sut, []); + Write(sql ?? "null"); + } +} \ No newline at end of file diff --git a/MesETL.Test/XUnit/Configuration/XUnitConfiguration.cs b/MesETL.Test/XUnit/Configuration/XUnitConfiguration.cs new file mode 100644 index 0000000..50f4a74 --- /dev/null +++ b/MesETL.Test/XUnit/Configuration/XUnitConfiguration.cs @@ -0,0 +1,16 @@ +using Microsoft.Extensions.Configuration; + +namespace TestProject1.XUnit.Configuration; + +public static class XUnitConfiguration +{ + public static IConfiguration Configuration { get; } + + static XUnitConfiguration() + { + Configuration = new ConfigurationBuilder() + .SetBasePath(AppContext.BaseDirectory) + .AddJsonFile("appsettings.json", false, true) + .Build(); + } +} \ No newline at end of file diff --git a/MesETL.Test/XUnit/Logging/XUnitLogger.cs b/MesETL.Test/XUnit/Logging/XUnitLogger.cs new file mode 100644 index 0000000..73426e0 --- /dev/null +++ b/MesETL.Test/XUnit/Logging/XUnitLogger.cs @@ -0,0 +1,39 @@ +using Microsoft.Extensions.Logging; +using Xunit.Abstractions; + +namespace TestProject1.XUnit.Logging; + +/// +/// 适用于Xunit的日志记录器,使用ITestOutputHelper输出 +/// +public class XunitLogger : ILogger +{ + private readonly ITestOutputHelper _testOutputHelper; + private readonly string _categoryName; + + public XunitLogger(ITestOutputHelper testOutputHelper, string categoryName) + { + _testOutputHelper = testOutputHelper; + _categoryName = categoryName; + } + + public IDisposable? BeginScope(TState state) where TState : notnull + => NoopDisposable.Instance; + + public bool IsEnabled(LogLevel logLevel) + => true; + + public void Log(LogLevel logLevel, EventId eventId, TState state, Exception? exception, + Func formatter) + { + _testOutputHelper.WriteLine($"{_categoryName} [{eventId}] {formatter(state, exception)}"); + if (exception != null) + _testOutputHelper.WriteLine(exception.ToString()); + } + + private class NoopDisposable : IDisposable + { + public static readonly NoopDisposable Instance = new(); + public void Dispose() { } + } +} \ No newline at end of file diff --git a/MesETL.Test/XUnit/Logging/XUnitLoggerExtensions.cs b/MesETL.Test/XUnit/Logging/XUnitLoggerExtensions.cs new file mode 100644 index 0000000..a4d2c1e --- /dev/null +++ b/MesETL.Test/XUnit/Logging/XUnitLoggerExtensions.cs @@ -0,0 +1,13 @@ +using Microsoft.Extensions.Logging; +using Xunit.Abstractions; + +namespace TestProject1.XUnit.Logging; + +internal static class XUnitLoggerExtensions +{ + public static ILoggingBuilder AddXUnitLogger(this ILoggingBuilder builder, ITestOutputHelper output) + { + builder.AddProvider(new XunitLoggerProvider(output)); + return builder; + } +} \ No newline at end of file diff --git a/MesETL.Test/XUnit/Logging/XUnitLoggerProvider.cs b/MesETL.Test/XUnit/Logging/XUnitLoggerProvider.cs new file mode 100644 index 0000000..afa8682 --- /dev/null +++ b/MesETL.Test/XUnit/Logging/XUnitLoggerProvider.cs @@ -0,0 +1,20 @@ +using Microsoft.Extensions.Logging; +using Xunit.Abstractions; + +namespace TestProject1.XUnit.Logging; + +public class XunitLoggerProvider : ILoggerProvider +{ + private readonly ITestOutputHelper _testOutputHelper; + + public XunitLoggerProvider(ITestOutputHelper testOutputHelper) + { + _testOutputHelper = testOutputHelper; + } + + public ILogger CreateLogger(string categoryName) + => new XunitLogger(_testOutputHelper, categoryName); + + public void Dispose() + { } +} \ No newline at end of file diff --git a/MesETL.Test/XUnit/TestBase.cs b/MesETL.Test/XUnit/TestBase.cs new file mode 100644 index 0000000..ef8fcfe --- /dev/null +++ b/MesETL.Test/XUnit/TestBase.cs @@ -0,0 +1,38 @@ +using System.Text.Json; +using Microsoft.Extensions.Logging; +using Serilog; +using TestProject1.XUnit.Logging; +using Xunit.Abstractions; + +namespace TestProject1.XUnit; + +public class TestBase +{ + private readonly LoggerFactory _loggerFactory; + protected readonly ITestOutputHelper Output; + + private readonly JsonSerializerOptions _jsonSerializerOptions = + new(JsonSerializerDefaults.Web) { WriteIndented = true }; + + public TestBase(ITestOutputHelper output) + { + Output = output; + Console.SetOut(new XUnitConsoleWriter(output)); + _loggerFactory = new LoggerFactory([new XunitLoggerProvider(Output)]); + } + + protected void Write(object obj) + { + Output.WriteLine(obj.ToString()); + } + + protected void WriteJson(T obj) + { + Console.WriteLine(JsonSerializer.Serialize(obj, _jsonSerializerOptions)); + } + + protected ILogger CreateXUnitLogger() + { + return _loggerFactory.CreateLogger(); + } +} \ No newline at end of file diff --git a/MesETL.Test/XUnit/XUnitConsoleWriter.cs b/MesETL.Test/XUnit/XUnitConsoleWriter.cs new file mode 100644 index 0000000..bb1f09f --- /dev/null +++ b/MesETL.Test/XUnit/XUnitConsoleWriter.cs @@ -0,0 +1,18 @@ +using Xunit.Abstractions; + +namespace TestProject1.XUnit; + +public class XUnitConsoleWriter : StringWriter +{ + private ITestOutputHelper output; + + public XUnitConsoleWriter(ITestOutputHelper output) + { + this.output = output; + } + + public override void WriteLine(string? m) + { + output.WriteLine(m); + } +} \ No newline at end of file