From 8037fc74de546896dd08fa5f4b145d970f7b00fa Mon Sep 17 00:00:00 2001 From: "2817212736@qq.com" <2817212736@qq.com> Date: Fri, 27 Dec 2024 15:18:08 +0800 Subject: [PATCH] =?UTF-8?q?DataRecord=E7=BB=93=E6=9E=84=E6=94=B9=E8=BF=9B?= =?UTF-8?q?=EF=BC=8C=E6=9B=B4=E6=96=B02025=E5=B9=B4=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E8=A1=A8=E8=BD=AC=E6=8D=A2=E8=A7=84=E5=88=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- MesETL.App/Const/TableNames.cs | 2 +- MesETL.App/DataRecord.cs | 193 +++++++++--------- .../HostedServices/MainHostedService.cs | 7 +- MesETL.App/Program.cs | 23 ++- MesETL.App/Services/ProcessContext.cs | 2 +- MesETL.Shared/Compression/DeflateArchive.cs | 39 ++++ MesETL.Shared/Helper/Helper.Compress.cs | 22 -- MesETL.Test/DatabaseToolBox.cs | 3 +- 8 files changed, 166 insertions(+), 125 deletions(-) create mode 100644 MesETL.Shared/Compression/DeflateArchive.cs delete mode 100644 MesETL.Shared/Helper/Helper.Compress.cs diff --git a/MesETL.App/Const/TableNames.cs b/MesETL.App/Const/TableNames.cs index c68cdf6..8a1c424 100644 --- a/MesETL.App/Const/TableNames.cs +++ b/MesETL.App/Const/TableNames.cs @@ -25,7 +25,7 @@ public static class TableNames public const string OrderProcessStep = "order_process_step"; public const string OrderProcessStepItem = "order_process_step_item"; public const string OrderScrapBoard = "order_scrap_board"; - public const string OrderWaveGroup = "order_wave_group"; + public const string OrderExtraList = "order_extra_list"; public const string ProcessGroup = "process_group"; public const string ProcessInfo = "process_info"; public const string ProcessItemExp = "process_item_exp"; diff --git a/MesETL.App/DataRecord.cs b/MesETL.App/DataRecord.cs index 6ec74bb..689dbef 100644 --- a/MesETL.App/DataRecord.cs +++ b/MesETL.App/DataRecord.cs @@ -2,52 +2,12 @@ public class DataRecord : ICloneable { - /// - /// 尝试获取一条记录的某个字段值 - /// - /// - /// - /// - /// - /// - public static bool TryGetField(DataRecord record, string columnName, out string value) - { - value = string.Empty; - if (record.Headers is null) - throw new InvalidOperationException("Cannot get field when headers of a record have not been set."); - var idx = IndexOfIgnoreCase(record.Headers, columnName); - if (idx == -1) - return false; - value = record.Fields[idx]; - return true; - } - - /// - /// 获取一条记录的某个字段值 - /// TODO: 最好能优化至O(1) - /// - /// - /// - /// - /// - /// - public static string GetField(DataRecord record, string columnName) - { - if (record.Headers is null) - throw new InvalidOperationException("Headers have not been set."); - var idx = IndexOfIgnoreCase(record.Headers, columnName); - if (idx is -1) - throw new IndexOutOfRangeException( - $"Column name '{columnName}' not found in this record, table name '{record.TableName}'."); - return record.Fields[idx]; - } - private static int IndexOfIgnoreCase(IList list, string value) { var idx = -1; for (var i = 0; i < list.Count; i++) { - if (list[i].Equals(value, StringComparison.OrdinalIgnoreCase)) + if (list[i].Equals(value, StringComparison.OrdinalIgnoreCase)) { idx = i; break; @@ -57,45 +17,54 @@ public class DataRecord : ICloneable return idx; } + private readonly List _fields; + private readonly List _headers; + /// /// 字段列表 /// - public IList Fields { get; } + public IReadOnlyList Fields => _fields; + /// /// 表头列表 /// - public IList Headers { get; } + public IReadOnlyList Headers => _headers; + /// /// 来源表名 /// public string TableName { get; } + /// /// 需要输出的数据库 /// public string? Database { get; set; } + /// /// 所有字段的总字符数量 /// public long FieldCharCount { get; } + /// /// 忽略这个记录,不会被输出 /// public bool Ignore { get; set; } - public DataRecord(IEnumerable fields, string tableName, IEnumerable headers, string? database = null) + public DataRecord(IEnumerable fields, string tableName, IEnumerable headers, + string? database = null) { - Fields = fields.ToList(); + _fields = fields.ToList(); TableName = tableName; - Headers = headers.ToList(); + _headers = headers.ToList(); Database = database; - - if (Fields.Count != Headers.Count) + + if (_fields.Count != _headers.Count) throw new ArgumentException( - $"The number of fields does not match the number of headers. Expected: {Headers.Count} Got: {Fields.Count} Fields: {string.Join(',', Fields)}", + $"The number of fields does not match the number of headers. Expected: {_headers.Count} Got: {_fields.Count} Fields: {string.Join(',', _fields)}", nameof(fields)); - - FieldCharCount = Fields.Sum(x => (long)x.Length); + + FieldCharCount = _fields.Sum(x => (long)x.Length); } /// @@ -104,8 +73,8 @@ public class DataRecord : ICloneable /// public string this[int index] { - get => Fields[index]; - set => Fields[index] = value; + get => _fields[index]; + set => _fields[index] = value; } /// @@ -114,61 +83,101 @@ public class DataRecord : ICloneable /// public string this[string columnName] { - get => GetField(this, columnName); + get => GetField(columnName); set => SetField(columnName, value); } /// - /// 尝试获取字段值 + /// 尝试获取某个字段值 /// /// /// /// - public bool TryGetField(string columnName, out string value) => TryGetField(this, columnName, out value); - - /// - /// 为一个字段赋值 - /// - /// - /// - /// - public bool SetField(string columnName, string value) => SetField(this, columnName, value); - - /// - /// - /// - /// - /// - /// - /// /// - /// - public static bool SetField(DataRecord record, string columnName, string value) + public bool TryGetField(string columnName, out string value) { - // 表头检查 - if (record.Headers is null) - throw new InvalidOperationException("记录的表头尚未设置,无法赋值"); - var idx = IndexOfIgnoreCase(record.Headers, columnName); - if (idx is -1) - throw new IndexOutOfRangeException( - $"列 '{columnName}' 不存在于该纪录中,表名 '{record.TableName}"); - record.Fields[idx] = value; + value = string.Empty; + if (_headers is null) + throw new InvalidOperationException("Cannot get field when headers of a record have not been set."); + var idx = IndexOfIgnoreCase(_headers, columnName); + if (idx == -1) + return false; + value = _fields[idx]; return true; } - - public void RemoveField(string columnName) + + /// + /// 获取一条记录的某个字段值 + /// TODO: 最好能优化至O(1) + /// + /// + /// + /// + /// + public string GetField(string columnName) { - var idx = IndexOfIgnoreCase(Headers, columnName); - if (idx == -1) - throw new InvalidOperationException($"{TableName}: 列名 '{columnName}' 不存在"); - - Fields.RemoveAt(idx); - Headers.Remove(columnName); + if (_headers is null) + throw new InvalidOperationException("记录的表头尚未设置,无法赋值"); + var idx = IndexOfIgnoreCase(_headers, columnName); + if (idx is -1) + throw new IndexOutOfRangeException( + $"列 '{columnName}' 不存在于该纪录中,表名 '{TableName}"); + return _fields[idx]; } - public bool HeaderExists(string columnName) => IndexOfIgnoreCase(Headers, columnName) != -1; + /// + /// 为记录的一个字段赋值,如果该字段名不存在则会抛出异常 + /// + /// 列名 + /// 值 + /// + /// 该记录的表头尚未初始化,你可能在错误的阶段调用了该方法 + /// 输入的字段名不存在于该记录中 + public void SetField(string columnName, string value) + { + // 表头检查 + if (_headers is null) + throw new InvalidOperationException("记录的表头尚未设置,无法赋值"); + var idx = IndexOfIgnoreCase(_headers, columnName); + if (idx is -1) + throw new IndexOutOfRangeException( + $"列 '{columnName}' 不存在于该纪录中,表名 '{TableName}"); + _fields[idx] = value; + } + + /// + /// 在记录中追加一个字段 + /// + /// 字段名 + /// 字段值 + /// 记录的表头尚未初始化,你可能在错误的阶段调用了此方法 + /// 提供的字段名已存在于该记录中 + public void AppendField(string columnName, string value) + { + if (_headers is null) + throw new InvalidOperationException("记录的表头尚未设置,无法赋值"); + var idx = IndexOfIgnoreCase(_headers, columnName); + if (idx is > 0) + throw new ArgumentException($"字段名 '{columnName}' 已存在于该记录中,无法重复添加", nameof(columnName)); + + _headers.Add(columnName); + _fields.Add(value); + } + + public void RemoveField(string columnName) + { + var idx = IndexOfIgnoreCase(_headers, columnName); + if (idx == -1) + throw new InvalidOperationException($"{TableName}: 列名 '{columnName}' 不存在"); + + _fields.RemoveAt(idx); + _headers.Remove(columnName); + } + + public bool HeaderExists(string columnName) => IndexOfIgnoreCase(_headers, columnName) != -1; + public object Clone() { - return new DataRecord(new List(Fields), TableName, new List(Headers), Database); + return new DataRecord(new List(_fields), TableName, new List(_headers), Database); } } \ No newline at end of file diff --git a/MesETL.App/HostedServices/MainHostedService.cs b/MesETL.App/HostedServices/MainHostedService.cs index ef0b145..5570002 100644 --- a/MesETL.App/HostedServices/MainHostedService.cs +++ b/MesETL.App/HostedServices/MainHostedService.cs @@ -77,6 +77,8 @@ public class MainHostedService : BackgroundService await Task.WhenAll(inputTask, transformTask, outputTask); _stopwatch.Stop(); _logger.LogInformation("***** 所有传输任务均已完成 *****"); + if (_context.HasException) + _logger.LogError("***** 传输过程中有错误发生 *****"); _logger.LogInformation("***** 耗时:{Time}", (_stopwatch.ElapsedMilliseconds / 1000f).ToString("F3")); await Task.Delay(5000, stoppingToken); @@ -114,7 +116,10 @@ public class MainHostedService : BackgroundService { var connStr = _databaseOptions.Value.ConnectionString ?? throw new ApplicationException("分库配置中没有配置数据库"); - _logger.LogWarning("已开启MySQL延迟写入功能并禁用重做日志,请注意数据安全"); + if (enable) + _logger.LogWarning("已开启MySQL延迟写入功能并禁用重做日志,请注意数据安全"); + else _logger.LogInformation("不安全变量已关闭"); + if (enable) { await DatabaseHelper.NonQueryAsync(connStr, diff --git a/MesETL.App/Program.cs b/MesETL.App/Program.cs index 369c4d1..79a3832 100644 --- a/MesETL.App/Program.cs +++ b/MesETL.App/Program.cs @@ -1,4 +1,4 @@ -#define USE_TEST_DB // 如果使用测试库运行,则加上USE_TEST_DB预处理器指令 +// #define USE_TEST_DB // 如果使用测试库运行,则加上USE_TEST_DB预处理器指令 using System.Text; using MesETL.App; @@ -12,6 +12,7 @@ using MesETL.App.Options; using MesETL.App.Services.ErrorRecorder; using MesETL.App.Services.Loggers; using MesETL.App.Services.Seq; +using MesETL.Shared.Compression; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; @@ -217,6 +218,15 @@ async Task RunProgram() break; } + // 将JsonStr列转换为Data列,添加CompressionType列 + case TableNames.OrderModuleExtra: + { + record.AppendField("CompressionType", "1"); + record.AppendField("Data", + Convert.ToHexString(DeflateArchive.Compress(Convert.FromHexString(record["JsonStr"])))); + record.RemoveField("JsonStr"); + break; + } // 删除ID列,让数据库自行递增 // TODO: 数据表改进,删除ID列或是替换为流水号 case TableNames.ProcessStepEfficiency: @@ -234,14 +244,13 @@ async Task RunProgram() record.RemoveField("Key"); break; } -#if USE_TEST_DB - // 测试环境忽略PlaceData列,生产环境会提前将其移除 + // 移除PlaceData列(如果存在的话,生产库已经删除) case TableNames.SimplePlanOrder: { - record.RemoveField("PlaceData"); + if(record.HeaderExists("PlaceData")) + record.RemoveField("PlaceData"); break; } -#endif default: break; } @@ -281,7 +290,7 @@ async Task RunProgram() var id = seq.AddCachedSeq(SeqConfig.OrderWaveGroupID); var orderWaveGroup = new DataRecord( [id.ToString(), ..headers.Select(c => record[c])], - TableNames.OrderWaveGroup, + TableNames.OrderExtraList, ["ID", "OrderNo", "ShardKey", "Type", "ConfigJson", "CompanyID"]); resultList.Add(orderWaveGroup); return resultList; @@ -368,7 +377,7 @@ async Task RunProgram() host.Services.AddHostedService(); host.Services.AddSingleton(); host.Services.AddSingleton(); - host.Services.AddSingleton(); + host.Services.AddSingleton(); host.Services.AddSingleton(); // host.Services.AddRedisCache(redisOptions); host.Services.AddSingleton(); diff --git a/MesETL.App/Services/ProcessContext.cs b/MesETL.App/Services/ProcessContext.cs index 9db97cd..583dbe4 100644 --- a/MesETL.App/Services/ProcessContext.cs +++ b/MesETL.App/Services/ProcessContext.cs @@ -45,7 +45,7 @@ public class ProcessContext public void CompleteTransform() => IsTransformCompleted = true; public void CompleteOutput() => IsOutputCompleted = true; - public bool AddException(Exception e) => _hasException = true; + public bool AddException(Exception e) => _hasException = true; // 没打算存起来,暂时先加个标记 public void AddInput() => Interlocked.Increment(ref _inputCount); diff --git a/MesETL.Shared/Compression/DeflateArchive.cs b/MesETL.Shared/Compression/DeflateArchive.cs new file mode 100644 index 0000000..9a024ae --- /dev/null +++ b/MesETL.Shared/Compression/DeflateArchive.cs @@ -0,0 +1,39 @@ +using System.IO.Compression; + +namespace MesETL.Shared.Compression; + +/// +/// Deflate压缩工具类 +/// +public static class DeflateArchive +{ + /// + /// 解压Deflate + /// + /// + /// + public static byte[] Decompress(byte[] input) + { + using var msi = new MemoryStream(input); + using var mso = new MemoryStream(); + using var ds = new DeflateStream(msi, CompressionMode.Decompress); + ds.CopyTo(mso); + ds.Flush(); + return mso.ToArray(); + } + + /// + /// 压缩Deflate + /// + /// + /// + public static byte[] Compress(byte[] input) + { + using var msi = new MemoryStream(input); + using var mso = new MemoryStream(); + using var ds = new DeflateStream(mso, CompressionMode.Compress); + msi.CopyTo(ds); + ds.Flush(); + return mso.ToArray(); + } +} \ No newline at end of file diff --git a/MesETL.Shared/Helper/Helper.Compress.cs b/MesETL.Shared/Helper/Helper.Compress.cs deleted file mode 100644 index e3ede29..0000000 --- a/MesETL.Shared/Helper/Helper.Compress.cs +++ /dev/null @@ -1,22 +0,0 @@ -using System.IO.Compression; - -namespace MesETL.Shared.Helper; - -public class CompressHelper -{ - /// - /// - /// - /// - /// - public static byte[] CompressDeflate(byte[] data) - { - using var src = new MemoryStream(data); - - using var outStream = new MemoryStream(); - using var gzip = new DeflateStream(outStream, CompressionMode.Compress); - src.CopyTo(gzip); - gzip.Flush(); - return outStream.ToArray(); - } -} \ No newline at end of file diff --git a/MesETL.Test/DatabaseToolBox.cs b/MesETL.Test/DatabaseToolBox.cs index 1d4d17d..be87840 100644 --- a/MesETL.Test/DatabaseToolBox.cs +++ b/MesETL.Test/DatabaseToolBox.cs @@ -12,7 +12,7 @@ namespace TestProject1; public class DatabaseToolBox { private readonly ITestOutputHelper _output; - public const string ConnStr = "Server=127.0.0.1;Port=3306;UserId=root;Password=123456;"; + public const string ConnStr = "Server=192.168.1.246;Port=33025;UserId=root;Password=123456;"; public DatabaseToolBox(ITestOutputHelper output) { @@ -177,6 +177,7 @@ public class DatabaseToolBox [InlineData("mesdb_3")] [InlineData("mesdb_4")] [InlineData("mesdb_5")] + [InlineData("mesdb_6")] public async Task TruncateAllTable(string database) { var tables = await DatabaseHelper.QueryTableAsync(ConnStr,