This commit is contained in:
2023-12-29 16:16:05 +08:00
parent 6b88f5bd40
commit c53d2927bb
24 changed files with 909 additions and 386 deletions

View File

@@ -1,60 +1,49 @@
using System.Collections.Concurrent;
using System.Diagnostics.CodeAnalysis;
using ConsoleApp2.Entities;
using Microsoft.Extensions.Logging;
namespace ConsoleApp2.Services;
public class DataRecordQueue
public class DataRecordQueue : IDisposable
{
/// <summary>
/// Indicate that the queue is completed adding.
/// </summary>
public bool IsCompletedAdding { get; private set; }
/// <summary>
/// Remark that the queue is completed for adding and empty;
/// </summary>
public bool IsCompleted => IsCompletedAdding && _queue.IsEmpty;
private readonly ConcurrentQueue<DataRecord> _queue;
private readonly BlockingCollection<DataRecord> _queue;
public int Count => _queue.Count;
public bool IsCompleted => _queue.IsCompleted;
public bool IsAddingCompleted => _queue.IsAddingCompleted;
public event Action? OnRecordWrite;
public event Action? OnRecordRead;
public DataRecordQueue()
{
_queue = new ConcurrentQueue<DataRecord>();
}
public DataRecordQueue(IEnumerable<DataRecord> records)
{
_queue = new ConcurrentQueue<DataRecord>(records);
_queue = new BlockingCollection<DataRecord>();
}
/// <inheritdoc cref="ConcurrentQueue{T}.Enqueue"/>
public void Enqueue(DataRecord item)
public bool TryDequeue([MaybeNullWhen(false)] out DataRecord record)
{
_queue.Enqueue(item);
if (_queue.TryTake(out record))
{
OnRecordRead?.Invoke();
return true;
}
return false;
}
/// <inheritdoc cref="ConcurrentQueue{T}.TryDequeue"/>
public bool TryDequeue([MaybeNullWhen(false)] out DataRecord result)
public DataRecord Dequeue() => _queue.Take();
public void CompleteAdding() => _queue.CompleteAdding();
public void Enqueue(DataRecord record)
{
return _queue.TryDequeue(out result);
_queue.Add(record);
OnRecordWrite?.Invoke();
}
/// <inheritdoc cref="ConcurrentQueue{T}.TryPeek"/>
public bool TryPeek([MaybeNullWhen(false)] out DataRecord result)
public void Dispose()
{
return _queue.TryPeek(out result);
_queue.Dispose();
}
/// <inheritdoc cref="ConcurrentQueue{T}.Count"/>
public int Count => _queue.Count;
/// <inheritdoc cref="ConcurrentQueue{T}.IsEmpty"/>
public bool IsEmpty => _queue.IsEmpty;
/// <summary>
/// Indicate that the queue is completed adding.
/// </summary>
public void CompleteAdding() => IsCompletedAdding = true;
}

View File

@@ -1,46 +0,0 @@
using ConsoleApp2.Entities;
using ConsoleApp2.Helpers;
using ConsoleApp2.Options;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace ConsoleApp2.Services;
public class DataTransformService
{
private readonly ILogger _logger;
private readonly TaskManager _taskManager;
private readonly DatabaseOutputService _output;
private readonly IOptions<DataTransformOptions> _options;
public DataTransformService(ILogger<DataTransformService> logger, TaskManager taskManager, DatabaseOutputService output, IOptions<DataTransformOptions> options)
{
_logger = logger;
_taskManager = taskManager;
_output = output;
_options = options;
}
public async Task ExecuteAsync(DataRecordQueue records, CancellationToken cancellationToken = default)
{
_logger.LogInformation("Start transforming data.");
var map = new Dictionary<DatabaseOptions, DataRecordQueue>();
while (records.TryDequeue(out var record))
{
var dbOptions = _options.Value.DatabaseFilter(record);
map.AddOrUpdate(dbOptions, new DataRecordQueue([record]), (options, queue) =>
{
queue.Enqueue(record);
return queue;
});
}
foreach (var (dbOptions, queue) in map)
{
await _taskManager.CreateTask(async () =>
{
await _output.ExecuteAsync(queue, dbOptions, cancellationToken);
});
}
}
}

View File

@@ -1,39 +0,0 @@
using ConsoleApp2.Entities;
using ConsoleApp2.Options;
using Microsoft.Extensions.Logging;
using MySqlConnector;
namespace ConsoleApp2.Services;
public class DatabaseOutputService
{
private readonly ILogger _logger;
public DatabaseOutputService(ILogger<DatabaseOutputService> logger)
{
_logger = logger;
}
public async Task ExecuteAsync(DataRecordQueue records, DatabaseOptions options, CancellationToken stoppingToken = default)
{
var count = records.Count;
var output = new MySqlDestination(new MySqlConnectionStringBuilder()
{
Server = options.Host,
Port = options.Port,
Database = options.Database,
UserID = options.User,
Password = options.Password,
ConnectionTimeout = 120,
}.ConnectionString, _logger); // TODO: 加入DI
while (records.TryDequeue(out var record) && !stoppingToken.IsCancellationRequested)
{
await output.WriteRecordAsync(record);
}
await output.FlushAsync();
_logger.LogInformation("Flush {Count} records to database.", count);
}
}

View File

@@ -0,0 +1,47 @@
using ConsoleApp2.Entities;
using ConsoleApp2.Helpers;
using Microsoft.Extensions.Logging;
using ServiceStack.Text;
namespace ConsoleApp2.Services;
public class JsvSource : IDisposable
{
private readonly string _filePath;
private readonly JsvStringSerializer _jsv;
private readonly StreamReader _reader;
// ReSharper disable once PrivateFieldCanBeConvertedToLocalVariable
private readonly ILogger? _logger;
private readonly string _tableName;
public DataRecord Current { get; protected set; } = null!;
public string[]? Headers { get; }
public bool EndOfSource => _reader.EndOfStream;
public JsvSource(string filePath, string[]? headers = null, ILogger? logger = null)
{
_filePath = filePath;
_jsv = new JsvStringSerializer();
_reader = new StreamReader(filePath);
Headers = headers;
_logger = logger;
// _logger?.LogInformation("Reading file: {FilePath}", filePath);
_tableName = DumpDataHelper.GetTableName(filePath);
}
public async ValueTask<bool> ReadAsync()
{
var str = await _reader.ReadLineAsync();
if (string.IsNullOrEmpty(str))
return false;
var fields = _jsv.DeserializeFromString<string[]>(str);
Current = new DataRecord(fields, _tableName, Headers);
return true;
}
public void Dispose()
{
_reader.Dispose();
}
}

View File

@@ -0,0 +1,137 @@
using System.Text;
using ConsoleApp2.Entities;
using ConsoleApp2.Helpers;
using Microsoft.Extensions.Logging;
using MySqlConnector;
namespace ConsoleApp2.Services;
public class MySqlDestination : IDisposable, IAsyncDisposable
{
private readonly Dictionary<string, IList<DataRecord>> _recordCache;
private readonly MySqlConnection _conn;
private readonly ILogger _logger;
private readonly bool _prettyOutput;
public static int AddCount;
public MySqlDestination(string connStr, ILogger logger, bool prettyOutput = false)
{
_conn = new MySqlConnection(connStr);
_conn.Open();
_recordCache = new Dictionary<string, IList<DataRecord>>();
_logger = logger;
_prettyOutput = prettyOutput;
}
public Task WriteRecordAsync(DataRecord record)
{
_recordCache.AddOrUpdate(record.TableName, [record], (key, value) =>
{
value.Add(record);
Interlocked.Increment(ref AddCount);
return value;
});
return Task.CompletedTask;
}
public async Task WriteRecordsAsync(IEnumerable<DataRecord> records)
{
foreach (var record in records)
{
await WriteRecordAsync(record);
}
}
public async Task FlushAsync()
{
if (_recordCache.Count == 0)
return;
var cmd = _conn.CreateCommand();
cmd.CommandText = SerializeRecords(_recordCache, _prettyOutput);
try
{
await cmd.ExecuteNonQueryAsync();
_recordCache.Clear();
}
catch (Exception e)
{
_logger.LogCritical(e, "Error when flushing records, sql: {Sql}", cmd.CommandText.Omit(1000));
throw;
}
}
public static string SerializeRecords(IDictionary<string, IList<DataRecord>> tableRecords,
bool prettyOutput = false)
{
var sb = new StringBuilder();
foreach (var (tableName, records) in tableRecords)
{
if (records.Count == 0)
continue;
sb.Append($"INSERT INTO `{tableName}`(");
for (var i = 0; i < records[0].Headers.Length; i++)
{
var header = records[0].Headers[i];
sb.Append($"`{header}`");
if (i != records[0].Headers.Length - 1)
sb.Append(',');
}
sb.Append(") VALUES ");
if (prettyOutput)
sb.AppendLine();
for (var i = 0; i < records.Count; i++)
{
var record = records[i];
sb.Append('(');
for (var j = 0; j < record.Fields.Length; j++)
{
var field = record.Fields[j];
#region HandleFields
// if (field == "\\N")
// sb.Append("NULL");
// else if (DumpDataHelper.CheckHexField(field))
// {
// // if (StringExtensions.CheckJsonHex(field))
// sb.Append($"0x{field}");
// }
// else
// sb.Append($"'{field}'");
sb.Append(field);
#endregion
if (j != record.Fields.Length - 1)
sb.Append(',');
}
sb.Append(')');
if (i != records.Count - 1) // not last field
sb.Append(',');
if (prettyOutput) sb.AppendLine();
}
sb.AppendLine(";");
}
return sb.ToString();
}
public void Dispose()
{
_conn.Dispose();
}
public async ValueTask DisposeAsync()
{
await _conn.DisposeAsync();
}
}

View File

@@ -0,0 +1,137 @@
using System.Text;
using ConsoleApp2.Entities;
using ConsoleApp2.Helpers;
using Microsoft.Extensions.Logging;
namespace ConsoleApp2.Services;
public class NewCsvSource
{
private readonly string _filePath;
private readonly StreamReader _reader;
private readonly ILogger? _logger;
private readonly string _tableName;
public DataRecord Current { get; protected set; }
public string[]? Headers { get; }
public string? CurrentRaw { get; private set; }
public string Delimiter { get; private set; }
public char QuoteChar { get; private set; }
public NewCsvSource(string filePath, string[]? headers = null, string delimiter = ",", char quoteChar = '"',
ILogger? logger = null)
{
_filePath = filePath;
Headers = headers;
_logger = logger;
Delimiter = delimiter;
QuoteChar = quoteChar;
var fs = File.OpenRead(filePath);
_reader = new StreamReader(fs);
_tableName = DumpDataHelper.GetTableName(filePath);
}
public async ValueTask<bool> ReadAsync()
{
var str = await _reader.ReadLineAsync();
if (string.IsNullOrWhiteSpace(str))
return false;
CurrentRaw = str;
var fields = ParseRow2(str, QuoteChar, Delimiter);
Current = new DataRecord(fields, _tableName, Headers);
return true;
}
public string[] ParseRow(string row, char quoteChar, string delimiter)
{
var span = row.AsSpan();
var result = new List<string>();
if (span.Length == 0)
throw new ArgumentException("The row is empty", nameof(row));
var isInQuote = span[0] == quoteChar;
var start = 0;
for (var i = 1; i < span.Length; i++)
{
if (span[i] == quoteChar)
{
isInQuote = !isInQuote;
}
// delimiter需要足够复杂
else if (/*!isInQuote && */span.Length > i + delimiter.Length && span[i..(i + delimiter.Length)].Equals(delimiter, StringComparison.CurrentCulture)) // field matched
{
string field;
if (span[start] == quoteChar && span[i - 1] == quoteChar) // enclosed by quoteChar
field = span[(start + 1)..(i - 1)].ToString(); // escape quoteChar
else
field = span[start..i].ToString();
start = i + delimiter.Length;
if (field == "\\N")
field = "NULL";
result.Add(field);
continue;
}
}
result.Add(span[start..].ToString());
for (var i = 0; i < result.Count; i++)
{
var field = result[i];
if (DumpDataHelper.CheckHexField(field) && StringExtensions.CheckJsonHex(field))
{
result[i] = StringExtensions.FromHex(field);
}
}
return result.ToArray();
}
public string[] ParseRow2(ReadOnlySpan<char> source, char quoteChar, string delimiter)
{
var result = new List<string>();
var index = -1;
StringBuilder current = new StringBuilder();
bool hasQuote = false;
bool hasSlash = false;
while (index < source.Length-1)
{
index++;
if (hasSlash == false && source[index] == '\\')
{
hasSlash = true;
current.Append('\\');
continue;
}
if (hasSlash ==false && source[index] == quoteChar)
{
hasQuote = !hasQuote;
current.Append(source[index]);
continue;
}
if (hasQuote==false && source[index] == delimiter[0])
{
result.Add(current.ToString());
current.Clear();
}
else
{
current.Append(source[index]);
}
hasSlash = false;
}
result.Add(current.ToString());
return result.ToArray();
}
}

View File

@@ -0,0 +1,44 @@
namespace ConsoleApp2.Services;
public class ProcessContext
{
private int _inputCount;
private int _transformCount;
private int _outputCount;
public bool IsInputCompleted { get; private set; }
public bool IsTransformCompleted { get; private set; }
public bool IsOutputCompleted { get; private set; }
public int InputCount
{
get => _inputCount;
private set => _inputCount = value;
}
public int TransformCount
{
get => _transformCount;
private set => _transformCount = value;
}
public int OutputCount
{
get => _outputCount;
private set => _outputCount = value;
}
public void CompleteInput() => IsInputCompleted = true;
public void CompleteTransform() => IsTransformCompleted = true;
public void CompleteOutput() => IsOutputCompleted = true;
public void AddInput() => Interlocked.Increment(ref _inputCount);
public void AddInput(int count) => Interlocked.Add(ref _inputCount, count);
public void AddTransform() => Interlocked.Increment(ref _transformCount);
public void AddTransform(int count) => Interlocked.Add(ref _transformCount, count);
public void AddOutput() => Interlocked.Increment(ref _outputCount);
public void AddOutput(int count) => Interlocked.Add(ref _outputCount, count);
}

View File

@@ -22,7 +22,7 @@ public class TaskManager
{
var task = Task.Factory.StartNew(func);
_tasks.Add(task);
_logger.LogInformation("New task created.");
_logger.LogDebug("New task created.");
return task;
}
}

View File

@@ -1,57 +0,0 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace ConsoleApp2.Services;
public class TaskMonitorService : BackgroundService
{
private readonly IHostApplicationLifetime _lifetime;
private readonly TaskManager _taskManager;
private readonly ILogger<TaskMonitorService> _logger;
public TaskMonitorService(IHostApplicationLifetime lifetime, TaskManager taskManager,
ILogger<TaskMonitorService> logger)
{
_lifetime = lifetime;
_taskManager = taskManager;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!_taskManager.MainTaskCompleted || _taskManager.RunningTaskCount != 0)
{
var running = 0;
var error = 0;
var completed = 0;
var canceled = 0;
foreach (var task in _taskManager.Tasks)
{
switch (task.Status)
{
case TaskStatus.Running:
running++;
break;
case TaskStatus.Canceled:
canceled++;
break;
case TaskStatus.Faulted:
error++;
break;
case TaskStatus.RanToCompletion:
completed++;
break;
default:
throw new ArgumentOutOfRangeException();
}
}
_logger.LogInformation(
"Task monitor: running: {Running}, error: {Error}, completed: {Completed}, canceled: {Canceled}",
running, error, completed, canceled);
await Task.Delay(2000);
}
_logger.LogInformation("***** All tasks completed *****");
}
}

View File

@@ -0,0 +1,6 @@
namespace ConsoleApp2.Services;
public class TsvSource
{
}