Compare commits
2 Commits
5cda84797b
...
d58c9d5177
Author | SHA1 | Date | |
---|---|---|---|
d58c9d5177 | |||
719cd2d8e7 |
8
MesETL.App/Const/ConstVar.cs
Normal file
8
MesETL.App/Const/ConstVar.cs
Normal file
@ -0,0 +1,8 @@
|
||||
namespace MesETL.App.Const;
|
||||
|
||||
/// <summary>
/// String constants shared across the MesETL application.
/// </summary>
public static class ConstVar
|
||||
{
|
||||
/// <summary>
/// Service key under which the producer <c>DataRecordQueue</c> is registered
/// and resolved (used with <c>AddKeyedSingleton</c> / <c>FromKeyedServices</c>).
/// </summary>
public const string Producer = "Producer";
|
||||
/// <summary>
/// Literal text appended to the generated output for a NULL field value.
/// </summary>
public const string Null = "NULL";
|
||||
/// <summary>
/// Field value that <c>ReplaceIfMyDumperNull</c> compares against to detect a NULL field.
/// </summary>
// NOTE(review): MySqlDestination treats the two-character text \N as the MyDumper NULL marker,
// while this constant is "NULL" — confirm which representation the records carry at the
// point where this constant is compared.
public const string MyDumperNull = "NULL";
|
||||
}
|
@ -1,6 +0,0 @@
|
||||
namespace MesETL.App.Const;
|
||||
|
||||
/// <summary>
/// String keys naming processing steps; <see cref="Produce"/> is used as a keyed-service key
/// for the producer <c>DataRecordQueue</c>.
/// </summary>
public static class ProcessStep
|
||||
{
|
||||
/// <summary>Key identifying the producer step.</summary>
public const string Produce = "Producer";
|
||||
}
|
@ -5,9 +5,19 @@ namespace MesETL.App.Helpers;
|
||||
|
||||
public static class DatabaseHelper
|
||||
{
|
||||
/// <summary>
/// Builds a new, unopened <see cref="MySqlConnection"/> from <paramref name="connStr"/>,
/// overriding its timeout settings.
/// </summary>
/// <param name="connStr">Base connection string; its other settings are kept as given.</param>
/// <returns>A connection configured with the adjusted connection string. The caller opens and disposes it.</returns>
public static MySqlConnection CreateConnection(string connStr)
{
    var builder = new MySqlConnectionStringBuilder(connStr);

    // Timeout overrides applied to every connection created through this helper.
    // NOTE(review): presumably 30 is in seconds and 0 disables the command timeout — confirm
    // against the MySQL driver documentation.
    builder.ConnectionTimeout = 30;
    builder.DefaultCommandTimeout = 0;

    return new MySqlConnection(builder.ConnectionString);
}
|
||||
|
||||
public static async Task<DataSet> QueryTableAsync(string connStr, string sql)
|
||||
{
|
||||
await using var conn = new MySqlConnection(connStr);
|
||||
await using var conn = CreateConnection(connStr);
|
||||
if(conn.State is not ConnectionState.Open)
|
||||
await conn.OpenAsync();
|
||||
await using var cmd = conn.CreateCommand();
|
||||
@ -19,7 +29,7 @@ public static class DatabaseHelper
|
||||
|
||||
public static async Task<object?> QueryScalarAsync(string connStr, string sql)
|
||||
{
|
||||
await using var conn = new MySqlConnection(connStr);
|
||||
await using var conn = CreateConnection(connStr);
|
||||
if(conn.State is not ConnectionState.Open)
|
||||
await conn.OpenAsync();
|
||||
await using var cmd = conn.CreateCommand();
|
||||
@ -29,7 +39,7 @@ public static class DatabaseHelper
|
||||
|
||||
public static async Task<int> NonQueryAsync(string connStr, string sql)
|
||||
{
|
||||
await using var conn = new MySqlConnection(connStr);
|
||||
await using var conn = CreateConnection(connStr);
|
||||
if(conn.State is not ConnectionState.Open)
|
||||
await conn.OpenAsync();
|
||||
await using var cmd = conn.CreateCommand();
|
||||
@ -39,7 +49,7 @@ public static class DatabaseHelper
|
||||
|
||||
public static async Task<int> TransactionAsync(string connStr, string sql, params MySqlParameter[] parameters)
|
||||
{
|
||||
await using var conn = new MySqlConnection(connStr);
|
||||
await using var conn = CreateConnection(connStr);
|
||||
if(conn.State is not ConnectionState.Open)
|
||||
await conn.OpenAsync();
|
||||
await using var trans = await conn.BeginTransactionAsync();
|
||||
|
@ -1,4 +1,5 @@
|
||||
using System.Text.RegularExpressions;
|
||||
using System.Text.Json;
|
||||
using System.Text.RegularExpressions;
|
||||
using ZstdSharp;
|
||||
|
||||
namespace MesETL.App.Helpers;
|
||||
@ -121,5 +122,17 @@ public static partial class DumpDataHelper
|
||||
var reader = new StreamReader(ds);
|
||||
return await reader.ReadToEndAsync();
|
||||
}
|
||||
|
||||
|
||||
/// <summary>
/// Determines whether <paramref name="str"/> is well-formed JSON text.
/// </summary>
/// <param name="str">The text to check. <c>null</c> is treated as "not JSON".</param>
/// <returns>
/// <c>true</c> when the text parses as a single JSON value; <c>false</c> when it is
/// <c>null</c> or parsing fails with a <see cref="JsonException"/>.
/// </returns>
public static bool IsJson(string str)
{
    // JsonDocument.Parse(string) throws ArgumentNullException for null, which the catch
    // below would not handle; a null value is simply not valid JSON.
    if (str is null)
    {
        return false;
    }

    try
    {
        // JsonDocument is IDisposable and holds pooled buffers; dispose it right away
        // because only the success/failure of parsing matters here.
        using var document = JsonDocument.Parse(str);
        return true;
    }
    catch (JsonException)
    {
        return false;
    }
}
|
||||
}
|
@ -1,5 +1,4 @@
|
||||
using MesETL.App.Const;
|
||||
using MesETL.App.HostedServices.Abstractions;
|
||||
using MesETL.App.HostedServices.Abstractions;
|
||||
using MesETL.App.Options;
|
||||
using MesETL.App.Services;
|
||||
using MesETL.App.Services.ETL;
|
||||
@ -37,7 +36,7 @@ public class FileInputService : IInputService
|
||||
public FileInputService(ILogger<FileInputService> logger,
|
||||
IOptions<DataInputOptions> dataInputOptions,
|
||||
ProcessContext context,
|
||||
[FromKeyedServices(ProcessStep.Produce)] DataRecordQueue producerQueue,
|
||||
[FromKeyedServices(Const.ConstVar.Producer)] DataRecordQueue producerQueue,
|
||||
DataReaderFactory dataReaderFactory)
|
||||
{
|
||||
_logger = logger;
|
||||
|
@ -50,6 +50,7 @@ public class MainHostedService : BackgroundService
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
_logger.LogInformation("Command argument detected, execute for each database");
|
||||
var command = _config["Command"];
|
||||
if (!string.IsNullOrEmpty(command))
|
||||
{
|
||||
|
@ -1,5 +1,4 @@
|
||||
using System.Diagnostics;
|
||||
using MesETL.App.Const;
|
||||
using MesETL.App.Services;
|
||||
using MesETL.App.Services.Loggers;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
@ -17,7 +16,7 @@ public class TaskMonitorService
|
||||
private readonly RecordQueuePool _queuePool;
|
||||
|
||||
public TaskMonitorService(ProcessContext context,
|
||||
[FromKeyedServices(ProcessStep.Produce)]
|
||||
[FromKeyedServices(Const.ConstVar.Producer)]
|
||||
DataRecordQueue producerQueue,
|
||||
RecordQueuePool queuePool,
|
||||
IEnumerable<ITaskMonitorLogger> monitorLoggers)
|
||||
|
@ -1,5 +1,4 @@
|
||||
using MesETL.App.Cache;
|
||||
using MesETL.App.Const;
|
||||
using MesETL.App.HostedServices.Abstractions;
|
||||
using MesETL.App.Options;
|
||||
using MesETL.App.Services;
|
||||
@ -28,7 +27,7 @@ public class TransformService : ITransformService
|
||||
|
||||
public TransformService(ILogger<TransformService> logger,
|
||||
IOptions<DataTransformOptions> options,
|
||||
[FromKeyedServices(ProcessStep.Produce)] DataRecordQueue producerQueue,
|
||||
[FromKeyedServices(Const.ConstVar.Producer)] DataRecordQueue producerQueue,
|
||||
RecordQueuePool queuePool,
|
||||
ProcessContext context,
|
||||
ICacher cache,
|
||||
|
@ -1,5 +1,6 @@
|
||||
// #define USE_TEST_DB // 测试库的结构与生产库不一样,如果使用测试库运行,则加上USE_TEST_DB预处理器指令
|
||||
|
||||
using System.Text.Json;
|
||||
using MesETL.App;
|
||||
using MesETL.App.Services;
|
||||
using MesETL.App.Services.ETL;
|
||||
@ -131,7 +132,9 @@ async Task RunProgram()
|
||||
TableNames.OrderModuleExtra,
|
||||
TableNames.OrderModuleItem,
|
||||
TableNames.OrderPackage,
|
||||
#if USE_TEST_DB
|
||||
TableNames.OrderPatchDetail,
|
||||
#endif
|
||||
|
||||
TableNames.OrderProcess,
|
||||
TableNames.OrderProcessStep,
|
||||
@ -240,12 +243,14 @@ async Task RunProgram()
|
||||
return false;
|
||||
break;
|
||||
}
|
||||
// OrderBlockPlan删除CreateTime < 202301的
|
||||
// OrderBlockPlan删除CreateTime < 202301的,Json列合法检查
|
||||
case TableNames.OrderBlockPlan:
|
||||
{
|
||||
var time = DateTime.Parse(record["CreateTime"].Trim('"'));
|
||||
var time = DateTime.Parse(record["CreateTime"].Trim('"','\''));
|
||||
if (time < oldestTime)
|
||||
return false;
|
||||
|
||||
// if (!DumpDataHelper.IsJson(record["OrderNos"])) return false;
|
||||
break;
|
||||
}
|
||||
// OrderBlockPlanResult删除对应order_block_plan.ID不存在的对象
|
||||
@ -255,6 +260,12 @@ async Task RunProgram()
|
||||
return false;
|
||||
break;
|
||||
}
|
||||
// OrderDataGoods Json列合法检查
|
||||
case TableNames.OrderDataGoods:
|
||||
{
|
||||
if (!DumpDataHelper.IsJson(record["ExtraProp"])) return false;
|
||||
break;
|
||||
}
|
||||
// OrderModule删除OrderNo < 202301的
|
||||
case TableNames.OrderModule:
|
||||
{
|
||||
@ -297,7 +308,7 @@ async Task RunProgram()
|
||||
// SimplePlanOrder删除CreateTime < 202301的
|
||||
case TableNames.SimplePlanOrder:
|
||||
{
|
||||
var time = DateTime.Parse(record["CreateTime"].Trim('"'));
|
||||
var time = DateTime.Parse(record["CreateTime"].Trim('"', '\''));
|
||||
if (time < oldestTime)
|
||||
return false;
|
||||
break;
|
||||
@ -308,24 +319,49 @@ async Task RunProgram()
|
||||
};
|
||||
|
||||
// 数据替换
|
||||
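/*
 * Null handling:
 * Some columns are nullable in the production database but NOT NULL in the test database,
 * so a NULL in such a column is replaced with a default chosen by column type:
 *   int or any other signed integer type -> 0 (the value of DefaultInt below;
 *       NOTE(review): this note previously said -1 — confirm which value is intended)
 *   varchar  -> '' (empty string)
 *   datetime -> '1000-01-01' (minimum datetime value)
 *   text     -> 0 (hexadecimal 0; MyDumper exports text columns as hexadecimal)
 */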
|
||||
const string DefaultInt = "0";
|
||||
const string DefaultStr = "''";
|
||||
const string DefaultDateTime = "'1000-01-01'";
|
||||
const string DefaultText = "0";
|
||||
|
||||
// Overwrites the field named fieldName with replaceValue when its current value equals
// ConstVar.MyDumperNull; any other value is left untouched.
// NOTE(review): MySqlDestination recognises \N as the MyDumper NULL marker — confirm that the
// records reaching this helper carry the text this constant holds.
static void ReplaceIfMyDumperNull(DataRecord record, string fieldName, string replaceValue)
{
    var current = record[fieldName];
    if (current != ConstVar.MyDumperNull)
    {
        return;
    }

    record[fieldName] = replaceValue;
}
|
||||
|
||||
options.RecordModify = async context =>
|
||||
{
|
||||
var record = context.Record;
|
||||
var cache = context.Cacher;
|
||||
switch (record.TableName)
|
||||
{
|
||||
#if USE_TEST_DB
|
||||
// Order表移除IsBatch列
|
||||
// Machine处理非空列
|
||||
case TableNames.Machine:
|
||||
ReplaceIfMyDumperNull(record, "Name", DefaultStr);
|
||||
ReplaceIfMyDumperNull(record, "CreateTime", DefaultDateTime);
|
||||
ReplaceIfMyDumperNull(record, "CreatorID", DefaultInt);
|
||||
ReplaceIfMyDumperNull(record, "EditTime", DefaultDateTime);
|
||||
ReplaceIfMyDumperNull(record, "EditorID", DefaultInt);
|
||||
ReplaceIfMyDumperNull(record, "Settings", DefaultText);
|
||||
break;
|
||||
// Order处理非空列
|
||||
case TableNames.Order:
|
||||
record.RemoveField("IsBatch");
|
||||
ReplaceIfMyDumperNull(record, "Deleted", DefaultInt);
|
||||
break;
|
||||
#endif
|
||||
//OrderBlockPlan将OrderNo长度<2的置空
|
||||
// OrderBlockPlan处理text->json列
|
||||
case TableNames.OrderBlockPlan:
|
||||
if (record["OrderNos"].Length < 2)
|
||||
record["OrderNos"] = "NULL";
|
||||
JsonDocument.Parse(record["OrderNos"]);
|
||||
break;
|
||||
// OrderBlockPlanResult表添加CompanyID列
|
||||
// OrderBlockPlanResult,添加CompanyID
|
||||
case TableNames.OrderBlockPlanResult:
|
||||
record.AddField("CompanyID",
|
||||
// 获取OrderBlockPlan.ID -> CompanyID
|
||||
@ -347,25 +383,37 @@ async Task RunProgram()
|
||||
// OrderProcess添加ShardKey列,NextStepID的空值转换为0
|
||||
case TableNames.OrderProcess:
|
||||
record.AddField("ShardKey", CalculateShardKeyByOrderNo(record["OrderNo"]));
|
||||
#if USE_TEST_DB
|
||||
if(record["NextStepID"] == "\\N")
|
||||
record["NextStepID"] = "0";
|
||||
#endif
|
||||
break;
|
||||
// OrderProcessStep,OrderProcessStepItem添加ShardKey列
|
||||
// OrderProcessStep添加ShardKey
|
||||
case TableNames.OrderProcessStep:
|
||||
record.AddField("ShardKey", CalculateShardKeyByOrderNo(record["OrderNo"]));
|
||||
break;
|
||||
// OrderProcessStepItem添加ShardKey列,处理非空列
|
||||
case TableNames.OrderProcessStepItem:
|
||||
ReplaceIfMyDumperNull(record, "DataID", DefaultInt);
|
||||
record.AddField("ShardKey",
|
||||
// 获取OrderProcess.ID -> ShardKey
|
||||
ThrowIfNoCached(await cache.GetStringAsync(CacheKeys.OrderProcess_ID_ShardKey(record["OrderProcessID"])),
|
||||
TableNames.OrderProcessStepItem, TableNames.OrderProcessStep, "OrderProcessID", "脏数据未处理"));
|
||||
break;
|
||||
// OrderScrapBoard处理非空列
|
||||
case TableNames.OrderScrapBoard:
|
||||
ReplaceIfMyDumperNull(record, "Color", DefaultStr);
|
||||
ReplaceIfMyDumperNull(record, "GoodsName", DefaultStr);
|
||||
ReplaceIfMyDumperNull(record, "Material", DefaultStr);
|
||||
ReplaceIfMyDumperNull(record, "MaterialName", DefaultStr);
|
||||
break;
|
||||
// ProcessItemExp处理非空列
|
||||
case TableNames.ProcessItemExp:
|
||||
ReplaceIfMyDumperNull(record, "MaxPartsID", DefaultInt);
|
||||
ReplaceIfMyDumperNull(record, "ProcessGroupID", DefaultInt);
|
||||
break;
|
||||
// SimplePlanOrder处理非空列,添加Deleted
|
||||
case TableNames.SimplePlanOrder:
|
||||
#if USE_TEST_DB
|
||||
record.RemoveField("ProcessState");
|
||||
#endif
|
||||
ReplaceIfMyDumperNull(record, "CreateTime", DefaultDateTime);
|
||||
ReplaceIfMyDumperNull(record, "UpdateTime", DefaultDateTime);
|
||||
ReplaceIfMyDumperNull(record, "CompanyID", DefaultInt);
|
||||
ReplaceIfMyDumperNull(record, "SingleName", DefaultStr);
|
||||
record.AddField("Deleted", "0");
|
||||
break;
|
||||
}
|
||||
@ -531,7 +579,7 @@ async Task RunProgram()
|
||||
{ "process_item_exp.ItemJson", ColumnType.Text },
|
||||
{ "report_template.Template", ColumnType.Text },
|
||||
{ "report_template.SourceConfig", ColumnType.Text },
|
||||
{ "order_block_plan.OrderNos", ColumnType.Json },
|
||||
{ "order_block_plan.OrderNos", ColumnType.Text },
|
||||
{ "order_block_plan.BlockInfo", ColumnType.Text },
|
||||
};
|
||||
#endif
|
||||
@ -552,7 +600,7 @@ async Task RunProgram()
|
||||
host.Services.AddDataSourceFactory();
|
||||
host.Services.AddErrorRecorderFactory();
|
||||
host.Services.AddSingleton<ProcessContext>();
|
||||
host.Services.AddKeyedSingleton<DataRecordQueue>(ProcessStep.Produce);
|
||||
host.Services.AddKeyedSingleton<DataRecordQueue>(ConstVar.Producer);
|
||||
host.Services.AddRecordQueuePool(tenantDbOptions.DbGroup.Keys.Select(key => (key:key, queue:new DataRecordQueue(500_000))).ToArray());
|
||||
host.Services.AddSingleton<ITaskMonitorLogger, CacheTaskMonitorLogger>();
|
||||
host.Services.AddSingleton<ITaskMonitorLogger, LoggerTaskMonitorLogger>();
|
||||
|
@ -1,5 +1,6 @@
|
||||
using System.Text;
|
||||
using System.Text.RegularExpressions;
|
||||
using MesETL.App.Const;
|
||||
using MesETL.App.Helpers;
|
||||
using MesETL.App.Options;
|
||||
using Microsoft.Extensions.Logging;
|
||||
@ -137,9 +138,10 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
|
||||
|
||||
// 在这里处理特殊列
|
||||
#region HandleFields
|
||||
if (field.Length == 2 && field == @"\N") // MyDumper NULL
|
||||
|
||||
if (field.Length == 2 && field == @"\N") // MyDumper导出的NULL为'\N'('\'不是转义字符)
|
||||
{
|
||||
recordSb.Append("NULL");
|
||||
recordSb.Append(ConstVar.Null);
|
||||
goto Escape;
|
||||
}
|
||||
|
||||
@ -148,14 +150,18 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
|
||||
case ColumnType.Text:
|
||||
if(string.IsNullOrEmpty(field))
|
||||
recordSb.Append("''");
|
||||
else if (field == ConstVar.Null)
|
||||
recordSb.Append(ConstVar.Null);
|
||||
else recordSb.Append($"_utf8mb4 0x{field}");
|
||||
break;
|
||||
case ColumnType.Blob:
|
||||
if (string.IsNullOrEmpty(field))
|
||||
recordSb.Append("''");
|
||||
else if (field == ConstVar.Null)
|
||||
recordSb.Append(ConstVar.Null);
|
||||
else recordSb.Append($"0x{field}");
|
||||
break;
|
||||
case ColumnType.Json:
|
||||
case ColumnType.Json:// 生产库没有JSON列,仅用于测试库进行测试
|
||||
if(string.IsNullOrEmpty(field))
|
||||
recordSb.Append("'[]'"); // JObject or JArray?
|
||||
else if (_options.Value.TreatJsonAsHex)
|
||||
|
@ -7,7 +7,7 @@
|
||||
"Input":{
|
||||
"InputDir": "D:\\Dump\\MockData", // Csv数据输入目录
|
||||
"UseMock": false, // 使用模拟数据进行测试
|
||||
"MockCountMultiplier": 0.5 // 模拟数据量级的乘数
|
||||
"MockCountMultiplier": 0 // 模拟数据量级的乘数
|
||||
},
|
||||
"Transform":{
|
||||
"StrictMode": false, // 设为true时如果数据转换发生错误,立刻停止程序
|
||||
@ -21,7 +21,7 @@
|
||||
"MaxAllowedPacket": 67108864,
|
||||
"FlushCount": 10000, // 每次提交记录条数
|
||||
"MaxDatabaseOutputTask" : 4, // 每个数据库最大提交任务数
|
||||
"TreatJsonAsHex": false, // 将json列作为16进制格式输出(0x前缀)
|
||||
"TreatJsonAsHex": false // 将json列作为16进制格式输出(0x前缀),生产库是没有json列的
|
||||
},
|
||||
"RedisCache": {
|
||||
"Configuration": "192.168.1.246:6380",
|
||||
|
Loading…
Reference in New Issue
Block a user