using ConsoleApp2.Helpers; using ConsoleApp2.HostedServices.Abstractions; using ConsoleApp2.Options; using ConsoleApp2.Services; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; namespace ConsoleApp2.HostedServices; /// /// 数据导出服务,将数据导出至MySql服务 /// public class OutputService : IOutputService { private readonly ILogger _logger; private readonly IOptions _outputOptions; private readonly IOptions _transformOptions; private readonly ProcessContext _context; private readonly TaskManager _taskManager; private readonly ErrorRecorder _errorRecorder; public OutputService(ILogger logger, IOptions outputOptions, ProcessContext context, TaskManager taskManager, IOptions transformOptions, ErrorRecorder errorRecorder) { _logger = logger; _outputOptions = outputOptions; _context = context; _taskManager = taskManager; _transformOptions = transformOptions; _errorRecorder = errorRecorder; } public async Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue consumerQueue, ProcessContext context, CancellationToken cancellationToken) { _logger.LogInformation("***** Mysql output service started *****"); _taskManager.CreateTasks(async () => { //k: database v: records,按照要导出的数据库名分组 var databaseDict = new Dictionary>(); while (!context.IsTransformCompleted || consumerQueue.Count > 0) { if (!consumerQueue.TryDequeue(out var record)) continue; var dbName = record.Database; var records = databaseDict.AddOrUpdate(dbName, [record], (_, list) => { list.Add(record); return list; }); if (records.Count >= tasksOptions.OutPutOptions.FlushCount) { await FlushAsync(dbName, records); records.Clear(); } } foreach (var (db, records) in databaseDict) { if (records.Count > 0) { await FlushAsync(db, records); records.Clear(); } } databaseDict.Clear(); _logger.LogInformation("***** Mysql output thread completed *****"); }, tasksOptions.OutPutOptions.OutPutTaskCount); await _taskManager.WaitAll(); //_context.CompleteOutput(); _logger.LogInformation("***** Mysql output service completed *****"); } private async Task FlushAsync(string dbName, IEnumerable records) { var count = 0; var connStr = _outputOptions.Value.ConnectionString ?? throw new InvalidOperationException("ConnectionString is null"); await using var output = new MySqlDestination($"{connStr};Database={dbName};", _logger, _context, _transformOptions, _errorRecorder); //if (records == null || records.Count() == 0) return; //var dbName = $"cferp_test_1"; //if (records != null && records.Count() > 0) //{ // dbName = $"cferp_test_{records.FirstOrDefault()?.CompanyID}"; //} //await using var output = new MySqlDestination(new MySqlConnectionStringBuilder //{ // Server = "127.0.0.1", // Port = 34309, // Database = dbName, // UserID = "root", // Password = "123456", // MaximumPoolSize = 50, //}.ConnectionString, _logger,true); foreach (var record in records) { await output.WriteRecordAsync(record); count++; } await output.FlushAsync(_outputOptions.Value.MaxAllowedPacket); _context.AddOutput(count); } }