优化性能

This commit is contained in:
陈梓阳 2024-02-08 17:38:23 +08:00
parent 20cc78c667
commit 8db7c71170
10 changed files with 67 additions and 58 deletions

View File

@ -1,9 +0,0 @@
namespace MesETL.App.Cache;
#nullable disable
public static class CacheKeys
{
public static Func<string, string> Order_OrderNo_CompanyID { get; set; }
public static Func<string, string> OrderBlockPlan_ID_CompanyID { get; set; }
public static Func<string, string> OrderProcess_ID_ShardKey { get; set; }
}

View File

@ -0,0 +1,33 @@
using MesETL.App.Const;
namespace MesETL.App.Cache;
#nullable disable
public static class CacheKeysFunc
{
/// <summary>
/// Order表 由OrderNo获取对应的CompanyID
/// </summary>
/// <param name="orderNo"></param>
/// <returns></returns>
public static string Order_OrderNo_CompanyID(string orderNo) => BuildCacheKey(TableNames.Order, "OrderNo", orderNo, "CompanyID");
/// <summary>
/// OrderBlockPlan表 由ID获取对应的CompanyID
/// </summary>
/// <param name="id"></param>
/// <returns></returns>
public static string OrderBlockPlan_ID_CompanyID(string id) => BuildCacheKey(TableNames.OrderBlockPlan, "ID", id, "CompanyID");
/// <summary>
/// OrderProcess表 由ID 获取对应的ShardKey
/// </summary>
/// <param name="id"></param>
/// <returns></returns>
public static string OrderProcess_ID_ShardKey(string id) => BuildCacheKey(TableNames.OrderProcess, "ID", id, "ShardKey");
// 数据缓存键格式为[TableName]-[ColumnName@ColumnValue]-[CacheColumnName]
static string BuildCacheKey(string tableName, string columnName, string columnValue, string cacheColumnName)
=> $"{tableName}-{columnName}@{columnValue}-{cacheColumnName}";
}

View File

@ -40,7 +40,6 @@ public class DataRecord : ICloneable
return idx; return idx;
} }
public string? RawField { get; set; }
public IList<string> Fields { get; } public IList<string> Fields { get; }
public IList<string> Headers { get; } public IList<string> Headers { get; }
public string TableName { get; } public string TableName { get; }

View File

@ -68,7 +68,7 @@ public class FileInputService : IInputService
foreach (var info in orderedInfo) foreach (var info in orderedInfo)
{ {
_logger.LogInformation("Reading file: {FileName}, table: {TableName}", info.FileName, info.TableName); _logger.LogInformation("Reading file: {FileName}, table: {TableName}", info.FileName, info.TableName);
var source = _dataReaderFactory.CreateReader(info.FileName,info.TableName,info.Headers); using var source = _dataReaderFactory.CreateReader(info.FileName,info.TableName,info.Headers);
while (await source.ReadAsync()) while (await source.ReadAsync())
{ {

View File

@ -1,6 +1,5 @@
// #define USE_TEST_DB // 测试库的结构与生产库不一样如果使用测试库运行则加上USE_TEST_DB预处理器指令 // #define USE_TEST_DB // 测试库的结构与生产库不一样如果使用测试库运行则加上USE_TEST_DB预处理器指令
using System.Text.Json;
using MesETL.App; using MesETL.App;
using MesETL.App.Services; using MesETL.App.Services;
using MesETL.App.Services.ETL; using MesETL.App.Services.ETL;
@ -215,10 +214,6 @@ async Task RunProgram()
host.Services.Configure<DataTransformOptions>(options => host.Services.Configure<DataTransformOptions>(options =>
{ {
// 数据缓存键格式为[TableName]-[ColumnName@ColumnValue]-[CacheColumnName]
static string BuildCacheKey(string tableName, string columnName, string columnValue, string cacheColumnName)
=> $"{tableName}-{columnName}@{columnValue}-{cacheColumnName}";
static string CalculateShardKeyByOrderNo(ReadOnlySpan<char> orderNo) static string CalculateShardKeyByOrderNo(ReadOnlySpan<char> orderNo)
=> $"{orderNo[2..6]}0"; => $"{orderNo[2..6]}0";
@ -239,7 +234,7 @@ async Task RunProgram()
// OrderBoxBlock删除对应Order.OrderNo不存在的对象 // OrderBoxBlock删除对应Order.OrderNo不存在的对象
case TableNames.OrderBoxBlock: case TableNames.OrderBoxBlock:
{ {
if (!await cache.ExistsAsync(CacheKeys.Order_OrderNo_CompanyID(record["OrderNo"]))) if (!await cache.ExistsAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"])))
return false; return false;
break; break;
} }
@ -256,7 +251,7 @@ async Task RunProgram()
// OrderBlockPlanResult删除对应order_block_plan.ID不存在的对象 // OrderBlockPlanResult删除对应order_block_plan.ID不存在的对象
case TableNames.OrderBlockPlanResult: case TableNames.OrderBlockPlanResult:
{ {
if (!await cache.ExistsAsync(CacheKeys.OrderBlockPlan_ID_CompanyID(record["ID"]))) if (!await cache.ExistsAsync(CacheKeysFunc.OrderBlockPlan_ID_CompanyID(record["ID"])))
return false; return false;
break; break;
} }
@ -293,7 +288,7 @@ async Task RunProgram()
// OrderProcessStepStep删除对应OrderProcess.ID不存在的对象 // OrderProcessStepStep删除对应OrderProcess.ID不存在的对象
case TableNames.OrderProcessStepItem: case TableNames.OrderProcessStepItem:
{ {
if (!await cache.ExistsAsync(CacheKeys.OrderProcess_ID_ShardKey(record["OrderProcessID"]))) if (!await cache.ExistsAsync(CacheKeysFunc.OrderProcess_ID_ShardKey(record["OrderProcessID"])))
return false; return false;
break; break;
} }
@ -331,17 +326,19 @@ async Task RunProgram()
const string DefaultStr = "''"; const string DefaultStr = "''";
const string DefaultDateTime = "'1000-01-01'"; const string DefaultDateTime = "'1000-01-01'";
const string DefaultText = "0"; const string DefaultText = "0";
static void ReplaceIfMyDumperNull(DataRecord record, string fieldName, string replaceValue)
{
Log.Logger.Warning("发现不可空的字段为空({TableName}.{FieldName}),填充默认值: {DefaultValue}",
record.TableName, fieldName, replaceValue);
if (record[fieldName] is ConstVar.MyDumperNull)
record[fieldName] = replaceValue;
}
options.RecordModify = async context => options.RecordModify = async context =>
{ {
void ReplaceIfMyDumperNull(DataRecord record, string fieldName, string replaceValue)
{
if (record[fieldName] is ConstVar.MyDumperNull)
{
context.Logger.LogWarning("发现不可空的字段为空({TableName}.{FieldName}),填充默认值: {DefaultValue}",
record.TableName, fieldName, replaceValue);
record[fieldName] = replaceValue;
}
}
var record = context.Record; var record = context.Record;
var cache = context.Cacher; var cache = context.Cacher;
switch (record.TableName) switch (record.TableName)
@ -367,17 +364,17 @@ async Task RunProgram()
break; break;
// OrderBlockPlanResult添加CompanyID // OrderBlockPlanResult添加CompanyID
case TableNames.OrderBlockPlanResult: case TableNames.OrderBlockPlanResult:
record.AddField("CompanyID", record.AddField("CompanyID",
// 获取OrderBlockPlan.ID -> CompanyID // 获取OrderBlockPlan.ID -> CompanyID
ThrowIfNoCached(await cache.GetStringAsync(CacheKeys.OrderBlockPlan_ID_CompanyID(record["ID"])), ThrowIfNoCached(await cache.GetStringAsync(CacheKeysFunc.OrderBlockPlan_ID_CompanyID(record["ID"])),
TableNames.OrderBlockPlanResult, TableNames.OrderBlockPlan, "ID", "脏数据未处理")); TableNames.OrderBlockPlanResult, TableNames.OrderBlockPlan, "ID", "无法获取对应的CompanyID"));
break; break;
// OrderBoxBlock添加CompanyID列 // OrderBoxBlock添加CompanyID列
case TableNames.OrderBoxBlock: case TableNames.OrderBoxBlock:
record.AddField("CompanyID", record.AddField("CompanyID",
// 获取Order.OrderNo -> CompanyID // 获取Order.OrderNo -> CompanyID
ThrowIfNoCached(await cache.GetStringAsync(CacheKeys.Order_OrderNo_CompanyID(record["OrderNo"])), ThrowIfNoCached(await cache.GetStringAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"])),
TableNames.OrderBoxBlock, TableNames.Order, "OrderNo", "脏数据未处理")); TableNames.OrderBoxBlock, TableNames.Order, "OrderNo", "无法获取对应的CompanyID"));
break; break;
// OrderModule添加ShardKey列移除ViewFileName列 // OrderModule添加ShardKey列移除ViewFileName列
case TableNames.OrderModule: case TableNames.OrderModule:
@ -397,8 +394,8 @@ async Task RunProgram()
ReplaceIfMyDumperNull(record, "DataID", DefaultInt); ReplaceIfMyDumperNull(record, "DataID", DefaultInt);
record.AddField("ShardKey", record.AddField("ShardKey",
// 获取OrderProcess.ID -> ShardKey // 获取OrderProcess.ID -> ShardKey
ThrowIfNoCached(await cache.GetStringAsync(CacheKeys.OrderProcess_ID_ShardKey(record["OrderProcessID"])), ThrowIfNoCached(await cache.GetStringAsync(CacheKeysFunc.OrderProcess_ID_ShardKey(record["OrderProcessID"])),
TableNames.OrderProcessStepItem, TableNames.OrderProcessStep, "OrderProcessID", "脏数据未处理")); TableNames.OrderProcessStepItem, TableNames.OrderProcessStep, "OrderProcessID", "无法获取对应的ShardKey"));
break; break;
// OrderScrapBoard处理非空列 // OrderScrapBoard处理非空列
case TableNames.OrderScrapBoard: case TableNames.OrderScrapBoard:
@ -442,28 +439,22 @@ async Task RunProgram()
{ {
// 缓存Order.OrderNo -> CompanyID // 缓存Order.OrderNo -> CompanyID
case TableNames.Order: case TableNames.Order:
CacheKeys.Order_OrderNo_CompanyID = orderNo =>
BuildCacheKey(TableNames.Order, "OrderNo", orderNo, "CompanyID");
await cache.SetStringAsync( await cache.SetStringAsync(
CacheKeys.Order_OrderNo_CompanyID(record["OrderNo"]), CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"]),
record["CompanyID"]); record["CompanyID"]);
break; break;
// 缓存OrderBlockPlan.ID -> CompanyID // 缓存OrderBlockPlan.ID -> CompanyID
case TableNames.OrderBlockPlan: case TableNames.OrderBlockPlan:
CacheKeys.OrderBlockPlan_ID_CompanyID = id =>
BuildCacheKey(TableNames.OrderBlockPlan, "ID", id, "CompanyID");
await cache.SetStringAsync( await cache.SetStringAsync(
CacheKeys.OrderBlockPlan_ID_CompanyID(record["ID"]), CacheKeysFunc.OrderBlockPlan_ID_CompanyID(record["ID"]),
record["CompanyID"]); record["CompanyID"]);
break; break;
// 缓存OrderProcess.ID -> ShardKey // 缓存OrderProcess.ID -> ShardKey
case TableNames.OrderProcess: case TableNames.OrderProcess:
CacheKeys.OrderProcess_ID_ShardKey = id =>
BuildCacheKey(TableNames.OrderProcess, "ID", id, "ShardKey");
await cache.SetStringAsync( await cache.SetStringAsync(
CacheKeys.OrderProcess_ID_ShardKey(record["ID"]), CacheKeysFunc.OrderProcess_ID_ShardKey(record["ID"]),
record["ShardKey"]); record["ShardKey"]);
break; break;
} }
@ -604,8 +595,8 @@ async Task RunProgram()
host.Services.AddDataSourceFactory(); host.Services.AddDataSourceFactory();
host.Services.AddErrorRecorderFactory(); host.Services.AddErrorRecorderFactory();
host.Services.AddSingleton<ProcessContext>(); host.Services.AddSingleton<ProcessContext>();
host.Services.AddKeyedSingleton<DataRecordQueue>(ConstVar.Producer); host.Services.AddKeyedSingleton<DataRecordQueue>(ConstVar.Producer, new DataRecordQueue(200_000));
host.Services.AddRecordQueuePool(tenantDbOptions.DbGroup.Keys.Select(key => (key:key, queue:new DataRecordQueue(500_000))).ToArray()); host.Services.AddRecordQueuePool(tenantDbOptions.DbGroup.Keys.Select(key => (key:key, queue:new DataRecordQueue(60_000))).ToArray());
host.Services.AddSingleton<ITaskMonitorLogger, CacheTaskMonitorLogger>(); host.Services.AddSingleton<ITaskMonitorLogger, CacheTaskMonitorLogger>();
host.Services.AddSingleton<ITaskMonitorLogger, LoggerTaskMonitorLogger>(); host.Services.AddSingleton<ITaskMonitorLogger, LoggerTaskMonitorLogger>();

View File

@ -17,7 +17,7 @@ public class DataRecordQueue : IDisposable
public event Action? OnRecordWrite; public event Action? OnRecordWrite;
public event Action? OnRecordRead; public event Action? OnRecordRead;
public DataRecordQueue() : this(1000000) // 默认容量最大1M public DataRecordQueue() : this(500_000) // 默认容量最大500K
{ {
} }

View File

@ -16,7 +16,6 @@ public class CsvReader : IDataReader
public DataRecord Current { get; protected set; } = null!; public DataRecord Current { get; protected set; } = null!;
public string[] Headers { get; } public string[] Headers { get; }
public string? CurrentRaw { get; protected set; }
public string Delimiter { get; } public string Delimiter { get; }
public char QuoteChar { get; } public char QuoteChar { get; }
@ -50,10 +49,8 @@ public class CsvReader : IDataReader
if (string.IsNullOrWhiteSpace(str)) if (string.IsNullOrWhiteSpace(str))
return false; return false;
CurrentRaw = str;
var fields = ParseRow(str, QuoteChar, Delimiter); var fields = ParseRow(str, QuoteChar, Delimiter);
Current = new DataRecord(fields, TableName, Headers){RawField = str}; Current = new DataRecord(fields, TableName, Headers);
return true; return true;
} }

View File

@ -31,10 +31,8 @@ public class ZstReader : CsvReader
if (string.IsNullOrWhiteSpace(str)) if (string.IsNullOrWhiteSpace(str))
return false; return false;
CurrentRaw = str;
var fields = ParseRow(str, QuoteChar, Delimiter); var fields = ParseRow(str, QuoteChar, Delimiter);
Current = new DataRecord(fields, TableName, Headers) {RawField = str}; Current = new DataRecord(fields, TableName, Headers);
return true; return true;
} }

View File

@ -24,7 +24,7 @@ public class ErrorRecorder
Directory.CreateDirectory(outputDir); Directory.CreateDirectory(outputDir);
var content = $""" var content = $"""
### {exception.Message} ### {exception.Message}
{record.RawField} {string.Join(',', record.Fields)}
"""; """;
var path = Path.Combine(outputDir, $"{record.TableName}.errlog"); var path = Path.Combine(outputDir, $"{record.TableName}.errlog");
await File.AppendAllTextAsync(path, content); await File.AppendAllTextAsync(path, content);
@ -57,7 +57,7 @@ public class ErrorRecorder
var content = var content =
$""" $"""
### {exception.Message} ### {exception.Message}
{record.RawField} {string.Join(',', record.Fields)}
"""; """;
await writer.WriteLineAsync(content); await writer.WriteLineAsync(content);
if (token.IsCancellationRequested) if (token.IsCancellationRequested)

View File

@ -5,9 +5,9 @@
} }
}, },
"Input":{ "Input":{
"InputDir": "D:\\Dump\\MockData", // Csv "InputDir": "D:\\Dump\\MyDumper-ZST 2024-02-05", // Csv
"UseMock": false, // 使 "UseMock": false, // 使
"MockCountMultiplier": 0 // "MockCountMultiplier": 1 //
}, },
"Transform":{ "Transform":{
"StrictMode": false, // true "StrictMode": false, // true
@ -30,7 +30,7 @@
"TenantDb": // "TenantDb": //
{ {
"TenantKey" : "CompanyID", "TenantKey" : "CompanyID",
"UseDbGroup": "test", "UseDbGroup": "prod",
"DbGroups": { "DbGroups": {
"test": { "test": {
"cferp_test_1": 1000, "cferp_test_1": 1000,