193 lines
7.6 KiB
C#
193 lines
7.6 KiB
C#
using System.Diagnostics;
|
|
using System.Text;
|
|
using MesETL.App.Helpers;
|
|
using MesETL.App.HostedServices.Abstractions;
|
|
using MesETL.App.Options;
|
|
using MesETL.App.Services;
|
|
using MesETL.App.Services.ErrorRecorder;
|
|
using MesETL.Shared.Helper;
|
|
using Microsoft.Extensions.Configuration;
|
|
using Microsoft.Extensions.Hosting;
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Options;
|
|
using MySqlConnector;
|
|
|
|
namespace MesETL.App.HostedServices;
|
|
|
|
/// <summary>
/// Hosted service that drives the application: it either runs a single SQL command
/// against every configured tenant database (when the <c>Command</c> configuration
/// value is set), or starts the input, transform and output services, waits for them
/// to finish, writes a Markdown result report and terminates the process.
/// </summary>
public class MainHostedService : BackgroundService
{
    // Started when the pipeline run begins; read again when the report is written.
    private Stopwatch? _stopwatch;
    private readonly IInputService _input;
    private readonly ITransformService _transform;
    private readonly IOutputService _output;
    private readonly TaskMonitorService _taskMonitor;
    private readonly ILogger _logger;
    private readonly ProcessContext _context;

    private readonly IOptions<DatabaseOutputOptions> _databaseOptions;
    private readonly IOptions<TenantDbOptions> _tenantDbOptions;
    private readonly IConfiguration _config;

    // True while the "unsafe" MySQL settings (no flush on commit, redo log disabled)
    // applied by SetVariableAsync(true) are in effect. Read from the pipeline worker
    // tasks on the fatal-error path, hence volatile.
    private volatile bool _unsafeVariablesActive;

    /// <summary>Full path of the Markdown report written at the end of a run.</summary>
    private static string ResultFilePath =>
        Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"Result-{ErrorRecorder.UID}.md");

    /// <summary>
    /// Creates the service; every argument is stored as-is for later use.
    /// </summary>
    public MainHostedService(IInputService input,
        ITransformService transform,
        IOutputService output,
        ILogger<MainHostedService> logger,
        IOptions<TenantDbOptions> tenantDbOptions,
        IOptions<DatabaseOutputOptions> databaseOptions,
        IConfiguration config,
        ProcessContext context,
        TaskMonitorService taskMonitor)
    {
        _input = input;
        _transform = transform;
        _output = output;
        _logger = logger;
        _tenantDbOptions = tenantDbOptions;
        _databaseOptions = databaseOptions;
        _config = config;
        _context = context;
        _taskMonitor = taskMonitor;
    }

    /// <summary>
    /// Runs the application. In command mode the configured SQL is executed on every
    /// tenant database and the process exits with code 0. Otherwise the three pipeline
    /// stages run concurrently; afterwards the report is exported and the process exits
    /// with code 0, or with code 1 if cancellation was requested.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var command = _config["Command"];
        if (!string.IsNullOrEmpty(command))
        {
            _logger.LogInformation("检测到命令参数传入,将对所有配置的数据库执行输入的命令。。。");
            _logger.LogInformation("***** 执行SQL命令 *****");
            await ExecuteEachDatabase(command, stoppingToken);
            _logger.LogInformation("***** 执行完成 *****");
            Environment.Exit(0);
        }

        _stopwatch = Stopwatch.StartNew();
        var enableUnsafeVar = _config.GetValue<bool>("UnsafeVariable", false);
        if (enableUnsafeVar)
        {
            // Enable delayed log flushing and disable the redo log.
            // Do NOT shut down the database server while the redo log is disabled!
            await SetVariableAsync();
        }

        try
        {
            // The monitor runs alongside the pipeline and is intentionally not awaited.
            _ = Task.Run(async () => await _taskMonitor.Monitor(stoppingToken), stoppingToken);

            var inputTask = ExecuteAndCatch(
                async () => await _input.ExecuteAsync(stoppingToken), "文件输入程序出现异常", stoppingToken);
            var transformTask = ExecuteAndCatch(
                async () => await _transform.ExecuteAsync(stoppingToken), "转换程序出现异常", stoppingToken);
            var outputTask = ExecuteAndCatch(
                async () => await _output.ExecuteAsync(stoppingToken), "输出程序出现异常", stoppingToken);

            await Task.WhenAll(inputTask, transformTask, outputTask);
            _stopwatch.Stop();
            _logger.LogInformation("***** 所有传输任务均已完成 *****");
            _logger.LogInformation("***** 耗时:{Time}", (_stopwatch.ElapsedMilliseconds / 1000f).ToString("F3"));

            // NOTE(review): presumably gives the monitor time to emit its final output — confirm.
            await Task.Delay(5000, stoppingToken);
        }
        finally
        {
            // Restore flush-on-commit and re-enable the redo log even when the wait above
            // ends with an exception (e.g. cancellation); otherwise the server would be
            // left running with the redo log disabled.
            if (_unsafeVariablesActive)
            {
                await SetVariableAsync(false);
            }
        }

        if (!stoppingToken.IsCancellationRequested)
        {
            await ExportResultAsync();
            _logger.LogInformation("传输结果已保存至 {Path}", ResultFilePath);

            Environment.Exit(0);
        }
        else
        {
            Environment.Exit(1);
        }
    }

    /// <summary>
    /// Runs <paramref name="func"/> on the thread pool. Any exception is logged as
    /// critical, recorded in the process context, and terminates the process with
    /// exit code 1 after a best-effort restore of the safe MySQL settings.
    /// </summary>
    /// <param name="func">The pipeline stage to execute.</param>
    /// <param name="message">Log prefix that identifies the failing stage.</param>
    /// <param name="ct">Token passed to <see cref="Task.Run(Func{Task}, CancellationToken)"/>.</param>
    private Task ExecuteAndCatch(Func<Task> func, string message, CancellationToken ct)
    {
        return Task.Run(async () =>
        {
            try
            {
                await func();
            }
            catch (Exception e)
            {
                _logger.LogCritical(e, "{Msg}\t{ErrMsg}", message, e.Message);
                _context.AddException(e);

                // Environment.Exit below never returns to ExecuteAsync, so its finally
                // block will not run; restore the database settings here instead.
                await TryRestoreSafeVariablesAsync();
                Environment.Exit(1);
            }
        }, ct);
    }

    /// <summary>
    /// Re-enables the safe MySQL settings if the unsafe ones are active. Failures are
    /// logged rather than thrown because the caller is already on a fatal-exit path.
    /// </summary>
    private async Task TryRestoreSafeVariablesAsync()
    {
        if (!_unsafeVariablesActive)
        {
            return;
        }

        try
        {
            await SetVariableAsync(false);
        }
        catch (Exception e)
        {
            _logger.LogError(e, "恢复重做日志失败,请手动执行 ALTER INSTANCE ENABLE INNODB REDO_LOG");
        }
    }

    /// <summary>
    /// Switches the MySQL instance between the fast-but-unsafe bulk-load settings and
    /// the normal durable settings.
    /// </summary>
    /// <param name="enable">
    /// <c>true</c> sets <c>innodb_flush_log_at_trx_commit = 0</c> and disables the InnoDB
    /// redo log; <c>false</c> sets it back to <c>1</c> and re-enables the redo log.
    /// </param>
    /// <exception cref="ApplicationException">No connection string is configured.</exception>
    private async Task SetVariableAsync(bool enable = true)
    {
        var connStr = _databaseOptions.Value.ConnectionString
                      ?? throw new ApplicationException("没有配置数据库连接字符串");

        var sql = enable
            ? """
              SET GLOBAL innodb_flush_log_at_trx_commit = 0;
              ALTER INSTANCE DISABLE INNODB REDO_LOG;
              """
            : """
              SET GLOBAL innodb_flush_log_at_trx_commit = 1;
              ALTER INSTANCE ENABLE INNODB REDO_LOG;
              """;

        await DatabaseHelper.NonQueryAsync(connStr, sql);

        // Only updated after the script succeeded, so the flag mirrors the last
        // state that was applied without an error.
        _unsafeVariablesActive = enable;
    }

    /// <summary>
    /// Executes <paramref name="command"/> concurrently against every database listed in
    /// the tenant <c>DbGroup</c> configuration and waits for all executions to finish.
    /// </summary>
    /// <param name="command">SQL text to execute.</param>
    /// <param name="cancellationToken">Token passed to the scheduling of each execution.</param>
    /// <exception cref="ApplicationException">
    /// <c>DbGroup</c> or the connection string is not configured.
    /// </exception>
    private async Task ExecuteEachDatabase(string command, CancellationToken cancellationToken = default)
    {
        var databases = _tenantDbOptions.Value.DbGroup?.Keys
                        ?? throw new ApplicationException("分库配置中没有配置数据库");
        var pending = new List<Task>();
        foreach (var db in databases)
        {
            // Same server settings for every tenant, only the target database differs.
            // DefaultCommandTimeout = 0 is set because the command may be long-running.
            var connStr = new MySqlConnectionStringBuilder(_databaseOptions.Value.ConnectionString
                                                           ?? throw new ApplicationException("没有配置数据库连接字符串"))
            {
                ConnectionTimeout = 60,
                DefaultCommandTimeout = 0,
                Database = db
            }.ConnectionString;

            pending.Add(Task.Run(async () => await DatabaseHelper.NonQueryAsync(connStr, command),
                cancellationToken));
        }

        await Task.WhenAll(pending);
    }

    /// <summary>
    /// Builds the Markdown run report (status headline, per-stage counters, per-table
    /// counters, elapsed time, throughput, peak memory) and writes it to
    /// <see cref="ResultFilePath"/>.
    /// </summary>
    private async Task ExportResultAsync()
    {
        var sb = new StringBuilder();
        if (_context.HasException)
        {
            sb.AppendLine("# 程序执行完毕,**但中途发生了异常**");
        }
        else
        {
            sb.AppendLine("# 程序执行完毕,没有发生错误");
        }

        sb.AppendLine("## 处理计数");
        var processCount = new[]
        {
            new { 操作 = "输入", 数量 = _context.InputCount },
            new { 操作 = "转换", 数量 = _context.TransformCount },
            new { 操作 = "输出", 数量 = _context.OutputCount }
        };
        sb.AppendLine(processCount.ToMarkdownTable());
        sb.AppendLine("\n---\n");

        sb.AppendLine("## 表输入/输出计数");
        var tableOutputProgress = _context.TableProgress.Select(pair =>
            new { 表名 = pair.Key, 计数 = pair.Value }).OrderBy(s => s.表名);
        sb.AppendLine(tableOutputProgress.ToMarkdownTable());
        sb.AppendLine("\n---\n");

        sb.AppendLine("## 总览");
        // The stopwatch is assigned in ExecuteAsync before this method is called.
        var elapsedTime = (_stopwatch!.ElapsedMilliseconds / 1000f);
        var result = new[]
        {
            new { 条目 = "耗时", 值 = elapsedTime.ToString("F2") + " 秒" },
            new
            {
                条目 = "平均处理速度",
                值 = (_context.OutputCount / elapsedTime).ToString("F2") + " 条记录/秒"
            },
            new { 条目 = "内存占用峰值", 值 = _context.MaxMemoryUsage + " 兆字节" }
        };
        sb.AppendLine(result.ToMarkdownTable());

        await File.WriteAllTextAsync(ResultFilePath, sb.ToString());
    }
}