From 6b88f5bd40fb80329714204828f80a4932dad192 Mon Sep 17 00:00:00 2001
From: CZY <2817212736@qq.com>
Date: Thu, 28 Dec 2023 15:18:03 +0800
Subject: [PATCH] Init
---
.gitignore | 478 ++++++++++++++++++
ConsoleApp2.sln | 16 +
ConsoleApp2/ConsoleApp2.csproj | 20 +
ConsoleApp2/CsvConversion.cs | 76 +++
ConsoleApp2/Entities/DataRecord.cs | 54 ++
ConsoleApp2/Helpers/DumpDataHelper.cs | 91 ++++
ConsoleApp2/Helpers/Extensions.Dictionary.cs | 30 ++
ConsoleApp2/Helpers/Extensions.String.cs | 47 ++
ConsoleApp2/Helpers/HashExtension.cs | 249 +++++++++
ConsoleApp2/JsvSource.cs | 48 ++
ConsoleApp2/MySqlDestination.cs | 128 +++++
ConsoleApp2/Options/CsvOptions.cs | 28 +
ConsoleApp2/Options/DataTransformOptions.cs | 9 +
ConsoleApp2/Options/DatabaseOptions.cs | 3 +
ConsoleApp2/Program.cs | 37 ++
ConsoleApp2/Services/DataRecordQueue.cs | 60 +++
ConsoleApp2/Services/DataTransformService.cs | 46 ++
ConsoleApp2/Services/DatabaseOutputService.cs | 39 ++
ConsoleApp2/Services/TaskManager.cs | 28 +
ConsoleApp2/Services/TaskMonitorService.cs | 57 +++
20 files changed, 1544 insertions(+)
create mode 100644 .gitignore
create mode 100644 ConsoleApp2.sln
create mode 100644 ConsoleApp2/ConsoleApp2.csproj
create mode 100644 ConsoleApp2/CsvConversion.cs
create mode 100644 ConsoleApp2/Entities/DataRecord.cs
create mode 100644 ConsoleApp2/Helpers/DumpDataHelper.cs
create mode 100644 ConsoleApp2/Helpers/Extensions.Dictionary.cs
create mode 100644 ConsoleApp2/Helpers/Extensions.String.cs
create mode 100644 ConsoleApp2/Helpers/HashExtension.cs
create mode 100644 ConsoleApp2/JsvSource.cs
create mode 100644 ConsoleApp2/MySqlDestination.cs
create mode 100644 ConsoleApp2/Options/CsvOptions.cs
create mode 100644 ConsoleApp2/Options/DataTransformOptions.cs
create mode 100644 ConsoleApp2/Options/DatabaseOptions.cs
create mode 100644 ConsoleApp2/Program.cs
create mode 100644 ConsoleApp2/Services/DataRecordQueue.cs
create mode 100644 ConsoleApp2/Services/DataTransformService.cs
create mode 100644 ConsoleApp2/Services/DatabaseOutputService.cs
create mode 100644 ConsoleApp2/Services/TaskManager.cs
create mode 100644 ConsoleApp2/Services/TaskMonitorService.cs
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..b70927a
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,478 @@
+# ---> JetBrains
+# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider
+# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
+.idea/
+# User-specific stuff
+.idea/**/workspace.xml
+.idea/**/tasks.xml
+.idea/**/usage.statistics.xml
+.idea/**/dictionaries
+.idea/**/shelf
+
+# AWS User-specific
+.idea/**/aws.xml
+
+# Generated files
+.idea/**/contentModel.xml
+
+# Sensitive or high-churn files
+.idea/**/dataSources/
+.idea/**/dataSources.ids
+.idea/**/dataSources.local.xml
+.idea/**/sqlDataSources.xml
+.idea/**/dynamic.xml
+.idea/**/uiDesigner.xml
+.idea/**/dbnavigator.xml
+
+# Gradle
+.idea/**/gradle.xml
+.idea/**/libraries
+
+# Gradle and Maven with auto-import
+# When using Gradle or Maven with auto-import, you should exclude module files,
+# since they will be recreated, and may cause churn. Uncomment if using
+# auto-import.
+# .idea/artifacts
+# .idea/compiler.xml
+# .idea/jarRepositories.xml
+# .idea/modules.xml
+# .idea/*.iml
+# .idea/modules
+# *.iml
+# *.ipr
+
+# CMake
+cmake-build-*/
+
+# Mongo Explorer plugin
+.idea/**/mongoSettings.xml
+
+# File-based project format
+*.iws
+
+# IntelliJ
+out/
+
+# mpeltonen/sbt-idea plugin
+.idea_modules/
+
+# JIRA plugin
+atlassian-ide-plugin.xml
+
+# Cursive Clojure plugin
+.idea/replstate.xml
+
+# SonarLint plugin
+.idea/sonarlint/
+
+# Crashlytics plugin (for Android Studio and IntelliJ)
+com_crashlytics_export_strings.xml
+crashlytics.properties
+crashlytics-build.properties
+fabric.properties
+
+# Editor-based Rest Client
+.idea/httpRequests
+
+# Android studio 3.1+ serialized cache file
+.idea/caches/build_file_checksums.ser
+
+# ---> VisualStudio
+## Ignore Visual Studio temporary files, build results, and
+## files generated by popular Visual Studio add-ons.
+##
+## Get latest from https://github.com/github/gitignore/blob/main/VisualStudio.gitignore
+
+# User-specific files
+*.rsuser
+*.suo
+*.user
+*.userosscache
+*.sln.docstates
+
+# User-specific files (MonoDevelop/Xamarin Studio)
+*.userprefs
+
+# Mono auto generated files
+mono_crash.*
+
+# Build results
+[Dd]ebug/
+[Dd]ebugPublic/
+[Rr]elease/
+[Rr]eleases/
+x64/
+x86/
+[Ww][Ii][Nn]32/
+[Aa][Rr][Mm]/
+[Aa][Rr][Mm]64/
+bld/
+[Bb]in/
+[Oo]bj/
+[Ll]og/
+[Ll]ogs/
+
+# Visual Studio 2015/2017 cache/options directory
+.vs/
+# Uncomment if you have tasks that create the project's static files in wwwroot
+#wwwroot/
+
+# Visual Studio 2017 auto generated files
+Generated\ Files/
+
+# MSTest test Results
+[Tt]est[Rr]esult*/
+[Bb]uild[Ll]og.*
+
+# NUnit
+*.VisualState.xml
+TestResult.xml
+nunit-*.xml
+
+# Build Results of an ATL Project
+[Dd]ebugPS/
+[Rr]eleasePS/
+dlldata.c
+
+# Benchmark Results
+BenchmarkDotNet.Artifacts/
+
+# .NET Core
+project.lock.json
+project.fragment.lock.json
+artifacts/
+
+# ASP.NET Scaffolding
+ScaffoldingReadMe.txt
+
+# StyleCop
+StyleCopReport.xml
+
+# Files built by Visual Studio
+*_i.c
+*_p.c
+*_h.h
+*.ilk
+*.meta
+*.obj
+*.iobj
+*.pch
+*.pdb
+*.ipdb
+*.pgc
+*.pgd
+*.rsp
+*.sbr
+*.tlb
+*.tli
+*.tlh
+*.tmp
+*.tmp_proj
+*_wpftmp.csproj
+*.log
+*.tlog
+*.vspscc
+*.vssscc
+.builds
+*.pidb
+*.svclog
+*.scc
+
+# Chutzpah Test files
+_Chutzpah*
+
+# Visual C++ cache files
+ipch/
+*.aps
+*.ncb
+*.opendb
+*.opensdf
+*.sdf
+*.cachefile
+*.VC.db
+*.VC.VC.opendb
+
+# Visual Studio profiler
+*.psess
+*.vsp
+*.vspx
+*.sap
+
+# Visual Studio Trace Files
+*.e2e
+
+# TFS 2012 Local Workspace
+$tf/
+
+# Guidance Automation Toolkit
+*.gpState
+
+# ReSharper is a .NET coding add-in
+_ReSharper*/
+*.[Rr]e[Ss]harper
+*.DotSettings.user
+
+# TeamCity is a build add-in
+_TeamCity*
+
+# DotCover is a Code Coverage Tool
+*.dotCover
+
+# AxoCover is a Code Coverage Tool
+.axoCover/*
+!.axoCover/settings.json
+
+# Coverlet is a free, cross platform Code Coverage Tool
+coverage*.json
+coverage*.xml
+coverage*.info
+
+# Visual Studio code coverage results
+*.coverage
+*.coveragexml
+
+# NCrunch
+_NCrunch_*
+.*crunch*.local.xml
+nCrunchTemp_*
+
+# MightyMoose
+*.mm.*
+AutoTest.Net/
+
+# Web workbench (sass)
+.sass-cache/
+
+# Installshield output folder
+[Ee]xpress/
+
+# DocProject is a documentation generator add-in
+DocProject/buildhelp/
+DocProject/Help/*.HxT
+DocProject/Help/*.HxC
+DocProject/Help/*.hhc
+DocProject/Help/*.hhk
+DocProject/Help/*.hhp
+DocProject/Help/Html2
+DocProject/Help/html
+
+# Click-Once directory
+publish/
+
+# Publish Web Output
+*.[Pp]ublish.xml
+*.azurePubxml
+# Note: Comment the next line if you want to checkin your web deploy settings,
+# but database connection strings (with potential passwords) will be unencrypted
+*.pubxml
+*.publishproj
+
+# Microsoft Azure Web App publish settings. Comment the next line if you want to
+# checkin your Azure Web App publish settings, but sensitive information contained
+# in these scripts will be unencrypted
+PublishScripts/
+
+# NuGet Packages
+*.nupkg
+# NuGet Symbol Packages
+*.snupkg
+# The packages folder can be ignored because of Package Restore
+**/[Pp]ackages/*
+# except build/, which is used as an MSBuild target.
+!**/[Pp]ackages/build/
+# Uncomment if necessary however generally it will be regenerated when needed
+#!**/[Pp]ackages/repositories.config
+# NuGet v3's project.json files produces more ignorable files
+*.nuget.props
+*.nuget.targets
+
+# Microsoft Azure Build Output
+csx/
+*.build.csdef
+
+# Microsoft Azure Emulator
+ecf/
+rcf/
+
+# Windows Store app package directories and files
+AppPackages/
+BundleArtifacts/
+Package.StoreAssociation.xml
+_pkginfo.txt
+*.appx
+*.appxbundle
+*.appxupload
+
+# Visual Studio cache files
+# files ending in .cache can be ignored
+*.[Cc]ache
+# but keep track of directories ending in .cache
+!?*.[Cc]ache/
+
+# Others
+ClientBin/
+~$*
+*~
+*.dbmdl
+*.dbproj.schemaview
+*.jfm
+*.pfx
+*.publishsettings
+orleans.codegen.cs
+
+# Including strong name files can present a security risk
+# (https://github.com/github/gitignore/pull/2483#issue-259490424)
+#*.snk
+
+# Since there are multiple workflows, uncomment next line to ignore bower_components
+# (https://github.com/github/gitignore/pull/1529#issuecomment-104372622)
+#bower_components/
+
+# RIA/Silverlight projects
+Generated_Code/
+
+# Backup & report files from converting an old project file
+# to a newer Visual Studio version. Backup files are not needed,
+# because we have git ;-)
+_UpgradeReport_Files/
+Backup*/
+UpgradeLog*.XML
+UpgradeLog*.htm
+ServiceFabricBackup/
+*.rptproj.bak
+
+# SQL Server files
+*.mdf
+*.ldf
+*.ndf
+
+# Business Intelligence projects
+*.rdl.data
+*.bim.layout
+*.bim_*.settings
+*.rptproj.rsuser
+*- [Bb]ackup.rdl
+*- [Bb]ackup ([0-9]).rdl
+*- [Bb]ackup ([0-9][0-9]).rdl
+
+# Microsoft Fakes
+FakesAssemblies/
+
+# GhostDoc plugin setting file
+*.GhostDoc.xml
+
+# Node.js Tools for Visual Studio
+.ntvs_analysis.dat
+node_modules/
+
+# Visual Studio 6 build log
+*.plg
+
+# Visual Studio 6 workspace options file
+*.opt
+
+# Visual Studio 6 auto-generated workspace file (contains which files were open etc.)
+*.vbw
+
+# Visual Studio 6 auto-generated project file (contains which files were open etc.)
+*.vbp
+
+# Visual Studio 6 workspace and project file (working project files containing files to include in project)
+*.dsw
+*.dsp
+
+# Visual Studio 6 technical files
+*.ncb
+*.aps
+
+# Visual Studio LightSwitch build output
+**/*.HTMLClient/GeneratedArtifacts
+**/*.DesktopClient/GeneratedArtifacts
+**/*.DesktopClient/ModelManifest.xml
+**/*.Server/GeneratedArtifacts
+**/*.Server/ModelManifest.xml
+_Pvt_Extensions
+
+# Paket dependency manager
+.paket/paket.exe
+paket-files/
+
+# FAKE - F# Make
+.fake/
+
+# CodeRush personal settings
+.cr/personal
+
+# Python Tools for Visual Studio (PTVS)
+__pycache__/
+*.pyc
+
+# Cake - Uncomment if you are using it
+# tools/**
+# !tools/packages.config
+
+# Tabs Studio
+*.tss
+
+# Telerik's JustMock configuration file
+*.jmconfig
+
+# BizTalk build output
+*.btp.cs
+*.btm.cs
+*.odx.cs
+*.xsd.cs
+
+# OpenCover UI analysis results
+OpenCover/
+
+# Azure Stream Analytics local run output
+ASALocalRun/
+
+# MSBuild Binary and Structured Log
+*.binlog
+
+# NVidia Nsight GPU debugger configuration file
+*.nvuser
+
+# MFractors (Xamarin productivity tool) working folder
+.mfractor/
+
+# Local History for Visual Studio
+.localhistory/
+
+# Visual Studio History (VSHistory) files
+.vshistory/
+
+# BeatPulse healthcheck temp database
+healthchecksdb
+
+# Backup folder for Package Reference Convert tool in Visual Studio 2017
+MigrationBackup/
+
+# Ionide (cross platform F# VS Code tools) working folder
+.ionide/
+
+# Fody - auto-generated XML schema
+FodyWeavers.xsd
+
+# VS Code files for those working on multiple tools
+.vscode/*
+!.vscode/settings.json
+!.vscode/tasks.json
+!.vscode/launch.json
+!.vscode/extensions.json
+*.code-workspace
+
+# Local History for Visual Studio Code
+.history/
+
+# Windows Installer files from build outputs
+*.cab
+*.msi
+*.msix
+*.msm
+*.msp
+
+# JetBrains Rider
+*.sln.iml
diff --git a/ConsoleApp2.sln b/ConsoleApp2.sln
new file mode 100644
index 0000000..e84cb85
--- /dev/null
+++ b/ConsoleApp2.sln
@@ -0,0 +1,16 @@
+
+Microsoft Visual Studio Solution File, Format Version 12.00
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ConsoleApp2", "ConsoleApp2\ConsoleApp2.csproj", "{155E4B04-E88C-4BA4-AED2-B13E0A0432B5}"
+EndProject
+Global
+ GlobalSection(SolutionConfigurationPlatforms) = preSolution
+ Debug|Any CPU = Debug|Any CPU
+ Release|Any CPU = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(ProjectConfigurationPlatforms) = postSolution
+ {155E4B04-E88C-4BA4-AED2-B13E0A0432B5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {155E4B04-E88C-4BA4-AED2-B13E0A0432B5}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {155E4B04-E88C-4BA4-AED2-B13E0A0432B5}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {155E4B04-E88C-4BA4-AED2-B13E0A0432B5}.Release|Any CPU.Build.0 = Release|Any CPU
+ EndGlobalSection
+EndGlobal
diff --git a/ConsoleApp2/ConsoleApp2.csproj b/ConsoleApp2/ConsoleApp2.csproj
new file mode 100644
index 0000000..8d421c2
--- /dev/null
+++ b/ConsoleApp2/ConsoleApp2.csproj
@@ -0,0 +1,20 @@
+
+
+
+ Exe
+ net8.0
+ enable
+ enable
+ Linux
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/ConsoleApp2/CsvConversion.cs b/ConsoleApp2/CsvConversion.cs
new file mode 100644
index 0000000..02e20a7
--- /dev/null
+++ b/ConsoleApp2/CsvConversion.cs
@@ -0,0 +1,76 @@
+using System.Diagnostics;
+using ConsoleApp2.Helpers;
+using ConsoleApp2.Services;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+
+namespace ConsoleApp2;
+
+public class CsvConversion : BackgroundService
+{
+ private readonly ILogger _logger;
+ private readonly IOptions _csvOptions;
+ private readonly DataTransformService _transform;
+ private readonly TaskManager _taskManager;
+
+
+ public CsvConversion(ILogger logger,
+ IOptions csvOptions,
+ DataTransformService transform,
+ TaskManager taskManager)
+ {
+ _logger = logger;
+ _csvOptions = csvOptions;
+ _transform = transform;
+ _taskManager = taskManager;
+ }
+
+ protected override async Task ExecuteAsync(CancellationToken cancellationToken)
+ {
+ var sw = Stopwatch.StartNew();
+ var inputDir = _csvOptions.Value.InputDir;
+ _logger.LogInformation("Working dir: {InputDir}", inputDir);
+ var files = Directory.GetFiles(inputDir).Where(s => s.EndsWith(".sql") && !s.Contains("schema")).ToArray();
+ if (files.Length == 0)
+ {
+ _logger.LogInformation("No sql files found in {InputDir}", inputDir);
+ return;
+ }
+
+ foreach(var sqlPath in files)
+ {
+ _logger.LogInformation("Working sql file: {SqlPath}", sqlPath);
+ var headers = await DumpDataHelper.GetCsvHeadersFromSqlFileAsync(sqlPath);
+ var csvFiles = await DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(sqlPath);
+
+ var queue = new DataRecordQueue();
+ foreach (var csvFile in csvFiles)
+ {
+ var csvPath = Path.Combine(inputDir, csvFile);
+ var source = new JsvSource(csvPath, headers, _logger);
+
+ while (await source.ReadAsync())
+ {
+ queue.Enqueue(source.Current);
+ }
+
+ if (queue.Count > 200)
+ {
+ var queue1 = queue;
+ await _taskManager.CreateTask(async () => await _transform.ExecuteAsync(queue1, cancellationToken));
+ queue = new DataRecordQueue();
+ }
+
+ if (cancellationToken.IsCancellationRequested)
+ return;
+ }
+
+ _logger.LogInformation("File '{File}' input completed", Path.GetFileName(sqlPath));
+ }
+
+ _logger.LogInformation("***** Csv input service completed *****");
+ _logger.LogInformation("Elapsed: {Elapsed}", sw.Elapsed);
+ _taskManager.MainTaskCompleted = true;
+ }
+}
\ No newline at end of file
diff --git a/ConsoleApp2/Entities/DataRecord.cs b/ConsoleApp2/Entities/DataRecord.cs
new file mode 100644
index 0000000..b306b57
--- /dev/null
+++ b/ConsoleApp2/Entities/DataRecord.cs
@@ -0,0 +1,54 @@
+namespace ConsoleApp2.Entities;
+
+public class DataRecord
+{
+ public static bool TryGetField(DataRecord record, string columnName, out string value)
+ {
+ value = string.Empty;
+ if (record.Headers is null)
+ throw new InvalidOperationException("Cannot get field when headers of a record have not been set.");
+ var idx = Array.IndexOf(record.Headers, columnName);
+ if (idx == -1)
+ return false;
+ value = record.Fields[idx];
+ return true;
+ }
+
+ public static string GetField(DataRecord record, string columnName)
+ {
+ if (record.Headers is null)
+ throw new InvalidOperationException("Headers have not been set.");
+ var idx = Array.IndexOf(record.Headers, columnName);
+ if (idx is -1)
+ throw new IndexOutOfRangeException("Column name not found in this record.");
+ return record.Fields[idx];
+ }
+
+
+ public string[] Fields { get; }
+
+ public string[]? Headers { get; }
+
+ public string TableName { get; }
+
+
+ public DataRecord(string[] fields, string tableName, string[]? headers = null)
+ {
+ if (headers is not null && fields.Length != headers.Length)
+ throw new ArgumentException(
+ $"The number of fields does not match the number of headers. Expected: {fields.Length} Got: {headers.Length}",
+ nameof(fields));
+
+ Fields = fields;
+ TableName = tableName;
+ Headers = headers;
+ }
+
+ public string this[int index] => Fields[index];
+
+ public string this[string columnName] => GetField(this, columnName);
+
+ public int Count => Fields.Length;
+
+ public bool TryGetField(string columnName, out string value) => TryGetField(this, columnName, out value);
+}
\ No newline at end of file
diff --git a/ConsoleApp2/Helpers/DumpDataHelper.cs b/ConsoleApp2/Helpers/DumpDataHelper.cs
new file mode 100644
index 0000000..c754cb4
--- /dev/null
+++ b/ConsoleApp2/Helpers/DumpDataHelper.cs
@@ -0,0 +1,91 @@
+using System.Text.RegularExpressions;
+
+namespace ConsoleApp2.Helpers;
+
+public static partial class DumpDataHelper
+{
+ [GeneratedRegex(@"'.+\.dat'")]
+ private static partial Regex MatchDatFile();
+ [GeneratedRegex(@"\([^)]*\)")]
+ private static partial Regex MatchBrackets();
+
+
+ public static async Task GetCsvHeadersFromSqlFileAsync(string filePath)
+ {
+ var txt = await File.ReadAllTextAsync(filePath);
+ var match = MatchBrackets().Match(txt);
+
+ return ParseHeader(match.ValueSpan);
+ }
+
+ private static string[] ParseHeader(ReadOnlySpan headerStr)
+ {
+ headerStr = headerStr[1..^1];
+ Span ranges = stackalloc Range[50];
+ var count = headerStr.Split(ranges, ',');
+ var arr = new string[count];
+
+ for (var i = 0; i < count; i++)
+ {
+ arr[i] = headerStr[ranges[i]].Trim("@`").ToString(); // 消除列名的反引号,如果是变量则消除@
+ }
+
+ return arr;
+ }
+
+
+ public static string GetTableName(ReadOnlySpan filePath)
+ {
+ filePath = filePath[(filePath.LastIndexOf('\\') + 1)..];
+ var firstDotIdx = -1;
+ var secondDotIdx = -1;
+ var times = 0;
+ for (var i = 0; i < filePath.Length; i++)
+ {
+ if (filePath[i] == '.')
+ {
+ ++times;
+ if(times == 1)
+ firstDotIdx = i;
+ if (times == 2)
+ {
+ secondDotIdx = i;
+ break;
+ }
+ }
+ }
+
+ return filePath[(firstDotIdx+1)..secondDotIdx].ToString();
+ }
+
+ public static async Task GetCsvFileNamesFromSqlFileAsync(string filePath)
+ {
+ var txt = await File.ReadAllTextAsync(filePath);
+ var matches = MatchDatFile().Matches(txt);
+ return matches.Select(match => match.ValueSpan[1..^1].ToString()).ToArray();
+ }
+
+ public static bool CheckHexField(string? str)
+ {
+ if (string.IsNullOrWhiteSpace(str))
+ return false;
+
+ if (str.StartsWith('\"'))
+ return false;
+
+ var isDigit = true;
+
+ foreach (var c in str)
+ {
+ if (!char.IsAsciiHexDigit(c))
+ return false;
+ if (!char.IsNumber(c))
+ isDigit = false;
+ }
+
+ if (isDigit)
+ return false;
+
+ return true;
+ }
+}
\ No newline at end of file
diff --git a/ConsoleApp2/Helpers/Extensions.Dictionary.cs b/ConsoleApp2/Helpers/Extensions.Dictionary.cs
new file mode 100644
index 0000000..826cb9d
--- /dev/null
+++ b/ConsoleApp2/Helpers/Extensions.Dictionary.cs
@@ -0,0 +1,30 @@
+namespace ConsoleApp2.Helpers;
+
+public static class DictionaryExtensions
+{
+
+ ///
+ /// 根据指定的键是否存在来添加或是更新字典
+ ///
+ ///
+ /// 指定的键
+ /// 如果指定的键不存在,则向字典添加该值
+ /// 如果指定的键存在,则根据该委托的返回值修改字典中对应的值
+ ///
+ ///
+ /// 添加或是修改后的值
+ public static TValue AddOrUpdate(this IDictionary @this, TKey key, TValue addValue,
+ Func updateFactory)
+ {
+ if (!@this.TryGetValue(key, out var value))
+ {
+ @this.Add(key, addValue);
+ }
+ else
+ {
+ @this[key] = updateFactory(key, value);
+ }
+
+ return @this[key];
+ }
+}
\ No newline at end of file
diff --git a/ConsoleApp2/Helpers/Extensions.String.cs b/ConsoleApp2/Helpers/Extensions.String.cs
new file mode 100644
index 0000000..e2d752b
--- /dev/null
+++ b/ConsoleApp2/Helpers/Extensions.String.cs
@@ -0,0 +1,47 @@
+using System.Globalization;
+using System.Text;
+
+namespace ConsoleApp2.Helpers;
+
+public static class StringExtensions
+{
+ public static string Omit(this ReadOnlySpan @this, int maxLength)
+ {
+ if (@this.Length > maxLength)
+ return @this[..maxLength].ToString() + "...";
+ return @this.ToString();
+ }
+
+ public static string Omit(this string @this, int maxLength) => Omit(@this.AsSpan(), maxLength);
+
+ public static string FromHex(ReadOnlySpan hexString, Encoding? encoding = null)
+ {
+ encoding ??= Encoding.UTF8;
+
+ var realLength = 0;
+ for (var i = hexString.Length - 2; i >= 0; i -= 2)
+ {
+ var b = byte.Parse(hexString.Slice(i, 2), NumberStyles.HexNumber, CultureInfo.InvariantCulture);
+ if (b != 0) //not NULL character
+ {
+ realLength = i + 2;
+ break;
+ }
+ }
+
+ var bytes = new byte[realLength / 2];
+ for (var i = 0; i < bytes.Length; i++)
+ {
+ bytes[i] = byte.Parse(hexString.Slice(i * 2, 2), NumberStyles.HexNumber, CultureInfo.InvariantCulture);
+ }
+
+ return encoding.GetString(bytes);
+ }
+
+ public static bool CheckJsonHex(ReadOnlySpan hexStr)
+ {
+ if (hexStr.Length < 2)
+ return false;
+ return FromHex(hexStr[..2]) is ['{'] or ['['];
+ }
+}
\ No newline at end of file
diff --git a/ConsoleApp2/Helpers/HashExtension.cs b/ConsoleApp2/Helpers/HashExtension.cs
new file mode 100644
index 0000000..76fd359
--- /dev/null
+++ b/ConsoleApp2/Helpers/HashExtension.cs
@@ -0,0 +1,249 @@
+using System.Security.Cryptography;
+using System.Text;
+
+namespace ConsoleApp2.Helpers;
+
+public static class HashExtensions
+{
+ ///
+ /// 计算32位MD5码
+ ///
+ /// 字符串
+ /// 返回哈希值格式 true:英文大写,false:英文小写
+ ///
+ public static string ToMd5Hash(this string word, bool toUpper = true)
+ {
+ try
+ {
+ var MD5CSP = MD5.Create();
+ var bytValue = Encoding.UTF8.GetBytes(word);
+ var bytHash = MD5CSP.ComputeHash(bytValue);
+ MD5CSP.Clear();
+ //根据计算得到的Hash码翻译为MD5码
+ var sHash = "";
+ foreach (var t in bytHash)
+ {
+ long i = t / 16;
+ var sTemp = i > 9 ? ((char)(i - 10 + 0x41)).ToString() : ((char)(i + 0x30)).ToString();
+ i = t % 16;
+ if (i > 9)
+ {
+ sTemp += ((char)(i - 10 + 0x41)).ToString();
+ }
+ else
+ {
+ sTemp += ((char)(i + 0x30)).ToString();
+ }
+
+ sHash += sTemp;
+ }
+
+ //根据大小写规则决定返回的字符串
+ return toUpper ? sHash : sHash.ToLower();
+ }
+ catch (System.Exception ex)
+ {
+ throw new System.Exception(ex.Message);
+ }
+ }
+
+ public static string ToMd5Hash(this Stream stream, bool toUpper = true)
+ {
+ using var md5Hash = MD5.Create();
+ var bytes = md5Hash.ComputeHash(stream);
+ return ToHashString(bytes, toUpper);
+ }
+
+ ///
+ /// 计算SHA-1码
+ ///
+ /// 字符串
+ /// 返回哈希值格式 true:英文大写,false:英文小写
+ ///
+ public static string ToSHA1Hash(this string word, bool toUpper = true)
+ {
+ try
+ {
+ var SHA1CSP = SHA1.Create();
+ var bytValue = Encoding.UTF8.GetBytes(word);
+ var bytHash = SHA1CSP.ComputeHash(bytValue);
+ SHA1CSP.Clear();
+ //根据计算得到的Hash码翻译为SHA-1码
+ var sHash = "";
+ foreach (var t in bytHash)
+ {
+ long i = t / 16;
+ var sTemp = i > 9 ? ((char)(i - 10 + 0x41)).ToString() : ((char)(i + 0x30)).ToString();
+ i = t % 16;
+ if (i > 9)
+ {
+ sTemp += ((char)(i - 10 + 0x41)).ToString();
+ }
+ else
+ {
+ sTemp += ((char)(i + 0x30)).ToString();
+ }
+
+ sHash += sTemp;
+ }
+
+ //根据大小写规则决定返回的字符串
+ return toUpper ? sHash : sHash.ToLower();
+ }
+ catch (System.Exception ex)
+ {
+ throw new System.Exception(ex.Message);
+ }
+ }
+
+ ///
+ /// 计算SHA-256码
+ ///
+ /// 字符串
+ /// 返回哈希值格式 true:英文大写,false:英文小写
+ ///
+ public static string ToSHA256Hash(this string word, bool toUpper = true)
+ {
+ try
+ {
+ var SHA256CSP = SHA256.Create();
+ var bytValue = Encoding.UTF8.GetBytes(word);
+ var bytHash = SHA256CSP.ComputeHash(bytValue);
+ SHA256CSP.Clear();
+ //根据计算得到的Hash码翻译为SHA-1码
+ var sHash = "";
+ foreach (var t in bytHash)
+ {
+ long i = t / 16;
+ var sTemp = i > 9 ? ((char)(i - 10 + 0x41)).ToString() : ((char)(i + 0x30)).ToString();
+ i = t % 16;
+ if (i > 9)
+ {
+ sTemp += ((char)(i - 10 + 0x41)).ToString();
+ }
+ else
+ {
+ sTemp += ((char)(i + 0x30)).ToString();
+ }
+
+ sHash += sTemp;
+ }
+
+ //根据大小写规则决定返回的字符串
+ return toUpper ? sHash : sHash.ToLower();
+ }
+ catch (System.Exception ex)
+ {
+ throw new System.Exception(ex.Message);
+ }
+ }
+
+ ///
+ /// 计算SHA-256码
+ ///
+ ///
+ ///
+ ///
+ public static string ToSHA256Hash(this Stream stream, bool toUpper = true)
+ {
+ using var sha256Hash = SHA256.Create();
+ var bytes = sha256Hash.ComputeHash(stream);
+ return ToHashString(bytes, toUpper);
+ }
+
+ ///
+ /// 计算SHA-384码
+ ///
+ /// 字符串
+ /// 返回哈希值格式 true:英文大写,false:英文小写
+ ///
+ public static string ToSHA384Hash(this string word, bool toUpper = true)
+ {
+ try
+ {
+ var SHA384CSP = SHA384.Create();
+ var bytValue = Encoding.UTF8.GetBytes(word);
+ var bytHash = SHA384CSP.ComputeHash(bytValue);
+ SHA384CSP.Clear();
+ //根据计算得到的Hash码翻译为SHA-1码
+ var sHash = "";
+ foreach (var t in bytHash)
+ {
+ long i = t / 16;
+ var sTemp = i > 9 ? ((char)(i - 10 + 0x41)).ToString() : ((char)(i + 0x30)).ToString();
+ i = t % 16;
+ if (i > 9)
+ {
+ sTemp += ((char)(i - 10 + 0x41)).ToString();
+ }
+ else
+ {
+ sTemp += ((char)(i + 0x30)).ToString();
+ }
+
+ sHash += sTemp;
+ }
+
+ //根据大小写规则决定返回的字符串
+ return toUpper ? sHash : sHash.ToLower();
+ }
+ catch (System.Exception ex)
+ {
+ throw new System.Exception(ex.Message);
+ }
+ }
+
+ ///
+ /// 计算SHA-512码
+ ///
+ /// 字符串
+ /// 返回哈希值格式 true:英文大写,false:英文小写
+ ///
+ public static string ToSHA512Hash(this string word, bool toUpper = true)
+ {
+ try
+ {
+ var SHA512CSP = SHA512.Create();
+ var bytValue = Encoding.UTF8.GetBytes(word);
+ var bytHash = SHA512CSP.ComputeHash(bytValue);
+ SHA512CSP.Clear();
+ //根据计算得到的Hash码翻译为SHA-1码
+ var sHash = "";
+ foreach (var t in bytHash)
+ {
+ long i = t / 16;
+ var sTemp = i > 9 ? ((char)(i - 10 + 0x41)).ToString() : ((char)(i + 0x30)).ToString();
+ i = t % 16;
+ if (i > 9)
+ {
+ sTemp += ((char)(i - 10 + 0x41)).ToString();
+ }
+ else
+ {
+ sTemp += ((char)(i + 0x30)).ToString();
+ }
+
+ sHash += sTemp;
+ }
+
+ //根据大小写规则决定返回的字符串
+ return toUpper ? sHash : sHash.ToLower();
+ }
+ catch (System.Exception ex)
+ {
+ throw new System.Exception(ex.Message);
+ }
+ }
+
+ private static string ToHashString(byte[] bytes, bool toUpper = true)
+ {
+ var builder = new StringBuilder();
+ foreach (var t in bytes)
+ {
+ builder.Append(t.ToString("x2"));
+ }
+
+ var str = builder.ToString();
+ return toUpper ? str.ToUpper() : str.ToLower();
+ }
+}
\ No newline at end of file
diff --git a/ConsoleApp2/JsvSource.cs b/ConsoleApp2/JsvSource.cs
new file mode 100644
index 0000000..829392f
--- /dev/null
+++ b/ConsoleApp2/JsvSource.cs
@@ -0,0 +1,48 @@
+using ConsoleApp2.Entities;
+using ConsoleApp2.Helpers;
+using Microsoft.Extensions.Logging;
+using ServiceStack.Text;
+
+namespace ConsoleApp2;
+
+public class JsvSource : IDisposable
+{
+ private readonly string _filePath;
+ private readonly JsvStringSerializer _jsv;
+ private readonly StreamReader _reader;
+ // ReSharper disable once PrivateFieldCanBeConvertedToLocalVariable
+ private readonly ILogger? _logger;
+ private readonly string _tableName;
+
+ public DataRecord Current { get; protected set; } = null!;
+ public string[]? Headers { get; }
+ public bool EndOfSource => _reader.EndOfStream;
+
+ public JsvSource(string filePath, string[]? headers = null, ILogger? logger = null)
+ {
+ _filePath = filePath;
+ _jsv = new JsvStringSerializer();
+ _reader = new StreamReader(filePath);
+ Headers = headers;
+ _logger = logger;
+ // _logger?.LogInformation("Reading file: {FilePath}", filePath);
+ _tableName = DumpDataHelper.GetTableName(filePath);
+ }
+
+ public async ValueTask ReadAsync()
+ {
+ var str = await _reader.ReadLineAsync();
+ if (string.IsNullOrEmpty(str))
+ return false;
+ var fields = _jsv.DeserializeFromString(str);
+ if(Headers is not null && Headers.Length != fields.Length)
+ throw new InvalidDataException("解析的字段数与指定的列数不匹配");
+ Current = new DataRecord(fields, _tableName, Headers);
+ return true;
+ }
+
+ public void Dispose()
+ {
+ _reader.Dispose();
+ }
+}
\ No newline at end of file
diff --git a/ConsoleApp2/MySqlDestination.cs b/ConsoleApp2/MySqlDestination.cs
new file mode 100644
index 0000000..9654cc0
--- /dev/null
+++ b/ConsoleApp2/MySqlDestination.cs
@@ -0,0 +1,128 @@
+using System.Text;
+using ConsoleApp2.Entities;
+using ConsoleApp2.Helpers;
+using Microsoft.Extensions.Logging;
+using MySqlConnector;
+
+namespace ConsoleApp2;
+
+public class MySqlDestination : IDisposable, IAsyncDisposable
+{
+ private readonly Dictionary> _recordCache;
+ private readonly MySqlConnection _conn;
+ private readonly ILogger _logger;
+
+ public static int AddCount;
+
+ public MySqlDestination(string connStr, ILogger logger)
+ {
+ _conn = new MySqlConnection(connStr);
+ _conn.Open();
+ _recordCache = new Dictionary>();
+ _logger = logger;
+ }
+
+ public Task WriteRecordAsync(DataRecord record)
+ {
+ _recordCache.AddOrUpdate(record.TableName, [record], (key, value) =>
+ {
+ value.Add(record);
+ Interlocked.Increment(ref AddCount);
+ return value;
+ });
+ return Task.CompletedTask;
+ }
+
+ public async Task WriteRecordsAsync(IEnumerable records)
+ {
+ foreach (var record in records)
+ {
+ await WriteRecordAsync(record);
+ }
+ }
+
+ public async Task FlushAsync()
+ {
+ if (_recordCache.Count == 0)
+ return;
+ try
+ {
+ var cmd = _conn.CreateCommand();
+ var sb = new StringBuilder();
+
+ var count = 0;
+ foreach (var (tableName, records) in _recordCache)
+ {
+ if (records.Count == 0)
+ continue;
+ sb.Append($"INSERT INTO `{tableName}`(");
+ for (var i = 0; i < records[0].Headers.Length; i++)
+ {
+ var header = records[0].Headers[i];
+ sb.Append($"`{header}`");
+ if (i != records[0].Headers.Length - 1)
+ sb.Append(',');
+ }
+
+ sb.AppendLine(") VALUES");
+
+ for (var i = 0; i < records.Count; i++)
+ {
+ count++;
+ var record = records[i];
+ sb.Append('(');
+ for (var j = 0; j < record.Fields.Length; j++)
+ {
+ var field = record.Fields[j];
+
+ #region HandleFields
+
+ if (field == "\\N")
+ sb.Append("NULL");
+ else if (DumpDataHelper.CheckHexField(field)) // TODO: 性能消耗
+ {
+ if (StringExtensions.CheckJsonHex(field))
+ sb.Append($"UNHEX(\"{field}\")");
+ else
+ sb.Append($"\"{field}\"");
+ }
+ else
+ sb.Append($"\"{field}\"");
+
+ #endregion
+
+ if (j != record.Fields.Length - 1)
+ sb.Append(',');
+ }
+
+ sb.Append(')');
+
+ if (i != records.Count - 1) // not last field
+ sb.AppendLine(",");
+ }
+
+ sb.AppendLine(";\n");
+ }
+
+ cmd.CommandText = sb.ToString();
+
+ await cmd.ExecuteNonQueryAsync();
+ _recordCache.Clear();
+ }
+ catch (Exception e)
+ {
+ _logger.LogCritical(e, "Error when flushing records");
+ throw;
+ }
+ }
+
+ public void Dispose()
+ {
+ _conn.Dispose();
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ await _conn.DisposeAsync();
+ }
+}
\ No newline at end of file
diff --git a/ConsoleApp2/Options/CsvOptions.cs b/ConsoleApp2/Options/CsvOptions.cs
new file mode 100644
index 0000000..b01d43c
--- /dev/null
+++ b/ConsoleApp2/Options/CsvOptions.cs
@@ -0,0 +1,28 @@
+namespace ConsoleApp2;
+
+public class CsvOptions
+{
+ ///
+ /// The directory to input csv and sql file.
+ ///
+ public string InputDir { get; set; } = "./";
+ ///
+ /// The output directory.
+ ///
+ public string OutputDir { get; set; } = "./Output";
+
+ ///
+ /// The ASCII char that fields are enclosed by. Default is '"'.
+ ///
+ public char QuoteChar { get; set; } = '"';
+
+ ///
+ /// The ASCII char that fields are separated by. Default is ','.
+ ///
+ public char DelimiterChar { get; set; } = ',';
+
+ ///
+ /// The max number of threads to use.
+ ///
+ public int MaxThreads { get; set; } = 12;
+}
\ No newline at end of file
diff --git a/ConsoleApp2/Options/DataTransformOptions.cs b/ConsoleApp2/Options/DataTransformOptions.cs
new file mode 100644
index 0000000..cd1ed95
--- /dev/null
+++ b/ConsoleApp2/Options/DataTransformOptions.cs
@@ -0,0 +1,9 @@
+using ConsoleApp2.Entities;
+using ConsoleApp2.Options;
+
+namespace ConsoleApp2;
+
+public class DataTransformOptions
+{
+ public Func DatabaseFilter { get; set; }
+}
\ No newline at end of file
diff --git a/ConsoleApp2/Options/DatabaseOptions.cs b/ConsoleApp2/Options/DatabaseOptions.cs
new file mode 100644
index 0000000..3a4a3d0
--- /dev/null
+++ b/ConsoleApp2/Options/DatabaseOptions.cs
@@ -0,0 +1,3 @@
+namespace ConsoleApp2.Options;
+
+public record DatabaseOptions(string Host, uint Port, string Database, string User, string Password);
\ No newline at end of file
diff --git a/ConsoleApp2/Program.cs b/ConsoleApp2/Program.cs
new file mode 100644
index 0000000..51cd61c
--- /dev/null
+++ b/ConsoleApp2/Program.cs
@@ -0,0 +1,37 @@
+using ConsoleApp2;
+using ConsoleApp2.Options;
+using ConsoleApp2.Services;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using Serilog;
+
+ThreadPool.SetMaxThreads(200, 200);
+var host = Host.CreateApplicationBuilder();
+host.Configuration.AddCommandLine(args);
+host.Services.Configure(option =>
+{
+ option.DelimiterChar = ',';
+ option.QuoteChar = '"';
+ option.InputDir = "D:/Dump/MyDumper-Csv";
+ option.OutputDir = "D:/DumpOutput";
+ option.MaxThreads = 12;
+});
+host.Services.Configure(options =>
+{
+ var dbOption = new DatabaseOptions("localhost", 33306, "cferp_test_1", "root", "123456");
+ options.DatabaseFilter = record => dbOption;
+});
+host.Services.AddLogging(builder =>
+{
+ builder.ClearProviders();
+ builder.AddSerilog(new LoggerConfiguration().WriteTo.Console().CreateLogger());
+});
+host.Services.AddHostedService();
+host.Services.AddHostedService();
+host.Services.AddSingleton();
+host.Services.AddSingleton();
+host.Services.AddSingleton();
+var app = host.Build();
+await app.RunAsync();
\ No newline at end of file
diff --git a/ConsoleApp2/Services/DataRecordQueue.cs b/ConsoleApp2/Services/DataRecordQueue.cs
new file mode 100644
index 0000000..72f4b13
--- /dev/null
+++ b/ConsoleApp2/Services/DataRecordQueue.cs
@@ -0,0 +1,60 @@
+using System.Collections.Concurrent;
+using System.Diagnostics.CodeAnalysis;
+using ConsoleApp2.Entities;
+using Microsoft.Extensions.Logging;
+
+namespace ConsoleApp2.Services;
+
+public class DataRecordQueue
+{
+ ///
+ /// Indicate that the queue is completed adding.
+ ///
+ public bool IsCompletedAdding { get; private set; }
+
+ ///
+ /// Remark that the queue is completed for adding and empty;
+ ///
+ public bool IsCompleted => IsCompletedAdding && _queue.IsEmpty;
+
+ private readonly ConcurrentQueue _queue;
+
+ public DataRecordQueue()
+ {
+ _queue = new ConcurrentQueue();
+ }
+
+ public DataRecordQueue(IEnumerable records)
+ {
+ _queue = new ConcurrentQueue(records);
+ }
+
+ ///
+ public void Enqueue(DataRecord item)
+ {
+ _queue.Enqueue(item);
+ }
+
+ ///
+ public bool TryDequeue([MaybeNullWhen(false)] out DataRecord result)
+ {
+ return _queue.TryDequeue(out result);
+ }
+
+ ///
+ public bool TryPeek([MaybeNullWhen(false)] out DataRecord result)
+ {
+ return _queue.TryPeek(out result);
+ }
+
+ ///
+ public int Count => _queue.Count;
+
+ ///
+ public bool IsEmpty => _queue.IsEmpty;
+
+ ///
+ /// Indicate that the queue is completed adding.
+ ///
+ public void CompleteAdding() => IsCompletedAdding = true;
+}
\ No newline at end of file
diff --git a/ConsoleApp2/Services/DataTransformService.cs b/ConsoleApp2/Services/DataTransformService.cs
new file mode 100644
index 0000000..c8a3729
--- /dev/null
+++ b/ConsoleApp2/Services/DataTransformService.cs
@@ -0,0 +1,46 @@
+using ConsoleApp2.Entities;
+using ConsoleApp2.Helpers;
+using ConsoleApp2.Options;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+
+namespace ConsoleApp2.Services;
+
+public class DataTransformService
+{
+ private readonly ILogger _logger;
+ private readonly TaskManager _taskManager;
+ private readonly DatabaseOutputService _output;
+ private readonly IOptions _options;
+
+ public DataTransformService(ILogger logger, TaskManager taskManager, DatabaseOutputService output, IOptions options)
+ {
+ _logger = logger;
+ _taskManager = taskManager;
+ _output = output;
+ _options = options;
+ }
+
+ public async Task ExecuteAsync(DataRecordQueue records, CancellationToken cancellationToken = default)
+ {
+ _logger.LogInformation("Start transforming data.");
+ var map = new Dictionary();
+ while (records.TryDequeue(out var record))
+ {
+ var dbOptions = _options.Value.DatabaseFilter(record);
+ map.AddOrUpdate(dbOptions, new DataRecordQueue([record]), (options, queue) =>
+ {
+ queue.Enqueue(record);
+ return queue;
+ });
+ }
+
+ foreach (var (dbOptions, queue) in map)
+ {
+ await _taskManager.CreateTask(async () =>
+ {
+ await _output.ExecuteAsync(queue, dbOptions, cancellationToken);
+ });
+ }
+ }
+}
\ No newline at end of file
diff --git a/ConsoleApp2/Services/DatabaseOutputService.cs b/ConsoleApp2/Services/DatabaseOutputService.cs
new file mode 100644
index 0000000..04948bc
--- /dev/null
+++ b/ConsoleApp2/Services/DatabaseOutputService.cs
@@ -0,0 +1,39 @@
+using ConsoleApp2.Entities;
+using ConsoleApp2.Options;
+using Microsoft.Extensions.Logging;
+using MySqlConnector;
+
+namespace ConsoleApp2.Services;
+
+public class DatabaseOutputService
+{
+ private readonly ILogger _logger;
+
+ public DatabaseOutputService(ILogger logger)
+ {
+ _logger = logger;
+ }
+
+ public async Task ExecuteAsync(DataRecordQueue records, DatabaseOptions options, CancellationToken stoppingToken = default)
+ {
+ var count = records.Count;
+ var output = new MySqlDestination(new MySqlConnectionStringBuilder()
+ {
+ Server = options.Host,
+ Port = options.Port,
+ Database = options.Database,
+ UserID = options.User,
+ Password = options.Password,
+ ConnectionTimeout = 120,
+ }.ConnectionString, _logger); // TODO: 加入DI
+
+ while (records.TryDequeue(out var record) && !stoppingToken.IsCancellationRequested)
+ {
+ await output.WriteRecordAsync(record);
+ }
+
+ await output.FlushAsync();
+
+ _logger.LogInformation("Flush {Count} records to database.", count);
+ }
+}
\ No newline at end of file
diff --git a/ConsoleApp2/Services/TaskManager.cs b/ConsoleApp2/Services/TaskManager.cs
new file mode 100644
index 0000000..3638d68
--- /dev/null
+++ b/ConsoleApp2/Services/TaskManager.cs
@@ -0,0 +1,28 @@
+using System.Collections.Concurrent;
+using Microsoft.Extensions.Logging;
+
+namespace ConsoleApp2.Services;
+
+public class TaskManager
+{
+ private readonly ConcurrentBag _tasks;
+ private readonly ILogger _logger;
+
+ public int RunningTaskCount => _tasks.Count(task => !task.IsCompleted);
+ public IReadOnlyCollection Tasks => _tasks;
+ public bool MainTaskCompleted { get; set; }
+
+ public TaskManager(ILogger logger)
+ {
+ _tasks = new ConcurrentBag();
+ _logger = logger;
+ }
+
+ public Task CreateTask(Func func)
+ {
+ var task = Task.Factory.StartNew(func);
+ _tasks.Add(task);
+ _logger.LogInformation("New task created.");
+ return task;
+ }
+}
\ No newline at end of file
diff --git a/ConsoleApp2/Services/TaskMonitorService.cs b/ConsoleApp2/Services/TaskMonitorService.cs
new file mode 100644
index 0000000..532ec55
--- /dev/null
+++ b/ConsoleApp2/Services/TaskMonitorService.cs
@@ -0,0 +1,57 @@
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+
+namespace ConsoleApp2.Services;
+
+public class TaskMonitorService : BackgroundService
+{
+ private readonly IHostApplicationLifetime _lifetime;
+ private readonly TaskManager _taskManager;
+ private readonly ILogger _logger;
+
+ public TaskMonitorService(IHostApplicationLifetime lifetime, TaskManager taskManager,
+ ILogger logger)
+ {
+ _lifetime = lifetime;
+ _taskManager = taskManager;
+ _logger = logger;
+ }
+
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+ {
+ while (!_taskManager.MainTaskCompleted || _taskManager.RunningTaskCount != 0)
+ {
+ var running = 0;
+ var error = 0;
+ var completed = 0;
+ var canceled = 0;
+ foreach (var task in _taskManager.Tasks)
+ {
+ switch (task.Status)
+ {
+ case TaskStatus.Running:
+ running++;
+ break;
+ case TaskStatus.Canceled:
+ canceled++;
+ break;
+ case TaskStatus.Faulted:
+ error++;
+ break;
+ case TaskStatus.RanToCompletion:
+ completed++;
+ break;
+ default:
+ throw new ArgumentOutOfRangeException();
+ }
+ }
+
+ _logger.LogInformation(
+ "Task monitor: running: {Running}, error: {Error}, completed: {Completed}, canceled: {Canceled}",
+ running, error, completed, canceled);
+ await Task.Delay(2000);
+ }
+
+ _logger.LogInformation("***** All tasks completed *****");
+ }
+}
\ No newline at end of file