添加异常记录器,记录输出时发生异常的SQL;

This commit is contained in:
陈梓阳 2024-01-18 15:03:45 +08:00
parent 1f9c9e0c13
commit 6ec782ec93
4 changed files with 144 additions and 12 deletions

View File

@ -5,8 +5,6 @@ using ConsoleApp2.Services;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using MySqlConnector;
using System.Threading;
namespace ConsoleApp2.HostedServices; namespace ConsoleApp2.HostedServices;
@ -21,13 +19,15 @@ public class OutputService : IOutputService
private readonly IOptions<DataTransformOptions> _transformOptions; private readonly IOptions<DataTransformOptions> _transformOptions;
private readonly ProcessContext _context; private readonly ProcessContext _context;
private readonly TaskManager _taskManager; private readonly TaskManager _taskManager;
private readonly ErrorRecorder _errorRecorder;
public OutputService(ILogger<OutputService> logger, public OutputService(ILogger<OutputService> logger,
[FromKeyedServices(ProcessStep.Consumer)] DataRecordQueue consumerQueue, [FromKeyedServices(ProcessStep.Consumer)] DataRecordQueue consumerQueue,
IOptions<DatabaseOutputOptions> outputOptions, IOptions<DatabaseOutputOptions> outputOptions,
ProcessContext context, ProcessContext context,
TaskManager taskManager, TaskManager taskManager,
IOptions<DataTransformOptions> transformOptions) IOptions<DataTransformOptions> transformOptions,
ErrorRecorder errorRecorder)
{ {
_logger = logger; _logger = logger;
_consumerQueue = consumerQueue; _consumerQueue = consumerQueue;
@ -35,6 +35,7 @@ public class OutputService : IOutputService
_context = context; _context = context;
_taskManager = taskManager; _taskManager = taskManager;
_transformOptions = transformOptions; _transformOptions = transformOptions;
_errorRecorder = errorRecorder;
} }
public async Task ExecuteAsync(CancellationToken cancellationToken) public async Task ExecuteAsync(CancellationToken cancellationToken)
@ -78,7 +79,7 @@ public class OutputService : IOutputService
var count = 0; var count = 0;
await using var output = new MySqlDestination( await using var output = new MySqlDestination(
_outputOptions.Value.ConnectionString ?? throw new InvalidOperationException("Connection string is required"), _outputOptions.Value.ConnectionString ?? throw new InvalidOperationException("Connection string is required"),
_logger, _context, _transformOptions); _logger, _context, _transformOptions, _errorRecorder);
//if (records == null || records.Count() == 0) return; //if (records == null || records.Count() == 0) return;
//var dbName = $"cferp_test_1"; //var dbName = $"cferp_test_1";
//if (records != null && records.Count() > 0) //if (records != null && records.Count() > 0)

View File

@ -469,6 +469,7 @@ async Task RunProgram()
options.TaskCount = commandOptions.TaskCount; options.TaskCount = commandOptions.TaskCount;
options.FlushCount = commandOptions.FlushCount; options.FlushCount = commandOptions.FlushCount;
}); });
host.Services.AddLogging(builder => host.Services.AddLogging(builder =>
{ {
builder.ClearProviders(); builder.ClearProviders();
@ -484,6 +485,7 @@ async Task RunProgram()
host.Services.AddKeyedSingleton<DataRecordQueue>(ProcessStep.Producer); host.Services.AddKeyedSingleton<DataRecordQueue>(ProcessStep.Producer);
host.Services.AddKeyedSingleton<DataRecordQueue>(ProcessStep.Consumer); host.Services.AddKeyedSingleton<DataRecordQueue>(ProcessStep.Consumer);
host.Services.AddTransient<TaskManager>(); host.Services.AddTransient<TaskManager>();
host.Services.AddSingleton<ErrorRecorder>();
host.Services.AddHostedService<MainHostedService>(); host.Services.AddHostedService<MainHostedService>();
host.Services.AddHostedService<TaskMonitorService>(); host.Services.AddHostedService<TaskMonitorService>();

View File

@ -0,0 +1,104 @@
using System.Text;
using Microsoft.Extensions.Logging;
namespace ConsoleApp2.Services;
public class ErrorRecorder
{
private readonly string _outputDir = "./ErrorRecords";
private readonly ILogger _logger;
private readonly Dictionary<string, int> _logIndex = new();
/// <summary>
/// 当次执行标识
/// </summary>
private static readonly string UID = DateTime.Now.ToString("yyyy-MM-dd HH-mm-ss");
public ErrorRecorder(ILogger<ErrorRecorder> logger)
{
_logger = logger;
var dir = Path.Combine(_outputDir, UID);
if (!Directory.Exists(dir))
{
Directory.CreateDirectory(dir);
}
}
/// <summary>
/// 记录已知表名发生错误的SQL
/// </summary>
/// <param name="commandText"></param>
/// <param name="tableName"></param>
/// <param name="exception"></param>
public async Task LogErrorSqlAsync(string commandText, string tableName, Exception exception)
{
if (!_logIndex.TryGetValue(tableName, out var idx))
{
idx = 0;
_logIndex.Add(tableName, idx);
}
var filePath = Path.Combine(_outputDir, UID, $"{tableName}-{idx}.errlog");
if (File.Exists(filePath) && new FileInfo(filePath).Length > 10 * 1024 * 1024)
{
++idx;
_logIndex[tableName] = idx;
filePath = Path.Combine(_outputDir, UID, $"{tableName}-{idx}.errlog");
}
var content = $"""
/* [{DateTime.Now:yyyy-MM-dd HH:mm:ss}]
* Error occurred when export table '{tableName}':
* {exception.Message}
*/
{commandText}
""";
await File.AppendAllTextAsync(filePath, content, Encoding.UTF8);
}
/// <summary>
/// 记录发生错误的SQL
/// </summary>
/// <param name="commandText"></param>
/// <param name="exception"></param>
public async Task LogErrorSqlAsync(string commandText, Exception exception)
{
var filePath = Path.Combine(_outputDir, UID, "UnknownTables.errlog");
var content = $"""
/* [{DateTime.Now:yyyy-MM-dd HH:mm:ss}]
* Error occurred when export table with unknown table name:
* {exception.Message}
*/
{commandText}
""";
await File.AppendAllTextAsync(filePath, content, Encoding.UTF8);
}
public async Task LogErrorRecordsAsync(IDictionary<string, DataRecord> records, Exception exception)
{
var pathDict = new Dictionary<string, string>();
foreach (var pair in records)
{
if(!pathDict.TryGetValue(pair.Key, out var path))
{
path = Path.Combine(_outputDir, UID, "ErrorRecords", $"{pair.Key}.errlog");
pathDict.Add(pair.Key, path);
}
//
await File.AppendAllTextAsync(path, string.Join(',', pair.Value.Fields));
}
}
public void ClearErrorRecords()
{
_logger.LogInformation("***** Clear error records *****");
foreach (var file in Directory.GetFiles(_outputDir, "*.errlog", SearchOption.AllDirectories))
{
File.Delete(file);
}
}
}

View File

@ -1,4 +1,6 @@
using System.Text; using System.Data.Common;
using System.Text;
using System.Text.RegularExpressions;
using ConsoleApp2.Helpers; using ConsoleApp2.Helpers;
using ConsoleApp2.Options; using ConsoleApp2.Options;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
@ -11,14 +13,21 @@ namespace ConsoleApp2.Services;
/// <summary> /// <summary>
/// Mysql导出 /// Mysql导出
/// </summary> /// </summary>
public class MySqlDestination : IDisposable, IAsyncDisposable public partial class MySqlDestination : IDisposable, IAsyncDisposable
{ {
private readonly Dictionary<string, IList<DataRecord>> _recordCache; private readonly Dictionary<string, IList<DataRecord>> _recordCache;
private readonly MySqlConnection _conn; private readonly MySqlConnection _conn;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly ProcessContext _context; private readonly ProcessContext _context;
private readonly IOptions<DataTransformOptions> _transformOptions; private readonly IOptions<DataTransformOptions> _transformOptions;
public MySqlDestination(string connStr, ILogger logger, ProcessContext context, IOptions<DataTransformOptions> transformOptions) private readonly ErrorRecorder _errorRecorder;
public MySqlDestination(
string connStr,
ILogger logger,
ProcessContext context,
IOptions<DataTransformOptions> transformOptions,
ErrorRecorder errorRecorder)
{ {
_conn = new MySqlConnection(connStr); _conn = new MySqlConnection(connStr);
_conn.Open(); _conn.Open();
@ -26,6 +35,7 @@ public class MySqlDestination : IDisposable, IAsyncDisposable
_logger = logger; _logger = logger;
_context = context; _context = context;
_transformOptions = transformOptions; _transformOptions = transformOptions;
_errorRecorder = errorRecorder;
} }
public Task WriteRecordAsync(DataRecord record) public Task WriteRecordAsync(DataRecord record)
@ -60,22 +70,37 @@ public class MySqlDestination : IDisposable, IAsyncDisposable
foreach (var insertSql in excuseList) foreach (var insertSql in excuseList)
{ {
cmd.CommandText = insertSql; cmd.CommandText = insertSql;
await cmd.ExecuteNonQueryAsync(); try
{
await cmd.ExecuteNonQueryAsync();
}
catch (Exception e)
{
_logger.LogCritical(e, "Error when flushing records, sql: {Sql}", cmd.CommandText.Omit(1000));
var match = MatchTableName().Match(cmd.CommandText);
if (match is { Success: true, Groups.Count: > 1 })
{
var tableName = match.Groups[1].Value;
await _errorRecorder.LogErrorSqlAsync(cmd.CommandText, tableName, e);
}
else await _errorRecorder.LogErrorSqlAsync(cmd.CommandText, e);
}
} }
_recordCache.Clear(); _recordCache.Clear();
} }
catch (Exception e) catch (Exception e)
{ {
_logger.LogCritical(e, "Error when flushing records, sql: {Sql}", cmd.CommandText.Omit(1000)); _logger.LogCritical(e, "Error when serialize records, record:");
_context.AddException(e);
throw;
} }
finally finally
{ {
await cmd.DisposeAsync(); await cmd.DisposeAsync();
} }
} }
[GeneratedRegex("INSERT INTO `([^`]+)`")]
private static partial Regex MatchTableName();
public IEnumerable<string> GetExcuseList(IDictionary<string, IList<DataRecord>> tableRecords,int maxAllowPacket) public IEnumerable<string> GetExcuseList(IDictionary<string, IList<DataRecord>> tableRecords,int maxAllowPacket)
{ {