This commit is contained in:
2023-12-28 15:18:03 +08:00
commit 6b88f5bd40
20 changed files with 1544 additions and 0 deletions

View File

@@ -0,0 +1,20 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
<PackageReference Include="MySqlConnector" Version="2.3.3" />
<PackageReference Include="Serilog" Version="3.1.2-dev-02097" />
<PackageReference Include="Serilog.Extensions.Hosting" Version="8.0.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="5.0.1" />
<PackageReference Include="ServiceStack.Text" Version="8.0.0" />
</ItemGroup>
</Project>

View File

@@ -0,0 +1,76 @@
using System.Diagnostics;
using ConsoleApp2.Helpers;
using ConsoleApp2.Services;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace ConsoleApp2;
public class CsvConversion : BackgroundService
{
private readonly ILogger _logger;
private readonly IOptions<CsvOptions> _csvOptions;
private readonly DataTransformService _transform;
private readonly TaskManager _taskManager;
public CsvConversion(ILogger<CsvConversion> logger,
IOptions<CsvOptions> csvOptions,
DataTransformService transform,
TaskManager taskManager)
{
_logger = logger;
_csvOptions = csvOptions;
_transform = transform;
_taskManager = taskManager;
}
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
var sw = Stopwatch.StartNew();
var inputDir = _csvOptions.Value.InputDir;
_logger.LogInformation("Working dir: {InputDir}", inputDir);
var files = Directory.GetFiles(inputDir).Where(s => s.EndsWith(".sql") && !s.Contains("schema")).ToArray();
if (files.Length == 0)
{
_logger.LogInformation("No sql files found in {InputDir}", inputDir);
return;
}
foreach(var sqlPath in files)
{
_logger.LogInformation("Working sql file: {SqlPath}", sqlPath);
var headers = await DumpDataHelper.GetCsvHeadersFromSqlFileAsync(sqlPath);
var csvFiles = await DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(sqlPath);
var queue = new DataRecordQueue();
foreach (var csvFile in csvFiles)
{
var csvPath = Path.Combine(inputDir, csvFile);
var source = new JsvSource(csvPath, headers, _logger);
while (await source.ReadAsync())
{
queue.Enqueue(source.Current);
}
if (queue.Count > 200)
{
var queue1 = queue;
await _taskManager.CreateTask(async () => await _transform.ExecuteAsync(queue1, cancellationToken));
queue = new DataRecordQueue();
}
if (cancellationToken.IsCancellationRequested)
return;
}
_logger.LogInformation("File '{File}' input completed", Path.GetFileName(sqlPath));
}
_logger.LogInformation("***** Csv input service completed *****");
_logger.LogInformation("Elapsed: {Elapsed}", sw.Elapsed);
_taskManager.MainTaskCompleted = true;
}
}

View File

@@ -0,0 +1,54 @@
namespace ConsoleApp2.Entities;
public class DataRecord
{
public static bool TryGetField(DataRecord record, string columnName, out string value)
{
value = string.Empty;
if (record.Headers is null)
throw new InvalidOperationException("Cannot get field when headers of a record have not been set.");
var idx = Array.IndexOf(record.Headers, columnName);
if (idx == -1)
return false;
value = record.Fields[idx];
return true;
}
public static string GetField(DataRecord record, string columnName)
{
if (record.Headers is null)
throw new InvalidOperationException("Headers have not been set.");
var idx = Array.IndexOf(record.Headers, columnName);
if (idx is -1)
throw new IndexOutOfRangeException("Column name not found in this record.");
return record.Fields[idx];
}
public string[] Fields { get; }
public string[]? Headers { get; }
public string TableName { get; }
public DataRecord(string[] fields, string tableName, string[]? headers = null)
{
if (headers is not null && fields.Length != headers.Length)
throw new ArgumentException(
$"The number of fields does not match the number of headers. Expected: {fields.Length} Got: {headers.Length}",
nameof(fields));
Fields = fields;
TableName = tableName;
Headers = headers;
}
public string this[int index] => Fields[index];
public string this[string columnName] => GetField(this, columnName);
public int Count => Fields.Length;
public bool TryGetField(string columnName, out string value) => TryGetField(this, columnName, out value);
}

View File

@@ -0,0 +1,91 @@
using System.Text.RegularExpressions;
namespace ConsoleApp2.Helpers;
public static partial class DumpDataHelper
{
[GeneratedRegex(@"'.+\.dat'")]
private static partial Regex MatchDatFile();
[GeneratedRegex(@"\([^)]*\)")]
private static partial Regex MatchBrackets();
public static async Task<string[]> GetCsvHeadersFromSqlFileAsync(string filePath)
{
var txt = await File.ReadAllTextAsync(filePath);
var match = MatchBrackets().Match(txt);
return ParseHeader(match.ValueSpan);
}
private static string[] ParseHeader(ReadOnlySpan<char> headerStr)
{
headerStr = headerStr[1..^1];
Span<Range> ranges = stackalloc Range[50];
var count = headerStr.Split(ranges, ',');
var arr = new string[count];
for (var i = 0; i < count; i++)
{
arr[i] = headerStr[ranges[i]].Trim("@`").ToString(); // 消除列名的反引号,如果是变量则消除@
}
return arr;
}
public static string GetTableName(ReadOnlySpan<char> filePath)
{
filePath = filePath[(filePath.LastIndexOf('\\') + 1)..];
var firstDotIdx = -1;
var secondDotIdx = -1;
var times = 0;
for (var i = 0; i < filePath.Length; i++)
{
if (filePath[i] == '.')
{
++times;
if(times == 1)
firstDotIdx = i;
if (times == 2)
{
secondDotIdx = i;
break;
}
}
}
return filePath[(firstDotIdx+1)..secondDotIdx].ToString();
}
public static async Task<string[]> GetCsvFileNamesFromSqlFileAsync(string filePath)
{
var txt = await File.ReadAllTextAsync(filePath);
var matches = MatchDatFile().Matches(txt);
return matches.Select(match => match.ValueSpan[1..^1].ToString()).ToArray();
}
public static bool CheckHexField(string? str)
{
if (string.IsNullOrWhiteSpace(str))
return false;
if (str.StartsWith('\"'))
return false;
var isDigit = true;
foreach (var c in str)
{
if (!char.IsAsciiHexDigit(c))
return false;
if (!char.IsNumber(c))
isDigit = false;
}
if (isDigit)
return false;
return true;
}
}

View File

@@ -0,0 +1,30 @@
namespace ConsoleApp2.Helpers;
public static class DictionaryExtensions
{
/// <summary>
/// 根据指定的键是否存在来添加或是更新字典
/// </summary>
/// <param name="this"></param>
/// <param name="key">指定的键</param>
/// <param name="addValue">如果指定的键不存在,则向字典添加该值</param>
/// <param name="updateFactory">如果指定的键存在,则根据该委托的返回值修改字典中对应的值</param>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
/// <returns>添加或是修改后的值</returns>
public static TValue AddOrUpdate<TKey, TValue>(this IDictionary<TKey, TValue> @this, TKey key, TValue addValue,
Func<TKey, TValue, TValue> updateFactory)
{
if (!@this.TryGetValue(key, out var value))
{
@this.Add(key, addValue);
}
else
{
@this[key] = updateFactory(key, value);
}
return @this[key];
}
}

View File

@@ -0,0 +1,47 @@
using System.Globalization;
using System.Text;
namespace ConsoleApp2.Helpers;
public static class StringExtensions
{
public static string Omit(this ReadOnlySpan<char> @this, int maxLength)
{
if (@this.Length > maxLength)
return @this[..maxLength].ToString() + "...";
return @this.ToString();
}
public static string Omit(this string @this, int maxLength) => Omit(@this.AsSpan(), maxLength);
public static string FromHex(ReadOnlySpan<char> hexString, Encoding? encoding = null)
{
encoding ??= Encoding.UTF8;
var realLength = 0;
for (var i = hexString.Length - 2; i >= 0; i -= 2)
{
var b = byte.Parse(hexString.Slice(i, 2), NumberStyles.HexNumber, CultureInfo.InvariantCulture);
if (b != 0) //not NULL character
{
realLength = i + 2;
break;
}
}
var bytes = new byte[realLength / 2];
for (var i = 0; i < bytes.Length; i++)
{
bytes[i] = byte.Parse(hexString.Slice(i * 2, 2), NumberStyles.HexNumber, CultureInfo.InvariantCulture);
}
return encoding.GetString(bytes);
}
public static bool CheckJsonHex(ReadOnlySpan<char> hexStr)
{
if (hexStr.Length < 2)
return false;
return FromHex(hexStr[..2]) is ['{'] or ['['];
}
}

View File

@@ -0,0 +1,249 @@
using System.Security.Cryptography;
using System.Text;
namespace ConsoleApp2.Helpers;
public static class HashExtensions
{
/// <summary>
/// 计算32位MD5码
/// </summary>
/// <param name="word">字符串</param>
/// <param name="toUpper">返回哈希值格式 true英文大写false英文小写</param>
/// <returns></returns>
public static string ToMd5Hash(this string word, bool toUpper = true)
{
try
{
var MD5CSP = MD5.Create();
var bytValue = Encoding.UTF8.GetBytes(word);
var bytHash = MD5CSP.ComputeHash(bytValue);
MD5CSP.Clear();
//根据计算得到的Hash码翻译为MD5码
var sHash = "";
foreach (var t in bytHash)
{
long i = t / 16;
var sTemp = i > 9 ? ((char)(i - 10 + 0x41)).ToString() : ((char)(i + 0x30)).ToString();
i = t % 16;
if (i > 9)
{
sTemp += ((char)(i - 10 + 0x41)).ToString();
}
else
{
sTemp += ((char)(i + 0x30)).ToString();
}
sHash += sTemp;
}
//根据大小写规则决定返回的字符串
return toUpper ? sHash : sHash.ToLower();
}
catch (System.Exception ex)
{
throw new System.Exception(ex.Message);
}
}
public static string ToMd5Hash(this Stream stream, bool toUpper = true)
{
using var md5Hash = MD5.Create();
var bytes = md5Hash.ComputeHash(stream);
return ToHashString(bytes, toUpper);
}
/// <summary>
/// 计算SHA-1码
/// </summary>
/// <param name="word">字符串</param>
/// <param name="toUpper">返回哈希值格式 true英文大写false英文小写</param>
/// <returns></returns>
public static string ToSHA1Hash(this string word, bool toUpper = true)
{
try
{
var SHA1CSP = SHA1.Create();
var bytValue = Encoding.UTF8.GetBytes(word);
var bytHash = SHA1CSP.ComputeHash(bytValue);
SHA1CSP.Clear();
//根据计算得到的Hash码翻译为SHA-1码
var sHash = "";
foreach (var t in bytHash)
{
long i = t / 16;
var sTemp = i > 9 ? ((char)(i - 10 + 0x41)).ToString() : ((char)(i + 0x30)).ToString();
i = t % 16;
if (i > 9)
{
sTemp += ((char)(i - 10 + 0x41)).ToString();
}
else
{
sTemp += ((char)(i + 0x30)).ToString();
}
sHash += sTemp;
}
//根据大小写规则决定返回的字符串
return toUpper ? sHash : sHash.ToLower();
}
catch (System.Exception ex)
{
throw new System.Exception(ex.Message);
}
}
/// <summary>
/// 计算SHA-256码
/// </summary>
/// <param name="word">字符串</param>
/// <param name="toUpper">返回哈希值格式 true英文大写false英文小写</param>
/// <returns></returns>
public static string ToSHA256Hash(this string word, bool toUpper = true)
{
try
{
var SHA256CSP = SHA256.Create();
var bytValue = Encoding.UTF8.GetBytes(word);
var bytHash = SHA256CSP.ComputeHash(bytValue);
SHA256CSP.Clear();
//根据计算得到的Hash码翻译为SHA-1码
var sHash = "";
foreach (var t in bytHash)
{
long i = t / 16;
var sTemp = i > 9 ? ((char)(i - 10 + 0x41)).ToString() : ((char)(i + 0x30)).ToString();
i = t % 16;
if (i > 9)
{
sTemp += ((char)(i - 10 + 0x41)).ToString();
}
else
{
sTemp += ((char)(i + 0x30)).ToString();
}
sHash += sTemp;
}
//根据大小写规则决定返回的字符串
return toUpper ? sHash : sHash.ToLower();
}
catch (System.Exception ex)
{
throw new System.Exception(ex.Message);
}
}
/// <summary>
/// 计算SHA-256码
/// </summary>
/// <param name="stream"></param>
/// <param name="toUpper"></param>
/// <returns></returns>
public static string ToSHA256Hash(this Stream stream, bool toUpper = true)
{
using var sha256Hash = SHA256.Create();
var bytes = sha256Hash.ComputeHash(stream);
return ToHashString(bytes, toUpper);
}
/// <summary>
/// 计算SHA-384码
/// </summary>
/// <param name="word">字符串</param>
/// <param name="toUpper">返回哈希值格式 true英文大写false英文小写</param>
/// <returns></returns>
public static string ToSHA384Hash(this string word, bool toUpper = true)
{
try
{
var SHA384CSP = SHA384.Create();
var bytValue = Encoding.UTF8.GetBytes(word);
var bytHash = SHA384CSP.ComputeHash(bytValue);
SHA384CSP.Clear();
//根据计算得到的Hash码翻译为SHA-1码
var sHash = "";
foreach (var t in bytHash)
{
long i = t / 16;
var sTemp = i > 9 ? ((char)(i - 10 + 0x41)).ToString() : ((char)(i + 0x30)).ToString();
i = t % 16;
if (i > 9)
{
sTemp += ((char)(i - 10 + 0x41)).ToString();
}
else
{
sTemp += ((char)(i + 0x30)).ToString();
}
sHash += sTemp;
}
//根据大小写规则决定返回的字符串
return toUpper ? sHash : sHash.ToLower();
}
catch (System.Exception ex)
{
throw new System.Exception(ex.Message);
}
}
/// <summary>
/// 计算SHA-512码
/// </summary>
/// <param name="word">字符串</param>
/// <param name="toUpper">返回哈希值格式 true英文大写false英文小写</param>
/// <returns></returns>
public static string ToSHA512Hash(this string word, bool toUpper = true)
{
try
{
var SHA512CSP = SHA512.Create();
var bytValue = Encoding.UTF8.GetBytes(word);
var bytHash = SHA512CSP.ComputeHash(bytValue);
SHA512CSP.Clear();
//根据计算得到的Hash码翻译为SHA-1码
var sHash = "";
foreach (var t in bytHash)
{
long i = t / 16;
var sTemp = i > 9 ? ((char)(i - 10 + 0x41)).ToString() : ((char)(i + 0x30)).ToString();
i = t % 16;
if (i > 9)
{
sTemp += ((char)(i - 10 + 0x41)).ToString();
}
else
{
sTemp += ((char)(i + 0x30)).ToString();
}
sHash += sTemp;
}
//根据大小写规则决定返回的字符串
return toUpper ? sHash : sHash.ToLower();
}
catch (System.Exception ex)
{
throw new System.Exception(ex.Message);
}
}
private static string ToHashString(byte[] bytes, bool toUpper = true)
{
var builder = new StringBuilder();
foreach (var t in bytes)
{
builder.Append(t.ToString("x2"));
}
var str = builder.ToString();
return toUpper ? str.ToUpper() : str.ToLower();
}
}

48
ConsoleApp2/JsvSource.cs Normal file
View File

@@ -0,0 +1,48 @@
using ConsoleApp2.Entities;
using ConsoleApp2.Helpers;
using Microsoft.Extensions.Logging;
using ServiceStack.Text;
namespace ConsoleApp2;
public class JsvSource : IDisposable
{
private readonly string _filePath;
private readonly JsvStringSerializer _jsv;
private readonly StreamReader _reader;
// ReSharper disable once PrivateFieldCanBeConvertedToLocalVariable
private readonly ILogger? _logger;
private readonly string _tableName;
public DataRecord Current { get; protected set; } = null!;
public string[]? Headers { get; }
public bool EndOfSource => _reader.EndOfStream;
public JsvSource(string filePath, string[]? headers = null, ILogger? logger = null)
{
_filePath = filePath;
_jsv = new JsvStringSerializer();
_reader = new StreamReader(filePath);
Headers = headers;
_logger = logger;
// _logger?.LogInformation("Reading file: {FilePath}", filePath);
_tableName = DumpDataHelper.GetTableName(filePath);
}
public async ValueTask<bool> ReadAsync()
{
var str = await _reader.ReadLineAsync();
if (string.IsNullOrEmpty(str))
return false;
var fields = _jsv.DeserializeFromString<string[]>(str);
if(Headers is not null && Headers.Length != fields.Length)
throw new InvalidDataException("解析的字段数与指定的列数不匹配");
Current = new DataRecord(fields, _tableName, Headers);
return true;
}
public void Dispose()
{
_reader.Dispose();
}
}

View File

@@ -0,0 +1,128 @@
using System.Text;
using ConsoleApp2.Entities;
using ConsoleApp2.Helpers;
using Microsoft.Extensions.Logging;
using MySqlConnector;
namespace ConsoleApp2;
public class MySqlDestination : IDisposable, IAsyncDisposable
{
private readonly Dictionary<string, List<DataRecord>> _recordCache;
private readonly MySqlConnection _conn;
private readonly ILogger _logger;
public static int AddCount;
public MySqlDestination(string connStr, ILogger logger)
{
_conn = new MySqlConnection(connStr);
_conn.Open();
_recordCache = new Dictionary<string, List<DataRecord>>();
_logger = logger;
}
public Task WriteRecordAsync(DataRecord record)
{
_recordCache.AddOrUpdate(record.TableName, [record], (key, value) =>
{
value.Add(record);
Interlocked.Increment(ref AddCount);
return value;
});
return Task.CompletedTask;
}
public async Task WriteRecordsAsync(IEnumerable<DataRecord> records)
{
foreach (var record in records)
{
await WriteRecordAsync(record);
}
}
public async Task FlushAsync()
{
if (_recordCache.Count == 0)
return;
try
{
var cmd = _conn.CreateCommand();
var sb = new StringBuilder();
var count = 0;
foreach (var (tableName, records) in _recordCache)
{
if (records.Count == 0)
continue;
sb.Append($"INSERT INTO `{tableName}`(");
for (var i = 0; i < records[0].Headers.Length; i++)
{
var header = records[0].Headers[i];
sb.Append($"`{header}`");
if (i != records[0].Headers.Length - 1)
sb.Append(',');
}
sb.AppendLine(") VALUES");
for (var i = 0; i < records.Count; i++)
{
count++;
var record = records[i];
sb.Append('(');
for (var j = 0; j < record.Fields.Length; j++)
{
var field = record.Fields[j];
#region HandleFields
if (field == "\\N")
sb.Append("NULL");
else if (DumpDataHelper.CheckHexField(field)) // TODO: 性能消耗
{
if (StringExtensions.CheckJsonHex(field))
sb.Append($"UNHEX(\"{field}\")");
else
sb.Append($"\"{field}\"");
}
else
sb.Append($"\"{field}\"");
#endregion
if (j != record.Fields.Length - 1)
sb.Append(',');
}
sb.Append(')');
if (i != records.Count - 1) // not last field
sb.AppendLine(",");
}
sb.AppendLine(";\n");
}
cmd.CommandText = sb.ToString();
await cmd.ExecuteNonQueryAsync();
_recordCache.Clear();
}
catch (Exception e)
{
_logger.LogCritical(e, "Error when flushing records");
throw;
}
}
public void Dispose()
{
_conn.Dispose();
}
public async ValueTask DisposeAsync()
{
await _conn.DisposeAsync();
}
}

View File

@@ -0,0 +1,28 @@
namespace ConsoleApp2;
public class CsvOptions
{
/// <summary>
/// The directory to input csv and sql file.
/// </summary>
public string InputDir { get; set; } = "./";
/// <summary>
/// The output directory.
/// </summary>
public string OutputDir { get; set; } = "./Output";
/// <summary>
/// The ASCII char that fields are enclosed by. Default is '"'.
/// </summary>
public char QuoteChar { get; set; } = '"';
/// <summary>
/// The ASCII char that fields are separated by. Default is ','.
/// </summary>
public char DelimiterChar { get; set; } = ',';
/// <summary>
/// The max number of threads to use.
/// </summary>
public int MaxThreads { get; set; } = 12;
}

View File

@@ -0,0 +1,9 @@
using ConsoleApp2.Entities;
using ConsoleApp2.Options;
namespace ConsoleApp2;
public class DataTransformOptions
{
public Func<DataRecord, DatabaseOptions> DatabaseFilter { get; set; }
}

View File

@@ -0,0 +1,3 @@
namespace ConsoleApp2.Options;
public record DatabaseOptions(string Host, uint Port, string Database, string User, string Password);

37
ConsoleApp2/Program.cs Normal file
View File

@@ -0,0 +1,37 @@
using ConsoleApp2;
using ConsoleApp2.Options;
using ConsoleApp2.Services;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Serilog;
ThreadPool.SetMaxThreads(200, 200);
var host = Host.CreateApplicationBuilder();
host.Configuration.AddCommandLine(args);
host.Services.Configure<CsvOptions>(option =>
{
option.DelimiterChar = ',';
option.QuoteChar = '"';
option.InputDir = "D:/Dump/MyDumper-Csv";
option.OutputDir = "D:/DumpOutput";
option.MaxThreads = 12;
});
host.Services.Configure<DataTransformOptions>(options =>
{
var dbOption = new DatabaseOptions("localhost", 33306, "cferp_test_1", "root", "123456");
options.DatabaseFilter = record => dbOption;
});
host.Services.AddLogging(builder =>
{
builder.ClearProviders();
builder.AddSerilog(new LoggerConfiguration().WriteTo.Console().CreateLogger());
});
host.Services.AddHostedService<CsvConversion>();
host.Services.AddHostedService<TaskMonitorService>();
host.Services.AddSingleton<TaskManager>();
host.Services.AddSingleton<DatabaseOutputService>();
host.Services.AddSingleton<DataTransformService>();
var app = host.Build();
await app.RunAsync();

View File

@@ -0,0 +1,60 @@
using System.Collections.Concurrent;
using System.Diagnostics.CodeAnalysis;
using ConsoleApp2.Entities;
using Microsoft.Extensions.Logging;
namespace ConsoleApp2.Services;
public class DataRecordQueue
{
/// <summary>
/// Indicate that the queue is completed adding.
/// </summary>
public bool IsCompletedAdding { get; private set; }
/// <summary>
/// Remark that the queue is completed for adding and empty;
/// </summary>
public bool IsCompleted => IsCompletedAdding && _queue.IsEmpty;
private readonly ConcurrentQueue<DataRecord> _queue;
public DataRecordQueue()
{
_queue = new ConcurrentQueue<DataRecord>();
}
public DataRecordQueue(IEnumerable<DataRecord> records)
{
_queue = new ConcurrentQueue<DataRecord>(records);
}
/// <inheritdoc cref="ConcurrentQueue{T}.Enqueue"/>
public void Enqueue(DataRecord item)
{
_queue.Enqueue(item);
}
/// <inheritdoc cref="ConcurrentQueue{T}.TryDequeue"/>
public bool TryDequeue([MaybeNullWhen(false)] out DataRecord result)
{
return _queue.TryDequeue(out result);
}
/// <inheritdoc cref="ConcurrentQueue{T}.TryPeek"/>
public bool TryPeek([MaybeNullWhen(false)] out DataRecord result)
{
return _queue.TryPeek(out result);
}
/// <inheritdoc cref="ConcurrentQueue{T}.Count"/>
public int Count => _queue.Count;
/// <inheritdoc cref="ConcurrentQueue{T}.IsEmpty"/>
public bool IsEmpty => _queue.IsEmpty;
/// <summary>
/// Indicate that the queue is completed adding.
/// </summary>
public void CompleteAdding() => IsCompletedAdding = true;
}

View File

@@ -0,0 +1,46 @@
using ConsoleApp2.Entities;
using ConsoleApp2.Helpers;
using ConsoleApp2.Options;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace ConsoleApp2.Services;
public class DataTransformService
{
private readonly ILogger _logger;
private readonly TaskManager _taskManager;
private readonly DatabaseOutputService _output;
private readonly IOptions<DataTransformOptions> _options;
public DataTransformService(ILogger<DataTransformService> logger, TaskManager taskManager, DatabaseOutputService output, IOptions<DataTransformOptions> options)
{
_logger = logger;
_taskManager = taskManager;
_output = output;
_options = options;
}
public async Task ExecuteAsync(DataRecordQueue records, CancellationToken cancellationToken = default)
{
_logger.LogInformation("Start transforming data.");
var map = new Dictionary<DatabaseOptions, DataRecordQueue>();
while (records.TryDequeue(out var record))
{
var dbOptions = _options.Value.DatabaseFilter(record);
map.AddOrUpdate(dbOptions, new DataRecordQueue([record]), (options, queue) =>
{
queue.Enqueue(record);
return queue;
});
}
foreach (var (dbOptions, queue) in map)
{
await _taskManager.CreateTask(async () =>
{
await _output.ExecuteAsync(queue, dbOptions, cancellationToken);
});
}
}
}

View File

@@ -0,0 +1,39 @@
using ConsoleApp2.Entities;
using ConsoleApp2.Options;
using Microsoft.Extensions.Logging;
using MySqlConnector;
namespace ConsoleApp2.Services;
public class DatabaseOutputService
{
private readonly ILogger _logger;
public DatabaseOutputService(ILogger<DatabaseOutputService> logger)
{
_logger = logger;
}
public async Task ExecuteAsync(DataRecordQueue records, DatabaseOptions options, CancellationToken stoppingToken = default)
{
var count = records.Count;
var output = new MySqlDestination(new MySqlConnectionStringBuilder()
{
Server = options.Host,
Port = options.Port,
Database = options.Database,
UserID = options.User,
Password = options.Password,
ConnectionTimeout = 120,
}.ConnectionString, _logger); // TODO: 加入DI
while (records.TryDequeue(out var record) && !stoppingToken.IsCancellationRequested)
{
await output.WriteRecordAsync(record);
}
await output.FlushAsync();
_logger.LogInformation("Flush {Count} records to database.", count);
}
}

View File

@@ -0,0 +1,28 @@
using System.Collections.Concurrent;
using Microsoft.Extensions.Logging;
namespace ConsoleApp2.Services;
public class TaskManager
{
private readonly ConcurrentBag<Task> _tasks;
private readonly ILogger _logger;
public int RunningTaskCount => _tasks.Count(task => !task.IsCompleted);
public IReadOnlyCollection<Task> Tasks => _tasks;
public bool MainTaskCompleted { get; set; }
public TaskManager(ILogger<TaskManager> logger)
{
_tasks = new ConcurrentBag<Task>();
_logger = logger;
}
public Task<TResult> CreateTask<TResult>(Func<TResult> func)
{
var task = Task.Factory.StartNew(func);
_tasks.Add(task);
_logger.LogInformation("New task created.");
return task;
}
}

View File

@@ -0,0 +1,57 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace ConsoleApp2.Services;
public class TaskMonitorService : BackgroundService
{
private readonly IHostApplicationLifetime _lifetime;
private readonly TaskManager _taskManager;
private readonly ILogger<TaskMonitorService> _logger;
public TaskMonitorService(IHostApplicationLifetime lifetime, TaskManager taskManager,
ILogger<TaskMonitorService> logger)
{
_lifetime = lifetime;
_taskManager = taskManager;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!_taskManager.MainTaskCompleted || _taskManager.RunningTaskCount != 0)
{
var running = 0;
var error = 0;
var completed = 0;
var canceled = 0;
foreach (var task in _taskManager.Tasks)
{
switch (task.Status)
{
case TaskStatus.Running:
running++;
break;
case TaskStatus.Canceled:
canceled++;
break;
case TaskStatus.Faulted:
error++;
break;
case TaskStatus.RanToCompletion:
completed++;
break;
default:
throw new ArgumentOutOfRangeException();
}
}
_logger.LogInformation(
"Task monitor: running: {Running}, error: {Error}, completed: {Completed}, canceled: {Canceled}",
running, error, completed, canceled);
await Task.Delay(2000);
}
_logger.LogInformation("***** All tasks completed *****");
}
}