Compare commits

...

7 Commits

12 changed files with 286 additions and 168 deletions

View File

@@ -25,7 +25,7 @@ public static class TableNames
public const string OrderProcessStep = "order_process_step";
public const string OrderProcessStepItem = "order_process_step_item";
public const string OrderScrapBoard = "order_scrap_board";
public const string OrderWaveGroup = "order_wave_group";
public const string OrderExtraList = "order_extra_list";
public const string ProcessGroup = "process_group";
public const string ProcessInfo = "process_info";
public const string ProcessItemExp = "process_item_exp";

View File

@@ -2,52 +2,12 @@
public class DataRecord : ICloneable
{
/// <summary>
/// 尝试获取一条记录的某个字段值
/// </summary>
/// <param name="record"></param>
/// <param name="columnName"></param>
/// <param name="value"></param>
/// <returns></returns>
/// <exception cref="InvalidOperationException"></exception>
public static bool TryGetField(DataRecord record, string columnName, out string value)
{
value = string.Empty;
if (record.Headers is null)
throw new InvalidOperationException("Cannot get field when headers of a record have not been set.");
var idx = IndexOfIgnoreCase(record.Headers, columnName);
if (idx == -1)
return false;
value = record.Fields[idx];
return true;
}
/// <summary>
/// 获取一条记录的某个字段值
/// TODO: 最好能优化至O(1)
/// </summary>
/// <param name="record"></param>
/// <param name="columnName"></param>
/// <returns></returns>
/// <exception cref="InvalidOperationException"></exception>
/// <exception cref="IndexOutOfRangeException"></exception>
public static string GetField(DataRecord record, string columnName)
{
if (record.Headers is null)
throw new InvalidOperationException("Headers have not been set.");
var idx = IndexOfIgnoreCase(record.Headers, columnName);
if (idx is -1)
throw new IndexOutOfRangeException(
$"Column name '{columnName}' not found in this record, table name '{record.TableName}'.");
return record.Fields[idx];
}
private static int IndexOfIgnoreCase(IList<string> list, string value)
{
var idx = -1;
for (var i = 0; i < list.Count; i++)
{
if (list[i].Equals(value, StringComparison.OrdinalIgnoreCase))
if (list[i].Equals(value, StringComparison.OrdinalIgnoreCase))
{
idx = i;
break;
@@ -57,45 +17,54 @@ public class DataRecord : ICloneable
return idx;
}
private readonly List<string> _fields;
private readonly List<string> _headers;
/// <summary>
/// 字段列表
/// </summary>
public IList<string> Fields { get; }
public IReadOnlyList<string> Fields => _fields;
/// <summary>
/// 表头列表
/// </summary>
public IList<string> Headers { get; }
public IReadOnlyList<string> Headers => _headers;
/// <summary>
/// 来源表名
/// </summary>
public string TableName { get; }
/// <summary>
/// 需要输出的数据库
/// </summary>
public string? Database { get; set; }
/// <summary>
/// 所有字段的总字符数量
/// </summary>
public long FieldCharCount { get; }
/// <summary>
/// 忽略这个记录,不会被输出
/// </summary>
public bool Ignore { get; set; }
public DataRecord(IEnumerable<string> fields, string tableName, IEnumerable<string> headers, string? database = null)
public DataRecord(IEnumerable<string> fields, string tableName, IEnumerable<string> headers,
string? database = null)
{
Fields = fields.ToList();
_fields = fields.ToList();
TableName = tableName;
Headers = headers.ToList();
_headers = headers.ToList();
Database = database;
if (Fields.Count != Headers.Count)
if (_fields.Count != _headers.Count)
throw new ArgumentException(
$"The number of fields does not match the number of headers. Expected: {Headers.Count} Got: {Fields.Count} Fields: {string.Join(',', Fields)}",
$"The number of fields does not match the number of headers. Expected: {_headers.Count} Got: {_fields.Count} Fields: {string.Join(',', _fields)}",
nameof(fields));
FieldCharCount = Fields.Sum(x => (long)x.Length);
FieldCharCount = _fields.Sum(x => (long)x.Length);
}
/// <summary>
@@ -104,8 +73,8 @@ public class DataRecord : ICloneable
/// <param name="index"></param>
public string this[int index]
{
get => Fields[index];
set => Fields[index] = value;
get => _fields[index];
set => _fields[index] = value;
}
/// <summary>
@@ -114,61 +83,101 @@ public class DataRecord : ICloneable
/// <param name="columnName"></param>
public string this[string columnName]
{
get => GetField(this, columnName);
get => GetField(columnName);
set => SetField(columnName, value);
}
/// <summary>
/// 尝试获取字段值
/// 尝试获取某个字段值
/// </summary>
/// <param name="columnName"></param>
/// <param name="value"></param>
/// <returns></returns>
public bool TryGetField(string columnName, out string value) => TryGetField(this, columnName, out value);
/// <summary>
/// 为一个字段赋值
/// </summary>
/// <param name="columnName"></param>
/// <param name="value"></param>
/// <returns></returns>
public bool SetField(string columnName, string value) => SetField(this, columnName, value);
/// <summary>
///
/// </summary>
/// <param name="record"></param>
/// <param name="columnName"></param>
/// <param name="value"></param>
/// <returns></returns>
/// <exception cref="InvalidOperationException"></exception>
/// <exception cref="IndexOutOfRangeException"></exception>
public static bool SetField(DataRecord record, string columnName, string value)
public bool TryGetField(string columnName, out string value)
{
// 表头检查
if (record.Headers is null)
throw new InvalidOperationException("记录的表头尚未设置,无法赋值");
var idx = IndexOfIgnoreCase(record.Headers, columnName);
if (idx is -1)
throw new IndexOutOfRangeException(
$"列 '{columnName}' 不存在于该纪录中,表名 '{record.TableName}");
record.Fields[idx] = value;
value = string.Empty;
if (_headers is null)
throw new InvalidOperationException("Cannot get field when headers of a record have not been set.");
var idx = IndexOfIgnoreCase(_headers, columnName);
if (idx == -1)
return false;
value = _fields[idx];
return true;
}
public void RemoveField(string columnName)
/// <summary>
/// 获取一条记录的某个字段值
/// TODO: 最好能优化至O(1)
/// </summary>
/// <param name="columnName"></param>
/// <returns></returns>
/// <exception cref="InvalidOperationException"></exception>
/// <exception cref="IndexOutOfRangeException"></exception>
public string GetField(string columnName)
{
var idx = IndexOfIgnoreCase(Headers, columnName);
if (idx == -1)
throw new InvalidOperationException($"{TableName}: 列名 '{columnName}' 不存在");
Fields.RemoveAt(idx);
Headers.Remove(columnName);
if (_headers is null)
throw new InvalidOperationException("记录的表头尚未设置,无法赋值");
var idx = IndexOfIgnoreCase(_headers, columnName);
if (idx is -1)
throw new IndexOutOfRangeException(
$"列 '{columnName}' 不存在于该纪录中,表名 '{TableName}");
return _fields[idx];
}
public bool HeaderExists(string columnName) => IndexOfIgnoreCase(Headers, columnName) != -1;
/// <summary>
/// 为记录的一个字段赋值,如果该字段名不存在则会抛出异常
/// </summary>
/// <param name="columnName">列名</param>
/// <param name="value">值</param>
/// <returns></returns>
/// <exception cref="InvalidOperationException">该记录的表头尚未初始化,你可能在错误的阶段调用了该方法</exception>
/// <exception cref="IndexOutOfRangeException">输入的字段名不存在于该记录中</exception>
public void SetField(string columnName, string value)
{
// 表头检查
if (_headers is null)
throw new InvalidOperationException("记录的表头尚未设置,无法赋值");
var idx = IndexOfIgnoreCase(_headers, columnName);
if (idx is -1)
throw new IndexOutOfRangeException(
$"列 '{columnName}' 不存在于该纪录中,表名 '{TableName}");
_fields[idx] = value;
}
/// <summary>
/// 在记录中追加一个字段
/// </summary>
/// <param name="columnName">字段名</param>
/// <param name="value">字段值</param>
/// <exception cref="InvalidOperationException">记录的表头尚未初始化,你可能在错误的阶段调用了此方法</exception>
/// <exception cref="ArgumentException">提供的字段名已存在于该记录中</exception>
public void AppendField(string columnName, string value)
{
if (_headers is null)
throw new InvalidOperationException("记录的表头尚未设置,无法赋值");
var idx = IndexOfIgnoreCase(_headers, columnName);
if (idx is > 0)
throw new ArgumentException($"字段名 '{columnName}' 已存在于该记录中,无法重复添加", nameof(columnName));
_headers.Add(columnName);
_fields.Add(value);
}
public void RemoveField(string columnName)
{
var idx = IndexOfIgnoreCase(_headers, columnName);
if (idx == -1)
throw new InvalidOperationException($"{TableName}: 列名 '{columnName}' 不存在");
_fields.RemoveAt(idx);
_headers.Remove(columnName);
}
public bool HeaderExists(string columnName) => IndexOfIgnoreCase(_headers, columnName) != -1;
public object Clone()
{
return new DataRecord(new List<string>(Fields), TableName, new List<string>(Headers), Database);
return new DataRecord(new List<string>(_fields), TableName, new List<string>(_headers), Database);
}
}

View File

@@ -31,6 +31,8 @@ public class FileInputService : IInputService
private readonly ProcessContext _context;
private readonly DataReaderFactory _dataReaderFactory;
private readonly long _memoryThreshold;
private readonly bool _dryRun;
private readonly int _dryRunCount;
public FileInputService(ILogger<FileInputService> logger,
IOptions<DataInputOptions> dataInputOptions,
@@ -45,12 +47,16 @@ public class FileInputService : IInputService
_producerQueue = producerQueue;
_dataReaderFactory = dataReaderFactory;
_memoryThreshold = (long)(configuration.GetValue<double>("MemoryThreshold", 8) * 1024 * 1024 * 1024);
_dryRun = configuration.GetValue("DryRun", false);
_dryRunCount = configuration.GetValue("DryRunCount", 100_000);
}
public async Task ExecuteAsync(CancellationToken cancellationToken)
{
var inputDir = _dataInputOptions.Value.InputDir ?? throw new ApplicationException("未配置文件输入目录");
_logger.LogInformation("***** 输入服务已启动,工作目录为:{InputDir} *****", inputDir);
if (_dryRun)
_logger.LogInformation("***** 试运行模式已开启,只读取前 {Count} 行数据 *****", _dryRunCount);
var orderedInfo = GetOrderedInputInfo(inputDir);
@@ -59,7 +65,10 @@ public class FileInputService : IInputService
var file = Path.GetFileName(info.FileName);
_logger.LogInformation("正在读取文件:{FileName}, 对应的数据表:{TableName}", file, info.TableName);
using var source = _dataReaderFactory.CreateReader(info.FileName, info.TableName, info.Headers);
var count = 0;
var countBuffer = 0;
if (_dryRun && _context.TableProgress.GetValueOrDefault(info.TableName, (input: 0, output: 0)).input >= _dryRunCount)
continue;
while (await source.ReadAsync())
{
@@ -72,17 +81,29 @@ public class FileInputService : IInputService
}
var record = source.Current;
await _producerQueue.EnqueueAsync(record);
count++;
countBuffer++;
_context.AddInput();
// 避免影响性能每1000条更新一次表输入进度
if (countBuffer >= 1000)
{
_context.AddTableInput(info.TableName, countBuffer);
countBuffer = 0;
// 试运行模式下,超出了指定行数则停止输入
if (_dryRun && _context.TableProgress[info.TableName].input >= _dryRunCount)
{
break;
}
}
}
_context.AddTableInput(info.TableName, count);
_context.AddTableInput(info.TableName, countBuffer);
_logger.LogInformation("文件 {File} 输入完成", file);
_dataInputOptions.Value.OnTableInputCompleted?.Invoke(info.TableName);
}
_context.CompleteInput();
_logger.LogInformation("***** 输入服务已执行完毕 *****");
_logger.LogInformation("***** 输入服务{DryRun}已执行完毕 *****", _dryRun ? " (试运行)" : "");
}
public IEnumerable<FileInputInfo> GetOrderedInputInfo(string dir)

View File

@@ -77,6 +77,8 @@ public class MainHostedService : BackgroundService
await Task.WhenAll(inputTask, transformTask, outputTask);
_stopwatch.Stop();
_logger.LogInformation("***** 所有传输任务均已完成 *****");
if (_context.HasException)
_logger.LogError("***** 传输过程中有错误发生 *****");
_logger.LogInformation("***** 耗时:{Time}", (_stopwatch.ElapsedMilliseconds / 1000f).ToString("F3"));
await Task.Delay(5000, stoppingToken);
@@ -114,7 +116,10 @@ public class MainHostedService : BackgroundService
{
var connStr = _databaseOptions.Value.ConnectionString
?? throw new ApplicationException("分库配置中没有配置数据库");
_logger.LogWarning("已开启MySQL延迟写入功能并禁用重做日志请注意数据安全");
if (enable)
_logger.LogWarning("已开启MySQL延迟写入功能并禁用重做日志请注意数据安全");
else _logger.LogInformation("不安全变量已关闭");
if (enable)
{
await DatabaseHelper.NonQueryAsync(connStr,
@@ -158,9 +163,16 @@ public class MainHostedService : BackgroundService
private async Task ExportResultAsync()
{
var sb = new StringBuilder();
if (_context.HasException)
sb.AppendLine("# 程序执行完毕,**但中途发生了异常**");
else sb.AppendLine("# 程序执行完毕,没有发生错误");
var title = (_config.GetValue("DryRun", false), _context.HasException) switch
{
(true, true) => "# 试运行结束,**请注意处理异常**",
(true, false) => "# 试运行结束,没有发生异常",
(false, true) => "# 程序执行完毕,**但中途发生了异常**",
(false, false) => "# 程序执行完毕,没有发生错误"
};
sb.AppendLine(title);
sb.AppendLine("## 处理计数");
var processCount = new[]
{

View File

@@ -1,4 +1,4 @@
#define USE_TEST_DB // 如果使用测试库运行则加上USE_TEST_DB预处理器指令
// #define FIX_PLAN_ITEM // 测试环境对OrderBlockPlanItem表进行修复时使用
using System.Text;
using MesETL.App;
@@ -12,6 +12,7 @@ using MesETL.App.Options;
using MesETL.App.Services.ErrorRecorder;
using MesETL.App.Services.Loggers;
using MesETL.App.Services.Seq;
using MesETL.Shared.Compression;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
@@ -124,19 +125,11 @@ async Task RunProgram()
}
break;
}
#if USE_TEST_DB
// 测试环境的OrderExtra表没有分区故按照SharedKey清理
// 清理ShardKey < 24010的
case TableNames.OrderExtra:
// 忽略OrderBlockPlanItem
case TableNames.OrderBlockPlanItem:
{
var shardKey = int.Parse(record["ShardKey"].AsSpan()[..4]);
if (shardKey < oldestTimeInt_yyMM)
{
return false;
}
break;
}
#endif
return false;
}
// 清理(Status != 0 || Deleted = 1) && ID前四位 < 2401的
case TableNames.OrderScrapBoard:
{
@@ -217,6 +210,15 @@ async Task RunProgram()
break;
}
// 将JsonStr列转换为Data列添加CompressionType列
case TableNames.OrderModuleExtra:
{
record.AppendField("CompressionType", "1");
record.AppendField("Data",
Convert.ToHexString(DeflateArchive.Compress(Convert.FromHexString(record["JsonStr"]))));
record.RemoveField("JsonStr");
break;
}
// 删除ID列让数据库自行递增
// TODO: 数据表改进删除ID列或是替换为流水号
case TableNames.ProcessStepEfficiency:
@@ -234,14 +236,13 @@ async Task RunProgram()
record.RemoveField("Key");
break;
}
#if USE_TEST_DB
// 测试环境忽略PlaceData列生产环境会提前将其移除
// 移除PlaceData列如果存在的话生产库已经删除
case TableNames.SimplePlanOrder:
{
record.RemoveField("PlaceData");
if(record.HeaderExists("PlaceData"))
record.RemoveField("PlaceData");
break;
}
#endif
default: break;
}
@@ -281,11 +282,49 @@ async Task RunProgram()
var id = seq.AddCachedSeq(SeqConfig.OrderWaveGroupID);
var orderWaveGroup = new DataRecord(
[id.ToString(), ..headers.Select(c => record[c])],
TableNames.OrderWaveGroup,
TableNames.OrderExtraList,
["ID", "OrderNo", "ShardKey", "Type", "ConfigJson", "CompanyID"]);
resultList.Add(orderWaveGroup);
return resultList;
}
// 通过OrderItem重建OrderBlockPlanItem表
if (record.TableName == TableNames.OrderItem)
{
#if FIX_PLAN_ITEM
record.Ignore = true;
#endif
var resultList = new List<DataRecord>();
record.TryGetField("ID", out var itemId);
record.TryGetField("ShardKey", out var shardKey);
record.TryGetField("PlanID", out var planId);
record.TryGetField("PackageID", out var packageId);
record.TryGetField("CompanyID", out var companyId);
if(!int.TryParse(planId, out var pid))
throw new ApplicationException($"数据发生异常OrderItem.PlanID值: {(string.IsNullOrWhiteSpace(planId) ? "NULL" : planId)}");
if (pid > 0)
{
resultList.Add(new DataRecord([itemId, shardKey, planId, companyId],
TableNames.OrderBlockPlanItem,
["ItemID", "ShardKey", "PlanID", "CompanyID"]
));
}
if(!int.TryParse(packageId, out var pkid))
throw new ApplicationException($"数据发生异常OrderItem.PackageID值: {(string.IsNullOrWhiteSpace(packageId) ? "NULL" : packageId)}");
if(pkid > 0)
{
resultList.Add(new DataRecord([itemId, shardKey, packageId, companyId],
TableNames.OrderPackageItem,
[ "ItemID", "ShardKey", "PackageID", "CompanyID" ]
));
}
record.RemoveField("PlanID");
record.RemoveField("PackageID");
return resultList;
}
return ArraySegment<DataRecord>.Empty;
};
@@ -368,7 +407,7 @@ async Task RunProgram()
host.Services.AddHostedService<MainHostedService>();
host.Services.AddSingleton<IInputService, FileInputService>();
host.Services.AddSingleton<ITransformService, TransformService>();
host.Services.AddSingleton<IOutputService, VoidOutputService>();
host.Services.AddSingleton<IOutputService, OutputService>();
host.Services.AddSingleton<TaskMonitorService>();
// host.Services.AddRedisCache(redisOptions);
host.Services.AddSingleton<ICacher, MemoryCache>();

View File

@@ -64,7 +64,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
if (_recordCache.Count == 0)
return;
await using var cmd = _conn.CreateCommand();
var cmd = _conn.CreateCommand();
cmd.CommandTimeout = 0;
try
@@ -108,7 +108,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
public IEnumerable<string> GetExecutionList(IDictionary<string, IList<DataRecord>> tableRecords, int maxAllowPacket)
{
var sb = new StringBuilder("SET AUTOCOMMIT = 1;\n");
var sb = new StringBuilder(_options.Value.FlushCount * 128);
var appendCount = 0;
foreach (var (tableName, records) in tableRecords)
{
@@ -117,6 +117,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
var recordIdx = 0;
StartBuild:
sb.AppendLine("SET AUTOCOMMIT = 0;\n");
var noCommas = true;
// 标准列顺序,插入时的字段需要按照该顺序排列
@@ -212,7 +213,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
TryAddForUpdateSuffix(tableName, sb);
sb.Append(';').AppendLine();
sb.Append("SET AUTOCOMMIT = 1;");
sb.Append("COMMIT;");
yield return sb.ToString();
sb.Clear();
goto StartBuild;

View File

@@ -45,7 +45,7 @@ public class ProcessContext
public void CompleteTransform() => IsTransformCompleted = true;
public void CompleteOutput() => IsOutputCompleted = true;
public bool AddException(Exception e) => _hasException = true;
public bool AddException(Exception e) => _hasException = true; // 没打算存起来,暂时先加个标记
public void AddInput() => Interlocked.Increment(ref _inputCount);
@@ -59,17 +59,16 @@ public class ProcessContext
public void AddTableInput(string table, int count)
{
if (!_tableProgress.TryAdd(table, (input: count, output: 0)))
_tableProgress.AddOrUpdate(table, (input: count, output: 0), (k, tuple) =>
{
var tuple = _tableProgress[table];
tuple.input += count;
_tableProgress[table] = tuple;
}
return tuple;
});
}
public void AddTableOutput(string table, int count)
{
_tableProgress.AddOrUpdate(table, (input:0, output:count), (k, tuple) =>
_tableProgress.AddOrUpdate(table, (input: 0, output: count), (k, tuple) =>
{
tuple.output += count;
return tuple;

View File

@@ -2,16 +2,17 @@
"MemoryThreshold": 6,
"GCIntervalMilliseconds": -1,
"UnsafeVariable": true,
"DryRun": true, // 试运行仅输入每张表的前100000条数据
"Logging": {
"LogLevel": {
"Default": "Trace"
}
},
"Input":{
"InputDir": "D:\\Data\\DatabaseDump\\MyDumper-ZST 2024-12-3", // Csv数据输入目录
"InputDir": "D:\\Data\\DatabaseDump\\Prod_Mock_CSV_2024-12-31", // Csv数据输入目录
"UseMock": false, // 使用模拟数据进行测试
"MockCountMultiplier": 1, // 模拟数据量级的乘数
// "TableOrder": ["order_block_plan_item"], // 按顺序输入的表
// "TableOrder": ["order_item"], // 按顺序输入的表
"TableIgnoreList": [] // 忽略输入的表
},
"Transform":{
@@ -26,7 +27,7 @@
"MaxAllowedPacket": 67108864,
"FlushCount": 10000, // 每次提交记录条数
"MaxDatabaseOutputTask" : 4, // 每个数据库最大提交任务数
"TreatJsonAsHex": false, // 将json列作为16进制格式输出(0x前缀)生产库是没有json列的
"TreatJsonAsHex": false, // 使Json列输出时带上"0x"前缀
"NoOutput": [], // 不输出的表
"ForUpdate":
{
@@ -44,7 +45,7 @@
"TenantDb": // 分库配置
{
"TenantKey" : "CompanyID",
"UseDbGroup": "mock",
"UseDbGroup": "prod",
"DbGroups": {
"test": {
"cferp_test_1": 1000,
@@ -59,13 +60,6 @@
"mesdb_5": 20000,
"mesdb_6": 2147483647
},
"mock":{
"mesdb_1": 5000,
"mesdb_2": 10000,
"mesdb_3": 15000,
"mesdb_4": 20000,
"mesdb_5": 2147483647
},
"mock_void":{
"mesdb_1_void": 5000,
"mesdb_2_void": 10000,

View File

@@ -0,0 +1,39 @@
using System.IO.Compression;
namespace MesETL.Shared.Compression;
/// <summary>
/// Deflate压缩工具类
/// </summary>
public static class DeflateArchive
{
/// <summary>
/// 解压Deflate
/// </summary>
/// <param name="input"></param>
/// <returns></returns>
public static byte[] Decompress(byte[] input)
{
using var msi = new MemoryStream(input);
using var mso = new MemoryStream();
using var ds = new DeflateStream(msi, CompressionMode.Decompress);
ds.CopyTo(mso);
ds.Flush();
return mso.ToArray();
}
/// <summary>
/// 压缩Deflate
/// </summary>
/// <param name="input"></param>
/// <returns></returns>
public static byte[] Compress(byte[] input)
{
using var msi = new MemoryStream(input);
using var mso = new MemoryStream();
using var ds = new DeflateStream(mso, CompressionMode.Compress);
msi.CopyTo(ds);
ds.Flush();
return mso.ToArray();
}
}

View File

@@ -1,22 +0,0 @@
using System.IO.Compression;
namespace MesETL.Shared.Helper;
public class CompressHelper
{
/// <summary>
///
/// </summary>
/// <param name="data"></param>
/// <returns></returns>
public static byte[] CompressDeflate(byte[] data)
{
using var src = new MemoryStream(data);
using var outStream = new MemoryStream();
using var gzip = new DeflateStream(outStream, CompressionMode.Compress);
src.CopyTo(gzip);
gzip.Flush();
return outStream.ToArray();
}
}

View File

@@ -12,7 +12,7 @@ namespace TestProject1;
public class DatabaseToolBox
{
private readonly ITestOutputHelper _output;
public const string ConnStr = "Server=127.0.0.1;Port=3306;UserId=root;Password=123456;";
public const string ConnStr = "Server=localhost;Port=3306;UserId=root;Password=123456;";
public DatabaseToolBox(ITestOutputHelper output)
{
@@ -156,9 +156,11 @@ public class DatabaseToolBox
}
[Theory]
[InlineData(["cferp_test_1"])]
[InlineData(["cferp_test_2"])]
[InlineData(["cferp_test_3"])]
[InlineData(["mesdb_1"])]
[InlineData(["mesdb_2"])]
[InlineData(["mesdb_3"])]
[InlineData(["mesdb_4"])]
[InlineData(["mesdb_5"])]
public async Task DropAllIndex(string database)
{
var indexes = await GetAllTableIndexes(database);
@@ -177,6 +179,7 @@ public class DatabaseToolBox
[InlineData("mesdb_3")]
[InlineData("mesdb_4")]
[InlineData("mesdb_5")]
[InlineData("mesdb_6")]
public async Task TruncateAllTable(string database)
{
var tables = await DatabaseHelper.QueryTableAsync(ConnStr,
@@ -195,4 +198,27 @@ public class DatabaseToolBox
}
await DatabaseHelper.NonQueryAsync(ConnStr, sb.ToString());
}
[Theory]
[InlineData("cferp_test_1")]
[InlineData("cferp_test_2")]
[InlineData("cferp_test_3")]
public async Task AnalyzeAllTable(string database)
{
var tables = await DatabaseHelper.QueryTableAsync(ConnStr,
$"""
SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = '{database}';
""");
var sb = new StringBuilder();
sb.AppendLine($"USE `{database}`;");
foreach (DataRow row in tables.Tables[0].Rows)
{
var tableName = row["TABLE_NAME"].ToString();
var sql = $"""
ANALYZE TABLE `{tableName}`;
""";
sb.AppendLine(sql);
}
await DatabaseHelper.NonQueryAsync(ConnStr, sb.ToString());
}
}

View File

@@ -57,7 +57,7 @@ async Task RunProgram()
host.Services.Configure<MockInputOptions>(options =>
{
const float Multiplexer = 0.01F;
const float Multiplexer = 1F;
var SampleSharedKeys = Enumerable.Range(0, 11).Select(i => (23010 + i * 10).ToString()).Concat(
Enumerable.Range(0, 11).Select(i => (24010 + i * 10).ToString())).ToArray();
options.Rules = new Dictionary<string, TableMockOptions>()