MES-ETL/MesETL.App/HostedServices/MainHostedService.cs

199 lines
7.9 KiB
C#
Raw Normal View History

2024-01-29 09:29:16 +08:00
using System.Diagnostics;
using System.Text;
using MesETL.App.Helpers;
using MesETL.App.HostedServices.Abstractions;
using MesETL.App.Options;
using MesETL.App.Services;
using MesETL.App.Services.ErrorRecorder;
2024-02-10 17:12:26 +08:00
using MesETL.Shared.Helper;
2024-01-29 09:29:16 +08:00
using Microsoft.Extensions.Configuration;
2024-01-04 09:00:44 +08:00
using Microsoft.Extensions.Hosting;
2024-01-12 16:50:37 +08:00
using Microsoft.Extensions.Logging;
2024-01-29 09:29:16 +08:00
using Microsoft.Extensions.Options;
2024-02-01 15:25:42 +08:00
using MySqlConnector;
2024-01-29 09:29:16 +08:00
namespace MesETL.App.HostedServices;
2024-01-04 09:00:44 +08:00
public class MainHostedService : BackgroundService
{
2024-01-29 09:29:16 +08:00
// --- Pipeline stages (input -> transform -> output) ---
private readonly IInputService _input;
private readonly ITransformService _transform;
private readonly IOutputService _output;

// --- Runtime state and monitoring ---
private readonly ProcessContext _context;
private readonly TaskMonitorService _taskMonitor;
private readonly ILogger _logger;

// --- Configuration sources ---
private readonly IConfiguration _config;
private readonly IOptions<DatabaseOutputOptions> _databaseOptions;
private readonly IOptions<TenantDbOptions> _tenantDbOptions;

// Created when the transfer starts in ExecuteAsync; stays null in command mode.
private Stopwatch? _stopwatch;
2024-01-04 09:00:44 +08:00
2024-01-29 09:29:16 +08:00
/// <summary>
/// Creates the hosted service and stores the injected dependencies.
/// </summary>
/// <param name="input">Input stage of the pipeline.</param>
/// <param name="transform">Transform stage of the pipeline.</param>
/// <param name="output">Output stage of the pipeline.</param>
/// <param name="logger">Logger used by this service.</param>
/// <param name="tenantDbOptions">Tenant database options (read for <c>DbGroup</c>).</param>
/// <param name="databaseOptions">Output database options (read for <c>ConnectionString</c>).</param>
/// <param name="config">Application configuration (read for <c>Command</c> and <c>UnsafeVariable</c>).</param>
/// <param name="context">Shared process context holding counters and the exception flag.</param>
/// <param name="taskMonitor">Monitor service started alongside the pipeline.</param>
public MainHostedService(
    IInputService input,
    ITransformService transform,
    IOutputService output,
    ILogger<MainHostedService> logger,
    IOptions<TenantDbOptions> tenantDbOptions,
    IOptions<DatabaseOutputOptions> databaseOptions,
    IConfiguration config,
    ProcessContext context,
    TaskMonitorService taskMonitor)
{
    // Pipeline stages.
    (_input, _transform, _output) = (input, transform, output);

    // Diagnostics and shared state.
    (_logger, _context, _taskMonitor) = (logger, context, taskMonitor);

    // Configuration sources.
    (_tenantDbOptions, _databaseOptions, _config) = (tenantDbOptions, databaseOptions, config);
}
/// <summary>
/// Runs the service. If a <c>Command</c> configuration value is present, that command is
/// executed against every configured database and the process exits. Otherwise the
/// input, transform and output stages are started concurrently, the outcome is logged,
/// a Markdown result file is exported and the process exits.
/// </summary>
/// <param name="stoppingToken">Token signalled when the host is shutting down.</param>
/// <remarks>
/// When the <c>UnsafeVariable</c> configuration flag is set, the MySQL "unsafe" settings
/// are switched on before the transfer and are switched back off in a <c>finally</c>
/// block, so an exception or cancellation thrown from this method does not leave the
/// redo log disabled. NOTE(review): failures inside the stages call
/// <c>Environment.Exit(1)</c> from <c>ExecuteAndCatch</c>, which terminates the process
/// without running this <c>finally</c>; that path still leaves the settings enabled.
/// </remarks>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    // Command mode: run the supplied SQL on every configured database, then exit.
    var command = _config["Command"];
    if (!string.IsNullOrEmpty(command))
    {
        _logger.LogInformation("检测到命令参数传入,将对所有配置的数据库执行输入的命令。。。");
        _logger.LogInformation("***** 执行SQL命令 *****");
        await ExecuteEachDatabase(command, stoppingToken);
        _logger.LogInformation("***** 执行完成 *****");
        Environment.Exit(0);
    }

    // Keep a non-null local for use in this method; the field is read later by the report.
    var stopwatch = Stopwatch.StartNew();
    _stopwatch = stopwatch;

    var enableUnsafeVar = _config.GetValue<bool>("UnsafeVariable", false);
    if (enableUnsafeVar)
    {
        // Enables delayed log flushing and disables the redo log.
        // Do NOT shut down the database service while the redo log is disabled!
        await SetVariableAsync();
    }

    try
    {
        // The monitor runs in the background for the lifetime of the transfer; it is not awaited.
        _ = Task.Run(async () => await _taskMonitor.Monitor(stoppingToken), stoppingToken);

        var inputTask = ExecuteAndCatch(
            async () => await _input.ExecuteAsync(stoppingToken), "文件输入程序出现异常", stoppingToken);
        var transformTask = ExecuteAndCatch(
            async () => await _transform.ExecuteAsync(stoppingToken), "转换程序出现异常", stoppingToken);
        var outputTask = ExecuteAndCatch(
            async () => await _output.ExecuteAsync(stoppingToken), "输出程序出现异常", stoppingToken);

        await Task.WhenAll(inputTask, transformTask, outputTask);
        stopwatch.Stop();

        _logger.LogInformation("***** 所有传输任务均已完成 *****");
        if (_context.HasException)
        {
            _logger.LogError("***** 传输过程中有错误发生 *****");
        }

        _logger.LogInformation("***** 耗时:{Time}", (stopwatch.ElapsedMilliseconds / 1000f).ToString("F3"));

        // Grace period before restoring the database settings and exporting the result.
        await Task.Delay(5000, stoppingToken);
    }
    finally
    {
        // Turn delayed writes off and the redo log back on, even when the block above
        // threw (for example an OperationCanceledException from WhenAll or Delay).
        if (enableUnsafeVar)
        {
            await SetVariableAsync(false);
        }
    }

    if (!stoppingToken.IsCancellationRequested)
    {
        await ExportResultAsync();
        _logger.LogInformation("传输结果已保存至 {Path}",
            Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"Result-{ErrorRecorder.UID}.md"));
        Environment.Exit(0);
    }
    else
    {
        Environment.Exit(1);
    }
}
/// <summary>
/// Schedules <paramref name="func"/> on the thread pool. Any exception it throws is
/// logged as critical, recorded on the process context, and the process is then
/// terminated with exit code 1.
/// </summary>
/// <param name="func">The asynchronous work to run.</param>
/// <param name="message">Prefix written to the log when the work fails.</param>
/// <param name="ct">Cancellation token handed to <c>Task.Run</c>.</param>
/// <returns>The task representing the scheduled work.</returns>
private Task ExecuteAndCatch(Func<Task> func, string message, CancellationToken ct)
{
    return Task.Run(() => RunGuardedAsync(), ct);

    // Awaits the work and turns any failure into a logged, recorded process exit.
    async Task RunGuardedAsync()
    {
        try
        {
            await func();
        }
        catch (Exception failure)
        {
            _logger.LogCritical(failure, "{Msg}\t{ErrMsg}", message, failure.Message);
            _context.AddException(failure);
            Environment.Exit(1);
        }
    }
}
2024-02-01 13:41:59 +08:00
/// <summary>
/// Switches the MySQL "unsafe" bulk-load settings on or off using the output
/// connection string.
/// </summary>
/// <param name="enable">
/// <c>true</c> sets <c>innodb_flush_log_at_trx_commit = 0</c> and disables the InnoDB redo log;
/// <c>false</c> sets <c>innodb_flush_log_at_trx_commit = 1</c> and re-enables the redo log.
/// </param>
/// <exception cref="ApplicationException">No output connection string is configured.</exception>
private async Task SetVariableAsync(bool enable = true)
{
    // Same message ExecuteEachDatabase uses for this condition; the previous text
    // described a missing sharding configuration instead.
    var connStr = _databaseOptions.Value.ConnectionString
                  ?? throw new ApplicationException("没有配置数据库连接字符串");

    var sql = enable
        ? """
          SET GLOBAL innodb_flush_log_at_trx_commit = 0;
          ALTER INSTANCE DISABLE INNODB REDO_LOG;
          """
        : """
          SET GLOBAL innodb_flush_log_at_trx_commit = 1;
          ALTER INSTANCE ENABLE INNODB REDO_LOG;
          """;

    await DatabaseHelper.NonQueryAsync(connStr, sql);

    // Log only after the statements ran, so the log never reports a state change
    // that did not actually happen.
    if (enable)
    {
        _logger.LogWarning("已开启MySQL延迟写入功能并禁用重做日志请注意数据安全");
    }
    else
    {
        _logger.LogInformation("不安全变量已关闭");
    }
}
2024-02-01 15:25:42 +08:00
/// <summary>
/// Runs <paramref name="command"/> against every database key listed in the tenant
/// <c>DbGroup</c> configuration, one thread-pool task per database, and waits for all
/// of them to finish.
/// </summary>
/// <param name="command">The SQL text to execute.</param>
/// <param name="cancellationToken">Token handed to <c>Task.Run</c> for each database task.</param>
/// <exception cref="ApplicationException">
/// The tenant <c>DbGroup</c> or the output connection string is not configured.
/// </exception>
private async Task ExecuteEachDatabase(string command, CancellationToken cancellationToken = default)
{
    var databases = _tenantDbOptions.Value.DbGroup?.Keys;
    if (databases is null)
    {
        throw new ApplicationException("分库配置中没有配置数据库");
    }

    var pending = new List<Task>();
    foreach (var dbName in databases)
    {
        // Read inside the loop so the check only fires when there is a database to process.
        var baseConnStr = _databaseOptions.Value.ConnectionString;
        if (baseConnStr is null)
        {
            throw new ApplicationException("没有配置数据库连接字符串");
        }

        // Derive a per-database connection string from the shared one.
        var builder = new MySqlConnectionStringBuilder(baseConnStr);
        builder.ConnectionTimeout = 60;
        builder.DefaultCommandTimeout = 0;
        builder.Database = dbName;
        var targetConnStr = builder.ConnectionString;

        pending.Add(Task.Run(
            async () => await DatabaseHelper.NonQueryAsync(targetConnStr, command),
            cancellationToken));
    }

    await Task.WhenAll(pending);
}
/// <summary>
/// Builds a Markdown report from the process context (overall status, per-stage counts,
/// per-table progress and an overview with elapsed time, throughput and peak memory)
/// and writes it to <c>Result-{ErrorRecorder.UID}.md</c> in the application base directory.
/// </summary>
// NOTE(review): in this copy of the file the member names of the anonymous objects below
// (the identifiers before each "=" and after "s." in the OrderBy lambda) are missing.
// They must be restored from the original source before this method can compile.
private async Task ExportResultAsync()
{
var sb = new StringBuilder();
// Heading depends on whether the context recorded any exception during the run.
if (_context.HasException)
2024-12-10 14:03:09 +08:00
sb.AppendLine("# 程序执行完毕,**但中途发生了异常**");
else sb.AppendLine("# 程序执行完毕,没有发生错误");
// Section: per-stage record counts (input / transform / output).
sb.AppendLine("## 处理计数");
2024-01-29 09:29:16 +08:00
var processCount = new[]
2024-01-18 14:36:36 +08:00
{
2024-12-10 14:03:09 +08:00
new { = "输入", = _context.InputCount },
new { = "转换", = _context.TransformCount },
new { = "输出", = _context.OutputCount }
2024-01-18 14:36:36 +08:00
};
2024-01-29 09:29:16 +08:00
// ToMarkdownTable is defined elsewhere; presumably it renders one row per element — confirm.
sb.AppendLine(processCount.ToMarkdownTable());
sb.AppendLine("\n---\n");
2024-12-10 14:03:09 +08:00
// Section: one entry per key/value pair of _context.TableProgress, ordered by a projected member.
sb.AppendLine("## 表输入/输出计数");
2024-01-29 09:29:16 +08:00
var tableOutputProgress = _context.TableProgress.Select(pair =>
2024-12-10 14:03:09 +08:00
new { = pair.Key, = pair.Value }).OrderBy(s => s.);
2024-01-29 09:29:16 +08:00
sb.AppendLine(tableOutputProgress.ToMarkdownTable());
sb.AppendLine("\n---\n");
2024-12-10 14:03:09 +08:00
// Section: overview (elapsed seconds, average records per second, peak memory).
sb.AppendLine("## 总览");
2024-01-29 09:29:16 +08:00
// The null-forgiving operator relies on ExecuteAsync having assigned _stopwatch before calling this method.
var elapsedTime = (_stopwatch!.ElapsedMilliseconds / 1000f);
var result = new[]
2024-01-18 14:36:36 +08:00
{
2024-12-10 14:03:09 +08:00
new { = "耗时", = elapsedTime.ToString("F2") + " 秒" },
2024-01-29 09:29:16 +08:00
new
{
= "平均处理速度",
2024-12-10 14:03:09 +08:00
= (_context.OutputCount / elapsedTime).ToString("F2") + " 条记录/秒"
},
new { = "内存占用峰值", = _context.MaxMemoryUsage + " 兆字节" }
2024-01-18 14:36:36 +08:00
};
2024-01-29 09:29:16 +08:00
sb.AppendLine(result.ToMarkdownTable());
// Write the report next to the executable; the file name embeds ErrorRecorder.UID.
await File.WriteAllTextAsync(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"Result-{ErrorRecorder.UID}.md"),
sb.ToString());
2024-01-04 09:00:44 +08:00
}
}