MES-ETL/ConsoleApp2/HostedServices/OutputService.cs
CZY 8da3110ecd 添加数据分库;
修复taskManager中异步方法没有正常等待的错误;
删除无用的异常捕获;
2024-01-19 11:17:22 +08:00

109 lines
4.0 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using ConsoleApp2.Helpers;
using ConsoleApp2.HostedServices.Abstractions;
using ConsoleApp2.Options;
using ConsoleApp2.Services;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace ConsoleApp2.HostedServices;
/// <summary>
/// 数据导出服务将数据导出至MySql服务
/// </summary>
public class OutputService : IOutputService
{
private readonly ILogger _logger;
private readonly IOptions<DatabaseOutputOptions> _outputOptions;
private readonly IOptions<DataTransformOptions> _transformOptions;
private readonly ProcessContext _context;
private readonly TaskManager _taskManager;
private readonly ErrorRecorder _errorRecorder;
public OutputService(ILogger<OutputService> logger,
IOptions<DatabaseOutputOptions> outputOptions,
ProcessContext context,
TaskManager taskManager,
IOptions<DataTransformOptions> transformOptions,
ErrorRecorder errorRecorder)
{
_logger = logger;
_outputOptions = outputOptions;
_context = context;
_taskManager = taskManager;
_transformOptions = transformOptions;
_errorRecorder = errorRecorder;
}
public async Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue consumerQueue, ProcessContext context, CancellationToken cancellationToken)
{
_logger.LogInformation("***** Mysql output service started *****");
_taskManager.CreateTasks(async () =>
{
//k: database v: records按照要导出的数据库名分组
var databaseDict = new Dictionary<string, List<DataRecord>>();
while (!context.IsTransformCompleted || consumerQueue.Count > 0)
{
if (!consumerQueue.TryDequeue(out var record)) continue;
var dbName = record.Database;
var records = databaseDict.AddOrUpdate(dbName, [record], (_, list) =>
{
list.Add(record);
return list;
});
if (records.Count >= tasksOptions.OutPutOptions.FlushCount)
{
await FlushAsync(dbName, records);
records.Clear();
}
}
foreach (var (db, records) in databaseDict)
{
if (records.Count > 0)
{
await FlushAsync(db, records);
records.Clear();
}
}
databaseDict.Clear();
_logger.LogInformation("***** Mysql output thread completed *****");
}, tasksOptions.OutPutOptions.OutPutTaskCount);
await _taskManager.WaitAll();
//_context.CompleteOutput();
_logger.LogInformation("***** Mysql output service completed *****");
}
private async Task FlushAsync(string dbName, IEnumerable<DataRecord> records)
{
var count = 0;
var connStr = _outputOptions.Value.ConnectionString ?? throw new InvalidOperationException("ConnectionString is null");
await using var output = new MySqlDestination($"{connStr};Database={dbName};", _logger, _context, _transformOptions, _errorRecorder);
//if (records == null || records.Count() == 0) return;
//var dbName = $"cferp_test_1";
//if (records != null && records.Count() > 0)
//{
// dbName = $"cferp_test_{records.FirstOrDefault()?.CompanyID}";
//}
//await using var output = new MySqlDestination(new MySqlConnectionStringBuilder
//{
// Server = "127.0.0.1",
// Port = 34309,
// Database = dbName,
// UserID = "root",
// Password = "123456",
// MaximumPoolSize = 50,
//}.ConnectionString, _logger,true);
foreach (var record in records)
{
await output.WriteRecordAsync(record);
count++;
}
await output.FlushAsync(_outputOptions.Value.MaxAllowedPacket);
_context.AddOutput(count);
}
}