项目重命名

This commit is contained in:
2024-02-01 15:25:42 +08:00
parent 70cf0322e4
commit e0de5d1c58
59 changed files with 46 additions and 211 deletions

View File

@@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ConsoleApp2.HostedServices.Abstractions
{
public interface IDataReader : IDisposable
{
DataRecord Current { get; }
ValueTask<bool> ReadAsync();
}
}

View File

@@ -0,0 +1,9 @@
using ConsoleApp2.Options;
using ConsoleApp2.Services;
namespace ConsoleApp2.HostedServices.Abstractions;
public interface IInputService
{
public Task ExecuteAsync(CancellationToken cancellationToken);
}

View File

@@ -0,0 +1,9 @@
using ConsoleApp2.Options;
using ConsoleApp2.Services;
namespace ConsoleApp2.HostedServices.Abstractions;
public interface IOutputService
{
public Task ExecuteAsync(CancellationToken ct);
}

View File

@@ -0,0 +1,9 @@
using ConsoleApp2.Options;
using ConsoleApp2.Services;
namespace ConsoleApp2.HostedServices.Abstractions;
public interface ITransformService
{
public Task ExecuteAsync(CancellationToken cancellationToken);
}

View File

@@ -0,0 +1,111 @@
using ConsoleApp2.Const;
using ConsoleApp2.HostedServices.Abstractions;
using ConsoleApp2.Options;
using ConsoleApp2.Services;
using ConsoleApp2.Services.ETL;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace ConsoleApp2.HostedServices;
public record FileInputInfo
{
public required string FileName { get; init; }
public required string TableName { get; init; }
public required string[] Headers { get; init; }
}
public enum FileInputType
{
MyDumperCsv,
MyDumperZst,
ErrorLog,
}
/// <summary>
/// 从输入目录中导入文件
/// </summary>
public class FileInputService : IInputService
{
private readonly ILogger _logger;
private readonly DataRecordQueue _producerQueue;
private readonly IOptions<DataInputOptions> _dataInputOptions;
private readonly ProcessContext _context;
private readonly DataReaderFactory _dataReaderFactory;
public FileInputService(ILogger<FileInputService> logger,
IOptions<DataInputOptions> dataInputOptions,
ProcessContext context,
[FromKeyedServices(ProcessStep.Produce)] DataRecordQueue producerQueue,
DataReaderFactory dataReaderFactory)
{
_logger = logger;
_dataInputOptions = dataInputOptions;
_context = context;
_producerQueue = producerQueue;
_dataReaderFactory = dataReaderFactory;
}
public async Task ExecuteAsync(CancellationToken cancellationToken)
{
var inputDir = _dataInputOptions.Value.InputDir ?? throw new ApplicationException("未配置文件输入目录");
_logger.LogInformation("***** Input service started, working directory: {InputDir} *****", inputDir);
var trans = _dataInputOptions.Value.FileInputMetaBuilder;
if(trans is null) throw new ApplicationException("未配置文件名-表名映射委托");
FileInputInfo[] infoArr = Directory.GetFiles(inputDir)
.Select(f => trans(f))
.Where(info => info is not null).ToArray()!;
var orderedInfo = GetFilesInOrder(infoArr).ToArray();
_logger.LogInformation("***** {Count} files founded in directory{OrderedCount} files is matched with configuration *****", infoArr.Length, orderedInfo.Length);
foreach (var info in orderedInfo)
{
_logger.LogDebug("Table {TableName}: {FileName}", info.TableName, info.FileName);
}
foreach (var info in orderedInfo)
{
_logger.LogInformation("Reading file: {FileName}, table: {TableName}", info.FileName, info.TableName);
var source = _dataReaderFactory.CreateReader(info.FileName,info.TableName,info.Headers);
while (await source.ReadAsync())
{
var record = source.Current;
_producerQueue.Enqueue(record);
_context.AddInput();
}
_logger.LogInformation("Input of table: '{TableName}' finished", info.TableName);
}
_context.CompleteInput();
_logger.LogInformation("***** Input service finished *****");
}
/// <summary>
/// 读取配置,按照配置的表顺序来返回
/// </summary>
/// <returns></returns>
private IEnumerable<FileInputInfo> GetFilesInOrder(FileInputInfo[] inputFiles)
{
var tableOrder = _dataInputOptions.Value.TableOrder;
if (tableOrder is null or { Length: 0 })
return inputFiles;
return Yield();
IEnumerable<FileInputInfo> Yield()
{
foreach (var tableName in tableOrder)
{
var target = inputFiles.FirstOrDefault(f =>
f.TableName.Equals(tableName, StringComparison.OrdinalIgnoreCase));
if (target is not null)
yield return target;
}
}
}
}

View File

@@ -0,0 +1,186 @@
using System.Diagnostics;
using System.Text;
using ConsoleApp2.Helpers;
using ConsoleApp2.HostedServices.Abstractions;
using ConsoleApp2.Options;
using ConsoleApp2.Services;
using ConsoleApp2.Services.ErrorRecorder;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MySqlConnector;
namespace ConsoleApp2.HostedServices;
public class MainHostedService : BackgroundService
{
private Stopwatch? _stopwatch;
private readonly IInputService _input;
private readonly ITransformService _transform;
private readonly IOutputService _output;
private readonly TaskMonitorService _taskMonitor;
private readonly ILogger _logger;
private readonly ProcessContext _context;
private readonly IOptions<DatabaseOutputOptions> _databaseOptions;
private readonly IOptions<TenantDbOptions> _tenantDbOptions;
private readonly IConfiguration _config;
public MainHostedService(IInputService input,
ITransformService transform,
IOutputService output,
ILogger<MainHostedService> logger,
IOptions<TenantDbOptions> tenantDbOptions,
IOptions<DatabaseOutputOptions> databaseOptions,
IConfiguration config,
ProcessContext context,
TaskMonitorService taskMonitor)
{
_input = input;
_transform = transform;
_output = output;
_logger = logger;
_tenantDbOptions = tenantDbOptions;
_databaseOptions = databaseOptions;
_config = config;
_context = context;
_taskMonitor = taskMonitor;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var command = _config["Command"];
if (!string.IsNullOrEmpty(command))
{
_logger.LogInformation("***** Running Sql Command *****");
await ExecuteEachDatabase(command, stoppingToken);
Environment.Exit(0);
}
_stopwatch = Stopwatch.StartNew();
await SetVariableAsync(); // 开启延迟写入,禁用重做日志 >>> 重做日志处于禁用状态时不要关闭数据库服务!
var monitorTask = Task.Run(async () => await _taskMonitor.Monitor(stoppingToken), stoppingToken);
var inputTask = ExecuteAndCatch(
async () => await _input.ExecuteAsync(stoppingToken), "文件输入程序出现异常", stoppingToken);
var transformTask = ExecuteAndCatch(
async () => await _transform.ExecuteAsync(stoppingToken), "转换程序出现异常", stoppingToken);
var outputTask = ExecuteAndCatch(
async () => await _output.ExecuteAsync(stoppingToken), "输出程序出现异常", stoppingToken);
await Task.WhenAll(inputTask, transformTask, outputTask);
_stopwatch.Stop();
_logger.LogInformation("***** All tasks completed *****");
_logger.LogInformation("***** ElapseTime: {Time}", (_stopwatch.ElapsedMilliseconds / 1000f).ToString("F3"));
await Task.Delay(5000, stoppingToken);
await SetVariableAsync(false); // 关闭延迟写入,开启重做日志
if (!stoppingToken.IsCancellationRequested)
{
await ExportResultAsync();
_logger.LogInformation("The execution result export to {Path}",
Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"Result-{ErrorRecorder.UID}.md"));
Environment.Exit(0);
}
else Environment.Exit(1);
}
private Task ExecuteAndCatch(Func<Task> func, string message, CancellationToken ct)
{
return Task.Run(async () =>
{
try
{
await func();
}
catch (Exception e)
{
_logger.LogCritical(e, "{Msg}\t{ErrMsg}", message, e.Message);
_context.AddException(e);
Environment.Exit(1);
}
}, ct);
}
private async Task SetVariableAsync(bool enable = true)
{
var connStr = _databaseOptions.Value.ConnectionString
?? throw new ApplicationException("无法还原索引,因为分库配置中没有配置数据库");
if (enable)
{
await DatabaseHelper.NonQueryAsync(connStr,
"""
SET GLOBAL innodb_flush_log_at_trx_commit = 0;
ALTER INSTANCE DISABLE INNODB REDO_LOG;
""");
}
else
{
await DatabaseHelper.NonQueryAsync(connStr,
"""
SET GLOBAL innodb_flush_log_at_trx_commit = 1;
ALTER INSTANCE ENABLE INNODB REDO_LOG;
""");
}
}
private async Task ExecuteEachDatabase(string command, CancellationToken cancellationToken = default)
{
var databases = _tenantDbOptions.Value.DbList?.Keys
?? throw new ApplicationException("无法还原索引,因为分库配置中没有配置数据库");
var list = new List<Task>();
foreach (var db in databases)
{
var connStr = new MySqlConnectionStringBuilder(_databaseOptions.Value.ConnectionString
?? throw new ApplicationException("无法还原索引,因为没有配置数据库连接字符串"))
{
ConnectionTimeout = 60,
DefaultCommandTimeout = 0,
Database = db
}.ConnectionString;
var task = Task.Run(async () => await DatabaseHelper.NonQueryAsync(connStr, command),
cancellationToken);
list.Add(task);
}
await Task.WhenAll(list);
}
private async Task ExportResultAsync()
{
var sb = new StringBuilder();
if (_context.HasException)
sb.AppendLine("# Program Completed With Error");
else sb.AppendLine("# Program Completed Successfully");
sb.AppendLine("## Process Count");
var processCount = new[]
{
new { State = "Input", Count = _context.InputCount },
new { State = "Transform", Count = _context.TransformCount },
new { State = "Output", Count = _context.OutputCount }
};
sb.AppendLine(processCount.ToMarkdownTable());
sb.AppendLine("\n---\n");
sb.AppendLine("## Table Output Progress");
var tableOutputProgress = _context.TableProgress.Select(pair =>
new { Table = pair.Key, Count = pair.Value });
sb.AppendLine(tableOutputProgress.ToMarkdownTable());
sb.AppendLine("\n---\n");
sb.AppendLine("## Result");
var elapsedTime = (_stopwatch!.ElapsedMilliseconds / 1000f);
var result = new[]
{
new { Field = "ElapsedTime", Value = elapsedTime.ToString("F2") },
new
{
Field = "Average Output Speed",
Value = (_context.OutputCount / elapsedTime).ToString("F2") + "records/s"
}
};
sb.AppendLine(result.ToMarkdownTable());
await File.WriteAllTextAsync(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"Result-{ErrorRecorder.UID}.md"),
sb.ToString());
}
}

View File

@@ -0,0 +1,143 @@
using ConsoleApp2.Helpers;
using ConsoleApp2.HostedServices.Abstractions;
using ConsoleApp2.Options;
using ConsoleApp2.Services;
using ConsoleApp2.Services.ErrorRecorder;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MySqlConnector;
using MySqlDestination = ConsoleApp2.Services.ETL.MySqlDestination;
using TaskExtensions = ConsoleApp2.Helpers.TaskExtensions;
namespace ConsoleApp2.HostedServices;
/// <summary>
/// 数据导出服务将数据导出至MySql服务
/// </summary>
public class OutputService : IOutputService
{
private readonly ILogger _logger;
private readonly IOptions<DatabaseOutputOptions> _outputOptions;
private readonly ProcessContext _context;
private readonly ErrorRecorderFactory _errorRecorderFactory;
private readonly RecordQueuePool _queuePool;
public OutputService(ILogger<OutputService> logger,
IOptions<DatabaseOutputOptions> outputOptions,
ProcessContext context,
RecordQueuePool queuePool,
ErrorRecorderFactory errorRecorderFactory)
{
_logger = logger;
_outputOptions = outputOptions;
_context = context;
_queuePool = queuePool;
_errorRecorderFactory = errorRecorderFactory;
}
public async Task ExecuteAsync(CancellationToken ct)
{
_logger.LogInformation("***** Output service started *****");
var dbTaskManager = new TaskManager(5);
var dbTasks = new Dictionary<string, Task>();
while (!_context.IsTransformCompleted)
{
foreach (var (db, queue) in _queuePool.Queues)
{
if (!dbTasks.ContainsKey(db))
{
dbTasks.Add(db, await dbTaskManager.CreateTaskAsync(
async () => await StartDatabaseWorker(db, queue, ct), ct));
}
}
await Task.Delay(500, ct);
}
await TaskExtensions.WaitUntil(() => dbTaskManager.RunningTaskCount == 0, 25, ct);
_context.CompleteOutput();
_logger.LogInformation("***** Output service finished *****");
}
private async Task StartDatabaseWorker(string db, DataRecordQueue queue, CancellationToken ct = default)
{
_logger.LogInformation("*****开启输出线程,数据库: {db} *****", db);
var taskManager = new TaskManager(_outputOptions.Value.MaxDatabaseOutputTask);
var tmp = new List<DataRecord>();
while (!_context.IsTransformCompleted || queue.Count > 0)
{
if (ct.IsCancellationRequested)
break;
if (!queue.TryDequeue(out var record)) continue;
var dbName = record.Database ?? throw new ApplicationException("输出的记录缺少数据库名");
if(dbName != db)
throw new ApplicationException($"输出记录的数据与当前输出线程不匹配,记录:{dbName}, 输出线程:{db}");
tmp.Add(record);
if (tmp.Count >= _outputOptions.Value.FlushCount)
{
var list = tmp;
tmp = [];
await taskManager.CreateTaskAsync(async arg => // 转换为方法组
{
var tuple = arg as Tuple<string, List<DataRecord>>;
try
{
await FlushAsync(tuple!.Item1, tuple.Item2);
}
catch (Exception e)
{
_logger.LogError(e, "输出记录时发生错误");
throw;
}
}, Tuple.Create(dbName, list), ct);
}
}
// 等待所有子任务完成
await TaskExtensions.WaitUntil(() => taskManager.RunningTaskCount == 0, 10, ct);
// 清理剩余记录
if (tmp.Count > 0)
{
await FlushAsync(db, tmp);
}
_logger.LogInformation("*****输出线程结束,数据库: {db} *****", db);
}
private async Task FlushAsync(string dbName, IEnumerable<DataRecord> records)
{
var connStr = new MySqlConnectionStringBuilder(_outputOptions.Value.ConnectionString
?? throw new ApplicationException("未配置数据库连接字符串"))
{
CharacterSet = "utf8mb4",
AllowUserVariables = true,
IgnoreCommandTransaction = true,
TreatTinyAsBoolean = false,
ConnectionTimeout = 60,
DefaultCommandTimeout = 0,
SslMode = MySqlSslMode.None,
Database = dbName
}.ConnectionString;
await using var output = new MySqlDestination(connStr, _logger,
_outputOptions, _errorRecorderFactory.CreateOutput(dbName), _context);
var tableOutput = new Dictionary<string, int>();
foreach (var record in records)
{
await output.WriteRecordAsync(record);
tableOutput.AddOrUpdate(record.TableName, 1, (_, v) => v + 1);
}
await output.FlushAsync(_outputOptions.Value.MaxAllowedPacket);
foreach (var (key, value) in tableOutput)
{
_context.AddTableOutput(key, value);
}
_logger.LogTrace("Flushed {Count} records", tableOutput.Values.Sum(i => i));
}
}

View File

@@ -0,0 +1,127 @@
using System.Diagnostics;
using ConsoleApp2.Const;
using ConsoleApp2.Services;
using ConsoleApp2.Services.Loggers;
using Microsoft.Extensions.DependencyInjection;
namespace ConsoleApp2.HostedServices;
/// <summary>
/// 任务监控
/// </summary>
public class TaskMonitorService
{
private readonly IEnumerable<ITaskMonitorLogger> _monitorLoggers;
private readonly ProcessContext _context;
private readonly DataRecordQueue _producerQueue;
private readonly RecordQueuePool _queuePool;
public TaskMonitorService(ProcessContext context,
[FromKeyedServices(ProcessStep.Produce)]
DataRecordQueue producerQueue,
RecordQueuePool queuePool,
IEnumerable<ITaskMonitorLogger> monitorLoggers)
{
_context = context;
_producerQueue = producerQueue;
_queuePool = queuePool;
_monitorLoggers = monitorLoggers;
}
public async Task Monitor(CancellationToken stoppingToken)
{
var sw = Stopwatch.StartNew();
var lastTime = sw.ElapsedMilliseconds;
var lastInputCount = _context.InputCount;
var lastTransformCount = _context.TransformCount;
var lastOutputCount = _context.OutputCount;
bool endCheck = false;
while (!stoppingToken.IsCancellationRequested)
{
EndCheck:
// var running = 0;
// var error = 0;
// var completed = 0;
// var canceled = 0;
// foreach (var task in _taskManager.Tasks)
// {
// switch (task.Status)
// {
// case TaskStatus.Canceled:
// canceled++;
// break;
// case TaskStatus.Faulted:
// error++;
// break;
// case TaskStatus.RanToCompletion:
// completed++;
// break;
// default:
// running++;
// break;
// }
// }
var time = sw.ElapsedMilliseconds;
var inputCount = _context.InputCount;
var transformCount = _context.TransformCount;
var outputCount = _context.OutputCount;
var elapseTime = (time - lastTime) / 1000f;
var inputSpeed = (inputCount - lastInputCount) / elapseTime;
var transformSpeed = (transformCount - lastTransformCount) / elapseTime;
var outputSpeed = (outputCount - lastOutputCount) / elapseTime;
// _logger.LogInformation(
// "Task monitor: running: {Running}, error: {Error}, completed: {Completed}, canceled: {Canceled}, outputSpeed: {Speed} records/s",
// running, error, completed, canceled, outputSpeed);
foreach (var logger in _monitorLoggers)
{
logger.LogStatus("Monitor: Progress status", new Dictionary<string, string>
{
{"Input",_context.IsInputCompleted ? "completed" : $"running {inputSpeed:F2} records/s" },
{"Transform", _context.IsTransformCompleted ? "completed" : $"running {transformSpeed:F2} records/s" },
{"Output", _context.IsOutputCompleted ? "completed" : $"running {outputSpeed:F2} records/s" }
});
logger.LogStatus("Monitor: Table output progress",
_context.TableProgress
.ToDictionary(kv => kv.Key, kv => kv.Value.ToString()),
ITaskMonitorLogger.LogLevel.Progress);
logger.LogStatus("Monitor: Process count", new Dictionary<string, string>
{
{"Input", inputCount.ToString()},
{"Transform", transformCount.ToString()},
{"Output", outputCount.ToString()}
}, ITaskMonitorLogger.LogLevel.Progress);
logger.LogStatus("Monitor: Queue", new Dictionary<string, string>
{
{"Producer queue records", _producerQueue.Count.ToString() },
{"Output queues", _queuePool.Queues.Count.ToString() },
{"Output queue records", _queuePool.Queues.Values.Sum(queue => queue.Count).ToString()},
});
}
await Task.Delay(5000, stoppingToken);
lastTime = time;
lastInputCount = inputCount;
lastTransformCount = transformCount;
lastOutputCount = outputCount;
if (_context is { IsInputCompleted: true, IsTransformCompleted: true, IsOutputCompleted: true })
{
if (!endCheck)
{
endCheck = true;
goto EndCheck;
}
break;
}
}
}
}

View File

@@ -0,0 +1,117 @@
using ConsoleApp2.Cache;
using ConsoleApp2.Const;
using ConsoleApp2.HostedServices.Abstractions;
using ConsoleApp2.Options;
using ConsoleApp2.Services;
using ConsoleApp2.Services.ErrorRecorder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace ConsoleApp2.HostedServices;
public record DataTransformContext(DataRecord Record, ICacher Cacher, ILogger Logger);
/// <summary>
/// 数据处理服务,对导入后的数据进行处理
/// </summary>
public class TransformService : ITransformService
{
private readonly ILogger _logger;
private readonly IOptions<DataTransformOptions> _options;
private readonly DataRecordQueue _producerQueue;
private readonly RecordQueuePool _queuePool;
private readonly ProcessContext _context;
private readonly ICacher _cache;
private readonly ErrorRecorderFactory _errorRecorderFactory;
public TransformService(ILogger<TransformService> logger,
IOptions<DataTransformOptions> options,
[FromKeyedServices(ProcessStep.Produce)] DataRecordQueue producerQueue,
RecordQueuePool queuePool,
ProcessContext context,
ICacher cache,
ErrorRecorderFactory errorRecorderFactory)
{
_logger = logger;
_options = options;
_producerQueue = producerQueue;
_queuePool = queuePool;
_context = context;
_cache = cache;
_errorRecorderFactory = errorRecorderFactory;
}
public async Task ExecuteAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("***** Data transform service started, thread id: {ThreadId} *****", Environment.CurrentManagedThreadId);
while (!_context.IsInputCompleted || _producerQueue.Count > 0)
{
if (!_producerQueue.TryDequeue(out var record)) continue;
try
{
var context = new DataTransformContext(record, _cache, _logger);
if (_options.Value.EnableFilter)
{
// 数据过滤
var filter = _options.Value.RecordFilter;
if (filter is not null && await filter(context) == false) continue;
}
if (_options.Value.EnableReplacer)
{
// 数据替换
var replacer = _options.Value.RecordModify;
if (replacer is not null)
{
record = await replacer(context);
}
}
// 字段缓存
var cacher = _options.Value.RecordCache;
if(cacher is not null)
await cacher.Invoke(context);
//计算需要分流的数据库
var dbFilter = _options.Value.DatabaseFilter
?? throw new ApplicationException("未配置数据库过滤器");
record.Database = dbFilter(record);
_queuePool[record.Database].Enqueue(record);
_context.AddTransform();
if (_options.Value.EnableReBuilder)
{
//数据重建
var addRecords = _options.Value.RecordReBuild?.Invoke(context);
if (addRecords is { Count: > 0 })
{
foreach (var rc in addRecords)
{
if(dbFilter is not null)
rc.Database =dbFilter.Invoke(record);
_queuePool[record.Database].Enqueue(rc);
_context.AddTransform();
}
}
}
}
catch (Exception e)
{
_context.AddException(e);
var errorRecorder = _errorRecorderFactory.CreateTransform();
await errorRecorder.LogErrorRecordAsync(record, e);
if (!_options.Value.StrictMode)
_logger.LogError(e, "数据转换时发生错误");
else throw;
}
}
_context.CompleteTransform();
_logger.LogInformation("***** Data transformation service finished *****");
}
}

View File

@@ -0,0 +1,43 @@
using ConsoleApp2.HostedServices.Abstractions;
using ConsoleApp2.Services;
using Microsoft.Extensions.Logging;
namespace ConsoleApp2.HostedServices;
// 空输出服务,测试用
public class VoidOutputService : IOutputService
{
private readonly ILogger _logger;
private readonly RecordQueuePool _queuePool;
private readonly ProcessContext _context;
public VoidOutputService(
ProcessContext context, ILogger<VoidOutputService> logger, RecordQueuePool queuePool)
{
_context = context;
_logger = logger;
_queuePool = queuePool;
}
public Task ExecuteAsync(CancellationToken ct)
{
_logger.LogInformation("***** Void Output Service Started *****");
while (!_context.IsTransformCompleted || _queuePool.Queues.Count > 0)
{
foreach (var pair in _queuePool.Queues) // 内存优化
{
if (_context.IsTransformCompleted && pair.Value.Count == 0)
{
_queuePool.RemoveQueue(pair.Key);
continue;
}
if(!pair.Value.TryDequeue(out var record)) continue;
_context.AddOutput();
}
}
_context.CompleteOutput();
_logger.LogInformation("***** Void Output Service Stopped *****");
return Task.CompletedTask;
}
}