优化内存分配

This commit is contained in:
陈梓阳 2024-01-16 15:35:54 +08:00 committed by lindj
parent dadb36b1c9
commit 1de3603afe
5 changed files with 89 additions and 113 deletions

View File

@ -17,24 +17,24 @@ public class OutputService : IOutputService
{ {
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly DataRecordQueue _consumerQueue; private readonly DataRecordQueue _consumerQueue;
private readonly IOptions<DataTransformOptions> _transOptions; private readonly IOptions<DatabaseOutputOptions> _outputOptions;
private readonly IOptions<DatabaseOutputOptions> _options; private readonly IOptions<DataTransformOptions> _transformOptions;
private readonly ProcessContext _context; private readonly ProcessContext _context;
private readonly TaskManager _taskManager; private readonly TaskManager _taskManager;
public OutputService(ILogger<OutputService> logger, public OutputService(ILogger<OutputService> logger,
[FromKeyedServices(ProcessStep.Consumer)] DataRecordQueue consumerQueue, [FromKeyedServices(ProcessStep.Consumer)] DataRecordQueue consumerQueue,
IOptions<DatabaseOutputOptions> options, IOptions<DatabaseOutputOptions> outputOptions,
IOptions<DataTransformOptions> transOptions,
ProcessContext context, ProcessContext context,
TaskManager taskManager) TaskManager taskManager,
IOptions<DataTransformOptions> transformOptions)
{ {
_logger = logger; _logger = logger;
_consumerQueue = consumerQueue; _consumerQueue = consumerQueue;
_transOptions = transOptions; _outputOptions = outputOptions;
_options = options;
_context = context; _context = context;
_taskManager = taskManager; _taskManager = taskManager;
_transformOptions = transformOptions;
} }
public async Task ExecuteAsync(CancellationToken cancellationToken) public async Task ExecuteAsync(CancellationToken cancellationToken)
@ -50,9 +50,10 @@ public class OutputService : IOutputService
records.Add(record); records.Add(record);
count++; count++;
//_logger.LogInformation(@"*****OutputCount: {count} *****",count); //_logger.LogInformation(@"*****OutputCount: {count} *****",count);
if (records.Count >= _options.Value.FlushCount) if (records.Count >= _outputOptions.Value.FlushCount)
{ {
await FlushAsync(records); await FlushAsync(records);
_context.AddOutput(count);
records.Clear(); records.Clear();
} }
if (_context.GetExceptions().Count>0) if (_context.GetExceptions().Count>0)
@ -67,7 +68,7 @@ public class OutputService : IOutputService
records.Clear(); records.Clear();
_logger.LogInformation("***** Mysql output thread completed *****"); _logger.LogInformation("***** Mysql output thread completed *****");
} }
}, _options.Value.TaskCount); }, _outputOptions.Value.TaskCount);
await _taskManager.WaitAll(); await _taskManager.WaitAll();
//_context.CompleteOutput(); //_context.CompleteOutput();
@ -79,8 +80,8 @@ public class OutputService : IOutputService
{ {
var count = 0; var count = 0;
await using var output = new MySqlDestination( await using var output = new MySqlDestination(
_options.Value.ConnectionString ?? throw new InvalidOperationException("Connection string is required"), _outputOptions.Value.ConnectionString ?? throw new InvalidOperationException("Connection string is required"),
_logger, _context,true); _logger, _context, _transformOptions);
//if (records == null || records.Count() == 0) return; //if (records == null || records.Count() == 0) return;
//var dbName = $"cferp_test_1"; //var dbName = $"cferp_test_1";
//if (records != null && records.Count() > 0) //if (records != null && records.Count() > 0)
@ -102,7 +103,7 @@ public class OutputService : IOutputService
await output.WriteRecordAsync(record); await output.WriteRecordAsync(record);
count++; count++;
} }
await output.FlushAsync(_options.Value.MaxAllowedPacket, _transOptions); await output.FlushAsync(_outputOptions.Value.MaxAllowedPacket);
_context.AddOutput(count); _context.AddOutput(count);
} }
} }

View File

@ -49,34 +49,6 @@ public class TransformService : ITransformService
// var dbOptions = _options.Value.DatabaseFilter(record); // var dbOptions = _options.Value.DatabaseFilter(record);
if (!_producerQueue.TryDequeue(out var record)) continue; if (!_producerQueue.TryDequeue(out var record)) continue;
for (var i = 0; i < record.Fields.Length; i++)
{
var field = record[i];
if (field == "\\N")
{
field = "NULL";
goto Escape;
}
// else if(DumpDataHelper.CheckHexField(field))
// field = $"0x{field}";
switch (_options.Value.GetColumnType(record.TableName, record.Headers[i]))
{
case ColumnType.Text:
field = string.IsNullOrEmpty(field) ? "''" : _options.Value.TransformBinary?.Invoke(field) ?? field; ;
break;
case ColumnType.Blob:
//field = string.IsNullOrEmpty(field) ? "NULL" : $"0x{field}";
break;
default:
break;
}
Escape:
record[i] = field;
}
//过滤不要的record //过滤不要的record
if ( await _options.Value.RecordFilter?.Invoke(record,_db) == false) continue; if ( await _options.Value.RecordFilter?.Invoke(record,_db) == false) continue;
record.Database = _options.Value.DatabaseFilter?.Invoke(record); record.Database = _options.Value.DatabaseFilter?.Invoke(record);

View File

@ -218,7 +218,7 @@ async Task RunProgram()
if (record.TryGetField("NextStepID", out var idStr)) if (record.TryGetField("NextStepID", out var idStr))
{ {
if (idStr == "NULL") if (idStr == "\\N")
{ {
record.SetField("NextStepID", "0"); record.SetField("NextStepID", "0");
} }
@ -498,6 +498,7 @@ async Task RunProgram()
host.Services.AddSingleton<ITransformService, TransformService>(); host.Services.AddSingleton<ITransformService, TransformService>();
host.Services.AddSingleton<IOutputService, OutputService>(); host.Services.AddSingleton<IOutputService, OutputService>();
var redisOptions = host.Configuration.GetSection("RedisCacheOptions").Get<RedisCacheOptions>() ?? new RedisCacheOptions(); var redisOptions = host.Configuration.GetSection("RedisCacheOptions").Get<RedisCacheOptions>() ?? new RedisCacheOptions();
redisOptions.InstanceName = "mes-etl";
var redis = ConnectionMultiplexer.Connect(redisOptions.Configuration); var redis = ConnectionMultiplexer.Connect(redisOptions.Configuration);
host.Services.AddSingleton(redis.GetDatabase()); host.Services.AddSingleton(redis.GetDatabase());
var app = host.Build(); var app = host.Build();

View File

@ -1,5 +1,4 @@
using System.Reflection.Metadata; using System.Text;
using System.Text;
using ConsoleApp2.Helpers; using ConsoleApp2.Helpers;
using ConsoleApp2.Options; using ConsoleApp2.Options;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
@ -17,22 +16,18 @@ public class MySqlDestination : IDisposable, IAsyncDisposable
private readonly Dictionary<string, IList<DataRecord>> _recordCache; private readonly Dictionary<string, IList<DataRecord>> _recordCache;
private readonly MySqlConnection _conn; private readonly MySqlConnection _conn;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly bool _prettyOutput;
private readonly int _maxAllowPacket;
private readonly ProcessContext _context; private readonly ProcessContext _context;
private readonly IOptions<DataTransformOptions> _transformOptions;
public MySqlDestination(string connStr, ILogger logger, ProcessContext context,bool prettyOutput = false) public MySqlDestination(string connStr, ILogger logger, ProcessContext context, IOptions<DataTransformOptions> transformOptions)
{ {
_conn = new MySqlConnection(connStr); _conn = new MySqlConnection(connStr);
_conn.Open(); _conn.Open();
_recordCache = new Dictionary<string, IList<DataRecord>>(); _recordCache = new Dictionary<string, IList<DataRecord>>();
_logger = logger; _logger = logger;
_context = context; _context = context;
_prettyOutput = prettyOutput; _transformOptions = transformOptions;
} }
public Task WriteRecordAsync(DataRecord record) public Task WriteRecordAsync(DataRecord record)
{ {
_recordCache.AddOrUpdate(record.TableName, [record], (key, value) => _recordCache.AddOrUpdate(record.TableName, [record], (key, value) =>
@ -51,20 +46,22 @@ public class MySqlDestination : IDisposable, IAsyncDisposable
} }
} }
public async Task FlushAsync(int maxAllowPacket, IOptions<DataTransformOptions> transOptions) public async Task FlushAsync(int maxAllowPacket)
{ {
if (_recordCache.Count == 0) if (_recordCache.Count == 0)
return; return;
var cmd = _conn.CreateCommand(); var cmd = _conn.CreateCommand();
cmd.CommandTimeout = 3 * 60; cmd.CommandTimeout = 3 * 60;
var excuseList = GetExcuseList(_recordCache, maxAllowPacket, transOptions, _prettyOutput);
try try
{ {
var excuseList = GetExcuseList(_recordCache, maxAllowPacket).ToList();
foreach (var insertSql in excuseList) foreach (var insertSql in excuseList)
{ {
cmd.CommandText = insertSql; cmd.CommandText = insertSql;
await cmd.ExecuteNonQueryAsync(); await cmd.ExecuteNonQueryAsync();
_logger.LogInformation(@"do insert completed!size:{Length}", cmd.CommandText.Length);
} }
_recordCache.Clear(); _recordCache.Clear();
} }
@ -80,89 +77,94 @@ public class MySqlDestination : IDisposable, IAsyncDisposable
} }
} }
public static IList<string> GetExcuseList(IDictionary<string, IList<DataRecord>> tableRecords,int maxAllowPacket, IOptions<DataTransformOptions> transOptions, public IEnumerable<string> GetExcuseList(IDictionary<string, IList<DataRecord>> tableRecords,int maxAllowPacket)
bool prettyOutput = false)
{ {
var resultList = new List<string>(); var sb = new StringBuilder();
var headerSb = new StringBuilder();
var recordSb = new StringBuilder();
foreach (var (tableName, records) in tableRecords) foreach (var (tableName, records) in tableRecords)
{ {
if (records.Count == 0) if (records.Count == 0)
continue; continue;
headerSb.Append($"INSERT INTO `{tableName}`(");
var recordIdx = 0;
StartBuild:
var noCommas = true;
// INSERT INTO ... VALUES >>>
sb.Append($"INSERT INTO `{tableName}`(");
for (var i = 0; i < records[0].Headers.Length; i++) for (var i = 0; i < records[0].Headers.Length; i++)
{ {
var header = records[0].Headers[i]; var header = records[0].Headers[i];
headerSb.Append($"`{header}`"); sb.Append($"`{header}`");
if (i != records[0].Headers.Length - 1) if (i != records[0].Headers.Length - 1)
headerSb.Append(','); sb.Append(',');
} }
headerSb.Append(") VALUES "); sb.Append(") VALUES ");
if (prettyOutput)
headerSb.AppendLine(); // ([FIELDS]), >>>
for (;recordIdx < records.Count; recordIdx++)
var sbList = new List<string>();
var currentLength = headerSb.Length;
for (var i = 0; i < records.Count; i++)
{ {
var record = records[i]; var record = records[recordIdx];
var recordSb = new StringBuilder();
recordSb.Append('('); recordSb.Append('(');
for (var j = 0; j < record.Fields.Length; j++) for (var fieldIdx = 0; fieldIdx < record.Fields.Length; fieldIdx++)
{ {
var field = record.Fields[j]; var field = record.Fields[fieldIdx];
var header = record.Headers[j];
if (transOptions.Value.GetColumnType(record.TableName, header) ==ColumnType.Blob) // 在这里处理特殊列
#region HandleFields
if (field == "\\N")
{ {
if (string.IsNullOrEmpty(field)) recordSb.Append("NULL");
{ goto Escape;
recordSb.Append("NULL");
}
else
recordSb.Append("0x"+field);
} }
else
recordSb.Append(field); switch (_transformOptions.Value.GetColumnType(record.TableName, record.Headers[fieldIdx]))
if (j != record.Fields.Length - 1) {
case ColumnType.Text:
recordSb.Append(string.IsNullOrEmpty(field)
? "''"
: _transformOptions.Value.TransformBinary?.Invoke(field) ?? field);
break;
case ColumnType.Blob:
if (string.IsNullOrEmpty(field))
recordSb.Append("NULL");
else recordSb.Append($"0x{field}");
break;
case ColumnType.UnDefine:
default:
recordSb.Append(field);
break;
}
Escape:
#endregion
if (fieldIdx != record.Fields.Length - 1)
recordSb.Append(','); recordSb.Append(',');
} }
recordSb.Append(')'); recordSb.Append(')');
//if (i != records.Count - 1) // not last field // 若字符数量即将大于限制则返回SQL清空StringBuilder保留当前记录的索引值然后转到StartBuild标签重新开始一轮INSERT
// recordSb.Append(','); if (sb.Length + recordSb.Length + 1 > maxAllowPacket)
if (prettyOutput) recordSb.AppendLine(); {
sb.Append(';');
yield return sb.ToString();
sb.Clear();
goto StartBuild;
}
if (currentLength + recordSb.Length >= maxAllowPacket) if (!noCommas)
{ sb.Append(',').AppendLine();
var insertSb = new StringBuilder(headerSb.ToString()); noCommas = false;
insertSb.Append(string.Join(",", sbList)); sb.Append(recordSb); // StringBuilder.Append(StringBuilder)不会分配多余的内存
insertSb.Append(";");
resultList.Add(insertSb.ToString());
insertSb.Clear();
sbList.Clear();
sbList.Add(recordSb.ToString());
currentLength = headerSb.Length + 1;//逗号长度加1
}
else
{
sbList.Add(recordSb.ToString());
}
currentLength += recordSb.Length;
recordSb.Clear();
} }
if (sbList.Count > 0)
{ sb.Append(';');
var insertSb = new StringBuilder(headerSb.ToString()); yield return sb.ToString();
insertSb.Append(string.Join(",", sbList)); sb.Clear();
insertSb.Append(";");
resultList.Add(insertSb.ToString());
insertSb.Clear();
}
headerSb.Clear();
} }
return resultList;
} }

View File

@ -12,6 +12,6 @@
"MySqlMaster": "Server=127.0.0.1;Port=33309;UserId=root;Password=123456;Database=cferp_test;" "MySqlMaster": "Server=127.0.0.1;Port=33309;UserId=root;Password=123456;Database=cferp_test;"
}, },
"RedisCacheOptions": { "RedisCacheOptions": {
"Configuration": "localhost:6379" "Configuration": "192.168.1.246:6380"
} }
} }