diff --git a/ConsoleApp2/HostedServices/TransformService.cs b/ConsoleApp2/HostedServices/TransformService.cs index cd51c56..4d07caf 100644 --- a/ConsoleApp2/HostedServices/TransformService.cs +++ b/ConsoleApp2/HostedServices/TransformService.cs @@ -2,6 +2,7 @@ using ConsoleApp2.HostedServices.Abstractions; using ConsoleApp2.Options; using ConsoleApp2.Services; +using Microsoft.Extensions.Caching.Distributed; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -19,21 +20,22 @@ public class TransformService : ITransformService private readonly DataRecordQueue _producerQueue; private readonly DataRecordQueue _consumerQueue; private readonly ProcessContext _context; - private readonly IDatabase _db; + private readonly IDistributedCache _cache; public TransformService(ILogger logger, IOptions options, [FromKeyedServices(ProcessStep.Producer)]DataRecordQueue producerQueue, [FromKeyedServices(ProcessStep.Consumer)]DataRecordQueue consumerQueue, - ProcessContext context, IDatabase db) + ProcessContext context, + IDistributedCache cache) { _logger = logger; _options = options; _producerQueue = producerQueue; _consumerQueue = consumerQueue; _context = context; - _db = db; + _cache = cache; } public async Task ExecuteAsync(CancellationToken cancellationToken) @@ -50,14 +52,14 @@ public class TransformService : ITransformService if (!_producerQueue.TryDequeue(out var record)) continue; //过滤不要的record - if ( await _options.Value.RecordFilter?.Invoke(record,_db) == false) continue; + if ( await _options.Value.RecordFilter?.Invoke(record, _cache) == false) continue; record.Database = _options.Value.DatabaseFilter?.Invoke(record); //修改record _options.Value.RecordModify?.Invoke(record); //缓存record - _options.Value.RecordCache?.Invoke(record, _db); + _options.Value.RecordCache?.Invoke(record, _cache); //替换record - var replaceRecord =await _options.Value.RecordReplace?.Invoke(record, _db); + var replaceRecord =await _options.Value.RecordReplace?.Invoke(record, _cache); if (replaceRecord != null) { record = replaceRecord; diff --git a/ConsoleApp2/Options/DataTransformOptions.cs b/ConsoleApp2/Options/DataTransformOptions.cs index c33252e..226877d 100644 --- a/ConsoleApp2/Options/DataTransformOptions.cs +++ b/ConsoleApp2/Options/DataTransformOptions.cs @@ -1,4 +1,5 @@ -using StackExchange.Redis; +using Microsoft.Extensions.Caching.Distributed; +using StackExchange.Redis; namespace ConsoleApp2.Options; @@ -15,11 +16,11 @@ public class DataTransformOptions public Func? TransformBinary { get; set; }//Binary转字符串方法 - public Func>? RecordFilter { get; set; }//数据过滤方法 + public Func>? RecordFilter { get; set; }//数据过滤方法 public Action? RecordModify { get; set; }//数据修改 - public Func>? RecordReplace { get; set; }//数据替换 + public Func>? RecordReplace { get; set; }//数据替换 public Func?>? RecordAdd { get; set; }//数据替换 - public Action? RecordCache { get; set; }//数据缓存 + public Action? RecordCache { get; set; }//数据缓存 /// /// 配置导入数据的特殊列 diff --git a/ConsoleApp2/Program.cs b/ConsoleApp2/Program.cs index e86ed60..a881844 100644 --- a/ConsoleApp2/Program.cs +++ b/ConsoleApp2/Program.cs @@ -1,11 +1,9 @@ using ConsoleApp2; using ConsoleApp2.Const; -using ConsoleApp2.Helpers; using ConsoleApp2.HostedServices; using ConsoleApp2.HostedServices.Abstractions; using ConsoleApp2.Options; using ConsoleApp2.Services; -using ConsoleApp2.SimulationService; using Microsoft.Extensions.Caching.StackExchangeRedis; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; @@ -13,13 +11,9 @@ using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using MySqlConnector; using Serilog; -using Serilog.Core; -using StackExchange.Redis; -using System.Reflection.PortableExecutable; +using Microsoft.Extensions.Caching.Distributed; -// 运行之前把Mysql max_allowed_packets 调大 -// 运行之前把process_step表的外键删掉 await RunProgram(); return; @@ -133,7 +127,7 @@ async Task RunProgram() options.TransformBinary = field => commandOptions != null && commandOptions.Isutf8mb4 ? $"_utf8mb4 0x{field}" : $"0x{field}"; //数据过滤 - options.RecordFilter = async (record, db) => + options.RecordFilter = async (record, cache) => { //var index = Array.IndexOf(record.Headers, "ShardKey"); if (record.TryGetField("ShardKey", out var skStr)) @@ -185,8 +179,8 @@ async Task RunProgram() //如果缓存中不存在OrderProcessID,则丢弃 if(record.TryGetField("OrderProcessID",out var orderProcessID)) { - var value = await db.StringGetAsync(orderProcessID); - if (string.IsNullOrEmpty(value.ToString()))return false; + var value = await cache.GetStringAsync(orderProcessID); + if (string.IsNullOrEmpty(value))return false; } } if (record.TableName == "order_block_plan_result" ) @@ -194,8 +188,8 @@ async Task RunProgram() //如果缓存中不存在ID,则丢弃(ID 对应order_block_plan中的ID) if (record.TryGetField("ID", out var id)) { - var value = await db.StringGetAsync(id); - if (string.IsNullOrEmpty(value.ToString())) return false; + var value = await cache.GetStringAsync(id); + if (string.IsNullOrEmpty(value)) return false; } } return true; @@ -227,7 +221,7 @@ async Task RunProgram() }; //数据缓存 - options.RecordCache = async (record, db) => + options.RecordCache = async (record, cache) => { if (record.TableName == "order") { @@ -235,7 +229,7 @@ async Task RunProgram() { if (record.TryGetField("CompanyID", out var companyid)) { - await db.StringSetAsync(orderNo, companyid); + await cache.SetStringAsync(orderNo, companyid); } } @@ -249,14 +243,7 @@ async Task RunProgram() if( record.TryGetField("ID", out var id)) { - try - { - await db.StringSetAsync(id, sk); - } - catch (Exception ex) - { - - } + await cache.SetStringAsync(id, sk); } } @@ -266,12 +253,12 @@ async Task RunProgram() if (record.TryGetField("CompanyID", out var companyid)) { record.TryGetField("ID", out var id); - await db.StringSetAsync(id, companyid); + await cache.SetStringAsync(id, companyid); } } }; //数据替换 - options.RecordReplace = async (record, db) => + options.RecordReplace = async (record, cache) => { //删除数据源里simple_plan_order.ProcessState 字段和值 @@ -306,7 +293,7 @@ async Task RunProgram() var headers = new List(record.Headers); var fields =new List(record.Fields); headers.Add("CompanyID"); - var companyidResult =await db.StringGetAsync(id); + var companyidResult =await cache.GetStringAsync(id); _ = int.TryParse(companyidResult.ToString(), out var companyid); fields.Add(companyid.ToString()); return new DataRecord(fields.ToArray(), record.TableName, headers.ToArray(), companyid); @@ -321,8 +308,8 @@ async Task RunProgram() var headers = new List(record.Headers); var fields = new List(record.Fields); headers.Add("CompanyID"); - var companyidResult = await db.StringGetAsync(orderNo); - _ = int.TryParse(companyidResult.ToString(), out var cpid); + var companyidResult = await cache.GetStringAsync(orderNo); + _ = int.TryParse(companyidResult, out var cpid); fields.Add(cpid.ToString()); return new DataRecord(fields.ToArray(), record.TableName, headers.ToArray(), cpid); } @@ -366,11 +353,11 @@ async Task RunProgram() try { - var shardKey =await db.StringGetAsync(processID); + var shardKey =await cache.GetStringAsync(processID); var headers = new List(record.Headers); var fields = new List(record.Fields); headers.Add("ShardKey"); - fields.Add(shardKey.ToString()); + fields.Add(shardKey); return new DataRecord(fields.ToArray(), record.TableName, headers.ToArray(), record.CompanyID); } catch (Exception ex) @@ -498,9 +485,11 @@ async Task RunProgram() host.Services.AddSingleton(); host.Services.AddSingleton(); var redisOptions = host.Configuration.GetSection("RedisCacheOptions").Get() ?? new RedisCacheOptions(); - redisOptions.InstanceName = "mes-etl"; - var redis = ConnectionMultiplexer.Connect(redisOptions.Configuration); - host.Services.AddSingleton(redis.GetDatabase()); + host.Services.AddStackExchangeRedisCache(options => + { + options.Configuration = redisOptions.Configuration; + options.InstanceName = "mes-etl_"; // 缓存键前缀mes-etl_ + }); var app = host.Build(); await app.RunAsync(); } \ No newline at end of file