MES-ETL/MesETL.App/Services/ETL/MySqlDestination.cs

267 lines
9.3 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System.Text;
using System.Text.RegularExpressions;
using MesETL.App.Const;
using MesETL.App.Helpers;
using MesETL.App.Options;
using MesETL.Shared.Helper;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MySqlConnector;
namespace MesETL.App.Services.ETL;
/// <summary>
/// MySQL export destination: buffers records per table and writes them to the database
/// as batched multi-row INSERT statements.
/// </summary>
public partial class MySqlDestination : IDisposable, IAsyncDisposable
{
    /// <summary>
    /// Number of characters kept free when deciding whether another value tuple still fits
    /// into the current statement.
    /// NOTE(review): presumably reserves room for the statement terminator; the optional
    /// ON DUPLICATE KEY UPDATE suffix is not accounted for - confirm.
    /// </summary>
    private const int StatementReserve = 23;

    /// <summary>
    /// table => records
    /// </summary>
    private readonly Dictionary<string, IList<DataRecord>> _recordCache;
    private readonly MySqlConnection _conn;
    private readonly ILogger _logger;
    private readonly IOptions<DatabaseOutputOptions> _options;
    private readonly ErrorRecorder.OutputErrorRecorder _outputErrorRecorder;
    private readonly ProcessContext _context;

    /// <summary>
    /// Creates the destination and opens the database connection immediately.
    /// </summary>
    /// <param name="connStr">Connection string used to create the MySQL connection.</param>
    /// <param name="logger">Logger for SQL failures and warnings.</param>
    /// <param name="options">Output options (column types, JSON handling, upsert clauses).</param>
    /// <param name="outputErrorRecorder">Recorder that receives every SQL statement that failed.</param>
    /// <param name="context">Process context that collects exceptions raised while flushing.</param>
    public MySqlDestination(
        string connStr,
        ILogger logger,
        IOptions<DatabaseOutputOptions> options,
        ErrorRecorder.OutputErrorRecorder outputErrorRecorder,
        ProcessContext context)
    {
        _conn = new MySqlConnection(connStr);
        try
        {
            _conn.Open();
        }
        catch
        {
            // Do not leak the connection object when opening fails.
            _conn.Dispose();
            throw;
        }

        _recordCache = new Dictionary<string, IList<DataRecord>>();
        _logger = logger;
        _options = options;
        _outputErrorRecorder = outputErrorRecorder;
        _context = context;
    }

    /// <summary>
    /// Adds a record to the in-memory cache, grouped by its table name. Nothing is sent to
    /// the database until <see cref="FlushAsync"/> is called.
    /// </summary>
    /// <param name="record">The record to buffer.</param>
    /// <returns>An already completed task.</returns>
    public Task WriteRecordAsync(DataRecord record)
    {
        _recordCache.AddOrUpdate(record.TableName, [record], (_, value) =>
        {
            value.Add(record);
            return value;
        });
        return Task.CompletedTask;
    }

    /// <summary>
    /// Buffers every record of the sequence, in order, via <see cref="WriteRecordAsync"/>.
    /// </summary>
    /// <param name="records">The records to buffer.</param>
    public async Task WriteRecordsAsync(IEnumerable<DataRecord> records)
    {
        foreach (var record in records)
        {
            await WriteRecordAsync(record);
        }
    }

    /// <summary>
    /// Executes the cached records as INSERT statements and clears the cache afterwards.
    /// A statement that fails to execute is logged, added to the process context and handed
    /// to the error recorder; the remaining statements are still executed.
    /// </summary>
    /// <param name="maxAllowPacket">
    /// Maximum length, in characters, of one generated SQL statement
    /// (see <see cref="GetExecutionList"/>).
    /// </param>
    /// <remarks>
    /// Exceptions raised while generating the SQL are logged and rethrown; in that case the
    /// cache is left untouched.
    /// </remarks>
    public async Task FlushAsync(int maxAllowPacket)
    {
        if (_recordCache.Count == 0)
            return;

        // The using declaration disposes the command on every exit path.
        await using var cmd = _conn.CreateCommand();
        cmd.CommandTimeout = 0;
        try
        {
            foreach (var insertSql in GetExecutionList(_recordCache, maxAllowPacket))
            {
                cmd.CommandText = insertSql;
                try
                {
                    await cmd.ExecuteNonQueryAsync();
                }
                catch (Exception e)
                {
                    _logger.LogError(e, "插入数据库时发生错误, sql: {Sql}", cmd.CommandText.Omit(1000));
                    _context.AddException(e);

                    // Attribute the failed statement to its table when the name can be extracted.
                    var match = MatchTableName().Match(cmd.CommandText);
                    if (match is { Success: true, Groups.Count: > 1 })
                    {
                        var tableName = match.Groups[1].Value;
                        await _outputErrorRecorder.LogErrorSqlAsync(cmd.CommandText, tableName, e);
                    }
                    else
                    {
                        await _outputErrorRecorder.LogErrorSqlAsync(cmd.CommandText, e);
                    }
                }
            }

            _recordCache.Clear();
        }
        catch (Exception e)
        {
            _logger.LogError(e, "序列化记录时发生错误");
            throw;
        }
    }

    [GeneratedRegex("INSERT INTO `([^`]+)`")]
    private static partial Regex MatchTableName();

    /// <summary>
    /// Lazily converts the records of every table into multi-row INSERT statements, starting
    /// a new statement whenever the next value tuple would push the statement length past
    /// <paramref name="maxAllowPacket"/>.
    /// </summary>
    /// <param name="tableRecords">Records grouped by table name; empty lists are skipped.</param>
    /// <param name="maxAllowPacket">
    /// Length limit per statement. It is compared against the character count of the
    /// generated SQL, not its encoded byte size.
    /// </param>
    /// <returns>The SQL statements to execute, in order.</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown during enumeration when a record's column count differs from the first record
    /// of the same table.
    /// </exception>
    /// <remarks>
    /// A single record that does not fit into an otherwise empty statement is emitted on its
    /// own (with a warning) and then skipped over, so enumeration always terminates.
    /// </remarks>
    public IEnumerable<string> GetExecutionList(IDictionary<string, IList<DataRecord>> tableRecords, int maxAllowPacket)
    {
        // The prefix is only part of the very first statement; the builder is cleared after each yield.
        var sb = new StringBuilder("SET AUTOCOMMIT = 1;\n");
        foreach (var (tableName, records) in tableRecords)
        {
            if (records.Count == 0)
                continue;

            // Canonical column order: every record of the table is written in this order.
            var template = records[0];
            var columnCount = template.Headers.Count;
            var recordIdx = 0;

            // Each iteration builds and yields exactly one INSERT statement.
            while (recordIdx < records.Count)
            {
                AppendInsertHeader(sb, tableName, template);

                var recordsInStatement = 0;
                var packetFull = false;
                for (; recordIdx < records.Count; recordIdx++)
                {
                    var record = records[recordIdx];
                    // Column validation.
                    if (record.Headers.Count != columnCount)
                    {
                        throw new InvalidOperationException($"数据异常,数据列数量出现冲突,表名:{tableName}");
                    }

                    var recordSb = BuildValueTuple(record, template);
                    if (sb.Length + recordSb.Length + StatementReserve > maxAllowPacket)
                    {
                        if (recordsInStatement == 0)
                        {
                            // The record alone exceeds the limit: emit it anyway and consume it,
                            // otherwise the same record would be retried forever.
                            sb.Append(recordSb);
                            recordsInStatement++;
                            recordIdx++;
                            _logger.LogWarning("{Table}表单条数据的SQL超出了配置的MaxAllowedPacket字符数{Count}", tableName,
                                sb.Length + StatementReserve);
                        }

                        // Close the current statement; an unconsumed record starts the next one.
                        packetFull = true;
                        break;
                    }

                    if (recordsInStatement > 0)
                        sb.Append(',').AppendLine();
                    sb.Append(recordSb); // StringBuilder.Append(StringBuilder) does not allocate an intermediate string
                    recordsInStatement++;
                }

                TryAddForUpdateSuffix(tableName, sb);
                if (packetFull && recordIdx < records.Count)
                {
                    // More records of this table follow in another statement.
                    sb.Append(';').AppendLine();
                    sb.Append("SET AUTOCOMMIT = 1;");
                }
                else
                {
                    // Last statement of this table.
                    sb.Append(';');
                    sb.Append("COMMIT;");
                }

                yield return sb.ToString();
                sb.Clear();
            }
        }
    }

    /// <summary>
    /// Appends "INSERT INTO `table`(`c1`,`c2`,...) VALUES " using the column order of
    /// <paramref name="template"/>.
    /// </summary>
    private static void AppendInsertHeader(StringBuilder sb, string tableName, DataRecord template)
    {
        var headers = template.Headers;
        sb.Append($"INSERT INTO `{tableName}`(");
        for (var i = 0; i < headers.Count; i++)
        {
            if (i > 0)
                sb.Append(',');
            sb.Append($"`{headers[i]}`");
        }

        sb.Append(") VALUES ");
    }

    /// <summary>
    /// Renders one record as a parenthesised value tuple, reading the fields in the column
    /// order of <paramref name="template"/> and applying the per-column type handling.
    /// </summary>
    private StringBuilder BuildValueTuple(DataRecord record, DataRecord template)
    {
        var headers = template.Headers;
        var tuple = new StringBuilder();
        tuple.Append('(');
        for (var idx = 0; idx < headers.Count; idx++)
        {
            if (idx > 0)
                tuple.Append(',');

            var header = headers[idx];
            // TODO: the lookup by header name could be optimised
            var field = record[header];

            // MyDumper exports NULL as the two characters '\N' (the backslash is not an escape character).
            if (field.Length == 2 && field == ConstVar.MyDumperNull)
            {
                tuple.Append(ConstVar.Null);
                continue;
            }

            switch (_options.Value.GetColumnType(record.TableName, header))
            {
                case ColumnType.Text:
                    if (string.IsNullOrEmpty(field))
                        tuple.Append("''");
                    else if (field == ConstVar.Null)
                        tuple.Append(ConstVar.Null);
                    else
                        tuple.Append($"_utf8mb4 0x{field}");
                    break;
                case ColumnType.Blob:
                    if (string.IsNullOrEmpty(field))
                        tuple.Append("''");
                    else if (field == ConstVar.Null)
                        tuple.Append(ConstVar.Null);
                    else
                        tuple.Append($"0x{field}");
                    break;
                case ColumnType.Json:
                    // Mydumper v0.16.7-5 exports JSON as a string with escaped commas, which needs adapting.
                    if (string.IsNullOrEmpty(field))
                        tuple.Append(ConstVar.Null);
                    else if (_options.Value.TreatJsonAsHex)
                        tuple.Append($"_utf8mb4 0x{field}");
                    else
                        tuple.Append(field.Replace("\\,", ","));
                    break;
                case ColumnType.UnDefine:
                default:
                    tuple.Append(field);
                    break;
            }
        }

        tuple.Append(')');
        return tuple;
    }

    /// <summary>
    /// Appends the "AS new ON DUPLICATE KEY UPDATE ..." clause when the options define one
    /// for the table. The statement in <paramref name="sb"/> must target that single table.
    /// </summary>
    /// <param name="tableName">Table the statement being built inserts into.</param>
    /// <param name="sb">Builder holding the statement; the clause is appended to it.</param>
    private void TryAddForUpdateSuffix(string tableName, StringBuilder sb)
    {
        var forUpdate = _options.Value.TryGetForUpdate(tableName, out var forUpdateSql);
        if (forUpdate)
        {
            sb.AppendLine($"""
            AS new
            ON DUPLICATE KEY UPDATE
            {forUpdateSql}
            """);
        }
    }

    /// <summary>
    /// Closes and disposes the connection and drops any records still cached.
    /// </summary>
    public void Dispose()
    {
        _conn.Close();
        _conn.Dispose();
        _recordCache.Clear();
    }

    /// <summary>
    /// Asynchronously closes and disposes the connection and drops any records still cached.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        await _conn.CloseAsync();
        await _conn.DisposeAsync();
        _recordCache.Clear();
    }
}