This commit is contained in:
lindj 2024-01-19 13:47:35 +08:00
parent 45ad15a065
commit e3f6ecbd91
7 changed files with 80 additions and 61 deletions

View File

@ -27,7 +27,7 @@ public class InputService : IInputService
_context = context; _context = context;
} }
public async Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue producerQueue, ProcessContext context,CancellationToken cancellationToken) public async Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue producerQueue, ProcessContext context, CancellationToken cancellationToken)
{ {
var inputDir = _dataInputOptions.Value.InputDir; var inputDir = _dataInputOptions.Value.InputDir;
_logger.LogInformation("***** Csv input service start, working dir: {InputDir}, thread id: {ThreadId} *****", inputDir, Environment.CurrentManagedThreadId); _logger.LogInformation("***** Csv input service start, working dir: {InputDir}, thread id: {ThreadId} *****", inputDir, Environment.CurrentManagedThreadId);

View File

@ -75,36 +75,38 @@ public class MainHostedService : BackgroundService
{"order_item",new TableInfo{SimulaRowCount=1345520079 }}, {"order_item",new TableInfo{SimulaRowCount=1345520079 }},
{"simple_plan_order",new TableInfo{SimulaRowCount=351470 }},//CreateTime < 202301的删除 {"simple_plan_order",new TableInfo{SimulaRowCount=351470 }},//CreateTime < 202301的删除
}; };
taskFun(new TasksOptions { TableInfoConfig = bigTablesDic, OutPutOptions = new OutPutOptions { FlushCount = 10000, OutPutTaskCount = 2 } }, var bigTableContext = new ProcessContext();
new DataRecordQueue(), new DataRecordQueue(),new ProcessContext()); var bigTableOptions = new TasksOptions { TableInfoConfig = bigTablesDic, OutPutOptions = new OutPutOptions { FlushCount = 10000, OutPutTaskCount = 2 } };
//taskFun(bigTableOptions,new DataRecordQueue(), new DataRecordQueue(), bigTableContext);
var smallTablesDic = new Dictionary<string, TableInfo> var smallTablesDic = new Dictionary<string, TableInfo>
{ {
{"machine",new TableInfo{SimulaRowCount=14655 }}, //{"machine",new TableInfo{SimulaRowCount=14655 }},
{"order",new TableInfo{SimulaRowCount=5019216 }}, //{"order",new TableInfo{SimulaRowCount=5019216 }},
{"order_data_block",new TableInfo{SimulaRowCount=731800334 }}, //{"order_data_block",new TableInfo{SimulaRowCount=731800334 }},
{"order_data_goods",new TableInfo{SimulaRowCount=25803671 }}, //{"order_data_goods",new TableInfo{SimulaRowCount=25803671 }},
{"order_data_parts",new TableInfo{SimulaRowCount=468517543 }}, //{"order_data_parts",new TableInfo{SimulaRowCount=468517543 }},
{"order_module",new TableInfo{SimulaRowCount=103325385 }}, //{"order_module",new TableInfo{SimulaRowCount=103325385 }},
{"order_module_extra",new TableInfo{SimulaRowCount=54361321 }}, //{"order_module_extra",new TableInfo{SimulaRowCount=54361321 }},
{"order_module_item",new TableInfo{SimulaRowCount=69173339 }}, //{"order_module_item",new TableInfo{SimulaRowCount=69173339 }},
{"order_package",new TableInfo{SimulaRowCount=16196195 }}, //{"order_package",new TableInfo{SimulaRowCount=16196195 }},
{"order_process",new TableInfo{SimulaRowCount=3892685 }},//orderNo < 202301的 {"order_process",new TableInfo{SimulaRowCount=3892685 }},//orderNo < 202301的
{"order_process_step",new TableInfo{SimulaRowCount=8050349 }},//orderNo < 202301的删除 {"order_process_step",new TableInfo{SimulaRowCount=8050349 }},//orderNo < 202301的删除
{"order_process_step_item",new TableInfo{SimulaRowCount=14538058 }},//orderNo < 202301的删除 {"order_process_step_item",new TableInfo{SimulaRowCount=14538058 }},//orderNo < 202301的删除
{"order_scrap_board",new TableInfo{SimulaRowCount=123998 }}, //{"order_scrap_board",new TableInfo{SimulaRowCount=123998 }},
{"process_group",new TableInfo{SimulaRowCount=1253 }}, //{"process_group",new TableInfo{SimulaRowCount=1253 }},
{"process_info",new TableInfo{SimulaRowCount=7839 }}, //{"process_info",new TableInfo{SimulaRowCount=7839 }},
{"process_item_exp",new TableInfo{SimulaRowCount=28 }}, //{"process_item_exp",new TableInfo{SimulaRowCount=28 }},
{"process_schdule_capacity",new TableInfo{SimulaRowCount=39736 }}, //{"process_schdule_capacity",new TableInfo{SimulaRowCount=39736 }},
{"process_step_efficiency",new TableInfo{SimulaRowCount=8 }}, //{"process_step_efficiency",new TableInfo{SimulaRowCount=8 }},
{"report_template",new TableInfo{SimulaRowCount=7337 }}, //{"report_template",new TableInfo{SimulaRowCount=7337 }},
{"simple_package",new TableInfo{SimulaRowCount=130436 }},//orderNo < 202301的删除 //{"simple_package",new TableInfo{SimulaRowCount=130436 }},//orderNo < 202301的删除
{"sys_config",new TableInfo{SimulaRowCount=2296 }}, //{"sys_config",new TableInfo{SimulaRowCount=2296 }},
{"work_calendar",new TableInfo{SimulaRowCount=11 }}, //{"work_calendar",new TableInfo{SimulaRowCount=11 }},
{"work_shift",new TableInfo{SimulaRowCount=59 }}, //{"work_shift",new TableInfo{SimulaRowCount=59 }},
{"work_time",new TableInfo{SimulaRowCount=62 }}, //{"work_time",new TableInfo{SimulaRowCount=62 }},
}; };
var smallTableContext = new ProcessContext();
taskFun(new TasksOptions { TableInfoConfig = smallTablesDic, OutPutOptions = new OutPutOptions { FlushCount = 20000, OutPutTaskCount = 4 } }, taskFun(new TasksOptions { TableInfoConfig = smallTablesDic, OutPutOptions = new OutPutOptions { FlushCount = 20000, OutPutTaskCount = 4 } },
new DataRecordQueue(), new DataRecordQueue(), new ProcessContext()); new DataRecordQueue(), new DataRecordQueue(), smallTableContext);
} }
} }

View File

@ -35,40 +35,47 @@ public class OutputService : IOutputService
_errorRecorder = errorRecorder; _errorRecorder = errorRecorder;
} }
public async Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue consumerQueue, ProcessContext context,CancellationToken cancellationToken) public async Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue consumerQueue, ProcessContext context, CancellationToken cancellationToken)
{ {
_logger.LogInformation("***** Mysql output service started *****"); _logger.LogInformation("***** Mysql output service started *****");
_taskManager.CreateTasks(async () =>
var records = new List<DataRecord>();
while (!context.IsTransformCompleted || consumerQueue.Count > 0)
{ {
var records = new List<DataRecord>(); if (!consumerQueue.TryDequeue(out var record)) continue;
while (!context.IsTransformCompleted || consumerQueue.Count > 0) records.Add(record);
if (records.Count >= tasksOptions.OutPutOptions.FlushCount)
{ {
if (!consumerQueue.TryDequeue(out var record)) continue; var temp= new List<DataRecord>();
records.Add(record); temp.AddRange(records);
//_logger.LogInformation(@"*****OutputCount: {count} *****",count);
if (records.Count >= tasksOptions.OutPutOptions.FlushCount) ThreadPool.QueueUserWorkItem(async (queueState) =>
{ {
await FlushAsync(records); await FlushAsync(temp);
records.Clear(); });
}
if (_context.GetExceptions().Count>0)
{
_logger.LogInformation("***** Csv output thread is canceled *****");
return;
}
}
if (records.Count > 0)
{
await FlushAsync(records);
records.Clear(); records.Clear();
_logger.LogInformation("***** Mysql output thread completed *****");
} }
}, tasksOptions.OutPutOptions.OutPutTaskCount); if (_context.GetExceptions().Count > 0)
{
await _taskManager.WaitAll(); _logger.LogInformation("***** Csv output thread is canceled *****");
//_context.CompleteOutput(); return;
_logger.LogInformation(@"***** Mysql output service completed *****"); }
}
if (records.Count > 0)
{
var temp = new List<DataRecord>();
temp.AddRange(records);
ThreadPool.QueueUserWorkItem(async (queueState) =>
{
await FlushAsync(temp);
});
records.Clear();
context.AddFinishTask();
_logger.LogInformation("***** Finished Tasks Count:{FinishTaskCount} *****", context.FinishTaskCount);
_logger.LogInformation("***** Mysql output thread completed *****");
}
} }
private async Task FlushAsync(IEnumerable<DataRecord> records) private async Task FlushAsync(IEnumerable<DataRecord> records)

View File

@ -38,9 +38,6 @@ public class TransformService : ITransformService
public async Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue producerQueue, DataRecordQueue consumerQueue, ProcessContext context, CancellationToken cancellationToken) public async Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue producerQueue, DataRecordQueue consumerQueue, ProcessContext context, CancellationToken cancellationToken)
{ {
_logger.LogInformation("***** Data transform service started, thread id: {ThreadId} *****", Environment.CurrentManagedThreadId); _logger.LogInformation("***** Data transform service started, thread id: {ThreadId} *****", Environment.CurrentManagedThreadId);
_taskManager.CreateTasks(async () =>
{
while ((!context.IsInputCompleted || producerQueue.Count > 0)) while ((!context.IsInputCompleted || producerQueue.Count > 0))
{ {
if (_context.GetExceptions().Count > 0) if (_context.GetExceptions().Count > 0)
@ -77,7 +74,7 @@ public class TransformService : ITransformService
} }
} }
context.CompleteTransform(); context.CompleteTransform();
},tasksOptions.TransformTaskCount,cancellationToken);
_logger.LogInformation("***** Data transformation service completed *****"); _logger.LogInformation("***** Data transformation service completed *****");
} }
} }

View File

@ -57,7 +57,7 @@ async Task RunProgram()
//}, "请输入单次插入的行数(默认为20000):"); //}, "请输入单次插入的行数(默认为20000):");
ThreadPool.SetMaxThreads(200, 200); ThreadPool.SetMaxThreads(8, 4);
var host = Host.CreateApplicationBuilder(args); var host = Host.CreateApplicationBuilder(args);
var commandOptions = host.Configuration.GetSection("CmdOptions").Get<CommandOptions>() ?? new CommandOptions(); var commandOptions = host.Configuration.GetSection("CmdOptions").Get<CommandOptions>() ?? new CommandOptions();
Console.WriteLine($"InputDir:{commandOptions?.InputDir}"); Console.WriteLine($"InputDir:{commandOptions?.InputDir}");

View File

@ -109,6 +109,10 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
var sb = new StringBuilder(); var sb = new StringBuilder();
foreach (var (tableName, records) in tableRecords) foreach (var (tableName, records) in tableRecords)
{ {
if (tableName == "order_process_step")
{
var a = 1;
}
if (records.Count == 0) if (records.Count == 0)
continue; continue;

View File

@ -1,4 +1,6 @@
namespace ConsoleApp2.Services; using System.Collections.Concurrent;
namespace ConsoleApp2.Services;
/// <summary> /// <summary>
/// 处理上下文类,标识处理进度 /// 处理上下文类,标识处理进度
@ -8,7 +10,8 @@ public class ProcessContext
private int _inputCount; private int _inputCount;
private int _transformCount; private int _transformCount;
private int _outputCount; private int _outputCount;
private IList<Exception> _exceptionList = new List<Exception>(); private int _finishedTaskCount;
private ConcurrentBag<Exception> _exceptionList = new ConcurrentBag<Exception>();
public bool IsInputCompleted { get; private set; } public bool IsInputCompleted { get; private set; }
public bool IsTransformCompleted { get; private set; } public bool IsTransformCompleted { get; private set; }
public bool IsOutputCompleted { get; private set; } public bool IsOutputCompleted { get; private set; }
@ -34,7 +37,13 @@ public class ProcessContext
{ {
_exceptionList.Add(ex); _exceptionList.Add(ex);
} }
public IList<Exception> GetExceptions()
public void AddFinishTask() => Interlocked.Increment(ref _finishedTaskCount);
public int FinishTaskCount
{
get => _finishedTaskCount;
}
public ConcurrentBag<Exception> GetExceptions()
{ {
return _exceptionList; return _exceptionList;
} }