Add configuration options

陈梓阳 2024-02-15 16:18:50 +08:00
parent f6af04bfcd
commit f689e1b659
10 changed files with 156 additions and 34 deletions

View File

@ -2,6 +2,7 @@
using MesETL.App.Options;
using MesETL.App.Services;
using MesETL.App.Services.ETL;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
@ -32,18 +33,21 @@ public class FileInputService : IInputService
private readonly IOptions<DataInputOptions> _dataInputOptions;
private readonly ProcessContext _context;
private readonly DataReaderFactory _dataReaderFactory;
private readonly long _memoryThreshold;
public FileInputService(ILogger<FileInputService> logger,
IOptions<DataInputOptions> dataInputOptions,
ProcessContext context,
[FromKeyedServices(Const.ConstVar.Producer)] DataRecordQueue producerQueue,
DataReaderFactory dataReaderFactory)
DataReaderFactory dataReaderFactory,
IConfiguration configuration)
{
_logger = logger;
_dataInputOptions = dataInputOptions;
_context = context;
_producerQueue = producerQueue;
_dataReaderFactory = dataReaderFactory;
_memoryThreshold = (long)(configuration.GetValue<double>("MemoryThreshold", 8) * 1024 * 1024 * 1024);
}
public async Task ExecuteAsync(CancellationToken cancellationToken)
@ -73,6 +77,12 @@ public class FileInputService : IInputService
while (await source.ReadAsync())
{
if (GC.GetTotalMemory(false) > _memoryThreshold)
{
_logger.LogWarning("Memory usage too high, throttling input");
GC.Collect();
await Task.Delay(3000, cancellationToken);
}
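// Illustration (editor note, not committed code): "MemoryThreshold" is read in GiB and converted
// to bytes, so the default of 8 gives _memoryThreshold = 8 * 1024 * 1024 * 1024 = 8,589,934,592 bytes.
// Whenever GC.GetTotalMemory(false) reports a larger managed heap, input pauses for 3 seconds
// after a forced collection, acting as simple back-pressure on the producer queue.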
var record = source.Current;
await _producerQueue.EnqueueAsync(record);
count++;

View File

@ -61,7 +61,9 @@ public class MainHostedService : BackgroundService
}
_stopwatch = Stopwatch.StartNew();
await SetVariableAsync(); // Enable delayed write and disable the redo log >>> do NOT shut down the database server while the redo log is disabled!
var enableUnsafeVar = _config.GetValue<bool>("UnsafeVariable", false);
if (enableUnsafeVar)
await SetVariableAsync(); // Enable delayed write and disable the redo log >>> do NOT shut down the database server while the redo log is disabled!
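// Note: SetVariableAsync's body is not part of this diff; judging by the comment it presumably
// toggles MySQL speed-over-safety settings, e.g. statements along the lines of
//   SET GLOBAL innodb_flush_log_at_trx_commit = 0;  -- relax redo-log flushing ("delayed write")
//   ALTER INSTANCE DISABLE INNODB REDO_LOG;         -- disable the redo log (MySQL 8.0.21+)
// which is why the new "UnsafeVariable" switch defaults to false.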
var monitorTask = Task.Run(async () => await _taskMonitor.Monitor(stoppingToken), stoppingToken);
var inputTask = ExecuteAndCatch(
@ -77,7 +79,8 @@ public class MainHostedService : BackgroundService
_logger.LogInformation("***** ElapseTime: {Time}", (_stopwatch.ElapsedMilliseconds / 1000f).ToString("F3"));
await Task.Delay(5000, stoppingToken);
await SetVariableAsync(false); // Disable delayed write and re-enable the redo log
if(enableUnsafeVar)
await SetVariableAsync(false); // Disable delayed write and re-enable the redo log
if (!stoppingToken.IsCancellationRequested)
{
await ExportResultAsync();
@ -167,7 +170,7 @@ public class MainHostedService : BackgroundService
sb.AppendLine("\n---\n");
sb.AppendLine("## Table Output Progress");
var tableOutputProgress = _context.TableProgress.Select(pair =>
new { Table = pair.Key, Count = pair.Value });
new { Table = pair.Key, Count = pair.Value }).OrderBy(s => s.Table);
sb.AppendLine(tableOutputProgress.ToMarkdownTable());
sb.AppendLine("\n---\n");
sb.AppendLine("## Result");

View File

@ -1,4 +1,5 @@
using MesETL.App.Helpers;
using System.Buffers;
using MesETL.App.Helpers;
using MesETL.App.HostedServices.Abstractions;
using MesETL.App.Options;
using MesETL.App.Services;
@ -65,13 +66,14 @@ public class OutputService : IOutputService
{
_logger.LogInformation("***** Starting output thread, database: {db} *****", db);
var taskManager = new TaskManager(_outputOptions.Value.MaxDatabaseOutputTask);
var ignoreOutput = new HashSet<string>(_outputOptions.Value.NoOutput);
var tmp = new List<DataRecord>();
while (!_context.IsTransformCompleted || queue.Count > 0)
{
if (ct.IsCancellationRequested)
break;
if (!queue.TryDequeue(out var record)) continue;
if (!queue.TryDequeue(out var record) || ignoreOutput.Contains(record.TableName)) continue;
var dbName = record.Database ?? throw new ApplicationException("The output record is missing a database name");
if(dbName != db)

View File

@ -2,7 +2,6 @@
using System.Text;
using MesETL.App.Services;
using MesETL.App.Services.Loggers;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
namespace MesETL.App.HostedServices;
@ -16,32 +15,24 @@ public class TaskMonitorService
private readonly ProcessContext _context;
private readonly DataRecordQueue _producerQueue;
private readonly RecordQueuePool _queuePool;
private readonly IConfiguration _configuration;
private string _outputPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Log/progress.txt");
private readonly int _gcInterval;
public TaskMonitorService(ProcessContext context,
[FromKeyedServices(Const.ConstVar.Producer)]
DataRecordQueue producerQueue,
RecordQueuePool queuePool,
IEnumerable<ITaskMonitorLogger> monitorLoggers,
IConfiguration configuration)
IEnumerable<ITaskMonitorLogger> monitorLoggers)
{
_context = context;
_producerQueue = producerQueue;
_queuePool = queuePool;
_monitorLoggers = monitorLoggers;
_configuration = configuration;
_gcInterval = _configuration.GetValue<int>("GCIntervalMilliseconds)");
}
public async Task Monitor(CancellationToken stoppingToken)
{
var sw = Stopwatch.StartNew();
var lastGCTime = sw.ElapsedMilliseconds;
var lastTime = sw.ElapsedMilliseconds;
var lastInputCount = _context.InputCount;
var lastTransformCount = _context.TransformCount;
@ -84,12 +75,6 @@ public class TaskMonitorService
var transformSpeed = (transformCount - lastTransformCount) / elapseTime;
var outputSpeed = (outputCount - lastOutputCount) / elapseTime;
if(_gcInterval > 0 && time - lastGCTime > _gcInterval)
{
GC.Collect();
lastGCTime = time;
}
// _logger.LogInformation(
// "Task monitor: running: {Running}, error: {Error}, completed: {Completed}, canceled: {Canceled}, outputSpeed: {Speed} records/s",
// running, error, completed, canceled, outputSpeed);

View File

@ -4,7 +4,7 @@ public class DatabaseOutputOptions
{
public string? ConnectionString { get; set; }
public int MaxAllowedPacket { get; set; } = 64 * 1024 * 1024;
public int MaxAllowedPacket { get; set; } = 32 * 1024 * 1024;
public int FlushCount { get; set; } = 10000;
@ -12,6 +12,9 @@ public class DatabaseOutputOptions
public bool TreatJsonAsHex { get; set; } = true;
public string[] NoOutput { get; set; } = [];
public Dictionary<string, string>? ForUpdate { get; set; }
/// <summary>
/// Configure special columns of the imported data
@ -22,4 +25,12 @@ public class DatabaseOutputOptions
{
return ColumnTypeConfig.GetValueOrDefault($"{table}.{column}", ColumnType.UnDefine);
}
public bool TryGetForUpdate(string table, out string? forUpdate)
{
forUpdate = null;
if (ForUpdate is null || !ForUpdate.TryGetValue(table, out forUpdate))
return false;
return true;
}
}
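A minimal usage sketch of the new options (values are illustrative, mirroring the appsettings.json change further down; not part of this commit): ForUpdate maps a table name to the assignment list appended as an ON DUPLICATE KEY UPDATE clause, and NoOutput lists tables that OutputService skips entirely.

var options = new DatabaseOutputOptions
{
    NoOutput = ["order"], // tables skipped by OutputService
    ForUpdate = new Dictionary<string, string>
    {
        ["order_data_block"] = "CompanyID = new.CompanyID"
    }
};

if (options.TryGetForUpdate("order_data_block", out var clause))
{
    // clause is "CompanyID = new.CompanyID"; MySqlDestination.TryAddForUpdateSuffix appends it
    // after the VALUES list as: AS new ON DUPLICATE KEY UPDATE CompanyID = new.CompanyID
}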

View File

@ -137,12 +137,12 @@ async Task RunProgram()
TableNames.Order,
TableNames.OrderBoxBlock, // depends on Order.CompanyID
TableNames.OrderDataBlock, // depends on Order.CompanyID
TableNames.OrderBlockPlan,
TableNames.OrderBlockPlanResult, // depends on OrderBlockPlan.CompanyID / deleted
TableNames.OrderItem,
TableNames.OrderDataBlock,
TableNames.OrderDataGoods,
TableNames.OrderDataParts,
TableNames.OrderModule,
@ -253,6 +253,13 @@ async Task RunProgram()
return false;
break;
}
// OrderDataBlock: drop records whose corresponding Order.OrderNo does not exist
case TableNames.OrderDataBlock:
{
if (!await cache.ExistsAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"])))
return false;
break;
}
// OrderBlockPlan: drop records with CreateTime earlier than 2023-01
case TableNames.OrderBlockPlan:
{
@ -397,6 +404,13 @@ async Task RunProgram()
ThrowIfNoCached(await cache.GetStringAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"])),
TableNames.OrderBoxBlock, TableNames.Order, "OrderNo", "Unable to resolve the corresponding CompanyID"));
break;
// Fix up OrderDataBlock.CompanyID
case TableNames.OrderDataBlock:
record["CompanyID"] =
// look up Order.OrderNo -> CompanyID
ThrowIfNoCached(await cache.GetStringAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"])),
TableNames.OrderBoxBlock, TableNames.Order, "OrderNo", "Unable to resolve the corresponding CompanyID");
break;
// OrderModule: add ShardKey column and remove ViewFileName column
case TableNames.OrderModule:
record.AddField("ShardKey", CalculateShardKeyByOrderNo(record["OrderNo"]));
@ -529,9 +543,11 @@ async Task RunProgram()
{
options.ConnectionString = outputOptions.ConnectionString;
options.FlushCount = outputOptions.FlushCount;
options.MaxAllowedPacket = outputOptions.MaxAllowedPacket;
options.MaxAllowedPacket = outputOptions.MaxAllowedPacket / 2;
options.MaxDatabaseOutputTask = outputOptions.MaxDatabaseOutputTask;
options.TreatJsonAsHex = outputOptions.TreatJsonAsHex;
options.NoOutput = outputOptions.NoOutput;
options.ForUpdate = outputOptions.ForUpdate;
#if USE_TEST_DB
// Test Server

View File

@ -50,9 +50,6 @@ public class DataRecordQueue : IDisposable
public async Task EnqueueAsync(DataRecord record)
{
if (_queue.Count >= _queue.BoundedCapacity)
await Task.Delay(500);
var charCount = record.FieldCharCount;
LongestFieldCharCount = Math.Max(LongestFieldCharCount, charCount);
if(_currentCharCount + charCount > _maxCharCount)

View File

@ -66,7 +66,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
try
{
var excuseList = GetExcuseList(_recordCache, maxAllowPacket).ToList();
var excuseList = GetExcuseList(_recordCache, maxAllowPacket);
foreach (var insertSql in excuseList)
{
cmd.CommandText = insertSql;
@ -106,6 +106,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
public IEnumerable<string> GetExcuseList(IDictionary<string, IList<DataRecord>> tableRecords,int maxAllowPacket)
{
var sb = new StringBuilder("SET AUTOCOMMIT = 1;\n");
var appendCount = 0;
foreach (var (tableName, records) in tableRecords)
{
if (records.Count == 0)
@ -187,6 +188,14 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
// If the character count is about to exceed the limit, return the SQL, clear the StringBuilder, keep the index of the current record, then jump back to the StartBuild label and start another INSERT round
if (sb.Length + recordSb.Length + 23 > maxAllowPacket)
{
if (appendCount == 0) // a single record exceeds maxAllowedPacket on its own
{
sb.Append(recordSb);
_logger.LogWarning("A single record of table {Table} produced SQL exceeding the configured MaxAllowedPacket, character count: {Count}", tableName,
sb.Length + recordSb.Length + 23);
}
TryAddForUpdateSuffix(tableName, sb);
sb.Append(';').AppendLine();
sb.Append("SET AUTOCOMMIT = 1;");
yield return sb.ToString();
@ -198,8 +207,10 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
sb.Append(',').AppendLine();
noCommas = false;
sb.Append(recordSb); // StringBuilder.Append(StringBuilder) does not allocate extra memory
appendCount++;
}
TryAddForUpdateSuffix(tableName, sb);
sb.Append(';');
sb.Append("COMMIT;");
yield return sb.ToString();
@ -207,6 +218,23 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
}
}
/// <summary>
/// The records must all belong to the same table
/// </summary>
/// <param name="tableName"></param>
/// <param name="sb"></param>
private void TryAddForUpdateSuffix(string tableName, StringBuilder sb)
{
var forUpdate = _options.Value.TryGetForUpdate(tableName, out var forUpdateSql);
if (forUpdate)
{
sb.AppendLine($"""
AS new
ON DUPLICATE KEY UPDATE
{forUpdateSql}
""");
}
}
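// Illustration (not produced verbatim by this commit; column names are hypothetical):
// with ForUpdate["order_data_block"] = "CompanyID = new.CompanyID", a generated batch
// ends up shaped roughly like
//   INSERT INTO `order_data_block` (`ID`, `OrderNo`, `CompanyID`) VALUES
//   (1, 'N001', 0)
//   AS new
//   ON DUPLICATE KEY UPDATE
//   CompanyID = new.CompanyID;
// Note that the "AS new" row alias requires MySQL 8.0.19 or newer.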
public void Dispose()
{

View File

@ -1,15 +1,17 @@
{
"MemoryThreshold": 8,
"GCIntervalMilliseconds": -1,
"UnsafeVariable": false,
"Logging": {
"LogLevel": {
"Default": "Debug"
}
},
"Input":{
"InputDir": "D:\\Dump\\MyDumper-ZST 2024-02-05", // Csv
"InputDir": "D:\\Dump\\NewMockData", // Csv
"UseMock": false, // 使
"MockCountMultiplier": 1, //
"TableOrder": [], //
"TableOrder": ["order", "order_data_block"], //
"TableIgnoreList": [] //
},
"Transform":{
@ -24,7 +26,12 @@
"MaxAllowedPacket": 67108864,
"FlushCount": 10000, //
"MaxDatabaseOutputTask" : 4, //
"TreatJsonAsHex": false // json16(0x)json
"TreatJsonAsHex": false, // json16(0x)json
"NoOutput": ["order"],
"ForUpdate":
{
"order_data_block": "CompanyID = new.CompanyID"
}
},
"RecordQueue":{
"ProducerQueueLength": 50000, //

View File

@ -1,6 +1,7 @@
using System.Collections.Concurrent;
using System.Diagnostics;
using MesETL.App.Services.ETL;
using MesETL.Shared.Helper;
using Xunit.Abstractions;
using ZstdSharp;
@ -86,4 +87,66 @@ public class Test
}
}
[Fact]
public void GetResult()
{
var input =
"""
machine: 19303/19061
order: 3416759/3415192
order_block_plan: 2934281/1968850
order_block_plan_item: 0/235927707
order_block_plan_result: 1375479/277667
order_box_block: 23457666/23450841
order_data_block: 513012248/513012248
order_data_goods: 18655270/18655270
order_data_parts: 353139066/353139066
order_item: 955274320/955274320
order_module: 102907480/56935691
order_module_extra: 40044077/40044077
order_module_item: 49209022/49209022
order_package: 12012712/12012712
order_package_item: 0/80605124
order_process: 4045309/2682043
order_process_step: 8343418/5505158
order_process_step_item: 14856509/9787696
order_scrap_board: 136096/136090
process_group: 1577/1543
process_info: 9212/9008
process_item_exp: 30/30
process_schdule_capacity: 42442/42442
process_step_efficiency: 8/8
report_template: 7358/7338
simple_package: 142861/137730
simple_plan_order: 1167004/854699
simple_plan_order: 0/55677
sys_config: 2608/2608
work_calendar: 11/11
work_shift: 73/73
work_time: 77/77
order_process_step_item: 14856509/9790701
order_process_step: 8343418/5506925
order_module: 102907480/56935691
order_process: 4045309/2682043
report_template: 7358/7358
process_info: 9212/9212
process_group: 1577/1577
order_block_plan_result: 1375479/277667
order_box_block: 23457666/23457666
order_block_plan: 2934281/1968850
order: 3416759/3416759
machine: 19303/19303
order_scrap_board: 136096/136096
""";
var arr = input.Split('\n').Select(s =>
{
var x = s.Split(':');
var y = x[1].Split('/').Select(i => long.Parse(i)).ToArray();
return new {TABLE_NAME = x[0], INPUT = y[0], OUTPUT = y[1], FILTER = y[0] - y[1]};
}).OrderBy(s => s.TABLE_NAME);
_output.WriteLine(arr.ToMarkdownTable());
}
}