This commit is contained in:
陈梓阳 2024-02-01 13:41:59 +08:00
parent 083090c62b
commit 70cf0322e4
8 changed files with 68 additions and 113 deletions

View File

@ -10,6 +10,9 @@
<ItemGroup>
<None Remove="appsettings.json" />
<None Update="RestoreIndex.sql">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
<ItemGroup>

View File

@ -48,6 +48,8 @@ public class MainHostedService : BackgroundService
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_stopwatch = Stopwatch.StartNew();
await SetVariableAsync(); // 开启延迟写入,禁用重做日志 >>> 重做日志处于禁用状态时绝对不要关闭数据库服务!
var inputTask = ExecuteAndCatch(
async () => await _input.ExecuteAsync(stoppingToken), "文件输入程序出现异常", stoppingToken);
var transformTask = ExecuteAndCatch(
@ -61,7 +63,7 @@ public class MainHostedService : BackgroundService
_logger.LogInformation("***** ElapseTime: {Time}", (_stopwatch.ElapsedMilliseconds / 1000f).ToString("F3"));
await Task.Delay(5000, stoppingToken);
await SetVariableAsync(false); // 关闭延迟写入,开启重做日志
if (!stoppingToken.IsCancellationRequested)
{
await ExportResultAsync();
@ -92,12 +94,34 @@ public class MainHostedService : BackgroundService
}, ct);
}
private async Task SetVariableAsync(bool enable = true)
{
var connStr = _databaseOptions.Value.ConnectionString
?? throw new ApplicationException("无法还原索引,因为分库配置中没有配置数据库");
if (enable)
{
await DatabaseHelper.NonQueryAsync(connStr,
"""
SET GLOBAL innodb_flush_log_at_trx_commit = 0;
ALTER INSTANCE DISABLE INNODB REDO_LOG;
""");
}
else
{
await DatabaseHelper.NonQueryAsync(connStr,
"""
SET GLOBAL innodb_flush_log_at_trx_commit = 1;
ALTER INSTANCE ENABLE INNODB REDO_LOG;
""");
}
}
/// <summary>
/// 还原所有数据库的索引...
/// </summary>
/// <returns></returns>
/// <exception cref="ApplicationException"></exception>
private Task RestoreIndexAsync()
private async Task RestoreIndexAsync()
{
var databases = _tenantDbOptions.Value.DbList?.Keys
?? throw new ApplicationException("无法还原索引,因为分库配置中没有配置数据库");
@ -107,85 +131,12 @@ public class MainHostedService : BackgroundService
foreach(var db in databases)
{
var task = DatabaseHelper.NonQueryAsync(connStr + $";Database={db};",
"""
CREATE INDEX `idx_CompanyID` ON `machine` (`CompanyID`);
CREATE INDEX `idx_companyid` ON `order` (`CompanyID`);
CREATE INDEX `idx_CompanyID` ON `order_block_plan` (`CompanyID`);
CREATE INDEX `idx_PlanID` ON `order_block_plan_item` (`PlanID`);
CREATE INDEX `idx_orderno` ON `order_box_block` (`OrderNo`);
CREATE INDEX `index_OrderNo` ON `order_data_block` (`OrderNo`);
CREATE INDEX `index_OrderNo` ON `order_data_goods` (`OrderNo`);
CREATE INDEX `index_OrderNo` ON `order_data_parts` (`OrderNo`);
CREATE INDEX `index_ItemNo` ON `order_item` (`ItemNo`);
CREATE INDEX `index_OrderNo` ON `order_item` (`OrderNo`);
CREATE INDEX `index_PackageID` ON `order_item` (`PackageID`);
CREATE INDEX `index_PlanID` ON `order_item` (`PlanID`);
CREATE INDEX `idx_OrderNo` ON `order_module` (`OrderNo`);
CREATE INDEX `index_OrderNo` ON `order_module_extra` (`OrderNo`);
CREATE INDEX `index_OrderNo` ON `order_module_item` (`OrderNo`);
CREATE INDEX `idx_OrderNo` ON `order_package` (`OrderNo`);
CREATE INDEX `idx_PakageNo` ON `order_package` (`PakageNo`);
CREATE INDEX `idx_PackageID` ON `order_package_item` (`PackageID`);
CREATE INDEX `idx_companyid` ON `order_patch_detail` (`CompanyID`);
CREATE INDEX `idx_OrderNo` ON `order_process` (`OrderNo`);
CREATE INDEX `index_CompanyID` ON `order_process_schdule` (`CompanyID`);
CREATE INDEX `IX_order_process_step_OrderProcessID` ON `order_process_step` (`OrderProcessID`);
CREATE INDEX `idx_OrderProcessID` ON `order_process_step_item` (`OrderProcessID`);
CREATE INDEX `idx_OrderProcessStepID` ON `order_process_step_item` (`OrderProcessStepID`);
CREATE INDEX `idx_CompanyID` ON `order_scrap_board` (`CompanyID`);
CREATE INDEX `idx_CompanyID` ON `process_group` (`CompanyID`);
CREATE INDEX `idx_CompanyID` ON `process_info` (`CompanyID`);
CREATE INDEX `index_CompanyID` ON `process_item_exp` (`CompanyID`);
CREATE INDEX `idx_CompanyID` ON `process_schdule_capacity` (`CompanyID`);
CREATE INDEX `idx_CompanyID` ON `process_step_efficiency` (`CompanyID`);
CREATE INDEX `idx_CompanyID` ON `report_template` (`CompanyID`);
CREATE INDEX `indx_OrderNo` ON `simple_package` (`OrderNo`);
CREATE INDEX `idx_CompanyID` ON `simple_plan_order` (`CompanyID`);
CREATE INDEX `idx_CompanyID` ON `sys_config` (`CompanyID`);
CREATE INDEX `idx` ON `work_calendar` (`CompanyID`);
CREATE INDEX `idx_CompanyID` ON `work_shift` (`CompanyID`);
CREATE INDEX `IX_work_time_ShiftID` ON `work_time` (`ShiftID`);
""");
await File.ReadAllTextAsync(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "RestoreIndex.sql"))
);
list.Add(task);
}
return Task.WhenAll(list);
await Task.WhenAll(list);
}
private async Task ExportResultAsync()

View File

@ -5,6 +5,7 @@ using ConsoleApp2.Services;
using ConsoleApp2.Services.ErrorRecorder;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MySqlConnector;
using MySqlDestination = ConsoleApp2.Services.ETL.MySqlDestination;
using TaskExtensions = ConsoleApp2.Helpers.TaskExtensions;
@ -110,9 +111,19 @@ public class OutputService : IOutputService
private async Task FlushAsync(string dbName, IEnumerable<DataRecord> records)
{
var connStr = _outputOptions.Value.ConnectionString ??
throw new InvalidOperationException("连接字符串为空");
await using var output = new MySqlDestination($"{connStr};Database={dbName};", _logger,
var connStr = new MySqlConnectionStringBuilder(_outputOptions.Value.ConnectionString
?? throw new ApplicationException("未配置数据库连接字符串"))
{
CharacterSet = "utf8mb4",
AllowUserVariables = true,
IgnoreCommandTransaction = true,
TreatTinyAsBoolean = false,
ConnectionTimeout = 60,
DefaultCommandTimeout = 0,
SslMode = MySqlSslMode.None,
Database = dbName
}.ConnectionString;
await using var output = new MySqlDestination(connStr, _logger,
_outputOptions, _errorRecorderFactory.CreateOutput(dbName), _context);
var tableOutput = new Dictionary<string, int>();

View File

@ -19,11 +19,6 @@ namespace ConsoleApp2.Options
public string Delimiter { get; set; } = ",";
#endregion
/// <summary>
/// yyyyMM
/// </summary>
public string CleanDate { get; set; } = "202301";
#region Mock

View File

@ -19,6 +19,11 @@ public class DataTransformOptions
public bool EnableReplacer { get; set; } = true;
public bool EnableReBuilder { get; set; } = true;
/// <summary>
/// yyyyMM
/// </summary>
public string CleanDate { get; set; } = "202301";
/// <summary>
/// Record -> Database name
/// 对记录进行数据库过滤

View File

@ -59,8 +59,8 @@ async Task RunProgram()
host.Services.Configure<TenantDbOptions>(tenantDbSection);
host.Services.Configure<RedisCacheOptions>(redisSection);
var oldestTime = DateTime.ParseExact(inputOptions.CleanDate, "yyyyMM", System.Globalization.DateTimeFormatInfo.InvariantInfo);
var oldestTimeInt = int.Parse(inputOptions.CleanDate);
var oldestTime = DateTime.ParseExact(transformOptions.CleanDate, "yyyyMM", System.Globalization.DateTimeFormatInfo.InvariantInfo);
var oldestTimeInt = int.Parse(transformOptions.CleanDate);
// 输入配置
host.Services.Configure<DataInputOptions>(options =>
@ -453,16 +453,7 @@ async Task RunProgram()
host.Services.Configure<DatabaseOutputOptions>(options =>
{
options.ConnectionString = new MySqlConnectionStringBuilder(outputOptions.ConnectionString ?? throw new ApplicationException("未配置数据库连接字符串"))
{
CharacterSet = "utf8mb4",
AllowUserVariables = true,
IgnoreCommandTransaction = true,
TreatTinyAsBoolean = false,
ConnectionTimeout = 60,
DefaultCommandTimeout = 0,
SslMode = MySqlSslMode.None,
}.ConnectionString;
options.ConnectionString = outputOptions.ConnectionString;
options.FlushCount = outputOptions.FlushCount;
options.MaxAllowedPacket = outputOptions.MaxAllowedPacket;
options.MaxDatabaseOutputTask = outputOptions.MaxDatabaseOutputTask;

View File

@ -137,7 +137,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
// 在这里处理特殊列
#region HandleFields
if (field == "\\N")
if (field.Length == 2 && field == @"\N") // MyDumper NULL
{
recordSb.Append("NULL");
goto Escape;

View File

@ -5,25 +5,24 @@
}
},
"Input":{
"InputDir": "D:\\Dump\\MockData",
"CleanDate": "202301",
"StrictMode": false,
"UseMock": true,
"MockCountMultiplier": 0.05
"InputDir": "D:\\Dump\\MockData", // Csv
"UseMock": true, // 使
"MockCountMultiplier": 0.5 //
},
"Transform":{
"StrictMode": true,
"EnableFilter": false,
"EnableReplacer": false,
"EnableReBuilder": false
"StrictMode": true, // true
"EnableFilter": false, //
"EnableReplacer": false, //
"EnableReBuilder": false, //
"CleanDate": "202301" //
},
"Output":{
"ConnectionString": "Server=127.0.0.1;Port=3306;UserId=root;Password=cfmes123456;", // 'Database='
"MaxAllowedPacket": 67108864,
"FlushCount": 20000,
"MaxDatabaseOutputTask" : 4,
"TreatJsonAsHex": false,
"RestoreIndex": true
"MaxAllowedPacket": 67108864,
"FlushCount": 10000, //
"MaxDatabaseOutputTask" : 4, //
"TreatJsonAsHex": false, // json16(0x)
"RestoreIndex": true // (RestoreIndex.sql)
},
"RedisCache": {
"Configuration": "192.168.1.246:6380",