From 8da3110ecd6cb942c5dab6e49fb134fb21329b7f Mon Sep 17 00:00:00 2001 From: CZY <2817212736@qq.com> Date: Fri, 19 Jan 2024 11:17:22 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=95=B0=E6=8D=AE=E5=88=86?= =?UTF-8?q?=E5=BA=93;=20=E4=BF=AE=E5=A4=8DtaskManager=E4=B8=AD=E5=BC=82?= =?UTF-8?q?=E6=AD=A5=E6=96=B9=E6=B3=95=E6=B2=A1=E6=9C=89=E6=AD=A3=E5=B8=B8?= =?UTF-8?q?=E7=AD=89=E5=BE=85=E7=9A=84=E9=94=99=E8=AF=AF;=20=E5=88=A0?= =?UTF-8?q?=E9=99=A4=E6=97=A0=E7=94=A8=E7=9A=84=E5=BC=82=E5=B8=B8=E6=8D=95?= =?UTF-8?q?=E8=8E=B7;?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ConsoleApp2/DataRecord.cs | 6 +-- ConsoleApp2/HostedServices/InputService.cs | 6 +-- .../HostedServices/MainHostedService.cs | 12 ++--- ConsoleApp2/HostedServices/OutputService.cs | 47 ++++++++++--------- .../HostedServices/TaskMonitorService.cs | 1 - .../HostedServices/TransformService.cs | 12 ++--- ConsoleApp2/Options/TenantDbOptions.cs | 23 +++++++++ ConsoleApp2/Program.cs | 8 +++- ConsoleApp2/Services/MySqlDestination.cs | 6 +-- ConsoleApp2/Services/ProcessContext.cs | 10 +--- ConsoleApp2/Services/TaskManager.cs | 7 +-- ConsoleApp2/appsettings.json | 17 ++++++- 12 files changed, 94 insertions(+), 61 deletions(-) create mode 100644 ConsoleApp2/Options/TenantDbOptions.cs diff --git a/ConsoleApp2/DataRecord.cs b/ConsoleApp2/DataRecord.cs index 078dd1a..f6e7bef 100644 --- a/ConsoleApp2/DataRecord.cs +++ b/ConsoleApp2/DataRecord.cs @@ -10,7 +10,7 @@ public class DataRecord value = string.Empty; if (record.Headers is null) throw new InvalidOperationException("Cannot get field when headers of a record have not been set."); - var idx = Array.IndexOf(record.Headers, columnName); //可能可以优化 + var idx = Array.IndexOf(record.Headers, columnName); if (idx == -1) return false; value = record.Fields[idx]; @@ -23,7 +23,7 @@ public class DataRecord throw new InvalidOperationException("Headers have not been set."); var idx = Array.IndexOf(record.Headers, columnName); if (idx is -1) - throw new IndexOutOfRangeException("Column name not found in this record."); + throw new IndexOutOfRangeException($"Column name {columnName} not found in this record, table name {record.TableName}."); return record.Fields[idx]; } @@ -34,7 +34,7 @@ public class DataRecord public string TableName { get; } - public string? Database { get; set; } + public string Database { get; set; } public int CompanyID { get; set; } diff --git a/ConsoleApp2/HostedServices/InputService.cs b/ConsoleApp2/HostedServices/InputService.cs index 3ab6ea6..0ee9874 100644 --- a/ConsoleApp2/HostedServices/InputService.cs +++ b/ConsoleApp2/HostedServices/InputService.cs @@ -49,11 +49,7 @@ public class InputService : IInputService count++; }); - if (_context.GetExceptions().Count > 0) - { - _logger.LogInformation("***** Csv input service is canceled *****"); - return; - } + _logger.LogInformation("table:'{tableName}' input completed", tableName); } diff --git a/ConsoleApp2/HostedServices/MainHostedService.cs b/ConsoleApp2/HostedServices/MainHostedService.cs index 57973a0..d5ce6a4 100644 --- a/ConsoleApp2/HostedServices/MainHostedService.cs +++ b/ConsoleApp2/HostedServices/MainHostedService.cs @@ -34,8 +34,8 @@ public class MainHostedService : BackgroundService } catch (Exception ex) { - _context.AddException(ex); - _logger.LogError("Exception occurred on inputService:{Message},{StackTrace}", ex.Message, ex.StackTrace); + _logger.LogCritical("Exception occurred on inputService:{Message},{StackTrace}", ex.Message, ex.StackTrace); + throw; } }); @@ -47,8 +47,8 @@ public class MainHostedService : BackgroundService } catch (Exception ex) { - _context.AddException(ex); - _logger.LogError("Exception occurred on transformService:{Message},{StackTrace}", ex.Message, ex.StackTrace); + _logger.LogCritical("Exception occurred on transformService:{Message},{StackTrace}", ex.Message, ex.StackTrace); + throw; } }); @@ -60,8 +60,8 @@ public class MainHostedService : BackgroundService } catch (Exception ex) { - _context.AddException(ex); - _logger.LogError("Exception occurred on outputService:{Message},{StackTrace}", ex.Message, ex.StackTrace); + _logger.LogCritical("Exception occurred on outputService:{Message},{StackTrace}", ex.Message, ex.StackTrace); + throw; } }); diff --git a/ConsoleApp2/HostedServices/OutputService.cs b/ConsoleApp2/HostedServices/OutputService.cs index f639410..bdbbc8a 100644 --- a/ConsoleApp2/HostedServices/OutputService.cs +++ b/ConsoleApp2/HostedServices/OutputService.cs @@ -1,8 +1,7 @@ -using ConsoleApp2.Const; +using ConsoleApp2.Helpers; using ConsoleApp2.HostedServices.Abstractions; using ConsoleApp2.Options; using ConsoleApp2.Services; -using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -35,48 +34,54 @@ public class OutputService : IOutputService _errorRecorder = errorRecorder; } - public async Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue consumerQueue, ProcessContext context,CancellationToken cancellationToken) + public async Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue consumerQueue, ProcessContext context, CancellationToken cancellationToken) { _logger.LogInformation("***** Mysql output service started *****"); _taskManager.CreateTasks(async () => { - var records = new List(); + //k: database v: records,按照要导出的数据库名分组 + var databaseDict = new Dictionary>(); while (!context.IsTransformCompleted || consumerQueue.Count > 0) { if (!consumerQueue.TryDequeue(out var record)) continue; - records.Add(record); - //_logger.LogInformation(@"*****OutputCount: {count} *****",count); + var dbName = record.Database; + var records = databaseDict.AddOrUpdate(dbName, [record], (_, list) => + { + list.Add(record); + return list; + }); + if (records.Count >= tasksOptions.OutPutOptions.FlushCount) { - await FlushAsync(records); + await FlushAsync(dbName, records); records.Clear(); } - if (_context.GetExceptions().Count>0) + } + + foreach (var (db, records) in databaseDict) + { + if (records.Count > 0) { - _logger.LogInformation("***** Csv output thread is canceled *****"); - return; + await FlushAsync(db, records); + records.Clear(); } } - if (records.Count > 0) - { - await FlushAsync(records); - records.Clear(); - _logger.LogInformation("***** Mysql output thread completed *****"); - } + + databaseDict.Clear(); + _logger.LogInformation("***** Mysql output thread completed *****"); }, tasksOptions.OutPutOptions.OutPutTaskCount); await _taskManager.WaitAll(); //_context.CompleteOutput(); - _logger.LogInformation(@"***** Mysql output service completed *****"); + _logger.LogInformation("***** Mysql output service completed *****"); } - private async Task FlushAsync(IEnumerable records) + private async Task FlushAsync(string dbName, IEnumerable records) { var count = 0; - await using var output = new MySqlDestination( - _outputOptions.Value.ConnectionString ?? throw new InvalidOperationException("Connection string is required"), - _logger, _context, _transformOptions, _errorRecorder); + var connStr = _outputOptions.Value.ConnectionString ?? throw new InvalidOperationException("ConnectionString is null"); + await using var output = new MySqlDestination($"{connStr};Database={dbName};", _logger, _context, _transformOptions, _errorRecorder); //if (records == null || records.Count() == 0) return; //var dbName = $"cferp_test_1"; //if (records != null && records.Count() > 0) diff --git a/ConsoleApp2/HostedServices/TaskMonitorService.cs b/ConsoleApp2/HostedServices/TaskMonitorService.cs index 5025202..eb65ee8 100644 --- a/ConsoleApp2/HostedServices/TaskMonitorService.cs +++ b/ConsoleApp2/HostedServices/TaskMonitorService.cs @@ -49,7 +49,6 @@ public class TaskMonitorService : BackgroundService bool endCheck = false; while (true) { - if (_context.GetExceptions().Count>0) return; EndCheck: // var running = 0; // var error = 0; diff --git a/ConsoleApp2/HostedServices/TransformService.cs b/ConsoleApp2/HostedServices/TransformService.cs index 6362595..5de1b13 100644 --- a/ConsoleApp2/HostedServices/TransformService.cs +++ b/ConsoleApp2/HostedServices/TransformService.cs @@ -43,16 +43,10 @@ public class TransformService : ITransformService { while ((!context.IsInputCompleted || producerQueue.Count > 0)) { - if (_context.GetExceptions().Count > 0) - { - _logger.LogInformation("***** Csv transform service is canceled *****"); - return; - } if (!producerQueue.TryDequeue(out var record)) continue; //过滤不要的record if (await _options.Value.RecordFilter?.Invoke(record, _cache) == false) continue; - record.Database = _options.Value.DatabaseFilter?.Invoke(record); //修改record _options.Value.RecordModify?.Invoke(record); //缓存record @@ -63,14 +57,17 @@ public class TransformService : ITransformService { record = replaceRecord; } + //计算需要分流的数据库 + record.Database = _options.Value.DatabaseFilter.Invoke(record); consumerQueue.Enqueue(record); _context.AddTransform(); //数据增加 var addRecords = _options.Value.RecordAdd?.Invoke(record); - if (addRecords != null && addRecords.Count > 0) + if (addRecords is { Count: > 0 }) { foreach (var rc in addRecords) { + rc.Database = _options.Value.DatabaseFilter.Invoke(record); consumerQueue.Enqueue(rc); _context.AddTransform(); } @@ -78,6 +75,7 @@ public class TransformService : ITransformService } context.CompleteTransform(); },tasksOptions.TransformTaskCount,cancellationToken); + await _taskManager.WaitAll(); _logger.LogInformation("***** Data transformation service completed *****"); } } \ No newline at end of file diff --git a/ConsoleApp2/Options/TenantDbOptions.cs b/ConsoleApp2/Options/TenantDbOptions.cs new file mode 100644 index 0000000..fd74ad4 --- /dev/null +++ b/ConsoleApp2/Options/TenantDbOptions.cs @@ -0,0 +1,23 @@ +namespace ConsoleApp2.Options; + +public class TenantDbOptions +{ + public string TenantKey { get; set; } + + /// + /// Key-Value: {DbName}-{TenantKeyLessThan} + /// + public Dictionary DbList { get; set; } + + public string GetDbNameByTenantKeyValue(int tenantKeyValue) + { + // var dictionary = new SortedDictionary(); + // DbList.ForEach(pair => dictionary.Add(pair.Value, pair.Key)); + // 注意配置顺序 + var dbName = DbList.Cast?>() + .FirstOrDefault(pair => pair?.Value != null && pair.Value.Value > tenantKeyValue)!.Value.Key; + return dbName ?? + throw new ArgumentOutOfRangeException(nameof(tenantKeyValue), + $"已配置的数据库中没有任何符合'{nameof(tenantKeyValue)}'值的对象"); + } +} \ No newline at end of file diff --git a/ConsoleApp2/Program.cs b/ConsoleApp2/Program.cs index 7541ad5..9f2fc33 100644 --- a/ConsoleApp2/Program.cs +++ b/ConsoleApp2/Program.cs @@ -122,7 +122,13 @@ async Task RunProgram() host.Services.Configure(options => { - options.DatabaseFilter = record => "cferp_test"; + var tenantDbOptions = host.Configuration.GetRequiredSection("TenantDb").Get() + ?? throw new ApplicationException("分库配置项不存在"); + options.DatabaseFilter = record => + { + var companyId = int.Parse(record[tenantDbOptions.TenantKey]); // 每个实体都应存在CompanyID,否则异常 + return tenantDbOptions.GetDbNameByTenantKeyValue(companyId); + }; options.TransformBinary = field => commandOptions != null && commandOptions.Isutf8mb4 ? $"_utf8mb4 0x{field}" : $"0x{field}"; //数据过滤 diff --git a/ConsoleApp2/Services/MySqlDestination.cs b/ConsoleApp2/Services/MySqlDestination.cs index 9384fb5..2c1f388 100644 --- a/ConsoleApp2/Services/MySqlDestination.cs +++ b/ConsoleApp2/Services/MySqlDestination.cs @@ -76,8 +76,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable } catch (Exception e) { - _logger.LogCritical(e, "Error when flushing records, sql: {Sql}", cmd.CommandText.Omit(1000)); - _context.AddException(e); + _logger.LogError(e, "Error when flushing records, sql: {Sql}", cmd.CommandText.Omit(1000)); var match = MatchTableName().Match(cmd.CommandText); if (match is { Success: true, Groups.Count: > 1 }) @@ -92,8 +91,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable } catch (Exception e) { - _logger.LogCritical(e, "Error when serialize records, record:"); - _context.AddException(e); + _logger.LogError(e, "Error when serialize records, record:"); } finally { diff --git a/ConsoleApp2/Services/ProcessContext.cs b/ConsoleApp2/Services/ProcessContext.cs index 4bcbec7..cad270d 100644 --- a/ConsoleApp2/Services/ProcessContext.cs +++ b/ConsoleApp2/Services/ProcessContext.cs @@ -8,7 +8,6 @@ public class ProcessContext private int _inputCount; private int _transformCount; private int _outputCount; - private IList _exceptionList = new List(); public bool IsInputCompleted { get; private set; } public bool IsTransformCompleted { get; private set; } public bool IsOutputCompleted { get; private set; } @@ -30,14 +29,7 @@ public class ProcessContext get => _outputCount; private set => _outputCount = value; } - public void AddException(Exception ex) - { - _exceptionList.Add(ex); - } - public IList GetExceptions() - { - return _exceptionList; - } + public void CompleteInput() => IsInputCompleted = true; public void CompleteTransform() => IsTransformCompleted = true; diff --git a/ConsoleApp2/Services/TaskManager.cs b/ConsoleApp2/Services/TaskManager.cs index 38a9b8c..f21b7d3 100644 --- a/ConsoleApp2/Services/TaskManager.cs +++ b/ConsoleApp2/Services/TaskManager.cs @@ -21,13 +21,14 @@ public class TaskManager _logger = logger; } - public void CreateTask(Func func, CancellationToken cancellationToken = default) + public void CreateTask(Func func, CancellationToken cancellationToken = default) { - var task = Task.Factory.StartNew(func, cancellationToken); + var task = Task.Run(func, cancellationToken); _tasks.Add(task); _logger.LogDebug("New task created"); } - public void CreateTasks(Func func,int taskCount, CancellationToken cancellationToken = default) + + public void CreateTasks(Func func,int taskCount, CancellationToken cancellationToken = default) { for (int i = 0; i < taskCount; i++) { diff --git a/ConsoleApp2/appsettings.json b/ConsoleApp2/appsettings.json index b17a3d9..1e10efd 100644 --- a/ConsoleApp2/appsettings.json +++ b/ConsoleApp2/appsettings.json @@ -9,10 +9,25 @@ "OldestTime": "202301" }, "ConnectionStrings": { - "MySqlMaster": "Server=127.0.0.1;Port=33309;UserId=root;Password=123456;Database=cferp_test;" + "MySqlMaster": "Server=127.0.0.1;Port=33309;UserId=root;Password=123456;" // 要分库,不用加'Database='了 }, "RedisCacheOptions": { "Configuration": "192.168.1.246:6380", "InstanceName" : "mes-etl:" + }, + "TenantDb": // 分库配置 + { + "TenantKey" : "CompanyID", + "DbList": { + /* + * 相当于 + * (CompanyId < 1000) -> cferp_test_1 + * (CompanyId < 2000) -> cferp_test_2 + * (CompanyId < 2147483647) -> cferp_test_3 + */ + "cferp_test_1": 1000, + "cferp_test_2": 2000, + "cferp_test_3": 2147483647 + } } }