diff --git a/ConsoleApp2/ProcessStep.cs b/ConsoleApp2/Const/ProcessStep.cs similarity index 80% rename from ConsoleApp2/ProcessStep.cs rename to ConsoleApp2/Const/ProcessStep.cs index 908e918..fb5c4bd 100644 --- a/ConsoleApp2/ProcessStep.cs +++ b/ConsoleApp2/Const/ProcessStep.cs @@ -1,4 +1,4 @@ -namespace ConsoleApp2; +namespace ConsoleApp2.Const; public static class ProcessStep { diff --git a/ConsoleApp2/Entities/DataRecord.cs b/ConsoleApp2/DataRecord.cs similarity index 93% rename from ConsoleApp2/Entities/DataRecord.cs rename to ConsoleApp2/DataRecord.cs index 42ae33a..630eefb 100644 --- a/ConsoleApp2/Entities/DataRecord.cs +++ b/ConsoleApp2/DataRecord.cs @@ -1,6 +1,5 @@ -using System.Text; +namespace ConsoleApp2; -namespace ConsoleApp2.Entities; public class DataRecord { @@ -9,7 +8,7 @@ public class DataRecord value = string.Empty; if (record.Headers is null) throw new InvalidOperationException("Cannot get field when headers of a record have not been set."); - var idx = Array.IndexOf(record.Headers, columnName); + var idx = Array.IndexOf(record.Headers, columnName); //可能可以优化 if (idx == -1) return false; value = record.Fields[idx]; diff --git a/ConsoleApp2/Helpers/DumpDataHelper.cs b/ConsoleApp2/Helpers/DumpDataHelper.cs index 42c0844..5b38a5c 100644 --- a/ConsoleApp2/Helpers/DumpDataHelper.cs +++ b/ConsoleApp2/Helpers/DumpDataHelper.cs @@ -1,6 +1,5 @@ using System.Text; using System.Text.RegularExpressions; -using ConsoleApp2.Entities; namespace ConsoleApp2.Helpers; diff --git a/ConsoleApp2/Helpers/HashExtension.cs b/ConsoleApp2/Helpers/HashExtension.cs deleted file mode 100644 index 76fd359..0000000 --- a/ConsoleApp2/Helpers/HashExtension.cs +++ /dev/null @@ -1,249 +0,0 @@ -using System.Security.Cryptography; -using System.Text; - -namespace ConsoleApp2.Helpers; - -public static class HashExtensions -{ - /// - /// 计算32位MD5码 - /// - /// 字符串 - /// 返回哈希值格式 true:英文大写,false:英文小写 - /// - public static string ToMd5Hash(this string word, bool toUpper = true) - { - try - { - var MD5CSP = MD5.Create(); - var bytValue = Encoding.UTF8.GetBytes(word); - var bytHash = MD5CSP.ComputeHash(bytValue); - MD5CSP.Clear(); - //根据计算得到的Hash码翻译为MD5码 - var sHash = ""; - foreach (var t in bytHash) - { - long i = t / 16; - var sTemp = i > 9 ? ((char)(i - 10 + 0x41)).ToString() : ((char)(i + 0x30)).ToString(); - i = t % 16; - if (i > 9) - { - sTemp += ((char)(i - 10 + 0x41)).ToString(); - } - else - { - sTemp += ((char)(i + 0x30)).ToString(); - } - - sHash += sTemp; - } - - //根据大小写规则决定返回的字符串 - return toUpper ? sHash : sHash.ToLower(); - } - catch (System.Exception ex) - { - throw new System.Exception(ex.Message); - } - } - - public static string ToMd5Hash(this Stream stream, bool toUpper = true) - { - using var md5Hash = MD5.Create(); - var bytes = md5Hash.ComputeHash(stream); - return ToHashString(bytes, toUpper); - } - - /// - /// 计算SHA-1码 - /// - /// 字符串 - /// 返回哈希值格式 true:英文大写,false:英文小写 - /// - public static string ToSHA1Hash(this string word, bool toUpper = true) - { - try - { - var SHA1CSP = SHA1.Create(); - var bytValue = Encoding.UTF8.GetBytes(word); - var bytHash = SHA1CSP.ComputeHash(bytValue); - SHA1CSP.Clear(); - //根据计算得到的Hash码翻译为SHA-1码 - var sHash = ""; - foreach (var t in bytHash) - { - long i = t / 16; - var sTemp = i > 9 ? ((char)(i - 10 + 0x41)).ToString() : ((char)(i + 0x30)).ToString(); - i = t % 16; - if (i > 9) - { - sTemp += ((char)(i - 10 + 0x41)).ToString(); - } - else - { - sTemp += ((char)(i + 0x30)).ToString(); - } - - sHash += sTemp; - } - - //根据大小写规则决定返回的字符串 - return toUpper ? sHash : sHash.ToLower(); - } - catch (System.Exception ex) - { - throw new System.Exception(ex.Message); - } - } - - /// - /// 计算SHA-256码 - /// - /// 字符串 - /// 返回哈希值格式 true:英文大写,false:英文小写 - /// - public static string ToSHA256Hash(this string word, bool toUpper = true) - { - try - { - var SHA256CSP = SHA256.Create(); - var bytValue = Encoding.UTF8.GetBytes(word); - var bytHash = SHA256CSP.ComputeHash(bytValue); - SHA256CSP.Clear(); - //根据计算得到的Hash码翻译为SHA-1码 - var sHash = ""; - foreach (var t in bytHash) - { - long i = t / 16; - var sTemp = i > 9 ? ((char)(i - 10 + 0x41)).ToString() : ((char)(i + 0x30)).ToString(); - i = t % 16; - if (i > 9) - { - sTemp += ((char)(i - 10 + 0x41)).ToString(); - } - else - { - sTemp += ((char)(i + 0x30)).ToString(); - } - - sHash += sTemp; - } - - //根据大小写规则决定返回的字符串 - return toUpper ? sHash : sHash.ToLower(); - } - catch (System.Exception ex) - { - throw new System.Exception(ex.Message); - } - } - - /// - /// 计算SHA-256码 - /// - /// - /// - /// - public static string ToSHA256Hash(this Stream stream, bool toUpper = true) - { - using var sha256Hash = SHA256.Create(); - var bytes = sha256Hash.ComputeHash(stream); - return ToHashString(bytes, toUpper); - } - - /// - /// 计算SHA-384码 - /// - /// 字符串 - /// 返回哈希值格式 true:英文大写,false:英文小写 - /// - public static string ToSHA384Hash(this string word, bool toUpper = true) - { - try - { - var SHA384CSP = SHA384.Create(); - var bytValue = Encoding.UTF8.GetBytes(word); - var bytHash = SHA384CSP.ComputeHash(bytValue); - SHA384CSP.Clear(); - //根据计算得到的Hash码翻译为SHA-1码 - var sHash = ""; - foreach (var t in bytHash) - { - long i = t / 16; - var sTemp = i > 9 ? ((char)(i - 10 + 0x41)).ToString() : ((char)(i + 0x30)).ToString(); - i = t % 16; - if (i > 9) - { - sTemp += ((char)(i - 10 + 0x41)).ToString(); - } - else - { - sTemp += ((char)(i + 0x30)).ToString(); - } - - sHash += sTemp; - } - - //根据大小写规则决定返回的字符串 - return toUpper ? sHash : sHash.ToLower(); - } - catch (System.Exception ex) - { - throw new System.Exception(ex.Message); - } - } - - /// - /// 计算SHA-512码 - /// - /// 字符串 - /// 返回哈希值格式 true:英文大写,false:英文小写 - /// - public static string ToSHA512Hash(this string word, bool toUpper = true) - { - try - { - var SHA512CSP = SHA512.Create(); - var bytValue = Encoding.UTF8.GetBytes(word); - var bytHash = SHA512CSP.ComputeHash(bytValue); - SHA512CSP.Clear(); - //根据计算得到的Hash码翻译为SHA-1码 - var sHash = ""; - foreach (var t in bytHash) - { - long i = t / 16; - var sTemp = i > 9 ? ((char)(i - 10 + 0x41)).ToString() : ((char)(i + 0x30)).ToString(); - i = t % 16; - if (i > 9) - { - sTemp += ((char)(i - 10 + 0x41)).ToString(); - } - else - { - sTemp += ((char)(i + 0x30)).ToString(); - } - - sHash += sTemp; - } - - //根据大小写规则决定返回的字符串 - return toUpper ? sHash : sHash.ToLower(); - } - catch (System.Exception ex) - { - throw new System.Exception(ex.Message); - } - } - - private static string ToHashString(byte[] bytes, bool toUpper = true) - { - var builder = new StringBuilder(); - foreach (var t in bytes) - { - builder.Append(t.ToString("x2")); - } - - var str = builder.ToString(); - return toUpper ? str.ToUpper() : str.ToLower(); - } -} \ No newline at end of file diff --git a/ConsoleApp2/HostedServices/Abstractions/IInputService.cs b/ConsoleApp2/HostedServices/Abstractions/IInputService.cs new file mode 100644 index 0000000..4294a44 --- /dev/null +++ b/ConsoleApp2/HostedServices/Abstractions/IInputService.cs @@ -0,0 +1,6 @@ +namespace ConsoleApp2.HostedServices.Abstractions; + +public interface IInputService +{ + public Task ExecuteAsync(CancellationToken cancellationToken); +} \ No newline at end of file diff --git a/ConsoleApp2/HostedServices/Abstractions/IOutputService.cs b/ConsoleApp2/HostedServices/Abstractions/IOutputService.cs new file mode 100644 index 0000000..ff3377b --- /dev/null +++ b/ConsoleApp2/HostedServices/Abstractions/IOutputService.cs @@ -0,0 +1,6 @@ +namespace ConsoleApp2.HostedServices.Abstractions; + +public interface IOutputService +{ + public Task ExecuteAsync(CancellationToken cancellationToken); +} \ No newline at end of file diff --git a/ConsoleApp2/HostedServices/Abstractions/ITransformService.cs b/ConsoleApp2/HostedServices/Abstractions/ITransformService.cs new file mode 100644 index 0000000..ee06b81 --- /dev/null +++ b/ConsoleApp2/HostedServices/Abstractions/ITransformService.cs @@ -0,0 +1,6 @@ +namespace ConsoleApp2.HostedServices.Abstractions; + +public interface ITransformService +{ + public Task ExecuteAsync(CancellationToken cancellationToken); +} \ No newline at end of file diff --git a/ConsoleApp2/HostedServices/CsvInputService.cs b/ConsoleApp2/HostedServices/InputService.cs similarity index 79% rename from ConsoleApp2/HostedServices/CsvInputService.cs rename to ConsoleApp2/HostedServices/InputService.cs index a5d4213..c6385a0 100644 --- a/ConsoleApp2/HostedServices/CsvInputService.cs +++ b/ConsoleApp2/HostedServices/InputService.cs @@ -1,35 +1,36 @@ -using ConsoleApp2.Helpers; +using ConsoleApp2.Const; +using ConsoleApp2.Helpers; +using ConsoleApp2.HostedServices.Abstractions; using ConsoleApp2.Options; using ConsoleApp2.Services; using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; namespace ConsoleApp2.HostedServices; -public class CsvInputService : BackgroundService +/// +/// 从MyDumper导出的CSV文件中导入表头和数据 +/// +public class InputService : IInputService { private readonly ILogger _logger; private readonly IOptions _csvOptions; - private readonly TaskManager _taskManager; // TBD private readonly DataRecordQueue _producerQueue; private readonly ProcessContext _context; - public CsvInputService(ILogger logger, - IOptions csvOptions, - [FromKeyedServices(ProcessStep.Producer)]TaskManager taskManager, + public InputService(ILogger logger, + IOptions csvOptions, [FromKeyedServices(ProcessStep.Producer)]DataRecordQueue producerQueue, ProcessContext context) { _logger = logger; _csvOptions = csvOptions; - _taskManager = taskManager; _producerQueue = producerQueue; _context = context; } - protected override async Task ExecuteAsync(CancellationToken cancellationToken) + public async Task ExecuteAsync(CancellationToken cancellationToken) { var inputDir = _csvOptions.Value.InputDir; _logger.LogInformation("***** Csv input service start, working dir: {InputDir}, thread id: {ThreadId} *****", inputDir, Environment.CurrentManagedThreadId); @@ -50,7 +51,7 @@ public class CsvInputService : BackgroundService { var csvPath = Path.Combine(inputDir, csvFile); // var source = new JsvSource(csvPath, headers, _logger); - var source = new NewCsvSource(csvPath, headers, logger: _logger); + var source = new CsvSource(csvPath, headers, _csvOptions.Value.Delimiter, _csvOptions.Value.QuoteChar, _logger); while (await source.ReadAsync()) { diff --git a/ConsoleApp2/HostedServices/MainHostedService.cs b/ConsoleApp2/HostedServices/MainHostedService.cs new file mode 100644 index 0000000..1e2d2db --- /dev/null +++ b/ConsoleApp2/HostedServices/MainHostedService.cs @@ -0,0 +1,30 @@ +using ConsoleApp2.HostedServices.Abstractions; +using Microsoft.Extensions.Hosting; + +namespace ConsoleApp2.HostedServices; + +public class MainHostedService : BackgroundService +{ + private readonly IInputService _input; + private readonly ITransformService _transform; + private readonly IOutputService _output; + + public MainHostedService(IInputService input, ITransformService transform, IOutputService output) + { + _input = input; + _transform = transform; + _output = output; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + var tasks = new List() + { + Task.Run(async () => await _input.ExecuteAsync(stoppingToken), stoppingToken), + Task.Run(async () => await _transform.ExecuteAsync(stoppingToken), stoppingToken), + Task.Run(async () => await _output.ExecuteAsync(stoppingToken), stoppingToken), + }; + await Task.WhenAll(tasks); + // await Task.Run(async () => await _output.ExecuteAsync(stoppingToken), stoppingToken); + } +} \ No newline at end of file diff --git a/ConsoleApp2/HostedServices/MysqlOutputService.cs b/ConsoleApp2/HostedServices/OutputService.cs similarity index 53% rename from ConsoleApp2/HostedServices/MysqlOutputService.cs rename to ConsoleApp2/HostedServices/OutputService.cs index 6953e2a..c1647c0 100644 --- a/ConsoleApp2/HostedServices/MysqlOutputService.cs +++ b/ConsoleApp2/HostedServices/OutputService.cs @@ -1,78 +1,76 @@ -using ConsoleApp2.Entities; +using ConsoleApp2.Const; +using ConsoleApp2.HostedServices.Abstractions; using ConsoleApp2.Options; using ConsoleApp2.Services; using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; -using MySqlConnector; namespace ConsoleApp2.HostedServices; -public class MysqlOutputService : BackgroundService +/// +/// 数据导出服务,将数据导出至MySql服务 +/// +public class OutputService : IOutputService { private readonly ILogger _logger; private readonly DataRecordQueue _consumerQueue; - private readonly IOptions _options; + private readonly IOptions _options; private readonly ProcessContext _context; + private readonly TaskManager _taskManager; - public MysqlOutputService(ILogger logger, - [FromKeyedServices(ProcessStep.Consumer)]DataRecordQueue consumerQueue, - IOptions options, - ProcessContext context) + public OutputService(ILogger logger, + [FromKeyedServices(ProcessStep.Consumer)] DataRecordQueue consumerQueue, + IOptions options, + ProcessContext context, + TaskManager taskManager) { _logger = logger; _consumerQueue = consumerQueue; _options = options; _context = context; + _taskManager = taskManager; } - protected override async Task ExecuteAsync(CancellationToken stoppingToken) + public async Task ExecuteAsync(CancellationToken stoppingToken) { _logger.LogInformation("***** Mysql output service started *****"); - var tasks = new List(); var records = new List(); while (!_context.IsTransformCompleted || _consumerQueue.Count > 0) { if (!_consumerQueue.TryDequeue(out var record)) continue; records.Add(record); - if (records.Count >= 200) + if (records.Count >= _options.Value.FlushCount) { var recordsCopy = records; - tasks.Add(Task.Run(async () => await FlushAsync(recordsCopy), stoppingToken)); + _taskManager.CreateTask(async () => await FlushAsync(recordsCopy), stoppingToken); records = []; } - if (tasks.Count >= 10) + if (_taskManager.TaskCount >= _options.Value.MaxTask) { - await Task.WhenAll(tasks); - tasks.Clear(); + await _taskManager.WaitAll(); + _taskManager.ClearTask(); } } - await Task.WhenAll(tasks); + await _taskManager.WaitAll(); await FlushAsync(records); - + _context.CompleteOutput(); - + _logger.LogInformation("***** Mysql output service completed *****"); } private async Task FlushAsync(IEnumerable records) { var count = 0; - await using var output = new MySqlDestination(new MySqlConnectionStringBuilder - { - Server = _options.Value.Host, - Port = _options.Value.Port, - Database = _options.Value.Database, - UserID = _options.Value.User, - Password = _options.Value.Password, - ConnectionTimeout = 180, - }.ConnectionString, _logger, true); - + await using var output = new MySqlDestination( + _options.Value.ConnectionString ?? throw new InvalidOperationException("Connection string is required"), + _logger, true); + foreach (var record in records) { await output.WriteRecordAsync(record); diff --git a/ConsoleApp2/HostedServices/SqlFileOutputService.cs b/ConsoleApp2/HostedServices/SqlFileOutputService.cs deleted file mode 100644 index b582ba0..0000000 --- a/ConsoleApp2/HostedServices/SqlFileOutputService.cs +++ /dev/null @@ -1,64 +0,0 @@ -using ConsoleApp2.Entities; -using ConsoleApp2.Helpers; -using ConsoleApp2.Services; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; - -namespace ConsoleApp2.HostedServices; - -public class SqlFileOutputService : BackgroundService -{ - private readonly string _outputFile = "D:/DumpOutput/cferp_test_1.sql"; // - private readonly DataRecordQueue _consumerQueue; - private readonly ILogger _logger; - private readonly ProcessContext _context; - - public SqlFileOutputService( - ILogger logger, - [FromKeyedServices(ProcessStep.Consumer)] - DataRecordQueue consumerQueue, - ProcessContext context) - { - _logger = logger; - _consumerQueue = consumerQueue; - _context = context; - } - - - protected override async Task ExecuteAsync(CancellationToken stoppingToken) - { - _logger.LogInformation("***** Sql file output service started, thread id: {ThreadId} *****", Environment.CurrentManagedThreadId); - var count = 0; - var tableRecords = new Dictionary>(); - while (!_context.IsTransformCompleted || _consumerQueue.Count > 0) - { - if (!_consumerQueue.TryDequeue(out var record)) continue; - - tableRecords.AddOrUpdate(record.TableName, [record], (key, value) => - { - value.Add(record); - return value; - }); - - ++count; - - if (count >= 200) - { - await File.AppendAllTextAsync(_outputFile, - MySqlDestination.SerializeRecords(tableRecords), stoppingToken); - tableRecords.Clear(); - _context.AddOutput(count); - count = 0; - } - } - await File.AppendAllTextAsync(_outputFile, - MySqlDestination.SerializeRecords(tableRecords), stoppingToken); - tableRecords.Clear(); - _context.AddOutput(count); - _context.CompleteOutput(); - - _logger.LogInformation("***** Sql file output service completed *****"); - } - -} \ No newline at end of file diff --git a/ConsoleApp2/HostedServices/TaskMonitorService.cs b/ConsoleApp2/HostedServices/TaskMonitorService.cs index ed95dae..eb65ee8 100644 --- a/ConsoleApp2/HostedServices/TaskMonitorService.cs +++ b/ConsoleApp2/HostedServices/TaskMonitorService.cs @@ -1,4 +1,5 @@ using System.Diagnostics; +using ConsoleApp2.Const; using ConsoleApp2.Services; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; @@ -6,6 +7,9 @@ using Microsoft.Extensions.Logging; namespace ConsoleApp2.HostedServices; +/// +/// 任务监控 +/// public class TaskMonitorService : BackgroundService { private readonly IHostApplicationLifetime _lifetime; @@ -93,7 +97,7 @@ public class TaskMonitorService : BackgroundService _logger.LogInformation("Queue monitor: producer queue: {ProducerQueue}, consumer queue: {ConsumerQueue}", _producerQueue.Count, _consumerQueue.Count); - await Task.Delay(2000); + await Task.Delay(5000); lastTime = time; lastInputCount = inputCount; diff --git a/ConsoleApp2/HostedServices/DataTransformService.cs b/ConsoleApp2/HostedServices/TransformService.cs similarity index 83% rename from ConsoleApp2/HostedServices/DataTransformService.cs rename to ConsoleApp2/HostedServices/TransformService.cs index 702c9ab..ae2597c 100644 --- a/ConsoleApp2/HostedServices/DataTransformService.cs +++ b/ConsoleApp2/HostedServices/TransformService.cs @@ -1,14 +1,17 @@ -using ConsoleApp2.Helpers; +using ConsoleApp2.Const; +using ConsoleApp2.HostedServices.Abstractions; using ConsoleApp2.Options; using ConsoleApp2.Services; using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; namespace ConsoleApp2.HostedServices; -public class DataTransformService : BackgroundService +/// +/// 数据处理服务,对导入后的数据进行处理 +/// +public class TransformService : ITransformService { private readonly ILogger _logger; private readonly IOptions _options; @@ -17,21 +20,20 @@ public class DataTransformService : BackgroundService private readonly ProcessContext _context; - public DataTransformService(ILogger logger, - IOptions options, // TBD: database filter + public TransformService(ILogger logger, + IOptions options, [FromKeyedServices(ProcessStep.Producer)]DataRecordQueue producerQueue, [FromKeyedServices(ProcessStep.Consumer)]DataRecordQueue consumerQueue, ProcessContext context) { _logger = logger; - // _taskManager = taskManager; _options = options; _producerQueue = producerQueue; _consumerQueue = consumerQueue; _context = context; } - protected override async Task ExecuteAsync(CancellationToken cancellationToken) + public async Task ExecuteAsync(CancellationToken cancellationToken) { _logger.LogInformation("***** Data transform service started, thread id: {ThreadId} *****", Environment.CurrentManagedThreadId); while (!_context.IsInputCompleted || _producerQueue.Count > 0) @@ -58,7 +60,6 @@ public class DataTransformService : BackgroundService field = string.IsNullOrEmpty(field) ? "''" : $"0x{field}"; break; default: - field = field; break; } @@ -66,6 +67,8 @@ public class DataTransformService : BackgroundService record[i] = field; } + // TODO: 数据处理/过滤/复制 + _consumerQueue.Enqueue(record); _context.AddTransform(); } diff --git a/ConsoleApp2/HostedServices/VoidOutputService.cs b/ConsoleApp2/HostedServices/VoidOutputService.cs index 04ef787..2de895c 100644 --- a/ConsoleApp2/HostedServices/VoidOutputService.cs +++ b/ConsoleApp2/HostedServices/VoidOutputService.cs @@ -1,11 +1,14 @@ -using ConsoleApp2.Services; +using ConsoleApp2.Const; +using ConsoleApp2.HostedServices.Abstractions; +using ConsoleApp2.Services; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; namespace ConsoleApp2.HostedServices; -public class VoidOutputService : BackgroundService +// 空输出服务,测试用 +public class VoidOutputService : IOutputService { private readonly ILogger _logger; private readonly DataRecordQueue _consumerQueue; @@ -19,7 +22,7 @@ public class VoidOutputService : BackgroundService _logger = logger; } - protected override Task ExecuteAsync(CancellationToken stoppingToken) + public Task ExecuteAsync(CancellationToken stoppingToken) { _logger.LogInformation("***** Void output service started, thread id: {ThreadId} *****", Environment.CurrentManagedThreadId); while (!_context.IsTransformCompleted || _consumerQueue.Count > 0) diff --git a/ConsoleApp2/Options/CsvOptions.cs b/ConsoleApp2/Options/CsvOptions.cs index 9507fd8..ab03ee6 100644 --- a/ConsoleApp2/Options/CsvOptions.cs +++ b/ConsoleApp2/Options/CsvOptions.cs @@ -3,26 +3,17 @@ public class CsvOptions { /// - /// The directory to input csv and sql file. + /// MyDumper导出的CSV文件目录 /// public string InputDir { get; set; } = "./"; - /// - /// The output directory. - /// - public string OutputDir { get; set; } = "./Output"; /// - /// The ASCII char that fields are enclosed by. Default is '"'. + /// 字符串的包围符号,默认为双引号" /// public char QuoteChar { get; set; } = '"'; /// - /// The ASCII char that fields are separated by. Default is ','. + /// 每个字段的分割符,默认逗号, /// - public char DelimiterChar { get; set; } = ','; - - /// - /// The max number of threads to use. - /// - public int MaxThreads { get; set; } = 12; + public string Delimiter { get; set; } = ","; } \ No newline at end of file diff --git a/ConsoleApp2/Options/DataTransformOptions.cs b/ConsoleApp2/Options/DataTransformOptions.cs index b8d04cf..6ee7125 100644 --- a/ConsoleApp2/Options/DataTransformOptions.cs +++ b/ConsoleApp2/Options/DataTransformOptions.cs @@ -1,6 +1,4 @@ -using ConsoleApp2.Entities; - -namespace ConsoleApp2.Options; +namespace ConsoleApp2.Options; public enum ColumnType { @@ -12,6 +10,10 @@ public enum ColumnType public class DataTransformOptions { public Func? DatabaseFilter { get; set; } + + /// + /// 配置导入数据的特殊列 + /// public Dictionary ColumnTypeConfig { get; set; } = new(); // "table.column" -> type public ColumnType GetColumnType(string table, string column) diff --git a/ConsoleApp2/Options/DatabaseOptions.cs b/ConsoleApp2/Options/DatabaseOptions.cs deleted file mode 100644 index 4cdef2d..0000000 --- a/ConsoleApp2/Options/DatabaseOptions.cs +++ /dev/null @@ -1,10 +0,0 @@ -namespace ConsoleApp2.Options; - -public class DatabaseOptions -{ - public string Host { get; set; } - public uint Port { get; set; } - public string Database { get; set; } - public string User { get; set; } - public string Password { get; set; } -} \ No newline at end of file diff --git a/ConsoleApp2/Options/DatabaseOutputOptions.cs b/ConsoleApp2/Options/DatabaseOutputOptions.cs new file mode 100644 index 0000000..03a03ce --- /dev/null +++ b/ConsoleApp2/Options/DatabaseOutputOptions.cs @@ -0,0 +1,17 @@ +namespace ConsoleApp2.Options; + +public class DatabaseOutputOptions +{ + /// + /// 数据库连接字符串 + /// + public string? ConnectionString { get; set; } + /// + /// 输出服务的最大任务(Task)数 + /// + public int MaxTask { get; set; } + /// + /// 每个任务每次提交到数据库的记录数量(每N条构建一次SQL语句) + /// + public int FlushCount { get; set; } +} \ No newline at end of file diff --git a/ConsoleApp2/Program.cs b/ConsoleApp2/Program.cs index 0dd2202..0fad605 100644 --- a/ConsoleApp2/Program.cs +++ b/ConsoleApp2/Program.cs @@ -1,26 +1,17 @@ -using ConsoleApp2; +using ConsoleApp2.Const; using ConsoleApp2.HostedServices; +using ConsoleApp2.HostedServices.Abstractions; using ConsoleApp2.Options; using ConsoleApp2.Services; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; +using MySqlConnector; using Serilog; - -// 加入数据库过滤 -// HostedService不是并行的,完善TaskManager手动开启线程 -// 测试BlockingCollection对速度的影响? - -// 重新同步数据 -// Json列和Blob列不一致 -/* JSV导出带转义的列有误 - * order_data_block表id 4153969 - * order_data_parts表Spec列 - * order_module表Name列 - * process_group表Items列 - */ +// 运行之前把Mysql max_allowed_packets 调大 +// 运行之前把process_step表的外键删掉 await RunProgram(); return; @@ -29,18 +20,15 @@ async Task RunProgram() { ThreadPool.SetMaxThreads(200, 200); var host = Host.CreateApplicationBuilder(); - host.Configuration.AddCommandLine(args); + host.Configuration.AddCommandLine(args); host.Services.Configure(option => { - option.DelimiterChar = ','; + option.Delimiter = ","; option.QuoteChar = '"'; - option.InputDir = "D:/Dump/MyDumper"; - option.OutputDir = "D:/DumpOutput"; - option.MaxThreads = 12; + option.InputDir = "D:/Dump"; }); host.Services.Configure(options => { - //TODO: Database Filter options.DatabaseFilter = record => "cferp_test_1"; options.ColumnTypeConfig = new() { @@ -73,14 +61,20 @@ async Task RunProgram() { "order_block_plan.BlockInfo", ColumnType.Text }, }; }); -// 加入数据库过滤后删除 - host.Services.Configure(options => + + host.Services.Configure(options => { - options.Host = "localhost"; - options.Port = 33306; - options.Database = "cferp_test_1"; - options.User = "root"; - options.Password = "123456"; + options.ConnectionString = new MySqlConnectionStringBuilder + { + Server = "127.0.0.1", + Port = 33306, + Database = "cferp_test_1", + UserID = "root", + Password = "123456", + MaximumPoolSize = 50, // 这个值应当小于 max_connections + }.ConnectionString; + options.MaxTask = 16; + options.FlushCount = 200; }); host.Services.AddLogging(builder => { @@ -91,14 +85,13 @@ async Task RunProgram() host.Services.AddSingleton(); host.Services.AddKeyedSingleton(ProcessStep.Producer); host.Services.AddKeyedSingleton(ProcessStep.Consumer); - host.Services.AddKeyedSingleton(ProcessStep.Producer); - host.Services.AddKeyedSingleton(ProcessStep.Consumer); + host.Services.AddTransient(); + host.Services.AddHostedService(); host.Services.AddHostedService(); - host.Services.AddHostedService(); - host.Services.AddHostedService(); - host.Services.AddHostedService(); - + host.Services.AddSingleton(); + host.Services.AddSingleton(); + host.Services.AddSingleton(); var app = host.Build(); await app.RunAsync(); diff --git a/ConsoleApp2/README.md b/ConsoleApp2/README.md new file mode 100644 index 0000000..3cf588b --- /dev/null +++ b/ConsoleApp2/README.md @@ -0,0 +1,49 @@ +## 说明 +使用该程序来对MyDumper导出的CSV数据进行读取,转换,然后导出到其他数据库中。 + +1. 用MyDumper从数据库导出CSV数据 + + 使用MyDumper Docker镜像 + ```sh + docker run --rm --net=host -v D:/Dump:/home/backup mydumper/mydumper:v0.15.2-6 mydumper ` + -h 127.0.0.1 -P 33306 -u root -p 123456 ` + -B cferp_test --no-schemas --csv --hex-blob ` + -o /home/backup + ``` + 将挂载卷,数据库连接和输出目录替换 + 不导出数据库结构(--no-schemas), + 导出完的目录下应当包含.sql文件以及.dat文件 + +2. 在Program.cs中修改`CsvOptions`配置 + ```cs + host.Services.Configure(option => + { + option.Delimiter = ","; + option.QuoteChar = '"'; + option.InputDir = "D:/Dump/Test"; + }); + ``` + 将`option.InputDir`配置为MyDumper导出的数据目录 + +3. 在Program.cs中修改`DatabaseOutputOptions`配置 + ```cs + host.Services.Configure(options => + { + options.ConnectionString = new MySqlConnectionStringBuilder + { + Server = "127.0.0.1", + Port = 33306, + Database = "cferp_test_1", + UserID = "root", + Password = "123456", + MaximumPoolSize = 50, + }.ConnectionString; + options.MaxTask = 16; + options.FlushCount = 200; + }); + ``` + 将`MySqlConnectionStringBuilder`的属性修改为程序要导出至的数据库 + > 后续将这些配置通过命令行传递 + +4. 运行程序 + > 注意,测试数据库`cferp_test`中的`order_process_step`表存在外键,如果要导出到和测试库同结构的数据库,记得先把外键删除。 \ No newline at end of file diff --git a/ConsoleApp2/Services/NewCsvSource.cs b/ConsoleApp2/Services/CsvSource.cs similarity index 94% rename from ConsoleApp2/Services/NewCsvSource.cs rename to ConsoleApp2/Services/CsvSource.cs index c99e792..50b26c6 100644 --- a/ConsoleApp2/Services/NewCsvSource.cs +++ b/ConsoleApp2/Services/CsvSource.cs @@ -1,24 +1,26 @@ using System.Text; -using ConsoleApp2.Entities; using ConsoleApp2.Helpers; using Microsoft.Extensions.Logging; namespace ConsoleApp2.Services; -public class NewCsvSource +/// +/// CSV文件读取 +/// +public class CsvSource { private readonly string _filePath; private readonly StreamReader _reader; private readonly ILogger? _logger; private readonly string _tableName; - public DataRecord Current { get; protected set; } + public DataRecord Current { get; private set; } public string[]? Headers { get; } public string? CurrentRaw { get; private set; } public string Delimiter { get; private set; } public char QuoteChar { get; private set; } - public NewCsvSource(string filePath, string[]? headers = null, string delimiter = ",", char quoteChar = '"', + public CsvSource(string filePath, string[]? headers = null, string delimiter = ",", char quoteChar = '"', ILogger? logger = null) { _filePath = filePath; diff --git a/ConsoleApp2/Services/DataRecordQueue.cs b/ConsoleApp2/Services/DataRecordQueue.cs index 05d63c1..e988423 100644 --- a/ConsoleApp2/Services/DataRecordQueue.cs +++ b/ConsoleApp2/Services/DataRecordQueue.cs @@ -1,9 +1,11 @@ using System.Collections.Concurrent; using System.Diagnostics.CodeAnalysis; -using ConsoleApp2.Entities; namespace ConsoleApp2.Services; +/// +/// 数据队列 +/// public class DataRecordQueue : IDisposable { private readonly BlockingCollection _queue; @@ -17,7 +19,7 @@ public class DataRecordQueue : IDisposable public DataRecordQueue() { - _queue = new BlockingCollection(); + _queue = new BlockingCollection(200_000); // 队列最长为20W条记录 } public bool TryDequeue([MaybeNullWhen(false)] out DataRecord record) diff --git a/ConsoleApp2/Services/JsvSource.cs b/ConsoleApp2/Services/JsvSource.cs index 3ce7762..c2764b3 100644 --- a/ConsoleApp2/Services/JsvSource.cs +++ b/ConsoleApp2/Services/JsvSource.cs @@ -1,10 +1,13 @@ -using ConsoleApp2.Entities; -using ConsoleApp2.Helpers; +using ConsoleApp2.Helpers; using Microsoft.Extensions.Logging; using ServiceStack.Text; namespace ConsoleApp2.Services; +/// +/// 读取Jsv格式文件 +/// +[Obsolete] public class JsvSource : IDisposable { private readonly string _filePath; diff --git a/ConsoleApp2/Services/MySqlDestination.cs b/ConsoleApp2/Services/MySqlDestination.cs index 8725ab1..47f6f4e 100644 --- a/ConsoleApp2/Services/MySqlDestination.cs +++ b/ConsoleApp2/Services/MySqlDestination.cs @@ -1,11 +1,13 @@ using System.Text; -using ConsoleApp2.Entities; using ConsoleApp2.Helpers; using Microsoft.Extensions.Logging; using MySqlConnector; namespace ConsoleApp2.Services; +/// +/// Mysql导出 +/// public class MySqlDestination : IDisposable, IAsyncDisposable { private readonly Dictionary> _recordCache; @@ -13,8 +15,6 @@ public class MySqlDestination : IDisposable, IAsyncDisposable private readonly ILogger _logger; private readonly bool _prettyOutput; - public static int AddCount; - public MySqlDestination(string connStr, ILogger logger, bool prettyOutput = false) { _conn = new MySqlConnection(connStr); @@ -29,7 +29,6 @@ public class MySqlDestination : IDisposable, IAsyncDisposable _recordCache.AddOrUpdate(record.TableName, [record], (key, value) => { value.Add(record); - Interlocked.Increment(ref AddCount); return value; }); return Task.CompletedTask; @@ -60,6 +59,10 @@ public class MySqlDestination : IDisposable, IAsyncDisposable _logger.LogCritical(e, "Error when flushing records, sql: {Sql}", cmd.CommandText.Omit(1000)); throw; } + finally + { + await cmd.DisposeAsync(); + } } public static string SerializeRecords(IDictionary> tableRecords, @@ -91,22 +94,7 @@ public class MySqlDestination : IDisposable, IAsyncDisposable for (var j = 0; j < record.Fields.Length; j++) { var field = record.Fields[j]; - - #region HandleFields - - // if (field == "\\N") - // sb.Append("NULL"); - // else if (DumpDataHelper.CheckHexField(field)) - // { - // // if (StringExtensions.CheckJsonHex(field)) - // sb.Append($"0x{field}"); - // } - // else - // sb.Append($"'{field}'"); - sb.Append(field); - #endregion - if (j != record.Fields.Length - 1) sb.Append(','); } @@ -127,11 +115,13 @@ public class MySqlDestination : IDisposable, IAsyncDisposable public void Dispose() { + _conn.Close(); _conn.Dispose(); } public async ValueTask DisposeAsync() { + await _conn.CloseAsync(); await _conn.DisposeAsync(); } } \ No newline at end of file diff --git a/ConsoleApp2/Services/ProcessContext.cs b/ConsoleApp2/Services/ProcessContext.cs index 6d5018a..17c90ba 100644 --- a/ConsoleApp2/Services/ProcessContext.cs +++ b/ConsoleApp2/Services/ProcessContext.cs @@ -1,5 +1,8 @@ namespace ConsoleApp2.Services; +/// +/// 处理上下文类,标识处理进度 +/// public class ProcessContext { private int _inputCount; diff --git a/ConsoleApp2/Services/TaskManager.cs b/ConsoleApp2/Services/TaskManager.cs index f947569..1e8af7f 100644 --- a/ConsoleApp2/Services/TaskManager.cs +++ b/ConsoleApp2/Services/TaskManager.cs @@ -3,14 +3,17 @@ using Microsoft.Extensions.Logging; namespace ConsoleApp2.Services; +/// +/// 快速批量创建和等待任务 +/// public class TaskManager { private readonly ConcurrentBag _tasks; private readonly ILogger _logger; + public int TaskCount => _tasks.Count; public int RunningTaskCount => _tasks.Count(task => !task.IsCompleted); public IReadOnlyCollection Tasks => _tasks; - public bool MainTaskCompleted { get; set; } public TaskManager(ILogger logger) { @@ -18,11 +21,22 @@ public class TaskManager _logger = logger; } - public Task CreateTask(Func func) + public void CreateTask(Func func, CancellationToken cancellationToken = default) { - var task = Task.Factory.StartNew(func); + var task = Task.Factory.StartNew(func, cancellationToken); _tasks.Add(task); - _logger.LogDebug("New task created."); - return task; + _logger.LogDebug("New task created"); + } + + public async Task WaitAll() + { + await Task.WhenAll(_tasks); + } + + public void ClearTask() + { + if(RunningTaskCount != 0) + throw new InvalidOperationException("Unable to clear task. There are still running tasks"); + _tasks.Clear(); } } \ No newline at end of file diff --git a/ConsoleApp2/Services/TsvSource.cs b/ConsoleApp2/Services/TsvSource.cs deleted file mode 100644 index 2d105c0..0000000 --- a/ConsoleApp2/Services/TsvSource.cs +++ /dev/null @@ -1,6 +0,0 @@ -namespace ConsoleApp2.Services; - -public class TsvSource -{ - -} \ No newline at end of file