using System.Data.Common;
using System.Text;
using System.Text.RegularExpressions;
using ConsoleApp2.Helpers;
using ConsoleApp2.Options;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MySqlConnector;
using ServiceStack;
namespace ConsoleApp2.Services;
/// <summary>
/// MySQL export destination: buffers records per table in memory and writes them
/// to the database as batched multi-row <c>INSERT</c> statements.
/// </summary>
public partial class MySqlDestination : IDisposable, IAsyncDisposable
{
    /// <summary>Command timeout, in seconds, applied to the flush command (3 minutes).</summary>
    private const int FlushCommandTimeoutSeconds = 3 * 60;

    /// <summary>Field text that is written to SQL as <c>NULL</c>.</summary>
    private const string NullFieldMarker = "\\N";

    private readonly Dictionary> _recordCache;
    private readonly MySqlConnection _conn;
    private readonly ILogger _logger;
    private readonly ProcessContext _context;
    private readonly IOptions _transformOptions;
    private readonly ErrorRecorder _errorRecorder;

    /// <summary>
    /// Creates the destination and opens the MySQL connection immediately.
    /// </summary>
    /// <param name="connStr">Connection string passed to <see cref="MySqlConnection"/>.</param>
    /// <param name="logger">Logger used to report flush failures.</param>
    /// <param name="context">Process context that collects the exceptions raised while flushing.</param>
    /// <param name="transformOptions">Options providing the per-column type lookup and the optional value transform.</param>
    /// <param name="errorRecorder">Recorder that receives every SQL statement that failed to execute.</param>
    public MySqlDestination(
        string connStr,
        ILogger logger,
        ProcessContext context,
        IOptions transformOptions,
        ErrorRecorder errorRecorder)
    {
        _conn = new MySqlConnection(connStr);
        _conn.Open();
        _recordCache = new();
        _logger = logger;
        _context = context;
        _transformOptions = transformOptions;
        _errorRecorder = errorRecorder;
    }

    /// <summary>
    /// Adds a record to the in-memory cache under its table name.
    /// Nothing is sent to the database until <see cref="FlushAsync"/> is called.
    /// </summary>
    /// <param name="record">The record to buffer.</param>
    /// <returns>An already completed task; the method does no asynchronous work.</returns>
    public Task WriteRecordAsync(DataRecord record)
    {
        // AddOrUpdate is a helper defined outside this file: it is given a new
        // single-element list for an unseen table, and a callback that appends
        // to the existing list otherwise.
        _recordCache.AddOrUpdate(record.TableName, [record], (key, value) =>
        {
            value.Add(record);
            return value;
        });
        return Task.CompletedTask;
    }

    /// <summary>
    /// Buffers every record of <paramref name="records"/> via <see cref="WriteRecordAsync"/>.
    /// </summary>
    /// <param name="records">The records to buffer.</param>
    public async Task WriteRecordsAsync(IEnumerable records)
    {
        foreach (var record in records)
        {
            await WriteRecordAsync(record);
        }
    }

    /// <summary>
    /// Converts the cached records into INSERT statements and executes them one by one.
    /// </summary>
    /// <remarks>
    /// A statement that fails to execute is logged, added to the process context and
    /// handed to the error recorder; the remaining statements are still executed and
    /// the cache is cleared afterwards. If an exception escapes that handling (for
    /// example while the statements are being built), it is logged and added to the
    /// process context, and the cache is left untouched.
    /// </remarks>
    /// <param name="maxAllowPacket">Upper bound for the length of a single statement; see <see cref="GetExcuseList"/>.</param>
    public async Task FlushAsync(int maxAllowPacket)
    {
        if (_recordCache.Count == 0)
        {
            return;
        }

        // Disposed asynchronously when the method exits, on every path.
        await using var cmd = _conn.CreateCommand();
        cmd.CommandTimeout = FlushCommandTimeoutSeconds;
        try
        {
            // ToList() builds every statement up front, so an exception thrown while
            // building reaches the outer catch before any statement has been executed.
            var excuseList = GetExcuseList(_recordCache, maxAllowPacket).ToList();
            foreach (var insertSql in excuseList)
            {
                cmd.CommandText = insertSql;
                try
                {
                    await cmd.ExecuteNonQueryAsync();
                }
                catch (Exception e)
                {
                    await ReportFailedStatementAsync(cmd.CommandText, e);
                }
            }

            _recordCache.Clear();
        }
        catch (Exception e)
        {
            _logger.LogCritical(e, "Error when serialize records, record:");
            _context.AddException(e);
        }
    }

    /// <summary>
    /// Logs a statement that failed to execute and forwards it to the error recorder,
    /// together with the table name when it can be extracted from the statement.
    /// </summary>
    private async Task ReportFailedStatementAsync(string sql, Exception e)
    {
        _logger.LogCritical(e, "Error when flushing records, sql: {Sql}", sql.Omit(1000));
        _context.AddException(e);

        var match = MatchTableName().Match(sql);
        if (match is { Success: true, Groups.Count: > 1 })
        {
            var tableName = match.Groups[1].Value;
            await _errorRecorder.LogErrorSqlAsync(sql, tableName, e);
        }
        else
        {
            await _errorRecorder.LogErrorSqlAsync(sql, e);
        }
    }

    /// <summary>Captures the back-quoted table name that follows <c>INSERT INTO</c>.</summary>
    [GeneratedRegex("INSERT INTO `([^`]+)`")]
    private static partial Regex MatchTableName();

    /// <summary>
    /// Lazily builds the multi-row INSERT statements for the given records, one table
    /// at a time, starting a new statement whenever the next row would push the current
    /// one past <paramref name="maxAllowPacket"/>.
    /// </summary>
    /// <remarks>
    /// The column list of every statement is taken from the first record of the table.
    /// A statement always contains at least one row: a single row that is larger than
    /// the limit is emitted on its own instead of producing an empty
    /// <c>VALUES</c> list and retrying the same row forever.
    /// NOTE(review): the limit is compared against the character count of the statement,
    /// which only approximates its size in bytes - confirm what the caller passes in.
    /// </remarks>
    /// <param name="tableRecords">Records to serialize, grouped by table name. Tables without records are skipped.</param>
    /// <param name="maxAllowPacket">Maximum statement length, in characters.</param>
    /// <returns>The SQL text of each INSERT statement, each terminated by <c>;</c>.</returns>
    public IEnumerable GetExcuseList(IDictionary> tableRecords,int maxAllowPacket)
    {
        var sb = new StringBuilder();
        foreach (var (tableName, records) in tableRecords)
        {
            if (records.Count == 0)
            {
                continue;
            }

            var recordIdx = 0;

            // Every pass emits exactly one statement. recordIdx is kept across passes,
            // so a row that did not fit becomes the first row of the next statement.
            while (recordIdx < records.Count)
            {
                // INSERT INTO `table`(`col1`,`col2`,...) VALUES
                var headers = records[0].Headers;
                sb.Append($"INSERT INTO `{tableName}`(");
                for (var i = 0; i < headers.Length; i++)
                {
                    if (i > 0)
                    {
                        sb.Append(',');
                    }

                    sb.Append($"`{headers[i]}`");
                }

                sb.Append(") VALUES ");

                // (v1,v2,...),
                // (v1,v2,...)
                var rowsInStatement = 0;
                for (; recordIdx < records.Count; recordIdx++)
                {
                    var recordSb = BuildValueTuple(records[recordIdx]);

                    // Close the statement when this row would exceed the limit - but only
                    // if the statement already holds a row, so progress is guaranteed.
                    if (rowsInStatement > 0 && sb.Length + recordSb.Length + 1 > maxAllowPacket)
                    {
                        break;
                    }

                    if (rowsInStatement > 0)
                    {
                        sb.Append(',').AppendLine();
                    }

                    // StringBuilder.Append(StringBuilder) copies the characters without
                    // creating an intermediate string.
                    sb.Append(recordSb);
                    rowsInStatement++;
                }

                sb.Append(';');
                yield return sb.ToString();
                sb.Clear();
            }
        }
    }

    /// <summary>
    /// Renders one record as a parenthesized, comma-separated value tuple: <c>(v1,v2,...)</c>.
    /// </summary>
    private StringBuilder BuildValueTuple(DataRecord record)
    {
        var tuple = new StringBuilder();
        tuple.Append('(');
        for (var fieldIdx = 0; fieldIdx < record.Fields.Length; fieldIdx++)
        {
            if (fieldIdx > 0)
            {
                tuple.Append(',');
            }

            AppendFieldValue(tuple, record, fieldIdx);
        }

        tuple.Append(')');
        return tuple;
    }

    /// <summary>
    /// Appends the SQL text of a single field, applying the special handling that
    /// depends on the configured column type.
    /// </summary>
    /// <remarks>
    /// NOTE(review): apart from the cases below, values are written verbatim, so they are
    /// presumably already quoted/escaped by whatever produced the record - confirm.
    /// </remarks>
    private void AppendFieldValue(StringBuilder target, DataRecord record, int fieldIdx)
    {
        var field = record.Fields[fieldIdx];
        if (field == NullFieldMarker)
        {
            target.Append("NULL");
            return;
        }

        var options = _transformOptions.Value;
        switch (options.GetColumnType(record.TableName, record.Headers[fieldIdx]))
        {
            case ColumnType.Text:
                // Empty text becomes an empty SQL string; otherwise the optional transform
                // is applied, falling back to the raw field when it is absent or yields null.
                target.Append(string.IsNullOrEmpty(field)
                    ? "''"
                    : options.TransformBinary?.Invoke(field) ?? field);
                break;
            case ColumnType.Blob:
                // Non-empty blobs are written as a 0x-prefixed literal
                // (assumes the field already holds hex digits - TODO confirm).
                if (string.IsNullOrEmpty(field))
                {
                    target.Append("''");
                }
                else
                {
                    target.Append($"0x{field}");
                }

                break;
            case ColumnType.Json:
                // Empty JSON becomes the double-quoted text []; otherwise same as Text.
                // NOTE(review): Text and Json both go through TransformBinary - confirm intended.
                target.Append(string.IsNullOrEmpty(field)
                    ? "\"[]\""
                    : options.TransformBinary?.Invoke(field) ?? field);
                break;
            case ColumnType.UnDefine:
            default:
                target.Append(field);
                break;
        }
    }

    /// <summary>Closes and disposes the underlying connection.</summary>
    public void Dispose()
    {
        _conn.Close();
        _conn.Dispose();
    }

    /// <summary>Asynchronously closes and disposes the underlying connection.</summary>
    public async ValueTask DisposeAsync()
    {
        await _conn.CloseAsync();
        await _conn.DisposeAsync();
    }
}