修改sql语句拼接

This commit is contained in:
lindj 2024-01-17 10:05:29 +08:00
parent 2629778c96
commit e1aa621a7d
4 changed files with 9 additions and 15 deletions

View File

@ -19,21 +19,21 @@ public class TransformService : ITransformService
private readonly DataRecordQueue _producerQueue; private readonly DataRecordQueue _producerQueue;
private readonly DataRecordQueue _consumerQueue; private readonly DataRecordQueue _consumerQueue;
private readonly ProcessContext _context; private readonly ProcessContext _context;
private readonly ConnectionMultiplexer _redisConnection; private readonly IDatabase _db;
public TransformService(ILogger<TransformService> logger, public TransformService(ILogger<TransformService> logger,
IOptions<DataTransformOptions> options, IOptions<DataTransformOptions> options,
[FromKeyedServices(ProcessStep.Producer)]DataRecordQueue producerQueue, [FromKeyedServices(ProcessStep.Producer)]DataRecordQueue producerQueue,
[FromKeyedServices(ProcessStep.Consumer)]DataRecordQueue consumerQueue, [FromKeyedServices(ProcessStep.Consumer)]DataRecordQueue consumerQueue,
ProcessContext context, ConnectionMultiplexer redisConnection) ProcessContext context, IDatabase db)
{ {
_logger = logger; _logger = logger;
_options = options; _options = options;
_producerQueue = producerQueue; _producerQueue = producerQueue;
_consumerQueue = consumerQueue; _consumerQueue = consumerQueue;
_context = context; _context = context;
_redisConnection= redisConnection; _db = db;
} }
public async Task ExecuteAsync(CancellationToken cancellationToken) public async Task ExecuteAsync(CancellationToken cancellationToken)
@ -78,14 +78,14 @@ public class TransformService : ITransformService
record[i] = field; record[i] = field;
} }
//过滤不要的record //过滤不要的record
if ( await _options.Value.RecordFilter?.Invoke(record,_redisConnection.GetDatabase()) == false) continue; if ( await _options.Value.RecordFilter?.Invoke(record,_db) == false) continue;
record.Database = _options.Value.DatabaseFilter?.Invoke(record); record.Database = _options.Value.DatabaseFilter?.Invoke(record);
//修改record //修改record
_options.Value.RecordModify?.Invoke(record); _options.Value.RecordModify?.Invoke(record);
//缓存record //缓存record
_options.Value.RecordCache?.Invoke(record, _redisConnection.GetDatabase()); _options.Value.RecordCache?.Invoke(record, _db);
//替换record //替换record
var replaceRecord =await _options.Value.RecordReplace?.Invoke(record, _redisConnection.GetDatabase()); var replaceRecord =await _options.Value.RecordReplace?.Invoke(record, _db);
if (replaceRecord != null) if (replaceRecord != null)
{ {
record = replaceRecord; record = replaceRecord;

View File

@ -15,5 +15,5 @@ public class DatabaseOutputOptions
/// </summary> /// </summary>
public int FlushCount { get; set; } public int FlushCount { get; set; }
public int MaxAllowedPacket { get; set; } = 32*1024*1024; public int MaxAllowedPacket { get; set; } = 64*1024*1024;
} }

View File

@ -483,7 +483,7 @@ async Task RunProgram()
host.Services.AddSingleton<IOutputService, OutputService>(); host.Services.AddSingleton<IOutputService, OutputService>();
var redisOptions = host.Configuration.GetSection("RedisCacheOptions").Get<RedisCacheOptions>() ?? new RedisCacheOptions(); var redisOptions = host.Configuration.GetSection("RedisCacheOptions").Get<RedisCacheOptions>() ?? new RedisCacheOptions();
var redis = ConnectionMultiplexer.Connect(redisOptions.Configuration); var redis = ConnectionMultiplexer.Connect(redisOptions.Configuration);
host.Services.AddSingleton(redis); host.Services.AddSingleton(redis.GetDatabase());
var app = host.Build(); var app = host.Build();
await app.RunAsync(); await app.RunAsync();
} }

View File

@ -136,16 +136,14 @@ public class MySqlDestination : IDisposable, IAsyncDisposable
if (currentLength + recordSb.Length >= maxAllowPacket) if (currentLength + recordSb.Length >= maxAllowPacket)
{ {
var insertSb = new StringBuilder(headerSb.ToString()); var insertSb = new StringBuilder(headerSb.ToString());
insertSb.Append(string.Join(",", sbList)); insertSb.Append(string.Join(",", sbList));
insertSb.Append(";"); insertSb.Append(";");
resultList.Add(insertSb.ToString()); resultList.Add(insertSb.ToString());
insertSb.Clear(); insertSb.Clear();
sbList.Clear(); sbList.Clear();
currentLength = headerSb.Length;
sbList.Add(recordSb.ToString()); sbList.Add(recordSb.ToString());
currentLength = headerSb.Length + 1;//逗号长度加1
} }
else else
{ {
@ -164,10 +162,6 @@ public class MySqlDestination : IDisposable, IAsyncDisposable
} }
headerSb.Clear(); headerSb.Clear();
} }
if (resultList.Count == 2)
{
var a = 1;
}
return resultList; return resultList;
} }