multTasks #2
@ -69,6 +69,7 @@ public class MainHostedService : BackgroundService
|
|||||||
|
|
||||||
var bigTablesDic = new Dictionary<string, TableInfo>
|
var bigTablesDic = new Dictionary<string, TableInfo>
|
||||||
{
|
{
|
||||||
|
{"order",new TableInfo{SimulaRowCount=5019216 }},
|
||||||
{"order_block_plan",new TableInfo{SimulaRowCount=2725553 }},//CreateTime < 202301的删除
|
{"order_block_plan",new TableInfo{SimulaRowCount=2725553 }},//CreateTime < 202301的删除
|
||||||
{"order_block_plan_result",new TableInfo{SimulaRowCount=1174096 }},
|
{"order_block_plan_result",new TableInfo{SimulaRowCount=1174096 }},
|
||||||
{"order_box_block",new TableInfo{SimulaRowCount=29755672 }},
|
{"order_box_block",new TableInfo{SimulaRowCount=29755672 }},
|
||||||
@ -81,7 +82,6 @@ public class MainHostedService : BackgroundService
|
|||||||
var smallTablesDic = new Dictionary<string, TableInfo>
|
var smallTablesDic = new Dictionary<string, TableInfo>
|
||||||
{
|
{
|
||||||
{"machine",new TableInfo{SimulaRowCount=14655 }},
|
{"machine",new TableInfo{SimulaRowCount=14655 }},
|
||||||
{"order",new TableInfo{SimulaRowCount=5019216 }},
|
|
||||||
{"order_data_block",new TableInfo{SimulaRowCount=731800334 }},
|
{"order_data_block",new TableInfo{SimulaRowCount=731800334 }},
|
||||||
{"order_data_goods",new TableInfo{SimulaRowCount=25803671 }},
|
{"order_data_goods",new TableInfo{SimulaRowCount=25803671 }},
|
||||||
{"order_data_parts",new TableInfo{SimulaRowCount=468517543 }},
|
{"order_data_parts",new TableInfo{SimulaRowCount=468517543 }},
|
||||||
|
@ -62,17 +62,17 @@ public class TransformService : ITransformService
|
|||||||
}
|
}
|
||||||
consumerQueue.Enqueue(record);
|
consumerQueue.Enqueue(record);
|
||||||
_context.AddTransform();
|
_context.AddTransform();
|
||||||
//数据增加
|
//数据增加
|
||||||
var addRecords = _options.Value.RecordAdd?.Invoke(record);
|
var addRecords = _options.Value.RecordAdd?.Invoke(record);
|
||||||
if (addRecords != null && addRecords.Count > 0)
|
if (addRecords != null && addRecords.Count > 0)
|
||||||
|
{
|
||||||
|
foreach (var rc in addRecords)
|
||||||
{
|
{
|
||||||
foreach (var rc in addRecords)
|
consumerQueue.Enqueue(rc);
|
||||||
{
|
_context.AddTransform();
|
||||||
consumerQueue.Enqueue(rc);
|
|
||||||
_context.AddTransform();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
context.CompleteTransform();
|
context.CompleteTransform();
|
||||||
|
|
||||||
_logger.LogInformation("***** Data transformation service completed *****");
|
_logger.LogInformation("***** Data transformation service completed *****");
|
||||||
|
@ -14,7 +14,6 @@ using Serilog;
|
|||||||
using Microsoft.Extensions.Caching.Distributed;
|
using Microsoft.Extensions.Caching.Distributed;
|
||||||
using Serilog.Events;
|
using Serilog.Events;
|
||||||
|
|
||||||
|
|
||||||
await RunProgram();
|
await RunProgram();
|
||||||
return;
|
return;
|
||||||
|
|
||||||
@ -463,7 +462,8 @@ async Task RunProgram()
|
|||||||
AllowUserVariables = true,
|
AllowUserVariables = true,
|
||||||
IgnoreCommandTransaction = true,
|
IgnoreCommandTransaction = true,
|
||||||
TreatTinyAsBoolean = false,
|
TreatTinyAsBoolean = false,
|
||||||
MaximumPoolSize = 50
|
MaximumPoolSize = 50,
|
||||||
|
SslMode = MySqlSslMode.None,
|
||||||
}.ConnectionString;
|
}.ConnectionString;
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -486,7 +486,7 @@ async Task RunProgram()
|
|||||||
|
|
||||||
host.Services.AddHostedService<MainHostedService>();
|
host.Services.AddHostedService<MainHostedService>();
|
||||||
host.Services.AddHostedService<TaskMonitorService>();
|
host.Services.AddHostedService<TaskMonitorService>();
|
||||||
host.Services.AddSingleton<IInputService, InputService>();
|
host.Services.AddSingleton<IInputService,InputService>();
|
||||||
host.Services.AddSingleton<ITransformService, TransformService>();
|
host.Services.AddSingleton<ITransformService, TransformService>();
|
||||||
host.Services.AddSingleton<IOutputService, OutputService>();
|
host.Services.AddSingleton<IOutputService, OutputService>();
|
||||||
var redisOptions = host.Configuration.GetSection("RedisCacheOptions").Get<RedisCacheOptions>() ?? new RedisCacheOptions();
|
var redisOptions = host.Configuration.GetSection("RedisCacheOptions").Get<RedisCacheOptions>() ?? new RedisCacheOptions();
|
||||||
|
@ -39,93 +39,99 @@ namespace ConsoleApp2.SimulationService
|
|||||||
}
|
}
|
||||||
foreach (var tableName in tasksOptions.TableInfoConfig.Keys)
|
foreach (var tableName in tasksOptions.TableInfoConfig.Keys)
|
||||||
{
|
{
|
||||||
var dataCount = tasksOptions.TableInfoConfig[tableName].SimulaRowCount;//当前表要生成的总数据量
|
_logger.LogInformation("Working table: {tableName}", tableName);
|
||||||
var companyTotallCount = 1000;//当前表每个公司生成的总数据量
|
|
||||||
var tempRecords = new List<DataRecord>();
|
|
||||||
var sk = DataHelper.shareKeys.First();
|
|
||||||
var companyID = DataHelper.companyIds.First();
|
|
||||||
|
|
||||||
var shareKeyInterval = 20000;//每个sharekey的数据量
|
var dataCount = tasksOptions.TableInfoConfig[tableName].SimulaRowCount;//当前表要生成的总数据量
|
||||||
var getShareKeyTimes = 0;//sharekey生成的次数,每生成一次,改变sharekey的值
|
var companyTotallCount = 1000;//当前表每个公司生成的总数据量
|
||||||
var getCompanyIDTimes = 0;//公司生成的次数,每生成一次,改变companyID的值
|
var tempRecords = new List<DataRecord>();
|
||||||
var shareKeyIntervalCount = 0;
|
var sk = DataHelper.shareKeys.First();
|
||||||
|
var companyID = DataHelper.companyIds.First();
|
||||||
|
|
||||||
var source = _dataInputOptions.Value.CreateSource?.Invoke(tableName);
|
var shareKeyInterval = 20000;//每个sharekey的数据量
|
||||||
var testRecord =await source.GetTestRecord();
|
var getShareKeyTimes = 0;//sharekey生成的次数,每生成一次,改变sharekey的值
|
||||||
for (long i = 1; i <= dataCount; i++)
|
var getCompanyIDTimes = 0;//公司生成的次数,每生成一次,改变companyID的值
|
||||||
|
var shareKeyIntervalCount = 0;
|
||||||
|
|
||||||
|
var source = _dataInputOptions.Value.CreateSource?.Invoke(tableName);
|
||||||
|
var testRecord = await source.GetTestRecord();
|
||||||
|
for (long i = 1; i <= dataCount; i++)
|
||||||
|
{
|
||||||
|
shareKeyIntervalCount++;
|
||||||
|
if (shareKeyIntervalCount > shareKeyInterval)
|
||||||
{
|
{
|
||||||
shareKeyIntervalCount++;
|
sk = DataHelper.GetShareKey(getShareKeyTimes);
|
||||||
if (shareKeyIntervalCount > shareKeyInterval)
|
getShareKeyTimes++;
|
||||||
|
shareKeyIntervalCount = 0;
|
||||||
|
}
|
||||||
|
var fields = new string[testRecord.Fields.Length];
|
||||||
|
Array.Copy(testRecord.Fields, fields, testRecord.Fields.Length);
|
||||||
|
var record = new DataRecord(fields, testRecord.TableName, testRecord.Headers, companyID);
|
||||||
|
//更新record的ID、OrderNo,ShardKey值
|
||||||
|
if (record.Headers.Contains("ID"))
|
||||||
|
{
|
||||||
|
var index = Array.IndexOf(record.Headers, "ID");
|
||||||
|
if (index > -1)
|
||||||
{
|
{
|
||||||
sk = DataHelper.GetShareKey(getShareKeyTimes);
|
record.Fields[index] = i.ToString();
|
||||||
getShareKeyTimes++;
|
|
||||||
shareKeyIntervalCount = 0;
|
|
||||||
}
|
|
||||||
var fields = new string[testRecord.Fields.Length];
|
|
||||||
Array.Copy(testRecord.Fields, fields, testRecord.Fields.Length);
|
|
||||||
var record = new DataRecord(fields, testRecord.TableName, testRecord.Headers, companyID);
|
|
||||||
//更新record的ID、OrderNo,ShardKey值
|
|
||||||
if (record.Headers.Contains("ID"))
|
|
||||||
{
|
|
||||||
var index = Array.IndexOf(record.Headers, "ID");
|
|
||||||
if (index > -1)
|
|
||||||
{
|
|
||||||
record.Fields[index] = i.ToString();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (record.TableName == "order_box_block" && record.Headers.Contains("BoxID"))
|
|
||||||
{
|
|
||||||
var index = Array.IndexOf(record.Headers, "BoxID");
|
|
||||||
if (index > -1)
|
|
||||||
{
|
|
||||||
record.Fields[index] = i.ToString();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if ((record.TableName == "order_block_plan_item" || record.TableName == "order_package_item") && record.Headers.Contains("ItemID"))
|
|
||||||
{
|
|
||||||
var index = Array.IndexOf(record.Headers, "ItemID");
|
|
||||||
if (index > -1)
|
|
||||||
{
|
|
||||||
record.Fields[index] = i.ToString();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (record.TableName == "order" && record.Headers.Contains("OrderNo"))
|
|
||||||
{
|
|
||||||
var index = Array.IndexOf(record.Headers, "OrderNo");
|
|
||||||
if (index > -1)
|
|
||||||
{
|
|
||||||
record.Fields[index] = i.ToString();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (record.Headers.Contains("ShardKey"))
|
|
||||||
{
|
|
||||||
var index = Array.IndexOf(record.Headers, "ShardKey");
|
|
||||||
if (index > -1)
|
|
||||||
{
|
|
||||||
record.Fields[index] = sk.ToString();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
tempRecords.Add(record);
|
|
||||||
if (tempRecords.Count >= companyTotallCount || i >= dataCount - 1)
|
|
||||||
{
|
|
||||||
foreach (var rc in tempRecords)
|
|
||||||
{
|
|
||||||
_context.AddInput();
|
|
||||||
producerQueue.Enqueue(rc);
|
|
||||||
if (cancellationToken.IsCancellationRequested)
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
tempRecords.Clear();
|
|
||||||
companyID = DataHelper.GetCompanyId(getCompanyIDTimes);
|
|
||||||
getCompanyIDTimes++;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_logger.LogInformation("table:'{tableName}' simulation input completed", tableName);
|
if (record.TableName == "order_box_block" && record.Headers.Contains("BoxID"))
|
||||||
|
{
|
||||||
|
var index = Array.IndexOf(record.Headers, "BoxID");
|
||||||
|
if (index > -1)
|
||||||
|
{
|
||||||
|
record.Fields[index] = i.ToString();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if ((record.TableName == "order_block_plan_item" || record.TableName == "order_package_item") && record.Headers.Contains("ItemID"))
|
||||||
|
{
|
||||||
|
var index = Array.IndexOf(record.Headers, "ItemID");
|
||||||
|
if (index > -1)
|
||||||
|
{
|
||||||
|
record.Fields[index] = i.ToString();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (record.TableName == "order" && record.Headers.Contains("OrderNo"))
|
||||||
|
{
|
||||||
|
var index = Array.IndexOf(record.Headers, "OrderNo");
|
||||||
|
if (index > -1)
|
||||||
|
{
|
||||||
|
record.Fields[index] = i.ToString();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (record.Headers.Contains("ShardKey"))
|
||||||
|
{
|
||||||
|
var index = Array.IndexOf(record.Headers, "ShardKey");
|
||||||
|
if (index > -1)
|
||||||
|
{
|
||||||
|
record.Fields[index] = sk.ToString();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tempRecords.Add(record);
|
||||||
|
if (tempRecords.Count >= companyTotallCount || i >= dataCount - 1)
|
||||||
|
{
|
||||||
|
foreach (var rc in tempRecords)
|
||||||
|
{
|
||||||
|
_context.AddInput();
|
||||||
|
if(_context.InputCount== 2000000)
|
||||||
|
{
|
||||||
|
var a = 1;
|
||||||
|
}
|
||||||
|
producerQueue.Enqueue(rc);
|
||||||
|
if (cancellationToken.IsCancellationRequested)
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
tempRecords.Clear();
|
||||||
|
companyID = DataHelper.GetCompanyId(getCompanyIDTimes);
|
||||||
|
getCompanyIDTimes++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_logger.LogInformation("table:'{tableName}' simulation input completed", tableName);
|
||||||
//}
|
//}
|
||||||
//_logger.LogInformation("File '{File}' input completed", Path.GetFileName(sqlPath));
|
//_logger.LogInformation("File '{File}' input completed", Path.GetFileName(sqlPath));
|
||||||
}
|
}
|
||||||
|
|
||||||
_context.CompleteInput();
|
context.CompleteInput();
|
||||||
_logger.LogInformation("***** Csv input service completed *****");
|
_logger.LogInformation("***** Csv input service completed *****");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user