MES-ETL/MesETL.App/Program.cs

416 lines
18 KiB
C#
Raw Permalink Normal View History

// #define FIX_PLAN_ITEM // 测试环境对OrderBlockPlanItem表进行修复时使用
2024-01-29 09:29:16 +08:00
2024-12-10 14:03:09 +08:00
using System.Text;
using MesETL.App;
using MesETL.App.Services;
using MesETL.App.Services.ETL;
using MesETL.App.Cache;
using MesETL.App.Const;
using MesETL.App.HostedServices;
using MesETL.App.HostedServices.Abstractions;
using MesETL.App.Options;
using MesETL.App.Services.ErrorRecorder;
using MesETL.App.Services.Loggers;
2024-12-10 14:03:09 +08:00
using MesETL.App.Services.Seq;
using MesETL.Shared.Compression;
2023-12-28 15:18:03 +08:00
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Serilog;
using Serilog.Events;
using DumpDataHelper = MesETL.App.Helpers.DumpDataHelper;
2024-01-12 16:50:37 +08:00
2023-12-29 16:16:05 +08:00
// Entry point (top-level statements): build and run the ETL host, then exit.
await RunProgram();
return;

// Builds the generic host for the ETL run: loads configuration (appsettings.json plus
// command-line overrides), wires the input / transform / output option callbacks,
// registers logging and services, and runs the host until it stops.
// Throws ApplicationException when a required configuration value is missing.
async Task RunProgram()
{
    ThreadPool.SetMaxThreads(200, 200);

    var host = Host.CreateApplicationBuilder(args);
    host.Configuration.AddJsonFile(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "appsettings.json"), false, false);
    // Short and long command-line switches mapped onto configuration keys.
    host.Configuration.AddCommandLine(args, new Dictionary<string, string>
    {
        { "-d", "Input:InputDir" },
        { "--InputDir", "Input:InputDir" },
        { "-s", "Output:ConnectionString" },
        { "--ConnectionString", "Output:ConnectionString" },
        { "-r", "RedisCache:Configuration" },
        { "--Redis", "RedisCache:Configuration" },
        { "-g", "TenantDb:UseDbGroup" },
        { "--UseDbGroup", "TenantDb:UseDbGroup" },
        { "-c", "Command" },
        { "--Command", "Command" }
    });

    var inputOptions = host.Configuration.GetRequiredSection("Input").Get<DataInputOptions>()
                       ?? throw new ApplicationException("缺少Input配置");
    var transformOptions = host.Configuration.GetRequiredSection("Transform").Get<DataTransformOptions>()
                           ?? throw new ApplicationException("缺少Transform配置");
    var outputOptions = host.Configuration.GetRequiredSection("Output").Get<DatabaseOutputOptions>()
                        ?? throw new ApplicationException("缺少Output配置");
    var redisSection = host.Configuration.GetRequiredSection("RedisCache");
    // Only referenced by the commented-out AddRedisCache call below; still validates the section.
    var redisOptions = redisSection.Get<RedisCacheOptions>() ?? throw new ApplicationException("缺少RedisCache配置");

    // Tenant (sharding) configuration: which key selects the tenant and which database group is active.
    var tenantDbSection = host.Configuration.GetRequiredSection("TenantDb");
    var tenantDbOptions = new TenantDbOptions()
    {
        TenantKey = tenantDbSection.GetValue<string>(nameof(TenantDbOptions.TenantKey)) ?? throw new ApplicationException("分库配置缺少分库键TenantKey"),
        UseDbGroup = tenantDbSection.GetValue<string>(nameof(TenantDbOptions.UseDbGroup)) ?? throw new ApplicationException("分库配置缺少使用分库组UseDbGroup")
    };
    tenantDbOptions.DbGroup = tenantDbSection.GetRequiredSection($"DbGroups:{tenantDbOptions.UseDbGroup}").Get<Dictionary<string,int>>()
                              ?? throw new ApplicationException($"分库配置无法解析分库组{tenantDbOptions.UseDbGroup},请检查配置");

    host.Services.Configure<TenantDbOptions>(options =>
    {
        options.TenantKey = tenantDbOptions.TenantKey;
        options.UseDbGroup = tenantDbOptions.UseDbGroup;
        options.DbGroup = tenantDbOptions.DbGroup;
    });
    host.Services.Configure<RedisCacheOptions>(redisSection);

    // Cut-off for the clean-up filters below. CleanDate is a "yyyyMM" string; the yyMM integer
    // form is compared against the first four characters of ID / OrderNo values.
    var oldestTime = DateTime.ParseExact(transformOptions.CleanDate, "yyyyMM", System.Globalization.DateTimeFormatInfo.InvariantInfo);
    var oldestYyMm = int.Parse(transformOptions.CleanDate[2..]);

    // Input configuration
    host.Services.Configure<DataInputOptions>(options =>
    {
        options.InputDir = inputOptions.InputDir ?? throw new ApplicationException("未配置输入目录");
        options.UseMock = inputOptions.UseMock;
        options.MockCountMultiplier = inputOptions.MockCountMultiplier;
        options.TableIgnoreList = inputOptions.TableIgnoreList;
        options.TableOrder = inputOptions.TableOrder;

        // How file metadata is built for each input file
        options.FileInputMetaBuilder = DumpDataHelper.MyDumperFileInputMetaBuilder;

        // Table-input-completed event (none configured)
        options.OnTableInputCompleted = null;

        // Per-table mock data configuration.
        // NOTE(review): the original assigned inputOptions.TableMockConfig first and then
        // unconditionally overwrote it with this empty dictionary; the dead first assignment
        // was removed. Confirm that ignoring the configuration-file value is intended.
        options.TableMockConfig = new Dictionary<string, TableMockConfig>
        { };
    });

    host.Services.Configure<DataTransformOptions>(options =>
    {
        // Currently unreferenced helper: derives a shard key from characters 2..5 of an order number plus "0".
        static string CalculateShardKeyByOrderNo(ReadOnlySpan<char> orderNo)
            => $"{orderNo[2..6]}0";

        options.StrictMode = transformOptions.StrictMode;
        options.EnableFilter = transformOptions.EnableFilter;
        options.EnableReplacer = transformOptions.EnableReplacer;
        options.EnableReBuilder = transformOptions.EnableReBuilder;

        // order_block_plan_item and order_package_item are not imported; they are rebuilt
        // from order_item data (see RecordReBuild below).

        // Data clean-up: return false to drop a record, true to keep it.
        options.RecordFilter = async context => // TODO: OPT: captured outer variables such as oldestTime create a closure
        {
            var record = context.Record;
            switch (record.TableName)
            {
                // Drop rows whose CreateTime is before the cut-off
                case TableNames.OrderBlockPlan:
                case TableNames.SimplePlanOrder:
                {
                    // The value may be wrapped in quotes in the dump; parse culture-independently
                    // so the result does not depend on the machine's regional settings.
                    var creationTime = DateTime.Parse(
                        record["CreateTime"].AsSpan().Trim(['"', '\'']),
                        System.Globalization.CultureInfo.InvariantCulture);
                    if (creationTime < oldestTime)
                    {
                        return false;
                    }
                    break;
                }
                // OrderBlockPlanItem is always ignored
                case TableNames.OrderBlockPlanItem:
                {
                    return false;
                }
                // Drop rows where (Status != 0 || Deleted = 1) and the first four ID characters are before the cut-off
                case TableNames.OrderScrapBoard:
                {
                    var status = record["Status"].AsSpan();
                    var deleted = record["Deleted"].AsSpan();
                    var idPref = int.Parse(record["ID"].AsSpan()[..4]);
                    if ((status is not "0" || deleted is "1") && idPref < oldestYyMm)
                    {
                        return false;
                    }
                    break;
                }
                // Drop rows whose OrderNo prefix is before the cut-off
                case TableNames.SimplePackage:
                {
                    var orderNo = int.Parse(record["OrderNo"].AsSpan()[..4]);
                    if (orderNo < oldestYyMm)
                    {
                        return false;
                    }
                    break;
                }
                default: break;
            }

            return true;
        };

        // Data replacement
        /*
         * Default values listed by the original author for non-nullable columns:
         *   int or any non-unsigned integer -> -1
         *   varchar  -> ''
         *   datetime -> '1000-01-01' (datetime minimum)
         *   text     -> 0 (text columns appear as hexadecimal in MyDumper output)
         * NOTE(review): DefaultInt below is "0", not -1 as listed above - confirm which is intended.
         * These constants are currently unreferenced.
         */
        const string DefaultInt = "0";
        const string DefaultStr = "''";
        const string DefaultDateTime = "'1000-01-01'";
        const string DefaultText = "0";

        options.RecordModify = async context =>
        {
            // Currently unreferenced helper: replaces a MyDumper NULL marker in a field with the
            // given default and logs a warning.
            void ReplaceIfMyDumperNull(DataRecord record, string fieldName, string replaceValue)
            {
                if (record[fieldName] is ConstVar.MyDumperNull)
                {
                    context.Logger.LogWarning("发现不可空的字段为空({TableName}.{FieldName}),填充默认值: {DefaultValue}",
                        record.TableName, fieldName, replaceValue);
                    record[fieldName] = replaceValue;
                }
            }

            // Currently unreferenced helper: returns the cached value or throws when it is missing.
            string ThrowIfNoCached(string? cached, string tableName, string cachedTableName, string cachedColumn, string appendMessage = "")
            {
                if (cached is null)
                    throw new InvalidDataException(
                        $"{tableName}数据异常,在缓存中未找到对应{cachedTableName}.{cachedColumn}\t{appendMessage}");
                return cached;
            }

            var record = context.Record;
            var cache = context.Cacher;
            switch (record.TableName)
            {
                // Rebuild the binary Data column: decode the hex text into a UTF-8 string
                case TableNames.OrderBoxBlock:
                {
                    var data = record["Data"];
                    // Fixed precedence bug: the original `is not A and B` parsed as
                    // `(not A) and B`, so decoding ran only when data equalled ConstVar.Null.
                    // Decode only when the value is neither of the NULL markers.
                    if (data is not (ConstVar.MyDumperNull or ConstVar.Null))
                    {
                        var decoded = Encoding.UTF8.GetString(Convert.FromHexString(data));
                        record["Data"] = decoded;
                    }
                    break;
                }
                // Convert the JsonStr column into a compressed Data column and add CompressionType
                case TableNames.OrderModuleExtra:
                {
                    record.AppendField("CompressionType", "1");
                    record.AppendField("Data",
                        Convert.ToHexString(DeflateArchive.Compress(Convert.FromHexString(record["JsonStr"]))));
                    record.RemoveField("JsonStr");
                    break;
                }
                // Remove the ID column so the database assigns it
                // TODO: improve the tables - drop the ID column or replace it with a sequence number
                case TableNames.ProcessStepEfficiency:
                case TableNames.ProcessScheduleCapacity:
                {
                    record.RemoveField("ID");
                    break;
                }
                case TableNames.SysConfig:
                {
                    record.RemoveField("Key");
                    break;
                }
                // Remove the PlaceData column if present (already dropped in the production database)
                case TableNames.SimplePlanOrder:
                {
                    if (record.HeaderExists("PlaceData"))
                        record.RemoveField("PlaceData");
                    break;
                }
                default: break;
            }

            return record;
        };

        // Data caching (disabled)
        options.RecordCache = null;

        // Database routing: choose the target database from the record's tenant key value
        options.DatabaseFilter = record =>
        {
            var companyId = int.Parse(record[tenantDbOptions.TenantKey]); // every entity is expected to carry the tenant key; otherwise this throws
            return tenantDbOptions.GetDbNameByTenantKeyValue(companyId);
        };

        // Data rebuilding: produce additional records derived from the current one
        options.RecordReBuild = context =>
        {
            var record = context.Record;

            // Migrate the OrderExtra table.
            // NOTE(review): the original comment and the sequence name refer to OrderWaveGroup,
            // but the target table is TableNames.OrderExtraList - confirm which is intended.
            if (record.TableName == TableNames.OrderExtra)
            {
                record.Ignore = true;
                var resultList = new List<DataRecord>();
                var seq = context.Services.GetRequiredService<SeqService>();
                string[] headers = ["OrderNo", "ShardKey", "ConfigType", "ConfigJson", "CompanyID"];
                var id = seq.AddCachedSeq(SeqConfig.OrderWaveGroupID);
                var orderWaveGroup = new DataRecord(
                    [id.ToString(), ..headers.Select(c => record[c])],
                    TableNames.OrderExtraList,
                    ["ID", "OrderNo", "ShardKey", "Type", "ConfigJson", "CompanyID"]);
                resultList.Add(orderWaveGroup);
                return resultList;
            }

            // Rebuild OrderBlockPlanItem and OrderPackageItem rows from OrderItem
            if (record.TableName == TableNames.OrderItem)
            {
#if FIX_PLAN_ITEM
                record.Ignore = true;
#endif
                var resultList = new List<DataRecord>();
                record.TryGetField("ID", out var itemId);
                record.TryGetField("ShardKey", out var shardKey);
                record.TryGetField("PlanID", out var planId);
                record.TryGetField("PackageID", out var packageId);
                record.TryGetField("CompanyID", out var companyId);

                if (!int.TryParse(planId, out var pid))
                    throw new ApplicationException($"数据发生异常OrderItem.PlanID值: {(string.IsNullOrWhiteSpace(planId) ? "NULL" : planId)}");
                if (pid > 0)
                {
                    resultList.Add(new DataRecord([itemId, shardKey, planId, companyId],
                        TableNames.OrderBlockPlanItem,
                        ["ItemID", "ShardKey", "PlanID", "CompanyID"]
                    ));
                }

                if (!int.TryParse(packageId, out var pkid))
                    throw new ApplicationException($"数据发生异常OrderItem.PackageID值: {(string.IsNullOrWhiteSpace(packageId) ? "NULL" : packageId)}");
                if (pkid > 0)
                {
                    resultList.Add(new DataRecord([itemId, shardKey, packageId, companyId],
                        TableNames.OrderPackageItem,
                        [ "ItemID", "ShardKey", "PackageID", "CompanyID" ]
                    ));
                }

                // The link columns now live in the rebuilt tables, so drop them from OrderItem itself.
                record.RemoveField("PlanID");
                record.RemoveField("PackageID");
                return resultList;
            }

            return ArraySegment<DataRecord>.Empty;
        };
    });

    host.Services.Configure<DatabaseOutputOptions>(options =>
    {
        options.ConnectionString = outputOptions.ConnectionString;
        options.FlushCount = outputOptions.FlushCount;
        // Only half of the configured packet size is used.
        options.MaxAllowedPacket = outputOptions.MaxAllowedPacket / 2;
        options.MaxDatabaseOutputTask = outputOptions.MaxDatabaseOutputTask;
        options.TreatJsonAsHex = outputOptions.TreatJsonAsHex;
        options.NoOutput = outputOptions.NoOutput;
        options.ForUpdate = outputOptions.ForUpdate;

        // Column types, so that binary content can be told apart when writing output
        // Prod server
        options.ColumnTypeConfig = new Dictionary<string, ColumnType>
        {
            {"machine.Settings", ColumnType.Text},
            {"order_block_plan.BlockInfo", ColumnType.Text},
            {"order_block_plan.OrderNos", ColumnType.Json},
            {"order_block_plan_result.PlaceData", ColumnType.Blob},
            {"order_box_block.Data", ColumnType.Blob},
            {"order_data_block.RemarkJson", ColumnType.Text},
            {"order_data_goods.ExtraProp", ColumnType.Json},
            {"order_extra.ConfigJson", ColumnType.Json},
            {"order_module_extra.Data", ColumnType.Blob},
            {"order_module_extra.JsonStr", ColumnType.Text},
            {"order_patch_detail.BlockDetail", ColumnType.Json},
            {"order_process_schdule.AreaName", ColumnType.Text},
            {"order_process_schdule.ConsigneeAddress", ColumnType.Text},
            {"order_process_schdule.ConsigneePhone", ColumnType.Text},
            {"order_process_schdule.CustomOrderNo", ColumnType.Text},
            {"order_process_schdule.OrderProcessStepName", ColumnType.Text},
            {"order_scrap_board.OutLineJson", ColumnType.Text},
            {"order_wave_group.ConfigJson", ColumnType.Json},
            {"process_info.Users", ColumnType.Text},
            {"process_item_exp.ItemJson", ColumnType.Text},
            {"report_template.SourceConfig", ColumnType.Text},
            {"report_template.Template", ColumnType.Text},
            {"simple_package.Items", ColumnType.Json},
            {"sys_config.JsonStr", ColumnType.Text},
            {"sys_config.Value", ColumnType.Text}
        };

        // After output finishes, write the cached sequence values back to the database.
        // The handler is synchronous, so the async call is awaited by blocking.
        options.OutputFinished += ctx =>
        {
            var seq = ctx.Serivces.GetRequiredService<SeqService>();
            seq.ApplyToDatabaseAsync().GetAwaiter().GetResult();
        };
    });

    host.Services.AddLogging(builder =>
    {
        builder.ClearProviders();
        var logger = new LoggerConfiguration()
            .MinimumLevel.Verbose()
            .WriteTo.Console()
            .WriteTo.File(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"./Log/Error/{ErrorRecorder.UID}.log"),
                restrictedToMinimumLevel: LogEventLevel.Error)
            // .WriteTo.File("./Log/Info/{ErrorRecorder.UID}.log", restrictedToMinimumLevel:LogEventLevel.Information) // disabled for now for performance reasons
            .CreateLogger();
        builder.AddSerilog(logger);
        Log.Logger = logger;
    });

    host.Services.AddDataSourceFactory();
    host.Services.AddErrorRecorderFactory();
    host.Services.AddSingleton<ProcessContext>();
    host.Services.AddSingleton<SeqService>();

    // Record queues: one producer queue plus one consumer queue per database in the active group.
    var recordQueueSection = host.Configuration.GetRequiredSection("RecordQueue");
    var prodLen = recordQueueSection.GetValue<int>("ProducerQueueLength");
    var consLen = recordQueueSection.GetValue<int>("ConsumerQueueLength");
    // The configured limit is in bytes; the queue limit is half of that value.
    var maxCharCount = recordQueueSection.GetValue<long>("MaxByteCount") / 2;
    host.Services.AddKeyedSingleton<DataRecordQueue>(ConstVar.Producer, new DataRecordQueue(prodLen, maxCharCount));
    host.Services.AddRecordQueuePool(tenantDbOptions.DbGroup.Keys.Select(key => (key:key, queue:new DataRecordQueue(consLen, maxCharCount))).ToArray());

    // host.Services.AddSingleton<ITaskMonitorLogger, CacheTaskMonitorLogger>();
    host.Services.AddSingleton<ITaskMonitorLogger, LoggerTaskMonitorLogger>();

    host.Services.AddHostedService<MainHostedService>();
    host.Services.AddSingleton<IInputService, FileInputService>();
    host.Services.AddSingleton<ITransformService, TransformService>();
    host.Services.AddSingleton<IOutputService, OutputService>();
    host.Services.AddSingleton<TaskMonitorService>();

    // host.Services.AddRedisCache(redisOptions);
    host.Services.AddSingleton<ICacher, MemoryCache>();

    var app = host.Build();
    await app.RunAsync();
}