MES-ETL/MesETL.App/HostedServices/MainHostedService.cs

206 lines
8.2 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System.Diagnostics;
using System.Text;
using MesETL.App.Helpers;
using MesETL.App.HostedServices.Abstractions;
using MesETL.App.Options;
using MesETL.App.Services;
using MesETL.App.Services.ErrorRecorder;
using MesETL.Shared.Helper;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MySqlConnector;
namespace MesETL.App.HostedServices;
public class MainHostedService : BackgroundService
{
// Measures the duration of the transfer run; created in ExecuteAsync and read again when the report is exported.
private Stopwatch? _stopwatch;
// The three pipeline stages; ExecuteAsync starts all of them and awaits them together.
private readonly IInputService _input;
private readonly ITransformService _transform;
private readonly IOutputService _output;
// Its Monitor method is started in the background by ExecuteAsync and never awaited.
private readonly TaskMonitorService _taskMonitor;
private readonly ILogger _logger;
// Shared run state: exception flag, input/transform/output counters, per-table progress, peak memory usage.
private readonly ProcessContext _context;
// Supplies the connection string used for the unsafe-variable switch and for command mode.
private readonly IOptions<DatabaseOutputOptions> _databaseOptions;
// Supplies DbGroup, whose keys are used as database names in command mode.
private readonly IOptions<TenantDbOptions> _tenantDbOptions;
// Read for the "Command", "UnsafeVariable" and "DryRun" settings.
private readonly IConfiguration _config;
/// <summary>
/// Creates the hosted service and stores every injected collaborator in its backing field.
/// </summary>
/// <param name="input">Input stage of the pipeline.</param>
/// <param name="transform">Transform stage of the pipeline.</param>
/// <param name="output">Output stage of the pipeline.</param>
/// <param name="logger">Logger used for progress and error messages.</param>
/// <param name="tenantDbOptions">Options providing the database group used in command mode.</param>
/// <param name="databaseOptions">Options providing the database connection string.</param>
/// <param name="config">Configuration read for the "Command", "UnsafeVariable" and "DryRun" settings.</param>
/// <param name="context">Shared run state (counters, exception flag, table progress).</param>
/// <param name="taskMonitor">Monitor that is started in the background while the pipeline runs.</param>
public MainHostedService(
    IInputService input,
    ITransformService transform,
    IOutputService output,
    ILogger<MainHostedService> logger,
    IOptions<TenantDbOptions> tenantDbOptions,
    IOptions<DatabaseOutputOptions> databaseOptions,
    IConfiguration config,
    ProcessContext context,
    TaskMonitorService taskMonitor)
{
    // Pipeline stages.
    (_input, _transform, _output) = (input, transform, output);

    // Options and configuration.
    (_tenantDbOptions, _databaseOptions, _config) = (tenantDbOptions, databaseOptions, config);

    // Diagnostics and shared state.
    _logger = logger;
    (_context, _taskMonitor) = (context, taskMonitor);
}
/// <summary>
/// Runs the service. If a "Command" setting is present, the command is executed against every
/// configured database and the process exits with code 0. Otherwise the input, transform and
/// output stages are run concurrently, a result report is exported and the process exits
/// (0 when not cancelled, 1 when cancellation was requested).
/// </summary>
/// <param name="stoppingToken">Token signalled when the host is shutting down.</param>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    // Command mode: run the supplied SQL on every configured database, then terminate.
    var command = _config["Command"];
    if (!string.IsNullOrEmpty(command))
    {
        _logger.LogInformation("检测到命令参数传入,将对所有配置的数据库执行输入的命令。。。");
        _logger.LogInformation("***** 执行SQL命令 *****");
        await ExecuteEachDatabase(command, stoppingToken);
        _logger.LogInformation("***** 执行完成 *****");
        Environment.Exit(0);
    }

    _stopwatch = Stopwatch.StartNew();
    var enableUnsafeVar = _config.GetValue<bool>("UnsafeVariable", false);
    if (enableUnsafeVar)
    {
        // Enable delayed writes and disable the redo log.
        // >>> Do not shut down the database service while the redo log is disabled!
        await SetVariableAsync();
    }

    try
    {
        // The monitor runs in the background for the lifetime of the token; it is intentionally not awaited.
        _ = Task.Run(async () => await _taskMonitor.Monitor(stoppingToken), stoppingToken);

        var inputTask = ExecuteAndCatch(
            async () => await _input.ExecuteAsync(stoppingToken), "文件输入程序出现异常", stoppingToken);
        var transformTask = ExecuteAndCatch(
            async () => await _transform.ExecuteAsync(stoppingToken), "转换程序出现异常", stoppingToken);
        var outputTask = ExecuteAndCatch(
            async () => await _output.ExecuteAsync(stoppingToken), "输出程序出现异常", stoppingToken);

        await Task.WhenAll(inputTask, transformTask, outputTask);
        _stopwatch.Stop();

        _logger.LogInformation("***** 所有传输任务均已完成 *****");
        if (_context.HasException)
        {
            _logger.LogError("***** 传输过程中有错误发生 *****");
        }

        _logger.LogInformation("***** 耗时:{Time}", (_stopwatch.ElapsedMilliseconds / 1000f).ToString("F3"));
        await Task.Delay(5000, stoppingToken);
    }
    finally
    {
        // Restore durable flushing and re-enable the redo log. This lives in a finally block so that
        // a cancellation thrown by Task.WhenAll or Task.Delay cannot leave the server in the unsafe state.
        if (enableUnsafeVar)
        {
            await SetVariableAsync(false);
        }
    }

    if (!stoppingToken.IsCancellationRequested)
    {
        await ExportResultAsync();
        _logger.LogInformation("传输结果已保存至 {Path}",
            Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"Result-{ErrorRecorder.UID}.md"));
        Environment.Exit(0);
    }
    else
    {
        Environment.Exit(1);
    }
}
/// <summary>
/// Schedules <paramref name="func"/> on the thread pool. Any exception it throws is logged as
/// critical together with <paramref name="message"/>, recorded on the process context, and the
/// process is then terminated with exit code 1.
/// </summary>
/// <param name="func">The asynchronous work to run.</param>
/// <param name="message">Prefix written to the log when the work fails.</param>
/// <param name="ct">Token passed to <see cref="Task.Run(Func{Task}, CancellationToken)"/>.</param>
/// <returns>The task representing the scheduled work.</returns>
private Task ExecuteAndCatch(Func<Task> func, string message, CancellationToken ct)
{
    return Task.Run(() => RunGuardedAsync(), ct);

    // Awaits the delegate and converts any failure into a logged, fatal exit.
    async Task RunGuardedAsync()
    {
        try
        {
            await func();
        }
        catch (Exception failure)
        {
            _logger.LogCritical(failure, "{Msg}\t{ErrMsg}", message, failure.Message);
            _context.AddException(failure);
            Environment.Exit(1);
        }
    }
}
/// <summary>
/// Switches the MySQL server between the "unsafe" bulk-load state and the normal state.
/// When <paramref name="enable"/> is true, sets <c>innodb_flush_log_at_trx_commit = 0</c> and
/// disables the InnoDB redo log; when false, sets the variable to 1 and re-enables the redo log.
/// </summary>
/// <param name="enable">True to enter the unsafe state, false to leave it.</param>
/// <exception cref="ApplicationException">No database connection string is configured.</exception>
private async Task SetVariableAsync(bool enable = true)
{
    // The missing value here is the connection string, so report that (same message as command mode uses).
    var connStr = _databaseOptions.Value.ConnectionString
                  ?? throw new ApplicationException("没有配置数据库连接字符串");

    if (enable)
    {
        await DatabaseHelper.NonQueryAsync(connStr,
            """
            SET GLOBAL innodb_flush_log_at_trx_commit = 0;
            ALTER INSTANCE DISABLE INNODB REDO_LOG;
            """);
        // Logged only after the statements succeeded, so the message cannot claim a change that did not happen.
        _logger.LogWarning("已开启MySQL延迟写入功能并禁用重做日志请注意数据安全");
    }
    else
    {
        await DatabaseHelper.NonQueryAsync(connStr,
            """
            SET GLOBAL innodb_flush_log_at_trx_commit = 1;
            ALTER INSTANCE ENABLE INNODB REDO_LOG;
            """);
        _logger.LogInformation("不安全变量已关闭");
    }
}
/// <summary>
/// Runs <paramref name="command"/> against every database named by the keys of the tenant
/// database group, one thread-pool task per database, and waits for all of them.
/// </summary>
/// <param name="command">SQL text to execute.</param>
/// <param name="cancellationToken">Token passed to <c>Task.Run</c> for each database task.</param>
/// <exception cref="ApplicationException">
/// The database group is not configured, or no connection string is configured.
/// </exception>
private async Task ExecuteEachDatabase(string command, CancellationToken cancellationToken = default)
{
    var dbNames = _tenantDbOptions.Value.DbGroup?.Keys;
    if (dbNames is null)
    {
        throw new ApplicationException("分库配置中没有配置数据库");
    }

    var pending = dbNames.Select(name => StartOn(name)).ToList();
    await Task.WhenAll(pending);

    // Builds the per-database connection string and starts the command on it.
    Task StartOn(string database)
    {
        var builder = new MySqlConnectionStringBuilder(
            _databaseOptions.Value.ConnectionString
            ?? throw new ApplicationException("没有配置数据库连接字符串"));
        builder.ConnectionTimeout = 60;
        builder.DefaultCommandTimeout = 0;
        builder.Database = database;
        var connStr = builder.ConnectionString;

        return Task.Run(async () => await DatabaseHelper.NonQueryAsync(connStr, command),
            cancellationToken);
    }
}
/// <summary>
/// Builds a Markdown report of the run (title, processing counts, per-table progress, overview)
/// and writes it to <c>Result-{ErrorRecorder.UID}.md</c> in the application's base directory.
/// </summary>
// NOTE(review): in this copy of the file the member names of the anonymous objects below
// (and the OrderBy key selector) are missing, so the method does not compile as shown.
// Presumably non-ASCII identifiers were lost during extraction - restore them from version control.
private async Task ExportResultAsync()
{
var sb = new StringBuilder();
// The title is chosen from the "DryRun" setting and whether the context recorded an exception.
var title = (_config.GetValue("DryRun", false), _context.HasException) switch
{
(true, true) => "# 试运行结束,**请注意处理异常**",
(true, false) => "# 试运行结束,没有发生异常",
(false, true) => "# 程序执行完毕,**但中途发生了异常**",
(false, false) => "# 程序执行完毕,没有发生错误"
};
sb.AppendLine(title);
sb.AppendLine("## 处理计数");
// One row per pipeline stage with its counter from the context.
var processCount = new[]
{
new { = "输入", = _context.InputCount },
new { = "转换", = _context.TransformCount },
new { = "输出", = _context.OutputCount }
};
// ToMarkdownTable is defined elsewhere; presumably it renders the rows as a Markdown table - TODO confirm.
sb.AppendLine(processCount.ToMarkdownTable());
sb.AppendLine("\n---\n");
sb.AppendLine("## 表输入/输出计数");
// One row per entry of the context's table progress, ordered by a key that is missing in this copy.
var tableOutputProgress = _context.TableProgress.Select(pair =>
new { = pair.Key, = pair.Value }).OrderBy(s => s.);
sb.AppendLine(tableOutputProgress.ToMarkdownTable());
sb.AppendLine("\n---\n");
sb.AppendLine("## 总览");
// Elapsed time in seconds; the stopwatch is created by ExecuteAsync before this method is called.
var elapsedTime = (_stopwatch!.ElapsedMilliseconds / 1000f);
var result = new[]
{
new { = "耗时", = elapsedTime.ToString("F2") + " 秒" },
new
{
= "平均处理速度",
// Output records divided by elapsed seconds.
= (_context.OutputCount / elapsedTime).ToString("F2") + " 条记录/秒"
},
new { = "内存占用峰值", = _context.MaxMemoryUsage + " 兆字节" }
};
sb.AppendLine(result.ToMarkdownTable());
await File.WriteAllTextAsync(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"Result-{ErrorRecorder.UID}.md"),
sb.ToString());
}
}