This commit is contained in:
2024-02-09 13:41:40 +08:00
parent 41a1dc8a4f
commit 913c725fe1
4 changed files with 15 additions and 9 deletions

View File

@@ -595,15 +595,17 @@ async Task RunProgram()
host.Services.AddDataSourceFactory();
host.Services.AddErrorRecorderFactory();
host.Services.AddSingleton<ProcessContext>();
host.Services.AddKeyedSingleton<DataRecordQueue>(ConstVar.Producer, new DataRecordQueue(200_000));
host.Services.AddRecordQueuePool(tenantDbOptions.DbGroup.Keys.Select(key => (key:key, queue:new DataRecordQueue(60_000))).ToArray());
var prodLen = host.Configuration.GetRequiredSection("RecordQueue").GetValue<int>("ProducerQueueLength");
var consLen = host.Configuration.GetRequiredSection("RecordQueue").GetValue<int>("ConsumerQueueLength");
host.Services.AddKeyedSingleton<DataRecordQueue>(ConstVar.Producer, new DataRecordQueue(prodLen));
host.Services.AddRecordQueuePool(tenantDbOptions.DbGroup.Keys.Select(key => (key:key, queue:new DataRecordQueue(consLen))).ToArray());
host.Services.AddSingleton<ITaskMonitorLogger, CacheTaskMonitorLogger>();
host.Services.AddSingleton<ITaskMonitorLogger, LoggerTaskMonitorLogger>();
host.Services.AddHostedService<MainHostedService>();
host.Services.AddSingleton<IInputService, FileInputService>();
host.Services.AddSingleton<ITransformService, TransformService>();
host.Services.AddSingleton<IOutputService, OutputService>();
host.Services.AddSingleton<IOutputService, VoidOutputService>();
host.Services.AddSingleton<TaskMonitorService>();
host.Services.AddRedisCache(redisOptions);
var app = host.Build();

View File

@@ -49,7 +49,7 @@ public class CsvReader : IDataReader
if (string.IsNullOrWhiteSpace(str))
return false;
var fields = ParseRow(str, QuoteChar, Delimiter[0]);
var fields = ParseRowFaster(str, QuoteChar, Delimiter[0]);
Current = new DataRecord(fields, TableName, Headers);
return true;
}

View File

@@ -11,9 +11,9 @@
},
"Transform":{
"StrictMode": false, // 设为true时如果数据转换发生错误立刻停止程序
"EnableFilter": true, // 启用数据过滤
"EnableReplacer": true, // 启用数据修改
"EnableReBuilder": true, // 启用数据重建
"EnableFilter": false, // 启用数据过滤
"EnableReplacer": false, // 启用数据修改
"EnableReBuilder": false, // 启用数据重建
"CleanDate": "202301" // 当数据过滤开启时,删除这个时间之前的数据
},
"Output":{
@@ -23,9 +23,13 @@
"MaxDatabaseOutputTask" : 4, // 每个数据库最大提交任务数
"TreatJsonAsHex": false // 将json列作为16进制格式输出(0x前缀)生产库是没有json列的
},
"RecordQueue":{
"ProducerQueueLength": 50000, // 输入队列最大长度
"ConsumerQueueLength": 10000, // 每个输出队列最大长度
},
"RedisCache": {
"Configuration": "192.168.1.246:6380",
"InstanceName" : "mes-etl:"
"InstanceName" : "mes-etl:"
},
"TenantDb": // 分库配置
{

View File

@@ -38,7 +38,7 @@ public class Test
while (!reader.EndOfStream)
{
var str = await reader.ReadLineAsync();
// char a;
char a;
// foreach (var c in str)
// {
// a = c;