diff --git a/ConsoleApp2/HostedServices/OutputService.cs b/ConsoleApp2/HostedServices/OutputService.cs index a6f2ccd..563f9b5 100644 --- a/ConsoleApp2/HostedServices/OutputService.cs +++ b/ConsoleApp2/HostedServices/OutputService.cs @@ -5,8 +5,6 @@ using ConsoleApp2.Services; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; -using MySqlConnector; -using System.Threading; namespace ConsoleApp2.HostedServices; @@ -21,13 +19,15 @@ public class OutputService : IOutputService private readonly IOptions _transformOptions; private readonly ProcessContext _context; private readonly TaskManager _taskManager; + private readonly ErrorRecorder _errorRecorder; public OutputService(ILogger logger, [FromKeyedServices(ProcessStep.Consumer)] DataRecordQueue consumerQueue, IOptions outputOptions, ProcessContext context, TaskManager taskManager, - IOptions transformOptions) + IOptions transformOptions, + ErrorRecorder errorRecorder) { _logger = logger; _consumerQueue = consumerQueue; @@ -35,6 +35,7 @@ public class OutputService : IOutputService _context = context; _taskManager = taskManager; _transformOptions = transformOptions; + _errorRecorder = errorRecorder; } public async Task ExecuteAsync(CancellationToken cancellationToken) @@ -78,7 +79,7 @@ public class OutputService : IOutputService var count = 0; await using var output = new MySqlDestination( _outputOptions.Value.ConnectionString ?? throw new InvalidOperationException("Connection string is required"), - _logger, _context, _transformOptions); + _logger, _context, _transformOptions, _errorRecorder); //if (records == null || records.Count() == 0) return; //var dbName = $"cferp_test_1"; //if (records != null && records.Count() > 0) diff --git a/ConsoleApp2/Program.cs b/ConsoleApp2/Program.cs index 891e5dd..40d65f8 100644 --- a/ConsoleApp2/Program.cs +++ b/ConsoleApp2/Program.cs @@ -469,6 +469,7 @@ async Task RunProgram() options.TaskCount = commandOptions.TaskCount; options.FlushCount = commandOptions.FlushCount; }); + host.Services.AddLogging(builder => { builder.ClearProviders(); @@ -484,6 +485,7 @@ async Task RunProgram() host.Services.AddKeyedSingleton(ProcessStep.Producer); host.Services.AddKeyedSingleton(ProcessStep.Consumer); host.Services.AddTransient(); + host.Services.AddSingleton(); host.Services.AddHostedService(); host.Services.AddHostedService(); diff --git a/ConsoleApp2/Services/ErrorRecorder.cs b/ConsoleApp2/Services/ErrorRecorder.cs new file mode 100644 index 0000000..dd24f12 --- /dev/null +++ b/ConsoleApp2/Services/ErrorRecorder.cs @@ -0,0 +1,104 @@ +using System.Text; +using Microsoft.Extensions.Logging; + +namespace ConsoleApp2.Services; + +public class ErrorRecorder +{ + private readonly string _outputDir = "./ErrorRecords"; + private readonly ILogger _logger; + private readonly Dictionary _logIndex = new(); + + /// + /// 当次执行标识 + /// + private static readonly string UID = DateTime.Now.ToString("yyyy-MM-dd HH-mm-ss"); + + public ErrorRecorder(ILogger logger) + { + _logger = logger; + var dir = Path.Combine(_outputDir, UID); + if (!Directory.Exists(dir)) + { + Directory.CreateDirectory(dir); + } + } + + /// + /// 记录已知表名发生错误的SQL + /// + /// + /// + /// + public async Task LogErrorSqlAsync(string commandText, string tableName, Exception exception) + { + if (!_logIndex.TryGetValue(tableName, out var idx)) + { + idx = 0; + _logIndex.Add(tableName, idx); + } + var filePath = Path.Combine(_outputDir, UID, $"{tableName}-{idx}.errlog"); + + if (File.Exists(filePath) && new FileInfo(filePath).Length > 10 * 1024 * 1024) + { + ++idx; + _logIndex[tableName] = idx; + filePath = Path.Combine(_outputDir, UID, $"{tableName}-{idx}.errlog"); + } + var content = $""" + /* [{DateTime.Now:yyyy-MM-dd HH:mm:ss}] + * Error occurred when export table '{tableName}': + * {exception.Message} + */ + + {commandText} + + + """; + await File.AppendAllTextAsync(filePath, content, Encoding.UTF8); + } + + /// + /// 记录发生错误的SQL + /// + /// + /// + public async Task LogErrorSqlAsync(string commandText, Exception exception) + { + var filePath = Path.Combine(_outputDir, UID, "UnknownTables.errlog"); + var content = $""" + /* [{DateTime.Now:yyyy-MM-dd HH:mm:ss}] + * Error occurred when export table with unknown table name: + * {exception.Message} + */ + {commandText} + + + """; + await File.AppendAllTextAsync(filePath, content, Encoding.UTF8); + } + + public async Task LogErrorRecordsAsync(IDictionary records, Exception exception) + { + var pathDict = new Dictionary(); + foreach (var pair in records) + { + if(!pathDict.TryGetValue(pair.Key, out var path)) + { + path = Path.Combine(_outputDir, UID, "ErrorRecords", $"{pair.Key}.errlog"); + pathDict.Add(pair.Key, path); + } + // + await File.AppendAllTextAsync(path, string.Join(',', pair.Value.Fields)); + } + } + + public void ClearErrorRecords() + { + _logger.LogInformation("***** Clear error records *****"); + foreach (var file in Directory.GetFiles(_outputDir, "*.errlog", SearchOption.AllDirectories)) + { + File.Delete(file); + } + } +} \ No newline at end of file diff --git a/ConsoleApp2/Services/MySqlDestination.cs b/ConsoleApp2/Services/MySqlDestination.cs index dab0df6..8f38a8e 100644 --- a/ConsoleApp2/Services/MySqlDestination.cs +++ b/ConsoleApp2/Services/MySqlDestination.cs @@ -1,4 +1,6 @@ -using System.Text; +using System.Data.Common; +using System.Text; +using System.Text.RegularExpressions; using ConsoleApp2.Helpers; using ConsoleApp2.Options; using Microsoft.Extensions.Logging; @@ -11,14 +13,21 @@ namespace ConsoleApp2.Services; /// /// Mysql导出 /// -public class MySqlDestination : IDisposable, IAsyncDisposable +public partial class MySqlDestination : IDisposable, IAsyncDisposable { private readonly Dictionary> _recordCache; private readonly MySqlConnection _conn; private readonly ILogger _logger; private readonly ProcessContext _context; private readonly IOptions _transformOptions; - public MySqlDestination(string connStr, ILogger logger, ProcessContext context, IOptions transformOptions) + private readonly ErrorRecorder _errorRecorder; + + public MySqlDestination( + string connStr, + ILogger logger, + ProcessContext context, + IOptions transformOptions, + ErrorRecorder errorRecorder) { _conn = new MySqlConnection(connStr); _conn.Open(); @@ -26,6 +35,7 @@ public class MySqlDestination : IDisposable, IAsyncDisposable _logger = logger; _context = context; _transformOptions = transformOptions; + _errorRecorder = errorRecorder; } public Task WriteRecordAsync(DataRecord record) @@ -60,22 +70,37 @@ public class MySqlDestination : IDisposable, IAsyncDisposable foreach (var insertSql in excuseList) { cmd.CommandText = insertSql; - await cmd.ExecuteNonQueryAsync(); - + try + { + await cmd.ExecuteNonQueryAsync(); + } + catch (Exception e) + { + _logger.LogCritical(e, "Error when flushing records, sql: {Sql}", cmd.CommandText.Omit(1000)); + + var match = MatchTableName().Match(cmd.CommandText); + if (match is { Success: true, Groups.Count: > 1 }) + { + var tableName = match.Groups[1].Value; + await _errorRecorder.LogErrorSqlAsync(cmd.CommandText, tableName, e); + } + else await _errorRecorder.LogErrorSqlAsync(cmd.CommandText, e); + } } _recordCache.Clear(); } catch (Exception e) { - _logger.LogCritical(e, "Error when flushing records, sql: {Sql}", cmd.CommandText.Omit(1000)); - _context.AddException(e); - throw; + _logger.LogCritical(e, "Error when serialize records, record:"); } finally { await cmd.DisposeAsync(); } } + + [GeneratedRegex("INSERT INTO `([^`]+)`")] + private static partial Regex MatchTableName(); public IEnumerable GetExcuseList(IDictionary> tableRecords,int maxAllowPacket) {