Compare commits

...

8 Commits

Author SHA1 Message Date
CZY
c6d97fdc86 新增清理规则 2024-02-26 09:26:18 +08:00
CZY
f689e1b659 添加配置项 2024-02-15 16:18:50 +08:00
CZY
f6af04bfcd fix cache error 2024-02-10 17:45:13 +08:00
CZY
571805250b Optimize structure 2024-02-10 17:12:26 +08:00
CZY
aa7041962a add gc interval 2024-02-10 00:05:50 +08:00
CZY
73895fbce4 Update 2024-02-09 23:18:34 +08:00
CZY
913c725fe1 update 2024-02-09 13:41:40 +08:00
CZY
41a1dc8a4f Csv解析性能优化 2024-02-08 22:19:59 +08:00
36 changed files with 703 additions and 109 deletions

View File

@ -0,0 +1,51 @@
using System.Collections.Concurrent;
namespace MesETL.App.Cache;
public class MemoryCache : ICacher
{
private readonly ConcurrentDictionary<string, string> _stringCache = new();
private readonly ConcurrentDictionary<string, Dictionary<string, string>> _hashCache = new();
public static MemoryCache? Instance { get; private set; }
public MemoryCache()
{
Instance = this;
}
public Task<string?> GetStringAsync(string key)
{
return _stringCache.TryGetValue(key, out var value) ? Task.FromResult<string?>(value) : Task.FromResult((string?)null);
}
public Task SetStringAsync(string key, string value)
{
_stringCache[key] = value;
return Task.CompletedTask;
}
public Task<bool> ExistsAsync(string key)
{
return Task.FromResult(_stringCache.ContainsKey(key));
}
public Task SetHashAsync(string key, IReadOnlyDictionary<string, string> hash)
{
_hashCache[key] = hash.ToDictionary(x => x.Key, x => x.Value);
return Task.CompletedTask;
}
public Task<Dictionary<string, string>> GetHashAsync(string key)
{
return Task.FromResult(_hashCache[key]);
}
public void Delete(Func<string,bool> keySelector)
{
foreach (var k in _stringCache.Keys.Where(keySelector))
{
_stringCache.TryRemove(k, out _);
}
}
}

View File

@ -57,6 +57,7 @@ public static class RedisCacheExtensions
{ {
var conn = ConnectionMultiplexer.Connect(options.Configuration var conn = ConnectionMultiplexer.Connect(options.Configuration
?? throw new ApplicationException("未配置Redis连接字符串")); ?? throw new ApplicationException("未配置Redis连接字符串"));
services.AddSingleton(conn);
services.AddSingleton<ICacher>(new RedisCache(conn, options.Database, options.InstanceName)); services.AddSingleton<ICacher>(new RedisCache(conn, options.Database, options.InstanceName));
return services; return services;
} }

View File

@ -44,6 +44,7 @@ public class DataRecord : ICloneable
public IList<string> Headers { get; } public IList<string> Headers { get; }
public string TableName { get; } public string TableName { get; }
public string? Database { get; set; } public string? Database { get; set; }
public long FieldCharCount { get; }
public DataRecord(IEnumerable<string> fields, string tableName, IEnumerable<string> headers, string? database = null) public DataRecord(IEnumerable<string> fields, string tableName, IEnumerable<string> headers, string? database = null)
@ -57,6 +58,8 @@ public class DataRecord : ICloneable
throw new ArgumentException( throw new ArgumentException(
$"The number of fields does not match the number of headers. Expected: {Headers.Count} Got: {Fields.Count} Fields: {string.Join(',', Fields)}", $"The number of fields does not match the number of headers. Expected: {Headers.Count} Got: {Fields.Count} Fields: {string.Join(',', Fields)}",
nameof(fields)); nameof(fields));
FieldCharCount = Fields.Sum(x => (long)x.Length);
} }
public string this[int index] public string this[int index]

View File

@ -2,6 +2,7 @@
using MesETL.App.Options; using MesETL.App.Options;
using MesETL.App.Services; using MesETL.App.Services;
using MesETL.App.Services.ETL; using MesETL.App.Services.ETL;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
@ -32,18 +33,21 @@ public class FileInputService : IInputService
private readonly IOptions<DataInputOptions> _dataInputOptions; private readonly IOptions<DataInputOptions> _dataInputOptions;
private readonly ProcessContext _context; private readonly ProcessContext _context;
private readonly DataReaderFactory _dataReaderFactory; private readonly DataReaderFactory _dataReaderFactory;
private readonly long _memoryThreshold;
public FileInputService(ILogger<FileInputService> logger, public FileInputService(ILogger<FileInputService> logger,
IOptions<DataInputOptions> dataInputOptions, IOptions<DataInputOptions> dataInputOptions,
ProcessContext context, ProcessContext context,
[FromKeyedServices(Const.ConstVar.Producer)] DataRecordQueue producerQueue, [FromKeyedServices(Const.ConstVar.Producer)] DataRecordQueue producerQueue,
DataReaderFactory dataReaderFactory) DataReaderFactory dataReaderFactory,
IConfiguration configuration)
{ {
_logger = logger; _logger = logger;
_dataInputOptions = dataInputOptions; _dataInputOptions = dataInputOptions;
_context = context; _context = context;
_producerQueue = producerQueue; _producerQueue = producerQueue;
_dataReaderFactory = dataReaderFactory; _dataReaderFactory = dataReaderFactory;
_memoryThreshold = (long)(configuration.GetValue<double>("MemoryThreshold", 8) * 1024 * 1024 * 1024);
} }
public async Task ExecuteAsync(CancellationToken cancellationToken) public async Task ExecuteAsync(CancellationToken cancellationToken)
@ -69,15 +73,25 @@ public class FileInputService : IInputService
{ {
_logger.LogInformation("Reading file: {FileName}, table: {TableName}", info.FileName, info.TableName); _logger.LogInformation("Reading file: {FileName}, table: {TableName}", info.FileName, info.TableName);
using var source = _dataReaderFactory.CreateReader(info.FileName,info.TableName,info.Headers); using var source = _dataReaderFactory.CreateReader(info.FileName,info.TableName,info.Headers);
var count = 0;
while (await source.ReadAsync()) while (await source.ReadAsync())
{ {
if (GC.GetTotalMemory(false) > _memoryThreshold)
{
_logger.LogWarning("内存过高,暂缓输入");
GC.Collect();
await Task.Delay(3000, cancellationToken);
}
var record = source.Current; var record = source.Current;
_producerQueue.Enqueue(record); await _producerQueue.EnqueueAsync(record);
count++;
_context.AddInput(); _context.AddInput();
} }
_context.AddTableInput(info.TableName, count);
_logger.LogInformation("Input of table: '{TableName}' finished", info.TableName); _logger.LogInformation("Input of table: '{TableName}' finished", info.TableName);
_dataInputOptions.Value.OnTableInputCompleted?.Invoke(info.TableName);
} }
_context.CompleteInput(); _context.CompleteInput();
@ -91,6 +105,7 @@ public class FileInputService : IInputService
private IEnumerable<FileInputInfo> GetFilesInOrder(FileInputInfo[] inputFiles) private IEnumerable<FileInputInfo> GetFilesInOrder(FileInputInfo[] inputFiles)
{ {
var tableOrder = _dataInputOptions.Value.TableOrder; var tableOrder = _dataInputOptions.Value.TableOrder;
var ignoreTable = _dataInputOptions.Value.TableIgnoreList;
if (tableOrder is null or { Length: 0 }) if (tableOrder is null or { Length: 0 })
return inputFiles; return inputFiles;
@ -102,7 +117,7 @@ public class FileInputService : IInputService
{ {
var target = inputFiles.FirstOrDefault(f => var target = inputFiles.FirstOrDefault(f =>
f.TableName.Equals(tableName, StringComparison.OrdinalIgnoreCase)); f.TableName.Equals(tableName, StringComparison.OrdinalIgnoreCase));
if (target is not null) if (target is not null && !ignoreTable.Contains(target.TableName))
yield return target; yield return target;
} }
} }

View File

@ -5,6 +5,7 @@ using MesETL.App.HostedServices.Abstractions;
using MesETL.App.Options; using MesETL.App.Options;
using MesETL.App.Services; using MesETL.App.Services;
using MesETL.App.Services.ErrorRecorder; using MesETL.App.Services.ErrorRecorder;
using MesETL.Shared.Helper;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
@ -60,7 +61,9 @@ public class MainHostedService : BackgroundService
} }
_stopwatch = Stopwatch.StartNew(); _stopwatch = Stopwatch.StartNew();
await SetVariableAsync(); // 开启延迟写入,禁用重做日志 >>> 重做日志处于禁用状态时不要关闭数据库服务! var enableUnsafeVar = _config.GetValue<bool>("UnsafeVariable", false);
if (enableUnsafeVar)
await SetVariableAsync(); // 开启延迟写入,禁用重做日志 >>> 重做日志处于禁用状态时不要关闭数据库服务!
var monitorTask = Task.Run(async () => await _taskMonitor.Monitor(stoppingToken), stoppingToken); var monitorTask = Task.Run(async () => await _taskMonitor.Monitor(stoppingToken), stoppingToken);
var inputTask = ExecuteAndCatch( var inputTask = ExecuteAndCatch(
@ -76,7 +79,8 @@ public class MainHostedService : BackgroundService
_logger.LogInformation("***** ElapseTime: {Time}", (_stopwatch.ElapsedMilliseconds / 1000f).ToString("F3")); _logger.LogInformation("***** ElapseTime: {Time}", (_stopwatch.ElapsedMilliseconds / 1000f).ToString("F3"));
await Task.Delay(5000, stoppingToken); await Task.Delay(5000, stoppingToken);
await SetVariableAsync(false); // 关闭延迟写入,开启重做日志 if(enableUnsafeVar)
await SetVariableAsync(false); // 关闭延迟写入,开启重做日志
if (!stoppingToken.IsCancellationRequested) if (!stoppingToken.IsCancellationRequested)
{ {
await ExportResultAsync(); await ExportResultAsync();
@ -166,7 +170,7 @@ public class MainHostedService : BackgroundService
sb.AppendLine("\n---\n"); sb.AppendLine("\n---\n");
sb.AppendLine("## Table Output Progress"); sb.AppendLine("## Table Output Progress");
var tableOutputProgress = _context.TableProgress.Select(pair => var tableOutputProgress = _context.TableProgress.Select(pair =>
new { Table = pair.Key, Count = pair.Value }); new { Table = pair.Key, Count = pair.Value }).OrderBy(s => s.Table);
sb.AppendLine(tableOutputProgress.ToMarkdownTable()); sb.AppendLine(tableOutputProgress.ToMarkdownTable());
sb.AppendLine("\n---\n"); sb.AppendLine("\n---\n");
sb.AppendLine("## Result"); sb.AppendLine("## Result");

View File

@ -1,13 +1,15 @@
using MesETL.App.Helpers; using System.Buffers;
using MesETL.App.Helpers;
using MesETL.App.HostedServices.Abstractions; using MesETL.App.HostedServices.Abstractions;
using MesETL.App.Options; using MesETL.App.Options;
using MesETL.App.Services; using MesETL.App.Services;
using MesETL.App.Services.ErrorRecorder; using MesETL.App.Services.ErrorRecorder;
using MesETL.Shared.Helper;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using MySqlConnector; using MySqlConnector;
using MySqlDestination = MesETL.App.Services.ETL.MySqlDestination; using MySqlDestination = MesETL.App.Services.ETL.MySqlDestination;
using TaskExtensions = MesETL.App.Helpers.TaskExtensions; using TaskExtensions = MesETL.Shared.Helper.TaskExtensions;
namespace MesETL.App.HostedServices; namespace MesETL.App.HostedServices;
@ -64,13 +66,14 @@ public class OutputService : IOutputService
{ {
_logger.LogInformation("*****开启输出线程,数据库: {db} *****", db); _logger.LogInformation("*****开启输出线程,数据库: {db} *****", db);
var taskManager = new TaskManager(_outputOptions.Value.MaxDatabaseOutputTask); var taskManager = new TaskManager(_outputOptions.Value.MaxDatabaseOutputTask);
var ignoreOutput = new HashSet<string>(_outputOptions.Value.NoOutput);
var tmp = new List<DataRecord>(); var tmp = new List<DataRecord>();
while (!_context.IsTransformCompleted || queue.Count > 0) while (!_context.IsTransformCompleted || queue.Count > 0)
{ {
if (ct.IsCancellationRequested) if (ct.IsCancellationRequested)
break; break;
if (!queue.TryDequeue(out var record)) continue; if (!queue.TryDequeue(out var record) || ignoreOutput.Contains(record.TableName)) continue;
var dbName = record.Database ?? throw new ApplicationException("输出的记录缺少数据库名"); var dbName = record.Database ?? throw new ApplicationException("输出的记录缺少数据库名");
if(dbName != db) if(dbName != db)
@ -136,6 +139,7 @@ public class OutputService : IOutputService
await output.FlushAsync(_outputOptions.Value.MaxAllowedPacket); await output.FlushAsync(_outputOptions.Value.MaxAllowedPacket);
foreach (var (key, value) in tableOutput) foreach (var (key, value) in tableOutput)
{ {
_context.AddOutput(value);
_context.AddTableOutput(key, value); _context.AddTableOutput(key, value);
} }
_logger.LogTrace("Flushed {Count} records", tableOutput.Values.Sum(i => i)); _logger.LogTrace("Flushed {Count} records", tableOutput.Values.Sum(i => i));

View File

@ -1,4 +1,5 @@
using System.Diagnostics; using System.Diagnostics;
using System.Text;
using MesETL.App.Services; using MesETL.App.Services;
using MesETL.App.Services.Loggers; using MesETL.App.Services.Loggers;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
@ -15,6 +16,8 @@ public class TaskMonitorService
private readonly DataRecordQueue _producerQueue; private readonly DataRecordQueue _producerQueue;
private readonly RecordQueuePool _queuePool; private readonly RecordQueuePool _queuePool;
private string _outputPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Log/progress.txt");
public TaskMonitorService(ProcessContext context, public TaskMonitorService(ProcessContext context,
[FromKeyedServices(Const.ConstVar.Producer)] [FromKeyedServices(Const.ConstVar.Producer)]
DataRecordQueue producerQueue, DataRecordQueue producerQueue,
@ -79,31 +82,37 @@ public class TaskMonitorService
{ {
logger.LogStatus("Monitor: Progress status", new Dictionary<string, string> logger.LogStatus("Monitor: Progress status", new Dictionary<string, string>
{ {
{"Input",_context.IsInputCompleted ? "completed" : $"running {inputSpeed:F2} records/s" }, {"Input",_context.IsInputCompleted ? "OK" : $"{inputSpeed:F2}/s" },
{"Transform", _context.IsTransformCompleted ? "completed" : $"running {transformSpeed:F2} records/s" }, {"Transform", _context.IsTransformCompleted ? "OK" : $"{transformSpeed:F2}/s" },
{"Output", _context.IsOutputCompleted ? "completed" : $"running {outputSpeed:F2} records/s" } {"Output", _context.IsOutputCompleted ? "OK" : $"{outputSpeed:F2}/s" },
{"| Input Queue", _producerQueue.Count.ToString() },
{"Output Queue", _queuePool.Queues.Values.Sum(queue => queue.Count).ToString()},
{"Memory", $"{GC.GetTotalMemory(false) / 1024 / 1024} MiB"},
}); });
logger.LogStatus("Monitor: Table output progress", var dict = _context.TableProgress
_context.TableProgress .ToDictionary(kv => kv.Key, kv => $"{kv.Value.input}/{kv.Value.output}");
.ToDictionary(kv => kv.Key, kv => kv.Value.ToString()), logger.LogStatus("Monitor: Table progress", dict, ITaskMonitorLogger.LogLevel.Progress);
ITaskMonitorLogger.LogLevel.Progress); var sb = new StringBuilder("Table Progress: \n");
foreach (var kv in dict)
logger.LogStatus("Monitor: Process count", new Dictionary<string, string>
{ {
{"Input", inputCount.ToString()}, sb.Append(kv.Key).AppendLine(kv.Value);
{"Transform", transformCount.ToString()}, }
{"Output", outputCount.ToString()}
}, ITaskMonitorLogger.LogLevel.Progress);
logger.LogStatus("Monitor: Queue", new Dictionary<string, string> sb.AppendLine($"LongestCharCount: {_producerQueue.LongestFieldCharCount}");
{ await File.WriteAllTextAsync(_outputPath, sb.ToString(), CancellationToken.None);
{"Producer queue records", _producerQueue.Count.ToString() },
{"Output queues", _queuePool.Queues.Count.ToString() },
{"Output queue records", _queuePool.Queues.Values.Sum(queue => queue.Count).ToString()}, // logger.LogStatus("Monitor: Process count", new Dictionary<string, string>
}); // {
// {"Input", inputCount.ToString()},
// {"Transform", transformCount.ToString()},
// {"Output", outputCount.ToString()}
// }, ITaskMonitorLogger.LogLevel.Progress);
} }
await Task.Delay(5000, stoppingToken); await Task.Delay(5000, stoppingToken);
lastTime = time; lastTime = time;

View File

@ -46,9 +46,26 @@ public class TransformService : ITransformService
{ {
_logger.LogInformation("***** Data transform service started, thread id: {ThreadId} *****", Environment.CurrentManagedThreadId); _logger.LogInformation("***** Data transform service started, thread id: {ThreadId} *****", Environment.CurrentManagedThreadId);
// var tasks = new List<Task>();
// for (int i = 0; i < 4; i++)
// {
// tasks.Add(Task.Run(TransformWorker, cancellationToken));
// }
//
// await Task.WhenAll(tasks);
await TransformWorker();
_logger.LogInformation("***** Data transformation service finished *****");
}
public async Task TransformWorker()
{
while (!_context.IsInputCompleted || _producerQueue.Count > 0) while (!_context.IsInputCompleted || _producerQueue.Count > 0)
{ {
if (!_producerQueue.TryDequeue(out var record)) continue; if (!_producerQueue.TryDequeue(out var record))
{
continue;
}
try try
{ {
@ -80,7 +97,7 @@ public class TransformService : ITransformService
?? throw new ApplicationException("未配置数据库过滤器"); ?? throw new ApplicationException("未配置数据库过滤器");
record.Database = dbFilter(record); record.Database = dbFilter(record);
_queuePool[record.Database].Enqueue(record); await _queuePool[record.Database].EnqueueAsync(record);
_context.AddTransform(); _context.AddTransform();
if (_options.Value.EnableReBuilder) if (_options.Value.EnableReBuilder)
@ -93,7 +110,7 @@ public class TransformService : ITransformService
{ {
if(dbFilter is not null) if(dbFilter is not null)
rc.Database =dbFilter.Invoke(record); rc.Database =dbFilter.Invoke(record);
_queuePool[record.Database].Enqueue(rc); await _queuePool[record.Database].EnqueueAsync(rc);
_context.AddTransform(); _context.AddTransform();
} }
} }
@ -110,7 +127,5 @@ public class TransformService : ITransformService
} }
} }
_context.CompleteTransform(); _context.CompleteTransform();
_logger.LogInformation("***** Data transformation service finished *****");
} }
} }

View File

@ -24,7 +24,7 @@ public class VoidOutputService : IOutputService
_logger.LogInformation("***** Void Output Service Started *****"); _logger.LogInformation("***** Void Output Service Started *****");
while (!_context.IsTransformCompleted || _queuePool.Queues.Count > 0) while (!_context.IsTransformCompleted || _queuePool.Queues.Count > 0)
{ {
foreach (var pair in _queuePool.Queues) // 内存优化 foreach (var pair in _queuePool.Queues)
{ {
if (_context.IsTransformCompleted && pair.Value.Count == 0) if (_context.IsTransformCompleted && pair.Value.Count == 0)
{ {

View File

@ -22,7 +22,6 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" /> <PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
<PackageReference Include="MySqlConnector" Version="2.3.3" />
<PackageReference Include="Serilog" Version="3.1.2-dev-02097" /> <PackageReference Include="Serilog" Version="3.1.2-dev-02097" />
<PackageReference Include="Serilog.Extensions.Hosting" Version="8.0.0" /> <PackageReference Include="Serilog.Extensions.Hosting" Version="8.0.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="5.0.1" /> <PackageReference Include="Serilog.Sinks.Console" Version="5.0.1" />
@ -32,4 +31,8 @@
<PackageReference Include="ZstdSharp.Port" Version="0.7.4" /> <PackageReference Include="ZstdSharp.Port" Version="0.7.4" />
</ItemGroup> </ItemGroup>
<ItemGroup>
<ProjectReference Include="..\MesETL.Shared\MesETL.Shared.csproj" />
</ItemGroup>
</Project> </Project>

View File

@ -38,11 +38,15 @@ namespace MesETL.App.Options
public string[]? TableOrder { get; set; } public string[]? TableOrder { get; set; }
public string[] TableIgnoreList { get; set; } = [];
/// <summary> /// <summary>
/// 配置如何从文件名转换为表名和表头 /// 配置如何从文件名转换为表名和表头
/// </summary> /// </summary>
public Func<string, FileInputInfo?>? FileInputMetaBuilder { get; set; } //TODO: 抽离 public Func<string, FileInputInfo?>? FileInputMetaBuilder { get; set; } //TODO: 抽离
public Action<string>? OnTableInputCompleted { get; set; }
#endregion #endregion
} }
} }

View File

@ -4,7 +4,7 @@ public class DatabaseOutputOptions
{ {
public string? ConnectionString { get; set; } public string? ConnectionString { get; set; }
public int MaxAllowedPacket { get; set; } = 64 * 1024 * 1024; public int MaxAllowedPacket { get; set; } = 32 * 1024 * 1024;
public int FlushCount { get; set; } = 10000; public int FlushCount { get; set; } = 10000;
@ -12,6 +12,9 @@ public class DatabaseOutputOptions
public bool TreatJsonAsHex { get; set; } = true; public bool TreatJsonAsHex { get; set; } = true;
public string[] NoOutput { get; set; } = [];
public Dictionary<string, string>? ForUpdate { get; set; }
/// <summary> /// <summary>
/// 配置导入数据的特殊列 /// 配置导入数据的特殊列
@ -22,4 +25,12 @@ public class DatabaseOutputOptions
{ {
return ColumnTypeConfig.GetValueOrDefault($"{table}.{column}", ColumnType.UnDefine); return ColumnTypeConfig.GetValueOrDefault($"{table}.{column}", ColumnType.UnDefine);
} }
public bool TryGetForUpdate(string table, out string? forUpdate)
{
forUpdate = null;
if (ForUpdate is null || !ForUpdate.TryGetValue(table, out forUpdate))
return false;
return true;
}
} }

View File

@ -79,6 +79,7 @@ async Task RunProgram()
options.UseMock = inputOptions.UseMock; options.UseMock = inputOptions.UseMock;
options.TableMockConfig = inputOptions.TableMockConfig; options.TableMockConfig = inputOptions.TableMockConfig;
options.MockCountMultiplier = inputOptions.MockCountMultiplier; options.MockCountMultiplier = inputOptions.MockCountMultiplier;
options.TableIgnoreList = inputOptions.TableIgnoreList;
// 配置文件输入方法 // 配置文件输入方法
options.FileInputMetaBuilder = fileName => options.FileInputMetaBuilder = fileName =>
@ -113,27 +114,41 @@ async Task RunProgram()
return null; return null;
}; };
options.TableOrder = // 配置表输入完成事件,字典清理
options.OnTableInputCompleted = table =>
{
switch (table)
{
case TableNames.OrderBlockPlan:
MemoryCache.Instance?.Delete(s => s.StartsWith(TableNames.Order + '-'));
break;
case TableNames.OrderItem:
MemoryCache.Instance?.Delete(s => s.StartsWith(TableNames.OrderBlockPlan + '-'));
break;
case TableNames.OrderProcessSchedule:
MemoryCache.Instance?.Delete(s => s.StartsWith(TableNames.OrderProcess + '-'));
break;
}
};
options.TableOrder = inputOptions.TableOrder ??
[ [
TableNames.Machine, TableNames.Machine,
TableNames.Order, TableNames.Order,
TableNames.OrderBoxBlock, // 依赖Order.CompanyID TableNames.OrderBoxBlock, // 依赖Order.CompanyID
TableNames.OrderDataBlock, // 依赖Order.CompanyID
TableNames.OrderBlockPlan, TableNames.OrderBlockPlan,
TableNames.OrderBlockPlanResult,// 依赖OrderBlockPlan.CompanyID / 删除 TableNames.OrderBlockPlanResult,// 依赖OrderBlockPlan.CompanyID / 删除
TableNames.OrderItem, TableNames.OrderItem,
TableNames.OrderDataBlock,
TableNames.OrderDataGoods, TableNames.OrderDataGoods,
TableNames.OrderDataParts, TableNames.OrderDataParts,
TableNames.OrderModule, TableNames.OrderModule,
TableNames.OrderModuleExtra, TableNames.OrderModuleExtra,
TableNames.OrderModuleItem, TableNames.OrderModuleItem,
TableNames.OrderPackage, TableNames.OrderPackage,
#if USE_TEST_DB
TableNames.OrderPatchDetail,
#endif
TableNames.OrderProcess, TableNames.OrderProcess,
TableNames.OrderProcessStep, TableNames.OrderProcessStep,
@ -238,14 +253,28 @@ async Task RunProgram()
return false; return false;
break; break;
} }
// OrderBlockPlan删除CreateTime < 202301的Json列合法检查 // OrderDataBlock删除对应Order.OrderNo不存在的对象
case TableNames.OrderDataBlock:
{
if (!await cache.ExistsAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"])))
return false;
break;
}
// OrderDataParts删除对应Order.OrderNo不存在的对象
case TableNames.OrderDataParts:
{
if (!await cache.ExistsAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"])))
return false;
break;
}
// OrderBlockPlan删除CreateTime < 202301的
case TableNames.OrderBlockPlan: case TableNames.OrderBlockPlan:
{ {
var time = DateTime.Parse(record["CreateTime"].Trim('"','\'')); var time = DateTime.Parse(record["CreateTime"].Trim('"','\''));
if (time < oldestTime) if (time < oldestTime)
return false; return false;
// if (!DumpDataHelper.IsJson(record["OrderNos"])) return false; // if (!DumpDataHelper.IsJson(record["OrderNos"])) return false; //Json列合法检查
break; break;
} }
// OrderBlockPlanResult删除对应order_block_plan.ID不存在的对象 // OrderBlockPlanResult删除对应order_block_plan.ID不存在的对象
@ -255,10 +284,16 @@ async Task RunProgram()
return false; return false;
break; break;
} }
// case TableNames.OrderBlockPlanResult: // 用SaveTime过滤
// {
// if (DateTime.Parse(record["SaveTime"].Trim('"', '\'')) < oldestTime)
// return false;
// break;
// }
// OrderDataGoods Json列合法检查 // OrderDataGoods Json列合法检查
case TableNames.OrderDataGoods: case TableNames.OrderDataGoods:
{ {
if (!DumpDataHelper.IsJson(record["ExtraProp"])) return false; // if (!DumpDataHelper.IsJson(record["ExtraProp"])) return false;
break; break;
} }
// OrderModule删除OrderNo < 202301的 // OrderModule删除OrderNo < 202301的
@ -376,6 +411,20 @@ async Task RunProgram()
ThrowIfNoCached(await cache.GetStringAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"])), ThrowIfNoCached(await cache.GetStringAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"])),
TableNames.OrderBoxBlock, TableNames.Order, "OrderNo", "无法获取对应的CompanyID")); TableNames.OrderBoxBlock, TableNames.Order, "OrderNo", "无法获取对应的CompanyID"));
break; break;
// 修正OrderDataBlock.CompanyID
case TableNames.OrderDataBlock:
record["CompanyID"] =
// 获取Order.OrderNo -> CompanyID
ThrowIfNoCached(await cache.GetStringAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"])),
TableNames.OrderDataBlock, TableNames.Order, "OrderNo", "无法获取对应的CompanyID");
break;
// 修正OrderDataParts.CompanyID:
case TableNames.OrderDataParts:
record["CompanyID"] =
// 获取Order.OrderNo -> CompanyID
ThrowIfNoCached(await cache.GetStringAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"])),
TableNames.OrderDataParts, TableNames.Order, "OrderNo", "无法获取对应的CompanyID");
break;
// OrderModule添加ShardKey列移除ViewFileName列 // OrderModule添加ShardKey列移除ViewFileName列
case TableNames.OrderModule: case TableNames.OrderModule:
record.AddField("ShardKey", CalculateShardKeyByOrderNo(record["OrderNo"])); record.AddField("ShardKey", CalculateShardKeyByOrderNo(record["OrderNo"]));
@ -508,9 +557,11 @@ async Task RunProgram()
{ {
options.ConnectionString = outputOptions.ConnectionString; options.ConnectionString = outputOptions.ConnectionString;
options.FlushCount = outputOptions.FlushCount; options.FlushCount = outputOptions.FlushCount;
options.MaxAllowedPacket = outputOptions.MaxAllowedPacket; options.MaxAllowedPacket = outputOptions.MaxAllowedPacket / 2;
options.MaxDatabaseOutputTask = outputOptions.MaxDatabaseOutputTask; options.MaxDatabaseOutputTask = outputOptions.MaxDatabaseOutputTask;
options.TreatJsonAsHex = outputOptions.TreatJsonAsHex; options.TreatJsonAsHex = outputOptions.TreatJsonAsHex;
options.NoOutput = outputOptions.NoOutput;
options.ForUpdate = outputOptions.ForUpdate;
#if USE_TEST_DB #if USE_TEST_DB
// Test Server // Test Server
@ -595,9 +646,12 @@ async Task RunProgram()
host.Services.AddDataSourceFactory(); host.Services.AddDataSourceFactory();
host.Services.AddErrorRecorderFactory(); host.Services.AddErrorRecorderFactory();
host.Services.AddSingleton<ProcessContext>(); host.Services.AddSingleton<ProcessContext>();
host.Services.AddKeyedSingleton<DataRecordQueue>(ConstVar.Producer, new DataRecordQueue(200_000)); var prodLen = host.Configuration.GetRequiredSection("RecordQueue").GetValue<int>("ProducerQueueLength");
host.Services.AddRecordQueuePool(tenantDbOptions.DbGroup.Keys.Select(key => (key:key, queue:new DataRecordQueue(60_000))).ToArray()); var consLen = host.Configuration.GetRequiredSection("RecordQueue").GetValue<int>("ConsumerQueueLength");
host.Services.AddSingleton<ITaskMonitorLogger, CacheTaskMonitorLogger>(); var maxCharCount = host.Configuration.GetRequiredSection("RecordQueue").GetValue<long>("MaxByteCount") / 2;
host.Services.AddKeyedSingleton<DataRecordQueue>(ConstVar.Producer, new DataRecordQueue(prodLen, maxCharCount));
host.Services.AddRecordQueuePool(tenantDbOptions.DbGroup.Keys.Select(key => (key:key, queue:new DataRecordQueue(consLen, maxCharCount))).ToArray());
// host.Services.AddSingleton<ITaskMonitorLogger, CacheTaskMonitorLogger>();
host.Services.AddSingleton<ITaskMonitorLogger, LoggerTaskMonitorLogger>(); host.Services.AddSingleton<ITaskMonitorLogger, LoggerTaskMonitorLogger>();
host.Services.AddHostedService<MainHostedService>(); host.Services.AddHostedService<MainHostedService>();
@ -605,7 +659,8 @@ async Task RunProgram()
host.Services.AddSingleton<ITransformService, TransformService>(); host.Services.AddSingleton<ITransformService, TransformService>();
host.Services.AddSingleton<IOutputService, OutputService>(); host.Services.AddSingleton<IOutputService, OutputService>();
host.Services.AddSingleton<TaskMonitorService>(); host.Services.AddSingleton<TaskMonitorService>();
host.Services.AddRedisCache(redisOptions); // host.Services.AddRedisCache(redisOptions);
host.Services.AddSingleton<ICacher, MemoryCache>();
var app = host.Build(); var app = host.Build();
await app.RunAsync(); await app.RunAsync();
} }

View File

@ -1,5 +1,6 @@
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Diagnostics.CodeAnalysis; using System.Diagnostics.CodeAnalysis;
using TaskExtensions = MesETL.Shared.Helper.TaskExtensions;
namespace MesETL.App.Services; namespace MesETL.App.Services;
@ -10,26 +11,35 @@ public class DataRecordQueue : IDisposable
{ {
private readonly BlockingCollection<DataRecord> _queue; private readonly BlockingCollection<DataRecord> _queue;
private long _currentCharCount;
private readonly long _maxCharCount = 2_147_483_648; // 4GiB
public int Count => _queue.Count; public int Count => _queue.Count;
public bool IsCompleted => _queue.IsCompleted; public bool IsCompleted => _queue.IsCompleted;
public bool IsAddingCompleted => _queue.IsAddingCompleted; public bool IsAddingCompleted => _queue.IsAddingCompleted;
public long LongestFieldCharCount { get; private set; }
public event Action? OnRecordWrite; public event Action? OnRecordWrite;
public event Action? OnRecordRead; public event Action? OnRecordRead;
public DataRecordQueue() : this(500_000) // 默认容量最大500K public DataRecordQueue() : this(500_000, 2_147_483_648) // 默认容量最大500K
{ {
} }
public DataRecordQueue(int boundedCapacity) public DataRecordQueue(int boundedCapacity, long maxCharCount)
{ {
_queue = new BlockingCollection<DataRecord>(boundedCapacity); _queue = new BlockingCollection<DataRecord>(boundedCapacity);
_maxCharCount = maxCharCount;
} }
public void CompleteAdding() => _queue.CompleteAdding();
public bool TryDequeue([MaybeNullWhen(false)] out DataRecord record) public bool TryDequeue([MaybeNullWhen(false)] out DataRecord record)
{ {
if (_queue.TryTake(out record)) if (_queue.TryTake(out record))
{ {
Interlocked.Add(ref _currentCharCount, -record.FieldCharCount);
OnRecordRead?.Invoke(); OnRecordRead?.Invoke();
return true; return true;
} }
@ -37,13 +47,15 @@ public class DataRecordQueue : IDisposable
return false; return false;
} }
public DataRecord Dequeue() => _queue.Take();
public void CompleteAdding() => _queue.CompleteAdding(); public async Task EnqueueAsync(DataRecord record)
public void Enqueue(DataRecord record)
{ {
var charCount = record.FieldCharCount;
LongestFieldCharCount = Math.Max(LongestFieldCharCount, charCount);
if(_currentCharCount + charCount > _maxCharCount)
await TaskExtensions.WaitUntil(() => _currentCharCount + charCount < _maxCharCount, 50);
_queue.Add(record); _queue.Add(record);
Interlocked.Add(ref _currentCharCount, charCount);
OnRecordWrite?.Invoke(); OnRecordWrite?.Invoke();
} }

View File

@ -11,10 +11,11 @@ public class CsvReader : IDataReader
{ {
protected readonly string? FilePath; protected readonly string? FilePath;
protected readonly Lazy<StreamReader> Reader; protected readonly Lazy<StreamReader> Reader;
private Stream? _stream;
protected readonly ILogger? Logger; protected readonly ILogger? Logger;
protected readonly string TableName; protected readonly string TableName;
public DataRecord Current { get; protected set; } = null!; public DataRecord Current { get; protected set; } = default!;
public string[] Headers { get; } public string[] Headers { get; }
public string Delimiter { get; } public string Delimiter { get; }
public char QuoteChar { get; } public char QuoteChar { get; }
@ -22,15 +23,18 @@ public class CsvReader : IDataReader
public CsvReader(Stream stream, string tableName, string[] headers, string delimiter = ",", char quoteChar = '"', ILogger? logger = null) public CsvReader(Stream stream, string tableName, string[] headers, string delimiter = ",", char quoteChar = '"', ILogger? logger = null)
: this(tableName, headers, delimiter, quoteChar, logger) : this(tableName, headers, delimiter, quoteChar, logger)
{ {
Reader = new Lazy<StreamReader>(() => new StreamReader(stream)); Reader = new Lazy<StreamReader>(() => new StreamReader(stream),false);
} }
public CsvReader(string filePath, string tableName, string[] headers, string delimiter = ",", char quoteChar = '"', ILogger? logger = null) public CsvReader(string filePath, string tableName, string[] headers, string delimiter = ",", char quoteChar = '"', ILogger? logger = null)
: this(tableName, headers, delimiter, quoteChar, logger) : this(tableName, headers, delimiter, quoteChar, logger)
{ {
var fs = File.OpenRead(filePath);
FilePath = filePath; FilePath = filePath;
Reader = new Lazy<StreamReader>(() => new StreamReader(fs)); Reader = new Lazy<StreamReader>(() =>
{
_stream = File.OpenRead(filePath);
return new StreamReader(_stream);
});
} }
private CsvReader(string tableName, string[] headers, string delimiter = ",", char quoteChar = '"', ILogger? logger = null) private CsvReader(string tableName, string[] headers, string delimiter = ",", char quoteChar = '"', ILogger? logger = null)
@ -49,43 +53,44 @@ public class CsvReader : IDataReader
if (string.IsNullOrWhiteSpace(str)) if (string.IsNullOrWhiteSpace(str))
return false; return false;
var fields = ParseRow(str, QuoteChar, Delimiter); var fields = ParseRowFaster(str, QuoteChar, Delimiter[0]);
Current = new DataRecord(fields, TableName, Headers); Current = new DataRecord(fields, TableName, Headers);
return true; return true;
} }
public string[] ParseRow(ReadOnlySpan<char> source, char quoteChar, string delimiter) public static string[] ParseRow(ReadOnlySpan<char> source, char quoteChar, char delimiter)
{ {
var result = new List<string>(); var result = new List<string>();
var index = -1; var index = -1;
var current = new StringBuilder(); var current = new StringBuilder(source.Length);
var hasQuote = false; var hasQuote = false;
var hasSlash = false; var hasSlash = false;
while (index < source.Length - 1) while (index < source.Length - 1)
{ {
index++; index++;
if (hasSlash == false && source[index] == '\\') var currChar = source[index];
if (hasSlash == false && currChar == '\\')
{ {
hasSlash = true; hasSlash = true;
current.Append('\\'); current.Append('\\');
continue; continue;
} }
if (hasSlash == false && source[index] == quoteChar) if (hasSlash == false && currChar == quoteChar)
{ {
hasQuote = !hasQuote; hasQuote = !hasQuote;
current.Append(source[index]); current.Append(currChar);
continue; continue;
} }
if (hasQuote == false && source[index] == delimiter[0]) if (hasQuote == false && currChar == delimiter)
{ {
result.Add(current.ToString()); result.Add(current.ToString());
current.Clear(); current.Clear();
} }
else else
{ {
current.Append(source[index]); current.Append(currChar);
} }
hasSlash = false; hasSlash = false;
@ -95,9 +100,61 @@ public class CsvReader : IDataReader
return result.ToArray(); return result.ToArray();
} }
public static List<string> ParseRowFaster(ReadOnlySpan<char> source, char quoteChar, char delimiter, int columnCount = 10)
{
var result = new List<string>(columnCount);
var index = -1;
var hasQuote = false;
var hasSlash = false;
var start = 0;
var end = 0;
var len = source.Length - 1;
while (index < len)
{
++index;
var currChar = source[index];
if (!hasSlash)
{
if (currChar is '\\')
{
hasSlash = true;
++end;
continue;
}
if (currChar == quoteChar)
{
hasQuote = !hasQuote;
++end;
continue;
}
}
if (!hasQuote && currChar == delimiter)
{
result.Add(source[start..(end)].ToString());
start = end + 1;
++end;
}
else
{
++end;
}
hasSlash = false;
}
result.Add(source[start..end].ToString());
return result;
}
public virtual void Dispose() public virtual void Dispose()
{ {
if(Reader.IsValueCreated) if (Reader.IsValueCreated)
{
Reader.Value.Dispose(); Reader.Value.Dispose();
_stream?.Dispose();
}
} }
} }

View File

@ -3,6 +3,7 @@ using System.Text.RegularExpressions;
using MesETL.App.Const; using MesETL.App.Const;
using MesETL.App.Helpers; using MesETL.App.Helpers;
using MesETL.App.Options; using MesETL.App.Options;
using MesETL.Shared.Helper;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using MySqlConnector; using MySqlConnector;
@ -61,11 +62,11 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
return; return;
var cmd = _conn.CreateCommand(); var cmd = _conn.CreateCommand();
cmd.CommandTimeout = 3 * 60; cmd.CommandTimeout = 0;
try try
{ {
var excuseList = GetExcuseList(_recordCache, maxAllowPacket).ToList(); var excuseList = GetExcuseList(_recordCache, maxAllowPacket);
foreach (var insertSql in excuseList) foreach (var insertSql in excuseList)
{ {
cmd.CommandText = insertSql; cmd.CommandText = insertSql;
@ -105,6 +106,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
public IEnumerable<string> GetExcuseList(IDictionary<string, IList<DataRecord>> tableRecords,int maxAllowPacket) public IEnumerable<string> GetExcuseList(IDictionary<string, IList<DataRecord>> tableRecords,int maxAllowPacket)
{ {
var sb = new StringBuilder("SET AUTOCOMMIT = 1;\n"); var sb = new StringBuilder("SET AUTOCOMMIT = 1;\n");
var appendCount = 0;
foreach (var (tableName, records) in tableRecords) foreach (var (tableName, records) in tableRecords)
{ {
if (records.Count == 0) if (records.Count == 0)
@ -186,6 +188,14 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
// 若字符数量即将大于限制则返回SQL清空StringBuilder保留当前记录的索引值然后转到StartBuild标签重新开始一轮INSERT // 若字符数量即将大于限制则返回SQL清空StringBuilder保留当前记录的索引值然后转到StartBuild标签重新开始一轮INSERT
if (sb.Length + recordSb.Length + 23 > maxAllowPacket) if (sb.Length + recordSb.Length + 23 > maxAllowPacket)
{ {
if (appendCount == 0) // 如果单条记录超出maxAllowedPacket
{
sb.Append(recordSb);
_logger.LogWarning("{Table}表单条数据的SQL超出了配置的MaxAllowedPacket字符数{Count}", tableName,
sb.Length + recordSb.Length + 23);
}
TryAddForUpdateSuffix(tableName, sb);
sb.Append(';').AppendLine(); sb.Append(';').AppendLine();
sb.Append("SET AUTOCOMMIT = 1;"); sb.Append("SET AUTOCOMMIT = 1;");
yield return sb.ToString(); yield return sb.ToString();
@ -197,8 +207,10 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
sb.Append(',').AppendLine(); sb.Append(',').AppendLine();
noCommas = false; noCommas = false;
sb.Append(recordSb); // StringBuilder.Append(StringBuilder)不会分配多余的内存 sb.Append(recordSb); // StringBuilder.Append(StringBuilder)不会分配多余的内存
appendCount++;
} }
TryAddForUpdateSuffix(tableName, sb);
sb.Append(';'); sb.Append(';');
sb.Append("COMMIT;"); sb.Append("COMMIT;");
yield return sb.ToString(); yield return sb.ToString();
@ -206,6 +218,23 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
} }
} }
/// <summary>
/// 数据必须是同一张表
/// </summary>
/// <param name="tableName"></param>
/// <param name="sb"></param>
private void TryAddForUpdateSuffix(string tableName, StringBuilder sb)
{
var forUpdate = _options.Value.TryGetForUpdate(tableName, out var forUpdateSql);
if (forUpdate)
{
sb.AppendLine($"""
AS new
ON DUPLICATE KEY UPDATE
{forUpdateSql}
""");
}
}
public void Dispose() public void Dispose()
{ {

View File

@ -9,20 +9,24 @@ namespace MesETL.App.Services.ETL;
public class ZstReader : CsvReader public class ZstReader : CsvReader
{ {
protected new readonly Lazy<StreamReader> Reader; protected new readonly Lazy<StreamReader> Reader;
private Stream? _stream;
public ZstReader(string filePath, string tableName, string[] headers, string delimiter = ",", char quoteChar = '\"', ILogger? logger = null) public ZstReader(string filePath, string tableName, string[] headers, string delimiter = ",", char quoteChar = '\"', ILogger? logger = null)
: base(filePath, tableName, headers, delimiter, quoteChar, logger) : base(filePath, tableName, headers, delimiter, quoteChar, logger)
{ {
var ds = new DecompressionStream(File.OpenRead(filePath)); Reader = new Lazy<StreamReader>(() =>
Reader = new Lazy<StreamReader>(() => new StreamReader(ds)); {
_stream = new DecompressionStream(File.OpenRead(filePath));
return new StreamReader(_stream);
}, false);
} }
public ZstReader(Stream stream, string tableName, string[] headers, string delimiter = ",", char quoteChar = '\"', ILogger? logger = null) public ZstReader(Stream stream, string tableName, string[] headers, string delimiter = ",", char quoteChar = '\"', ILogger? logger = null)
: base(stream, tableName, headers, delimiter, quoteChar, logger) : base(stream, tableName, headers, delimiter, quoteChar, logger)
{ {
var ds = new DecompressionStream(stream); var ds = new DecompressionStream(stream);
Reader = new Lazy<StreamReader>(() => new StreamReader(ds)); Reader = new Lazy<StreamReader>(() => new StreamReader(ds), false);
} }
public override async ValueTask<bool> ReadAsync() public override async ValueTask<bool> ReadAsync()
@ -31,7 +35,7 @@ public class ZstReader : CsvReader
if (string.IsNullOrWhiteSpace(str)) if (string.IsNullOrWhiteSpace(str))
return false; return false;
var fields = ParseRow(str, QuoteChar, Delimiter); var fields = ParseRowFaster(str, QuoteChar, Delimiter[0]);
Current = new DataRecord(fields, TableName, Headers); Current = new DataRecord(fields, TableName, Headers);
return true; return true;
} }
@ -39,8 +43,11 @@ public class ZstReader : CsvReader
public override void Dispose() public override void Dispose()
{ {
base.Dispose(); base.Dispose();
if(Reader.IsValueCreated) if (Reader.IsValueCreated)
{
Reader.Value.Dispose(); Reader.Value.Dispose();
_stream?.Dispose();
}
} }
} }

View File

@ -1,4 +1,5 @@
using MesETL.App.Helpers; using MesETL.App.Helpers;
using MesETL.Shared.Helper;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
namespace MesETL.App.Services.ErrorRecorder; namespace MesETL.App.Services.ErrorRecorder;

View File

@ -1,4 +1,5 @@
using MesETL.App.Cache; using System.Text;
using MesETL.App.Cache;
namespace MesETL.App.Services.Loggers; namespace MesETL.App.Services.Loggers;

View File

@ -11,7 +11,7 @@ public class ProcessContext
private long _inputCount; private long _inputCount;
private long _transformCount; private long _transformCount;
private long _outputCount; private long _outputCount;
private readonly ConcurrentDictionary<string, long> _tableProgress = new(); private readonly ConcurrentDictionary<string, (long input, long output)> _tableProgress = new();
public bool HasException => _hasException; public bool HasException => _hasException;
public bool IsInputCompleted { get; private set; } public bool IsInputCompleted { get; private set; }
public bool IsTransformCompleted { get; private set; } public bool IsTransformCompleted { get; private set; }
@ -37,7 +37,7 @@ public class ProcessContext
// TableName -> Count // TableName -> Count
public IReadOnlyDictionary<string, long> TableProgress => _tableProgress; public IReadOnlyDictionary<string, (long input, long output)> TableProgress => _tableProgress;
public void CompleteInput() => IsInputCompleted = true; public void CompleteInput() => IsInputCompleted = true;
@ -55,16 +55,21 @@ public class ProcessContext
public void AddOutput() => Interlocked.Increment(ref _outputCount); public void AddOutput() => Interlocked.Increment(ref _outputCount);
public void AddOutput(int count) => Interlocked.Add(ref _outputCount, count); public void AddOutput(int count) => Interlocked.Add(ref _outputCount, count);
public void AddTableOutput(string table, int count) public void AddTableInput(string table, int count)
{ {
_tableProgress.AddOrUpdate(table, count, (k, v) => v + count); _tableProgress.AddOrUpdate(table, (input:count, output:0), (k, tuple) =>
AddOutput(count); {
tuple.input += count;
return tuple;
});
} }
public long GetTableOutput(string table) public void AddTableOutput(string table, int count)
{ {
if(!_tableProgress.TryGetValue(table, out var count)) _tableProgress.AddOrUpdate(table, (input:0, output:count), (k, tuple) =>
throw new ApplicationException($"未找到表{table}输出记录"); {
return count; tuple.output += count;
return tuple;
});
} }
} }

View File

@ -9,7 +9,8 @@ public class RecordQueuePool
public IReadOnlyDictionary<string, DataRecordQueue> Queues => _queues; public IReadOnlyDictionary<string, DataRecordQueue> Queues => _queues;
public void AddQueue(string key, int boundedCapacity = 200_0000) => AddQueue(key, new DataRecordQueue(boundedCapacity)); public void AddQueue(string key, int boundedCapacity = 200_0000, long maxCharCount = 2_147_483_648)
=> AddQueue(key, new DataRecordQueue(boundedCapacity, maxCharCount));
public void AddQueue(string key, DataRecordQueue queue) public void AddQueue(string key, DataRecordQueue queue)
{ {

View File

@ -1,5 +1,5 @@
using ApplicationException = System.ApplicationException; using ApplicationException = System.ApplicationException;
using TaskExtensions = MesETL.App.Helpers.TaskExtensions; using TaskExtensions = MesETL.Shared.Helper.TaskExtensions;
namespace MesETL.App.Services; namespace MesETL.App.Services;

View File

@ -1,13 +1,18 @@
{ {
"MemoryThreshold": 8,
"GCIntervalMilliseconds": -1,
"UnsafeVariable": false,
"Logging": { "Logging": {
"LogLevel": { "LogLevel": {
"Default": "Debug" "Default": "Debug"
} }
}, },
"Input":{ "Input":{
"InputDir": "D:\\Dump\\MyDumper-ZST 2024-02-05", // Csv "InputDir": "D:\\Dump\\NewMockData", // Csv
"UseMock": false, // 使 "UseMock": false, // 使
"MockCountMultiplier": 1 // "MockCountMultiplier": 1, //
"TableOrder": ["order", "order_data_parts"], //
"TableIgnoreList": [] //
}, },
"Transform":{ "Transform":{
"StrictMode": false, // true "StrictMode": false, // true
@ -21,7 +26,17 @@
"MaxAllowedPacket": 67108864, "MaxAllowedPacket": 67108864,
"FlushCount": 10000, // "FlushCount": 10000, //
"MaxDatabaseOutputTask" : 4, // "MaxDatabaseOutputTask" : 4, //
"TreatJsonAsHex": false // json16(0x)json "TreatJsonAsHex": false, // json16(0x)json
"NoOutput": ["order"],
"ForUpdate":
{
"order_data_parts": "CompanyID = new.CompanyID"
}
},
"RecordQueue":{
"ProducerQueueLength": 50000, //
"ConsumerQueueLength": 10000, //
"MaxByteCount": 3221225472 //
}, },
"RedisCache": { "RedisCache": {
"Configuration": "192.168.1.246:6380", "Configuration": "192.168.1.246:6380",

View File

@ -1,7 +1,7 @@
using System.Data; using System.Data;
using MySqlConnector; using MySqlConnector;
namespace MesETL.App.Helpers; namespace MesETL.Shared.Helper;
public static class DatabaseHelper public static class DatabaseHelper
{ {
@ -15,11 +15,11 @@ public static class DatabaseHelper
return new MySqlConnection(newConnStr); return new MySqlConnection(newConnStr);
} }
public static async Task<DataSet> QueryTableAsync(string connStr, string sql) public static async Task<DataSet> QueryTableAsync(string connStr, string sql, CancellationToken ct = default)
{ {
await using var conn = CreateConnection(connStr); await using var conn = CreateConnection(connStr);
if(conn.State is not ConnectionState.Open) if(conn.State is not ConnectionState.Open)
await conn.OpenAsync(); await conn.OpenAsync(ct);
await using var cmd = conn.CreateCommand(); await using var cmd = conn.CreateCommand();
cmd.CommandText = sql; cmd.CommandText = sql;
var ds = new DataSet(); var ds = new DataSet();
@ -27,24 +27,24 @@ public static class DatabaseHelper
return ds; return ds;
} }
public static async Task<object?> QueryScalarAsync(string connStr, string sql) public static async Task<object?> QueryScalarAsync(string connStr, string sql, CancellationToken ct = default)
{ {
await using var conn = CreateConnection(connStr); await using var conn = CreateConnection(connStr);
if(conn.State is not ConnectionState.Open) if(conn.State is not ConnectionState.Open)
await conn.OpenAsync(); await conn.OpenAsync(ct);
await using var cmd = conn.CreateCommand(); await using var cmd = conn.CreateCommand();
cmd.CommandText = sql; cmd.CommandText = sql;
return await cmd.ExecuteScalarAsync(); return await cmd.ExecuteScalarAsync(ct);
} }
public static async Task<int> NonQueryAsync(string connStr, string sql) public static async Task<int> NonQueryAsync(string connStr, string sql, CancellationToken ct = default)
{ {
await using var conn = CreateConnection(connStr); await using var conn = CreateConnection(connStr);
if(conn.State is not ConnectionState.Open) if(conn.State is not ConnectionState.Open)
await conn.OpenAsync(); await conn.OpenAsync(ct);
await using var cmd = conn.CreateCommand(); await using var cmd = conn.CreateCommand();
cmd.CommandText = sql; cmd.CommandText = sql;
return await cmd.ExecuteNonQueryAsync(); return await cmd.ExecuteNonQueryAsync(ct);
} }
public static async Task<int> TransactionAsync(string connStr, string sql, params MySqlParameter[] parameters) public static async Task<int> TransactionAsync(string connStr, string sql, params MySqlParameter[] parameters)

View File

@ -1,4 +1,4 @@
namespace MesETL.App.Helpers; namespace MesETL.Shared.Helper;
public static class DictionaryExtensions public static class DictionaryExtensions
{ {

View File

@ -1,7 +1,7 @@
using System.Diagnostics.CodeAnalysis; using System.Diagnostics.CodeAnalysis;
using System.Reflection; using System.Reflection;
namespace MesETL.App.Helpers; namespace MesETL.Shared.Helper;
#nullable disable #nullable disable
public static class EnumerableExtensions public static class EnumerableExtensions
{ {

View File

@ -1,7 +1,7 @@
using System.Globalization; using System.Globalization;
using System.Text; using System.Text;
namespace MesETL.App.Helpers; namespace MesETL.Shared.Helper;
public static class StringExtensions public static class StringExtensions
{ {

View File

@ -1,4 +1,4 @@
namespace MesETL.App.Helpers; namespace MesETL.Shared.Helper;
public static class TaskExtensions public static class TaskExtensions
{ {

View File

@ -0,0 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="MySqlConnector" Version="2.3.5" />
</ItemGroup>
</Project>

View File

@ -1,5 +1,6 @@
using System.Data; using System.Data;
using MesETL.App.Helpers; using MesETL.App.Helpers;
using MesETL.Shared.Helper;
using MySqlConnector; using MySqlConnector;
using Xunit.Abstractions; using Xunit.Abstractions;

View File

@ -1,6 +1,7 @@
using System.Data; using System.Data;
using System.Text; using System.Text;
using MesETL.App.Helpers; using MesETL.App.Helpers;
using MesETL.Shared.Helper;
using MySqlConnector; using MySqlConnector;
using Newtonsoft.Json; using Newtonsoft.Json;
using Newtonsoft.Json.Linq; using Newtonsoft.Json.Linq;
@ -107,6 +108,24 @@ public class DatabaseToolBox
}).ToArray(); }).ToArray();
} }
[Theory]
[InlineData(["mesdb_1"])]
[InlineData(["mesdb_2"])]
[InlineData(["mesdb_3"])]
[InlineData(["mesdb_4"])]
[InlineData(["mesdb_5"])]
public async Task ShowIndex(string database)
{
var indexes = await GetAllTableIndexes(database);
var sb = new StringBuilder();
foreach (var (tableName, indexName, isUnique, columnName, tableIndexType) in indexes!)
{
sb.AppendLine($"Drop {(isUnique ? "UNIQUE" : string.Empty)} INDEX `{indexName}` ON `{database}`.`{tableName}`;");
}
_output.WriteLine(sb.ToString());
}
[Theory] [Theory]
[InlineData(["cferp_test_1", "D:/Indexes_cferp_test_1.json"])] [InlineData(["cferp_test_1", "D:/Indexes_cferp_test_1.json"])]
[InlineData(["cferp_test_2", "D:/Indexes_cferp_test_2.json"])] [InlineData(["cferp_test_2", "D:/Indexes_cferp_test_2.json"])]

View File

@ -1,4 +1,5 @@
using MesETL.App.Helpers; using MesETL.App.Helpers;
using MesETL.Shared.Helper;
namespace TestProject1; namespace TestProject1;

152
MesETL.Test/Test.cs Normal file
View File

@ -0,0 +1,152 @@
using System.Collections.Concurrent;
using System.Diagnostics;
using MesETL.App.Services.ETL;
using MesETL.Shared.Helper;
using Xunit.Abstractions;
using ZstdSharp;
namespace TestProject1;
public class Test
{
private readonly ITestOutputHelper _output;
public Test(ITestOutputHelper output)
{
_output = output;
}
[Theory]
[InlineData([@"D:\Dump\NewMockData2\cferp.order_box_block.00000.dat.zst"])]
public async Task ZstdDecompressTest(string inputFile)
{
var count = 0;
var flag = true;
var sw = Stopwatch.StartNew();
var reader = new StreamReader(new DecompressionStream(File.OpenRead(inputFile)));
var monitor = Task.Run(async () =>
{
var lastElapse = sw.ElapsedMilliseconds;
var lastCount = 0;
while (flag)
{
await Task.Delay(2000);
_output.WriteLine($"speed: {(count - lastCount) / ((sw.ElapsedMilliseconds - lastElapse) / 1000f)}");
lastElapse = sw.ElapsedMilliseconds;
lastCount = count;
}
});
while (!reader.EndOfStream)
{
var str = await reader.ReadLineAsync();
char a;
// foreach (var c in str)
// {
// a = c;
// }
CsvReader.ParseRowFaster(str, '"', ',');
count++;
}
flag = false;
monitor.Wait();
}
public static IEnumerable<object[]> ParseRowData()
{
yield return
[@"20220104020855,""2022-01-04 10:06:46"",1455,""0001-01-01 00:00:00"",""1"",0,""2"",""0"",\N,""0"",22010"];
yield return
[@"20220104020858,""2022-01-04 15:08:22"",1455,""0001-01-01 00:00:00"",""1"",838,""2"",""0"",""5"",""0"",22010"];
yield return
[@"5586326,20220104020855,220105981029,""1"",482278,482279,3768774,0,0,""1.000"",1455,22010"];
yield return
[@"130658,""PD220104002302"",3,4616,""2022-01-04 15:10:40"",1443,""2022-01-04 15:10:40"",""2022-01-04 15:10:51"",0,"""",0,1455,""0001-01-01 00:00:00"",1,5B32303232303130343032303835385D,E590B8E5A1912D2DE590B8E5A1912D2D31382D2D323030302A3630302D2D3130E789872D2D352E3936333B6361696C69616F2D2D79616E73652D2D392D2D323031302A313137342D2D31E789872D2D322E3336,""0"",0"];
}
[Theory]
[MemberData(nameof(ParseRowData))]
public void ParseRowFasterTest(string row)
{
var fields = CsvReader.ParseRowFaster(row, '"', ',');
_output.WriteLine(string.Join(',', fields));
}
[Fact]
public void DictMemoryTest()
{
var dict = new ConcurrentDictionary<string, string>();
for (int i = 0; i < 3000000; i++)
{
dict.AddOrUpdate(Guid.NewGuid().ToString(), Random.Shared.NextInt64(1000000000L, 9999999999L).ToString(), (_, __) => Random.Shared.NextInt64(1000000000L, 9999999999L).ToString());
}
while (true)
{
}
}
[Fact]
public void GetResult()
{
var input =
"""
machine: 19303/19061
order: 3416759/3415192
order_block_plan: 2934281/1968850
order_block_plan_item: 0/235927707
order_block_plan_result: 1375479/277667
order_box_block: 23457666/23450841
order_data_block: 513012248/513012248
order_data_goods: 18655270/18655270
order_data_parts: 353139066/353139066
order_item: 955274320/955274320
order_module: 102907480/56935691
order_module_extra: 40044077/40044077
order_module_item: 49209022/49209022
order_package: 12012712/12012712
order_package_item: 0/80605124
order_process: 4045309/2682043
order_process_step: 8343418/5505158
order_process_step_item: 14856509/9787696
order_scrap_board: 136096/136090
process_group: 1577/1543
process_info: 9212/9008
process_item_exp: 30/30
process_schdule_capacity: 42442/42442
process_step_efficiency: 8/8
report_template: 7358/7338
simple_package: 142861/137730
simple_plan_order: 1167004/854699
simple_plan_order: 0/55677
sys_config: 2608/2608
work_calendar: 11/11
work_shift: 73/73
work_time: 77/77
order_process_step_item: 14856509/9790701
order_process_step: 8343418/5506925
order_module: 102907480/56935691
order_process: 4045309/2682043
report_template: 7358/7358
process_info: 9212/9212
process_group: 1577/1577
order_block_plan_result: 1375479/277667
order_box_block: 23457666/23457666
order_block_plan: 2934281/1968850
order: 3416759/3416759
machine: 19303/19303
order_scrap_board: 136096/136096
""";
var arr = input.Split('\n').Select(s =>
{
var x = s.Split(':');
var y = x[1].Split('/').Select(i => long.Parse(i)).ToArray();
return new {TABLE_NAME = x[0], INPUT = y[0], OUTPUT = y[1], FILTER = y[0] - y[1]};
}).OrderBy(s => s.TABLE_NAME);
_output.WriteLine(arr.ToMarkdownTable());
}
}

View File

@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\MesETL.Shared\MesETL.Shared.csproj" />
</ItemGroup>
</Project>

69
MesETL.Tool/Program.cs Normal file
View File

@ -0,0 +1,69 @@
using System.Collections.Concurrent;
using System.Data;
using MesETL.Shared.Helper;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
var host = Host.CreateApplicationBuilder(args);
host.Configuration.AddCommandLine(args, new Dictionary<string, string>
{
{ "-s", "ConnectionString" },
{ "--ConnectionString", "ConnectionString" },
{ "-B", "Databases" },
{ "--Databases", "Databases" },
{ "-a", "All" }
});
host.Build();
var connStr = host.Configuration.GetValue<string>("ConnectionString") ?? throw new ApplicationException("没有配置数据库连接字符串");
var databases = host.Configuration.GetValue<string>("Databases")?.Split(',').ToList() ?? throw new ApplicationException("没有配置数据库");
var all = host.Configuration.GetValue<bool>("All");
if (args.Length > 1 && args[0] == "count")
{
var result =await CountDatabasesAsync(connStr, databases);
if (all)
{
foreach (var (k, v) in result)
{
Console.WriteLine(k + ":");
Console.WriteLine(v.Select(pair => new { TABLE_NAME = pair.Key, COUNT = pair.Value }).ToMarkdownTable());
}
}
else
{
var allCount = result.Aggregate(new Dictionary<string, long>(), (dict, pair) =>
{
foreach (var (k, v) in pair.Value)
{
dict.AddOrUpdate(k, v, (key, num) => num + v);
}
return dict;
});
Console.WriteLine(allCount.Select(pair => new { TABLE_NAME = pair.Key, COUNT = pair.Value }).ToMarkdownTable());
}
}
async Task<IDictionary<string, IDictionary<string,long>>> CountDatabasesAsync(string connStr, IList<string> dbNames, CancellationToken cancellationToken = default)
{
var result = new ConcurrentDictionary<string, IDictionary<string,long>>();
var tables = await DatabaseHelper.QueryTableAsync(connStr,
$"""
SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = '{dbNames[0]}';
""");
await Parallel.ForEachAsync(dbNames, async (dbName, ct) =>
{
await Parallel.ForEachAsync(tables.Tables[0].Rows.Cast<DataRow>(), async (row, ct) =>
{
var tableName = row[0].ToString()!;
var count = (long)(await DatabaseHelper.QueryScalarAsync(connStr,
$"SELECT COUNT(1) FROM `{dbName}`.`{tableName}`;", ct))!;
result.AddOrUpdate(dbName, new ConcurrentDictionary<string, long>(), (db, dict) =>
{
dict.AddOrUpdate(tableName, count, (table, num) => num + count);
return dict;
});
});
});
return result;
}

View File

@ -4,6 +4,12 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MesETL.App", "MesETL.App\Me
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MesETL.Test", "MesETL.Test\MesETL.Test.csproj", "{8679D5B6-5853-446E-9882-7B7A8E270500}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MesETL.Test", "MesETL.Test\MesETL.Test.csproj", "{8679D5B6-5853-446E-9882-7B7A8E270500}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MesETL.Tool", "MesETL.Tool\MesETL.Tool.csproj", "{68307B05-3D66-4322-A42F-C044C1E8BA3B}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MesETL.Shared", "MesETL.Shared\MesETL.Shared.csproj", "{FE134001-0E22-458B-BEF2-29712A29087E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MesETL.Clean", "MesETL.Clean\MesETL.Clean.csproj", "{E1B2BED0-EBA6-4A14-BAD5-8EC4E528D7E0}"
EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU Debug|Any CPU = Debug|Any CPU
@ -18,5 +24,17 @@ Global
{8679D5B6-5853-446E-9882-7B7A8E270500}.Debug|Any CPU.Build.0 = Debug|Any CPU {8679D5B6-5853-446E-9882-7B7A8E270500}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8679D5B6-5853-446E-9882-7B7A8E270500}.Release|Any CPU.ActiveCfg = Release|Any CPU {8679D5B6-5853-446E-9882-7B7A8E270500}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8679D5B6-5853-446E-9882-7B7A8E270500}.Release|Any CPU.Build.0 = Release|Any CPU {8679D5B6-5853-446E-9882-7B7A8E270500}.Release|Any CPU.Build.0 = Release|Any CPU
{68307B05-3D66-4322-A42F-C044C1E8BA3B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{68307B05-3D66-4322-A42F-C044C1E8BA3B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{68307B05-3D66-4322-A42F-C044C1E8BA3B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{68307B05-3D66-4322-A42F-C044C1E8BA3B}.Release|Any CPU.Build.0 = Release|Any CPU
{FE134001-0E22-458B-BEF2-29712A29087E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{FE134001-0E22-458B-BEF2-29712A29087E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FE134001-0E22-458B-BEF2-29712A29087E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FE134001-0E22-458B-BEF2-29712A29087E}.Release|Any CPU.Build.0 = Release|Any CPU
{E1B2BED0-EBA6-4A14-BAD5-8EC4E528D7E0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E1B2BED0-EBA6-4A14-BAD5-8EC4E528D7E0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E1B2BED0-EBA6-4A14-BAD5-8EC4E528D7E0}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E1B2BED0-EBA6-4A14-BAD5-8EC4E528D7E0}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection EndGlobalSection
EndGlobal EndGlobal