MES-ETL/ConsoleApp2/HostedServices/OutputService.cs
2024-01-04 09:00:44 +08:00

83 lines
2.6 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using ConsoleApp2.Const;
using ConsoleApp2.HostedServices.Abstractions;
using ConsoleApp2.Options;
using ConsoleApp2.Services;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace ConsoleApp2.HostedServices;
/// <summary>
/// 数据导出服务将数据导出至MySql服务
/// </summary>
public class OutputService : IOutputService
{
private readonly ILogger _logger;
private readonly DataRecordQueue _consumerQueue;
private readonly IOptions<DatabaseOutputOptions> _options;
private readonly ProcessContext _context;
private readonly TaskManager _taskManager;
public OutputService(ILogger<OutputService> logger,
[FromKeyedServices(ProcessStep.Consumer)] DataRecordQueue consumerQueue,
IOptions<DatabaseOutputOptions> options,
ProcessContext context,
TaskManager taskManager)
{
_logger = logger;
_consumerQueue = consumerQueue;
_options = options;
_context = context;
_taskManager = taskManager;
}
public async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("***** Mysql output service started *****");
var records = new List<DataRecord>();
while (!_context.IsTransformCompleted || _consumerQueue.Count > 0)
{
if (!_consumerQueue.TryDequeue(out var record)) continue;
records.Add(record);
if (records.Count >= _options.Value.FlushCount)
{
var recordsCopy = records;
_taskManager.CreateTask(async () => await FlushAsync(recordsCopy), stoppingToken);
records = [];
}
if (_taskManager.TaskCount >= _options.Value.MaxTask)
{
await _taskManager.WaitAll();
_taskManager.ClearTask();
}
}
await _taskManager.WaitAll();
await FlushAsync(records);
_context.CompleteOutput();
_logger.LogInformation("***** Mysql output service completed *****");
}
private async Task FlushAsync(IEnumerable<DataRecord> records)
{
var count = 0;
await using var output = new MySqlDestination(
_options.Value.ConnectionString ?? throw new InvalidOperationException("Connection string is required"),
_logger, true);
foreach (var record in records)
{
await output.WriteRecordAsync(record);
count++;
}
await output.FlushAsync();
_context.AddOutput(count);
}
}