diff --git a/ConsoleApp2/HostedServices/TransformService.cs b/ConsoleApp2/HostedServices/TransformService.cs index 5de8baa..d0dd3c1 100644 --- a/ConsoleApp2/HostedServices/TransformService.cs +++ b/ConsoleApp2/HostedServices/TransformService.cs @@ -19,21 +19,21 @@ public class TransformService : ITransformService private readonly DataRecordQueue _producerQueue; private readonly DataRecordQueue _consumerQueue; private readonly ProcessContext _context; - private readonly ConnectionMultiplexer _redisConnection; + private readonly IDatabase _db; public TransformService(ILogger logger, IOptions options, [FromKeyedServices(ProcessStep.Producer)]DataRecordQueue producerQueue, [FromKeyedServices(ProcessStep.Consumer)]DataRecordQueue consumerQueue, - ProcessContext context, ConnectionMultiplexer redisConnection) + ProcessContext context, IDatabase db) { _logger = logger; _options = options; _producerQueue = producerQueue; _consumerQueue = consumerQueue; _context = context; - _redisConnection= redisConnection; + _db = db; } public async Task ExecuteAsync(CancellationToken cancellationToken) @@ -78,14 +78,14 @@ public class TransformService : ITransformService record[i] = field; } //过滤不要的record - if ( await _options.Value.RecordFilter?.Invoke(record,_redisConnection.GetDatabase()) == false) continue; + if ( await _options.Value.RecordFilter?.Invoke(record,_db) == false) continue; record.Database = _options.Value.DatabaseFilter?.Invoke(record); //修改record _options.Value.RecordModify?.Invoke(record); //缓存record - _options.Value.RecordCache?.Invoke(record, _redisConnection.GetDatabase()); + _options.Value.RecordCache?.Invoke(record, _db); //替换record - var replaceRecord =await _options.Value.RecordReplace?.Invoke(record, _redisConnection.GetDatabase()); + var replaceRecord =await _options.Value.RecordReplace?.Invoke(record, _db); if (replaceRecord != null) { record = replaceRecord; diff --git a/ConsoleApp2/Options/DatabaseOutputOptions.cs b/ConsoleApp2/Options/DatabaseOutputOptions.cs index cd8f2da..811650c 100644 --- a/ConsoleApp2/Options/DatabaseOutputOptions.cs +++ b/ConsoleApp2/Options/DatabaseOutputOptions.cs @@ -15,5 +15,5 @@ public class DatabaseOutputOptions /// public int FlushCount { get; set; } - public int MaxAllowedPacket { get; set; } = 32*1024*1024; + public int MaxAllowedPacket { get; set; } = 64*1024*1024; } \ No newline at end of file diff --git a/ConsoleApp2/Program.cs b/ConsoleApp2/Program.cs index db24d47..9abcba6 100644 --- a/ConsoleApp2/Program.cs +++ b/ConsoleApp2/Program.cs @@ -483,7 +483,7 @@ async Task RunProgram() host.Services.AddSingleton(); var redisOptions = host.Configuration.GetSection("RedisCacheOptions").Get() ?? new RedisCacheOptions(); var redis = ConnectionMultiplexer.Connect(redisOptions.Configuration); - host.Services.AddSingleton(redis); + host.Services.AddSingleton(redis.GetDatabase()); var app = host.Build(); await app.RunAsync(); } \ No newline at end of file diff --git a/ConsoleApp2/Services/MySqlDestination.cs b/ConsoleApp2/Services/MySqlDestination.cs index 9594b21..61de339 100644 --- a/ConsoleApp2/Services/MySqlDestination.cs +++ b/ConsoleApp2/Services/MySqlDestination.cs @@ -136,16 +136,14 @@ public class MySqlDestination : IDisposable, IAsyncDisposable if (currentLength + recordSb.Length >= maxAllowPacket) { - var insertSb = new StringBuilder(headerSb.ToString()); - insertSb.Append(string.Join(",", sbList)); insertSb.Append(";"); resultList.Add(insertSb.ToString()); insertSb.Clear(); sbList.Clear(); - currentLength = headerSb.Length; sbList.Add(recordSb.ToString()); + currentLength = headerSb.Length + 1;//逗号长度加1 } else { @@ -164,10 +162,6 @@ public class MySqlDestination : IDisposable, IAsyncDisposable } headerSb.Clear(); } - if (resultList.Count == 2) - { - var a = 1; - } return resultList; }