diff --git a/MesETL.Tool/MesETL.Tool.csproj b/MesETL.Clean/MesETL.Clean.csproj
similarity index 76%
rename from MesETL.Tool/MesETL.Tool.csproj
rename to MesETL.Clean/MesETL.Clean.csproj
index 779e4ca..2bf0899 100644
--- a/MesETL.Tool/MesETL.Tool.csproj
+++ b/MesETL.Clean/MesETL.Clean.csproj
@@ -7,10 +7,6 @@
enable
-
-
-
-
diff --git a/MesETL.Clean/Program.cs b/MesETL.Clean/Program.cs
new file mode 100644
index 0000000..26df7e2
--- /dev/null
+++ b/MesETL.Clean/Program.cs
@@ -0,0 +1,55 @@
+using MesETL.Shared.Helper;
+
+var connStr = GetArg("-s") ?? throw new ApplicationException("未配置数据库连接字符串");
+var eachLimit = int.Parse(GetArg("-l") ?? "1000");
+var parallelTask = int.Parse(GetArg("-p") ?? "4");
+
+var deletionCount = 0;
+
+Console.WriteLine("Running Deletion...");
+
+_ = Task.Run(async () =>
+{
+ while (true)
+ {
+ await Task.Delay(5000);
+ Console.WriteLine($"[{DateTime.Now}] DELETE COUNT: {deletionCount}");
+ }
+});
+
+await Parallel.ForAsync(0, parallelTask, async (i, token) =>
+{
+ while (true)
+ {
+ var effectRows = await DatabaseHelper.NonQueryAsync(connStr,
+ $"DELETE FROM `order_data_block` WHERE CompanyID = 0 ORDER BY ID LIMIT {eachLimit};", token);
+ if(effectRows == 0)
+ break;
+ Interlocked.Add(ref deletionCount, effectRows);
+ }
+});
+
+Console.WriteLine($"[{DateTime.Now}] DELETE COUNT: {deletionCount}");
+return;
+string? GetArg(string instruct)
+{
+ var idx = Array.IndexOf(args, instruct);
+ if (idx == -1)
+ return null;
+ if (args[idx + 1].StartsWith('-'))
+ throw new ArgumentException("Argument Lost", nameof(instruct));
+ return args[idx + 1];
+}
+
+// var match = await DatabaseHelper.QueryTableAsync(connStr,
+// $"SELECT `ID` FROM `order_data_block` WHERE CompanyID = 0 LIMIT {eachLimit};",
+// token);
+// var rows = match.Tables[0].Rows;
+// if (rows.Count == 0)
+// return;
+//
+// foreach (DataRow row in rows)
+// {
+// var id = row["ID"].ToString();
+// await DatabaseHelper.NonQueryAsync(connStr, $"DELETE FROM `order_data_block` WHERE `ID` = {id}", token);
+// }
\ No newline at end of file
diff --git a/MesETL.sln b/MesETL.sln
index 1bc248f..df1ddba 100644
--- a/MesETL.sln
+++ b/MesETL.sln
@@ -4,7 +4,7 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MesETL.App", "MesETL.App\Me
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MesETL.Test", "MesETL.Test\MesETL.Test.csproj", "{8679D5B6-5853-446E-9882-7B7A8E270500}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MesETL.Tool", "MesETL.Tool\MesETL.Tool.csproj", "{68307B05-3D66-4322-A42F-C044C1E8BA3B}"
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Mesdb.Cli", "Mesdb.Cli\Mesdb.Cli.csproj", "{68307B05-3D66-4322-A42F-C044C1E8BA3B}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MesETL.Shared", "MesETL.Shared\MesETL.Shared.csproj", "{FE134001-0E22-458B-BEF2-29712A29087E}"
EndProject
diff --git a/Mesdb.Cli/BatchDbExtensions.cs b/Mesdb.Cli/BatchDbExtensions.cs
new file mode 100644
index 0000000..71fb238
--- /dev/null
+++ b/Mesdb.Cli/BatchDbExtensions.cs
@@ -0,0 +1,52 @@
+using System.Collections.Concurrent;
+using System.Data;
+using MesETL.Shared.Helper;
+
+namespace Mesdb.Cli;
+
+public static class BatchDbExtensions
+{
+ public static async Task>> CountDatabasesAsync(string connStr, IList dbNames, CancellationToken cancellationToken = default)
+ {
+ var result = new ConcurrentDictionary>();
+ var tables = await DatabaseHelper.QueryTableAsync(connStr,
+ $"""
+ SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = '{dbNames[0]}';
+ """);
+
+ await Parallel.ForEachAsync(dbNames, async (dbName, ct) =>
+ {
+ await Parallel.ForEachAsync(tables.Tables[0].Rows.Cast(), async (row, ct) =>
+ {
+ var tableName = row[0].ToString()!;
+ var count = (long)(await DatabaseHelper.QueryScalarAsync(connStr,
+ $"SELECT COUNT(1) FROM `{dbName}`.`{tableName}`;", ct))!;
+ result.AddOrUpdate(dbName, new ConcurrentDictionary(), (db, dict) =>
+ {
+ dict.AddOrUpdate(tableName, count, (table, num) => num + count);
+ return dict;
+ });
+ });
+ });
+ return result;
+ }
+
+ public static async Task AnalyzeAllAsync(string connStr, IList dbNames)
+ {
+ var tables = await DatabaseHelper.QueryTableAsync(connStr,
+ $"""
+ SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = '{dbNames[0]}';
+ """);
+
+ await Parallel.ForEachAsync(dbNames, async (dbName, ct) =>
+ {
+ await Parallel.ForEachAsync(tables.Tables[0].Rows.Cast(), async (row, ct) =>
+ {
+ var tableName = row[0].ToString()!;
+ var result = (await DatabaseHelper.QueryTableAsync(connStr,
+ $"ANALYZE TABLE `{dbName}`.`{tableName}`;", ct));
+ Console.WriteLine(string.Join('\t', result.Tables[0].Rows[0].ItemArray.Select(x => x.ToString())));
+ });
+ });
+ }
+}
\ No newline at end of file
diff --git a/Mesdb.Cli/Mesdb.Cli.csproj b/Mesdb.Cli/Mesdb.Cli.csproj
new file mode 100644
index 0000000..60b884a
--- /dev/null
+++ b/Mesdb.Cli/Mesdb.Cli.csproj
@@ -0,0 +1,22 @@
+
+
+
+ Exe
+ net8.0
+ enable
+ enable
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/MesETL.Tool/Program.cs b/Mesdb.Cli/Program.cs
similarity index 54%
rename from MesETL.Tool/Program.cs
rename to Mesdb.Cli/Program.cs
index 8f901c1..fd82bd5 100644
--- a/MesETL.Tool/Program.cs
+++ b/Mesdb.Cli/Program.cs
@@ -1,6 +1,6 @@
-using System.Collections.Concurrent;
-using System.Data;
+using Cocona;
using MesETL.Shared.Helper;
+using Mesdb.Cli;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
@@ -11,7 +11,10 @@ host.Configuration.AddCommandLine(args, new Dictionary
{ "--ConnectionString", "ConnectionString" },
{ "-B", "Databases" },
{ "--Databases", "Databases" },
- { "-a", "All" }
+ { "-a", "All" },
+ { "-c", "Command"},
+ { "--Command", "Command" },
+ { "--Sql", "Command" }
});
host.Build();
var connStr = host.Configuration.GetValue("ConnectionString") ?? throw new ApplicationException("没有配置数据库连接字符串");
@@ -20,7 +23,7 @@ var all = host.Configuration.GetValue("All");
if (args.Length > 1 && args[0] == "count")
{
- var result =await CountDatabasesAsync(connStr, databases);
+ var result = await BatchDbExtensions.CountDatabasesAsync(connStr, databases);
if (all)
{
foreach (var (k, v) in result)
@@ -43,27 +46,7 @@ if (args.Length > 1 && args[0] == "count")
}
}
-async Task>> CountDatabasesAsync(string connStr, IList dbNames, CancellationToken cancellationToken = default)
+if (args.Length > 1 && args[0] == "analyze")
{
- var result = new ConcurrentDictionary>();
- var tables = await DatabaseHelper.QueryTableAsync(connStr,
- $"""
- SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = '{dbNames[0]}';
- """);
-
- await Parallel.ForEachAsync(dbNames, async (dbName, ct) =>
- {
- await Parallel.ForEachAsync(tables.Tables[0].Rows.Cast(), async (row, ct) =>
- {
- var tableName = row[0].ToString()!;
- var count = (long)(await DatabaseHelper.QueryScalarAsync(connStr,
- $"SELECT COUNT(1) FROM `{dbName}`.`{tableName}`;", ct))!;
- result.AddOrUpdate(dbName, new ConcurrentDictionary(), (db, dict) =>
- {
- dict.AddOrUpdate(tableName, count, (table, num) => num + count);
- return dict;
- });
- });
- });
- return result;
+ await BatchDbExtensions.AnalyzeAllAsync(connStr, databases);
}
\ No newline at end of file
diff --git a/Mesdb.Cli/Schema/DB.cs b/Mesdb.Cli/Schema/DB.cs
new file mode 100644
index 0000000..a6cda37
--- /dev/null
+++ b/Mesdb.Cli/Schema/DB.cs
@@ -0,0 +1,44 @@
+using MesETL.Shared.Helper;
+using MySqlConnector;
+using Serilog;
+
+namespace Mesdb.Cli.Schema;
+
+public class DB
+{
+ public required string ConnectionString { get; init; }
+ public required IReadOnlyList Databases { get; init; }
+
+ public static DB Create(string connStr, IEnumerable dbNames)
+ {
+ var databases = new List();
+ foreach (var dbName in dbNames)
+ {
+ var dbConnStr = new MySqlConnectionStringBuilder(connStr)
+ {
+ Database = dbName
+ }.ConnectionString;
+
+ try
+ {
+ _ = DatabaseHelper.NonQueryAsync(dbConnStr, "SHOW DATABASES;").Result;
+ databases.Add(new Database(dbName, dbConnStr));
+ }
+ catch (Exception e)
+ {
+ Log.Logger.Fatal(e, "无法连接到数据库: {DbName} ", dbName);
+ throw;
+ }
+ }
+
+ return new DB
+ {
+ ConnectionString = connStr,
+ Databases = databases
+ };
+ }
+
+ private DB()
+ {
+ }
+}
\ No newline at end of file
diff --git a/Mesdb.Cli/Schema/Database.cs b/Mesdb.Cli/Schema/Database.cs
new file mode 100644
index 0000000..c2cdb78
--- /dev/null
+++ b/Mesdb.Cli/Schema/Database.cs
@@ -0,0 +1,50 @@
+using System.Data;
+using MesETL.Shared.Helper;
+using MySqlConnector;
+
+namespace Mesdb.Cli.Schema;
+
+public class Database
+{
+ public static async Task FetchTableAsync(string dbName, string connStr)
+ {
+ var tables = await DatabaseHelper.QueryTableAsync(connStr,
+ $"""
+ SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = '{dbName}';
+ """);
+ return tables.Tables[0].Rows.Cast().Select(row => new Table{Name = row[0].ToString()!}).ToArray();
+ }
+
+
+ public string Name { get; }
+ public string ConnectionString { get; }
+ public IReadOnlyList Tables { get; }
+
+ public Database(string name, string connStr)
+ {
+ var trueConnStr = new MySqlConnectionStringBuilder(connStr)
+ {
+ Database = name
+ }.ConnectionString;
+
+ var tables = FetchTableAsync(name, trueConnStr).Result;
+ Name = name;
+ ConnectionString = trueConnStr;
+ Tables = tables;
+ }
+
+ public Task ExecuteNonQueryAsync(string sql, CancellationToken cancellationToken = default)
+ {
+ return DatabaseHelper.NonQueryAsync(ConnectionString, sql, cancellationToken);
+ }
+
+ public Task ExecuteQueryAsync(string sql, CancellationToken cancellationToken = default)
+ {
+ return DatabaseHelper.QueryTableAsync(ConnectionString, sql, cancellationToken);
+ }
+
+ public Task