目标数据库增加配置

This commit is contained in:
lindj 2024-01-17 11:40:16 +08:00
parent e1aa621a7d
commit adb31cdc6d
4 changed files with 30 additions and 46 deletions

View File

@ -78,7 +78,7 @@ public class TransformService : ITransformService
record[i] = field; record[i] = field;
} }
//过滤不要的record //过滤不要的record
if ( await _options.Value.RecordFilter?.Invoke(record,_db) == false) continue; //if ( await _options.Value.RecordFilter?.Invoke(record,_db) == false) continue;
record.Database = _options.Value.DatabaseFilter?.Invoke(record); record.Database = _options.Value.DatabaseFilter?.Invoke(record);
//修改record //修改record
_options.Value.RecordModify?.Invoke(record); _options.Value.RecordModify?.Invoke(record);
@ -91,6 +91,7 @@ public class TransformService : ITransformService
record = replaceRecord; record = replaceRecord;
} }
_consumerQueue.Enqueue(record); _consumerQueue.Enqueue(record);
_context.AddTransform();
//数据增加 //数据增加
var addRecords=_options.Value.RecordAdd?.Invoke(record); var addRecords=_options.Value.RecordAdd?.Invoke(record);
if(addRecords != null&& addRecords.Count>0) if(addRecords != null&& addRecords.Count>0)
@ -98,9 +99,9 @@ public class TransformService : ITransformService
foreach(var rc in addRecords) foreach(var rc in addRecords)
{ {
_consumerQueue.Enqueue(rc); _consumerQueue.Enqueue(rc);
_context.AddTransform();
} }
} }
_context.AddTransform();
} }
_context.CompleteTransform(); _context.CompleteTransform();

View File

@ -204,6 +204,14 @@ async Task RunProgram()
//数据修改 //数据修改
options.RecordModify = (record) => options.RecordModify = (record) =>
{ {
if (record.TableName == "order_block_plan")
{
if (record.TryGetField("OrderNos", out var nos))
{
if (nos.Length <= 2) record.SetField("OrderNos", "\"[]\"");
}
}
if (record.TableName == "order_process")//修改order_process.NextStepID的默认值为0 if (record.TableName == "order_process")//修改order_process.NextStepID的默认值为0
{ {
@ -453,14 +461,22 @@ async Task RunProgram()
}); });
host.Services.Configure<DatabaseOutputOptions>(options => host.Services.Configure<DatabaseOutputOptions>(options =>
{ {
options.ConnectionString = new MySqlConnectionStringBuilder //options.ConnectionString = new MySqlConnectionStringBuilder
//{
// Server = "127.0.0.1",
// Port = 33309,
// Database = "cferp_test",
// UserID = "root",
// Password = "123456",
// MaximumPoolSize = 50, // 这个值应当小于 max_connections
//}.ConnectionString;
options.ConnectionString = new MySqlConnectionStringBuilder(host.Configuration.GetConnectionString("MySqlMaster"))
{ {
Server = "127.0.0.1", CharacterSet = "utf8",
Port = 33309, AllowUserVariables = true,
Database = "cferp_test", IgnoreCommandTransaction = true,
UserID = "root", TreatTinyAsBoolean = false,
Password = "123456", MaximumPoolSize = 50
MaximumPoolSize = 50, // 这个值应当小于 max_connections
}.ConnectionString; }.ConnectionString;
options.TaskCount = commandOptions.TaskCount; options.TaskCount = commandOptions.TaskCount;
options.FlushCount = commandOptions.FlushCount; options.FlushCount = commandOptions.FlushCount;

View File

@ -44,30 +44,6 @@ namespace ConsoleApp2.SimulationService
} }
foreach (var tableName in _tableOptions.Value.TableInfoConfig.Keys) foreach (var tableName in _tableOptions.Value.TableInfoConfig.Keys)
{ {
//_logger.LogInformation("Working sql file: {SqlPath}", sqlPath);
//var headers = await DumpDataHelper.GetCsvHeadersFromSqlFileAsync(sqlPath);
//var sqlFileSource = _dataInputOptions.Value.CreateSource?.Invoke(sqlPath,null);
//var headers =await sqlFileSource?.GetHeaders();
//var csvFiles = await DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(sqlPath);
//var csvFiles =await sqlFileSource?.GetCsvFiles();
//foreach (var csvFile in csvFiles)
//{
//var csvPath = Path.Combine(inputDir, csvFile);
//// var source = new JsvSource(csvPath, headers, _logger);
//var source = new CsvSource(csvPath, headers, _csvOptions.Value.Delimiter, _csvOptions.Value.QuoteChar, _logger);
//while (await source.ReadAsync())
//{
// _context.AddInput();
// _producerQueue.Enqueue(source.Current);
// if (cancellationToken.IsCancellationRequested)
// return;
//}
//var csvPath = Path.Combine(inputDir, csvFile);
//var tableName = DumpDataHelper.GetTableName(csvPath);
//var dataCount = 1200000000L;//当前表要生成的总数据量
var dataCount = _tableOptions.Value.TableInfoConfig[tableName].SimulaRowCount;//当前表要生成的总数据量 var dataCount = _tableOptions.Value.TableInfoConfig[tableName].SimulaRowCount;//当前表要生成的总数据量
var companyTotallCount = 1000;//当前表每个公司生成的总数据量 var companyTotallCount = 1000;//当前表每个公司生成的总数据量
var tempRecords = new List<DataRecord>(); var tempRecords = new List<DataRecord>();
@ -78,19 +54,7 @@ namespace ConsoleApp2.SimulationService
var getShareKeyTimes = 0;//sharekey生成的次数,每生成一次改变sharekey的值 var getShareKeyTimes = 0;//sharekey生成的次数,每生成一次改变sharekey的值
var getCompanyIDTimes = 0;//公司生成的次数,每生成一次改变companyID的值 var getCompanyIDTimes = 0;//公司生成的次数,每生成一次改变companyID的值
var shareKeyIntervalCount = 0; var shareKeyIntervalCount = 0;
//CsvSource source;
//switch (_dataInputOptions.Value.FileType)
//{
// case InputFileType.CSV:
// source=new CsvSource(csvPath, headers, _csvOptions.Value.Delimiter, _csvOptions.Value.QuoteChar, _logger);
// break;
// case InputFileType.JWT:
// source = new JwtSource(csvPath, headers, _csvOptions.Value.Delimiter, _csvOptions.Value.QuoteChar, _logger);
// break;
// default: break;
//}
//var source = new JwtSource(csvPath, headers, _csvOptions.Value.Delimiter, _csvOptions.Value.QuoteChar, _logger);
var source = _dataInputOptions.Value.CreateSource?.Invoke(tableName); var source = _dataInputOptions.Value.CreateSource?.Invoke(tableName);
var testRecord =await source.GetTestRecord(); var testRecord =await source.GetTestRecord();
for (long i = 1; i <= dataCount; i++) for (long i = 1; i <= dataCount; i++)

View File

@ -3,11 +3,14 @@
"InputFileType": "CSV", "InputFileType": "CSV",
"InputDir": "D:/MyDumper-ZST", "InputDir": "D:/MyDumper-ZST",
"TaskCount": 4, "TaskCount": 4,
"FlushCount": 1000, "FlushCount": 10000,
"Isutf8mb4": true, "Isutf8mb4": true,
"OldestShardKey": 23000, "OldestShardKey": 23000,
"OldestTime": "202301" "OldestTime": "202301"
}, },
"ConnectionStrings": {
"MySqlMaster": "Server=127.0.0.1;Port=33309;UserId=root;Password=123456;Database=cferp_test;"
},
"RedisCacheOptions": { "RedisCacheOptions": {
"Configuration": "localhost:6379" "Configuration": "localhost:6379"
} }