66 lines
1.9 KiB
C#
66 lines
1.9 KiB
C#
using System.Collections.Concurrent;
|
|
using Microsoft.Extensions.DependencyInjection;
|
|
|
|
namespace MesETL.App.Services;
|
|
|
|
/// <summary>
/// A keyed collection of <see cref="DataRecordQueue"/> instances backed by a
/// <see cref="ConcurrentDictionary{TKey,TValue}"/>.
/// </summary>
public class RecordQueuePool
{
    private readonly ConcurrentDictionary<string, DataRecordQueue> _queues = new();

    /// <summary>
    /// Read-only view over the queues currently registered in the pool.
    /// </summary>
    public IReadOnlyDictionary<string, DataRecordQueue> Queues => _queues;

    /// <summary>
    /// Creates a new <see cref="DataRecordQueue"/> from the given arguments and registers it under <paramref name="key"/>.
    /// </summary>
    /// <param name="key">Name the queue is registered under.</param>
    /// <param name="boundedCapacity">Forwarded unchanged to the <see cref="DataRecordQueue"/> constructor.</param>
    /// <param name="maxCharCount">Forwarded unchanged to the <see cref="DataRecordQueue"/> constructor.</param>
    /// <exception cref="InvalidOperationException">A queue is already registered under <paramref name="key"/>.</exception>
    public void AddQueue(string key, int boundedCapacity = 200_0000, long maxCharCount = 2_147_483_648)
        => AddQueue(key, new DataRecordQueue(boundedCapacity, maxCharCount));

    /// <summary>
    /// Registers an existing queue under <paramref name="key"/>.
    /// </summary>
    /// <param name="key">Name the queue is registered under.</param>
    /// <param name="queue">The queue instance to register.</param>
    /// <exception cref="ArgumentNullException"><paramref name="queue"/> is <c>null</c>.</exception>
    /// <exception cref="InvalidOperationException">A queue is already registered under <paramref name="key"/>.</exception>
    public void AddQueue(string key, DataRecordQueue queue)
    {
        // Fail fast: a null entry would only surface later as a NullReferenceException
        // when the queue is fetched or disposed.
        ArgumentNullException.ThrowIfNull(queue);

        if (!_queues.TryAdd(key, queue))
        {
            throw new InvalidOperationException($"请勿添加重复的队列,队列名: {key}");
        }
    }

    /// <summary>
    /// Removes the queue registered under <paramref name="key"/> and optionally disposes it.
    /// </summary>
    /// <param name="key">Name of the queue to remove.</param>
    /// <param name="dispose">When <c>true</c> (the default), the removed queue is disposed.</param>
    /// <exception cref="InvalidOperationException">No queue is registered under <paramref name="key"/>.</exception>
    public void RemoveQueue(string key, bool dispose = true)
    {
        // TryRemove is a single atomic operation on ConcurrentDictionary. The
        // IDictionary Remove(key, out value) extension performs a separate lookup
        // and removal, so two concurrent callers could both succeed and dispose
        // the same queue twice.
        if (!_queues.TryRemove(key, out var queue))
        {
            throw new InvalidOperationException($"未找到对应的队列,队列名:{key}");
        }

        if (dispose)
        {
            queue.Dispose();
        }
    }

    /// <summary>
    /// Returns the queue registered under <paramref name="key"/>.
    /// </summary>
    /// <param name="key">Name of the queue to look up.</param>
    /// <returns>The registered queue.</returns>
    /// <exception cref="KeyNotFoundException">No queue is registered under <paramref name="key"/>.</exception>
    public DataRecordQueue GetQueue(string key)
    {
        return _queues[key];
    }

    /// <summary>
    /// Gets the queue registered under <paramref name="key"/>, or registers a new one.
    /// The setter delegates to <see cref="AddQueue(string, DataRecordQueue)"/>, so it
    /// throws instead of overwriting when the key already exists.
    /// </summary>
    /// <param name="key">Name of the queue.</param>
    public DataRecordQueue this[string key]
    {
        get => GetQueue(key);
        set => AddQueue(key, value);
    }
}
|
|
|
|
/// <summary>
/// <see cref="IServiceCollection"/> helpers that build a <see cref="RecordQueuePool"/>
/// and register it as a singleton instance.
/// </summary>
public static class MultiRecordQueueExtensions
{
    /// <summary>
    /// Builds a pool containing one default-configured queue per entry in
    /// <paramref name="keys"/> and registers the pool as a singleton.
    /// </summary>
    /// <param name="services">The service collection to register the pool with.</param>
    /// <param name="keys">Names of the queues to create.</param>
    /// <returns>The same <paramref name="services"/> instance, for chaining.</returns>
    public static IServiceCollection AddRecordQueuePool(this IServiceCollection services, params string[] keys)
    {
        var queuePool = new RecordQueuePool();
        for (var index = 0; index < keys.Length; index++)
        {
            queuePool.AddQueue(keys[index]);
        }

        return RegisterPool(services, queuePool);
    }

    /// <summary>
    /// Builds a pool from the supplied (key, queue) pairs and registers the pool as a singleton.
    /// </summary>
    /// <param name="services">The service collection to register the pool with.</param>
    /// <param name="queues">Pairs of queue name and queue instance to add to the pool.</param>
    /// <returns>The same <paramref name="services"/> instance, for chaining.</returns>
    public static IServiceCollection AddRecordQueuePool(this IServiceCollection services,
        params (string key, DataRecordQueue queue)[] queues)
    {
        var queuePool = new RecordQueuePool();
        foreach (var entry in queues)
        {
            queuePool.AddQueue(entry.key, entry.queue);
        }

        return RegisterPool(services, queuePool);
    }

    // Registers the fully populated pool instance as a singleton and hands the collection back.
    private static IServiceCollection RegisterPool(IServiceCollection services, RecordQueuePool queuePool)
    {
        services.AddSingleton(queuePool);
        return services;
    }
}