diff --git a/MesETL.App/Services/ETL/MySqlDestination.cs b/MesETL.App/Services/ETL/MySqlDestination.cs index 02ab5d7..05d2379 100644 --- a/MesETL.App/Services/ETL/MySqlDestination.cs +++ b/MesETL.App/Services/ETL/MySqlDestination.cs @@ -64,7 +64,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable if (_recordCache.Count == 0) return; - await using var cmd = _conn.CreateCommand(); + var cmd = _conn.CreateCommand(); cmd.CommandTimeout = 0; try @@ -108,7 +108,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable public IEnumerable GetExecutionList(IDictionary> tableRecords, int maxAllowPacket) { - var sb = new StringBuilder("SET AUTOCOMMIT = 1;\n"); + var sb = new StringBuilder(_options.Value.FlushCount * 128); var appendCount = 0; foreach (var (tableName, records) in tableRecords) { @@ -117,6 +117,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable var recordIdx = 0; StartBuild: + sb.AppendLine("SET AUTOCOMMIT = 0;\n"); var noCommas = true; // 标准列顺序,插入时的字段需要按照该顺序排列 @@ -212,7 +213,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable TryAddForUpdateSuffix(tableName, sb); sb.Append(';').AppendLine(); - sb.Append("SET AUTOCOMMIT = 1;"); + sb.Append("COMMIT;"); yield return sb.ToString(); sb.Clear(); goto StartBuild; diff --git a/MesETL.App/Services/ProcessContext.cs b/MesETL.App/Services/ProcessContext.cs index d9c26be..7eb5f93 100644 --- a/MesETL.App/Services/ProcessContext.cs +++ b/MesETL.App/Services/ProcessContext.cs @@ -61,7 +61,7 @@ public class ProcessContext { _tableProgress.AddOrUpdate(table, (input: count, output: 0), (k, tuple) => { - tuple.output += count; + tuple.input += count; return tuple; }); } diff --git a/MesETL.App/appsettings.json b/MesETL.App/appsettings.json index e5e512c..7afa662 100644 --- a/MesETL.App/appsettings.json +++ b/MesETL.App/appsettings.json @@ -8,10 +8,10 @@ } }, "Input":{ - "InputDir": "D:\\Data\\DatabaseDump\\MyDumper-ZST 2024-12-3", // Csv数据输入目录 + "InputDir": "D:\\Data\\DatabaseDump\\Prod_Mock_CSV_2024-12-31", // Csv数据输入目录 "UseMock": false, // 使用模拟数据进行测试 "MockCountMultiplier": 1, // 模拟数据量级的乘数 -// "TableOrder": ["order_block_plan_item"], // 按顺序输入的表 + "TableOrder": ["order_item"], // 按顺序输入的表 "TableIgnoreList": [] // 忽略输入的表 }, "Transform":{ diff --git a/MesETL.Test/DatabaseToolBox.cs b/MesETL.Test/DatabaseToolBox.cs index be87840..3423149 100644 --- a/MesETL.Test/DatabaseToolBox.cs +++ b/MesETL.Test/DatabaseToolBox.cs @@ -12,7 +12,7 @@ namespace TestProject1; public class DatabaseToolBox { private readonly ITestOutputHelper _output; - public const string ConnStr = "Server=192.168.1.246;Port=33025;UserId=root;Password=123456;"; + public const string ConnStr = "Server=localhost;Port=3306;UserId=root;Password=123456;"; public DatabaseToolBox(ITestOutputHelper output) { @@ -156,9 +156,11 @@ public class DatabaseToolBox } [Theory] - [InlineData(["cferp_test_1"])] - [InlineData(["cferp_test_2"])] - [InlineData(["cferp_test_3"])] + [InlineData(["mesdb_1"])] + [InlineData(["mesdb_2"])] + [InlineData(["mesdb_3"])] + [InlineData(["mesdb_4"])] + [InlineData(["mesdb_5"])] public async Task DropAllIndex(string database) { var indexes = await GetAllTableIndexes(database); diff --git a/Mesdb.DataGenerator/Program.cs b/Mesdb.DataGenerator/Program.cs index a5a8ec2..8ed25d5 100644 --- a/Mesdb.DataGenerator/Program.cs +++ b/Mesdb.DataGenerator/Program.cs @@ -57,7 +57,7 @@ async Task RunProgram() host.Services.Configure(options => { - const float Multiplexer = 0.01F; + const float Multiplexer = 1F; var SampleSharedKeys = Enumerable.Range(0, 11).Select(i => (23010 + i * 10).ToString()).Concat( Enumerable.Range(0, 11).Select(i => (24010 + i * 10).ToString())).ToArray(); options.Rules = new Dictionary()