添加试运行模式,修改生产环境数据重建规则

This commit is contained in:
陈梓阳 2025-01-20 14:29:43 +08:00
parent 1f7213a75c
commit c049e092e4
5 changed files with 26 additions and 23 deletions

View File

@ -31,6 +31,8 @@ public class FileInputService : IInputService
private readonly ProcessContext _context; private readonly ProcessContext _context;
private readonly DataReaderFactory _dataReaderFactory; private readonly DataReaderFactory _dataReaderFactory;
private readonly long _memoryThreshold; private readonly long _memoryThreshold;
private readonly bool _dryRun;
private readonly int _dryRunCount;
public FileInputService(ILogger<FileInputService> logger, public FileInputService(ILogger<FileInputService> logger,
IOptions<DataInputOptions> dataInputOptions, IOptions<DataInputOptions> dataInputOptions,
@ -45,12 +47,16 @@ public class FileInputService : IInputService
_producerQueue = producerQueue; _producerQueue = producerQueue;
_dataReaderFactory = dataReaderFactory; _dataReaderFactory = dataReaderFactory;
_memoryThreshold = (long)(configuration.GetValue<double>("MemoryThreshold", 8) * 1024 * 1024 * 1024); _memoryThreshold = (long)(configuration.GetValue<double>("MemoryThreshold", 8) * 1024 * 1024 * 1024);
_dryRun = configuration.GetValue("DryRun", false);
_dryRunCount = configuration.GetValue("DryRunCount", 100_000);
} }
public async Task ExecuteAsync(CancellationToken cancellationToken) public async Task ExecuteAsync(CancellationToken cancellationToken)
{ {
var inputDir = _dataInputOptions.Value.InputDir ?? throw new ApplicationException("未配置文件输入目录"); var inputDir = _dataInputOptions.Value.InputDir ?? throw new ApplicationException("未配置文件输入目录");
_logger.LogInformation("***** 输入服务已启动,工作目录为:{InputDir} *****", inputDir); _logger.LogInformation("***** 输入服务已启动,工作目录为:{InputDir} *****", inputDir);
if (_dryRun)
_logger.LogInformation("***** 试运行模式已开启,只读取前 {Count} 行数据 *****", _dryRunCount);
var orderedInfo = GetOrderedInputInfo(inputDir); var orderedInfo = GetOrderedInputInfo(inputDir);
@ -74,6 +80,8 @@ public class FileInputService : IInputService
await _producerQueue.EnqueueAsync(record); await _producerQueue.EnqueueAsync(record);
count++; count++;
_context.AddInput(); _context.AddInput();
if (_dryRun && count >= _dryRunCount)
break;
} }
_context.AddTableInput(info.TableName, count); _context.AddTableInput(info.TableName, count);
@ -82,7 +90,7 @@ public class FileInputService : IInputService
} }
_context.CompleteInput(); _context.CompleteInput();
_logger.LogInformation("***** 输入服务已执行完毕 *****"); _logger.LogInformation("***** 输入服务{DryRun}已执行完毕 *****", _dryRun ? " (试运行)" : "");
} }
public IEnumerable<FileInputInfo> GetOrderedInputInfo(string dir) public IEnumerable<FileInputInfo> GetOrderedInputInfo(string dir)

View File

@ -163,9 +163,16 @@ public class MainHostedService : BackgroundService
private async Task ExportResultAsync() private async Task ExportResultAsync()
{ {
var sb = new StringBuilder(); var sb = new StringBuilder();
if (_context.HasException)
sb.AppendLine("# 程序执行完毕,**但中途发生了异常**"); var title = (_config.GetValue("DryRun", false), _context.HasException) switch
else sb.AppendLine("# 程序执行完毕,没有发生错误"); {
(true, true) => "# 试运行结束,**请注意处理异常**",
(true, false) => "# 试运行结束,没有发生异常",
(false, true) => "# 程序执行完毕,**但中途发生了异常**",
(false, false) => "# 程序执行完毕,没有发生错误"
};
sb.AppendLine(title);
sb.AppendLine("## 处理计数"); sb.AppendLine("## 处理计数");
var processCount = new[] var processCount = new[]
{ {

View File

@ -210,15 +210,6 @@ async Task RunProgram()
break; break;
} }
#if !FIX_PLAN_ITEM
// 删除PlanID和PackageID列
case TableNames.OrderItem:
{
record.RemoveField("PlanID");
record.RemoveField("PackageID");
break;
}
#endif
// 将JsonStr列转换为Data列添加CompressionType列 // 将JsonStr列转换为Data列添加CompressionType列
case TableNames.OrderModuleExtra: case TableNames.OrderModuleExtra:
{ {
@ -327,6 +318,9 @@ async Task RunProgram()
[ "ItemID", "ShardKey", "PackageID", "CompanyID" ] [ "ItemID", "ShardKey", "PackageID", "CompanyID" ]
)); ));
} }
record.RemoveField("PlanID");
record.RemoveField("PackageID");
return resultList; return resultList;
} }

View File

@ -2,6 +2,7 @@
"MemoryThreshold": 6, "MemoryThreshold": 6,
"GCIntervalMilliseconds": -1, "GCIntervalMilliseconds": -1,
"UnsafeVariable": true, "UnsafeVariable": true,
"DryRun": false, // 100000
"Logging": { "Logging": {
"LogLevel": { "LogLevel": {
"Default": "Trace" "Default": "Trace"
@ -11,7 +12,7 @@
"InputDir": "D:\\Data\\DatabaseDump\\Prod_Mock_CSV_2024-12-31", // Csv "InputDir": "D:\\Data\\DatabaseDump\\Prod_Mock_CSV_2024-12-31", // Csv
"UseMock": false, // 使 "UseMock": false, // 使
"MockCountMultiplier": 1, // "MockCountMultiplier": 1, //
"TableOrder": ["order_item"], // // "TableOrder": ["order_item"], //
"TableIgnoreList": [] // "TableIgnoreList": [] //
}, },
"Transform":{ "Transform":{
@ -44,7 +45,7 @@
"TenantDb": // "TenantDb": //
{ {
"TenantKey" : "CompanyID", "TenantKey" : "CompanyID",
"UseDbGroup": "mock", "UseDbGroup": "prod",
"DbGroups": { "DbGroups": {
"test": { "test": {
"cferp_test_1": 1000, "cferp_test_1": 1000,
@ -59,13 +60,6 @@
"mesdb_5": 20000, "mesdb_5": 20000,
"mesdb_6": 2147483647 "mesdb_6": 2147483647
}, },
"mock":{
"mesdb_1": 5000,
"mesdb_2": 10000,
"mesdb_3": 15000,
"mesdb_4": 20000,
"mesdb_5": 2147483647
},
"mock_void":{ "mock_void":{
"mesdb_1_void": 5000, "mesdb_1_void": 5000,
"mesdb_2_void": 10000, "mesdb_2_void": 10000,

View File

@ -12,7 +12,7 @@ namespace TestProject1;
public class DatabaseToolBox public class DatabaseToolBox
{ {
private readonly ITestOutputHelper _output; private readonly ITestOutputHelper _output;
public const string ConnStr = "Server=192.168.1.245;Port=3306;UserId=root;Password=ruixinjie!@#123;"; public const string ConnStr = "Server=localhost;Port=3306;UserId=root;Password=123456;";
public DatabaseToolBox(ITestOutputHelper output) public DatabaseToolBox(ITestOutputHelper output)
{ {