diff --git a/ConsoleApp2/HostedServices/TransformService.cs b/ConsoleApp2/HostedServices/TransformService.cs index d0dd3c1..933ece8 100644 --- a/ConsoleApp2/HostedServices/TransformService.cs +++ b/ConsoleApp2/HostedServices/TransformService.cs @@ -78,7 +78,7 @@ public class TransformService : ITransformService record[i] = field; } //过滤不要的record - if ( await _options.Value.RecordFilter?.Invoke(record,_db) == false) continue; + //if ( await _options.Value.RecordFilter?.Invoke(record,_db) == false) continue; record.Database = _options.Value.DatabaseFilter?.Invoke(record); //修改record _options.Value.RecordModify?.Invoke(record); @@ -91,6 +91,7 @@ public class TransformService : ITransformService record = replaceRecord; } _consumerQueue.Enqueue(record); + _context.AddTransform(); //数据增加 var addRecords=_options.Value.RecordAdd?.Invoke(record); if(addRecords != null&& addRecords.Count>0) @@ -98,9 +99,9 @@ public class TransformService : ITransformService foreach(var rc in addRecords) { _consumerQueue.Enqueue(rc); + _context.AddTransform(); } } - _context.AddTransform(); } _context.CompleteTransform(); diff --git a/ConsoleApp2/Program.cs b/ConsoleApp2/Program.cs index 9abcba6..c73c8b3 100644 --- a/ConsoleApp2/Program.cs +++ b/ConsoleApp2/Program.cs @@ -204,6 +204,14 @@ async Task RunProgram() //数据修改 options.RecordModify = (record) => { + if (record.TableName == "order_block_plan") + { + if (record.TryGetField("OrderNos", out var nos)) + { + if (nos.Length <= 2) record.SetField("OrderNos", "\"[]\""); + } + + } if (record.TableName == "order_process")//修改order_process.NextStepID的默认值为0 { @@ -453,14 +461,22 @@ async Task RunProgram() }); host.Services.Configure(options => { - options.ConnectionString = new MySqlConnectionStringBuilder + //options.ConnectionString = new MySqlConnectionStringBuilder + //{ + // Server = "127.0.0.1", + // Port = 33309, + // Database = "cferp_test", + // UserID = "root", + // Password = "123456", + // MaximumPoolSize = 50, // 这个值应当小于 max_connections + //}.ConnectionString; + options.ConnectionString = new MySqlConnectionStringBuilder(host.Configuration.GetConnectionString("MySqlMaster")) { - Server = "127.0.0.1", - Port = 33309, - Database = "cferp_test", - UserID = "root", - Password = "123456", - MaximumPoolSize = 50, // 这个值应当小于 max_connections + CharacterSet = "utf8", + AllowUserVariables = true, + IgnoreCommandTransaction = true, + TreatTinyAsBoolean = false, + MaximumPoolSize = 50 }.ConnectionString; options.TaskCount = commandOptions.TaskCount; options.FlushCount = commandOptions.FlushCount; diff --git a/ConsoleApp2/SimulationService/SimulationInputService.cs b/ConsoleApp2/SimulationService/SimulationInputService.cs index e8c5ba7..90e4789 100644 --- a/ConsoleApp2/SimulationService/SimulationInputService.cs +++ b/ConsoleApp2/SimulationService/SimulationInputService.cs @@ -44,30 +44,6 @@ namespace ConsoleApp2.SimulationService } foreach (var tableName in _tableOptions.Value.TableInfoConfig.Keys) { - //_logger.LogInformation("Working sql file: {SqlPath}", sqlPath); - //var headers = await DumpDataHelper.GetCsvHeadersFromSqlFileAsync(sqlPath); - //var sqlFileSource = _dataInputOptions.Value.CreateSource?.Invoke(sqlPath,null); - //var headers =await sqlFileSource?.GetHeaders(); - //var csvFiles = await DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(sqlPath); - //var csvFiles =await sqlFileSource?.GetCsvFiles(); - - //foreach (var csvFile in csvFiles) - //{ - //var csvPath = Path.Combine(inputDir, csvFile); - //// var source = new JsvSource(csvPath, headers, _logger); - //var source = new CsvSource(csvPath, headers, _csvOptions.Value.Delimiter, _csvOptions.Value.QuoteChar, _logger); - - //while (await source.ReadAsync()) - //{ - // _context.AddInput(); - // _producerQueue.Enqueue(source.Current); - // if (cancellationToken.IsCancellationRequested) - // return; - //} - //var csvPath = Path.Combine(inputDir, csvFile); - //var tableName = DumpDataHelper.GetTableName(csvPath); - - //var dataCount = 1200000000L;//当前表要生成的总数据量 var dataCount = _tableOptions.Value.TableInfoConfig[tableName].SimulaRowCount;//当前表要生成的总数据量 var companyTotallCount = 1000;//当前表每个公司生成的总数据量 var tempRecords = new List(); @@ -78,19 +54,7 @@ namespace ConsoleApp2.SimulationService var getShareKeyTimes = 0;//sharekey生成的次数,每生成一次,改变sharekey的值 var getCompanyIDTimes = 0;//公司生成的次数,每生成一次,改变companyID的值 var shareKeyIntervalCount = 0; - //CsvSource source; - //switch (_dataInputOptions.Value.FileType) - //{ - // case InputFileType.CSV: - // source=new CsvSource(csvPath, headers, _csvOptions.Value.Delimiter, _csvOptions.Value.QuoteChar, _logger); - // break; - // case InputFileType.JWT: - // source = new JwtSource(csvPath, headers, _csvOptions.Value.Delimiter, _csvOptions.Value.QuoteChar, _logger); - // break; - // default: break; - //} - //var source = new JwtSource(csvPath, headers, _csvOptions.Value.Delimiter, _csvOptions.Value.QuoteChar, _logger); var source = _dataInputOptions.Value.CreateSource?.Invoke(tableName); var testRecord =await source.GetTestRecord(); for (long i = 1; i <= dataCount; i++) diff --git a/ConsoleApp2/appsettings.json b/ConsoleApp2/appsettings.json index eee4066..7c825cd 100644 --- a/ConsoleApp2/appsettings.json +++ b/ConsoleApp2/appsettings.json @@ -3,11 +3,14 @@ "InputFileType": "CSV", "InputDir": "D:/MyDumper-ZST", "TaskCount": 4, - "FlushCount": 1000, + "FlushCount": 10000, "Isutf8mb4": true, "OldestShardKey": 23000, "OldestTime": "202301" }, + "ConnectionStrings": { + "MySqlMaster": "Server=127.0.0.1;Port=33309;UserId=root;Password=123456;Database=cferp_test;" + }, "RedisCacheOptions": { "Configuration": "localhost:6379" }