MES-ETL/ConsoleApp2/HostedServices/OutputService.cs
lindj f167256082 修复获取cachekey错误问题
output使用ThreadPool.QueueUserWorkItem
2024-01-22 16:55:47 +08:00

106 lines
3.7 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using ConsoleApp2.Const;
using ConsoleApp2.HostedServices.Abstractions;
using ConsoleApp2.Options;
using ConsoleApp2.Services;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace ConsoleApp2.HostedServices;
/// <summary>
/// Output service that drains records from a <see cref="DataRecordQueue"/> and writes them
/// to MySQL in batches.
/// </summary>
public class OutputService : IOutputService
{
    private readonly ILogger _logger;
    private readonly IOptions<DatabaseOutputOptions> _outputOptions;
    private readonly IOptions<DataTransformOptions> _transformOptions;
    private readonly ProcessContext _context;
    // Injected but not used by any member of this class; kept so the constructor signature is unchanged.
    private readonly TaskManager _taskManager;
    private readonly ErrorRecorder _errorRecorder;

    /// <summary>
    /// Creates the service and stores the injected dependencies.
    /// </summary>
    public OutputService(ILogger<OutputService> logger,
        IOptions<DatabaseOutputOptions> outputOptions,
        ProcessContext context,
        TaskManager taskManager,
        IOptions<DataTransformOptions> transformOptions,
        ErrorRecorder errorRecorder)
    {
        _logger = logger;
        _outputOptions = outputOptions;
        _context = context;
        _taskManager = taskManager;
        _transformOptions = transformOptions;
        _errorRecorder = errorRecorder;
    }

    /// <summary>
    /// Dequeues records from <paramref name="consumerQueue"/> until the transform stage is
    /// reported complete and the queue is empty, flushing a batch to MySQL every
    /// <c>tasksOptions.OutPutOptions.FlushCount</c> records.
    /// </summary>
    /// <param name="tasksOptions">Supplies the batch size (<c>OutPutOptions.FlushCount</c>).</param>
    /// <param name="consumerQueue">Queue the records are taken from.</param>
    /// <param name="context">Context whose <c>IsTransformCompleted</c> flag ends the loop.</param>
    /// <param name="cancellationToken">Stops the loop when cancellation is requested.</param>
    /// <returns>
    /// A task that completes only after every batch started by this call has finished.
    /// It faults if any batch flush throws.
    /// </returns>
    public async Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue consumerQueue, ProcessContext context, CancellationToken cancellationToken)
    {
        _logger.LogInformation("***** Mysql output service started *****");

        var records = new List<DataRecord>();
        // Flushes are tracked (instead of fire-and-forget async-void work items) so that
        // their exceptions are observed and this method does not return while batches
        // are still being written.
        var pendingFlushes = new List<Task>();

        // NOTE(review): this loop busy-waits while the queue is empty, and the exception
        // check below is only reached after a successful dequeue. If DataRecordQueue
        // offers a blocking/async wait, prefer it here — TODO confirm.
        while (!context.IsTransformCompleted || consumerQueue.Count > 0)
        {
            if (cancellationToken.IsCancellationRequested)
            {
                _logger.LogInformation("***** Mysql output thread is canceled *****");
                await Task.WhenAll(pendingFlushes);
                return;
            }

            if (!consumerQueue.TryDequeue(out var record))
            {
                continue;
            }

            records.Add(record);

            if (records.Count >= tasksOptions.OutPutOptions.FlushCount)
            {
                // Hand the full batch to the background flush and start a fresh list,
                // so the flush never sees later mutations.
                var batch = records;
                records = new List<DataRecord>(batch.Count);

                // Drop successfully finished flushes; faulted ones are kept so the
                // final await surfaces their exception.
                pendingFlushes.RemoveAll(t => t.IsCompletedSuccessfully);
                pendingFlushes.Add(Task.Run(() => FlushAsync(batch)));
            }

            if (_context.GetExceptions().Count > 0)
            {
                _logger.LogInformation("***** Mysql output thread is canceled *****");
                await Task.WhenAll(pendingFlushes);
                return;
            }
        }

        if (records.Count > 0)
        {
            var batch = records;
            pendingFlushes.Add(Task.Run(() => FlushAsync(batch)));
        }

        await Task.WhenAll(pendingFlushes);
        _logger.LogInformation("***** Mysql output thread completed *****");
    }

    /// <summary>
    /// Writes <paramref name="records"/> to a new <c>MySqlDestination</c>, flushes it using the
    /// configured <c>MaxAllowedPacket</c>, and reports the number of written records through
    /// <c>_context.AddOutput</c>.
    /// </summary>
    /// <param name="records">The batch to write.</param>
    /// <exception cref="InvalidOperationException">The output connection string is not configured.</exception>
    private async Task FlushAsync(IEnumerable<DataRecord> records)
    {
        var count = 0;
        await using var output = new MySqlDestination(
            _outputOptions.Value.ConnectionString ?? throw new InvalidOperationException("Connection string is required"),
            _logger, _context, _transformOptions, _errorRecorder);

        foreach (var record in records)
        {
            await output.WriteRecordAsync(record);
            count++;
        }

        await output.FlushAsync(_outputOptions.Value.MaxAllowedPacket);
        _context.AddOutput(count);
    }
}