Compare commits

...

2 Commits

Author SHA1 Message Date
77a3909160 改进模拟数据生成器,代码质量优化 2024-12-25 15:09:16 +08:00
4986c60416 修复流水号缓存服务的并行错误;
修复输出线程没有捕获异常的严重错误;
2024-12-20 17:04:19 +08:00
12 changed files with 197 additions and 75 deletions

View File

@ -114,6 +114,7 @@ public class MainHostedService : BackgroundService
{ {
var connStr = _databaseOptions.Value.ConnectionString var connStr = _databaseOptions.Value.ConnectionString
?? throw new ApplicationException("分库配置中没有配置数据库"); ?? throw new ApplicationException("分库配置中没有配置数据库");
_logger.LogWarning("已开启MySQL延迟写入功能并禁用重做日志请注意数据安全");
if (enable) if (enable)
{ {
await DatabaseHelper.NonQueryAsync(connStr, await DatabaseHelper.NonQueryAsync(connStr,

View File

@ -56,7 +56,18 @@ public class OutputService : IOutputService
if (!dbTasks.ContainsKey(db)) if (!dbTasks.ContainsKey(db))
{ {
dbTasks.Add(db, await dbTaskManager.CreateTaskAsync( dbTasks.Add(db, await dbTaskManager.CreateTaskAsync(
async () => await StartDatabaseWorker(db, queue, ct), ct)); async () =>
{
try
{
await StartDatabaseWorker(db, queue, ct);
}
catch (Exception e)
{
_logger.LogError(e, "输出线程发生错误");
_queuePool.RemoveQueue(db);
}
}, ct));
} }
} }
@ -153,6 +164,6 @@ public class OutputService : IOutputService
_context.AddOutput(value); _context.AddOutput(value);
_context.AddTableOutput(key, value); _context.AddTableOutput(key, value);
} }
_logger.LogTrace("输出任务:刷新了 {Count} 条记录", tableOutput.Values.Sum(i => i)); // _logger.LogTrace("输出任务:刷新了 {Count} 条记录", tableOutput.Values.Sum(i => i));
} }
} }

View File

@ -48,26 +48,92 @@ public class TransformService : ITransformService
public async Task ExecuteAsync(CancellationToken cancellationToken) public async Task ExecuteAsync(CancellationToken cancellationToken)
{ {
_logger.LogInformation("***** 数据转换服务已启动, 当前线程ID: {ThreadId} *****", Environment.CurrentManagedThreadId); _logger.LogInformation("***** 数据转换服务已启动 *****");
// var tasks = new List<Task>();
// for (int i = 0; i < 4; i++)
// {
// tasks.Add(Task.Run(TransformWorker, cancellationToken));
// }
//
// await Task.WhenAll(tasks);
await TransformWorker();
await TransformWorker2();
_context.CompleteTransform();
_logger.LogInformation("***** 数据转换服务执行完毕 *****"); _logger.LogInformation("***** 数据转换服务执行完毕 *****");
} }
public async Task TransformWorker() public async Task TransformWorker(DataRecordQueue queue)
{ {
while (!_context.IsInputCompleted || _producerQueue.Count > 0) while (!_context.IsInputCompleted || _producerQueue.Count > 0)
{ {
if (!_producerQueue.TryDequeue(out var record)) if (!_producerQueue.TryDequeue(out var record))
{ {
await Task.Delay(100);
continue;
}
try
{
var context = new DataTransformContext(record, _cache, _logger, _services);
if (_options.Value.EnableFilter)
{
// 数据过滤
var filter = _options.Value.RecordFilter;
if (filter is not null && await filter(context) == false) continue;
}
if (_options.Value.EnableReplacer)
{
// 数据替换
var replacer = _options.Value.RecordModify;
if (replacer is not null)
{
record = await replacer(context);
}
}
// 字段缓存
var cacher = _options.Value.RecordCache;
if(cacher is not null)
await cacher.Invoke(context);
//计算需要分流的数据库
var dbFilter = _options.Value.DatabaseFilter
?? throw new ApplicationException("未配置数据库过滤器");
record.Database = dbFilter(record);
if (_options.Value.EnableReBuilder)
{
//数据重建
var addRecords = _options.Value.RecordReBuild?.Invoke(context);
if (addRecords is { Count: > 0 })
{
foreach (var rc in addRecords)
{
if(dbFilter is not null)
rc.Database = dbFilter.Invoke(record);
await queue.EnqueueAsync(rc);
_context.AddTransform();
}
}
}
await queue.EnqueueAsync(record);
_context.AddTransform();
}
catch (Exception e)
{
_context.AddException(e);
var errorRecorder = _errorRecorderFactory.CreateTransform();
await errorRecorder.LogErrorRecordAsync(record, e);
if (!_options.Value.StrictMode)
_logger.LogError(e, "数据转换时发生错误");
else throw;
}
}
}
public async Task TransformWorker2()
{
while (!_context.IsInputCompleted || _producerQueue.Count > 0)
{
if (!_producerQueue.TryDequeue(out var record))
{
await Task.Delay(100);
continue; continue;
} }

View File

@ -124,6 +124,8 @@ async Task RunProgram()
} }
break; break;
} }
#if USE_TEST_DB
// 测试环境的OrderExtra表没有分区故按照SharedKey清理
// 清理ShardKey < 24010的 // 清理ShardKey < 24010的
case TableNames.OrderExtra: case TableNames.OrderExtra:
{ {
@ -133,7 +135,8 @@ async Task RunProgram()
return false; return false;
} }
break; break;
} }
#endif
// 清理(Status != 0 || Deleted = 1) && ID前四位 < 2401的 // 清理(Status != 0 || Deleted = 1) && ID前四位 < 2401的
case TableNames.OrderScrapBoard: case TableNames.OrderScrapBoard:
{ {
@ -214,16 +217,6 @@ async Task RunProgram()
break; break;
} }
// 移除ViewFileName列
case TableNames.OrderModule:
{
#if USE_TEST_DB
if (record.HeaderExists("ViewFileName"))
#endif
record.RemoveField("ViewFileName");
break;
}
#if USE_TEST_DB
// 删除ID列让数据库自行递增 // 删除ID列让数据库自行递增
// TODO: 数据表改进删除ID列或是替换为流水号 // TODO: 数据表改进删除ID列或是替换为流水号
case TableNames.ProcessStepEfficiency: case TableNames.ProcessStepEfficiency:
@ -236,17 +229,18 @@ async Task RunProgram()
record.RemoveField("ID"); record.RemoveField("ID");
break; break;
} }
case TableNames.SysConfig:
{
record.RemoveField("Key");
break;
}
#if USE_TEST_DB
// 测试环境忽略PlaceData列生产环境会提前将其移除 // 测试环境忽略PlaceData列生产环境会提前将其移除
case TableNames.SimplePlanOrder: case TableNames.SimplePlanOrder:
{ {
record.RemoveField("PlaceData"); record.RemoveField("PlaceData");
break; break;
} }
case TableNames.SysConfig:
{
record.RemoveField("Key");
break;
}
#endif #endif
default: break; default: break;
} }
@ -348,13 +342,15 @@ async Task RunProgram()
host.Services.AddLogging(builder => host.Services.AddLogging(builder =>
{ {
builder.ClearProviders(); builder.ClearProviders();
builder.AddSerilog(new LoggerConfiguration() var logger = new LoggerConfiguration()
.MinimumLevel.Verbose()
.WriteTo.Console() .WriteTo.Console()
.WriteTo.File(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"./Log/Error/{ErrorRecorder.UID}.log"), .WriteTo.File(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"./Log/Error/{ErrorRecorder.UID}.log"),
restrictedToMinimumLevel:LogEventLevel.Error) restrictedToMinimumLevel: LogEventLevel.Error)
// .WriteTo.File("./Log/Info/{ErrorRecorder.UID}.log", restrictedToMinimumLevel:LogEventLevel.Information) //性能考虑暂不使用 // .WriteTo.File("./Log/Info/{ErrorRecorder.UID}.log", restrictedToMinimumLevel:LogEventLevel.Information) //性能考虑暂不使用
.CreateLogger() .CreateLogger();
); builder.AddSerilog(logger);
Log.Logger = logger;
}); });
host.Services.AddDataSourceFactory(); host.Services.AddDataSourceFactory();
@ -372,7 +368,7 @@ async Task RunProgram()
host.Services.AddHostedService<MainHostedService>(); host.Services.AddHostedService<MainHostedService>();
host.Services.AddSingleton<IInputService, FileInputService>(); host.Services.AddSingleton<IInputService, FileInputService>();
host.Services.AddSingleton<ITransformService, TransformService>(); host.Services.AddSingleton<ITransformService, TransformService>();
host.Services.AddSingleton<IOutputService, OutputService>(); host.Services.AddSingleton<IOutputService, VoidOutputService>();
host.Services.AddSingleton<TaskMonitorService>(); host.Services.AddSingleton<TaskMonitorService>();
// host.Services.AddRedisCache(redisOptions); // host.Services.AddRedisCache(redisOptions);
host.Services.AddSingleton<ICacher, MemoryCache>(); host.Services.AddSingleton<ICacher, MemoryCache>();

View File

@ -64,7 +64,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
if (_recordCache.Count == 0) if (_recordCache.Count == 0)
return; return;
var cmd = _conn.CreateCommand(); await using var cmd = _conn.CreateCommand();
cmd.CommandTimeout = 0; cmd.CommandTimeout = 0;
try try
@ -255,11 +255,13 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
{ {
_conn.Close(); _conn.Close();
_conn.Dispose(); _conn.Dispose();
_recordCache.Clear();
} }
public async ValueTask DisposeAsync() public async ValueTask DisposeAsync()
{ {
await _conn.CloseAsync(); await _conn.CloseAsync();
await _conn.DisposeAsync(); await _conn.DisposeAsync();
_recordCache.Clear();
} }
} }

View File

@ -1,3 +1,4 @@
using System.Collections.Concurrent;
using System.Diagnostics; using System.Diagnostics;
using System.Runtime.CompilerServices; using System.Runtime.CompilerServices;
using System.Text; using System.Text;
@ -11,7 +12,7 @@ namespace MesETL.App.Services.Seq;
public class SeqService public class SeqService
{ {
private readonly string _connectionString; private readonly string _connectionString;
private readonly Dictionary<SeqConfig, long> _cachedSequence; private readonly ConcurrentDictionary<SeqConfig, long> _cachedSequence;
public IReadOnlyDictionary<SeqConfig, long> CachedSequence => _cachedSequence; public IReadOnlyDictionary<SeqConfig, long> CachedSequence => _cachedSequence;
@ -24,7 +25,7 @@ public class SeqService
}; };
_connectionString = builder.ConnectionString; _connectionString = builder.ConnectionString;
_cachedSequence = new Dictionary<SeqConfig, long>(); _cachedSequence = new ConcurrentDictionary<SeqConfig, long>();
} }
private async Task<long> UpdateSequenceID(string name,int step,long max,bool recycle, int add) private async Task<long> UpdateSequenceID(string name,int step,long max,bool recycle, int add)
@ -99,9 +100,9 @@ public class SeqService
/// 移除一个缓存的流水号 /// 移除一个缓存的流水号
/// </summary> /// </summary>
/// <param name="config"></param> /// <param name="config"></param>
public void RemoveCachedSeq(SeqConfig config) public bool RemoveCachedSeq(SeqConfig config)
{ {
_cachedSequence.Remove(config); return _cachedSequence.Remove(config, out _);
} }
/// <summary> /// <summary>

View File

@ -1,4 +1,5 @@
using ApplicationException = System.ApplicationException; using Serilog;
using ApplicationException = System.ApplicationException;
using TaskExtensions = MesETL.Shared.Helper.TaskExtensions; using TaskExtensions = MesETL.Shared.Helper.TaskExtensions;
namespace MesETL.App.Services; namespace MesETL.App.Services;
@ -37,6 +38,8 @@ public class TaskManager
{ {
var task = Task.Run(async () => var task = Task.Run(async () =>
{ {
// Log.Logger.Verbose("[任务管理器] 新的任务已创建");
Interlocked.Increment(ref _runningTaskCount);
try try
{ {
await func(); await func();
@ -45,13 +48,13 @@ public class TaskManager
catch(Exception ex) catch(Exception ex)
{ {
OnException?.Invoke(ex); OnException?.Invoke(ex);
Log.Logger.Error(ex, "[任务管理器] 执行任务时出错");
} }
finally finally
{ {
Interlocked.Decrement(ref _runningTaskCount); Interlocked.Decrement(ref _runningTaskCount);
} }
}, cancellationToken); }, cancellationToken);
Interlocked.Increment(ref _runningTaskCount);
return task; return task;
} }
@ -59,8 +62,10 @@ public class TaskManager
{ {
var task = Task.Factory.StartNew(async obj => // 性能考虑这个lambda中不要捕获任何外部变量! var task = Task.Factory.StartNew(async obj => // 性能考虑这个lambda中不要捕获任何外部变量!
{ {
// Log.Logger.Verbose("[任务管理器] 新的任务已创建");
if (obj is not Tuple<Func<object?, Task>, object?> tuple) if (obj is not Tuple<Func<object?, Task>, object?> tuple)
throw new ApplicationException("这个异常不该出现"); throw new ApplicationException("这个异常不该出现");
Interlocked.Increment(ref _runningTaskCount);
try try
{ {
await tuple.Item1(tuple.Item2); await tuple.Item1(tuple.Item2);
@ -69,13 +74,13 @@ public class TaskManager
catch(Exception ex) catch(Exception ex)
{ {
OnException?.Invoke(ex); OnException?.Invoke(ex);
Log.Logger.Error(ex, "[任务管理器] 执行任务时出错");
} }
finally finally
{ {
Interlocked.Decrement(ref _runningTaskCount); Interlocked.Decrement(ref _runningTaskCount);
} }
}, Tuple.Create(func, arg), cancellationToken).Unwrap(); }, Tuple.Create(func, arg), cancellationToken).Unwrap();
Interlocked.Increment(ref _runningTaskCount);
return task; return task;
} }
} }

View File

@ -1,10 +1,10 @@
{ {
"MemoryThreshold": 8, "MemoryThreshold": 6,
"GCIntervalMilliseconds": -1, "GCIntervalMilliseconds": -1,
"UnsafeVariable": false, "UnsafeVariable": true,
"Logging": { "Logging": {
"LogLevel": { "LogLevel": {
"Default": "Debug" "Default": "Trace"
} }
}, },
"Input":{ "Input":{
@ -33,8 +33,8 @@
} }
}, },
"RecordQueue":{ "RecordQueue":{
"ProducerQueueLength": 50000, // "ProducerQueueLength": 20000, //
"ConsumerQueueLength": 10000, // "ConsumerQueueLength": 20000, //
"MaxByteCount": 3221225472 // "MaxByteCount": 3221225472 //
}, },
"RedisCache": { "RedisCache": {
@ -44,7 +44,7 @@
"TenantDb": // "TenantDb": //
{ {
"TenantKey" : "CompanyID", "TenantKey" : "CompanyID",
"UseDbGroup": "prod", "UseDbGroup": "mock",
"DbGroups": { "DbGroups": {
"test": { "test": {
"cferp_test_1": 1000, "cferp_test_1": 1000,
@ -58,6 +58,20 @@
"mesdb_4": 15000, "mesdb_4": 15000,
"mesdb_5": 20000, "mesdb_5": 20000,
"mesdb_6": 2147483647 "mesdb_6": 2147483647
},
"mock":{
"mesdb_1": 5000,
"mesdb_2": 10000,
"mesdb_3": 15000,
"mesdb_4": 20000,
"mesdb_5": 2147483647
},
"mock_void":{
"mesdb_1_void": 5000,
"mesdb_2_void": 10000,
"mesdb_3_void": 15000,
"mesdb_4_void": 20000,
"mesdb_5_void": 2147483647
} }
} }
} }

View File

@ -177,7 +177,6 @@ public class DatabaseToolBox
[InlineData("mesdb_3")] [InlineData("mesdb_3")]
[InlineData("mesdb_4")] [InlineData("mesdb_4")]
[InlineData("mesdb_5")] [InlineData("mesdb_5")]
[InlineData("mesdb_6")]
public async Task TruncateAllTable(string database) public async Task TruncateAllTable(string database)
{ {
var tables = await DatabaseHelper.QueryTableAsync(ConnStr, var tables = await DatabaseHelper.QueryTableAsync(ConnStr,

View File

@ -1,6 +1,8 @@
using System.Runtime;
using MesETL.App.Const; using MesETL.App.Const;
using MesETL.App.HostedServices.Abstractions; using MesETL.App.HostedServices.Abstractions;
using MesETL.App.Services; using MesETL.App.Services;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
@ -13,14 +15,16 @@ public class MockInputService : IInputService
private readonly ProcessContext _context; private readonly ProcessContext _context;
private readonly IOptions<MockInputOptions> _options; private readonly IOptions<MockInputOptions> _options;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly long _memoryThreshold;
public MockInputService([FromKeyedServices(ConstVar.Producer)]DataRecordQueue producerQueue, ProcessContext context, IOptions<MockInputOptions> options, public MockInputService([FromKeyedServices(ConstVar.Producer)]DataRecordQueue producerQueue, ProcessContext context, IOptions<MockInputOptions> options,
ILogger<MockInputService> logger) ILogger<MockInputService> logger, IConfiguration configuration)
{ {
_producerQueue = producerQueue; _producerQueue = producerQueue;
_context = context; _context = context;
_options = options; _options = options;
_logger = logger; _logger = logger;
_memoryThreshold = (long)(configuration.GetValue<double>("MemoryThreshold", 8) * 1024 * 1024 * 1024);
} }
public async Task ExecuteAsync(CancellationToken cancellationToken) public async Task ExecuteAsync(CancellationToken cancellationToken)
@ -32,6 +36,13 @@ public class MockInputService : IInputService
for (int i = 0; i < options.Amount; i++) for (int i = 0; i < options.Amount; i++)
{ {
if (GC.GetTotalMemory(false) > _memoryThreshold)
{
_logger.LogWarning("内存使用率过高,暂缓输入");
GCSettings.LargeObjectHeapCompactionMode = GCLargeObjectHeapCompactionMode.CompactOnce;
GC.Collect();
await Task.Delay(3000, cancellationToken);
}
var ctx = new TableMockContext() var ctx = new TableMockContext()
{ {
Index = i, Index = i,

File diff suppressed because one or more lines are too long

View File

@ -27,21 +27,23 @@ async Task RunProgram()
{ {
ThreadPool.SetMaxThreads(200, 200); ThreadPool.SetMaxThreads(200, 200);
var host = Host.CreateApplicationBuilder(args); var host = Host.CreateApplicationBuilder(args);
var dbGroup = new Dictionary<string, int>() var inputOptions = host.Configuration.GetRequiredSection("Input").Get<DataInputOptions>()
{ ?? throw new ApplicationException("缺少Input配置");
{ "mesdb_1", 5000 },
{ "mesdb_2", 10000 }, var transformOptions = host.Configuration.GetRequiredSection("Transform").Get<DataTransformOptions>()
{ "mesdb_3", 15000 }, ?? throw new ApplicationException("缺少Transform配置");
{ "mesdb_4", 20000 },
{ "mesdb_5", 2147483647 }, var outputOptions = host.Configuration.GetRequiredSection("Output").Get<DatabaseOutputOptions>()
}; ?? throw new ApplicationException("缺少Output配置");
var tenantDbSection = host.Configuration.GetRequiredSection("TenantDb");
var tenantDbOptions = new TenantDbOptions() var tenantDbOptions = new TenantDbOptions()
{ {
TenantKey = "CompanyID", TenantKey = tenantDbSection.GetValue<string>(nameof(TenantDbOptions.TenantKey)) ??
DbGroup = dbGroup, throw new ApplicationException("分库配置缺少分库键TenantKey"),
UseDbGroup = "Prod", UseDbGroup = tenantDbSection.GetValue<string>(nameof(TenantDbOptions.UseDbGroup)) ??
throw new ApplicationException("分库配置缺少使用分库组UseDbGroup")
}; };
host.Services.Configure<TenantDbOptions>(options => host.Services.Configure<TenantDbOptions>(options =>
{ {
@ -49,7 +51,10 @@ async Task RunProgram()
options.DbGroup = tenantDbOptions.DbGroup; options.DbGroup = tenantDbOptions.DbGroup;
options.UseDbGroup = tenantDbOptions.UseDbGroup; options.UseDbGroup = tenantDbOptions.UseDbGroup;
}); });
tenantDbOptions.DbGroup = tenantDbSection.GetRequiredSection($"DbGroups:{tenantDbOptions.UseDbGroup}")
.Get<Dictionary<string, int>>()
?? throw new ApplicationException($"分库配置无法解析分库组{tenantDbOptions.UseDbGroup},请检查配置");
host.Services.Configure<MockInputOptions>(options => host.Services.Configure<MockInputOptions>(options =>
{ {
const float Multiplexer = 0.01F; const float Multiplexer = 0.01F;
@ -206,6 +211,11 @@ async Task RunProgram()
host.Services.Configure<DataTransformOptions>(options => host.Services.Configure<DataTransformOptions>(options =>
{ {
options.StrictMode = transformOptions.StrictMode;
options.EnableFilter = transformOptions.EnableFilter;
options.EnableReplacer = transformOptions.EnableReplacer;
options.EnableReBuilder = transformOptions.EnableReBuilder;
options.DatabaseFilter = record => options.DatabaseFilter = record =>
{ {
var companyId = int.Parse(record[tenantDbOptions.TenantKey]); // 每个实体都应存在CompanyID否则异常 var companyId = int.Parse(record[tenantDbOptions.TenantKey]); // 每个实体都应存在CompanyID否则异常
@ -215,11 +225,16 @@ async Task RunProgram()
host.Services.Configure<DatabaseOutputOptions>(options => host.Services.Configure<DatabaseOutputOptions>(options =>
{ {
options.ConnectionString = "Server=192.168.1.246;Port=3333;UserId=root;Password=123456;"; options.ConnectionString = outputOptions.ConnectionString;
options.FlushCount = 10000; options.FlushCount = outputOptions.FlushCount;
options.MaxAllowedPacket = 67108864; options.MaxAllowedPacket = outputOptions.MaxAllowedPacket / 2;
options.MaxDatabaseOutputTask = 4; options.MaxDatabaseOutputTask = outputOptions.MaxDatabaseOutputTask;
options.TreatJsonAsHex = outputOptions.TreatJsonAsHex;
options.NoOutput = outputOptions.NoOutput;
options.ForUpdate = outputOptions.ForUpdate;
// 配置列的类型以便于在输出时区分二进制内容
// Prod server
options.ColumnTypeConfig = new Dictionary<string, ColumnType> options.ColumnTypeConfig = new Dictionary<string, ColumnType>
{ {
{ "machine.Settings", ColumnType.Text }, { "machine.Settings", ColumnType.Text },
@ -253,14 +268,15 @@ async Task RunProgram()
host.Services.AddLogging(builder => host.Services.AddLogging(builder =>
{ {
builder.ClearProviders(); builder.ClearProviders();
builder.AddSerilog(new LoggerConfiguration() var logger = new LoggerConfiguration()
.MinimumLevel.Debug() .MinimumLevel.Verbose()
.WriteTo.Console() .WriteTo.Console()
.WriteTo.File(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"./Log/Error/{ErrorRecorder.UID}.log"), .WriteTo.File(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"./Log/Error/{ErrorRecorder.UID}.log"),
restrictedToMinimumLevel: LogEventLevel.Error) restrictedToMinimumLevel: LogEventLevel.Error)
// .WriteTo.File("./Log/Info/{ErrorRecorder.UID}.log", restrictedToMinimumLevel:LogEventLevel.Information) //性能考虑暂不使用 // .WriteTo.File("./Log/Info/{ErrorRecorder.UID}.log", restrictedToMinimumLevel:LogEventLevel.Information) //性能考虑暂不使用
.CreateLogger() .CreateLogger();
); builder.AddSerilog(logger);
Log.Logger = logger;
}); });
host.Services.AddDataSourceFactory(); host.Services.AddDataSourceFactory();
@ -271,7 +287,7 @@ async Task RunProgram()
var consLen = host.Configuration.GetRequiredSection("RecordQueue").GetValue<int>("ConsumerQueueLength"); var consLen = host.Configuration.GetRequiredSection("RecordQueue").GetValue<int>("ConsumerQueueLength");
var maxCharCount = host.Configuration.GetRequiredSection("RecordQueue").GetValue<long>("MaxByteCount") / 2; var maxCharCount = host.Configuration.GetRequiredSection("RecordQueue").GetValue<long>("MaxByteCount") / 2;
host.Services.AddKeyedSingleton<DataRecordQueue>(ConstVar.Producer, new DataRecordQueue(prodLen, maxCharCount)); host.Services.AddKeyedSingleton<DataRecordQueue>(ConstVar.Producer, new DataRecordQueue(prodLen, maxCharCount));
host.Services.AddRecordQueuePool(dbGroup.Keys host.Services.AddRecordQueuePool(tenantDbOptions.DbGroup.Keys
.Select(key => (key: key, queue: new DataRecordQueue(consLen, maxCharCount))).ToArray()); .Select(key => (key: key, queue: new DataRecordQueue(consLen, maxCharCount))).ToArray());
// host.Services.AddSingleton<ITaskMonitorLogger, CacheTaskMonitorLogger>(); // host.Services.AddSingleton<ITaskMonitorLogger, CacheTaskMonitorLogger>();
host.Services.AddSingleton<ITaskMonitorLogger, LoggerTaskMonitorLogger>(); host.Services.AddSingleton<ITaskMonitorLogger, LoggerTaskMonitorLogger>();