From 1de3603afe9ee4feb0b6f8a1e755a74be0c2e8b4 Mon Sep 17 00:00:00 2001 From: CZY <2817212736@qq.com> Date: Tue, 16 Jan 2024 15:35:54 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=86=85=E5=AD=98=E5=88=86?= =?UTF-8?q?=E9=85=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ConsoleApp2/HostedServices/OutputService.cs | 25 +-- .../HostedServices/TransformService.cs | 28 ---- ConsoleApp2/Program.cs | 3 +- ConsoleApp2/Services/MySqlDestination.cs | 144 +++++++++--------- ConsoleApp2/appsettings.json | 2 +- 5 files changed, 89 insertions(+), 113 deletions(-) diff --git a/ConsoleApp2/HostedServices/OutputService.cs b/ConsoleApp2/HostedServices/OutputService.cs index 24e1a89..b4598e3 100644 --- a/ConsoleApp2/HostedServices/OutputService.cs +++ b/ConsoleApp2/HostedServices/OutputService.cs @@ -17,24 +17,24 @@ public class OutputService : IOutputService { private readonly ILogger _logger; private readonly DataRecordQueue _consumerQueue; - private readonly IOptions _transOptions; - private readonly IOptions _options; + private readonly IOptions _outputOptions; + private readonly IOptions _transformOptions; private readonly ProcessContext _context; private readonly TaskManager _taskManager; public OutputService(ILogger logger, [FromKeyedServices(ProcessStep.Consumer)] DataRecordQueue consumerQueue, - IOptions options, - IOptions transOptions, + IOptions outputOptions, ProcessContext context, - TaskManager taskManager) + TaskManager taskManager, + IOptions transformOptions) { _logger = logger; _consumerQueue = consumerQueue; - _transOptions = transOptions; - _options = options; + _outputOptions = outputOptions; _context = context; _taskManager = taskManager; + _transformOptions = transformOptions; } public async Task ExecuteAsync(CancellationToken cancellationToken) @@ -50,9 +50,10 @@ public class OutputService : IOutputService records.Add(record); count++; //_logger.LogInformation(@"*****OutputCount: {count} *****",count); - if (records.Count >= _options.Value.FlushCount) + if (records.Count >= _outputOptions.Value.FlushCount) { await FlushAsync(records); + _context.AddOutput(count); records.Clear(); } if (_context.GetExceptions().Count>0) @@ -67,7 +68,7 @@ public class OutputService : IOutputService records.Clear(); _logger.LogInformation("***** Mysql output thread completed *****"); } - }, _options.Value.TaskCount); + }, _outputOptions.Value.TaskCount); await _taskManager.WaitAll(); //_context.CompleteOutput(); @@ -79,8 +80,8 @@ public class OutputService : IOutputService { var count = 0; await using var output = new MySqlDestination( - _options.Value.ConnectionString ?? throw new InvalidOperationException("Connection string is required"), - _logger, _context,true); + _outputOptions.Value.ConnectionString ?? throw new InvalidOperationException("Connection string is required"), + _logger, _context, _transformOptions); //if (records == null || records.Count() == 0) return; //var dbName = $"cferp_test_1"; //if (records != null && records.Count() > 0) @@ -102,7 +103,7 @@ public class OutputService : IOutputService await output.WriteRecordAsync(record); count++; } - await output.FlushAsync(_options.Value.MaxAllowedPacket, _transOptions); + await output.FlushAsync(_outputOptions.Value.MaxAllowedPacket); _context.AddOutput(count); } } \ No newline at end of file diff --git a/ConsoleApp2/HostedServices/TransformService.cs b/ConsoleApp2/HostedServices/TransformService.cs index 547865f..3f67f6e 100644 --- a/ConsoleApp2/HostedServices/TransformService.cs +++ b/ConsoleApp2/HostedServices/TransformService.cs @@ -49,34 +49,6 @@ public class TransformService : ITransformService // var dbOptions = _options.Value.DatabaseFilter(record); if (!_producerQueue.TryDequeue(out var record)) continue; - for (var i = 0; i < record.Fields.Length; i++) - { - var field = record[i]; - - if (field == "\\N") - { - field = "NULL"; - goto Escape; - } - // else if(DumpDataHelper.CheckHexField(field)) - // field = $"0x{field}"; - - switch (_options.Value.GetColumnType(record.TableName, record.Headers[i])) - { - case ColumnType.Text: - - field = string.IsNullOrEmpty(field) ? "''" : _options.Value.TransformBinary?.Invoke(field) ?? field; ; - break; - case ColumnType.Blob: - //field = string.IsNullOrEmpty(field) ? "NULL" : $"0x{field}"; - break; - default: - break; - } - - Escape: - record[i] = field; - } //过滤不要的record if ( await _options.Value.RecordFilter?.Invoke(record,_db) == false) continue; record.Database = _options.Value.DatabaseFilter?.Invoke(record); diff --git a/ConsoleApp2/Program.cs b/ConsoleApp2/Program.cs index c73c8b3..e86ed60 100644 --- a/ConsoleApp2/Program.cs +++ b/ConsoleApp2/Program.cs @@ -218,7 +218,7 @@ async Task RunProgram() if (record.TryGetField("NextStepID", out var idStr)) { - if (idStr == "NULL") + if (idStr == "\\N") { record.SetField("NextStepID", "0"); } @@ -498,6 +498,7 @@ async Task RunProgram() host.Services.AddSingleton(); host.Services.AddSingleton(); var redisOptions = host.Configuration.GetSection("RedisCacheOptions").Get() ?? new RedisCacheOptions(); + redisOptions.InstanceName = "mes-etl"; var redis = ConnectionMultiplexer.Connect(redisOptions.Configuration); host.Services.AddSingleton(redis.GetDatabase()); var app = host.Build(); diff --git a/ConsoleApp2/Services/MySqlDestination.cs b/ConsoleApp2/Services/MySqlDestination.cs index 61de339..6df1977 100644 --- a/ConsoleApp2/Services/MySqlDestination.cs +++ b/ConsoleApp2/Services/MySqlDestination.cs @@ -1,5 +1,4 @@ -using System.Reflection.Metadata; -using System.Text; +using System.Text; using ConsoleApp2.Helpers; using ConsoleApp2.Options; using Microsoft.Extensions.Logging; @@ -17,22 +16,18 @@ public class MySqlDestination : IDisposable, IAsyncDisposable private readonly Dictionary> _recordCache; private readonly MySqlConnection _conn; private readonly ILogger _logger; - private readonly bool _prettyOutput; - private readonly int _maxAllowPacket; private readonly ProcessContext _context; - - public MySqlDestination(string connStr, ILogger logger, ProcessContext context,bool prettyOutput = false) + private readonly IOptions _transformOptions; + public MySqlDestination(string connStr, ILogger logger, ProcessContext context, IOptions transformOptions) { _conn = new MySqlConnection(connStr); _conn.Open(); _recordCache = new Dictionary>(); _logger = logger; _context = context; - _prettyOutput = prettyOutput; - - - + _transformOptions = transformOptions; } + public Task WriteRecordAsync(DataRecord record) { _recordCache.AddOrUpdate(record.TableName, [record], (key, value) => @@ -51,20 +46,22 @@ public class MySqlDestination : IDisposable, IAsyncDisposable } } - public async Task FlushAsync(int maxAllowPacket, IOptions transOptions) + public async Task FlushAsync(int maxAllowPacket) { if (_recordCache.Count == 0) return; var cmd = _conn.CreateCommand(); cmd.CommandTimeout = 3 * 60; - var excuseList = GetExcuseList(_recordCache, maxAllowPacket, transOptions, _prettyOutput); + try { + var excuseList = GetExcuseList(_recordCache, maxAllowPacket).ToList(); foreach (var insertSql in excuseList) { cmd.CommandText = insertSql; await cmd.ExecuteNonQueryAsync(); + _logger.LogInformation(@"do insert completed!size:{Length}", cmd.CommandText.Length); } _recordCache.Clear(); } @@ -80,89 +77,94 @@ public class MySqlDestination : IDisposable, IAsyncDisposable } } - public static IList GetExcuseList(IDictionary> tableRecords,int maxAllowPacket, IOptions transOptions, - bool prettyOutput = false) + public IEnumerable GetExcuseList(IDictionary> tableRecords,int maxAllowPacket) { - var resultList = new List(); - var headerSb = new StringBuilder(); - var recordSb = new StringBuilder(); + var sb = new StringBuilder(); foreach (var (tableName, records) in tableRecords) { if (records.Count == 0) continue; - headerSb.Append($"INSERT INTO `{tableName}`("); + + var recordIdx = 0; + StartBuild: + var noCommas = true; + + // INSERT INTO ... VALUES >>> + sb.Append($"INSERT INTO `{tableName}`("); for (var i = 0; i < records[0].Headers.Length; i++) { var header = records[0].Headers[i]; - headerSb.Append($"`{header}`"); + sb.Append($"`{header}`"); if (i != records[0].Headers.Length - 1) - headerSb.Append(','); + sb.Append(','); } - headerSb.Append(") VALUES "); - if (prettyOutput) - headerSb.AppendLine(); - - var sbList = new List(); - var currentLength = headerSb.Length; - for (var i = 0; i < records.Count; i++) + sb.Append(") VALUES "); + + // ([FIELDS]), >>> + for (;recordIdx < records.Count; recordIdx++) { - var record = records[i]; + var record = records[recordIdx]; + var recordSb = new StringBuilder(); recordSb.Append('('); - for (var j = 0; j < record.Fields.Length; j++) + for (var fieldIdx = 0; fieldIdx < record.Fields.Length; fieldIdx++) { - var field = record.Fields[j]; - var header = record.Headers[j]; - if (transOptions.Value.GetColumnType(record.TableName, header) ==ColumnType.Blob) + var field = record.Fields[fieldIdx]; + + // 在这里处理特殊列 + #region HandleFields + if (field == "\\N") { - if (string.IsNullOrEmpty(field)) - { - recordSb.Append("NULL"); - } - else - recordSb.Append("0x"+field); + recordSb.Append("NULL"); + goto Escape; } - else - recordSb.Append(field); - if (j != record.Fields.Length - 1) + + switch (_transformOptions.Value.GetColumnType(record.TableName, record.Headers[fieldIdx])) + { + case ColumnType.Text: + recordSb.Append(string.IsNullOrEmpty(field) + ? "''" + : _transformOptions.Value.TransformBinary?.Invoke(field) ?? field); + break; + case ColumnType.Blob: + if (string.IsNullOrEmpty(field)) + recordSb.Append("NULL"); + else recordSb.Append($"0x{field}"); + break; + case ColumnType.UnDefine: + default: + recordSb.Append(field); + break; + } + + Escape: + + #endregion + if (fieldIdx != record.Fields.Length - 1) recordSb.Append(','); } recordSb.Append(')'); - //if (i != records.Count - 1) // not last field - // recordSb.Append(','); - if (prettyOutput) recordSb.AppendLine(); + // 若字符数量即将大于限制,则返回SQL,清空StringBuilder,保留当前记录的索引值,然后转到StartBuild标签重新开始一轮INSERT + if (sb.Length + recordSb.Length + 1 > maxAllowPacket) + { + sb.Append(';'); + yield return sb.ToString(); + sb.Clear(); + goto StartBuild; + } - if (currentLength + recordSb.Length >= maxAllowPacket) - { - var insertSb = new StringBuilder(headerSb.ToString()); - insertSb.Append(string.Join(",", sbList)); - insertSb.Append(";"); - resultList.Add(insertSb.ToString()); - insertSb.Clear(); - sbList.Clear(); - sbList.Add(recordSb.ToString()); - currentLength = headerSb.Length + 1;//逗号长度加1 - } - else - { - sbList.Add(recordSb.ToString()); - } - currentLength += recordSb.Length; - recordSb.Clear(); + if (!noCommas) + sb.Append(',').AppendLine(); + noCommas = false; + sb.Append(recordSb); // StringBuilder.Append(StringBuilder)不会分配多余的内存 } - if (sbList.Count > 0) - { - var insertSb = new StringBuilder(headerSb.ToString()); - insertSb.Append(string.Join(",", sbList)); - insertSb.Append(";"); - resultList.Add(insertSb.ToString()); - insertSb.Clear(); - } - headerSb.Clear(); + + sb.Append(';'); + yield return sb.ToString(); + sb.Clear(); } - return resultList; } diff --git a/ConsoleApp2/appsettings.json b/ConsoleApp2/appsettings.json index 7c825cd..786133f 100644 --- a/ConsoleApp2/appsettings.json +++ b/ConsoleApp2/appsettings.json @@ -12,6 +12,6 @@ "MySqlMaster": "Server=127.0.0.1;Port=33309;UserId=root;Password=123456;Database=cferp_test;" }, "RedisCacheOptions": { - "Configuration": "localhost:6379" + "Configuration": "192.168.1.246:6380" } }