错误修正

This commit is contained in:
陈梓阳 2024-02-05 16:47:36 +08:00
parent 5cda84797b
commit 719cd2d8e7
5 changed files with 29 additions and 23 deletions

View File

@ -5,9 +5,19 @@ namespace MesETL.App.Helpers;
public static class DatabaseHelper public static class DatabaseHelper
{ {
public static MySqlConnection CreateConnection(string connStr)
{
var newConnStr = new MySqlConnectionStringBuilder(connStr)
{
ConnectionTimeout = 30,
DefaultCommandTimeout = 0,
}.ConnectionString;
return new MySqlConnection(newConnStr);
}
public static async Task<DataSet> QueryTableAsync(string connStr, string sql) public static async Task<DataSet> QueryTableAsync(string connStr, string sql)
{ {
await using var conn = new MySqlConnection(connStr); await using var conn = CreateConnection(connStr);
if(conn.State is not ConnectionState.Open) if(conn.State is not ConnectionState.Open)
await conn.OpenAsync(); await conn.OpenAsync();
await using var cmd = conn.CreateCommand(); await using var cmd = conn.CreateCommand();
@ -19,7 +29,7 @@ public static class DatabaseHelper
public static async Task<object?> QueryScalarAsync(string connStr, string sql) public static async Task<object?> QueryScalarAsync(string connStr, string sql)
{ {
await using var conn = new MySqlConnection(connStr); await using var conn = CreateConnection(connStr);
if(conn.State is not ConnectionState.Open) if(conn.State is not ConnectionState.Open)
await conn.OpenAsync(); await conn.OpenAsync();
await using var cmd = conn.CreateCommand(); await using var cmd = conn.CreateCommand();
@ -29,7 +39,7 @@ public static class DatabaseHelper
public static async Task<int> NonQueryAsync(string connStr, string sql) public static async Task<int> NonQueryAsync(string connStr, string sql)
{ {
await using var conn = new MySqlConnection(connStr); await using var conn = CreateConnection(connStr);
if(conn.State is not ConnectionState.Open) if(conn.State is not ConnectionState.Open)
await conn.OpenAsync(); await conn.OpenAsync();
await using var cmd = conn.CreateCommand(); await using var cmd = conn.CreateCommand();
@ -39,7 +49,7 @@ public static class DatabaseHelper
public static async Task<int> TransactionAsync(string connStr, string sql, params MySqlParameter[] parameters) public static async Task<int> TransactionAsync(string connStr, string sql, params MySqlParameter[] parameters)
{ {
await using var conn = new MySqlConnection(connStr); await using var conn = CreateConnection(connStr);
if(conn.State is not ConnectionState.Open) if(conn.State is not ConnectionState.Open)
await conn.OpenAsync(); await conn.OpenAsync();
await using var trans = await conn.BeginTransactionAsync(); await using var trans = await conn.BeginTransactionAsync();

View File

@ -50,6 +50,7 @@ public class MainHostedService : BackgroundService
protected override async Task ExecuteAsync(CancellationToken stoppingToken) protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{ {
_logger.LogInformation("Command argument detected, execute for each database");
var command = _config["Command"]; var command = _config["Command"];
if (!string.IsNullOrEmpty(command)) if (!string.IsNullOrEmpty(command))
{ {

View File

@ -131,7 +131,9 @@ async Task RunProgram()
TableNames.OrderModuleExtra, TableNames.OrderModuleExtra,
TableNames.OrderModuleItem, TableNames.OrderModuleItem,
TableNames.OrderPackage, TableNames.OrderPackage,
#if USE_TEST_DB
TableNames.OrderPatchDetail, TableNames.OrderPatchDetail,
#endif
TableNames.OrderProcess, TableNames.OrderProcess,
TableNames.OrderProcessStep, TableNames.OrderProcessStep,
@ -314,12 +316,6 @@ async Task RunProgram()
var cache = context.Cacher; var cache = context.Cacher;
switch (record.TableName) switch (record.TableName)
{ {
#if USE_TEST_DB
// Order表移除IsBatch列
case TableNames.Order:
record.RemoveField("IsBatch");
break;
#endif
//OrderBlockPlan将OrderNo长度<2的置空 //OrderBlockPlan将OrderNo长度<2的置空
case TableNames.OrderBlockPlan: case TableNames.OrderBlockPlan:
if (record["OrderNos"].Length < 2) if (record["OrderNos"].Length < 2)
@ -347,10 +343,6 @@ async Task RunProgram()
// OrderProcess添加ShardKey列NextStepID的空值转换为0 // OrderProcess添加ShardKey列NextStepID的空值转换为0
case TableNames.OrderProcess: case TableNames.OrderProcess:
record.AddField("ShardKey", CalculateShardKeyByOrderNo(record["OrderNo"])); record.AddField("ShardKey", CalculateShardKeyByOrderNo(record["OrderNo"]));
#if USE_TEST_DB
if(record["NextStepID"] == "\\N")
record["NextStepID"] = "0";
#endif
break; break;
// OrderProcessStepOrderProcessStepItem添加ShardKey列 // OrderProcessStepOrderProcessStepItem添加ShardKey列
case TableNames.OrderProcessStep: case TableNames.OrderProcessStep:
@ -363,9 +355,6 @@ async Task RunProgram()
TableNames.OrderProcessStepItem, TableNames.OrderProcessStep, "OrderProcessID", "脏数据未处理")); TableNames.OrderProcessStepItem, TableNames.OrderProcessStep, "OrderProcessID", "脏数据未处理"));
break; break;
case TableNames.SimplePlanOrder: case TableNames.SimplePlanOrder:
#if USE_TEST_DB
record.RemoveField("ProcessState");
#endif
record.AddField("Deleted", "0"); record.AddField("Deleted", "0");
break; break;
} }
@ -531,7 +520,7 @@ async Task RunProgram()
{ "process_item_exp.ItemJson", ColumnType.Text }, { "process_item_exp.ItemJson", ColumnType.Text },
{ "report_template.Template", ColumnType.Text }, { "report_template.Template", ColumnType.Text },
{ "report_template.SourceConfig", ColumnType.Text }, { "report_template.SourceConfig", ColumnType.Text },
{ "order_block_plan.OrderNos", ColumnType.Json }, { "order_block_plan.OrderNos", ColumnType.Text },
{ "order_block_plan.BlockInfo", ColumnType.Text }, { "order_block_plan.BlockInfo", ColumnType.Text },
}; };
#endif #endif

View File

@ -137,9 +137,11 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
// 在这里处理特殊列 // 在这里处理特殊列
#region HandleFields #region HandleFields
if (field.Length == 2 && field == @"\N") // MyDumper NULL
const string NULL = "NULL";
if (field.Length == 2 && field == @"\N") // MyDumper导出的NULL为'\N''\'不是转义字符)
{ {
recordSb.Append("NULL"); recordSb.Append(NULL);
goto Escape; goto Escape;
} }
@ -148,14 +150,18 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
case ColumnType.Text: case ColumnType.Text:
if(string.IsNullOrEmpty(field)) if(string.IsNullOrEmpty(field))
recordSb.Append("''"); recordSb.Append("''");
else if (field == NULL)
recordSb.Append(NULL);
else recordSb.Append($"_utf8mb4 0x{field}"); else recordSb.Append($"_utf8mb4 0x{field}");
break; break;
case ColumnType.Blob: case ColumnType.Blob:
if (string.IsNullOrEmpty(field)) if (string.IsNullOrEmpty(field))
recordSb.Append("''"); recordSb.Append("''");
else if (field == NULL)
recordSb.Append(NULL);
else recordSb.Append($"0x{field}"); else recordSb.Append($"0x{field}");
break; break;
case ColumnType.Json: case ColumnType.Json:// 生产库没有JSON列仅用于测试库进行测试
if(string.IsNullOrEmpty(field)) if(string.IsNullOrEmpty(field))
recordSb.Append("'[]'"); // JObject or JArray? recordSb.Append("'[]'"); // JObject or JArray?
else if (_options.Value.TreatJsonAsHex) else if (_options.Value.TreatJsonAsHex)

View File

@ -7,7 +7,7 @@
"Input":{ "Input":{
"InputDir": "D:\\Dump\\MockData", // Csv "InputDir": "D:\\Dump\\MockData", // Csv
"UseMock": false, // 使 "UseMock": false, // 使
"MockCountMultiplier": 0.5 // "MockCountMultiplier": 0 //
}, },
"Transform":{ "Transform":{
"StrictMode": false, // true "StrictMode": false, // true
@ -21,7 +21,7 @@
"MaxAllowedPacket": 67108864, "MaxAllowedPacket": 67108864,
"FlushCount": 10000, // "FlushCount": 10000, //
"MaxDatabaseOutputTask" : 4, // "MaxDatabaseOutputTask" : 4, //
"TreatJsonAsHex": false, // json16(0x) "TreatJsonAsHex": false // json16(0x)json
}, },
"RedisCache": { "RedisCache": {
"Configuration": "192.168.1.246:6380", "Configuration": "192.168.1.246:6380",