Compare commits

...

2 Commits

Author SHA1 Message Date
CZY
d58c9d5177 新增非法字段检查 2024-02-06 15:37:21 +08:00
CZY
719cd2d8e7 错误修正 2024-02-05 16:47:36 +08:00
11 changed files with 122 additions and 45 deletions

View File

@ -0,0 +1,8 @@
namespace MesETL.App.Const;
/// <summary>
/// String constants shared across the application.
/// </summary>
public static class ConstVar
{
/// <summary>Key under which the producer <c>DataRecordQueue</c> is registered and resolved as a keyed service.</summary>
public const string Producer = "Producer";
/// <summary>The literal text <c>NULL</c>, written to the output when a field represents a database NULL.</summary>
public const string Null = "NULL";
/// <summary>
/// Field text that is treated as a NULL value when inspecting records.
/// NOTE(review): currently the same literal as <see cref="Null"/>; confirm this is the marker the input actually carries.
/// </summary>
public const string MyDumperNull = "NULL";
}

View File

@ -1,6 +0,0 @@
namespace MesETL.App.Const;
/// <summary>
/// Names of processing steps, used as keyed-service keys.
/// </summary>
public static class ProcessStep
{
/// <summary>Key of the producer step; its value is the string <c>Producer</c>.</summary>
public const string Produce = "Producer";
}

View File

@ -5,9 +5,19 @@ namespace MesETL.App.Helpers;
public static class DatabaseHelper
{
/// <summary>
/// Builds a new, unopened <see cref="MySqlConnection"/> from <paramref name="connStr"/>,
/// overriding its connection and command timeout settings.
/// </summary>
/// <param name="connStr">Base connection string; any other options it carries are kept.</param>
/// <returns>A connection created from the adjusted connection string. The caller opens and disposes it.</returns>
public static MySqlConnection CreateConnection(string connStr)
{
    var builder = new MySqlConnectionStringBuilder(connStr);

    // Connection timeout is forced to 30.
    builder.ConnectionTimeout = 30;
    // Default command timeout is forced to 0 (presumably "no timeout" per the connector's docs; confirm).
    builder.DefaultCommandTimeout = 0;

    var adjustedConnStr = builder.ConnectionString;
    return new MySqlConnection(adjustedConnStr);
}
public static async Task<DataSet> QueryTableAsync(string connStr, string sql)
{
await using var conn = new MySqlConnection(connStr);
await using var conn = CreateConnection(connStr);
if(conn.State is not ConnectionState.Open)
await conn.OpenAsync();
await using var cmd = conn.CreateCommand();
@ -19,7 +29,7 @@ public static class DatabaseHelper
public static async Task<object?> QueryScalarAsync(string connStr, string sql)
{
await using var conn = new MySqlConnection(connStr);
await using var conn = CreateConnection(connStr);
if(conn.State is not ConnectionState.Open)
await conn.OpenAsync();
await using var cmd = conn.CreateCommand();
@ -29,7 +39,7 @@ public static class DatabaseHelper
public static async Task<int> NonQueryAsync(string connStr, string sql)
{
await using var conn = new MySqlConnection(connStr);
await using var conn = CreateConnection(connStr);
if(conn.State is not ConnectionState.Open)
await conn.OpenAsync();
await using var cmd = conn.CreateCommand();
@ -39,7 +49,7 @@ public static class DatabaseHelper
public static async Task<int> TransactionAsync(string connStr, string sql, params MySqlParameter[] parameters)
{
await using var conn = new MySqlConnection(connStr);
await using var conn = CreateConnection(connStr);
if(conn.State is not ConnectionState.Open)
await conn.OpenAsync();
await using var trans = await conn.BeginTransactionAsync();

View File

@ -1,4 +1,5 @@
using System.Text.RegularExpressions;
using System.Text.Json;
using System.Text.RegularExpressions;
using ZstdSharp;
namespace MesETL.App.Helpers;
@ -121,5 +122,17 @@ public static partial class DumpDataHelper
var reader = new StreamReader(ds);
return await reader.ReadToEndAsync();
}
/// <summary>
/// Checks whether <paramref name="str"/> can be parsed as a single JSON value.
/// </summary>
/// <param name="str">Text to validate.</param>
/// <returns>
/// <c>true</c> when parsing succeeds; <c>false</c> when <paramref name="str"/> is
/// <c>null</c> or parsing fails with a <see cref="JsonException"/>.
/// </returns>
public static bool IsJson(string str)
{
    // A missing value is not JSON; report it instead of letting Parse throw.
    if (str is null)
    {
        return false;
    }

    try
    {
        // JsonDocument is IDisposable and backed by pooled buffers, so it must be
        // released even though only the success of the parse matters here.
        using var document = JsonDocument.Parse(str);
        return true;
    }
    catch (JsonException)
    {
        return false;
    }
}
}

View File

@ -1,5 +1,4 @@
using MesETL.App.Const;
using MesETL.App.HostedServices.Abstractions;
using MesETL.App.HostedServices.Abstractions;
using MesETL.App.Options;
using MesETL.App.Services;
using MesETL.App.Services.ETL;
@ -37,7 +36,7 @@ public class FileInputService : IInputService
public FileInputService(ILogger<FileInputService> logger,
IOptions<DataInputOptions> dataInputOptions,
ProcessContext context,
[FromKeyedServices(ProcessStep.Produce)] DataRecordQueue producerQueue,
[FromKeyedServices(Const.ConstVar.Producer)] DataRecordQueue producerQueue,
DataReaderFactory dataReaderFactory)
{
_logger = logger;

View File

@ -50,6 +50,7 @@ public class MainHostedService : BackgroundService
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Command argument detected, execute for each database");
var command = _config["Command"];
if (!string.IsNullOrEmpty(command))
{

View File

@ -1,5 +1,4 @@
using System.Diagnostics;
using MesETL.App.Const;
using MesETL.App.Services;
using MesETL.App.Services.Loggers;
using Microsoft.Extensions.DependencyInjection;
@ -17,7 +16,7 @@ public class TaskMonitorService
private readonly RecordQueuePool _queuePool;
public TaskMonitorService(ProcessContext context,
[FromKeyedServices(ProcessStep.Produce)]
[FromKeyedServices(Const.ConstVar.Producer)]
DataRecordQueue producerQueue,
RecordQueuePool queuePool,
IEnumerable<ITaskMonitorLogger> monitorLoggers)

View File

@ -1,5 +1,4 @@
using MesETL.App.Cache;
using MesETL.App.Const;
using MesETL.App.HostedServices.Abstractions;
using MesETL.App.Options;
using MesETL.App.Services;
@ -28,7 +27,7 @@ public class TransformService : ITransformService
public TransformService(ILogger<TransformService> logger,
IOptions<DataTransformOptions> options,
[FromKeyedServices(ProcessStep.Produce)] DataRecordQueue producerQueue,
[FromKeyedServices(Const.ConstVar.Producer)] DataRecordQueue producerQueue,
RecordQueuePool queuePool,
ProcessContext context,
ICacher cache,

View File

@ -1,5 +1,6 @@
// #define USE_TEST_DB // 测试库的结构与生产库不一样如果使用测试库运行则加上USE_TEST_DB预处理器指令
using System.Text.Json;
using MesETL.App;
using MesETL.App.Services;
using MesETL.App.Services.ETL;
@ -131,7 +132,9 @@ async Task RunProgram()
TableNames.OrderModuleExtra,
TableNames.OrderModuleItem,
TableNames.OrderPackage,
#if USE_TEST_DB
TableNames.OrderPatchDetail,
#endif
TableNames.OrderProcess,
TableNames.OrderProcessStep,
@ -240,12 +243,14 @@ async Task RunProgram()
return false;
break;
}
// OrderBlockPlan删除CreateTime < 202301的
// OrderBlockPlan删除CreateTime < 202301的Json列合法检查
case TableNames.OrderBlockPlan:
{
var time = DateTime.Parse(record["CreateTime"].Trim('"'));
var time = DateTime.Parse(record["CreateTime"].Trim('"','\''));
if (time < oldestTime)
return false;
// if (!DumpDataHelper.IsJson(record["OrderNos"])) return false;
break;
}
// OrderBlockPlanResult删除对应order_block_plan.ID不存在的对象
@ -255,6 +260,12 @@ async Task RunProgram()
return false;
break;
}
// OrderDataGoods Json列合法检查
case TableNames.OrderDataGoods:
{
if (!DumpDataHelper.IsJson(record["ExtraProp"])) return false;
break;
}
// OrderModule删除OrderNo < 202301的
case TableNames.OrderModule:
{
@ -297,7 +308,7 @@ async Task RunProgram()
// SimplePlanOrder删除CreateTime < 202301的
case TableNames.SimplePlanOrder:
{
var time = DateTime.Parse(record["CreateTime"].Trim('"'));
var time = DateTime.Parse(record["CreateTime"].Trim('"', '\''));
if (time < oldestTime)
return false;
break;
@ -308,24 +319,49 @@ async Task RunProgram()
};
// Data replacement
/*
 * Replacement values per column type, as noted by the original author (translated):
 *
 * int or any signed (non-unsigned) integer type -> -1
 *   NOTE(review): DefaultInt below is "0", not -1; confirm which one is intended.
 * varchar -> ''
 * datetime -> '1000-01-01' (the minimum datetime value)
 * text -> 0
 *   NOTE(review): the original note appears to say that text columns in the MyDumper
 *   output are hexadecimal; the wording was garbled, confirm.
 */
const string DefaultInt = "0";
const string DefaultStr = "''";
const string DefaultDateTime = "'1000-01-01'";
const string DefaultText = "0";
// Overwrites the field named fieldName with replaceValue, but only when the
// field currently holds the ConstVar.MyDumperNull marker; otherwise the record
// is left untouched.
static void ReplaceIfMyDumperNull(DataRecord record, string fieldName, string replaceValue)
{
    var holdsNullMarker = record[fieldName] is ConstVar.MyDumperNull;
    if (!holdsNullMarker)
    {
        return;
    }

    record[fieldName] = replaceValue;
}
options.RecordModify = async context =>
{
var record = context.Record;
var cache = context.Cacher;
switch (record.TableName)
{
#if USE_TEST_DB
// Order表移除IsBatch列
// Machine处理非空列
case TableNames.Machine:
ReplaceIfMyDumperNull(record, "Name", DefaultStr);
ReplaceIfMyDumperNull(record, "CreateTime", DefaultDateTime);
ReplaceIfMyDumperNull(record, "CreatorID", DefaultInt);
ReplaceIfMyDumperNull(record, "EditTime", DefaultDateTime);
ReplaceIfMyDumperNull(record, "EditorID", DefaultInt);
ReplaceIfMyDumperNull(record, "Settings", DefaultText);
break;
// Order处理非空列
case TableNames.Order:
record.RemoveField("IsBatch");
ReplaceIfMyDumperNull(record, "Deleted", DefaultInt);
break;
#endif
//OrderBlockPlan将OrderNo长度<2的置空
// OrderBlockPlan处理text->json列
case TableNames.OrderBlockPlan:
if (record["OrderNos"].Length < 2)
record["OrderNos"] = "NULL";
JsonDocument.Parse(record["OrderNos"]);
break;
// OrderBlockPlanResult表添加CompanyID列
// OrderBlockPlanResult添加CompanyID
case TableNames.OrderBlockPlanResult:
record.AddField("CompanyID",
// 获取OrderBlockPlan.ID -> CompanyID
@ -347,25 +383,37 @@ async Task RunProgram()
// OrderProcess添加ShardKey列NextStepID的空值转换为0
case TableNames.OrderProcess:
record.AddField("ShardKey", CalculateShardKeyByOrderNo(record["OrderNo"]));
#if USE_TEST_DB
if(record["NextStepID"] == "\\N")
record["NextStepID"] = "0";
#endif
break;
// OrderProcessStepOrderProcessStepItem添加ShardKey
// OrderProcessStep添加ShardKey
case TableNames.OrderProcessStep:
record.AddField("ShardKey", CalculateShardKeyByOrderNo(record["OrderNo"]));
break;
// OrderProcessStepItem添加ShardKey列处理非空列
case TableNames.OrderProcessStepItem:
ReplaceIfMyDumperNull(record, "DataID", DefaultInt);
record.AddField("ShardKey",
// 获取OrderProcess.ID -> ShardKey
ThrowIfNoCached(await cache.GetStringAsync(CacheKeys.OrderProcess_ID_ShardKey(record["OrderProcessID"])),
TableNames.OrderProcessStepItem, TableNames.OrderProcessStep, "OrderProcessID", "脏数据未处理"));
break;
// OrderScrapBoard处理非空列
case TableNames.OrderScrapBoard:
ReplaceIfMyDumperNull(record, "Color", DefaultStr);
ReplaceIfMyDumperNull(record, "GoodsName", DefaultStr);
ReplaceIfMyDumperNull(record, "Material", DefaultStr);
ReplaceIfMyDumperNull(record, "MaterialName", DefaultStr);
break;
// ProcessItemExp处理非空列
case TableNames.ProcessItemExp:
ReplaceIfMyDumperNull(record, "MaxPartsID", DefaultInt);
ReplaceIfMyDumperNull(record, "ProcessGroupID", DefaultInt);
break;
// SimplePlanOrder处理非空列添加Deleted
case TableNames.SimplePlanOrder:
#if USE_TEST_DB
record.RemoveField("ProcessState");
#endif
ReplaceIfMyDumperNull(record, "CreateTime", DefaultDateTime);
ReplaceIfMyDumperNull(record, "UpdateTime", DefaultDateTime);
ReplaceIfMyDumperNull(record, "CompanyID", DefaultInt);
ReplaceIfMyDumperNull(record, "SingleName", DefaultStr);
record.AddField("Deleted", "0");
break;
}
@ -531,7 +579,7 @@ async Task RunProgram()
{ "process_item_exp.ItemJson", ColumnType.Text },
{ "report_template.Template", ColumnType.Text },
{ "report_template.SourceConfig", ColumnType.Text },
{ "order_block_plan.OrderNos", ColumnType.Json },
{ "order_block_plan.OrderNos", ColumnType.Text },
{ "order_block_plan.BlockInfo", ColumnType.Text },
};
#endif
@ -552,7 +600,7 @@ async Task RunProgram()
host.Services.AddDataSourceFactory();
host.Services.AddErrorRecorderFactory();
host.Services.AddSingleton<ProcessContext>();
host.Services.AddKeyedSingleton<DataRecordQueue>(ProcessStep.Produce);
host.Services.AddKeyedSingleton<DataRecordQueue>(ConstVar.Producer);
host.Services.AddRecordQueuePool(tenantDbOptions.DbGroup.Keys.Select(key => (key:key, queue:new DataRecordQueue(500_000))).ToArray());
host.Services.AddSingleton<ITaskMonitorLogger, CacheTaskMonitorLogger>();
host.Services.AddSingleton<ITaskMonitorLogger, LoggerTaskMonitorLogger>();

View File

@ -1,5 +1,6 @@
using System.Text;
using System.Text.RegularExpressions;
using MesETL.App.Const;
using MesETL.App.Helpers;
using MesETL.App.Options;
using Microsoft.Extensions.Logging;
@ -137,9 +138,10 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
// 在这里处理特殊列
#region HandleFields
if (field.Length == 2 && field == @"\N") // MyDumper NULL
if (field.Length == 2 && field == @"\N") // MyDumper导出的NULL为'\N''\'不是转义字符)
{
recordSb.Append("NULL");
recordSb.Append(ConstVar.Null);
goto Escape;
}
@ -148,14 +150,18 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
case ColumnType.Text:
if(string.IsNullOrEmpty(field))
recordSb.Append("''");
else if (field == ConstVar.Null)
recordSb.Append(ConstVar.Null);
else recordSb.Append($"_utf8mb4 0x{field}");
break;
case ColumnType.Blob:
if (string.IsNullOrEmpty(field))
recordSb.Append("''");
else if (field == ConstVar.Null)
recordSb.Append(ConstVar.Null);
else recordSb.Append($"0x{field}");
break;
case ColumnType.Json:
case ColumnType.Json:// 生产库没有JSON列仅用于测试库进行测试
if(string.IsNullOrEmpty(field))
recordSb.Append("'[]'"); // JObject or JArray?
else if (_options.Value.TreatJsonAsHex)

View File

@ -7,7 +7,7 @@
"Input":{
"InputDir": "D:\\Dump\\MockData", // Csv
"UseMock": false, // 使
"MockCountMultiplier": 0.5 //
"MockCountMultiplier": 0 //
},
"Transform":{
"StrictMode": false, // true
@ -21,7 +21,7 @@
"MaxAllowedPacket": 67108864,
"FlushCount": 10000, //
"MaxDatabaseOutputTask" : 4, //
"TreatJsonAsHex": false, // json16(0x)
"TreatJsonAsHex": false // json16(0x)json
},
"RedisCache": {
"Configuration": "192.168.1.246:6380",