416 lines
18 KiB
C#
416 lines
18 KiB
C#
// #define FIX_PLAN_ITEM // Enable when repairing the OrderBlockPlanItem table in the test environment
|
||
|
||
using System.Text;
|
||
using MesETL.App;
|
||
using MesETL.App.Services;
|
||
using MesETL.App.Services.ETL;
|
||
using MesETL.App.Cache;
|
||
using MesETL.App.Const;
|
||
using MesETL.App.HostedServices;
|
||
using MesETL.App.HostedServices.Abstractions;
|
||
using MesETL.App.Options;
|
||
using MesETL.App.Services.ErrorRecorder;
|
||
using MesETL.App.Services.Loggers;
|
||
using MesETL.App.Services.Seq;
|
||
using MesETL.Shared.Compression;
|
||
using Microsoft.Extensions.Configuration;
|
||
using Microsoft.Extensions.DependencyInjection;
|
||
using Microsoft.Extensions.Hosting;
|
||
using Microsoft.Extensions.Logging;
|
||
using Serilog;
|
||
using Serilog.Events;
|
||
using DumpDataHelper = MesETL.App.Helpers.DumpDataHelper;
|
||
|
||
// Program entry point (top-level statements): await the RunProgram local function declared below.
await RunProgram();
|
||
// Explicitly ends the top-level statements; the RunProgram local function declaration follows.
return;
|
||
|
||
async Task RunProgram()
|
||
{
|
||
// Set the thread-pool maximum to 200 worker threads and 200 I/O completion threads.
ThreadPool.SetMaxThreads(workerThreads: 200, completionPortThreads: 200);

var host = Host.CreateApplicationBuilder(args);

// appsettings.json is read from the application base directory; it is required
// (optional: false) and is not reloaded on change.
var settingsPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "appsettings.json");
host.Configuration.AddJsonFile(settingsPath, optional: false, reloadOnChange: false);

// Command-line switches (short and long form) and the configuration keys they map to.
var switchMappings = new Dictionary<string, string>
{
    ["-d"] = "Input:InputDir",
    ["--InputDir"] = "Input:InputDir",
    ["-s"] = "Output:ConnectionString",
    ["--ConnectionString"] = "Output:ConnectionString",
    ["-r"] = "RedisCache:Configuration",
    ["--Redis"] = "RedisCache:Configuration",
    ["-g"] = "TenantDb:UseDbGroup",
    ["--UseDbGroup"] = "TenantDb:UseDbGroup",
    ["-c"] = "Command",
    ["--Command"] = "Command",
};
host.Configuration.AddCommandLine(args, switchMappings);
|
||
|
||
// Binds a required configuration section to T; throws ApplicationException with the
// given message when the section binds to null.
T BindRequiredOptions<T>(string sectionName, string missingMessage) where T : class
    => host.Configuration.GetRequiredSection(sectionName).Get<T>()
       ?? throw new ApplicationException(missingMessage);

var inputOptions = BindRequiredOptions<DataInputOptions>("Input", "缺少Input配置");
var transformOptions = BindRequiredOptions<DataTransformOptions>("Transform", "缺少Transform配置");
var outputOptions = BindRequiredOptions<DatabaseOutputOptions>("Output", "缺少Output配置");

// The Redis section object itself is kept because it is also registered with the options system below.
var redisSection = host.Configuration.GetRequiredSection("RedisCache");
var redisOptions = redisSection.Get<RedisCacheOptions>() ?? throw new ApplicationException("缺少RedisCache配置");

// Tenant (sharding) options: the tenant key and the name of the database group to use are
// both mandatory; the group itself is then read from "DbGroups:<UseDbGroup>".
var tenantDbSection = host.Configuration.GetRequiredSection("TenantDb");
var tenantDbOptions = new TenantDbOptions
{
    TenantKey = tenantDbSection.GetValue<string>(nameof(TenantDbOptions.TenantKey))
                ?? throw new ApplicationException("分库配置缺少分库键TenantKey"),
    UseDbGroup = tenantDbSection.GetValue<string>(nameof(TenantDbOptions.UseDbGroup))
                 ?? throw new ApplicationException("分库配置缺少使用分库组UseDbGroup"),
};
var dbGroupSection = tenantDbSection.GetRequiredSection($"DbGroups:{tenantDbOptions.UseDbGroup}");
tenantDbOptions.DbGroup = dbGroupSection.Get<Dictionary<string, int>>()
                          ?? throw new ApplicationException($"分库配置无法解析分库组{tenantDbOptions.UseDbGroup},请检查配置");

// Register the resolved tenant options and the raw Redis section with the options system.
host.Services.Configure<TenantDbOptions>(tenant =>
{
    tenant.TenantKey = tenantDbOptions.TenantKey;
    tenant.UseDbGroup = tenantDbOptions.UseDbGroup;
    tenant.DbGroup = tenantDbOptions.DbGroup;
});
host.Services.Configure<RedisCacheOptions>(redisSection);

// Clean-up cutoff taken from Transform.CleanDate ("yyyyMM"), in three forms:
// a DateTime, the full yyyyMM integer, and the yyMM integer (first two characters dropped).
var oldestTime = DateTime.ParseExact(
    transformOptions.CleanDate,
    "yyyyMM",
    System.Globalization.DateTimeFormatInfo.InvariantInfo);
var oldestTimeInt_yyyyMM = int.Parse(transformOptions.CleanDate);
var oldestTimeInt_yyMM = int.Parse(transformOptions.CleanDate[2..]);
|
||
|
||
// Input options: copy the values bound from configuration and attach the code-defined hooks.
host.Services.Configure<DataInputOptions>(input =>
{
    // The input directory is mandatory.
    input.InputDir = inputOptions.InputDir ?? throw new ApplicationException("未配置输入目录");

    input.UseMock = inputOptions.UseMock;
    input.TableMockConfig = inputOptions.TableMockConfig;
    input.MockCountMultiplier = inputOptions.MockCountMultiplier;
    input.TableIgnoreList = inputOptions.TableIgnoreList;
    input.TableOrder = inputOptions.TableOrder;

    // Builder used to create the metadata of each input file.
    input.FileInputMetaBuilder = DumpDataHelper.MyDumperFileInputMetaBuilder;

    // No callback is registered for the "table input completed" event.
    input.OnTableInputCompleted = null;

    // Mock definitions per table are declared here in code (currently none).
    // NOTE(review): this replaces the TableMockConfig copied from configuration a few lines
    // above with an empty dictionary — confirm the configured value is meant to be discarded.
    input.TableMockConfig = new Dictionary<string, TableMockConfig>();
});
|
||
|
||
host.Services.Configure<DataTransformOptions>(options =>
|
||
{
|
||
// Builds a shard key from an order number: the four characters at index 2..5 followed by "0".
// Throws ArgumentOutOfRangeException when the order number has fewer than six characters.
static string CalculateShardKeyByOrderNo(ReadOnlySpan<char> orderNo)
{
    return string.Concat(orderNo.Slice(2, 4), "0");
}

// Transform feature switches are taken over unchanged from the bound configuration.
options.EnableReBuilder = transformOptions.EnableReBuilder;
options.EnableReplacer = transformOptions.EnableReplacer;
options.EnableFilter = transformOptions.EnableFilter;
options.StrictMode = transformOptions.StrictMode;
|
||
|
||
// The order_block_plan_item and order_package_item tables are not imported; they are rebuilt
// directly from order_item data.

// Data clean-up: the filter returns false for records that must be dropped and true for
// records that are kept. Tables without a case below are always kept.
// TODO: OPT: outer variables such as oldestTime are captured and produce a closure.
options.RecordFilter = async context =>
{
    var record = context.Record;
    switch (record.TableName)
    {
        // Drop rows whose CreateTime is earlier than the clean-up cutoff.
        case TableNames.OrderBlockPlan:
        case TableNames.SimplePlanOrder:
        {
            // Surrounding quote characters are stripped first. The value is parsed with the
            // invariant culture so the result does not depend on the machine's regional settings.
            var creationTime = DateTime.Parse(
                record["CreateTime"].AsSpan().Trim(['"', '\'']),
                System.Globalization.CultureInfo.InvariantCulture);
            if (creationTime < oldestTime)
            {
                return false;
            }

            break;
        }

        // OrderBlockPlanItem rows are never imported.
        case TableNames.OrderBlockPlanItem:
        {
            return false;
        }

        // Drop rows where (Status != 0 or Deleted == 1) and the first four characters of ID,
        // read as an integer, are below the yyMM cutoff.
        case TableNames.OrderScrapBoard:
        {
            var status = record["Status"].AsSpan();
            var deleted = record["Deleted"].AsSpan();
            var idPrefix = int.Parse(record["ID"].AsSpan()[..4]);
            if ((status is not "0" || deleted is "1") && idPrefix < oldestTimeInt_yyMM)
            {
                return false;
            }

            break;
        }

        // Drop rows whose OrderNo prefix (first four characters, read as an integer) is
        // below the yyMM cutoff.
        case TableNames.SimplePackage:
        {
            var orderNoPrefix = int.Parse(record["OrderNo"].AsSpan()[..4]);
            if (orderNoPrefix < oldestTimeInt_yyMM)
            {
                return false;
            }

            break;
        }

        default:
            break;
    }

    return true;
};
|
||
|
||
// Data replacement
/*
 * NULL handling:
 * Some columns are nullable in the production database but non-nullable in the test
 * database, so they need a replacement value chosen by column type:
 *   int (any integer type) -> 0
 *   varchar                -> '' (empty string)
 *   datetime               -> '1000-01-01' (minimum datetime value)
 *   text                   -> 0 (hexadecimal 0; MyDumper writes text columns as hex)
 *
 * NOTE(review): the original note listed -1 for integers while DefaultInt is "0";
 * this comment follows the code — confirm which value is intended.
 */
const string DefaultInt = "0";
const string DefaultStr = "''";
const string DefaultDateTime = "'1000-01-01'";
const string DefaultText = "0";

// Per-record modification: adjusts the fields of selected tables in place and returns the
// same record instance. Tables without a case below pass through unchanged.
options.RecordModify = async context =>
{
    var record = context.Record;
    switch (record.TableName)
    {
        // Rebuild the Data column: decode the hex text into bytes and store them as a UTF-8 string.
        case TableNames.OrderBoxBlock:
        {
            var data = record["Data"];
            // Skip both NULL markers. The alternatives must be parenthesised: without the
            // parentheses `not A and B` binds as `(not A) and B`, which only matches B.
            if (data is not (ConstVar.MyDumperNull or ConstVar.Null))
            {
                record["Data"] = Encoding.UTF8.GetString(Convert.FromHexString(data));
            }

            break;
        }

        // Convert the JsonStr column into a Deflate-compressed Data column (hex encoded),
        // add CompressionType = "1", and drop JsonStr.
        case TableNames.OrderModuleExtra:
        {
            record.AppendField("CompressionType", "1");
            var compressed = DeflateArchive.Compress(Convert.FromHexString(record["JsonStr"]));
            record.AppendField("Data", Convert.ToHexString(compressed));
            record.RemoveField("JsonStr");
            break;
        }

        // Remove the ID column so the database assigns it by auto-increment.
        // TODO: improve the tables: drop the ID column or replace it with a sequence number.
        case TableNames.ProcessStepEfficiency:
        case TableNames.ProcessScheduleCapacity:
        {
            record.RemoveField("ID");
            break;
        }

        // Remove the Key column.
        case TableNames.SysConfig:
        {
            record.RemoveField("Key");
            break;
        }

        // Remove the PlaceData column when present (it has already been dropped in production).
        case TableNames.SimplePlanOrder:
        {
            if (record.HeaderExists("PlaceData"))
            {
                record.RemoveField("PlaceData");
            }

            break;
        }

        default:
            break;
    }

    return record;

    // Replaces a field holding the MyDumper NULL marker with the given value and logs a warning.
    // Not called by any case above at the moment.
    void ReplaceIfMyDumperNull(DataRecord target, string fieldName, string replaceValue)
    {
        if (target[fieldName] is ConstVar.MyDumperNull)
        {
            context.Logger.LogWarning("发现不可空的字段为空({TableName}.{FieldName}),填充默认值: {DefaultValue}",
                target.TableName, fieldName, replaceValue);
            target[fieldName] = replaceValue;
        }
    }

    // Returns the cached value, or throws InvalidDataException when it is null.
    // Not called by any case above at the moment.
    string ThrowIfNoCached(string? cached, string tableName, string cachedTableName, string cachedColumn, string appendMessage = "")
    {
        if (cached is null)
        {
            throw new InvalidDataException(
                $"{tableName}数据异常,在缓存中未找到对应{cachedTableName}.{cachedColumn}\t{appendMessage}");
        }

        return cached;
    }
};
|
||
|
||
// Record caching: no cache callback is registered.
options.RecordCache = null;

// Database routing: read the tenant-key column of the record as an integer and resolve the
// target database name from it. Every entity is expected to carry that column (CompanyID per
// the original note); int.Parse throws when the value is not a valid integer.
options.DatabaseFilter = record =>
{
    var tenantKeyValue = int.Parse(record[tenantDbOptions.TenantKey]);
    var databaseName = tenantDbOptions.GetDbNameByTenantKeyValue(tenantKeyValue);
    return databaseName;
};
|
||
|
||
// Record rebuilding: returns additional records derived from the record being processed,
// or an empty segment when the table needs no rebuild.
options.RecordReBuild = context =>
{
    var source = context.Record;

    // Migrate OrderExtra rows: the source row is marked as ignored and one new row with a
    // freshly allocated ID is emitted in its place.
    // NOTE(review): the original note names OrderWaveGroup as the destination and the ID comes
    // from SeqConfig.OrderWaveGroupID, but the new record targets TableNames.OrderExtraList —
    // confirm the intended table.
    if (source.TableName == TableNames.OrderExtra)
    {
        source.Ignore = true;

        var seqService = context.Services.GetRequiredService<SeqService>();
        var newId = seqService.AddCachedSeq(SeqConfig.OrderWaveGroupID);

        // Source columns copied in order; "ConfigType" is written to the "Type" column.
        string[] copiedColumns = ["OrderNo", "ShardKey", "ConfigType", "ConfigJson", "CompanyID"];
        var copiedValues = copiedColumns.Select(column => source[column]);

        return new List<DataRecord>
        {
            new DataRecord(
                [newId.ToString(), ..copiedValues],
                TableNames.OrderExtraList,
                ["ID", "OrderNo", "ShardKey", "Type", "ConfigJson", "CompanyID"]),
        };
    }

    // Rebuild OrderBlockPlanItem and OrderPackageItem rows from OrderItem.
    if (source.TableName == TableNames.OrderItem)
    {
#if FIX_PLAN_ITEM
        source.Ignore = true;
#endif
        var derived = new List<DataRecord>();
        source.TryGetField("ID", out var itemId);
        source.TryGetField("ShardKey", out var shardKey);
        source.TryGetField("PlanID", out var planId);
        source.TryGetField("PackageID", out var packageId);
        source.TryGetField("CompanyID", out var companyId);

        // Emits one link row (ItemID, ShardKey, <column>, CompanyID) into targetTable when the
        // raw value parses to a positive integer; throws when it is not an integer at all.
        void AddLinkIfPositive(string? rawValue, string column, string targetTable)
        {
            if (!int.TryParse(rawValue, out var parsed))
            {
                var shown = string.IsNullOrWhiteSpace(rawValue) ? "NULL" : rawValue;
                throw new ApplicationException($"数据发生异常:OrderItem.{column},值: {shown}");
            }

            if (parsed <= 0)
            {
                return;
            }

            derived.Add(new DataRecord(
                [itemId, shardKey, rawValue, companyId],
                targetTable,
                ["ItemID", "ShardKey", column, "CompanyID"]));
        }

        AddLinkIfPositive(planId, "PlanID", TableNames.OrderBlockPlanItem);
        AddLinkIfPositive(packageId, "PackageID", TableNames.OrderPackageItem);

        // The link columns are removed from the OrderItem row itself.
        source.RemoveField("PlanID");
        source.RemoveField("PackageID");

        return derived;
    }

    return ArraySegment<DataRecord>.Empty;
};
|
||
});
|
||
|
||
// Output options: copy the values bound from configuration and attach code-defined settings.
host.Services.Configure<DatabaseOutputOptions>(output =>
{
    output.ConnectionString = outputOptions.ConnectionString;
    output.FlushCount = outputOptions.FlushCount;
    // Half of the configured packet size is used.
    output.MaxAllowedPacket = outputOptions.MaxAllowedPacket / 2;
    output.MaxDatabaseOutputTask = outputOptions.MaxDatabaseOutputTask;
    output.TreatJsonAsHex = outputOptions.TreatJsonAsHex;
    output.NoOutput = outputOptions.NoOutput;
    output.ForUpdate = outputOptions.ForUpdate;

    // Column types keyed by "table.column", so binary content can be told apart on output.
    // Prod server
    output.ColumnTypeConfig = new Dictionary<string, ColumnType>
    {
        ["machine.Settings"] = ColumnType.Text,
        ["order_block_plan.BlockInfo"] = ColumnType.Text,
        ["order_block_plan.OrderNos"] = ColumnType.Json,
        ["order_block_plan_result.PlaceData"] = ColumnType.Blob,
        ["order_box_block.Data"] = ColumnType.Blob,
        ["order_data_block.RemarkJson"] = ColumnType.Text,
        ["order_data_goods.ExtraProp"] = ColumnType.Json,
        ["order_extra.ConfigJson"] = ColumnType.Json,
        ["order_module_extra.Data"] = ColumnType.Blob,
        ["order_module_extra.JsonStr"] = ColumnType.Text,
        ["order_patch_detail.BlockDetail"] = ColumnType.Json,
        ["order_process_schdule.AreaName"] = ColumnType.Text,
        ["order_process_schdule.ConsigneeAddress"] = ColumnType.Text,
        ["order_process_schdule.ConsigneePhone"] = ColumnType.Text,
        ["order_process_schdule.CustomOrderNo"] = ColumnType.Text,
        ["order_process_schdule.OrderProcessStepName"] = ColumnType.Text,
        ["order_scrap_board.OutLineJson"] = ColumnType.Text,
        ["order_wave_group.ConfigJson"] = ColumnType.Json,
        ["process_info.Users"] = ColumnType.Text,
        ["process_item_exp.ItemJson"] = ColumnType.Text,
        ["report_template.SourceConfig"] = ColumnType.Text,
        ["report_template.Template"] = ColumnType.Text,
        ["simple_package.Items"] = ColumnType.Json,
        ["sys_config.JsonStr"] = ColumnType.Text,
        ["sys_config.Value"] = ColumnType.Text,
    };

    // When output has finished, resolve SeqService and apply its state to the database.
    // The asynchronous call is waited on synchronously inside the handler.
    output.OutputFinished += finished =>
    {
        var seqService = finished.Serivces.GetRequiredService<SeqService>();
        var applyTask = seqService.ApplyToDatabaseAsync();
        applyTask.GetAwaiter().GetResult();
    };
});
|
||
|
||
// Logging: replace the default providers with a single Serilog logger.
host.Services.AddLogging(logging =>
{
    logging.ClearProviders();

    // Error-level events also go to a per-run file under the application base directory,
    // named after ErrorRecorder.UID.
    var errorLogPath = Path.Combine(
        AppDomain.CurrentDomain.BaseDirectory,
        $"./Log/Error/{ErrorRecorder.UID}.log");

    // Everything from Verbose upwards is written to the console.
    // An Information-level file sink ("./Log/Info/{ErrorRecorder.UID}.log") is deliberately
    // not configured for now, for performance reasons.
    var serilogLogger = new LoggerConfiguration()
        .MinimumLevel.Verbose()
        .WriteTo.Console()
        .WriteTo.File(errorLogPath, restrictedToMinimumLevel: LogEventLevel.Error)
        .CreateLogger();

    logging.AddSerilog(serilogLogger);

    // Also expose the same logger through Serilog's static Log.Logger.
    Log.Logger = serilogLogger;
});
|
||
|
||
// Infrastructure services.
host.Services.AddDataSourceFactory();
host.Services.AddErrorRecorderFactory();
host.Services.AddSingleton<ProcessContext>();
host.Services.AddSingleton<SeqService>();

// Record queues: sizes come from the required "RecordQueue" section.
var recordQueueSection = host.Configuration.GetRequiredSection("RecordQueue");
var producerQueueLength = recordQueueSection.GetValue<int>("ProducerQueueLength");
var consumerQueueLength = recordQueueSection.GetValue<int>("ConsumerQueueLength");
// The configured MaxByteCount is halved before being handed to the queues.
var queueCharBudget = recordQueueSection.GetValue<long>("MaxByteCount") / 2;

// One producer queue, registered under the ConstVar.Producer key.
var producerQueue = new DataRecordQueue(producerQueueLength, queueCharBudget);
host.Services.AddKeyedSingleton<DataRecordQueue>(ConstVar.Producer, producerQueue);

// One consumer queue per key of the selected tenant database group.
var consumerQueues = tenantDbOptions.DbGroup.Keys
    .Select(key => (key: key, queue: new DataRecordQueue(consumerQueueLength, queueCharBudget)))
    .ToArray();
host.Services.AddRecordQueuePool(consumerQueues);

// Task monitoring is reported through the logger-based implementation.
// host.Services.AddSingleton<ITaskMonitorLogger, CacheTaskMonitorLogger>();
host.Services.AddSingleton<ITaskMonitorLogger, LoggerTaskMonitorLogger>();

// ETL pipeline: hosted entry service plus input, transform, output and monitoring services.
host.Services.AddHostedService<MainHostedService>();
host.Services.AddSingleton<IInputService, FileInputService>();
host.Services.AddSingleton<ITransformService, TransformService>();
host.Services.AddSingleton<IOutputService, OutputService>();
host.Services.AddSingleton<TaskMonitorService>();

// The in-memory cache is registered as ICacher; the Redis registration is disabled.
// host.Services.AddRedisCache(redisOptions);
host.Services.AddSingleton<ICacher, MemoryCache>();

// Build the host and run it until it shuts down.
var app = host.Build();
await app.RunAsync();
|
||
} |