From 8db7c71170dfc2f399aad6963b2d6d69411bfec7 Mon Sep 17 00:00:00 2001 From: CZY <2817212736@qq.com> Date: Thu, 8 Feb 2024 17:38:23 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=80=A7=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- MesETL.App/Cache/CacheKeys.cs | 9 --- MesETL.App/Cache/CacheKeysFunc.cs | 33 +++++++++++ MesETL.App/DataRecord.cs | 1 - MesETL.App/HostedServices/FileInputService.cs | 2 +- MesETL.App/Program.cs | 59 ++++++++----------- MesETL.App/Services/DataRecordQueue.cs | 2 +- MesETL.App/Services/ETL/CsvReader.cs | 5 +- MesETL.App/Services/ETL/ZstReader.cs | 4 +- .../Services/ErrorRecorder/ErrorRecorder.cs | 4 +- MesETL.App/appsettings.json | 6 +- 10 files changed, 67 insertions(+), 58 deletions(-) delete mode 100644 MesETL.App/Cache/CacheKeys.cs create mode 100644 MesETL.App/Cache/CacheKeysFunc.cs diff --git a/MesETL.App/Cache/CacheKeys.cs b/MesETL.App/Cache/CacheKeys.cs deleted file mode 100644 index 150ba66..0000000 --- a/MesETL.App/Cache/CacheKeys.cs +++ /dev/null @@ -1,9 +0,0 @@ -namespace MesETL.App.Cache; - -#nullable disable -public static class CacheKeys -{ - public static Func Order_OrderNo_CompanyID { get; set; } - public static Func OrderBlockPlan_ID_CompanyID { get; set; } - public static Func OrderProcess_ID_ShardKey { get; set; } -} \ No newline at end of file diff --git a/MesETL.App/Cache/CacheKeysFunc.cs b/MesETL.App/Cache/CacheKeysFunc.cs new file mode 100644 index 0000000..33ad260 --- /dev/null +++ b/MesETL.App/Cache/CacheKeysFunc.cs @@ -0,0 +1,33 @@ +using MesETL.App.Const; + +namespace MesETL.App.Cache; + +#nullable disable +public static class CacheKeysFunc +{ + /// + /// Order表 由OrderNo获取对应的CompanyID + /// + /// + /// + public static string Order_OrderNo_CompanyID(string orderNo) => BuildCacheKey(TableNames.Order, "OrderNo", orderNo, "CompanyID"); + + /// + /// OrderBlockPlan表 由ID获取对应的CompanyID + /// + /// + /// + public static string OrderBlockPlan_ID_CompanyID(string id) => BuildCacheKey(TableNames.OrderBlockPlan, "ID", id, "CompanyID"); + + /// + /// OrderProcess表 由ID 获取对应的ShardKey + /// + /// + /// + public static string OrderProcess_ID_ShardKey(string id) => BuildCacheKey(TableNames.OrderProcess, "ID", id, "ShardKey"); + + + // 数据缓存键格式为[TableName]-[ColumnName@ColumnValue]-[CacheColumnName] + static string BuildCacheKey(string tableName, string columnName, string columnValue, string cacheColumnName) + => $"{tableName}-{columnName}@{columnValue}-{cacheColumnName}"; +} \ No newline at end of file diff --git a/MesETL.App/DataRecord.cs b/MesETL.App/DataRecord.cs index 3e93954..8ee37a3 100644 --- a/MesETL.App/DataRecord.cs +++ b/MesETL.App/DataRecord.cs @@ -40,7 +40,6 @@ public class DataRecord : ICloneable return idx; } - public string? RawField { get; set; } public IList Fields { get; } public IList Headers { get; } public string TableName { get; } diff --git a/MesETL.App/HostedServices/FileInputService.cs b/MesETL.App/HostedServices/FileInputService.cs index ae359c8..00a9ec7 100644 --- a/MesETL.App/HostedServices/FileInputService.cs +++ b/MesETL.App/HostedServices/FileInputService.cs @@ -68,7 +68,7 @@ public class FileInputService : IInputService foreach (var info in orderedInfo) { _logger.LogInformation("Reading file: {FileName}, table: {TableName}", info.FileName, info.TableName); - var source = _dataReaderFactory.CreateReader(info.FileName,info.TableName,info.Headers); + using var source = _dataReaderFactory.CreateReader(info.FileName,info.TableName,info.Headers); while (await source.ReadAsync()) { diff --git a/MesETL.App/Program.cs b/MesETL.App/Program.cs index 08b91f4..4bd7d6f 100644 --- a/MesETL.App/Program.cs +++ b/MesETL.App/Program.cs @@ -1,6 +1,5 @@ // #define USE_TEST_DB // 测试库的结构与生产库不一样,如果使用测试库运行,则加上USE_TEST_DB预处理器指令 -using System.Text.Json; using MesETL.App; using MesETL.App.Services; using MesETL.App.Services.ETL; @@ -215,10 +214,6 @@ async Task RunProgram() host.Services.Configure(options => { - // 数据缓存键格式为[TableName]-[ColumnName@ColumnValue]-[CacheColumnName] - static string BuildCacheKey(string tableName, string columnName, string columnValue, string cacheColumnName) - => $"{tableName}-{columnName}@{columnValue}-{cacheColumnName}"; - static string CalculateShardKeyByOrderNo(ReadOnlySpan orderNo) => $"{orderNo[2..6]}0"; @@ -239,7 +234,7 @@ async Task RunProgram() // OrderBoxBlock删除对应Order.OrderNo不存在的对象 case TableNames.OrderBoxBlock: { - if (!await cache.ExistsAsync(CacheKeys.Order_OrderNo_CompanyID(record["OrderNo"]))) + if (!await cache.ExistsAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"]))) return false; break; } @@ -256,7 +251,7 @@ async Task RunProgram() // OrderBlockPlanResult删除对应order_block_plan.ID不存在的对象 case TableNames.OrderBlockPlanResult: { - if (!await cache.ExistsAsync(CacheKeys.OrderBlockPlan_ID_CompanyID(record["ID"]))) + if (!await cache.ExistsAsync(CacheKeysFunc.OrderBlockPlan_ID_CompanyID(record["ID"]))) return false; break; } @@ -293,7 +288,7 @@ async Task RunProgram() // OrderProcessStepStep删除对应OrderProcess.ID不存在的对象 case TableNames.OrderProcessStepItem: { - if (!await cache.ExistsAsync(CacheKeys.OrderProcess_ID_ShardKey(record["OrderProcessID"]))) + if (!await cache.ExistsAsync(CacheKeysFunc.OrderProcess_ID_ShardKey(record["OrderProcessID"]))) return false; break; } @@ -331,17 +326,19 @@ async Task RunProgram() const string DefaultStr = "''"; const string DefaultDateTime = "'1000-01-01'"; const string DefaultText = "0"; - - static void ReplaceIfMyDumperNull(DataRecord record, string fieldName, string replaceValue) - { - Log.Logger.Warning("发现不可空的字段为空({TableName}.{FieldName}),填充默认值: {DefaultValue}", - record.TableName, fieldName, replaceValue); - if (record[fieldName] is ConstVar.MyDumperNull) - record[fieldName] = replaceValue; - } options.RecordModify = async context => { + void ReplaceIfMyDumperNull(DataRecord record, string fieldName, string replaceValue) + { + if (record[fieldName] is ConstVar.MyDumperNull) + { + context.Logger.LogWarning("发现不可空的字段为空({TableName}.{FieldName}),填充默认值: {DefaultValue}", + record.TableName, fieldName, replaceValue); + record[fieldName] = replaceValue; + } + } + var record = context.Record; var cache = context.Cacher; switch (record.TableName) @@ -367,17 +364,17 @@ async Task RunProgram() break; // OrderBlockPlanResult,添加CompanyID case TableNames.OrderBlockPlanResult: - record.AddField("CompanyID", + record.AddField("CompanyID", // 获取OrderBlockPlan.ID -> CompanyID - ThrowIfNoCached(await cache.GetStringAsync(CacheKeys.OrderBlockPlan_ID_CompanyID(record["ID"])), - TableNames.OrderBlockPlanResult, TableNames.OrderBlockPlan, "ID", "脏数据未处理")); + ThrowIfNoCached(await cache.GetStringAsync(CacheKeysFunc.OrderBlockPlan_ID_CompanyID(record["ID"])), + TableNames.OrderBlockPlanResult, TableNames.OrderBlockPlan, "ID", "无法获取对应的CompanyID")); break; // OrderBoxBlock添加CompanyID列 case TableNames.OrderBoxBlock: record.AddField("CompanyID", // 获取Order.OrderNo -> CompanyID - ThrowIfNoCached(await cache.GetStringAsync(CacheKeys.Order_OrderNo_CompanyID(record["OrderNo"])), - TableNames.OrderBoxBlock, TableNames.Order, "OrderNo", "脏数据未处理")); + ThrowIfNoCached(await cache.GetStringAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"])), + TableNames.OrderBoxBlock, TableNames.Order, "OrderNo", "无法获取对应的CompanyID")); break; // OrderModule添加ShardKey列,移除ViewFileName列 case TableNames.OrderModule: @@ -397,8 +394,8 @@ async Task RunProgram() ReplaceIfMyDumperNull(record, "DataID", DefaultInt); record.AddField("ShardKey", // 获取OrderProcess.ID -> ShardKey - ThrowIfNoCached(await cache.GetStringAsync(CacheKeys.OrderProcess_ID_ShardKey(record["OrderProcessID"])), - TableNames.OrderProcessStepItem, TableNames.OrderProcessStep, "OrderProcessID", "脏数据未处理")); + ThrowIfNoCached(await cache.GetStringAsync(CacheKeysFunc.OrderProcess_ID_ShardKey(record["OrderProcessID"])), + TableNames.OrderProcessStepItem, TableNames.OrderProcessStep, "OrderProcessID", "无法获取对应的ShardKey")); break; // OrderScrapBoard处理非空列 case TableNames.OrderScrapBoard: @@ -442,28 +439,22 @@ async Task RunProgram() { // 缓存Order.OrderNo -> CompanyID case TableNames.Order: - CacheKeys.Order_OrderNo_CompanyID = orderNo => - BuildCacheKey(TableNames.Order, "OrderNo", orderNo, "CompanyID"); await cache.SetStringAsync( - CacheKeys.Order_OrderNo_CompanyID(record["OrderNo"]), + CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"]), record["CompanyID"]); break; // 缓存OrderBlockPlan.ID -> CompanyID case TableNames.OrderBlockPlan: - CacheKeys.OrderBlockPlan_ID_CompanyID = id => - BuildCacheKey(TableNames.OrderBlockPlan, "ID", id, "CompanyID"); await cache.SetStringAsync( - CacheKeys.OrderBlockPlan_ID_CompanyID(record["ID"]), + CacheKeysFunc.OrderBlockPlan_ID_CompanyID(record["ID"]), record["CompanyID"]); break; // 缓存OrderProcess.ID -> ShardKey case TableNames.OrderProcess: - CacheKeys.OrderProcess_ID_ShardKey = id => - BuildCacheKey(TableNames.OrderProcess, "ID", id, "ShardKey"); await cache.SetStringAsync( - CacheKeys.OrderProcess_ID_ShardKey(record["ID"]), + CacheKeysFunc.OrderProcess_ID_ShardKey(record["ID"]), record["ShardKey"]); break; } @@ -604,8 +595,8 @@ async Task RunProgram() host.Services.AddDataSourceFactory(); host.Services.AddErrorRecorderFactory(); host.Services.AddSingleton(); - host.Services.AddKeyedSingleton(ConstVar.Producer); - host.Services.AddRecordQueuePool(tenantDbOptions.DbGroup.Keys.Select(key => (key:key, queue:new DataRecordQueue(500_000))).ToArray()); + host.Services.AddKeyedSingleton(ConstVar.Producer, new DataRecordQueue(200_000)); + host.Services.AddRecordQueuePool(tenantDbOptions.DbGroup.Keys.Select(key => (key:key, queue:new DataRecordQueue(60_000))).ToArray()); host.Services.AddSingleton(); host.Services.AddSingleton(); diff --git a/MesETL.App/Services/DataRecordQueue.cs b/MesETL.App/Services/DataRecordQueue.cs index fa877ae..729a7cf 100644 --- a/MesETL.App/Services/DataRecordQueue.cs +++ b/MesETL.App/Services/DataRecordQueue.cs @@ -17,7 +17,7 @@ public class DataRecordQueue : IDisposable public event Action? OnRecordWrite; public event Action? OnRecordRead; - public DataRecordQueue() : this(1000000) // 默认容量最大1M + public DataRecordQueue() : this(500_000) // 默认容量最大500K { } diff --git a/MesETL.App/Services/ETL/CsvReader.cs b/MesETL.App/Services/ETL/CsvReader.cs index cfb02c4..cfb6d6f 100644 --- a/MesETL.App/Services/ETL/CsvReader.cs +++ b/MesETL.App/Services/ETL/CsvReader.cs @@ -16,7 +16,6 @@ public class CsvReader : IDataReader public DataRecord Current { get; protected set; } = null!; public string[] Headers { get; } - public string? CurrentRaw { get; protected set; } public string Delimiter { get; } public char QuoteChar { get; } @@ -50,10 +49,8 @@ public class CsvReader : IDataReader if (string.IsNullOrWhiteSpace(str)) return false; - CurrentRaw = str; - var fields = ParseRow(str, QuoteChar, Delimiter); - Current = new DataRecord(fields, TableName, Headers){RawField = str}; + Current = new DataRecord(fields, TableName, Headers); return true; } diff --git a/MesETL.App/Services/ETL/ZstReader.cs b/MesETL.App/Services/ETL/ZstReader.cs index 583e1e2..65b1716 100644 --- a/MesETL.App/Services/ETL/ZstReader.cs +++ b/MesETL.App/Services/ETL/ZstReader.cs @@ -31,10 +31,8 @@ public class ZstReader : CsvReader if (string.IsNullOrWhiteSpace(str)) return false; - CurrentRaw = str; - var fields = ParseRow(str, QuoteChar, Delimiter); - Current = new DataRecord(fields, TableName, Headers) {RawField = str}; + Current = new DataRecord(fields, TableName, Headers); return true; } diff --git a/MesETL.App/Services/ErrorRecorder/ErrorRecorder.cs b/MesETL.App/Services/ErrorRecorder/ErrorRecorder.cs index 602a9f0..01afa77 100644 --- a/MesETL.App/Services/ErrorRecorder/ErrorRecorder.cs +++ b/MesETL.App/Services/ErrorRecorder/ErrorRecorder.cs @@ -24,7 +24,7 @@ public class ErrorRecorder Directory.CreateDirectory(outputDir); var content = $""" ### {exception.Message} - {record.RawField} + {string.Join(',', record.Fields)} """; var path = Path.Combine(outputDir, $"{record.TableName}.errlog"); await File.AppendAllTextAsync(path, content); @@ -57,7 +57,7 @@ public class ErrorRecorder var content = $""" ### {exception.Message} - {record.RawField} + {string.Join(',', record.Fields)} """; await writer.WriteLineAsync(content); if (token.IsCancellationRequested) diff --git a/MesETL.App/appsettings.json b/MesETL.App/appsettings.json index 5b18fee..3608187 100644 --- a/MesETL.App/appsettings.json +++ b/MesETL.App/appsettings.json @@ -5,9 +5,9 @@ } }, "Input":{ - "InputDir": "D:\\Dump\\MockData", // Csv数据输入目录 + "InputDir": "D:\\Dump\\MyDumper-ZST 2024-02-05", // Csv数据输入目录 "UseMock": false, // 使用模拟数据进行测试 - "MockCountMultiplier": 0 // 模拟数据量级的乘数 + "MockCountMultiplier": 1 // 模拟数据量级的乘数 }, "Transform":{ "StrictMode": false, // 设为true时如果数据转换发生错误,立刻停止程序 @@ -30,7 +30,7 @@ "TenantDb": // 分库配置 { "TenantKey" : "CompanyID", - "UseDbGroup": "test", + "UseDbGroup": "prod", "DbGroups": { "test": { "cferp_test_1": 1000,