Compare commits
9 Commits
b20c56640f
...
main
Author | SHA1 | Date | |
---|---|---|---|
ab9f7a8253 | |||
c049e092e4 | |||
1f7213a75c | |||
6d281f65c9 | |||
d2d7b21620 | |||
27ea80d359 | |||
8037fc74de | |||
77a3909160 | |||
4986c60416 |
@@ -25,7 +25,7 @@ public static class TableNames
|
|||||||
public const string OrderProcessStep = "order_process_step";
|
public const string OrderProcessStep = "order_process_step";
|
||||||
public const string OrderProcessStepItem = "order_process_step_item";
|
public const string OrderProcessStepItem = "order_process_step_item";
|
||||||
public const string OrderScrapBoard = "order_scrap_board";
|
public const string OrderScrapBoard = "order_scrap_board";
|
||||||
public const string OrderWaveGroup = "order_wave_group";
|
public const string OrderExtraList = "order_extra_list";
|
||||||
public const string ProcessGroup = "process_group";
|
public const string ProcessGroup = "process_group";
|
||||||
public const string ProcessInfo = "process_info";
|
public const string ProcessInfo = "process_info";
|
||||||
public const string ProcessItemExp = "process_item_exp";
|
public const string ProcessItemExp = "process_item_exp";
|
||||||
|
@@ -2,52 +2,12 @@
|
|||||||
|
|
||||||
public class DataRecord : ICloneable
|
public class DataRecord : ICloneable
|
||||||
{
|
{
|
||||||
/// <summary>
|
|
||||||
/// 尝试获取一条记录的某个字段值
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="record"></param>
|
|
||||||
/// <param name="columnName"></param>
|
|
||||||
/// <param name="value"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
/// <exception cref="InvalidOperationException"></exception>
|
|
||||||
public static bool TryGetField(DataRecord record, string columnName, out string value)
|
|
||||||
{
|
|
||||||
value = string.Empty;
|
|
||||||
if (record.Headers is null)
|
|
||||||
throw new InvalidOperationException("Cannot get field when headers of a record have not been set.");
|
|
||||||
var idx = IndexOfIgnoreCase(record.Headers, columnName);
|
|
||||||
if (idx == -1)
|
|
||||||
return false;
|
|
||||||
value = record.Fields[idx];
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 获取一条记录的某个字段值
|
|
||||||
/// TODO: 最好能优化至O(1)
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="record"></param>
|
|
||||||
/// <param name="columnName"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
/// <exception cref="InvalidOperationException"></exception>
|
|
||||||
/// <exception cref="IndexOutOfRangeException"></exception>
|
|
||||||
public static string GetField(DataRecord record, string columnName)
|
|
||||||
{
|
|
||||||
if (record.Headers is null)
|
|
||||||
throw new InvalidOperationException("Headers have not been set.");
|
|
||||||
var idx = IndexOfIgnoreCase(record.Headers, columnName);
|
|
||||||
if (idx is -1)
|
|
||||||
throw new IndexOutOfRangeException(
|
|
||||||
$"Column name '{columnName}' not found in this record, table name '{record.TableName}'.");
|
|
||||||
return record.Fields[idx];
|
|
||||||
}
|
|
||||||
|
|
||||||
private static int IndexOfIgnoreCase(IList<string> list, string value)
|
private static int IndexOfIgnoreCase(IList<string> list, string value)
|
||||||
{
|
{
|
||||||
var idx = -1;
|
var idx = -1;
|
||||||
for (var i = 0; i < list.Count; i++)
|
for (var i = 0; i < list.Count; i++)
|
||||||
{
|
{
|
||||||
if (list[i].Equals(value, StringComparison.OrdinalIgnoreCase))
|
if (list[i].Equals(value, StringComparison.OrdinalIgnoreCase))
|
||||||
{
|
{
|
||||||
idx = i;
|
idx = i;
|
||||||
break;
|
break;
|
||||||
@@ -57,45 +17,54 @@ public class DataRecord : ICloneable
|
|||||||
return idx;
|
return idx;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private readonly List<string> _fields;
|
||||||
|
private readonly List<string> _headers;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 字段列表
|
/// 字段列表
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public IList<string> Fields { get; }
|
public IReadOnlyList<string> Fields => _fields;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 表头列表
|
/// 表头列表
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public IList<string> Headers { get; }
|
public IReadOnlyList<string> Headers => _headers;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 来源表名
|
/// 来源表名
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public string TableName { get; }
|
public string TableName { get; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 需要输出的数据库
|
/// 需要输出的数据库
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public string? Database { get; set; }
|
public string? Database { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 所有字段的总字符数量
|
/// 所有字段的总字符数量
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public long FieldCharCount { get; }
|
public long FieldCharCount { get; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 忽略这个记录,不会被输出
|
/// 忽略这个记录,不会被输出
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public bool Ignore { get; set; }
|
public bool Ignore { get; set; }
|
||||||
|
|
||||||
|
|
||||||
public DataRecord(IEnumerable<string> fields, string tableName, IEnumerable<string> headers, string? database = null)
|
public DataRecord(IEnumerable<string> fields, string tableName, IEnumerable<string> headers,
|
||||||
|
string? database = null)
|
||||||
{
|
{
|
||||||
Fields = fields.ToList();
|
_fields = fields.ToList();
|
||||||
TableName = tableName;
|
TableName = tableName;
|
||||||
Headers = headers.ToList();
|
_headers = headers.ToList();
|
||||||
Database = database;
|
Database = database;
|
||||||
|
|
||||||
if (Fields.Count != Headers.Count)
|
if (_fields.Count != _headers.Count)
|
||||||
throw new ArgumentException(
|
throw new ArgumentException(
|
||||||
$"The number of fields does not match the number of headers. Expected: {Headers.Count} Got: {Fields.Count} Fields: {string.Join(',', Fields)}",
|
$"The number of fields does not match the number of headers. Expected: {_headers.Count} Got: {_fields.Count} Fields: {string.Join(',', _fields)}",
|
||||||
nameof(fields));
|
nameof(fields));
|
||||||
|
|
||||||
FieldCharCount = Fields.Sum(x => (long)x.Length);
|
FieldCharCount = _fields.Sum(x => (long)x.Length);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -104,8 +73,8 @@ public class DataRecord : ICloneable
|
|||||||
/// <param name="index"></param>
|
/// <param name="index"></param>
|
||||||
public string this[int index]
|
public string this[int index]
|
||||||
{
|
{
|
||||||
get => Fields[index];
|
get => _fields[index];
|
||||||
set => Fields[index] = value;
|
set => _fields[index] = value;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -114,61 +83,101 @@ public class DataRecord : ICloneable
|
|||||||
/// <param name="columnName"></param>
|
/// <param name="columnName"></param>
|
||||||
public string this[string columnName]
|
public string this[string columnName]
|
||||||
{
|
{
|
||||||
get => GetField(this, columnName);
|
get => GetField(columnName);
|
||||||
set => SetField(columnName, value);
|
set => SetField(columnName, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 尝试获取字段值
|
/// 尝试获取某个字段值
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="columnName"></param>
|
/// <param name="columnName"></param>
|
||||||
/// <param name="value"></param>
|
/// <param name="value"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public bool TryGetField(string columnName, out string value) => TryGetField(this, columnName, out value);
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 为一个字段赋值
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="columnName"></param>
|
|
||||||
/// <param name="value"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public bool SetField(string columnName, string value) => SetField(this, columnName, value);
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
///
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="record"></param>
|
|
||||||
/// <param name="columnName"></param>
|
|
||||||
/// <param name="value"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
/// <exception cref="InvalidOperationException"></exception>
|
/// <exception cref="InvalidOperationException"></exception>
|
||||||
/// <exception cref="IndexOutOfRangeException"></exception>
|
public bool TryGetField(string columnName, out string value)
|
||||||
public static bool SetField(DataRecord record, string columnName, string value)
|
|
||||||
{
|
{
|
||||||
// 表头检查
|
value = string.Empty;
|
||||||
if (record.Headers is null)
|
if (_headers is null)
|
||||||
throw new InvalidOperationException("记录的表头尚未设置,无法赋值");
|
throw new InvalidOperationException("Cannot get field when headers of a record have not been set.");
|
||||||
var idx = IndexOfIgnoreCase(record.Headers, columnName);
|
var idx = IndexOfIgnoreCase(_headers, columnName);
|
||||||
if (idx is -1)
|
if (idx == -1)
|
||||||
throw new IndexOutOfRangeException(
|
return false;
|
||||||
$"列 '{columnName}' 不存在于该纪录中,表名 '{record.TableName}");
|
value = _fields[idx];
|
||||||
record.Fields[idx] = value;
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void RemoveField(string columnName)
|
/// <summary>
|
||||||
|
/// 获取一条记录的某个字段值
|
||||||
|
/// TODO: 最好能优化至O(1)
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="columnName"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
/// <exception cref="InvalidOperationException"></exception>
|
||||||
|
/// <exception cref="IndexOutOfRangeException"></exception>
|
||||||
|
public string GetField(string columnName)
|
||||||
{
|
{
|
||||||
var idx = IndexOfIgnoreCase(Headers, columnName);
|
if (_headers is null)
|
||||||
if (idx == -1)
|
throw new InvalidOperationException("记录的表头尚未设置,无法赋值");
|
||||||
throw new InvalidOperationException($"{TableName}: 列名 '{columnName}' 不存在");
|
var idx = IndexOfIgnoreCase(_headers, columnName);
|
||||||
|
if (idx is -1)
|
||||||
Fields.RemoveAt(idx);
|
throw new IndexOutOfRangeException(
|
||||||
Headers.Remove(columnName);
|
$"列 '{columnName}' 不存在于该纪录中,表名 '{TableName}");
|
||||||
|
return _fields[idx];
|
||||||
}
|
}
|
||||||
|
|
||||||
public bool HeaderExists(string columnName) => IndexOfIgnoreCase(Headers, columnName) != -1;
|
/// <summary>
|
||||||
|
/// 为记录的一个字段赋值,如果该字段名不存在则会抛出异常
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="columnName">列名</param>
|
||||||
|
/// <param name="value">值</param>
|
||||||
|
/// <returns></returns>
|
||||||
|
/// <exception cref="InvalidOperationException">该记录的表头尚未初始化,你可能在错误的阶段调用了该方法</exception>
|
||||||
|
/// <exception cref="IndexOutOfRangeException">输入的字段名不存在于该记录中</exception>
|
||||||
|
public void SetField(string columnName, string value)
|
||||||
|
{
|
||||||
|
// 表头检查
|
||||||
|
if (_headers is null)
|
||||||
|
throw new InvalidOperationException("记录的表头尚未设置,无法赋值");
|
||||||
|
var idx = IndexOfIgnoreCase(_headers, columnName);
|
||||||
|
if (idx is -1)
|
||||||
|
throw new IndexOutOfRangeException(
|
||||||
|
$"列 '{columnName}' 不存在于该纪录中,表名 '{TableName}");
|
||||||
|
_fields[idx] = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 在记录中追加一个字段
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="columnName">字段名</param>
|
||||||
|
/// <param name="value">字段值</param>
|
||||||
|
/// <exception cref="InvalidOperationException">记录的表头尚未初始化,你可能在错误的阶段调用了此方法</exception>
|
||||||
|
/// <exception cref="ArgumentException">提供的字段名已存在于该记录中</exception>
|
||||||
|
public void AppendField(string columnName, string value)
|
||||||
|
{
|
||||||
|
if (_headers is null)
|
||||||
|
throw new InvalidOperationException("记录的表头尚未设置,无法赋值");
|
||||||
|
var idx = IndexOfIgnoreCase(_headers, columnName);
|
||||||
|
if (idx is > 0)
|
||||||
|
throw new ArgumentException($"字段名 '{columnName}' 已存在于该记录中,无法重复添加", nameof(columnName));
|
||||||
|
|
||||||
|
_headers.Add(columnName);
|
||||||
|
_fields.Add(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void RemoveField(string columnName)
|
||||||
|
{
|
||||||
|
var idx = IndexOfIgnoreCase(_headers, columnName);
|
||||||
|
if (idx == -1)
|
||||||
|
throw new InvalidOperationException($"{TableName}: 列名 '{columnName}' 不存在");
|
||||||
|
|
||||||
|
_fields.RemoveAt(idx);
|
||||||
|
_headers.Remove(columnName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool HeaderExists(string columnName) => IndexOfIgnoreCase(_headers, columnName) != -1;
|
||||||
|
|
||||||
public object Clone()
|
public object Clone()
|
||||||
{
|
{
|
||||||
return new DataRecord(new List<string>(Fields), TableName, new List<string>(Headers), Database);
|
return new DataRecord(new List<string>(_fields), TableName, new List<string>(_headers), Database);
|
||||||
}
|
}
|
||||||
}
|
}
|
@@ -31,6 +31,8 @@ public class FileInputService : IInputService
|
|||||||
private readonly ProcessContext _context;
|
private readonly ProcessContext _context;
|
||||||
private readonly DataReaderFactory _dataReaderFactory;
|
private readonly DataReaderFactory _dataReaderFactory;
|
||||||
private readonly long _memoryThreshold;
|
private readonly long _memoryThreshold;
|
||||||
|
private readonly bool _dryRun;
|
||||||
|
private readonly int _dryRunCount;
|
||||||
|
|
||||||
public FileInputService(ILogger<FileInputService> logger,
|
public FileInputService(ILogger<FileInputService> logger,
|
||||||
IOptions<DataInputOptions> dataInputOptions,
|
IOptions<DataInputOptions> dataInputOptions,
|
||||||
@@ -45,12 +47,16 @@ public class FileInputService : IInputService
|
|||||||
_producerQueue = producerQueue;
|
_producerQueue = producerQueue;
|
||||||
_dataReaderFactory = dataReaderFactory;
|
_dataReaderFactory = dataReaderFactory;
|
||||||
_memoryThreshold = (long)(configuration.GetValue<double>("MemoryThreshold", 8) * 1024 * 1024 * 1024);
|
_memoryThreshold = (long)(configuration.GetValue<double>("MemoryThreshold", 8) * 1024 * 1024 * 1024);
|
||||||
|
_dryRun = configuration.GetValue("DryRun", false);
|
||||||
|
_dryRunCount = configuration.GetValue("DryRunCount", 100_000);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task ExecuteAsync(CancellationToken cancellationToken)
|
public async Task ExecuteAsync(CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
var inputDir = _dataInputOptions.Value.InputDir ?? throw new ApplicationException("未配置文件输入目录");
|
var inputDir = _dataInputOptions.Value.InputDir ?? throw new ApplicationException("未配置文件输入目录");
|
||||||
_logger.LogInformation("***** 输入服务已启动,工作目录为:{InputDir} *****", inputDir);
|
_logger.LogInformation("***** 输入服务已启动,工作目录为:{InputDir} *****", inputDir);
|
||||||
|
if (_dryRun)
|
||||||
|
_logger.LogInformation("***** 试运行模式已开启,只读取前 {Count} 行数据 *****", _dryRunCount);
|
||||||
|
|
||||||
var orderedInfo = GetOrderedInputInfo(inputDir);
|
var orderedInfo = GetOrderedInputInfo(inputDir);
|
||||||
|
|
||||||
@@ -59,7 +65,10 @@ public class FileInputService : IInputService
|
|||||||
var file = Path.GetFileName(info.FileName);
|
var file = Path.GetFileName(info.FileName);
|
||||||
_logger.LogInformation("正在读取文件:{FileName}, 对应的数据表:{TableName}", file, info.TableName);
|
_logger.LogInformation("正在读取文件:{FileName}, 对应的数据表:{TableName}", file, info.TableName);
|
||||||
using var source = _dataReaderFactory.CreateReader(info.FileName, info.TableName, info.Headers);
|
using var source = _dataReaderFactory.CreateReader(info.FileName, info.TableName, info.Headers);
|
||||||
var count = 0;
|
var countBuffer = 0;
|
||||||
|
|
||||||
|
if (_dryRun && _context.TableProgress.GetValueOrDefault(info.TableName, (input: 0, output: 0)).input >= _dryRunCount)
|
||||||
|
continue;
|
||||||
|
|
||||||
while (await source.ReadAsync())
|
while (await source.ReadAsync())
|
||||||
{
|
{
|
||||||
@@ -72,17 +81,29 @@ public class FileInputService : IInputService
|
|||||||
}
|
}
|
||||||
var record = source.Current;
|
var record = source.Current;
|
||||||
await _producerQueue.EnqueueAsync(record);
|
await _producerQueue.EnqueueAsync(record);
|
||||||
count++;
|
countBuffer++;
|
||||||
_context.AddInput();
|
_context.AddInput();
|
||||||
|
|
||||||
|
// 避免影响性能,每1000条更新一次表输入进度
|
||||||
|
if (countBuffer >= 1000)
|
||||||
|
{
|
||||||
|
_context.AddTableInput(info.TableName, countBuffer);
|
||||||
|
countBuffer = 0;
|
||||||
|
// 试运行模式下,超出了指定行数则停止输入
|
||||||
|
if (_dryRun && _context.TableProgress[info.TableName].input >= _dryRunCount)
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_context.AddTableInput(info.TableName, count);
|
_context.AddTableInput(info.TableName, countBuffer);
|
||||||
_logger.LogInformation("文件 {File} 输入完成", file);
|
_logger.LogInformation("文件 {File} 输入完成", file);
|
||||||
_dataInputOptions.Value.OnTableInputCompleted?.Invoke(info.TableName);
|
_dataInputOptions.Value.OnTableInputCompleted?.Invoke(info.TableName);
|
||||||
}
|
}
|
||||||
|
|
||||||
_context.CompleteInput();
|
_context.CompleteInput();
|
||||||
_logger.LogInformation("***** 输入服务已执行完毕 *****");
|
_logger.LogInformation("***** 输入服务{DryRun}已执行完毕 *****", _dryRun ? " (试运行)" : "");
|
||||||
}
|
}
|
||||||
|
|
||||||
public IEnumerable<FileInputInfo> GetOrderedInputInfo(string dir)
|
public IEnumerable<FileInputInfo> GetOrderedInputInfo(string dir)
|
||||||
|
@@ -77,6 +77,8 @@ public class MainHostedService : BackgroundService
|
|||||||
await Task.WhenAll(inputTask, transformTask, outputTask);
|
await Task.WhenAll(inputTask, transformTask, outputTask);
|
||||||
_stopwatch.Stop();
|
_stopwatch.Stop();
|
||||||
_logger.LogInformation("***** 所有传输任务均已完成 *****");
|
_logger.LogInformation("***** 所有传输任务均已完成 *****");
|
||||||
|
if (_context.HasException)
|
||||||
|
_logger.LogError("***** 传输过程中有错误发生 *****");
|
||||||
_logger.LogInformation("***** 耗时:{Time}", (_stopwatch.ElapsedMilliseconds / 1000f).ToString("F3"));
|
_logger.LogInformation("***** 耗时:{Time}", (_stopwatch.ElapsedMilliseconds / 1000f).ToString("F3"));
|
||||||
await Task.Delay(5000, stoppingToken);
|
await Task.Delay(5000, stoppingToken);
|
||||||
|
|
||||||
@@ -114,6 +116,10 @@ public class MainHostedService : BackgroundService
|
|||||||
{
|
{
|
||||||
var connStr = _databaseOptions.Value.ConnectionString
|
var connStr = _databaseOptions.Value.ConnectionString
|
||||||
?? throw new ApplicationException("分库配置中没有配置数据库");
|
?? throw new ApplicationException("分库配置中没有配置数据库");
|
||||||
|
if (enable)
|
||||||
|
_logger.LogWarning("已开启MySQL延迟写入功能并禁用重做日志,请注意数据安全");
|
||||||
|
else _logger.LogInformation("不安全变量已关闭");
|
||||||
|
|
||||||
if (enable)
|
if (enable)
|
||||||
{
|
{
|
||||||
await DatabaseHelper.NonQueryAsync(connStr,
|
await DatabaseHelper.NonQueryAsync(connStr,
|
||||||
@@ -157,9 +163,16 @@ public class MainHostedService : BackgroundService
|
|||||||
private async Task ExportResultAsync()
|
private async Task ExportResultAsync()
|
||||||
{
|
{
|
||||||
var sb = new StringBuilder();
|
var sb = new StringBuilder();
|
||||||
if (_context.HasException)
|
|
||||||
sb.AppendLine("# 程序执行完毕,**但中途发生了异常**");
|
var title = (_config.GetValue("DryRun", false), _context.HasException) switch
|
||||||
else sb.AppendLine("# 程序执行完毕,没有发生错误");
|
{
|
||||||
|
(true, true) => "# 试运行结束,**请注意处理异常**",
|
||||||
|
(true, false) => "# 试运行结束,没有发生异常",
|
||||||
|
(false, true) => "# 程序执行完毕,**但中途发生了异常**",
|
||||||
|
(false, false) => "# 程序执行完毕,没有发生错误"
|
||||||
|
};
|
||||||
|
sb.AppendLine(title);
|
||||||
|
|
||||||
sb.AppendLine("## 处理计数");
|
sb.AppendLine("## 处理计数");
|
||||||
var processCount = new[]
|
var processCount = new[]
|
||||||
{
|
{
|
||||||
|
@@ -56,7 +56,18 @@ public class OutputService : IOutputService
|
|||||||
if (!dbTasks.ContainsKey(db))
|
if (!dbTasks.ContainsKey(db))
|
||||||
{
|
{
|
||||||
dbTasks.Add(db, await dbTaskManager.CreateTaskAsync(
|
dbTasks.Add(db, await dbTaskManager.CreateTaskAsync(
|
||||||
async () => await StartDatabaseWorker(db, queue, ct), ct));
|
async () =>
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await StartDatabaseWorker(db, queue, ct);
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
_logger.LogError(e, "输出线程发生错误");
|
||||||
|
_queuePool.RemoveQueue(db);
|
||||||
|
}
|
||||||
|
}, ct));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -153,6 +164,6 @@ public class OutputService : IOutputService
|
|||||||
_context.AddOutput(value);
|
_context.AddOutput(value);
|
||||||
_context.AddTableOutput(key, value);
|
_context.AddTableOutput(key, value);
|
||||||
}
|
}
|
||||||
_logger.LogTrace("输出任务:刷新了 {Count} 条记录", tableOutput.Values.Sum(i => i));
|
// _logger.LogTrace("输出任务:刷新了 {Count} 条记录", tableOutput.Values.Sum(i => i));
|
||||||
}
|
}
|
||||||
}
|
}
|
@@ -48,26 +48,92 @@ public class TransformService : ITransformService
|
|||||||
|
|
||||||
public async Task ExecuteAsync(CancellationToken cancellationToken)
|
public async Task ExecuteAsync(CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
_logger.LogInformation("***** 数据转换服务已启动, 当前线程ID: {ThreadId} *****", Environment.CurrentManagedThreadId);
|
_logger.LogInformation("***** 数据转换服务已启动 *****");
|
||||||
|
|
||||||
// var tasks = new List<Task>();
|
|
||||||
// for (int i = 0; i < 4; i++)
|
|
||||||
// {
|
|
||||||
// tasks.Add(Task.Run(TransformWorker, cancellationToken));
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// await Task.WhenAll(tasks);
|
|
||||||
await TransformWorker();
|
|
||||||
|
|
||||||
|
await TransformWorker2();
|
||||||
|
|
||||||
|
_context.CompleteTransform();
|
||||||
_logger.LogInformation("***** 数据转换服务执行完毕 *****");
|
_logger.LogInformation("***** 数据转换服务执行完毕 *****");
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task TransformWorker()
|
public async Task TransformWorker(DataRecordQueue queue)
|
||||||
{
|
{
|
||||||
while (!_context.IsInputCompleted || _producerQueue.Count > 0)
|
while (!_context.IsInputCompleted || _producerQueue.Count > 0)
|
||||||
{
|
{
|
||||||
if (!_producerQueue.TryDequeue(out var record))
|
if (!_producerQueue.TryDequeue(out var record))
|
||||||
{
|
{
|
||||||
|
await Task.Delay(100);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var context = new DataTransformContext(record, _cache, _logger, _services);
|
||||||
|
if (_options.Value.EnableFilter)
|
||||||
|
{
|
||||||
|
// 数据过滤
|
||||||
|
var filter = _options.Value.RecordFilter;
|
||||||
|
if (filter is not null && await filter(context) == false) continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (_options.Value.EnableReplacer)
|
||||||
|
{
|
||||||
|
// 数据替换
|
||||||
|
var replacer = _options.Value.RecordModify;
|
||||||
|
if (replacer is not null)
|
||||||
|
{
|
||||||
|
record = await replacer(context);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 字段缓存
|
||||||
|
var cacher = _options.Value.RecordCache;
|
||||||
|
if(cacher is not null)
|
||||||
|
await cacher.Invoke(context);
|
||||||
|
|
||||||
|
//计算需要分流的数据库
|
||||||
|
var dbFilter = _options.Value.DatabaseFilter
|
||||||
|
?? throw new ApplicationException("未配置数据库过滤器");
|
||||||
|
record.Database = dbFilter(record);
|
||||||
|
|
||||||
|
if (_options.Value.EnableReBuilder)
|
||||||
|
{
|
||||||
|
//数据重建
|
||||||
|
var addRecords = _options.Value.RecordReBuild?.Invoke(context);
|
||||||
|
if (addRecords is { Count: > 0 })
|
||||||
|
{
|
||||||
|
foreach (var rc in addRecords)
|
||||||
|
{
|
||||||
|
if(dbFilter is not null)
|
||||||
|
rc.Database = dbFilter.Invoke(record);
|
||||||
|
await queue.EnqueueAsync(rc);
|
||||||
|
_context.AddTransform();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
await queue.EnqueueAsync(record);
|
||||||
|
_context.AddTransform();
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
_context.AddException(e);
|
||||||
|
var errorRecorder = _errorRecorderFactory.CreateTransform();
|
||||||
|
await errorRecorder.LogErrorRecordAsync(record, e);
|
||||||
|
if (!_options.Value.StrictMode)
|
||||||
|
_logger.LogError(e, "数据转换时发生错误");
|
||||||
|
else throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task TransformWorker2()
|
||||||
|
{
|
||||||
|
while (!_context.IsInputCompleted || _producerQueue.Count > 0)
|
||||||
|
{
|
||||||
|
if (!_producerQueue.TryDequeue(out var record))
|
||||||
|
{
|
||||||
|
await Task.Delay(100);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -1,4 +1,4 @@
|
|||||||
#define USE_TEST_DB // 如果使用测试库运行,则加上USE_TEST_DB预处理器指令
|
// #define FIX_PLAN_ITEM // 测试环境对OrderBlockPlanItem表进行修复时使用
|
||||||
|
|
||||||
using System.Text;
|
using System.Text;
|
||||||
using MesETL.App;
|
using MesETL.App;
|
||||||
@@ -12,6 +12,7 @@ using MesETL.App.Options;
|
|||||||
using MesETL.App.Services.ErrorRecorder;
|
using MesETL.App.Services.ErrorRecorder;
|
||||||
using MesETL.App.Services.Loggers;
|
using MesETL.App.Services.Loggers;
|
||||||
using MesETL.App.Services.Seq;
|
using MesETL.App.Services.Seq;
|
||||||
|
using MesETL.Shared.Compression;
|
||||||
using Microsoft.Extensions.Configuration;
|
using Microsoft.Extensions.Configuration;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using Microsoft.Extensions.Hosting;
|
using Microsoft.Extensions.Hosting;
|
||||||
@@ -124,15 +125,10 @@ async Task RunProgram()
|
|||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
// 清理ShardKey < 24010的
|
// 忽略OrderBlockPlanItem
|
||||||
case TableNames.OrderExtra:
|
case TableNames.OrderBlockPlanItem:
|
||||||
{
|
{
|
||||||
var shardKey = int.Parse(record["ShardKey"].AsSpan()[..4]);
|
return false;
|
||||||
if (shardKey < oldestTimeInt_yyMM)
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
// 清理(Status != 0 || Deleted = 1) && ID前四位 < 2401的
|
// 清理(Status != 0 || Deleted = 1) && ID前四位 < 2401的
|
||||||
case TableNames.OrderScrapBoard:
|
case TableNames.OrderScrapBoard:
|
||||||
@@ -214,16 +210,15 @@ async Task RunProgram()
|
|||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
// 移除ViewFileName列
|
// 将JsonStr列转换为Data列,添加CompressionType列
|
||||||
case TableNames.OrderModule:
|
case TableNames.OrderModuleExtra:
|
||||||
{
|
{
|
||||||
#if USE_TEST_DB
|
record.AppendField("CompressionType", "1");
|
||||||
if (record.HeaderExists("ViewFileName"))
|
record.AppendField("Data",
|
||||||
#endif
|
Convert.ToHexString(DeflateArchive.Compress(Convert.FromHexString(record["JsonStr"]))));
|
||||||
record.RemoveField("ViewFileName");
|
record.RemoveField("JsonStr");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
#if USE_TEST_DB
|
|
||||||
// 删除ID列,让数据库自行递增
|
// 删除ID列,让数据库自行递增
|
||||||
// TODO: 数据表改进,删除ID列或是替换为流水号
|
// TODO: 数据表改进,删除ID列或是替换为流水号
|
||||||
case TableNames.ProcessStepEfficiency:
|
case TableNames.ProcessStepEfficiency:
|
||||||
@@ -236,18 +231,18 @@ async Task RunProgram()
|
|||||||
record.RemoveField("ID");
|
record.RemoveField("ID");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
// 测试环境忽略PlaceData列,生产环境会提前将其移除
|
|
||||||
case TableNames.SimplePlanOrder:
|
|
||||||
{
|
|
||||||
record.RemoveField("PlaceData");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case TableNames.SysConfig:
|
case TableNames.SysConfig:
|
||||||
{
|
{
|
||||||
record.RemoveField("Key");
|
record.RemoveField("Key");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
#endif
|
// 移除PlaceData列(如果存在的话,生产库已经删除)
|
||||||
|
case TableNames.SimplePlanOrder:
|
||||||
|
{
|
||||||
|
if(record.HeaderExists("PlaceData"))
|
||||||
|
record.RemoveField("PlaceData");
|
||||||
|
break;
|
||||||
|
}
|
||||||
default: break;
|
default: break;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -287,11 +282,49 @@ async Task RunProgram()
|
|||||||
var id = seq.AddCachedSeq(SeqConfig.OrderWaveGroupID);
|
var id = seq.AddCachedSeq(SeqConfig.OrderWaveGroupID);
|
||||||
var orderWaveGroup = new DataRecord(
|
var orderWaveGroup = new DataRecord(
|
||||||
[id.ToString(), ..headers.Select(c => record[c])],
|
[id.ToString(), ..headers.Select(c => record[c])],
|
||||||
TableNames.OrderWaveGroup,
|
TableNames.OrderExtraList,
|
||||||
["ID", "OrderNo", "ShardKey", "Type", "ConfigJson", "CompanyID"]);
|
["ID", "OrderNo", "ShardKey", "Type", "ConfigJson", "CompanyID"]);
|
||||||
resultList.Add(orderWaveGroup);
|
resultList.Add(orderWaveGroup);
|
||||||
return resultList;
|
return resultList;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 通过OrderItem重建OrderBlockPlanItem表
|
||||||
|
if (record.TableName == TableNames.OrderItem)
|
||||||
|
{
|
||||||
|
#if FIX_PLAN_ITEM
|
||||||
|
record.Ignore = true;
|
||||||
|
#endif
|
||||||
|
var resultList = new List<DataRecord>();
|
||||||
|
record.TryGetField("ID", out var itemId);
|
||||||
|
record.TryGetField("ShardKey", out var shardKey);
|
||||||
|
record.TryGetField("PlanID", out var planId);
|
||||||
|
record.TryGetField("PackageID", out var packageId);
|
||||||
|
record.TryGetField("CompanyID", out var companyId);
|
||||||
|
if(!int.TryParse(planId, out var pid))
|
||||||
|
throw new ApplicationException($"数据发生异常:OrderItem.PlanID,值: {(string.IsNullOrWhiteSpace(planId) ? "NULL" : planId)}");
|
||||||
|
if (pid > 0)
|
||||||
|
{
|
||||||
|
resultList.Add(new DataRecord([itemId, shardKey, planId, companyId],
|
||||||
|
TableNames.OrderBlockPlanItem,
|
||||||
|
["ItemID", "ShardKey", "PlanID", "CompanyID"]
|
||||||
|
));
|
||||||
|
}
|
||||||
|
if(!int.TryParse(packageId, out var pkid))
|
||||||
|
throw new ApplicationException($"数据发生异常:OrderItem.PackageID,值: {(string.IsNullOrWhiteSpace(packageId) ? "NULL" : packageId)}");
|
||||||
|
if(pkid > 0)
|
||||||
|
{
|
||||||
|
resultList.Add(new DataRecord([itemId, shardKey, packageId, companyId],
|
||||||
|
TableNames.OrderPackageItem,
|
||||||
|
[ "ItemID", "ShardKey", "PackageID", "CompanyID" ]
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
record.RemoveField("PlanID");
|
||||||
|
record.RemoveField("PackageID");
|
||||||
|
|
||||||
|
return resultList;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
return ArraySegment<DataRecord>.Empty;
|
return ArraySegment<DataRecord>.Empty;
|
||||||
};
|
};
|
||||||
@@ -348,13 +381,15 @@ async Task RunProgram()
|
|||||||
host.Services.AddLogging(builder =>
|
host.Services.AddLogging(builder =>
|
||||||
{
|
{
|
||||||
builder.ClearProviders();
|
builder.ClearProviders();
|
||||||
builder.AddSerilog(new LoggerConfiguration()
|
var logger = new LoggerConfiguration()
|
||||||
|
.MinimumLevel.Verbose()
|
||||||
.WriteTo.Console()
|
.WriteTo.Console()
|
||||||
.WriteTo.File(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"./Log/Error/{ErrorRecorder.UID}.log"),
|
.WriteTo.File(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"./Log/Error/{ErrorRecorder.UID}.log"),
|
||||||
restrictedToMinimumLevel:LogEventLevel.Error)
|
restrictedToMinimumLevel: LogEventLevel.Error)
|
||||||
// .WriteTo.File("./Log/Info/{ErrorRecorder.UID}.log", restrictedToMinimumLevel:LogEventLevel.Information) //性能考虑暂不使用
|
// .WriteTo.File("./Log/Info/{ErrorRecorder.UID}.log", restrictedToMinimumLevel:LogEventLevel.Information) //性能考虑暂不使用
|
||||||
.CreateLogger()
|
.CreateLogger();
|
||||||
);
|
builder.AddSerilog(logger);
|
||||||
|
Log.Logger = logger;
|
||||||
});
|
});
|
||||||
|
|
||||||
host.Services.AddDataSourceFactory();
|
host.Services.AddDataSourceFactory();
|
||||||
|
@@ -108,7 +108,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
|
|||||||
|
|
||||||
public IEnumerable<string> GetExecutionList(IDictionary<string, IList<DataRecord>> tableRecords, int maxAllowPacket)
|
public IEnumerable<string> GetExecutionList(IDictionary<string, IList<DataRecord>> tableRecords, int maxAllowPacket)
|
||||||
{
|
{
|
||||||
var sb = new StringBuilder("SET AUTOCOMMIT = 1;\n");
|
var sb = new StringBuilder(_options.Value.FlushCount * 128);
|
||||||
var appendCount = 0;
|
var appendCount = 0;
|
||||||
foreach (var (tableName, records) in tableRecords)
|
foreach (var (tableName, records) in tableRecords)
|
||||||
{
|
{
|
||||||
@@ -117,6 +117,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
|
|||||||
|
|
||||||
var recordIdx = 0;
|
var recordIdx = 0;
|
||||||
StartBuild:
|
StartBuild:
|
||||||
|
sb.AppendLine("SET AUTOCOMMIT = 0;\n");
|
||||||
var noCommas = true;
|
var noCommas = true;
|
||||||
|
|
||||||
// 标准列顺序,插入时的字段需要按照该顺序排列
|
// 标准列顺序,插入时的字段需要按照该顺序排列
|
||||||
@@ -212,7 +213,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
|
|||||||
|
|
||||||
TryAddForUpdateSuffix(tableName, sb);
|
TryAddForUpdateSuffix(tableName, sb);
|
||||||
sb.Append(';').AppendLine();
|
sb.Append(';').AppendLine();
|
||||||
sb.Append("SET AUTOCOMMIT = 1;");
|
sb.Append("COMMIT;");
|
||||||
yield return sb.ToString();
|
yield return sb.ToString();
|
||||||
sb.Clear();
|
sb.Clear();
|
||||||
goto StartBuild;
|
goto StartBuild;
|
||||||
@@ -255,11 +256,13 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
|
|||||||
{
|
{
|
||||||
_conn.Close();
|
_conn.Close();
|
||||||
_conn.Dispose();
|
_conn.Dispose();
|
||||||
|
_recordCache.Clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
public async ValueTask DisposeAsync()
|
public async ValueTask DisposeAsync()
|
||||||
{
|
{
|
||||||
await _conn.CloseAsync();
|
await _conn.CloseAsync();
|
||||||
await _conn.DisposeAsync();
|
await _conn.DisposeAsync();
|
||||||
|
_recordCache.Clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
@@ -45,7 +45,7 @@ public class ProcessContext
|
|||||||
|
|
||||||
public void CompleteTransform() => IsTransformCompleted = true;
|
public void CompleteTransform() => IsTransformCompleted = true;
|
||||||
public void CompleteOutput() => IsOutputCompleted = true;
|
public void CompleteOutput() => IsOutputCompleted = true;
|
||||||
public bool AddException(Exception e) => _hasException = true;
|
public bool AddException(Exception e) => _hasException = true; // 没打算存起来,暂时先加个标记
|
||||||
|
|
||||||
public void AddInput() => Interlocked.Increment(ref _inputCount);
|
public void AddInput() => Interlocked.Increment(ref _inputCount);
|
||||||
|
|
||||||
@@ -59,17 +59,16 @@ public class ProcessContext
|
|||||||
|
|
||||||
public void AddTableInput(string table, int count)
|
public void AddTableInput(string table, int count)
|
||||||
{
|
{
|
||||||
if (!_tableProgress.TryAdd(table, (input: count, output: 0)))
|
_tableProgress.AddOrUpdate(table, (input: count, output: 0), (k, tuple) =>
|
||||||
{
|
{
|
||||||
var tuple = _tableProgress[table];
|
|
||||||
tuple.input += count;
|
tuple.input += count;
|
||||||
_tableProgress[table] = tuple;
|
return tuple;
|
||||||
}
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public void AddTableOutput(string table, int count)
|
public void AddTableOutput(string table, int count)
|
||||||
{
|
{
|
||||||
_tableProgress.AddOrUpdate(table, (input:0, output:count), (k, tuple) =>
|
_tableProgress.AddOrUpdate(table, (input: 0, output: count), (k, tuple) =>
|
||||||
{
|
{
|
||||||
tuple.output += count;
|
tuple.output += count;
|
||||||
return tuple;
|
return tuple;
|
||||||
|
@@ -1,3 +1,4 @@
|
|||||||
|
using System.Collections.Concurrent;
|
||||||
using System.Diagnostics;
|
using System.Diagnostics;
|
||||||
using System.Runtime.CompilerServices;
|
using System.Runtime.CompilerServices;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
@@ -11,7 +12,7 @@ namespace MesETL.App.Services.Seq;
|
|||||||
public class SeqService
|
public class SeqService
|
||||||
{
|
{
|
||||||
private readonly string _connectionString;
|
private readonly string _connectionString;
|
||||||
private readonly Dictionary<SeqConfig, long> _cachedSequence;
|
private readonly ConcurrentDictionary<SeqConfig, long> _cachedSequence;
|
||||||
|
|
||||||
public IReadOnlyDictionary<SeqConfig, long> CachedSequence => _cachedSequence;
|
public IReadOnlyDictionary<SeqConfig, long> CachedSequence => _cachedSequence;
|
||||||
|
|
||||||
@@ -24,7 +25,7 @@ public class SeqService
|
|||||||
};
|
};
|
||||||
_connectionString = builder.ConnectionString;
|
_connectionString = builder.ConnectionString;
|
||||||
|
|
||||||
_cachedSequence = new Dictionary<SeqConfig, long>();
|
_cachedSequence = new ConcurrentDictionary<SeqConfig, long>();
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task<long> UpdateSequenceID(string name,int step,long max,bool recycle, int add)
|
private async Task<long> UpdateSequenceID(string name,int step,long max,bool recycle, int add)
|
||||||
@@ -99,9 +100,9 @@ public class SeqService
|
|||||||
/// 移除一个缓存的流水号
|
/// 移除一个缓存的流水号
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="config"></param>
|
/// <param name="config"></param>
|
||||||
public void RemoveCachedSeq(SeqConfig config)
|
public bool RemoveCachedSeq(SeqConfig config)
|
||||||
{
|
{
|
||||||
_cachedSequence.Remove(config);
|
return _cachedSequence.Remove(config, out _);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
@@ -1,4 +1,5 @@
|
|||||||
using ApplicationException = System.ApplicationException;
|
using Serilog;
|
||||||
|
using ApplicationException = System.ApplicationException;
|
||||||
using TaskExtensions = MesETL.Shared.Helper.TaskExtensions;
|
using TaskExtensions = MesETL.Shared.Helper.TaskExtensions;
|
||||||
|
|
||||||
namespace MesETL.App.Services;
|
namespace MesETL.App.Services;
|
||||||
@@ -37,6 +38,8 @@ public class TaskManager
|
|||||||
{
|
{
|
||||||
var task = Task.Run(async () =>
|
var task = Task.Run(async () =>
|
||||||
{
|
{
|
||||||
|
// Log.Logger.Verbose("[任务管理器] 新的任务已创建");
|
||||||
|
Interlocked.Increment(ref _runningTaskCount);
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
await func();
|
await func();
|
||||||
@@ -45,13 +48,13 @@ public class TaskManager
|
|||||||
catch(Exception ex)
|
catch(Exception ex)
|
||||||
{
|
{
|
||||||
OnException?.Invoke(ex);
|
OnException?.Invoke(ex);
|
||||||
|
Log.Logger.Error(ex, "[任务管理器] 执行任务时出错");
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
Interlocked.Decrement(ref _runningTaskCount);
|
Interlocked.Decrement(ref _runningTaskCount);
|
||||||
}
|
}
|
||||||
}, cancellationToken);
|
}, cancellationToken);
|
||||||
Interlocked.Increment(ref _runningTaskCount);
|
|
||||||
return task;
|
return task;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -59,8 +62,10 @@ public class TaskManager
|
|||||||
{
|
{
|
||||||
var task = Task.Factory.StartNew(async obj => // 性能考虑,这个lambda中不要捕获任何外部变量!
|
var task = Task.Factory.StartNew(async obj => // 性能考虑,这个lambda中不要捕获任何外部变量!
|
||||||
{
|
{
|
||||||
|
// Log.Logger.Verbose("[任务管理器] 新的任务已创建");
|
||||||
if (obj is not Tuple<Func<object?, Task>, object?> tuple)
|
if (obj is not Tuple<Func<object?, Task>, object?> tuple)
|
||||||
throw new ApplicationException("这个异常不该出现");
|
throw new ApplicationException("这个异常不该出现");
|
||||||
|
Interlocked.Increment(ref _runningTaskCount);
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
await tuple.Item1(tuple.Item2);
|
await tuple.Item1(tuple.Item2);
|
||||||
@@ -69,13 +74,13 @@ public class TaskManager
|
|||||||
catch(Exception ex)
|
catch(Exception ex)
|
||||||
{
|
{
|
||||||
OnException?.Invoke(ex);
|
OnException?.Invoke(ex);
|
||||||
|
Log.Logger.Error(ex, "[任务管理器] 执行任务时出错");
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
Interlocked.Decrement(ref _runningTaskCount);
|
Interlocked.Decrement(ref _runningTaskCount);
|
||||||
}
|
}
|
||||||
}, Tuple.Create(func, arg), cancellationToken).Unwrap();
|
}, Tuple.Create(func, arg), cancellationToken).Unwrap();
|
||||||
Interlocked.Increment(ref _runningTaskCount);
|
|
||||||
return task;
|
return task;
|
||||||
}
|
}
|
||||||
}
|
}
|
@@ -1,17 +1,18 @@
|
|||||||
{
|
{
|
||||||
"MemoryThreshold": 8,
|
"MemoryThreshold": 6,
|
||||||
"GCIntervalMilliseconds": -1,
|
"GCIntervalMilliseconds": -1,
|
||||||
"UnsafeVariable": false,
|
"UnsafeVariable": true,
|
||||||
|
"DryRun": true, // 试运行,仅输入每张表的前100000条数据
|
||||||
"Logging": {
|
"Logging": {
|
||||||
"LogLevel": {
|
"LogLevel": {
|
||||||
"Default": "Debug"
|
"Default": "Trace"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"Input":{
|
"Input":{
|
||||||
"InputDir": "D:\\Data\\DatabaseDump\\MyDumper-ZST 2024-12-3", // Csv数据输入目录
|
"InputDir": "D:\\Data\\DatabaseDump\\Prod_Mock_CSV_2024-12-31", // Csv数据输入目录
|
||||||
"UseMock": false, // 使用模拟数据进行测试
|
"UseMock": false, // 使用模拟数据进行测试
|
||||||
"MockCountMultiplier": 1, // 模拟数据量级的乘数
|
"MockCountMultiplier": 1, // 模拟数据量级的乘数
|
||||||
// "TableOrder": ["order_block_plan_item"], // 按顺序输入的表
|
// "TableOrder": ["order_item"], // 按顺序输入的表
|
||||||
"TableIgnoreList": [] // 忽略输入的表
|
"TableIgnoreList": [] // 忽略输入的表
|
||||||
},
|
},
|
||||||
"Transform":{
|
"Transform":{
|
||||||
@@ -26,15 +27,15 @@
|
|||||||
"MaxAllowedPacket": 67108864,
|
"MaxAllowedPacket": 67108864,
|
||||||
"FlushCount": 10000, // 每次提交记录条数
|
"FlushCount": 10000, // 每次提交记录条数
|
||||||
"MaxDatabaseOutputTask" : 4, // 每个数据库最大提交任务数
|
"MaxDatabaseOutputTask" : 4, // 每个数据库最大提交任务数
|
||||||
"TreatJsonAsHex": false, // 将json列作为16进制格式输出(0x前缀),生产库是没有json列的
|
"TreatJsonAsHex": false, // 使Json列输出时带上"0x"前缀
|
||||||
"NoOutput": [], // 不输出的表
|
"NoOutput": [], // 不输出的表
|
||||||
"ForUpdate":
|
"ForUpdate":
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"RecordQueue":{
|
"RecordQueue":{
|
||||||
"ProducerQueueLength": 50000, // 输入队列最大长度
|
"ProducerQueueLength": 20000, // 输入队列最大长度
|
||||||
"ConsumerQueueLength": 10000, // 每个输出队列最大长度
|
"ConsumerQueueLength": 20000, // 每个输出队列最大长度
|
||||||
"MaxByteCount": 3221225472 // 队列最大字节数
|
"MaxByteCount": 3221225472 // 队列最大字节数
|
||||||
},
|
},
|
||||||
"RedisCache": {
|
"RedisCache": {
|
||||||
@@ -58,6 +59,13 @@
|
|||||||
"mesdb_4": 15000,
|
"mesdb_4": 15000,
|
||||||
"mesdb_5": 20000,
|
"mesdb_5": 20000,
|
||||||
"mesdb_6": 2147483647
|
"mesdb_6": 2147483647
|
||||||
|
},
|
||||||
|
"mock_void":{
|
||||||
|
"mesdb_1_void": 5000,
|
||||||
|
"mesdb_2_void": 10000,
|
||||||
|
"mesdb_3_void": 15000,
|
||||||
|
"mesdb_4_void": 20000,
|
||||||
|
"mesdb_5_void": 2147483647
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
39
MesETL.Shared/Compression/DeflateArchive.cs
Normal file
39
MesETL.Shared/Compression/DeflateArchive.cs
Normal file
@@ -0,0 +1,39 @@
|
|||||||
|
using System.IO.Compression;
|
||||||
|
|
||||||
|
namespace MesETL.Shared.Compression;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Deflate压缩工具类
|
||||||
|
/// </summary>
|
||||||
|
public static class DeflateArchive
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// 解压Deflate
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="input"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
public static byte[] Decompress(byte[] input)
|
||||||
|
{
|
||||||
|
using var msi = new MemoryStream(input);
|
||||||
|
using var mso = new MemoryStream();
|
||||||
|
using var ds = new DeflateStream(msi, CompressionMode.Decompress);
|
||||||
|
ds.CopyTo(mso);
|
||||||
|
ds.Flush();
|
||||||
|
return mso.ToArray();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 压缩Deflate
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="input"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
public static byte[] Compress(byte[] input)
|
||||||
|
{
|
||||||
|
using var msi = new MemoryStream(input);
|
||||||
|
using var mso = new MemoryStream();
|
||||||
|
using var ds = new DeflateStream(mso, CompressionMode.Compress);
|
||||||
|
msi.CopyTo(ds);
|
||||||
|
ds.Flush();
|
||||||
|
return mso.ToArray();
|
||||||
|
}
|
||||||
|
}
|
@@ -1,22 +0,0 @@
|
|||||||
using System.IO.Compression;
|
|
||||||
|
|
||||||
namespace MesETL.Shared.Helper;
|
|
||||||
|
|
||||||
public class CompressHelper
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
///
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="data"></param>
|
|
||||||
/// <returns></returns>
|
|
||||||
public static byte[] CompressDeflate(byte[] data)
|
|
||||||
{
|
|
||||||
using var src = new MemoryStream(data);
|
|
||||||
|
|
||||||
using var outStream = new MemoryStream();
|
|
||||||
using var gzip = new DeflateStream(outStream, CompressionMode.Compress);
|
|
||||||
src.CopyTo(gzip);
|
|
||||||
gzip.Flush();
|
|
||||||
return outStream.ToArray();
|
|
||||||
}
|
|
||||||
}
|
|
@@ -12,7 +12,7 @@ namespace TestProject1;
|
|||||||
public class DatabaseToolBox
|
public class DatabaseToolBox
|
||||||
{
|
{
|
||||||
private readonly ITestOutputHelper _output;
|
private readonly ITestOutputHelper _output;
|
||||||
public const string ConnStr = "Server=127.0.0.1;Port=3306;UserId=root;Password=123456;";
|
public const string ConnStr = "Server=localhost;Port=3306;UserId=root;Password=123456;";
|
||||||
|
|
||||||
public DatabaseToolBox(ITestOutputHelper output)
|
public DatabaseToolBox(ITestOutputHelper output)
|
||||||
{
|
{
|
||||||
@@ -156,9 +156,11 @@ public class DatabaseToolBox
|
|||||||
}
|
}
|
||||||
|
|
||||||
[Theory]
|
[Theory]
|
||||||
[InlineData(["cferp_test_1"])]
|
[InlineData(["mesdb_1"])]
|
||||||
[InlineData(["cferp_test_2"])]
|
[InlineData(["mesdb_2"])]
|
||||||
[InlineData(["cferp_test_3"])]
|
[InlineData(["mesdb_3"])]
|
||||||
|
[InlineData(["mesdb_4"])]
|
||||||
|
[InlineData(["mesdb_5"])]
|
||||||
public async Task DropAllIndex(string database)
|
public async Task DropAllIndex(string database)
|
||||||
{
|
{
|
||||||
var indexes = await GetAllTableIndexes(database);
|
var indexes = await GetAllTableIndexes(database);
|
||||||
@@ -196,4 +198,27 @@ public class DatabaseToolBox
|
|||||||
}
|
}
|
||||||
await DatabaseHelper.NonQueryAsync(ConnStr, sb.ToString());
|
await DatabaseHelper.NonQueryAsync(ConnStr, sb.ToString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Theory]
|
||||||
|
[InlineData("cferp_test_1")]
|
||||||
|
[InlineData("cferp_test_2")]
|
||||||
|
[InlineData("cferp_test_3")]
|
||||||
|
public async Task AnalyzeAllTable(string database)
|
||||||
|
{
|
||||||
|
var tables = await DatabaseHelper.QueryTableAsync(ConnStr,
|
||||||
|
$"""
|
||||||
|
SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = '{database}';
|
||||||
|
""");
|
||||||
|
var sb = new StringBuilder();
|
||||||
|
sb.AppendLine($"USE `{database}`;");
|
||||||
|
foreach (DataRow row in tables.Tables[0].Rows)
|
||||||
|
{
|
||||||
|
var tableName = row["TABLE_NAME"].ToString();
|
||||||
|
var sql = $"""
|
||||||
|
ANALYZE TABLE `{tableName}`;
|
||||||
|
""";
|
||||||
|
sb.AppendLine(sql);
|
||||||
|
}
|
||||||
|
await DatabaseHelper.NonQueryAsync(ConnStr, sb.ToString());
|
||||||
|
}
|
||||||
}
|
}
|
@@ -1,6 +1,8 @@
|
|||||||
|
using System.Runtime;
|
||||||
using MesETL.App.Const;
|
using MesETL.App.Const;
|
||||||
using MesETL.App.HostedServices.Abstractions;
|
using MesETL.App.HostedServices.Abstractions;
|
||||||
using MesETL.App.Services;
|
using MesETL.App.Services;
|
||||||
|
using Microsoft.Extensions.Configuration;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
@@ -13,14 +15,16 @@ public class MockInputService : IInputService
|
|||||||
private readonly ProcessContext _context;
|
private readonly ProcessContext _context;
|
||||||
private readonly IOptions<MockInputOptions> _options;
|
private readonly IOptions<MockInputOptions> _options;
|
||||||
private readonly ILogger _logger;
|
private readonly ILogger _logger;
|
||||||
|
private readonly long _memoryThreshold;
|
||||||
|
|
||||||
public MockInputService([FromKeyedServices(ConstVar.Producer)]DataRecordQueue producerQueue, ProcessContext context, IOptions<MockInputOptions> options,
|
public MockInputService([FromKeyedServices(ConstVar.Producer)]DataRecordQueue producerQueue, ProcessContext context, IOptions<MockInputOptions> options,
|
||||||
ILogger<MockInputService> logger)
|
ILogger<MockInputService> logger, IConfiguration configuration)
|
||||||
{
|
{
|
||||||
_producerQueue = producerQueue;
|
_producerQueue = producerQueue;
|
||||||
_context = context;
|
_context = context;
|
||||||
_options = options;
|
_options = options;
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
|
_memoryThreshold = (long)(configuration.GetValue<double>("MemoryThreshold", 8) * 1024 * 1024 * 1024);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task ExecuteAsync(CancellationToken cancellationToken)
|
public async Task ExecuteAsync(CancellationToken cancellationToken)
|
||||||
@@ -32,6 +36,13 @@ public class MockInputService : IInputService
|
|||||||
|
|
||||||
for (int i = 0; i < options.Amount; i++)
|
for (int i = 0; i < options.Amount; i++)
|
||||||
{
|
{
|
||||||
|
if (GC.GetTotalMemory(false) > _memoryThreshold)
|
||||||
|
{
|
||||||
|
_logger.LogWarning("内存使用率过高,暂缓输入");
|
||||||
|
GCSettings.LargeObjectHeapCompactionMode = GCLargeObjectHeapCompactionMode.CompactOnce;
|
||||||
|
GC.Collect();
|
||||||
|
await Task.Delay(3000, cancellationToken);
|
||||||
|
}
|
||||||
var ctx = new TableMockContext()
|
var ctx = new TableMockContext()
|
||||||
{
|
{
|
||||||
Index = i,
|
Index = i,
|
||||||
|
File diff suppressed because one or more lines are too long
@@ -27,21 +27,23 @@ async Task RunProgram()
|
|||||||
{
|
{
|
||||||
ThreadPool.SetMaxThreads(200, 200);
|
ThreadPool.SetMaxThreads(200, 200);
|
||||||
var host = Host.CreateApplicationBuilder(args);
|
var host = Host.CreateApplicationBuilder(args);
|
||||||
|
|
||||||
var dbGroup = new Dictionary<string, int>()
|
var inputOptions = host.Configuration.GetRequiredSection("Input").Get<DataInputOptions>()
|
||||||
{
|
?? throw new ApplicationException("缺少Input配置");
|
||||||
{ "mesdb_1", 5000 },
|
|
||||||
{ "mesdb_2", 10000 },
|
var transformOptions = host.Configuration.GetRequiredSection("Transform").Get<DataTransformOptions>()
|
||||||
{ "mesdb_3", 15000 },
|
?? throw new ApplicationException("缺少Transform配置");
|
||||||
{ "mesdb_4", 20000 },
|
|
||||||
{ "mesdb_5", 2147483647 },
|
var outputOptions = host.Configuration.GetRequiredSection("Output").Get<DatabaseOutputOptions>()
|
||||||
};
|
?? throw new ApplicationException("缺少Output配置");
|
||||||
|
|
||||||
|
var tenantDbSection = host.Configuration.GetRequiredSection("TenantDb");
|
||||||
var tenantDbOptions = new TenantDbOptions()
|
var tenantDbOptions = new TenantDbOptions()
|
||||||
{
|
{
|
||||||
TenantKey = "CompanyID",
|
TenantKey = tenantDbSection.GetValue<string>(nameof(TenantDbOptions.TenantKey)) ??
|
||||||
DbGroup = dbGroup,
|
throw new ApplicationException("分库配置缺少分库键TenantKey"),
|
||||||
UseDbGroup = "Prod",
|
UseDbGroup = tenantDbSection.GetValue<string>(nameof(TenantDbOptions.UseDbGroup)) ??
|
||||||
|
throw new ApplicationException("分库配置缺少使用分库组UseDbGroup")
|
||||||
};
|
};
|
||||||
host.Services.Configure<TenantDbOptions>(options =>
|
host.Services.Configure<TenantDbOptions>(options =>
|
||||||
{
|
{
|
||||||
@@ -49,10 +51,13 @@ async Task RunProgram()
|
|||||||
options.DbGroup = tenantDbOptions.DbGroup;
|
options.DbGroup = tenantDbOptions.DbGroup;
|
||||||
options.UseDbGroup = tenantDbOptions.UseDbGroup;
|
options.UseDbGroup = tenantDbOptions.UseDbGroup;
|
||||||
});
|
});
|
||||||
|
tenantDbOptions.DbGroup = tenantDbSection.GetRequiredSection($"DbGroups:{tenantDbOptions.UseDbGroup}")
|
||||||
|
.Get<Dictionary<string, int>>()
|
||||||
|
?? throw new ApplicationException($"分库配置无法解析分库组{tenantDbOptions.UseDbGroup},请检查配置");
|
||||||
|
|
||||||
host.Services.Configure<MockInputOptions>(options =>
|
host.Services.Configure<MockInputOptions>(options =>
|
||||||
{
|
{
|
||||||
const float Multiplexer = 0.01F;
|
const float Multiplexer = 1F;
|
||||||
var SampleSharedKeys = Enumerable.Range(0, 11).Select(i => (23010 + i * 10).ToString()).Concat(
|
var SampleSharedKeys = Enumerable.Range(0, 11).Select(i => (23010 + i * 10).ToString()).Concat(
|
||||||
Enumerable.Range(0, 11).Select(i => (24010 + i * 10).ToString())).ToArray();
|
Enumerable.Range(0, 11).Select(i => (24010 + i * 10).ToString())).ToArray();
|
||||||
options.Rules = new Dictionary<string, TableMockOptions>()
|
options.Rules = new Dictionary<string, TableMockOptions>()
|
||||||
@@ -206,6 +211,11 @@ async Task RunProgram()
|
|||||||
|
|
||||||
host.Services.Configure<DataTransformOptions>(options =>
|
host.Services.Configure<DataTransformOptions>(options =>
|
||||||
{
|
{
|
||||||
|
options.StrictMode = transformOptions.StrictMode;
|
||||||
|
options.EnableFilter = transformOptions.EnableFilter;
|
||||||
|
options.EnableReplacer = transformOptions.EnableReplacer;
|
||||||
|
options.EnableReBuilder = transformOptions.EnableReBuilder;
|
||||||
|
|
||||||
options.DatabaseFilter = record =>
|
options.DatabaseFilter = record =>
|
||||||
{
|
{
|
||||||
var companyId = int.Parse(record[tenantDbOptions.TenantKey]); // 每个实体都应存在CompanyID,否则异常
|
var companyId = int.Parse(record[tenantDbOptions.TenantKey]); // 每个实体都应存在CompanyID,否则异常
|
||||||
@@ -215,11 +225,16 @@ async Task RunProgram()
|
|||||||
|
|
||||||
host.Services.Configure<DatabaseOutputOptions>(options =>
|
host.Services.Configure<DatabaseOutputOptions>(options =>
|
||||||
{
|
{
|
||||||
options.ConnectionString = "Server=192.168.1.246;Port=3333;UserId=root;Password=123456;";
|
options.ConnectionString = outputOptions.ConnectionString;
|
||||||
options.FlushCount = 10000;
|
options.FlushCount = outputOptions.FlushCount;
|
||||||
options.MaxAllowedPacket = 67108864;
|
options.MaxAllowedPacket = outputOptions.MaxAllowedPacket / 2;
|
||||||
options.MaxDatabaseOutputTask = 4;
|
options.MaxDatabaseOutputTask = outputOptions.MaxDatabaseOutputTask;
|
||||||
|
options.TreatJsonAsHex = outputOptions.TreatJsonAsHex;
|
||||||
|
options.NoOutput = outputOptions.NoOutput;
|
||||||
|
options.ForUpdate = outputOptions.ForUpdate;
|
||||||
|
|
||||||
|
// 配置列的类型以便于在输出时区分二进制内容
|
||||||
|
// Prod server
|
||||||
options.ColumnTypeConfig = new Dictionary<string, ColumnType>
|
options.ColumnTypeConfig = new Dictionary<string, ColumnType>
|
||||||
{
|
{
|
||||||
{ "machine.Settings", ColumnType.Text },
|
{ "machine.Settings", ColumnType.Text },
|
||||||
@@ -253,14 +268,15 @@ async Task RunProgram()
|
|||||||
host.Services.AddLogging(builder =>
|
host.Services.AddLogging(builder =>
|
||||||
{
|
{
|
||||||
builder.ClearProviders();
|
builder.ClearProviders();
|
||||||
builder.AddSerilog(new LoggerConfiguration()
|
var logger = new LoggerConfiguration()
|
||||||
.MinimumLevel.Debug()
|
.MinimumLevel.Verbose()
|
||||||
.WriteTo.Console()
|
.WriteTo.Console()
|
||||||
.WriteTo.File(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"./Log/Error/{ErrorRecorder.UID}.log"),
|
.WriteTo.File(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"./Log/Error/{ErrorRecorder.UID}.log"),
|
||||||
restrictedToMinimumLevel: LogEventLevel.Error)
|
restrictedToMinimumLevel: LogEventLevel.Error)
|
||||||
// .WriteTo.File("./Log/Info/{ErrorRecorder.UID}.log", restrictedToMinimumLevel:LogEventLevel.Information) //性能考虑暂不使用
|
// .WriteTo.File("./Log/Info/{ErrorRecorder.UID}.log", restrictedToMinimumLevel:LogEventLevel.Information) //性能考虑暂不使用
|
||||||
.CreateLogger()
|
.CreateLogger();
|
||||||
);
|
builder.AddSerilog(logger);
|
||||||
|
Log.Logger = logger;
|
||||||
});
|
});
|
||||||
|
|
||||||
host.Services.AddDataSourceFactory();
|
host.Services.AddDataSourceFactory();
|
||||||
@@ -271,7 +287,7 @@ async Task RunProgram()
|
|||||||
var consLen = host.Configuration.GetRequiredSection("RecordQueue").GetValue<int>("ConsumerQueueLength");
|
var consLen = host.Configuration.GetRequiredSection("RecordQueue").GetValue<int>("ConsumerQueueLength");
|
||||||
var maxCharCount = host.Configuration.GetRequiredSection("RecordQueue").GetValue<long>("MaxByteCount") / 2;
|
var maxCharCount = host.Configuration.GetRequiredSection("RecordQueue").GetValue<long>("MaxByteCount") / 2;
|
||||||
host.Services.AddKeyedSingleton<DataRecordQueue>(ConstVar.Producer, new DataRecordQueue(prodLen, maxCharCount));
|
host.Services.AddKeyedSingleton<DataRecordQueue>(ConstVar.Producer, new DataRecordQueue(prodLen, maxCharCount));
|
||||||
host.Services.AddRecordQueuePool(dbGroup.Keys
|
host.Services.AddRecordQueuePool(tenantDbOptions.DbGroup.Keys
|
||||||
.Select(key => (key: key, queue: new DataRecordQueue(consLen, maxCharCount))).ToArray());
|
.Select(key => (key: key, queue: new DataRecordQueue(consLen, maxCharCount))).ToArray());
|
||||||
// host.Services.AddSingleton<ITaskMonitorLogger, CacheTaskMonitorLogger>();
|
// host.Services.AddSingleton<ITaskMonitorLogger, CacheTaskMonitorLogger>();
|
||||||
host.Services.AddSingleton<ITaskMonitorLogger, LoggerTaskMonitorLogger>();
|
host.Services.AddSingleton<ITaskMonitorLogger, LoggerTaskMonitorLogger>();
|
||||||
|
Reference in New Issue
Block a user