Compare commits

...

2 Commits

Author SHA1 Message Date
CZY
d58c9d5177 新增非法字段检查 2024-02-06 15:37:21 +08:00
CZY
719cd2d8e7 错误修正 2024-02-05 16:47:36 +08:00
11 changed files with 122 additions and 45 deletions

View File

@ -0,0 +1,8 @@
namespace MesETL.App.Const;
public static class ConstVar
{
public const string Producer = "Producer";
public const string Null = "NULL";
public const string MyDumperNull = "NULL";
}

View File

@ -1,6 +0,0 @@
namespace MesETL.App.Const;
public static class ProcessStep
{
public const string Produce = "Producer";
}

View File

@ -5,9 +5,19 @@ namespace MesETL.App.Helpers;
public static class DatabaseHelper public static class DatabaseHelper
{ {
public static MySqlConnection CreateConnection(string connStr)
{
var newConnStr = new MySqlConnectionStringBuilder(connStr)
{
ConnectionTimeout = 30,
DefaultCommandTimeout = 0,
}.ConnectionString;
return new MySqlConnection(newConnStr);
}
public static async Task<DataSet> QueryTableAsync(string connStr, string sql) public static async Task<DataSet> QueryTableAsync(string connStr, string sql)
{ {
await using var conn = new MySqlConnection(connStr); await using var conn = CreateConnection(connStr);
if(conn.State is not ConnectionState.Open) if(conn.State is not ConnectionState.Open)
await conn.OpenAsync(); await conn.OpenAsync();
await using var cmd = conn.CreateCommand(); await using var cmd = conn.CreateCommand();
@ -19,7 +29,7 @@ public static class DatabaseHelper
public static async Task<object?> QueryScalarAsync(string connStr, string sql) public static async Task<object?> QueryScalarAsync(string connStr, string sql)
{ {
await using var conn = new MySqlConnection(connStr); await using var conn = CreateConnection(connStr);
if(conn.State is not ConnectionState.Open) if(conn.State is not ConnectionState.Open)
await conn.OpenAsync(); await conn.OpenAsync();
await using var cmd = conn.CreateCommand(); await using var cmd = conn.CreateCommand();
@ -29,7 +39,7 @@ public static class DatabaseHelper
public static async Task<int> NonQueryAsync(string connStr, string sql) public static async Task<int> NonQueryAsync(string connStr, string sql)
{ {
await using var conn = new MySqlConnection(connStr); await using var conn = CreateConnection(connStr);
if(conn.State is not ConnectionState.Open) if(conn.State is not ConnectionState.Open)
await conn.OpenAsync(); await conn.OpenAsync();
await using var cmd = conn.CreateCommand(); await using var cmd = conn.CreateCommand();
@ -39,7 +49,7 @@ public static class DatabaseHelper
public static async Task<int> TransactionAsync(string connStr, string sql, params MySqlParameter[] parameters) public static async Task<int> TransactionAsync(string connStr, string sql, params MySqlParameter[] parameters)
{ {
await using var conn = new MySqlConnection(connStr); await using var conn = CreateConnection(connStr);
if(conn.State is not ConnectionState.Open) if(conn.State is not ConnectionState.Open)
await conn.OpenAsync(); await conn.OpenAsync();
await using var trans = await conn.BeginTransactionAsync(); await using var trans = await conn.BeginTransactionAsync();

View File

@ -1,4 +1,5 @@
using System.Text.RegularExpressions; using System.Text.Json;
using System.Text.RegularExpressions;
using ZstdSharp; using ZstdSharp;
namespace MesETL.App.Helpers; namespace MesETL.App.Helpers;
@ -121,5 +122,17 @@ public static partial class DumpDataHelper
var reader = new StreamReader(ds); var reader = new StreamReader(ds);
return await reader.ReadToEndAsync(); return await reader.ReadToEndAsync();
} }
public static bool IsJson(string str)
{
try
{
JsonDocument.Parse(str);
return true;
}
catch (JsonException)
{
return false;
}
}
} }

View File

@ -1,5 +1,4 @@
using MesETL.App.Const; using MesETL.App.HostedServices.Abstractions;
using MesETL.App.HostedServices.Abstractions;
using MesETL.App.Options; using MesETL.App.Options;
using MesETL.App.Services; using MesETL.App.Services;
using MesETL.App.Services.ETL; using MesETL.App.Services.ETL;
@ -37,7 +36,7 @@ public class FileInputService : IInputService
public FileInputService(ILogger<FileInputService> logger, public FileInputService(ILogger<FileInputService> logger,
IOptions<DataInputOptions> dataInputOptions, IOptions<DataInputOptions> dataInputOptions,
ProcessContext context, ProcessContext context,
[FromKeyedServices(ProcessStep.Produce)] DataRecordQueue producerQueue, [FromKeyedServices(Const.ConstVar.Producer)] DataRecordQueue producerQueue,
DataReaderFactory dataReaderFactory) DataReaderFactory dataReaderFactory)
{ {
_logger = logger; _logger = logger;

View File

@ -50,6 +50,7 @@ public class MainHostedService : BackgroundService
protected override async Task ExecuteAsync(CancellationToken stoppingToken) protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{ {
_logger.LogInformation("Command argument detected, execute for each database");
var command = _config["Command"]; var command = _config["Command"];
if (!string.IsNullOrEmpty(command)) if (!string.IsNullOrEmpty(command))
{ {

View File

@ -1,5 +1,4 @@
using System.Diagnostics; using System.Diagnostics;
using MesETL.App.Const;
using MesETL.App.Services; using MesETL.App.Services;
using MesETL.App.Services.Loggers; using MesETL.App.Services.Loggers;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
@ -17,7 +16,7 @@ public class TaskMonitorService
private readonly RecordQueuePool _queuePool; private readonly RecordQueuePool _queuePool;
public TaskMonitorService(ProcessContext context, public TaskMonitorService(ProcessContext context,
[FromKeyedServices(ProcessStep.Produce)] [FromKeyedServices(Const.ConstVar.Producer)]
DataRecordQueue producerQueue, DataRecordQueue producerQueue,
RecordQueuePool queuePool, RecordQueuePool queuePool,
IEnumerable<ITaskMonitorLogger> monitorLoggers) IEnumerable<ITaskMonitorLogger> monitorLoggers)

View File

@ -1,5 +1,4 @@
using MesETL.App.Cache; using MesETL.App.Cache;
using MesETL.App.Const;
using MesETL.App.HostedServices.Abstractions; using MesETL.App.HostedServices.Abstractions;
using MesETL.App.Options; using MesETL.App.Options;
using MesETL.App.Services; using MesETL.App.Services;
@ -28,7 +27,7 @@ public class TransformService : ITransformService
public TransformService(ILogger<TransformService> logger, public TransformService(ILogger<TransformService> logger,
IOptions<DataTransformOptions> options, IOptions<DataTransformOptions> options,
[FromKeyedServices(ProcessStep.Produce)] DataRecordQueue producerQueue, [FromKeyedServices(Const.ConstVar.Producer)] DataRecordQueue producerQueue,
RecordQueuePool queuePool, RecordQueuePool queuePool,
ProcessContext context, ProcessContext context,
ICacher cache, ICacher cache,

View File

@ -1,5 +1,6 @@
// #define USE_TEST_DB // 测试库的结构与生产库不一样如果使用测试库运行则加上USE_TEST_DB预处理器指令 // #define USE_TEST_DB // 测试库的结构与生产库不一样如果使用测试库运行则加上USE_TEST_DB预处理器指令
using System.Text.Json;
using MesETL.App; using MesETL.App;
using MesETL.App.Services; using MesETL.App.Services;
using MesETL.App.Services.ETL; using MesETL.App.Services.ETL;
@ -131,7 +132,9 @@ async Task RunProgram()
TableNames.OrderModuleExtra, TableNames.OrderModuleExtra,
TableNames.OrderModuleItem, TableNames.OrderModuleItem,
TableNames.OrderPackage, TableNames.OrderPackage,
#if USE_TEST_DB
TableNames.OrderPatchDetail, TableNames.OrderPatchDetail,
#endif
TableNames.OrderProcess, TableNames.OrderProcess,
TableNames.OrderProcessStep, TableNames.OrderProcessStep,
@ -240,12 +243,14 @@ async Task RunProgram()
return false; return false;
break; break;
} }
// OrderBlockPlan删除CreateTime < 202301的 // OrderBlockPlan删除CreateTime < 202301的Json列合法检查
case TableNames.OrderBlockPlan: case TableNames.OrderBlockPlan:
{ {
var time = DateTime.Parse(record["CreateTime"].Trim('"')); var time = DateTime.Parse(record["CreateTime"].Trim('"','\''));
if (time < oldestTime) if (time < oldestTime)
return false; return false;
// if (!DumpDataHelper.IsJson(record["OrderNos"])) return false;
break; break;
} }
// OrderBlockPlanResult删除对应order_block_plan.ID不存在的对象 // OrderBlockPlanResult删除对应order_block_plan.ID不存在的对象
@ -255,6 +260,12 @@ async Task RunProgram()
return false; return false;
break; break;
} }
// OrderDataGoods Json列合法检查
case TableNames.OrderDataGoods:
{
if (!DumpDataHelper.IsJson(record["ExtraProp"])) return false;
break;
}
// OrderModule删除OrderNo < 202301的 // OrderModule删除OrderNo < 202301的
case TableNames.OrderModule: case TableNames.OrderModule:
{ {
@ -297,7 +308,7 @@ async Task RunProgram()
// SimplePlanOrder删除CreateTime < 202301的 // SimplePlanOrder删除CreateTime < 202301的
case TableNames.SimplePlanOrder: case TableNames.SimplePlanOrder:
{ {
var time = DateTime.Parse(record["CreateTime"].Trim('"')); var time = DateTime.Parse(record["CreateTime"].Trim('"', '\''));
if (time < oldestTime) if (time < oldestTime)
return false; return false;
break; break;
@ -308,24 +319,49 @@ async Task RunProgram()
}; };
// 数据替换 // 数据替换
/*
*
*
* int或任何非无符号整型 -> -1
* varchar -> ''
* datetime -> '1000-01-01'datetime最小值
* text -> 0 160MyDumper中的text是为16进制
*/
const string DefaultInt = "0";
const string DefaultStr = "''";
const string DefaultDateTime = "'1000-01-01'";
const string DefaultText = "0";
static void ReplaceIfMyDumperNull(DataRecord record, string fieldName, string replaceValue)
{
if (record[fieldName] is ConstVar.MyDumperNull)
record[fieldName] = replaceValue;
}
options.RecordModify = async context => options.RecordModify = async context =>
{ {
var record = context.Record; var record = context.Record;
var cache = context.Cacher; var cache = context.Cacher;
switch (record.TableName) switch (record.TableName)
{ {
#if USE_TEST_DB // Machine处理非空列
// Order表移除IsBatch列 case TableNames.Machine:
ReplaceIfMyDumperNull(record, "Name", DefaultStr);
ReplaceIfMyDumperNull(record, "CreateTime", DefaultDateTime);
ReplaceIfMyDumperNull(record, "CreatorID", DefaultInt);
ReplaceIfMyDumperNull(record, "EditTime", DefaultDateTime);
ReplaceIfMyDumperNull(record, "EditorID", DefaultInt);
ReplaceIfMyDumperNull(record, "Settings", DefaultText);
break;
// Order处理非空列
case TableNames.Order: case TableNames.Order:
record.RemoveField("IsBatch"); ReplaceIfMyDumperNull(record, "Deleted", DefaultInt);
break; break;
#endif // OrderBlockPlan处理text->json列
//OrderBlockPlan将OrderNo长度<2的置空
case TableNames.OrderBlockPlan: case TableNames.OrderBlockPlan:
if (record["OrderNos"].Length < 2) JsonDocument.Parse(record["OrderNos"]);
record["OrderNos"] = "NULL";
break; break;
// OrderBlockPlanResult表添加CompanyID列 // OrderBlockPlanResult添加CompanyID
case TableNames.OrderBlockPlanResult: case TableNames.OrderBlockPlanResult:
record.AddField("CompanyID", record.AddField("CompanyID",
// 获取OrderBlockPlan.ID -> CompanyID // 获取OrderBlockPlan.ID -> CompanyID
@ -347,25 +383,37 @@ async Task RunProgram()
// OrderProcess添加ShardKey列NextStepID的空值转换为0 // OrderProcess添加ShardKey列NextStepID的空值转换为0
case TableNames.OrderProcess: case TableNames.OrderProcess:
record.AddField("ShardKey", CalculateShardKeyByOrderNo(record["OrderNo"])); record.AddField("ShardKey", CalculateShardKeyByOrderNo(record["OrderNo"]));
#if USE_TEST_DB
if(record["NextStepID"] == "\\N")
record["NextStepID"] = "0";
#endif
break; break;
// OrderProcessStepOrderProcessStepItem添加ShardKey // OrderProcessStep添加ShardKey
case TableNames.OrderProcessStep: case TableNames.OrderProcessStep:
record.AddField("ShardKey", CalculateShardKeyByOrderNo(record["OrderNo"])); record.AddField("ShardKey", CalculateShardKeyByOrderNo(record["OrderNo"]));
break; break;
// OrderProcessStepItem添加ShardKey列处理非空列
case TableNames.OrderProcessStepItem: case TableNames.OrderProcessStepItem:
ReplaceIfMyDumperNull(record, "DataID", DefaultInt);
record.AddField("ShardKey", record.AddField("ShardKey",
// 获取OrderProcess.ID -> ShardKey // 获取OrderProcess.ID -> ShardKey
ThrowIfNoCached(await cache.GetStringAsync(CacheKeys.OrderProcess_ID_ShardKey(record["OrderProcessID"])), ThrowIfNoCached(await cache.GetStringAsync(CacheKeys.OrderProcess_ID_ShardKey(record["OrderProcessID"])),
TableNames.OrderProcessStepItem, TableNames.OrderProcessStep, "OrderProcessID", "脏数据未处理")); TableNames.OrderProcessStepItem, TableNames.OrderProcessStep, "OrderProcessID", "脏数据未处理"));
break; break;
// OrderScrapBoard处理非空列
case TableNames.OrderScrapBoard:
ReplaceIfMyDumperNull(record, "Color", DefaultStr);
ReplaceIfMyDumperNull(record, "GoodsName", DefaultStr);
ReplaceIfMyDumperNull(record, "Material", DefaultStr);
ReplaceIfMyDumperNull(record, "MaterialName", DefaultStr);
break;
// ProcessItemExp处理非空列
case TableNames.ProcessItemExp:
ReplaceIfMyDumperNull(record, "MaxPartsID", DefaultInt);
ReplaceIfMyDumperNull(record, "ProcessGroupID", DefaultInt);
break;
// SimplePlanOrder处理非空列添加Deleted
case TableNames.SimplePlanOrder: case TableNames.SimplePlanOrder:
#if USE_TEST_DB ReplaceIfMyDumperNull(record, "CreateTime", DefaultDateTime);
record.RemoveField("ProcessState"); ReplaceIfMyDumperNull(record, "UpdateTime", DefaultDateTime);
#endif ReplaceIfMyDumperNull(record, "CompanyID", DefaultInt);
ReplaceIfMyDumperNull(record, "SingleName", DefaultStr);
record.AddField("Deleted", "0"); record.AddField("Deleted", "0");
break; break;
} }
@ -531,7 +579,7 @@ async Task RunProgram()
{ "process_item_exp.ItemJson", ColumnType.Text }, { "process_item_exp.ItemJson", ColumnType.Text },
{ "report_template.Template", ColumnType.Text }, { "report_template.Template", ColumnType.Text },
{ "report_template.SourceConfig", ColumnType.Text }, { "report_template.SourceConfig", ColumnType.Text },
{ "order_block_plan.OrderNos", ColumnType.Json }, { "order_block_plan.OrderNos", ColumnType.Text },
{ "order_block_plan.BlockInfo", ColumnType.Text }, { "order_block_plan.BlockInfo", ColumnType.Text },
}; };
#endif #endif
@ -552,7 +600,7 @@ async Task RunProgram()
host.Services.AddDataSourceFactory(); host.Services.AddDataSourceFactory();
host.Services.AddErrorRecorderFactory(); host.Services.AddErrorRecorderFactory();
host.Services.AddSingleton<ProcessContext>(); host.Services.AddSingleton<ProcessContext>();
host.Services.AddKeyedSingleton<DataRecordQueue>(ProcessStep.Produce); host.Services.AddKeyedSingleton<DataRecordQueue>(ConstVar.Producer);
host.Services.AddRecordQueuePool(tenantDbOptions.DbGroup.Keys.Select(key => (key:key, queue:new DataRecordQueue(500_000))).ToArray()); host.Services.AddRecordQueuePool(tenantDbOptions.DbGroup.Keys.Select(key => (key:key, queue:new DataRecordQueue(500_000))).ToArray());
host.Services.AddSingleton<ITaskMonitorLogger, CacheTaskMonitorLogger>(); host.Services.AddSingleton<ITaskMonitorLogger, CacheTaskMonitorLogger>();
host.Services.AddSingleton<ITaskMonitorLogger, LoggerTaskMonitorLogger>(); host.Services.AddSingleton<ITaskMonitorLogger, LoggerTaskMonitorLogger>();

View File

@ -1,5 +1,6 @@
using System.Text; using System.Text;
using System.Text.RegularExpressions; using System.Text.RegularExpressions;
using MesETL.App.Const;
using MesETL.App.Helpers; using MesETL.App.Helpers;
using MesETL.App.Options; using MesETL.App.Options;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
@ -137,9 +138,10 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
// 在这里处理特殊列 // 在这里处理特殊列
#region HandleFields #region HandleFields
if (field.Length == 2 && field == @"\N") // MyDumper NULL
if (field.Length == 2 && field == @"\N") // MyDumper导出的NULL为'\N''\'不是转义字符)
{ {
recordSb.Append("NULL"); recordSb.Append(ConstVar.Null);
goto Escape; goto Escape;
} }
@ -148,14 +150,18 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
case ColumnType.Text: case ColumnType.Text:
if(string.IsNullOrEmpty(field)) if(string.IsNullOrEmpty(field))
recordSb.Append("''"); recordSb.Append("''");
else if (field == ConstVar.Null)
recordSb.Append(ConstVar.Null);
else recordSb.Append($"_utf8mb4 0x{field}"); else recordSb.Append($"_utf8mb4 0x{field}");
break; break;
case ColumnType.Blob: case ColumnType.Blob:
if (string.IsNullOrEmpty(field)) if (string.IsNullOrEmpty(field))
recordSb.Append("''"); recordSb.Append("''");
else if (field == ConstVar.Null)
recordSb.Append(ConstVar.Null);
else recordSb.Append($"0x{field}"); else recordSb.Append($"0x{field}");
break; break;
case ColumnType.Json: case ColumnType.Json:// 生产库没有JSON列仅用于测试库进行测试
if(string.IsNullOrEmpty(field)) if(string.IsNullOrEmpty(field))
recordSb.Append("'[]'"); // JObject or JArray? recordSb.Append("'[]'"); // JObject or JArray?
else if (_options.Value.TreatJsonAsHex) else if (_options.Value.TreatJsonAsHex)

View File

@ -7,7 +7,7 @@
"Input":{ "Input":{
"InputDir": "D:\\Dump\\MockData", // Csv "InputDir": "D:\\Dump\\MockData", // Csv
"UseMock": false, // 使 "UseMock": false, // 使
"MockCountMultiplier": 0.5 // "MockCountMultiplier": 0 //
}, },
"Transform":{ "Transform":{
"StrictMode": false, // true "StrictMode": false, // true
@ -21,7 +21,7 @@
"MaxAllowedPacket": 67108864, "MaxAllowedPacket": 67108864,
"FlushCount": 10000, // "FlushCount": 10000, //
"MaxDatabaseOutputTask" : 4, // "MaxDatabaseOutputTask" : 4, //
"TreatJsonAsHex": false, // json16(0x) "TreatJsonAsHex": false // json16(0x)json
}, },
"RedisCache": { "RedisCache": {
"Configuration": "192.168.1.246:6380", "Configuration": "192.168.1.246:6380",