diff --git a/ConsoleApp2/ConsoleApp2.csproj b/ConsoleApp2/ConsoleApp2.csproj index 8d421c2..be7c309 100644 --- a/ConsoleApp2/ConsoleApp2.csproj +++ b/ConsoleApp2/ConsoleApp2.csproj @@ -9,12 +9,24 @@ + + + + + + PreserveNewest + + + + + + diff --git a/ConsoleApp2/DataRecord.cs b/ConsoleApp2/DataRecord.cs index 630eefb..f22576e 100644 --- a/ConsoleApp2/DataRecord.cs +++ b/ConsoleApp2/DataRecord.cs @@ -1,4 +1,6 @@ -namespace ConsoleApp2; +using System.ComponentModel.Design; + +namespace ConsoleApp2; public class DataRecord @@ -33,9 +35,10 @@ public class DataRecord public string TableName { get; } public string? Database { get; set; } + public int CompanyID { get; set; } - public DataRecord(string[] fields, string tableName, string[] headers) + public DataRecord(string[] fields, string tableName, string[] headers, int companyID=0) { if (fields.Length != headers.Length) throw new ArgumentException( @@ -45,6 +48,7 @@ public class DataRecord Fields = fields; TableName = tableName; Headers = headers; + CompanyID = companyID; } public string this[int index] diff --git a/ConsoleApp2/Helpers/DumpDataHelper.cs b/ConsoleApp2/Helpers/DumpDataHelper.cs index 5b38a5c..e3767d4 100644 --- a/ConsoleApp2/Helpers/DumpDataHelper.cs +++ b/ConsoleApp2/Helpers/DumpDataHelper.cs @@ -1,4 +1,5 @@ -using System.Text; +using ConsoleApp2.Options; +using System.Text; using System.Text.RegularExpressions; namespace ConsoleApp2.Helpers; @@ -11,9 +12,9 @@ public static partial class DumpDataHelper private static partial Regex MatchBrackets(); - public static async Task GetCsvHeadersFromSqlFileAsync(string filePath) + public static async Task GetCsvHeadersFromSqlFileAsync(string txt) { - var txt = await File.ReadAllTextAsync(filePath); + //var txt = await File.ReadAllTextAsync(filePath); var match = MatchBrackets().Match(txt); return ParseHeader(match.ValueSpan); @@ -59,10 +60,10 @@ public static partial class DumpDataHelper return filePath[(firstDotIdx+1)..secondDotIdx].ToString(); } - public static async Task GetCsvFileNamesFromSqlFileAsync(string filePath) + public static async Task GetCsvFileNamesFromSqlFileAsync(string txt,Regex regex) { - var txt = await File.ReadAllTextAsync(filePath); - var matches = MatchDatFile().Matches(txt); + //var txt = await File.ReadAllTextAsync(filePath); + var matches = regex.Matches(txt); return matches.Select(match => match.ValueSpan[1..^1].ToString()).ToArray(); } diff --git a/ConsoleApp2/Helpers/ValidateConsole.cs b/ConsoleApp2/Helpers/ValidateConsole.cs new file mode 100644 index 0000000..89c3662 --- /dev/null +++ b/ConsoleApp2/Helpers/ValidateConsole.cs @@ -0,0 +1,31 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace ConsoleApp2.Helpers +{ + public static class ValidateConsole + { + public static void ValidateInput(Func converter,string message) + { + Console.Write(message); + string ? input = Console.ReadLine(); + while (true) + { + if (!string.IsNullOrEmpty(input)) + { + var result = converter(input); + if (result == false) + { + Console.WriteLine($"输入的内容不合法,请重新输入!"); + input = Console.ReadLine(); + } + else break; + } + break; + } + } + } +} diff --git a/ConsoleApp2/HostedServices/Abstractions/IDataSource.cs b/ConsoleApp2/HostedServices/Abstractions/IDataSource.cs new file mode 100644 index 0000000..c5b3619 --- /dev/null +++ b/ConsoleApp2/HostedServices/Abstractions/IDataSource.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace ConsoleApp2.HostedServices.Abstractions +{ + public interface IDataSource:IDisposable + { + public Task DoEnqueue(Action action); + } +} diff --git a/ConsoleApp2/HostedServices/InputService.cs b/ConsoleApp2/HostedServices/InputService.cs index c6385a0..43cbdae 100644 --- a/ConsoleApp2/HostedServices/InputService.cs +++ b/ConsoleApp2/HostedServices/InputService.cs @@ -15,54 +15,52 @@ namespace ConsoleApp2.HostedServices; public class InputService : IInputService { private readonly ILogger _logger; - private readonly IOptions _csvOptions; + private readonly IOptions _dataInputOptions; + private readonly IOptions _tableOptions; private readonly DataRecordQueue _producerQueue; private readonly ProcessContext _context; public InputService(ILogger logger, - IOptions csvOptions, - [FromKeyedServices(ProcessStep.Producer)]DataRecordQueue producerQueue, - ProcessContext context) + IOptions dataInputOptions, + IOptions tableOptions, + [FromKeyedServices(ProcessStep.Producer)] DataRecordQueue producerQueue, + ProcessContext context) { _logger = logger; - _csvOptions = csvOptions; + _dataInputOptions = dataInputOptions; + _tableOptions = tableOptions; _producerQueue = producerQueue; _context = context; } public async Task ExecuteAsync(CancellationToken cancellationToken) { - var inputDir = _csvOptions.Value.InputDir; + var inputDir = _dataInputOptions.Value.InputDir; _logger.LogInformation("***** Csv input service start, working dir: {InputDir}, thread id: {ThreadId} *****", inputDir, Environment.CurrentManagedThreadId); - var files = Directory.GetFiles(inputDir).Where(s => s.EndsWith(".sql") && !s.Contains("schema")).ToArray(); + var files = Directory.GetFiles(inputDir); if (files.Length == 0) { - _logger.LogInformation("No sql files found in {InputDir}", inputDir); + _logger.LogInformation("No source files found in {InputDir}", inputDir); return; } - - foreach (var sqlPath in files) + var count = 0; + foreach (var tableName in _tableOptions.Value.TableInfoConfig.Keys) { - _logger.LogInformation("Working sql file: {SqlPath}", sqlPath); - var headers = await DumpDataHelper.GetCsvHeadersFromSqlFileAsync(sqlPath); - var csvFiles = await DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(sqlPath); - - foreach (var csvFile in csvFiles) + _logger.LogInformation("Working table: {tableName}", tableName); + var source = _dataInputOptions.Value.CreateSource?.Invoke(tableName); + await source.DoEnqueue((record) => { - var csvPath = Path.Combine(inputDir, csvFile); - // var source = new JsvSource(csvPath, headers, _logger); - var source = new CsvSource(csvPath, headers, _csvOptions.Value.Delimiter, _csvOptions.Value.QuoteChar, _logger); + _context.AddInput(); + _producerQueue.Enqueue(record); + count++; - while (await source.ReadAsync()) - { - _context.AddInput(); - _producerQueue.Enqueue(source.Current); - if (cancellationToken.IsCancellationRequested) - return; - } + }); + if (_context.GetExceptions().Count > 0) + { + _logger.LogInformation("***** Csv input service is canceled *****"); + return; } - - _logger.LogInformation("File '{File}' input completed", Path.GetFileName(sqlPath)); + _logger.LogInformation("table:'{tableName}' input completed", tableName); } _context.CompleteInput(); diff --git a/ConsoleApp2/HostedServices/MainHostedService.cs b/ConsoleApp2/HostedServices/MainHostedService.cs index 1e2d2db..82ccbf1 100644 --- a/ConsoleApp2/HostedServices/MainHostedService.cs +++ b/ConsoleApp2/HostedServices/MainHostedService.cs @@ -1,30 +1,73 @@ using ConsoleApp2.HostedServices.Abstractions; +using ConsoleApp2.Services; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using System.Threading.Tasks; namespace ConsoleApp2.HostedServices; public class MainHostedService : BackgroundService { + private readonly ILogger _logger; private readonly IInputService _input; private readonly ITransformService _transform; private readonly IOutputService _output; + private readonly ProcessContext _context; - public MainHostedService(IInputService input, ITransformService transform, IOutputService output) + public MainHostedService(ILogger logger, IInputService input, ITransformService transform, IOutputService output, ProcessContext context) { + _logger = logger; _input = input; _transform = transform; _output = output; + _context = context; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - var tasks = new List() + + + var inputTask = Task.Factory.StartNew(async () => + { + try + { + await _input.ExecuteAsync(stoppingToken); + } + catch (Exception ex) + { + _context.AddException(ex); + _logger.LogError("Exception occurred on inputService:{Message},{StackTrace}", ex.Message, ex.StackTrace); + } + + }); + var transformTask = Task.Factory.StartNew(async () => { - Task.Run(async () => await _input.ExecuteAsync(stoppingToken), stoppingToken), - Task.Run(async () => await _transform.ExecuteAsync(stoppingToken), stoppingToken), - Task.Run(async () => await _output.ExecuteAsync(stoppingToken), stoppingToken), - }; - await Task.WhenAll(tasks); + try + { + await _transform.ExecuteAsync(stoppingToken); + } + catch (Exception ex) + { + _context.AddException(ex); + _logger.LogError("Exception occurred on transformService:{Message},{StackTrace}", ex.Message, ex.StackTrace); + } + + }); + + var outputTask = Task.Factory.StartNew(async () => + { + try + { + await _output.ExecuteAsync(stoppingToken); + } + catch (Exception ex) + { + _context.AddException(ex); + _logger.LogError("Exception occurred on outputService:{Message},{StackTrace}", ex.Message, ex.StackTrace); + } + + }); + // await Task.Run(async () => await _output.ExecuteAsync(stoppingToken), stoppingToken); } } \ No newline at end of file diff --git a/ConsoleApp2/HostedServices/OutputService.cs b/ConsoleApp2/HostedServices/OutputService.cs index c1647c0..f838125 100644 --- a/ConsoleApp2/HostedServices/OutputService.cs +++ b/ConsoleApp2/HostedServices/OutputService.cs @@ -5,6 +5,8 @@ using ConsoleApp2.Services; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; +using MySqlConnector; +using System.Threading; namespace ConsoleApp2.HostedServices; @@ -32,36 +34,40 @@ public class OutputService : IOutputService _taskManager = taskManager; } - public async Task ExecuteAsync(CancellationToken stoppingToken) + public async Task ExecuteAsync(CancellationToken cancellationToken) { _logger.LogInformation("***** Mysql output service started *****"); - - var records = new List(); - while (!_context.IsTransformCompleted || _consumerQueue.Count > 0) + var count = 0; + _taskManager.CreateTasks(async () => { - if (!_consumerQueue.TryDequeue(out var record)) continue; - records.Add(record); - - if (records.Count >= _options.Value.FlushCount) + var records = new List(); + while (!_context.IsTransformCompleted || _consumerQueue.Count > 0) { - var recordsCopy = records; - _taskManager.CreateTask(async () => await FlushAsync(recordsCopy), stoppingToken); - records = []; + if (!_consumerQueue.TryDequeue(out var record)) continue; + records.Add(record); + count++; + //_logger.LogInformation(@"*****OutputCount: {count} *****",count); + if (records.Count >= _options.Value.FlushCount) + { + await FlushAsync(records); + records.Clear(); + } + if (_context.GetExceptions().Count>0) + { + _logger.LogInformation("***** Csv output service is canceled *****"); + return; + } } - - if (_taskManager.TaskCount >= _options.Value.MaxTask) + if (_context.IsTransformCompleted && records.Count > 0) { - await _taskManager.WaitAll(); - _taskManager.ClearTask(); + await FlushAsync(records); + records.Clear(); + _context.CompleteOutput(); + _logger.LogInformation("***** Mysql output service completed *****"); } - } + }, _options.Value.TaskCount); await _taskManager.WaitAll(); - await FlushAsync(records); - - _context.CompleteOutput(); - - _logger.LogInformation("***** Mysql output service completed *****"); } private async Task FlushAsync(IEnumerable records) @@ -69,15 +75,29 @@ public class OutputService : IOutputService var count = 0; await using var output = new MySqlDestination( _options.Value.ConnectionString ?? throw new InvalidOperationException("Connection string is required"), - _logger, true); - + _logger, _context,true); + //if (records == null || records.Count() == 0) return; + //var dbName = $"cferp_test_1"; + //if (records != null && records.Count() > 0) + //{ + // dbName = $"cferp_test_{records.FirstOrDefault()?.CompanyID}"; + //} + + //await using var output = new MySqlDestination(new MySqlConnectionStringBuilder + //{ + // Server = "127.0.0.1", + // Port = 34309, + // Database = dbName, + // UserID = "root", + // Password = "123456", + // MaximumPoolSize = 50, + //}.ConnectionString, _logger,true); foreach (var record in records) { await output.WriteRecordAsync(record); count++; } - - await output.FlushAsync(); + await output.FlushAsync(_options.Value.MaxAllowedPacket); _context.AddOutput(count); } } \ No newline at end of file diff --git a/ConsoleApp2/HostedServices/TaskMonitorService.cs b/ConsoleApp2/HostedServices/TaskMonitorService.cs index eb65ee8..5025202 100644 --- a/ConsoleApp2/HostedServices/TaskMonitorService.cs +++ b/ConsoleApp2/HostedServices/TaskMonitorService.cs @@ -49,6 +49,7 @@ public class TaskMonitorService : BackgroundService bool endCheck = false; while (true) { + if (_context.GetExceptions().Count>0) return; EndCheck: // var running = 0; // var error = 0; diff --git a/ConsoleApp2/HostedServices/TestInputService.cs b/ConsoleApp2/HostedServices/TestInputService.cs new file mode 100644 index 0000000..93d8b93 --- /dev/null +++ b/ConsoleApp2/HostedServices/TestInputService.cs @@ -0,0 +1,83 @@ +using ConsoleApp2.HostedServices.Abstractions; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using ConsoleApp2.Const; +using ConsoleApp2.Options; +using ConsoleApp2.Services; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System.Reflection.PortableExecutable; +using System.Collections.Concurrent; +using ConsoleApp2.SimulationService; + +namespace ConsoleApp2.HostedServices +{ + public class TestInputService : IInputService + { + private readonly ILogger _logger; + private readonly IOptions _csvOptions; + private readonly DataRecordQueue _producerQueue; + private readonly ProcessContext _context; + public TestInputService(ILogger logger, + IOptions csvOptions, + [FromKeyedServices(ProcessStep.Producer)] DataRecordQueue producerQueue, + ProcessContext context) + { + _logger = logger; + _csvOptions = csvOptions; + _producerQueue = producerQueue; + _context = context; + } + public async Task ExecuteAsync(CancellationToken cancellationToken) + { + var tableName = "order_item"; + var headers = new string[] { "ID","OrderNo","ItemNo","ItemType","RoomID","BoxID","DataID","PlanID","PackageID","Num","CompanyID","ShardKey" }; + var dataCount = 1200000000L; + var tempCount = 80000; + var tempRecords=new List(); + var comanyID = 1; + short[] shareKeys = { 23040, 23070, 23100, 24000, 24040, 24070, 24100, 25000, 25040, 25070, 25100 }; + int[] companyIds = { 1, 2, 3, 4 }; + var sk = shareKeys.First(); + var companyID = companyIds.First(); + + var shareKeyInterval = 20000; + var getShareKeyTimes = 0; + var getCompanyIDTimes = 0; + var shareKeyIntervalCount = 0; + for (long i = 1; i <= dataCount; i++) + { + shareKeyIntervalCount++; + if (shareKeyIntervalCount > shareKeyInterval) { + sk=DataHelper.GetShareKey(getShareKeyTimes); + getShareKeyTimes++; + shareKeyIntervalCount = 0; + } + var fields = new string[] { i.ToString(), "20220104020855", (220105981029 + i).ToString(), "1", "482278", "482279", "3768774", "0", "0", "1", companyID.ToString(), sk.ToString() }; + var record = new DataRecord(fields, tableName, headers, comanyID); + tempRecords.Add(record); + if (tempRecords.Count >= tempCount) + { + foreach (var rc in tempRecords) + { + _context.AddInput(); + _producerQueue.Enqueue(rc); + if (cancellationToken.IsCancellationRequested) + return; + } + tempRecords.Clear(); + companyID = DataHelper. GetCompanyId(getCompanyIDTimes); + getCompanyIDTimes++; + } + } + + _context.CompleteInput(); + _logger.LogInformation("***** Csv input service completed *****"); + } + + } +} diff --git a/ConsoleApp2/HostedServices/TransformService.cs b/ConsoleApp2/HostedServices/TransformService.cs index ae2597c..97ec4f5 100644 --- a/ConsoleApp2/HostedServices/TransformService.cs +++ b/ConsoleApp2/HostedServices/TransformService.cs @@ -36,12 +36,16 @@ public class TransformService : ITransformService public async Task ExecuteAsync(CancellationToken cancellationToken) { _logger.LogInformation("***** Data transform service started, thread id: {ThreadId} *****", Environment.CurrentManagedThreadId); - while (!_context.IsInputCompleted || _producerQueue.Count > 0) + while ((!_context.IsInputCompleted || _producerQueue.Count > 0)) { + if (_context.GetExceptions().Count > 0) + { + _logger.LogInformation("***** Csv transform service is canceled *****"); + return; + } // var dbOptions = _options.Value.DatabaseFilter(record); if (!_producerQueue.TryDequeue(out var record)) continue; - record.Database = _options.Value.DatabaseFilter?.Invoke(record); - + for (var i = 0; i < record.Fields.Length; i++) { var field = record[i]; @@ -56,20 +60,41 @@ public class TransformService : ITransformService switch (_options.Value.GetColumnType(record.TableName, record.Headers[i])) { - case ColumnType.Blob or ColumnType.Text: - field = string.IsNullOrEmpty(field) ? "''" : $"0x{field}"; + case ColumnType.Text: + + field = string.IsNullOrEmpty(field) ? "''" : _options.Value.TransformBinary?.Invoke(field) ?? field; ; + break; + case ColumnType.Blob: + field = string.IsNullOrEmpty(field) ? "NULL" : $"0x{field}"; break; default: break; } - Escape: + Escape: record[i] = field; } - - // TODO: 数据处理/过滤/复制 - + //过滤不要的record + if (_options.Value.RecordFilter?.Invoke(record) == false) continue; + record.Database = _options.Value.DatabaseFilter?.Invoke(record); + //修改record + _options.Value.RecordModify?.Invoke(record); + //替换record + var replaceRecord = _options.Value.RecordReplace?.Invoke(record); + if (replaceRecord != null) + { + record = replaceRecord; + } _consumerQueue.Enqueue(record); + //数据增加 + var addRecords=_options.Value.RecordAdd?.Invoke(record); + if(addRecords != null) + { + foreach(var rc in addRecords) + { + _consumerQueue.Enqueue(rc); + } + } _context.AddTransform(); } diff --git a/ConsoleApp2/Options/CommandOptions.cs b/ConsoleApp2/Options/CommandOptions.cs new file mode 100644 index 0000000..78d0881 --- /dev/null +++ b/ConsoleApp2/Options/CommandOptions.cs @@ -0,0 +1,19 @@ +using System; +using System.ComponentModel; +using System.Configuration; +namespace ConsoleApp2.Options +{ + public class CommandOptions + { + + public string InputDir { get; set; } = "./MyDumper"; + + public int TaskCount { get; set; } = 16; + + public int FlushCount { get; set; } = 20000; + + public bool Isutf8mb4 { get; set; } = true; + + public short OldestShardKey { get; set; } = 23010; + } +} diff --git a/ConsoleApp2/Options/CsvOptions.cs b/ConsoleApp2/Options/CsvOptions.cs index ab03ee6..5d85cfb 100644 --- a/ConsoleApp2/Options/CsvOptions.cs +++ b/ConsoleApp2/Options/CsvOptions.cs @@ -5,7 +5,7 @@ public class CsvOptions /// /// MyDumper导出的CSV文件目录 /// - public string InputDir { get; set; } = "./"; + //public string InputDir { get; set; } = "./"; /// /// 字符串的包围符号,默认为双引号" diff --git a/ConsoleApp2/Options/DataInputOptions.cs b/ConsoleApp2/Options/DataInputOptions.cs new file mode 100644 index 0000000..f9e2237 --- /dev/null +++ b/ConsoleApp2/Options/DataInputOptions.cs @@ -0,0 +1,18 @@ +using ConsoleApp2.Services; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection.PortableExecutable; +using System.Text; +using System.Threading.Tasks; + +namespace ConsoleApp2.Options +{ + public enum InputFileType { CSV, JWT, JSV } + public class DataInputOptions + { + public string InputDir { get; set; } = "./"; + + public Func? CreateSource { get; set; } + } +} diff --git a/ConsoleApp2/Options/DataTransformOptions.cs b/ConsoleApp2/Options/DataTransformOptions.cs index 6ee7125..3435ffd 100644 --- a/ConsoleApp2/Options/DataTransformOptions.cs +++ b/ConsoleApp2/Options/DataTransformOptions.cs @@ -1,4 +1,6 @@ -namespace ConsoleApp2.Options; +using StackExchange.Redis; + +namespace ConsoleApp2.Options; public enum ColumnType { @@ -10,7 +12,14 @@ public enum ColumnType public class DataTransformOptions { public Func? DatabaseFilter { get; set; } - + + public Func? TransformBinary { get; set; }//Binary转字符串方法 + + public Func? RecordFilter { get; set; }//数据过滤方法 + public Action? RecordModify { get; set; }//数据修改 + public Func? RecordReplace { get; set; }//数据替换 + public Func?>? RecordAdd { get; set; }//数据替换 + /// /// 配置导入数据的特殊列 /// diff --git a/ConsoleApp2/Options/DatabaseOutputOptions.cs b/ConsoleApp2/Options/DatabaseOutputOptions.cs index 03a03ce..cd8f2da 100644 --- a/ConsoleApp2/Options/DatabaseOutputOptions.cs +++ b/ConsoleApp2/Options/DatabaseOutputOptions.cs @@ -7,11 +7,13 @@ public class DatabaseOutputOptions /// public string? ConnectionString { get; set; } /// - /// 输出服务的最大任务(Task)数 + /// 输出服务的任务(Task)数 /// - public int MaxTask { get; set; } + public int TaskCount { get; set; } /// /// 每个任务每次提交到数据库的记录数量(每N条构建一次SQL语句) /// public int FlushCount { get; set; } + + public int MaxAllowedPacket { get; set; } = 32*1024*1024; } \ No newline at end of file diff --git a/ConsoleApp2/Options/InputTableOptions.cs b/ConsoleApp2/Options/InputTableOptions.cs new file mode 100644 index 0000000..49f7397 --- /dev/null +++ b/ConsoleApp2/Options/InputTableOptions.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace ConsoleApp2.Options +{ + public class TableInfo + { + public long SimulaRowCount { get; set; }//模拟的记录条数 + } + public class InputTableOptions + { + public Dictionary TableInfoConfig { get; set; } = new(); + } +} diff --git a/ConsoleApp2/Program.cs b/ConsoleApp2/Program.cs index 0fad605..6c0ed05 100644 --- a/ConsoleApp2/Program.cs +++ b/ConsoleApp2/Program.cs @@ -1,35 +1,227 @@ -using ConsoleApp2.Const; +using ConsoleApp2; +using ConsoleApp2.Const; +using ConsoleApp2.Helpers; using ConsoleApp2.HostedServices; using ConsoleApp2.HostedServices.Abstractions; using ConsoleApp2.Options; using ConsoleApp2.Services; +using ConsoleApp2.SimulationService; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using MySqlConnector; using Serilog; +using Serilog.Core; +using System.Reflection.PortableExecutable; + // 运行之前把Mysql max_allowed_packets 调大 // 运行之前把process_step表的外键删掉 - await RunProgram(); return; async Task RunProgram() { + + + //var inputDir= "D:\\MyDumper"; + //ValidateConsole.ValidateInput((_inputDir) => + //{ + // if (Directory.Exists(_inputDir)) + // { + // inputDir = _inputDir; + // return true; + // } + // else return false; + //}, "请输入读取csv文件的目录(默认为当前目录下MyDumper文件夹):"); + + //var maxTask = 16; + //ValidateConsole.ValidateInput((_inputDir) => + //{ + + // _ = int.TryParse(_inputDir.ToString(), out var _taskCount); + // if (_taskCount > 0) { + // maxTask = _taskCount; + // return true; + // } + // else return false; + //}, "请输入执行输出的线程数量(默认为16):"); + + //var flushCount = 2_0000; + //ValidateConsole.ValidateInput((_inputDir) => + //{ + // _ = int.TryParse(_inputDir.ToString(), out var _flashCount); + // if (_flashCount > 0) + // { + // flushCount = _flashCount; + // return true; + // } else return false; + + //}, "请输入单次插入的行数(默认为20000):"); + ThreadPool.SetMaxThreads(200, 200); - var host = Host.CreateApplicationBuilder(); - host.Configuration.AddCommandLine(args); + var host = Host.CreateApplicationBuilder(args); + var commandOptions = host.Configuration.GetSection("CmdOptions").Get() ?? new CommandOptions(); + Console.WriteLine($"InputDir:{commandOptions?.InputDir}"); + Console.WriteLine($"OutPutFlushCount:{commandOptions?.FlushCount}"); + Console.WriteLine($"OutPutTaskCount:{commandOptions?.TaskCount}"); + + + host.Services.Configure(option => + { + option.TableInfoConfig = new Dictionary + { + + //{"order_block_plan_item",new TableInfo{SimulaRowCount=136323566 }},//从order_item表查询,然后程序插入 + //{"order_package_item",new TableInfo{SimulaRowCount=52525224 }},//从order_item表查询,然后程序插入 + //{"order_patch_detail",new TableInfo{SimulaRowCount=10 }},//生产没有这个表,不处理 + + + //{"machine",new TableInfo{SimulaRowCount=14655 }}, + //{"order",new TableInfo{SimulaRowCount=5019216 }}, + //{"order_block_plan",new TableInfo{SimulaRowCount=2725553 }},//CreateTime < 202301的删除 + {"order_block_plan_result",new TableInfo{SimulaRowCount=1174096 }}, + //{"order_box_block",new TableInfo{SimulaRowCount=29755672 }}, + //{"order_data_block",new TableInfo{SimulaRowCount=731800334 }}, + //{"order_data_goods",new TableInfo{SimulaRowCount=25803671 }}, + //{"order_data_parts",new TableInfo{SimulaRowCount=468517543 }}, + //{"order_item",new TableInfo{SimulaRowCount=1345520079 }}, + //{"order_module",new TableInfo{SimulaRowCount=103325385 }}, + //{"order_module_extra",new TableInfo{SimulaRowCount=54361321 }}, + //{"order_module_item",new TableInfo{SimulaRowCount=69173339 }}, + //{"order_package",new TableInfo{SimulaRowCount=16196195 }}, + //{"order_process",new TableInfo{SimulaRowCount=3892685 }},//orderNo < 202301的 + //{"order_process_step",new TableInfo{SimulaRowCount=8050349 }},//orderNo < 202301的删除 + //{"order_process_step_item",new TableInfo{SimulaRowCount=14538058 }},//orderNo < 202301的删除 + //{"order_scrap_board",new TableInfo{SimulaRowCount=123998 }}, + //{"process_group",new TableInfo{SimulaRowCount=1253 }}, + //{"process_info",new TableInfo{SimulaRowCount=7839 }}, + //{"process_item_exp",new TableInfo{SimulaRowCount=28 }}, + //{"process_schdule_capacity",new TableInfo{SimulaRowCount=39736 }}, + //{"process_step_efficiency",new TableInfo{SimulaRowCount=8 }}, + //{"report_template",new TableInfo{SimulaRowCount=7337 }}, + //{"simple_package",new TableInfo{SimulaRowCount=130436 }},//orderNo < 202301的删除 + //{"simple_plan_order",new TableInfo{SimulaRowCount=351470 }},//CreateTime < 202301的删除 + //{"sys_config",new TableInfo{SimulaRowCount=2296 }}, + //{"work_calendar",new TableInfo{SimulaRowCount=11 }}, + //{"work_shift",new TableInfo{SimulaRowCount=59 }}, + //{"work_time",new TableInfo{SimulaRowCount=62 }}, + }; + }); host.Services.Configure(option => { option.Delimiter = ","; option.QuoteChar = '"'; - option.InputDir = "D:/Dump"; }); + host.Services.Configure(options => + { + options.InputDir = commandOptions.InputDir; + var _csvOptions = new CsvOptions { Delimiter = ",", QuoteChar = '"' }; + options.CreateSource = (string tableName) => + { + var source = new ZstSource(commandOptions.InputDir, tableName, _csvOptions.Delimiter, _csvOptions.QuoteChar); + return source; + }; + }); + host.Services.Configure(options => { - options.DatabaseFilter = record => "cferp_test_1"; + options.DatabaseFilter = record => "cferp_test"; + + options.TransformBinary = field => commandOptions != null && commandOptions.Isutf8mb4 ? $"_utf8mb4 0x{field}" : $"0x{field}"; + //数据过滤 + options.RecordFilter = record => + { + var index = Array.IndexOf(record.Headers, "ShardKey"); + if (index > -1) + { + var skString = record.Fields[index]; + short.TryParse(skString, out var sk); + if (sk < commandOptions.OldestShardKey) return false; + } + if (record.TableName == "order_package") + { + var pkNoIndex = Array.IndexOf(record.Headers, "PakageNo"); + if (pkNoIndex > -1) + { + var pkNo = record.Fields[pkNoIndex]; + if (pkNo.Length <= 2) return false; + } + } + if (record.TableName == "order_block_plan") + { + var orderNosIndex = Array.IndexOf(record.Headers, "OrderNos"); + if (orderNosIndex > -1) + { + var pkNo = record.Fields[orderNosIndex]; + if (pkNo.Length <= 2) return false; + } + } + return true; + + }; + //数据修改 + options.RecordModify = (record) => + { + if (record.TableName == "order_process")//修改order_process.NextStepID的默认值为0 + { + var nextStepIdIndex = Array.IndexOf(record.Headers, "NextStepID"); + if (nextStepIdIndex > -1) + { + var idString = record.Fields[nextStepIdIndex]; + + if (idString == "\\N") + { + record.Fields[nextStepIdIndex] = "0"; + } + + } + } + + }; + //数据替换 + options.RecordReplace = (record) => + { + //删除数据源里simple_plan_order.ProcessState 字段和值 + + if (record.TableName == "simple_plan_order")//修改order_process.NextStepID的默认值为0 + { + var nextStepIdIndex = Array.IndexOf(record.Headers, "ProcessState"); + if (nextStepIdIndex > -1) + { + var headers = record.Headers.Where(t => t != "ProcessState").ToArray(); + var fs = record.Fields.ToList(); + fs.RemoveAt(nextStepIdIndex); + var fields = fs.ToArray(); + return new DataRecord(fields, record.TableName, headers, record.CompanyID); + } + } + return null; + }; + //数据生成 + options.RecordAdd = (record) => + { + var resultList = new List(); + if(record.TableName == "order_item") + { + + var itemIDIndex = Array.IndexOf(record.Headers, "ItemID"); + var shardKeyIndex = Array.IndexOf(record.Headers, "ShardKey"); + var planIDIndex = Array.IndexOf(record.Headers, "PlanID"); + var packageIDIndex = Array.IndexOf(record.Headers, "PackageID"); + var companyIDIndex = Array.IndexOf(record.Headers, "CompanyID"); + + //resultList.Add(new DataRecord( + // new[] { "ItemID", "ShardKey", "PlanID","CompanyID" }, "order_block_plan_item", + // new[] { record.Fields[itemIDIndex], record.Fields[shardKeyIndex], record.Fields[planIDIndex], record.Fields[companyIDIndex] })); + //resultList.Add( + // new DataRecord(new[] { "ItemID", "ShardKey", "PackageID", "CompanyID" }, "order_package_item", + // new[] { record.Fields[itemIDIndex], record.Fields[shardKeyIndex], record.Fields[packageIDIndex], record.Fields[companyIDIndex] })); + } + return resultList; + + }; options.ColumnTypeConfig = new() { { "simple_plan_order.PlaceData", ColumnType.Blob }, @@ -61,20 +253,19 @@ async Task RunProgram() { "order_block_plan.BlockInfo", ColumnType.Text }, }; }); - host.Services.Configure(options => { options.ConnectionString = new MySqlConnectionStringBuilder { Server = "127.0.0.1", - Port = 33306, - Database = "cferp_test_1", + Port = 33309, + Database = "cferp_test", UserID = "root", Password = "123456", MaximumPoolSize = 50, // 这个值应当小于 max_connections }.ConnectionString; - options.MaxTask = 16; - options.FlushCount = 200; + options.TaskCount = commandOptions.TaskCount; + options.FlushCount = commandOptions.FlushCount; }); host.Services.AddLogging(builder => { @@ -89,10 +280,13 @@ async Task RunProgram() host.Services.AddHostedService(); host.Services.AddHostedService(); - host.Services.AddSingleton(); + host.Services.AddSingleton(); host.Services.AddSingleton(); host.Services.AddSingleton(); - + host.Services.AddStackExchangeRedisCache(options => + { + options.Configuration = "localhost:6379"; + }); var app = host.Build(); await app.RunAsync(); } \ No newline at end of file diff --git a/ConsoleApp2/Services/CsvSource.cs b/ConsoleApp2/Services/CsvSource.cs index 50b26c6..364ac83 100644 --- a/ConsoleApp2/Services/CsvSource.cs +++ b/ConsoleApp2/Services/CsvSource.cs @@ -1,5 +1,7 @@ using System.Text; +using System.Text.RegularExpressions; using ConsoleApp2.Helpers; +using ConsoleApp2.HostedServices.Abstractions; using Microsoft.Extensions.Logging; namespace ConsoleApp2.Services; @@ -7,45 +9,52 @@ namespace ConsoleApp2.Services; /// /// CSV文件读取 /// -public class CsvSource +public class CsvSource:IDataSource { - private readonly string _filePath; - private readonly StreamReader _reader; + protected readonly string _inputDir; + //protected readonly StreamReader _reader; private readonly ILogger? _logger; - private readonly string _tableName; + protected readonly string _tableName; + protected string _sqlFilePath; + protected readonly string _sqlFileText; - public DataRecord Current { get; private set; } - public string[]? Headers { get; } - public string? CurrentRaw { get; private set; } + //public DataRecord Current { get; protected set; } + //public string[]? Headers { get; } + public string? CurrentRaw { get; protected set; } public string Delimiter { get; private set; } public char QuoteChar { get; private set; } - public CsvSource(string filePath, string[]? headers = null, string delimiter = ",", char quoteChar = '"', + public CsvSource(string inputDir,string tableName,string delimiter = ",", char quoteChar = '"', ILogger? logger = null) { - _filePath = filePath; - Headers = headers; + _inputDir = inputDir; + _tableName = tableName; + //Headers = headers; _logger = logger; Delimiter = delimiter; QuoteChar = quoteChar; - var fs = File.OpenRead(filePath); - _reader = new StreamReader(fs); - _tableName = DumpDataHelper.GetTableName(filePath); + //var fs = File.OpenRead(filePath); + //_reader = new StreamReader(fs); + //_tableName = DumpDataHelper.GetTableName(filePath); + string pattern = $"^.*\\.{tableName}\\..*\\.sql$"; + _sqlFilePath = Directory.GetFiles(_inputDir).FirstOrDefault(s => Regex.Match(s, pattern).Success); + + } - public async ValueTask ReadAsync() - { - var str = await _reader.ReadLineAsync(); - if (string.IsNullOrWhiteSpace(str)) - return false; + //public virtual async ValueTask ReadAsync() + //{ + // var str = await _reader.ReadLineAsync(); + // if (string.IsNullOrWhiteSpace(str)) + // return false; - CurrentRaw = str; + // CurrentRaw = str; - var fields = ParseRow2(str, QuoteChar, Delimiter); - Current = new DataRecord(fields, _tableName, Headers); - return true; - } + // var fields = ParseRow2(str, QuoteChar, Delimiter); + // Current = new DataRecord(fields, _tableName, Headers); + // return true; + //} public string[] ParseRow(string row, char quoteChar, string delimiter) { @@ -136,4 +145,64 @@ public class CsvSource result.Add(current.ToString()); return result.ToArray(); } + public virtual async Task GetHeaders() + { + var text = await File.ReadAllTextAsync(_sqlFilePath); + return await DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text); + + } + public virtual async Task GetCsvFiles() + { + var text= await File.ReadAllTextAsync(_sqlFilePath); + return await DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text,new Regex(@"'.+\.dat'")); + } + public virtual async Task DoEnqueue(Action action) + { + var sourceFiles =await GetCsvFiles(); + foreach (var file in sourceFiles) + { + var headers = await GetHeaders(); + var filePath= Path.Combine(_inputDir, file); + using (var fs = File.OpenRead(filePath)) + { + using (StreamReader sr = new StreamReader(fs)) + { + while (!sr.EndOfStream) + { + var line = await sr.ReadLineAsync(); + var fields = ParseRow2(line, QuoteChar, Delimiter); + var record = new DataRecord(fields, _tableName, headers); + action?.Invoke(record); + } + } + } + + + } + } + public virtual async Task GetTestRecord() + { + var sourceFiles = await GetCsvFiles(); + var file = sourceFiles.FirstOrDefault(); + if (file != null) + { + var headers = await GetHeaders(); + var filePath = Path.Combine(_inputDir, file); + using (var fs = File.OpenRead(filePath)) + { + using (StreamReader sr = new StreamReader(fs)) + { + var line = await sr.ReadLineAsync(); + var fields = ParseRow2(line, QuoteChar, Delimiter); + var record = new DataRecord(fields, _tableName, headers); + return record; + } + } + } + return null; + } + public void Dispose() + { + // _reader.Dispose(); + } } \ No newline at end of file diff --git a/ConsoleApp2/Services/DataRecordQueue.cs b/ConsoleApp2/Services/DataRecordQueue.cs index e988423..fce654f 100644 --- a/ConsoleApp2/Services/DataRecordQueue.cs +++ b/ConsoleApp2/Services/DataRecordQueue.cs @@ -19,7 +19,7 @@ public class DataRecordQueue : IDisposable public DataRecordQueue() { - _queue = new BlockingCollection(200_000); // 队列最长为20W条记录 + _queue = new BlockingCollection(2000_000); // 队列最长为20W条记录 } public bool TryDequeue([MaybeNullWhen(false)] out DataRecord record) diff --git a/ConsoleApp2/Services/JsvSource.cs b/ConsoleApp2/Services/JsvSource.cs index c2764b3..c9b3913 100644 --- a/ConsoleApp2/Services/JsvSource.cs +++ b/ConsoleApp2/Services/JsvSource.cs @@ -1,4 +1,5 @@ using ConsoleApp2.Helpers; +using ConsoleApp2.HostedServices.Abstractions; using Microsoft.Extensions.Logging; using ServiceStack.Text; @@ -8,9 +9,9 @@ namespace ConsoleApp2.Services; /// 读取Jsv格式文件 /// [Obsolete] -public class JsvSource : IDisposable +public class JsvSource:IDataSource { - private readonly string _filePath; + private readonly string _inputDir; private readonly JsvStringSerializer _jsv; private readonly StreamReader _reader; // ReSharper disable once PrivateFieldCanBeConvertedToLocalVariable @@ -21,29 +22,22 @@ public class JsvSource : IDisposable public string[]? Headers { get; } public bool EndOfSource => _reader.EndOfStream; - public JsvSource(string filePath, string[]? headers = null, ILogger? logger = null) + public JsvSource(string inputDir,string tableName, ILogger? logger = null) { - _filePath = filePath; + _inputDir = inputDir; + _tableName = tableName; _jsv = new JsvStringSerializer(); - _reader = new StreamReader(filePath); - Headers = headers; + // _reader = new StreamReader(filePath); + //Headers = headers; _logger = logger; // _logger?.LogInformation("Reading file: {FilePath}", filePath); - _tableName = DumpDataHelper.GetTableName(filePath); + //_tableName = DumpDataHelper.GetTableName(filePath); } - - public async ValueTask ReadAsync() + public async Task DoEnqueue(Action action) { - var str = await _reader.ReadLineAsync(); - if (string.IsNullOrEmpty(str)) - return false; - var fields = _jsv.DeserializeFromString(str); - - Current = new DataRecord(fields, _tableName, Headers); - return true; } - public void Dispose() + public void Dispose() { _reader.Dispose(); } diff --git a/ConsoleApp2/Services/MySqlDestination.cs b/ConsoleApp2/Services/MySqlDestination.cs index 47f6f4e..b6ed583 100644 --- a/ConsoleApp2/Services/MySqlDestination.cs +++ b/ConsoleApp2/Services/MySqlDestination.cs @@ -2,6 +2,7 @@ using ConsoleApp2.Helpers; using Microsoft.Extensions.Logging; using MySqlConnector; +using ServiceStack; namespace ConsoleApp2.Services; @@ -14,16 +15,21 @@ public class MySqlDestination : IDisposable, IAsyncDisposable private readonly MySqlConnection _conn; private readonly ILogger _logger; private readonly bool _prettyOutput; - - public MySqlDestination(string connStr, ILogger logger, bool prettyOutput = false) + private readonly int _maxAllowPacket; + private readonly ProcessContext _context; + private static StringBuilder recordSb = new StringBuilder(); + public MySqlDestination(string connStr, ILogger logger, ProcessContext context,bool prettyOutput = false) { _conn = new MySqlConnection(connStr); _conn.Open(); _recordCache = new Dictionary>(); _logger = logger; + _context = context; _prettyOutput = prettyOutput; - } + + + } public Task WriteRecordAsync(DataRecord record) { _recordCache.AddOrUpdate(record.TableName, [record], (key, value) => @@ -42,74 +48,113 @@ public class MySqlDestination : IDisposable, IAsyncDisposable } } - public async Task FlushAsync() + public async Task FlushAsync(int maxAllowPacket) { if (_recordCache.Count == 0) return; - - var cmd = _conn.CreateCommand(); - cmd.CommandText = SerializeRecords(_recordCache, _prettyOutput); + + //var cmd = _conn.CreateCommand(); + //cmd.CommandTimeout = 3 * 60; + try { - await cmd.ExecuteNonQueryAsync(); + var excuseList = GetExcuseList(_recordCache, maxAllowPacket, _prettyOutput); + //foreach (var insertSql in excuseList) + //{ + // //cmd.CommandText = insertSql; + // //await cmd.ExecuteNonQueryAsync(); + // //_logger.LogInformation(@"do insert completed!size:{Length}", cmd.CommandText.Length); + //} _recordCache.Clear(); } catch (Exception e) { - _logger.LogCritical(e, "Error when flushing records, sql: {Sql}", cmd.CommandText.Omit(1000)); + //_logger.LogCritical(e, "Error when flushing records, sql: {Sql}", cmd.CommandText.Omit(1000)); + _context.AddException(e); throw; } finally { - await cmd.DisposeAsync(); + //await cmd.DisposeAsync(); } } - public static string SerializeRecords(IDictionary> tableRecords, - bool prettyOutput = false) + public static IList GetExcuseList(IDictionary> tableRecords,int maxAllowPacket, + bool prettyOutput = false) { - var sb = new StringBuilder(); - + var resultList = new List(); + var headerSb = string.Empty; + //var recordSb = new StringBuilder(); + recordSb.Clear(); foreach (var (tableName, records) in tableRecords) { if (records.Count == 0) continue; - sb.Append($"INSERT INTO `{tableName}`("); + headerSb=$"INSERT INTO `{tableName}`("; for (var i = 0; i < records[0].Headers.Length; i++) { var header = records[0].Headers[i]; - sb.Append($"`{header}`"); + headerSb+=$"`{header}`"; if (i != records[0].Headers.Length - 1) - sb.Append(','); + headerSb.Append(','); } - sb.Append(") VALUES "); + headerSb+=") VALUES "; if (prettyOutput) - sb.AppendLine(); + headerSb+="/r/n"; + var sbList = new List(); + var currentLength = headerSb.Length; for (var i = 0; i < records.Count; i++) { var record = records[i]; - sb.Append('('); + recordSb.Append('('); for (var j = 0; j < record.Fields.Length; j++) { var field = record.Fields[j]; - sb.Append(field); + recordSb.Append(field); if (j != record.Fields.Length - 1) - sb.Append(','); + recordSb.Append(','); } - sb.Append(')'); + recordSb.Append(')'); - if (i != records.Count - 1) // not last field - sb.Append(','); - if (prettyOutput) sb.AppendLine(); + //if (i != records.Count - 1) // not last field + // recordSb.Append(','); + if (prettyOutput) recordSb.AppendLine(); + + if (currentLength + recordSb.Length >= maxAllowPacket) + { + + var insertSb = headerSb; + + insertSb+=string.Join(",", sbList); + insertSb += ";"; + resultList.Add(insertSb); + insertSb=String.Empty; + sbList.Clear(); + currentLength = headerSb.Length; + sbList.Add(recordSb.ToString()); + } + else + { + sbList.Add(recordSb.ToString()); + } + currentLength += recordSb.Length; + recordSb.Clear(); } - - sb.AppendLine(";"); + if (sbList.Count > 0) + { + var insertSb = headerSb.ToString(); + insertSb += string.Join(",", sbList); + insertSb += ";"; + resultList.Add(insertSb.ToString()); + insertSb=string.Empty; + } + headerSb=string.Empty; } - return sb.ToString(); + return resultList; } diff --git a/ConsoleApp2/Services/ProcessContext.cs b/ConsoleApp2/Services/ProcessContext.cs index 17c90ba..4bcbec7 100644 --- a/ConsoleApp2/Services/ProcessContext.cs +++ b/ConsoleApp2/Services/ProcessContext.cs @@ -8,6 +8,7 @@ public class ProcessContext private int _inputCount; private int _transformCount; private int _outputCount; + private IList _exceptionList = new List(); public bool IsInputCompleted { get; private set; } public bool IsTransformCompleted { get; private set; } public bool IsOutputCompleted { get; private set; } @@ -29,7 +30,14 @@ public class ProcessContext get => _outputCount; private set => _outputCount = value; } - + public void AddException(Exception ex) + { + _exceptionList.Add(ex); + } + public IList GetExceptions() + { + return _exceptionList; + } public void CompleteInput() => IsInputCompleted = true; public void CompleteTransform() => IsTransformCompleted = true; diff --git a/ConsoleApp2/Services/TaskManager.cs b/ConsoleApp2/Services/TaskManager.cs index 1e8af7f..38a9b8c 100644 --- a/ConsoleApp2/Services/TaskManager.cs +++ b/ConsoleApp2/Services/TaskManager.cs @@ -27,7 +27,13 @@ public class TaskManager _tasks.Add(task); _logger.LogDebug("New task created"); } - + public void CreateTasks(Func func,int taskCount, CancellationToken cancellationToken = default) + { + for (int i = 0; i < taskCount; i++) + { + CreateTask(func, cancellationToken); + } + } public async Task WaitAll() { await Task.WhenAll(_tasks); diff --git a/ConsoleApp2/Services/ZstSource.cs b/ConsoleApp2/Services/ZstSource.cs new file mode 100644 index 0000000..62dee28 --- /dev/null +++ b/ConsoleApp2/Services/ZstSource.cs @@ -0,0 +1,126 @@ +using ConsoleApp2.Helpers; +using Microsoft.Extensions.Logging; +using System.IO; +using System.Text.RegularExpressions; +using ZstdSharp; +namespace ConsoleApp2.Services +{ + public class ZstSource : CsvSource + { + public ZstSource(string inputDir, string tableName, string delimiter = ",", char quoteChar = '"', + ILogger? logger = null) : base(inputDir, tableName, delimiter = ",", quoteChar = '"', logger = null) + { + //throw new Exception("aaa"); + string pattern = $"^.*\\.{tableName}\\..*\\.sql.zst$"; + _sqlFilePath = Directory.GetFiles(_inputDir).FirstOrDefault(s => Regex.Match(s, pattern).Success); + + } + private async Task DecompressFile(string filePath) + { + using (var input = File.OpenRead(filePath)) + { + using (var decopress = new DecompressionStream(input)) + { + + var ms = new MemoryStream(); + decopress.CopyTo(ms); + ms.Seek(0, SeekOrigin.Begin); + StreamReader reader = new StreamReader(ms); + var text = await reader.ReadToEndAsync(); + return text; + + } + } + } + public override async Task GetHeaders() + { + var text = await DecompressFile(_sqlFilePath); + return await DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text); + + } + public override async Task GetCsvFiles() + { + var text = await DecompressFile(_sqlFilePath); + return await DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text, new Regex(@"'.+\.dat.zst'")); + } + public override async Task DoEnqueue(Action action) + { + var sourceFiles = await GetCsvFiles(); + var headers = await GetHeaders(); + foreach (var file in sourceFiles) + { + var filePath = Path.Combine(_inputDir, file); + using (var input = File.OpenRead(filePath)) + { + using (var decopress = new DecompressionStream(input)) + { + + var ms = new MemoryStream(); + decopress.CopyTo(ms); + ms.Seek(0, SeekOrigin.Begin); + StreamReader reader = new StreamReader(ms); + while (!reader.EndOfStream) + { + var line = await reader.ReadLineAsync(); + var fields = ParseRow2(line, QuoteChar, Delimiter); + var record = new DataRecord(fields, _tableName, headers); + action?.Invoke(record); + } + } + } + //var headers = await GetHeaders(); + //using (StreamReader sr = new StreamReader(file)) + //{ + // while (!sr.EndOfStream) + // { + // var line = await sr.ReadLineAsync(); + // var fields = ParseRow2(line, QuoteChar, Delimiter); + // var record = new DataRecord(fields, _tableName, headers); + // action?.Invoke(record); + // } + //} + } + } + public override async Task GetTestRecord() + { + var sourceFiles = await GetCsvFiles(); + var file = sourceFiles.FirstOrDefault(); + if (file != null) + { + var headers = await GetHeaders(); + var filePath = Path.Combine(_inputDir, file); + using (var input = File.OpenRead(filePath)) + { + using (var decopress = new DecompressionStream(input)) + { + + var ms = new MemoryStream(); + decopress.CopyTo(ms); + ms.Seek(0, SeekOrigin.Begin); + StreamReader reader = new StreamReader(ms); + var line = await reader.ReadLineAsync(); + var fields = ParseRow2(line, QuoteChar, Delimiter); + var record = new DataRecord(fields, _tableName, headers); + return record; + + } + } + //using (var fs = File.OpenRead(filePath)) + //{ + // using (StreamReader sr = new StreamReader(fs)) + // { + // var line = await sr.ReadLineAsync(); + // var fields = ParseRow2(line, QuoteChar, Delimiter); + // var record = new DataRecord(fields, _tableName, headers); + // return record; + // } + //} + } + return null; + } + public void Dispose() + { + //_reader.Dispose(); + } + } +} diff --git a/ConsoleApp2/SimulationService/DataHelper.cs b/ConsoleApp2/SimulationService/DataHelper.cs new file mode 100644 index 0000000..0a26af0 --- /dev/null +++ b/ConsoleApp2/SimulationService/DataHelper.cs @@ -0,0 +1,28 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Text.RegularExpressions; +using System.Threading.Tasks; + +namespace ConsoleApp2.SimulationService +{ + public static partial class DataHelper + { + public static short[] shareKeys = {23000, 23040, 23070, 23100, 24000, 24040, 24070, 24100, 25000, 25040, 25070, 25100 }; + public static int[] companyIds = { 1, 2, 3, 4 }; + private static T getArrayValue(int index, T[] array)//按index取数据,超过数组长度,index从0开始再取 + { + return array[index % array.Length]; + } + public static short GetShareKey(int index) + { + return getArrayValue(index, shareKeys); + } + public static int GetCompanyId(int index) + { + return getArrayValue(index, companyIds); + } + + } +} diff --git a/ConsoleApp2/SimulationService/SimulationInputService.cs b/ConsoleApp2/SimulationService/SimulationInputService.cs new file mode 100644 index 0000000..e8c5ba7 --- /dev/null +++ b/ConsoleApp2/SimulationService/SimulationInputService.cs @@ -0,0 +1,173 @@ +using ConsoleApp2.Const; +using ConsoleApp2.Helpers; +using ConsoleApp2.HostedServices; +using ConsoleApp2.HostedServices.Abstractions; +using ConsoleApp2.Options; +using ConsoleApp2.Services; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System.Text.RegularExpressions; + +namespace ConsoleApp2.SimulationService +{ + + public class SimulationInputService : IInputService + { + private readonly ILogger _logger; + private readonly IOptions _dataInputOptions; + private readonly IOptions _tableOptions; + private readonly DataRecordQueue _producerQueue; + private readonly ProcessContext _context; + + public SimulationInputService(ILogger logger, + IOptions dataInputOptions, + IOptions tableOptions, + [FromKeyedServices(ProcessStep.Producer)] DataRecordQueue producerQueue, + ProcessContext context) + { + _logger = logger; + _dataInputOptions = dataInputOptions; + _tableOptions = tableOptions; + _producerQueue = producerQueue; + _context = context; + } + public async Task ExecuteAsync(CancellationToken cancellationToken) + { + var inputDir = _dataInputOptions.Value.InputDir; + _logger.LogInformation("***** simulation input service start, working dir: {InputDir}, thread id: {ThreadId} *****", inputDir, Environment.CurrentManagedThreadId); + var files = Directory.GetFiles(inputDir); + if (files.Length == 0) + { + _logger.LogInformation("No source files found in {InputDir}", inputDir); + return; + } + foreach (var tableName in _tableOptions.Value.TableInfoConfig.Keys) + { + //_logger.LogInformation("Working sql file: {SqlPath}", sqlPath); + //var headers = await DumpDataHelper.GetCsvHeadersFromSqlFileAsync(sqlPath); + //var sqlFileSource = _dataInputOptions.Value.CreateSource?.Invoke(sqlPath,null); + //var headers =await sqlFileSource?.GetHeaders(); + //var csvFiles = await DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(sqlPath); + //var csvFiles =await sqlFileSource?.GetCsvFiles(); + + //foreach (var csvFile in csvFiles) + //{ + //var csvPath = Path.Combine(inputDir, csvFile); + //// var source = new JsvSource(csvPath, headers, _logger); + //var source = new CsvSource(csvPath, headers, _csvOptions.Value.Delimiter, _csvOptions.Value.QuoteChar, _logger); + + //while (await source.ReadAsync()) + //{ + // _context.AddInput(); + // _producerQueue.Enqueue(source.Current); + // if (cancellationToken.IsCancellationRequested) + // return; + //} + //var csvPath = Path.Combine(inputDir, csvFile); + //var tableName = DumpDataHelper.GetTableName(csvPath); + + //var dataCount = 1200000000L;//当前表要生成的总数据量 + var dataCount = _tableOptions.Value.TableInfoConfig[tableName].SimulaRowCount;//当前表要生成的总数据量 + var companyTotallCount = 1000;//当前表每个公司生成的总数据量 + var tempRecords = new List(); + var sk = DataHelper.shareKeys.First(); + var companyID = DataHelper.companyIds.First(); + + var shareKeyInterval = 20000;//每个sharekey的数据量 + var getShareKeyTimes = 0;//sharekey生成的次数,每生成一次,改变sharekey的值 + var getCompanyIDTimes = 0;//公司生成的次数,每生成一次,改变companyID的值 + var shareKeyIntervalCount = 0; + //CsvSource source; + //switch (_dataInputOptions.Value.FileType) + //{ + // case InputFileType.CSV: + // source=new CsvSource(csvPath, headers, _csvOptions.Value.Delimiter, _csvOptions.Value.QuoteChar, _logger); + // break; + // case InputFileType.JWT: + // source = new JwtSource(csvPath, headers, _csvOptions.Value.Delimiter, _csvOptions.Value.QuoteChar, _logger); + // break; + // default: break; + + //} + //var source = new JwtSource(csvPath, headers, _csvOptions.Value.Delimiter, _csvOptions.Value.QuoteChar, _logger); + var source = _dataInputOptions.Value.CreateSource?.Invoke(tableName); + var testRecord =await source.GetTestRecord(); + for (long i = 1; i <= dataCount; i++) + { + shareKeyIntervalCount++; + if (shareKeyIntervalCount > shareKeyInterval) + { + sk = DataHelper.GetShareKey(getShareKeyTimes); + getShareKeyTimes++; + shareKeyIntervalCount = 0; + } + var fields = new string[testRecord.Fields.Length]; + Array.Copy(testRecord.Fields, fields, testRecord.Fields.Length); + var record = new DataRecord(fields, testRecord.TableName, testRecord.Headers, companyID); + //更新record的ID、OrderNo,ShardKey值 + if (record.Headers.Contains("ID")) + { + var index = Array.IndexOf(record.Headers, "ID"); + if (index > -1) + { + record.Fields[index] = i.ToString(); + } + } + if (record.TableName == "order_box_block" && record.Headers.Contains("BoxID")) + { + var index = Array.IndexOf(record.Headers, "BoxID"); + if (index > -1) + { + record.Fields[index] = i.ToString(); + } + } + if ((record.TableName == "order_block_plan_item" || record.TableName == "order_package_item") && record.Headers.Contains("ItemID")) + { + var index = Array.IndexOf(record.Headers, "ItemID"); + if (index > -1) + { + record.Fields[index] = i.ToString(); + } + } + if (record.TableName == "order" && record.Headers.Contains("OrderNo")) + { + var index = Array.IndexOf(record.Headers, "OrderNo"); + if (index > -1) + { + record.Fields[index] = i.ToString(); + } + } + if (record.Headers.Contains("ShardKey")) + { + var index = Array.IndexOf(record.Headers, "ShardKey"); + if (index > -1) + { + record.Fields[index] = sk.ToString(); + } + } + tempRecords.Add(record); + if (tempRecords.Count >= companyTotallCount || i >= dataCount - 1) + { + foreach (var rc in tempRecords) + { + _context.AddInput(); + _producerQueue.Enqueue(rc); + if (cancellationToken.IsCancellationRequested) + return; + } + tempRecords.Clear(); + companyID = DataHelper.GetCompanyId(getCompanyIDTimes); + getCompanyIDTimes++; + } + } + _logger.LogInformation("table:'{tableName}' simulation input completed", tableName); + //} + //_logger.LogInformation("File '{File}' input completed", Path.GetFileName(sqlPath)); + } + + _context.CompleteInput(); + _logger.LogInformation("***** Csv input service completed *****"); + } + } +} diff --git a/ConsoleApp2/appsettings.json b/ConsoleApp2/appsettings.json new file mode 100644 index 0000000..0ce179c --- /dev/null +++ b/ConsoleApp2/appsettings.json @@ -0,0 +1,10 @@ +{ + "CmdOptions": { + "InputFileType": "CSV", + "InputDir": "D:/MyDumper-ZST", + "TaskCount": 1, + "FlushCount": 100, + "Isutf8mb4": true, + "OldestShardKey": 22000 + } +}