diff --git a/ConsoleApp2/HostedServices/OutputService.cs b/ConsoleApp2/HostedServices/OutputService.cs index f838125..4201604 100644 --- a/ConsoleApp2/HostedServices/OutputService.cs +++ b/ConsoleApp2/HostedServices/OutputService.cs @@ -54,20 +54,22 @@ public class OutputService : IOutputService } if (_context.GetExceptions().Count>0) { - _logger.LogInformation("***** Csv output service is canceled *****"); + _logger.LogInformation("***** Csv output thread is canceled *****"); return; } } - if (_context.IsTransformCompleted && records.Count > 0) + if (records.Count > 0) { await FlushAsync(records); records.Clear(); - _context.CompleteOutput(); - _logger.LogInformation("***** Mysql output service completed *****"); + _logger.LogInformation("***** Mysql output thread completed *****"); } }, _options.Value.TaskCount); await _taskManager.WaitAll(); + //_context.CompleteOutput(); + _logger.LogInformation(@"***** Mysql output service completed *****"); + } private async Task FlushAsync(IEnumerable records) diff --git a/ConsoleApp2/HostedServices/TransformService.cs b/ConsoleApp2/HostedServices/TransformService.cs index 97ec4f5..25450f5 100644 --- a/ConsoleApp2/HostedServices/TransformService.cs +++ b/ConsoleApp2/HostedServices/TransformService.cs @@ -65,7 +65,7 @@ public class TransformService : ITransformService field = string.IsNullOrEmpty(field) ? "''" : _options.Value.TransformBinary?.Invoke(field) ?? field; ; break; case ColumnType.Blob: - field = string.IsNullOrEmpty(field) ? "NULL" : $"0x{field}"; + //field = string.IsNullOrEmpty(field) ? "NULL" : $"0x{field}"; break; default: break; @@ -88,7 +88,7 @@ public class TransformService : ITransformService _consumerQueue.Enqueue(record); //数据增加 var addRecords=_options.Value.RecordAdd?.Invoke(record); - if(addRecords != null) + if(addRecords != null&& addRecords.Count>0) { foreach(var rc in addRecords) { diff --git a/ConsoleApp2/Options/DataTransformOptions.cs b/ConsoleApp2/Options/DataTransformOptions.cs index 3435ffd..0afb2cb 100644 --- a/ConsoleApp2/Options/DataTransformOptions.cs +++ b/ConsoleApp2/Options/DataTransformOptions.cs @@ -19,6 +19,7 @@ public class DataTransformOptions public Action? RecordModify { get; set; }//数据修改 public Func? RecordReplace { get; set; }//数据替换 public Func?>? RecordAdd { get; set; }//数据替换 + public Action? RecordCache { get; set; }//数据缓存 /// /// 配置导入数据的特殊列 diff --git a/ConsoleApp2/Program.cs b/ConsoleApp2/Program.cs index 6c0ed05..d4fb497 100644 --- a/ConsoleApp2/Program.cs +++ b/ConsoleApp2/Program.cs @@ -6,6 +6,7 @@ using ConsoleApp2.HostedServices.Abstractions; using ConsoleApp2.Options; using ConsoleApp2.Services; using ConsoleApp2.SimulationService; +using Microsoft.Extensions.Caching.StackExchangeRedis; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; @@ -13,6 +14,7 @@ using Microsoft.Extensions.Logging; using MySqlConnector; using Serilog; using Serilog.Core; +using StackExchange.Redis; using System.Reflection.PortableExecutable; @@ -179,6 +181,7 @@ async Task RunProgram() } } + }; //数据替换 options.RecordReplace = (record) => @@ -196,6 +199,10 @@ async Task RunProgram() var fields = fs.ToArray(); return new DataRecord(fields, record.TableName, headers, record.CompanyID); } + } + if (record.TableName == "order_process_step") + { + } return null; }; @@ -212,16 +219,31 @@ async Task RunProgram() var packageIDIndex = Array.IndexOf(record.Headers, "PackageID"); var companyIDIndex = Array.IndexOf(record.Headers, "CompanyID"); - //resultList.Add(new DataRecord( - // new[] { "ItemID", "ShardKey", "PlanID","CompanyID" }, "order_block_plan_item", - // new[] { record.Fields[itemIDIndex], record.Fields[shardKeyIndex], record.Fields[planIDIndex], record.Fields[companyIDIndex] })); - //resultList.Add( - // new DataRecord(new[] { "ItemID", "ShardKey", "PackageID", "CompanyID" }, "order_package_item", - // new[] { record.Fields[itemIDIndex], record.Fields[shardKeyIndex], record.Fields[packageIDIndex], record.Fields[companyIDIndex] })); + resultList.Add(new DataRecord( + new[] { "ItemID", "ShardKey", "PlanID", "CompanyID" }, "order_block_plan_item", + new[] { record.Fields[itemIDIndex], record.Fields[shardKeyIndex], record.Fields[planIDIndex], record.Fields[companyIDIndex] })); + resultList.Add( + new DataRecord(new[] { "ItemID", "ShardKey", "PackageID", "CompanyID" }, "order_package_item", + new[] { record.Fields[itemIDIndex], record.Fields[shardKeyIndex], record.Fields[packageIDIndex], record.Fields[companyIDIndex] })); } return resultList; }; + options.RecordCache = async (record, db) => + { + if(record.TableName == "order_process") + { + var skIndex = Array.IndexOf(record.Headers, "ShardKey"); + if(skIndex > -1) + { + var sk = record.Fields[skIndex]; + var idIndex = Array.IndexOf(record.Headers, "ID"); + var id = record.Fields[idIndex]; + await db.SetAddAsync(id, sk); + } + } + + }; options.ColumnTypeConfig = new() { { "simple_plan_order.PlaceData", ColumnType.Blob }, @@ -283,10 +305,9 @@ async Task RunProgram() host.Services.AddSingleton(); host.Services.AddSingleton(); host.Services.AddSingleton(); - host.Services.AddStackExchangeRedisCache(options => - { - options.Configuration = "localhost:6379"; - }); + var redisOptions = host.Configuration.GetSection("RedisCacheOptions").Get()??new RedisCacheOptions(); + var redis = ConnectionMultiplexer.Connect(redisOptions.Configuration); + host.Services.AddSingleton(redis); var app = host.Build(); await app.RunAsync(); } \ No newline at end of file diff --git a/ConsoleApp2/Services/CsvSource.cs b/ConsoleApp2/Services/CsvSource.cs index 364ac83..929fa66 100644 --- a/ConsoleApp2/Services/CsvSource.cs +++ b/ConsoleApp2/Services/CsvSource.cs @@ -1,4 +1,5 @@ -using System.Text; +using System.Reflection.PortableExecutable; +using System.Text; using System.Text.RegularExpressions; using ConsoleApp2.Helpers; using ConsoleApp2.HostedServices.Abstractions; @@ -15,46 +16,27 @@ public class CsvSource:IDataSource //protected readonly StreamReader _reader; private readonly ILogger? _logger; protected readonly string _tableName; - protected string _sqlFilePath; - protected readonly string _sqlFileText; - - //public DataRecord Current { get; protected set; } - //public string[]? Headers { get; } + protected string? _sqlFilePath; + protected readonly string? _sqlFileText; + protected string[]? headers; + protected string[]? csvFiles; public string? CurrentRaw { get; protected set; } public string Delimiter { get; private set; } public char QuoteChar { get; private set; } - public CsvSource(string inputDir,string tableName,string delimiter = ",", char quoteChar = '"', ILogger? logger = null) { _inputDir = inputDir; _tableName = tableName; - //Headers = headers; _logger = logger; Delimiter = delimiter; QuoteChar = quoteChar; - - //var fs = File.OpenRead(filePath); - //_reader = new StreamReader(fs); - //_tableName = DumpDataHelper.GetTableName(filePath); string pattern = $"^.*\\.{tableName}\\..*\\.sql$"; _sqlFilePath = Directory.GetFiles(_inputDir).FirstOrDefault(s => Regex.Match(s, pattern).Success); } - //public virtual async ValueTask ReadAsync() - //{ - // var str = await _reader.ReadLineAsync(); - // if (string.IsNullOrWhiteSpace(str)) - // return false; - - // CurrentRaw = str; - - // var fields = ParseRow2(str, QuoteChar, Delimiter); - // Current = new DataRecord(fields, _tableName, Headers); - // return true; - //} public string[] ParseRow(string row, char quoteChar, string delimiter) { @@ -145,23 +127,18 @@ public class CsvSource:IDataSource result.Add(current.ToString()); return result.ToArray(); } - public virtual async Task GetHeaders() + public virtual async Task GetHeaderAndCsvFiles() { var text = await File.ReadAllTextAsync(_sqlFilePath); - return await DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text); - - } - public virtual async Task GetCsvFiles() - { - var text= await File.ReadAllTextAsync(_sqlFilePath); - return await DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text,new Regex(@"'.+\.dat'")); + headers = await DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text); + csvFiles = await DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text, new Regex(@"'.+\.dat.zst'")); + } public virtual async Task DoEnqueue(Action action) { - var sourceFiles =await GetCsvFiles(); - foreach (var file in sourceFiles) + await GetHeaderAndCsvFiles(); + foreach (var file in csvFiles) { - var headers = await GetHeaders(); var filePath= Path.Combine(_inputDir, file); using (var fs = File.OpenRead(filePath)) { @@ -182,11 +159,10 @@ public class CsvSource:IDataSource } public virtual async Task GetTestRecord() { - var sourceFiles = await GetCsvFiles(); - var file = sourceFiles.FirstOrDefault(); + await GetHeaderAndCsvFiles(); + var file = csvFiles.FirstOrDefault(); if (file != null) { - var headers = await GetHeaders(); var filePath = Path.Combine(_inputDir, file); using (var fs = File.OpenRead(filePath)) { diff --git a/ConsoleApp2/Services/MySqlDestination.cs b/ConsoleApp2/Services/MySqlDestination.cs index b6ed583..44952fa 100644 --- a/ConsoleApp2/Services/MySqlDestination.cs +++ b/ConsoleApp2/Services/MySqlDestination.cs @@ -17,7 +17,7 @@ public class MySqlDestination : IDisposable, IAsyncDisposable private readonly bool _prettyOutput; private readonly int _maxAllowPacket; private readonly ProcessContext _context; - private static StringBuilder recordSb = new StringBuilder(); + public MySqlDestination(string connStr, ILogger logger, ProcessContext context,bool prettyOutput = false) { _conn = new MySqlConnection(connStr); @@ -53,29 +53,27 @@ public class MySqlDestination : IDisposable, IAsyncDisposable if (_recordCache.Count == 0) return; - //var cmd = _conn.CreateCommand(); - //cmd.CommandTimeout = 3 * 60; - + var cmd = _conn.CreateCommand(); + cmd.CommandTimeout = 3 * 60; + var excuseList = GetExcuseList(_recordCache, maxAllowPacket, _prettyOutput); try { - var excuseList = GetExcuseList(_recordCache, maxAllowPacket, _prettyOutput); - //foreach (var insertSql in excuseList) - //{ - // //cmd.CommandText = insertSql; - // //await cmd.ExecuteNonQueryAsync(); - // //_logger.LogInformation(@"do insert completed!size:{Length}", cmd.CommandText.Length); - //} + foreach (var insertSql in excuseList) + { + cmd.CommandText = insertSql; + await cmd.ExecuteNonQueryAsync(); + } _recordCache.Clear(); } catch (Exception e) { - //_logger.LogCritical(e, "Error when flushing records, sql: {Sql}", cmd.CommandText.Omit(1000)); + _logger.LogCritical(e, "Error when flushing records, sql: {Sql}", cmd.CommandText.Omit(1000)); _context.AddException(e); throw; } finally { - //await cmd.DisposeAsync(); + await cmd.DisposeAsync(); } } @@ -83,25 +81,24 @@ public class MySqlDestination : IDisposable, IAsyncDisposable bool prettyOutput = false) { var resultList = new List(); - var headerSb = string.Empty; - //var recordSb = new StringBuilder(); - recordSb.Clear(); + var headerSb = new StringBuilder(); + var recordSb = new StringBuilder(); foreach (var (tableName, records) in tableRecords) { if (records.Count == 0) continue; - headerSb=$"INSERT INTO `{tableName}`("; + headerSb.Append($"INSERT INTO `{tableName}`("); for (var i = 0; i < records[0].Headers.Length; i++) { var header = records[0].Headers[i]; - headerSb+=$"`{header}`"; + headerSb.Append($"`{header}`"); if (i != records[0].Headers.Length - 1) headerSb.Append(','); } - headerSb+=") VALUES "; + headerSb.Append(") VALUES "); if (prettyOutput) - headerSb+="/r/n"; + headerSb.AppendLine(); var sbList = new List(); var currentLength = headerSb.Length; @@ -112,6 +109,11 @@ public class MySqlDestination : IDisposable, IAsyncDisposable for (var j = 0; j < record.Fields.Length; j++) { var field = record.Fields[j]; + if (record.TableName == "order_block_plan_result" && j == 2) + { + recordSb.Append("0x"+field); + } + else recordSb.Append(field); if (j != record.Fields.Length - 1) recordSb.Append(','); @@ -126,12 +128,12 @@ public class MySqlDestination : IDisposable, IAsyncDisposable if (currentLength + recordSb.Length >= maxAllowPacket) { - var insertSb = headerSb; + var insertSb = new StringBuilder(headerSb.ToString()); - insertSb+=string.Join(",", sbList); - insertSb += ";"; - resultList.Add(insertSb); - insertSb=String.Empty; + insertSb.Append(string.Join(",", sbList)); + insertSb.Append(";"); + resultList.Add(insertSb.ToString()); + insertSb.Clear(); sbList.Clear(); currentLength = headerSb.Length; sbList.Add(recordSb.ToString()); @@ -145,15 +147,18 @@ public class MySqlDestination : IDisposable, IAsyncDisposable } if (sbList.Count > 0) { - var insertSb = headerSb.ToString(); - insertSb += string.Join(",", sbList); - insertSb += ";"; + var insertSb = new StringBuilder(headerSb.ToString()); + insertSb.Append(string.Join(",", sbList)); + insertSb.Append(";"); resultList.Add(insertSb.ToString()); - insertSb=string.Empty; + insertSb.Clear(); } - headerSb=string.Empty; + headerSb.Clear(); + } + if (resultList.Count == 2) + { + var a = 1; } - return resultList; } diff --git a/ConsoleApp2/Services/ZstSource.cs b/ConsoleApp2/Services/ZstSource.cs index 62dee28..e658c81 100644 --- a/ConsoleApp2/Services/ZstSource.cs +++ b/ConsoleApp2/Services/ZstSource.cs @@ -32,89 +32,59 @@ namespace ConsoleApp2.Services } } } - public override async Task GetHeaders() + public override async Task GetHeaderAndCsvFiles() { var text = await DecompressFile(_sqlFilePath); - return await DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text); + headers=await DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text); + csvFiles=await DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text, new Regex(@"'.+\.dat.zst'")); } - public override async Task GetCsvFiles() - { - var text = await DecompressFile(_sqlFilePath); - return await DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text, new Regex(@"'.+\.dat.zst'")); - } public override async Task DoEnqueue(Action action) { - var sourceFiles = await GetCsvFiles(); - var headers = await GetHeaders(); - foreach (var file in sourceFiles) + await GetHeaderAndCsvFiles(); + foreach (var file in csvFiles) { var filePath = Path.Combine(_inputDir, file); using (var input = File.OpenRead(filePath)) { - using (var decopress = new DecompressionStream(input)) + using (var decopress = new DecompressionStream(input)) { - - var ms = new MemoryStream(); - decopress.CopyTo(ms); - ms.Seek(0, SeekOrigin.Begin); - StreamReader reader = new StreamReader(ms); - while (!reader.EndOfStream) + using( var reader = new StreamReader(decopress)) { - var line = await reader.ReadLineAsync(); - var fields = ParseRow2(line, QuoteChar, Delimiter); - var record = new DataRecord(fields, _tableName, headers); - action?.Invoke(record); + while (!reader.EndOfStream) + { + var line = await reader.ReadLineAsync(); + var fields = ParseRow2(line, QuoteChar, Delimiter); + var record = new DataRecord(fields, _tableName, headers); + action?.Invoke(record); + } } } } - //var headers = await GetHeaders(); - //using (StreamReader sr = new StreamReader(file)) - //{ - // while (!sr.EndOfStream) - // { - // var line = await sr.ReadLineAsync(); - // var fields = ParseRow2(line, QuoteChar, Delimiter); - // var record = new DataRecord(fields, _tableName, headers); - // action?.Invoke(record); - // } - //} + } } public override async Task GetTestRecord() { - var sourceFiles = await GetCsvFiles(); - var file = sourceFiles.FirstOrDefault(); + await GetHeaderAndCsvFiles(); + var file = csvFiles.FirstOrDefault(); if (file != null) { - var headers = await GetHeaders(); var filePath = Path.Combine(_inputDir, file); using (var input = File.OpenRead(filePath)) { using (var decopress = new DecompressionStream(input)) { - - var ms = new MemoryStream(); - decopress.CopyTo(ms); - ms.Seek(0, SeekOrigin.Begin); - StreamReader reader = new StreamReader(ms); - var line = await reader.ReadLineAsync(); - var fields = ParseRow2(line, QuoteChar, Delimiter); - var record = new DataRecord(fields, _tableName, headers); - return record; + using (var reader = new StreamReader(decopress)) + { + var line = await reader.ReadLineAsync(); + var fields = ParseRow2(line, QuoteChar, Delimiter); + var record = new DataRecord(fields, _tableName, headers); + return record; + } } } - //using (var fs = File.OpenRead(filePath)) - //{ - // using (StreamReader sr = new StreamReader(fs)) - // { - // var line = await sr.ReadLineAsync(); - // var fields = ParseRow2(line, QuoteChar, Delimiter); - // var record = new DataRecord(fields, _tableName, headers); - // return record; - // } - //} } return null; }