From c049e092e413c954fc5e3686189c1e5eb116c192 Mon Sep 17 00:00:00 2001 From: "2817212736@qq.com" <2817212736@qq.com> Date: Mon, 20 Jan 2025 14:29:43 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E8=AF=95=E8=BF=90=E8=A1=8C?= =?UTF-8?q?=E6=A8=A1=E5=BC=8F=EF=BC=8C=E4=BF=AE=E6=94=B9=E7=94=9F=E4=BA=A7?= =?UTF-8?q?=E7=8E=AF=E5=A2=83=E6=95=B0=E6=8D=AE=E9=87=8D=E5=BB=BA=E8=A7=84?= =?UTF-8?q?=E5=88=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- MesETL.App/HostedServices/FileInputService.cs | 10 +++++++++- MesETL.App/HostedServices/MainHostedService.cs | 13 ++++++++++--- MesETL.App/Program.cs | 12 +++--------- MesETL.App/appsettings.json | 12 +++--------- MesETL.Test/DatabaseToolBox.cs | 2 +- 5 files changed, 26 insertions(+), 23 deletions(-) diff --git a/MesETL.App/HostedServices/FileInputService.cs b/MesETL.App/HostedServices/FileInputService.cs index 6e8c3bf..94e9758 100644 --- a/MesETL.App/HostedServices/FileInputService.cs +++ b/MesETL.App/HostedServices/FileInputService.cs @@ -31,6 +31,8 @@ public class FileInputService : IInputService private readonly ProcessContext _context; private readonly DataReaderFactory _dataReaderFactory; private readonly long _memoryThreshold; + private readonly bool _dryRun; + private readonly int _dryRunCount; public FileInputService(ILogger logger, IOptions dataInputOptions, @@ -45,12 +47,16 @@ public class FileInputService : IInputService _producerQueue = producerQueue; _dataReaderFactory = dataReaderFactory; _memoryThreshold = (long)(configuration.GetValue("MemoryThreshold", 8) * 1024 * 1024 * 1024); + _dryRun = configuration.GetValue("DryRun", false); + _dryRunCount = configuration.GetValue("DryRunCount", 100_000); } public async Task ExecuteAsync(CancellationToken cancellationToken) { var inputDir = _dataInputOptions.Value.InputDir ?? throw new ApplicationException("未配置文件输入目录"); _logger.LogInformation("***** 输入服务已启动,工作目录为:{InputDir} *****", inputDir); + if (_dryRun) + _logger.LogInformation("***** 试运行模式已开启,只读取前 {Count} 行数据 *****", _dryRunCount); var orderedInfo = GetOrderedInputInfo(inputDir); @@ -74,6 +80,8 @@ public class FileInputService : IInputService await _producerQueue.EnqueueAsync(record); count++; _context.AddInput(); + if (_dryRun && count >= _dryRunCount) + break; } _context.AddTableInput(info.TableName, count); @@ -82,7 +90,7 @@ public class FileInputService : IInputService } _context.CompleteInput(); - _logger.LogInformation("***** 输入服务已执行完毕 *****"); + _logger.LogInformation("***** 输入服务{DryRun}已执行完毕 *****", _dryRun ? " (试运行)" : ""); } public IEnumerable GetOrderedInputInfo(string dir) diff --git a/MesETL.App/HostedServices/MainHostedService.cs b/MesETL.App/HostedServices/MainHostedService.cs index 5570002..061945c 100644 --- a/MesETL.App/HostedServices/MainHostedService.cs +++ b/MesETL.App/HostedServices/MainHostedService.cs @@ -163,9 +163,16 @@ public class MainHostedService : BackgroundService private async Task ExportResultAsync() { var sb = new StringBuilder(); - if (_context.HasException) - sb.AppendLine("# 程序执行完毕,**但中途发生了异常**"); - else sb.AppendLine("# 程序执行完毕,没有发生错误"); + + var title = (_config.GetValue("DryRun", false), _context.HasException) switch + { + (true, true) => "# 试运行结束,**请注意处理异常**", + (true, false) => "# 试运行结束,没有发生异常", + (false, true) => "# 程序执行完毕,**但中途发生了异常**", + (false, false) => "# 程序执行完毕,没有发生错误" + }; + sb.AppendLine(title); + sb.AppendLine("## 处理计数"); var processCount = new[] { diff --git a/MesETL.App/Program.cs b/MesETL.App/Program.cs index ad9dd05..07afda2 100644 --- a/MesETL.App/Program.cs +++ b/MesETL.App/Program.cs @@ -210,15 +210,6 @@ async Task RunProgram() break; } -#if !FIX_PLAN_ITEM - // 删除PlanID和PackageID列 - case TableNames.OrderItem: - { - record.RemoveField("PlanID"); - record.RemoveField("PackageID"); - break; - } -#endif // 将JsonStr列转换为Data列,添加CompressionType列 case TableNames.OrderModuleExtra: { @@ -327,6 +318,9 @@ async Task RunProgram() [ "ItemID", "ShardKey", "PackageID", "CompanyID" ] )); } + + record.RemoveField("PlanID"); + record.RemoveField("PackageID"); return resultList; } diff --git a/MesETL.App/appsettings.json b/MesETL.App/appsettings.json index 7afa662..2e8da5e 100644 --- a/MesETL.App/appsettings.json +++ b/MesETL.App/appsettings.json @@ -2,6 +2,7 @@ "MemoryThreshold": 6, "GCIntervalMilliseconds": -1, "UnsafeVariable": true, + "DryRun": false, // 试运行,仅输入每张表的前100000条数据 "Logging": { "LogLevel": { "Default": "Trace" @@ -11,7 +12,7 @@ "InputDir": "D:\\Data\\DatabaseDump\\Prod_Mock_CSV_2024-12-31", // Csv数据输入目录 "UseMock": false, // 使用模拟数据进行测试 "MockCountMultiplier": 1, // 模拟数据量级的乘数 - "TableOrder": ["order_item"], // 按顺序输入的表 +// "TableOrder": ["order_item"], // 按顺序输入的表 "TableIgnoreList": [] // 忽略输入的表 }, "Transform":{ @@ -44,7 +45,7 @@ "TenantDb": // 分库配置 { "TenantKey" : "CompanyID", - "UseDbGroup": "mock", + "UseDbGroup": "prod", "DbGroups": { "test": { "cferp_test_1": 1000, @@ -59,13 +60,6 @@ "mesdb_5": 20000, "mesdb_6": 2147483647 }, - "mock":{ - "mesdb_1": 5000, - "mesdb_2": 10000, - "mesdb_3": 15000, - "mesdb_4": 20000, - "mesdb_5": 2147483647 - }, "mock_void":{ "mesdb_1_void": 5000, "mesdb_2_void": 10000, diff --git a/MesETL.Test/DatabaseToolBox.cs b/MesETL.Test/DatabaseToolBox.cs index 91052c3..7a8ff60 100644 --- a/MesETL.Test/DatabaseToolBox.cs +++ b/MesETL.Test/DatabaseToolBox.cs @@ -12,7 +12,7 @@ namespace TestProject1; public class DatabaseToolBox { private readonly ITestOutputHelper _output; - public const string ConnStr = "Server=192.168.1.245;Port=3306;UserId=root;Password=ruixinjie!@#123;"; + public const string ConnStr = "Server=localhost;Port=3306;UserId=root;Password=123456;"; public DatabaseToolBox(ITestOutputHelper output) {