diff --git a/MesETL.App/Const/TableNames.cs b/MesETL.App/Const/TableNames.cs
index c68cdf6..8a1c424 100644
--- a/MesETL.App/Const/TableNames.cs
+++ b/MesETL.App/Const/TableNames.cs
@@ -25,7 +25,7 @@ public static class TableNames
public const string OrderProcessStep = "order_process_step";
public const string OrderProcessStepItem = "order_process_step_item";
public const string OrderScrapBoard = "order_scrap_board";
- public const string OrderWaveGroup = "order_wave_group";
+ public const string OrderExtraList = "order_extra_list";
public const string ProcessGroup = "process_group";
public const string ProcessInfo = "process_info";
public const string ProcessItemExp = "process_item_exp";
diff --git a/MesETL.App/DataRecord.cs b/MesETL.App/DataRecord.cs
index 6ec74bb..689dbef 100644
--- a/MesETL.App/DataRecord.cs
+++ b/MesETL.App/DataRecord.cs
@@ -2,52 +2,12 @@
public class DataRecord : ICloneable
{
- ///
- /// 尝试获取一条记录的某个字段值
- ///
- ///
- ///
- ///
- ///
- ///
- public static bool TryGetField(DataRecord record, string columnName, out string value)
- {
- value = string.Empty;
- if (record.Headers is null)
- throw new InvalidOperationException("Cannot get field when headers of a record have not been set.");
- var idx = IndexOfIgnoreCase(record.Headers, columnName);
- if (idx == -1)
- return false;
- value = record.Fields[idx];
- return true;
- }
-
- ///
- /// 获取一条记录的某个字段值
- /// TODO: 最好能优化至O(1)
- ///
- ///
- ///
- ///
- ///
- ///
- public static string GetField(DataRecord record, string columnName)
- {
- if (record.Headers is null)
- throw new InvalidOperationException("Headers have not been set.");
- var idx = IndexOfIgnoreCase(record.Headers, columnName);
- if (idx is -1)
- throw new IndexOutOfRangeException(
- $"Column name '{columnName}' not found in this record, table name '{record.TableName}'.");
- return record.Fields[idx];
- }
-
private static int IndexOfIgnoreCase(IList list, string value)
{
var idx = -1;
for (var i = 0; i < list.Count; i++)
{
- if (list[i].Equals(value, StringComparison.OrdinalIgnoreCase))
+ if (list[i].Equals(value, StringComparison.OrdinalIgnoreCase))
{
idx = i;
break;
@@ -57,45 +17,54 @@ public class DataRecord : ICloneable
return idx;
}
+ private readonly List _fields;
+ private readonly List _headers;
+
///
/// 字段列表
///
- public IList Fields { get; }
+ public IReadOnlyList Fields => _fields;
+
///
/// 表头列表
///
- public IList Headers { get; }
+ public IReadOnlyList Headers => _headers;
+
///
/// 来源表名
///
public string TableName { get; }
+
///
/// 需要输出的数据库
///
public string? Database { get; set; }
+
///
/// 所有字段的总字符数量
///
public long FieldCharCount { get; }
+
///
/// 忽略这个记录,不会被输出
///
public bool Ignore { get; set; }
- public DataRecord(IEnumerable fields, string tableName, IEnumerable headers, string? database = null)
+ public DataRecord(IEnumerable fields, string tableName, IEnumerable headers,
+ string? database = null)
{
- Fields = fields.ToList();
+ _fields = fields.ToList();
TableName = tableName;
- Headers = headers.ToList();
+ _headers = headers.ToList();
Database = database;
-
- if (Fields.Count != Headers.Count)
+
+ if (_fields.Count != _headers.Count)
throw new ArgumentException(
- $"The number of fields does not match the number of headers. Expected: {Headers.Count} Got: {Fields.Count} Fields: {string.Join(',', Fields)}",
+ $"The number of fields does not match the number of headers. Expected: {_headers.Count} Got: {_fields.Count} Fields: {string.Join(',', _fields)}",
nameof(fields));
-
- FieldCharCount = Fields.Sum(x => (long)x.Length);
+
+ FieldCharCount = _fields.Sum(x => (long)x.Length);
}
///
@@ -104,8 +73,8 @@ public class DataRecord : ICloneable
///
public string this[int index]
{
- get => Fields[index];
- set => Fields[index] = value;
+ get => _fields[index];
+ set => _fields[index] = value;
}
///
@@ -114,61 +83,101 @@ public class DataRecord : ICloneable
///
public string this[string columnName]
{
- get => GetField(this, columnName);
+ get => GetField(columnName);
set => SetField(columnName, value);
}
///
- /// 尝试获取字段值
+ /// 尝试获取某个字段值
///
///
///
///
- public bool TryGetField(string columnName, out string value) => TryGetField(this, columnName, out value);
-
- ///
- /// 为一个字段赋值
- ///
- ///
- ///
- ///
- public bool SetField(string columnName, string value) => SetField(this, columnName, value);
-
- ///
- ///
- ///
- ///
- ///
- ///
- ///
///
- ///
- public static bool SetField(DataRecord record, string columnName, string value)
+ public bool TryGetField(string columnName, out string value)
{
- // 表头检查
- if (record.Headers is null)
- throw new InvalidOperationException("记录的表头尚未设置,无法赋值");
- var idx = IndexOfIgnoreCase(record.Headers, columnName);
- if (idx is -1)
- throw new IndexOutOfRangeException(
- $"列 '{columnName}' 不存在于该纪录中,表名 '{record.TableName}");
- record.Fields[idx] = value;
+ value = string.Empty;
+ if (_headers is null)
+ throw new InvalidOperationException("Cannot get field when headers of a record have not been set.");
+ var idx = IndexOfIgnoreCase(_headers, columnName);
+ if (idx == -1)
+ return false;
+ value = _fields[idx];
return true;
}
-
- public void RemoveField(string columnName)
+
+ ///
+ /// 获取一条记录的某个字段值
+ /// TODO: 最好能优化至O(1)
+ ///
+ ///
+ ///
+ ///
+ ///
+ public string GetField(string columnName)
{
- var idx = IndexOfIgnoreCase(Headers, columnName);
- if (idx == -1)
- throw new InvalidOperationException($"{TableName}: 列名 '{columnName}' 不存在");
-
- Fields.RemoveAt(idx);
- Headers.Remove(columnName);
+ if (_headers is null)
+ throw new InvalidOperationException("记录的表头尚未设置,无法赋值");
+ var idx = IndexOfIgnoreCase(_headers, columnName);
+ if (idx is -1)
+ throw new IndexOutOfRangeException(
+ $"列 '{columnName}' 不存在于该纪录中,表名 '{TableName}");
+ return _fields[idx];
}
- public bool HeaderExists(string columnName) => IndexOfIgnoreCase(Headers, columnName) != -1;
+ ///
+ /// 为记录的一个字段赋值,如果该字段名不存在则会抛出异常
+ ///
+ /// 列名
+ /// 值
+ ///
+ /// 该记录的表头尚未初始化,你可能在错误的阶段调用了该方法
+ /// 输入的字段名不存在于该记录中
+ public void SetField(string columnName, string value)
+ {
+ // 表头检查
+ if (_headers is null)
+ throw new InvalidOperationException("记录的表头尚未设置,无法赋值");
+ var idx = IndexOfIgnoreCase(_headers, columnName);
+ if (idx is -1)
+ throw new IndexOutOfRangeException(
+ $"列 '{columnName}' 不存在于该纪录中,表名 '{TableName}");
+ _fields[idx] = value;
+ }
+
+ ///
+ /// 在记录中追加一个字段
+ ///
+ /// 字段名
+ /// 字段值
+ /// 记录的表头尚未初始化,你可能在错误的阶段调用了此方法
+ /// 提供的字段名已存在于该记录中
+ public void AppendField(string columnName, string value)
+ {
+ if (_headers is null)
+ throw new InvalidOperationException("记录的表头尚未设置,无法赋值");
+ var idx = IndexOfIgnoreCase(_headers, columnName);
+ if (idx is > 0)
+ throw new ArgumentException($"字段名 '{columnName}' 已存在于该记录中,无法重复添加", nameof(columnName));
+
+ _headers.Add(columnName);
+ _fields.Add(value);
+ }
+
+ public void RemoveField(string columnName)
+ {
+ var idx = IndexOfIgnoreCase(_headers, columnName);
+ if (idx == -1)
+ throw new InvalidOperationException($"{TableName}: 列名 '{columnName}' 不存在");
+
+ _fields.RemoveAt(idx);
+ _headers.Remove(columnName);
+ }
+
+ public bool HeaderExists(string columnName) => IndexOfIgnoreCase(_headers, columnName) != -1;
+
public object Clone()
{
- return new DataRecord(new List(Fields), TableName, new List(Headers), Database);
+ return new DataRecord(new List(_fields), TableName, new List(_headers), Database);
}
}
\ No newline at end of file
diff --git a/MesETL.App/HostedServices/MainHostedService.cs b/MesETL.App/HostedServices/MainHostedService.cs
index ef0b145..5570002 100644
--- a/MesETL.App/HostedServices/MainHostedService.cs
+++ b/MesETL.App/HostedServices/MainHostedService.cs
@@ -77,6 +77,8 @@ public class MainHostedService : BackgroundService
await Task.WhenAll(inputTask, transformTask, outputTask);
_stopwatch.Stop();
_logger.LogInformation("***** 所有传输任务均已完成 *****");
+ if (_context.HasException)
+ _logger.LogError("***** 传输过程中有错误发生 *****");
_logger.LogInformation("***** 耗时:{Time}", (_stopwatch.ElapsedMilliseconds / 1000f).ToString("F3"));
await Task.Delay(5000, stoppingToken);
@@ -114,7 +116,10 @@ public class MainHostedService : BackgroundService
{
var connStr = _databaseOptions.Value.ConnectionString
?? throw new ApplicationException("分库配置中没有配置数据库");
- _logger.LogWarning("已开启MySQL延迟写入功能并禁用重做日志,请注意数据安全");
+ if (enable)
+ _logger.LogWarning("已开启MySQL延迟写入功能并禁用重做日志,请注意数据安全");
+ else _logger.LogInformation("不安全变量已关闭");
+
if (enable)
{
await DatabaseHelper.NonQueryAsync(connStr,
diff --git a/MesETL.App/Program.cs b/MesETL.App/Program.cs
index 369c4d1..79a3832 100644
--- a/MesETL.App/Program.cs
+++ b/MesETL.App/Program.cs
@@ -1,4 +1,4 @@
-#define USE_TEST_DB // 如果使用测试库运行,则加上USE_TEST_DB预处理器指令
+// #define USE_TEST_DB // 如果使用测试库运行,则加上USE_TEST_DB预处理器指令
using System.Text;
using MesETL.App;
@@ -12,6 +12,7 @@ using MesETL.App.Options;
using MesETL.App.Services.ErrorRecorder;
using MesETL.App.Services.Loggers;
using MesETL.App.Services.Seq;
+using MesETL.Shared.Compression;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
@@ -217,6 +218,15 @@ async Task RunProgram()
break;
}
+ // 将JsonStr列转换为Data列,添加CompressionType列
+ case TableNames.OrderModuleExtra:
+ {
+ record.AppendField("CompressionType", "1");
+ record.AppendField("Data",
+ Convert.ToHexString(DeflateArchive.Compress(Convert.FromHexString(record["JsonStr"]))));
+ record.RemoveField("JsonStr");
+ break;
+ }
// 删除ID列,让数据库自行递增
// TODO: 数据表改进,删除ID列或是替换为流水号
case TableNames.ProcessStepEfficiency:
@@ -234,14 +244,13 @@ async Task RunProgram()
record.RemoveField("Key");
break;
}
-#if USE_TEST_DB
- // 测试环境忽略PlaceData列,生产环境会提前将其移除
+ // 移除PlaceData列(如果存在的话,生产库已经删除)
case TableNames.SimplePlanOrder:
{
- record.RemoveField("PlaceData");
+ if(record.HeaderExists("PlaceData"))
+ record.RemoveField("PlaceData");
break;
}
-#endif
default: break;
}
@@ -281,7 +290,7 @@ async Task RunProgram()
var id = seq.AddCachedSeq(SeqConfig.OrderWaveGroupID);
var orderWaveGroup = new DataRecord(
[id.ToString(), ..headers.Select(c => record[c])],
- TableNames.OrderWaveGroup,
+ TableNames.OrderExtraList,
["ID", "OrderNo", "ShardKey", "Type", "ConfigJson", "CompanyID"]);
resultList.Add(orderWaveGroup);
return resultList;
@@ -368,7 +377,7 @@ async Task RunProgram()
host.Services.AddHostedService();
host.Services.AddSingleton();
host.Services.AddSingleton();
- host.Services.AddSingleton();
+ host.Services.AddSingleton();
host.Services.AddSingleton();
// host.Services.AddRedisCache(redisOptions);
host.Services.AddSingleton();
diff --git a/MesETL.App/Services/ProcessContext.cs b/MesETL.App/Services/ProcessContext.cs
index 9db97cd..583dbe4 100644
--- a/MesETL.App/Services/ProcessContext.cs
+++ b/MesETL.App/Services/ProcessContext.cs
@@ -45,7 +45,7 @@ public class ProcessContext
public void CompleteTransform() => IsTransformCompleted = true;
public void CompleteOutput() => IsOutputCompleted = true;
- public bool AddException(Exception e) => _hasException = true;
+ public bool AddException(Exception e) => _hasException = true; // 没打算存起来,暂时先加个标记
public void AddInput() => Interlocked.Increment(ref _inputCount);
diff --git a/MesETL.Shared/Compression/DeflateArchive.cs b/MesETL.Shared/Compression/DeflateArchive.cs
new file mode 100644
index 0000000..9a024ae
--- /dev/null
+++ b/MesETL.Shared/Compression/DeflateArchive.cs
@@ -0,0 +1,39 @@
+using System.IO.Compression;
+
+namespace MesETL.Shared.Compression;
+
+///
+/// Deflate压缩工具类
+///
+public static class DeflateArchive
+{
+ ///
+ /// 解压Deflate
+ ///
+ ///
+ ///
+ public static byte[] Decompress(byte[] input)
+ {
+ using var msi = new MemoryStream(input);
+ using var mso = new MemoryStream();
+ using var ds = new DeflateStream(msi, CompressionMode.Decompress);
+ ds.CopyTo(mso);
+ ds.Flush();
+ return mso.ToArray();
+ }
+
+ ///
+ /// 压缩Deflate
+ ///
+ ///
+ ///
+ public static byte[] Compress(byte[] input)
+ {
+ using var msi = new MemoryStream(input);
+ using var mso = new MemoryStream();
+ using var ds = new DeflateStream(mso, CompressionMode.Compress);
+ msi.CopyTo(ds);
+ ds.Flush();
+ return mso.ToArray();
+ }
+}
\ No newline at end of file
diff --git a/MesETL.Shared/Helper/Helper.Compress.cs b/MesETL.Shared/Helper/Helper.Compress.cs
deleted file mode 100644
index e3ede29..0000000
--- a/MesETL.Shared/Helper/Helper.Compress.cs
+++ /dev/null
@@ -1,22 +0,0 @@
-using System.IO.Compression;
-
-namespace MesETL.Shared.Helper;
-
-public class CompressHelper
-{
- ///
- ///
- ///
- ///
- ///
- public static byte[] CompressDeflate(byte[] data)
- {
- using var src = new MemoryStream(data);
-
- using var outStream = new MemoryStream();
- using var gzip = new DeflateStream(outStream, CompressionMode.Compress);
- src.CopyTo(gzip);
- gzip.Flush();
- return outStream.ToArray();
- }
-}
\ No newline at end of file
diff --git a/MesETL.Test/DatabaseToolBox.cs b/MesETL.Test/DatabaseToolBox.cs
index 1d4d17d..be87840 100644
--- a/MesETL.Test/DatabaseToolBox.cs
+++ b/MesETL.Test/DatabaseToolBox.cs
@@ -12,7 +12,7 @@ namespace TestProject1;
public class DatabaseToolBox
{
private readonly ITestOutputHelper _output;
- public const string ConnStr = "Server=127.0.0.1;Port=3306;UserId=root;Password=123456;";
+ public const string ConnStr = "Server=192.168.1.246;Port=33025;UserId=root;Password=123456;";
public DatabaseToolBox(ITestOutputHelper output)
{
@@ -177,6 +177,7 @@ public class DatabaseToolBox
[InlineData("mesdb_3")]
[InlineData("mesdb_4")]
[InlineData("mesdb_5")]
+ [InlineData("mesdb_6")]
public async Task TruncateAllTable(string database)
{
var tables = await DatabaseHelper.QueryTableAsync(ConnStr,