From b34ac104ef5388bb26f17ca5eff27afce78a9694 Mon Sep 17 00:00:00 2001
From: "2817212736@qq.com" <2817212736@qq.com>
Date: Wed, 11 Dec 2024 18:08:16 +0800
Subject: [PATCH] =?UTF-8?q?=E6=94=B9=E8=BF=9BZstReader=E8=AF=BB=E5=8F=96?=
 =?UTF-8?q?=E6=96=B9=E6=B3=95=EF=BC=8C=E5=A4=A7=E5=B9=85=E4=BC=98=E5=8C=96?=
 =?UTF-8?q?=E5=86=85=E5=AD=98=E6=80=A7=E8=83=BD?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
Review notes (this section is ignored by git-am and is not part of the commit message):

 * The subject decodes to: "Improve the ZstReader read method; greatly optimize
   memory performance".
 * This patch was rebuilt from a whitespace-collapsed copy. Indentation, the
   placement of blank lines and any BOM on the first `using` line are best-effort
   reconstructions, and the post-image blob id on the `index` line predates the
   edits listed here. If the patch does not apply, regenerate it with
   git format-patch from a tree containing these changes.
 * Generic type arguments that had been stripped from the text were restored
   (Lazy<StreamReader>, List<char>, ValueTask<bool>).
 * The constructors no longer call ReadBuffer(). The read loop already refills an
   empty buffer on the first call, and the eager call forced the Lazy reader (and
   therefore the file open) during construction.
 * ReadAsync is no longer declared `async`: nothing in it is awaited any more and
   it uses Span locals, which compilers before C# 13 reject inside async methods.
   Side effect: an exception from the underlying read is now thrown when
   ReadAsync is called instead of being stored in the returned task.
 * Open question for the author: the old code returned false on a blank or
   whitespace-only line, while the new code parses such a line as a row. Please
   confirm that this is intended.

 MesETL.App/Services/ETL/ZstReader.cs | 76 +++++++++++++++++++++++++---
 1 file changed, 69 insertions(+), 7 deletions(-)

diff --git a/MesETL.App/Services/ETL/ZstReader.cs b/MesETL.App/Services/ETL/ZstReader.cs
index c9704f3..f3435c0 100644
--- a/MesETL.App/Services/ETL/ZstReader.cs
+++ b/MesETL.App/Services/ETL/ZstReader.cs
@@ -1,4 +1,6 @@
-using Microsoft.Extensions.Logging;
+using System.Runtime.InteropServices;
+using System.Text;
+using Microsoft.Extensions.Logging;
 using ZstdSharp;
 
 namespace MesETL.App.Services.ETL;
@@ -11,7 +13,11 @@ public class ZstReader : CsvReader
     protected new readonly Lazy<StreamReader> Reader;
 
     private Stream? _stream;
-    
+    private readonly List<char> _lineBuffer = new(1024);
+    private readonly char[] _charBuffer = new char[1024];
+    private int _charLen = 0;
+    private int _charPos = 0;
+
     public ZstReader(string filePath, string tableName, string[] headers, string delimiter = ",", char quoteChar = '\"', ILogger? logger = null)
         : base(filePath, tableName, headers, delimiter, quoteChar, logger)
     {
@@ -29,13 +35,69 @@ public class ZstReader : CsvReader
         Reader = new Lazy<StreamReader>(() => new StreamReader(ds), false);
     }
 
+    /// <summary>
+    /// Refills the character buffer from the decompressed stream and rewinds the read position.
+    /// </summary>
+    /// <returns>The number of characters now buffered; 0 when the stream has no more data.</returns>
+    private int ReadBuffer()
+    {
+        _charLen = _charPos = 0;
+        _charLen = Reader.Value.ReadBlock(_charBuffer);
+        return _charLen;
+    }
+
-    public override async ValueTask<bool> ReadAsync()
+    /// <summary>
+    /// Reads the next line and stores the parsed record in <c>Current</c>.
+    /// </summary>
+    /// <remarks>
+    /// Deliberately not <c>async</c>: nothing is awaited and the parsing works on span locals,
+    /// so the work runs synchronously and the result is wrapped in a completed task.
+    /// </remarks>
+    /// <returns><c>true</c> if a record was read; <c>false</c> once the stream is exhausted.</returns>
+    public override ValueTask<bool> ReadAsync() => new ValueTask<bool>(ReadRecord());
+
+    /// <summary>Synchronous implementation behind <see cref="ReadAsync"/>.</summary>
+    private bool ReadRecord()
     {
-        var str = await Reader.Value.ReadLineAsync();
-        if (string.IsNullOrWhiteSpace(str))
+        // The buffer is fully consumed and the stream is at EOF.
+        if (_charPos == _charLen && ReadBuffer() == 0)
             return false;
 
-        var fields = ParseRowFaster(str, QuoteChar, Delimiter[0]);
-        Current = new DataRecord(fields, TableName, Headers);
+        do
+        {
+            // Scan the unread part of the buffer for a line terminator.
+            var span = _charBuffer.AsSpan(_charPos, _charLen - _charPos);
+            var newLineIdx = span.IndexOfAny('\r', '\n');
+            if (newLineIdx >= 0)
+            {
+                // Fast path: the whole line sits in the buffer, so it is parsed in place.
+                var line = span[..newLineIdx];
+                if (_lineBuffer.Count > 0)
+                {
+                    // The line began in an earlier buffer: append the tail and parse the joined text.
+                    _lineBuffer.AddRange(line);
+                    line = CollectionsMarshal.AsSpan(_lineBuffer);
+                }
+
+                var fields = ParseRowFaster(line, QuoteChar, Delimiter[0]);
+                Current = new DataRecord(fields, TableName, Headers);
+                _lineBuffer.Clear();
+
+                var ch = span[newLineIdx];
+                _charPos += newLineIdx + 1;
+                // Skip the LF of a CRLF pair; it may be the first character of the next buffer.
+                if (ch == '\r' && (_charPos < _charLen || ReadBuffer() > 0) && _charBuffer[_charPos] == '\n')
+                    ++_charPos;
+                return true;
+            }
+
+            // No line terminator in this buffer: keep its contents and read more.
+            _lineBuffer.AddRange(span);
+        } while (ReadBuffer() > 0);
+
+        // End of stream without a trailing newline: the accumulated text is the last row.
+        var lastFields = ParseRowFaster(CollectionsMarshal.AsSpan(_lineBuffer), QuoteChar, Delimiter[0]);
+        Current = new DataRecord(lastFields, TableName, Headers);
+        _lineBuffer.Clear();
         return true;
     }