From 3dbfaffd05762ceab17fe651e8761328388cfad7 Mon Sep 17 00:00:00 2001 From: "2817212736@qq.com" <2817212736@qq.com> Date: Fri, 15 Nov 2024 14:10:25 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B4=E7=90=86=E9=A1=B9=E7=9B=AE=E7=BB=93?= =?UTF-8?q?=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../MesETL.Clean.csproj | 4 -- MesETL.Clean/Program.cs | 55 +++++++++++++++++++ MesETL.sln | 2 +- Mesdb.Cli/BatchDbExtensions.cs | 52 ++++++++++++++++++ Mesdb.Cli/Mesdb.Cli.csproj | 22 ++++++++ {MesETL.Tool => Mesdb.Cli}/Program.cs | 35 +++--------- Mesdb.Cli/Schema/DB.cs | 44 +++++++++++++++ Mesdb.Cli/Schema/Database.cs | 50 +++++++++++++++++ Mesdb.Cli/Schema/Table.cs | 8 +++ 9 files changed, 241 insertions(+), 31 deletions(-) rename MesETL.Tool/MesETL.Tool.csproj => MesETL.Clean/MesETL.Clean.csproj (76%) create mode 100644 MesETL.Clean/Program.cs create mode 100644 Mesdb.Cli/BatchDbExtensions.cs create mode 100644 Mesdb.Cli/Mesdb.Cli.csproj rename {MesETL.Tool => Mesdb.Cli}/Program.cs (54%) create mode 100644 Mesdb.Cli/Schema/DB.cs create mode 100644 Mesdb.Cli/Schema/Database.cs create mode 100644 Mesdb.Cli/Schema/Table.cs diff --git a/MesETL.Tool/MesETL.Tool.csproj b/MesETL.Clean/MesETL.Clean.csproj similarity index 76% rename from MesETL.Tool/MesETL.Tool.csproj rename to MesETL.Clean/MesETL.Clean.csproj index 779e4ca..2bf0899 100644 --- a/MesETL.Tool/MesETL.Tool.csproj +++ b/MesETL.Clean/MesETL.Clean.csproj @@ -7,10 +7,6 @@ enable - - - - diff --git a/MesETL.Clean/Program.cs b/MesETL.Clean/Program.cs new file mode 100644 index 0000000..26df7e2 --- /dev/null +++ b/MesETL.Clean/Program.cs @@ -0,0 +1,55 @@ +using MesETL.Shared.Helper; + +var connStr = GetArg("-s") ?? throw new ApplicationException("未配置数据库连接字符串"); +var eachLimit = int.Parse(GetArg("-l") ?? "1000"); +var parallelTask = int.Parse(GetArg("-p") ?? "4"); + +var deletionCount = 0; + +Console.WriteLine("Running Deletion..."); + +_ = Task.Run(async () => +{ + while (true) + { + await Task.Delay(5000); + Console.WriteLine($"[{DateTime.Now}] DELETE COUNT: {deletionCount}"); + } +}); + +await Parallel.ForAsync(0, parallelTask, async (i, token) => +{ + while (true) + { + var effectRows = await DatabaseHelper.NonQueryAsync(connStr, + $"DELETE FROM `order_data_block` WHERE CompanyID = 0 ORDER BY ID LIMIT {eachLimit};", token); + if(effectRows == 0) + break; + Interlocked.Add(ref deletionCount, effectRows); + } +}); + +Console.WriteLine($"[{DateTime.Now}] DELETE COUNT: {deletionCount}"); +return; +string? GetArg(string instruct) +{ + var idx = Array.IndexOf(args, instruct); + if (idx == -1) + return null; + if (args[idx + 1].StartsWith('-')) + throw new ArgumentException("Argument Lost", nameof(instruct)); + return args[idx + 1]; +} + +// var match = await DatabaseHelper.QueryTableAsync(connStr, +// $"SELECT `ID` FROM `order_data_block` WHERE CompanyID = 0 LIMIT {eachLimit};", +// token); +// var rows = match.Tables[0].Rows; +// if (rows.Count == 0) +// return; +// +// foreach (DataRow row in rows) +// { +// var id = row["ID"].ToString(); +// await DatabaseHelper.NonQueryAsync(connStr, $"DELETE FROM `order_data_block` WHERE `ID` = {id}", token); +// } \ No newline at end of file diff --git a/MesETL.sln b/MesETL.sln index 1bc248f..df1ddba 100644 --- a/MesETL.sln +++ b/MesETL.sln @@ -4,7 +4,7 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MesETL.App", "MesETL.App\Me EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MesETL.Test", "MesETL.Test\MesETL.Test.csproj", "{8679D5B6-5853-446E-9882-7B7A8E270500}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MesETL.Tool", "MesETL.Tool\MesETL.Tool.csproj", "{68307B05-3D66-4322-A42F-C044C1E8BA3B}" +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Mesdb.Cli", "Mesdb.Cli\Mesdb.Cli.csproj", "{68307B05-3D66-4322-A42F-C044C1E8BA3B}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MesETL.Shared", "MesETL.Shared\MesETL.Shared.csproj", "{FE134001-0E22-458B-BEF2-29712A29087E}" EndProject diff --git a/Mesdb.Cli/BatchDbExtensions.cs b/Mesdb.Cli/BatchDbExtensions.cs new file mode 100644 index 0000000..71fb238 --- /dev/null +++ b/Mesdb.Cli/BatchDbExtensions.cs @@ -0,0 +1,52 @@ +using System.Collections.Concurrent; +using System.Data; +using MesETL.Shared.Helper; + +namespace Mesdb.Cli; + +public static class BatchDbExtensions +{ + public static async Task>> CountDatabasesAsync(string connStr, IList dbNames, CancellationToken cancellationToken = default) + { + var result = new ConcurrentDictionary>(); + var tables = await DatabaseHelper.QueryTableAsync(connStr, + $""" + SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = '{dbNames[0]}'; + """); + + await Parallel.ForEachAsync(dbNames, async (dbName, ct) => + { + await Parallel.ForEachAsync(tables.Tables[0].Rows.Cast(), async (row, ct) => + { + var tableName = row[0].ToString()!; + var count = (long)(await DatabaseHelper.QueryScalarAsync(connStr, + $"SELECT COUNT(1) FROM `{dbName}`.`{tableName}`;", ct))!; + result.AddOrUpdate(dbName, new ConcurrentDictionary(), (db, dict) => + { + dict.AddOrUpdate(tableName, count, (table, num) => num + count); + return dict; + }); + }); + }); + return result; + } + + public static async Task AnalyzeAllAsync(string connStr, IList dbNames) + { + var tables = await DatabaseHelper.QueryTableAsync(connStr, + $""" + SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = '{dbNames[0]}'; + """); + + await Parallel.ForEachAsync(dbNames, async (dbName, ct) => + { + await Parallel.ForEachAsync(tables.Tables[0].Rows.Cast(), async (row, ct) => + { + var tableName = row[0].ToString()!; + var result = (await DatabaseHelper.QueryTableAsync(connStr, + $"ANALYZE TABLE `{dbName}`.`{tableName}`;", ct)); + Console.WriteLine(string.Join('\t', result.Tables[0].Rows[0].ItemArray.Select(x => x.ToString()))); + }); + }); + } +} \ No newline at end of file diff --git a/Mesdb.Cli/Mesdb.Cli.csproj b/Mesdb.Cli/Mesdb.Cli.csproj new file mode 100644 index 0000000..60b884a --- /dev/null +++ b/Mesdb.Cli/Mesdb.Cli.csproj @@ -0,0 +1,22 @@ + + + + Exe + net8.0 + enable + enable + + + + + + + + + + + + + + + diff --git a/MesETL.Tool/Program.cs b/Mesdb.Cli/Program.cs similarity index 54% rename from MesETL.Tool/Program.cs rename to Mesdb.Cli/Program.cs index 8f901c1..fd82bd5 100644 --- a/MesETL.Tool/Program.cs +++ b/Mesdb.Cli/Program.cs @@ -1,6 +1,6 @@ -using System.Collections.Concurrent; -using System.Data; +using Cocona; using MesETL.Shared.Helper; +using Mesdb.Cli; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Hosting; @@ -11,7 +11,10 @@ host.Configuration.AddCommandLine(args, new Dictionary { "--ConnectionString", "ConnectionString" }, { "-B", "Databases" }, { "--Databases", "Databases" }, - { "-a", "All" } + { "-a", "All" }, + { "-c", "Command"}, + { "--Command", "Command" }, + { "--Sql", "Command" } }); host.Build(); var connStr = host.Configuration.GetValue("ConnectionString") ?? throw new ApplicationException("没有配置数据库连接字符串"); @@ -20,7 +23,7 @@ var all = host.Configuration.GetValue("All"); if (args.Length > 1 && args[0] == "count") { - var result =await CountDatabasesAsync(connStr, databases); + var result = await BatchDbExtensions.CountDatabasesAsync(connStr, databases); if (all) { foreach (var (k, v) in result) @@ -43,27 +46,7 @@ if (args.Length > 1 && args[0] == "count") } } -async Task>> CountDatabasesAsync(string connStr, IList dbNames, CancellationToken cancellationToken = default) +if (args.Length > 1 && args[0] == "analyze") { - var result = new ConcurrentDictionary>(); - var tables = await DatabaseHelper.QueryTableAsync(connStr, - $""" - SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = '{dbNames[0]}'; - """); - - await Parallel.ForEachAsync(dbNames, async (dbName, ct) => - { - await Parallel.ForEachAsync(tables.Tables[0].Rows.Cast(), async (row, ct) => - { - var tableName = row[0].ToString()!; - var count = (long)(await DatabaseHelper.QueryScalarAsync(connStr, - $"SELECT COUNT(1) FROM `{dbName}`.`{tableName}`;", ct))!; - result.AddOrUpdate(dbName, new ConcurrentDictionary(), (db, dict) => - { - dict.AddOrUpdate(tableName, count, (table, num) => num + count); - return dict; - }); - }); - }); - return result; + await BatchDbExtensions.AnalyzeAllAsync(connStr, databases); } \ No newline at end of file diff --git a/Mesdb.Cli/Schema/DB.cs b/Mesdb.Cli/Schema/DB.cs new file mode 100644 index 0000000..a6cda37 --- /dev/null +++ b/Mesdb.Cli/Schema/DB.cs @@ -0,0 +1,44 @@ +using MesETL.Shared.Helper; +using MySqlConnector; +using Serilog; + +namespace Mesdb.Cli.Schema; + +public class DB +{ + public required string ConnectionString { get; init; } + public required IReadOnlyList Databases { get; init; } + + public static DB Create(string connStr, IEnumerable dbNames) + { + var databases = new List(); + foreach (var dbName in dbNames) + { + var dbConnStr = new MySqlConnectionStringBuilder(connStr) + { + Database = dbName + }.ConnectionString; + + try + { + _ = DatabaseHelper.NonQueryAsync(dbConnStr, "SHOW DATABASES;").Result; + databases.Add(new Database(dbName, dbConnStr)); + } + catch (Exception e) + { + Log.Logger.Fatal(e, "无法连接到数据库: {DbName} ", dbName); + throw; + } + } + + return new DB + { + ConnectionString = connStr, + Databases = databases + }; + } + + private DB() + { + } +} \ No newline at end of file diff --git a/Mesdb.Cli/Schema/Database.cs b/Mesdb.Cli/Schema/Database.cs new file mode 100644 index 0000000..c2cdb78 --- /dev/null +++ b/Mesdb.Cli/Schema/Database.cs @@ -0,0 +1,50 @@ +using System.Data; +using MesETL.Shared.Helper; +using MySqlConnector; + +namespace Mesdb.Cli.Schema; + +public class Database +{ + public static async Task FetchTableAsync(string dbName, string connStr) + { + var tables = await DatabaseHelper.QueryTableAsync(connStr, + $""" + SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = '{dbName}'; + """); + return tables.Tables[0].Rows.Cast().Select(row => new Table{Name = row[0].ToString()!}).ToArray(); + } + + + public string Name { get; } + public string ConnectionString { get; } + public IReadOnlyList Tables { get; } + + public Database(string name, string connStr) + { + var trueConnStr = new MySqlConnectionStringBuilder(connStr) + { + Database = name + }.ConnectionString; + + var tables = FetchTableAsync(name, trueConnStr).Result; + Name = name; + ConnectionString = trueConnStr; + Tables = tables; + } + + public Task ExecuteNonQueryAsync(string sql, CancellationToken cancellationToken = default) + { + return DatabaseHelper.NonQueryAsync(ConnectionString, sql, cancellationToken); + } + + public Task ExecuteQueryAsync(string sql, CancellationToken cancellationToken = default) + { + return DatabaseHelper.QueryTableAsync(ConnectionString, sql, cancellationToken); + } + + public Task ExecuteScalarAsync(string sql, CancellationToken cancellationToken = default) + { + return DatabaseHelper.QueryScalarAsync(ConnectionString, sql, cancellationToken); + } +} \ No newline at end of file diff --git a/Mesdb.Cli/Schema/Table.cs b/Mesdb.Cli/Schema/Table.cs new file mode 100644 index 0000000..aa549b7 --- /dev/null +++ b/Mesdb.Cli/Schema/Table.cs @@ -0,0 +1,8 @@ +namespace Mesdb.Cli.Schema; + +public class Table +{ + + public required string Name { get; init; } + +} \ No newline at end of file