MES-ETL/MesETL.App/HostedServices/OutputService.cs

143 lines
5.3 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using MesETL.App.Helpers;
using MesETL.App.HostedServices.Abstractions;
using MesETL.App.Options;
using MesETL.App.Services;
using MesETL.App.Services.ErrorRecorder;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MySqlConnector;
using MySqlDestination = MesETL.App.Services.ETL.MySqlDestination;
using TaskExtensions = MesETL.App.Helpers.TaskExtensions;
namespace MesETL.App.HostedServices;
/// <summary>
/// Output service: takes the records queued per database in the <see cref="RecordQueuePool"/>
/// and writes them to MySQL through <see cref="MySqlDestination"/>.
/// </summary>
public class OutputService : IOutputService
{
    /// <summary>Delay, in milliseconds, between two scans of the queue pool for databases without a worker.</summary>
    private const int QueueScanIntervalMs = 500;

    /// <summary>
    /// Delay, in milliseconds, a worker waits when its queue is momentarily empty,
    /// so that it does not spin on <c>TryDequeue</c> without ever yielding.
    /// </summary>
    private const int EmptyQueueBackoffMs = 10;

    private readonly ILogger _logger;
    private readonly IOptions<DatabaseOutputOptions> _outputOptions;
    private readonly ProcessContext _context;
    private readonly ErrorRecorderFactory _errorRecorderFactory;
    private readonly RecordQueuePool _queuePool;

    /// <summary>
    /// Creates the service; all dependencies are stored as-is and used by the worker methods.
    /// </summary>
    /// <param name="logger">Logger used for progress and error messages.</param>
    /// <param name="outputOptions">Output settings (connection string, flush count, task limit, packet size).</param>
    /// <param name="context">Shared process state; read for transform completion, updated with output counts.</param>
    /// <param name="queuePool">Pool that exposes one record queue per database name.</param>
    /// <param name="errorRecorderFactory">Factory for the per-database error recorder handed to the destination.</param>
    public OutputService(ILogger<OutputService> logger,
        IOptions<DatabaseOutputOptions> outputOptions,
        ProcessContext context,
        RecordQueuePool queuePool,
        ErrorRecorderFactory errorRecorderFactory)
    {
        _logger = logger;
        _outputOptions = outputOptions;
        _context = context;
        _queuePool = queuePool;
        _errorRecorderFactory = errorRecorderFactory;
    }

    /// <summary>
    /// Starts one worker per database queue found in the pool, keeps scanning for new queues until
    /// the transform stage reports completion, then waits for all workers and marks output as complete.
    /// </summary>
    /// <param name="ct">Token passed to the delays, the task manager and the workers.</param>
    public async Task ExecuteAsync(CancellationToken ct)
    {
        _logger.LogInformation("***** Output service started *****");

        // NOTE(review): 5 is presumably the maximum number of concurrent database workers — confirm against TaskManager.
        var dbTaskManager = new TaskManager(5);
        var dbTasks = new Dictionary<string, Task>();

        while (true)
        {
            // Read the flag BEFORE scanning: if it is already set, every queue that will ever exist
            // is expected to be in the pool, so this scan is the final one. The original loop exited
            // without this last scan and could leave a late (or only) queue without a worker.
            var transformCompleted = _context.IsTransformCompleted;

            foreach (var (db, queue) in _queuePool.Queues)
            {
                if (dbTasks.ContainsKey(db))
                {
                    continue;
                }

                dbTasks.Add(db, await dbTaskManager.CreateTaskAsync(
                    async () => await StartDatabaseWorker(db, queue, ct), ct));
            }

            if (transformCompleted)
            {
                break;
            }

            await Task.Delay(QueueScanIntervalMs, ct);
        }

        // Wait until the task manager reports no running worker.
        await TaskExtensions.WaitUntil(() => dbTaskManager.RunningTaskCount == 0, 25, ct);

        _context.CompleteOutput();
        _logger.LogInformation("***** Output service finished *****");
    }

    /// <summary>
    /// Worker loop for a single database: dequeues records, groups them into batches of
    /// <c>FlushCount</c> records and hands each full batch to a task manager for flushing.
    /// Runs until the transform stage is complete and the queue is empty, or until cancellation is requested.
    /// </summary>
    /// <param name="db">Database name this worker is responsible for.</param>
    /// <param name="queue">Queue holding the records destined for <paramref name="db"/>.</param>
    /// <param name="ct">Token checked on every iteration and passed to the task manager.</param>
    /// <exception cref="ApplicationException">
    /// A record has no database name, or its database name differs from <paramref name="db"/>.
    /// </exception>
    private async Task StartDatabaseWorker(string db, DataRecordQueue queue, CancellationToken ct = default)
    {
        _logger.LogInformation("*****开启输出线程,数据库: {db} *****", db);

        var options = _outputOptions.Value;
        var taskManager = new TaskManager(options.MaxDatabaseOutputTask);
        var batch = new List<DataRecord>();

        while (!_context.IsTransformCompleted || queue.Count > 0)
        {
            if (ct.IsCancellationRequested)
            {
                break;
            }

            if (!queue.TryDequeue(out var record))
            {
                // Queue is empty for now: back off briefly instead of spinning.
                // The token is deliberately not passed so that cancellation is still handled
                // by the check at the top of the loop, exactly as before.
                await Task.Delay(EmptyQueueBackoffMs);
                continue;
            }

            var dbName = record.Database ?? throw new ApplicationException("输出的记录缺少数据库名");
            if (dbName != db)
            {
                throw new ApplicationException($"输出记录的数据与当前输出线程不匹配,记录:{dbName}, 输出线程:{db}");
            }

            batch.Add(record);
            if (batch.Count < options.FlushCount)
            {
                continue;
            }

            // Hand the full batch over and start a fresh one; the flush task owns the old list from here on.
            var fullBatch = batch;
            batch = [];

            // The batch is passed as task state rather than captured by the lambda.
            await taskManager.CreateTaskAsync(async arg =>
            {
                // Direct cast: a wrong state object fails with InvalidCastException instead of a null dereference.
                var state = (Tuple<string, List<DataRecord>>)arg!;
                await FlushAndLogErrorAsync(state.Item1, state.Item2);
            }, Tuple.Create(dbName, fullBatch), ct);
        }

        // Wait for all flush tasks of this worker to finish.
        await TaskExtensions.WaitUntil(() => taskManager.RunningTaskCount == 0, 10, ct);

        // Flush the records left over in the last, partially filled batch.
        if (batch.Count > 0)
        {
            await FlushAndLogErrorAsync(db, batch);
        }

        _logger.LogInformation("*****输出线程结束,数据库: {db} *****", db);
    }

    /// <summary>
    /// Calls <see cref="FlushAsync"/> and logs any exception before rethrowing it,
    /// so batch flushes and the final leftover flush report failures the same way.
    /// </summary>
    private async Task FlushAndLogErrorAsync(string dbName, List<DataRecord> records)
    {
        try
        {
            await FlushAsync(dbName, records);
        }
        catch (Exception e)
        {
            _logger.LogError(e, "输出记录时发生错误");
            throw;
        }
    }

    /// <summary>
    /// Writes the given records to the database <paramref name="dbName"/> through a new
    /// <see cref="MySqlDestination"/>, then adds the per-table record counts to the process context.
    /// </summary>
    /// <param name="dbName">Database name put into the connection string and used to create the error recorder.</param>
    /// <param name="records">Records to write; enumerated exactly once.</param>
    /// <exception cref="ApplicationException">No connection string is configured.</exception>
    private async Task FlushAsync(string dbName, IEnumerable<DataRecord> records)
    {
        var options = _outputOptions.Value;

        // Start from the configured connection string and override the settings this service relies on.
        var connStr = new MySqlConnectionStringBuilder(options.ConnectionString
            ?? throw new ApplicationException("未配置数据库连接字符串"))
        {
            CharacterSet = "utf8mb4",
            AllowUserVariables = true,
            IgnoreCommandTransaction = true,
            TreatTinyAsBoolean = false,
            ConnectionTimeout = 60,
            DefaultCommandTimeout = 0,
            SslMode = MySqlSslMode.None,
            Database = dbName
        }.ConnectionString;

        await using var output = new MySqlDestination(connStr, _logger,
            _outputOptions, _errorRecorderFactory.CreateOutput(dbName), _context);

        // Number of records written per table in this flush.
        var tableOutput = new Dictionary<string, int>();
        foreach (var record in records)
        {
            await output.WriteRecordAsync(record);
            tableOutput.AddOrUpdate(record.TableName, 1, (_, v) => v + 1);
        }

        await output.FlushAsync(options.MaxAllowedPacket);

        // Counts are reported only after the destination flush has returned without throwing.
        foreach (var (table, count) in tableOutput)
        {
            _context.AddTableOutput(table, count);
        }

        // Skip the summation entirely when trace logging is disabled.
        if (_logger.IsEnabled(LogLevel.Trace))
        {
            _logger.LogTrace("Flushed {Count} records", tableOutput.Values.Sum());
        }
    }
}