268 lines
9.4 KiB
C#
268 lines
9.4 KiB
C#
using System.Text;
|
||
using System.Text.RegularExpressions;
|
||
using MesETL.App.Const;
|
||
using MesETL.App.Helpers;
|
||
using MesETL.App.Options;
|
||
using MesETL.Shared.Helper;
|
||
using Microsoft.Extensions.Logging;
|
||
using Microsoft.Extensions.Options;
|
||
using MySqlConnector;
|
||
|
||
namespace MesETL.App.Services.ETL;
|
||
|
||
/// <summary>
/// MySQL output destination. Records are buffered per table and written to the
/// database as batched multi-row <c>INSERT</c> statements when <see cref="FlushAsync"/> is called.
/// </summary>
public partial class MySqlDestination : IDisposable, IAsyncDisposable
{
    /// <summary>
    /// Extra character count added to the estimated statement length before it is compared
    /// with the packet limit. The value is kept from the original implementation;
    /// presumably it leaves room for the statement terminator and the trailing COMMIT.
    /// </summary>
    private const int StatementTailAllowance = 23;

    /// <summary>
    /// Buffered records, keyed by table name (table => records).
    /// </summary>
    private readonly Dictionary<string, IList<DataRecord>> _recordCache;
    private readonly MySqlConnection _conn;
    private readonly ILogger _logger;
    private readonly IOptions<DatabaseOutputOptions> _options;
    private readonly ErrorRecorder.OutputErrorRecorder _outputErrorRecorder;
    private readonly ProcessContext _context;

    /// <summary>
    /// Creates the destination and opens the database connection immediately.
    /// </summary>
    /// <param name="connStr">Connection string passed to <see cref="MySqlConnection"/>.</param>
    /// <param name="logger">Logger used for SQL errors and oversize warnings.</param>
    /// <param name="options">Output options (flush count, column types, ON DUPLICATE KEY settings).</param>
    /// <param name="outputErrorRecorder">Recorder that receives every SQL statement that failed to execute.</param>
    /// <param name="context">Process context that collects exceptions raised while flushing.</param>
    public MySqlDestination(
        string connStr,
        ILogger logger,
        IOptions<DatabaseOutputOptions> options,
        ErrorRecorder.OutputErrorRecorder outputErrorRecorder,
        ProcessContext context)
    {
        _recordCache = new Dictionary<string, IList<DataRecord>>();
        _logger = logger;
        _options = options;
        _outputErrorRecorder = outputErrorRecorder;
        _context = context;

        _conn = new MySqlConnection(connStr);
        try
        {
            _conn.Open();
        }
        catch
        {
            // The instance never reaches the caller when Open fails, so nobody else
            // could dispose the connection; release it here before rethrowing.
            _conn.Dispose();
            throw;
        }
    }

    /// <summary>
    /// Buffers a single record under its table name. Nothing is sent to the database
    /// until <see cref="FlushAsync"/> is called.
    /// </summary>
    /// <param name="record">The record to buffer.</param>
    /// <returns>An already completed task.</returns>
    public Task WriteRecordAsync(DataRecord record)
    {
        _recordCache.AddOrUpdate(record.TableName, [record], (_, value) =>
        {
            value.Add(record);
            return value;
        });
        return Task.CompletedTask;
    }

    /// <summary>
    /// Buffers every record of the sequence, in order.
    /// </summary>
    /// <param name="records">The records to buffer.</param>
    public async Task WriteRecordsAsync(IEnumerable<DataRecord> records)
    {
        foreach (var record in records)
        {
            await WriteRecordAsync(record);
        }
    }

    /// <summary>
    /// Executes the buffered records as batched INSERT statements and clears the buffer.
    /// A statement that fails to execute is logged and recorded, and the remaining
    /// statements are still executed.
    /// </summary>
    /// <param name="maxAllowPacket">Upper bound, in characters, for a single generated SQL batch.</param>
    /// <remarks>
    /// Exceptions raised while building the SQL (or while recording a failed statement)
    /// are logged and rethrown; in that case the buffer is left untouched.
    /// </remarks>
    public async Task FlushAsync(int maxAllowPacket)
    {
        if (_recordCache.Count == 0)
            return;

        await using var cmd = _conn.CreateCommand();
        cmd.CommandTimeout = 0; // no timeout: a batch may legitimately take a long time

        try
        {
            foreach (var insertSql in GetExecutionList(_recordCache, maxAllowPacket))
            {
                cmd.CommandText = insertSql;
                try
                {
                    await cmd.ExecuteNonQueryAsync();
                }
                catch (Exception e)
                {
                    await ReportFailedSqlAsync(insertSql, e);
                }
            }

            _recordCache.Clear();
        }
        catch (Exception e)
        {
            _logger.LogError(e, "序列化记录时发生错误");
            throw;
        }
    }

    /// <summary>
    /// Logs a failed statement, registers the exception on the process context and hands the
    /// SQL to the error recorder, together with the target table name when it can be extracted.
    /// </summary>
    private async Task ReportFailedSqlAsync(string sql, Exception error)
    {
        _logger.LogError(error, "插入数据库时发生错误, sql: {Sql}", sql.Omit(1000));
        _context.AddException(error);

        var match = MatchTableName().Match(sql);
        if (match is { Success: true, Groups.Count: > 1 })
            await _outputErrorRecorder.LogErrorSqlAsync(sql, match.Groups[1].Value, error);
        else
            await _outputErrorRecorder.LogErrorSqlAsync(sql, error);
    }

    [GeneratedRegex("INSERT INTO `([^`]+)`")]
    private static partial Regex MatchTableName();

    /// <summary>
    /// Lazily builds the SQL batches for the given records. Each batch has the form
    /// <c>SET AUTOCOMMIT = 0; INSERT INTO `table`(...) VALUES (...),(...); COMMIT;</c>
    /// and contains rows of a single table. A new batch is started whenever adding the
    /// next row would push the estimated length over <paramref name="maxAllowPacket"/>.
    /// </summary>
    /// <param name="tableRecords">Records grouped by table name.</param>
    /// <param name="maxAllowPacket">Upper bound, in characters, for a single batch.</param>
    /// <returns>The SQL batches, in table order and record order.</returns>
    /// <exception cref="InvalidOperationException">
    /// A record has a different number of columns than the first record of its table.
    /// Thrown during enumeration.
    /// </exception>
    /// <remarks>
    /// A single row that exceeds the limit on its own is still emitted, as a batch of its own,
    /// with a warning; generation then continues with the next record.
    /// </remarks>
    public IEnumerable<string> GetExecutionList(IDictionary<string, IList<DataRecord>> tableRecords, int maxAllowPacket)
    {
        var sb = new StringBuilder(_options.Value.FlushCount * 128);
        var rowSb = new StringBuilder(); // reused for every row to avoid per-row allocations

        foreach (var (tableName, records) in tableRecords)
        {
            if (records.Count == 0)
                continue;

            // Canonical column order: every row of this table is written in the order of the first record's headers
            var template = records[0];
            var columnCount = template.Headers.Count;

            // Rows already appended to the statement currently being built in sb.
            // Tracked per statement so the oversized-row detection works for every batch.
            var rowsInStatement = 0;

            foreach (var record in records)
            {
                // Column count validation
                if (record.Headers.Count != columnCount)
                {
                    throw new InvalidOperationException($"数据异常，数据列数量出现冲突，表名：{tableName}");
                }

                rowSb.Clear();
                AppendRowValues(rowSb, record, template);

                // The current statement cannot take this row: close it and start a new one for the same row.
                if (rowsInStatement > 0 && sb.Length + rowSb.Length + StatementTailAllowance > maxAllowPacket)
                {
                    yield return CloseStatement(tableName, sb, lineBreakBeforeCommit: true);
                    rowsInStatement = 0;
                }

                if (rowsInStatement == 0)
                {
                    AppendInsertHeader(sb, tableName, template);

                    var estimatedLength = sb.Length + rowSb.Length + StatementTailAllowance;
                    if (estimatedLength > maxAllowPacket)
                    {
                        // A single row exceeds the limit by itself: emit it alone and move on to the next record.
                        _logger.LogWarning("{Table}表单条数据的SQL超出了配置的MaxAllowedPacket，字符数{Count}", tableName,
                            estimatedLength);
                        sb.Append(rowSb);
                        yield return CloseStatement(tableName, sb, lineBreakBeforeCommit: true);
                        continue;
                    }
                }
                else
                {
                    sb.Append(',').AppendLine();
                }

                sb.Append(rowSb); // StringBuilder.Append(StringBuilder) does not allocate an intermediate string
                rowsInStatement++;
            }

            // Nothing is pending when the last record of the table was emitted on its own.
            if (rowsInStatement > 0)
                yield return CloseStatement(tableName, sb, lineBreakBeforeCommit: false);
        }
    }

    /// <summary>
    /// Appends the batch prologue: <c>SET AUTOCOMMIT = 0;</c> followed by
    /// <c>INSERT INTO `table`(`col`,...) VALUES </c>, using the template record's column order.
    /// </summary>
    private static void AppendInsertHeader(StringBuilder sb, string tableName, DataRecord template)
    {
        sb.AppendLine("SET AUTOCOMMIT = 0;\n");

        // INSERT INTO ... VALUES >>>
        sb.Append($"INSERT INTO `{tableName}`(");
        var headers = template.Headers;
        for (var i = 0; i < headers.Count; i++)
        {
            if (i > 0)
                sb.Append(',');
            sb.Append($"`{headers[i]}`");
        }

        sb.Append(") VALUES ");
    }

    /// <summary>
    /// Appends one parenthesised value tuple for <paramref name="record"/>, with the fields
    /// taken in the column order of <paramref name="template"/>.
    /// </summary>
    private void AppendRowValues(StringBuilder target, DataRecord record, DataRecord template)
    {
        var options = _options.Value;
        var headers = template.Headers;

        // ([FIELDS]) >>>
        target.Append('(');
        for (var idx = 0; idx < headers.Count; idx++)
        {
            if (idx > 0)
                target.Append(',');

            var header = headers[idx];
            // TODO: lookup by header name could be optimised
            var field = record[header];

            // MyDumper exports NULL as '\N' (the backslash is not an escape character here)
            if (field.Length == 2 && field == ConstVar.MyDumperNull)
            {
                target.Append(ConstVar.Null);
                continue;
            }

            // Special column handling, driven by the configured column type
            switch (options.GetColumnType(record.TableName, header))
            {
                case ColumnType.Text:
                    if (string.IsNullOrEmpty(field))
                        target.Append("''");
                    else if (field == ConstVar.Null)
                        target.Append(ConstVar.Null);
                    else
                        target.Append($"_utf8mb4 0x{field}");
                    break;
                case ColumnType.Blob:
                    if (string.IsNullOrEmpty(field))
                        target.Append("''");
                    else if (field == ConstVar.Null)
                        target.Append(ConstVar.Null);
                    else
                        target.Append($"0x{field}");
                    break;
                case ColumnType.Json: // Mydumper v0.16.7-5 exports JSON as a string and escapes commas; adapt to that
                    if (string.IsNullOrEmpty(field))
                        target.Append(ConstVar.Null);
                    else if (options.TreatJsonAsHex)
                        target.Append($"_utf8mb4 0x{field}");
                    else
                        // NOTE(review): AppendLine leaves a line break after the value; kept as in the original output
                        target.AppendLine(field.Replace("\\,", ","));
                    break;
                case ColumnType.UnDefine:
                default:
                    target.Append(field);
                    break;
            }
        }

        target.Append(')');
    }

    /// <summary>
    /// Terminates the statement held in <paramref name="sb"/> (optional ON DUPLICATE KEY suffix,
    /// <c>;</c>, <c>COMMIT;</c>), returns the resulting SQL and clears the builder for the next batch.
    /// </summary>
    /// <param name="tableName">Table the statement inserts into.</param>
    /// <param name="sb">Builder holding the statement.</param>
    /// <param name="lineBreakBeforeCommit">Whether a line break is written between <c>;</c> and <c>COMMIT;</c>.</param>
    private string CloseStatement(string tableName, StringBuilder sb, bool lineBreakBeforeCommit)
    {
        TryAddForUpdateSuffix(tableName, sb);
        sb.Append(';');
        if (lineBreakBeforeCommit)
            sb.AppendLine();
        sb.Append("COMMIT;");

        var sql = sb.ToString();
        sb.Clear();
        return sql;
    }

    /// <summary>
    /// Appends an <c>AS new ON DUPLICATE KEY UPDATE ...</c> suffix when the options define one
    /// for the table. All rows of the statement must belong to the same table.
    /// </summary>
    /// <param name="tableName">Table the statement inserts into.</param>
    /// <param name="sb">Builder holding the statement the suffix is appended to.</param>
    private void TryAddForUpdateSuffix(string tableName, StringBuilder sb)
    {
        var forUpdate = _options.Value.TryGetForUpdate(tableName, out var forUpdateSql);
        if (forUpdate)
        {
            sb.AppendLine($"""
                           AS new
                           ON DUPLICATE KEY UPDATE
                           {forUpdateSql}
                           """);
        }
    }

    /// <summary>
    /// Closes and disposes the connection and drops any records still buffered.
    /// </summary>
    public void Dispose()
    {
        _conn.Close();
        _conn.Dispose();
        _recordCache.Clear();
    }

    /// <summary>
    /// Asynchronously closes and disposes the connection and drops any records still buffered.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        await _conn.CloseAsync();
        await _conn.DisposeAsync();
        _recordCache.Clear();
    }
}