新增非法字段检查

This commit is contained in:
陈梓阳 2024-02-06 15:37:21 +08:00
parent 719cd2d8e7
commit d58c9d5177
8 changed files with 102 additions and 31 deletions

View File

@ -0,0 +1,8 @@
namespace MesETL.App.Const;
public static class ConstVar
{
public const string Producer = "Producer";
public const string Null = "NULL";
public const string MyDumperNull = "NULL";
}

View File

@ -1,6 +0,0 @@
namespace MesETL.App.Const;
public static class ProcessStep
{
public const string Produce = "Producer";
}

View File

@ -1,4 +1,5 @@
using System.Text.RegularExpressions;
using System.Text.Json;
using System.Text.RegularExpressions;
using ZstdSharp;
namespace MesETL.App.Helpers;
@ -121,5 +122,17 @@ public static partial class DumpDataHelper
var reader = new StreamReader(ds);
return await reader.ReadToEndAsync();
}
public static bool IsJson(string str)
{
try
{
JsonDocument.Parse(str);
return true;
}
catch (JsonException)
{
return false;
}
}
}

View File

@ -1,5 +1,4 @@
using MesETL.App.Const;
using MesETL.App.HostedServices.Abstractions;
using MesETL.App.HostedServices.Abstractions;
using MesETL.App.Options;
using MesETL.App.Services;
using MesETL.App.Services.ETL;
@ -37,7 +36,7 @@ public class FileInputService : IInputService
public FileInputService(ILogger<FileInputService> logger,
IOptions<DataInputOptions> dataInputOptions,
ProcessContext context,
[FromKeyedServices(ProcessStep.Produce)] DataRecordQueue producerQueue,
[FromKeyedServices(Const.ConstVar.Producer)] DataRecordQueue producerQueue,
DataReaderFactory dataReaderFactory)
{
_logger = logger;

View File

@ -1,5 +1,4 @@
using System.Diagnostics;
using MesETL.App.Const;
using MesETL.App.Services;
using MesETL.App.Services.Loggers;
using Microsoft.Extensions.DependencyInjection;
@ -17,7 +16,7 @@ public class TaskMonitorService
private readonly RecordQueuePool _queuePool;
public TaskMonitorService(ProcessContext context,
[FromKeyedServices(ProcessStep.Produce)]
[FromKeyedServices(Const.ConstVar.Producer)]
DataRecordQueue producerQueue,
RecordQueuePool queuePool,
IEnumerable<ITaskMonitorLogger> monitorLoggers)

View File

@ -1,5 +1,4 @@
using MesETL.App.Cache;
using MesETL.App.Const;
using MesETL.App.HostedServices.Abstractions;
using MesETL.App.Options;
using MesETL.App.Services;
@ -28,7 +27,7 @@ public class TransformService : ITransformService
public TransformService(ILogger<TransformService> logger,
IOptions<DataTransformOptions> options,
[FromKeyedServices(ProcessStep.Produce)] DataRecordQueue producerQueue,
[FromKeyedServices(Const.ConstVar.Producer)] DataRecordQueue producerQueue,
RecordQueuePool queuePool,
ProcessContext context,
ICacher cache,

View File

@ -1,5 +1,6 @@
// #define USE_TEST_DB // 测试库的结构与生产库不一样如果使用测试库运行则加上USE_TEST_DB预处理器指令
using System.Text.Json;
using MesETL.App;
using MesETL.App.Services;
using MesETL.App.Services.ETL;
@ -242,12 +243,14 @@ async Task RunProgram()
return false;
break;
}
// OrderBlockPlan删除CreateTime < 202301的
// OrderBlockPlan删除CreateTime < 202301的Json列合法检查
case TableNames.OrderBlockPlan:
{
var time = DateTime.Parse(record["CreateTime"].Trim('"'));
var time = DateTime.Parse(record["CreateTime"].Trim('"','\''));
if (time < oldestTime)
return false;
// if (!DumpDataHelper.IsJson(record["OrderNos"])) return false;
break;
}
// OrderBlockPlanResult删除对应order_block_plan.ID不存在的对象
@ -257,6 +260,12 @@ async Task RunProgram()
return false;
break;
}
// OrderDataGoods Json列合法检查
case TableNames.OrderDataGoods:
{
if (!DumpDataHelper.IsJson(record["ExtraProp"])) return false;
break;
}
// OrderModule删除OrderNo < 202301的
case TableNames.OrderModule:
{
@ -299,7 +308,7 @@ async Task RunProgram()
// SimplePlanOrder删除CreateTime < 202301的
case TableNames.SimplePlanOrder:
{
var time = DateTime.Parse(record["CreateTime"].Trim('"'));
var time = DateTime.Parse(record["CreateTime"].Trim('"', '\''));
if (time < oldestTime)
return false;
break;
@ -310,18 +319,49 @@ async Task RunProgram()
};
// 数据替换
/*
*
*
* int或任何非无符号整型 -> -1
* varchar -> ''
* datetime -> '1000-01-01'datetime最小值
* text -> 0 160MyDumper中的text是为16进制
*/
const string DefaultInt = "0";
const string DefaultStr = "''";
const string DefaultDateTime = "'1000-01-01'";
const string DefaultText = "0";
static void ReplaceIfMyDumperNull(DataRecord record, string fieldName, string replaceValue)
{
if (record[fieldName] is ConstVar.MyDumperNull)
record[fieldName] = replaceValue;
}
options.RecordModify = async context =>
{
var record = context.Record;
var cache = context.Cacher;
switch (record.TableName)
{
//OrderBlockPlan将OrderNo长度<2的置空
case TableNames.OrderBlockPlan:
if (record["OrderNos"].Length < 2)
record["OrderNos"] = "NULL";
// Machine处理非空列
case TableNames.Machine:
ReplaceIfMyDumperNull(record, "Name", DefaultStr);
ReplaceIfMyDumperNull(record, "CreateTime", DefaultDateTime);
ReplaceIfMyDumperNull(record, "CreatorID", DefaultInt);
ReplaceIfMyDumperNull(record, "EditTime", DefaultDateTime);
ReplaceIfMyDumperNull(record, "EditorID", DefaultInt);
ReplaceIfMyDumperNull(record, "Settings", DefaultText);
break;
// OrderBlockPlanResult表添加CompanyID列
// Order处理非空列
case TableNames.Order:
ReplaceIfMyDumperNull(record, "Deleted", DefaultInt);
break;
// OrderBlockPlan处理text->json列
case TableNames.OrderBlockPlan:
JsonDocument.Parse(record["OrderNos"]);
break;
// OrderBlockPlanResult添加CompanyID
case TableNames.OrderBlockPlanResult:
record.AddField("CompanyID",
// 获取OrderBlockPlan.ID -> CompanyID
@ -344,17 +384,36 @@ async Task RunProgram()
case TableNames.OrderProcess:
record.AddField("ShardKey", CalculateShardKeyByOrderNo(record["OrderNo"]));
break;
// OrderProcessStepOrderProcessStepItem添加ShardKey
// OrderProcessStep添加ShardKey
case TableNames.OrderProcessStep:
record.AddField("ShardKey", CalculateShardKeyByOrderNo(record["OrderNo"]));
break;
// OrderProcessStepItem添加ShardKey列处理非空列
case TableNames.OrderProcessStepItem:
ReplaceIfMyDumperNull(record, "DataID", DefaultInt);
record.AddField("ShardKey",
// 获取OrderProcess.ID -> ShardKey
ThrowIfNoCached(await cache.GetStringAsync(CacheKeys.OrderProcess_ID_ShardKey(record["OrderProcessID"])),
TableNames.OrderProcessStepItem, TableNames.OrderProcessStep, "OrderProcessID", "脏数据未处理"));
break;
// OrderScrapBoard处理非空列
case TableNames.OrderScrapBoard:
ReplaceIfMyDumperNull(record, "Color", DefaultStr);
ReplaceIfMyDumperNull(record, "GoodsName", DefaultStr);
ReplaceIfMyDumperNull(record, "Material", DefaultStr);
ReplaceIfMyDumperNull(record, "MaterialName", DefaultStr);
break;
// ProcessItemExp处理非空列
case TableNames.ProcessItemExp:
ReplaceIfMyDumperNull(record, "MaxPartsID", DefaultInt);
ReplaceIfMyDumperNull(record, "ProcessGroupID", DefaultInt);
break;
// SimplePlanOrder处理非空列添加Deleted
case TableNames.SimplePlanOrder:
ReplaceIfMyDumperNull(record, "CreateTime", DefaultDateTime);
ReplaceIfMyDumperNull(record, "UpdateTime", DefaultDateTime);
ReplaceIfMyDumperNull(record, "CompanyID", DefaultInt);
ReplaceIfMyDumperNull(record, "SingleName", DefaultStr);
record.AddField("Deleted", "0");
break;
}
@ -541,7 +600,7 @@ async Task RunProgram()
host.Services.AddDataSourceFactory();
host.Services.AddErrorRecorderFactory();
host.Services.AddSingleton<ProcessContext>();
host.Services.AddKeyedSingleton<DataRecordQueue>(ProcessStep.Produce);
host.Services.AddKeyedSingleton<DataRecordQueue>(ConstVar.Producer);
host.Services.AddRecordQueuePool(tenantDbOptions.DbGroup.Keys.Select(key => (key:key, queue:new DataRecordQueue(500_000))).ToArray());
host.Services.AddSingleton<ITaskMonitorLogger, CacheTaskMonitorLogger>();
host.Services.AddSingleton<ITaskMonitorLogger, LoggerTaskMonitorLogger>();

View File

@ -1,5 +1,6 @@
using System.Text;
using System.Text.RegularExpressions;
using MesETL.App.Const;
using MesETL.App.Helpers;
using MesETL.App.Options;
using Microsoft.Extensions.Logging;
@ -138,10 +139,9 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
// 在这里处理特殊列
#region HandleFields
const string NULL = "NULL";
if (field.Length == 2 && field == @"\N") // MyDumper导出的NULL为'\N''\'不是转义字符)
{
recordSb.Append(NULL);
recordSb.Append(ConstVar.Null);
goto Escape;
}
@ -150,15 +150,15 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
case ColumnType.Text:
if(string.IsNullOrEmpty(field))
recordSb.Append("''");
else if (field == NULL)
recordSb.Append(NULL);
else if (field == ConstVar.Null)
recordSb.Append(ConstVar.Null);
else recordSb.Append($"_utf8mb4 0x{field}");
break;
case ColumnType.Blob:
if (string.IsNullOrEmpty(field))
recordSb.Append("''");
else if (field == NULL)
recordSb.Append(NULL);
else if (field == ConstVar.Null)
recordSb.Append(ConstVar.Null);
else recordSb.Append($"0x{field}");
break;
case ColumnType.Json:// 生产库没有JSON列仅用于测试库进行测试