MES-ETL/MesETL.App/Services/RecordQueuePool.cs
2024-02-09 23:18:34 +08:00

66 lines
1.9 KiB
C#

using System.Collections.Concurrent;
using Microsoft.Extensions.DependencyInjection;
namespace MesETL.App.Services;
public class RecordQueuePool
{
private readonly ConcurrentDictionary<string, DataRecordQueue> _queues = new();
public IReadOnlyDictionary<string, DataRecordQueue> Queues => _queues;
public void AddQueue(string key, int boundedCapacity = 200_0000, long maxCharCount = 2_147_483_648)
=> AddQueue(key, new DataRecordQueue(boundedCapacity, maxCharCount));
public void AddQueue(string key, DataRecordQueue queue)
{
if (!_queues.TryAdd(key, queue))
throw new InvalidOperationException($"请勿添加重复的队列,队列名: {key}");
}
public void RemoveQueue(string key, bool dispose = true)
{
if (!_queues.Remove(key, out var queue))
throw new InvalidOperationException($"未找到对应的队列,队列名:{key}");
if (dispose) queue.Dispose();
}
public DataRecordQueue GetQueue(string key)
{
return _queues[key];
}
public DataRecordQueue this[string key]
{
get => GetQueue(key);
set => AddQueue(key, value);
}
}
public static class MultiRecordQueueExtensions
{
public static IServiceCollection AddRecordQueuePool(this IServiceCollection services, params string[] keys)
{
var pool = new RecordQueuePool();
foreach (var key in keys)
{
pool.AddQueue(key);
}
services.AddSingleton(pool);
return services;
}
public static IServiceCollection AddRecordQueuePool(this IServiceCollection services,
params (string key, DataRecordQueue queue)[] queues)
{
var pool = new RecordQueuePool();
foreach (var (key, queue) in queues)
{
pool.AddQueue(key, queue);
}
services.AddSingleton(pool);
return services;
}
}