MES-ETL/ConsoleApp2/Services/ZstSource.cs
2024-01-22 16:58:05 +08:00

91 lines
3.4 KiB
C#

using ConsoleApp2.Helpers;
using Microsoft.Extensions.Logging;
using System.Text.RegularExpressions;
using ZstdSharp;
namespace ConsoleApp2.Services
{
/// <summary>
/// A <see cref="CsvSource"/> that reads Zstandard-compressed dump files:
/// a <c>.sql.zst</c> file supplying the headers and the list of data files,
/// and the compressed data files that list names, which hold the rows.
/// </summary>
public class ZstSource : CsvSource
{
    /// <summary>
    /// Creates a source for one table. All arguments are forwarded unchanged
    /// to the <see cref="CsvSource"/> constructor.
    /// </summary>
    /// <param name="inputDir">Directory searched for the <c>.sql.zst</c> file and used as the root for data file names.</param>
    /// <param name="tableName">Table name; used to locate the <c>.sql.zst</c> file and passed to each <see cref="DataRecord"/>.</param>
    /// <param name="delimiter">Field delimiter handed to the base class (presumably surfaced as <c>Delimiter</c> - confirm in CsvSource).</param>
    /// <param name="quoteChar">Quote character handed to the base class (presumably surfaced as <c>QuoteChar</c> - confirm in CsvSource).</param>
    /// <param name="logger">Optional logger handed to the base class.</param>
    public ZstSource(string inputDir, string tableName, string delimiter = ",", char quoteChar = '"',
        ILogger? logger = null)
        // Pass the parameters themselves. Writing `delimiter = ","` here would be an
        // assignment expression that overwrites whatever the caller supplied.
        : base(inputDir, tableName, delimiter, quoteChar, logger)
    {
    }

    /// <summary>
    /// Decompresses a Zstandard file and returns its entire content as text.
    /// </summary>
    /// <param name="filePath">Path of the compressed file.</param>
    /// <returns>The decompressed content, decoded by <see cref="StreamReader"/> with its default encoding handling.</returns>
    private static async Task<string> DecompressFile(string filePath)
    {
        using var input = File.OpenRead(filePath);
        using var decompress = new DecompressionStream(input);
        // Read straight from the decompression stream: no intermediate buffer copy
        // and no synchronous CopyTo inside an async method.
        using var reader = new StreamReader(decompress);
        return await reader.ReadToEndAsync();
    }

    /// <summary>
    /// Locates the <c>.sql.zst</c> file for the table in the input directory,
    /// decompresses it, and fills <c>headers</c> and <c>csvFiles</c> from its text.
    /// </summary>
    /// <exception cref="FileNotFoundException">No file in the input directory matches the table's <c>.sql.zst</c> pattern.</exception>
    public override async Task GetHeaderAndCsvFiles()
    {
        // Escape the table name and the literal dots so regex metacharacters
        // cannot widen the match.
        string pattern = $"^.*\\.{Regex.Escape(_tableName)}\\..*\\.sql\\.zst$";
        var sqlFilePath = Directory.GetFiles(_inputDir).FirstOrDefault(path => Regex.IsMatch(path, pattern));
        _sqlFilePath = sqlFilePath ?? "";
        if (sqlFilePath is null)
        {
            // Fail with a clear message instead of letting File.OpenRead("") throw
            // an empty-path error.
            throw new FileNotFoundException(
                $"No .sql.zst file found for table '{_tableName}' in directory '{_inputDir}'.");
        }

        var text = await DecompressFile(sqlFilePath);
        headers = DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text);
        csvFiles = DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text, new Regex(@"'.+\.dat.zst'"));
    }

    /// <summary>
    /// Reads every data file listed in <c>csvFiles</c> line by line, turns each
    /// line into a <see cref="DataRecord"/>, and passes it to <paramref name="action"/>.
    /// </summary>
    /// <param name="action">Callback invoked once per record; a null callback is tolerated and skipped.</param>
    public override async Task DoEnqueue(Action<DataRecord> action)
    {
        await GetHeaderAndCsvFiles();
        if (csvFiles is null)
        {
            // Same null tolerance as GetTestRecord: nothing listed, nothing to read.
            return;
        }

        foreach (var file in csvFiles)
        {
            var filePath = Path.Combine(_inputDir, file);
            using var input = File.OpenRead(filePath);
            using var decompress = new DecompressionStream(input);
            using var reader = new StreamReader(decompress);

            // Loop on the ReadLineAsync result rather than EndOfStream, which may
            // perform a blocking synchronous read.
            string? line;
            while ((line = await reader.ReadLineAsync()) is not null)
            {
                var fields = ParseRow2(line, QuoteChar, Delimiter);
                var record = new DataRecord(fields, _tableName, headers);
                action?.Invoke(record);
            }
        }
    }

    /// <summary>
    /// Builds a record from the first line of the first listed data file.
    /// </summary>
    /// <returns>
    /// The first record, or <c>null</c> when no data file is listed or the first
    /// data file contains no lines.
    /// </returns>
    public override async Task<DataRecord?> GetTestRecord()
    {
        await GetHeaderAndCsvFiles();
        var file = csvFiles?.FirstOrDefault();
        if (file is null)
        {
            return null;
        }

        var filePath = Path.Combine(_inputDir, file);
        using var input = File.OpenRead(filePath);
        using var decompress = new DecompressionStream(input);
        using var reader = new StreamReader(decompress);

        var line = await reader.ReadLineAsync();
        if (line is null)
        {
            // Empty data file: there is no row to parse.
            return null;
        }

        var fields = ParseRow2(line, QuoteChar, Delimiter);
        return new DataRecord(fields, _tableName, headers);
    }
}
}