MES-ETL/MesETL.App/Services/ETL/ZstReader.cs

101 lines
3.4 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System.Runtime.InteropServices;
using System.Text;
using Microsoft.Extensions.Logging;
using ZstdSharp;
namespace MesETL.App.Services.ETL;
/// <summary>
/// Decompresses a Zstandard (.zst) file or stream and reads CSV records from the decompressed text.
/// </summary>
/// <remarks>
/// Records are split on '\n', '\r' or "\r\n". Each line is parsed with <c>ParseRowFaster</c> using the
/// base reader's quote character and the first character of its delimiter.
/// The decompression pipeline is opened lazily on the first <see cref="ReadAsync"/> call, so construction
/// performs no I/O. Errors opening or decompressing the input therefore surface from
/// <see cref="ReadAsync"/>, while the instance is still owned (and disposable) by the caller.
/// </remarks>
public class ZstReader : CsvReader
{
    /// <summary>
    /// Text reader over the decompressed data, created on first use.
    /// Declared with <c>new</c> to hide the base class's <c>Reader</c> so reads go through decompression.
    /// </summary>
    protected new readonly Lazy<StreamReader> Reader;

    // Decompression stream created by the Reader factory; disposed explicitly in Dispose.
    private Stream? _stream;

    // File opened by the path-based constructor. Tracked separately so the handle is released even if
    // the decompression stream leaves its inner stream open, or if the factory fails part-way.
    private Stream? _fileStream;

    // Accumulates the text of a line that spans more than one buffer fill.
    private readonly List<char> _str = new(1024);

    // Decompressed characters; the unread window is [_charPos, _charLen).
    private readonly char[] _charBuffer = new char[1024];
    private int _charLen;
    private int _charPos;

    /// <summary>Creates a reader over a Zstandard-compressed CSV file on disk.</summary>
    /// <param name="filePath">Path of the .zst file; it is opened on the first read, not here.</param>
    /// <param name="tableName">Table name attached to every produced record.</param>
    /// <param name="headers">Column headers attached to every produced record.</param>
    /// <param name="delimiter">Field delimiter; only its first character is used when splitting rows.</param>
    /// <param name="quoteChar">Quote character used when splitting rows.</param>
    /// <param name="logger">Optional logger, forwarded to the base reader.</param>
    public ZstReader(string filePath, string tableName, string[] headers, string delimiter = ",", char quoteChar = '\"', ILogger? logger = null)
        : base(filePath, tableName, headers, delimiter, quoteChar, logger)
    {
        Reader = new Lazy<StreamReader>(() =>
        {
            // Assign each stream to a field as soon as it exists so Dispose can release it
            // even if a later step of this factory throws.
            _fileStream = File.OpenRead(filePath);
            _stream = new DecompressionStream(_fileStream);
            return new StreamReader(_stream);
        }, false);
    }

    /// <summary>Creates a reader over a Zstandard-compressed CSV stream.</summary>
    /// <param name="stream">Compressed input; the decompression stream over it is created on the first read.</param>
    /// <param name="tableName">Table name attached to every produced record.</param>
    /// <param name="headers">Column headers attached to every produced record.</param>
    /// <param name="delimiter">Field delimiter; only its first character is used when splitting rows.</param>
    /// <param name="quoteChar">Quote character used when splitting rows.</param>
    /// <param name="logger">Optional logger, forwarded to the base reader.</param>
    public ZstReader(Stream stream, string tableName, string[] headers, string delimiter = ",", char quoteChar = '\"', ILogger? logger = null)
        : base(stream, tableName, headers, delimiter, quoteChar, logger)
    {
        Reader = new Lazy<StreamReader>(() =>
        {
            _stream = new DecompressionStream(stream);
            return new StreamReader(_stream);
        }, false);
    }

    /// <summary>
    /// Refills the character buffer from the decompressed text.
    /// </summary>
    /// <returns>The number of characters read; 0 means end of input.</returns>
    private async ValueTask<int> ReadBufferAsync()
    {
        // Reset first so that a failed read leaves an empty window instead of re-exposing stale data.
        _charLen = _charPos = 0;
        _charLen = await Reader.Value.ReadBlockAsync(_charBuffer.AsMemory());
        return _charLen;
    }

    /// <summary>
    /// Advances to the next CSV record and stores it in <c>Current</c>.
    /// </summary>
    /// <returns><c>true</c> if a record was produced; <c>false</c> at end of input.</returns>
    public override async ValueTask<bool> ReadAsync()
    {
        // Buffer fully consumed and the decompressed stream is at EOF: nothing left to read.
        if (_charPos == _charLen && await ReadBufferAsync() == 0)
            return false;

        do
        {
            if (TryTakeLine(out var endedWithCr))
            {
                // A '\r' terminator may be the first half of "\r\n"; the '\n' can sit in the next buffer fill.
                if (endedWithCr)
                    await SkipLineFeedAsync();
                return true;
            }
        } while (await ReadBufferAsync() > 0);

        // EOF reached without a trailing line terminator: the accumulated text is the final record.
        Current = CreateRecord(CollectionsMarshal.AsSpan(_str));
        _str.Clear();
        return true;
    }

    /// <summary>
    /// Scans the unread part of the buffer for a line terminator ('\r' or '\n').
    /// If one is found, the completed line (prefixed by any text carried over in <see cref="_str"/>)
    /// is parsed into <c>Current</c> and the position moves past the terminator.
    /// Otherwise the unread text is appended to <see cref="_str"/> and the buffer is marked consumed.
    /// </summary>
    /// <param name="endedWithCr">Set to <c>true</c> when the terminator found was '\r'.</param>
    /// <returns><c>true</c> if a complete line was parsed.</returns>
    private bool TryTakeLine(out bool endedWithCr)
    {
        var span = _charBuffer.AsSpan(_charPos, _charLen - _charPos);
        var newLineIdx = span.IndexOfAny('\r', '\n');

        if (newLineIdx < 0)
        {
            // No terminator in this fill: carry the partial line over to the next one.
            _str.AddRange(span);
            _charPos = _charLen;
            endedWithCr = false;
            return false;
        }

        var line = span[..newLineIdx];
        if (_str.Count == 0)
        {
            // Common case: the whole line is inside the buffer, so parse it without copying.
            Current = CreateRecord(line);
        }
        else
        {
            _str.AddRange(line);
            Current = CreateRecord(CollectionsMarshal.AsSpan(_str));
            _str.Clear();
        }

        endedWithCr = span[newLineIdx] == '\r';
        _charPos += newLineIdx + 1;
        return true;
    }

    /// <summary>
    /// Consumes a '\n' that immediately follows a '\r' terminator, so "\r\n" counts as a single line break.
    /// Refills the buffer first if the '\r' was its last character.
    /// </summary>
    private async ValueTask SkipLineFeedAsync()
    {
        if (_charPos == _charLen && await ReadBufferAsync() == 0)
            return;
        if (_charBuffer[_charPos] == '\n')
            ++_charPos;
    }

    /// <summary>Parses one line of CSV text into a record for this reader's table.</summary>
    private DataRecord CreateRecord(Span<char> line) =>
        new DataRecord(ParseRowFaster(line, QuoteChar, Delimiter[0]), TableName, Headers);

    /// <summary>
    /// Disposes the base reader, then the decompressed text reader and the streams this instance opened.
    /// </summary>
    public override void Dispose()
    {
        base.Dispose();
        if (Reader.IsValueCreated)
            Reader.Value.Dispose();
        // Disposed unconditionally (null-safe): the Lazy factory may have opened them before failing.
        _stream?.Dispose();
        _fileStream?.Dispose();
    }
}