MES-ETL/ConsoleApp2/Services/ZstSource.cs

91 lines
3.4 KiB
C#
Raw Normal View History

2024-01-12 16:50:37 +08:00
using ConsoleApp2.Helpers;
using Microsoft.Extensions.Logging;
using System.Text.RegularExpressions;
using ZstdSharp;
namespace ConsoleApp2.Services
{
public class ZstSource : CsvSource
{
    /// <summary>
    /// Creates a source that reads zstd-compressed dump files for <paramref name="tableName"/>
    /// from <paramref name="inputDir"/>. The table's SQL file is located eagerly: the first file
    /// in the directory whose name matches <c>*.{tableName}.*.sql.zst</c>.
    /// </summary>
    /// <param name="inputDir">Directory that contains the dump files.</param>
    /// <param name="tableName">Table whose SQL and data files should be read.</param>
    /// <param name="delimiter">Field delimiter passed to the row parser.</param>
    /// <param name="quoteChar">Quote character passed to the row parser.</param>
    /// <param name="logger">Optional logger forwarded to the base class.</param>
    public ZstSource(string inputDir, string tableName, string delimiter = ",", char quoteChar = '"',
        ILogger? logger = null) : base(inputDir, tableName, delimiter, quoteChar, logger)
    {
        // The caller's delimiter/quoteChar/logger are forwarded unchanged. The previous version
        // assigned the defaults inside the base(...) argument list, which discarded them.
        // Escape the table name and every literal dot. Match only the file name, so that dots
        // or regex metacharacters in the table name or the directory path cannot cause false hits.
        var sqlFilePattern = new Regex($@"^.*\.{Regex.Escape(tableName)}\..*\.sql\.zst$");
        _sqlFilePath = Directory.GetFiles(_inputDir)
            .FirstOrDefault(path => sqlFilePattern.IsMatch(Path.GetFileName(path)));
    }

    /// <summary>
    /// Decompresses an entire zstd file and returns its contents as text.
    /// </summary>
    /// <param name="filePath">Path of the <c>.zst</c> file.</param>
    /// <returns>The decompressed text, decoded by <see cref="StreamReader"/>'s default encoding detection.</returns>
    private static async Task<string> DecompressFileAsync(string filePath)
    {
        using var input = File.OpenRead(filePath);
        using var decompress = new DecompressionStream(input);
        using var reader = new StreamReader(decompress);
        return await reader.ReadToEndAsync();
    }

    /// <summary>
    /// Streams the lines of a zstd-compressed text file. The file, decompression stream and
    /// reader are disposed when enumeration completes or is abandoned early.
    /// </summary>
    /// <param name="filePath">Path of the <c>.zst</c> file.</param>
    private static async IAsyncEnumerable<string> ReadLinesAsync(string filePath)
    {
        using var input = File.OpenRead(filePath);
        using var decompress = new DecompressionStream(input);
        using var reader = new StreamReader(decompress);
        // ReadLineAsync returns null at end of stream. Testing it avoids EndOfStream, which can
        // block synchronously while it refills the buffer.
        while (await reader.ReadLineAsync() is { } line)
        {
            yield return line;
        }
    }

    /// <summary>
    /// Decompresses the table's SQL file and extracts the CSV headers and the list of
    /// <c>.dat.zst</c> data file names from it.
    /// </summary>
    /// <exception cref="FileNotFoundException">No matching <c>.sql.zst</c> file was found in the input directory.</exception>
    public override async Task GetHeaderAndCsvFiles()
    {
        if (_sqlFilePath is null)
        {
            throw new FileNotFoundException(
                $"No '*.{_tableName}.*.sql.zst' file was found in '{_inputDir}'.");
        }

        var text = await DecompressFileAsync(_sqlFilePath);
        headers = DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text);
        csvFiles = DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text, new Regex(@"'.+\.dat\.zst'"));
    }

    /// <summary>
    /// Parses every line of every data file referenced by the SQL file and passes each
    /// resulting <see cref="DataRecord"/> to <paramref name="action"/>.
    /// </summary>
    /// <param name="action">Callback invoked once per parsed row; may be null.</param>
    public override async Task DoEnqueue(Action<DataRecord> action)
    {
        await GetHeaderAndCsvFiles();
        foreach (var file in csvFiles)
        {
            var filePath = Path.Combine(_inputDir, file);
            await foreach (var line in ReadLinesAsync(filePath))
            {
                var fields = ParseRow2(line, QuoteChar, Delimiter);
                var record = new DataRecord(fields, _tableName, headers);
                action?.Invoke(record);
            }
        }
    }

    /// <summary>
    /// Parses the first line of the first data file as a sample record.
    /// </summary>
    /// <returns>
    /// The sample record, or <c>null</c> when the SQL file lists no data files or the first
    /// data file contains no lines.
    /// </returns>
    public override async Task<DataRecord?> GetTestRecord()
    {
        await GetHeaderAndCsvFiles();
        var file = csvFiles.FirstOrDefault();
        if (file is null)
        {
            return null;
        }

        await foreach (var line in ReadLinesAsync(Path.Combine(_inputDir, file)))
        {
            // Only the first line is needed. Returning here ends the enumeration and disposes the streams.
            var fields = ParseRow2(line, QuoteChar, Delimiter);
            return new DataRecord(fields, _tableName, headers);
        }

        // The data file is empty: there is no row to sample.
        return null;
    }
}
}