diff --git a/MesETL.App/Services/ETL/ZstReader.cs b/MesETL.App/Services/ETL/ZstReader.cs index c9704f3..f3435c0 100644 --- a/MesETL.App/Services/ETL/ZstReader.cs +++ b/MesETL.App/Services/ETL/ZstReader.cs @@ -1,4 +1,6 @@ -using Microsoft.Extensions.Logging; +using System.Runtime.InteropServices; +using System.Text; +using Microsoft.Extensions.Logging; using ZstdSharp; namespace MesETL.App.Services.ETL; @@ -11,7 +13,12 @@ public class ZstReader : CsvReader protected new readonly Lazy Reader; private Stream? _stream; - + private readonly List _str = new(1024); + private readonly char[] _charBuffer = new char[1024]; + private int _charLen = 0; + private int _charPos = 0; + + public ZstReader(string filePath, string tableName, string[] headers, string delimiter = ",", char quoteChar = '\"', ILogger? logger = null) : base(filePath, tableName, headers, delimiter, quoteChar, logger) { @@ -20,6 +27,7 @@ public class ZstReader : CsvReader _stream = new DecompressionStream(File.OpenRead(filePath)); return new StreamReader(_stream); }, false); + ReadBuffer(); } public ZstReader(Stream stream, string tableName, string[] headers, string delimiter = ",", char quoteChar = '\"', ILogger? logger = null) @@ -27,16 +35,56 @@ public class ZstReader : CsvReader { var ds = new DecompressionStream(stream); Reader = new Lazy(() => new StreamReader(ds), false); + ReadBuffer(); + } + + private int ReadBuffer() + { + _charLen = _charPos = 0; + _charLen = Reader.Value.ReadBlock(_charBuffer); + return _charLen; } public override async ValueTask ReadAsync() { - var str = await Reader.Value.ReadLineAsync(); - if (string.IsNullOrWhiteSpace(str)) + // 缓冲区已经读取完毕,并且流状态为EOF + if (_charPos == _charLen && ReadBuffer() == 0) return false; - var fields = ParseRowFaster(str, QuoteChar, Delimiter[0]); - Current = new DataRecord(fields, TableName, Headers); + do + { + // 读取缓冲区 + var span = _charBuffer.AsSpan(_charPos, _charLen - _charPos); + var newLineIdx = span.IndexOfAny('\r', '\n'); + // 读取到行,结合当前构建字符串转换进行转换 + if (newLineIdx >= 0) + { + if (_str.Count == 0) // => 可以提高一点性能... + { + var fields = ParseRowFaster(span[..newLineIdx], QuoteChar, Delimiter[0]); + Current = new DataRecord(fields, TableName, Headers); + } + else + { + _str.AddRange(span[..newLineIdx]); + var fields = ParseRowFaster(CollectionsMarshal.AsSpan(_str), QuoteChar, Delimiter[0]); + Current = new DataRecord(fields, TableName, Headers); + } + _str.Clear(); + + var ch = span[newLineIdx]; + _charPos += newLineIdx + 1; + if (ch == '\r' && (_charPos < _charLen || ReadBuffer() > 0) && _charBuffer[_charPos] == '\n') // 跳过CRLF + ++_charPos; + return true; + } + // 未读取到行,将缓冲区插入构建字符串 + _str.AddRange(span); + } while (ReadBuffer() > 0); + + var f = ParseRowFaster(CollectionsMarshal.AsSpan(_str), QuoteChar, Delimiter[0]); + Current = new DataRecord(f, TableName, Headers); + _str.Clear(); return true; }