MES-ETL/ConsoleApp2/HostedServices/OutputService.cs
2024-01-22 16:57:43 +08:00

94 lines
3.4 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using ConsoleApp2.HostedServices.Abstractions;
using ConsoleApp2.Options;
using ConsoleApp2.Services;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace ConsoleApp2.HostedServices;
/// <summary>
/// 数据导出服务将数据导出至MySql服务
/// </summary>
public class OutputService : IOutputService
{
private readonly ILogger _logger;
private readonly IOptions<DatabaseOutputOptions> _outputOptions;
private readonly IOptions<DataTransformOptions> _transformOptions;
private readonly ProcessContext _context;
private readonly TaskManager _taskManager;
private readonly ErrorRecorder _errorRecorder;
public OutputService(ILogger<OutputService> logger,
IOptions<DatabaseOutputOptions> outputOptions,
ProcessContext context,
TaskManager taskManager,
IOptions<DataTransformOptions> transformOptions,
ErrorRecorder errorRecorder)
{
_logger = logger;
_outputOptions = outputOptions;
_context = context;
_taskManager = taskManager;
_transformOptions = transformOptions;
_errorRecorder = errorRecorder;
}
private int _runingTaskCount;
public int RuningTaskCount
{
get => _runingTaskCount;
}
public void DoTask() => Interlocked.Increment(ref _runingTaskCount);
public void FinishTask() => Interlocked.Decrement(ref _runingTaskCount);
public void ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue consumerQueue, ProcessContext context, CancellationToken cancellationToken)
{
if (context.IsTransformCompleted == false && consumerQueue.Count < tasksOptions.OutPutOptions.FlushCount) return;
if (RuningTaskCount >= tasksOptions.OutPutOptions.OutPutTaskCount ) return;
var records = new List<DataRecord>();
for (int i = 0; i < tasksOptions.OutPutOptions.FlushCount; i++)
{
if (consumerQueue.TryDequeue(out var record)) records.Add(record);
else break;
}
if (records.Count > 0)
{
ThreadPool.QueueUserWorkItem(async (queueState) =>
{
DoTask();
await FlushAsync(records);
FinishTask();
});
}
}
private async Task FlushAsync(IEnumerable<DataRecord> records)
{
var count = 0;
await using var output = new MySqlDestination(
_outputOptions.Value.ConnectionString ?? throw new InvalidOperationException("Connection string is required"),
_logger, _context, _transformOptions, _errorRecorder);
//if (records == null || records.Count() == 0) return;
//var dbName = $"cferp_test_1";
//if (records != null && records.Count() > 0)
//{
// dbName = $"cferp_test_{records.FirstOrDefault()?.CompanyID}";
//}
//await using var output = new MySqlDestination(new MySqlConnectionStringBuilder
//{
// Server = "127.0.0.1",
// Port = 34309,
// Database = dbName,
// UserID = "root",
// Password = "123456",
// MaximumPoolSize = 50,
//}.ConnectionString, _logger,true);
foreach (var record in records)
{
await output.WriteRecordAsync(record);
count++;
}
await output.FlushAsync(_outputOptions.Value.MaxAllowedPacket);
_context.AddOutput(count);
}
}