Compare commits
13 Commits
5cda84797b
...
v2023.1
Author | SHA1 | Date | |
---|---|---|---|
3dbfaffd05 | |||
c6d97fdc86 | |||
f689e1b659 | |||
f6af04bfcd | |||
571805250b | |||
aa7041962a | |||
73895fbce4 | |||
913c725fe1 | |||
41a1dc8a4f | |||
8db7c71170 | |||
20cc78c667 | |||
d58c9d5177 | |||
719cd2d8e7 |
@@ -1,9 +0,0 @@
|
||||
namespace MesETL.App.Cache;
|
||||
|
||||
#nullable disable
|
||||
public static class CacheKeys
|
||||
{
|
||||
public static Func<string, string> Order_OrderNo_CompanyID { get; set; }
|
||||
public static Func<string, string> OrderBlockPlan_ID_CompanyID { get; set; }
|
||||
public static Func<string, string> OrderProcess_ID_ShardKey { get; set; }
|
||||
}
|
33
MesETL.App/Cache/CacheKeysFunc.cs
Normal file
33
MesETL.App/Cache/CacheKeysFunc.cs
Normal file
@@ -0,0 +1,33 @@
|
||||
using MesETL.App.Const;
|
||||
|
||||
namespace MesETL.App.Cache;
|
||||
|
||||
#nullable disable
|
||||
public static class CacheKeysFunc
|
||||
{
|
||||
/// <summary>
|
||||
/// Order表 由OrderNo获取对应的CompanyID
|
||||
/// </summary>
|
||||
/// <param name="orderNo"></param>
|
||||
/// <returns></returns>
|
||||
public static string Order_OrderNo_CompanyID(string orderNo) => BuildCacheKey(TableNames.Order, "OrderNo", orderNo, "CompanyID");
|
||||
|
||||
/// <summary>
|
||||
/// OrderBlockPlan表 由ID获取对应的CompanyID
|
||||
/// </summary>
|
||||
/// <param name="id"></param>
|
||||
/// <returns></returns>
|
||||
public static string OrderBlockPlan_ID_CompanyID(string id) => BuildCacheKey(TableNames.OrderBlockPlan, "ID", id, "CompanyID");
|
||||
|
||||
/// <summary>
|
||||
/// OrderProcess表 由ID 获取对应的ShardKey
|
||||
/// </summary>
|
||||
/// <param name="id"></param>
|
||||
/// <returns></returns>
|
||||
public static string OrderProcess_ID_ShardKey(string id) => BuildCacheKey(TableNames.OrderProcess, "ID", id, "ShardKey");
|
||||
|
||||
|
||||
// 数据缓存键格式为[TableName]-[ColumnName@ColumnValue]-[CacheColumnName]
|
||||
static string BuildCacheKey(string tableName, string columnName, string columnValue, string cacheColumnName)
|
||||
=> $"{tableName}-{columnName}@{columnValue}-{cacheColumnName}";
|
||||
}
|
51
MesETL.App/Cache/MemoryCache.cs
Normal file
51
MesETL.App/Cache/MemoryCache.cs
Normal file
@@ -0,0 +1,51 @@
|
||||
using System.Collections.Concurrent;
|
||||
|
||||
namespace MesETL.App.Cache;
|
||||
|
||||
public class MemoryCache : ICacher
|
||||
{
|
||||
private readonly ConcurrentDictionary<string, string> _stringCache = new();
|
||||
private readonly ConcurrentDictionary<string, Dictionary<string, string>> _hashCache = new();
|
||||
|
||||
public static MemoryCache? Instance { get; private set; }
|
||||
|
||||
public MemoryCache()
|
||||
{
|
||||
Instance = this;
|
||||
}
|
||||
|
||||
public Task<string?> GetStringAsync(string key)
|
||||
{
|
||||
return _stringCache.TryGetValue(key, out var value) ? Task.FromResult<string?>(value) : Task.FromResult((string?)null);
|
||||
}
|
||||
|
||||
public Task SetStringAsync(string key, string value)
|
||||
{
|
||||
_stringCache[key] = value;
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task<bool> ExistsAsync(string key)
|
||||
{
|
||||
return Task.FromResult(_stringCache.ContainsKey(key));
|
||||
}
|
||||
|
||||
public Task SetHashAsync(string key, IReadOnlyDictionary<string, string> hash)
|
||||
{
|
||||
_hashCache[key] = hash.ToDictionary(x => x.Key, x => x.Value);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task<Dictionary<string, string>> GetHashAsync(string key)
|
||||
{
|
||||
return Task.FromResult(_hashCache[key]);
|
||||
}
|
||||
|
||||
public void Delete(Func<string,bool> keySelector)
|
||||
{
|
||||
foreach (var k in _stringCache.Keys.Where(keySelector))
|
||||
{
|
||||
_stringCache.TryRemove(k, out _);
|
||||
}
|
||||
}
|
||||
}
|
@@ -57,6 +57,7 @@ public static class RedisCacheExtensions
|
||||
{
|
||||
var conn = ConnectionMultiplexer.Connect(options.Configuration
|
||||
?? throw new ApplicationException("未配置Redis连接字符串"));
|
||||
services.AddSingleton(conn);
|
||||
services.AddSingleton<ICacher>(new RedisCache(conn, options.Database, options.InstanceName));
|
||||
return services;
|
||||
}
|
||||
|
8
MesETL.App/Const/ConstVar.cs
Normal file
8
MesETL.App/Const/ConstVar.cs
Normal file
@@ -0,0 +1,8 @@
|
||||
namespace MesETL.App.Const;
|
||||
|
||||
public static class ConstVar
|
||||
{
|
||||
public const string Producer = "Producer";
|
||||
public const string Null = "NULL";
|
||||
public const string MyDumperNull = @"\N";
|
||||
}
|
@@ -1,6 +0,0 @@
|
||||
namespace MesETL.App.Const;
|
||||
|
||||
public static class ProcessStep
|
||||
{
|
||||
public const string Produce = "Producer";
|
||||
}
|
@@ -40,11 +40,11 @@ public class DataRecord : ICloneable
|
||||
return idx;
|
||||
}
|
||||
|
||||
public string? RawField { get; set; }
|
||||
public IList<string> Fields { get; }
|
||||
public IList<string> Headers { get; }
|
||||
public string TableName { get; }
|
||||
public string? Database { get; set; }
|
||||
public long FieldCharCount { get; }
|
||||
|
||||
|
||||
public DataRecord(IEnumerable<string> fields, string tableName, IEnumerable<string> headers, string? database = null)
|
||||
@@ -58,6 +58,8 @@ public class DataRecord : ICloneable
|
||||
throw new ArgumentException(
|
||||
$"The number of fields does not match the number of headers. Expected: {Headers.Count} Got: {Fields.Count} Fields: {string.Join(',', Fields)}",
|
||||
nameof(fields));
|
||||
|
||||
FieldCharCount = Fields.Sum(x => (long)x.Length);
|
||||
}
|
||||
|
||||
public string this[int index]
|
||||
|
@@ -1,4 +1,5 @@
|
||||
using System.Text.RegularExpressions;
|
||||
using System.Text.Json;
|
||||
using System.Text.RegularExpressions;
|
||||
using ZstdSharp;
|
||||
|
||||
namespace MesETL.App.Helpers;
|
||||
@@ -122,4 +123,16 @@ public static partial class DumpDataHelper
|
||||
return await reader.ReadToEndAsync();
|
||||
}
|
||||
|
||||
public static bool IsJson(string str)
|
||||
{
|
||||
try
|
||||
{
|
||||
JsonDocument.Parse(str);
|
||||
return true;
|
||||
}
|
||||
catch (JsonException)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
@@ -1,8 +1,8 @@
|
||||
using MesETL.App.Const;
|
||||
using MesETL.App.HostedServices.Abstractions;
|
||||
using MesETL.App.HostedServices.Abstractions;
|
||||
using MesETL.App.Options;
|
||||
using MesETL.App.Services;
|
||||
using MesETL.App.Services.ETL;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
@@ -33,18 +33,21 @@ public class FileInputService : IInputService
|
||||
private readonly IOptions<DataInputOptions> _dataInputOptions;
|
||||
private readonly ProcessContext _context;
|
||||
private readonly DataReaderFactory _dataReaderFactory;
|
||||
private readonly long _memoryThreshold;
|
||||
|
||||
public FileInputService(ILogger<FileInputService> logger,
|
||||
IOptions<DataInputOptions> dataInputOptions,
|
||||
ProcessContext context,
|
||||
[FromKeyedServices(ProcessStep.Produce)] DataRecordQueue producerQueue,
|
||||
DataReaderFactory dataReaderFactory)
|
||||
[FromKeyedServices(Const.ConstVar.Producer)] DataRecordQueue producerQueue,
|
||||
DataReaderFactory dataReaderFactory,
|
||||
IConfiguration configuration)
|
||||
{
|
||||
_logger = logger;
|
||||
_dataInputOptions = dataInputOptions;
|
||||
_context = context;
|
||||
_producerQueue = producerQueue;
|
||||
_dataReaderFactory = dataReaderFactory;
|
||||
_memoryThreshold = (long)(configuration.GetValue<double>("MemoryThreshold", 8) * 1024 * 1024 * 1024);
|
||||
}
|
||||
|
||||
public async Task ExecuteAsync(CancellationToken cancellationToken)
|
||||
@@ -69,16 +72,26 @@ public class FileInputService : IInputService
|
||||
foreach (var info in orderedInfo)
|
||||
{
|
||||
_logger.LogInformation("Reading file: {FileName}, table: {TableName}", info.FileName, info.TableName);
|
||||
var source = _dataReaderFactory.CreateReader(info.FileName,info.TableName,info.Headers);
|
||||
using var source = _dataReaderFactory.CreateReader(info.FileName,info.TableName,info.Headers);
|
||||
var count = 0;
|
||||
|
||||
while (await source.ReadAsync())
|
||||
{
|
||||
if (GC.GetTotalMemory(false) > _memoryThreshold)
|
||||
{
|
||||
_logger.LogWarning("内存过高,暂缓输入");
|
||||
GC.Collect();
|
||||
await Task.Delay(3000, cancellationToken);
|
||||
}
|
||||
var record = source.Current;
|
||||
_producerQueue.Enqueue(record);
|
||||
await _producerQueue.EnqueueAsync(record);
|
||||
count++;
|
||||
_context.AddInput();
|
||||
}
|
||||
|
||||
_context.AddTableInput(info.TableName, count);
|
||||
_logger.LogInformation("Input of table: '{TableName}' finished", info.TableName);
|
||||
_dataInputOptions.Value.OnTableInputCompleted?.Invoke(info.TableName);
|
||||
}
|
||||
|
||||
_context.CompleteInput();
|
||||
@@ -92,6 +105,7 @@ public class FileInputService : IInputService
|
||||
private IEnumerable<FileInputInfo> GetFilesInOrder(FileInputInfo[] inputFiles)
|
||||
{
|
||||
var tableOrder = _dataInputOptions.Value.TableOrder;
|
||||
var ignoreTable = _dataInputOptions.Value.TableIgnoreList;
|
||||
if (tableOrder is null or { Length: 0 })
|
||||
return inputFiles;
|
||||
|
||||
@@ -103,7 +117,7 @@ public class FileInputService : IInputService
|
||||
{
|
||||
var target = inputFiles.FirstOrDefault(f =>
|
||||
f.TableName.Equals(tableName, StringComparison.OrdinalIgnoreCase));
|
||||
if (target is not null)
|
||||
if (target is not null && !ignoreTable.Contains(target.TableName))
|
||||
yield return target;
|
||||
}
|
||||
}
|
||||
|
@@ -5,6 +5,7 @@ using MesETL.App.HostedServices.Abstractions;
|
||||
using MesETL.App.Options;
|
||||
using MesETL.App.Services;
|
||||
using MesETL.App.Services.ErrorRecorder;
|
||||
using MesETL.Shared.Helper;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
@@ -50,6 +51,7 @@ public class MainHostedService : BackgroundService
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
_logger.LogInformation("Command argument detected, execute for each database");
|
||||
var command = _config["Command"];
|
||||
if (!string.IsNullOrEmpty(command))
|
||||
{
|
||||
@@ -59,7 +61,9 @@ public class MainHostedService : BackgroundService
|
||||
}
|
||||
|
||||
_stopwatch = Stopwatch.StartNew();
|
||||
await SetVariableAsync(); // 开启延迟写入,禁用重做日志 >>> 重做日志处于禁用状态时不要关闭数据库服务!
|
||||
var enableUnsafeVar = _config.GetValue<bool>("UnsafeVariable", false);
|
||||
if (enableUnsafeVar)
|
||||
await SetVariableAsync(); // 开启延迟写入,禁用重做日志 >>> 重做日志处于禁用状态时不要关闭数据库服务!
|
||||
|
||||
var monitorTask = Task.Run(async () => await _taskMonitor.Monitor(stoppingToken), stoppingToken);
|
||||
var inputTask = ExecuteAndCatch(
|
||||
@@ -75,7 +79,8 @@ public class MainHostedService : BackgroundService
|
||||
_logger.LogInformation("***** ElapseTime: {Time}", (_stopwatch.ElapsedMilliseconds / 1000f).ToString("F3"));
|
||||
await Task.Delay(5000, stoppingToken);
|
||||
|
||||
await SetVariableAsync(false); // 关闭延迟写入,开启重做日志
|
||||
if(enableUnsafeVar)
|
||||
await SetVariableAsync(false); // 关闭延迟写入,开启重做日志
|
||||
if (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
await ExportResultAsync();
|
||||
@@ -165,7 +170,7 @@ public class MainHostedService : BackgroundService
|
||||
sb.AppendLine("\n---\n");
|
||||
sb.AppendLine("## Table Output Progress");
|
||||
var tableOutputProgress = _context.TableProgress.Select(pair =>
|
||||
new { Table = pair.Key, Count = pair.Value });
|
||||
new { Table = pair.Key, Count = pair.Value }).OrderBy(s => s.Table);
|
||||
sb.AppendLine(tableOutputProgress.ToMarkdownTable());
|
||||
sb.AppendLine("\n---\n");
|
||||
sb.AppendLine("## Result");
|
||||
|
@@ -1,13 +1,15 @@
|
||||
using MesETL.App.Helpers;
|
||||
using System.Buffers;
|
||||
using MesETL.App.Helpers;
|
||||
using MesETL.App.HostedServices.Abstractions;
|
||||
using MesETL.App.Options;
|
||||
using MesETL.App.Services;
|
||||
using MesETL.App.Services.ErrorRecorder;
|
||||
using MesETL.Shared.Helper;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using MySqlConnector;
|
||||
using MySqlDestination = MesETL.App.Services.ETL.MySqlDestination;
|
||||
using TaskExtensions = MesETL.App.Helpers.TaskExtensions;
|
||||
using TaskExtensions = MesETL.Shared.Helper.TaskExtensions;
|
||||
|
||||
namespace MesETL.App.HostedServices;
|
||||
|
||||
@@ -64,13 +66,14 @@ public class OutputService : IOutputService
|
||||
{
|
||||
_logger.LogInformation("*****开启输出线程,数据库: {db} *****", db);
|
||||
var taskManager = new TaskManager(_outputOptions.Value.MaxDatabaseOutputTask);
|
||||
var ignoreOutput = new HashSet<string>(_outputOptions.Value.NoOutput);
|
||||
var tmp = new List<DataRecord>();
|
||||
while (!_context.IsTransformCompleted || queue.Count > 0)
|
||||
{
|
||||
if (ct.IsCancellationRequested)
|
||||
break;
|
||||
|
||||
if (!queue.TryDequeue(out var record)) continue;
|
||||
if (!queue.TryDequeue(out var record) || ignoreOutput.Contains(record.TableName)) continue;
|
||||
|
||||
var dbName = record.Database ?? throw new ApplicationException("输出的记录缺少数据库名");
|
||||
if(dbName != db)
|
||||
@@ -136,6 +139,7 @@ public class OutputService : IOutputService
|
||||
await output.FlushAsync(_outputOptions.Value.MaxAllowedPacket);
|
||||
foreach (var (key, value) in tableOutput)
|
||||
{
|
||||
_context.AddOutput(value);
|
||||
_context.AddTableOutput(key, value);
|
||||
}
|
||||
_logger.LogTrace("Flushed {Count} records", tableOutput.Values.Sum(i => i));
|
||||
|
@@ -1,5 +1,5 @@
|
||||
using System.Diagnostics;
|
||||
using MesETL.App.Const;
|
||||
using System.Text;
|
||||
using MesETL.App.Services;
|
||||
using MesETL.App.Services.Loggers;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
@@ -16,8 +16,10 @@ public class TaskMonitorService
|
||||
private readonly DataRecordQueue _producerQueue;
|
||||
private readonly RecordQueuePool _queuePool;
|
||||
|
||||
private string _outputPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Log/progress.txt");
|
||||
|
||||
public TaskMonitorService(ProcessContext context,
|
||||
[FromKeyedServices(ProcessStep.Produce)]
|
||||
[FromKeyedServices(Const.ConstVar.Producer)]
|
||||
DataRecordQueue producerQueue,
|
||||
RecordQueuePool queuePool,
|
||||
IEnumerable<ITaskMonitorLogger> monitorLoggers)
|
||||
@@ -80,31 +82,37 @@ public class TaskMonitorService
|
||||
{
|
||||
logger.LogStatus("Monitor: Progress status", new Dictionary<string, string>
|
||||
{
|
||||
{"Input",_context.IsInputCompleted ? "completed" : $"running {inputSpeed:F2} records/s" },
|
||||
{"Transform", _context.IsTransformCompleted ? "completed" : $"running {transformSpeed:F2} records/s" },
|
||||
{"Output", _context.IsOutputCompleted ? "completed" : $"running {outputSpeed:F2} records/s" }
|
||||
{"Input",_context.IsInputCompleted ? "OK" : $"{inputSpeed:F2}/s" },
|
||||
{"Transform", _context.IsTransformCompleted ? "OK" : $"{transformSpeed:F2}/s" },
|
||||
{"Output", _context.IsOutputCompleted ? "OK" : $"{outputSpeed:F2}/s" },
|
||||
|
||||
{"| Input Queue", _producerQueue.Count.ToString() },
|
||||
{"Output Queue", _queuePool.Queues.Values.Sum(queue => queue.Count).ToString()},
|
||||
{"Memory", $"{GC.GetTotalMemory(false) / 1024 / 1024} MiB"},
|
||||
});
|
||||
|
||||
logger.LogStatus("Monitor: Table output progress",
|
||||
_context.TableProgress
|
||||
.ToDictionary(kv => kv.Key, kv => kv.Value.ToString()),
|
||||
ITaskMonitorLogger.LogLevel.Progress);
|
||||
|
||||
logger.LogStatus("Monitor: Process count", new Dictionary<string, string>
|
||||
var dict = _context.TableProgress
|
||||
.ToDictionary(kv => kv.Key, kv => $"{kv.Value.input}/{kv.Value.output}");
|
||||
logger.LogStatus("Monitor: Table progress", dict, ITaskMonitorLogger.LogLevel.Progress);
|
||||
var sb = new StringBuilder("Table Progress: \n");
|
||||
foreach (var kv in dict)
|
||||
{
|
||||
{"Input", inputCount.ToString()},
|
||||
{"Transform", transformCount.ToString()},
|
||||
{"Output", outputCount.ToString()}
|
||||
}, ITaskMonitorLogger.LogLevel.Progress);
|
||||
sb.Append(kv.Key).AppendLine(kv.Value);
|
||||
}
|
||||
|
||||
logger.LogStatus("Monitor: Queue", new Dictionary<string, string>
|
||||
{
|
||||
{"Producer queue records", _producerQueue.Count.ToString() },
|
||||
{"Output queues", _queuePool.Queues.Count.ToString() },
|
||||
{"Output queue records", _queuePool.Queues.Values.Sum(queue => queue.Count).ToString()},
|
||||
});
|
||||
sb.AppendLine($"LongestCharCount: {_producerQueue.LongestFieldCharCount}");
|
||||
await File.WriteAllTextAsync(_outputPath, sb.ToString(), CancellationToken.None);
|
||||
|
||||
|
||||
// logger.LogStatus("Monitor: Process count", new Dictionary<string, string>
|
||||
// {
|
||||
// {"Input", inputCount.ToString()},
|
||||
// {"Transform", transformCount.ToString()},
|
||||
// {"Output", outputCount.ToString()}
|
||||
// }, ITaskMonitorLogger.LogLevel.Progress);
|
||||
}
|
||||
|
||||
|
||||
await Task.Delay(5000, stoppingToken);
|
||||
|
||||
lastTime = time;
|
||||
|
@@ -1,5 +1,4 @@
|
||||
using MesETL.App.Cache;
|
||||
using MesETL.App.Const;
|
||||
using MesETL.App.HostedServices.Abstractions;
|
||||
using MesETL.App.Options;
|
||||
using MesETL.App.Services;
|
||||
@@ -28,7 +27,7 @@ public class TransformService : ITransformService
|
||||
|
||||
public TransformService(ILogger<TransformService> logger,
|
||||
IOptions<DataTransformOptions> options,
|
||||
[FromKeyedServices(ProcessStep.Produce)] DataRecordQueue producerQueue,
|
||||
[FromKeyedServices(Const.ConstVar.Producer)] DataRecordQueue producerQueue,
|
||||
RecordQueuePool queuePool,
|
||||
ProcessContext context,
|
||||
ICacher cache,
|
||||
@@ -47,9 +46,26 @@ public class TransformService : ITransformService
|
||||
{
|
||||
_logger.LogInformation("***** Data transform service started, thread id: {ThreadId} *****", Environment.CurrentManagedThreadId);
|
||||
|
||||
// var tasks = new List<Task>();
|
||||
// for (int i = 0; i < 4; i++)
|
||||
// {
|
||||
// tasks.Add(Task.Run(TransformWorker, cancellationToken));
|
||||
// }
|
||||
//
|
||||
// await Task.WhenAll(tasks);
|
||||
await TransformWorker();
|
||||
|
||||
_logger.LogInformation("***** Data transformation service finished *****");
|
||||
}
|
||||
|
||||
public async Task TransformWorker()
|
||||
{
|
||||
while (!_context.IsInputCompleted || _producerQueue.Count > 0)
|
||||
{
|
||||
if (!_producerQueue.TryDequeue(out var record)) continue;
|
||||
if (!_producerQueue.TryDequeue(out var record))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
@@ -81,7 +97,7 @@ public class TransformService : ITransformService
|
||||
?? throw new ApplicationException("未配置数据库过滤器");
|
||||
record.Database = dbFilter(record);
|
||||
|
||||
_queuePool[record.Database].Enqueue(record);
|
||||
await _queuePool[record.Database].EnqueueAsync(record);
|
||||
_context.AddTransform();
|
||||
|
||||
if (_options.Value.EnableReBuilder)
|
||||
@@ -94,7 +110,7 @@ public class TransformService : ITransformService
|
||||
{
|
||||
if(dbFilter is not null)
|
||||
rc.Database =dbFilter.Invoke(record);
|
||||
_queuePool[record.Database].Enqueue(rc);
|
||||
await _queuePool[record.Database].EnqueueAsync(rc);
|
||||
_context.AddTransform();
|
||||
}
|
||||
}
|
||||
@@ -111,7 +127,5 @@ public class TransformService : ITransformService
|
||||
}
|
||||
}
|
||||
_context.CompleteTransform();
|
||||
|
||||
_logger.LogInformation("***** Data transformation service finished *****");
|
||||
}
|
||||
}
|
@@ -24,7 +24,7 @@ public class VoidOutputService : IOutputService
|
||||
_logger.LogInformation("***** Void Output Service Started *****");
|
||||
while (!_context.IsTransformCompleted || _queuePool.Queues.Count > 0)
|
||||
{
|
||||
foreach (var pair in _queuePool.Queues) // 内存优化
|
||||
foreach (var pair in _queuePool.Queues)
|
||||
{
|
||||
if (_context.IsTransformCompleted && pair.Value.Count == 0)
|
||||
{
|
||||
|
@@ -22,7 +22,6 @@
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
|
||||
<PackageReference Include="MySqlConnector" Version="2.3.3" />
|
||||
<PackageReference Include="Serilog" Version="3.1.2-dev-02097" />
|
||||
<PackageReference Include="Serilog.Extensions.Hosting" Version="8.0.0" />
|
||||
<PackageReference Include="Serilog.Sinks.Console" Version="5.0.1" />
|
||||
@@ -32,4 +31,8 @@
|
||||
<PackageReference Include="ZstdSharp.Port" Version="0.7.4" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\MesETL.Shared\MesETL.Shared.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
|
@@ -38,11 +38,15 @@ namespace MesETL.App.Options
|
||||
|
||||
public string[]? TableOrder { get; set; }
|
||||
|
||||
public string[] TableIgnoreList { get; set; } = [];
|
||||
|
||||
/// <summary>
|
||||
/// 配置如何从文件名转换为表名和表头
|
||||
/// </summary>
|
||||
public Func<string, FileInputInfo?>? FileInputMetaBuilder { get; set; } //TODO: 抽离
|
||||
|
||||
public Action<string>? OnTableInputCompleted { get; set; }
|
||||
|
||||
#endregion
|
||||
}
|
||||
}
|
||||
|
@@ -4,7 +4,7 @@ public class DatabaseOutputOptions
|
||||
{
|
||||
public string? ConnectionString { get; set; }
|
||||
|
||||
public int MaxAllowedPacket { get; set; } = 64 * 1024 * 1024;
|
||||
public int MaxAllowedPacket { get; set; } = 32 * 1024 * 1024;
|
||||
|
||||
public int FlushCount { get; set; } = 10000;
|
||||
|
||||
@@ -12,6 +12,9 @@ public class DatabaseOutputOptions
|
||||
|
||||
public bool TreatJsonAsHex { get; set; } = true;
|
||||
|
||||
public string[] NoOutput { get; set; } = [];
|
||||
|
||||
public Dictionary<string, string>? ForUpdate { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 配置导入数据的特殊列
|
||||
@@ -22,4 +25,12 @@ public class DatabaseOutputOptions
|
||||
{
|
||||
return ColumnTypeConfig.GetValueOrDefault($"{table}.{column}", ColumnType.UnDefine);
|
||||
}
|
||||
|
||||
public bool TryGetForUpdate(string table, out string? forUpdate)
|
||||
{
|
||||
forUpdate = null;
|
||||
if (ForUpdate is null || !ForUpdate.TryGetValue(table, out forUpdate))
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
}
|
@@ -79,6 +79,7 @@ async Task RunProgram()
|
||||
options.UseMock = inputOptions.UseMock;
|
||||
options.TableMockConfig = inputOptions.TableMockConfig;
|
||||
options.MockCountMultiplier = inputOptions.MockCountMultiplier;
|
||||
options.TableIgnoreList = inputOptions.TableIgnoreList;
|
||||
|
||||
// 配置文件输入方法
|
||||
options.FileInputMetaBuilder = fileName =>
|
||||
@@ -113,25 +114,41 @@ async Task RunProgram()
|
||||
return null;
|
||||
};
|
||||
|
||||
options.TableOrder =
|
||||
// 配置表输入完成事件,字典清理
|
||||
options.OnTableInputCompleted = table =>
|
||||
{
|
||||
switch (table)
|
||||
{
|
||||
case TableNames.OrderBlockPlan:
|
||||
MemoryCache.Instance?.Delete(s => s.StartsWith(TableNames.Order + '-'));
|
||||
break;
|
||||
case TableNames.OrderItem:
|
||||
MemoryCache.Instance?.Delete(s => s.StartsWith(TableNames.OrderBlockPlan + '-'));
|
||||
break;
|
||||
case TableNames.OrderProcessSchedule:
|
||||
MemoryCache.Instance?.Delete(s => s.StartsWith(TableNames.OrderProcess + '-'));
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
options.TableOrder = inputOptions.TableOrder ??
|
||||
[
|
||||
TableNames.Machine,
|
||||
|
||||
TableNames.Order,
|
||||
TableNames.OrderBoxBlock, // 依赖Order.CompanyID
|
||||
TableNames.OrderDataBlock, // 依赖Order.CompanyID
|
||||
|
||||
TableNames.OrderBlockPlan,
|
||||
TableNames.OrderBlockPlanResult,// 依赖OrderBlockPlan.CompanyID / 删除
|
||||
|
||||
TableNames.OrderItem,
|
||||
TableNames.OrderDataBlock,
|
||||
TableNames.OrderDataGoods,
|
||||
TableNames.OrderDataParts,
|
||||
TableNames.OrderModule,
|
||||
TableNames.OrderModuleExtra,
|
||||
TableNames.OrderModuleItem,
|
||||
TableNames.OrderPackage,
|
||||
TableNames.OrderPatchDetail,
|
||||
|
||||
TableNames.OrderProcess,
|
||||
TableNames.OrderProcessStep,
|
||||
@@ -212,10 +229,6 @@ async Task RunProgram()
|
||||
|
||||
host.Services.Configure<DataTransformOptions>(options =>
|
||||
{
|
||||
// 数据缓存键格式为[TableName]-[ColumnName@ColumnValue]-[CacheColumnName]
|
||||
static string BuildCacheKey(string tableName, string columnName, string columnValue, string cacheColumnName)
|
||||
=> $"{tableName}-{columnName}@{columnValue}-{cacheColumnName}";
|
||||
|
||||
static string CalculateShardKeyByOrderNo(ReadOnlySpan<char> orderNo)
|
||||
=> $"{orderNo[2..6]}0";
|
||||
|
||||
@@ -236,25 +249,53 @@ async Task RunProgram()
|
||||
// OrderBoxBlock删除对应Order.OrderNo不存在的对象
|
||||
case TableNames.OrderBoxBlock:
|
||||
{
|
||||
if (!await cache.ExistsAsync(CacheKeys.Order_OrderNo_CompanyID(record["OrderNo"])))
|
||||
if (!await cache.ExistsAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"])))
|
||||
return false;
|
||||
break;
|
||||
}
|
||||
// OrderDataBlock删除对应Order.OrderNo不存在的对象
|
||||
case TableNames.OrderDataBlock:
|
||||
{
|
||||
if (!await cache.ExistsAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"])))
|
||||
return false;
|
||||
break;
|
||||
}
|
||||
// OrderDataParts删除对应Order.OrderNo不存在的对象
|
||||
case TableNames.OrderDataParts:
|
||||
{
|
||||
if (!await cache.ExistsAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"])))
|
||||
return false;
|
||||
break;
|
||||
}
|
||||
// OrderBlockPlan删除CreateTime < 202301的
|
||||
case TableNames.OrderBlockPlan:
|
||||
{
|
||||
var time = DateTime.Parse(record["CreateTime"].Trim('"'));
|
||||
var time = DateTime.Parse(record["CreateTime"].Trim('"','\''));
|
||||
if (time < oldestTime)
|
||||
return false;
|
||||
|
||||
// if (!DumpDataHelper.IsJson(record["OrderNos"])) return false; //Json列合法检查
|
||||
break;
|
||||
}
|
||||
// OrderBlockPlanResult删除对应order_block_plan.ID不存在的对象
|
||||
case TableNames.OrderBlockPlanResult:
|
||||
{
|
||||
if (!await cache.ExistsAsync(CacheKeys.OrderBlockPlan_ID_CompanyID(record["ID"])))
|
||||
if (!await cache.ExistsAsync(CacheKeysFunc.OrderBlockPlan_ID_CompanyID(record["ID"])))
|
||||
return false;
|
||||
break;
|
||||
}
|
||||
// case TableNames.OrderBlockPlanResult: // 用SaveTime过滤
|
||||
// {
|
||||
// if (DateTime.Parse(record["SaveTime"].Trim('"', '\'')) < oldestTime)
|
||||
// return false;
|
||||
// break;
|
||||
// }
|
||||
// OrderDataGoods Json列合法检查
|
||||
case TableNames.OrderDataGoods:
|
||||
{
|
||||
// if (!DumpDataHelper.IsJson(record["ExtraProp"])) return false;
|
||||
break;
|
||||
}
|
||||
// OrderModule删除OrderNo < 202301的
|
||||
case TableNames.OrderModule:
|
||||
{
|
||||
@@ -282,7 +323,7 @@ async Task RunProgram()
|
||||
// OrderProcessStepStep删除对应OrderProcess.ID不存在的对象
|
||||
case TableNames.OrderProcessStepItem:
|
||||
{
|
||||
if (!await cache.ExistsAsync(CacheKeys.OrderProcess_ID_ShardKey(record["OrderProcessID"])))
|
||||
if (!await cache.ExistsAsync(CacheKeysFunc.OrderProcess_ID_ShardKey(record["OrderProcessID"])))
|
||||
return false;
|
||||
break;
|
||||
}
|
||||
@@ -297,7 +338,7 @@ async Task RunProgram()
|
||||
// SimplePlanOrder删除CreateTime < 202301的
|
||||
case TableNames.SimplePlanOrder:
|
||||
{
|
||||
var time = DateTime.Parse(record["CreateTime"].Trim('"'));
|
||||
var time = DateTime.Parse(record["CreateTime"].Trim('"', '\''));
|
||||
if (time < oldestTime)
|
||||
return false;
|
||||
break;
|
||||
@@ -308,36 +349,81 @@ async Task RunProgram()
|
||||
};
|
||||
|
||||
// 数据替换
|
||||
/*
|
||||
* 空数据处理:
|
||||
* 某些列生产库为可空,而测试库变为了不可空,则需要根据列的类型对这些列做单独处理
|
||||
* int或任何非无符号整型 -> -1
|
||||
* varchar -> ''(空字符串)
|
||||
* datetime -> '1000-01-01'(datetime最小值)
|
||||
* text -> 0 (16进制0,MyDumper中的text是为16进制)
|
||||
*/
|
||||
const string DefaultInt = "0";
|
||||
const string DefaultStr = "''";
|
||||
const string DefaultDateTime = "'1000-01-01'";
|
||||
const string DefaultText = "0";
|
||||
|
||||
options.RecordModify = async context =>
|
||||
{
|
||||
void ReplaceIfMyDumperNull(DataRecord record, string fieldName, string replaceValue)
|
||||
{
|
||||
if (record[fieldName] is ConstVar.MyDumperNull)
|
||||
{
|
||||
context.Logger.LogWarning("发现不可空的字段为空({TableName}.{FieldName}),填充默认值: {DefaultValue}",
|
||||
record.TableName, fieldName, replaceValue);
|
||||
record[fieldName] = replaceValue;
|
||||
}
|
||||
}
|
||||
|
||||
var record = context.Record;
|
||||
var cache = context.Cacher;
|
||||
switch (record.TableName)
|
||||
{
|
||||
#if USE_TEST_DB
|
||||
// Order表移除IsBatch列
|
||||
case TableNames.Order:
|
||||
record.RemoveField("IsBatch");
|
||||
// Machine处理非空列
|
||||
case TableNames.Machine:
|
||||
ReplaceIfMyDumperNull(record, "Name", DefaultStr);
|
||||
ReplaceIfMyDumperNull(record, "CreateTime", DefaultDateTime);
|
||||
ReplaceIfMyDumperNull(record, "CreatorID", DefaultInt);
|
||||
ReplaceIfMyDumperNull(record, "EditTime", DefaultDateTime);
|
||||
ReplaceIfMyDumperNull(record, "EditorID", DefaultInt);
|
||||
ReplaceIfMyDumperNull(record, "Settings", DefaultText);
|
||||
break;
|
||||
#endif
|
||||
//OrderBlockPlan将OrderNo长度<2的置空
|
||||
// Order处理非空列
|
||||
case TableNames.Order:
|
||||
ReplaceIfMyDumperNull(record, "Deleted", DefaultInt);
|
||||
break;
|
||||
// OrderBlockPlan处理text->json列
|
||||
case TableNames.OrderBlockPlan:
|
||||
if (record["OrderNos"].Length < 2)
|
||||
// 将所有值为'[]'(即字符串长度小等于2(16进制长度小于4))的置空 [] = 0x5b5d
|
||||
if (record["OrderNos"].Length <= 4)
|
||||
record["OrderNos"] = "NULL";
|
||||
break;
|
||||
// OrderBlockPlanResult表添加CompanyID列
|
||||
// OrderBlockPlanResult,添加CompanyID
|
||||
case TableNames.OrderBlockPlanResult:
|
||||
record.AddField("CompanyID",
|
||||
// 获取OrderBlockPlan.ID -> CompanyID
|
||||
ThrowIfNoCached(await cache.GetStringAsync(CacheKeys.OrderBlockPlan_ID_CompanyID(record["ID"])),
|
||||
TableNames.OrderBlockPlanResult, TableNames.OrderBlockPlan, "ID", "脏数据未处理"));
|
||||
ThrowIfNoCached(await cache.GetStringAsync(CacheKeysFunc.OrderBlockPlan_ID_CompanyID(record["ID"])),
|
||||
TableNames.OrderBlockPlanResult, TableNames.OrderBlockPlan, "ID", "无法获取对应的CompanyID"));
|
||||
break;
|
||||
// OrderBoxBlock添加CompanyID列
|
||||
case TableNames.OrderBoxBlock:
|
||||
record.AddField("CompanyID",
|
||||
// 获取Order.OrderNo -> CompanyID
|
||||
ThrowIfNoCached(await cache.GetStringAsync(CacheKeys.Order_OrderNo_CompanyID(record["OrderNo"])),
|
||||
TableNames.OrderBoxBlock, TableNames.Order, "OrderNo", "脏数据未处理"));
|
||||
ThrowIfNoCached(await cache.GetStringAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"])),
|
||||
TableNames.OrderBoxBlock, TableNames.Order, "OrderNo", "无法获取对应的CompanyID"));
|
||||
break;
|
||||
// 修正OrderDataBlock.CompanyID
|
||||
case TableNames.OrderDataBlock:
|
||||
record["CompanyID"] =
|
||||
// 获取Order.OrderNo -> CompanyID
|
||||
ThrowIfNoCached(await cache.GetStringAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"])),
|
||||
TableNames.OrderDataBlock, TableNames.Order, "OrderNo", "无法获取对应的CompanyID");
|
||||
break;
|
||||
// 修正OrderDataParts.CompanyID:
|
||||
case TableNames.OrderDataParts:
|
||||
record["CompanyID"] =
|
||||
// 获取Order.OrderNo -> CompanyID
|
||||
ThrowIfNoCached(await cache.GetStringAsync(CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"])),
|
||||
TableNames.OrderDataParts, TableNames.Order, "OrderNo", "无法获取对应的CompanyID");
|
||||
break;
|
||||
// OrderModule添加ShardKey列,移除ViewFileName列
|
||||
case TableNames.OrderModule:
|
||||
@@ -347,25 +433,37 @@ async Task RunProgram()
|
||||
// OrderProcess添加ShardKey列,NextStepID的空值转换为0
|
||||
case TableNames.OrderProcess:
|
||||
record.AddField("ShardKey", CalculateShardKeyByOrderNo(record["OrderNo"]));
|
||||
#if USE_TEST_DB
|
||||
if(record["NextStepID"] == "\\N")
|
||||
record["NextStepID"] = "0";
|
||||
#endif
|
||||
break;
|
||||
// OrderProcessStep,OrderProcessStepItem添加ShardKey列
|
||||
// OrderProcessStep添加ShardKey
|
||||
case TableNames.OrderProcessStep:
|
||||
record.AddField("ShardKey", CalculateShardKeyByOrderNo(record["OrderNo"]));
|
||||
break;
|
||||
// OrderProcessStepItem添加ShardKey列,处理非空列
|
||||
case TableNames.OrderProcessStepItem:
|
||||
ReplaceIfMyDumperNull(record, "DataID", DefaultInt);
|
||||
record.AddField("ShardKey",
|
||||
// 获取OrderProcess.ID -> ShardKey
|
||||
ThrowIfNoCached(await cache.GetStringAsync(CacheKeys.OrderProcess_ID_ShardKey(record["OrderProcessID"])),
|
||||
TableNames.OrderProcessStepItem, TableNames.OrderProcessStep, "OrderProcessID", "脏数据未处理"));
|
||||
ThrowIfNoCached(await cache.GetStringAsync(CacheKeysFunc.OrderProcess_ID_ShardKey(record["OrderProcessID"])),
|
||||
TableNames.OrderProcessStepItem, TableNames.OrderProcessStep, "OrderProcessID", "无法获取对应的ShardKey"));
|
||||
break;
|
||||
// OrderScrapBoard处理非空列
|
||||
case TableNames.OrderScrapBoard:
|
||||
ReplaceIfMyDumperNull(record, "Color", DefaultStr);
|
||||
ReplaceIfMyDumperNull(record, "GoodsName", DefaultStr);
|
||||
ReplaceIfMyDumperNull(record, "Material", DefaultStr);
|
||||
ReplaceIfMyDumperNull(record, "MaterialName", DefaultStr);
|
||||
break;
|
||||
// ProcessItemExp处理非空列
|
||||
case TableNames.ProcessItemExp:
|
||||
ReplaceIfMyDumperNull(record, "MaxPartsID", DefaultInt);
|
||||
ReplaceIfMyDumperNull(record, "ProcessGroupID", DefaultInt);
|
||||
break;
|
||||
// SimplePlanOrder处理非空列,添加Deleted
|
||||
case TableNames.SimplePlanOrder:
|
||||
#if USE_TEST_DB
|
||||
record.RemoveField("ProcessState");
|
||||
#endif
|
||||
ReplaceIfMyDumperNull(record, "CreateTime", DefaultDateTime);
|
||||
ReplaceIfMyDumperNull(record, "UpdateTime", DefaultDateTime);
|
||||
ReplaceIfMyDumperNull(record, "CompanyID", DefaultInt);
|
||||
ReplaceIfMyDumperNull(record, "SingleName", DefaultStr);
|
||||
record.AddField("Deleted", "0");
|
||||
break;
|
||||
}
|
||||
@@ -390,28 +488,22 @@ async Task RunProgram()
|
||||
{
|
||||
// 缓存Order.OrderNo -> CompanyID
|
||||
case TableNames.Order:
|
||||
CacheKeys.Order_OrderNo_CompanyID = orderNo =>
|
||||
BuildCacheKey(TableNames.Order, "OrderNo", orderNo, "CompanyID");
|
||||
await cache.SetStringAsync(
|
||||
CacheKeys.Order_OrderNo_CompanyID(record["OrderNo"]),
|
||||
CacheKeysFunc.Order_OrderNo_CompanyID(record["OrderNo"]),
|
||||
record["CompanyID"]);
|
||||
break;
|
||||
|
||||
// 缓存OrderBlockPlan.ID -> CompanyID
|
||||
case TableNames.OrderBlockPlan:
|
||||
CacheKeys.OrderBlockPlan_ID_CompanyID = id =>
|
||||
BuildCacheKey(TableNames.OrderBlockPlan, "ID", id, "CompanyID");
|
||||
await cache.SetStringAsync(
|
||||
CacheKeys.OrderBlockPlan_ID_CompanyID(record["ID"]),
|
||||
CacheKeysFunc.OrderBlockPlan_ID_CompanyID(record["ID"]),
|
||||
record["CompanyID"]);
|
||||
break;
|
||||
|
||||
// 缓存OrderProcess.ID -> ShardKey
|
||||
case TableNames.OrderProcess:
|
||||
CacheKeys.OrderProcess_ID_ShardKey = id =>
|
||||
BuildCacheKey(TableNames.OrderProcess, "ID", id, "ShardKey");
|
||||
await cache.SetStringAsync(
|
||||
CacheKeys.OrderProcess_ID_ShardKey(record["ID"]),
|
||||
CacheKeysFunc.OrderProcess_ID_ShardKey(record["ID"]),
|
||||
record["ShardKey"]);
|
||||
break;
|
||||
}
|
||||
@@ -465,9 +557,11 @@ async Task RunProgram()
|
||||
{
|
||||
options.ConnectionString = outputOptions.ConnectionString;
|
||||
options.FlushCount = outputOptions.FlushCount;
|
||||
options.MaxAllowedPacket = outputOptions.MaxAllowedPacket;
|
||||
options.MaxAllowedPacket = outputOptions.MaxAllowedPacket / 2;
|
||||
options.MaxDatabaseOutputTask = outputOptions.MaxDatabaseOutputTask;
|
||||
options.TreatJsonAsHex = outputOptions.TreatJsonAsHex;
|
||||
options.NoOutput = outputOptions.NoOutput;
|
||||
options.ForUpdate = outputOptions.ForUpdate;
|
||||
|
||||
#if USE_TEST_DB
|
||||
// Test Server
|
||||
@@ -531,7 +625,7 @@ async Task RunProgram()
|
||||
{ "process_item_exp.ItemJson", ColumnType.Text },
|
||||
{ "report_template.Template", ColumnType.Text },
|
||||
{ "report_template.SourceConfig", ColumnType.Text },
|
||||
{ "order_block_plan.OrderNos", ColumnType.Json },
|
||||
{ "order_block_plan.OrderNos", ColumnType.Text },
|
||||
{ "order_block_plan.BlockInfo", ColumnType.Text },
|
||||
};
|
||||
#endif
|
||||
@@ -552,9 +646,12 @@ async Task RunProgram()
|
||||
host.Services.AddDataSourceFactory();
|
||||
host.Services.AddErrorRecorderFactory();
|
||||
host.Services.AddSingleton<ProcessContext>();
|
||||
host.Services.AddKeyedSingleton<DataRecordQueue>(ProcessStep.Produce);
|
||||
host.Services.AddRecordQueuePool(tenantDbOptions.DbGroup.Keys.Select(key => (key:key, queue:new DataRecordQueue(500_000))).ToArray());
|
||||
host.Services.AddSingleton<ITaskMonitorLogger, CacheTaskMonitorLogger>();
|
||||
var prodLen = host.Configuration.GetRequiredSection("RecordQueue").GetValue<int>("ProducerQueueLength");
|
||||
var consLen = host.Configuration.GetRequiredSection("RecordQueue").GetValue<int>("ConsumerQueueLength");
|
||||
var maxCharCount = host.Configuration.GetRequiredSection("RecordQueue").GetValue<long>("MaxByteCount") / 2;
|
||||
host.Services.AddKeyedSingleton<DataRecordQueue>(ConstVar.Producer, new DataRecordQueue(prodLen, maxCharCount));
|
||||
host.Services.AddRecordQueuePool(tenantDbOptions.DbGroup.Keys.Select(key => (key:key, queue:new DataRecordQueue(consLen, maxCharCount))).ToArray());
|
||||
// host.Services.AddSingleton<ITaskMonitorLogger, CacheTaskMonitorLogger>();
|
||||
host.Services.AddSingleton<ITaskMonitorLogger, LoggerTaskMonitorLogger>();
|
||||
|
||||
host.Services.AddHostedService<MainHostedService>();
|
||||
@@ -562,7 +659,8 @@ async Task RunProgram()
|
||||
host.Services.AddSingleton<ITransformService, TransformService>();
|
||||
host.Services.AddSingleton<IOutputService, OutputService>();
|
||||
host.Services.AddSingleton<TaskMonitorService>();
|
||||
host.Services.AddRedisCache(redisOptions);
|
||||
// host.Services.AddRedisCache(redisOptions);
|
||||
host.Services.AddSingleton<ICacher, MemoryCache>();
|
||||
var app = host.Build();
|
||||
await app.RunAsync();
|
||||
}
|
@@ -1,5 +1,6 @@
|
||||
using System.Collections.Concurrent;
|
||||
using System.Diagnostics.CodeAnalysis;
|
||||
using TaskExtensions = MesETL.Shared.Helper.TaskExtensions;
|
||||
|
||||
namespace MesETL.App.Services;
|
||||
|
||||
@@ -10,26 +11,35 @@ public class DataRecordQueue : IDisposable
|
||||
{
|
||||
private readonly BlockingCollection<DataRecord> _queue;
|
||||
|
||||
private long _currentCharCount;
|
||||
private readonly long _maxCharCount = 2_147_483_648; // 4GiB
|
||||
|
||||
public int Count => _queue.Count;
|
||||
public bool IsCompleted => _queue.IsCompleted;
|
||||
public bool IsAddingCompleted => _queue.IsAddingCompleted;
|
||||
|
||||
public long LongestFieldCharCount { get; private set; }
|
||||
|
||||
public event Action? OnRecordWrite;
|
||||
public event Action? OnRecordRead;
|
||||
|
||||
public DataRecordQueue() : this(1000000) // 默认容量最大1M
|
||||
public DataRecordQueue() : this(500_000, 2_147_483_648) // 默认容量最大500K
|
||||
{
|
||||
}
|
||||
|
||||
public DataRecordQueue(int boundedCapacity)
|
||||
public DataRecordQueue(int boundedCapacity, long maxCharCount)
|
||||
{
|
||||
_queue = new BlockingCollection<DataRecord>(boundedCapacity);
|
||||
_maxCharCount = maxCharCount;
|
||||
}
|
||||
|
||||
public void CompleteAdding() => _queue.CompleteAdding();
|
||||
|
||||
public bool TryDequeue([MaybeNullWhen(false)] out DataRecord record)
|
||||
{
|
||||
if (_queue.TryTake(out record))
|
||||
{
|
||||
Interlocked.Add(ref _currentCharCount, -record.FieldCharCount);
|
||||
OnRecordRead?.Invoke();
|
||||
return true;
|
||||
}
|
||||
@@ -37,13 +47,15 @@ public class DataRecordQueue : IDisposable
|
||||
return false;
|
||||
}
|
||||
|
||||
public DataRecord Dequeue() => _queue.Take();
|
||||
|
||||
public void CompleteAdding() => _queue.CompleteAdding();
|
||||
|
||||
public void Enqueue(DataRecord record)
|
||||
public async Task EnqueueAsync(DataRecord record)
|
||||
{
|
||||
var charCount = record.FieldCharCount;
|
||||
LongestFieldCharCount = Math.Max(LongestFieldCharCount, charCount);
|
||||
if(_currentCharCount + charCount > _maxCharCount)
|
||||
await TaskExtensions.WaitUntil(() => _currentCharCount + charCount < _maxCharCount, 50);
|
||||
_queue.Add(record);
|
||||
Interlocked.Add(ref _currentCharCount, charCount);
|
||||
OnRecordWrite?.Invoke();
|
||||
}
|
||||
|
||||
|
@@ -11,27 +11,30 @@ public class CsvReader : IDataReader
|
||||
{
|
||||
protected readonly string? FilePath;
|
||||
protected readonly Lazy<StreamReader> Reader;
|
||||
private Stream? _stream;
|
||||
protected readonly ILogger? Logger;
|
||||
protected readonly string TableName;
|
||||
|
||||
public DataRecord Current { get; protected set; } = null!;
|
||||
public DataRecord Current { get; protected set; } = default!;
|
||||
public string[] Headers { get; }
|
||||
public string? CurrentRaw { get; protected set; }
|
||||
public string Delimiter { get; }
|
||||
public char QuoteChar { get; }
|
||||
|
||||
public CsvReader(Stream stream, string tableName, string[] headers, string delimiter = ",", char quoteChar = '"', ILogger? logger = null)
|
||||
: this(tableName, headers, delimiter, quoteChar, logger)
|
||||
{
|
||||
Reader = new Lazy<StreamReader>(() => new StreamReader(stream));
|
||||
Reader = new Lazy<StreamReader>(() => new StreamReader(stream),false);
|
||||
}
|
||||
|
||||
public CsvReader(string filePath, string tableName, string[] headers, string delimiter = ",", char quoteChar = '"', ILogger? logger = null)
|
||||
: this(tableName, headers, delimiter, quoteChar, logger)
|
||||
{
|
||||
var fs = File.OpenRead(filePath);
|
||||
FilePath = filePath;
|
||||
Reader = new Lazy<StreamReader>(() => new StreamReader(fs));
|
||||
Reader = new Lazy<StreamReader>(() =>
|
||||
{
|
||||
_stream = File.OpenRead(filePath);
|
||||
return new StreamReader(_stream);
|
||||
});
|
||||
}
|
||||
|
||||
private CsvReader(string tableName, string[] headers, string delimiter = ",", char quoteChar = '"', ILogger? logger = null)
|
||||
@@ -50,45 +53,44 @@ public class CsvReader : IDataReader
|
||||
if (string.IsNullOrWhiteSpace(str))
|
||||
return false;
|
||||
|
||||
CurrentRaw = str;
|
||||
|
||||
var fields = ParseRow(str, QuoteChar, Delimiter);
|
||||
Current = new DataRecord(fields, TableName, Headers){RawField = str};
|
||||
var fields = ParseRowFaster(str, QuoteChar, Delimiter[0]);
|
||||
Current = new DataRecord(fields, TableName, Headers);
|
||||
return true;
|
||||
}
|
||||
|
||||
public string[] ParseRow(ReadOnlySpan<char> source, char quoteChar, string delimiter)
|
||||
public static string[] ParseRow(ReadOnlySpan<char> source, char quoteChar, char delimiter)
|
||||
{
|
||||
var result = new List<string>();
|
||||
var index = -1;
|
||||
var current = new StringBuilder();
|
||||
var current = new StringBuilder(source.Length);
|
||||
var hasQuote = false;
|
||||
var hasSlash = false;
|
||||
while (index < source.Length - 1)
|
||||
{
|
||||
index++;
|
||||
if (hasSlash == false && source[index] == '\\')
|
||||
var currChar = source[index];
|
||||
if (hasSlash == false && currChar == '\\')
|
||||
{
|
||||
hasSlash = true;
|
||||
current.Append('\\');
|
||||
continue;
|
||||
}
|
||||
|
||||
if (hasSlash == false && source[index] == quoteChar)
|
||||
if (hasSlash == false && currChar == quoteChar)
|
||||
{
|
||||
hasQuote = !hasQuote;
|
||||
current.Append(source[index]);
|
||||
current.Append(currChar);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (hasQuote == false && source[index] == delimiter[0])
|
||||
if (hasQuote == false && currChar == delimiter)
|
||||
{
|
||||
result.Add(current.ToString());
|
||||
current.Clear();
|
||||
}
|
||||
else
|
||||
{
|
||||
current.Append(source[index]);
|
||||
current.Append(currChar);
|
||||
}
|
||||
|
||||
hasSlash = false;
|
||||
@@ -98,9 +100,61 @@ public class CsvReader : IDataReader
|
||||
return result.ToArray();
|
||||
}
|
||||
|
||||
public static List<string> ParseRowFaster(ReadOnlySpan<char> source, char quoteChar, char delimiter, int columnCount = 10)
|
||||
{
|
||||
var result = new List<string>(columnCount);
|
||||
var index = -1;
|
||||
var hasQuote = false;
|
||||
var hasSlash = false;
|
||||
var start = 0;
|
||||
var end = 0;
|
||||
var len = source.Length - 1;
|
||||
while (index < len)
|
||||
{
|
||||
++index;
|
||||
var currChar = source[index];
|
||||
|
||||
if (!hasSlash)
|
||||
{
|
||||
if (currChar is '\\')
|
||||
{
|
||||
hasSlash = true;
|
||||
++end;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (currChar == quoteChar)
|
||||
{
|
||||
hasQuote = !hasQuote;
|
||||
++end;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (!hasQuote && currChar == delimiter)
|
||||
{
|
||||
result.Add(source[start..(end)].ToString());
|
||||
start = end + 1;
|
||||
++end;
|
||||
}
|
||||
else
|
||||
{
|
||||
++end;
|
||||
}
|
||||
|
||||
hasSlash = false;
|
||||
}
|
||||
|
||||
result.Add(source[start..end].ToString());
|
||||
return result;
|
||||
}
|
||||
|
||||
public virtual void Dispose()
|
||||
{
|
||||
if(Reader.IsValueCreated)
|
||||
if (Reader.IsValueCreated)
|
||||
{
|
||||
Reader.Value.Dispose();
|
||||
_stream?.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
@@ -1,7 +1,9 @@
|
||||
using System.Text;
|
||||
using System.Text.RegularExpressions;
|
||||
using MesETL.App.Const;
|
||||
using MesETL.App.Helpers;
|
||||
using MesETL.App.Options;
|
||||
using MesETL.Shared.Helper;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using MySqlConnector;
|
||||
@@ -60,11 +62,11 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
|
||||
return;
|
||||
|
||||
var cmd = _conn.CreateCommand();
|
||||
cmd.CommandTimeout = 3 * 60;
|
||||
cmd.CommandTimeout = 0;
|
||||
|
||||
try
|
||||
{
|
||||
var excuseList = GetExcuseList(_recordCache, maxAllowPacket).ToList();
|
||||
var excuseList = GetExcuseList(_recordCache, maxAllowPacket);
|
||||
foreach (var insertSql in excuseList)
|
||||
{
|
||||
cmd.CommandText = insertSql;
|
||||
@@ -104,6 +106,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
|
||||
public IEnumerable<string> GetExcuseList(IDictionary<string, IList<DataRecord>> tableRecords,int maxAllowPacket)
|
||||
{
|
||||
var sb = new StringBuilder("SET AUTOCOMMIT = 1;\n");
|
||||
var appendCount = 0;
|
||||
foreach (var (tableName, records) in tableRecords)
|
||||
{
|
||||
if (records.Count == 0)
|
||||
@@ -137,9 +140,10 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
|
||||
|
||||
// 在这里处理特殊列
|
||||
#region HandleFields
|
||||
if (field.Length == 2 && field == @"\N") // MyDumper NULL
|
||||
|
||||
if (field.Length == 2 && field == ConstVar.MyDumperNull) // MyDumper导出的NULL为'\N'('\'不是转义字符)
|
||||
{
|
||||
recordSb.Append("NULL");
|
||||
recordSb.Append(ConstVar.Null);
|
||||
goto Escape;
|
||||
}
|
||||
|
||||
@@ -148,14 +152,18 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
|
||||
case ColumnType.Text:
|
||||
if(string.IsNullOrEmpty(field))
|
||||
recordSb.Append("''");
|
||||
else if (field == ConstVar.Null)
|
||||
recordSb.Append(ConstVar.Null);
|
||||
else recordSb.Append($"_utf8mb4 0x{field}");
|
||||
break;
|
||||
case ColumnType.Blob:
|
||||
if (string.IsNullOrEmpty(field))
|
||||
recordSb.Append("''");
|
||||
else if (field == ConstVar.Null)
|
||||
recordSb.Append(ConstVar.Null);
|
||||
else recordSb.Append($"0x{field}");
|
||||
break;
|
||||
case ColumnType.Json:
|
||||
case ColumnType.Json:// 生产库没有JSON列,仅用于测试库进行测试
|
||||
if(string.IsNullOrEmpty(field))
|
||||
recordSb.Append("'[]'"); // JObject or JArray?
|
||||
else if (_options.Value.TreatJsonAsHex)
|
||||
@@ -180,6 +188,14 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
|
||||
// 若字符数量即将大于限制,则返回SQL,清空StringBuilder,保留当前记录的索引值,然后转到StartBuild标签重新开始一轮INSERT
|
||||
if (sb.Length + recordSb.Length + 23 > maxAllowPacket)
|
||||
{
|
||||
if (appendCount == 0) // 如果单条记录超出maxAllowedPacket
|
||||
{
|
||||
sb.Append(recordSb);
|
||||
_logger.LogWarning("{Table}表单条数据的SQL超出了配置的MaxAllowedPacket,字符数{Count}", tableName,
|
||||
sb.Length + recordSb.Length + 23);
|
||||
}
|
||||
|
||||
TryAddForUpdateSuffix(tableName, sb);
|
||||
sb.Append(';').AppendLine();
|
||||
sb.Append("SET AUTOCOMMIT = 1;");
|
||||
yield return sb.ToString();
|
||||
@@ -191,8 +207,10 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
|
||||
sb.Append(',').AppendLine();
|
||||
noCommas = false;
|
||||
sb.Append(recordSb); // StringBuilder.Append(StringBuilder)不会分配多余的内存
|
||||
appendCount++;
|
||||
}
|
||||
|
||||
TryAddForUpdateSuffix(tableName, sb);
|
||||
sb.Append(';');
|
||||
sb.Append("COMMIT;");
|
||||
yield return sb.ToString();
|
||||
@@ -200,6 +218,23 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 数据必须是同一张表
|
||||
/// </summary>
|
||||
/// <param name="tableName"></param>
|
||||
/// <param name="sb"></param>
|
||||
private void TryAddForUpdateSuffix(string tableName, StringBuilder sb)
|
||||
{
|
||||
var forUpdate = _options.Value.TryGetForUpdate(tableName, out var forUpdateSql);
|
||||
if (forUpdate)
|
||||
{
|
||||
sb.AppendLine($"""
|
||||
AS new
|
||||
ON DUPLICATE KEY UPDATE
|
||||
{forUpdateSql}
|
||||
""");
|
||||
}
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
|
@@ -9,20 +9,24 @@ namespace MesETL.App.Services.ETL;
|
||||
public class ZstReader : CsvReader
|
||||
{
|
||||
protected new readonly Lazy<StreamReader> Reader;
|
||||
private Stream? _stream;
|
||||
|
||||
|
||||
public ZstReader(string filePath, string tableName, string[] headers, string delimiter = ",", char quoteChar = '\"', ILogger? logger = null)
|
||||
: base(filePath, tableName, headers, delimiter, quoteChar, logger)
|
||||
{
|
||||
var ds = new DecompressionStream(File.OpenRead(filePath));
|
||||
Reader = new Lazy<StreamReader>(() => new StreamReader(ds));
|
||||
Reader = new Lazy<StreamReader>(() =>
|
||||
{
|
||||
_stream = new DecompressionStream(File.OpenRead(filePath));
|
||||
return new StreamReader(_stream);
|
||||
}, false);
|
||||
}
|
||||
|
||||
public ZstReader(Stream stream, string tableName, string[] headers, string delimiter = ",", char quoteChar = '\"', ILogger? logger = null)
|
||||
: base(stream, tableName, headers, delimiter, quoteChar, logger)
|
||||
{
|
||||
var ds = new DecompressionStream(stream);
|
||||
Reader = new Lazy<StreamReader>(() => new StreamReader(ds));
|
||||
Reader = new Lazy<StreamReader>(() => new StreamReader(ds), false);
|
||||
}
|
||||
|
||||
public override async ValueTask<bool> ReadAsync()
|
||||
@@ -31,18 +35,19 @@ public class ZstReader : CsvReader
|
||||
if (string.IsNullOrWhiteSpace(str))
|
||||
return false;
|
||||
|
||||
CurrentRaw = str;
|
||||
|
||||
var fields = ParseRow(str, QuoteChar, Delimiter);
|
||||
Current = new DataRecord(fields, TableName, Headers) {RawField = str};
|
||||
var fields = ParseRowFaster(str, QuoteChar, Delimiter[0]);
|
||||
Current = new DataRecord(fields, TableName, Headers);
|
||||
return true;
|
||||
}
|
||||
|
||||
public override void Dispose()
|
||||
{
|
||||
base.Dispose();
|
||||
if(Reader.IsValueCreated)
|
||||
if (Reader.IsValueCreated)
|
||||
{
|
||||
Reader.Value.Dispose();
|
||||
_stream?.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@@ -1,4 +1,5 @@
|
||||
using MesETL.App.Helpers;
|
||||
using MesETL.Shared.Helper;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace MesETL.App.Services.ErrorRecorder;
|
||||
@@ -24,7 +25,7 @@ public class ErrorRecorder
|
||||
Directory.CreateDirectory(outputDir);
|
||||
var content = $"""
|
||||
### {exception.Message}
|
||||
{record.RawField}
|
||||
{string.Join(',', record.Fields)}
|
||||
""";
|
||||
var path = Path.Combine(outputDir, $"{record.TableName}.errlog");
|
||||
await File.AppendAllTextAsync(path, content);
|
||||
@@ -57,7 +58,7 @@ public class ErrorRecorder
|
||||
var content =
|
||||
$"""
|
||||
### {exception.Message}
|
||||
{record.RawField}
|
||||
{string.Join(',', record.Fields)}
|
||||
""";
|
||||
await writer.WriteLineAsync(content);
|
||||
if (token.IsCancellationRequested)
|
||||
|
@@ -1,4 +1,5 @@
|
||||
using MesETL.App.Cache;
|
||||
using System.Text;
|
||||
using MesETL.App.Cache;
|
||||
|
||||
namespace MesETL.App.Services.Loggers;
|
||||
|
||||
|
@@ -11,7 +11,7 @@ public class ProcessContext
|
||||
private long _inputCount;
|
||||
private long _transformCount;
|
||||
private long _outputCount;
|
||||
private readonly ConcurrentDictionary<string, long> _tableProgress = new();
|
||||
private readonly ConcurrentDictionary<string, (long input, long output)> _tableProgress = new();
|
||||
public bool HasException => _hasException;
|
||||
public bool IsInputCompleted { get; private set; }
|
||||
public bool IsTransformCompleted { get; private set; }
|
||||
@@ -37,7 +37,7 @@ public class ProcessContext
|
||||
|
||||
|
||||
// TableName -> Count
|
||||
public IReadOnlyDictionary<string, long> TableProgress => _tableProgress;
|
||||
public IReadOnlyDictionary<string, (long input, long output)> TableProgress => _tableProgress;
|
||||
|
||||
public void CompleteInput() => IsInputCompleted = true;
|
||||
|
||||
@@ -55,16 +55,21 @@ public class ProcessContext
|
||||
public void AddOutput() => Interlocked.Increment(ref _outputCount);
|
||||
public void AddOutput(int count) => Interlocked.Add(ref _outputCount, count);
|
||||
|
||||
public void AddTableOutput(string table, int count)
|
||||
public void AddTableInput(string table, int count)
|
||||
{
|
||||
_tableProgress.AddOrUpdate(table, count, (k, v) => v + count);
|
||||
AddOutput(count);
|
||||
_tableProgress.AddOrUpdate(table, (input:count, output:0), (k, tuple) =>
|
||||
{
|
||||
tuple.input += count;
|
||||
return tuple;
|
||||
});
|
||||
}
|
||||
|
||||
public long GetTableOutput(string table)
|
||||
public void AddTableOutput(string table, int count)
|
||||
{
|
||||
if(!_tableProgress.TryGetValue(table, out var count))
|
||||
throw new ApplicationException($"未找到表{table}输出记录");
|
||||
return count;
|
||||
_tableProgress.AddOrUpdate(table, (input:0, output:count), (k, tuple) =>
|
||||
{
|
||||
tuple.output += count;
|
||||
return tuple;
|
||||
});
|
||||
}
|
||||
}
|
@@ -9,7 +9,8 @@ public class RecordQueuePool
|
||||
|
||||
public IReadOnlyDictionary<string, DataRecordQueue> Queues => _queues;
|
||||
|
||||
public void AddQueue(string key, int boundedCapacity = 200_0000) => AddQueue(key, new DataRecordQueue(boundedCapacity));
|
||||
public void AddQueue(string key, int boundedCapacity = 200_0000, long maxCharCount = 2_147_483_648)
|
||||
=> AddQueue(key, new DataRecordQueue(boundedCapacity, maxCharCount));
|
||||
|
||||
public void AddQueue(string key, DataRecordQueue queue)
|
||||
{
|
||||
|
@@ -1,5 +1,5 @@
|
||||
using ApplicationException = System.ApplicationException;
|
||||
using TaskExtensions = MesETL.App.Helpers.TaskExtensions;
|
||||
using TaskExtensions = MesETL.Shared.Helper.TaskExtensions;
|
||||
|
||||
namespace MesETL.App.Services;
|
||||
|
||||
|
@@ -1,13 +1,18 @@
|
||||
{
|
||||
"MemoryThreshold": 8,
|
||||
"GCIntervalMilliseconds": -1,
|
||||
"UnsafeVariable": false,
|
||||
"Logging": {
|
||||
"LogLevel": {
|
||||
"Default": "Debug"
|
||||
}
|
||||
},
|
||||
"Input":{
|
||||
"InputDir": "D:\\Dump\\MockData", // Csv数据输入目录
|
||||
"InputDir": "D:\\Dump\\NewMockData", // Csv数据输入目录
|
||||
"UseMock": false, // 使用模拟数据进行测试
|
||||
"MockCountMultiplier": 0.5 // 模拟数据量级的乘数
|
||||
"MockCountMultiplier": 1, // 模拟数据量级的乘数
|
||||
"TableOrder": ["order", "order_data_parts"], // 按顺序输入的表
|
||||
"TableIgnoreList": [] // 忽略输入的表
|
||||
},
|
||||
"Transform":{
|
||||
"StrictMode": false, // 设为true时如果数据转换发生错误,立刻停止程序
|
||||
@@ -21,7 +26,17 @@
|
||||
"MaxAllowedPacket": 67108864,
|
||||
"FlushCount": 10000, // 每次提交记录条数
|
||||
"MaxDatabaseOutputTask" : 4, // 每个数据库最大提交任务数
|
||||
"TreatJsonAsHex": false, // 将json列作为16进制格式输出(0x前缀)
|
||||
"TreatJsonAsHex": false, // 将json列作为16进制格式输出(0x前缀),生产库是没有json列的
|
||||
"NoOutput": ["order"],
|
||||
"ForUpdate":
|
||||
{
|
||||
"order_data_parts": "CompanyID = new.CompanyID"
|
||||
}
|
||||
},
|
||||
"RecordQueue":{
|
||||
"ProducerQueueLength": 50000, // 输入队列最大长度
|
||||
"ConsumerQueueLength": 10000, // 每个输出队列最大长度
|
||||
"MaxByteCount": 3221225472 // 队列最大字节数
|
||||
},
|
||||
"RedisCache": {
|
||||
"Configuration": "192.168.1.246:6380",
|
||||
@@ -30,7 +45,7 @@
|
||||
"TenantDb": // 分库配置
|
||||
{
|
||||
"TenantKey" : "CompanyID",
|
||||
"UseDbGroup": "test",
|
||||
"UseDbGroup": "prod",
|
||||
"DbGroups": {
|
||||
"test": {
|
||||
"cferp_test_1": 1000,
|
||||
|
14
MesETL.Clean/MesETL.Clean.csproj
Normal file
14
MesETL.Clean/MesETL.Clean.csproj
Normal file
@@ -0,0 +1,14 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<OutputType>Exe</OutputType>
|
||||
<TargetFramework>net8.0</TargetFramework>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<Nullable>enable</Nullable>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\MesETL.Shared\MesETL.Shared.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
55
MesETL.Clean/Program.cs
Normal file
55
MesETL.Clean/Program.cs
Normal file
@@ -0,0 +1,55 @@
|
||||
using MesETL.Shared.Helper;
|
||||
|
||||
var connStr = GetArg("-s") ?? throw new ApplicationException("未配置数据库连接字符串");
|
||||
var eachLimit = int.Parse(GetArg("-l") ?? "1000");
|
||||
var parallelTask = int.Parse(GetArg("-p") ?? "4");
|
||||
|
||||
var deletionCount = 0;
|
||||
|
||||
Console.WriteLine("Running Deletion...");
|
||||
|
||||
_ = Task.Run(async () =>
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
await Task.Delay(5000);
|
||||
Console.WriteLine($"[{DateTime.Now}] DELETE COUNT: {deletionCount}");
|
||||
}
|
||||
});
|
||||
|
||||
await Parallel.ForAsync(0, parallelTask, async (i, token) =>
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
var effectRows = await DatabaseHelper.NonQueryAsync(connStr,
|
||||
$"DELETE FROM `order_data_block` WHERE CompanyID = 0 ORDER BY ID LIMIT {eachLimit};", token);
|
||||
if(effectRows == 0)
|
||||
break;
|
||||
Interlocked.Add(ref deletionCount, effectRows);
|
||||
}
|
||||
});
|
||||
|
||||
Console.WriteLine($"[{DateTime.Now}] DELETE COUNT: {deletionCount}");
|
||||
return;
|
||||
string? GetArg(string instruct)
|
||||
{
|
||||
var idx = Array.IndexOf(args, instruct);
|
||||
if (idx == -1)
|
||||
return null;
|
||||
if (args[idx + 1].StartsWith('-'))
|
||||
throw new ArgumentException("Argument Lost", nameof(instruct));
|
||||
return args[idx + 1];
|
||||
}
|
||||
|
||||
// var match = await DatabaseHelper.QueryTableAsync(connStr,
|
||||
// $"SELECT `ID` FROM `order_data_block` WHERE CompanyID = 0 LIMIT {eachLimit};",
|
||||
// token);
|
||||
// var rows = match.Tables[0].Rows;
|
||||
// if (rows.Count == 0)
|
||||
// return;
|
||||
//
|
||||
// foreach (DataRow row in rows)
|
||||
// {
|
||||
// var id = row["ID"].ToString();
|
||||
// await DatabaseHelper.NonQueryAsync(connStr, $"DELETE FROM `order_data_block` WHERE `ID` = {id}", token);
|
||||
// }
|
@@ -1,15 +1,25 @@
|
||||
using System.Data;
|
||||
using MySqlConnector;
|
||||
|
||||
namespace MesETL.App.Helpers;
|
||||
namespace MesETL.Shared.Helper;
|
||||
|
||||
public static class DatabaseHelper
|
||||
{
|
||||
public static async Task<DataSet> QueryTableAsync(string connStr, string sql)
|
||||
public static MySqlConnection CreateConnection(string connStr)
|
||||
{
|
||||
await using var conn = new MySqlConnection(connStr);
|
||||
var newConnStr = new MySqlConnectionStringBuilder(connStr)
|
||||
{
|
||||
ConnectionTimeout = 30,
|
||||
DefaultCommandTimeout = 0,
|
||||
}.ConnectionString;
|
||||
return new MySqlConnection(newConnStr);
|
||||
}
|
||||
|
||||
public static async Task<DataSet> QueryTableAsync(string connStr, string sql, CancellationToken ct = default)
|
||||
{
|
||||
await using var conn = CreateConnection(connStr);
|
||||
if(conn.State is not ConnectionState.Open)
|
||||
await conn.OpenAsync();
|
||||
await conn.OpenAsync(ct);
|
||||
await using var cmd = conn.CreateCommand();
|
||||
cmd.CommandText = sql;
|
||||
var ds = new DataSet();
|
||||
@@ -17,29 +27,29 @@ public static class DatabaseHelper
|
||||
return ds;
|
||||
}
|
||||
|
||||
public static async Task<object?> QueryScalarAsync(string connStr, string sql)
|
||||
public static async Task<object?> QueryScalarAsync(string connStr, string sql, CancellationToken ct = default)
|
||||
{
|
||||
await using var conn = new MySqlConnection(connStr);
|
||||
await using var conn = CreateConnection(connStr);
|
||||
if(conn.State is not ConnectionState.Open)
|
||||
await conn.OpenAsync();
|
||||
await conn.OpenAsync(ct);
|
||||
await using var cmd = conn.CreateCommand();
|
||||
cmd.CommandText = sql;
|
||||
return await cmd.ExecuteScalarAsync();
|
||||
return await cmd.ExecuteScalarAsync(ct);
|
||||
}
|
||||
|
||||
public static async Task<int> NonQueryAsync(string connStr, string sql)
|
||||
public static async Task<int> NonQueryAsync(string connStr, string sql, CancellationToken ct = default)
|
||||
{
|
||||
await using var conn = new MySqlConnection(connStr);
|
||||
await using var conn = CreateConnection(connStr);
|
||||
if(conn.State is not ConnectionState.Open)
|
||||
await conn.OpenAsync();
|
||||
await conn.OpenAsync(ct);
|
||||
await using var cmd = conn.CreateCommand();
|
||||
cmd.CommandText = sql;
|
||||
return await cmd.ExecuteNonQueryAsync();
|
||||
return await cmd.ExecuteNonQueryAsync(ct);
|
||||
}
|
||||
|
||||
public static async Task<int> TransactionAsync(string connStr, string sql, params MySqlParameter[] parameters)
|
||||
{
|
||||
await using var conn = new MySqlConnection(connStr);
|
||||
await using var conn = CreateConnection(connStr);
|
||||
if(conn.State is not ConnectionState.Open)
|
||||
await conn.OpenAsync();
|
||||
await using var trans = await conn.BeginTransactionAsync();
|
@@ -1,4 +1,4 @@
|
||||
namespace MesETL.App.Helpers;
|
||||
namespace MesETL.Shared.Helper;
|
||||
|
||||
public static class DictionaryExtensions
|
||||
{
|
@@ -1,7 +1,7 @@
|
||||
using System.Diagnostics.CodeAnalysis;
|
||||
using System.Reflection;
|
||||
|
||||
namespace MesETL.App.Helpers;
|
||||
namespace MesETL.Shared.Helper;
|
||||
#nullable disable
|
||||
public static class EnumerableExtensions
|
||||
{
|
@@ -1,7 +1,7 @@
|
||||
using System.Globalization;
|
||||
using System.Text;
|
||||
|
||||
namespace MesETL.App.Helpers;
|
||||
namespace MesETL.Shared.Helper;
|
||||
|
||||
public static class StringExtensions
|
||||
{
|
@@ -1,4 +1,4 @@
|
||||
namespace MesETL.App.Helpers;
|
||||
namespace MesETL.Shared.Helper;
|
||||
|
||||
public static class TaskExtensions
|
||||
{
|
13
MesETL.Shared/MesETL.Shared.csproj
Normal file
13
MesETL.Shared/MesETL.Shared.csproj
Normal file
@@ -0,0 +1,13 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>net8.0</TargetFramework>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<Nullable>enable</Nullable>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="MySqlConnector" Version="2.3.5" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
@@ -1,5 +1,6 @@
|
||||
using System.Data;
|
||||
using MesETL.App.Helpers;
|
||||
using MesETL.Shared.Helper;
|
||||
using MySqlConnector;
|
||||
using Xunit.Abstractions;
|
||||
|
||||
|
@@ -1,6 +1,7 @@
|
||||
using System.Data;
|
||||
using System.Text;
|
||||
using MesETL.App.Helpers;
|
||||
using MesETL.Shared.Helper;
|
||||
using MySqlConnector;
|
||||
using Newtonsoft.Json;
|
||||
using Newtonsoft.Json.Linq;
|
||||
@@ -107,6 +108,24 @@ public class DatabaseToolBox
|
||||
}).ToArray();
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData(["mesdb_1"])]
|
||||
[InlineData(["mesdb_2"])]
|
||||
[InlineData(["mesdb_3"])]
|
||||
[InlineData(["mesdb_4"])]
|
||||
[InlineData(["mesdb_5"])]
|
||||
public async Task ShowIndex(string database)
|
||||
{
|
||||
var indexes = await GetAllTableIndexes(database);
|
||||
var sb = new StringBuilder();
|
||||
foreach (var (tableName, indexName, isUnique, columnName, tableIndexType) in indexes!)
|
||||
{
|
||||
sb.AppendLine($"Drop {(isUnique ? "UNIQUE" : string.Empty)} INDEX `{indexName}` ON `{database}`.`{tableName}`;");
|
||||
}
|
||||
|
||||
_output.WriteLine(sb.ToString());
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData(["cferp_test_1", "D:/Indexes_cferp_test_1.json"])]
|
||||
[InlineData(["cferp_test_2", "D:/Indexes_cferp_test_2.json"])]
|
||||
|
@@ -1,4 +1,5 @@
|
||||
using MesETL.App.Helpers;
|
||||
using MesETL.Shared.Helper;
|
||||
|
||||
namespace TestProject1;
|
||||
|
||||
|
152
MesETL.Test/Test.cs
Normal file
152
MesETL.Test/Test.cs
Normal file
@@ -0,0 +1,152 @@
|
||||
using System.Collections.Concurrent;
|
||||
using System.Diagnostics;
|
||||
using MesETL.App.Services.ETL;
|
||||
using MesETL.Shared.Helper;
|
||||
using Xunit.Abstractions;
|
||||
using ZstdSharp;
|
||||
|
||||
namespace TestProject1;
|
||||
|
||||
public class Test
|
||||
{
|
||||
private readonly ITestOutputHelper _output;
|
||||
|
||||
public Test(ITestOutputHelper output)
|
||||
{
|
||||
_output = output;
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData([@"D:\Dump\NewMockData2\cferp.order_box_block.00000.dat.zst"])]
|
||||
public async Task ZstdDecompressTest(string inputFile)
|
||||
{
|
||||
var count = 0;
|
||||
var flag = true;
|
||||
var sw = Stopwatch.StartNew();
|
||||
var reader = new StreamReader(new DecompressionStream(File.OpenRead(inputFile)));
|
||||
var monitor = Task.Run(async () =>
|
||||
{
|
||||
var lastElapse = sw.ElapsedMilliseconds;
|
||||
var lastCount = 0;
|
||||
while (flag)
|
||||
{
|
||||
await Task.Delay(2000);
|
||||
|
||||
_output.WriteLine($"speed: {(count - lastCount) / ((sw.ElapsedMilliseconds - lastElapse) / 1000f)}");
|
||||
lastElapse = sw.ElapsedMilliseconds;
|
||||
lastCount = count;
|
||||
}
|
||||
});
|
||||
while (!reader.EndOfStream)
|
||||
{
|
||||
var str = await reader.ReadLineAsync();
|
||||
char a;
|
||||
// foreach (var c in str)
|
||||
// {
|
||||
// a = c;
|
||||
// }
|
||||
CsvReader.ParseRowFaster(str, '"', ',');
|
||||
count++;
|
||||
}
|
||||
|
||||
flag = false;
|
||||
monitor.Wait();
|
||||
}
|
||||
|
||||
|
||||
public static IEnumerable<object[]> ParseRowData()
|
||||
{
|
||||
yield return
|
||||
[@"20220104020855,""2022-01-04 10:06:46"",1455,""0001-01-01 00:00:00"",""1"",0,""2"",""0"",\N,""0"",22010"];
|
||||
yield return
|
||||
[@"20220104020858,""2022-01-04 15:08:22"",1455,""0001-01-01 00:00:00"",""1"",838,""2"",""0"",""5"",""0"",22010"];
|
||||
yield return
|
||||
[@"5586326,20220104020855,220105981029,""1"",482278,482279,3768774,0,0,""1.000"",1455,22010"];
|
||||
yield return
|
||||
[@"130658,""PD220104002302"",3,4616,""2022-01-04 15:10:40"",1443,""2022-01-04 15:10:40"",""2022-01-04 15:10:51"",0,"""",0,1455,""0001-01-01 00:00:00"",1,5B32303232303130343032303835385D,E590B8E5A1912D2DE590B8E5A1912D2D31382D2D323030302A3630302D2D3130E789872D2D352E3936333B6361696C69616F2D2D79616E73652D2D392D2D323031302A313137342D2D31E789872D2D322E3336,""0"",0"];
|
||||
}
|
||||
[Theory]
|
||||
[MemberData(nameof(ParseRowData))]
|
||||
public void ParseRowFasterTest(string row)
|
||||
{
|
||||
var fields = CsvReader.ParseRowFaster(row, '"', ',');
|
||||
_output.WriteLine(string.Join(',', fields));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void DictMemoryTest()
|
||||
{
|
||||
var dict = new ConcurrentDictionary<string, string>();
|
||||
for (int i = 0; i < 3000000; i++)
|
||||
{
|
||||
dict.AddOrUpdate(Guid.NewGuid().ToString(), Random.Shared.NextInt64(1000000000L, 9999999999L).ToString(), (_, __) => Random.Shared.NextInt64(1000000000L, 9999999999L).ToString());
|
||||
}
|
||||
|
||||
while (true)
|
||||
{
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void GetResult()
|
||||
{
|
||||
var input =
|
||||
"""
|
||||
machine: 19303/19061
|
||||
order: 3416759/3415192
|
||||
order_block_plan: 2934281/1968850
|
||||
order_block_plan_item: 0/235927707
|
||||
order_block_plan_result: 1375479/277667
|
||||
order_box_block: 23457666/23450841
|
||||
order_data_block: 513012248/513012248
|
||||
order_data_goods: 18655270/18655270
|
||||
order_data_parts: 353139066/353139066
|
||||
order_item: 955274320/955274320
|
||||
order_module: 102907480/56935691
|
||||
order_module_extra: 40044077/40044077
|
||||
order_module_item: 49209022/49209022
|
||||
order_package: 12012712/12012712
|
||||
order_package_item: 0/80605124
|
||||
order_process: 4045309/2682043
|
||||
order_process_step: 8343418/5505158
|
||||
order_process_step_item: 14856509/9787696
|
||||
order_scrap_board: 136096/136090
|
||||
process_group: 1577/1543
|
||||
process_info: 9212/9008
|
||||
process_item_exp: 30/30
|
||||
process_schdule_capacity: 42442/42442
|
||||
process_step_efficiency: 8/8
|
||||
report_template: 7358/7338
|
||||
simple_package: 142861/137730
|
||||
simple_plan_order: 1167004/854699
|
||||
simple_plan_order: 0/55677
|
||||
sys_config: 2608/2608
|
||||
work_calendar: 11/11
|
||||
work_shift: 73/73
|
||||
work_time: 77/77
|
||||
order_process_step_item: 14856509/9790701
|
||||
order_process_step: 8343418/5506925
|
||||
order_module: 102907480/56935691
|
||||
order_process: 4045309/2682043
|
||||
report_template: 7358/7358
|
||||
process_info: 9212/9212
|
||||
process_group: 1577/1577
|
||||
order_block_plan_result: 1375479/277667
|
||||
order_box_block: 23457666/23457666
|
||||
order_block_plan: 2934281/1968850
|
||||
order: 3416759/3416759
|
||||
machine: 19303/19303
|
||||
order_scrap_board: 136096/136096
|
||||
""";
|
||||
|
||||
var arr = input.Split('\n').Select(s =>
|
||||
{
|
||||
var x = s.Split(':');
|
||||
var y = x[1].Split('/').Select(i => long.Parse(i)).ToArray();
|
||||
return new {TABLE_NAME = x[0], INPUT = y[0], OUTPUT = y[1], FILTER = y[0] - y[1]};
|
||||
}).OrderBy(s => s.TABLE_NAME);
|
||||
|
||||
_output.WriteLine(arr.ToMarkdownTable());
|
||||
}
|
||||
}
|
18
MesETL.sln
18
MesETL.sln
@@ -4,6 +4,12 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MesETL.App", "MesETL.App\Me
|
||||
EndProject
|
||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MesETL.Test", "MesETL.Test\MesETL.Test.csproj", "{8679D5B6-5853-446E-9882-7B7A8E270500}"
|
||||
EndProject
|
||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Mesdb.Cli", "Mesdb.Cli\Mesdb.Cli.csproj", "{68307B05-3D66-4322-A42F-C044C1E8BA3B}"
|
||||
EndProject
|
||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MesETL.Shared", "MesETL.Shared\MesETL.Shared.csproj", "{FE134001-0E22-458B-BEF2-29712A29087E}"
|
||||
EndProject
|
||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MesETL.Clean", "MesETL.Clean\MesETL.Clean.csproj", "{E1B2BED0-EBA6-4A14-BAD5-8EC4E528D7E0}"
|
||||
EndProject
|
||||
Global
|
||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||
Debug|Any CPU = Debug|Any CPU
|
||||
@@ -18,5 +24,17 @@ Global
|
||||
{8679D5B6-5853-446E-9882-7B7A8E270500}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{8679D5B6-5853-446E-9882-7B7A8E270500}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{8679D5B6-5853-446E-9882-7B7A8E270500}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
{68307B05-3D66-4322-A42F-C044C1E8BA3B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{68307B05-3D66-4322-A42F-C044C1E8BA3B}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{68307B05-3D66-4322-A42F-C044C1E8BA3B}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{68307B05-3D66-4322-A42F-C044C1E8BA3B}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
{FE134001-0E22-458B-BEF2-29712A29087E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{FE134001-0E22-458B-BEF2-29712A29087E}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{FE134001-0E22-458B-BEF2-29712A29087E}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{FE134001-0E22-458B-BEF2-29712A29087E}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
{E1B2BED0-EBA6-4A14-BAD5-8EC4E528D7E0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{E1B2BED0-EBA6-4A14-BAD5-8EC4E528D7E0}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{E1B2BED0-EBA6-4A14-BAD5-8EC4E528D7E0}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{E1B2BED0-EBA6-4A14-BAD5-8EC4E528D7E0}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
EndGlobalSection
|
||||
EndGlobal
|
||||
|
52
Mesdb.Cli/BatchDbExtensions.cs
Normal file
52
Mesdb.Cli/BatchDbExtensions.cs
Normal file
@@ -0,0 +1,52 @@
|
||||
using System.Collections.Concurrent;
|
||||
using System.Data;
|
||||
using MesETL.Shared.Helper;
|
||||
|
||||
namespace Mesdb.Cli;
|
||||
|
||||
public static class BatchDbExtensions
|
||||
{
|
||||
public static async Task<IDictionary<string, IDictionary<string,long>>> CountDatabasesAsync(string connStr, IList<string> dbNames, CancellationToken cancellationToken = default)
|
||||
{
|
||||
var result = new ConcurrentDictionary<string, IDictionary<string,long>>();
|
||||
var tables = await DatabaseHelper.QueryTableAsync(connStr,
|
||||
$"""
|
||||
SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = '{dbNames[0]}';
|
||||
""");
|
||||
|
||||
await Parallel.ForEachAsync(dbNames, async (dbName, ct) =>
|
||||
{
|
||||
await Parallel.ForEachAsync(tables.Tables[0].Rows.Cast<DataRow>(), async (row, ct) =>
|
||||
{
|
||||
var tableName = row[0].ToString()!;
|
||||
var count = (long)(await DatabaseHelper.QueryScalarAsync(connStr,
|
||||
$"SELECT COUNT(1) FROM `{dbName}`.`{tableName}`;", ct))!;
|
||||
result.AddOrUpdate(dbName, new ConcurrentDictionary<string, long>(), (db, dict) =>
|
||||
{
|
||||
dict.AddOrUpdate(tableName, count, (table, num) => num + count);
|
||||
return dict;
|
||||
});
|
||||
});
|
||||
});
|
||||
return result;
|
||||
}
|
||||
|
||||
public static async Task AnalyzeAllAsync(string connStr, IList<string> dbNames)
|
||||
{
|
||||
var tables = await DatabaseHelper.QueryTableAsync(connStr,
|
||||
$"""
|
||||
SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = '{dbNames[0]}';
|
||||
""");
|
||||
|
||||
await Parallel.ForEachAsync(dbNames, async (dbName, ct) =>
|
||||
{
|
||||
await Parallel.ForEachAsync(tables.Tables[0].Rows.Cast<DataRow>(), async (row, ct) =>
|
||||
{
|
||||
var tableName = row[0].ToString()!;
|
||||
var result = (await DatabaseHelper.QueryTableAsync(connStr,
|
||||
$"ANALYZE TABLE `{dbName}`.`{tableName}`;", ct));
|
||||
Console.WriteLine(string.Join('\t', result.Tables[0].Rows[0].ItemArray.Select(x => x.ToString())));
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
22
Mesdb.Cli/Mesdb.Cli.csproj
Normal file
22
Mesdb.Cli/Mesdb.Cli.csproj
Normal file
@@ -0,0 +1,22 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<OutputType>Exe</OutputType>
|
||||
<TargetFramework>net8.0</TargetFramework>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<Nullable>enable</Nullable>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Cocona" Version="2.2.0" />
|
||||
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
|
||||
<PackageReference Include="Serilog" Version="4.0.0-dev-02108" />
|
||||
<PackageReference Include="Serilog.Extensions.Hosting" Version="8.0.0" />
|
||||
<PackageReference Include="Serilog.Sinks.Console" Version="5.0.1" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\MesETL.Shared\MesETL.Shared.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
52
Mesdb.Cli/Program.cs
Normal file
52
Mesdb.Cli/Program.cs
Normal file
@@ -0,0 +1,52 @@
|
||||
using Cocona;
|
||||
using MesETL.Shared.Helper;
|
||||
using Mesdb.Cli;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
|
||||
var host = Host.CreateApplicationBuilder(args);
|
||||
host.Configuration.AddCommandLine(args, new Dictionary<string, string>
|
||||
{
|
||||
{ "-s", "ConnectionString" },
|
||||
{ "--ConnectionString", "ConnectionString" },
|
||||
{ "-B", "Databases" },
|
||||
{ "--Databases", "Databases" },
|
||||
{ "-a", "All" },
|
||||
{ "-c", "Command"},
|
||||
{ "--Command", "Command" },
|
||||
{ "--Sql", "Command" }
|
||||
});
|
||||
host.Build();
|
||||
var connStr = host.Configuration.GetValue<string>("ConnectionString") ?? throw new ApplicationException("没有配置数据库连接字符串");
|
||||
var databases = host.Configuration.GetValue<string>("Databases")?.Split(',').ToList() ?? throw new ApplicationException("没有配置数据库");
|
||||
var all = host.Configuration.GetValue<bool>("All");
|
||||
|
||||
if (args.Length > 1 && args[0] == "count")
|
||||
{
|
||||
var result = await BatchDbExtensions.CountDatabasesAsync(connStr, databases);
|
||||
if (all)
|
||||
{
|
||||
foreach (var (k, v) in result)
|
||||
{
|
||||
Console.WriteLine(k + ":");
|
||||
Console.WriteLine(v.Select(pair => new { TABLE_NAME = pair.Key, COUNT = pair.Value }).ToMarkdownTable());
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
var allCount = result.Aggregate(new Dictionary<string, long>(), (dict, pair) =>
|
||||
{
|
||||
foreach (var (k, v) in pair.Value)
|
||||
{
|
||||
dict.AddOrUpdate(k, v, (key, num) => num + v);
|
||||
}
|
||||
return dict;
|
||||
});
|
||||
Console.WriteLine(allCount.Select(pair => new { TABLE_NAME = pair.Key, COUNT = pair.Value }).ToMarkdownTable());
|
||||
}
|
||||
}
|
||||
|
||||
if (args.Length > 1 && args[0] == "analyze")
|
||||
{
|
||||
await BatchDbExtensions.AnalyzeAllAsync(connStr, databases);
|
||||
}
|
44
Mesdb.Cli/Schema/DB.cs
Normal file
44
Mesdb.Cli/Schema/DB.cs
Normal file
@@ -0,0 +1,44 @@
|
||||
using MesETL.Shared.Helper;
|
||||
using MySqlConnector;
|
||||
using Serilog;
|
||||
|
||||
namespace Mesdb.Cli.Schema;
|
||||
|
||||
public class DB
|
||||
{
|
||||
public required string ConnectionString { get; init; }
|
||||
public required IReadOnlyList<Database> Databases { get; init; }
|
||||
|
||||
public static DB Create(string connStr, IEnumerable<string> dbNames)
|
||||
{
|
||||
var databases = new List<Database>();
|
||||
foreach (var dbName in dbNames)
|
||||
{
|
||||
var dbConnStr = new MySqlConnectionStringBuilder(connStr)
|
||||
{
|
||||
Database = dbName
|
||||
}.ConnectionString;
|
||||
|
||||
try
|
||||
{
|
||||
_ = DatabaseHelper.NonQueryAsync(dbConnStr, "SHOW DATABASES;").Result;
|
||||
databases.Add(new Database(dbName, dbConnStr));
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Log.Logger.Fatal(e, "无法连接到数据库: {DbName} ", dbName);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
return new DB
|
||||
{
|
||||
ConnectionString = connStr,
|
||||
Databases = databases
|
||||
};
|
||||
}
|
||||
|
||||
private DB()
|
||||
{
|
||||
}
|
||||
}
|
50
Mesdb.Cli/Schema/Database.cs
Normal file
50
Mesdb.Cli/Schema/Database.cs
Normal file
@@ -0,0 +1,50 @@
|
||||
using System.Data;
|
||||
using MesETL.Shared.Helper;
|
||||
using MySqlConnector;
|
||||
|
||||
namespace Mesdb.Cli.Schema;
|
||||
|
||||
public class Database
|
||||
{
|
||||
public static async Task<Table[]> FetchTableAsync(string dbName, string connStr)
|
||||
{
|
||||
var tables = await DatabaseHelper.QueryTableAsync(connStr,
|
||||
$"""
|
||||
SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = '{dbName}';
|
||||
""");
|
||||
return tables.Tables[0].Rows.Cast<DataRow>().Select(row => new Table{Name = row[0].ToString()!}).ToArray();
|
||||
}
|
||||
|
||||
|
||||
public string Name { get; }
|
||||
public string ConnectionString { get; }
|
||||
public IReadOnlyList<Table> Tables { get; }
|
||||
|
||||
public Database(string name, string connStr)
|
||||
{
|
||||
var trueConnStr = new MySqlConnectionStringBuilder(connStr)
|
||||
{
|
||||
Database = name
|
||||
}.ConnectionString;
|
||||
|
||||
var tables = FetchTableAsync(name, trueConnStr).Result;
|
||||
Name = name;
|
||||
ConnectionString = trueConnStr;
|
||||
Tables = tables;
|
||||
}
|
||||
|
||||
public Task ExecuteNonQueryAsync(string sql, CancellationToken cancellationToken = default)
|
||||
{
|
||||
return DatabaseHelper.NonQueryAsync(ConnectionString, sql, cancellationToken);
|
||||
}
|
||||
|
||||
public Task<DataSet> ExecuteQueryAsync(string sql, CancellationToken cancellationToken = default)
|
||||
{
|
||||
return DatabaseHelper.QueryTableAsync(ConnectionString, sql, cancellationToken);
|
||||
}
|
||||
|
||||
public Task<object?> ExecuteScalarAsync(string sql, CancellationToken cancellationToken = default)
|
||||
{
|
||||
return DatabaseHelper.QueryScalarAsync(ConnectionString, sql, cancellationToken);
|
||||
}
|
||||
}
|
8
Mesdb.Cli/Schema/Table.cs
Normal file
8
Mesdb.Cli/Schema/Table.cs
Normal file
@@ -0,0 +1,8 @@
|
||||
namespace Mesdb.Cli.Schema;
|
||||
|
||||
public class Table
|
||||
{
|
||||
|
||||
public required string Name { get; init; }
|
||||
|
||||
}
|
Reference in New Issue
Block a user