From c53d2927bb909d70d63b8084d9d29d9606047332 Mon Sep 17 00:00:00 2001 From: CZY <2817212736@qq.com> Date: Fri, 29 Dec 2023 16:16:05 +0800 Subject: [PATCH] Update --- ConsoleApp2/Entities/DataRecord.cs | 20 ++- ConsoleApp2/Helpers/DumpDataHelper.cs | 15 +- .../CsvInputService.cs} | 56 ++++--- .../HostedServices/DataTransformService.cs | 76 ++++++++++ .../HostedServices/MysqlOutputService.cs | 85 +++++++++++ .../HostedServices/SqlFileOutputService.cs | 64 ++++++++ .../HostedServices/TaskMonitorService.cs | 118 +++++++++++++++ .../HostedServices/VoidOutputService.cs | 35 +++++ ConsoleApp2/MySqlDestination.cs | 128 ---------------- ConsoleApp2/Options/CsvOptions.cs | 2 +- ConsoleApp2/Options/DataTransformOptions.cs | 18 ++- ConsoleApp2/Options/DatabaseOptions.cs | 9 +- ConsoleApp2/ProcessStep.cs | 7 + ConsoleApp2/Program.cs | 122 ++++++++++++---- ConsoleApp2/Services/DataRecordQueue.cs | 67 ++++----- ConsoleApp2/Services/DataTransformService.cs | 46 ------ ConsoleApp2/Services/DatabaseOutputService.cs | 39 ----- ConsoleApp2/{ => Services}/JsvSource.cs | 5 +- ConsoleApp2/Services/MySqlDestination.cs | 137 ++++++++++++++++++ ConsoleApp2/Services/NewCsvSource.cs | 137 ++++++++++++++++++ ConsoleApp2/Services/ProcessContext.cs | 44 ++++++ ConsoleApp2/Services/TaskManager.cs | 2 +- ConsoleApp2/Services/TaskMonitorService.cs | 57 -------- ConsoleApp2/Services/TsvSource.cs | 6 + 24 files changed, 909 insertions(+), 386 deletions(-) rename ConsoleApp2/{CsvConversion.cs => HostedServices/CsvInputService.cs} (52%) create mode 100644 ConsoleApp2/HostedServices/DataTransformService.cs create mode 100644 ConsoleApp2/HostedServices/MysqlOutputService.cs create mode 100644 ConsoleApp2/HostedServices/SqlFileOutputService.cs create mode 100644 ConsoleApp2/HostedServices/TaskMonitorService.cs create mode 100644 ConsoleApp2/HostedServices/VoidOutputService.cs delete mode 100644 ConsoleApp2/MySqlDestination.cs create mode 100644 ConsoleApp2/ProcessStep.cs delete mode 100644 ConsoleApp2/Services/DataTransformService.cs delete mode 100644 ConsoleApp2/Services/DatabaseOutputService.cs rename ConsoleApp2/{ => Services}/JsvSource.cs (88%) create mode 100644 ConsoleApp2/Services/MySqlDestination.cs create mode 100644 ConsoleApp2/Services/NewCsvSource.cs create mode 100644 ConsoleApp2/Services/ProcessContext.cs delete mode 100644 ConsoleApp2/Services/TaskMonitorService.cs create mode 100644 ConsoleApp2/Services/TsvSource.cs diff --git a/ConsoleApp2/Entities/DataRecord.cs b/ConsoleApp2/Entities/DataRecord.cs index b306b57..42ae33a 100644 --- a/ConsoleApp2/Entities/DataRecord.cs +++ b/ConsoleApp2/Entities/DataRecord.cs @@ -1,4 +1,6 @@ -namespace ConsoleApp2.Entities; +using System.Text; + +namespace ConsoleApp2.Entities; public class DataRecord { @@ -27,16 +29,18 @@ public class DataRecord public string[] Fields { get; } - public string[]? Headers { get; } + public string[] Headers { get; } public string TableName { get; } + public string? Database { get; set; } - public DataRecord(string[] fields, string tableName, string[]? headers = null) + + public DataRecord(string[] fields, string tableName, string[] headers) { - if (headers is not null && fields.Length != headers.Length) + if (fields.Length != headers.Length) throw new ArgumentException( - $"The number of fields does not match the number of headers. Expected: {fields.Length} Got: {headers.Length}", + $"The number of fields does not match the number of headers. Expected: {headers.Length} Got: {fields.Length} Fields: {string.Join(',', fields)}", nameof(fields)); Fields = fields; @@ -44,7 +48,11 @@ public class DataRecord Headers = headers; } - public string this[int index] => Fields[index]; + public string this[int index] + { + get => Fields[index]; + set => Fields[index] = value; + } public string this[string columnName] => GetField(this, columnName); diff --git a/ConsoleApp2/Helpers/DumpDataHelper.cs b/ConsoleApp2/Helpers/DumpDataHelper.cs index c754cb4..42c0844 100644 --- a/ConsoleApp2/Helpers/DumpDataHelper.cs +++ b/ConsoleApp2/Helpers/DumpDataHelper.cs @@ -1,4 +1,6 @@ -using System.Text.RegularExpressions; +using System.Text; +using System.Text.RegularExpressions; +using ConsoleApp2.Entities; namespace ConsoleApp2.Helpers; @@ -73,19 +75,24 @@ public static partial class DumpDataHelper if (str.StartsWith('\"')) return false; - var isDigit = true; + var isAllDigit = true; foreach (var c in str) { if (!char.IsAsciiHexDigit(c)) return false; if (!char.IsNumber(c)) - isDigit = false; + isAllDigit = false; } - if (isDigit) + if (isAllDigit) //避免全数字 return false; return true; } + + // public static string EliminateEscapeChars(ReadOnlySpan str) + // { + // char[] escapeChars = ['0','\'']; + // } } \ No newline at end of file diff --git a/ConsoleApp2/CsvConversion.cs b/ConsoleApp2/HostedServices/CsvInputService.cs similarity index 52% rename from ConsoleApp2/CsvConversion.cs rename to ConsoleApp2/HostedServices/CsvInputService.cs index 02e20a7..a5d4213 100644 --- a/ConsoleApp2/CsvConversion.cs +++ b/ConsoleApp2/HostedServices/CsvInputService.cs @@ -1,36 +1,38 @@ -using System.Diagnostics; -using ConsoleApp2.Helpers; +using ConsoleApp2.Helpers; +using ConsoleApp2.Options; using ConsoleApp2.Services; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; -namespace ConsoleApp2; +namespace ConsoleApp2.HostedServices; -public class CsvConversion : BackgroundService +public class CsvInputService : BackgroundService { private readonly ILogger _logger; private readonly IOptions _csvOptions; - private readonly DataTransformService _transform; - private readonly TaskManager _taskManager; + private readonly TaskManager _taskManager; // TBD + private readonly DataRecordQueue _producerQueue; + private readonly ProcessContext _context; - - public CsvConversion(ILogger logger, - IOptions csvOptions, - DataTransformService transform, - TaskManager taskManager) + public CsvInputService(ILogger logger, + IOptions csvOptions, + [FromKeyedServices(ProcessStep.Producer)]TaskManager taskManager, + [FromKeyedServices(ProcessStep.Producer)]DataRecordQueue producerQueue, + ProcessContext context) { _logger = logger; _csvOptions = csvOptions; - _transform = transform; _taskManager = taskManager; + _producerQueue = producerQueue; + _context = context; } protected override async Task ExecuteAsync(CancellationToken cancellationToken) { - var sw = Stopwatch.StartNew(); var inputDir = _csvOptions.Value.InputDir; - _logger.LogInformation("Working dir: {InputDir}", inputDir); + _logger.LogInformation("***** Csv input service start, working dir: {InputDir}, thread id: {ThreadId} *****", inputDir, Environment.CurrentManagedThreadId); var files = Directory.GetFiles(inputDir).Where(s => s.EndsWith(".sql") && !s.Contains("schema")).ToArray(); if (files.Length == 0) { @@ -38,39 +40,31 @@ public class CsvConversion : BackgroundService return; } - foreach(var sqlPath in files) + foreach (var sqlPath in files) { _logger.LogInformation("Working sql file: {SqlPath}", sqlPath); var headers = await DumpDataHelper.GetCsvHeadersFromSqlFileAsync(sqlPath); var csvFiles = await DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(sqlPath); - var queue = new DataRecordQueue(); foreach (var csvFile in csvFiles) { var csvPath = Path.Combine(inputDir, csvFile); - var source = new JsvSource(csvPath, headers, _logger); + // var source = new JsvSource(csvPath, headers, _logger); + var source = new NewCsvSource(csvPath, headers, logger: _logger); while (await source.ReadAsync()) { - queue.Enqueue(source.Current); + _context.AddInput(); + _producerQueue.Enqueue(source.Current); + if (cancellationToken.IsCancellationRequested) + return; } - - if (queue.Count > 200) - { - var queue1 = queue; - await _taskManager.CreateTask(async () => await _transform.ExecuteAsync(queue1, cancellationToken)); - queue = new DataRecordQueue(); - } - - if (cancellationToken.IsCancellationRequested) - return; } - + _logger.LogInformation("File '{File}' input completed", Path.GetFileName(sqlPath)); } + _context.CompleteInput(); _logger.LogInformation("***** Csv input service completed *****"); - _logger.LogInformation("Elapsed: {Elapsed}", sw.Elapsed); - _taskManager.MainTaskCompleted = true; } } \ No newline at end of file diff --git a/ConsoleApp2/HostedServices/DataTransformService.cs b/ConsoleApp2/HostedServices/DataTransformService.cs new file mode 100644 index 0000000..702c9ab --- /dev/null +++ b/ConsoleApp2/HostedServices/DataTransformService.cs @@ -0,0 +1,76 @@ +using ConsoleApp2.Helpers; +using ConsoleApp2.Options; +using ConsoleApp2.Services; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace ConsoleApp2.HostedServices; + +public class DataTransformService : BackgroundService +{ + private readonly ILogger _logger; + private readonly IOptions _options; + private readonly DataRecordQueue _producerQueue; + private readonly DataRecordQueue _consumerQueue; + private readonly ProcessContext _context; + + + public DataTransformService(ILogger logger, + IOptions options, // TBD: database filter + [FromKeyedServices(ProcessStep.Producer)]DataRecordQueue producerQueue, + [FromKeyedServices(ProcessStep.Consumer)]DataRecordQueue consumerQueue, + ProcessContext context) + { + _logger = logger; + // _taskManager = taskManager; + _options = options; + _producerQueue = producerQueue; + _consumerQueue = consumerQueue; + _context = context; + } + + protected override async Task ExecuteAsync(CancellationToken cancellationToken) + { + _logger.LogInformation("***** Data transform service started, thread id: {ThreadId} *****", Environment.CurrentManagedThreadId); + while (!_context.IsInputCompleted || _producerQueue.Count > 0) + { + // var dbOptions = _options.Value.DatabaseFilter(record); + if (!_producerQueue.TryDequeue(out var record)) continue; + record.Database = _options.Value.DatabaseFilter?.Invoke(record); + + for (var i = 0; i < record.Fields.Length; i++) + { + var field = record[i]; + + if (field == "\\N") + { + field = "NULL"; + goto Escape; + } + // else if(DumpDataHelper.CheckHexField(field)) + // field = $"0x{field}"; + + switch (_options.Value.GetColumnType(record.TableName, record.Headers[i])) + { + case ColumnType.Blob or ColumnType.Text: + field = string.IsNullOrEmpty(field) ? "''" : $"0x{field}"; + break; + default: + field = field; + break; + } + + Escape: + record[i] = field; + } + + _consumerQueue.Enqueue(record); + _context.AddTransform(); + } + + _context.CompleteTransform(); + _logger.LogInformation("***** Data transformation service completed *****"); + } +} \ No newline at end of file diff --git a/ConsoleApp2/HostedServices/MysqlOutputService.cs b/ConsoleApp2/HostedServices/MysqlOutputService.cs new file mode 100644 index 0000000..6953e2a --- /dev/null +++ b/ConsoleApp2/HostedServices/MysqlOutputService.cs @@ -0,0 +1,85 @@ +using ConsoleApp2.Entities; +using ConsoleApp2.Options; +using ConsoleApp2.Services; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using MySqlConnector; + +namespace ConsoleApp2.HostedServices; + +public class MysqlOutputService : BackgroundService +{ + private readonly ILogger _logger; + private readonly DataRecordQueue _consumerQueue; + private readonly IOptions _options; + private readonly ProcessContext _context; + + public MysqlOutputService(ILogger logger, + [FromKeyedServices(ProcessStep.Consumer)]DataRecordQueue consumerQueue, + IOptions options, + ProcessContext context) + { + _logger = logger; + _consumerQueue = consumerQueue; + _options = options; + _context = context; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + _logger.LogInformation("***** Mysql output service started *****"); + + var tasks = new List(); + var records = new List(); + while (!_context.IsTransformCompleted || _consumerQueue.Count > 0) + { + if (!_consumerQueue.TryDequeue(out var record)) continue; + records.Add(record); + + if (records.Count >= 200) + { + var recordsCopy = records; + tasks.Add(Task.Run(async () => await FlushAsync(recordsCopy), stoppingToken)); + records = []; + } + + if (tasks.Count >= 10) + { + await Task.WhenAll(tasks); + tasks.Clear(); + } + } + + await Task.WhenAll(tasks); + await FlushAsync(records); + + _context.CompleteOutput(); + + _logger.LogInformation("***** Mysql output service completed *****"); + } + + private async Task FlushAsync(IEnumerable records) + { + var count = 0; + await using var output = new MySqlDestination(new MySqlConnectionStringBuilder + { + Server = _options.Value.Host, + Port = _options.Value.Port, + Database = _options.Value.Database, + UserID = _options.Value.User, + Password = _options.Value.Password, + ConnectionTimeout = 180, + }.ConnectionString, _logger, true); + + foreach (var record in records) + { + await output.WriteRecordAsync(record); + count++; + } + + await output.FlushAsync(); + _context.AddOutput(count); + } +} \ No newline at end of file diff --git a/ConsoleApp2/HostedServices/SqlFileOutputService.cs b/ConsoleApp2/HostedServices/SqlFileOutputService.cs new file mode 100644 index 0000000..b582ba0 --- /dev/null +++ b/ConsoleApp2/HostedServices/SqlFileOutputService.cs @@ -0,0 +1,64 @@ +using ConsoleApp2.Entities; +using ConsoleApp2.Helpers; +using ConsoleApp2.Services; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace ConsoleApp2.HostedServices; + +public class SqlFileOutputService : BackgroundService +{ + private readonly string _outputFile = "D:/DumpOutput/cferp_test_1.sql"; // + private readonly DataRecordQueue _consumerQueue; + private readonly ILogger _logger; + private readonly ProcessContext _context; + + public SqlFileOutputService( + ILogger logger, + [FromKeyedServices(ProcessStep.Consumer)] + DataRecordQueue consumerQueue, + ProcessContext context) + { + _logger = logger; + _consumerQueue = consumerQueue; + _context = context; + } + + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + _logger.LogInformation("***** Sql file output service started, thread id: {ThreadId} *****", Environment.CurrentManagedThreadId); + var count = 0; + var tableRecords = new Dictionary>(); + while (!_context.IsTransformCompleted || _consumerQueue.Count > 0) + { + if (!_consumerQueue.TryDequeue(out var record)) continue; + + tableRecords.AddOrUpdate(record.TableName, [record], (key, value) => + { + value.Add(record); + return value; + }); + + ++count; + + if (count >= 200) + { + await File.AppendAllTextAsync(_outputFile, + MySqlDestination.SerializeRecords(tableRecords), stoppingToken); + tableRecords.Clear(); + _context.AddOutput(count); + count = 0; + } + } + await File.AppendAllTextAsync(_outputFile, + MySqlDestination.SerializeRecords(tableRecords), stoppingToken); + tableRecords.Clear(); + _context.AddOutput(count); + _context.CompleteOutput(); + + _logger.LogInformation("***** Sql file output service completed *****"); + } + +} \ No newline at end of file diff --git a/ConsoleApp2/HostedServices/TaskMonitorService.cs b/ConsoleApp2/HostedServices/TaskMonitorService.cs new file mode 100644 index 0000000..ed95dae --- /dev/null +++ b/ConsoleApp2/HostedServices/TaskMonitorService.cs @@ -0,0 +1,118 @@ +using System.Diagnostics; +using ConsoleApp2.Services; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace ConsoleApp2.HostedServices; + +public class TaskMonitorService : BackgroundService +{ + private readonly IHostApplicationLifetime _lifetime; + private readonly ILogger _logger; + private readonly ProcessContext _context; + private readonly DataRecordQueue _producerQueue; + private readonly DataRecordQueue _consumerQueue; + + public TaskMonitorService(IHostApplicationLifetime lifetime, + ILogger logger, + ProcessContext context, + [FromKeyedServices(ProcessStep.Producer)] + DataRecordQueue producerQueue, + [FromKeyedServices(ProcessStep.Consumer)] + DataRecordQueue consumerQueue) + { + _lifetime = lifetime; + _logger = logger; + _context = context; + _producerQueue = producerQueue; + _consumerQueue = consumerQueue; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + await Task.Factory.StartNew(Monitor, stoppingToken); + } + + private async Task Monitor() + { + var sw = Stopwatch.StartNew(); + var lastTime = sw.ElapsedMilliseconds; + var lastInputCount = _context.InputCount; + var lastTransformCount = _context.TransformCount; + var lastOutputCount = _context.OutputCount; + + bool endCheck = false; + while (true) + { + EndCheck: + // var running = 0; + // var error = 0; + // var completed = 0; + // var canceled = 0; + // foreach (var task in _taskManager.Tasks) + // { + // switch (task.Status) + // { + // case TaskStatus.Canceled: + // canceled++; + // break; + // case TaskStatus.Faulted: + // error++; + // break; + // case TaskStatus.RanToCompletion: + // completed++; + // break; + // default: + // running++; + // break; + // } + // } + + var time = sw.ElapsedMilliseconds; + var inputCount = _context.InputCount; + var transformCount = _context.TransformCount; + var outputCount = _context.OutputCount; + + var elapseTime = (time - lastTime) / 1000f; + var inputSpeed = (inputCount - lastInputCount) / elapseTime; + var transformSpeed = (transformCount - lastTransformCount) / elapseTime; + var outputSpeed = (outputCount - lastOutputCount) / elapseTime; + + // _logger.LogInformation( + // "Task monitor: running: {Running}, error: {Error}, completed: {Completed}, canceled: {Canceled}, outputSpeed: {Speed} records/s", + // running, error, completed, canceled, outputSpeed); + _logger.LogInformation( + "Process monitor: input: {inputStatus}, transform: {transformStatus}, output: {outputStatus}\nInput: {InputCount}, Transform: {TransformCount}, Output: {OutputCount}", + _context.IsInputCompleted ? "completed" : $"running {inputSpeed:F2} records/s", + _context.IsTransformCompleted ? "completed" : $"running {transformSpeed:F2} records/s", + _context.IsOutputCompleted ? "completed" : $"running {outputSpeed:F2} records/s", + inputCount, + transformCount, + outputCount); + _logger.LogInformation("Queue monitor: producer queue: {ProducerQueue}, consumer queue: {ConsumerQueue}", + _producerQueue.Count, _consumerQueue.Count); + + await Task.Delay(2000); + + lastTime = time; + lastInputCount = inputCount; + lastTransformCount = transformCount; + lastOutputCount = outputCount; + + if (_context is { IsInputCompleted: true, IsTransformCompleted: true, IsOutputCompleted: true }) + { + if (!endCheck) + { + endCheck = true; + goto EndCheck; + } + break; + } + } + + _logger.LogInformation("***** All tasks completed *****"); + _logger.LogInformation("***** ElapseTime: {Time}", (sw.ElapsedMilliseconds / 1000f).ToString("F3")); + // _lifetime.StopApplication(); + } +} \ No newline at end of file diff --git a/ConsoleApp2/HostedServices/VoidOutputService.cs b/ConsoleApp2/HostedServices/VoidOutputService.cs new file mode 100644 index 0000000..04ef787 --- /dev/null +++ b/ConsoleApp2/HostedServices/VoidOutputService.cs @@ -0,0 +1,35 @@ +using ConsoleApp2.Services; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace ConsoleApp2.HostedServices; + +public class VoidOutputService : BackgroundService +{ + private readonly ILogger _logger; + private readonly DataRecordQueue _consumerQueue; + private readonly ProcessContext _context; + + public VoidOutputService([FromKeyedServices(ProcessStep.Consumer)] DataRecordQueue consumerQueue, + ProcessContext context, ILogger logger) + { + _consumerQueue = consumerQueue; + _context = context; + _logger = logger; + } + + protected override Task ExecuteAsync(CancellationToken stoppingToken) + { + _logger.LogInformation("***** Void output service started, thread id: {ThreadId} *****", Environment.CurrentManagedThreadId); + while (!_context.IsTransformCompleted || _consumerQueue.Count > 0) + { + if (_consumerQueue.TryDequeue(out var record)) + _context.AddOutput(); + } + + _context.CompleteOutput(); + _logger.LogInformation("***** Void output service completed *****"); + return Task.CompletedTask; + } +} \ No newline at end of file diff --git a/ConsoleApp2/MySqlDestination.cs b/ConsoleApp2/MySqlDestination.cs deleted file mode 100644 index 9654cc0..0000000 --- a/ConsoleApp2/MySqlDestination.cs +++ /dev/null @@ -1,128 +0,0 @@ -using System.Text; -using ConsoleApp2.Entities; -using ConsoleApp2.Helpers; -using Microsoft.Extensions.Logging; -using MySqlConnector; - -namespace ConsoleApp2; - -public class MySqlDestination : IDisposable, IAsyncDisposable -{ - private readonly Dictionary> _recordCache; - private readonly MySqlConnection _conn; - private readonly ILogger _logger; - - public static int AddCount; - - public MySqlDestination(string connStr, ILogger logger) - { - _conn = new MySqlConnection(connStr); - _conn.Open(); - _recordCache = new Dictionary>(); - _logger = logger; - } - - public Task WriteRecordAsync(DataRecord record) - { - _recordCache.AddOrUpdate(record.TableName, [record], (key, value) => - { - value.Add(record); - Interlocked.Increment(ref AddCount); - return value; - }); - return Task.CompletedTask; - } - - public async Task WriteRecordsAsync(IEnumerable records) - { - foreach (var record in records) - { - await WriteRecordAsync(record); - } - } - - public async Task FlushAsync() - { - if (_recordCache.Count == 0) - return; - try - { - var cmd = _conn.CreateCommand(); - var sb = new StringBuilder(); - - var count = 0; - foreach (var (tableName, records) in _recordCache) - { - if (records.Count == 0) - continue; - sb.Append($"INSERT INTO `{tableName}`("); - for (var i = 0; i < records[0].Headers.Length; i++) - { - var header = records[0].Headers[i]; - sb.Append($"`{header}`"); - if (i != records[0].Headers.Length - 1) - sb.Append(','); - } - - sb.AppendLine(") VALUES"); - - for (var i = 0; i < records.Count; i++) - { - count++; - var record = records[i]; - sb.Append('('); - for (var j = 0; j < record.Fields.Length; j++) - { - var field = record.Fields[j]; - - #region HandleFields - - if (field == "\\N") - sb.Append("NULL"); - else if (DumpDataHelper.CheckHexField(field)) // TODO: 性能消耗 - { - if (StringExtensions.CheckJsonHex(field)) - sb.Append($"UNHEX(\"{field}\")"); - else - sb.Append($"\"{field}\""); - } - else - sb.Append($"\"{field}\""); - - #endregion - - if (j != record.Fields.Length - 1) - sb.Append(','); - } - - sb.Append(')'); - - if (i != records.Count - 1) // not last field - sb.AppendLine(","); - } - - sb.AppendLine(";\n"); - } - - cmd.CommandText = sb.ToString(); - - await cmd.ExecuteNonQueryAsync(); - _recordCache.Clear(); - } - catch (Exception e) - { - _logger.LogCritical(e, "Error when flushing records"); - throw; - } - } - - public void Dispose() - { - _conn.Dispose(); - } - - public async ValueTask DisposeAsync() - { - await _conn.DisposeAsync(); - } -} \ No newline at end of file diff --git a/ConsoleApp2/Options/CsvOptions.cs b/ConsoleApp2/Options/CsvOptions.cs index b01d43c..9507fd8 100644 --- a/ConsoleApp2/Options/CsvOptions.cs +++ b/ConsoleApp2/Options/CsvOptions.cs @@ -1,4 +1,4 @@ -namespace ConsoleApp2; +namespace ConsoleApp2.Options; public class CsvOptions { diff --git a/ConsoleApp2/Options/DataTransformOptions.cs b/ConsoleApp2/Options/DataTransformOptions.cs index cd1ed95..b8d04cf 100644 --- a/ConsoleApp2/Options/DataTransformOptions.cs +++ b/ConsoleApp2/Options/DataTransformOptions.cs @@ -1,9 +1,21 @@ using ConsoleApp2.Entities; -using ConsoleApp2.Options; -namespace ConsoleApp2; +namespace ConsoleApp2.Options; + +public enum ColumnType +{ + Blob, + Text, + UnDefine, +} public class DataTransformOptions { - public Func DatabaseFilter { get; set; } + public Func? DatabaseFilter { get; set; } + public Dictionary ColumnTypeConfig { get; set; } = new(); // "table.column" -> type + + public ColumnType GetColumnType(string table, string column) + { + return ColumnTypeConfig.GetValueOrDefault($"{table}.{column}", ColumnType.UnDefine); + } } \ No newline at end of file diff --git a/ConsoleApp2/Options/DatabaseOptions.cs b/ConsoleApp2/Options/DatabaseOptions.cs index 3a4a3d0..4cdef2d 100644 --- a/ConsoleApp2/Options/DatabaseOptions.cs +++ b/ConsoleApp2/Options/DatabaseOptions.cs @@ -1,3 +1,10 @@ namespace ConsoleApp2.Options; -public record DatabaseOptions(string Host, uint Port, string Database, string User, string Password); \ No newline at end of file +public class DatabaseOptions +{ + public string Host { get; set; } + public uint Port { get; set; } + public string Database { get; set; } + public string User { get; set; } + public string Password { get; set; } +} \ No newline at end of file diff --git a/ConsoleApp2/ProcessStep.cs b/ConsoleApp2/ProcessStep.cs new file mode 100644 index 0000000..908e918 --- /dev/null +++ b/ConsoleApp2/ProcessStep.cs @@ -0,0 +1,7 @@ +namespace ConsoleApp2; + +public static class ProcessStep +{ + public const string Producer = "Producer"; + public const string Consumer = "Consumer"; +} \ No newline at end of file diff --git a/ConsoleApp2/Program.cs b/ConsoleApp2/Program.cs index 51cd61c..0dd2202 100644 --- a/ConsoleApp2/Program.cs +++ b/ConsoleApp2/Program.cs @@ -1,4 +1,5 @@ using ConsoleApp2; +using ConsoleApp2.HostedServices; using ConsoleApp2.Options; using ConsoleApp2.Services; using Microsoft.Extensions.Configuration; @@ -7,31 +8,98 @@ using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Serilog; -ThreadPool.SetMaxThreads(200, 200); -var host = Host.CreateApplicationBuilder(); -host.Configuration.AddCommandLine(args); -host.Services.Configure(option => + +// 加入数据库过滤 +// HostedService不是并行的,完善TaskManager手动开启线程 +// 测试BlockingCollection对速度的影响? + +// 重新同步数据 +// Json列和Blob列不一致 +/* JSV导出带转义的列有误 + * order_data_block表id 4153969 + * order_data_parts表Spec列 + * order_module表Name列 + * process_group表Items列 + */ + +await RunProgram(); +return; + +async Task RunProgram() { - option.DelimiterChar = ','; - option.QuoteChar = '"'; - option.InputDir = "D:/Dump/MyDumper-Csv"; - option.OutputDir = "D:/DumpOutput"; - option.MaxThreads = 12; -}); -host.Services.Configure(options => -{ - var dbOption = new DatabaseOptions("localhost", 33306, "cferp_test_1", "root", "123456"); - options.DatabaseFilter = record => dbOption; -}); -host.Services.AddLogging(builder => -{ - builder.ClearProviders(); - builder.AddSerilog(new LoggerConfiguration().WriteTo.Console().CreateLogger()); -}); -host.Services.AddHostedService(); -host.Services.AddHostedService(); -host.Services.AddSingleton(); -host.Services.AddSingleton(); -host.Services.AddSingleton(); -var app = host.Build(); -await app.RunAsync(); \ No newline at end of file + ThreadPool.SetMaxThreads(200, 200); + var host = Host.CreateApplicationBuilder(); + host.Configuration.AddCommandLine(args); + host.Services.Configure(option => + { + option.DelimiterChar = ','; + option.QuoteChar = '"'; + option.InputDir = "D:/Dump/MyDumper"; + option.OutputDir = "D:/DumpOutput"; + option.MaxThreads = 12; + }); + host.Services.Configure(options => + { + //TODO: Database Filter + options.DatabaseFilter = record => "cferp_test_1"; + options.ColumnTypeConfig = new() + { + { "simple_plan_order.PlaceData", ColumnType.Blob }, + { "order_block_plan_result.PlaceData", ColumnType.Blob }, + { "order_box_block.Data", ColumnType.Blob }, + { "order_data_goods.ExtraProp", ColumnType.Text }, + { "order_module_extra.JsonStr", ColumnType.Text }, + { "process_info.Users", ColumnType.Text }, + { "order_process_schdule.CustomOrderNo", ColumnType.Text }, + { "order_process_schdule.OrderProcessStepName", ColumnType.Text }, + { "order_process_schdule.AreaName", ColumnType.Text }, + { "order_process_schdule.ConsigneeAddress", ColumnType.Text }, + { "order_process_schdule.ConsigneePhone", ColumnType.Text }, + { "report_source.Sql", ColumnType.Text }, + { "report_source.KeyValue", ColumnType.Text }, + { "report_source.Setting", ColumnType.Text }, + { "order_data_block.RemarkJson", ColumnType.Text }, + { "order_patch_detail.BlockDetail", ColumnType.Text }, + { "order_scrap_board.OutLineJson", ColumnType.Text }, + { "simple_package.Items", ColumnType.Text }, + { "order_batch_pack_config.Setting", ColumnType.Text }, + { "machine.Settings", ColumnType.Text }, + { "sys_config.Value", ColumnType.Text }, + { "sys_config.JsonStr", ColumnType.Text }, + { "process_item_exp.ItemJson", ColumnType.Text }, + { "report_template.Template", ColumnType.Text }, + { "report_template.SourceConfig", ColumnType.Text }, + { "order_block_plan.OrderNos", ColumnType.Text }, + { "order_block_plan.BlockInfo", ColumnType.Text }, + }; + }); +// 加入数据库过滤后删除 + host.Services.Configure(options => + { + options.Host = "localhost"; + options.Port = 33306; + options.Database = "cferp_test_1"; + options.User = "root"; + options.Password = "123456"; + }); + host.Services.AddLogging(builder => + { + builder.ClearProviders(); + builder.AddSerilog(new LoggerConfiguration().WriteTo.Console().CreateLogger()); + }); + + host.Services.AddSingleton(); + host.Services.AddKeyedSingleton(ProcessStep.Producer); + host.Services.AddKeyedSingleton(ProcessStep.Consumer); + host.Services.AddKeyedSingleton(ProcessStep.Producer); + host.Services.AddKeyedSingleton(ProcessStep.Consumer); + + host.Services.AddHostedService(); + host.Services.AddHostedService(); + host.Services.AddHostedService(); + host.Services.AddHostedService(); + + + var app = host.Build(); + await app.RunAsync(); +} \ No newline at end of file diff --git a/ConsoleApp2/Services/DataRecordQueue.cs b/ConsoleApp2/Services/DataRecordQueue.cs index 72f4b13..05d63c1 100644 --- a/ConsoleApp2/Services/DataRecordQueue.cs +++ b/ConsoleApp2/Services/DataRecordQueue.cs @@ -1,60 +1,49 @@ using System.Collections.Concurrent; using System.Diagnostics.CodeAnalysis; using ConsoleApp2.Entities; -using Microsoft.Extensions.Logging; namespace ConsoleApp2.Services; -public class DataRecordQueue +public class DataRecordQueue : IDisposable { - /// - /// Indicate that the queue is completed adding. - /// - public bool IsCompletedAdding { get; private set; } - - /// - /// Remark that the queue is completed for adding and empty; - /// - public bool IsCompleted => IsCompletedAdding && _queue.IsEmpty; - - private readonly ConcurrentQueue _queue; + private readonly BlockingCollection _queue; + + public int Count => _queue.Count; + public bool IsCompleted => _queue.IsCompleted; + public bool IsAddingCompleted => _queue.IsAddingCompleted; + + public event Action? OnRecordWrite; + public event Action? OnRecordRead; public DataRecordQueue() { - _queue = new ConcurrentQueue(); - } - - public DataRecordQueue(IEnumerable records) - { - _queue = new ConcurrentQueue(records); + _queue = new BlockingCollection(); } - /// - public void Enqueue(DataRecord item) + public bool TryDequeue([MaybeNullWhen(false)] out DataRecord record) { - _queue.Enqueue(item); + if (_queue.TryTake(out record)) + { + OnRecordRead?.Invoke(); + return true; + } + + return false; } - /// - public bool TryDequeue([MaybeNullWhen(false)] out DataRecord result) + public DataRecord Dequeue() => _queue.Take(); + + public void CompleteAdding() => _queue.CompleteAdding(); + + public void Enqueue(DataRecord record) { - return _queue.TryDequeue(out result); + _queue.Add(record); + OnRecordWrite?.Invoke(); } - /// - public bool TryPeek([MaybeNullWhen(false)] out DataRecord result) + + public void Dispose() { - return _queue.TryPeek(out result); + _queue.Dispose(); } - - /// - public int Count => _queue.Count; - - /// - public bool IsEmpty => _queue.IsEmpty; - - /// - /// Indicate that the queue is completed adding. - /// - public void CompleteAdding() => IsCompletedAdding = true; } \ No newline at end of file diff --git a/ConsoleApp2/Services/DataTransformService.cs b/ConsoleApp2/Services/DataTransformService.cs deleted file mode 100644 index c8a3729..0000000 --- a/ConsoleApp2/Services/DataTransformService.cs +++ /dev/null @@ -1,46 +0,0 @@ -using ConsoleApp2.Entities; -using ConsoleApp2.Helpers; -using ConsoleApp2.Options; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; - -namespace ConsoleApp2.Services; - -public class DataTransformService -{ - private readonly ILogger _logger; - private readonly TaskManager _taskManager; - private readonly DatabaseOutputService _output; - private readonly IOptions _options; - - public DataTransformService(ILogger logger, TaskManager taskManager, DatabaseOutputService output, IOptions options) - { - _logger = logger; - _taskManager = taskManager; - _output = output; - _options = options; - } - - public async Task ExecuteAsync(DataRecordQueue records, CancellationToken cancellationToken = default) - { - _logger.LogInformation("Start transforming data."); - var map = new Dictionary(); - while (records.TryDequeue(out var record)) - { - var dbOptions = _options.Value.DatabaseFilter(record); - map.AddOrUpdate(dbOptions, new DataRecordQueue([record]), (options, queue) => - { - queue.Enqueue(record); - return queue; - }); - } - - foreach (var (dbOptions, queue) in map) - { - await _taskManager.CreateTask(async () => - { - await _output.ExecuteAsync(queue, dbOptions, cancellationToken); - }); - } - } -} \ No newline at end of file diff --git a/ConsoleApp2/Services/DatabaseOutputService.cs b/ConsoleApp2/Services/DatabaseOutputService.cs deleted file mode 100644 index 04948bc..0000000 --- a/ConsoleApp2/Services/DatabaseOutputService.cs +++ /dev/null @@ -1,39 +0,0 @@ -using ConsoleApp2.Entities; -using ConsoleApp2.Options; -using Microsoft.Extensions.Logging; -using MySqlConnector; - -namespace ConsoleApp2.Services; - -public class DatabaseOutputService -{ - private readonly ILogger _logger; - - public DatabaseOutputService(ILogger logger) - { - _logger = logger; - } - - public async Task ExecuteAsync(DataRecordQueue records, DatabaseOptions options, CancellationToken stoppingToken = default) - { - var count = records.Count; - var output = new MySqlDestination(new MySqlConnectionStringBuilder() - { - Server = options.Host, - Port = options.Port, - Database = options.Database, - UserID = options.User, - Password = options.Password, - ConnectionTimeout = 120, - }.ConnectionString, _logger); // TODO: 加入DI - - while (records.TryDequeue(out var record) && !stoppingToken.IsCancellationRequested) - { - await output.WriteRecordAsync(record); - } - - await output.FlushAsync(); - - _logger.LogInformation("Flush {Count} records to database.", count); - } -} \ No newline at end of file diff --git a/ConsoleApp2/JsvSource.cs b/ConsoleApp2/Services/JsvSource.cs similarity index 88% rename from ConsoleApp2/JsvSource.cs rename to ConsoleApp2/Services/JsvSource.cs index 829392f..3ce7762 100644 --- a/ConsoleApp2/JsvSource.cs +++ b/ConsoleApp2/Services/JsvSource.cs @@ -3,7 +3,7 @@ using ConsoleApp2.Helpers; using Microsoft.Extensions.Logging; using ServiceStack.Text; -namespace ConsoleApp2; +namespace ConsoleApp2.Services; public class JsvSource : IDisposable { @@ -35,8 +35,7 @@ public class JsvSource : IDisposable if (string.IsNullOrEmpty(str)) return false; var fields = _jsv.DeserializeFromString(str); - if(Headers is not null && Headers.Length != fields.Length) - throw new InvalidDataException("解析的字段数与指定的列数不匹配"); + Current = new DataRecord(fields, _tableName, Headers); return true; } diff --git a/ConsoleApp2/Services/MySqlDestination.cs b/ConsoleApp2/Services/MySqlDestination.cs new file mode 100644 index 0000000..8725ab1 --- /dev/null +++ b/ConsoleApp2/Services/MySqlDestination.cs @@ -0,0 +1,137 @@ +using System.Text; +using ConsoleApp2.Entities; +using ConsoleApp2.Helpers; +using Microsoft.Extensions.Logging; +using MySqlConnector; + +namespace ConsoleApp2.Services; + +public class MySqlDestination : IDisposable, IAsyncDisposable +{ + private readonly Dictionary> _recordCache; + private readonly MySqlConnection _conn; + private readonly ILogger _logger; + private readonly bool _prettyOutput; + + public static int AddCount; + + public MySqlDestination(string connStr, ILogger logger, bool prettyOutput = false) + { + _conn = new MySqlConnection(connStr); + _conn.Open(); + _recordCache = new Dictionary>(); + _logger = logger; + _prettyOutput = prettyOutput; + } + + public Task WriteRecordAsync(DataRecord record) + { + _recordCache.AddOrUpdate(record.TableName, [record], (key, value) => + { + value.Add(record); + Interlocked.Increment(ref AddCount); + return value; + }); + return Task.CompletedTask; + } + + public async Task WriteRecordsAsync(IEnumerable records) + { + foreach (var record in records) + { + await WriteRecordAsync(record); + } + } + + public async Task FlushAsync() + { + if (_recordCache.Count == 0) + return; + + var cmd = _conn.CreateCommand(); + cmd.CommandText = SerializeRecords(_recordCache, _prettyOutput); + try + { + await cmd.ExecuteNonQueryAsync(); + _recordCache.Clear(); + } + catch (Exception e) + { + _logger.LogCritical(e, "Error when flushing records, sql: {Sql}", cmd.CommandText.Omit(1000)); + throw; + } + } + + public static string SerializeRecords(IDictionary> tableRecords, + bool prettyOutput = false) + { + var sb = new StringBuilder(); + + foreach (var (tableName, records) in tableRecords) + { + if (records.Count == 0) + continue; + sb.Append($"INSERT INTO `{tableName}`("); + for (var i = 0; i < records[0].Headers.Length; i++) + { + var header = records[0].Headers[i]; + sb.Append($"`{header}`"); + if (i != records[0].Headers.Length - 1) + sb.Append(','); + } + + sb.Append(") VALUES "); + if (prettyOutput) + sb.AppendLine(); + + for (var i = 0; i < records.Count; i++) + { + var record = records[i]; + sb.Append('('); + for (var j = 0; j < record.Fields.Length; j++) + { + var field = record.Fields[j]; + + #region HandleFields + + // if (field == "\\N") + // sb.Append("NULL"); + // else if (DumpDataHelper.CheckHexField(field)) + // { + // // if (StringExtensions.CheckJsonHex(field)) + // sb.Append($"0x{field}"); + // } + // else + // sb.Append($"'{field}'"); + + sb.Append(field); + #endregion + + if (j != record.Fields.Length - 1) + sb.Append(','); + } + + sb.Append(')'); + + if (i != records.Count - 1) // not last field + sb.Append(','); + if (prettyOutput) sb.AppendLine(); + } + + sb.AppendLine(";"); + } + + return sb.ToString(); + } + + + public void Dispose() + { + _conn.Dispose(); + } + + public async ValueTask DisposeAsync() + { + await _conn.DisposeAsync(); + } +} \ No newline at end of file diff --git a/ConsoleApp2/Services/NewCsvSource.cs b/ConsoleApp2/Services/NewCsvSource.cs new file mode 100644 index 0000000..c99e792 --- /dev/null +++ b/ConsoleApp2/Services/NewCsvSource.cs @@ -0,0 +1,137 @@ +using System.Text; +using ConsoleApp2.Entities; +using ConsoleApp2.Helpers; +using Microsoft.Extensions.Logging; + +namespace ConsoleApp2.Services; + +public class NewCsvSource +{ + private readonly string _filePath; + private readonly StreamReader _reader; + private readonly ILogger? _logger; + private readonly string _tableName; + + public DataRecord Current { get; protected set; } + public string[]? Headers { get; } + public string? CurrentRaw { get; private set; } + public string Delimiter { get; private set; } + public char QuoteChar { get; private set; } + + public NewCsvSource(string filePath, string[]? headers = null, string delimiter = ",", char quoteChar = '"', + ILogger? logger = null) + { + _filePath = filePath; + Headers = headers; + _logger = logger; + Delimiter = delimiter; + QuoteChar = quoteChar; + + var fs = File.OpenRead(filePath); + _reader = new StreamReader(fs); + _tableName = DumpDataHelper.GetTableName(filePath); + } + + public async ValueTask ReadAsync() + { + var str = await _reader.ReadLineAsync(); + if (string.IsNullOrWhiteSpace(str)) + return false; + + CurrentRaw = str; + + var fields = ParseRow2(str, QuoteChar, Delimiter); + Current = new DataRecord(fields, _tableName, Headers); + return true; + } + + public string[] ParseRow(string row, char quoteChar, string delimiter) + { + var span = row.AsSpan(); + var result = new List(); + + if (span.Length == 0) + throw new ArgumentException("The row is empty", nameof(row)); + + var isInQuote = span[0] == quoteChar; + var start = 0; + for (var i = 1; i < span.Length; i++) + { + if (span[i] == quoteChar) + { + isInQuote = !isInQuote; + } + // delimiter需要足够复杂 + else if (/*!isInQuote && */span.Length > i + delimiter.Length && span[i..(i + delimiter.Length)].Equals(delimiter, StringComparison.CurrentCulture)) // field matched + { + string field; + if (span[start] == quoteChar && span[i - 1] == quoteChar) // enclosed by quoteChar + field = span[(start + 1)..(i - 1)].ToString(); // escape quoteChar + else + field = span[start..i].ToString(); + + start = i + delimiter.Length; + + if (field == "\\N") + field = "NULL"; + + result.Add(field); + + continue; + } + } + + result.Add(span[start..].ToString()); + + + for (var i = 0; i < result.Count; i++) + { + var field = result[i]; + if (DumpDataHelper.CheckHexField(field) && StringExtensions.CheckJsonHex(field)) + { + result[i] = StringExtensions.FromHex(field); + } + } + + return result.ToArray(); + } + + public string[] ParseRow2(ReadOnlySpan source, char quoteChar, string delimiter) + { + var result = new List(); + var index = -1; + StringBuilder current = new StringBuilder(); + bool hasQuote = false; + bool hasSlash = false; + while (index < source.Length-1) + { + index++; + if (hasSlash == false && source[index] == '\\') + { + hasSlash = true; + current.Append('\\'); + continue; + } + if (hasSlash ==false && source[index] == quoteChar) + { + hasQuote = !hasQuote; + current.Append(source[index]); + continue; + } + if (hasQuote==false && source[index] == delimiter[0]) + { + result.Add(current.ToString()); + current.Clear(); + } + else + { + current.Append(source[index]); + } + + hasSlash = false; + } + + result.Add(current.ToString()); + return result.ToArray(); + } +} \ No newline at end of file diff --git a/ConsoleApp2/Services/ProcessContext.cs b/ConsoleApp2/Services/ProcessContext.cs new file mode 100644 index 0000000..6d5018a --- /dev/null +++ b/ConsoleApp2/Services/ProcessContext.cs @@ -0,0 +1,44 @@ +namespace ConsoleApp2.Services; + +public class ProcessContext +{ + private int _inputCount; + private int _transformCount; + private int _outputCount; + public bool IsInputCompleted { get; private set; } + public bool IsTransformCompleted { get; private set; } + public bool IsOutputCompleted { get; private set; } + + public int InputCount + { + get => _inputCount; + private set => _inputCount = value; + } + + public int TransformCount + { + get => _transformCount; + private set => _transformCount = value; + } + + public int OutputCount + { + get => _outputCount; + private set => _outputCount = value; + } + + public void CompleteInput() => IsInputCompleted = true; + + public void CompleteTransform() => IsTransformCompleted = true; + public void CompleteOutput() => IsOutputCompleted = true; + + public void AddInput() => Interlocked.Increment(ref _inputCount); + + public void AddInput(int count) => Interlocked.Add(ref _inputCount, count); + + public void AddTransform() => Interlocked.Increment(ref _transformCount); + public void AddTransform(int count) => Interlocked.Add(ref _transformCount, count); + + public void AddOutput() => Interlocked.Increment(ref _outputCount); + public void AddOutput(int count) => Interlocked.Add(ref _outputCount, count); +} \ No newline at end of file diff --git a/ConsoleApp2/Services/TaskManager.cs b/ConsoleApp2/Services/TaskManager.cs index 3638d68..f947569 100644 --- a/ConsoleApp2/Services/TaskManager.cs +++ b/ConsoleApp2/Services/TaskManager.cs @@ -22,7 +22,7 @@ public class TaskManager { var task = Task.Factory.StartNew(func); _tasks.Add(task); - _logger.LogInformation("New task created."); + _logger.LogDebug("New task created."); return task; } } \ No newline at end of file diff --git a/ConsoleApp2/Services/TaskMonitorService.cs b/ConsoleApp2/Services/TaskMonitorService.cs deleted file mode 100644 index 532ec55..0000000 --- a/ConsoleApp2/Services/TaskMonitorService.cs +++ /dev/null @@ -1,57 +0,0 @@ -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; - -namespace ConsoleApp2.Services; - -public class TaskMonitorService : BackgroundService -{ - private readonly IHostApplicationLifetime _lifetime; - private readonly TaskManager _taskManager; - private readonly ILogger _logger; - - public TaskMonitorService(IHostApplicationLifetime lifetime, TaskManager taskManager, - ILogger logger) - { - _lifetime = lifetime; - _taskManager = taskManager; - _logger = logger; - } - - protected override async Task ExecuteAsync(CancellationToken stoppingToken) - { - while (!_taskManager.MainTaskCompleted || _taskManager.RunningTaskCount != 0) - { - var running = 0; - var error = 0; - var completed = 0; - var canceled = 0; - foreach (var task in _taskManager.Tasks) - { - switch (task.Status) - { - case TaskStatus.Running: - running++; - break; - case TaskStatus.Canceled: - canceled++; - break; - case TaskStatus.Faulted: - error++; - break; - case TaskStatus.RanToCompletion: - completed++; - break; - default: - throw new ArgumentOutOfRangeException(); - } - } - - _logger.LogInformation( - "Task monitor: running: {Running}, error: {Error}, completed: {Completed}, canceled: {Canceled}", - running, error, completed, canceled); - await Task.Delay(2000); - } - - _logger.LogInformation("***** All tasks completed *****"); - } -} \ No newline at end of file diff --git a/ConsoleApp2/Services/TsvSource.cs b/ConsoleApp2/Services/TsvSource.cs new file mode 100644 index 0000000..2d105c0 --- /dev/null +++ b/ConsoleApp2/Services/TsvSource.cs @@ -0,0 +1,6 @@ +namespace ConsoleApp2.Services; + +public class TsvSource +{ + +} \ No newline at end of file