using ConsoleApp2.Const;
using ConsoleApp2.HostedServices.Abstractions;
using ConsoleApp2.Options;
using ConsoleApp2.Services;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MySqlConnector;
using System.Threading;

namespace ConsoleApp2.HostedServices;

// NOTE(review): ILogger, IOptions, List and IEnumerable appear in this file without generic
// type arguments. They look like they were lost from this copy of the source; confirm against
// the original and restore them before compiling.

/// <summary>
/// Data export service: takes records from the consumer queue and writes them to a MySQL server.
/// </summary>
public class OutputService : IOutputService
{
    /// <summary>
    /// Delay, in milliseconds, before polling the consumer queue again after finding it empty.
    /// Keeps idle workers from spinning on <c>TryDequeue</c>.
    /// </summary>
    private const int EmptyQueuePollDelayMilliseconds = 10;

    private readonly ILogger _logger;
    private readonly DataRecordQueue _consumerQueue;
    private readonly IOptions _transOptions;
    private readonly IOptions _options;
    private readonly ProcessContext _context;
    private readonly TaskManager _taskManager;

    /// <summary>
    /// Creates the service and stores its dependencies.
    /// </summary>
    /// <param name="logger">Logger used for progress messages; also handed to the MySQL destination.</param>
    /// <param name="consumerQueue">Queue resolved with the <c>ProcessStep.Consumer</c> service key; source of the records to write.</param>
    /// <param name="options">Output options; this class reads FlushCount, TaskCount, ConnectionString and MaxAllowedPacket from them.</param>
    /// <param name="transOptions">Transform options; passed through unchanged to the destination's flush call.</param>
    /// <param name="context">Shared process context (transform-completed flag, recorded exceptions, output counter).</param>
    /// <param name="taskManager">Task manager used to create the worker tasks and wait for them.</param>
    public OutputService(ILogger logger,
        [FromKeyedServices(ProcessStep.Consumer)] DataRecordQueue consumerQueue,
        IOptions options,
        IOptions transOptions,
        ProcessContext context,
        TaskManager taskManager)
    {
        _logger = logger;
        _consumerQueue = consumerQueue;
        _transOptions = transOptions;
        _options = options;
        _context = context;
        _taskManager = taskManager;
    }

    /// <summary>
    /// Drains the consumer queue and writes the records to MySQL in batches.
    /// </summary>
    /// <remarks>
    /// A worker delegate is registered through <c>TaskManager.CreateTasks</c> together with the
    /// configured TaskCount, and the method then awaits <c>TaskManager.WaitAll</c>.
    /// Each worker buffers dequeued records and flushes the buffer once it holds at least
    /// FlushCount records. A worker finishes when the context reports the transform as completed
    /// and the queue is empty, flushing whatever is still buffered. A worker aborts, without
    /// flushing its buffer, when cancellation is requested or the context holds any exception.
    /// </remarks>
    /// <param name="cancellationToken">Token that makes the workers stop at their next loop iteration.</param>
    /// <returns>A task that completes when <c>TaskManager.WaitAll</c> has completed.</returns>
    public async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("***** Mysql output service started *****");

        _taskManager.CreateTasks(async () =>
        {
            var records = new List();

            while (!_context.IsTransformCompleted || _consumerQueue.Count > 0)
            {
                // Checked before dequeuing so that a failure recorded elsewhere (or a cancellation
                // request) is noticed even while the queue is empty.
                if (cancellationToken.IsCancellationRequested || _context.GetExceptions().Count > 0)
                {
                    _logger.LogInformation("***** Mysql output thread is canceled *****");
                    return;
                }

                if (!_consumerQueue.TryDequeue(out var record))
                {
                    // Nothing available yet: back off briefly instead of busy-waiting.
                    await Task.Delay(EmptyQueuePollDelayMilliseconds);
                    continue;
                }

                records.Add(record);

                if (records.Count >= _options.Value.FlushCount)
                {
                    await FlushAsync(records);
                    records.Clear();
                }
            }

            // Write the final, partially filled batch.
            if (records.Count > 0)
            {
                await FlushAsync(records);
                records.Clear();
            }

            _logger.LogInformation("***** Mysql output thread completed *****");
        }, _options.Value.TaskCount);

        await _taskManager.WaitAll();

        _logger.LogInformation("***** Mysql output service completed *****");
    }

    /// <summary>
    /// Writes one batch of records through a new <c>MySqlDestination</c> and adds the number of
    /// records written to the context's output counter.
    /// </summary>
    /// <param name="records">The batch to write; enumerated exactly once.</param>
    /// <exception cref="InvalidOperationException">The configured connection string is null.</exception>
    private async Task FlushAsync(IEnumerable records)
    {
        var connectionString = _options.Value.ConnectionString
                               ?? throw new InvalidOperationException("Connection string is required");

        // A destination is created per batch and disposed asynchronously when the method exits.
        await using var output = new MySqlDestination(connectionString, _logger, _context, true);

        var written = 0;
        foreach (var record in records)
        {
            await output.WriteRecordAsync(record);
            written++;
        }

        await output.FlushAsync(_options.Value.MaxAllowedPacket, _transOptions);

        // Counted only after the destination flush has returned without throwing.
        _context.AddOutput(written);
    }
}