MES-ETL/MesETL.App/Program.cs

416 lines
18 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// #define FIX_PLAN_ITEM // 测试环境对OrderBlockPlanItem表进行修复时使用
using System.Text;
using MesETL.App;
using MesETL.App.Services;
using MesETL.App.Services.ETL;
using MesETL.App.Cache;
using MesETL.App.Const;
using MesETL.App.HostedServices;
using MesETL.App.HostedServices.Abstractions;
using MesETL.App.Options;
using MesETL.App.Services.ErrorRecorder;
using MesETL.App.Services.Loggers;
using MesETL.App.Services.Seq;
using MesETL.Shared.Compression;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Serilog;
using Serilog.Events;
using DumpDataHelper = MesETL.App.Helpers.DumpDataHelper;
// Entry point: everything runs inside the RunProgram local function.
await RunProgram();
return;
// Builds the generic host (configuration, options, logging, services) and runs it until shutdown.
async Task RunProgram()
{
// Cap the thread pool at 200 worker threads and 200 I/O completion threads (the bool result is not checked).
ThreadPool.SetMaxThreads(200, 200);
var host = Host.CreateApplicationBuilder(args);
// appsettings.json next to the executable is required (optional: false) and is not reloaded on change.
var appSettingsPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "appsettings.json");
host.Configuration.AddJsonFile(appSettingsPath, false, false);
// Short and long command-line switches mapped onto configuration keys.
var switchMappings = new Dictionary<string, string>
{
    ["-d"] = "Input:InputDir",
    ["--InputDir"] = "Input:InputDir",
    ["-s"] = "Output:ConnectionString",
    ["--ConnectionString"] = "Output:ConnectionString",
    ["-r"] = "RedisCache:Configuration",
    ["--Redis"] = "RedisCache:Configuration",
    ["-g"] = "TenantDb:UseDbGroup",
    ["--UseDbGroup"] = "TenantDb:UseDbGroup",
    ["-c"] = "Command",
    ["--Command"] = "Command",
};
host.Configuration.AddCommandLine(args, switchMappings);
// Bind the strongly-typed option sections; each one is mandatory and fails startup with a descriptive message.
var inputOptions = host.Configuration.GetRequiredSection("Input").Get<DataInputOptions>()
    ?? throw new ApplicationException("缺少Input配置");
var transformOptions = host.Configuration.GetRequiredSection("Transform").Get<DataTransformOptions>()
    ?? throw new ApplicationException("缺少Transform配置");
var outputOptions = host.Configuration.GetRequiredSection("Output").Get<DatabaseOutputOptions>()
    ?? throw new ApplicationException("缺少Output配置");
var redisSection = host.Configuration.GetRequiredSection("RedisCache");
// Bound eagerly so an unusable RedisCache section aborts startup.
var redisOptions = redisSection.Get<RedisCacheOptions>() ?? throw new ApplicationException("缺少RedisCache配置");
// Tenant (database sharding) options: the tenant key column, the selected DbGroup name, and that group's mapping.
var tenantDbSection = host.Configuration.GetRequiredSection("TenantDb");
var tenantDbOptions = new TenantDbOptions()
{
    TenantKey = tenantDbSection.GetValue<string>(nameof(TenantDbOptions.TenantKey)) ?? throw new ApplicationException("分库配置缺少分库键TenantKey"),
    UseDbGroup = tenantDbSection.GetValue<string>(nameof(TenantDbOptions.UseDbGroup)) ?? throw new ApplicationException("分库配置缺少使用分库组UseDbGroup")
};
tenantDbOptions.DbGroup = tenantDbSection.GetRequiredSection($"DbGroups:{tenantDbOptions.UseDbGroup}").Get<Dictionary<string,int>>()
    ?? throw new ApplicationException($"分库配置无法解析分库组{tenantDbOptions.UseDbGroup},请检查配置");
host.Services.Configure<TenantDbOptions>(options =>
{
    options.TenantKey = tenantDbOptions.TenantKey;
    options.UseDbGroup = tenantDbOptions.UseDbGroup;
    options.DbGroup = tenantDbOptions.DbGroup;
});
host.Services.Configure<RedisCacheOptions>(redisSection);
// CleanDate (yyyyMM) is the retention cut-off used by RecordFilter. Validate it like the other mandatory
// settings instead of letting ParseExact fail with a bare ArgumentNullException.
var cleanDate = transformOptions.CleanDate ?? throw new ApplicationException("缺少Transform:CleanDate配置");
var oldestTime = DateTime.ParseExact(cleanDate, "yyyyMM", System.Globalization.DateTimeFormatInfo.InvariantInfo);
// Integer forms of the cut-off: yyyyMM (e.g. 202401) and yyMM (e.g. 2401).
// The yyMM form is compared against 4-character ID/OrderNo prefixes in RecordFilter.
var oldestTimeInt_yyyyMM = int.Parse(cleanDate, System.Globalization.CultureInfo.InvariantCulture);
var oldestTimeInt_yyMM = int.Parse(cleanDate[2..], System.Globalization.CultureInfo.InvariantCulture);
// Input options.
// InputDir is validated eagerly so a missing directory aborts startup, instead of surfacing only
// when the options are first resolved from DI.
var inputDir = inputOptions.InputDir ?? throw new ApplicationException("未配置输入目录");
host.Services.Configure<DataInputOptions>(options =>
{
    options.InputDir = inputDir;
    options.UseMock = inputOptions.UseMock;
    options.MockCountMultiplier = inputOptions.MockCountMultiplier;
    options.TableIgnoreList = inputOptions.TableIgnoreList;
    options.TableOrder = inputOptions.TableOrder;
    // Per-file input metadata is built by the MyDumper dump helper.
    options.FileInputMetaBuilder = DumpDataHelper.MyDumperFileInputMetaBuilder;
    // No table-input-completed callback.
    options.OnTableInputCompleted = null;
    // Table mock configuration is defined in code and replaces whatever the Input section provided.
    // NOTE(review): any TableMockConfig set in appsettings is therefore ignored — confirm that is intended.
    options.TableMockConfig = new Dictionary<string, TableMockConfig>();
});
host.Services.Configure<DataTransformOptions>(options =>
{
    options.StrictMode = transformOptions.StrictMode;
    options.EnableFilter = transformOptions.EnableFilter;
    options.EnableReplacer = transformOptions.EnableReplacer;
    options.EnableReBuilder = transformOptions.EnableReBuilder;
    // order_block_plan_item and order_package_item are not imported as-is; they are rebuilt from order_item data.

    // Record filter (data cleanup): returning false drops the record, true keeps it.
    options.RecordFilter = async context => // TODO: OPT: captured outer variables such as oldestTime create a closure
    {
        // CreateTime may be wrapped in quotes. Parsing with InvariantCulture keeps the result independent of the
        // host culture (a culture with a non-Gregorian default calendar would otherwise shift the year).
        static bool IsCreatedBefore(DataRecord row, DateTime cutoff)
            => DateTime.Parse(row["CreateTime"].AsSpan().Trim(['"', '\'']),
                   System.Globalization.CultureInfo.InvariantCulture) < cutoff;

        var record = context.Record;
        switch (record.TableName)
        {
            // Drop rows whose CreateTime is earlier than the clean date.
            case TableNames.OrderBlockPlan:
            case TableNames.SimplePlanOrder:
            {
                if (IsCreatedBefore(record, oldestTime))
                {
                    return false;
                }
                break;
            }
            // OrderBlockPlanItem is never imported.
            case TableNames.OrderBlockPlanItem:
            {
                return false;
            }
            // Drop rows where (Status != 0 || Deleted = 1) and the first four ID characters (yyMM) are before the clean date.
            case TableNames.OrderScrapBoard:
            {
                var status = record["Status"].AsSpan();
                var deleted = record["Deleted"].AsSpan();
                var idPrefix = int.Parse(record["ID"].AsSpan()[..4], System.Globalization.CultureInfo.InvariantCulture);
                if ((status is not "0" || deleted is "1") && idPrefix < oldestTimeInt_yyMM)
                {
                    return false;
                }
                break;
            }
            // Drop rows whose first four OrderNo characters, read as yyMM, are before the clean date.
            // NOTE(review): assumes OrderNo starts with yyMM (not yyyyMM) — confirm against real data.
            case TableNames.SimplePackage:
            {
                var orderNoPrefix = int.Parse(record["OrderNo"].AsSpan()[..4], System.Globalization.CultureInfo.InvariantCulture);
                if (orderNoPrefix < oldestTimeInt_yyMM)
                {
                    return false;
                }
                break;
            }
            default: break;
        }
        return true;
    };
// 数据替换
/*
* 空数据处理:
* 某些列生产库为可空,而测试库变为了不可空,则需要根据列的类型对这些列做单独处理
* int或任何非无符号整型 -> -1
* varchar -> ''(空字符串)
* datetime -> '1000-01-01'datetime最小值
* text -> 0 16进制0MyDumper中的text是为16进制
*/
const string DefaultInt = "0";
const string DefaultStr = "''";
const string DefaultDateTime = "'1000-01-01'";
const string DefaultText = "0";
options.RecordModify = async context =>
{
void ReplaceIfMyDumperNull(DataRecord record, string fieldName, string replaceValue)
{
if (record[fieldName] is ConstVar.MyDumperNull)
{
context.Logger.LogWarning("发现不可空的字段为空({TableName}.{FieldName}),填充默认值: {DefaultValue}",
record.TableName, fieldName, replaceValue);
record[fieldName] = replaceValue;
}
}
var record = context.Record;
var cache = context.Cacher;
switch (record.TableName)
{
// 重构Data列二进制数据
case TableNames.OrderBoxBlock:
{
var data = record["Data"];
if (data is not ConstVar.MyDumperNull and ConstVar.Null)
{
var hex = Encoding.UTF8.GetString(Convert.FromHexString(data));
record["Data"] = hex;
}
break;
}
// 将JsonStr列转换为Data列添加CompressionType列
case TableNames.OrderModuleExtra:
{
record.AppendField("CompressionType", "1");
record.AppendField("Data",
Convert.ToHexString(DeflateArchive.Compress(Convert.FromHexString(record["JsonStr"]))));
record.RemoveField("JsonStr");
break;
}
// 删除ID列让数据库自行递增
// TODO: 数据表改进删除ID列或是替换为流水号
case TableNames.ProcessStepEfficiency:
{
record.RemoveField("ID");
break;
}
case TableNames.ProcessScheduleCapacity:
{
record.RemoveField("ID");
break;
}
case TableNames.SysConfig:
{
record.RemoveField("Key");
break;
}
// 移除PlaceData列如果存在的话生产库已经删除
case TableNames.SimplePlanOrder:
{
if(record.HeaderExists("PlaceData"))
record.RemoveField("PlaceData");
break;
}
default: break;
}
return record;
string ThrowIfNoCached(string? cached, string tableName, string cachedTableName, string cachedColumn, string appendMessage = "")
{
if (cached is null)
throw new InvalidDataException(
$"{tableName}数据异常,在缓存中未找到对应{cachedTableName}.{cachedColumn}\t{appendMessage}");
return cached;
}
};
// 数据缓存
options.RecordCache = null;
// 数据库过滤
options.DatabaseFilter = record =>
{
var companyId = int.Parse(record[tenantDbOptions.TenantKey]); // 每个实体都应存在CompanyID否则异常
return tenantDbOptions.GetDbNameByTenantKeyValue(companyId);
};
// 数据重建
options.RecordReBuild = context =>
{
var record = context.Record;
// 将OrderExtra表迁移至OrderWaveGroup表
if (record.TableName == TableNames.OrderExtra)
{
record.Ignore = true;
var resultList = new List<DataRecord>();
var seq = context.Services.GetRequiredService<SeqService>();
string[] headers = ["OrderNo", "ShardKey", "ConfigType", "ConfigJson", "CompanyID"];
var id = seq.AddCachedSeq(SeqConfig.OrderWaveGroupID);
var orderWaveGroup = new DataRecord(
[id.ToString(), ..headers.Select(c => record[c])],
TableNames.OrderExtraList,
["ID", "OrderNo", "ShardKey", "Type", "ConfigJson", "CompanyID"]);
resultList.Add(orderWaveGroup);
return resultList;
}
// 通过OrderItem重建OrderBlockPlanItem表
if (record.TableName == TableNames.OrderItem)
{
#if FIX_PLAN_ITEM
record.Ignore = true;
#endif
var resultList = new List<DataRecord>();
record.TryGetField("ID", out var itemId);
record.TryGetField("ShardKey", out var shardKey);
record.TryGetField("PlanID", out var planId);
record.TryGetField("PackageID", out var packageId);
record.TryGetField("CompanyID", out var companyId);
if(!int.TryParse(planId, out var pid))
throw new ApplicationException($"数据发生异常OrderItem.PlanID值: {(string.IsNullOrWhiteSpace(planId) ? "NULL" : planId)}");
if (pid > 0)
{
resultList.Add(new DataRecord([itemId, shardKey, planId, companyId],
TableNames.OrderBlockPlanItem,
["ItemID", "ShardKey", "PlanID", "CompanyID"]
));
}
if(!int.TryParse(packageId, out var pkid))
throw new ApplicationException($"数据发生异常OrderItem.PackageID值: {(string.IsNullOrWhiteSpace(packageId) ? "NULL" : packageId)}");
if(pkid > 0)
{
resultList.Add(new DataRecord([itemId, shardKey, packageId, companyId],
TableNames.OrderPackageItem,
[ "ItemID", "ShardKey", "PackageID", "CompanyID" ]
));
}
record.RemoveField("PlanID");
record.RemoveField("PackageID");
return resultList;
}
return ArraySegment<DataRecord>.Empty;
};
});
host.Services.Configure<DatabaseOutputOptions>(options =>
{
    // Settings copied from the bound Output section.
    options.ConnectionString = outputOptions.ConnectionString;
    options.FlushCount = outputOptions.FlushCount;
    // Only half of the configured MaxAllowedPacket is passed on.
    // NOTE(review): presumably headroom or a bytes-to-chars adjustment (RecordQueue:MaxByteCount is halved the same way) — confirm.
    options.MaxAllowedPacket = outputOptions.MaxAllowedPacket / 2;
    options.MaxDatabaseOutputTask = outputOptions.MaxDatabaseOutputTask;
    options.TreatJsonAsHex = outputOptions.TreatJsonAsHex;
    options.NoOutput = outputOptions.NoOutput;
    options.ForUpdate = outputOptions.ForUpdate;
    // Column types keyed by "table.column", so binary / text / JSON content can be told apart on output.
    // Values reflect the production server schema.
    options.ColumnTypeConfig = new Dictionary<string, ColumnType>
    {
        ["machine.Settings"] = ColumnType.Text,
        ["order_block_plan.BlockInfo"] = ColumnType.Text,
        ["order_block_plan.OrderNos"] = ColumnType.Json,
        ["order_block_plan_result.PlaceData"] = ColumnType.Blob,
        ["order_box_block.Data"] = ColumnType.Blob,
        ["order_data_block.RemarkJson"] = ColumnType.Text,
        ["order_data_goods.ExtraProp"] = ColumnType.Json,
        ["order_extra.ConfigJson"] = ColumnType.Json,
        ["order_module_extra.Data"] = ColumnType.Blob,
        ["order_module_extra.JsonStr"] = ColumnType.Text,
        ["order_patch_detail.BlockDetail"] = ColumnType.Json,
        ["order_process_schdule.AreaName"] = ColumnType.Text,
        ["order_process_schdule.ConsigneeAddress"] = ColumnType.Text,
        ["order_process_schdule.ConsigneePhone"] = ColumnType.Text,
        ["order_process_schdule.CustomOrderNo"] = ColumnType.Text,
        ["order_process_schdule.OrderProcessStepName"] = ColumnType.Text,
        ["order_scrap_board.OutLineJson"] = ColumnType.Text,
        ["order_wave_group.ConfigJson"] = ColumnType.Json,
        ["process_info.Users"] = ColumnType.Text,
        ["process_item_exp.ItemJson"] = ColumnType.Text,
        ["report_template.SourceConfig"] = ColumnType.Text,
        ["report_template.Template"] = ColumnType.Text,
        ["simple_package.Items"] = ColumnType.Json,
        ["sys_config.JsonStr"] = ColumnType.Text,
        ["sys_config.Value"] = ColumnType.Text,
    };
    // Once output has finished, apply the SeqService state to the database.
    // The handler blocks on the async call instead of awaiting it.
    options.OutputFinished += ctx =>
        ctx.Serivces.GetRequiredService<SeqService>().ApplyToDatabaseAsync().GetAwaiter().GetResult();
});
host.Services.AddLogging(builder =>
{
    // Replace the default providers with a single Serilog logger.
    builder.ClearProviders();
    // Error-level events are also written to a per-run file named after ErrorRecorder.UID.
    var serilogLogger = new LoggerConfiguration()
        .MinimumLevel.Verbose()
        .WriteTo.Console()
        .WriteTo.File(
            Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"./Log/Error/{ErrorRecorder.UID}.log"),
            restrictedToMinimumLevel: LogEventLevel.Error)
        // .WriteTo.File("./Log/Info/{ErrorRecorder.UID}.log", restrictedToMinimumLevel:LogEventLevel.Information) // disabled for performance
        .CreateLogger();
    builder.AddSerilog(serilogLogger);
    // Also expose it as the static Serilog logger.
    Log.Logger = serilogLogger;
});
host.Services.AddDataSourceFactory();
host.Services.AddErrorRecorderFactory();
host.Services.AddSingleton<ProcessContext>();
host.Services.AddSingleton<SeqService>();
// Record queue sizing. MaxByteCount is halved into a character budget
// (presumably because a .NET char is 2 bytes — confirm against DataRecordQueue).
var recordQueueSection = host.Configuration.GetRequiredSection("RecordQueue");
var prodLen = recordQueueSection.GetValue<int>("ProducerQueueLength");
var consLen = recordQueueSection.GetValue<int>("ConsumerQueueLength");
var maxCharCount = recordQueueSection.GetValue<long>("MaxByteCount") / 2;
// One producer queue, plus one consumer queue per key of the selected DbGroup.
host.Services.AddKeyedSingleton<DataRecordQueue>(ConstVar.Producer, new DataRecordQueue(prodLen, maxCharCount));
host.Services.AddRecordQueuePool(tenantDbOptions.DbGroup.Keys
    .Select(key => (key: key, queue: new DataRecordQueue(consLen, maxCharCount)))
    .ToArray());
// host.Services.AddSingleton<ITaskMonitorLogger, CacheTaskMonitorLogger>();
host.Services.AddSingleton<ITaskMonitorLogger, LoggerTaskMonitorLogger>();
host.Services.AddHostedService<MainHostedService>();
host.Services.AddSingleton<IInputService, FileInputService>();
host.Services.AddSingleton<ITransformService, TransformService>();
host.Services.AddSingleton<IOutputService, OutputService>();
host.Services.AddSingleton<TaskMonitorService>();
// host.Services.AddRedisCache(redisOptions);
host.Services.AddSingleton<ICacher, MemoryCache>();
var app = host.Build();
try
{
    await app.RunAsync();
}
finally
{
    // Flush and dispose the static Serilog logger so pending sink output is written and the log file is
    // released, even when the host stops with an exception.
    Log.CloseAndFlush();
}
}