diff --git a/MesETL.App/Const/TableNames.cs b/MesETL.App/Const/TableNames.cs
index 06a42b2..c68cdf6 100644
--- a/MesETL.App/Const/TableNames.cs
+++ b/MesETL.App/Const/TableNames.cs
@@ -8,9 +8,11 @@ public static class TableNames
public const string OrderBlockPlanItem = "order_block_plan_item";
public const string OrderBlockPlanResult = "order_block_plan_result";
public const string OrderBoxBlock = "order_box_block";
+ public const string OrderBoxViewConfig = "order_box_view_config";
public const string OrderDataBlock = "order_data_block";
public const string OrderDataGoods = "order_data_goods";
public const string OrderDataParts = "order_data_parts";
+ public const string OrderExtra = "order_extra";
public const string OrderItem = "order_item";
public const string OrderModule = "order_module";
public const string OrderModuleExtra = "order_module_extra";
@@ -23,6 +25,7 @@ public static class TableNames
public const string OrderProcessStep = "order_process_step";
public const string OrderProcessStepItem = "order_process_step_item";
public const string OrderScrapBoard = "order_scrap_board";
+ public const string OrderWaveGroup = "order_wave_group";
public const string ProcessGroup = "process_group";
public const string ProcessInfo = "process_info";
public const string ProcessItemExp = "process_item_exp";
diff --git a/MesETL.App/DataRecord.cs b/MesETL.App/DataRecord.cs
index c4c79c6..6ec74bb 100644
--- a/MesETL.App/DataRecord.cs
+++ b/MesETL.App/DataRecord.cs
@@ -2,6 +2,14 @@
public class DataRecord : ICloneable
{
+ ///
+ /// 尝试获取一条记录的某个字段值
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
public static bool TryGetField(DataRecord record, string columnName, out string value)
{
value = string.Empty;
@@ -14,6 +22,15 @@ public class DataRecord : ICloneable
return true;
}
+ ///
+ /// 获取一条记录的某个字段值
+ /// TODO: 最好能优化至O(1)
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
public static string GetField(DataRecord record, string columnName)
{
if (record.Headers is null)
@@ -24,7 +41,7 @@ public class DataRecord : ICloneable
$"Column name '{columnName}' not found in this record, table name '{record.TableName}'.");
return record.Fields[idx];
}
-
+
private static int IndexOfIgnoreCase(IList list, string value)
{
var idx = -1;
@@ -40,11 +57,30 @@ public class DataRecord : ICloneable
return idx;
}
+ ///
+ /// 字段列表
+ ///
public IList Fields { get; }
+ ///
+ /// 表头列表
+ ///
public IList Headers { get; }
+ ///
+ /// 来源表名
+ ///
public string TableName { get; }
+ ///
+ /// 需要输出的数据库
+ ///
public string? Database { get; set; }
+ ///
+ /// 所有字段的总字符数量
+ ///
public long FieldCharCount { get; }
+ ///
+ /// 忽略这个记录,不会被输出
+ ///
+ public bool Ignore { get; set; }
public DataRecord(IEnumerable fields, string tableName, IEnumerable headers, string? database = null)
@@ -62,45 +98,64 @@ public class DataRecord : ICloneable
FieldCharCount = Fields.Sum(x => (long)x.Length);
}
+ ///
+ /// 使用索引访问字段
+ ///
+ ///
public string this[int index]
{
get => Fields[index];
set => Fields[index] = value;
}
+ ///
+ /// 使用列名访问字段,不区分大小写
+ ///
+ ///
public string this[string columnName]
{
get => GetField(this, columnName);
set => SetField(columnName, value);
}
- public int FieldCount => Fields.Count;
-
+ ///
+ /// 尝试获取字段值
+ ///
+ ///
+ ///
+ ///
public bool TryGetField(string columnName, out string value) => TryGetField(this, columnName, out value);
+ ///
+ /// 为一个字段赋值
+ ///
+ ///
+ ///
+ ///
public bool SetField(string columnName, string value) => SetField(this, columnName, value);
- public bool SetField(DataRecord record, string columnName, string value)
+ /// <summary>
+ /// Overwrites the field of <paramref name="record"/> whose header equals
+ /// <paramref name="columnName"/>; the header is located with <c>IndexOfIgnoreCase</c>.
+ /// </summary>
+ /// <param name="record">Record whose field is overwritten.</param>
+ /// <param name="columnName">Header name of the field to assign.</param>
+ /// <param name="value">New value stored in the field.</param>
+ /// <returns>Always <c>true</c>; failures are reported by throwing.</returns>
+ /// <exception cref="InvalidOperationException">The record's header list is null.</exception>
+ /// <exception cref="IndexOutOfRangeException">No header matches <paramref name="columnName"/>.</exception>
+ public static bool SetField(DataRecord record, string columnName, string value)
{
+ // Header check: a name-based assignment needs the header list.
if (record.Headers is null)
- throw new InvalidOperationException("Headers have not been set.");
+ throw new InvalidOperationException("记录的表头尚未设置,无法赋值");
var idx = IndexOfIgnoreCase(record.Headers, columnName);
if (idx is -1)
throw new IndexOutOfRangeException(
- $"Column name '{columnName}' not found in this record, table name '{record.TableName}");
+ $"列 '{columnName}' 不存在于该记录中,表名 '{record.TableName}'");
record.Fields[idx] = value;
return true;
}
- public void AddField(string columnName, string value)
- {
- if (IndexOfIgnoreCase(Headers, columnName) != -1)
- throw new InvalidOperationException($"{TableName}: 列名 '{columnName}' 已存在");
-
- Fields.Add(value);
- Headers.Add(columnName);
- }
-
public void RemoveField(string columnName)
{
var idx = IndexOfIgnoreCase(Headers, columnName);
diff --git a/MesETL.App/Helpers/DumpDataHelper.cs b/MesETL.App/Helpers/DumpDataHelper.cs
index 148bcbf..26ecb7a 100644
--- a/MesETL.App/Helpers/DumpDataHelper.cs
+++ b/MesETL.App/Helpers/DumpDataHelper.cs
@@ -1,5 +1,7 @@
using System.Text.Json;
using System.Text.RegularExpressions;
+using MesETL.App.HostedServices;
+using Serilog;
using ZstdSharp;
namespace MesETL.App.Helpers;
@@ -27,16 +29,13 @@ public static partial class DumpDataHelper
string[] ParseHeader(ReadOnlySpan headerStr)
{
headerStr = headerStr[1..^1];
- Span ranges = stackalloc Range[50];
- var count = headerStr.Split(ranges, ',');
- var arr = new string[count];
-
- for (var i = 0; i < count; i++)
+ var headers = new List();
+ foreach (var range in headerStr.Split(','))
{
- arr[i] = headerStr[ranges[i]].Trim("@`").ToString(); // 消除列名的反引号,如果是变量则消除@
+ headers.Add(headerStr[range].Trim("@`").ToString()); // 消除列名的反引号,如果是变量则消除@
}
-
- return arr;
+
+ return headers.ToArray();
}
}
@@ -45,6 +44,7 @@ public static partial class DumpDataHelper
///
///
///
+ [Obsolete("用ParseMyDumperFile替代")]
public static string GetTableNameFromCsvFileName(ReadOnlySpan filePath)
{
filePath = filePath[(filePath.LastIndexOf('\\') + 1)..];
@@ -68,6 +68,30 @@ public static partial class DumpDataHelper
return filePath[(firstDotIdx+1)..secondDotIdx].ToString();
}
+
+ /// <summary>
+ /// Kind of MyDumper export file, taken from the fourth dot-separated segment of the file name.
+ /// </summary>
+ public enum MyDumperFileType { Dat, Sql }
+
+ /// <summary>
+ /// Metadata that <see cref="ParseMyDumperFile"/> extracts from a MyDumper file name:
+ /// the path as given, database, table name, numeric index and file type.
+ /// </summary>
+ public record MyDumperFileMeta(string Path, string Database, string TableName, int Index, MyDumperFileType Type);
+
+ /// <summary>
+ /// Parses a MyDumper export file path. The file name is split on '.', and the segments are read
+ /// positionally as database, table name, integer index and type ("dat" or "sql").
+ /// </summary>
+ /// <param name="path">Path of the file; only its file-name part is parsed.</param>
+ /// <returns>The metadata read from the file name, with <c>Path</c> set to the full input path.</returns>
+ /// <exception cref="ArgumentException">
+ /// Thrown for any failure while parsing (too few segments, a non-numeric index, an unsupported type);
+ /// the original exception is attached as the inner exception.
+ /// </exception>
+ public static MyDumperFileMeta ParseMyDumperFile(ReadOnlySpan path)
+ {
+ try
+ {
+ var fileName = Path.GetFileName(path).ToString();
+ var parts = fileName.Split('.');
+ // Segment 3 decides the file type; anything other than "dat"/"sql" is rejected.
+ var type = parts[3] switch
+ {
+ "dat" => MyDumperFileType.Dat,
+ "sql" => MyDumperFileType.Sql,
+ _ => throw new ArgumentException("不支持的MyDumper文件类型", nameof(path))
+ };
+ return new MyDumperFileMeta(path.ToString(), parts[0], parts[1], int.Parse(parts[2]), type);
+ }
+ catch (Exception e)
+ {
+ // Every failure above (including the unsupported-type exception) is rewrapped here.
+ throw new ArgumentException($"此文件不是MyDumper导出的文件 {path}", nameof(path), e);
+ }
+ }
///
/// 从MyDumper导出的SQL文件内容中读取CSV文件名
@@ -122,17 +146,46 @@ public static partial class DumpDataHelper
var reader = new StreamReader(ds);
return await reader.ReadToEndAsync();
}
-
- public static bool IsJson(string str)
+
+ /// <summary>
+ /// File metadata builder for the file input service, intended for a MyDumper zst export directory.
+ /// Only files ending in ".dat.zst" are accepted; the column headers are read from the sibling
+ /// ".sql.zst" file that has the same name in the same directory.
+ /// </summary>
+ /// <param name="filePath">Path of a candidate input file.</param>
+ /// <returns>
+ /// The input metadata, or <c>null</c> when the file is not a ".dat.zst" file or its SQL file is missing.
+ /// </returns>
+ /// <exception cref="ApplicationException">An <see cref="InvalidOperationException"/> was raised while locating or reading the SQL file.</exception>
+ public static FileInputInfo? MyDumperFileInputMetaBuilder(string filePath)
{
+ // Only files with the .dat.zst suffix are considered; compare ordinally because this is a file suffix, not linguistic text.
+ if (!filePath.EndsWith(".dat.zst", StringComparison.Ordinal)) return null;
+
+ var fileMeta = ParseMyDumperFile(filePath);
+ var inputDir = Path.GetDirectoryName(filePath);
+ // Computed once instead of once per directory entry inside the lambda below.
+ var expectedSqlPath = filePath.Replace(".dat.zst", ".sql.zst");
+ string[]? headers;
try
{
- JsonDocument.Parse(str);
- return true;
+ // Look for the SQL file of the same table in the same directory.
+ var sqlFile = Directory.GetFiles(inputDir!)
+ .SingleOrDefault(f => f.Equals(expectedSqlPath));
+ if (sqlFile is null)
+ {
+ Log.Debug("{TableName}表的SQL文件不存在", fileMeta.TableName);
+ return null;
+ }
+ // Own the file handle here so it is released even if the decompression helper leaves the stream open.
+ using var sqlStream = File.OpenRead(sqlFile);
+ // The builder delegate is synchronous, so the async helper is awaited by blocking.
+ headers = GetCsvHeadersFromSqlFile(
+ DecompressZstAsStringAsync(sqlStream).Result);
}
- catch (JsonException)
+ catch (InvalidOperationException e)
{
- return false;
+ throw new ApplicationException($"目录下不止一个{fileMeta.TableName}表的SQL文件", e);
}
+
+ return new FileInputInfo
+ {
+ FileName = filePath,
+ TableName = fileMeta.TableName,
+ Headers = headers,
+ Database = fileMeta.Database,
+ Part = fileMeta.Index
+ };
}
}
\ No newline at end of file
diff --git a/MesETL.App/HostedServices/FileInputService.cs b/MesETL.App/HostedServices/FileInputService.cs
index 4c973b6..e1ce00b 100644
--- a/MesETL.App/HostedServices/FileInputService.cs
+++ b/MesETL.App/HostedServices/FileInputService.cs
@@ -1,4 +1,5 @@
-using MesETL.App.HostedServices.Abstractions;
+using MesETL.App.Const;
+using MesETL.App.HostedServices.Abstractions;
using MesETL.App.Options;
using MesETL.App.Services;
using MesETL.App.Services.ETL;
@@ -13,16 +14,11 @@ public record FileInputInfo
{
public required string FileName { get; init; }
public required string TableName { get; init; }
+ public required string Database { get; init; }
+ public required int Part { get; init; }
public required string[] Headers { get; init; }
}
-public enum FileInputType
-{
- MyDumperCsv,
- MyDumperZst,
- ErrorLog,
-}
-
///
/// 从输入目录中导入文件
///
@@ -53,33 +49,23 @@ public class FileInputService : IInputService
public async Task ExecuteAsync(CancellationToken cancellationToken)
{
var inputDir = _dataInputOptions.Value.InputDir ?? throw new ApplicationException("未配置文件输入目录");
- _logger.LogInformation("***** Input service started, working directory: {InputDir} *****", inputDir);
-
- var trans = _dataInputOptions.Value.FileInputMetaBuilder;
- if(trans is null) throw new ApplicationException("未配置文件名-表名映射委托");
- FileInputInfo[] infoArr = Directory.GetFiles(inputDir)
- .Select(f => trans(f))
- .Where(info => info is not null).ToArray()!;
-
- var orderedInfo = GetFilesInOrder(infoArr).ToArray();
-
- _logger.LogInformation("***** {Count} files founded in directory,{OrderedCount} files is matched with configuration *****", infoArr.Length, orderedInfo.Length);
- foreach (var info in orderedInfo)
- {
- _logger.LogDebug("Table {TableName}: {FileName}", info.TableName, info.FileName);
- }
+ _logger.LogInformation("***** 输入服务已启动,工作目录为:{InputDir} *****", inputDir);
+ var orderedInfo = GetOrderedInputInfo(inputDir);
+
foreach (var info in orderedInfo)
{
- _logger.LogInformation("Reading file: {FileName}, table: {TableName}", info.FileName, info.TableName);
- using var source = _dataReaderFactory.CreateReader(info.FileName,info.TableName,info.Headers);
+ var file = Path.GetFileName(info.FileName);
+ _logger.LogInformation("正在读取文件:{FileName}, 对应的数据表:{TableName}", file, info.TableName);
+ using var source = _dataReaderFactory.CreateReader(info.FileName, info.TableName, info.Headers);
var count = 0;
while (await source.ReadAsync())
{
if (GC.GetTotalMemory(false) > _memoryThreshold)
{
- _logger.LogWarning("内存过高,暂缓输入");
+ _logger.LogWarning("内存使用率过高,暂缓输入");
+ GC.Collect();
GC.Collect();
await Task.Delay(3000, cancellationToken);
}
@@ -90,12 +76,34 @@ public class FileInputService : IInputService
}
_context.AddTableInput(info.TableName, count);
- _logger.LogInformation("Input of table: '{TableName}' finished", info.TableName);
+ _logger.LogInformation("文件 {File} 输入完成", file);
_dataInputOptions.Value.OnTableInputCompleted?.Invoke(info.TableName);
}
_context.CompleteInput();
- _logger.LogInformation("***** Input service finished *****");
+ _logger.LogInformation("***** 输入服务已执行完毕 *****");
+ }
+
+ /// <summary>
+ /// Builds the list of input files for <paramref name="dir"/>: every file in the directory is passed
+ /// to the configured <c>FileInputMetaBuilder</c>, files it maps to <c>null</c> are dropped, and the
+ /// remainder is ordered/filtered by <c>GetFilesInOrder</c>. The counts and the per-table file lists are logged.
+ /// </summary>
+ /// <param name="dir">Directory whose files are inspected.</param>
+ /// <returns>The input metadata in the order produced by <c>GetFilesInOrder</c>.</returns>
+ /// <exception cref="ApplicationException">No <c>FileInputMetaBuilder</c> delegate is configured.</exception>
+ public IEnumerable GetOrderedInputInfo(string dir)
+ {
+ var metaBuilder = _dataInputOptions.Value.FileInputMetaBuilder;
+ if(metaBuilder is null) throw new ApplicationException("未配置文件名->表名的映射委托函数");
+ var files = Directory.GetFiles(dir);
+ // The builder returns null for files it does not recognise; those are filtered out here.
+ FileInputInfo[] infoArr = files
+ .Select(f => metaBuilder(f))
+ .Where(info => info is not null).ToArray()!;
+
+ var orderedInfo = GetFilesInOrder(infoArr).ToArray();
+
+ _logger.LogInformation("***** 输入目录中发现 {Count} 个文件, {InfoCount} 个文件可用,{OrderedCount} 个文件符合当前输入配置 *****",
+ files.Length, infoArr.Length, orderedInfo.Length);
+ // One debug entry per table, listing every file that belongs to it.
+ foreach (var info in orderedInfo.GroupBy(i => i.TableName))
+ {
+ _logger.LogDebug("表 {TableName} 发现 {FileCount} 个对应文件:\n{FileName}",
+ info.Key, info.Count(), string.Join('\n', info.Select(f => f.FileName)));
+ }
+
+ return orderedInfo;
+ }
///
@@ -104,7 +112,7 @@ public class FileInputService : IInputService
///
private IEnumerable GetFilesInOrder(FileInputInfo[] inputFiles)
{
- var tableOrder = _dataInputOptions.Value.TableOrder;
+ var tableOrder = _dataInputOptions.Value.TableOrder ?? typeof(TableNames).GetFields().Select(f => f.GetValue(null) as string).ToArray();
var ignoreTable = _dataInputOptions.Value.TableIgnoreList;
if (tableOrder is null or { Length: 0 })
return inputFiles;
@@ -115,10 +123,14 @@ public class FileInputService : IInputService
{
foreach (var tableName in tableOrder)
{
- var target = inputFiles.FirstOrDefault(f =>
- f.TableName.Equals(tableName, StringComparison.OrdinalIgnoreCase));
- if (target is not null && !ignoreTable.Contains(target.TableName))
+ var targets = inputFiles.Where(f =>
+ f.TableName.Equals(tableName, StringComparison.OrdinalIgnoreCase) &&
+ !ignoreTable.Contains(f.TableName));
+
+ foreach (var target in targets)
+ {
yield return target;
+ }
}
}
}
diff --git a/MesETL.App/HostedServices/MainHostedService.cs b/MesETL.App/HostedServices/MainHostedService.cs
index 233a68c..2b1f7b2 100644
--- a/MesETL.App/HostedServices/MainHostedService.cs
+++ b/MesETL.App/HostedServices/MainHostedService.cs
@@ -51,12 +51,13 @@ public class MainHostedService : BackgroundService
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
- _logger.LogInformation("Command argument detected, execute for each database");
var command = _config["Command"];
if (!string.IsNullOrEmpty(command))
{
- _logger.LogInformation("***** Running Sql Command *****");
+ _logger.LogInformation("检测到命令参数传入,将对所有配置的数据库执行输入的命令。。。");
+ _logger.LogInformation("***** 执行SQL命令 *****");
await ExecuteEachDatabase(command, stoppingToken);
+ _logger.LogInformation("***** 执行完成 *****");
Environment.Exit(0);
}
@@ -75,8 +76,8 @@ public class MainHostedService : BackgroundService
await Task.WhenAll(inputTask, transformTask, outputTask);
_stopwatch.Stop();
- _logger.LogInformation("***** All tasks completed *****");
- _logger.LogInformation("***** ElapseTime: {Time}", (_stopwatch.ElapsedMilliseconds / 1000f).ToString("F3"));
+ _logger.LogInformation("***** 所有传输任务均已完成 *****");
+ _logger.LogInformation("***** 耗时:{Time}", (_stopwatch.ElapsedMilliseconds / 1000f).ToString("F3"));
await Task.Delay(5000, stoppingToken);
if(enableUnsafeVar)
@@ -84,7 +85,7 @@ public class MainHostedService : BackgroundService
if (!stoppingToken.IsCancellationRequested)
{
await ExportResultAsync();
- _logger.LogInformation("The execution result export to {Path}",
+ _logger.LogInformation("传输结果已保存至 {Path}",
Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"Result-{ErrorRecorder.UID}.md"));
Environment.Exit(0);
@@ -157,32 +158,33 @@ public class MainHostedService : BackgroundService
{
var sb = new StringBuilder();
if (_context.HasException)
- sb.AppendLine("# Program Completed With Error");
- else sb.AppendLine("# Program Completed Successfully");
- sb.AppendLine("## Process Count");
+ sb.AppendLine("# 程序执行完毕,**但中途发生了异常**");
+ else sb.AppendLine("# 程序执行完毕,没有发生错误");
+ sb.AppendLine("## 处理计数");
var processCount = new[]
{
- new { State = "Input", Count = _context.InputCount },
- new { State = "Transform", Count = _context.TransformCount },
- new { State = "Output", Count = _context.OutputCount }
+ new { 操作 = "输入", 数量 = _context.InputCount },
+ new { 操作 = "转换", 数量 = _context.TransformCount },
+ new { 操作 = "输出", 数量 = _context.OutputCount }
};
sb.AppendLine(processCount.ToMarkdownTable());
sb.AppendLine("\n---\n");
- sb.AppendLine("## Table Output Progress");
+ sb.AppendLine("## 表输入/输出计数");
var tableOutputProgress = _context.TableProgress.Select(pair =>
- new { Table = pair.Key, Count = pair.Value }).OrderBy(s => s.Table);
+ new { 表名 = pair.Key, 计数 = pair.Value }).OrderBy(s => s.表名);
sb.AppendLine(tableOutputProgress.ToMarkdownTable());
sb.AppendLine("\n---\n");
- sb.AppendLine("## Result");
+ sb.AppendLine("## 总览");
var elapsedTime = (_stopwatch!.ElapsedMilliseconds / 1000f);
var result = new[]
{
- new { Field = "ElapsedTime", Value = elapsedTime.ToString("F2") },
+ new { 条目 = "耗时", 值 = elapsedTime.ToString("F2") + " 秒" },
new
{
- Field = "Average Output Speed",
- Value = (_context.OutputCount / elapsedTime).ToString("F2") + "records/s"
- }
+ 条目 = "平均输出速度",
+ 值 = (_context.OutputCount / elapsedTime).ToString("F2") + " 条记录/秒"
+ },
+ new { 条目 = "内存占用峰值", 值 = _context.MaxMemoryUsage + " 兆字节" }
};
sb.AppendLine(result.ToMarkdownTable());
await File.WriteAllTextAsync(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"Result-{ErrorRecorder.UID}.md"),
diff --git a/MesETL.App/HostedServices/OutputService.cs b/MesETL.App/HostedServices/OutputService.cs
index 101b516..901c762 100644
--- a/MesETL.App/HostedServices/OutputService.cs
+++ b/MesETL.App/HostedServices/OutputService.cs
@@ -1,4 +1,5 @@
using System.Buffers;
+using MesETL.App.Const;
using MesETL.App.Helpers;
using MesETL.App.HostedServices.Abstractions;
using MesETL.App.Options;
@@ -13,6 +14,8 @@ using TaskExtensions = MesETL.Shared.Helper.TaskExtensions;
namespace MesETL.App.HostedServices;
+/// <summary>
+/// Context handed to the <c>OutputFinished</c> callback of the output options; it carries the
+/// service provider that was injected into the output service.
+/// </summary>
+// NOTE(review): "Serivces" looks like a misspelling of "Services" (DataTransformContext spells the
+// equivalent parameter "Services") — TODO confirm. It is left as-is because renaming a positional
+// record parameter also renames the public property that callers use.
+public record DataOutputContext(IServiceProvider Serivces);
+
///
/// 数据导出服务,将数据导出至MySql服务
///
@@ -23,23 +26,26 @@ public class OutputService : IOutputService
private readonly ProcessContext _context;
private readonly ErrorRecorderFactory _errorRecorderFactory;
private readonly RecordQueuePool _queuePool;
+ private readonly IServiceProvider _services;
public OutputService(ILogger logger,
IOptions outputOptions,
ProcessContext context,
RecordQueuePool queuePool,
- ErrorRecorderFactory errorRecorderFactory)
+ ErrorRecorderFactory errorRecorderFactory,
+ IServiceProvider services)
{
_logger = logger;
_outputOptions = outputOptions;
_context = context;
_queuePool = queuePool;
_errorRecorderFactory = errorRecorderFactory;
+ _services = services;
}
public async Task ExecuteAsync(CancellationToken ct)
{
- _logger.LogInformation("***** Output service started *****");
+ _logger.LogInformation("***** 输出服务已启动 *****");
var dbTaskManager = new TaskManager(5);
var dbTasks = new Dictionary();
while (!_context.IsTransformCompleted)
@@ -59,7 +65,8 @@ public class OutputService : IOutputService
await TaskExtensions.WaitUntil(() => dbTaskManager.RunningTaskCount == 0, 25, ct);
_context.CompleteOutput();
- _logger.LogInformation("***** Output service finished *****");
+ _outputOptions.Value.OutputFinished?.Invoke(new DataOutputContext(_services));
+ _logger.LogInformation("***** 输出服务执行完毕 *****");
}
private async Task StartDatabaseWorker(string db, DataRecordQueue queue, CancellationToken ct = default)
@@ -72,8 +79,9 @@ public class OutputService : IOutputService
{
if (ct.IsCancellationRequested)
break;
-
- if (!queue.TryDequeue(out var record) || ignoreOutput.Contains(record.TableName)) continue;
+
+ if (!queue.TryDequeue(out var record) || record.Ignore || ignoreOutput.Contains(record.TableName))
+ continue;
var dbName = record.Database ?? throw new ApplicationException("输出的记录缺少数据库名");
if(dbName != db)
@@ -109,7 +117,7 @@ public class OutputService : IOutputService
await FlushAsync(db, tmp);
}
- _logger.LogInformation("*****输出线程结束,数据库: {db} *****", db);
+ _logger.LogInformation("***** 输出线程结束,数据库: {db} *****", db);
}
private async Task FlushAsync(string dbName, IEnumerable records)
@@ -142,6 +150,6 @@ public class OutputService : IOutputService
_context.AddOutput(value);
_context.AddTableOutput(key, value);
}
- _logger.LogTrace("Flushed {Count} records", tableOutput.Values.Sum(i => i));
+ _logger.LogTrace("输出任务:刷新了 {Count} 条记录", tableOutput.Values.Sum(i => i));
}
}
\ No newline at end of file
diff --git a/MesETL.App/HostedServices/TaskMonitorService.cs b/MesETL.App/HostedServices/TaskMonitorService.cs
index 88d99bd..4f331b4 100644
--- a/MesETL.App/HostedServices/TaskMonitorService.cs
+++ b/MesETL.App/HostedServices/TaskMonitorService.cs
@@ -80,27 +80,29 @@ public class TaskMonitorService
// running, error, completed, canceled, outputSpeed);
foreach (var logger in _monitorLoggers)
{
- logger.LogStatus("Monitor: Progress status", new Dictionary
+ var memory = GC.GetTotalMemory(false) / 1024 / 1024;
+ _context.MaxMemoryUsage = Math.Max(_context.MaxMemoryUsage, memory);
+ logger.LogStatus("系统监控", new Dictionary
{
- {"Input",_context.IsInputCompleted ? "OK" : $"{inputSpeed:F2}/s" },
- {"Transform", _context.IsTransformCompleted ? "OK" : $"{transformSpeed:F2}/s" },
- {"Output", _context.IsOutputCompleted ? "OK" : $"{outputSpeed:F2}/s" },
+ {"输入速度",_context.IsInputCompleted ? "OK" : $"{inputSpeed:F2}/s" },
+ {"转换速度", _context.IsTransformCompleted ? "OK" : $"{transformSpeed:F2}/s" },
+ {"输出速度", _context.IsOutputCompleted ? "OK" : $"{outputSpeed:F2}/s" },
- {"| Input Queue", _producerQueue.Count.ToString() },
- {"Output Queue", _queuePool.Queues.Values.Sum(queue => queue.Count).ToString()},
- {"Memory", $"{GC.GetTotalMemory(false) / 1024 / 1024} MiB"},
+ {"| 输入队列长度", _producerQueue.Count.ToString() },
+ {"输出队列长度", _queuePool.Queues.Values.Sum(queue => queue.Count).ToString()},
+ {"内存使用", $"{memory} MiB"},
});
var dict = _context.TableProgress
.ToDictionary(kv => kv.Key, kv => $"{kv.Value.input}/{kv.Value.output}");
- logger.LogStatus("Monitor: Table progress", dict, ITaskMonitorLogger.LogLevel.Progress);
- var sb = new StringBuilder("Table Progress: \n");
+ logger.LogStatus("系统监控: 表处理进度", dict, ITaskMonitorLogger.LogLevel.Progress);
+ var sb = new StringBuilder("表处理进度:\n");
foreach (var kv in dict)
{
sb.Append(kv.Key).AppendLine(kv.Value);
}
- sb.AppendLine($"LongestCharCount: {_producerQueue.LongestFieldCharCount}");
+ sb.AppendLine($"数据记录字段的最大长度:{_producerQueue.LongestFieldCharCount}");
await File.WriteAllTextAsync(_outputPath, sb.ToString(), CancellationToken.None);
diff --git a/MesETL.App/HostedServices/TransformService.cs b/MesETL.App/HostedServices/TransformService.cs
index 6dcaca6..c2a90d2 100644
--- a/MesETL.App/HostedServices/TransformService.cs
+++ b/MesETL.App/HostedServices/TransformService.cs
@@ -1,4 +1,5 @@
using MesETL.App.Cache;
+using MesETL.App.Const;
using MesETL.App.HostedServices.Abstractions;
using MesETL.App.Options;
using MesETL.App.Services;
@@ -9,7 +10,7 @@ using Microsoft.Extensions.Options;
namespace MesETL.App.HostedServices;
-public record DataTransformContext(DataRecord Record, ICacher Cacher, ILogger Logger);
+public record DataTransformContext(DataRecord Record, ICacher Cacher, ILogger Logger, IServiceProvider Services);
///
/// 数据处理服务,对导入后的数据进行处理
@@ -23,6 +24,7 @@ public class TransformService : ITransformService
private readonly ProcessContext _context;
private readonly ICacher _cache;
private readonly ErrorRecorderFactory _errorRecorderFactory;
+ private readonly IServiceProvider _services;
public TransformService(ILogger logger,
@@ -31,7 +33,8 @@ public class TransformService : ITransformService
RecordQueuePool queuePool,
ProcessContext context,
ICacher cache,
- ErrorRecorderFactory errorRecorderFactory)
+ ErrorRecorderFactory errorRecorderFactory,
+ IServiceProvider services)
{
_logger = logger;
_options = options;
@@ -40,11 +43,12 @@ public class TransformService : ITransformService
_context = context;
_cache = cache;
_errorRecorderFactory = errorRecorderFactory;
+ _services = services;
}
public async Task ExecuteAsync(CancellationToken cancellationToken)
{
- _logger.LogInformation("***** Data transform service started, thread id: {ThreadId} *****", Environment.CurrentManagedThreadId);
+ _logger.LogInformation("***** 数据转换服务已启动, 当前线程ID: {ThreadId} *****", Environment.CurrentManagedThreadId);
// var tasks = new List();
// for (int i = 0; i < 4; i++)
@@ -55,7 +59,7 @@ public class TransformService : ITransformService
// await Task.WhenAll(tasks);
await TransformWorker();
- _logger.LogInformation("***** Data transformation service finished *****");
+ _logger.LogInformation("***** 数据转换服务执行完毕 *****");
}
public async Task TransformWorker()
@@ -66,10 +70,10 @@ public class TransformService : ITransformService
{
continue;
}
-
+
try
{
- var context = new DataTransformContext(record, _cache, _logger);
+ var context = new DataTransformContext(record, _cache, _logger, _services);
if (_options.Value.EnableFilter)
{
// 数据过滤
@@ -85,7 +89,7 @@ public class TransformService : ITransformService
{
record = await replacer(context);
}
- }
+ }
// 字段缓存
var cacher = _options.Value.RecordCache;
@@ -96,9 +100,6 @@ public class TransformService : ITransformService
var dbFilter = _options.Value.DatabaseFilter
?? throw new ApplicationException("未配置数据库过滤器");
record.Database = dbFilter(record);
-
- await _queuePool[record.Database].EnqueueAsync(record);
- _context.AddTransform();
if (_options.Value.EnableReBuilder)
{
@@ -109,12 +110,15 @@ public class TransformService : ITransformService
foreach (var rc in addRecords)
{
if(dbFilter is not null)
- rc.Database =dbFilter.Invoke(record);
+ rc.Database = dbFilter.Invoke(record);
await _queuePool[record.Database].EnqueueAsync(rc);
_context.AddTransform();
}
}
}
+
+ await _queuePool[record.Database].EnqueueAsync(record);
+ _context.AddTransform();
}
catch (Exception e)
{
diff --git a/MesETL.App/Options/DataInputOptions.cs b/MesETL.App/Options/DataInputOptions.cs
index b36fdf1..a9f5560 100644
--- a/MesETL.App/Options/DataInputOptions.cs
+++ b/MesETL.App/Options/DataInputOptions.cs
@@ -4,6 +4,9 @@ namespace MesETL.App.Options
{
public class DataInputOptions
{
+ ///
+ /// 文件输入的目录
+ ///
public string? InputDir { get; set; }
#region CSV
@@ -22,29 +25,46 @@ namespace MesETL.App.Options
#region Mock
+ ///
+ /// 生成模拟数据进行测试
+ /// 启用后在读取数据时会截取ZST文件中的CSV文件的第一条记录,然后复制成指定数量的数据
+ ///
public bool UseMock { get; set; }
+ ///
+ /// 当开启模拟数据生成时,模拟数据的倍数
+ ///
public double MockCountMultiplier { get; set; } = 1;
///
- /// Table -> Mock Count 暂时为手动配置
+ /// 配置每张表生成模拟数据的规则,此属性暂时在程序中配置
///
public Dictionary? TableMockConfig { get; set; }
#endregion
- #region ManualSet
+ #region Reader
+ ///
+ /// 配置输入表及其顺序,如果为空则按照程序默认的顺序。
+ /// 该值如果存在,程序会按照集合中表的顺序来读取数据,不在集合中的表将被忽略!
+ ///
public string[]? TableOrder { get; set; }
+ ///
+ /// 忽略集合中配置的表,不进行读取
+ ///
public string[] TableIgnoreList { get; set; } = [];
///
/// 配置如何从文件名转换为表名和表头
///
- public Func? FileInputMetaBuilder { get; set; } //TODO: 抽离
+ public Func? FileInputMetaBuilder { get; set; }
+ ///
+ /// 表输入完成事件
+ ///
public Action? OnTableInputCompleted { get; set; }
#endregion
diff --git a/MesETL.App/Options/DataTransformOptions.cs b/MesETL.App/Options/DataTransformOptions.cs
index c80af1b..f1c129e 100644
--- a/MesETL.App/Options/DataTransformOptions.cs
+++ b/MesETL.App/Options/DataTransformOptions.cs
@@ -20,26 +20,26 @@ public class DataTransformOptions
///
/// yyyyMM
///
- public string CleanDate { get; set; } = "202301";
+ public string CleanDate { get; set; } = "202401";
///
/// Record -> Database name
- /// 对记录进行数据库过滤
+ /// 决定记录应当被插入到哪一个数据库中
///
public Func? DatabaseFilter { get; set; }
///
/// Context -> Should output
- /// 配置对数据过滤的条件
+ /// 对记录进行过滤,返回false则不输出
///
public Func>? RecordFilter { get; set; }//数据过滤方法
///
/// Context -> New record
- /// 对当前记录进行修改或完整替换
+ /// 对当前记录进行修改或完整替换,你可以在这里修改记录中的字段,或者新增/删除字段
///
public Func>? RecordModify { get; set; }//数据替换
///
/// Context -> New rebuild records
- /// 使用当前记录对某些数据进行重建
+ /// 基于当前记录新增多个记录
///
public Func?>? RecordReBuild { get; set; }//新增数据
///
diff --git a/MesETL.App/Options/DatabaseOutputOptions.cs b/MesETL.App/Options/DatabaseOutputOptions.cs
index d2a9714..0db426a 100644
--- a/MesETL.App/Options/DatabaseOutputOptions.cs
+++ b/MesETL.App/Options/DatabaseOutputOptions.cs
@@ -1,29 +1,66 @@
-namespace MesETL.App.Options;
+using MesETL.App.HostedServices;
+
+namespace MesETL.App.Options;
public class DatabaseOutputOptions
{
+ ///
+ /// 输出数据库的连接字符串
+ ///
public string? ConnectionString { get; set; }
+ ///
+ /// MySql max_allowed_packet变量值大小
+ ///
public int MaxAllowedPacket { get; set; } = 32 * 1024 * 1024;
+ ///
+ /// 每次Insert提交的数据量
+ ///
public int FlushCount { get; set; } = 10000;
+ ///
+ /// 每个数据库最大提交任务数
+ ///
public int MaxDatabaseOutputTask { get; set; } = 4;
+ ///
+ /// 将json列作为16进制格式输出(0x前缀)
+ ///
public bool TreatJsonAsHex { get; set; } = true;
+ ///
+ /// 不对某些表进行输出
+ ///
public string[] NoOutput { get; set; } = [];
+ ///
+ /// 当某张表的键出现重复时,在输出时使用ON DUPLICATE KEY UPDATE更新该条记录
+ /// 表名为键,更新的字段为值
+ ///
+ ///
+ /// {
+ /// // 当order_data_parts表的键出现重复时,使用ON DUPLICATE KEY UPDATE更新已存在记录的CompanyID为新插入记录的值
+ /// "order_data_parts": "CompanyID = new.CompanyID"
+ /// }
+ ///
+ ///
+ ///
public Dictionary? ForUpdate { get; set; }
///
- /// 配置导入数据的特殊列
+ /// 配置导入数据的特殊列,请在代码中配置
///
public Dictionary ColumnTypeConfig { get; set; } = new(); // "table.column" -> type
+
+ ///
+ /// 所有数据都输出完毕时的事件,请在代码中配置
+ ///
+ public Action? OutputFinished { get; set; }
public ColumnType GetColumnType(string table, string column)
{
- return ColumnTypeConfig.GetValueOrDefault($"{table}.{column}", ColumnType.UnDefine);
+ return ColumnTypeConfig.GetValueOrDefault(string.Concat(table, ".", column), ColumnType.UnDefine);
}
public bool TryGetForUpdate(string table, out string? forUpdate)
diff --git a/MesETL.App/Options/RedisCacheOptions.cs b/MesETL.App/Options/RedisCacheOptions.cs
index 10dc669..145099a 100644
--- a/MesETL.App/Options/RedisCacheOptions.cs
+++ b/MesETL.App/Options/RedisCacheOptions.cs
@@ -1,8 +1,20 @@
namespace MesETL.App.Options;
+///
+/// Redis缓存选项
+///
public class RedisCacheOptions
{
+ ///
+ /// Redis连接字符串
+ ///
public string? Configuration { get; init; }
+ ///
+ /// Redis实例名称
+ ///
public string InstanceName { get; init; } = "";
+ ///
+ /// 使用的数据库序号
+ ///
public int Database { get; init; } = 0;
}
\ No newline at end of file
diff --git a/MesETL.App/Options/TableMockConfig.cs b/MesETL.App/Options/TableMockConfig.cs
index ab11d63..45d29e1 100644
--- a/MesETL.App/Options/TableMockConfig.cs
+++ b/MesETL.App/Options/TableMockConfig.cs
@@ -1,5 +1,8 @@
namespace MesETL.App.Options;
+///
+/// 表模拟数据生成规则
+///
public struct TableMockConfig
{
///
diff --git a/MesETL.App/Options/TenantDbOptions.cs b/MesETL.App/Options/TenantDbOptions.cs
index f18cf13..0189bc6 100644
--- a/MesETL.App/Options/TenantDbOptions.cs
+++ b/MesETL.App/Options/TenantDbOptions.cs
@@ -1,5 +1,8 @@
namespace MesETL.App.Options;
+///
+/// 多租户分库配置
+///
public class TenantDbOptions
{
public string? TenantKey { get; set; }
@@ -16,8 +19,21 @@ public class TenantDbOptions
// DbList.ForEach(pair => dictionary.Add(pair.Value, pair.Key));
// 注意配置顺序
if(DbGroup is null) throw new ApplicationException("分库配置中没有发现任何数据库");
- var dbName = DbGroup.Cast?>()
- .FirstOrDefault(pair => pair?.Value != null && pair.Value.Value > tenantKeyValue)!.Value.Key;
+
+ #region 性能较低,不使用
+
+ // var dbName = DbGroup.Cast?>()
+ // .FirstOrDefault(pair => pair?.Value != null && pair.Value.Value > tenantKeyValue)!.Value.Key;
+
+ #endregion
+
+ // Take the FIRST entry (in configuration order) whose value is greater than tenantKeyValue,
+ // which is what the FirstOrDefault query replaced by this loop returned. Without the break
+ // the loop keeps overwriting dbName and the LAST matching entry would win.
+ string? dbName = null;
+ foreach (var (key, value) in DbGroup)
+ {
+ if (value > tenantKeyValue)
+ {
+ dbName = key;
+ break;
+ }
+ }
+
return dbName ??
throw new ArgumentOutOfRangeException(nameof(tenantKeyValue),
$"分库配置中没有任何符合'{nameof(tenantKeyValue)}'值的数据库");
diff --git a/MesETL.App/Program.cs b/MesETL.App/Program.cs
index c2299aa..2e66937 100644
--- a/MesETL.App/Program.cs
+++ b/MesETL.App/Program.cs
@@ -1,5 +1,6 @@
-// #define USE_TEST_DB // 测试库的结构与生产库不一样,如果使用测试库运行,则加上USE_TEST_DB预处理器指令
+#define USE_TEST_DB // 如果使用测试库运行,则加上USE_TEST_DB预处理器指令
+using System.Text;
using MesETL.App;
using MesETL.App.Services;
using MesETL.App.Services.ETL;
@@ -10,6 +11,7 @@ using MesETL.App.HostedServices.Abstractions;
using MesETL.App.Options;
using MesETL.App.Services.ErrorRecorder;
using MesETL.App.Services.Loggers;
+using MesETL.App.Services.Seq;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
@@ -68,9 +70,10 @@ async Task RunProgram()
options.DbGroup = tenantDbOptions.DbGroup;
});
host.Services.Configure(redisSection);
-
+
var oldestTime = DateTime.ParseExact(transformOptions.CleanDate, "yyyyMM", System.Globalization.DateTimeFormatInfo.InvariantInfo);
- var oldestTimeInt = int.Parse(transformOptions.CleanDate);
+ var oldestTimeInt_yyyyMM = int.Parse(transformOptions.CleanDate);
+ var oldestTimeInt_yyMM = int.Parse(transformOptions.CleanDate[2..]);
// 输入配置
host.Services.Configure(options =>
@@ -80,151 +83,17 @@ async Task RunProgram()
options.TableMockConfig = inputOptions.TableMockConfig;
options.MockCountMultiplier = inputOptions.MockCountMultiplier;
options.TableIgnoreList = inputOptions.TableIgnoreList;
+ options.TableOrder = inputOptions.TableOrder;
- // 配置文件输入方法
- options.FileInputMetaBuilder = fileName =>
- {
- if (fileName.EndsWith(".dat.zst"))
- {
- var tableName = DumpDataHelper.GetTableNameFromCsvFileName(
- Path.GetFileNameWithoutExtension(fileName)); // 去除.zst
- string[]? headers;
- try
- {
- // 查找同目录下同表的SQL文件
- var sqlFile = Directory.GetFiles(options.InputDir)
- .SingleOrDefault(f => f.Equals(fileName.Replace(".dat.zst",".sql.zst")));
- if (sqlFile is null)
- return null;
- headers = DumpDataHelper.GetCsvHeadersFromSqlFile(
- DumpDataHelper.DecompressZstAsStringAsync(File.OpenRead(sqlFile)).Result);
- }
- catch (InvalidOperationException e)
- {
- throw new ApplicationException($"目录下不止一个{tableName}表的SQL文件", e);
- }
-
- return new FileInputInfo
- {
- FileName = fileName,
- TableName = tableName,
- Headers = headers
- };
- }
- return null;
- };
+ // 配置文件元数据构建方法
+ options.FileInputMetaBuilder = DumpDataHelper.MyDumperFileInputMetaBuilder;
- // 配置表输入完成事件,字典清理
- options.OnTableInputCompleted = table =>
- {
- switch (table)
- {
- case TableNames.OrderBlockPlan:
- MemoryCache.Instance?.Delete(s => s.StartsWith(TableNames.Order + '-'));
- break;
- case TableNames.OrderItem:
- MemoryCache.Instance?.Delete(s => s.StartsWith(TableNames.OrderBlockPlan + '-'));
- break;
- case TableNames.OrderProcessSchedule:
- MemoryCache.Instance?.Delete(s => s.StartsWith(TableNames.OrderProcess + '-'));
- break;
- }
- };
+ // 配置表输入完成事件
+ options.OnTableInputCompleted = null;
- options.TableOrder = inputOptions.TableOrder ??
- [
- TableNames.Machine,
-
- TableNames.Order,
- TableNames.OrderBoxBlock, // 依赖Order.CompanyID
- TableNames.OrderDataBlock, // 依赖Order.CompanyID
-
- TableNames.OrderBlockPlan,
- TableNames.OrderBlockPlanResult,// 依赖OrderBlockPlan.CompanyID / 删除
-
- TableNames.OrderItem,
- TableNames.OrderDataGoods,
- TableNames.OrderDataParts,
- TableNames.OrderModule,
- TableNames.OrderModuleExtra,
- TableNames.OrderModuleItem,
- TableNames.OrderPackage,
-
- TableNames.OrderProcess,
- TableNames.OrderProcessStep,
- TableNames.OrderProcessStepItem,// 依赖OrderProcess.ShardKey / 删除
-
- TableNames.OrderProcessSchedule,
- TableNames.OrderScrapBoard,
- TableNames.ProcessGroup,
- TableNames.ProcessInfo,
- TableNames.ProcessItemExp,
- TableNames.ProcessScheduleCapacity,
- TableNames.ProcessStepEfficiency,
- TableNames.ReportTemplate,
- TableNames.SimplePackage,
- TableNames.SimplePlanOrder,
- TableNames.SysConfig,
- TableNames.WorkCalendar,
- TableNames.WorkShift,
- TableNames.WorkTime
- ];
-
- // options.TableMockConfig = new Dictionary
- // {
- // { TableNames.Machine, new TableMockConfig(true, 14655, ["ID"]) },
- // { TableNames.Order, new TableMockConfig(true, 5019216, ["OrderNo"]) },
- // { TableNames.OrderDataBlock, new TableMockConfig(true, 731800334, ["ID"]) },
- // { TableNames.OrderDataGoods, new TableMockConfig(true, 25803671, ["ID"]) },
- // { TableNames.OrderDataParts, new TableMockConfig(true, 468517543, ["ID"]) },
- // { TableNames.OrderModule, new TableMockConfig(true, 103325385, ["ID"]) },
- // { TableNames.OrderModuleExtra, new TableMockConfig(true, 54361321, ["ID"]) },
- // { TableNames.OrderModuleItem, new TableMockConfig(true, 69173339, ["ID"]) },
- // { TableNames.OrderPackage, new TableMockConfig(true, 16196195, ["ID"]) },
- // { TableNames.OrderProcess, new TableMockConfig(true, 3892685, ["ID"]) },
- // { TableNames.OrderProcessStep, new TableMockConfig(true, 8050349, ["ID"]) },
- // { TableNames.OrderProcessStepItem, new TableMockConfig(true, 14538058, ["ID"]) },
- // { TableNames.OrderScrapBoard, new TableMockConfig(true, 123998, ["ID"]) },
- // { TableNames.ProcessGroup, new TableMockConfig(true, 1253, ["ID"]) },
- // { TableNames.ProcessInfo, new TableMockConfig(true, 7839, ["ID"]) },
- // { TableNames.ProcessItemExp, new TableMockConfig(true, 28, ["ID"]) },
- // { TableNames.ProcessScheduleCapacity, new TableMockConfig(true, 39736, ["ID"]) },
- // { TableNames.ProcessStepEfficiency, new TableMockConfig(true, 8, ["ID"]) },
- // { TableNames.ReportTemplate, new TableMockConfig(true, 7337, ["ID"]) },
- // { TableNames.SimplePackage, new TableMockConfig(true, 130436, ["ID"]) },
- // { TableNames.SysConfig, new TableMockConfig(true, 2296, ["ID"]) },
- // { TableNames.WorkCalendar, new TableMockConfig(true, 11, ["ID"]) },
- // { TableNames.WorkShift, new TableMockConfig(true, 59, ["ID"]) },
- // { TableNames.WorkTime, new TableMockConfig(true, 62, ["ID"]) }
- // };
+ // 配置表模拟数据
options.TableMockConfig = new Dictionary
- {
- { TableNames.Machine, new TableMockConfig(true, 14655, ["ID"]) },
- { TableNames.Order, new TableMockConfig(true, 50192, ["OrderNo"]) },
- { TableNames.OrderDataBlock, new TableMockConfig(true, 7318003, ["ID"]) },
- { TableNames.OrderDataGoods, new TableMockConfig(true, 258036, ["ID"]) },
- { TableNames.OrderDataParts, new TableMockConfig(true, 4685175, ["ID"]) },
- { TableNames.OrderItem, new TableMockConfig(true, 13298896, ["ID"])},
- { TableNames.OrderModule, new TableMockConfig(true, 1033253, ["ID"]) },
- { TableNames.OrderModuleExtra, new TableMockConfig(true, 543613, ["ID"]) },
- { TableNames.OrderModuleItem, new TableMockConfig(true, 691733, ["ID"]) },
- { TableNames.OrderPackage, new TableMockConfig(true, 161961, ["ID"]) },
- { TableNames.OrderProcess, new TableMockConfig(true, 38926, ["ID"]) },
- { TableNames.OrderProcessStep, new TableMockConfig(true, 80503, ["ID"]) },
- { TableNames.OrderProcessStepItem, new TableMockConfig(true, 145380, ["ID"]) },
- { TableNames.OrderScrapBoard, new TableMockConfig(true, 1239, ["ID"]) },
- { TableNames.ProcessGroup, new TableMockConfig(true, 125, ["ID"]) },
- { TableNames.ProcessInfo, new TableMockConfig(true, 783, ["ID"]) },
- { TableNames.ProcessItemExp, new TableMockConfig(true, 28, ["ID"]) },
- { TableNames.ProcessScheduleCapacity, new TableMockConfig(true, 39736, ["ID"]) },
- { TableNames.ProcessStepEfficiency, new TableMockConfig(true, 8, ["ID"]) },
- { TableNames.ReportTemplate, new TableMockConfig(true, 7337, ["ID"]) },
- { TableNames.SimplePackage, new TableMockConfig(true, 130436, ["ID"]) },
- { TableNames.SysConfig, new TableMockConfig(true, 2296, ["Key"]) },
- { TableNames.WorkCalendar, new TableMockConfig(true, 11, ["ID"]) },
- { TableNames.WorkShift, new TableMockConfig(true, 59, ["ID"]) },
- { TableNames.WorkTime, new TableMockConfig(true, 62, ["ID"]) }
- };
+ { };
});
host.Services.Configure(options =>
@@ -240,109 +109,64 @@ async Task RunProgram()
// order_block_plan_item和order_package_item表不导入,根据order_item数据直接重建
// 数据清理
- options.RecordFilter = async context =>
+ options.RecordFilter = async context => // TODO: OPT: oldestTime等外部变量会产生闭包
{
var record = context.Record;
- var cache = context.Cacher;
switch (record.TableName)
{
- // OrderBoxBlock删除对应Order.OrderNo不存在的对象
- case TableNames.OrderBoxBlock:
- {
- if (!await cache.ExistsAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"])))
- return false;
- break;
- }
- // OrderDataBlock删除对应Order.OrderNo不存在的对象
- case TableNames.OrderDataBlock:
- {
- if (!await cache.ExistsAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"])))
- return false;
- break;
- }
- // OrderDataParts删除对应Order.OrderNo不存在的对象
- case TableNames.OrderDataParts:
- {
- if (!await cache.ExistsAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"])))
- return false;
- break;
- }
- // OrderBlockPlan删除CreateTime < 202301的
+ // 清理CreateTime < 202401的
case TableNames.OrderBlockPlan:
{
- var time = DateTime.Parse(record["CreateTime"].Trim('"','\''));
- if (time < oldestTime)
+ var creationTime = DateTime.Parse(record["CreateTime"].AsSpan().Trim(['"', '\'']));
+ if (creationTime < oldestTime)
+ {
return false;
-
- // if (!DumpDataHelper.IsJson(record["OrderNos"])) return false; //Json列合法检查
+ }
break;
}
- // OrderBlockPlanResult删除对应order_block_plan.ID不存在的对象
- case TableNames.OrderBlockPlanResult:
+ // 清理ShardKey < 24010的
+ case TableNames.OrderExtra:
{
- if (!await cache.ExistsAsync(CacheKeysFunc.OrderBlockPlan_ID_CompanyID(record["ID"])))
+ var shardKey = int.Parse(record["ShardKey"].AsSpan()[..4]);
+ if (shardKey < oldestTimeInt_yyMM)
+ {
return false;
+ }
break;
}
- // case TableNames.OrderBlockPlanResult: // 用SaveTime过滤
- // {
- // if (DateTime.Parse(record["SaveTime"].Trim('"', '\'')) < oldestTime)
- // return false;
- // break;
- // }
- // OrderDataGoods Json列合法检查
- case TableNames.OrderDataGoods:
+ // 清理(Status != 0 || Deleted = 1) && ID前四位 < 2401的
+ case TableNames.OrderScrapBoard:
{
- // if (!DumpDataHelper.IsJson(record["ExtraProp"])) return false;
- break;
- }
- // OrderModule删除OrderNo < 202301的
- case TableNames.OrderModule:
- {
- var orderNo = record["OrderNo"];
- if(int.Parse(orderNo.AsSpan(0, 6).ToString()) < oldestTimeInt)
+ var status = record["Status"].AsSpan();
+ var deleted = record["Deleted"].AsSpan();
+ var idPref = int.Parse(record["ID"].AsSpan()[..4]);
+ if ((status is not "0" || deleted is "1") && idPref < oldestTimeInt_yyMM)
+ {
return false;
+ }
break;
}
- // OrderProcess删除OrderNo < 202301的
- case TableNames.OrderProcess:
- {
- var orderNo = record["OrderNo"];
- if(int.Parse(orderNo.AsSpan(0, 6).ToString()) < oldestTimeInt)
- return false;
- break;
- }
- // OrderProcessStep删除OrderNo < 202301的
- case TableNames.OrderProcessStep:
- {
- var orderNo = record["OrderNo"];
- if(int.Parse(orderNo.AsSpan(0, 6).ToString()) < oldestTimeInt)
- return false;
- break;
- }
- // OrderProcessStepStep删除对应OrderProcess.ID不存在的对象
- case TableNames.OrderProcessStepItem:
- {
- if (!await cache.ExistsAsync(CacheKeysFunc.OrderProcess_ID_ShardKey(record["OrderProcessID"])))
- return false;
- break;
- }
- // SimplePackage删除OrderNo < 202301的
+ // Drop rows whose OrderNo prefix (first 4 characters) is below the clean date expressed as yyMM
case TableNames.SimplePackage:
{
- var orderNo = record["OrderNo"];
- if(int.Parse(orderNo.AsSpan(0, 6).ToString()) < oldestTimeInt)
+ var orderNo = int.Parse(record["OrderNo"].AsSpan()[..4]);
+ if (orderNo < oldestTimeInt_yyMM)
+ {
return false;
+ }
break;
}
- // SimplePlanOrder删除CreateTime < 202301的
+ // 清理CreateTime < 202401的
case TableNames.SimplePlanOrder:
{
- var time = DateTime.Parse(record["CreateTime"].Trim('"', '\''));
- if (time < oldestTime)
+ var creationTime = DateTime.Parse(record["CreateTime"].AsSpan().Trim(['"', '\'']));
+ if (creationTime < oldestTime)
+ {
return false;
+ }
break;
}
+ default: break;
}
return true;
@@ -378,94 +202,53 @@ async Task RunProgram()
var cache = context.Cacher;
switch (record.TableName)
{
- // Machine处理非空列
- case TableNames.Machine:
- ReplaceIfMyDumperNull(record, "Name", DefaultStr);
- ReplaceIfMyDumperNull(record, "CreateTime", DefaultDateTime);
- ReplaceIfMyDumperNull(record, "CreatorID", DefaultInt);
- ReplaceIfMyDumperNull(record, "EditTime", DefaultDateTime);
- ReplaceIfMyDumperNull(record, "EditorID", DefaultInt);
- ReplaceIfMyDumperNull(record, "Settings", DefaultText);
- break;
- // Order处理非空列
- case TableNames.Order:
- ReplaceIfMyDumperNull(record, "Deleted", DefaultInt);
- break;
- // OrderBlockPlan处理text->json列
- case TableNames.OrderBlockPlan:
- // 将所有值为'[]'(即字符串长度小等于2(16进制长度小于4))的置空 [] = 0x5b5d
- if (record["OrderNos"].Length <= 4)
- record["OrderNos"] = "NULL";
- break;
- // OrderBlockPlanResult,添加CompanyID
- case TableNames.OrderBlockPlanResult:
- record.AddField("CompanyID",
- // 获取OrderBlockPlan.ID -> CompanyID
- ThrowIfNoCached(await cache.GetStringAsync(CacheKeysFunc.OrderBlockPlan_ID_CompanyID(record["ID"])),
- TableNames.OrderBlockPlanResult, TableNames.OrderBlockPlan, "ID", "无法获取对应的CompanyID"));
- break;
- // OrderBoxBlock添加CompanyID列
+ // 重构Data列二进制数据
case TableNames.OrderBoxBlock:
- record.AddField("CompanyID",
- // 获取Order.OrderNo -> CompanyID
- ThrowIfNoCached(await cache.GetStringAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"])),
- TableNames.OrderBoxBlock, TableNames.Order, "OrderNo", "无法获取对应的CompanyID"));
+ {
+ var data = record["Data"];
+ // `not` binds tighter than `and`, so the two null markers must be grouped with `or`; ungrouped, the test was true only for ConstVar.Null itself.
+ if (data is not (ConstVar.MyDumperNull or ConstVar.Null))
+ {
+ record["Data"] = Encoding.UTF8.GetString(Convert.FromHexString(data));
+ }
+
break;
- // 修正OrderDataBlock.CompanyID
- case TableNames.OrderDataBlock:
- record["CompanyID"] =
- // 获取Order.OrderNo -> CompanyID
- ThrowIfNoCached(await cache.GetStringAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"])),
- TableNames.OrderDataBlock, TableNames.Order, "OrderNo", "无法获取对应的CompanyID");
- break;
- // 修正OrderDataParts.CompanyID:
- case TableNames.OrderDataParts:
- record["CompanyID"] =
- // 获取Order.OrderNo -> CompanyID
- ThrowIfNoCached(await cache.GetStringAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"])),
- TableNames.OrderDataParts, TableNames.Order, "OrderNo", "无法获取对应的CompanyID");
- break;
- // OrderModule添加ShardKey列,移除ViewFileName列
+ }
+ // 移除ViewFileName列
case TableNames.OrderModule:
- record.AddField("ShardKey", CalculateShardKeyByOrderNo(record["OrderNo"]));
- record.RemoveField("ViewFileName");
+ {
+#if USE_TEST_DB
+ if (record.HeaderExists("ViewFileName"))
+#endif
+ record.RemoveField("ViewFileName");
break;
- // OrderProcess添加ShardKey列,NextStepID的空值转换为0
- case TableNames.OrderProcess:
- record.AddField("ShardKey", CalculateShardKeyByOrderNo(record["OrderNo"]));
+ }
+#if USE_TEST_DB
+ // 删除ID列,让数据库自行递增
+ // TODO: 数据表改进,删除ID列或是替换为流水号
+ case TableNames.ProcessStepEfficiency:
+ {
+ record.RemoveField("ID");
break;
- // OrderProcessStep添加ShardKey
- case TableNames.OrderProcessStep:
- record.AddField("ShardKey", CalculateShardKeyByOrderNo(record["OrderNo"]));
+ }
+ case TableNames.ProcessScheduleCapacity:
+ {
+ record.RemoveField("ID");
break;
- // OrderProcessStepItem添加ShardKey列,处理非空列
- case TableNames.OrderProcessStepItem:
- ReplaceIfMyDumperNull(record, "DataID", DefaultInt);
- record.AddField("ShardKey",
- // 获取OrderProcess.ID -> ShardKey
- ThrowIfNoCached(await cache.GetStringAsync(CacheKeysFunc.OrderProcess_ID_ShardKey(record["OrderProcessID"])),
- TableNames.OrderProcessStepItem, TableNames.OrderProcessStep, "OrderProcessID", "无法获取对应的ShardKey"));
- break;
- // OrderScrapBoard处理非空列
- case TableNames.OrderScrapBoard:
- ReplaceIfMyDumperNull(record, "Color", DefaultStr);
- ReplaceIfMyDumperNull(record, "GoodsName", DefaultStr);
- ReplaceIfMyDumperNull(record, "Material", DefaultStr);
- ReplaceIfMyDumperNull(record, "MaterialName", DefaultStr);
- break;
- // ProcessItemExp处理非空列
- case TableNames.ProcessItemExp:
- ReplaceIfMyDumperNull(record, "MaxPartsID", DefaultInt);
- ReplaceIfMyDumperNull(record, "ProcessGroupID", DefaultInt);
- break;
- // SimplePlanOrder处理非空列,添加Deleted
+ }
+ // 测试环境忽略PlaceData列,生产环境会提前将其移除
case TableNames.SimplePlanOrder:
- ReplaceIfMyDumperNull(record, "CreateTime", DefaultDateTime);
- ReplaceIfMyDumperNull(record, "UpdateTime", DefaultDateTime);
- ReplaceIfMyDumperNull(record, "CompanyID", DefaultInt);
- ReplaceIfMyDumperNull(record, "SingleName", DefaultStr);
- record.AddField("Deleted", "0");
+ {
+ record.RemoveField("PlaceData");
break;
+ }
+ case TableNames.SysConfig:
+ {
+ record.RemoveField("Key");
+ break;
+ }
+#endif
+ default: break;
}
return record;
@@ -480,34 +263,7 @@ async Task RunProgram()
};
// 数据缓存
- options.RecordCache = async context =>
- {
- var record = context.Record;
- var cache = context.Cacher;
- switch (record.TableName)
- {
- // 缓存Order.OrderNo -> CompanyID
- case TableNames.Order:
- await cache.SetStringAsync(
- CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"]),
- record["CompanyID"]);
- break;
-
- // 缓存OrderBlockPlan.ID -> CompanyID
- case TableNames.OrderBlockPlan:
- await cache.SetStringAsync(
- CacheKeysFunc.OrderBlockPlan_ID_CompanyID(record["ID"]),
- record["CompanyID"]);
- break;
-
- // 缓存OrderProcess.ID -> ShardKey
- case TableNames.OrderProcess:
- await cache.SetStringAsync(
- CacheKeysFunc.OrderProcess_ID_ShardKey(record["ID"]),
- record["ShardKey"]);
- break;
- }
- };
+ options.RecordCache = null;
// 数据库过滤
options.DatabaseFilter = record =>
@@ -520,36 +276,24 @@ async Task RunProgram()
options.RecordReBuild = context =>
{
var record = context.Record;
- var resultList = new List();
- // 分流OrderItem表
- if (record.TableName == TableNames.OrderItem)
+
+ // 将OrderExtra表迁移至OrderWaveGroup表
+ if (record.TableName == TableNames.OrderExtra)
{
- record.TryGetField("ID", out var itemId);
- record.TryGetField("ShardKey", out var shardKey);
- record.TryGetField("PlanID", out var planId);
- record.TryGetField("PackageID", out var packageId);
- record.TryGetField("CompanyID", out var companyId);
- if(!int.TryParse(planId, out var pid))
- throw new ApplicationException($"数据发生异常:OrderItem.PlanID,值: {planId}");
- if (pid > 0)
- {
- resultList.Add(new DataRecord(new[] { itemId, shardKey, planId, companyId },
- TableNames.OrderBlockPlanItem,
- ["ItemID", "ShardKey", "PlanID", "CompanyID"]
- ));
- }
- if(!int.TryParse(packageId, out var pkid))
- throw new ApplicationException($"数据发生异常:OrderItem.PackageID,值: {packageId}");
- if(pkid > 0)
- {
- resultList.Add(new DataRecord(new[] { itemId, shardKey, packageId, companyId },
- TableNames.OrderPackageItem,
- [ "ItemID", "ShardKey", "PackageID", "CompanyID" ]
- ));
- }
+ record.Ignore = true;
+ var resultList = new List();
+ var seq = context.Services.GetRequiredService();
+ string[] headers = ["OrderNo", "ShardKey", "ConfigType", "ConfigJson", "CompanyID"];
+ var id = seq.AddCachedSeq(SeqConfig.OrderWaveGroupID);
+ var orderWaveGroup = new DataRecord(
+ [id.ToString(), ..headers.Select(c => record[c])],
+ TableNames.OrderWaveGroup,
+ ["ID", "OrderNo", "ShardKey", "Type", "ConfigJson", "CompanyID"]);
+ resultList.Add(orderWaveGroup);
+ return resultList;
}
- return resultList;
+ return ArraySegment.Empty;
};
});
@@ -562,73 +306,43 @@ async Task RunProgram()
options.TreatJsonAsHex = outputOptions.TreatJsonAsHex;
options.NoOutput = outputOptions.NoOutput;
options.ForUpdate = outputOptions.ForUpdate;
-
-#if USE_TEST_DB
- // Test Server
- options.ColumnTypeConfig = new Dictionary
- {
- { "simple_plan_order.PlaceData", ColumnType.Blob },
- { "order_block_plan_result.PlaceData", ColumnType.Blob },
- { "order_box_block.Data", ColumnType.Blob },
- { "order_data_goods.ExtraProp", ColumnType.Json },
- { "order_module_extra.JsonStr", ColumnType.Text },
- { "process_info.Users", ColumnType.Text },
- { "order_process_schdule.CustomOrderNo", ColumnType.Text },
- { "order_process_schdule.OrderProcessStepName", ColumnType.Text },
- { "order_process_schdule.AreaName", ColumnType.Text },
- { "order_process_schdule.ConsigneeAddress", ColumnType.Text },
- { "order_process_schdule.ConsigneePhone", ColumnType.Text },
- { "report_source.Sql", ColumnType.Text },
- { "report_source.KeyValue", ColumnType.Text },
- { "report_source.Setting", ColumnType.Text },
- { "order_data_block.RemarkJson", ColumnType.Text },
- { "order_patch_detail.BlockDetail", ColumnType.Json },
- { "order_scrap_board.OutLineJson", ColumnType.Text },
- { "simple_package.Items", ColumnType.Json },
- { "order_batch_pack_config.Setting", ColumnType.Text },
- { "machine.Settings", ColumnType.Text },
- { "sys_config.Value", ColumnType.Text },
- { "sys_config.JsonStr", ColumnType.Text },
- { "process_item_exp.ItemJson", ColumnType.Text },
- { "report_template.Template", ColumnType.Text },
- { "report_template.SourceConfig", ColumnType.Text },
- { "order_block_plan.OrderNos", ColumnType.Json },
- { "order_block_plan.BlockInfo", ColumnType.Text },
- };
-#else
- // 配置列类型
+
+ // 配置列的类型以便于在输出时区分二进制内容
// Prod server
options.ColumnTypeConfig = new Dictionary
{
- { "simple_plan_order.PlaceData", ColumnType.Blob },
- { "order_block_plan_result.PlaceData", ColumnType.Blob },
- { "order_box_block.Data", ColumnType.Blob },
- { "order_data_goods.ExtraProp", ColumnType.Text },
- { "order_module_extra.JsonStr", ColumnType.Text },
- { "process_info.Users", ColumnType.Text },
- { "order_process_schdule.CustomOrderNo", ColumnType.Text },
- { "order_process_schdule.OrderProcessStepName", ColumnType.Text },
- { "order_process_schdule.AreaName", ColumnType.Text },
- { "order_process_schdule.ConsigneeAddress", ColumnType.Text },
- { "order_process_schdule.ConsigneePhone", ColumnType.Text },
- { "report_source.Sql", ColumnType.Text },
- { "report_source.KeyValue", ColumnType.Text },
- { "report_source.Setting", ColumnType.Text },
- { "order_data_block.RemarkJson", ColumnType.Text },
- { "order_patch_detail.BlockDetail", ColumnType.Text },
- { "order_scrap_board.OutLineJson", ColumnType.Text },
- { "simple_package.Items", ColumnType.Text },
- { "order_batch_pack_config.Setting", ColumnType.Text },
- { "machine.Settings", ColumnType.Text },
- { "sys_config.Value", ColumnType.Text },
- { "sys_config.JsonStr", ColumnType.Text },
- { "process_item_exp.ItemJson", ColumnType.Text },
- { "report_template.Template", ColumnType.Text },
- { "report_template.SourceConfig", ColumnType.Text },
- { "order_block_plan.OrderNos", ColumnType.Text },
- { "order_block_plan.BlockInfo", ColumnType.Text },
+ {"machine.Settings", ColumnType.Text},
+ {"order_block_plan.BlockInfo", ColumnType.Text},
+ {"order_block_plan.OrderNos", ColumnType.Json},
+ {"order_block_plan_result.PlaceData", ColumnType.Blob},
+ {"order_box_block.Data", ColumnType.Blob},
+ {"order_data_block.RemarkJson", ColumnType.Text},
+ {"order_data_goods.ExtraProp", ColumnType.Json},
+ {"order_extra.ConfigJson", ColumnType.Json},
+ {"order_module_extra.Data", ColumnType.Blob},
+ {"order_module_extra.JsonStr", ColumnType.Text},
+ {"order_patch_detail.BlockDetail", ColumnType.Json},
+ {"order_process_schdule.AreaName", ColumnType.Text},
+ {"order_process_schdule.ConsigneeAddress", ColumnType.Text},
+ {"order_process_schdule.ConsigneePhone", ColumnType.Text},
+ {"order_process_schdule.CustomOrderNo", ColumnType.Text},
+ {"order_process_schdule.OrderProcessStepName", ColumnType.Text},
+ {"order_scrap_board.OutLineJson", ColumnType.Text},
+ {"order_wave_group.ConfigJson", ColumnType.Json},
+ {"process_info.Users", ColumnType.Text},
+ {"process_item_exp.ItemJson", ColumnType.Text},
+ {"report_template.SourceConfig", ColumnType.Text},
+ {"report_template.Template", ColumnType.Text},
+ {"simple_package.Items", ColumnType.Json},
+ {"sys_config.JsonStr", ColumnType.Text},
+ {"sys_config.Value", ColumnType.Text}
+ };
+
+ options.OutputFinished += ctx =>
+ {
+ var seq = ctx.Serivces.GetRequiredService();
+ seq.ApplyToDatabaseAsync().GetAwaiter().GetResult();
};
-#endif
});
host.Services.AddLogging(builder =>
@@ -646,6 +360,7 @@ async Task RunProgram()
host.Services.AddDataSourceFactory();
host.Services.AddErrorRecorderFactory();
host.Services.AddSingleton();
+ host.Services.AddSingleton();
var prodLen = host.Configuration.GetRequiredSection("RecordQueue").GetValue("ProducerQueueLength");
var consLen = host.Configuration.GetRequiredSection("RecordQueue").GetValue("ConsumerQueueLength");
var maxCharCount = host.Configuration.GetRequiredSection("RecordQueue").GetValue("MaxByteCount") / 2;
diff --git a/MesETL.App/Services/ETL/MySqlDestination.cs b/MesETL.App/Services/ETL/MySqlDestination.cs
index 7051949..6972238 100644
--- a/MesETL.App/Services/ETL/MySqlDestination.cs
+++ b/MesETL.App/Services/ETL/MySqlDestination.cs
@@ -15,6 +15,9 @@ namespace MesETL.App.Services.ETL;
///
public partial class MySqlDestination : IDisposable, IAsyncDisposable
{
+ ///
+ /// table => records
+ ///
private readonly Dictionary> _recordCache;
private readonly MySqlConnection _conn;
private readonly ILogger _logger;
@@ -66,8 +69,8 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
try
{
- var excuseList = GetExcuseList(_recordCache, maxAllowPacket);
- foreach (var insertSql in excuseList)
+ var executionList = GetExecutionList(_recordCache, maxAllowPacket);
+ foreach (var insertSql in executionList)
{
cmd.CommandText = insertSql;
try
@@ -103,7 +106,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
[GeneratedRegex("INSERT INTO `([^`]+)`")]
private static partial Regex MatchTableName();
- public IEnumerable GetExcuseList(IDictionary> tableRecords,int maxAllowPacket)
+ public IEnumerable GetExecutionList(IDictionary> tableRecords, int maxAllowPacket)
{
var sb = new StringBuilder("SET AUTOCOMMIT = 1;\n");
var appendCount = 0;
@@ -116,13 +119,16 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
StartBuild:
var noCommas = true;
+ // 标准列顺序,插入时的字段需要按照该顺序排列
+ var headers = records[0].Headers;
+
// INSERT INTO ... VALUES >>>
sb.Append($"INSERT INTO `{tableName}`(");
- for (var i = 0; i < records[0].Headers.Count; i++)
+ for (var i = 0; i < headers.Count; i++)
{
var header = records[0].Headers[i];
sb.Append($"`{header}`");
- if (i != records[0].Headers.Count - 1)
+ if (i != headers.Count - 1)
sb.Append(',');
}
@@ -132,11 +138,20 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
for (;recordIdx < records.Count; recordIdx++)
{
var record = records[recordIdx];
+
+ // 数据列校验
+ if (record.Headers.Count != headers.Count)
+ {
+ throw new InvalidOperationException($"数据异常,数据列数量出现冲突,表名:{tableName}");
+ }
+
var recordSb = new StringBuilder();
recordSb.Append('(');
- for (var fieldIdx = 0; fieldIdx < record.Fields.Count; fieldIdx++)
+ for (var idx = 0; idx < headers.Count; idx++)
{
- var field = record.Fields[fieldIdx];
+ var header = headers[idx];
+ // TODO: 可进行性能优化
+ var field = record[header];
// 在这里处理特殊列
#region HandleFields
@@ -147,7 +162,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
goto Escape;
}
- switch (_options.Value.GetColumnType(record.TableName, record.Headers[fieldIdx]))
+ switch (_options.Value.GetColumnType(record.TableName, header))
{
case ColumnType.Text:
if(string.IsNullOrEmpty(field))
@@ -163,12 +178,12 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
recordSb.Append(ConstVar.Null);
else recordSb.Append($"0x{field}");
break;
- case ColumnType.Json:// 生产库没有JSON列,仅用于测试库进行测试
- if(string.IsNullOrEmpty(field))
- recordSb.Append("'[]'"); // JObject or JArray?
+ case ColumnType.Json: // Mydumper v0.16.7-5导出的Json为字符串,且会将逗号转义,需要适配
+ if(string.IsNullOrEmpty(field))
+ recordSb.Append(ConstVar.Null);
else if (_options.Value.TreatJsonAsHex)
recordSb.Append($"_utf8mb4 0x{field}");
- else recordSb.AppendLine(field);
+ else recordSb.AppendLine(field.Replace("\\,", ","));
break;
case ColumnType.UnDefine:
default:
@@ -179,7 +194,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
Escape:
#endregion
- if (fieldIdx != record.Fields.Count - 1)
+ if (idx != headers.Count - 1)
recordSb.Append(',');
}
diff --git a/MesETL.App/Services/Loggers/LoggerTaskMonitorLogger.cs b/MesETL.App/Services/Loggers/LoggerTaskMonitorLogger.cs
index 9af02f3..e386f0e 100644
--- a/MesETL.App/Services/Loggers/LoggerTaskMonitorLogger.cs
+++ b/MesETL.App/Services/Loggers/LoggerTaskMonitorLogger.cs
@@ -17,7 +17,7 @@ public class LoggerTaskMonitorLogger : ITaskMonitorLogger
var sb = new StringBuilder();
sb.Append($"{name}: {{");
sb.AppendJoin(',', properties.Select((pair, i) => $" {pair.Key}: {pair.Value}"));
- sb.Append('}');
+ sb.Append([' ', '}']);
// var args = new List { name };
// properties.Aggregate(args, (args, pair) =>
// {
diff --git a/MesETL.App/Services/ProcessContext.cs b/MesETL.App/Services/ProcessContext.cs
index 5ca8ab6..80cfb34 100644
--- a/MesETL.App/Services/ProcessContext.cs
+++ b/MesETL.App/Services/ProcessContext.cs
@@ -35,6 +35,8 @@ public class ProcessContext
set => Interlocked.Exchange(ref _outputCount, value);
}
+ public long MaxMemoryUsage { get; set; }
+
// TableName -> Count
public IReadOnlyDictionary TableProgress => _tableProgress;
diff --git a/MesETL.App/Services/Seq/SeqConfig.cs b/MesETL.App/Services/Seq/SeqConfig.cs
new file mode 100644
index 0000000..6ae5d67
--- /dev/null
+++ b/MesETL.App/Services/Seq/SeqConfig.cs
@@ -0,0 +1,42 @@
+// ReSharper disable InconsistentNaming
+namespace MesETL.App.Services.Seq;
+ /// <summary>Describes one named sequence: the name used as SeqName in the seq table, whether it recycles, its step and its maximum value.</summary>
+public class SeqConfig(string Name, bool Recycle = true, int Step = 1, long Max = 999_999_999)
+{
+ public string Name { get; init; } = Name;
+ public bool Recycle { get; init; } = Recycle;
+ public int Step { get; init; } = Step;
+ public long Max { get; init; } = Max;
+ // Predefined sequence configurations.
+ public static readonly SeqConfig ItemNo = new("seq_ItemNo", true, 1, 999_999_999);
+ public static readonly SeqConfig OrderModuleID = new("seq_order_module_id", false);
+ public static readonly SeqConfig OrderDataID = new("seq_order_data_id", false);
+ public static readonly SeqConfig OrderItemID = new("seq_order_item", false);
+ public static readonly SeqConfig ProcessStepID = new("seq_step_id", false);
+ public static readonly SeqConfig PackageNo = new("seq_pack_no", true, 1, 9_999_999);
+ public static readonly SeqConfig PlanNo = new("seq_plan_order", true, 1, 999_999);
+ public static readonly SeqConfig SimplePlanNo = new("seq_simple_plan_order", true, 1, 999_999);
+
+ // Sequence numbers of the kinds below are generated when BaseService adds an entity
+ public static readonly SeqConfig MachineID = new("seq_machine_id", false);
+ public static readonly SeqConfig OrderBlockPlanID = new("seq_order_block_plan_id", false);
+ public static readonly SeqConfig OrderDataGoodsID = new("seq_order_data_goods_id", false);
+ public static readonly SeqConfig OrderPackageID = new("seq_order_pack_id", false);
+ public static readonly SeqConfig OrderProcessID = new("seq_order_process_id", false);
+ public static readonly SeqConfig OrderProcessStepItemID = new("seq_order_process_step_item_id", false);
+ public static readonly SeqConfig ProcessGroupID = new("seq_process_group_id", false);
+ public static readonly SeqConfig ProcessInfoID = new("seq_process_info_id", false);
+ public static readonly SeqConfig ProcessItemExpID = new("seq_process_item_exp_id", false);
+ public static readonly SeqConfig ProcessScheduleCapacityID = new("seq_process_schedule_capacity_id", false);
+ public static readonly SeqConfig ProcessStepEfficiencyID = new("seq_process_step_efficiency_id", false);
+ public static readonly SeqConfig ReportTemplateID = new("seq_report_template_id", false);
+ public static readonly SeqConfig SysConfigKey = new("seq_sys_config_key", false);
+ public static readonly SeqConfig WorkCalendarID = new("seq_work_calendar_id", false);
+ public static readonly SeqConfig WorkShiftID = new("seq_work_shift_id", false);
+ public static readonly SeqConfig WorkTimeID = new("seq_work_time_id", false);
+ public static readonly SeqConfig OrderPatchDetailID = new("seq_order_patch_detail_id", false);
+ public static readonly SeqConfig OrderModuleExtraID = new("seq_order_module_extra_id", false);
+ public static readonly SeqConfig SimplePackageID = new("seq_simple_pack_id", false);
+ public static readonly SeqConfig OrderModuleItemID = new("seq_order_module_item_id", false);
+ public static readonly SeqConfig OrderWaveGroupID = new("seq_order_wave_group_id", false);
+}
\ No newline at end of file
diff --git a/MesETL.App/Services/Seq/SeqService.cs b/MesETL.App/Services/Seq/SeqService.cs
new file mode 100644
index 0000000..42da8fe
--- /dev/null
+++ b/MesETL.App/Services/Seq/SeqService.cs
@@ -0,0 +1,134 @@
+using System.Diagnostics;
+using System.Runtime.CompilerServices;
+using System.Text;
+using MesETL.App.Options;
+using MesETL.Shared.Helper;
+using Microsoft.Extensions.Options;
+using MySqlConnector;
+
+namespace MesETL.App.Services.Seq;
+
+/// <summary>Issues sequence numbers backed by the <c>seq</c> table of the <c>mes_global</c> database.</summary>
+public class SeqService
+{
+    private readonly string _connectionString;
+    private readonly Dictionary _cachedSequence;
+
+    /// <summary>Read-only view of the locally cached sequence values.</summary>
+    public IReadOnlyDictionary CachedSequence => _cachedSequence;
+
+    /// <summary>Derives the connection string from the configured one, with the database set to <c>mes_global</c>.</summary>
+    /// <exception cref="ApplicationException">No connection string is configured.</exception>
+    public SeqService(IOptions options)
+    {
+        var connStr = options.Value.ConnectionString ?? throw new ApplicationException("未配置输出数据库连接字符串");
+        _connectionString = new MySqlConnectionStringBuilder(connStr) { Database = "mes_global" }.ConnectionString;
+        _cachedSequence = new Dictionary();
+    }
+
+    /// <summary>
+    /// Creates the sequence row if it is missing, otherwise advances its <c>CurrentVal</c> by
+    /// <paramref name="add"/>, and returns the resulting <c>CurrentVal</c>.
+    /// </summary>
+    private async Task UpdateSequenceID(string name,int step,long max,bool recycle, int add)
+    {
+        // A recycling sequence restarts at `add` once the next value would reach MaxVal.
+        var next = recycle
+            ? $"IF(CurrentVal + {add} >= MaxVal, {add}, CurrentVal + {add})"
+            : $"CurrentVal + {add}";
+        // @updatedVal is pre-set to `add` because the UPDATE clause never runs when the row is
+        // freshly inserted; without it the final SELECT would yield NULL on first use.
+        // NOTE(review): user variables need AllowUserVariables on the connection; confirm DatabaseHelper sets it.
+        var sql =
+            $"""
+            SET @updatedVal := {add};
+            INSERT INTO seq (SeqName,CurrentVal,Increment,MinVal,MaxVal,UpdateTime)
+            VALUES ('{name}',{add},{step},1,{max},NOW())
+            ON DUPLICATE KEY UPDATE UpdateTime = NOW(), CurrentVal = (@updatedVal := {next});
+            SELECT @updatedVal;
+            """;
+        var result = await DatabaseHelper.QueryScalarAsync(_connectionString, sql);
+        return Convert.ToInt64(result);
+    }
+
+    /// <summary>Reads the current value of a sequence from the database without changing it.</summary>
+    public async Task PeekKey(SeqConfig config)
+    {
+        var sql = $"SELECT CurrentVal FROM seq WHERE SeqName = '{config.Name}' LIMIT 1;";
+        return Convert.ToInt64(await DatabaseHelper.QueryScalarAsync(_connectionString, sql));
+    }
+
+    /// <summary>
+    /// Reserves <paramref name="count"/> keys with a single call to the database. The last element is
+    /// the new <c>CurrentVal</c>; each earlier element is one <c>Step</c> smaller than the next.
+    /// </summary>
+    /// <returns>The reserved keys, or an empty collection when <paramref name="count"/> is below 1.</returns>
+    /// <exception cref="OverflowException"><c>Step * count</c> does not fit in an <see cref="int"/>.</exception>
+    public async Task GetKeys(SeqConfig config, int count)
+    {
+        if (count < 1) return [];
+
+        // checked: fail loudly instead of reserving a wrapped-around range.
+        var add = checked(config.Step * count);
+        var lastId = await UpdateSequenceID(config.Name, config.Step, config.Max, config.Recycle, add);
+        var step = Convert.ToInt64(config.Step);
+        var keys = new long[count];
+        for (var i = count - 1; i >= 0; i--)
+        {
+            keys[i] = lastId;
+            lastId -= step;
+        }
+
+        return keys;
+    }
+
+    /// <summary>
+    /// Advances the locally cached value of a sequence by one step and returns it. The cache is
+    /// seeded from the database on first use and is only written back by <see cref="ApplyToDatabaseAsync"/>.
+    /// </summary>
+    /// <param name="config">The sequence to advance.</param>
+    /// <returns>The new cached value.</returns>
+    public long AddCachedSeq(SeqConfig config)
+    {
+        if (!_cachedSequence.TryGetValue(config, out var val))
+        {
+            // First use of this sequence: block on the async read, since this method is synchronous.
+            val = PeekKey(config).GetAwaiter().GetResult();
+        }
+
+        var step = config.Step;
+        // Same wrap-around rule as the SQL in UpdateSequenceID: a recycling sequence restarts at
+        // one step instead of repeating its last value once the maximum is reached.
+        val = config.Recycle && val + step >= config.Max ? step : val + step;
+        _cachedSequence[config] = val;
+        return val;
+    }
+
+    /// <summary>Removes one sequence from the cache.</summary>
+    /// <param name="config">The sequence whose cached value is dropped.</param>
+    public void RemoveCachedSeq(SeqConfig config) => _cachedSequence.Remove(config);
+
+    /// <summary>Removes every sequence from the cache.</summary>
+    public void ClearCache() => _cachedSequence.Clear();
+
+    /// <summary>Writes every cached sequence value back to the <c>seq</c> table.</summary>
+    public async Task ApplyToDatabaseAsync()
+    {
+        // An empty cache would produce an empty script; skip the round trip instead of sending it.
+        if (_cachedSequence.Count == 0) return;
+
+        await DatabaseHelper.NonQueryAsync(_connectionString, GenerateCachedSeqSql());
+    }
+
+    /// <summary>Builds one <c>UPDATE seq</c> statement per cached sequence.</summary>
+    private string GenerateCachedSeqSql()
+    {
+        var sb = new StringBuilder();
+        foreach (var kv in _cachedSequence)
+        {
+            sb.AppendLine($"UPDATE seq SET CurrentVal = {kv.Value} WHERE SeqName = '{kv.Key.Name}';");
+        }
+
+        return sb.ToString();
+    }
+}
\ No newline at end of file
diff --git a/MesETL.App/appsettings.json b/MesETL.App/appsettings.json
index afe2598..40ecb6b 100644
--- a/MesETL.App/appsettings.json
+++ b/MesETL.App/appsettings.json
@@ -8,29 +8,28 @@
}
},
"Input":{
- "InputDir": "D:\\Dump\\NewMockData", // Csv数据输入目录
+ "InputDir": "D:\\Data\\DatabaseDump\\MyDumper-ZST 2024-12-3", // Csv数据输入目录
"UseMock": false, // 使用模拟数据进行测试
"MockCountMultiplier": 1, // 模拟数据量级的乘数
- "TableOrder": ["order", "order_data_parts"], // 按顺序输入的表
+// "TableOrder": ["order_extra"], // 按顺序输入的表
"TableIgnoreList": [] // 忽略输入的表
},
"Transform":{
- "StrictMode": false, // 设为true时如果数据转换发生错误,立刻停止程序
+ "StrictMode": true, // 设为true时如果数据转换发生错误,立刻停止程序
"EnableFilter": true, // 启用数据过滤
"EnableReplacer": true, // 启用数据修改
"EnableReBuilder": true, // 启用数据重建
- "CleanDate": "202301" // 当数据过滤开启时,删除这个时间之前的数据
+ "CleanDate": "202401" // 当数据过滤开启时,删除这个时间之前的数据
},
"Output":{
- "ConnectionString": "Server=127.0.0.1;Port=3306;UserId=root;Password=cfmes123456;", // 要分库,不用加'Database='了
+ "ConnectionString": "Server=127.0.0.1;Port=3306;UserId=root;Password=123456;", // 要分库,不用加'Database='了
"MaxAllowedPacket": 67108864,
"FlushCount": 10000, // 每次提交记录条数
"MaxDatabaseOutputTask" : 4, // 每个数据库最大提交任务数
"TreatJsonAsHex": false, // 将json列作为16进制格式输出(0x前缀),生产库是没有json列的
- "NoOutput": ["order"],
+ "NoOutput": [], // 不输出的表
"ForUpdate":
{
- "order_data_parts": "CompanyID = new.CompanyID"
}
},
"RecordQueue":{
@@ -54,10 +53,11 @@
},
"prod":{
"mesdb_1": 5000,
- "mesdb_2": 10000,
- "mesdb_3": 15000,
- "mesdb_4": 20000,
- "mesdb_5": 2147483647
+ "mesdb_2": 7500,
+ "mesdb_3": 10000,
+ "mesdb_4": 15000,
+ "mesdb_5": 20000,
+ "mesdb_6": 2147483647
}
}
}
diff --git a/MesETL.Shared/Helper/DatabaseHelper.cs b/MesETL.Shared/Helper/DatabaseHelper.cs
index 6abf1d4..f23ede9 100644
--- a/MesETL.Shared/Helper/DatabaseHelper.cs
+++ b/MesETL.Shared/Helper/DatabaseHelper.cs
@@ -5,6 +5,11 @@ namespace MesETL.Shared.Helper;
public static class DatabaseHelper
{
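+    /// <summary>
+    /// Creates a MySQL connection from the given connection string.
+    /// </summary>
+    /// <param name="connStr">Connection string used as the basis for the new connection.</param>
+    /// <returns>The newly created <see cref="MySqlConnection"/>.</returns>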
public static MySqlConnection CreateConnection(string connStr)
{
var newConnStr = new MySqlConnectionStringBuilder(connStr)
@@ -15,6 +20,13 @@ public static class DatabaseHelper
return new MySqlConnection(newConnStr);
}
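+    /// <summary>
+    /// Queries the database with the given SQL statement.
+    /// </summary>
+    /// <param name="connStr">Connection string of the database to query.</param>
+    /// <param name="sql">SQL statement to execute.</param>
+    /// <param name="ct">Token used to cancel the operation.</param>
+    /// <returns>The query result.</returns>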
public static async Task QueryTableAsync(string connStr, string sql, CancellationToken ct = default)
{
await using var conn = CreateConnection(connStr);
@@ -27,6 +39,13 @@ public static class DatabaseHelper
return ds;
}
+ ///
+ /// 使用语句进行标量查询
+ ///
+ ///
+ ///
+ ///
+ ///
public static async Task