This commit is contained in:
2024-01-29 09:29:16 +08:00
parent 4f96b77e55
commit 083090c62b
63 changed files with 2479 additions and 1491 deletions

View File

@@ -1,184 +0,0 @@
using System.Reflection.PortableExecutable;
using System.Text;
using System.Text.RegularExpressions;
using ConsoleApp2.Helpers;
using ConsoleApp2.HostedServices.Abstractions;
using Microsoft.Extensions.Logging;
namespace ConsoleApp2.Services;
/// <summary>
/// CSV file data source: locates the table's MySQL-dump SQL file in the input
/// directory, extracts the CSV headers / data-file names from it, and streams
/// the CSV rows as <c>DataRecord</c>s.
/// </summary>
public class CsvSource:IDataSource
{
    protected readonly string _inputDir;
    //protected readonly StreamReader _reader;
    private readonly ILogger? _logger;                 // NOTE(review): injected but never used in this class
    protected readonly string _tableName;
    protected string? _sqlFilePath;                    // dump SQL file matched for this table; null when none found
    protected readonly string? _sqlFileText;           // NOTE(review): never assigned anywhere in this class
    protected string[]? headers;                       // column names parsed from the SQL file
    protected string[]? csvFiles;                      // data file names (*.dat.zst) parsed from the SQL file
    public string? CurrentRaw { get; protected set; }
    public string Delimiter { get; private set; }
    public char QuoteChar { get; private set; }

    /// <summary>
    /// Scans <paramref name="inputDir"/> for a file matching "*.{tableName}.*.sql".
    /// </summary>
    public CsvSource(string inputDir,string tableName,string delimiter = ",", char quoteChar = '"',
        ILogger? logger = null)
    {
        _inputDir = inputDir;
        _tableName = tableName;
        _logger = logger;
        Delimiter = delimiter;
        QuoteChar = quoteChar;
        string pattern = $"^.*\\.{tableName}\\..*\\.sql$";
        // May stay null when no SQL file matches; later reads will then fail — TODO confirm callers guard this.
        _sqlFilePath = Directory.GetFiles(_inputDir).FirstOrDefault(s => Regex.Match(s, pattern).Success);
    }

    /// <summary>
    /// Splits one CSV row into fields, honoring quoting, converting MySQL's
    /// "\N" marker to the literal string "NULL", and hex-decoding JSON-hex fields.
    /// </summary>
    public string[] ParseRow(string row, char quoteChar, string delimiter)
    {
        var span = row.AsSpan();
        var result = new List<string>();
        if (span.Length == 0)
            throw new ArgumentException("The row is empty", nameof(row));
        var isInQuote = span[0] == quoteChar;
        var start = 0;
        for (var i = 1; i < span.Length; i++)
        {
            if (span[i] == quoteChar)
            {
                isInQuote = !isInQuote;
            }
            // The delimiter needs to be sufficiently distinctive (original note).
            // NOTE(review): the quote-state check is commented out, and the
            // `span.Length > i + delimiter.Length` bound means a delimiter ending
            // exactly at the last character is never matched — verify intended.
            else if (/*!isInQuote && */span.Length > i + delimiter.Length && span[i..(i + delimiter.Length)].Equals(delimiter, StringComparison.CurrentCulture)) // field matched
            {
                string field;
                if (span[start] == quoteChar && span[i - 1] == quoteChar) // enclosed by quoteChar
                    field = span[(start + 1)..(i - 1)].ToString(); // escape quoteChar
                else
                    field = span[start..i].ToString();
                start = i + delimiter.Length;
                if (field == "\\N")
                    field = "NULL";
                result.Add(field);
                continue;
            }
        }
        // Trailing field (everything after the last matched delimiter).
        result.Add(span[start..].ToString());
        // Post-pass: decode fields that look like hex-encoded JSON.
        for (var i = 0; i < result.Count; i++)
        {
            var field = result[i];
            if (DumpDataHelper.CheckHexField(field) && StringExtensions.CheckJsonHex(field))
            {
                result[i] = StringExtensions.FromHex(field);
            }
        }
        return result.ToArray();
    }

    /// <summary>
    /// Alternative character-by-character parser with backslash-escape handling.
    /// NOTE(review): only <c>delimiter[0]</c> is compared, so multi-character
    /// delimiters are effectively truncated to their first character.
    /// </summary>
    public string[] ParseRow2(ReadOnlySpan<char> source, char quoteChar, string delimiter)
    {
        var result = new List<string>();
        var index = -1;
        StringBuilder current = new StringBuilder();
        bool hasQuote = false;   // inside a quoted section
        bool hasSlash = false;   // previous char was an unescaped backslash
        while (index < source.Length-1)
        {
            index++;
            if (hasSlash == false && source[index] == '\\')
            {
                hasSlash = true;
                current.Append('\\');
                continue;
            }
            if (hasSlash ==false && source[index] == quoteChar)
            {
                hasQuote = !hasQuote;
                current.Append(source[index]);
                continue;
            }
            if (hasQuote==false && source[index] == delimiter[0])
            {
                result.Add(current.ToString());
                current.Clear();
            }
            else
            {
                current.Append(source[index]);
            }
            hasSlash = false;
        }
        result.Add(current.ToString());
        return result.ToArray();
    }

    /// <summary>
    /// Reads the table's SQL file and extracts CSV headers plus *.dat.zst file names.
    /// NOTE(review): throws if <c>_sqlFilePath</c> is null (no SQL file was found).
    /// </summary>
    public virtual async Task GetHeaderAndCsvFiles()
    {
        var text = await File.ReadAllTextAsync(_sqlFilePath);
        headers = await DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text);
        csvFiles = await DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text, new Regex(@"'.+\.dat.zst'"));
    }

    /// <summary>
    /// Streams every row of every CSV file for this table through <paramref name="action"/>.
    /// </summary>
    public virtual async Task DoEnqueue(Action<DataRecord> action)
    {
        await GetHeaderAndCsvFiles();
        foreach (var file in csvFiles)
        {
            var filePath= Path.Combine(_inputDir, file);
            using (var fs = File.OpenRead(filePath))
            {
                using (StreamReader sr = new StreamReader(fs))
                {
                    while (!sr.EndOfStream)
                    {
                        var line = await sr.ReadLineAsync();
                        var fields = ParseRow2(line, QuoteChar, Delimiter);
                        var record = new DataRecord(fields, _tableName, headers);
                        action?.Invoke(record);
                    }
                }
            }
        }
    }

    /// <summary>
    /// Returns the first record of the first CSV file, or null when there are no files.
    /// </summary>
    public virtual async Task<DataRecord?> GetTestRecord()
    {
        await GetHeaderAndCsvFiles();
        var file = csvFiles.FirstOrDefault();
        if (file != null)
        {
            var filePath = Path.Combine(_inputDir, file);
            using (var fs = File.OpenRead(filePath))
            {
                using (StreamReader sr = new StreamReader(fs))
                {
                    var line = await sr.ReadLineAsync();
                    var fields = ParseRow2(line, QuoteChar, Delimiter);
                    var record = new DataRecord(fields, _tableName, headers);
                    return record;
                }
            }
        }
        return null;
    }

    public void Dispose()
    {
        // _reader.Dispose();
    }
}

View File

@@ -17,9 +17,13 @@ public class DataRecordQueue : IDisposable
public event Action? OnRecordWrite;
public event Action? OnRecordRead;
public DataRecordQueue()
public DataRecordQueue() : this(1000000) // 默认容量最大1M
{
_queue = new BlockingCollection<DataRecord>(2000_000); // 队列最长为20W条记录
}
public DataRecordQueue(int boundedCapacity)
{
_queue = new BlockingCollection<DataRecord>(boundedCapacity);
}
public bool TryDequeue([MaybeNullWhen(false)] out DataRecord record)

View File

@@ -0,0 +1,106 @@
using System.Text;
using ConsoleApp2.HostedServices.Abstractions;
using Microsoft.Extensions.Logging;
namespace ConsoleApp2.Services.ETL;
/// <summary>
/// Reads CSV rows from a stream or file and materializes each row as a <c>DataRecord</c>.
/// Supports backslash escapes and quoted fields; subclasses (e.g. ZstReader) may
/// substitute their own decompressing <see cref="StreamReader"/>.
/// </summary>
public class CsvReader : IDataReader
{
    protected readonly string? FilePath;
    protected readonly Lazy<StreamReader> Reader;
    protected readonly ILogger? Logger;
    protected readonly string TableName;

    public DataRecord Current { get; protected set; } = null!;
    public string[] Headers { get; }
    public string? CurrentRaw { get; protected set; }
    public string Delimiter { get; }
    public char QuoteChar { get; }

    public CsvReader(Stream stream, string tableName, string[] headers, string delimiter = ",", char quoteChar = '"', ILogger? logger = null)
        : this(tableName, headers, delimiter, quoteChar, logger)
    {
        Reader = new Lazy<StreamReader>(() => new StreamReader(stream));
    }

    public CsvReader(string filePath, string tableName, string[] headers, string delimiter = ",", char quoteChar = '"', ILogger? logger = null)
        : this(tableName, headers, delimiter, quoteChar, logger)
    {
        FilePath = filePath;
        // FIX: open the file lazily. The previous code called File.OpenRead eagerly,
        // which leaked an unused FileStream whenever a subclass hid Reader with its
        // own decompressing reader (the Lazy was never materialized or disposed).
        Reader = new Lazy<StreamReader>(() => new StreamReader(File.OpenRead(filePath)));
    }

    private CsvReader(string tableName, string[] headers, string delimiter = ",", char quoteChar = '"', ILogger? logger = null)
    {
        TableName = tableName;
        Headers = headers;
        Logger = logger;
        Delimiter = delimiter;
        QuoteChar = quoteChar;
        Reader = null!; // always overwritten by the public constructors
    }

    /// <summary>
    /// Advances to the next data row. Returns false at end of stream.
    /// </summary>
    public virtual async ValueTask<bool> ReadAsync()
    {
        string? str;
        do
        {
            str = await Reader.Value.ReadLineAsync();
            if (str is null)
                return false; // true end of stream
            // FIX: a blank/whitespace line used to be treated as EOF, silently
            // dropping the remainder of the file; skip it and keep reading.
        } while (string.IsNullOrWhiteSpace(str));

        CurrentRaw = str;
        var fields = ParseRow(str, QuoteChar, Delimiter);
        Current = new DataRecord(fields, TableName, Headers) { RawField = str };
        return true;
    }

    /// <summary>
    /// Splits one CSV row into fields. Backslash escapes the next character;
    /// an unescaped <paramref name="quoteChar"/> toggles quoted mode, inside
    /// which delimiters are literal.
    /// </summary>
    public string[] ParseRow(ReadOnlySpan<char> source, char quoteChar, string delimiter)
    {
        var result = new List<string>();
        var current = new StringBuilder();
        var hasQuote = false; // inside a quoted section
        var hasSlash = false; // previous char was an unescaped backslash
        var index = 0;
        while (index < source.Length)
        {
            var c = source[index];
            if (!hasSlash && c == '\\')
            {
                hasSlash = true;
                current.Append('\\');
                index++;
                continue;
            }
            if (!hasSlash && c == quoteChar)
            {
                hasQuote = !hasQuote;
                current.Append(c);
                index++;
                continue;
            }
            // FIX: match the whole delimiter string, not just its first character
            // (the previous code split on delimiter[0], breaking multi-char delimiters).
            if (!hasQuote && c == delimiter[0] && source[index..].StartsWith(delimiter.AsSpan()))
            {
                result.Add(current.ToString());
                current.Clear();
                index += delimiter.Length;
                hasSlash = false;
                continue;
            }
            current.Append(c);
            hasSlash = false;
            index++;
        }
        result.Add(current.ToString()); // trailing field
        return result.ToArray();
    }

    public virtual void Dispose()
    {
        if(Reader.IsValueCreated)
            Reader.Value.Dispose();
    }
}

View File

@@ -0,0 +1,46 @@
using ConsoleApp2.HostedServices.Abstractions;
using ConsoleApp2.Options;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace ConsoleApp2.Services.ETL;
/// <summary>
/// Creates <c>IDataReader</c> instances over ZST-compressed CSV input,
/// switching to a mock reader when mock mode is enabled in the options.
/// </summary>
public class DataReaderFactory
{
    private readonly ILogger<DataReaderFactory> _logger;
    private readonly IOptions<DataInputOptions> _options;

    public DataReaderFactory(ILogger<DataReaderFactory> logger, IOptions<DataInputOptions> options)
    {
        _logger = logger;
        _options = options;
    }

    /// <summary>
    /// Builds a reader for one data file of <paramref name="tableName"/>.
    /// </summary>
    public IDataReader CreateReader(string filePath, string tableName, string[] headers)
    {
        var opts = _options.Value;
        if (!opts.UseMock)
        {
            _logger.LogDebug("***** Using {Type} data source *****", "ZSTD");
            return new ZstReader(filePath, tableName, headers, opts.Delimiter, opts.QuoteChar, _logger);
        }

        if (opts.TableMockConfig is null)
            throw new ApplicationException("未配置表模拟数据量级");

        _logger.LogDebug("***** Using {Type} data source *****", "ZSTD mock");
        var mockConfig = opts.TableMockConfig.GetValueOrDefault(tableName,
            new TableMockConfig { MockCount = 1, UseDeepCopy = false });
        // Scale the configured record count by the global multiplier.
        mockConfig.MockCount = (long)Math.Ceiling(mockConfig.MockCount * opts.MockCountMultiplier);
        return new ZstMockReader(mockConfig, filePath,
            tableName, headers, opts.Delimiter, opts.QuoteChar, _logger);
    }
}
/// <summary>
/// Dependency-injection helper for <see cref="DataReaderFactory"/>.
/// </summary>
public static class DataSourceFactoryExtensions
{
    /// <summary>Registers <see cref="DataReaderFactory"/> as a singleton service.</summary>
    public static IServiceCollection AddDataSourceFactory(this IServiceCollection services)
        => services.AddSingleton<DataReaderFactory>();
}

View File

@@ -1,14 +1,12 @@
using System.Data.Common;
using System.Text;
using System.Text;
using System.Text.RegularExpressions;
using ConsoleApp2.Helpers;
using ConsoleApp2.Options;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MySqlConnector;
using ServiceStack;
namespace ConsoleApp2.Services;
namespace ConsoleApp2.Services.ETL;
/// <summary>
/// Mysql导出
@@ -18,29 +16,29 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
private readonly Dictionary<string, IList<DataRecord>> _recordCache;
private readonly MySqlConnection _conn;
private readonly ILogger _logger;
private readonly IOptions<DatabaseOutputOptions> _options;
private readonly ErrorRecorder.OutputErrorRecorder _outputErrorRecorder;
private readonly ProcessContext _context;
private readonly IOptions<DataTransformOptions> _transformOptions;
private readonly ErrorRecorder _errorRecorder;
public MySqlDestination(
string connStr,
ILogger logger,
ProcessContext context,
IOptions<DataTransformOptions> transformOptions,
ErrorRecorder errorRecorder)
IOptions<DatabaseOutputOptions> options,
ErrorRecorder.OutputErrorRecorder outputErrorRecorder,
ProcessContext context)
{
_conn = new MySqlConnection(connStr);
_conn.Open();
_recordCache = new Dictionary<string, IList<DataRecord>>();
_logger = logger;
_options = options;
_outputErrorRecorder = outputErrorRecorder;
_context = context;
_transformOptions = transformOptions;
_errorRecorder = errorRecorder;
}
public Task WriteRecordAsync(DataRecord record)
{
_recordCache.AddOrUpdate(record.TableName, [record], (key, value) =>
_recordCache.AddOrUpdate(record.TableName, [record], (_, value) =>
{
value.Add(record);
return value;
@@ -76,22 +74,23 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
}
catch (Exception e)
{
_logger.LogError(e, "Error when flushing records, sql: {Sql}", cmd.CommandText.Omit(1000));
_logger.LogError(e, "插入数据库时发生错误, sql: {Sql}", cmd.CommandText.Omit(1000));
_context.AddException(e);
var match = MatchTableName().Match(cmd.CommandText);
if (match is { Success: true, Groups.Count: > 1 })
{
var tableName = match.Groups[1].Value;
await _errorRecorder.LogErrorSqlAsync(cmd.CommandText, tableName, e);
await _outputErrorRecorder.LogErrorSqlAsync(cmd.CommandText, tableName, e);
}
else await _errorRecorder.LogErrorSqlAsync(cmd.CommandText, e);
else await _outputErrorRecorder.LogErrorSqlAsync(cmd.CommandText, e);
}
}
_recordCache.Clear();
}
catch (Exception e)
{
_logger.LogError(e, "Error when serialize records, record:");
_logger.LogError(e, "序列化记录时发生错误");
throw;
}
finally
{
@@ -104,7 +103,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
public IEnumerable<string> GetExcuseList(IDictionary<string, IList<DataRecord>> tableRecords,int maxAllowPacket)
{
var sb = new StringBuilder();
var sb = new StringBuilder("SET AUTOCOMMIT = 1;\n");
foreach (var (tableName, records) in tableRecords)
{
if (records.Count == 0)
@@ -116,11 +115,11 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
// INSERT INTO ... VALUES >>>
sb.Append($"INSERT INTO `{tableName}`(");
for (var i = 0; i < records[0].Headers.Length; i++)
for (var i = 0; i < records[0].Headers.Count; i++)
{
var header = records[0].Headers[i];
sb.Append($"`{header}`");
if (i != records[0].Headers.Length - 1)
if (i != records[0].Headers.Count - 1)
sb.Append(',');
}
@@ -132,7 +131,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
var record = records[recordIdx];
var recordSb = new StringBuilder();
recordSb.Append('(');
for (var fieldIdx = 0; fieldIdx < record.Fields.Length; fieldIdx++)
for (var fieldIdx = 0; fieldIdx < record.Fields.Count; fieldIdx++)
{
var field = record.Fields[fieldIdx];
@@ -144,12 +143,12 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
goto Escape;
}
switch (_transformOptions.Value.GetColumnType(record.TableName, record.Headers[fieldIdx]))
switch (_options.Value.GetColumnType(record.TableName, record.Headers[fieldIdx]))
{
case ColumnType.Text:
recordSb.Append(string.IsNullOrEmpty(field)
? "''"
: _transformOptions.Value.TransformBinary?.Invoke(field) ?? field);
if(string.IsNullOrEmpty(field))
recordSb.Append("''");
else recordSb.Append($"_utf8mb4 0x{field}");
break;
case ColumnType.Blob:
if (string.IsNullOrEmpty(field))
@@ -157,9 +156,11 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
else recordSb.Append($"0x{field}");
break;
case ColumnType.Json:
recordSb.Append(string.IsNullOrEmpty(field)
? "\"[]\""
: _transformOptions.Value.TransformBinary?.Invoke(field) ?? field);
if(string.IsNullOrEmpty(field))
recordSb.Append("'[]'"); // JObject or JArray?
else if (_options.Value.TreatJsonAsHex)
recordSb.Append($"_utf8mb4 0x{field}");
else recordSb.AppendLine(field);
break;
case ColumnType.UnDefine:
default:
@@ -170,16 +171,17 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
Escape:
#endregion
if (fieldIdx != record.Fields.Length - 1)
if (fieldIdx != record.Fields.Count - 1)
recordSb.Append(',');
}
recordSb.Append(')');
// 若字符数量即将大于限制则返回SQL清空StringBuilder保留当前记录的索引值然后转到StartBuild标签重新开始一轮INSERT
if (sb.Length + recordSb.Length + 1 > maxAllowPacket)
if (sb.Length + recordSb.Length + 23 > maxAllowPacket)
{
sb.Append(';');
sb.Append(';').AppendLine();
sb.Append("SET AUTOCOMMIT = 1;");
yield return sb.ToString();
sb.Clear();
goto StartBuild;
@@ -192,6 +194,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
}
sb.Append(';');
sb.Append("COMMIT;");
yield return sb.ToString();
sb.Clear();
}

View File

@@ -0,0 +1,64 @@
using ConsoleApp2.Options;
using Microsoft.Extensions.Logging;
namespace ConsoleApp2.Services.ETL;
/// <summary>
/// Takes the first row of the supplied ZST file and replicates it into the
/// requested number of mock records.
/// </summary>
public class ZstMockReader : ZstReader
{
    private long _currentCount;                       // number of records produced so far
    private readonly long _mockCount;                 // target number of records
    private DataRecord? _template;                    // the row being replicated (first row read)
    private readonly bool _deepCopy;                  // clone per record vs. reuse one shared instance
    private readonly string[]? _autoIncrementColumn;  // columns to increment on each deep copy
    // Candidate CompanyID values assigned randomly to deep-copied records.
    static readonly IReadOnlyList<int> Range = [500, 1500, 2500];

    public ZstMockReader(TableMockConfig mockConfig, string filePath, string tableName, string[] headers, string delimiter = ",", char quoteChar = '\"', ILogger? logger = null) : base(filePath, tableName, headers, delimiter, quoteChar, logger)
    {
        _mockCount = mockConfig.MockCount;
        _deepCopy = mockConfig.UseDeepCopy;
        _autoIncrementColumn = mockConfig.AutoIncrementColumn;
    }

    public ZstMockReader(TableMockConfig mockConfig, Stream stream, string tableName, string[] headers, string delimiter = ",", char quoteChar = '\"', ILogger? logger = null) : base(stream, tableName, headers, delimiter, quoteChar, logger)
    {
        _mockCount = mockConfig.MockCount;
        _deepCopy = mockConfig.UseDeepCopy;
        _autoIncrementColumn = mockConfig.AutoIncrementColumn;
    }

    /// <summary>
    /// First call reads the real first row as the template; subsequent calls
    /// synthesize records from it until the mock count is reached.
    /// NOTE(review): the final call sets Current but returns false, and the first
    /// call returns true unconditionally — verify the produced-record count
    /// matches the intended _mockCount exactly.
    /// </summary>
    public override async ValueTask<bool> ReadAsync()
    {
        if (_template is null)
        {
            if (!await base.ReadAsync())
                throw new InvalidOperationException("所提供的ZST源为空无法生成模板数据");
            _template = Current.Clone() as DataRecord;
            if (_template is null)
                throw new ApplicationException("记录拷贝失败");
            _currentCount++;
            return true;
        }
        if (_deepCopy)
        {
            Current = _template.Clone() as DataRecord ?? throw new ApplicationException("记录拷贝失败");
            if(_autoIncrementColumn is not null)
            {
                foreach (var column in _autoIncrementColumn)
                {
                    // Advance each auto-increment column and keep the advanced
                    // record as the next template.
                    Current[column] = (Convert.ToInt64(Current[column]) + 1).ToString();
                    _template = Current;
                }
            }
            Current["CompanyID"] = Range[Random.Shared.Next(0, Range.Count)].ToString(); // random CompanyID
        }
        else Current = _template; // shallow mode: every record is the same shared instance
        _currentCount++;
        return _currentCount < _mockCount;
    }
}

View File

@@ -0,0 +1,48 @@
using Microsoft.Extensions.Logging;
using ZstdSharp;
namespace ConsoleApp2.Services.ETL;
/// <summary>
/// Decompresses a ZST file and reads CSV data from it.
/// Hides the base <c>Reader</c> with one wrapping a <see cref="DecompressionStream"/>.
/// </summary>
public class ZstReader : CsvReader
{
    // Hides CsvReader.Reader so ReadAsync consumes decompressed text.
    protected new readonly Lazy<StreamReader> Reader;

    public ZstReader(string filePath, string tableName, string[] headers, string delimiter = ",", char quoteChar = '\"', ILogger? logger = null)
        : base(filePath, tableName, headers, delimiter, quoteChar, logger)
    {
        // NOTE(review): the base(filePath, ...) constructor also opens this file,
        // so the file is opened twice and the base stream may never be disposed
        // (base.Dispose only disposes its Lazy when materialized) — verify.
        var ds = new DecompressionStream(File.OpenRead(filePath));
        Reader = new Lazy<StreamReader>(() => new StreamReader(ds));
    }

    public ZstReader(Stream stream, string tableName, string[] headers, string delimiter = ",", char quoteChar = '\"', ILogger? logger = null)
        : base(stream, tableName, headers, delimiter, quoteChar, logger)
    {
        var ds = new DecompressionStream(stream);
        Reader = new Lazy<StreamReader>(() => new StreamReader(ds));
    }

    /// <summary>
    /// Same contract as the base ReadAsync, but reading from the decompressing reader.
    /// NOTE(review): a blank line is treated as end-of-stream here, mirroring the base class.
    /// </summary>
    public override async ValueTask<bool> ReadAsync()
    {
        var str = await Reader.Value.ReadLineAsync();
        if (string.IsNullOrWhiteSpace(str))
            return false;
        CurrentRaw = str;
        var fields = ParseRow(str, QuoteChar, Delimiter);
        Current = new DataRecord(fields, TableName, Headers) {RawField = str};
        return true;
    }

    public override void Dispose()
    {
        base.Dispose(); // disposes the base reader if it was ever materialized
        if(Reader.IsValueCreated)
            Reader.Value.Dispose();
    }
}

View File

@@ -0,0 +1,79 @@
using ConsoleApp2.Helpers;
using Microsoft.Extensions.Logging;
namespace ConsoleApp2.Services.ErrorRecorder;
/// <summary>
/// Base class for error recorders: writes failed records to per-table
/// ".errlog" files grouped under a per-run directory.
/// </summary>
public class ErrorRecorder
{
    protected ILogger Logger;

    /// <summary>
    /// Identifier of the current run (start timestamp); used to group error logs per execution.
    /// </summary>
    public static readonly string UID = DateTime.Now.ToString("yyyy-MM-dd HH-mm-ss");

    public ErrorRecorder(ILogger logger)
    {
        Logger = logger;
    }

    /// <summary>
    /// Appends a single failed record (with the exception message) to
    /// "{outputDir}/{TableName}.errlog".
    /// </summary>
    public static async Task LogErrorRecordAsync(string outputDir, DataRecord record, Exception exception)
    {
        if(!Directory.Exists(outputDir))
            Directory.CreateDirectory(outputDir);
        var content = $"""
                       ### {exception.Message}
                       {record.RawField}
                       """;
        var path = Path.Combine(outputDir, $"{record.TableName}.errlog");
        await File.AppendAllTextAsync(path, content);
    }

    /// <summary>
    /// Appends a batch of failed records, grouped by table, writing up to five
    /// tables' files in parallel.
    /// </summary>
    public static async Task LogErrorRecordAsync(string outputDir, IEnumerable<DataRecord> records, Exception exception)
    {
        if(!Directory.Exists(outputDir))
            Directory.CreateDirectory(outputDir);

        // Group records by table so each table gets its own .errlog file.
        var tableMapping = new Dictionary<string, List<DataRecord>>();
        foreach (var record in records)
        {
            if (!tableMapping.TryGetValue(record.TableName, out var list))
                tableMapping.Add(record.TableName, list = new List<DataRecord>());
            list.Add(record);
        }

        const int maxParallelism = 5;
        // FIX: the previous loop used Take(maxParallelism) without Skip(i), so it
        // rewrote the FIRST five tables on every iteration and never reached the rest.
        foreach (var batch in tableMapping.Chunk(maxParallelism))
        {
            await Parallel.ForEachAsync(batch, async (pair, token) =>
            {
                // FIX: append to "{outputDir}/{table}.errlog". The previous code
                // wrapped a read-only stream (File.OpenRead(tableName)) in a
                // StreamWriter, which throws at runtime and targeted the wrong path.
                var path = Path.Combine(outputDir, $"{pair.Key}.errlog");
                await using var writer = new StreamWriter(path, append: true);
                foreach (var record in pair.Value)
                {
                    if (token.IsCancellationRequested)
                        break;
                    var content =
                        $"""
                         ### {exception.Message}
                         {record.RawField}
                         """;
                    await writer.WriteLineAsync(content);
                }
            });
        }
    }

    /// <summary>
    /// Deletes all *.errlog files under <paramref name="dir"/> (recursively).
    /// </summary>
    public void ClearErrorRecords(string dir)
    {
        Logger.LogInformation("***** Clear error records *****");
        foreach (var file in Directory.GetFiles(dir, "*.errlog", SearchOption.AllDirectories))
        {
            File.Delete(file);
        }
    }
}

View File

@@ -0,0 +1,27 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace ConsoleApp2.Services.ErrorRecorder;
/// <summary>
/// Factory producing the input / transform / output error recorders,
/// all sharing one logger instance.
/// </summary>
public class ErrorRecorderFactory
{
    private readonly ILogger<ErrorRecorderFactory> _logger;

    public ErrorRecorderFactory(ILogger<ErrorRecorderFactory> logger) => _logger = logger;

    /// <summary>Recorder for SQL output errors of the given database.</summary>
    public OutputErrorRecorder CreateOutput(string database)
    {
        return new OutputErrorRecorder(database, _logger);
    }

    /// <summary>Recorder for transform-stage errors.</summary>
    public TransformErrorRecorder CreateTransform()
    {
        return new TransformErrorRecorder(_logger);
    }

    /// <summary>Recorder for input-stage errors.</summary>
    public InputErrorRecorder CreateInput()
    {
        return new InputErrorRecorder(_logger);
    }
}
/// <summary>
/// Dependency-injection helper for <see cref="ErrorRecorderFactory"/>.
/// </summary>
public static class ErrorRecorderFactoryExtensions
{
    /// <summary>Registers <see cref="ErrorRecorderFactory"/> as a singleton service.</summary>
    public static IServiceCollection AddErrorRecorderFactory(this IServiceCollection services)
        => services.AddSingleton<ErrorRecorderFactory>();
}

View File

@@ -0,0 +1,19 @@
using Microsoft.Extensions.Logging;
namespace ConsoleApp2.Services.ErrorRecorder;
/// <summary>
/// Records input-stage errors under "ErrorRecords/{UID}/Input".
/// </summary>
public sealed class InputErrorRecorder : ErrorRecorder
{
    // Per-run output directory for input-stage error logs.
    private readonly string _outputDir =
        Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"ErrorRecords/{UID}/Input");

    public InputErrorRecorder(ILogger logger) : base(logger)
    {
    }

    /// <summary>Logs one failed record to this recorder's directory.</summary>
    public Task LogErrorRecordAsync(DataRecord record, Exception exception)
    {
        return LogErrorRecordAsync(_outputDir, record, exception);
    }

    /// <summary>Logs a batch of failed records to this recorder's directory.</summary>
    public Task LogErrorRecordAsync(IEnumerable<DataRecord> records, Exception exception)
    {
        return LogErrorRecordAsync(_outputDir, records, exception);
    }
}

View File

@@ -1,27 +1,19 @@
using System.Text;
using Microsoft.Extensions.Logging;
namespace ConsoleApp2.Services;
namespace ConsoleApp2.Services.ErrorRecorder;
public class ErrorRecorder
public sealed class OutputErrorRecorder : ErrorRecorder
{
private readonly string _outputDir = "./ErrorRecords";
private readonly ILogger _logger;
private readonly string _outputDir = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"/ErrorRecords/{UID}/Output");
private readonly string _database;
private readonly Dictionary<string, int> _logIndex = new();
/// <summary>
/// 当次执行标识
/// </summary>
private static readonly string UID = DateTime.Now.ToString("yyyy-MM-dd HH-mm-ss");
public ErrorRecorder(ILogger<ErrorRecorder> logger)
public OutputErrorRecorder(string database, ILogger logger) : base(logger)
{
_logger = logger;
var dir = Path.Combine(_outputDir, UID);
if (!Directory.Exists(dir))
{
Directory.CreateDirectory(dir);
}
_database = database;
Logger = logger;
}
/// <summary>
@@ -32,25 +24,28 @@ public class ErrorRecorder
/// <param name="exception"></param>
public async Task LogErrorSqlAsync(string commandText, string tableName, Exception exception)
{
if (!Directory.Exists(_outputDir))
Directory.CreateDirectory(_outputDir);
if (!_logIndex.TryGetValue(tableName, out var idx))
{
idx = 0;
_logIndex.Add(tableName, idx);
}
var filePath = Path.Combine(_outputDir, UID, $"{tableName}-{idx}.errlog");
var filePath = Path.Combine(_outputDir, $"{tableName}-{idx}.errlog");
if (File.Exists(filePath) && new FileInfo(filePath).Length > 10 * 1024 * 1024)
{
++idx;
_logIndex[tableName] = idx;
filePath = Path.Combine(_outputDir, UID, $"{tableName}-{idx}.errlog");
filePath = Path.Combine(_outputDir, $"{tableName}-{idx}.errlog");
}
var content = $"""
/* [{DateTime.Now:yyyy-MM-dd HH:mm:ss}]
* Error occurred when export table '{tableName}':
* Error occurred when export table '{_database}.{tableName}':
* {exception.Message}
*/
USE `{_database}`;
{commandText}
@@ -65,7 +60,9 @@ public class ErrorRecorder
/// <param name="exception"></param>
public async Task LogErrorSqlAsync(string commandText, Exception exception)
{
var filePath = Path.Combine(_outputDir, UID, "UnknownTables.errlog");
if (!Directory.Exists(_outputDir))
Directory.CreateDirectory(_outputDir);
var filePath = Path.Combine(_outputDir, "UnknownTables.errlog");
var content = $"""
/* [{DateTime.Now:yyyy-MM-dd HH:mm:ss}]
* Error occurred when export table with unknown table name:
@@ -78,27 +75,4 @@ public class ErrorRecorder
await File.AppendAllTextAsync(filePath, content, Encoding.UTF8);
}
public async Task LogErrorRecordsAsync(IDictionary<string, DataRecord> records, Exception exception)
{
var pathDict = new Dictionary<string, string>();
foreach (var pair in records)
{
if(!pathDict.TryGetValue(pair.Key, out var path))
{
path = Path.Combine(_outputDir, UID, "ErrorRecords", $"{pair.Key}.errlog");
pathDict.Add(pair.Key, path);
}
//
await File.AppendAllTextAsync(path, string.Join(',', pair.Value.Fields));
}
}
public void ClearErrorRecords()
{
_logger.LogInformation("***** Clear error records *****");
foreach (var file in Directory.GetFiles(_outputDir, "*.errlog", SearchOption.AllDirectories))
{
File.Delete(file);
}
}
}

View File

@@ -0,0 +1,20 @@
using Microsoft.Extensions.Logging;
namespace ConsoleApp2.Services.ErrorRecorder;
/// <summary>
/// Records transform-stage errors under "ErrorRecords/{UID}/Transform".
/// </summary>
public sealed class TransformErrorRecorder : ErrorRecorder
{
    // Per-run output directory for transform-stage error logs.
    private readonly string _outputDir =
        Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"ErrorRecords/{UID}/Transform");

    public TransformErrorRecorder(ILogger logger) : base(logger)
    {
    }

    /// <summary>Logs one failed record to this recorder's directory.</summary>
    public Task LogErrorRecordAsync(DataRecord record, Exception exception)
    {
        return LogErrorRecordAsync(_outputDir, record, exception);
    }

    /// <summary>Logs a batch of failed records to this recorder's directory.</summary>
    public Task LogErrorRecordAsync(IEnumerable<DataRecord> records, Exception exception)
    {
        return LogErrorRecordAsync(_outputDir, records, exception);
    }
}

View File

@@ -1,44 +0,0 @@
using ConsoleApp2.Helpers;
using ConsoleApp2.HostedServices.Abstractions;
using Microsoft.Extensions.Logging;
using ServiceStack.Text;
namespace ConsoleApp2.Services;
/// <summary>
/// Reads JSV-format files. Obsolete skeleton kept for reference: no file is
/// ever opened, so it produces no records.
/// </summary>
[Obsolete]
public class JsvSource:IDataSource
{
    private readonly string _inputDir;
    private readonly JsvStringSerializer _jsv;
    // FIX: declared nullable — no code path ever assigns a reader in this skeleton.
    private readonly StreamReader? _reader;
    // ReSharper disable once PrivateFieldCanBeConvertedToLocalVariable
    private readonly ILogger? _logger;
    private readonly string _tableName;

    public DataRecord Current { get; protected set; } = null!;
    public string[]? Headers { get; }

    // FIX: _reader is never assigned, so the original `_reader.EndOfStream`
    // threw NullReferenceException; report end-of-source when there is no reader.
    public bool EndOfSource => _reader?.EndOfStream ?? true;

    public JsvSource(string inputDir,string tableName, ILogger? logger = null)
    {
        _inputDir = inputDir;
        _tableName = tableName;
        _jsv = new JsvStringSerializer();
        _logger = logger;
    }

    /// <summary>No-op: this source produces no records.</summary>
    public Task DoEnqueue(Action<DataRecord> action)
    {
        return Task.CompletedTask;
    }

    public void Dispose()
    {
        // FIX: guard against the never-assigned reader (was an unconditional NRE).
        _reader?.Dispose();
    }
}

View File

@@ -0,0 +1,19 @@
using ConsoleApp2.Cache;
namespace ConsoleApp2.Services.Loggers;
/// <summary>
/// Task monitor logger that persists Progress-level status snapshots into a cache;
/// all other levels are ignored.
/// </summary>
public class CacheTaskMonitorLogger : ITaskMonitorLogger
{
    private readonly ICacher _cacher;

    public CacheTaskMonitorLogger(ICacher cacher)
    {
        _cacher = cacher;
    }

    /// <summary>
    /// Stores <paramref name="properties"/> as a hash keyed by <paramref name="name"/>
    /// when <paramref name="logLevel"/> is Progress.
    /// </summary>
    public void LogStatus(string name, IReadOnlyDictionary<string, string> properties, ITaskMonitorLogger.LogLevel logLevel)
    {
        if (logLevel is not ITaskMonitorLogger.LogLevel.Progress)
            return;

        // FIX: the returned Task was silently dropped; discard it explicitly so the
        // fire-and-forget intent is visible. NOTE(review): exceptions from the cache
        // write remain unobserved — attach a logging continuation if they matter.
        _ = _cacher.SetHashAsync(name, properties);
    }
}

View File

@@ -0,0 +1,12 @@
namespace ConsoleApp2.Services.Loggers;
/// <summary>
/// Sink for periodic task status reports (a task name plus key/value properties).
/// </summary>
public interface ITaskMonitorLogger
{
    /// <summary>Severity / purpose of a status report.</summary>
    public enum LogLevel
    {
        Info,
        Debug,
        // High-frequency progress snapshots; implementations may route these
        // differently (e.g. to a cache instead of the log).
        Progress,
    }

    /// <summary>Reports the current status of the task <paramref name="name"/>.</summary>
    void LogStatus(string name, IReadOnlyDictionary<string, string> properties, LogLevel logLevel = LogLevel.Info);
}

View File

@@ -0,0 +1,41 @@
using System.Text;
using Microsoft.Extensions.Logging;
namespace ConsoleApp2.Services.Loggers;
/// <summary>
/// Task monitor logger that renders status snapshots as a single line and
/// writes them to an <see cref="ILogger"/> (Info at information level,
/// Debug/Progress at debug level).
/// </summary>
public class LoggerTaskMonitorLogger : ITaskMonitorLogger
{
    private readonly ILogger _logger;

    public LoggerTaskMonitorLogger(ILogger<LoggerTaskMonitorLogger> logger)
    {
        _logger = logger;
    }

    public void LogStatus(string name, IReadOnlyDictionary<string, string> properties, ITaskMonitorLogger.LogLevel logLevel)
    {
        // Render as "name: { key: value, key: value}" so each status is one log line.
        var sb = new StringBuilder();
        sb.Append($"{name}: {{");
        // FIX: dropped the unused index parameter of the previous Select overload
        // and removed the dead commented-out args-building code.
        sb.AppendJoin(',', properties.Select(pair => $" {pair.Key}: {pair.Value}"));
        sb.Append('}');

        switch (logLevel)
        {
            case ITaskMonitorLogger.LogLevel.Info:
                _logger.LogInformation("{message}", sb.ToString());
                break;
            case ITaskMonitorLogger.LogLevel.Progress:
            case ITaskMonitorLogger.LogLevel.Debug:
                _logger.LogDebug("{message}", sb.ToString());
                break;
            default:
                throw new ArgumentOutOfRangeException(nameof(logLevel), logLevel, null);
        }
    }
}

View File

@@ -1,39 +1,49 @@
namespace ConsoleApp2.Services;
using System.Collections.Concurrent;
namespace ConsoleApp2.Services;
/// <summary>
/// 处理上下文类,标识处理进度
/// </summary>
public class ProcessContext
{
private int _inputCount;
private int _transformCount;
private int _outputCount;
private bool _hasException;
private long _inputCount;
private long _transformCount;
private long _outputCount;
private readonly ConcurrentDictionary<string, long> _tableProgress = new();
public bool HasException => _hasException;
public bool IsInputCompleted { get; private set; }
public bool IsTransformCompleted { get; private set; }
public bool IsOutputCompleted { get; private set; }
public int InputCount
public long InputCount
{
get => _inputCount;
private set => _inputCount = value;
set => Interlocked.Exchange(ref _inputCount, value);
}
public int TransformCount
public long TransformCount
{
get => _transformCount;
private set => _transformCount = value;
set => Interlocked.Exchange(ref _transformCount, value);
}
public int OutputCount
public long OutputCount
{
get => _outputCount;
private set => _outputCount = value;
set => Interlocked.Exchange(ref _outputCount, value);
}
// TableName -> Count
public IReadOnlyDictionary<string, long> TableProgress => _tableProgress;
public void CompleteInput() => IsInputCompleted = true;
public void CompleteTransform() => IsTransformCompleted = true;
public void CompleteOutput() => IsOutputCompleted = true;
public bool AddException(Exception e) => _hasException = true;
public void AddInput() => Interlocked.Increment(ref _inputCount);
@@ -44,4 +54,17 @@ public class ProcessContext
public void AddOutput() => Interlocked.Increment(ref _outputCount);
public void AddOutput(int count) => Interlocked.Add(ref _outputCount, count);
public void AddTableOutput(string table, int count)
{
_tableProgress.AddOrUpdate(table, count, (k, v) => v + count);
AddOutput(count);
}
public long GetTableOutput(string table)
{
if(!_tableProgress.TryGetValue(table, out var count))
throw new ApplicationException($"未找到表{table}输出记录");
return count;
}
}

View File

@@ -0,0 +1,65 @@
using System.Collections.Concurrent;
using Microsoft.Extensions.DependencyInjection;
namespace ConsoleApp2.Services;
/// <summary>
/// Named pool of <c>DataRecordQueue</c> instances, safe for concurrent access.
/// </summary>
public class RecordQueuePool
{
    private readonly ConcurrentDictionary<string, DataRecordQueue> _queues = new();

    /// <summary>Read-only view of all registered queues.</summary>
    public IReadOnlyDictionary<string, DataRecordQueue> Queues => _queues;

    /// <summary>Adds a new queue with the given bounded capacity (default 2,000,000).</summary>
    public void AddQueue(string key, int boundedCapacity = 200_0000) => AddQueue(key, new DataRecordQueue(boundedCapacity));

    /// <summary>Adds an existing queue; throws if the key is already registered.</summary>
    public void AddQueue(string key, DataRecordQueue queue)
    {
        if (!_queues.TryAdd(key, queue))
            throw new InvalidOperationException($"请勿添加重复的队列,队列名: {key}");
    }

    /// <summary>Removes (and by default disposes) the queue registered under <paramref name="key"/>.</summary>
    public void RemoveQueue(string key, bool dispose = true)
    {
        // FIX: use ConcurrentDictionary.TryRemove — the previous IDictionary.Remove
        // extension performs a non-atomic lookup-then-remove, racing concurrent writers.
        if (!_queues.TryRemove(key, out var queue))
            throw new InvalidOperationException($"未找到对应的队列,队列名:{key}");
        if (dispose) queue.Dispose();
    }

    /// <summary>Gets the queue for <paramref name="key"/>; throws KeyNotFoundException when absent.</summary>
    public DataRecordQueue GetQueue(string key)
    {
        return _queues[key];
    }

    public DataRecordQueue this[string key]
    {
        get => GetQueue(key);
        set => AddQueue(key, value);
    }
}
/// <summary>
/// Dependency-injection helpers that register a pre-populated <see cref="RecordQueuePool"/>.
/// </summary>
public static class MultiRecordQueueExtensions
{
    /// <summary>Registers a pool containing a default-capacity queue per key.</summary>
    public static IServiceCollection AddRecordQueuePool(this IServiceCollection services, params string[] keys)
    {
        var pool = new RecordQueuePool();
        foreach (var key in keys)
            pool.AddQueue(key);
        return services.AddSingleton(pool);
    }

    /// <summary>Registers a pool containing the supplied (key, queue) pairs.</summary>
    public static IServiceCollection AddRecordQueuePool(this IServiceCollection services,
        params (string key, DataRecordQueue queue)[] queues)
    {
        var pool = new RecordQueuePool();
        foreach (var (key, queue) in queues)
            pool.AddQueue(key, queue);
        return services.AddSingleton(pool);
    }
}

View File

@@ -1,5 +1,5 @@
using System.Collections.Concurrent;
using Microsoft.Extensions.Logging;
using ApplicationException = System.ApplicationException;
using TaskExtensions = ConsoleApp2.Helpers.TaskExtensions;
namespace ConsoleApp2.Services;
@@ -8,42 +8,74 @@ namespace ConsoleApp2.Services;
/// </summary>
public class TaskManager
{
private readonly ConcurrentBag<Task> _tasks;
private readonly ILogger _logger;
private int _runningTaskCount;
public int TaskCount => _tasks.Count;
public int RunningTaskCount => _tasks.Count(task => !task.IsCompleted);
public IReadOnlyCollection<Task> Tasks => _tasks;
public int RunningTaskCount => _runningTaskCount;
public int MaxTaskCount { get; }
public TaskManager(ILogger<TaskManager> logger)
public event Action<Exception>? OnException;
public event Action? OnTaskCompleteSuccessfully;
public TaskManager(int maxTaskCount)
{
_tasks = new ConcurrentBag<Task>();
_logger = logger;
MaxTaskCount = maxTaskCount;
}
public void CreateTask(Func<Task> func, CancellationToken cancellationToken = default)
public async ValueTask<Task> CreateTaskAsync(Func<Task> func, CancellationToken cancellationToken = default)
{
var task = Task.Run(func, cancellationToken);
_tasks.Add(task);
_logger.LogDebug("New task created");
await TaskExtensions.WaitUntil(() => _runningTaskCount < MaxTaskCount, 25, cancellationToken);
return RunTask(func, cancellationToken);
}
public async ValueTask<Task> CreateTaskAsync(Func<object?, Task> func, object? arg, CancellationToken ct = default)
{
await TaskExtensions.WaitUntil(() => _runningTaskCount < MaxTaskCount, 25, ct);
return RunTaskNoClosure(func, arg, ct);
}
public void CreateTasks(Func<Task> func,int taskCount, CancellationToken cancellationToken = default)
private Task RunTask(Func<Task> func, CancellationToken cancellationToken = default)
{
for (int i = 0; i < taskCount; i++)
var task = Task.Run(async () =>
{
CreateTask(func, cancellationToken);
}
}
public async Task WaitAll()
{
await Task.WhenAll(_tasks);
try
{
await func();
OnTaskCompleteSuccessfully?.Invoke();
}
catch(Exception ex)
{
OnException?.Invoke(ex);
}
finally
{
Interlocked.Decrement(ref _runningTaskCount);
}
}, cancellationToken);
Interlocked.Increment(ref _runningTaskCount);
return task;
}
public void ClearTask()
private Task RunTaskNoClosure(Func<object?, Task> func, object? arg, CancellationToken cancellationToken = default)
{
if(RunningTaskCount != 0)
throw new InvalidOperationException("Unable to clear task. There are still running tasks");
_tasks.Clear();
var task = Task.Factory.StartNew(async obj => // 性能考虑这个lambda中不要捕获任何外部变量!
{
if (obj is not Tuple<Func<object?, Task>, object?> tuple)
throw new ApplicationException("这个异常不该出现");
try
{
await tuple.Item1(tuple.Item2);
OnTaskCompleteSuccessfully?.Invoke();
}
catch(Exception ex)
{
OnException?.Invoke(ex);
}
finally
{
Interlocked.Decrement(ref _runningTaskCount);
}
}, Tuple.Create(func, arg), cancellationToken).Unwrap();
Interlocked.Increment(ref _runningTaskCount);
return task;
}
}

View File

@@ -1,96 +0,0 @@
using ConsoleApp2.Helpers;
using Microsoft.Extensions.Logging;
using System.IO;
using System.Text.RegularExpressions;
using ZstdSharp;
namespace ConsoleApp2.Services
{
public class ZstSource : CsvSource
{
    /// <summary>
    /// Data source that reads zstd-compressed dump files
    /// (<c>*.sql.zst</c> metadata and <c>*.dat.zst</c> data) from <paramref name="inputDir"/>.
    /// </summary>
    /// <param name="inputDir">Directory containing the compressed dump files.</param>
    /// <param name="tableName">Table whose files should be located and parsed.</param>
    /// <param name="delimiter">CSV field delimiter.</param>
    /// <param name="quoteChar">CSV quote character.</param>
    /// <param name="logger">Optional logger forwarded to the base source.</param>
    public ZstSource(string inputDir, string tableName, string delimiter = ",", char quoteChar = '"',
        ILogger? logger = null) : base(inputDir, tableName, delimiter, quoteChar, logger)
    {
        // BUGFIX: the base call previously used assignment expressions
        // (delimiter = ",", quoteChar = '"', logger = null), which overwrote and
        // discarded the caller-supplied arguments; they are now forwarded unchanged.
        string pattern = $"^.*\\.{tableName}\\..*\\.sql.zst$";
        _sqlFilePath = Directory.GetFiles(_inputDir).FirstOrDefault(s => Regex.Match(s, pattern).Success);
    }

    // Decompresses an entire zstd file into memory and returns its text.
    // Streams the decompressor straight into the reader, so no intermediate
    // MemoryStream is needed and all streams are disposed deterministically.
    private async Task<string> DecompressFile(string filePath)
    {
        using var input = File.OpenRead(filePath);
        using var decompress = new DecompressionStream(input);
        using var reader = new StreamReader(decompress);
        return await reader.ReadToEndAsync();
    }

    /// <summary>
    /// Parses the compressed .sql file to populate <c>headers</c> and the
    /// list of <c>*.dat.zst</c> data file names (<c>csvFiles</c>).
    /// </summary>
    /// <exception cref="FileNotFoundException">No matching .sql.zst file was found in the input directory.</exception>
    public override async Task GetHeaderAndCsvFiles()
    {
        // Fail with a clear message instead of a null-path crash inside File.OpenRead.
        if (_sqlFilePath is null)
            throw new FileNotFoundException($"No *.{_tableName}.*.sql.zst file found in directory '{_inputDir}'");
        var text = await DecompressFile(_sqlFilePath);
        headers = await DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text);
        csvFiles = await DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text, new Regex(@"'.+\.dat.zst'"));
    }

    /// <summary>
    /// Streams every record from all compressed data files, invoking
    /// <paramref name="action"/> once per parsed <see cref="DataRecord"/>.
    /// </summary>
    public override async Task DoEnqueue(Action<DataRecord> action)
    {
        await GetHeaderAndCsvFiles();
        foreach (var file in csvFiles!)
        {
            var filePath = Path.Combine(_inputDir, file);
            using var input = File.OpenRead(filePath);
            using var decompress = new DecompressionStream(input);
            using var reader = new StreamReader(decompress);
            while (!reader.EndOfStream)
            {
                var line = await reader.ReadLineAsync();
                if (line is null)
                    break; // defensive: EndOfStream raced with actual EOF
                var fields = ParseRow2(line, QuoteChar, Delimiter);
                var record = new DataRecord(fields, _tableName, headers);
                action?.Invoke(record);
            }
        }
    }

    /// <summary>
    /// Reads a single record from the first data file for smoke-testing,
    /// or <see langword="null"/> when there is no data file or it is empty.
    /// </summary>
    public override async Task<DataRecord?> GetTestRecord()
    {
        await GetHeaderAndCsvFiles();
        var file = csvFiles?.FirstOrDefault();
        if (file is null)
            return null;
        var filePath = Path.Combine(_inputDir, file);
        using var input = File.OpenRead(filePath);
        using var decompress = new DecompressionStream(input);
        using var reader = new StreamReader(decompress);
        var line = await reader.ReadLineAsync();
        if (line is null)
            return null; // data file exists but is empty
        var fields = ParseRow2(line, QuoteChar, Delimiter);
        return new DataRecord(fields, _tableName, headers);
    }

    public void Dispose()
    {
        // Nothing to release: every stream is opened and disposed per call above.
    }
}
}