整理代码

This commit is contained in:
lindj 2024-01-15 17:26:44 +08:00
parent 0984853c79
commit 78cd833617
7 changed files with 114 additions and 139 deletions

View File

@ -54,20 +54,22 @@ public class OutputService : IOutputService
} }
if (_context.GetExceptions().Count>0) if (_context.GetExceptions().Count>0)
{ {
_logger.LogInformation("***** Csv output service is canceled *****"); _logger.LogInformation("***** Csv output thread is canceled *****");
return; return;
} }
} }
if (_context.IsTransformCompleted && records.Count > 0) if (records.Count > 0)
{ {
await FlushAsync(records); await FlushAsync(records);
records.Clear(); records.Clear();
_context.CompleteOutput(); _logger.LogInformation("***** Mysql output thread completed *****");
_logger.LogInformation("***** Mysql output service completed *****");
} }
}, _options.Value.TaskCount); }, _options.Value.TaskCount);
await _taskManager.WaitAll(); await _taskManager.WaitAll();
//_context.CompleteOutput();
_logger.LogInformation(@"***** Mysql output service completed *****");
} }
private async Task FlushAsync(IEnumerable<DataRecord> records) private async Task FlushAsync(IEnumerable<DataRecord> records)

View File

@ -65,7 +65,7 @@ public class TransformService : ITransformService
field = string.IsNullOrEmpty(field) ? "''" : _options.Value.TransformBinary?.Invoke(field) ?? field; ; field = string.IsNullOrEmpty(field) ? "''" : _options.Value.TransformBinary?.Invoke(field) ?? field; ;
break; break;
case ColumnType.Blob: case ColumnType.Blob:
field = string.IsNullOrEmpty(field) ? "NULL" : $"0x{field}"; //field = string.IsNullOrEmpty(field) ? "NULL" : $"0x{field}";
break; break;
default: default:
break; break;
@ -88,7 +88,7 @@ public class TransformService : ITransformService
_consumerQueue.Enqueue(record); _consumerQueue.Enqueue(record);
//数据增加 //数据增加
var addRecords=_options.Value.RecordAdd?.Invoke(record); var addRecords=_options.Value.RecordAdd?.Invoke(record);
if(addRecords != null) if(addRecords != null&& addRecords.Count>0)
{ {
foreach(var rc in addRecords) foreach(var rc in addRecords)
{ {

View File

@ -19,6 +19,7 @@ public class DataTransformOptions
public Action<DataRecord>? RecordModify { get; set; }//数据修改 public Action<DataRecord>? RecordModify { get; set; }//数据修改
public Func<DataRecord, DataRecord?>? RecordReplace { get; set; }//数据替换 public Func<DataRecord, DataRecord?>? RecordReplace { get; set; }//数据替换
public Func<DataRecord, IList<DataRecord>?>? RecordAdd { get; set; }//数据替换 public Func<DataRecord, IList<DataRecord>?>? RecordAdd { get; set; }//数据替换
public Action<DataRecord, IDatabase>? RecordCache { get; set; }//数据缓存
/// <summary> /// <summary>
/// 配置导入数据的特殊列 /// 配置导入数据的特殊列

View File

@ -6,6 +6,7 @@ using ConsoleApp2.HostedServices.Abstractions;
using ConsoleApp2.Options; using ConsoleApp2.Options;
using ConsoleApp2.Services; using ConsoleApp2.Services;
using ConsoleApp2.SimulationService; using ConsoleApp2.SimulationService;
using Microsoft.Extensions.Caching.StackExchangeRedis;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
@ -13,6 +14,7 @@ using Microsoft.Extensions.Logging;
using MySqlConnector; using MySqlConnector;
using Serilog; using Serilog;
using Serilog.Core; using Serilog.Core;
using StackExchange.Redis;
using System.Reflection.PortableExecutable; using System.Reflection.PortableExecutable;
@ -179,6 +181,7 @@ async Task RunProgram()
} }
} }
}; };
//数据替换 //数据替换
options.RecordReplace = (record) => options.RecordReplace = (record) =>
@ -196,6 +199,10 @@ async Task RunProgram()
var fields = fs.ToArray(); var fields = fs.ToArray();
return new DataRecord(fields, record.TableName, headers, record.CompanyID); return new DataRecord(fields, record.TableName, headers, record.CompanyID);
} }
}
if (record.TableName == "order_process_step")
{
} }
return null; return null;
}; };
@ -212,16 +219,31 @@ async Task RunProgram()
var packageIDIndex = Array.IndexOf(record.Headers, "PackageID"); var packageIDIndex = Array.IndexOf(record.Headers, "PackageID");
var companyIDIndex = Array.IndexOf(record.Headers, "CompanyID"); var companyIDIndex = Array.IndexOf(record.Headers, "CompanyID");
//resultList.Add(new DataRecord( resultList.Add(new DataRecord(
// new[] { "ItemID", "ShardKey", "PlanID","CompanyID" }, "order_block_plan_item", new[] { "ItemID", "ShardKey", "PlanID", "CompanyID" }, "order_block_plan_item",
// new[] { record.Fields[itemIDIndex], record.Fields[shardKeyIndex], record.Fields[planIDIndex], record.Fields[companyIDIndex] })); new[] { record.Fields[itemIDIndex], record.Fields[shardKeyIndex], record.Fields[planIDIndex], record.Fields[companyIDIndex] }));
//resultList.Add( resultList.Add(
// new DataRecord(new[] { "ItemID", "ShardKey", "PackageID", "CompanyID" }, "order_package_item", new DataRecord(new[] { "ItemID", "ShardKey", "PackageID", "CompanyID" }, "order_package_item",
// new[] { record.Fields[itemIDIndex], record.Fields[shardKeyIndex], record.Fields[packageIDIndex], record.Fields[companyIDIndex] })); new[] { record.Fields[itemIDIndex], record.Fields[shardKeyIndex], record.Fields[packageIDIndex], record.Fields[companyIDIndex] }));
} }
return resultList; return resultList;
}; };
options.RecordCache = async (record, db) =>
{
if(record.TableName == "order_process")
{
var skIndex = Array.IndexOf(record.Headers, "ShardKey");
if(skIndex > -1)
{
var sk = record.Fields[skIndex];
var idIndex = Array.IndexOf(record.Headers, "ID");
var id = record.Fields[idIndex];
await db.SetAddAsync(id, sk);
}
}
};
options.ColumnTypeConfig = new() options.ColumnTypeConfig = new()
{ {
{ "simple_plan_order.PlaceData", ColumnType.Blob }, { "simple_plan_order.PlaceData", ColumnType.Blob },
@ -283,10 +305,9 @@ async Task RunProgram()
host.Services.AddSingleton<IInputService, SimulationInputService>(); host.Services.AddSingleton<IInputService, SimulationInputService>();
host.Services.AddSingleton<ITransformService, TransformService>(); host.Services.AddSingleton<ITransformService, TransformService>();
host.Services.AddSingleton<IOutputService, OutputService>(); host.Services.AddSingleton<IOutputService, OutputService>();
host.Services.AddStackExchangeRedisCache(options => var redisOptions = host.Configuration.GetSection("RedisCacheOptions").Get<RedisCacheOptions>()??new RedisCacheOptions();
{ var redis = ConnectionMultiplexer.Connect(redisOptions.Configuration);
options.Configuration = "localhost:6379"; host.Services.AddSingleton(redis);
});
var app = host.Build(); var app = host.Build();
await app.RunAsync(); await app.RunAsync();
} }

View File

@ -1,4 +1,5 @@
using System.Text; using System.Reflection.PortableExecutable;
using System.Text;
using System.Text.RegularExpressions; using System.Text.RegularExpressions;
using ConsoleApp2.Helpers; using ConsoleApp2.Helpers;
using ConsoleApp2.HostedServices.Abstractions; using ConsoleApp2.HostedServices.Abstractions;
@ -15,46 +16,27 @@ public class CsvSource:IDataSource
//protected readonly StreamReader _reader; //protected readonly StreamReader _reader;
private readonly ILogger? _logger; private readonly ILogger? _logger;
protected readonly string _tableName; protected readonly string _tableName;
protected string _sqlFilePath; protected string? _sqlFilePath;
protected readonly string _sqlFileText; protected readonly string? _sqlFileText;
protected string[]? headers;
//public DataRecord Current { get; protected set; } protected string[]? csvFiles;
//public string[]? Headers { get; }
public string? CurrentRaw { get; protected set; } public string? CurrentRaw { get; protected set; }
public string Delimiter { get; private set; } public string Delimiter { get; private set; }
public char QuoteChar { get; private set; } public char QuoteChar { get; private set; }
public CsvSource(string inputDir,string tableName,string delimiter = ",", char quoteChar = '"', public CsvSource(string inputDir,string tableName,string delimiter = ",", char quoteChar = '"',
ILogger? logger = null) ILogger? logger = null)
{ {
_inputDir = inputDir; _inputDir = inputDir;
_tableName = tableName; _tableName = tableName;
//Headers = headers;
_logger = logger; _logger = logger;
Delimiter = delimiter; Delimiter = delimiter;
QuoteChar = quoteChar; QuoteChar = quoteChar;
//var fs = File.OpenRead(filePath);
//_reader = new StreamReader(fs);
//_tableName = DumpDataHelper.GetTableName(filePath);
string pattern = $"^.*\\.{tableName}\\..*\\.sql$"; string pattern = $"^.*\\.{tableName}\\..*\\.sql$";
_sqlFilePath = Directory.GetFiles(_inputDir).FirstOrDefault(s => Regex.Match(s, pattern).Success); _sqlFilePath = Directory.GetFiles(_inputDir).FirstOrDefault(s => Regex.Match(s, pattern).Success);
} }
//public virtual async ValueTask<bool> ReadAsync()
//{
// var str = await _reader.ReadLineAsync();
// if (string.IsNullOrWhiteSpace(str))
// return false;
// CurrentRaw = str;
// var fields = ParseRow2(str, QuoteChar, Delimiter);
// Current = new DataRecord(fields, _tableName, Headers);
// return true;
//}
public string[] ParseRow(string row, char quoteChar, string delimiter) public string[] ParseRow(string row, char quoteChar, string delimiter)
{ {
@ -145,23 +127,18 @@ public class CsvSource:IDataSource
result.Add(current.ToString()); result.Add(current.ToString());
return result.ToArray(); return result.ToArray();
} }
public virtual async Task<string[]> GetHeaders() public virtual async Task GetHeaderAndCsvFiles()
{ {
var text = await File.ReadAllTextAsync(_sqlFilePath); var text = await File.ReadAllTextAsync(_sqlFilePath);
return await DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text); headers = await DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text);
csvFiles = await DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text, new Regex(@"'.+\.dat.zst'"));
}
public virtual async Task<string[]> GetCsvFiles()
{
var text= await File.ReadAllTextAsync(_sqlFilePath);
return await DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text,new Regex(@"'.+\.dat'"));
} }
public virtual async Task DoEnqueue(Action<DataRecord> action) public virtual async Task DoEnqueue(Action<DataRecord> action)
{ {
var sourceFiles =await GetCsvFiles(); await GetHeaderAndCsvFiles();
foreach (var file in sourceFiles) foreach (var file in csvFiles)
{ {
var headers = await GetHeaders();
var filePath= Path.Combine(_inputDir, file); var filePath= Path.Combine(_inputDir, file);
using (var fs = File.OpenRead(filePath)) using (var fs = File.OpenRead(filePath))
{ {
@ -182,11 +159,10 @@ public class CsvSource:IDataSource
} }
public virtual async Task<DataRecord?> GetTestRecord() public virtual async Task<DataRecord?> GetTestRecord()
{ {
var sourceFiles = await GetCsvFiles(); await GetHeaderAndCsvFiles();
var file = sourceFiles.FirstOrDefault(); var file = csvFiles.FirstOrDefault();
if (file != null) if (file != null)
{ {
var headers = await GetHeaders();
var filePath = Path.Combine(_inputDir, file); var filePath = Path.Combine(_inputDir, file);
using (var fs = File.OpenRead(filePath)) using (var fs = File.OpenRead(filePath))
{ {

View File

@ -17,7 +17,7 @@ public class MySqlDestination : IDisposable, IAsyncDisposable
private readonly bool _prettyOutput; private readonly bool _prettyOutput;
private readonly int _maxAllowPacket; private readonly int _maxAllowPacket;
private readonly ProcessContext _context; private readonly ProcessContext _context;
private static StringBuilder recordSb = new StringBuilder();
public MySqlDestination(string connStr, ILogger logger, ProcessContext context,bool prettyOutput = false) public MySqlDestination(string connStr, ILogger logger, ProcessContext context,bool prettyOutput = false)
{ {
_conn = new MySqlConnection(connStr); _conn = new MySqlConnection(connStr);
@ -53,29 +53,27 @@ public class MySqlDestination : IDisposable, IAsyncDisposable
if (_recordCache.Count == 0) if (_recordCache.Count == 0)
return; return;
//var cmd = _conn.CreateCommand(); var cmd = _conn.CreateCommand();
//cmd.CommandTimeout = 3 * 60; cmd.CommandTimeout = 3 * 60;
var excuseList = GetExcuseList(_recordCache, maxAllowPacket, _prettyOutput);
try try
{ {
var excuseList = GetExcuseList(_recordCache, maxAllowPacket, _prettyOutput); foreach (var insertSql in excuseList)
//foreach (var insertSql in excuseList) {
//{ cmd.CommandText = insertSql;
// //cmd.CommandText = insertSql; await cmd.ExecuteNonQueryAsync();
// //await cmd.ExecuteNonQueryAsync(); }
// //_logger.LogInformation(@"do insert completed!size:{Length}", cmd.CommandText.Length);
//}
_recordCache.Clear(); _recordCache.Clear();
} }
catch (Exception e) catch (Exception e)
{ {
//_logger.LogCritical(e, "Error when flushing records, sql: {Sql}", cmd.CommandText.Omit(1000)); _logger.LogCritical(e, "Error when flushing records, sql: {Sql}", cmd.CommandText.Omit(1000));
_context.AddException(e); _context.AddException(e);
throw; throw;
} }
finally finally
{ {
//await cmd.DisposeAsync(); await cmd.DisposeAsync();
} }
} }
@ -83,25 +81,24 @@ public class MySqlDestination : IDisposable, IAsyncDisposable
bool prettyOutput = false) bool prettyOutput = false)
{ {
var resultList = new List<string>(); var resultList = new List<string>();
var headerSb = string.Empty; var headerSb = new StringBuilder();
//var recordSb = new StringBuilder(); var recordSb = new StringBuilder();
recordSb.Clear();
foreach (var (tableName, records) in tableRecords) foreach (var (tableName, records) in tableRecords)
{ {
if (records.Count == 0) if (records.Count == 0)
continue; continue;
headerSb=$"INSERT INTO `{tableName}`("; headerSb.Append($"INSERT INTO `{tableName}`(");
for (var i = 0; i < records[0].Headers.Length; i++) for (var i = 0; i < records[0].Headers.Length; i++)
{ {
var header = records[0].Headers[i]; var header = records[0].Headers[i];
headerSb+=$"`{header}`"; headerSb.Append($"`{header}`");
if (i != records[0].Headers.Length - 1) if (i != records[0].Headers.Length - 1)
headerSb.Append(','); headerSb.Append(',');
} }
headerSb+=") VALUES "; headerSb.Append(") VALUES ");
if (prettyOutput) if (prettyOutput)
headerSb+="/r/n"; headerSb.AppendLine();
var sbList = new List<string>(); var sbList = new List<string>();
var currentLength = headerSb.Length; var currentLength = headerSb.Length;
@ -112,6 +109,11 @@ public class MySqlDestination : IDisposable, IAsyncDisposable
for (var j = 0; j < record.Fields.Length; j++) for (var j = 0; j < record.Fields.Length; j++)
{ {
var field = record.Fields[j]; var field = record.Fields[j];
if (record.TableName == "order_block_plan_result" && j == 2)
{
recordSb.Append("0x"+field);
}
else
recordSb.Append(field); recordSb.Append(field);
if (j != record.Fields.Length - 1) if (j != record.Fields.Length - 1)
recordSb.Append(','); recordSb.Append(',');
@ -126,12 +128,12 @@ public class MySqlDestination : IDisposable, IAsyncDisposable
if (currentLength + recordSb.Length >= maxAllowPacket) if (currentLength + recordSb.Length >= maxAllowPacket)
{ {
var insertSb = headerSb; var insertSb = new StringBuilder(headerSb.ToString());
insertSb+=string.Join(",", sbList); insertSb.Append(string.Join(",", sbList));
insertSb += ";"; insertSb.Append(";");
resultList.Add(insertSb); resultList.Add(insertSb.ToString());
insertSb=String.Empty; insertSb.Clear();
sbList.Clear(); sbList.Clear();
currentLength = headerSb.Length; currentLength = headerSb.Length;
sbList.Add(recordSb.ToString()); sbList.Add(recordSb.ToString());
@ -145,15 +147,18 @@ public class MySqlDestination : IDisposable, IAsyncDisposable
} }
if (sbList.Count > 0) if (sbList.Count > 0)
{ {
var insertSb = headerSb.ToString(); var insertSb = new StringBuilder(headerSb.ToString());
insertSb += string.Join(",", sbList); insertSb.Append(string.Join(",", sbList));
insertSb += ";"; insertSb.Append(";");
resultList.Add(insertSb.ToString()); resultList.Add(insertSb.ToString());
insertSb=string.Empty; insertSb.Clear();
} }
headerSb=string.Empty; headerSb.Clear();
}
if (resultList.Count == 2)
{
var a = 1;
} }
return resultList; return resultList;
} }

View File

@ -32,89 +32,59 @@ namespace ConsoleApp2.Services
} }
} }
} }
public override async Task<string[]> GetHeaders() public override async Task GetHeaderAndCsvFiles()
{ {
var text = await DecompressFile(_sqlFilePath); var text = await DecompressFile(_sqlFilePath);
return await DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text); headers=await DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text);
csvFiles=await DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text, new Regex(@"'.+\.dat.zst'"));
} }
public override async Task<string[]> GetCsvFiles()
{
var text = await DecompressFile(_sqlFilePath);
return await DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text, new Regex(@"'.+\.dat.zst'"));
}
public override async Task DoEnqueue(Action<DataRecord> action) public override async Task DoEnqueue(Action<DataRecord> action)
{ {
var sourceFiles = await GetCsvFiles(); await GetHeaderAndCsvFiles();
var headers = await GetHeaders(); foreach (var file in csvFiles)
foreach (var file in sourceFiles)
{ {
var filePath = Path.Combine(_inputDir, file); var filePath = Path.Combine(_inputDir, file);
using (var input = File.OpenRead(filePath)) using (var input = File.OpenRead(filePath))
{ {
using (var decopress = new DecompressionStream(input)) using (var decopress = new DecompressionStream(input))
{ {
using( var reader = new StreamReader(decopress))
var ms = new MemoryStream();
decopress.CopyTo(ms);
ms.Seek(0, SeekOrigin.Begin);
StreamReader reader = new StreamReader(ms);
while (!reader.EndOfStream)
{ {
var line = await reader.ReadLineAsync(); while (!reader.EndOfStream)
var fields = ParseRow2(line, QuoteChar, Delimiter); {
var record = new DataRecord(fields, _tableName, headers); var line = await reader.ReadLineAsync();
action?.Invoke(record); var fields = ParseRow2(line, QuoteChar, Delimiter);
var record = new DataRecord(fields, _tableName, headers);
action?.Invoke(record);
}
} }
} }
} }
//var headers = await GetHeaders();
//using (StreamReader sr = new StreamReader(file))
//{
// while (!sr.EndOfStream)
// {
// var line = await sr.ReadLineAsync();
// var fields = ParseRow2(line, QuoteChar, Delimiter);
// var record = new DataRecord(fields, _tableName, headers);
// action?.Invoke(record);
// }
//}
} }
} }
public override async Task<DataRecord?> GetTestRecord() public override async Task<DataRecord?> GetTestRecord()
{ {
var sourceFiles = await GetCsvFiles(); await GetHeaderAndCsvFiles();
var file = sourceFiles.FirstOrDefault(); var file = csvFiles.FirstOrDefault();
if (file != null) if (file != null)
{ {
var headers = await GetHeaders();
var filePath = Path.Combine(_inputDir, file); var filePath = Path.Combine(_inputDir, file);
using (var input = File.OpenRead(filePath)) using (var input = File.OpenRead(filePath))
{ {
using (var decopress = new DecompressionStream(input)) using (var decopress = new DecompressionStream(input))
{ {
using (var reader = new StreamReader(decopress))
var ms = new MemoryStream(); {
decopress.CopyTo(ms); var line = await reader.ReadLineAsync();
ms.Seek(0, SeekOrigin.Begin); var fields = ParseRow2(line, QuoteChar, Delimiter);
StreamReader reader = new StreamReader(ms); var record = new DataRecord(fields, _tableName, headers);
var line = await reader.ReadLineAsync(); return record;
var fields = ParseRow2(line, QuoteChar, Delimiter); }
var record = new DataRecord(fields, _tableName, headers);
return record;
} }
} }
//using (var fs = File.OpenRead(filePath))
//{
// using (StreamReader sr = new StreamReader(fs))
// {
// var line = await sr.ReadLineAsync();
// var fields = ParseRow2(line, QuoteChar, Delimiter);
// var record = new DataRecord(fields, _tableName, headers);
// return record;
// }
//}
} }
return null; return null;
} }