修复错误的试运行模式
This commit is contained in:
parent
c049e092e4
commit
ab9f7a8253
@ -65,7 +65,10 @@ public class FileInputService : IInputService
|
|||||||
var file = Path.GetFileName(info.FileName);
|
var file = Path.GetFileName(info.FileName);
|
||||||
_logger.LogInformation("正在读取文件:{FileName}, 对应的数据表:{TableName}", file, info.TableName);
|
_logger.LogInformation("正在读取文件:{FileName}, 对应的数据表:{TableName}", file, info.TableName);
|
||||||
using var source = _dataReaderFactory.CreateReader(info.FileName, info.TableName, info.Headers);
|
using var source = _dataReaderFactory.CreateReader(info.FileName, info.TableName, info.Headers);
|
||||||
var count = 0;
|
var countBuffer = 0;
|
||||||
|
|
||||||
|
if (_dryRun && _context.TableProgress.GetValueOrDefault(info.TableName, (input: 0, output: 0)).input >= _dryRunCount)
|
||||||
|
continue;
|
||||||
|
|
||||||
while (await source.ReadAsync())
|
while (await source.ReadAsync())
|
||||||
{
|
{
|
||||||
@ -78,13 +81,23 @@ public class FileInputService : IInputService
|
|||||||
}
|
}
|
||||||
var record = source.Current;
|
var record = source.Current;
|
||||||
await _producerQueue.EnqueueAsync(record);
|
await _producerQueue.EnqueueAsync(record);
|
||||||
count++;
|
countBuffer++;
|
||||||
_context.AddInput();
|
_context.AddInput();
|
||||||
if (_dryRun && count >= _dryRunCount)
|
|
||||||
|
// 避免影响性能,每1000条更新一次表输入进度
|
||||||
|
if (countBuffer >= 1000)
|
||||||
|
{
|
||||||
|
_context.AddTableInput(info.TableName, countBuffer);
|
||||||
|
countBuffer = 0;
|
||||||
|
// 试运行模式下,超出了指定行数则停止输入
|
||||||
|
if (_dryRun && _context.TableProgress[info.TableName].input >= _dryRunCount)
|
||||||
|
{
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
_context.AddTableInput(info.TableName, count);
|
_context.AddTableInput(info.TableName, countBuffer);
|
||||||
_logger.LogInformation("文件 {File} 输入完成", file);
|
_logger.LogInformation("文件 {File} 输入完成", file);
|
||||||
_dataInputOptions.Value.OnTableInputCompleted?.Invoke(info.TableName);
|
_dataInputOptions.Value.OnTableInputCompleted?.Invoke(info.TableName);
|
||||||
}
|
}
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
"MemoryThreshold": 6,
|
"MemoryThreshold": 6,
|
||||||
"GCIntervalMilliseconds": -1,
|
"GCIntervalMilliseconds": -1,
|
||||||
"UnsafeVariable": true,
|
"UnsafeVariable": true,
|
||||||
"DryRun": false, // 试运行,仅输入每张表的前100000条数据
|
"DryRun": true, // 试运行,仅输入每张表的前100000条数据
|
||||||
"Logging": {
|
"Logging": {
|
||||||
"LogLevel": {
|
"LogLevel": {
|
||||||
"Default": "Trace"
|
"Default": "Trace"
|
||||||
@ -27,7 +27,7 @@
|
|||||||
"MaxAllowedPacket": 67108864,
|
"MaxAllowedPacket": 67108864,
|
||||||
"FlushCount": 10000, // 每次提交记录条数
|
"FlushCount": 10000, // 每次提交记录条数
|
||||||
"MaxDatabaseOutputTask" : 4, // 每个数据库最大提交任务数
|
"MaxDatabaseOutputTask" : 4, // 每个数据库最大提交任务数
|
||||||
"TreatJsonAsHex": false, // 将json列作为16进制格式输出(0x前缀),生产库是没有json列的
|
"TreatJsonAsHex": false, // 使Json列输出时带上"0x"前缀
|
||||||
"NoOutput": [], // 不输出的表
|
"NoOutput": [], // 不输出的表
|
||||||
"ForUpdate":
|
"ForUpdate":
|
||||||
{
|
{
|
||||||
|
Loading…
Reference in New Issue
Block a user