using ConsoleApp2.Helpers;
using Microsoft.Extensions.Logging;
using System.IO;
using System.Text.RegularExpressions;
using ZstdSharp;

namespace ConsoleApp2.Services
{
    /// <summary>
    /// A <see cref="CsvSource"/> that reads zstd-compressed dump files.
    /// It locates a <c>*.{tableName}.*.sql.zst</c> file in the input directory,
    /// decompresses it to obtain the CSV headers and the list of
    /// <c>.dat.zst</c> data files, and then streams rows out of those data files.
    /// </summary>
    public class ZstSource : CsvSource
    {
        // Pattern handed to DumpDataHelper to pick the quoted data-file names
        // out of the decompressed SQL text. Built once instead of on every call.
        private static readonly Regex DataFileNameRegex = new Regex(@"'.+\.dat.zst'");

        /// <summary>
        /// Creates a source for <paramref name="tableName"/> inside <paramref name="inputDir"/>.
        /// </summary>
        /// <param name="inputDir">Directory that contains the compressed dump files.</param>
        /// <param name="tableName">Table whose <c>.sql.zst</c> file should be located.</param>
        /// <param name="delimiter">Field delimiter forwarded to the base class.</param>
        /// <param name="quoteChar">Quote character forwarded to the base class.</param>
        /// <param name="logger">Optional logger forwarded to the base class.</param>
        public ZstSource(string inputDir, string tableName, string delimiter = ",", char quoteChar = '"',
            ILogger? logger = null)
            // Forward the caller's values as-is; the previous "delimiter = \",\"" form
            // overwrote them with the defaults before passing them on.
            : base(inputDir, tableName, delimiter, quoteChar, logger)
        {
            // Escape the table name and the literal dots so they cannot act as regex metacharacters.
            string pattern = $@"^.*\.{Regex.Escape(tableName)}\..*\.sql\.zst$";
            _sqlFilePath = Directory.GetFiles(_inputDir).FirstOrDefault(path => Regex.IsMatch(path, pattern));
        }

        /// <summary>
        /// Decompresses the zstd file at <paramref name="filePath"/> and returns its content as text.
        /// </summary>
        /// <param name="filePath">Path of the <c>.zst</c> file to read.</param>
        /// <returns>The fully decompressed text.</returns>
        private async Task<string> DecompressFile(string filePath)
        {
            // Read straight from the decompression stream: no intermediate in-memory copy,
            // and every stream/reader is disposed when the method completes.
            using (var input = File.OpenRead(filePath))
            using (var decompress = new DecompressionStream(input))
            using (var reader = new StreamReader(decompress))
            {
                return await reader.ReadToEndAsync();
            }
        }

        /// <summary>
        /// Decompresses the table's <c>.sql.zst</c> file and populates <c>headers</c> and
        /// <c>csvFiles</c> from it via <see cref="DumpDataHelper"/>.
        /// </summary>
        /// <exception cref="FileNotFoundException">
        /// No matching <c>.sql.zst</c> file was found in the input directory.
        /// </exception>
        public override async Task GetHeaderAndCsvFiles()
        {
            if (_sqlFilePath is null)
            {
                // Fail with a descriptive error instead of an ArgumentNullException from File.OpenRead.
                throw new FileNotFoundException(
                    $"No .sql.zst file found for table '{_tableName}' in '{_inputDir}'.");
            }

            var text = await DecompressFile(_sqlFilePath);
            headers = await DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text);
            csvFiles = await DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text, DataFileNameRegex);
        }

        /// <summary>
        /// Streams every row of every data file and passes each parsed record to <paramref name="action"/>.
        /// </summary>
        /// <param name="action">Callback invoked once per record; ignored when null.</param>
        public override async Task DoEnqueue(Action<DataRecord> action)
        {
            await GetHeaderAndCsvFiles();

            foreach (var file in csvFiles)
            {
                var filePath = Path.Combine(_inputDir, file);
                using (var input = File.OpenRead(filePath))
                using (var decompress = new DecompressionStream(input))
                using (var reader = new StreamReader(decompress))
                {
                    // ReadLineAsync returns null at end of stream; checking that avoids
                    // StreamReader.EndOfStream, which performs a blocking synchronous read.
                    string? line;
                    while ((line = await reader.ReadLineAsync()) != null)
                    {
                        var fields = ParseRow2(line, QuoteChar, Delimiter);
                        var record = new DataRecord(fields, _tableName, headers);
                        action?.Invoke(record);
                    }
                }
            }
        }

        /// <summary>
        /// Reads the first row of the first data file and returns it as a record.
        /// </summary>
        /// <returns>
        /// The first record, or null when there is no data file or the first data file is empty.
        /// </returns>
        public override async Task<DataRecord?> GetTestRecord()
        {
            await GetHeaderAndCsvFiles();

            var file = csvFiles.FirstOrDefault();
            if (file == null)
            {
                return null;
            }

            var filePath = Path.Combine(_inputDir, file);
            using (var input = File.OpenRead(filePath))
            using (var decompress = new DecompressionStream(input))
            using (var reader = new StreamReader(decompress))
            {
                var line = await reader.ReadLineAsync();
                if (line == null)
                {
                    // Empty data file: there is no row to parse.
                    return null;
                }

                var fields = ParseRow2(line, QuoteChar, Delimiter);
                return new DataRecord(fields, _tableName, headers);
            }
        }

        /// <summary>
        /// No-op: every stream and reader used by this class is disposed
        /// inside the method that opens it, so nothing is held between calls.
        /// </summary>
        public void Dispose()
        {
        }
    }
}