From a74cf5ddb7cb88c4405582e991eaf6a176eb7a6b Mon Sep 17 00:00:00 2001 From: lindj <67092759@qq.com> Date: Tue, 16 Jan 2024 18:00:23 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B4=E7=90=86=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ConsoleApp2/DataRecord.cs | 13 + ConsoleApp2/HostedServices/OutputService.cs | 5 +- .../HostedServices/TransformService.cs | 11 +- ConsoleApp2/Options/CommandOptions.cs | 2 + ConsoleApp2/Options/DataTransformOptions.cs | 4 +- ConsoleApp2/Program.cs | 426 +++++++++++++----- ConsoleApp2/Services/MySqlDestination.cs | 19 +- 7 files changed, 344 insertions(+), 136 deletions(-) diff --git a/ConsoleApp2/DataRecord.cs b/ConsoleApp2/DataRecord.cs index f22576e..382c811 100644 --- a/ConsoleApp2/DataRecord.cs +++ b/ConsoleApp2/DataRecord.cs @@ -62,4 +62,17 @@ public class DataRecord public int Count => Fields.Length; public bool TryGetField(string columnName, out string value) => TryGetField(this, columnName, out value); + + public bool SetField(string columnName, string value) => SetField(this, columnName,value); + + public bool SetField( DataRecord record,string columnName,string value) + { + if (record.Headers is null) + throw new InvalidOperationException("Headers have not been set."); + var idx = Array.IndexOf(record.Headers, columnName); + if (idx is -1) + throw new IndexOutOfRangeException("Column name not found in this record."); + record.Fields[idx] = value; + return true; + } } \ No newline at end of file diff --git a/ConsoleApp2/HostedServices/OutputService.cs b/ConsoleApp2/HostedServices/OutputService.cs index 4201604..24e1a89 100644 --- a/ConsoleApp2/HostedServices/OutputService.cs +++ b/ConsoleApp2/HostedServices/OutputService.cs @@ -17,6 +17,7 @@ public class OutputService : IOutputService { private readonly ILogger _logger; private readonly DataRecordQueue _consumerQueue; + private readonly IOptions _transOptions; private readonly IOptions _options; private readonly ProcessContext _context; private readonly TaskManager _taskManager; @@ -24,11 +25,13 @@ public class OutputService : IOutputService public OutputService(ILogger logger, [FromKeyedServices(ProcessStep.Consumer)] DataRecordQueue consumerQueue, IOptions options, + IOptions transOptions, ProcessContext context, TaskManager taskManager) { _logger = logger; _consumerQueue = consumerQueue; + _transOptions = transOptions; _options = options; _context = context; _taskManager = taskManager; @@ -99,7 +102,7 @@ public class OutputService : IOutputService await output.WriteRecordAsync(record); count++; } - await output.FlushAsync(_options.Value.MaxAllowedPacket); + await output.FlushAsync(_options.Value.MaxAllowedPacket, _transOptions); _context.AddOutput(count); } } \ No newline at end of file diff --git a/ConsoleApp2/HostedServices/TransformService.cs b/ConsoleApp2/HostedServices/TransformService.cs index 25450f5..5de8baa 100644 --- a/ConsoleApp2/HostedServices/TransformService.cs +++ b/ConsoleApp2/HostedServices/TransformService.cs @@ -5,6 +5,7 @@ using ConsoleApp2.Services; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; +using StackExchange.Redis; namespace ConsoleApp2.HostedServices; @@ -18,19 +19,21 @@ public class TransformService : ITransformService private readonly DataRecordQueue _producerQueue; private readonly DataRecordQueue _consumerQueue; private readonly ProcessContext _context; + private readonly ConnectionMultiplexer _redisConnection; public TransformService(ILogger logger, IOptions options, [FromKeyedServices(ProcessStep.Producer)]DataRecordQueue producerQueue, [FromKeyedServices(ProcessStep.Consumer)]DataRecordQueue consumerQueue, - ProcessContext context) + ProcessContext context, ConnectionMultiplexer redisConnection) { _logger = logger; _options = options; _producerQueue = producerQueue; _consumerQueue = consumerQueue; _context = context; + _redisConnection= redisConnection; } public async Task ExecuteAsync(CancellationToken cancellationToken) @@ -75,12 +78,14 @@ public class TransformService : ITransformService record[i] = field; } //过滤不要的record - if (_options.Value.RecordFilter?.Invoke(record) == false) continue; + if ( await _options.Value.RecordFilter?.Invoke(record,_redisConnection.GetDatabase()) == false) continue; record.Database = _options.Value.DatabaseFilter?.Invoke(record); //修改record _options.Value.RecordModify?.Invoke(record); + //缓存record + _options.Value.RecordCache?.Invoke(record, _redisConnection.GetDatabase()); //替换record - var replaceRecord = _options.Value.RecordReplace?.Invoke(record); + var replaceRecord =await _options.Value.RecordReplace?.Invoke(record, _redisConnection.GetDatabase()); if (replaceRecord != null) { record = replaceRecord; diff --git a/ConsoleApp2/Options/CommandOptions.cs b/ConsoleApp2/Options/CommandOptions.cs index 78d0881..f1cfa0d 100644 --- a/ConsoleApp2/Options/CommandOptions.cs +++ b/ConsoleApp2/Options/CommandOptions.cs @@ -15,5 +15,7 @@ namespace ConsoleApp2.Options public bool Isutf8mb4 { get; set; } = true; public short OldestShardKey { get; set; } = 23010; + public string OldestTime { get; set; } = "202301"; + } } diff --git a/ConsoleApp2/Options/DataTransformOptions.cs b/ConsoleApp2/Options/DataTransformOptions.cs index 0afb2cb..c33252e 100644 --- a/ConsoleApp2/Options/DataTransformOptions.cs +++ b/ConsoleApp2/Options/DataTransformOptions.cs @@ -15,9 +15,9 @@ public class DataTransformOptions public Func? TransformBinary { get; set; }//Binary转字符串方法 - public Func? RecordFilter { get; set; }//数据过滤方法 + public Func>? RecordFilter { get; set; }//数据过滤方法 public Action? RecordModify { get; set; }//数据修改 - public Func? RecordReplace { get; set; }//数据替换 + public Func>? RecordReplace { get; set; }//数据替换 public Func?>? RecordAdd { get; set; }//数据替换 public Action? RecordCache { get; set; }//数据缓存 diff --git a/ConsoleApp2/Program.cs b/ConsoleApp2/Program.cs index d4fb497..db24d47 100644 --- a/ConsoleApp2/Program.cs +++ b/ConsoleApp2/Program.cs @@ -69,46 +69,46 @@ async Task RunProgram() Console.WriteLine($"OutPutFlushCount:{commandOptions?.FlushCount}"); Console.WriteLine($"OutPutTaskCount:{commandOptions?.TaskCount}"); - + var oldestTime = DateTime.ParseExact(commandOptions.OldestTime, "yyyyMM", System.Globalization.DateTimeFormatInfo.InvariantInfo); host.Services.Configure(option => { option.TableInfoConfig = new Dictionary { - - //{"order_block_plan_item",new TableInfo{SimulaRowCount=136323566 }},//从order_item表查询,然后程序插入 - //{"order_package_item",new TableInfo{SimulaRowCount=52525224 }},//从order_item表查询,然后程序插入 - //{"order_patch_detail",new TableInfo{SimulaRowCount=10 }},//生产没有这个表,不处理 + + //order_block_plan_item从order_item表查询,然后程序插入 + //order_package_item从order_item表查询,然后程序插入 + //order_patch_detail生产没有这个表,不处理 - //{"machine",new TableInfo{SimulaRowCount=14655 }}, - //{"order",new TableInfo{SimulaRowCount=5019216 }}, - //{"order_block_plan",new TableInfo{SimulaRowCount=2725553 }},//CreateTime < 202301的删除 + {"machine",new TableInfo{SimulaRowCount=14655 }}, + {"order",new TableInfo{SimulaRowCount=5019216 }}, + {"order_block_plan",new TableInfo{SimulaRowCount=2725553 }},//CreateTime < 202301的删除 {"order_block_plan_result",new TableInfo{SimulaRowCount=1174096 }}, - //{"order_box_block",new TableInfo{SimulaRowCount=29755672 }}, - //{"order_data_block",new TableInfo{SimulaRowCount=731800334 }}, - //{"order_data_goods",new TableInfo{SimulaRowCount=25803671 }}, - //{"order_data_parts",new TableInfo{SimulaRowCount=468517543 }}, - //{"order_item",new TableInfo{SimulaRowCount=1345520079 }}, - //{"order_module",new TableInfo{SimulaRowCount=103325385 }}, - //{"order_module_extra",new TableInfo{SimulaRowCount=54361321 }}, - //{"order_module_item",new TableInfo{SimulaRowCount=69173339 }}, - //{"order_package",new TableInfo{SimulaRowCount=16196195 }}, - //{"order_process",new TableInfo{SimulaRowCount=3892685 }},//orderNo < 202301的 - //{"order_process_step",new TableInfo{SimulaRowCount=8050349 }},//orderNo < 202301的删除 - //{"order_process_step_item",new TableInfo{SimulaRowCount=14538058 }},//orderNo < 202301的删除 - //{"order_scrap_board",new TableInfo{SimulaRowCount=123998 }}, - //{"process_group",new TableInfo{SimulaRowCount=1253 }}, - //{"process_info",new TableInfo{SimulaRowCount=7839 }}, - //{"process_item_exp",new TableInfo{SimulaRowCount=28 }}, - //{"process_schdule_capacity",new TableInfo{SimulaRowCount=39736 }}, - //{"process_step_efficiency",new TableInfo{SimulaRowCount=8 }}, - //{"report_template",new TableInfo{SimulaRowCount=7337 }}, - //{"simple_package",new TableInfo{SimulaRowCount=130436 }},//orderNo < 202301的删除 - //{"simple_plan_order",new TableInfo{SimulaRowCount=351470 }},//CreateTime < 202301的删除 - //{"sys_config",new TableInfo{SimulaRowCount=2296 }}, - //{"work_calendar",new TableInfo{SimulaRowCount=11 }}, - //{"work_shift",new TableInfo{SimulaRowCount=59 }}, - //{"work_time",new TableInfo{SimulaRowCount=62 }}, + {"order_box_block",new TableInfo{SimulaRowCount=29755672 }}, + {"order_data_block",new TableInfo{SimulaRowCount=731800334 }}, + {"order_data_goods",new TableInfo{SimulaRowCount=25803671 }}, + {"order_data_parts",new TableInfo{SimulaRowCount=468517543 }}, + {"order_item",new TableInfo{SimulaRowCount=1345520079 }}, + {"order_module",new TableInfo{SimulaRowCount=103325385 }}, + {"order_module_extra",new TableInfo{SimulaRowCount=54361321 }}, + {"order_module_item",new TableInfo{SimulaRowCount=69173339 }}, + {"order_package",new TableInfo{SimulaRowCount=16196195 }}, + {"order_process",new TableInfo{SimulaRowCount=3892685 }},//orderNo < 202301的 + {"order_process_step",new TableInfo{SimulaRowCount=8050349 }},//orderNo < 202301的删除 + {"order_process_step_item",new TableInfo{SimulaRowCount=14538058 }},//orderNo < 202301的删除 + {"order_scrap_board",new TableInfo{SimulaRowCount=123998 }}, + {"process_group",new TableInfo{SimulaRowCount=1253 }}, + {"process_info",new TableInfo{SimulaRowCount=7839 }}, + {"process_item_exp",new TableInfo{SimulaRowCount=28 }}, + {"process_schdule_capacity",new TableInfo{SimulaRowCount=39736 }}, + {"process_step_efficiency",new TableInfo{SimulaRowCount=8 }}, + {"report_template",new TableInfo{SimulaRowCount=7337 }}, + {"simple_package",new TableInfo{SimulaRowCount=130436 }},//orderNo < 202301的删除 + {"simple_plan_order",new TableInfo{SimulaRowCount=351470 }},//CreateTime < 202301的删除 + {"sys_config",new TableInfo{SimulaRowCount=2296 }}, + {"work_calendar",new TableInfo{SimulaRowCount=11 }}, + {"work_shift",new TableInfo{SimulaRowCount=59 }}, + {"work_time",new TableInfo{SimulaRowCount=62 }}, }; }); host.Services.Configure(option => @@ -129,119 +129,295 @@ async Task RunProgram() host.Services.Configure(options => { - options.DatabaseFilter = record => "cferp_test"; + options.DatabaseFilter = record => "cferp_test"; - options.TransformBinary = field => commandOptions != null && commandOptions.Isutf8mb4 ? $"_utf8mb4 0x{field}" : $"0x{field}"; - //数据过滤 - options.RecordFilter = record => - { - var index = Array.IndexOf(record.Headers, "ShardKey"); - if (index > -1) + options.TransformBinary = field => commandOptions != null && commandOptions.Isutf8mb4 ? $"_utf8mb4 0x{field}" : $"0x{field}"; + //数据过滤 + options.RecordFilter = async (record, db) => { - var skString = record.Fields[index]; - short.TryParse(skString, out var sk); - if (sk < commandOptions.OldestShardKey) return false; - } - if (record.TableName == "order_package") - { - var pkNoIndex = Array.IndexOf(record.Headers, "PakageNo"); - if (pkNoIndex > -1) + //var index = Array.IndexOf(record.Headers, "ShardKey"); + if (record.TryGetField("ShardKey", out var skStr)) { - var pkNo = record.Fields[pkNoIndex]; - if (pkNo.Length <= 2) return false; + short.TryParse(skStr, out var sk); + if (sk < commandOptions.OldestShardKey) return false; } - } - if (record.TableName == "order_block_plan") - { - var orderNosIndex = Array.IndexOf(record.Headers, "OrderNos"); - if (orderNosIndex > -1) + + if (record.TryGetField("CreateTime", out var createTime)) { - var pkNo = record.Fields[orderNosIndex]; - if (pkNo.Length <= 2) return false; + _ = DateTime.TryParse(createTime.Replace("\"", ""), out var time); + if (time < oldestTime) return false; } - } - return true; - - }; - //数据修改 - options.RecordModify = (record) => - { - if (record.TableName == "order_process")//修改order_process.NextStepID的默认值为0 - { - var nextStepIdIndex = Array.IndexOf(record.Headers, "NextStepID"); - if (nextStepIdIndex > -1) + if (record.TryGetField("OrderNo", out var orderNo)) { - var idString = record.Fields[nextStepIdIndex]; - - if (idString == "\\N") + try { - record.Fields[nextStepIdIndex] = "0"; + var yearMonth = orderNo.Substring(0, 6); + + var dt = DateTime.ParseExact(yearMonth, "yyyyMM", System.Globalization.DateTimeFormatInfo.InvariantInfo); + if (dt < oldestTime) return false; + } + catch (Exception ex) + { + return false;//订单号转换失败,跳过 + } + } - } - - }; - //数据替换 - options.RecordReplace = (record) => - { - //删除数据源里simple_plan_order.ProcessState 字段和值 - - if (record.TableName == "simple_plan_order")//修改order_process.NextStepID的默认值为0 - { - var nextStepIdIndex = Array.IndexOf(record.Headers, "ProcessState"); - if (nextStepIdIndex > -1) + if (record.TableName == "order_package") { - var headers = record.Headers.Where(t => t != "ProcessState").ToArray(); - var fs = record.Fields.ToList(); - fs.RemoveAt(nextStepIdIndex); - var fields = fs.ToArray(); - return new DataRecord(fields, record.TableName, headers, record.CompanyID); + if (record.TryGetField("PakageNo", out var pkNo)) + { + if (pkNo.Length <= 2) return false; + } } - } - if (record.TableName == "order_process_step") - { + if (record.TableName == "order_block_plan") + { + if (record.TryGetField("OrderNos", out var nos)) + { + if (nos.Length <= 2) return false; + } + + } + if (record.TableName == "order_process_step" || record.TableName == "order_process_step_item") + { + //如果缓存中不存在OrderProcessID,则丢弃 + if(record.TryGetField("OrderProcessID",out var orderProcessID)) + { + var value = await db.StringGetAsync(orderProcessID); + if (string.IsNullOrEmpty(value.ToString()))return false; + } + } + if (record.TableName == "order_block_plan_result" ) + { + //如果缓存中不存在ID,则丢弃(ID 对应order_block_plan中的ID) + if (record.TryGetField("ID", out var id)) + { + var value = await db.StringGetAsync(id); + if (string.IsNullOrEmpty(value.ToString())) return false; + } + } + return true; - } - return null; - }; - //数据生成 - options.RecordAdd = (record) => - { - var resultList = new List(); - if(record.TableName == "order_item") + }; + //数据修改 + options.RecordModify = (record) => { - - var itemIDIndex = Array.IndexOf(record.Headers, "ItemID"); - var shardKeyIndex = Array.IndexOf(record.Headers, "ShardKey"); - var planIDIndex = Array.IndexOf(record.Headers, "PlanID"); - var packageIDIndex = Array.IndexOf(record.Headers, "PackageID"); - var companyIDIndex = Array.IndexOf(record.Headers, "CompanyID"); + if (record.TableName == "order_process")//修改order_process.NextStepID的默认值为0 + { + + if (record.TryGetField("NextStepID", out var idStr)) + { - resultList.Add(new DataRecord( - new[] { "ItemID", "ShardKey", "PlanID", "CompanyID" }, "order_block_plan_item", - new[] { record.Fields[itemIDIndex], record.Fields[shardKeyIndex], record.Fields[planIDIndex], record.Fields[companyIDIndex] })); - resultList.Add( - new DataRecord(new[] { "ItemID", "ShardKey", "PackageID", "CompanyID" }, "order_package_item", - new[] { record.Fields[itemIDIndex], record.Fields[shardKeyIndex], record.Fields[packageIDIndex], record.Fields[companyIDIndex] })); - } - return resultList; - - }; + if (idStr == "NULL") + { + record.SetField("NextStepID", "0"); + } + } + } + + }; + //数据缓存 options.RecordCache = async (record, db) => { - if(record.TableName == "order_process") + if (record.TableName == "order") { - var skIndex = Array.IndexOf(record.Headers, "ShardKey"); - if(skIndex > -1) + if (record.TryGetField("OrderNo", out var orderNo)) { - var sk = record.Fields[skIndex]; - var idIndex = Array.IndexOf(record.Headers, "ID"); - var id = record.Fields[idIndex]; - await db.SetAddAsync(id, sk); + if (record.TryGetField("CompanyID", out var companyid)) + { + await db.StringSetAsync(orderNo, companyid); + } + } } + if (record.TableName == "order_process") + { + if (record.TryGetField("OrderNo", out var orderNo)) + { + var yearMonth = orderNo.Substring(2, 4); + var sk = yearMonth + "0"; + + if( record.TryGetField("ID", out var id)) + { + try + { + await db.StringSetAsync(id, sk); + } + catch (Exception ex) + { + + } + } + + } + } + if (record.TableName == "order_block_plan") + { + if (record.TryGetField("CompanyID", out var companyid)) + { + record.TryGetField("ID", out var id); + await db.StringSetAsync(id, companyid); + } + } + }; + //数据替换 + options.RecordReplace = async (record, db) => + { + //删除数据源里simple_plan_order.ProcessState 字段和值 + + if (record.TableName == "simple_plan_order")//修改order_process.NextStepID的默认值为0 + { + var nextStepIdIndex = Array.IndexOf(record.Headers, "ProcessState"); + if (nextStepIdIndex > -1) + { + var headers = record.Headers.Where(t => t != "ProcessState").ToArray(); + var fs = record.Fields.ToList(); + fs.RemoveAt(nextStepIdIndex); + var fields = fs.ToArray(); + return new DataRecord(fields, record.TableName, headers, record.CompanyID); + } + } + if (record.TableName == "order")//修改order_process.NextStepID的默认值为0 + { + var nextStepIdIndex = Array.IndexOf(record.Headers, "IsBatch"); + if (nextStepIdIndex > -1) + { + var headers = record.Headers.Where(t => t != "IsBatch").ToArray(); + var fs = record.Fields.ToList(); + fs.RemoveAt(nextStepIdIndex); + var fields = fs.ToArray(); + return new DataRecord(fields, record.TableName, headers, record.CompanyID); + } + } + if (record.TableName == "order_block_plan_result")//修改order_process.NextStepID的默认值为0 + { + if (record.TryGetField("ID", out var id)) + { + var headers = new List(record.Headers); + var fields =new List(record.Fields); + headers.Add("CompanyID"); + var companyidResult =await db.StringGetAsync(id); + _ = int.TryParse(companyidResult.ToString(), out var companyid); + fields.Add(companyid.ToString()); + return new DataRecord(fields.ToArray(), record.TableName, headers.ToArray(), companyid); + } + } + if(record.TableName == "order_box_block") + { + if (!record.TryGetField("CompanyID", out var companyid)) + { + if (record.TryGetField("OrderNo", out var orderNo)) + { + var headers = new List(record.Headers); + var fields = new List(record.Fields); + headers.Add("CompanyID"); + var companyidResult = await db.StringGetAsync(orderNo); + _ = int.TryParse(companyidResult.ToString(), out var cpid); + fields.Add(cpid.ToString()); + return new DataRecord(fields.ToArray(), record.TableName, headers.ToArray(), cpid); + } + } + } + if (record.TableName == "order_module") + { + if (record.TryGetField("ViewFileName",out var value)) + { + var index=Array.IndexOf(record.Headers, "ViewFileName"); + var headers = new List(record.Headers); + headers.RemoveAt(index); + var fields = new List(record.Fields); + fields.RemoveAt(index); + return new DataRecord(fields.ToArray(), record.TableName, headers.ToArray(), record.CompanyID); + } + + } + if (record.TableName == "order_process") + { + if (!record.TryGetField("ShardKey", out var skStr)) + { + if(record.TryGetField("OrderNo", out var orderNo)) + { + var yearMonth = orderNo.Substring(2, 4); + var sk = yearMonth + "0"; + var headers = new List(record.Headers); + var fields = new List(record.Fields); + headers.Add("ShardKey"); + fields.Add(sk); + return new DataRecord(fields.ToArray(), record.TableName, headers.ToArray(), record.CompanyID); + } + } + } + if(record.TableName == "order_process_step"|| record.TableName == "order_process_step_item") + { + if (!record.TryGetField("ShardKey",out var sk)) + { + if (record.TryGetField("OrderProcessID",out var processID)) + { + try + { + + var shardKey =await db.StringGetAsync(processID); + var headers = new List(record.Headers); + var fields = new List(record.Fields); + headers.Add("ShardKey"); + fields.Add(shardKey.ToString()); + return new DataRecord(fields.ToArray(), record.TableName, headers.ToArray(), record.CompanyID); + } + catch (Exception ex) + { + + } + } + } + } + if(record.TableName == "order_moudle") + { + if (!record.TryGetField("ShardKey", out var skStr)) + { + if (record.TryGetField("OrderNo", out var orderNo)) + { + var yearMonth = orderNo.Substring(2, 4); + var sk = yearMonth + "0"; + var headers = new List(record.Headers); + var fields = new List(record.Fields); + headers.Add("ShardKey"); + fields.Add(sk); + return new DataRecord(fields.ToArray(), record.TableName, headers.ToArray(), record.CompanyID); + } + } + } + return null; + }; + //数据生成 + options.RecordAdd = (record) => + { + var resultList = new List(); + if (record.TableName == "order_item") + { + record.TryGetField("ID", out var itemID); + record.TryGetField("ShardKey", out var shardKey); + record.TryGetField("PlanID", out var planID); + record.TryGetField("PackageID", out var packageID); + record.TryGetField("CompanyID", out var companyID); + _=int.TryParse(planID, out var pid); + if (pid > 0) + { + resultList.Add(new DataRecord(new[] { itemID, shardKey, planID, companyID }, + "order_block_plan_item", + new[] { "ItemID", "ShardKey", "PlanID", "CompanyID" })); + } + _ = int.TryParse(packageID, out var pkid); + if(pkid > 0) + { + resultList.Add(new DataRecord(new[] { itemID, shardKey, packageID, companyID }, + "order_package_item", + new[] { "ItemID", "ShardKey", "PackageID", "CompanyID" } + )); + } + } + return resultList; }; options.ColumnTypeConfig = new() @@ -302,10 +478,10 @@ async Task RunProgram() host.Services.AddHostedService(); host.Services.AddHostedService(); - host.Services.AddSingleton(); + host.Services.AddSingleton(); host.Services.AddSingleton(); host.Services.AddSingleton(); - var redisOptions = host.Configuration.GetSection("RedisCacheOptions").Get()??new RedisCacheOptions(); + var redisOptions = host.Configuration.GetSection("RedisCacheOptions").Get() ?? new RedisCacheOptions(); var redis = ConnectionMultiplexer.Connect(redisOptions.Configuration); host.Services.AddSingleton(redis); var app = host.Build(); diff --git a/ConsoleApp2/Services/MySqlDestination.cs b/ConsoleApp2/Services/MySqlDestination.cs index 44952fa..9594b21 100644 --- a/ConsoleApp2/Services/MySqlDestination.cs +++ b/ConsoleApp2/Services/MySqlDestination.cs @@ -1,6 +1,9 @@ -using System.Text; +using System.Reflection.Metadata; +using System.Text; using ConsoleApp2.Helpers; +using ConsoleApp2.Options; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; using MySqlConnector; using ServiceStack; @@ -48,14 +51,14 @@ public class MySqlDestination : IDisposable, IAsyncDisposable } } - public async Task FlushAsync(int maxAllowPacket) + public async Task FlushAsync(int maxAllowPacket, IOptions transOptions) { if (_recordCache.Count == 0) return; var cmd = _conn.CreateCommand(); cmd.CommandTimeout = 3 * 60; - var excuseList = GetExcuseList(_recordCache, maxAllowPacket, _prettyOutput); + var excuseList = GetExcuseList(_recordCache, maxAllowPacket, transOptions, _prettyOutput); try { foreach (var insertSql in excuseList) @@ -77,7 +80,7 @@ public class MySqlDestination : IDisposable, IAsyncDisposable } } - public static IList GetExcuseList(IDictionary> tableRecords,int maxAllowPacket, + public static IList GetExcuseList(IDictionary> tableRecords,int maxAllowPacket, IOptions transOptions, bool prettyOutput = false) { var resultList = new List(); @@ -109,8 +112,14 @@ public class MySqlDestination : IDisposable, IAsyncDisposable for (var j = 0; j < record.Fields.Length; j++) { var field = record.Fields[j]; - if (record.TableName == "order_block_plan_result" && j == 2) + var header = record.Headers[j]; + if (transOptions.Value.GetColumnType(record.TableName, header) ==ColumnType.Blob) { + if (string.IsNullOrEmpty(field)) + { + recordSb.Append("NULL"); + } + else recordSb.Append("0x"+field); } else