using System.Buffers;
using System.Text;
using MesETL.App.Const;
using MesETL.App.Helpers;
using MesETL.App.HostedServices.Abstractions;
using MesETL.App.Options;
using MesETL.App.Services;
using MesETL.App.Services.ErrorRecorder;
using MesETL.Shared.Helper;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MySqlConnector;
using MySqlDestination = MesETL.App.Services.ETL.MySqlDestination;
using TaskExtensions = MesETL.Shared.Helper.TaskExtensions;
namespace MesETL.App.HostedServices;
public record DataOutputContext(IServiceProvider Serivces);
///
/// 数据导出服务,将数据导出至MySql服务
///
public class OutputService : IOutputService
{
private readonly ILogger _logger;
private readonly IOptions _outputOptions;
private readonly ProcessContext _context;
private readonly ErrorRecorderFactory _errorRecorderFactory;
private readonly RecordQueuePool _queuePool;
private readonly IServiceProvider _services;
public OutputService(ILogger logger,
IOptions outputOptions,
ProcessContext context,
RecordQueuePool queuePool,
ErrorRecorderFactory errorRecorderFactory,
IServiceProvider services)
{
_logger = logger;
_outputOptions = outputOptions;
_context = context;
_queuePool = queuePool;
_errorRecorderFactory = errorRecorderFactory;
_services = services;
}
public async Task ExecuteAsync(CancellationToken ct)
{
_logger.LogInformation("***** 输出服务已启动 *****");
var dbTaskManager = new TaskManager(_queuePool.Queues.Count);
var dbTasks = new Dictionary();
while (!_context.IsTransformCompleted)
{
foreach (var (db, queue) in _queuePool.Queues)
{
if (!dbTasks.ContainsKey(db))
{
dbTasks.Add(db, await dbTaskManager.CreateTaskAsync(
async () => await StartDatabaseWorker(db, queue, ct), ct));
}
}
await Task.Delay(500, ct);
}
await TaskExtensions.WaitUntil(() => dbTaskManager.RunningTaskCount == 0, 25, ct);
_context.CompleteOutput();
_outputOptions.Value.OutputFinished?.Invoke(new DataOutputContext(_services));
_logger.LogInformation("***** 输出服务执行完毕 *****");
}
private async Task StartDatabaseWorker(string db, DataRecordQueue queue, CancellationToken ct = default)
{
_logger.LogInformation("*****开启输出线程,数据库: {db} *****", db);
var taskManager = new TaskManager(_outputOptions.Value.MaxDatabaseOutputTask);
var ignoreOutput = new HashSet(_outputOptions.Value.NoOutput);
var tmp = new List(_outputOptions.Value.FlushCount);
while (!_context.IsTransformCompleted || queue.Count > 0)
{
if (ct.IsCancellationRequested)
break;
if (!queue.TryDequeue(out var record) || record.Ignore || ignoreOutput.Contains(record.TableName))
continue;
var dbName = record.Database ?? throw new ApplicationException("输出的记录缺少数据库名");
if(dbName != db)
throw new ApplicationException($"输出记录的数据与当前输出线程不匹配,记录:{dbName}, 输出线程:{db}");
tmp.Add(record);
if (tmp.Count >= _outputOptions.Value.FlushCount)
{
var list = tmp;
tmp = [];
await taskManager.CreateTaskAsync(async arg => // 转换为方法组
{
var tuple = arg as Tuple>;
try
{
await FlushAsync(tuple!.Item1, tuple.Item2);
}
catch (Exception e)
{
_logger.LogError(e, "输出记录时发生错误");
throw;
}
}, Tuple.Create(dbName, list), ct);
}
}
// 等待所有子任务完成
await TaskExtensions.WaitUntil(() => taskManager.RunningTaskCount == 0, 10, ct);
_logger.LogDebug("输出线程结束,清理剩余记录[{Count}]", tmp.Count);
// 清理剩余记录
if (tmp.Count > 0)
{
await FlushAsync(db, tmp);
}
_logger.LogInformation("***** 输出线程结束,数据库: {db} *****", db);
}
private async Task FlushAsync(string dbName, IEnumerable records)
{
var connStr = new MySqlConnectionStringBuilder(_outputOptions.Value.ConnectionString
?? throw new ApplicationException("未配置数据库连接字符串"))
{
CharacterSet = "utf8mb4",
AllowUserVariables = true,
IgnoreCommandTransaction = true,
TreatTinyAsBoolean = false,
ConnectionTimeout = 60,
DefaultCommandTimeout = 0,
SslMode = MySqlSslMode.None,
Database = dbName
}.ConnectionString;
await using var output = new MySqlDestination(connStr, _logger,
_outputOptions, _errorRecorderFactory.CreateOutput(dbName), _context);
var tableOutput = new Dictionary();
foreach (var record in records)
{
await output.WriteRecordAsync(record);
tableOutput.AddOrUpdate(record.TableName, 1, (_, v) => v + 1);
}
await output.FlushAsync(_outputOptions.Value.MaxAllowedPacket);
foreach (var (key, value) in tableOutput)
{
_context.AddOutput(value);
_context.AddTableOutput(key, value);
}
_logger.LogTrace("输出任务:刷新了 {Count} 条记录", tableOutput.Values.Sum(i => i));
}
}