MES-ETL/MesETL.App/Services/DataRecordQueue.cs
2817212736@qq.com b20c56640f 多项新特性和更改:
- 添加模拟数据生成器;
- GC时添加碎片整理;
- 优化日志输出,添加更多DEBUG监控项目;
- 修复输出时分库配置逻辑的严重错误;
- 优化了少许内存性能,减少Lambda闭包分配;
2024-12-20 10:43:05 +08:00

77 lines
2.1 KiB
C#

using System.Collections.Concurrent;
using System.Diagnostics.CodeAnalysis;
using TaskExtensions = MesETL.Shared.Helper.TaskExtensions;
namespace MesETL.App.Services;
/// <summary>
/// A bounded queue of <see cref="DataRecord"/> items backed by a
/// <see cref="BlockingCollection{T}"/>. Besides the record-count bound, the queue
/// tracks the sum of <c>FieldCharCount</c> of the records it currently holds and
/// makes producers wait while that sum would exceed a configured maximum.
/// </summary>
public class DataRecordQueue : IDisposable
{
    /// <summary>Default maximum number of records held at once (500K).</summary>
    private const int DefaultBoundedCapacity = 500_000;

    /// <summary>
    /// Default character budget: 2^31 characters (about 4 GiB if every counted
    /// character is a 2-byte UTF-16 <see cref="char"/>).
    /// </summary>
    private const long DefaultMaxCharCount = 2_147_483_648;

    /// <summary>Delay between polls while a producer waits for room in the queue.</summary>
    private const int BackoffDelayMilliseconds = 50;

    private readonly BlockingCollection<DataRecord> _queue;
    private readonly long _maxCharCount;

    // Both counters are touched from producer and consumer threads, so every access
    // goes through Interlocked to stay atomic (including on 32-bit platforms).
    private long _currentCharCount;
    private long _longestFieldCharCount;

    /// <summary>Number of records currently in the queue.</summary>
    public int Count => _queue.Count;

    /// <summary>True when adding has been completed and the queue is empty.</summary>
    public bool IsCompleted => _queue.IsCompleted;

    /// <summary>True once <see cref="CompleteAdding"/> has been called.</summary>
    public bool IsAddingCompleted => _queue.IsAddingCompleted;

    /// <summary>
    /// Largest <c>FieldCharCount</c> of any record passed to <see cref="EnqueueAsync"/> so far.
    /// </summary>
    public long LongestFieldCharCount => Interlocked.Read(ref _longestFieldCharCount);

    /// <summary>Raised after a record has been added to the queue.</summary>
    public event Action? OnRecordWrite;

    /// <summary>Raised after a record has been taken from the queue.</summary>
    public event Action? OnRecordRead;

    /// <summary>
    /// Creates a queue with the default limits: 500K records and 2^31 characters.
    /// </summary>
    public DataRecordQueue() : this(DefaultBoundedCapacity, DefaultMaxCharCount)
    {
    }

    /// <summary>
    /// Creates a queue with explicit limits.
    /// </summary>
    /// <param name="boundedCapacity">Maximum number of records held at once.</param>
    /// <param name="maxCharCount">
    /// Maximum total <c>FieldCharCount</c> of the queued records. A single record that is
    /// larger than this value is still accepted once the queue has been drained.
    /// </param>
    public DataRecordQueue(int boundedCapacity, long maxCharCount)
    {
        _queue = new BlockingCollection<DataRecord>(boundedCapacity);
        _maxCharCount = maxCharCount;
    }

    /// <summary>Marks the queue as not accepting any more records.</summary>
    public void CompleteAdding() => _queue.CompleteAdding();

    /// <summary>
    /// Attempts to take one record from the queue without blocking.
    /// </summary>
    /// <param name="record">The record that was taken, when the method returns true.</param>
    /// <returns>True when a record was taken; false when none was available.</returns>
    public bool TryDequeue([MaybeNullWhen(false)] out DataRecord record)
    {
        if (!_queue.TryTake(out record))
        {
            return false;
        }

        // Give the record's characters back to the budget so waiting producers can proceed.
        Interlocked.Add(ref _currentCharCount, -record.FieldCharCount);
        OnRecordRead?.Invoke();
        return true;
    }

    /// <summary>
    /// Adds a record to the queue, waiting asynchronously while the character budget
    /// or the record-count bound leaves no room for it.
    /// </summary>
    /// <param name="record">The record to add.</param>
    /// <returns>A task that completes once the record has been added.</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown by the underlying collection when adding has already been completed.
    /// </exception>
    public async Task EnqueueAsync(DataRecord record)
    {
        long charCount = record.FieldCharCount;
        RecordLongestFieldCharCount(charCount);

        // A plain polling loop is used instead of a delegate-based wait helper so that
        // no lambda closure is allocated per call.
        // The "pending > 0" part matters: a record larger than the whole budget can never
        // fit next to other records, so it is admitted as soon as the queue has drained
        // instead of waiting forever.
        while (true)
        {
            long pending = Interlocked.Read(ref _currentCharCount);
            if (pending <= 0 || pending + charCount <= _maxCharCount)
            {
                break;
            }

            await Task.Delay(BackoffDelayMilliseconds);
        }

        // TryAdd returns false (rather than blocking the thread) while the collection is
        // at its bounded capacity, so the wait stays asynchronous.
        while (!_queue.TryAdd(record))
        {
            await Task.Delay(BackoffDelayMilliseconds);
        }

        Interlocked.Add(ref _currentCharCount, charCount);
        OnRecordWrite?.Invoke();
    }

    /// <summary>Releases the underlying collection.</summary>
    public void Dispose()
    {
        _queue.Dispose();
    }

    /// <summary>
    /// Raises the recorded maximum to <paramref name="candidate"/> when it is larger,
    /// using a compare-and-swap loop so concurrent producers cannot lose an update.
    /// </summary>
    private void RecordLongestFieldCharCount(long candidate)
    {
        long observed = Interlocked.Read(ref _longestFieldCharCount);
        while (candidate > observed)
        {
            long previous = Interlocked.CompareExchange(ref _longestFieldCharCount, candidate, observed);
            if (previous == observed)
            {
                return;
            }

            observed = previous;
        }
    }
}