// #define FIX_PLAN_ITEM // Enable when repairing the OrderBlockPlanItem table in the test environment

using System.Text;
using MesETL.App;
using MesETL.App.Services;
using MesETL.App.Services.ETL;
using MesETL.App.Cache;
using MesETL.App.Const;
using MesETL.App.HostedServices;
using MesETL.App.HostedServices.Abstractions;
using MesETL.App.Options;
using MesETL.App.Services.ErrorRecorder;
using MesETL.App.Services.Loggers;
using MesETL.App.Services.Seq;
using MesETL.Shared.Compression;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Serilog;
using Serilog.Events;
using DumpDataHelper = MesETL.App.Helpers.DumpDataHelper;

await RunProgram();
return;

// Application entry point: loads configuration (appsettings.json + command-line switches),
// binds the Input/Transform/Output/RedisCache/TenantDb options, wires the record
// filter / modify / rebuild delegates used by the transform stage, registers logging and
// services, then builds and runs the host.
//
// Throws ApplicationException when a required configuration section or value is missing.
//
// NOTE(review): several generic calls below appear without type arguments
// (e.g. `Get()`, `new Dictionary { ... }`, `AddSingleton()`); they are kept exactly as found
// in this view of the file — confirm against the original source.
async Task RunProgram()
{
    ThreadPool.SetMaxThreads(200, 200);
    var host = Host.CreateApplicationBuilder(args);
    host.Configuration.AddJsonFile(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "appsettings.json"), false, false);

    // Command-line switch -> configuration key mappings.
    host.Configuration.AddCommandLine(args, new Dictionary
    {
        { "-d", "Input:InputDir" },
        { "--InputDir", "Input:InputDir" },
        { "-s", "Output:ConnectionString" },
        { "--ConnectionString", "Output:ConnectionString" },
        { "-r", "RedisCache:Configuration" },
        { "--Redis", "RedisCache:Configuration" },
        { "-g", "TenantDb:UseDbGroup" },
        { "--UseDbGroup", "TenantDb:UseDbGroup" },
        { "-c", "Command" },
        { "--Command", "Command" }
    });

    var inputOptions = host.Configuration.GetRequiredSection("Input").Get()
                       ?? throw new ApplicationException("缺少Input配置");

    var transformOptions = host.Configuration.GetRequiredSection("Transform").Get()
                           ?? throw new ApplicationException("缺少Transform配置");

    var outputOptions = host.Configuration.GetRequiredSection("Output").Get()
                        ?? throw new ApplicationException("缺少Output配置");

    var redisSection = host.Configuration.GetRequiredSection("RedisCache");
    var redisOptions = redisSection.Get() ?? throw new ApplicationException("缺少RedisCache配置");

    // Tenant (sharded database) options are assembled by hand: the DbGroup to load
    // depends on the UseDbGroup value read just before it.
    var tenantDbSection = host.Configuration.GetRequiredSection("TenantDb");
    var tenantDbOptions = new TenantDbOptions()
    {
        TenantKey = tenantDbSection.GetValue(nameof(TenantDbOptions.TenantKey)) ?? throw new ApplicationException("分库配置缺少分库键TenantKey"),
        UseDbGroup = tenantDbSection.GetValue(nameof(TenantDbOptions.UseDbGroup)) ?? throw new ApplicationException("分库配置缺少使用分库组UseDbGroup")
    };
    tenantDbOptions.DbGroup = tenantDbSection.GetRequiredSection($"DbGroups:{tenantDbOptions.UseDbGroup}").Get>()
                              ?? throw new ApplicationException($"分库配置无法解析分库组{tenantDbOptions.UseDbGroup},请检查配置");

    host.Services.Configure(options =>
    {
        options.TenantKey = tenantDbOptions.TenantKey;
        options.UseDbGroup = tenantDbOptions.UseDbGroup;
        options.DbGroup = tenantDbOptions.DbGroup;
    });
    host.Services.Configure(redisSection);

    // Cleaning cutoff, parsed from Transform.CleanDate ("yyyyMM") in three representations:
    // a DateTime, the full yyyyMM integer, and the yyMM integer (first two characters dropped).
    var oldestTime = DateTime.ParseExact(transformOptions.CleanDate, "yyyyMM", System.Globalization.DateTimeFormatInfo.InvariantInfo);
    var oldestTimeInt_yyyyMM = int.Parse(transformOptions.CleanDate);
    var oldestTimeInt_yyMM = int.Parse(transformOptions.CleanDate[2..]);

    // Input configuration
    host.Services.Configure(options =>
    {
        options.InputDir = inputOptions.InputDir ?? throw new ApplicationException("未配置输入目录");
        options.UseMock = inputOptions.UseMock;
        options.TableMockConfig = inputOptions.TableMockConfig;
        options.MockCountMultiplier = inputOptions.MockCountMultiplier;
        options.TableIgnoreList = inputOptions.TableIgnoreList;
        options.TableOrder = inputOptions.TableOrder;

        // Builder for file input metadata
        options.FileInputMetaBuilder = DumpDataHelper.MyDumperFileInputMetaBuilder;

        // Table-input-completed event (none configured)
        options.OnTableInputCompleted = null;

        // Table mock data.
        // NOTE(review): this replaces the TableMockConfig assigned from inputOptions above
        // with an empty dictionary — confirm that is intended.
        options.TableMockConfig = new Dictionary
        { };
    });

    host.Services.Configure(options =>
    {
        // Builds a shard key from characters [2..6) of the order number followed by "0".
        static string CalculateShardKeyByOrderNo(ReadOnlySpan orderNo) => $"{orderNo[2..6]}0";

        options.StrictMode = transformOptions.StrictMode;
        options.EnableFilter = transformOptions.EnableFilter;
        options.EnableReplacer = transformOptions.EnableReplacer;
        options.EnableReBuilder = transformOptions.EnableReBuilder;

        // The order_block_plan_item and order_package_item tables are not imported;
        // they are rebuilt directly from order_item data (see RecordReBuild below).

        // Data cleaning: return false to drop a record, true to keep it.
        options.RecordFilter = async context => // TODO: OPT: captured outer variables such as oldestTime create a closure
        {
            var record = context.Record;
            switch (record.TableName)
            {
                // Drop rows whose CreateTime is older than the cutoff
                case TableNames.OrderBlockPlan:
                {
                    // Parse with the invariant provider, matching how oldestTime itself is parsed,
                    // so the comparison does not depend on the machine's culture.
                    var creationTime = DateTime.Parse(
                        record["CreateTime"].AsSpan().Trim(['"', '\'']),
                        System.Globalization.DateTimeFormatInfo.InvariantInfo);
                    if (creationTime < oldestTime)
                    {
                        return false;
                    }

                    break;
                }
                // Ignore OrderBlockPlanItem entirely
                case TableNames.OrderBlockPlanItem:
                {
                    return false;
                }
                // Drop rows where (Status != 0 || Deleted = 1) && first four ID characters < cutoff (yyMM)
                case TableNames.OrderScrapBoard:
                {
                    var status = record["Status"].AsSpan();
                    var deleted = record["Deleted"].AsSpan();
                    var idPref = int.Parse(record["ID"].AsSpan()[..4]);
                    if ((status is not "0" || deleted is "1") && idPref < oldestTimeInt_yyMM)
                    {
                        return false;
                    }

                    break;
                }
                // Drop rows whose OrderNo prefix (first four characters) is older than the cutoff (yyMM)
                case TableNames.SimplePackage:
                {
                    var orderNo = int.Parse(record["OrderNo"].AsSpan()[..4]);
                    if (orderNo < oldestTimeInt_yyMM)
                    {
                        return false;
                    }

                    break;
                }
                // Drop rows whose CreateTime is older than the cutoff
                case TableNames.SimplePlanOrder:
                {
                    var creationTime = DateTime.Parse(
                        record["CreateTime"].AsSpan().Trim(['"', '\'']),
                        System.Globalization.DateTimeFormatInfo.InvariantInfo);
                    if (creationTime < oldestTime)
                    {
                        return false;
                    }

                    break;
                }
                default: break;
            }

            return true;
        };

        // Data replacement
        /*
         * Handling of empty values:
         * Some columns are nullable in the production database but non-nullable in the test
         * database, so they need per-type default values:
         * int or any signed integer type -> -1
         * varchar  -> '' (empty string)
         * datetime -> '1000-01-01' (minimum datetime value)
         * text     -> 0 (hex zero; text columns are hex in MyDumper output)
         *
         * NOTE(review): the note above says -1 for integers, but DefaultInt below is "0" — confirm which is intended.
         */
        const string DefaultInt = "0";
        const string DefaultStr = "''";
        const string DefaultDateTime = "'1000-01-01'";
        const string DefaultText = "0";

        options.RecordModify = async context =>
        {
            // Replaces the field with replaceValue (and logs a warning) when it holds the MyDumper null marker.
            void ReplaceIfMyDumperNull(DataRecord record, string fieldName, string replaceValue)
            {
                if (record[fieldName] is ConstVar.MyDumperNull)
                {
                    context.Logger.LogWarning("发现不可空的字段为空({TableName}.{FieldName}),填充默认值: {DefaultValue}",
                        record.TableName, fieldName, replaceValue);
                    record[fieldName] = replaceValue;
                }
            }

            var record = context.Record;
            var cache = context.Cacher;
            switch (record.TableName)
            {
                // Rebuild the binary content of the Data column
                case TableNames.OrderBoxBlock:
                {
                    var data = record["Data"];
                    // Skip both null markers. The pattern must be parenthesised: `not` binds tighter
                    // than `and`, so the previous `is not A and B` only matched when data == B and
                    // never decoded real values.
                    if (data is not (ConstVar.MyDumperNull or ConstVar.Null))
                    {
                        // Hex string -> bytes -> UTF-8 text
                        var decoded = Encoding.UTF8.GetString(Convert.FromHexString(data));
                        record["Data"] = decoded;
                    }

                    break;
                }
                // Convert the JsonStr column into a compressed Data column and add a CompressionType column
                case TableNames.OrderModuleExtra:
                {
                    record.AppendField("CompressionType", "1");
                    record.AppendField("Data",
                        Convert.ToHexString(DeflateArchive.Compress(Convert.FromHexString(record["JsonStr"]))));
                    record.RemoveField("JsonStr");
                    break;
                }
                // Remove the ID column so the database auto-increments it
                // TODO: improve the table design: drop the ID column or replace it with a sequence number
                case TableNames.ProcessStepEfficiency:
                {
                    record.RemoveField("ID");
                    break;
                }
                case TableNames.ProcessScheduleCapacity:
                {
                    record.RemoveField("ID");
                    break;
                }
                case TableNames.SysConfig:
                {
                    record.RemoveField("Key");
                    break;
                }
                // Remove the PlaceData column if present (it has already been dropped in production)
                case TableNames.SimplePlanOrder:
                {
                    if (record.HeaderExists("PlaceData"))
                        record.RemoveField("PlaceData");
                    break;
                }
                default: break;
            }

            return record;

            // Returns the cached value, or throws InvalidDataException when the cache lookup produced null.
            string ThrowIfNoCached(string? cached, string tableName, string cachedTableName, string cachedColumn,
                string appendMessage = "")
            {
                if (cached is null)
                    throw new InvalidDataException(
                        $"{tableName}数据异常,在缓存中未找到对应{cachedTableName}.{cachedColumn}\t{appendMessage}");
                return cached;
            }
        };

        // Data caching (disabled)
        options.RecordCache = null;

        // Database routing: pick the target database from the record's tenant key value
        options.DatabaseFilter = record =>
        {
            var companyId = int.Parse(record[tenantDbOptions.TenantKey]); // every entity is expected to carry the tenant key; otherwise this throws
            return tenantDbOptions.GetDbNameByTenantKeyValue(companyId);
        };

        // Data rebuilding: returns extra records derived from the current one
        options.RecordReBuild = context =>
        {
            var record = context.Record;

            // Migrate the OrderExtra table to the OrderWaveGroup table.
            // NOTE(review): the new record is created with TableNames.OrderExtraList — confirm that
            // constant maps to the intended target table.
            if (record.TableName == TableNames.OrderExtra)
            {
                record.Ignore = true; // the source record itself is not output
                var resultList = new List();
                var seq = context.Services.GetRequiredService();
                string[] headers = ["OrderNo", "ShardKey", "ConfigType", "ConfigJson", "CompanyID"];
                var id = seq.AddCachedSeq(SeqConfig.OrderWaveGroupID);
                // Values are copied positionally, so the source ConfigType lands in the "Type" column.
                var orderWaveGroup = new DataRecord(
                    [id.ToString(), ..headers.Select(c => record[c])],
                    TableNames.OrderExtraList,
                    ["ID", "OrderNo", "ShardKey", "Type", "ConfigJson", "CompanyID"]);
                resultList.Add(orderWaveGroup);
                return resultList;
            }

            // Rebuild OrderBlockPlanItem (and OrderPackageItem) from OrderItem
            if (record.TableName == TableNames.OrderItem)
            {
#if FIX_PLAN_ITEM
                record.Ignore = true;
#endif
                var resultList = new List();
                record.TryGetField("ID", out var itemId);
                record.TryGetField("ShardKey", out var shardKey);
                record.TryGetField("PlanID", out var planId);
                record.TryGetField("PackageID", out var packageId);
                record.TryGetField("CompanyID", out var companyId);

                if (!int.TryParse(planId, out var pid))
                    throw new ApplicationException($"数据发生异常:OrderItem.PlanID,值: {(string.IsNullOrWhiteSpace(planId) ? "NULL" : planId)}");
                if (pid > 0)
                {
                    resultList.Add(new DataRecord([itemId, shardKey, planId, companyId],
                        TableNames.OrderBlockPlanItem,
                        ["ItemID", "ShardKey", "PlanID", "CompanyID"]
                    ));
                }

                if (!int.TryParse(packageId, out var pkid))
                    throw new ApplicationException($"数据发生异常:OrderItem.PackageID,值: {(string.IsNullOrWhiteSpace(packageId) ? "NULL" : packageId)}");
                if (pkid > 0)
                {
                    resultList.Add(new DataRecord([itemId, shardKey, packageId, companyId],
                        TableNames.OrderPackageItem,
                        [ "ItemID", "ShardKey", "PackageID", "CompanyID" ]
                    ));
                }

                // The two columns now live in the rebuilt tables; strip them from the OrderItem record.
                record.RemoveField("PlanID");
                record.RemoveField("PackageID");
                return resultList;
            }

            return ArraySegment.Empty;
        };
    });

    host.Services.Configure(options =>
    {
        options.ConnectionString = outputOptions.ConnectionString;
        options.FlushCount = outputOptions.FlushCount;
        options.MaxAllowedPacket = outputOptions.MaxAllowedPacket / 2;
        options.MaxDatabaseOutputTask = outputOptions.MaxDatabaseOutputTask;
        options.TreatJsonAsHex = outputOptions.TreatJsonAsHex;
        options.NoOutput = outputOptions.NoOutput;
        options.ForUpdate = outputOptions.ForUpdate;

        // Column type configuration, used to tell binary content apart when writing output
        // Prod server
        options.ColumnTypeConfig = new Dictionary
        {
            {"machine.Settings", ColumnType.Text},
            {"order_block_plan.BlockInfo", ColumnType.Text},
            {"order_block_plan.OrderNos", ColumnType.Json},
            {"order_block_plan_result.PlaceData", ColumnType.Blob},
            {"order_box_block.Data", ColumnType.Blob},
            {"order_data_block.RemarkJson", ColumnType.Text},
            {"order_data_goods.ExtraProp", ColumnType.Json},
            {"order_extra.ConfigJson", ColumnType.Json},
            {"order_module_extra.Data", ColumnType.Blob},
            {"order_module_extra.JsonStr", ColumnType.Text},
            {"order_patch_detail.BlockDetail", ColumnType.Json},
            {"order_process_schdule.AreaName", ColumnType.Text},
            {"order_process_schdule.ConsigneeAddress", ColumnType.Text},
            {"order_process_schdule.ConsigneePhone", ColumnType.Text},
            {"order_process_schdule.CustomOrderNo", ColumnType.Text},
            {"order_process_schdule.OrderProcessStepName", ColumnType.Text},
            {"order_scrap_board.OutLineJson", ColumnType.Text},
            {"order_wave_group.ConfigJson", ColumnType.Json},
            {"process_info.Users", ColumnType.Text},
            {"process_item_exp.ItemJson", ColumnType.Text},
            {"report_template.SourceConfig", ColumnType.Text},
            {"report_template.Template", ColumnType.Text},
            {"simple_package.Items", ColumnType.Json},
            {"sys_config.JsonStr", ColumnType.Text},
            {"sys_config.Value", ColumnType.Text}
        };

        // When output finishes, apply the cached sequence values to the database.
        // The handler is synchronous, so the async call is awaited by blocking.
        options.OutputFinished += ctx =>
        {
            var seq = ctx.Serivces.GetRequiredService();
            seq.ApplyToDatabaseAsync().GetAwaiter().GetResult();
        };
    });

    host.Services.AddLogging(builder =>
    {
        builder.ClearProviders();
        var logger = new LoggerConfiguration()
            .MinimumLevel.Verbose()
            .WriteTo.Console()
            .WriteTo.File(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"./Log/Error/{ErrorRecorder.UID}.log"),
                restrictedToMinimumLevel: LogEventLevel.Error)
            // .WriteTo.File("./Log/Info/{ErrorRecorder.UID}.log", restrictedToMinimumLevel:LogEventLevel.Information) // not used for now, for performance reasons
            .CreateLogger();
        builder.AddSerilog(logger);
        Log.Logger = logger;
    });

    host.Services.AddDataSourceFactory();
    host.Services.AddErrorRecorderFactory();
    host.Services.AddSingleton();
    host.Services.AddSingleton();

    // Record queues: one keyed producer queue plus one consumer queue per database in the group.
    // MaxByteCount is halved to obtain the character budget passed to the queues.
    var prodLen = host.Configuration.GetRequiredSection("RecordQueue").GetValue("ProducerQueueLength");
    var consLen = host.Configuration.GetRequiredSection("RecordQueue").GetValue("ConsumerQueueLength");
    var maxCharCount = host.Configuration.GetRequiredSection("RecordQueue").GetValue("MaxByteCount") / 2;
    host.Services.AddKeyedSingleton(ConstVar.Producer, new DataRecordQueue(prodLen, maxCharCount));
    host.Services.AddRecordQueuePool(tenantDbOptions.DbGroup.Keys.Select(key => (key:key, queue:new DataRecordQueue(consLen, maxCharCount))).ToArray());

    // host.Services.AddSingleton();
    host.Services.AddSingleton();

    host.Services.AddHostedService();
    host.Services.AddSingleton();
    host.Services.AddSingleton();
    host.Services.AddSingleton();
    host.Services.AddSingleton();
    // host.Services.AddRedisCache(redisOptions);
    host.Services.AddSingleton();

    var app = host.Build();
    await app.RunAsync();
}