206 lines
8.2 KiB
C#
206 lines
8.2 KiB
C#
using System.Diagnostics;
|
||
using System.Text;
|
||
using MesETL.App.Helpers;
|
||
using MesETL.App.HostedServices.Abstractions;
|
||
using MesETL.App.Options;
|
||
using MesETL.App.Services;
|
||
using MesETL.App.Services.ErrorRecorder;
|
||
using MesETL.Shared.Helper;
|
||
using Microsoft.Extensions.Configuration;
|
||
using Microsoft.Extensions.Hosting;
|
||
using Microsoft.Extensions.Logging;
|
||
using Microsoft.Extensions.Options;
|
||
using MySqlConnector;
|
||
|
||
namespace MesETL.App.HostedServices;
|
||
|
||
/// <summary>
/// Top-level hosted service that drives one ETL run.
/// <para>
/// When a <c>Command</c> configuration value is present, it is executed as SQL against every
/// configured tenant database and the process exits. Otherwise the input, transform and output
/// services are started concurrently, awaited, and a Markdown result report is written next to
/// the executable before the process exits.
/// </para>
/// </summary>
public class MainHostedService : BackgroundService
{
    private Stopwatch? _stopwatch;
    private readonly IInputService _input;
    private readonly ITransformService _transform;
    private readonly IOutputService _output;
    private readonly TaskMonitorService _taskMonitor;
    private readonly ILogger _logger;
    private readonly ProcessContext _context;

    private readonly IOptions<DatabaseOutputOptions> _databaseOptions;
    private readonly IOptions<TenantDbOptions> _tenantDbOptions;
    private readonly IConfiguration _config;

    // True while the unsafe MySQL settings (delayed flush, redo log disabled) are in effect.
    // Read from failure paths running on thread-pool threads, hence volatile.
    private volatile bool _unsafeVariablesActive;

    /// <summary>
    /// Full path of the Markdown report written by <see cref="ExportResultAsync"/>.
    /// Evaluated on every access, exactly like the inline expressions it replaces.
    /// </summary>
    private static string ResultFilePath =>
        Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"Result-{ErrorRecorder.UID}.md");

    /// <summary>
    /// Creates the service; all collaborators are supplied by dependency injection.
    /// </summary>
    public MainHostedService(IInputService input,
        ITransformService transform,
        IOutputService output,
        ILogger<MainHostedService> logger,
        IOptions<TenantDbOptions> tenantDbOptions,
        IOptions<DatabaseOutputOptions> databaseOptions,
        IConfiguration config,
        ProcessContext context,
        TaskMonitorService taskMonitor)
    {
        _input = input;
        _transform = transform;
        _output = output;
        _logger = logger;
        _tenantDbOptions = tenantDbOptions;
        _databaseOptions = databaseOptions;
        _config = config;
        _context = context;
        _taskMonitor = taskMonitor;
    }

    /// <summary>
    /// Runs either the one-off SQL command mode or the full input/transform/output pipeline.
    /// The method terminates the process itself via <see cref="Environment.Exit"/>:
    /// exit code 0 on completion, 1 when cancellation was requested.
    /// </summary>
    /// <param name="stoppingToken">Token signalled when the host is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var command = _config["Command"];
        if (!string.IsNullOrEmpty(command))
        {
            _logger.LogInformation("检测到命令参数传入,将对所有配置的数据库执行输入的命令。。。");
            _logger.LogInformation("***** 执行SQL命令 *****");
            await ExecuteEachDatabase(command, stoppingToken);
            _logger.LogInformation("***** 执行完成 *****");
            Environment.Exit(0);
        }

        _stopwatch = Stopwatch.StartNew();
        var enableUnsafeVar = _config.GetValue<bool>("UnsafeVariable", false);
        if (enableUnsafeVar)
        {
            // Enable delayed writes and disable the redo log.
            // >>> Do NOT shut down the database service while the redo log is disabled!
            await SetVariableAsync();
            _unsafeVariablesActive = true;
        }

        try
        {
            // The monitor runs for the lifetime of the process; its task is intentionally not awaited.
            _ = Task.Run(async () => await _taskMonitor.Monitor(stoppingToken), stoppingToken);

            var inputTask = ExecuteAndCatch(
                async () => await _input.ExecuteAsync(stoppingToken), "文件输入程序出现异常", stoppingToken);
            var transformTask = ExecuteAndCatch(
                async () => await _transform.ExecuteAsync(stoppingToken), "转换程序出现异常", stoppingToken);
            var outputTask = ExecuteAndCatch(
                async () => await _output.ExecuteAsync(stoppingToken), "输出程序出现异常", stoppingToken);

            await Task.WhenAll(inputTask, transformTask, outputTask);
            _stopwatch.Stop();
            _logger.LogInformation("***** 所有传输任务均已完成 *****");
            if (_context.HasException)
            {
                _logger.LogError("***** 传输过程中有错误发生 *****");
            }

            _logger.LogInformation("***** 耗时:{Time}", (_stopwatch.ElapsedMilliseconds / 1000f).ToString("F3"));
            await Task.Delay(5000, stoppingToken);
        }
        catch
        {
            // Cancellation (Task.WhenAll / Task.Delay throw) or any unexpected failure must not
            // leave MySQL running with the redo log disabled.
            await TryRestoreVariablesAsync();
            throw;
        }

        if (enableUnsafeVar)
        {
            // Turn delayed writes off and re-enable the redo log.
            await SetVariableAsync(false);
            _unsafeVariablesActive = false;
        }

        if (!stoppingToken.IsCancellationRequested)
        {
            await ExportResultAsync();
            _logger.LogInformation("传输结果已保存至 {Path}", ResultFilePath);

            Environment.Exit(0);
        }
        else
        {
            Environment.Exit(1);
        }
    }

    /// <summary>
    /// Runs <paramref name="func"/> on the thread pool. Any exception is logged as critical,
    /// recorded in the process context, and then the process exits with code 1 (fail-fast).
    /// Before exiting, the unsafe MySQL variables are reverted if they had been enabled.
    /// </summary>
    /// <param name="func">The stage to execute.</param>
    /// <param name="message">Log prefix identifying the failing stage.</param>
    /// <param name="ct">Token passed to <see cref="Task.Run(Func{Task}, CancellationToken)"/>.</param>
    private Task ExecuteAndCatch(Func<Task> func, string message, CancellationToken ct)
    {
        return Task.Run(async () =>
        {
            try
            {
                await func();
            }
            catch (Exception e)
            {
                _logger.LogCritical(e, "{Msg}\t{ErrMsg}", message, e.Message);
                _context.AddException(e);
                // Environment.Exit skips ExecuteAsync's cleanup, so restore the settings here.
                await TryRestoreVariablesAsync();
                Environment.Exit(1);
            }
        }, ct);
    }

    /// <summary>
    /// Best-effort revert of the unsafe MySQL variables for failure paths.
    /// Does nothing when they were never enabled; a failing revert is logged, never thrown,
    /// so it cannot mask the error that triggered it.
    /// </summary>
    private async Task TryRestoreVariablesAsync()
    {
        if (!_unsafeVariablesActive)
        {
            return;
        }

        try
        {
            await SetVariableAsync(false);
            _unsafeVariablesActive = false;
        }
        catch (Exception e)
        {
            _logger.LogError(e, "恢复MySQL安全变量失败,请手动开启重做日志并恢复 innodb_flush_log_at_trx_commit");
        }
    }

    /// <summary>
    /// Switches the MySQL instance between its unsafe bulk-load configuration and the durable one.
    /// </summary>
    /// <param name="enable">
    /// <c>true</c> sets <c>innodb_flush_log_at_trx_commit = 0</c> and disables the InnoDB redo log;
    /// <c>false</c> sets it back to <c>1</c> and re-enables the redo log.
    /// </param>
    /// <exception cref="ApplicationException">No output connection string is configured.</exception>
    private async Task SetVariableAsync(bool enable = true)
    {
        var connStr = _databaseOptions.Value.ConnectionString
                      ?? throw new ApplicationException("没有配置数据库连接字符串");

        // Log only after the statement succeeded so the message reflects the actual server state.
        if (enable)
        {
            await DatabaseHelper.NonQueryAsync(connStr,
                """
                SET GLOBAL innodb_flush_log_at_trx_commit = 0;
                ALTER INSTANCE DISABLE INNODB REDO_LOG;
                """);
            _logger.LogWarning("已开启MySQL延迟写入功能并禁用重做日志,请注意数据安全");
        }
        else
        {
            await DatabaseHelper.NonQueryAsync(connStr,
                """
                SET GLOBAL innodb_flush_log_at_trx_commit = 1;
                ALTER INSTANCE ENABLE INNODB REDO_LOG;
                """);
            _logger.LogInformation("不安全变量已关闭");
        }
    }

    /// <summary>
    /// Executes <paramref name="command"/> concurrently against every database listed as a key
    /// of the tenant <c>DbGroup</c> option, reusing the output connection string with the
    /// database name swapped in.
    /// </summary>
    /// <param name="command">SQL text to execute.</param>
    /// <param name="cancellationToken">Token passed to the scheduled tasks.</param>
    /// <exception cref="ApplicationException">
    /// No tenant databases are configured, or the output connection string is missing.
    /// </exception>
    private async Task ExecuteEachDatabase(string command, CancellationToken cancellationToken = default)
    {
        var databases = _tenantDbOptions.Value.DbGroup?.Keys
                        ?? throw new ApplicationException("分库配置中没有配置数据库");
        var list = new List<Task>();
        foreach (var db in databases)
        {
            var connStr = new MySqlConnectionStringBuilder(_databaseOptions.Value.ConnectionString
                                                           ?? throw new ApplicationException("没有配置数据库连接字符串"))
            {
                ConnectionTimeout = 60,
                DefaultCommandTimeout = 0, // 0 = no command timeout in MySqlConnector
                Database = db
            }.ConnectionString;
            var task = Task.Run(async () => await DatabaseHelper.NonQueryAsync(connStr, command),
                cancellationToken);
            list.Add(task);
        }

        await Task.WhenAll(list);
    }

    /// <summary>
    /// Writes the Markdown run report (title, processing counters, per-table counters and an
    /// overview with elapsed time, throughput and peak memory) to <see cref="ResultFilePath"/>.
    /// </summary>
    private async Task ExportResultAsync()
    {
        var sb = new StringBuilder();

        var title = (_config.GetValue("DryRun", false), _context.HasException) switch
        {
            (true, true) => "# 试运行结束,**请注意处理异常**",
            (true, false) => "# 试运行结束,没有发生异常",
            (false, true) => "# 程序执行完毕,**但中途发生了异常**",
            (false, false) => "# 程序执行完毕,没有发生错误"
        };
        sb.AppendLine(title);

        sb.AppendLine("## 处理计数");
        var processCount = new[]
        {
            new { 操作 = "输入", 数量 = _context.InputCount },
            new { 操作 = "转换", 数量 = _context.TransformCount },
            new { 操作 = "输出", 数量 = _context.OutputCount }
        };
        sb.AppendLine(processCount.ToMarkdownTable());
        sb.AppendLine("\n---\n");
        sb.AppendLine("## 表输入/输出计数");
        var tableOutputProgress = _context.TableProgress.Select(pair =>
            new { 表名 = pair.Key, 计数 = pair.Value }).OrderBy(s => s.表名);
        sb.AppendLine(tableOutputProgress.ToMarkdownTable());
        sb.AppendLine("\n---\n");
        sb.AppendLine("## 总览");
        // Elapsed wall-clock time of the pipeline, in seconds.
        var elapsedTime = (_stopwatch!.ElapsedMilliseconds / 1000f);
        var result = new[]
        {
            new { 条目 = "耗时", 值 = elapsedTime.ToString("F2") + " 秒" },
            new
            {
                条目 = "平均处理速度",
                值 = (_context.OutputCount / elapsedTime).ToString("F2") + " 条记录/秒"
            },
            new { 条目 = "内存占用峰值", 值 = _context.MaxMemoryUsage + " 兆字节" }
        };
        sb.AppendLine(result.ToMarkdownTable());
        await File.WriteAllTextAsync(ResultFilePath, sb.ToString());
    }
}