Compare commits
2 Commits
5cda84797b
...
d58c9d5177
Author | SHA1 | Date | |
---|---|---|---|
d58c9d5177 | |||
719cd2d8e7 |
8
MesETL.App/Const/ConstVar.cs
Normal file
8
MesETL.App/Const/ConstVar.cs
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
namespace MesETL.App.Const;
|
||||||
|
|
||||||
|
public static class ConstVar
|
||||||
|
{
|
||||||
|
public const string Producer = "Producer";
|
||||||
|
public const string Null = "NULL";
|
||||||
|
public const string MyDumperNull = "NULL";
|
||||||
|
}
|
@ -1,6 +0,0 @@
|
|||||||
namespace MesETL.App.Const;
|
|
||||||
|
|
||||||
public static class ProcessStep
|
|
||||||
{
|
|
||||||
public const string Produce = "Producer";
|
|
||||||
}
|
|
@ -5,9 +5,19 @@ namespace MesETL.App.Helpers;
|
|||||||
|
|
||||||
public static class DatabaseHelper
|
public static class DatabaseHelper
|
||||||
{
|
{
|
||||||
|
public static MySqlConnection CreateConnection(string connStr)
|
||||||
|
{
|
||||||
|
var newConnStr = new MySqlConnectionStringBuilder(connStr)
|
||||||
|
{
|
||||||
|
ConnectionTimeout = 30,
|
||||||
|
DefaultCommandTimeout = 0,
|
||||||
|
}.ConnectionString;
|
||||||
|
return new MySqlConnection(newConnStr);
|
||||||
|
}
|
||||||
|
|
||||||
public static async Task<DataSet> QueryTableAsync(string connStr, string sql)
|
public static async Task<DataSet> QueryTableAsync(string connStr, string sql)
|
||||||
{
|
{
|
||||||
await using var conn = new MySqlConnection(connStr);
|
await using var conn = CreateConnection(connStr);
|
||||||
if(conn.State is not ConnectionState.Open)
|
if(conn.State is not ConnectionState.Open)
|
||||||
await conn.OpenAsync();
|
await conn.OpenAsync();
|
||||||
await using var cmd = conn.CreateCommand();
|
await using var cmd = conn.CreateCommand();
|
||||||
@ -19,7 +29,7 @@ public static class DatabaseHelper
|
|||||||
|
|
||||||
public static async Task<object?> QueryScalarAsync(string connStr, string sql)
|
public static async Task<object?> QueryScalarAsync(string connStr, string sql)
|
||||||
{
|
{
|
||||||
await using var conn = new MySqlConnection(connStr);
|
await using var conn = CreateConnection(connStr);
|
||||||
if(conn.State is not ConnectionState.Open)
|
if(conn.State is not ConnectionState.Open)
|
||||||
await conn.OpenAsync();
|
await conn.OpenAsync();
|
||||||
await using var cmd = conn.CreateCommand();
|
await using var cmd = conn.CreateCommand();
|
||||||
@ -29,7 +39,7 @@ public static class DatabaseHelper
|
|||||||
|
|
||||||
public static async Task<int> NonQueryAsync(string connStr, string sql)
|
public static async Task<int> NonQueryAsync(string connStr, string sql)
|
||||||
{
|
{
|
||||||
await using var conn = new MySqlConnection(connStr);
|
await using var conn = CreateConnection(connStr);
|
||||||
if(conn.State is not ConnectionState.Open)
|
if(conn.State is not ConnectionState.Open)
|
||||||
await conn.OpenAsync();
|
await conn.OpenAsync();
|
||||||
await using var cmd = conn.CreateCommand();
|
await using var cmd = conn.CreateCommand();
|
||||||
@ -39,7 +49,7 @@ public static class DatabaseHelper
|
|||||||
|
|
||||||
public static async Task<int> TransactionAsync(string connStr, string sql, params MySqlParameter[] parameters)
|
public static async Task<int> TransactionAsync(string connStr, string sql, params MySqlParameter[] parameters)
|
||||||
{
|
{
|
||||||
await using var conn = new MySqlConnection(connStr);
|
await using var conn = CreateConnection(connStr);
|
||||||
if(conn.State is not ConnectionState.Open)
|
if(conn.State is not ConnectionState.Open)
|
||||||
await conn.OpenAsync();
|
await conn.OpenAsync();
|
||||||
await using var trans = await conn.BeginTransactionAsync();
|
await using var trans = await conn.BeginTransactionAsync();
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
using System.Text.RegularExpressions;
|
using System.Text.Json;
|
||||||
|
using System.Text.RegularExpressions;
|
||||||
using ZstdSharp;
|
using ZstdSharp;
|
||||||
|
|
||||||
namespace MesETL.App.Helpers;
|
namespace MesETL.App.Helpers;
|
||||||
@ -121,5 +122,17 @@ public static partial class DumpDataHelper
|
|||||||
var reader = new StreamReader(ds);
|
var reader = new StreamReader(ds);
|
||||||
return await reader.ReadToEndAsync();
|
return await reader.ReadToEndAsync();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static bool IsJson(string str)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
JsonDocument.Parse(str);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
catch (JsonException)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
@ -1,5 +1,4 @@
|
|||||||
using MesETL.App.Const;
|
using MesETL.App.HostedServices.Abstractions;
|
||||||
using MesETL.App.HostedServices.Abstractions;
|
|
||||||
using MesETL.App.Options;
|
using MesETL.App.Options;
|
||||||
using MesETL.App.Services;
|
using MesETL.App.Services;
|
||||||
using MesETL.App.Services.ETL;
|
using MesETL.App.Services.ETL;
|
||||||
@ -37,7 +36,7 @@ public class FileInputService : IInputService
|
|||||||
public FileInputService(ILogger<FileInputService> logger,
|
public FileInputService(ILogger<FileInputService> logger,
|
||||||
IOptions<DataInputOptions> dataInputOptions,
|
IOptions<DataInputOptions> dataInputOptions,
|
||||||
ProcessContext context,
|
ProcessContext context,
|
||||||
[FromKeyedServices(ProcessStep.Produce)] DataRecordQueue producerQueue,
|
[FromKeyedServices(Const.ConstVar.Producer)] DataRecordQueue producerQueue,
|
||||||
DataReaderFactory dataReaderFactory)
|
DataReaderFactory dataReaderFactory)
|
||||||
{
|
{
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
|
@ -50,6 +50,7 @@ public class MainHostedService : BackgroundService
|
|||||||
|
|
||||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||||
{
|
{
|
||||||
|
_logger.LogInformation("Command argument detected, execute for each database");
|
||||||
var command = _config["Command"];
|
var command = _config["Command"];
|
||||||
if (!string.IsNullOrEmpty(command))
|
if (!string.IsNullOrEmpty(command))
|
||||||
{
|
{
|
||||||
|
@ -1,5 +1,4 @@
|
|||||||
using System.Diagnostics;
|
using System.Diagnostics;
|
||||||
using MesETL.App.Const;
|
|
||||||
using MesETL.App.Services;
|
using MesETL.App.Services;
|
||||||
using MesETL.App.Services.Loggers;
|
using MesETL.App.Services.Loggers;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
@ -17,7 +16,7 @@ public class TaskMonitorService
|
|||||||
private readonly RecordQueuePool _queuePool;
|
private readonly RecordQueuePool _queuePool;
|
||||||
|
|
||||||
public TaskMonitorService(ProcessContext context,
|
public TaskMonitorService(ProcessContext context,
|
||||||
[FromKeyedServices(ProcessStep.Produce)]
|
[FromKeyedServices(Const.ConstVar.Producer)]
|
||||||
DataRecordQueue producerQueue,
|
DataRecordQueue producerQueue,
|
||||||
RecordQueuePool queuePool,
|
RecordQueuePool queuePool,
|
||||||
IEnumerable<ITaskMonitorLogger> monitorLoggers)
|
IEnumerable<ITaskMonitorLogger> monitorLoggers)
|
||||||
|
@ -1,5 +1,4 @@
|
|||||||
using MesETL.App.Cache;
|
using MesETL.App.Cache;
|
||||||
using MesETL.App.Const;
|
|
||||||
using MesETL.App.HostedServices.Abstractions;
|
using MesETL.App.HostedServices.Abstractions;
|
||||||
using MesETL.App.Options;
|
using MesETL.App.Options;
|
||||||
using MesETL.App.Services;
|
using MesETL.App.Services;
|
||||||
@ -28,7 +27,7 @@ public class TransformService : ITransformService
|
|||||||
|
|
||||||
public TransformService(ILogger<TransformService> logger,
|
public TransformService(ILogger<TransformService> logger,
|
||||||
IOptions<DataTransformOptions> options,
|
IOptions<DataTransformOptions> options,
|
||||||
[FromKeyedServices(ProcessStep.Produce)] DataRecordQueue producerQueue,
|
[FromKeyedServices(Const.ConstVar.Producer)] DataRecordQueue producerQueue,
|
||||||
RecordQueuePool queuePool,
|
RecordQueuePool queuePool,
|
||||||
ProcessContext context,
|
ProcessContext context,
|
||||||
ICacher cache,
|
ICacher cache,
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
// #define USE_TEST_DB // 测试库的结构与生产库不一样,如果使用测试库运行,则加上USE_TEST_DB预处理器指令
|
// #define USE_TEST_DB // 测试库的结构与生产库不一样,如果使用测试库运行,则加上USE_TEST_DB预处理器指令
|
||||||
|
|
||||||
|
using System.Text.Json;
|
||||||
using MesETL.App;
|
using MesETL.App;
|
||||||
using MesETL.App.Services;
|
using MesETL.App.Services;
|
||||||
using MesETL.App.Services.ETL;
|
using MesETL.App.Services.ETL;
|
||||||
@ -131,7 +132,9 @@ async Task RunProgram()
|
|||||||
TableNames.OrderModuleExtra,
|
TableNames.OrderModuleExtra,
|
||||||
TableNames.OrderModuleItem,
|
TableNames.OrderModuleItem,
|
||||||
TableNames.OrderPackage,
|
TableNames.OrderPackage,
|
||||||
|
#if USE_TEST_DB
|
||||||
TableNames.OrderPatchDetail,
|
TableNames.OrderPatchDetail,
|
||||||
|
#endif
|
||||||
|
|
||||||
TableNames.OrderProcess,
|
TableNames.OrderProcess,
|
||||||
TableNames.OrderProcessStep,
|
TableNames.OrderProcessStep,
|
||||||
@ -240,12 +243,14 @@ async Task RunProgram()
|
|||||||
return false;
|
return false;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
// OrderBlockPlan删除CreateTime < 202301的
|
// OrderBlockPlan删除CreateTime < 202301的,Json列合法检查
|
||||||
case TableNames.OrderBlockPlan:
|
case TableNames.OrderBlockPlan:
|
||||||
{
|
{
|
||||||
var time = DateTime.Parse(record["CreateTime"].Trim('"'));
|
var time = DateTime.Parse(record["CreateTime"].Trim('"','\''));
|
||||||
if (time < oldestTime)
|
if (time < oldestTime)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
|
// if (!DumpDataHelper.IsJson(record["OrderNos"])) return false;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
// OrderBlockPlanResult删除对应order_block_plan.ID不存在的对象
|
// OrderBlockPlanResult删除对应order_block_plan.ID不存在的对象
|
||||||
@ -255,6 +260,12 @@ async Task RunProgram()
|
|||||||
return false;
|
return false;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
// OrderDataGoods Json列合法检查
|
||||||
|
case TableNames.OrderDataGoods:
|
||||||
|
{
|
||||||
|
if (!DumpDataHelper.IsJson(record["ExtraProp"])) return false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
// OrderModule删除OrderNo < 202301的
|
// OrderModule删除OrderNo < 202301的
|
||||||
case TableNames.OrderModule:
|
case TableNames.OrderModule:
|
||||||
{
|
{
|
||||||
@ -297,7 +308,7 @@ async Task RunProgram()
|
|||||||
// SimplePlanOrder删除CreateTime < 202301的
|
// SimplePlanOrder删除CreateTime < 202301的
|
||||||
case TableNames.SimplePlanOrder:
|
case TableNames.SimplePlanOrder:
|
||||||
{
|
{
|
||||||
var time = DateTime.Parse(record["CreateTime"].Trim('"'));
|
var time = DateTime.Parse(record["CreateTime"].Trim('"', '\''));
|
||||||
if (time < oldestTime)
|
if (time < oldestTime)
|
||||||
return false;
|
return false;
|
||||||
break;
|
break;
|
||||||
@ -308,24 +319,49 @@ async Task RunProgram()
|
|||||||
};
|
};
|
||||||
|
|
||||||
// 数据替换
|
// 数据替换
|
||||||
|
/*
|
||||||
|
* 空数据处理:
|
||||||
|
* 某些列生产库为可空,而测试库变为了不可空,则需要根据列的类型对这些列做单独处理
|
||||||
|
* int或任何非无符号整型 -> -1
|
||||||
|
* varchar -> ''(空字符串)
|
||||||
|
* datetime -> '1000-01-01'(datetime最小值)
|
||||||
|
* text -> 0 (16进制0,MyDumper中的text是为16进制)
|
||||||
|
*/
|
||||||
|
const string DefaultInt = "0";
|
||||||
|
const string DefaultStr = "''";
|
||||||
|
const string DefaultDateTime = "'1000-01-01'";
|
||||||
|
const string DefaultText = "0";
|
||||||
|
|
||||||
|
static void ReplaceIfMyDumperNull(DataRecord record, string fieldName, string replaceValue)
|
||||||
|
{
|
||||||
|
if (record[fieldName] is ConstVar.MyDumperNull)
|
||||||
|
record[fieldName] = replaceValue;
|
||||||
|
}
|
||||||
|
|
||||||
options.RecordModify = async context =>
|
options.RecordModify = async context =>
|
||||||
{
|
{
|
||||||
var record = context.Record;
|
var record = context.Record;
|
||||||
var cache = context.Cacher;
|
var cache = context.Cacher;
|
||||||
switch (record.TableName)
|
switch (record.TableName)
|
||||||
{
|
{
|
||||||
#if USE_TEST_DB
|
// Machine处理非空列
|
||||||
// Order表移除IsBatch列
|
case TableNames.Machine:
|
||||||
|
ReplaceIfMyDumperNull(record, "Name", DefaultStr);
|
||||||
|
ReplaceIfMyDumperNull(record, "CreateTime", DefaultDateTime);
|
||||||
|
ReplaceIfMyDumperNull(record, "CreatorID", DefaultInt);
|
||||||
|
ReplaceIfMyDumperNull(record, "EditTime", DefaultDateTime);
|
||||||
|
ReplaceIfMyDumperNull(record, "EditorID", DefaultInt);
|
||||||
|
ReplaceIfMyDumperNull(record, "Settings", DefaultText);
|
||||||
|
break;
|
||||||
|
// Order处理非空列
|
||||||
case TableNames.Order:
|
case TableNames.Order:
|
||||||
record.RemoveField("IsBatch");
|
ReplaceIfMyDumperNull(record, "Deleted", DefaultInt);
|
||||||
break;
|
break;
|
||||||
#endif
|
// OrderBlockPlan处理text->json列
|
||||||
//OrderBlockPlan将OrderNo长度<2的置空
|
|
||||||
case TableNames.OrderBlockPlan:
|
case TableNames.OrderBlockPlan:
|
||||||
if (record["OrderNos"].Length < 2)
|
JsonDocument.Parse(record["OrderNos"]);
|
||||||
record["OrderNos"] = "NULL";
|
|
||||||
break;
|
break;
|
||||||
// OrderBlockPlanResult表添加CompanyID列
|
// OrderBlockPlanResult,添加CompanyID
|
||||||
case TableNames.OrderBlockPlanResult:
|
case TableNames.OrderBlockPlanResult:
|
||||||
record.AddField("CompanyID",
|
record.AddField("CompanyID",
|
||||||
// 获取OrderBlockPlan.ID -> CompanyID
|
// 获取OrderBlockPlan.ID -> CompanyID
|
||||||
@ -347,25 +383,37 @@ async Task RunProgram()
|
|||||||
// OrderProcess添加ShardKey列,NextStepID的空值转换为0
|
// OrderProcess添加ShardKey列,NextStepID的空值转换为0
|
||||||
case TableNames.OrderProcess:
|
case TableNames.OrderProcess:
|
||||||
record.AddField("ShardKey", CalculateShardKeyByOrderNo(record["OrderNo"]));
|
record.AddField("ShardKey", CalculateShardKeyByOrderNo(record["OrderNo"]));
|
||||||
#if USE_TEST_DB
|
|
||||||
if(record["NextStepID"] == "\\N")
|
|
||||||
record["NextStepID"] = "0";
|
|
||||||
#endif
|
|
||||||
break;
|
break;
|
||||||
// OrderProcessStep,OrderProcessStepItem添加ShardKey列
|
// OrderProcessStep添加ShardKey
|
||||||
case TableNames.OrderProcessStep:
|
case TableNames.OrderProcessStep:
|
||||||
record.AddField("ShardKey", CalculateShardKeyByOrderNo(record["OrderNo"]));
|
record.AddField("ShardKey", CalculateShardKeyByOrderNo(record["OrderNo"]));
|
||||||
break;
|
break;
|
||||||
|
// OrderProcessStepItem添加ShardKey列,处理非空列
|
||||||
case TableNames.OrderProcessStepItem:
|
case TableNames.OrderProcessStepItem:
|
||||||
|
ReplaceIfMyDumperNull(record, "DataID", DefaultInt);
|
||||||
record.AddField("ShardKey",
|
record.AddField("ShardKey",
|
||||||
// 获取OrderProcess.ID -> ShardKey
|
// 获取OrderProcess.ID -> ShardKey
|
||||||
ThrowIfNoCached(await cache.GetStringAsync(CacheKeys.OrderProcess_ID_ShardKey(record["OrderProcessID"])),
|
ThrowIfNoCached(await cache.GetStringAsync(CacheKeys.OrderProcess_ID_ShardKey(record["OrderProcessID"])),
|
||||||
TableNames.OrderProcessStepItem, TableNames.OrderProcessStep, "OrderProcessID", "脏数据未处理"));
|
TableNames.OrderProcessStepItem, TableNames.OrderProcessStep, "OrderProcessID", "脏数据未处理"));
|
||||||
break;
|
break;
|
||||||
|
// OrderScrapBoard处理非空列
|
||||||
|
case TableNames.OrderScrapBoard:
|
||||||
|
ReplaceIfMyDumperNull(record, "Color", DefaultStr);
|
||||||
|
ReplaceIfMyDumperNull(record, "GoodsName", DefaultStr);
|
||||||
|
ReplaceIfMyDumperNull(record, "Material", DefaultStr);
|
||||||
|
ReplaceIfMyDumperNull(record, "MaterialName", DefaultStr);
|
||||||
|
break;
|
||||||
|
// ProcessItemExp处理非空列
|
||||||
|
case TableNames.ProcessItemExp:
|
||||||
|
ReplaceIfMyDumperNull(record, "MaxPartsID", DefaultInt);
|
||||||
|
ReplaceIfMyDumperNull(record, "ProcessGroupID", DefaultInt);
|
||||||
|
break;
|
||||||
|
// SimplePlanOrder处理非空列,添加Deleted
|
||||||
case TableNames.SimplePlanOrder:
|
case TableNames.SimplePlanOrder:
|
||||||
#if USE_TEST_DB
|
ReplaceIfMyDumperNull(record, "CreateTime", DefaultDateTime);
|
||||||
record.RemoveField("ProcessState");
|
ReplaceIfMyDumperNull(record, "UpdateTime", DefaultDateTime);
|
||||||
#endif
|
ReplaceIfMyDumperNull(record, "CompanyID", DefaultInt);
|
||||||
|
ReplaceIfMyDumperNull(record, "SingleName", DefaultStr);
|
||||||
record.AddField("Deleted", "0");
|
record.AddField("Deleted", "0");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -531,7 +579,7 @@ async Task RunProgram()
|
|||||||
{ "process_item_exp.ItemJson", ColumnType.Text },
|
{ "process_item_exp.ItemJson", ColumnType.Text },
|
||||||
{ "report_template.Template", ColumnType.Text },
|
{ "report_template.Template", ColumnType.Text },
|
||||||
{ "report_template.SourceConfig", ColumnType.Text },
|
{ "report_template.SourceConfig", ColumnType.Text },
|
||||||
{ "order_block_plan.OrderNos", ColumnType.Json },
|
{ "order_block_plan.OrderNos", ColumnType.Text },
|
||||||
{ "order_block_plan.BlockInfo", ColumnType.Text },
|
{ "order_block_plan.BlockInfo", ColumnType.Text },
|
||||||
};
|
};
|
||||||
#endif
|
#endif
|
||||||
@ -552,7 +600,7 @@ async Task RunProgram()
|
|||||||
host.Services.AddDataSourceFactory();
|
host.Services.AddDataSourceFactory();
|
||||||
host.Services.AddErrorRecorderFactory();
|
host.Services.AddErrorRecorderFactory();
|
||||||
host.Services.AddSingleton<ProcessContext>();
|
host.Services.AddSingleton<ProcessContext>();
|
||||||
host.Services.AddKeyedSingleton<DataRecordQueue>(ProcessStep.Produce);
|
host.Services.AddKeyedSingleton<DataRecordQueue>(ConstVar.Producer);
|
||||||
host.Services.AddRecordQueuePool(tenantDbOptions.DbGroup.Keys.Select(key => (key:key, queue:new DataRecordQueue(500_000))).ToArray());
|
host.Services.AddRecordQueuePool(tenantDbOptions.DbGroup.Keys.Select(key => (key:key, queue:new DataRecordQueue(500_000))).ToArray());
|
||||||
host.Services.AddSingleton<ITaskMonitorLogger, CacheTaskMonitorLogger>();
|
host.Services.AddSingleton<ITaskMonitorLogger, CacheTaskMonitorLogger>();
|
||||||
host.Services.AddSingleton<ITaskMonitorLogger, LoggerTaskMonitorLogger>();
|
host.Services.AddSingleton<ITaskMonitorLogger, LoggerTaskMonitorLogger>();
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
using System.Text;
|
using System.Text;
|
||||||
using System.Text.RegularExpressions;
|
using System.Text.RegularExpressions;
|
||||||
|
using MesETL.App.Const;
|
||||||
using MesETL.App.Helpers;
|
using MesETL.App.Helpers;
|
||||||
using MesETL.App.Options;
|
using MesETL.App.Options;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
@ -137,9 +138,10 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
|
|||||||
|
|
||||||
// 在这里处理特殊列
|
// 在这里处理特殊列
|
||||||
#region HandleFields
|
#region HandleFields
|
||||||
if (field.Length == 2 && field == @"\N") // MyDumper NULL
|
|
||||||
|
if (field.Length == 2 && field == @"\N") // MyDumper导出的NULL为'\N'('\'不是转义字符)
|
||||||
{
|
{
|
||||||
recordSb.Append("NULL");
|
recordSb.Append(ConstVar.Null);
|
||||||
goto Escape;
|
goto Escape;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -148,14 +150,18 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
|
|||||||
case ColumnType.Text:
|
case ColumnType.Text:
|
||||||
if(string.IsNullOrEmpty(field))
|
if(string.IsNullOrEmpty(field))
|
||||||
recordSb.Append("''");
|
recordSb.Append("''");
|
||||||
|
else if (field == ConstVar.Null)
|
||||||
|
recordSb.Append(ConstVar.Null);
|
||||||
else recordSb.Append($"_utf8mb4 0x{field}");
|
else recordSb.Append($"_utf8mb4 0x{field}");
|
||||||
break;
|
break;
|
||||||
case ColumnType.Blob:
|
case ColumnType.Blob:
|
||||||
if (string.IsNullOrEmpty(field))
|
if (string.IsNullOrEmpty(field))
|
||||||
recordSb.Append("''");
|
recordSb.Append("''");
|
||||||
|
else if (field == ConstVar.Null)
|
||||||
|
recordSb.Append(ConstVar.Null);
|
||||||
else recordSb.Append($"0x{field}");
|
else recordSb.Append($"0x{field}");
|
||||||
break;
|
break;
|
||||||
case ColumnType.Json:
|
case ColumnType.Json:// 生产库没有JSON列,仅用于测试库进行测试
|
||||||
if(string.IsNullOrEmpty(field))
|
if(string.IsNullOrEmpty(field))
|
||||||
recordSb.Append("'[]'"); // JObject or JArray?
|
recordSb.Append("'[]'"); // JObject or JArray?
|
||||||
else if (_options.Value.TreatJsonAsHex)
|
else if (_options.Value.TreatJsonAsHex)
|
||||||
|
@ -7,7 +7,7 @@
|
|||||||
"Input":{
|
"Input":{
|
||||||
"InputDir": "D:\\Dump\\MockData", // Csv数据输入目录
|
"InputDir": "D:\\Dump\\MockData", // Csv数据输入目录
|
||||||
"UseMock": false, // 使用模拟数据进行测试
|
"UseMock": false, // 使用模拟数据进行测试
|
||||||
"MockCountMultiplier": 0.5 // 模拟数据量级的乘数
|
"MockCountMultiplier": 0 // 模拟数据量级的乘数
|
||||||
},
|
},
|
||||||
"Transform":{
|
"Transform":{
|
||||||
"StrictMode": false, // 设为true时如果数据转换发生错误,立刻停止程序
|
"StrictMode": false, // 设为true时如果数据转换发生错误,立刻停止程序
|
||||||
@ -21,7 +21,7 @@
|
|||||||
"MaxAllowedPacket": 67108864,
|
"MaxAllowedPacket": 67108864,
|
||||||
"FlushCount": 10000, // 每次提交记录条数
|
"FlushCount": 10000, // 每次提交记录条数
|
||||||
"MaxDatabaseOutputTask" : 4, // 每个数据库最大提交任务数
|
"MaxDatabaseOutputTask" : 4, // 每个数据库最大提交任务数
|
||||||
"TreatJsonAsHex": false, // 将json列作为16进制格式输出(0x前缀)
|
"TreatJsonAsHex": false // 将json列作为16进制格式输出(0x前缀),生产库是没有json列的
|
||||||
},
|
},
|
||||||
"RedisCache": {
|
"RedisCache": {
|
||||||
"Configuration": "192.168.1.246:6380",
|
"Configuration": "192.168.1.246:6380",
|
||||||
|
Loading…
Reference in New Issue
Block a user