diff --git a/MesETL.App/HostedServices/FileInputService.cs b/MesETL.App/HostedServices/FileInputService.cs index b45f904..4c973b6 100644 --- a/MesETL.App/HostedServices/FileInputService.cs +++ b/MesETL.App/HostedServices/FileInputService.cs @@ -2,6 +2,7 @@ using MesETL.App.Options; using MesETL.App.Services; using MesETL.App.Services.ETL; +using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -32,18 +33,21 @@ public class FileInputService : IInputService private readonly IOptions _dataInputOptions; private readonly ProcessContext _context; private readonly DataReaderFactory _dataReaderFactory; + private readonly long _memoryThreshold; public FileInputService(ILogger logger, IOptions dataInputOptions, ProcessContext context, [FromKeyedServices(Const.ConstVar.Producer)] DataRecordQueue producerQueue, - DataReaderFactory dataReaderFactory) + DataReaderFactory dataReaderFactory, + IConfiguration configuration) { _logger = logger; _dataInputOptions = dataInputOptions; _context = context; _producerQueue = producerQueue; _dataReaderFactory = dataReaderFactory; + _memoryThreshold = (long)(configuration.GetValue("MemoryThreshold", 8) * 1024 * 1024 * 1024); } public async Task ExecuteAsync(CancellationToken cancellationToken) @@ -73,6 +77,12 @@ public class FileInputService : IInputService while (await source.ReadAsync()) { + if (GC.GetTotalMemory(false) > _memoryThreshold) + { + _logger.LogWarning("内存过高,暂缓输入"); + GC.Collect(); + await Task.Delay(3000, cancellationToken); + } var record = source.Current; await _producerQueue.EnqueueAsync(record); count++; diff --git a/MesETL.App/HostedServices/MainHostedService.cs b/MesETL.App/HostedServices/MainHostedService.cs index 40660d4..233a68c 100644 --- a/MesETL.App/HostedServices/MainHostedService.cs +++ b/MesETL.App/HostedServices/MainHostedService.cs @@ -61,7 +61,9 @@ public class MainHostedService : BackgroundService } _stopwatch = Stopwatch.StartNew(); - await SetVariableAsync(); // 开启延迟写入,禁用重做日志 >>> 重做日志处于禁用状态时不要关闭数据库服务! + var enableUnsafeVar = _config.GetValue("UnsafeVariable", false); + if (enableUnsafeVar) + await SetVariableAsync(); // 开启延迟写入,禁用重做日志 >>> 重做日志处于禁用状态时不要关闭数据库服务! var monitorTask = Task.Run(async () => await _taskMonitor.Monitor(stoppingToken), stoppingToken); var inputTask = ExecuteAndCatch( @@ -77,7 +79,8 @@ public class MainHostedService : BackgroundService _logger.LogInformation("***** ElapseTime: {Time}", (_stopwatch.ElapsedMilliseconds / 1000f).ToString("F3")); await Task.Delay(5000, stoppingToken); - await SetVariableAsync(false); // 关闭延迟写入,开启重做日志 + if(enableUnsafeVar) + await SetVariableAsync(false); // 关闭延迟写入,开启重做日志 if (!stoppingToken.IsCancellationRequested) { await ExportResultAsync(); @@ -167,7 +170,7 @@ public class MainHostedService : BackgroundService sb.AppendLine("\n---\n"); sb.AppendLine("## Table Output Progress"); var tableOutputProgress = _context.TableProgress.Select(pair => - new { Table = pair.Key, Count = pair.Value }); + new { Table = pair.Key, Count = pair.Value }).OrderBy(s => s.Table); sb.AppendLine(tableOutputProgress.ToMarkdownTable()); sb.AppendLine("\n---\n"); sb.AppendLine("## Result"); diff --git a/MesETL.App/HostedServices/OutputService.cs b/MesETL.App/HostedServices/OutputService.cs index bff29fc..101b516 100644 --- a/MesETL.App/HostedServices/OutputService.cs +++ b/MesETL.App/HostedServices/OutputService.cs @@ -1,4 +1,5 @@ -using MesETL.App.Helpers; +using System.Buffers; +using MesETL.App.Helpers; using MesETL.App.HostedServices.Abstractions; using MesETL.App.Options; using MesETL.App.Services; @@ -65,13 +66,14 @@ public class OutputService : IOutputService { _logger.LogInformation("*****开启输出线程,数据库: {db} *****", db); var taskManager = new TaskManager(_outputOptions.Value.MaxDatabaseOutputTask); + var ignoreOutput = new HashSet(_outputOptions.Value.NoOutput); var tmp = new List(); while (!_context.IsTransformCompleted || queue.Count > 0) { if (ct.IsCancellationRequested) break; - if (!queue.TryDequeue(out var record)) continue; + if (!queue.TryDequeue(out var record) || ignoreOutput.Contains(record.TableName)) continue; var dbName = record.Database ?? throw new ApplicationException("输出的记录缺少数据库名"); if(dbName != db) diff --git a/MesETL.App/HostedServices/TaskMonitorService.cs b/MesETL.App/HostedServices/TaskMonitorService.cs index 85bb7a3..88d99bd 100644 --- a/MesETL.App/HostedServices/TaskMonitorService.cs +++ b/MesETL.App/HostedServices/TaskMonitorService.cs @@ -2,7 +2,6 @@ using System.Text; using MesETL.App.Services; using MesETL.App.Services.Loggers; -using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; namespace MesETL.App.HostedServices; @@ -16,32 +15,24 @@ public class TaskMonitorService private readonly ProcessContext _context; private readonly DataRecordQueue _producerQueue; private readonly RecordQueuePool _queuePool; - private readonly IConfiguration _configuration; private string _outputPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Log/progress.txt"); - private readonly int _gcInterval; - public TaskMonitorService(ProcessContext context, [FromKeyedServices(Const.ConstVar.Producer)] DataRecordQueue producerQueue, RecordQueuePool queuePool, - IEnumerable monitorLoggers, - IConfiguration configuration) + IEnumerable monitorLoggers) { _context = context; _producerQueue = producerQueue; _queuePool = queuePool; _monitorLoggers = monitorLoggers; - _configuration = configuration; - - _gcInterval = _configuration.GetValue("GCIntervalMilliseconds)"); } public async Task Monitor(CancellationToken stoppingToken) { var sw = Stopwatch.StartNew(); - var lastGCTime = sw.ElapsedMilliseconds; var lastTime = sw.ElapsedMilliseconds; var lastInputCount = _context.InputCount; var lastTransformCount = _context.TransformCount; @@ -83,12 +74,6 @@ public class TaskMonitorService var inputSpeed = (inputCount - lastInputCount) / elapseTime; var transformSpeed = (transformCount - lastTransformCount) / elapseTime; var outputSpeed = (outputCount - lastOutputCount) / elapseTime; - - if(_gcInterval > 0 && time - lastGCTime > _gcInterval) - { - GC.Collect(); - lastGCTime = time; - } // _logger.LogInformation( // "Task monitor: running: {Running}, error: {Error}, completed: {Completed}, canceled: {Canceled}, outputSpeed: {Speed} records/s", diff --git a/MesETL.App/Options/DatabaseOutputOptions.cs b/MesETL.App/Options/DatabaseOutputOptions.cs index e59dc76..d2a9714 100644 --- a/MesETL.App/Options/DatabaseOutputOptions.cs +++ b/MesETL.App/Options/DatabaseOutputOptions.cs @@ -4,14 +4,17 @@ public class DatabaseOutputOptions { public string? ConnectionString { get; set; } - public int MaxAllowedPacket { get; set; } = 64 * 1024 * 1024; + public int MaxAllowedPacket { get; set; } = 32 * 1024 * 1024; public int FlushCount { get; set; } = 10000; public int MaxDatabaseOutputTask { get; set; } = 4; public bool TreatJsonAsHex { get; set; } = true; + + public string[] NoOutput { get; set; } = []; + public Dictionary? ForUpdate { get; set; } /// /// 配置导入数据的特殊列 @@ -22,4 +25,12 @@ public class DatabaseOutputOptions { return ColumnTypeConfig.GetValueOrDefault($"{table}.{column}", ColumnType.UnDefine); } + + public bool TryGetForUpdate(string table, out string? forUpdate) + { + forUpdate = null; + if (ForUpdate is null || !ForUpdate.TryGetValue(table, out forUpdate)) + return false; + return true; + } } \ No newline at end of file diff --git a/MesETL.App/Program.cs b/MesETL.App/Program.cs index dd93ad4..ee25d1c 100644 --- a/MesETL.App/Program.cs +++ b/MesETL.App/Program.cs @@ -137,12 +137,12 @@ async Task RunProgram() TableNames.Order, TableNames.OrderBoxBlock, // 依赖Order.CompanyID + TableNames.OrderDataBlock, // 依赖Order.CompanyID TableNames.OrderBlockPlan, TableNames.OrderBlockPlanResult,// 依赖OrderBlockPlan.CompanyID / 删除 TableNames.OrderItem, - TableNames.OrderDataBlock, TableNames.OrderDataGoods, TableNames.OrderDataParts, TableNames.OrderModule, @@ -253,6 +253,13 @@ async Task RunProgram() return false; break; } + // OrderDataBlock删除对应Order.OrderNo不存在的对象 + case TableNames.OrderDataBlock: + { + if (!await cache.ExistsAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"]))) + return false; + break; + } // OrderBlockPlan删除CreateTime < 202301的 case TableNames.OrderBlockPlan: { @@ -397,6 +404,13 @@ async Task RunProgram() ThrowIfNoCached(await cache.GetStringAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"])), TableNames.OrderBoxBlock, TableNames.Order, "OrderNo", "无法获取对应的CompanyID")); break; + // 修正OrderDataBlock.CompanyID + case TableNames.OrderDataBlock: + record["CompanyID"] = + // 获取Order.OrderNo -> CompanyID + ThrowIfNoCached(await cache.GetStringAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"])), + TableNames.OrderBoxBlock, TableNames.Order, "OrderNo", "无法获取对应的CompanyID"); + break; // OrderModule添加ShardKey列,移除ViewFileName列 case TableNames.OrderModule: record.AddField("ShardKey", CalculateShardKeyByOrderNo(record["OrderNo"])); @@ -529,9 +543,11 @@ async Task RunProgram() { options.ConnectionString = outputOptions.ConnectionString; options.FlushCount = outputOptions.FlushCount; - options.MaxAllowedPacket = outputOptions.MaxAllowedPacket; + options.MaxAllowedPacket = outputOptions.MaxAllowedPacket / 2; options.MaxDatabaseOutputTask = outputOptions.MaxDatabaseOutputTask; options.TreatJsonAsHex = outputOptions.TreatJsonAsHex; + options.NoOutput = outputOptions.NoOutput; + options.ForUpdate = outputOptions.ForUpdate; #if USE_TEST_DB // Test Server diff --git a/MesETL.App/Services/DataRecordQueue.cs b/MesETL.App/Services/DataRecordQueue.cs index 552a3c8..c8ab985 100644 --- a/MesETL.App/Services/DataRecordQueue.cs +++ b/MesETL.App/Services/DataRecordQueue.cs @@ -50,9 +50,6 @@ public class DataRecordQueue : IDisposable public async Task EnqueueAsync(DataRecord record) { - if (_queue.Count >= _queue.BoundedCapacity) - await Task.Delay(500); - var charCount = record.FieldCharCount; LongestFieldCharCount = Math.Max(LongestFieldCharCount, charCount); if(_currentCharCount + charCount > _maxCharCount) diff --git a/MesETL.App/Services/ETL/MySqlDestination.cs b/MesETL.App/Services/ETL/MySqlDestination.cs index 9a6f7e8..7051949 100644 --- a/MesETL.App/Services/ETL/MySqlDestination.cs +++ b/MesETL.App/Services/ETL/MySqlDestination.cs @@ -66,7 +66,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable try { - var excuseList = GetExcuseList(_recordCache, maxAllowPacket).ToList(); + var excuseList = GetExcuseList(_recordCache, maxAllowPacket); foreach (var insertSql in excuseList) { cmd.CommandText = insertSql; @@ -106,6 +106,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable public IEnumerable GetExcuseList(IDictionary> tableRecords,int maxAllowPacket) { var sb = new StringBuilder("SET AUTOCOMMIT = 1;\n"); + var appendCount = 0; foreach (var (tableName, records) in tableRecords) { if (records.Count == 0) @@ -187,6 +188,14 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable // 若字符数量即将大于限制,则返回SQL,清空StringBuilder,保留当前记录的索引值,然后转到StartBuild标签重新开始一轮INSERT if (sb.Length + recordSb.Length + 23 > maxAllowPacket) { + if (appendCount == 0) // 如果单条记录超出maxAllowedPacket + { + sb.Append(recordSb); + _logger.LogWarning("{Table}表单条数据的SQL超出了配置的MaxAllowedPacket,字符数{Count}", tableName, + sb.Length + recordSb.Length + 23); + } + + TryAddForUpdateSuffix(tableName, sb); sb.Append(';').AppendLine(); sb.Append("SET AUTOCOMMIT = 1;"); yield return sb.ToString(); @@ -198,8 +207,10 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable sb.Append(',').AppendLine(); noCommas = false; sb.Append(recordSb); // StringBuilder.Append(StringBuilder)不会分配多余的内存 + appendCount++; } + TryAddForUpdateSuffix(tableName, sb); sb.Append(';'); sb.Append("COMMIT;"); yield return sb.ToString(); @@ -207,7 +218,24 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable } } - + /// + /// 数据必须是同一张表 + /// + /// + /// + private void TryAddForUpdateSuffix(string tableName, StringBuilder sb) + { + var forUpdate = _options.Value.TryGetForUpdate(tableName, out var forUpdateSql); + if (forUpdate) + { + sb.AppendLine($""" + AS new + ON DUPLICATE KEY UPDATE + {forUpdateSql} + """); + } + } + public void Dispose() { _conn.Close(); diff --git a/MesETL.App/appsettings.json b/MesETL.App/appsettings.json index cdb23e0..3fda32a 100644 --- a/MesETL.App/appsettings.json +++ b/MesETL.App/appsettings.json @@ -1,15 +1,17 @@ { + "MemoryThreshold": 8, "GCIntervalMilliseconds": -1, + "UnsafeVariable": false, "Logging": { "LogLevel": { "Default": "Debug" } }, "Input":{ - "InputDir": "D:\\Dump\\MyDumper-ZST 2024-02-05", // Csv数据输入目录 + "InputDir": "D:\\Dump\\NewMockData", // Csv数据输入目录 "UseMock": false, // 使用模拟数据进行测试 "MockCountMultiplier": 1, // 模拟数据量级的乘数 - "TableOrder": [], // 按顺序输入的表 + "TableOrder": ["order", "order_data_block"], // 按顺序输入的表 "TableIgnoreList": [] // 忽略输入的表 }, "Transform":{ @@ -24,8 +26,13 @@ "MaxAllowedPacket": 67108864, "FlushCount": 10000, // 每次提交记录条数 "MaxDatabaseOutputTask" : 4, // 每个数据库最大提交任务数 - "TreatJsonAsHex": false // 将json列作为16进制格式输出(0x前缀),生产库是没有json列的 - }, + "TreatJsonAsHex": false, // 将json列作为16进制格式输出(0x前缀),生产库是没有json列的 + "NoOutput": ["order"], + "ForUpdate": + { + "order_data_block": "CompanyID = new.CompanyID" + } + }, "RecordQueue":{ "ProducerQueueLength": 50000, // 输入队列最大长度 "ConsumerQueueLength": 10000, // 每个输出队列最大长度 diff --git a/MesETL.Test/Test.cs b/MesETL.Test/Test.cs index f9a4f78..176cb46 100644 --- a/MesETL.Test/Test.cs +++ b/MesETL.Test/Test.cs @@ -1,6 +1,7 @@ using System.Collections.Concurrent; using System.Diagnostics; using MesETL.App.Services.ETL; +using MesETL.Shared.Helper; using Xunit.Abstractions; using ZstdSharp; @@ -86,4 +87,66 @@ public class Test } } + + [Fact] + public void GetResult() + { + var input = + """ + machine: 19303/19061 + order: 3416759/3415192 + order_block_plan: 2934281/1968850 + order_block_plan_item: 0/235927707 + order_block_plan_result: 1375479/277667 + order_box_block: 23457666/23450841 + order_data_block: 513012248/513012248 + order_data_goods: 18655270/18655270 + order_data_parts: 353139066/353139066 + order_item: 955274320/955274320 + order_module: 102907480/56935691 + order_module_extra: 40044077/40044077 + order_module_item: 49209022/49209022 + order_package: 12012712/12012712 + order_package_item: 0/80605124 + order_process: 4045309/2682043 + order_process_step: 8343418/5505158 + order_process_step_item: 14856509/9787696 + order_scrap_board: 136096/136090 + process_group: 1577/1543 + process_info: 9212/9008 + process_item_exp: 30/30 + process_schdule_capacity: 42442/42442 + process_step_efficiency: 8/8 + report_template: 7358/7338 + simple_package: 142861/137730 + simple_plan_order: 1167004/854699 + simple_plan_order: 0/55677 + sys_config: 2608/2608 + work_calendar: 11/11 + work_shift: 73/73 + work_time: 77/77 + order_process_step_item: 14856509/9790701 + order_process_step: 8343418/5506925 + order_module: 102907480/56935691 + order_process: 4045309/2682043 + report_template: 7358/7358 + process_info: 9212/9212 + process_group: 1577/1577 + order_block_plan_result: 1375479/277667 + order_box_block: 23457666/23457666 + order_block_plan: 2934281/1968850 + order: 3416759/3416759 + machine: 19303/19303 + order_scrap_board: 136096/136096 + """; + + var arr = input.Split('\n').Select(s => + { + var x = s.Split(':'); + var y = x[1].Split('/').Select(i => long.Parse(i)).ToArray(); + return new {TABLE_NAME = x[0], INPUT = y[0], OUTPUT = y[1], FILTER = y[0] - y[1]}; + }).OrderBy(s => s.TABLE_NAME); + + _output.WriteLine(arr.ToMarkdownTable()); + } } \ No newline at end of file