Compare commits

...

2 Commits

5 changed files with 18 additions and 16 deletions

View File

@ -64,7 +64,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
if (_recordCache.Count == 0) if (_recordCache.Count == 0)
return; return;
await using var cmd = _conn.CreateCommand(); var cmd = _conn.CreateCommand();
cmd.CommandTimeout = 0; cmd.CommandTimeout = 0;
try try
@ -108,7 +108,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
public IEnumerable<string> GetExecutionList(IDictionary<string, IList<DataRecord>> tableRecords, int maxAllowPacket) public IEnumerable<string> GetExecutionList(IDictionary<string, IList<DataRecord>> tableRecords, int maxAllowPacket)
{ {
var sb = new StringBuilder("SET AUTOCOMMIT = 1;\n"); var sb = new StringBuilder(_options.Value.FlushCount * 128);
var appendCount = 0; var appendCount = 0;
foreach (var (tableName, records) in tableRecords) foreach (var (tableName, records) in tableRecords)
{ {
@ -117,6 +117,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
var recordIdx = 0; var recordIdx = 0;
StartBuild: StartBuild:
sb.AppendLine("SET AUTOCOMMIT = 0;\n");
var noCommas = true; var noCommas = true;
// 标准列顺序,插入时的字段需要按照该顺序排列 // 标准列顺序,插入时的字段需要按照该顺序排列
@ -212,7 +213,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
TryAddForUpdateSuffix(tableName, sb); TryAddForUpdateSuffix(tableName, sb);
sb.Append(';').AppendLine(); sb.Append(';').AppendLine();
sb.Append("SET AUTOCOMMIT = 1;"); sb.Append("COMMIT;");
yield return sb.ToString(); yield return sb.ToString();
sb.Clear(); sb.Clear();
goto StartBuild; goto StartBuild;

View File

@ -59,17 +59,16 @@ public class ProcessContext
public void AddTableInput(string table, int count) public void AddTableInput(string table, int count)
{ {
if (!_tableProgress.TryAdd(table, (input: count, output: 0))) _tableProgress.AddOrUpdate(table, (input: count, output: 0), (k, tuple) =>
{ {
var tuple = _tableProgress[table];
tuple.input += count; tuple.input += count;
_tableProgress[table] = tuple; return tuple;
} });
} }
public void AddTableOutput(string table, int count) public void AddTableOutput(string table, int count)
{ {
_tableProgress.AddOrUpdate(table, (input:0, output:count), (k, tuple) => _tableProgress.AddOrUpdate(table, (input: 0, output: count), (k, tuple) =>
{ {
tuple.output += count; tuple.output += count;
return tuple; return tuple;

View File

@ -8,10 +8,10 @@
} }
}, },
"Input":{ "Input":{
"InputDir": "D:\\Data\\DatabaseDump\\MyDumper-ZST 2024-12-3", // Csv "InputDir": "D:\\Data\\DatabaseDump\\Prod_Mock_CSV_2024-12-31", // Csv
"UseMock": false, // 使 "UseMock": false, // 使
"MockCountMultiplier": 1, // "MockCountMultiplier": 1, //
// "TableOrder": ["order_block_plan_item"], // "TableOrder": ["order_item"], //
"TableIgnoreList": [] // "TableIgnoreList": [] //
}, },
"Transform":{ "Transform":{

View File

@ -12,7 +12,7 @@ namespace TestProject1;
public class DatabaseToolBox public class DatabaseToolBox
{ {
private readonly ITestOutputHelper _output; private readonly ITestOutputHelper _output;
public const string ConnStr = "Server=192.168.1.246;Port=33025;UserId=root;Password=123456;"; public const string ConnStr = "Server=localhost;Port=3306;UserId=root;Password=123456;";
public DatabaseToolBox(ITestOutputHelper output) public DatabaseToolBox(ITestOutputHelper output)
{ {
@ -156,9 +156,11 @@ public class DatabaseToolBox
} }
[Theory] [Theory]
[InlineData(["cferp_test_1"])] [InlineData(["mesdb_1"])]
[InlineData(["cferp_test_2"])] [InlineData(["mesdb_2"])]
[InlineData(["cferp_test_3"])] [InlineData(["mesdb_3"])]
[InlineData(["mesdb_4"])]
[InlineData(["mesdb_5"])]
public async Task DropAllIndex(string database) public async Task DropAllIndex(string database)
{ {
var indexes = await GetAllTableIndexes(database); var indexes = await GetAllTableIndexes(database);

View File

@ -57,7 +57,7 @@ async Task RunProgram()
host.Services.Configure<MockInputOptions>(options => host.Services.Configure<MockInputOptions>(options =>
{ {
const float Multiplexer = 0.01F; const float Multiplexer = 1F;
var SampleSharedKeys = Enumerable.Range(0, 11).Select(i => (23010 + i * 10).ToString()).Concat( var SampleSharedKeys = Enumerable.Range(0, 11).Select(i => (23010 + i * 10).ToString()).Concat(
Enumerable.Range(0, 11).Select(i => (24010 + i * 10).ToString())).ToArray(); Enumerable.Range(0, 11).Select(i => (24010 + i * 10).ToString())).ToArray();
options.Rules = new Dictionary<string, TableMockOptions>() options.Rules = new Dictionary<string, TableMockOptions>()