整理项目结构

This commit is contained in:
陈梓阳 2024-11-15 14:10:25 +08:00
parent c6d97fdc86
commit 3dbfaffd05
9 changed files with 241 additions and 31 deletions

View File

@ -7,10 +7,6 @@
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\MesETL.Shared\MesETL.Shared.csproj" />
</ItemGroup>

55
MesETL.Clean/Program.cs Normal file
View File

@ -0,0 +1,55 @@
using MesETL.Shared.Helper;
var connStr = GetArg("-s") ?? throw new ApplicationException("未配置数据库连接字符串");
var eachLimit = int.Parse(GetArg("-l") ?? "1000");
var parallelTask = int.Parse(GetArg("-p") ?? "4");
var deletionCount = 0;
Console.WriteLine("Running Deletion...");
_ = Task.Run(async () =>
{
while (true)
{
await Task.Delay(5000);
Console.WriteLine($"[{DateTime.Now}] DELETE COUNT: {deletionCount}");
}
});
await Parallel.ForAsync(0, parallelTask, async (i, token) =>
{
while (true)
{
var effectRows = await DatabaseHelper.NonQueryAsync(connStr,
$"DELETE FROM `order_data_block` WHERE CompanyID = 0 ORDER BY ID LIMIT {eachLimit};", token);
if(effectRows == 0)
break;
Interlocked.Add(ref deletionCount, effectRows);
}
});
Console.WriteLine($"[{DateTime.Now}] DELETE COUNT: {deletionCount}");
return;
string? GetArg(string instruct)
{
var idx = Array.IndexOf(args, instruct);
if (idx == -1)
return null;
if (args[idx + 1].StartsWith('-'))
throw new ArgumentException("Argument Lost", nameof(instruct));
return args[idx + 1];
}
// var match = await DatabaseHelper.QueryTableAsync(connStr,
// $"SELECT `ID` FROM `order_data_block` WHERE CompanyID = 0 LIMIT {eachLimit};",
// token);
// var rows = match.Tables[0].Rows;
// if (rows.Count == 0)
// return;
//
// foreach (DataRow row in rows)
// {
// var id = row["ID"].ToString();
// await DatabaseHelper.NonQueryAsync(connStr, $"DELETE FROM `order_data_block` WHERE `ID` = {id}", token);
// }

View File

@ -4,7 +4,7 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MesETL.App", "MesETL.App\Me
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MesETL.Test", "MesETL.Test\MesETL.Test.csproj", "{8679D5B6-5853-446E-9882-7B7A8E270500}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MesETL.Tool", "MesETL.Tool\MesETL.Tool.csproj", "{68307B05-3D66-4322-A42F-C044C1E8BA3B}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Mesdb.Cli", "Mesdb.Cli\Mesdb.Cli.csproj", "{68307B05-3D66-4322-A42F-C044C1E8BA3B}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MesETL.Shared", "MesETL.Shared\MesETL.Shared.csproj", "{FE134001-0E22-458B-BEF2-29712A29087E}"
EndProject

View File

@ -0,0 +1,52 @@
using System.Collections.Concurrent;
using System.Data;
using MesETL.Shared.Helper;
namespace Mesdb.Cli;
public static class BatchDbExtensions
{
public static async Task<IDictionary<string, IDictionary<string,long>>> CountDatabasesAsync(string connStr, IList<string> dbNames, CancellationToken cancellationToken = default)
{
var result = new ConcurrentDictionary<string, IDictionary<string,long>>();
var tables = await DatabaseHelper.QueryTableAsync(connStr,
$"""
SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = '{dbNames[0]}';
""");
await Parallel.ForEachAsync(dbNames, async (dbName, ct) =>
{
await Parallel.ForEachAsync(tables.Tables[0].Rows.Cast<DataRow>(), async (row, ct) =>
{
var tableName = row[0].ToString()!;
var count = (long)(await DatabaseHelper.QueryScalarAsync(connStr,
$"SELECT COUNT(1) FROM `{dbName}`.`{tableName}`;", ct))!;
result.AddOrUpdate(dbName, new ConcurrentDictionary<string, long>(), (db, dict) =>
{
dict.AddOrUpdate(tableName, count, (table, num) => num + count);
return dict;
});
});
});
return result;
}
public static async Task AnalyzeAllAsync(string connStr, IList<string> dbNames)
{
var tables = await DatabaseHelper.QueryTableAsync(connStr,
$"""
SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = '{dbNames[0]}';
""");
await Parallel.ForEachAsync(dbNames, async (dbName, ct) =>
{
await Parallel.ForEachAsync(tables.Tables[0].Rows.Cast<DataRow>(), async (row, ct) =>
{
var tableName = row[0].ToString()!;
var result = (await DatabaseHelper.QueryTableAsync(connStr,
$"ANALYZE TABLE `{dbName}`.`{tableName}`;", ct));
Console.WriteLine(string.Join('\t', result.Tables[0].Rows[0].ItemArray.Select(x => x.ToString())));
});
});
}
}

View File

@ -0,0 +1,22 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Cocona" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
<PackageReference Include="Serilog" Version="4.0.0-dev-02108" />
<PackageReference Include="Serilog.Extensions.Hosting" Version="8.0.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="5.0.1" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\MesETL.Shared\MesETL.Shared.csproj" />
</ItemGroup>
</Project>

View File

@ -1,6 +1,6 @@
using System.Collections.Concurrent;
using System.Data;
using Cocona;
using MesETL.Shared.Helper;
using Mesdb.Cli;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
@ -11,7 +11,10 @@ host.Configuration.AddCommandLine(args, new Dictionary<string, string>
{ "--ConnectionString", "ConnectionString" },
{ "-B", "Databases" },
{ "--Databases", "Databases" },
{ "-a", "All" }
{ "-a", "All" },
{ "-c", "Command"},
{ "--Command", "Command" },
{ "--Sql", "Command" }
});
host.Build();
var connStr = host.Configuration.GetValue<string>("ConnectionString") ?? throw new ApplicationException("没有配置数据库连接字符串");
@ -20,7 +23,7 @@ var all = host.Configuration.GetValue<bool>("All");
if (args.Length > 1 && args[0] == "count")
{
var result =await CountDatabasesAsync(connStr, databases);
var result = await BatchDbExtensions.CountDatabasesAsync(connStr, databases);
if (all)
{
foreach (var (k, v) in result)
@ -43,27 +46,7 @@ if (args.Length > 1 && args[0] == "count")
}
}
async Task<IDictionary<string, IDictionary<string,long>>> CountDatabasesAsync(string connStr, IList<string> dbNames, CancellationToken cancellationToken = default)
if (args.Length > 1 && args[0] == "analyze")
{
var result = new ConcurrentDictionary<string, IDictionary<string,long>>();
var tables = await DatabaseHelper.QueryTableAsync(connStr,
$"""
SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = '{dbNames[0]}';
""");
await Parallel.ForEachAsync(dbNames, async (dbName, ct) =>
{
await Parallel.ForEachAsync(tables.Tables[0].Rows.Cast<DataRow>(), async (row, ct) =>
{
var tableName = row[0].ToString()!;
var count = (long)(await DatabaseHelper.QueryScalarAsync(connStr,
$"SELECT COUNT(1) FROM `{dbName}`.`{tableName}`;", ct))!;
result.AddOrUpdate(dbName, new ConcurrentDictionary<string, long>(), (db, dict) =>
{
dict.AddOrUpdate(tableName, count, (table, num) => num + count);
return dict;
});
});
});
return result;
await BatchDbExtensions.AnalyzeAllAsync(connStr, databases);
}

44
Mesdb.Cli/Schema/DB.cs Normal file
View File

@ -0,0 +1,44 @@
using MesETL.Shared.Helper;
using MySqlConnector;
using Serilog;
namespace Mesdb.Cli.Schema;
public class DB
{
public required string ConnectionString { get; init; }
public required IReadOnlyList<Database> Databases { get; init; }
public static DB Create(string connStr, IEnumerable<string> dbNames)
{
var databases = new List<Database>();
foreach (var dbName in dbNames)
{
var dbConnStr = new MySqlConnectionStringBuilder(connStr)
{
Database = dbName
}.ConnectionString;
try
{
_ = DatabaseHelper.NonQueryAsync(dbConnStr, "SHOW DATABASES;").Result;
databases.Add(new Database(dbName, dbConnStr));
}
catch (Exception e)
{
Log.Logger.Fatal(e, "无法连接到数据库: {DbName} ", dbName);
throw;
}
}
return new DB
{
ConnectionString = connStr,
Databases = databases
};
}
private DB()
{
}
}

View File

@ -0,0 +1,50 @@
using System.Data;
using MesETL.Shared.Helper;
using MySqlConnector;
namespace Mesdb.Cli.Schema;
public class Database
{
public static async Task<Table[]> FetchTableAsync(string dbName, string connStr)
{
var tables = await DatabaseHelper.QueryTableAsync(connStr,
$"""
SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = '{dbName}';
""");
return tables.Tables[0].Rows.Cast<DataRow>().Select(row => new Table{Name = row[0].ToString()!}).ToArray();
}
public string Name { get; }
public string ConnectionString { get; }
public IReadOnlyList<Table> Tables { get; }
public Database(string name, string connStr)
{
var trueConnStr = new MySqlConnectionStringBuilder(connStr)
{
Database = name
}.ConnectionString;
var tables = FetchTableAsync(name, trueConnStr).Result;
Name = name;
ConnectionString = trueConnStr;
Tables = tables;
}
public Task ExecuteNonQueryAsync(string sql, CancellationToken cancellationToken = default)
{
return DatabaseHelper.NonQueryAsync(ConnectionString, sql, cancellationToken);
}
public Task<DataSet> ExecuteQueryAsync(string sql, CancellationToken cancellationToken = default)
{
return DatabaseHelper.QueryTableAsync(ConnectionString, sql, cancellationToken);
}
public Task<object?> ExecuteScalarAsync(string sql, CancellationToken cancellationToken = default)
{
return DatabaseHelper.QueryScalarAsync(ConnectionString, sql, cancellationToken);
}
}

View File

@ -0,0 +1,8 @@
namespace Mesdb.Cli.Schema;
public class Table
{
public required string Name { get; init; }
}