This commit is contained in:
2024-01-12 16:50:37 +08:00
parent eab3695f53
commit 0984853c79
28 changed files with 1115 additions and 166 deletions

View File

@@ -1,5 +1,7 @@
using System.Text;
using System.Text.RegularExpressions;
using ConsoleApp2.Helpers;
using ConsoleApp2.HostedServices.Abstractions;
using Microsoft.Extensions.Logging;
namespace ConsoleApp2.Services;
@@ -7,45 +9,52 @@ namespace ConsoleApp2.Services;
/// <summary>
/// CSV文件读取
/// </summary>
public class CsvSource
public class CsvSource:IDataSource
{
private readonly string _filePath;
private readonly StreamReader _reader;
protected readonly string _inputDir;
//protected readonly StreamReader _reader;
private readonly ILogger? _logger;
private readonly string _tableName;
protected readonly string _tableName;
protected string _sqlFilePath;
protected readonly string _sqlFileText;
public DataRecord Current { get; private set; }
public string[]? Headers { get; }
public string? CurrentRaw { get; private set; }
//public DataRecord Current { get; protected set; }
//public string[]? Headers { get; }
public string? CurrentRaw { get; protected set; }
public string Delimiter { get; private set; }
public char QuoteChar { get; private set; }
public CsvSource(string filePath, string[]? headers = null, string delimiter = ",", char quoteChar = '"',
public CsvSource(string inputDir,string tableName,string delimiter = ",", char quoteChar = '"',
ILogger? logger = null)
{
_filePath = filePath;
Headers = headers;
_inputDir = inputDir;
_tableName = tableName;
//Headers = headers;
_logger = logger;
Delimiter = delimiter;
QuoteChar = quoteChar;
var fs = File.OpenRead(filePath);
_reader = new StreamReader(fs);
_tableName = DumpDataHelper.GetTableName(filePath);
//var fs = File.OpenRead(filePath);
//_reader = new StreamReader(fs);
//_tableName = DumpDataHelper.GetTableName(filePath);
string pattern = $"^.*\\.{tableName}\\..*\\.sql$";
_sqlFilePath = Directory.GetFiles(_inputDir).FirstOrDefault(s => Regex.Match(s, pattern).Success);
}
public async ValueTask<bool> ReadAsync()
{
var str = await _reader.ReadLineAsync();
if (string.IsNullOrWhiteSpace(str))
return false;
//public virtual async ValueTask<bool> ReadAsync()
//{
// var str = await _reader.ReadLineAsync();
// if (string.IsNullOrWhiteSpace(str))
// return false;
CurrentRaw = str;
// CurrentRaw = str;
var fields = ParseRow2(str, QuoteChar, Delimiter);
Current = new DataRecord(fields, _tableName, Headers);
return true;
}
// var fields = ParseRow2(str, QuoteChar, Delimiter);
// Current = new DataRecord(fields, _tableName, Headers);
// return true;
//}
public string[] ParseRow(string row, char quoteChar, string delimiter)
{
@@ -136,4 +145,64 @@ public class CsvSource
result.Add(current.ToString());
return result.ToArray();
}
public virtual async Task<string[]> GetHeaders()
{
var text = await File.ReadAllTextAsync(_sqlFilePath);
return await DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text);
}
public virtual async Task<string[]> GetCsvFiles()
{
var text= await File.ReadAllTextAsync(_sqlFilePath);
return await DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text,new Regex(@"'.+\.dat'"));
}
public virtual async Task DoEnqueue(Action<DataRecord> action)
{
var sourceFiles =await GetCsvFiles();
foreach (var file in sourceFiles)
{
var headers = await GetHeaders();
var filePath= Path.Combine(_inputDir, file);
using (var fs = File.OpenRead(filePath))
{
using (StreamReader sr = new StreamReader(fs))
{
while (!sr.EndOfStream)
{
var line = await sr.ReadLineAsync();
var fields = ParseRow2(line, QuoteChar, Delimiter);
var record = new DataRecord(fields, _tableName, headers);
action?.Invoke(record);
}
}
}
}
}
public virtual async Task<DataRecord?> GetTestRecord()
{
var sourceFiles = await GetCsvFiles();
var file = sourceFiles.FirstOrDefault();
if (file != null)
{
var headers = await GetHeaders();
var filePath = Path.Combine(_inputDir, file);
using (var fs = File.OpenRead(filePath))
{
using (StreamReader sr = new StreamReader(fs))
{
var line = await sr.ReadLineAsync();
var fields = ParseRow2(line, QuoteChar, Delimiter);
var record = new DataRecord(fields, _tableName, headers);
return record;
}
}
}
return null;
}
public void Dispose()
{
// _reader.Dispose();
}
}

View File

@@ -19,7 +19,7 @@ public class DataRecordQueue : IDisposable
public DataRecordQueue()
{
_queue = new BlockingCollection<DataRecord>(200_000); // 队列最长为20W条记录
_queue = new BlockingCollection<DataRecord>(2000_000); // 队列最长为20W条记录
}
public bool TryDequeue([MaybeNullWhen(false)] out DataRecord record)

View File

@@ -1,4 +1,5 @@
using ConsoleApp2.Helpers;
using ConsoleApp2.HostedServices.Abstractions;
using Microsoft.Extensions.Logging;
using ServiceStack.Text;
@@ -8,9 +9,9 @@ namespace ConsoleApp2.Services;
/// 读取Jsv格式文件
/// </summary>
[Obsolete]
public class JsvSource : IDisposable
public class JsvSource:IDataSource
{
private readonly string _filePath;
private readonly string _inputDir;
private readonly JsvStringSerializer _jsv;
private readonly StreamReader _reader;
// ReSharper disable once PrivateFieldCanBeConvertedToLocalVariable
@@ -21,29 +22,22 @@ public class JsvSource : IDisposable
public string[]? Headers { get; }
public bool EndOfSource => _reader.EndOfStream;
public JsvSource(string filePath, string[]? headers = null, ILogger? logger = null)
public JsvSource(string inputDir,string tableName, ILogger? logger = null)
{
_filePath = filePath;
_inputDir = inputDir;
_tableName = tableName;
_jsv = new JsvStringSerializer();
_reader = new StreamReader(filePath);
Headers = headers;
// _reader = new StreamReader(filePath);
//Headers = headers;
_logger = logger;
// _logger?.LogInformation("Reading file: {FilePath}", filePath);
_tableName = DumpDataHelper.GetTableName(filePath);
//_tableName = DumpDataHelper.GetTableName(filePath);
}
public async ValueTask<bool> ReadAsync()
public async Task DoEnqueue(Action<DataRecord> action)
{
var str = await _reader.ReadLineAsync();
if (string.IsNullOrEmpty(str))
return false;
var fields = _jsv.DeserializeFromString<string[]>(str);
Current = new DataRecord(fields, _tableName, Headers);
return true;
}
public void Dispose()
public void Dispose()
{
_reader.Dispose();
}

View File

@@ -2,6 +2,7 @@
using ConsoleApp2.Helpers;
using Microsoft.Extensions.Logging;
using MySqlConnector;
using ServiceStack;
namespace ConsoleApp2.Services;
@@ -14,16 +15,21 @@ public class MySqlDestination : IDisposable, IAsyncDisposable
private readonly MySqlConnection _conn;
private readonly ILogger _logger;
private readonly bool _prettyOutput;
public MySqlDestination(string connStr, ILogger logger, bool prettyOutput = false)
private readonly int _maxAllowPacket;
private readonly ProcessContext _context;
private static StringBuilder recordSb = new StringBuilder();
public MySqlDestination(string connStr, ILogger logger, ProcessContext context,bool prettyOutput = false)
{
_conn = new MySqlConnection(connStr);
_conn.Open();
_recordCache = new Dictionary<string, IList<DataRecord>>();
_logger = logger;
_context = context;
_prettyOutput = prettyOutput;
}
}
public Task WriteRecordAsync(DataRecord record)
{
_recordCache.AddOrUpdate(record.TableName, [record], (key, value) =>
@@ -42,74 +48,113 @@ public class MySqlDestination : IDisposable, IAsyncDisposable
}
}
public async Task FlushAsync()
public async Task FlushAsync(int maxAllowPacket)
{
if (_recordCache.Count == 0)
return;
var cmd = _conn.CreateCommand();
cmd.CommandText = SerializeRecords(_recordCache, _prettyOutput);
//var cmd = _conn.CreateCommand();
//cmd.CommandTimeout = 3 * 60;
try
{
await cmd.ExecuteNonQueryAsync();
var excuseList = GetExcuseList(_recordCache, maxAllowPacket, _prettyOutput);
//foreach (var insertSql in excuseList)
//{
// //cmd.CommandText = insertSql;
// //await cmd.ExecuteNonQueryAsync();
// //_logger.LogInformation(@"do insert completed!size:{Length}", cmd.CommandText.Length);
//}
_recordCache.Clear();
}
catch (Exception e)
{
_logger.LogCritical(e, "Error when flushing records, sql: {Sql}", cmd.CommandText.Omit(1000));
//_logger.LogCritical(e, "Error when flushing records, sql: {Sql}", cmd.CommandText.Omit(1000));
_context.AddException(e);
throw;
}
finally
{
await cmd.DisposeAsync();
//await cmd.DisposeAsync();
}
}
public static string SerializeRecords(IDictionary<string, IList<DataRecord>> tableRecords,
bool prettyOutput = false)
public static IList<string> GetExcuseList(IDictionary<string, IList<DataRecord>> tableRecords,int maxAllowPacket,
bool prettyOutput = false)
{
var sb = new StringBuilder();
var resultList = new List<string>();
var headerSb = string.Empty;
//var recordSb = new StringBuilder();
recordSb.Clear();
foreach (var (tableName, records) in tableRecords)
{
if (records.Count == 0)
continue;
sb.Append($"INSERT INTO `{tableName}`(");
headerSb=$"INSERT INTO `{tableName}`(";
for (var i = 0; i < records[0].Headers.Length; i++)
{
var header = records[0].Headers[i];
sb.Append($"`{header}`");
headerSb+=$"`{header}`";
if (i != records[0].Headers.Length - 1)
sb.Append(',');
headerSb.Append(',');
}
sb.Append(") VALUES ");
headerSb+=") VALUES ";
if (prettyOutput)
sb.AppendLine();
headerSb+="/r/n";
var sbList = new List<string>();
var currentLength = headerSb.Length;
for (var i = 0; i < records.Count; i++)
{
var record = records[i];
sb.Append('(');
recordSb.Append('(');
for (var j = 0; j < record.Fields.Length; j++)
{
var field = record.Fields[j];
sb.Append(field);
recordSb.Append(field);
if (j != record.Fields.Length - 1)
sb.Append(',');
recordSb.Append(',');
}
sb.Append(')');
recordSb.Append(')');
if (i != records.Count - 1) // not last field
sb.Append(',');
if (prettyOutput) sb.AppendLine();
//if (i != records.Count - 1) // not last field
// recordSb.Append(',');
if (prettyOutput) recordSb.AppendLine();
if (currentLength + recordSb.Length >= maxAllowPacket)
{
var insertSb = headerSb;
insertSb+=string.Join(",", sbList);
insertSb += ";";
resultList.Add(insertSb);
insertSb=String.Empty;
sbList.Clear();
currentLength = headerSb.Length;
sbList.Add(recordSb.ToString());
}
else
{
sbList.Add(recordSb.ToString());
}
currentLength += recordSb.Length;
recordSb.Clear();
}
sb.AppendLine(";");
if (sbList.Count > 0)
{
var insertSb = headerSb.ToString();
insertSb += string.Join(",", sbList);
insertSb += ";";
resultList.Add(insertSb.ToString());
insertSb=string.Empty;
}
headerSb=string.Empty;
}
return sb.ToString();
return resultList;
}

View File

@@ -8,6 +8,7 @@ public class ProcessContext
private int _inputCount;
private int _transformCount;
private int _outputCount;
private IList<Exception> _exceptionList = new List<Exception>();
public bool IsInputCompleted { get; private set; }
public bool IsTransformCompleted { get; private set; }
public bool IsOutputCompleted { get; private set; }
@@ -29,7 +30,14 @@ public class ProcessContext
get => _outputCount;
private set => _outputCount = value;
}
public void AddException(Exception ex)
{
_exceptionList.Add(ex);
}
public IList<Exception> GetExceptions()
{
return _exceptionList;
}
public void CompleteInput() => IsInputCompleted = true;
public void CompleteTransform() => IsTransformCompleted = true;

View File

@@ -27,7 +27,13 @@ public class TaskManager
_tasks.Add(task);
_logger.LogDebug("New task created");
}
public void CreateTasks<TResult>(Func<TResult> func,int taskCount, CancellationToken cancellationToken = default)
{
for (int i = 0; i < taskCount; i++)
{
CreateTask(func, cancellationToken);
}
}
public async Task WaitAll()
{
await Task.WhenAll(_tasks);

View File

@@ -0,0 +1,126 @@
using ConsoleApp2.Helpers;
using Microsoft.Extensions.Logging;
using System.IO;
using System.Text.RegularExpressions;
using ZstdSharp;
namespace ConsoleApp2.Services
{
    /// <summary>
    /// Reads table data from zstd-compressed dump files: one "*.sql.zst" file
    /// describing the table and one or more "*.dat.zst" CSV data files, all
    /// decompressed via ZstdSharp's <see cref="DecompressionStream"/>.
    /// </summary>
    public class ZstSource : CsvSource
    {
        public ZstSource(string inputDir, string tableName, string delimiter = ",", char quoteChar = '"',
            ILogger? logger = null)
            // BUG FIX: the original passed assignment expressions to the base
            // constructor (delimiter = ",", quoteChar = '"', logger = null), which
            // overwrote and discarded the caller's arguments — every ZstSource ran
            // with the hard-coded defaults and a null logger. Forward them as-is.
            : base(inputDir, tableName, delimiter, quoteChar, logger)
        {
            // Locate this table's SQL dump, e.g. "db.MyTable.0001.sql.zst".
            // The dot between "sql" and "zst" is escaped so it no longer matches
            // an arbitrary character.
            string pattern = $@"^.*\.{tableName}\..*\.sql\.zst$";
            _sqlFilePath = Directory.GetFiles(_inputDir).FirstOrDefault(s => Regex.Match(s, pattern).Success);
        }

        /// <summary>
        /// Decompresses <paramref name="filePath"/> and returns its full text.
        /// Streams straight out of the decompressor instead of buffering through
        /// an intermediate MemoryStream, and disposes every stream (the original
        /// leaked both the MemoryStream and the StreamReader).
        /// NOTE(review): still materializes the whole file as one string —
        /// assumes the SQL dump files are small; confirm against real dumps.
        /// </summary>
        private static async Task<string> DecompressFile(string filePath)
        {
            using var input = File.OpenRead(filePath);
            using var decompressor = new DecompressionStream(input);
            using var reader = new StreamReader(decompressor);
            return await reader.ReadToEndAsync();
        }

        /// <summary>Reads the CSV column headers from the table's SQL dump file.</summary>
        public override async Task<string[]> GetHeaders()
        {
            var text = await DecompressFile(_sqlFilePath);
            return await DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text);
        }

        /// <summary>
        /// Lists the compressed data files referenced by the SQL dump.
        /// The dot in ".dat.zst" is escaped (the original matched any character).
        /// </summary>
        public override async Task<string[]> GetCsvFiles()
        {
            var text = await DecompressFile(_sqlFilePath);
            return await DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text, new Regex(@"'.+\.dat\.zst'"));
        }

        /// <summary>
        /// Reads every record from every data file and hands each one to
        /// <paramref name="action"/>. Lines are streamed directly from the
        /// decompression stream (no whole-file MemoryStream copy), and the
        /// per-file streams are disposed at the end of each iteration.
        /// </summary>
        public override async Task DoEnqueue(Action<DataRecord> action)
        {
            var sourceFiles = await GetCsvFiles();
            var headers = await GetHeaders();
            foreach (var file in sourceFiles)
            {
                var filePath = Path.Combine(_inputDir, file);
                using var input = File.OpenRead(filePath);
                using var decompressor = new DecompressionStream(input);
                using var reader = new StreamReader(decompressor);
                while (!reader.EndOfStream)
                {
                    var line = await reader.ReadLineAsync();
                    var fields = ParseRow2(line, QuoteChar, Delimiter);
                    action?.Invoke(new DataRecord(fields, _tableName, headers));
                }
            }
        }

        /// <summary>
        /// Returns the first record of the first data file, or null when the SQL
        /// dump references no data files. All streams are disposed (the original
        /// leaked the MemoryStream and StreamReader here as well).
        /// </summary>
        public override async Task<DataRecord?> GetTestRecord()
        {
            var sourceFiles = await GetCsvFiles();
            var file = sourceFiles.FirstOrDefault();
            if (file == null)
                return null;

            var headers = await GetHeaders();
            var filePath = Path.Combine(_inputDir, file);
            using var input = File.OpenRead(filePath);
            using var decompressor = new DecompressionStream(input);
            using var reader = new StreamReader(decompressor);
            var line = await reader.ReadLineAsync();
            var fields = ParseRow2(line, QuoteChar, Delimiter);
            return new DataRecord(fields, _tableName, headers);
        }

        /// <summary>
        /// Nothing to release: every stream is scoped to the method that opens it.
        /// NOTE(review): this hides the non-virtual CsvSource.Dispose (also empty);
        /// consider making Dispose virtual on the base class instead.
        /// </summary>
        public void Dispose()
        {
        }
    }
}