MES-ETL/ConsoleApp2/Services/MySqlDestination.cs

187 lines
6.1 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System.Text;
using ConsoleApp2.Helpers;
using ConsoleApp2.Options;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MySqlConnector;
using ServiceStack;
namespace ConsoleApp2.Services;
/// <summary>
/// Mysql导出
/// </summary>
public class MySqlDestination : IDisposable, IAsyncDisposable
{
private readonly Dictionary<string, IList<DataRecord>> _recordCache;
private readonly MySqlConnection _conn;
private readonly ILogger _logger;
private readonly ProcessContext _context;
private readonly IOptions<DataTransformOptions> _transformOptions;
public MySqlDestination(string connStr, ILogger logger, ProcessContext context, IOptions<DataTransformOptions> transformOptions)
{
_conn = new MySqlConnection(connStr);
_conn.Open();
_recordCache = new Dictionary<string, IList<DataRecord>>();
_logger = logger;
_context = context;
_transformOptions = transformOptions;
}
public Task WriteRecordAsync(DataRecord record)
{
_recordCache.AddOrUpdate(record.TableName, [record], (key, value) =>
{
value.Add(record);
return value;
});
return Task.CompletedTask;
}
public async Task WriteRecordsAsync(IEnumerable<DataRecord> records)
{
foreach (var record in records)
{
await WriteRecordAsync(record);
}
}
public async Task FlushAsync(int maxAllowPacket)
{
if (_recordCache.Count == 0)
return;
var cmd = _conn.CreateCommand();
cmd.CommandTimeout = 3 * 60;
try
{
var excuseList = GetExcuseList(_recordCache, maxAllowPacket).ToList();
foreach (var insertSql in excuseList)
{
cmd.CommandText = insertSql;
await cmd.ExecuteNonQueryAsync();
}
_recordCache.Clear();
}
catch (Exception e)
{
_logger.LogCritical(e, "Error when flushing records, sql: {Sql}", cmd.CommandText.Omit(1000));
_context.AddException(e);
throw;
}
finally
{
await cmd.DisposeAsync();
}
}
public IEnumerable<string> GetExcuseList(IDictionary<string, IList<DataRecord>> tableRecords,int maxAllowPacket)
{
var sb = new StringBuilder();
foreach (var (tableName, records) in tableRecords)
{
if (records.Count == 0)
continue;
var recordIdx = 0;
StartBuild:
var noCommas = true;
// INSERT INTO ... VALUES >>>
sb.Append($"INSERT INTO `{tableName}`(");
for (var i = 0; i < records[0].Headers.Length; i++)
{
var header = records[0].Headers[i];
sb.Append($"`{header}`");
if (i != records[0].Headers.Length - 1)
sb.Append(',');
}
sb.Append(") VALUES ");
// ([FIELDS]), >>>
for (;recordIdx < records.Count; recordIdx++)
{
var record = records[recordIdx];
var recordSb = new StringBuilder();
recordSb.Append('(');
for (var fieldIdx = 0; fieldIdx < record.Fields.Length; fieldIdx++)
{
var field = record.Fields[fieldIdx];
// 在这里处理特殊列
#region HandleFields
if (field == "\\N")
{
recordSb.Append("NULL");
goto Escape;
}
switch (_transformOptions.Value.GetColumnType(record.TableName, record.Headers[fieldIdx]))
{
case ColumnType.Text:
recordSb.Append(string.IsNullOrEmpty(field)
? "''"
: _transformOptions.Value.TransformBinary?.Invoke(field) ?? field);
break;
case ColumnType.Blob:
if (string.IsNullOrEmpty(field))
recordSb.Append("NULL");
else recordSb.Append($"0x{field}");
break;
case ColumnType.Json:
recordSb.Append(string.IsNullOrEmpty(field)
? "\"[]\""
: _transformOptions.Value.TransformBinary?.Invoke(field) ?? field);
break;
case ColumnType.UnDefine:
default:
recordSb.Append(field);
break;
}
Escape:
#endregion
if (fieldIdx != record.Fields.Length - 1)
recordSb.Append(',');
}
recordSb.Append(')');
// 若字符数量即将大于限制则返回SQL清空StringBuilder保留当前记录的索引值然后转到StartBuild标签重新开始一轮INSERT
if (sb.Length + recordSb.Length + 1 > maxAllowPacket)
{
sb.Append(';');
yield return sb.ToString();
sb.Clear();
goto StartBuild;
}
if (!noCommas)
sb.Append(',').AppendLine();
noCommas = false;
sb.Append(recordSb); // StringBuilder.Append(StringBuilder)不会分配多余的内存
}
sb.Append(';');
yield return sb.ToString();
sb.Clear();
}
}
public void Dispose()
{
_conn.Close();
_conn.Dispose();
}
public async ValueTask DisposeAsync()
{
await _conn.CloseAsync();
await _conn.DisposeAsync();
}
}