Compare commits

...

21 Commits

Author SHA1 Message Date
a169eecec5 整理代码 2024-01-22 17:04:49 +08:00
fcc0de5b2a 整理代码 2024-01-22 16:58:05 +08:00
7235400aee 增加IsMock参数 2024-01-22 16:57:45 +08:00
7e3690a325 整理代码 2024-01-22 16:57:43 +08:00
241f52e30f 删除无用文件 2024-01-22 16:56:55 +08:00
b0795f9a2c 修复测试服务context错误 2024-01-22 16:56:52 +08:00
f167256082 修复获取cachekey错误问题
output使用ThreadPool.QueueUserWorkItem
2024-01-22 16:55:47 +08:00
e3f6ecbd91 修改 2024-01-22 16:54:35 +08:00
CZY
45ad15a065 Merge remote-tracking branch 'origin/multTasks'
# Conflicts:
#	ConsoleApp2/HostedServices/OutputService.cs
#	ConsoleApp2/HostedServices/TransformService.cs
#	ConsoleApp2/Program.cs
2024-01-18 15:40:15 +08:00
854111315b 修改cache key格式 2024-01-18 15:06:52 +08:00
CZY
6ec782ec93 添加异常记录器,记录输出时发生异常的SQL; 2024-01-18 15:03:45 +08:00
97e359468f 支持按多个表开立线程 2024-01-18 14:36:36 +08:00
CZY
1f9c9e0c13 添加Serilog文件日志;
更改空blob列插入规则,现在将插入空blob而不是NULL;
2024-01-17 17:44:08 +08:00
629a4d2fb5 多任务处理输入转换和输出 2024-01-17 17:12:31 +08:00
CZY
f4f7ff316b 修复可能的异步等待问题 2024-01-17 17:05:03 +08:00
CZY
dda87349fd 修复可能的空指针异常 2024-01-17 15:39:38 +08:00
CZY
469e59628c Redis前缀添加到配置文件 2024-01-17 15:11:49 +08:00
CZY
70981fb985 修改Redis用法,添加缓存键前缀; 2024-01-17 14:27:25 +08:00
08e0444055 修复output计数 2024-01-17 14:26:43 +08:00
e0df7ff4e9 修改 2024-01-17 14:26:35 +08:00
CZY
1de3603afe 优化内存分配 2024-01-17 14:26:13 +08:00
25 changed files with 622 additions and 590 deletions

View File

@@ -25,6 +25,7 @@
<PackageReference Include="Serilog" Version="3.1.2-dev-02097" /> <PackageReference Include="Serilog" Version="3.1.2-dev-02097" />
<PackageReference Include="Serilog.Extensions.Hosting" Version="8.0.0" /> <PackageReference Include="Serilog.Extensions.Hosting" Version="8.0.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="5.0.1" /> <PackageReference Include="Serilog.Sinks.Console" Version="5.0.1" />
<PackageReference Include="Serilog.Sinks.File" Version="5.0.1-dev-00972" />
<PackageReference Include="ServiceStack.Text" Version="8.0.0" /> <PackageReference Include="ServiceStack.Text" Version="8.0.0" />
<PackageReference Include="ZstdSharp.Port" Version="0.7.4" /> <PackageReference Include="ZstdSharp.Port" Version="0.7.4" />
</ItemGroup> </ItemGroup>

View File

@@ -65,6 +65,8 @@ public class DataRecord
public bool SetField(string columnName, string value) => SetField(this, columnName,value); public bool SetField(string columnName, string value) => SetField(this, columnName,value);
public string GetCacheKey(string columnName) => GetCacheKey(this, columnName);
public bool SetField( DataRecord record,string columnName,string value) public bool SetField( DataRecord record,string columnName,string value)
{ {
if (record.Headers is null) if (record.Headers is null)
@@ -75,4 +77,14 @@ public class DataRecord
record.Fields[idx] = value; record.Fields[idx] = value;
return true; return true;
} }
public string GetCacheKey(DataRecord record, string columnName)
{
if (TryGetField(record, columnName, out var value))
{
return $"{TableName}_{value}";
}else
throw new IndexOutOfRangeException($"Column name:{columnName} not found in this record.");
}
} }

View File

@@ -12,11 +12,9 @@ public static partial class DumpDataHelper
private static partial Regex MatchBrackets(); private static partial Regex MatchBrackets();
public static async Task<string[]> GetCsvHeadersFromSqlFileAsync(string txt) public static string[] GetCsvHeadersFromSqlFileAsync(string txt)
{ {
//var txt = await File.ReadAllTextAsync(filePath);
var match = MatchBrackets().Match(txt); var match = MatchBrackets().Match(txt);
return ParseHeader(match.ValueSpan); return ParseHeader(match.ValueSpan);
} }
@@ -60,9 +58,8 @@ public static partial class DumpDataHelper
return filePath[(firstDotIdx+1)..secondDotIdx].ToString(); return filePath[(firstDotIdx+1)..secondDotIdx].ToString();
} }
public static async Task<string[]> GetCsvFileNamesFromSqlFileAsync(string txt,Regex regex) public static string[] GetCsvFileNamesFromSqlFileAsync(string txt,Regex regex)
{ {
//var txt = await File.ReadAllTextAsync(filePath);
var matches = regex.Matches(txt); var matches = regex.Matches(txt);
return matches.Select(match => match.ValueSpan[1..^1].ToString()).ToArray(); return matches.Select(match => match.ValueSpan[1..^1].ToString()).ToArray();
} }

View File

@@ -1,6 +1,9 @@
namespace ConsoleApp2.HostedServices.Abstractions; using ConsoleApp2.Options;
using ConsoleApp2.Services;
namespace ConsoleApp2.HostedServices.Abstractions;
public interface IInputService public interface IInputService
{ {
public Task ExecuteAsync(CancellationToken cancellationToken); public Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue producerQueue, ProcessContext context, CancellationToken cancellationToken);
} }

View File

@@ -1,6 +1,9 @@
namespace ConsoleApp2.HostedServices.Abstractions; using ConsoleApp2.Options;
using ConsoleApp2.Services;
namespace ConsoleApp2.HostedServices.Abstractions;
public interface IOutputService public interface IOutputService
{ {
public Task ExecuteAsync(CancellationToken cancellationToken); public void ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue consumerQueue, ProcessContext context, CancellationToken cancellationToken);
} }

View File

@@ -1,6 +1,9 @@
namespace ConsoleApp2.HostedServices.Abstractions; using ConsoleApp2.Options;
using ConsoleApp2.Services;
namespace ConsoleApp2.HostedServices.Abstractions;
public interface ITransformService public interface ITransformService
{ {
public Task ExecuteAsync(CancellationToken cancellationToken); public Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue producerQueue, DataRecordQueue consumerQueue, ProcessContext context, CancellationToken cancellationToken);
} }

View File

@@ -16,24 +16,18 @@ public class InputService : IInputService
{ {
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly IOptions<DataInputOptions> _dataInputOptions; private readonly IOptions<DataInputOptions> _dataInputOptions;
private readonly IOptions<InputTableOptions> _tableOptions;
private readonly DataRecordQueue _producerQueue;
private readonly ProcessContext _context; private readonly ProcessContext _context;
public InputService(ILogger<InputService> logger, public InputService(ILogger<InputService> logger,
IOptions<DataInputOptions> dataInputOptions, IOptions<DataInputOptions> dataInputOptions,
IOptions<InputTableOptions> tableOptions,
[FromKeyedServices(ProcessStep.Producer)] DataRecordQueue producerQueue,
ProcessContext context) ProcessContext context)
{ {
_logger = logger; _logger = logger;
_dataInputOptions = dataInputOptions; _dataInputOptions = dataInputOptions;
_tableOptions = tableOptions;
_producerQueue = producerQueue;
_context = context; _context = context;
} }
public async Task ExecuteAsync(CancellationToken cancellationToken) public async Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue producerQueue, ProcessContext context, CancellationToken cancellationToken)
{ {
var inputDir = _dataInputOptions.Value.InputDir; var inputDir = _dataInputOptions.Value.InputDir;
_logger.LogInformation("***** Csv input service start, working dir: {InputDir}, thread id: {ThreadId} *****", inputDir, Environment.CurrentManagedThreadId); _logger.LogInformation("***** Csv input service start, working dir: {InputDir}, thread id: {ThreadId} *****", inputDir, Environment.CurrentManagedThreadId);
@@ -44,18 +38,21 @@ public class InputService : IInputService
return; return;
} }
var count = 0; var count = 0;
foreach (var tableName in _tableOptions.Value.TableInfoConfig.Keys) foreach (var tableName in tasksOptions.TableInfoConfig.Keys)
{ {
_logger.LogInformation("Working table: {tableName}", tableName); _logger.LogInformation("Working table: {tableName}", tableName);
var source = _dataInputOptions.Value.CreateSource?.Invoke(tableName); var source = _dataInputOptions.Value.CreateSource?.Invoke(tableName);
await source.DoEnqueue((record) => if (source != null)
{ {
_context.AddInput(); await source.DoEnqueue((record) =>
_producerQueue.Enqueue(record); {
count++; _context.AddInput();
producerQueue.Enqueue(record);
count++;
}); });
if (_context.GetExceptions().Count > 0) }
if (!_context.GetExceptions().IsEmpty)
{ {
_logger.LogInformation("***** Csv input service is canceled *****"); _logger.LogInformation("***** Csv input service is canceled *****");
return; return;
@@ -63,7 +60,7 @@ public class InputService : IInputService
_logger.LogInformation("table:'{tableName}' input completed", tableName); _logger.LogInformation("table:'{tableName}' input completed", tableName);
} }
_context.CompleteInput(); context.CompleteInput();
_logger.LogInformation("***** Csv input service completed *****"); _logger.LogInformation("***** Csv input service completed *****");
} }
} }

View File

@@ -1,19 +1,19 @@
using ConsoleApp2.HostedServices.Abstractions; using ConsoleApp2.HostedServices.Abstractions;
using ConsoleApp2.Options;
using ConsoleApp2.Services; using ConsoleApp2.Services;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using System.Threading.Tasks;
namespace ConsoleApp2.HostedServices; namespace ConsoleApp2.HostedServices;
public class MainHostedService : BackgroundService public class MainHostedService : IHostedService
{ {
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly IInputService _input; private readonly IInputService _input;
private readonly ITransformService _transform; private readonly ITransformService _transform;
private readonly IOutputService _output; private readonly IOutputService _output;
private readonly ProcessContext _context; private readonly ProcessContext _context;
private readonly Timer? _bigTableTimer=null;
private readonly Timer? _smallTableTimer=null;
public MainHostedService(ILogger<MainHostedService> logger, IInputService input, ITransformService transform, IOutputService output, ProcessContext context) public MainHostedService(ILogger<MainHostedService> logger, IInputService input, ITransformService transform, IOutputService output, ProcessContext context)
{ {
_logger = logger; _logger = logger;
@@ -23,51 +23,103 @@ public class MainHostedService : BackgroundService
_context = context; _context = context;
} }
protected override async Task ExecuteAsync(CancellationToken stoppingToken) public Task StartAsync(CancellationToken cancellationToken)
{ {
var taskFun = (TasksOptions taskOp, DataRecordQueue producerQueue, DataRecordQueue consumerQueue, ProcessContext context,Timer? timer) =>
var inputTask = Task.Factory.StartNew(async () =>
{
try
{
await _input.ExecuteAsync(stoppingToken);
}
catch (Exception ex)
{
_context.AddException(ex);
_logger.LogError("Exception occurred on inputService:{Message},{StackTrace}", ex.Message, ex.StackTrace);
}
});
var transformTask = Task.Factory.StartNew(async () =>
{ {
try Task.Factory.StartNew(async () =>
{ {
await _transform.ExecuteAsync(stoppingToken); try
} {
catch (Exception ex) await _input.ExecuteAsync(taskOp, producerQueue, context, cancellationToken);
}
catch (Exception ex)
{
_context.AddException(ex);
_logger.LogError("Exception occurred on inputService:{Message},{StackTrace}", ex.Message, ex.StackTrace);
}
});
Task.Factory.StartNew(async () =>
{ {
_context.AddException(ex); try
_logger.LogError("Exception occurred on transformService:{Message},{StackTrace}", ex.Message, ex.StackTrace); {
} await _transform.ExecuteAsync(taskOp, producerQueue, consumerQueue, context, cancellationToken);
}
catch (Exception ex)
{
_context.AddException(ex);
_logger.LogError("Exception occurred on transformService:{Message},{StackTrace}", ex.Message, ex.StackTrace);
}
}); });
Task.Factory.StartNew(() =>
{
try
{
timer = new Timer((object? state) =>
{
_output.ExecuteAsync(taskOp, consumerQueue, context, cancellationToken);
},null, TimeSpan.Zero,TimeSpan.FromSeconds(0.5));
var outputTask = Task.Factory.StartNew(async () => }
catch (Exception ex)
{
_context.AddException(ex);
_logger.LogError("Exception occurred on outputService:{Message},{StackTrace}", ex.Message, ex.StackTrace);
}
});
};
var bigTablesDic = new Dictionary<string, TableInfo>
{ {
try {"order",new TableInfo{SimulaRowCount=5019216 }},
{ {"order_block_plan",new TableInfo{SimulaRowCount=2725553 }},//CreateTime < 202301的删除
await _output.ExecuteAsync(stoppingToken); {"order_block_plan_result",new TableInfo{SimulaRowCount=1174096 }},
} {"order_box_block",new TableInfo{SimulaRowCount=29755672 }},
catch (Exception ex) {"order_item",new TableInfo{SimulaRowCount=1345520079 }},
{ {"simple_plan_order",new TableInfo{SimulaRowCount=351470 }},//CreateTime < 202301的删除
_context.AddException(ex); };
_logger.LogError("Exception occurred on outputService:{Message},{StackTrace}", ex.Message, ex.StackTrace); var bigTableContext = new ProcessContext();
} var bigTableOptions = new TasksOptions { TableInfoConfig = bigTablesDic, OutPutOptions = new OutPutOptions { FlushCount = 20000, OutPutTaskCount = 2 } };
taskFun(bigTableOptions, new DataRecordQueue(), new DataRecordQueue(), bigTableContext,_bigTableTimer);
}); var smallTablesDic = new Dictionary<string, TableInfo>
{
// await Task.Run(async () => await _output.ExecuteAsync(stoppingToken), stoppingToken); {"machine",new TableInfo{SimulaRowCount=14655 }},
{"order_data_block",new TableInfo{SimulaRowCount=731800334 }},
{"order_data_goods",new TableInfo{SimulaRowCount=25803671 }},
{"order_data_parts",new TableInfo{SimulaRowCount=468517543 }},
{"order_module",new TableInfo{SimulaRowCount=103325385 }},
{"order_module_extra",new TableInfo{SimulaRowCount=54361321 }},
{"order_module_item",new TableInfo{SimulaRowCount=69173339 }},
{"order_package",new TableInfo{SimulaRowCount=16196195 }},
{"order_process",new TableInfo{SimulaRowCount=3892685 }},//orderNo < 202301的
{"order_process_step",new TableInfo{SimulaRowCount=8050349 }},//orderNo < 202301的删除
{"order_process_step_item",new TableInfo{SimulaRowCount=14538058 }},//orderNo < 202301的删除
{"order_scrap_board",new TableInfo{SimulaRowCount=123998 }},
{"process_group",new TableInfo{SimulaRowCount=1253 }},
{"process_info",new TableInfo{SimulaRowCount=7839 }},
{"process_item_exp",new TableInfo{SimulaRowCount=28 }},
{"process_schdule_capacity",new TableInfo{SimulaRowCount=39736 }},
{"process_step_efficiency",new TableInfo{SimulaRowCount=8 }},
{"report_template",new TableInfo{SimulaRowCount=7337 }},
{"simple_package",new TableInfo{SimulaRowCount=130436 }},//orderNo < 202301的删除
{"sys_config",new TableInfo{SimulaRowCount=2296 }},
{"work_calendar",new TableInfo{SimulaRowCount=11 }},
{"work_shift",new TableInfo{SimulaRowCount=59 }},
{"work_time",new TableInfo{SimulaRowCount=62 }},
};
var smallTableContext = new ProcessContext();
taskFun(new TasksOptions { TableInfoConfig = smallTablesDic, OutPutOptions = new OutPutOptions { FlushCount = 20000, OutPutTaskCount = 4 } },
new DataRecordQueue(), new DataRecordQueue(), smallTableContext,_smallTableTimer);
return Task.CompletedTask;
} }
public Task StopAsync(CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
} }

View File

@@ -1,13 +1,9 @@
using ConsoleApp2.Const; 
using ConsoleApp2.HostedServices.Abstractions; using ConsoleApp2.HostedServices.Abstractions;
using ConsoleApp2.Options; using ConsoleApp2.Options;
using ConsoleApp2.Services; using ConsoleApp2.Services;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using MySqlConnector;
using System.Threading;
namespace ConsoleApp2.HostedServices; namespace ConsoleApp2.HostedServices;
/// <summary> /// <summary>
@@ -16,71 +12,61 @@ namespace ConsoleApp2.HostedServices;
public class OutputService : IOutputService public class OutputService : IOutputService
{ {
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly DataRecordQueue _consumerQueue; private readonly IOptions<DatabaseOutputOptions> _outputOptions;
private readonly IOptions<DataTransformOptions> _transOptions; private readonly IOptions<DataTransformOptions> _transformOptions;
private readonly IOptions<DatabaseOutputOptions> _options;
private readonly ProcessContext _context; private readonly ProcessContext _context;
private readonly TaskManager _taskManager; private readonly TaskManager _taskManager;
private readonly ErrorRecorder _errorRecorder;
public OutputService(ILogger<OutputService> logger, public OutputService(ILogger<OutputService> logger,
[FromKeyedServices(ProcessStep.Consumer)] DataRecordQueue consumerQueue, IOptions<DatabaseOutputOptions> outputOptions,
IOptions<DatabaseOutputOptions> options,
IOptions<DataTransformOptions> transOptions,
ProcessContext context, ProcessContext context,
TaskManager taskManager) TaskManager taskManager,
IOptions<DataTransformOptions> transformOptions,
ErrorRecorder errorRecorder)
{ {
_logger = logger; _logger = logger;
_consumerQueue = consumerQueue; _outputOptions = outputOptions;
_transOptions = transOptions;
_options = options;
_context = context; _context = context;
_taskManager = taskManager; _taskManager = taskManager;
_transformOptions = transformOptions;
_errorRecorder = errorRecorder;
} }
private int _runingTaskCount;
public async Task ExecuteAsync(CancellationToken cancellationToken) public int RuningTaskCount
{ {
_logger.LogInformation("***** Mysql output service started *****"); get => _runingTaskCount;
var count = 0; }
_taskManager.CreateTasks(async () => public void DoTask() => Interlocked.Increment(ref _runingTaskCount);
public void FinishTask() => Interlocked.Decrement(ref _runingTaskCount);
public void ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue consumerQueue, ProcessContext context, CancellationToken cancellationToken)
{
if (context.IsTransformCompleted == false && consumerQueue.Count < tasksOptions.OutPutOptions.FlushCount) return;
if (RuningTaskCount >= tasksOptions.OutPutOptions.OutPutTaskCount ) return;
var records = new List<DataRecord>();
for (int i = 0; i < tasksOptions.OutPutOptions.FlushCount; i++)
{ {
var records = new List<DataRecord>(); if (consumerQueue.TryDequeue(out var record)) records.Add(record);
while (!_context.IsTransformCompleted || _consumerQueue.Count > 0) else break;
{ }
if (!_consumerQueue.TryDequeue(out var record)) continue; if (records.Count > 0)
records.Add(record); {
count++; ThreadPool.QueueUserWorkItem(async (queueState) =>
//_logger.LogInformation(@"*****OutputCount: {count} *****",count);
if (records.Count >= _options.Value.FlushCount)
{
await FlushAsync(records);
records.Clear();
}
if (_context.GetExceptions().Count>0)
{
_logger.LogInformation("***** Csv output thread is canceled *****");
return;
}
}
if (records.Count > 0)
{ {
DoTask();
await FlushAsync(records); await FlushAsync(records);
records.Clear(); FinishTask();
_logger.LogInformation("***** Mysql output thread completed *****"); });
} }
}, _options.Value.TaskCount);
await _taskManager.WaitAll();
//_context.CompleteOutput();
_logger.LogInformation(@"***** Mysql output service completed *****");
} }
private async Task FlushAsync(IEnumerable<DataRecord> records) private async Task FlushAsync(IEnumerable<DataRecord> records)
{ {
var count = 0; var count = 0;
await using var output = new MySqlDestination( await using var output = new MySqlDestination(
_options.Value.ConnectionString ?? throw new InvalidOperationException("Connection string is required"), _outputOptions.Value.ConnectionString ?? throw new InvalidOperationException("Connection string is required"),
_logger, _context,true); _logger, _context, _transformOptions, _errorRecorder);
//if (records == null || records.Count() == 0) return; //if (records == null || records.Count() == 0) return;
//var dbName = $"cferp_test_1"; //var dbName = $"cferp_test_1";
//if (records != null && records.Count() > 0) //if (records != null && records.Count() > 0)
@@ -102,7 +88,7 @@ public class OutputService : IOutputService
await output.WriteRecordAsync(record); await output.WriteRecordAsync(record);
count++; count++;
} }
await output.FlushAsync(_options.Value.MaxAllowedPacket, _transOptions); await output.FlushAsync(_outputOptions.Value.MaxAllowedPacket);
_context.AddOutput(count); _context.AddOutput(count);
} }
} }

View File

@@ -1,83 +0,0 @@
using ConsoleApp2.HostedServices.Abstractions;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ConsoleApp2.Const;
using ConsoleApp2.Options;
using ConsoleApp2.Services;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Reflection.PortableExecutable;
using System.Collections.Concurrent;
using ConsoleApp2.SimulationService;
namespace ConsoleApp2.HostedServices
{
public class TestInputService : IInputService
{
private readonly ILogger _logger;
private readonly IOptions<CsvOptions> _csvOptions;
private readonly DataRecordQueue _producerQueue;
private readonly ProcessContext _context;
public TestInputService(ILogger<TestInputService> logger,
IOptions<CsvOptions> csvOptions,
[FromKeyedServices(ProcessStep.Producer)] DataRecordQueue producerQueue,
ProcessContext context)
{
_logger = logger;
_csvOptions = csvOptions;
_producerQueue = producerQueue;
_context = context;
}
public async Task ExecuteAsync(CancellationToken cancellationToken)
{
var tableName = "order_item";
var headers = new string[] { "ID","OrderNo","ItemNo","ItemType","RoomID","BoxID","DataID","PlanID","PackageID","Num","CompanyID","ShardKey" };
var dataCount = 1200000000L;
var tempCount = 80000;
var tempRecords=new List<DataRecord>();
var comanyID = 1;
short[] shareKeys = { 23040, 23070, 23100, 24000, 24040, 24070, 24100, 25000, 25040, 25070, 25100 };
int[] companyIds = { 1, 2, 3, 4 };
var sk = shareKeys.First();
var companyID = companyIds.First();
var shareKeyInterval = 20000;
var getShareKeyTimes = 0;
var getCompanyIDTimes = 0;
var shareKeyIntervalCount = 0;
for (long i = 1; i <= dataCount; i++)
{
shareKeyIntervalCount++;
if (shareKeyIntervalCount > shareKeyInterval) {
sk=DataHelper.GetShareKey(getShareKeyTimes);
getShareKeyTimes++;
shareKeyIntervalCount = 0;
}
var fields = new string[] { i.ToString(), "20220104020855", (220105981029 + i).ToString(), "1", "482278", "482279", "3768774", "0", "0", "1", companyID.ToString(), sk.ToString() };
var record = new DataRecord(fields, tableName, headers, comanyID);
tempRecords.Add(record);
if (tempRecords.Count >= tempCount)
{
foreach (var rc in tempRecords)
{
_context.AddInput();
_producerQueue.Enqueue(rc);
if (cancellationToken.IsCancellationRequested)
return;
}
tempRecords.Clear();
companyID = DataHelper. GetCompanyId(getCompanyIDTimes);
getCompanyIDTimes++;
}
}
_context.CompleteInput();
_logger.LogInformation("***** Csv input service completed *****");
}
}
}

View File

@@ -2,6 +2,7 @@
using ConsoleApp2.HostedServices.Abstractions; using ConsoleApp2.HostedServices.Abstractions;
using ConsoleApp2.Options; using ConsoleApp2.Options;
using ConsoleApp2.Services; using ConsoleApp2.Services;
using Microsoft.Extensions.Caching.Distributed;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
@@ -16,95 +17,72 @@ public class TransformService : ITransformService
{ {
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly IOptions<DataTransformOptions> _options; private readonly IOptions<DataTransformOptions> _options;
private readonly DataRecordQueue _producerQueue;
private readonly DataRecordQueue _consumerQueue;
private readonly ProcessContext _context; private readonly ProcessContext _context;
private readonly IDatabase _db; private readonly IDistributedCache _cache;
public TransformService(ILogger<TransformService> logger, public TransformService(ILogger<TransformService> logger,
IOptions<DataTransformOptions> options, IOptions<DataTransformOptions> options,
[FromKeyedServices(ProcessStep.Producer)]DataRecordQueue producerQueue, ProcessContext context,
[FromKeyedServices(ProcessStep.Consumer)]DataRecordQueue consumerQueue, IDistributedCache cache)
ProcessContext context, IDatabase db)
{ {
_logger = logger; _logger = logger;
_options = options; _options = options;
_producerQueue = producerQueue;
_consumerQueue = consumerQueue;
_context = context; _context = context;
_db = db; _cache = cache;
} }
public async Task ExecuteAsync(CancellationToken cancellationToken) public async Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue producerQueue, DataRecordQueue consumerQueue, ProcessContext context, CancellationToken cancellationToken)
{ {
_logger.LogInformation("***** Data transform service started, thread id: {ThreadId} *****", Environment.CurrentManagedThreadId); _logger.LogInformation("***** Data transform service started, thread id: {ThreadId} *****", Environment.CurrentManagedThreadId);
while ((!_context.IsInputCompleted || _producerQueue.Count > 0)) while ((!context.IsInputCompleted || producerQueue.Count > 0))
{ {
if (_context.GetExceptions().Count > 0) if (_context.GetExceptions().Count > 0)
{ {
_logger.LogInformation("***** Csv transform service is canceled *****"); _logger.LogInformation("***** Csv transform service is canceled *****");
return; return;
} }
// var dbOptions = _options.Value.DatabaseFilter(record); if (!producerQueue.TryDequeue(out var record)) continue;
if (!_producerQueue.TryDequeue(out var record)) continue;
for (var i = 0; i < record.Fields.Length; i++)
{
var field = record[i];
if (field == "\\N")
{
field = "NULL";
goto Escape;
}
// else if(DumpDataHelper.CheckHexField(field))
// field = $"0x{field}";
switch (_options.Value.GetColumnType(record.TableName, record.Headers[i]))
{
case ColumnType.Text:
field = string.IsNullOrEmpty(field) ? "''" : _options.Value.TransformBinary?.Invoke(field) ?? field; ;
break;
case ColumnType.Blob:
//field = string.IsNullOrEmpty(field) ? "NULL" : $"0x{field}";
break;
default:
break;
}
Escape:
record[i] = field;
}
//过滤不要的record //过滤不要的record
if ( await _options.Value.RecordFilter?.Invoke(record,_db) == false) continue;
if (_options.Value.RecordFilter != null)
{
var result = await _options.Value.RecordFilter.Invoke(record, _cache);
if (result == false) continue;
}
record.Database = _options.Value.DatabaseFilter?.Invoke(record); record.Database = _options.Value.DatabaseFilter?.Invoke(record);
//修改record //修改record
_options.Value.RecordModify?.Invoke(record); _options.Value.RecordModify?.Invoke(record);
//缓存record //缓存record
_options.Value.RecordCache?.Invoke(record, _db); if (_options.Value.RecordCache != null)
//替换record
var replaceRecord =await _options.Value.RecordReplace?.Invoke(record, _db);
if (replaceRecord != null)
{ {
record = replaceRecord; await _options.Value.RecordCache.Invoke(record, _cache);
} }
_consumerQueue.Enqueue(record); //替换record
if (_options.Value.RecordReplace != null)
{
var result = await _options.Value.RecordReplace.Invoke(record, _cache);
if (result != null)
{
record = result;
}
}
consumerQueue.Enqueue(record);
_context.AddTransform(); _context.AddTransform();
//数据增加 //数据增加
var addRecords=_options.Value.RecordAdd?.Invoke(record); var addRecords = _options.Value.RecordAdd?.Invoke(record);
if(addRecords != null&& addRecords.Count>0) if (addRecords != null && addRecords.Count > 0)
{ {
foreach(var rc in addRecords) foreach (var rc in addRecords)
{ {
_consumerQueue.Enqueue(rc); consumerQueue.Enqueue(rc);
_context.AddTransform(); _context.AddTransform();
} }
} }
} }
context.CompleteTransform();
_context.CompleteTransform();
_logger.LogInformation("***** Data transformation service completed *****"); _logger.LogInformation("***** Data transformation service completed *****");
} }
} }

View File

@@ -1,5 +1,6 @@
using ConsoleApp2.Const; using ConsoleApp2.Const;
using ConsoleApp2.HostedServices.Abstractions; using ConsoleApp2.HostedServices.Abstractions;
using ConsoleApp2.Options;
using ConsoleApp2.Services; using ConsoleApp2.Services;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
@@ -22,7 +23,7 @@ public class VoidOutputService : IOutputService
_logger = logger; _logger = logger;
} }
public Task ExecuteAsync(CancellationToken stoppingToken) public void ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue consumerQueue, ProcessContext context, CancellationToken cancellationToken)
{ {
_logger.LogInformation("***** Void output service started, thread id: {ThreadId} *****", Environment.CurrentManagedThreadId); _logger.LogInformation("***** Void output service started, thread id: {ThreadId} *****", Environment.CurrentManagedThreadId);
while (!_context.IsTransformCompleted || _consumerQueue.Count > 0) while (!_context.IsTransformCompleted || _consumerQueue.Count > 0)
@@ -33,6 +34,5 @@ public class VoidOutputService : IOutputService
_context.CompleteOutput(); _context.CompleteOutput();
_logger.LogInformation("***** Void output service completed *****"); _logger.LogInformation("***** Void output service completed *****");
return Task.CompletedTask;
} }
} }

View File

@@ -7,11 +7,8 @@ namespace ConsoleApp2.Options
{ {
public string InputDir { get; set; } = "./MyDumper"; public string InputDir { get; set; } = "./MyDumper";
public bool IsMock { get; set; } = false;
public int TaskCount { get; set; } = 16; public string NoFilterTables { get; set; }="";//不需要过滤的表列表
public int FlushCount { get; set; } = 20000;
public bool Isutf8mb4 { get; set; } = true; public bool Isutf8mb4 { get; set; } = true;
public short OldestShardKey { get; set; } = 23010; public short OldestShardKey { get; set; } = 23010;

View File

@@ -1,4 +1,5 @@
using StackExchange.Redis; using Microsoft.Extensions.Caching.Distributed;
using StackExchange.Redis;
namespace ConsoleApp2.Options; namespace ConsoleApp2.Options;
@@ -6,20 +7,22 @@ public enum ColumnType
{ {
Blob, Blob,
Text, Text,
Json,
UnDefine, UnDefine,
} }
public class DataTransformOptions public class DataTransformOptions
{ {
public Func<DataRecord, string>? DatabaseFilter { get; set; } public Func<DataRecord, string>? DatabaseFilter { get; set; }
public Func<string, string>? TransformBinary { get; set; }//Binary转字符串方法 public Func<string, string>? TransformBinary { get; set; }//Binary转字符串方法
public Func<DataRecord, IDatabase, Task<bool>>? RecordFilter { get; set; }//数据过滤方法 public Func<DataRecord, IDistributedCache, Task<bool>>? RecordFilter { get; set; }//数据过滤方法
public Action<DataRecord>? RecordModify { get; set; }//数据修改 public Action<DataRecord>? RecordModify { get; set; }//数据修改
public Func<DataRecord, IDatabase, Task<DataRecord?>>? RecordReplace { get; set; }//数据替换 public Func<DataRecord, IDistributedCache, Task<DataRecord?>>? RecordReplace { get; set; }//数据替换
public Func<DataRecord, IList<DataRecord>?>? RecordAdd { get; set; }//数据替换 public Func<DataRecord, IList<DataRecord>?>? RecordAdd { get; set; }//数据替换
public Action<DataRecord, IDatabase>? RecordCache { get; set; }//数据缓存 public Func<DataRecord, IDistributedCache, Task>? RecordCache { get; set; }//数据缓存
/// <summary> /// <summary>
/// 配置导入数据的特殊列 /// 配置导入数据的特殊列

View File

@@ -6,14 +6,5 @@ public class DatabaseOutputOptions
/// 数据库连接字符串 /// 数据库连接字符串
/// </summary> /// </summary>
public string? ConnectionString { get; set; } public string? ConnectionString { get; set; }
/// <summary>
/// 输出服务的任务(Task)数
/// </summary>
public int TaskCount { get; set; }
/// <summary>
/// 每个任务每次提交到数据库的记录数量每N条构建一次SQL语句
/// </summary>
public int FlushCount { get; set; }
public int MaxAllowedPacket { get; set; } = 64*1024*1024; public int MaxAllowedPacket { get; set; } = 64*1024*1024;
} }

View File

@@ -10,8 +10,15 @@ namespace ConsoleApp2.Options
{ {
public long SimulaRowCount { get; set; }//模拟的记录条数 public long SimulaRowCount { get; set; }//模拟的记录条数
} }
public class InputTableOptions public class TasksOptions
{ {
public Dictionary<string, TableInfo> TableInfoConfig { get; set; } = new(); public Dictionary<string, TableInfo> TableInfoConfig { get; set; } = new();
public int TransformTaskCount { get; set; } = 1;
public OutPutOptions OutPutOptions { get; set; }=new();
}
public class OutPutOptions
{
public int FlushCount { get; set; } = 10000;
public int OutPutTaskCount { get; set; } = 1;
} }
} }

View File

@@ -1,11 +1,9 @@
using ConsoleApp2; using ConsoleApp2;
using ConsoleApp2.Const; using ConsoleApp2.Const;
using ConsoleApp2.Helpers;
using ConsoleApp2.HostedServices; using ConsoleApp2.HostedServices;
using ConsoleApp2.HostedServices.Abstractions; using ConsoleApp2.HostedServices.Abstractions;
using ConsoleApp2.Options; using ConsoleApp2.Options;
using ConsoleApp2.Services; using ConsoleApp2.Services;
using ConsoleApp2.SimulationService;
using Microsoft.Extensions.Caching.StackExchangeRedis; using Microsoft.Extensions.Caching.StackExchangeRedis;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
@@ -13,13 +11,9 @@ using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using MySqlConnector; using MySqlConnector;
using Serilog; using Serilog;
using Serilog.Core; using Microsoft.Extensions.Caching.Distributed;
using StackExchange.Redis; using Serilog.Events;
using System.Reflection.PortableExecutable;
// 运行之前把Mysql max_allowed_packets 调大
// 运行之前把process_step表的外键删掉
await RunProgram(); await RunProgram();
return; return;
@@ -62,55 +56,12 @@ async Task RunProgram()
//}, "请输入单次插入的行数(默认为20000):"); //}, "请输入单次插入的行数(默认为20000):");
ThreadPool.SetMaxThreads(200, 200); ThreadPool.SetMaxThreads(8, 4);
var host = Host.CreateApplicationBuilder(args); var host = Host.CreateApplicationBuilder(args);
var commandOptions = host.Configuration.GetSection("CmdOptions").Get<CommandOptions>() ?? new CommandOptions(); var commandOptions = host.Configuration.GetSection("CmdOptions").Get<CommandOptions>() ?? new CommandOptions();
Console.WriteLine($"InputDir:{commandOptions?.InputDir}"); Console.WriteLine($"InputDir:{commandOptions?.InputDir}");
Console.WriteLine($"OutPutFlushCount:{commandOptions?.FlushCount}"); if (commandOptions == null) throw new ArgumentNullException("commandOptions is null");
Console.WriteLine($"OutPutTaskCount:{commandOptions?.TaskCount}");
var oldestTime = DateTime.ParseExact(commandOptions.OldestTime, "yyyyMM", System.Globalization.DateTimeFormatInfo.InvariantInfo); var oldestTime = DateTime.ParseExact(commandOptions.OldestTime, "yyyyMM", System.Globalization.DateTimeFormatInfo.InvariantInfo);
host.Services.Configure<InputTableOptions>(option =>
{
option.TableInfoConfig = new Dictionary<string, TableInfo>
{
//order_block_plan_item从order_item表查询然后程序插入
//order_package_item从order_item表查询然后程序插入
//order_patch_detail生产没有这个表不处理
{"machine",new TableInfo{SimulaRowCount=14655 }},
{"order",new TableInfo{SimulaRowCount=5019216 }},
{"order_block_plan",new TableInfo{SimulaRowCount=2725553 }},//CreateTime < 202301的删除
{"order_block_plan_result",new TableInfo{SimulaRowCount=1174096 }},
{"order_box_block",new TableInfo{SimulaRowCount=29755672 }},
{"order_data_block",new TableInfo{SimulaRowCount=731800334 }},
{"order_data_goods",new TableInfo{SimulaRowCount=25803671 }},
{"order_data_parts",new TableInfo{SimulaRowCount=468517543 }},
{"order_item",new TableInfo{SimulaRowCount=1345520079 }},
{"order_module",new TableInfo{SimulaRowCount=103325385 }},
{"order_module_extra",new TableInfo{SimulaRowCount=54361321 }},
{"order_module_item",new TableInfo{SimulaRowCount=69173339 }},
{"order_package",new TableInfo{SimulaRowCount=16196195 }},
{"order_process",new TableInfo{SimulaRowCount=3892685 }},//orderNo < 202301的
{"order_process_step",new TableInfo{SimulaRowCount=8050349 }},//orderNo < 202301的删除
{"order_process_step_item",new TableInfo{SimulaRowCount=14538058 }},//orderNo < 202301的删除
{"order_scrap_board",new TableInfo{SimulaRowCount=123998 }},
{"process_group",new TableInfo{SimulaRowCount=1253 }},
{"process_info",new TableInfo{SimulaRowCount=7839 }},
{"process_item_exp",new TableInfo{SimulaRowCount=28 }},
{"process_schdule_capacity",new TableInfo{SimulaRowCount=39736 }},
{"process_step_efficiency",new TableInfo{SimulaRowCount=8 }},
{"report_template",new TableInfo{SimulaRowCount=7337 }},
{"simple_package",new TableInfo{SimulaRowCount=130436 }},//orderNo < 202301的删除
{"simple_plan_order",new TableInfo{SimulaRowCount=351470 }},//CreateTime < 202301的删除
{"sys_config",new TableInfo{SimulaRowCount=2296 }},
{"work_calendar",new TableInfo{SimulaRowCount=11 }},
{"work_shift",new TableInfo{SimulaRowCount=59 }},
{"work_time",new TableInfo{SimulaRowCount=62 }},
};
});
host.Services.Configure<CsvOptions>(option => host.Services.Configure<CsvOptions>(option =>
{ {
option.Delimiter = ","; option.Delimiter = ",";
@@ -129,13 +80,17 @@ async Task RunProgram()
host.Services.Configure<DataTransformOptions>(options => host.Services.Configure<DataTransformOptions>(options =>
{ {
if (commandOptions.IsMock) return;
options.DatabaseFilter = record => "cferp_test"; options.DatabaseFilter = record => "cferp_test";
options.TransformBinary = field => commandOptions != null && commandOptions.Isutf8mb4 ? $"_utf8mb4 0x{field}" : $"0x{field}"; options.TransformBinary = field => commandOptions != null && commandOptions.Isutf8mb4 ? $"_utf8mb4 0x{field}" : $"0x{field}";
var noFilterTables = commandOptions.NoFilterTables.Split(",");
//数据过滤 //数据过滤
options.RecordFilter = async (record, db) => options.RecordFilter = async (record, cache) =>
{ {
//var index = Array.IndexOf(record.Headers, "ShardKey");
if (noFilterTables.Contains(record.TableName)) return true;
if (record.TryGetField("ShardKey", out var skStr)) if (record.TryGetField("ShardKey", out var skStr))
{ {
short.TryParse(skStr, out var sk); short.TryParse(skStr, out var sk);
@@ -157,7 +112,7 @@ async Task RunProgram()
if (dt < oldestTime) return false; if (dt < oldestTime) return false;
} }
catch (Exception ex) catch (Exception)
{ {
return false;//订单号转换失败,跳过 return false;//订单号转换失败,跳过
@@ -183,19 +138,23 @@ async Task RunProgram()
if (record.TableName == "order_process_step" || record.TableName == "order_process_step_item") if (record.TableName == "order_process_step" || record.TableName == "order_process_step_item")
{ {
//如果缓存中不存在OrderProcessID,则丢弃 //如果缓存中不存在OrderProcessID,则丢弃
if(record.TryGetField("OrderProcessID",out var orderProcessID))
if(record.TryGetField("OrderProcessID",out string orderProcessID))
{ {
var value = await db.StringGetAsync(orderProcessID); var value = await cache.GetStringAsync($"order_process_{orderProcessID}");
if (string.IsNullOrEmpty(value.ToString()))return false; if (string.IsNullOrEmpty(value)) return false;
} }
} }
if (record.TableName == "order_block_plan_result" ) if (record.TableName == "order_block_plan_result" )
{ {
//如果缓存中不存在ID,则丢弃(ID 对应order_block_plan中的ID) //如果缓存中不存在ID,则丢弃(ID 对应order_block_plan中的ID)
if (record.TryGetField("ID", out var id)) if (record.TryGetField("ID", out string id))
{ {
var value = await db.StringGetAsync(id); var value = await cache.GetStringAsync($"order_block_plan_{id}");
if (string.IsNullOrEmpty(value.ToString())) return false; if (string.IsNullOrEmpty(value)) return false;
} }
} }
return true; return true;
@@ -208,7 +167,7 @@ async Task RunProgram()
{ {
if (record.TryGetField("OrderNos", out var nos)) if (record.TryGetField("OrderNos", out var nos))
{ {
if (nos.Length <= 2) record.SetField("OrderNos", "\"[]\""); if (nos.Length <= 2) record.SetField("OrderNos", "");
} }
} }
@@ -218,7 +177,7 @@ async Task RunProgram()
if (record.TryGetField("NextStepID", out var idStr)) if (record.TryGetField("NextStepID", out var idStr))
{ {
if (idStr == "NULL") if (idStr == "\\N")
{ {
record.SetField("NextStepID", "0"); record.SetField("NextStepID", "0");
} }
@@ -227,7 +186,7 @@ async Task RunProgram()
}; };
//数据缓存 //数据缓存
options.RecordCache = async (record, db) => options.RecordCache = async (record, cache) =>
{ {
if (record.TableName == "order") if (record.TableName == "order")
{ {
@@ -235,7 +194,7 @@ async Task RunProgram()
{ {
if (record.TryGetField("CompanyID", out var companyid)) if (record.TryGetField("CompanyID", out var companyid))
{ {
await db.StringSetAsync(orderNo, companyid); await cache.SetStringAsync(record.GetCacheKey("OrderNo"), companyid);
} }
} }
@@ -249,14 +208,7 @@ async Task RunProgram()
if( record.TryGetField("ID", out var id)) if( record.TryGetField("ID", out var id))
{ {
try await cache.SetStringAsync(record.GetCacheKey("ID"), sk);
{
await db.StringSetAsync(id, sk);
}
catch (Exception ex)
{
}
} }
} }
@@ -266,12 +218,12 @@ async Task RunProgram()
if (record.TryGetField("CompanyID", out var companyid)) if (record.TryGetField("CompanyID", out var companyid))
{ {
record.TryGetField("ID", out var id); record.TryGetField("ID", out var id);
await db.StringSetAsync(id, companyid); await cache.SetStringAsync(record.GetCacheKey("ID"), companyid);
} }
} }
}; };
//数据替换 //数据替换
options.RecordReplace = async (record, db) => options.RecordReplace = async (record, cache) =>
{ {
//删除数据源里simple_plan_order.ProcessState 字段和值 //删除数据源里simple_plan_order.ProcessState 字段和值
@@ -306,8 +258,9 @@ async Task RunProgram()
var headers = new List<string>(record.Headers); var headers = new List<string>(record.Headers);
var fields =new List<string>(record.Fields); var fields =new List<string>(record.Fields);
headers.Add("CompanyID"); headers.Add("CompanyID");
var companyidResult =await db.StringGetAsync(id); var companyidResult =await cache.GetStringAsync($"order_block_plan_{id}");
_ = int.TryParse(companyidResult.ToString(), out var companyid);
_ = int.TryParse(companyidResult, out var companyid);
fields.Add(companyid.ToString()); fields.Add(companyid.ToString());
return new DataRecord(fields.ToArray(), record.TableName, headers.ToArray(), companyid); return new DataRecord(fields.ToArray(), record.TableName, headers.ToArray(), companyid);
} }
@@ -321,8 +274,8 @@ async Task RunProgram()
var headers = new List<string>(record.Headers); var headers = new List<string>(record.Headers);
var fields = new List<string>(record.Fields); var fields = new List<string>(record.Fields);
headers.Add("CompanyID"); headers.Add("CompanyID");
var companyidResult = await db.StringGetAsync(orderNo); var companyidResult = await cache.GetStringAsync($"order_{orderNo}");
_ = int.TryParse(companyidResult.ToString(), out var cpid); _ = int.TryParse(companyidResult, out var cpid);
fields.Add(cpid.ToString()); fields.Add(cpid.ToString());
return new DataRecord(fields.ToArray(), record.TableName, headers.ToArray(), cpid); return new DataRecord(fields.ToArray(), record.TableName, headers.ToArray(), cpid);
} }
@@ -363,20 +316,12 @@ async Task RunProgram()
{ {
if (record.TryGetField("OrderProcessID",out var processID)) if (record.TryGetField("OrderProcessID",out var processID))
{ {
try var shardKey =await cache.GetStringAsync($"order_process_{processID}");
{
var shardKey =await db.StringGetAsync(processID);
var headers = new List<string>(record.Headers); var headers = new List<string>(record.Headers);
var fields = new List<string>(record.Fields); var fields = new List<string>(record.Fields);
headers.Add("ShardKey"); headers.Add("ShardKey");
fields.Add(shardKey.ToString()); fields.Add(shardKey??"0");
return new DataRecord(fields.ToArray(), record.TableName, headers.ToArray(), record.CompanyID); return new DataRecord(fields.ToArray(), record.TableName, headers.ToArray(), record.CompanyID);
}
catch (Exception ex)
{
}
} }
} }
} }
@@ -455,7 +400,7 @@ async Task RunProgram()
{ "process_item_exp.ItemJson", ColumnType.Text }, { "process_item_exp.ItemJson", ColumnType.Text },
{ "report_template.Template", ColumnType.Text }, { "report_template.Template", ColumnType.Text },
{ "report_template.SourceConfig", ColumnType.Text }, { "report_template.SourceConfig", ColumnType.Text },
{ "order_block_plan.OrderNos", ColumnType.Text }, { "order_block_plan.OrderNos", ColumnType.Json },
{ "order_block_plan.BlockInfo", ColumnType.Text }, { "order_block_plan.BlockInfo", ColumnType.Text },
}; };
}); });
@@ -470,36 +415,46 @@ async Task RunProgram()
// Password = "123456", // Password = "123456",
// MaximumPoolSize = 50, // 这个值应当小于 max_connections // MaximumPoolSize = 50, // 这个值应当小于 max_connections
//}.ConnectionString; //}.ConnectionString;
options.ConnectionString = new MySqlConnectionStringBuilder(host.Configuration.GetConnectionString("MySqlMaster")) options.ConnectionString = new MySqlConnectionStringBuilder(host.Configuration.GetConnectionString("MySqlMaster")??"")
{ {
CharacterSet = "utf8", CharacterSet = "utf8",
AllowUserVariables = true, AllowUserVariables = true,
IgnoreCommandTransaction = true, IgnoreCommandTransaction = true,
TreatTinyAsBoolean = false, TreatTinyAsBoolean = false,
MaximumPoolSize = 50 MaximumPoolSize = 50,
SslMode = MySqlSslMode.None,
}.ConnectionString; }.ConnectionString;
options.TaskCount = commandOptions.TaskCount;
options.FlushCount = commandOptions.FlushCount;
}); });
host.Services.AddLogging(builder => host.Services.AddLogging(builder =>
{ {
builder.ClearProviders(); builder.ClearProviders();
builder.AddSerilog(new LoggerConfiguration().WriteTo.Console().CreateLogger()); builder.AddSerilog(new LoggerConfiguration()
.WriteTo.Console()
.WriteTo.File("./log/error.log", restrictedToMinimumLevel:LogEventLevel.Error)
// .WriteTo.File("./log/info.log", restrictedToMinimumLevel:LogEventLevel.Information) //性能考虑暂不使用
.CreateLogger()
);
}); });
host.Services.AddSingleton<ProcessContext>(); host.Services.AddScoped<ProcessContext>();
host.Services.AddKeyedSingleton<DataRecordQueue>(ProcessStep.Producer); host.Services.AddKeyedSingleton<DataRecordQueue>(ProcessStep.Producer);
host.Services.AddKeyedSingleton<DataRecordQueue>(ProcessStep.Consumer); host.Services.AddKeyedSingleton<DataRecordQueue>(ProcessStep.Consumer);
host.Services.AddTransient<TaskManager>(); host.Services.AddTransient<TaskManager>();
host.Services.AddSingleton<ErrorRecorder>();
host.Services.AddHostedService<MainHostedService>(); host.Services.AddHostedService<MainHostedService>();
host.Services.AddHostedService<TaskMonitorService>(); host.Services.AddHostedService<TaskMonitorService>();
host.Services.AddSingleton<IInputService, InputService>(); if(commandOptions.IsMock)host.Services.AddSingleton<IInputService,InputService>();
else host.Services.AddSingleton<IInputService, InputService>();
host.Services.AddSingleton<ITransformService, TransformService>(); host.Services.AddSingleton<ITransformService, TransformService>();
host.Services.AddSingleton<IOutputService, OutputService>(); host.Services.AddSingleton<IOutputService, OutputService>();
var redisOptions = host.Configuration.GetSection("RedisCacheOptions").Get<RedisCacheOptions>() ?? new RedisCacheOptions(); var redisOptions = host.Configuration.GetSection("RedisCacheOptions").Get<RedisCacheOptions>() ?? new RedisCacheOptions();
var redis = ConnectionMultiplexer.Connect(redisOptions.Configuration); host.Services.AddStackExchangeRedisCache(options =>
host.Services.AddSingleton(redis.GetDatabase()); {
options.Configuration = redisOptions.Configuration;
options.InstanceName = redisOptions.InstanceName;
});
var app = host.Build(); var app = host.Build();
await app.RunAsync(); await app.RunAsync();
} }

View File

@@ -16,10 +16,10 @@ public class CsvSource:IDataSource
//protected readonly StreamReader _reader; //protected readonly StreamReader _reader;
private readonly ILogger? _logger; private readonly ILogger? _logger;
protected readonly string _tableName; protected readonly string _tableName;
protected string? _sqlFilePath; protected string _sqlFilePath=string.Empty;
protected readonly string? _sqlFileText; protected readonly string? _sqlFileText;
protected string[]? headers; protected string[] headers=Array.Empty<string>();
protected string[]? csvFiles; protected string[] csvFiles = Array.Empty<string>();
public string? CurrentRaw { get; protected set; } public string? CurrentRaw { get; protected set; }
public string Delimiter { get; private set; } public string Delimiter { get; private set; }
public char QuoteChar { get; private set; } public char QuoteChar { get; private set; }
@@ -31,9 +31,6 @@ public class CsvSource:IDataSource
_logger = logger; _logger = logger;
Delimiter = delimiter; Delimiter = delimiter;
QuoteChar = quoteChar; QuoteChar = quoteChar;
string pattern = $"^.*\\.{tableName}\\..*\\.sql$";
_sqlFilePath = Directory.GetFiles(_inputDir).FirstOrDefault(s => Regex.Match(s, pattern).Success);
} }
@@ -129,9 +126,11 @@ public class CsvSource:IDataSource
} }
public virtual async Task GetHeaderAndCsvFiles() public virtual async Task GetHeaderAndCsvFiles()
{ {
string pattern = $"^.*\\.{_tableName}\\..*\\.sql$";
_sqlFilePath = Directory.GetFiles(_inputDir).FirstOrDefault(s => Regex.Match(s, pattern).Success) ?? "";
var text = await File.ReadAllTextAsync(_sqlFilePath); var text = await File.ReadAllTextAsync(_sqlFilePath);
headers = await DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text); headers = DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text);
csvFiles = await DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text, new Regex(@"'.+\.dat.zst'")); csvFiles = DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text, new Regex(@"'.+\.dat.zst'"));
} }
public virtual async Task DoEnqueue(Action<DataRecord> action) public virtual async Task DoEnqueue(Action<DataRecord> action)
@@ -140,9 +139,9 @@ public class CsvSource:IDataSource
foreach (var file in csvFiles) foreach (var file in csvFiles)
{ {
var filePath= Path.Combine(_inputDir, file); var filePath= Path.Combine(_inputDir, file);
using (var fs = File.OpenRead(filePath)) using var fs = File.OpenRead(filePath);
{ {
using (StreamReader sr = new StreamReader(fs)) using StreamReader sr = new (fs);
{ {
while (!sr.EndOfStream) while (!sr.EndOfStream)
{ {
@@ -164,9 +163,9 @@ public class CsvSource:IDataSource
if (file != null) if (file != null)
{ {
var filePath = Path.Combine(_inputDir, file); var filePath = Path.Combine(_inputDir, file);
using (var fs = File.OpenRead(filePath)) using var fs = File.OpenRead(filePath);
{ {
using (StreamReader sr = new StreamReader(fs)) using StreamReader sr = new(fs);
{ {
var line = await sr.ReadLineAsync(); var line = await sr.ReadLineAsync();
var fields = ParseRow2(line, QuoteChar, Delimiter); var fields = ParseRow2(line, QuoteChar, Delimiter);

View File

@@ -0,0 +1,104 @@
using System.Text;
using Microsoft.Extensions.Logging;
namespace ConsoleApp2.Services;
public class ErrorRecorder
{
private readonly string _outputDir = "./ErrorRecords";
private readonly ILogger _logger;
private readonly Dictionary<string, int> _logIndex = new();
/// <summary>
/// 当次执行标识
/// </summary>
private static readonly string UID = DateTime.Now.ToString("yyyy-MM-dd HH-mm-ss");
public ErrorRecorder(ILogger<ErrorRecorder> logger)
{
_logger = logger;
var dir = Path.Combine(_outputDir, UID);
if (!Directory.Exists(dir))
{
Directory.CreateDirectory(dir);
}
}
/// <summary>
/// 记录已知表名发生错误的SQL
/// </summary>
/// <param name="commandText"></param>
/// <param name="tableName"></param>
/// <param name="exception"></param>
public async Task LogErrorSqlAsync(string commandText, string tableName, Exception exception)
{
if (!_logIndex.TryGetValue(tableName, out var idx))
{
idx = 0;
_logIndex.Add(tableName, idx);
}
var filePath = Path.Combine(_outputDir, UID, $"{tableName}-{idx}.errlog");
if (File.Exists(filePath) && new FileInfo(filePath).Length > 10 * 1024 * 1024)
{
++idx;
_logIndex[tableName] = idx;
filePath = Path.Combine(_outputDir, UID, $"{tableName}-{idx}.errlog");
}
var content = $"""
/* [{DateTime.Now:yyyy-MM-dd HH:mm:ss}]
* Error occurred when export table '{tableName}':
* {exception.Message}
*/
{commandText}
""";
await File.AppendAllTextAsync(filePath, content, Encoding.UTF8);
}
/// <summary>
/// 记录发生错误的SQL
/// </summary>
/// <param name="commandText"></param>
/// <param name="exception"></param>
public async Task LogErrorSqlAsync(string commandText, Exception exception)
{
var filePath = Path.Combine(_outputDir, UID, "UnknownTables.errlog");
var content = $"""
/* [{DateTime.Now:yyyy-MM-dd HH:mm:ss}]
* Error occurred when export table with unknown table name:
* {exception.Message}
*/
{commandText}
""";
await File.AppendAllTextAsync(filePath, content, Encoding.UTF8);
}
public async Task LogErrorRecordsAsync(IDictionary<string, DataRecord> records, Exception exception)
{
var pathDict = new Dictionary<string, string>();
foreach (var pair in records)
{
if(!pathDict.TryGetValue(pair.Key, out var path))
{
path = Path.Combine(_outputDir, UID, "ErrorRecords", $"{pair.Key}.errlog");
pathDict.Add(pair.Key, path);
}
//
await File.AppendAllTextAsync(path, string.Join(',', pair.Value.Fields));
}
}
public void ClearErrorRecords()
{
_logger.LogInformation("***** Clear error records *****");
foreach (var file in Directory.GetFiles(_outputDir, "*.errlog", SearchOption.AllDirectories))
{
File.Delete(file);
}
}
}

View File

@@ -11,34 +11,31 @@ namespace ConsoleApp2.Services;
[Obsolete] [Obsolete]
public class JsvSource:IDataSource public class JsvSource:IDataSource
{ {
private readonly string _inputDir; //private readonly string _inputDir;
private readonly JsvStringSerializer _jsv; //private readonly JsvStringSerializer _jsv;
private readonly StreamReader _reader; //private readonly StreamReader? _reader;
// ReSharper disable once PrivateFieldCanBeConvertedToLocalVariable // ReSharper disable once PrivateFieldCanBeConvertedToLocalVariable
private readonly ILogger? _logger; //private readonly ILogger? _logger;
private readonly string _tableName; //private readonly string _tableName;
public DataRecord Current { get; protected set; } = null!; public DataRecord Current { get; protected set; } = null!;
public string[]? Headers { get; } public string[]? Headers { get; }
public bool EndOfSource => _reader.EndOfStream; //public bool EndOfSource => _reader.EndOfStream;
public JsvSource(string inputDir,string tableName, ILogger? logger = null) public JsvSource(string inputDir,string tableName, ILogger? logger = null)
{ {
_inputDir = inputDir; //_inputDir = inputDir;
_tableName = tableName; //_tableName = tableName;
_jsv = new JsvStringSerializer(); //_jsv = new JsvStringSerializer();
// _reader = new StreamReader(filePath); //_logger = logger;
//Headers = headers;
_logger = logger;
// _logger?.LogInformation("Reading file: {FilePath}", filePath);
//_tableName = DumpDataHelper.GetTableName(filePath);
} }
public async Task DoEnqueue(Action<DataRecord> action) public Task DoEnqueue(Action<DataRecord> action)
{ {
return Task.CompletedTask;
} }
public void Dispose() public void Dispose()
{ {
_reader.Dispose(); // _reader?.Dispose();
} }
} }

View File

@@ -1,5 +1,6 @@
using System.Reflection.Metadata; using System.Data.Common;
using System.Text; using System.Text;
using System.Text.RegularExpressions;
using ConsoleApp2.Helpers; using ConsoleApp2.Helpers;
using ConsoleApp2.Options; using ConsoleApp2.Options;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
@@ -12,27 +13,31 @@ namespace ConsoleApp2.Services;
/// <summary> /// <summary>
/// Mysql导出 /// Mysql导出
/// </summary> /// </summary>
public class MySqlDestination : IDisposable, IAsyncDisposable public partial class MySqlDestination : IDisposable, IAsyncDisposable
{ {
private readonly Dictionary<string, IList<DataRecord>> _recordCache; private readonly Dictionary<string, IList<DataRecord>> _recordCache;
private readonly MySqlConnection _conn; private readonly MySqlConnection _conn;
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly bool _prettyOutput;
private readonly int _maxAllowPacket;
private readonly ProcessContext _context; private readonly ProcessContext _context;
private readonly IOptions<DataTransformOptions> _transformOptions;
private readonly ErrorRecorder _errorRecorder;
public MySqlDestination(string connStr, ILogger logger, ProcessContext context,bool prettyOutput = false) public MySqlDestination(
string connStr,
ILogger logger,
ProcessContext context,
IOptions<DataTransformOptions> transformOptions,
ErrorRecorder errorRecorder)
{ {
_conn = new MySqlConnection(connStr); _conn = new MySqlConnection(connStr);
_conn.Open(); _conn.Open();
_recordCache = new Dictionary<string, IList<DataRecord>>(); _recordCache = new Dictionary<string, IList<DataRecord>>();
_logger = logger; _logger = logger;
_context = context; _context = context;
_prettyOutput = prettyOutput; _transformOptions = transformOptions;
_errorRecorder = errorRecorder;
} }
public Task WriteRecordAsync(DataRecord record) public Task WriteRecordAsync(DataRecord record)
{ {
_recordCache.AddOrUpdate(record.TableName, [record], (key, value) => _recordCache.AddOrUpdate(record.TableName, [record], (key, value) =>
@@ -51,28 +56,44 @@ public class MySqlDestination : IDisposable, IAsyncDisposable
} }
} }
public async Task FlushAsync(int maxAllowPacket, IOptions<DataTransformOptions> transOptions) public async Task FlushAsync(int maxAllowPacket)
{ {
if (_recordCache.Count == 0) if (_recordCache.Count == 0)
return; return;
var cmd = _conn.CreateCommand(); var cmd = _conn.CreateCommand();
cmd.CommandTimeout = 3 * 60; cmd.CommandTimeout = 3 * 60;
var excuseList = GetExcuseList(_recordCache, maxAllowPacket, transOptions, _prettyOutput);
try try
{ {
var excuseList = GetExcuseList(_recordCache, maxAllowPacket).ToList();
foreach (var insertSql in excuseList) foreach (var insertSql in excuseList)
{ {
cmd.CommandText = insertSql; cmd.CommandText = insertSql;
await cmd.ExecuteNonQueryAsync(); try
{
await cmd.ExecuteNonQueryAsync();
}
catch (Exception e)
{
_logger.LogCritical(e, "Error when flushing records, sql: {Sql}", cmd.CommandText.Omit(1000));
_context.AddException(e);
var match = MatchTableName().Match(cmd.CommandText);
if (match is { Success: true, Groups.Count: > 1 })
{
var tableName = match.Groups[1].Value;
await _errorRecorder.LogErrorSqlAsync(cmd.CommandText, tableName, e);
}
else await _errorRecorder.LogErrorSqlAsync(cmd.CommandText, e);
}
} }
_recordCache.Clear(); _recordCache.Clear();
} }
catch (Exception e) catch (Exception e)
{ {
_logger.LogCritical(e, "Error when flushing records, sql: {Sql}", cmd.CommandText.Omit(1000)); _logger.LogCritical(e, "Error when serialize records, record:");
_context.AddException(e); _context.AddException(e);
throw;
} }
finally finally
{ {
@@ -80,89 +101,102 @@ public class MySqlDestination : IDisposable, IAsyncDisposable
} }
} }
public static IList<string> GetExcuseList(IDictionary<string, IList<DataRecord>> tableRecords,int maxAllowPacket, IOptions<DataTransformOptions> transOptions, [GeneratedRegex("INSERT INTO `([^`]+)`")]
bool prettyOutput = false) private static partial Regex MatchTableName();
public IEnumerable<string> GetExcuseList(IDictionary<string, IList<DataRecord>> tableRecords,int maxAllowPacket)
{ {
var resultList = new List<string>(); var sb = new StringBuilder();
var headerSb = new StringBuilder();
var recordSb = new StringBuilder();
foreach (var (tableName, records) in tableRecords) foreach (var (tableName, records) in tableRecords)
{ {
if (records.Count == 0) if (records.Count == 0)
continue; continue;
headerSb.Append($"INSERT INTO `{tableName}`(");
var recordIdx = 0;
StartBuild:
var noCommas = true;
// INSERT INTO ... VALUES >>>
sb.Append($"INSERT INTO `{tableName}`(");
for (var i = 0; i < records[0].Headers.Length; i++) for (var i = 0; i < records[0].Headers.Length; i++)
{ {
var header = records[0].Headers[i]; var header = records[0].Headers[i];
headerSb.Append($"`{header}`"); sb.Append($"`{header}`");
if (i != records[0].Headers.Length - 1) if (i != records[0].Headers.Length - 1)
headerSb.Append(','); sb.Append(',');
} }
headerSb.Append(") VALUES "); sb.Append(") VALUES ");
if (prettyOutput)
headerSb.AppendLine();
var sbList = new List<string>(); // ([FIELDS]), >>>
var currentLength = headerSb.Length; for (;recordIdx < records.Count; recordIdx++)
for (var i = 0; i < records.Count; i++)
{ {
var record = records[i]; var record = records[recordIdx];
var recordSb = new StringBuilder();
recordSb.Append('('); recordSb.Append('(');
for (var j = 0; j < record.Fields.Length; j++) for (var fieldIdx = 0; fieldIdx < record.Fields.Length; fieldIdx++)
{ {
var field = record.Fields[j]; var field = record.Fields[fieldIdx];
var header = record.Headers[j];
if (transOptions.Value.GetColumnType(record.TableName, header) ==ColumnType.Blob) // 在这里处理特殊列
#region HandleFields
if (field == "\\N")
{ {
if (string.IsNullOrEmpty(field)) recordSb.Append("NULL");
{ goto Escape;
recordSb.Append("NULL");
}
else
recordSb.Append("0x"+field);
} }
else
recordSb.Append(field); switch (_transformOptions.Value.GetColumnType(record.TableName, record.Headers[fieldIdx]))
if (j != record.Fields.Length - 1) {
case ColumnType.Text:
recordSb.Append(string.IsNullOrEmpty(field)
? "''"
: _transformOptions.Value.TransformBinary?.Invoke(field) ?? field);
break;
case ColumnType.Blob:
if (string.IsNullOrEmpty(field))
recordSb.Append("''");
else recordSb.Append($"0x{field}");
break;
case ColumnType.Json:
recordSb.Append(string.IsNullOrEmpty(field)
? "\"[]\""
: _transformOptions.Value.TransformBinary?.Invoke(field) ?? field);
break;
case ColumnType.UnDefine:
default:
recordSb.Append(field);
break;
}
Escape:
#endregion
if (fieldIdx != record.Fields.Length - 1)
recordSb.Append(','); recordSb.Append(',');
} }
recordSb.Append(')'); recordSb.Append(')');
//if (i != records.Count - 1) // not last field // 若字符数量即将大于限制则返回SQL清空StringBuilder保留当前记录的索引值然后转到StartBuild标签重新开始一轮INSERT
// recordSb.Append(','); if (sb.Length + recordSb.Length + 1 > maxAllowPacket)
if (prettyOutput) recordSb.AppendLine(); {
sb.Append(';');
yield return sb.ToString();
sb.Clear();
goto StartBuild;
}
if (currentLength + recordSb.Length >= maxAllowPacket) if (!noCommas)
{ sb.Append(',').AppendLine();
var insertSb = new StringBuilder(headerSb.ToString()); noCommas = false;
insertSb.Append(string.Join(",", sbList)); sb.Append(recordSb); // StringBuilder.Append(StringBuilder)不会分配多余的内存
insertSb.Append(";");
resultList.Add(insertSb.ToString());
insertSb.Clear();
sbList.Clear();
sbList.Add(recordSb.ToString());
currentLength = headerSb.Length + 1;//逗号长度加1
}
else
{
sbList.Add(recordSb.ToString());
}
currentLength += recordSb.Length;
recordSb.Clear();
} }
if (sbList.Count > 0)
{ sb.Append(';');
var insertSb = new StringBuilder(headerSb.ToString()); yield return sb.ToString();
insertSb.Append(string.Join(",", sbList)); sb.Clear();
insertSb.Append(";");
resultList.Add(insertSb.ToString());
insertSb.Clear();
}
headerSb.Clear();
} }
return resultList;
} }

View File

@@ -1,4 +1,6 @@
namespace ConsoleApp2.Services; using System.Collections.Concurrent;
namespace ConsoleApp2.Services;
/// <summary> /// <summary>
/// 处理上下文类,标识处理进度 /// 处理上下文类,标识处理进度
@@ -8,7 +10,7 @@ public class ProcessContext
private int _inputCount; private int _inputCount;
private int _transformCount; private int _transformCount;
private int _outputCount; private int _outputCount;
private IList<Exception> _exceptionList = new List<Exception>(); private ConcurrentBag<Exception> _exceptionList = new ConcurrentBag<Exception>();
public bool IsInputCompleted { get; private set; } public bool IsInputCompleted { get; private set; }
public bool IsTransformCompleted { get; private set; } public bool IsTransformCompleted { get; private set; }
public bool IsOutputCompleted { get; private set; } public bool IsOutputCompleted { get; private set; }
@@ -34,7 +36,7 @@ public class ProcessContext
{ {
_exceptionList.Add(ex); _exceptionList.Add(ex);
} }
public IList<Exception> GetExceptions() public ConcurrentBag<Exception> GetExceptions()
{ {
return _exceptionList; return _exceptionList;
} }

View File

@@ -1,6 +1,5 @@
using ConsoleApp2.Helpers; using ConsoleApp2.Helpers;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using System.IO;
using System.Text.RegularExpressions; using System.Text.RegularExpressions;
using ZstdSharp; using ZstdSharp;
namespace ConsoleApp2.Services namespace ConsoleApp2.Services
@@ -10,22 +9,19 @@ namespace ConsoleApp2.Services
public ZstSource(string inputDir, string tableName, string delimiter = ",", char quoteChar = '"', public ZstSource(string inputDir, string tableName, string delimiter = ",", char quoteChar = '"',
ILogger? logger = null) : base(inputDir, tableName, delimiter = ",", quoteChar = '"', logger = null) ILogger? logger = null) : base(inputDir, tableName, delimiter = ",", quoteChar = '"', logger = null)
{ {
//throw new Exception("aaa");
string pattern = $"^.*\\.{tableName}\\..*\\.sql.zst$";
_sqlFilePath = Directory.GetFiles(_inputDir).FirstOrDefault(s => Regex.Match(s, pattern).Success);
} }
private async Task<string> DecompressFile(string filePath) private static async Task<string> DecompressFile(string filePath)
{ {
using (var input = File.OpenRead(filePath)) using var input = File.OpenRead(filePath);
{ {
using (var decopress = new DecompressionStream(input)) using var decopress = new DecompressionStream(input);
{ {
var ms = new MemoryStream(); var ms = new MemoryStream();
decopress.CopyTo(ms); decopress.CopyTo(ms);
ms.Seek(0, SeekOrigin.Begin); ms.Seek(0, SeekOrigin.Begin);
StreamReader reader = new StreamReader(ms); StreamReader reader = new(ms);
var text = await reader.ReadToEndAsync(); var text = await reader.ReadToEndAsync();
return text; return text;
@@ -34,9 +30,11 @@ namespace ConsoleApp2.Services
} }
public override async Task GetHeaderAndCsvFiles() public override async Task GetHeaderAndCsvFiles()
{ {
string pattern = $"^.*\\.{_tableName}\\..*\\.sql.zst$";
_sqlFilePath = Directory.GetFiles(_inputDir).FirstOrDefault(s => Regex.Match(s, pattern).Success) ?? "";
var text = await DecompressFile(_sqlFilePath); var text = await DecompressFile(_sqlFilePath);
headers=await DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text); headers= DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text);
csvFiles=await DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text, new Regex(@"'.+\.dat.zst'")); csvFiles= DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text, new Regex(@"'.+\.dat.zst'"));
} }
public override async Task DoEnqueue(Action<DataRecord> action) public override async Task DoEnqueue(Action<DataRecord> action)
@@ -45,11 +43,11 @@ namespace ConsoleApp2.Services
foreach (var file in csvFiles) foreach (var file in csvFiles)
{ {
var filePath = Path.Combine(_inputDir, file); var filePath = Path.Combine(_inputDir, file);
using (var input = File.OpenRead(filePath)) using var input = File.OpenRead(filePath);
{ {
using (var decopress = new DecompressionStream(input)) using var decopress = new DecompressionStream(input);
{ {
using( var reader = new StreamReader(decopress)) using var reader = new StreamReader(decopress);
{ {
while (!reader.EndOfStream) while (!reader.EndOfStream)
{ {
@@ -67,15 +65,15 @@ namespace ConsoleApp2.Services
public override async Task<DataRecord?> GetTestRecord() public override async Task<DataRecord?> GetTestRecord()
{ {
await GetHeaderAndCsvFiles(); await GetHeaderAndCsvFiles();
var file = csvFiles.FirstOrDefault(); var file = csvFiles?.FirstOrDefault();
if (file != null) if (file != null)
{ {
var filePath = Path.Combine(_inputDir, file); var filePath = Path.Combine(_inputDir, file);
using (var input = File.OpenRead(filePath)) using var input = File.OpenRead(filePath);
{ {
using (var decopress = new DecompressionStream(input)) using var decopress = new DecompressionStream(input);
{ {
using (var reader = new StreamReader(decopress)) using var reader = new StreamReader(decopress);
{ {
var line = await reader.ReadLineAsync(); var line = await reader.ReadLineAsync();
var fields = ParseRow2(line, QuoteChar, Delimiter); var fields = ParseRow2(line, QuoteChar, Delimiter);
@@ -88,9 +86,5 @@ namespace ConsoleApp2.Services
} }
return null; return null;
} }
public void Dispose()
{
//_reader.Dispose();
}
} }
} }

View File

@@ -16,23 +16,18 @@ namespace ConsoleApp2.SimulationService
{ {
private readonly ILogger _logger; private readonly ILogger _logger;
private readonly IOptions<DataInputOptions> _dataInputOptions; private readonly IOptions<DataInputOptions> _dataInputOptions;
private readonly IOptions<InputTableOptions> _tableOptions;
private readonly DataRecordQueue _producerQueue;
private readonly ProcessContext _context; private readonly ProcessContext _context;
public SimulationInputService(ILogger<InputService> logger, public SimulationInputService(ILogger<InputService> logger,
IOptions<DataInputOptions> dataInputOptions, IOptions<DataInputOptions> dataInputOptions,
IOptions<InputTableOptions> tableOptions,
[FromKeyedServices(ProcessStep.Producer)] DataRecordQueue producerQueue,
ProcessContext context) ProcessContext context)
{ {
_logger = logger; _logger = logger;
_dataInputOptions = dataInputOptions; _dataInputOptions = dataInputOptions;
_tableOptions = tableOptions;
_producerQueue = producerQueue;
_context = context; _context = context;
} }
public async Task ExecuteAsync(CancellationToken cancellationToken) public async Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue producerQueue, ProcessContext context, CancellationToken cancellationToken)
{ {
var inputDir = _dataInputOptions.Value.InputDir; var inputDir = _dataInputOptions.Value.InputDir;
_logger.LogInformation("***** simulation input service start, working dir: {InputDir}, thread id: {ThreadId} *****", inputDir, Environment.CurrentManagedThreadId); _logger.LogInformation("***** simulation input service start, working dir: {InputDir}, thread id: {ThreadId} *****", inputDir, Environment.CurrentManagedThreadId);
@@ -42,95 +37,99 @@ namespace ConsoleApp2.SimulationService
_logger.LogInformation("No source files found in {InputDir}", inputDir); _logger.LogInformation("No source files found in {InputDir}", inputDir);
return; return;
} }
foreach (var tableName in _tableOptions.Value.TableInfoConfig.Keys) foreach (var tableName in tasksOptions.TableInfoConfig.Keys)
{ {
var dataCount = _tableOptions.Value.TableInfoConfig[tableName].SimulaRowCount;//当前表要生成的总数据量 _logger.LogInformation("Working table: {tableName}", tableName);
var companyTotallCount = 1000;//当前表每个公司生成的总数据量
var tempRecords = new List<DataRecord>();
var sk = DataHelper.shareKeys.First();
var companyID = DataHelper.companyIds.First();
var shareKeyInterval = 20000;//每个sharekey的数据量 var dataCount = tasksOptions.TableInfoConfig[tableName].SimulaRowCount;//当前表要生成的总数据量
var getShareKeyTimes = 0;//sharekey生成的次数,每生成一次改变sharekey的值 var companyTotallCount = 1000;//当前表每个公司生成的总数据量
var getCompanyIDTimes = 0;//公司生成的次数,每生成一次改变companyID的值 var tempRecords = new List<DataRecord>();
var shareKeyIntervalCount = 0; var sk = DataHelper.shareKeys.First();
var companyID = DataHelper.companyIds.First();
var source = _dataInputOptions.Value.CreateSource?.Invoke(tableName); var shareKeyInterval = 20000;//每个sharekey的数据量
var testRecord =await source.GetTestRecord(); var getShareKeyTimes = 0;//sharekey生成的次数,每生成一次改变sharekey的值
for (long i = 1; i <= dataCount; i++) var getCompanyIDTimes = 0;//公司生成的次数,每生成一次改变companyID的值
var shareKeyIntervalCount = 0;
var source = _dataInputOptions.Value.CreateSource?.Invoke(tableName);
if (source == null) throw new NullReferenceException($"create table source:{tableName} failed!");
var testRecord = await source.GetTestRecord();
if(testRecord == null) throw new NullReferenceException($"create testRecord failed, tableName:{tableName}");
for (long i = 1; i <= dataCount; i++)
{
shareKeyIntervalCount++;
if (shareKeyIntervalCount > shareKeyInterval)
{ {
shareKeyIntervalCount++; sk = DataHelper.GetShareKey(getShareKeyTimes);
if (shareKeyIntervalCount > shareKeyInterval) getShareKeyTimes++;
shareKeyIntervalCount = 0;
}
var fields = new string[testRecord.Fields.Length];
Array.Copy(testRecord.Fields, fields, testRecord.Fields.Length);
var record = new DataRecord(fields, testRecord.TableName, testRecord.Headers, companyID);
//更新record的ID、OrderNo,ShardKey值
if (record.Headers.Contains("ID"))
{
var index = Array.IndexOf(record.Headers, "ID");
if (index > -1)
{ {
sk = DataHelper.GetShareKey(getShareKeyTimes); record.Fields[index] = i.ToString();
getShareKeyTimes++;
shareKeyIntervalCount = 0;
}
var fields = new string[testRecord.Fields.Length];
Array.Copy(testRecord.Fields, fields, testRecord.Fields.Length);
var record = new DataRecord(fields, testRecord.TableName, testRecord.Headers, companyID);
//更新record的ID、OrderNo,ShardKey值
if (record.Headers.Contains("ID"))
{
var index = Array.IndexOf(record.Headers, "ID");
if (index > -1)
{
record.Fields[index] = i.ToString();
}
}
if (record.TableName == "order_box_block" && record.Headers.Contains("BoxID"))
{
var index = Array.IndexOf(record.Headers, "BoxID");
if (index > -1)
{
record.Fields[index] = i.ToString();
}
}
if ((record.TableName == "order_block_plan_item" || record.TableName == "order_package_item") && record.Headers.Contains("ItemID"))
{
var index = Array.IndexOf(record.Headers, "ItemID");
if (index > -1)
{
record.Fields[index] = i.ToString();
}
}
if (record.TableName == "order" && record.Headers.Contains("OrderNo"))
{
var index = Array.IndexOf(record.Headers, "OrderNo");
if (index > -1)
{
record.Fields[index] = i.ToString();
}
}
if (record.Headers.Contains("ShardKey"))
{
var index = Array.IndexOf(record.Headers, "ShardKey");
if (index > -1)
{
record.Fields[index] = sk.ToString();
}
}
tempRecords.Add(record);
if (tempRecords.Count >= companyTotallCount || i >= dataCount - 1)
{
foreach (var rc in tempRecords)
{
_context.AddInput();
_producerQueue.Enqueue(rc);
if (cancellationToken.IsCancellationRequested)
return;
}
tempRecords.Clear();
companyID = DataHelper.GetCompanyId(getCompanyIDTimes);
getCompanyIDTimes++;
} }
} }
_logger.LogInformation("table:'{tableName}' simulation input completed", tableName); if (record.TableName == "order_box_block" && record.Headers.Contains("BoxID"))
{
var index = Array.IndexOf(record.Headers, "BoxID");
if (index > -1)
{
record.Fields[index] = i.ToString();
}
}
if ((record.TableName == "order_block_plan_item" || record.TableName == "order_package_item") && record.Headers.Contains("ItemID"))
{
var index = Array.IndexOf(record.Headers, "ItemID");
if (index > -1)
{
record.Fields[index] = i.ToString();
}
}
if (record.TableName == "order" && record.Headers.Contains("OrderNo"))
{
var index = Array.IndexOf(record.Headers, "OrderNo");
if (index > -1)
{
record.Fields[index] = i.ToString();
}
}
if (record.Headers.Contains("ShardKey"))
{
var index = Array.IndexOf(record.Headers, "ShardKey");
if (index > -1)
{
record.Fields[index] = sk.ToString();
}
}
tempRecords.Add(record);
if (tempRecords.Count >= companyTotallCount || i >= dataCount - 1)
{
foreach (var rc in tempRecords)
{
_context.AddInput();
producerQueue.Enqueue(rc);
if (cancellationToken.IsCancellationRequested)
return;
}
tempRecords.Clear();
companyID = DataHelper.GetCompanyId(getCompanyIDTimes);
getCompanyIDTimes++;
}
}
_logger.LogInformation("table:'{tableName}' simulation input completed", tableName);
//} //}
//_logger.LogInformation("File '{File}' input completed", Path.GetFileName(sqlPath)); //_logger.LogInformation("File '{File}' input completed", Path.GetFileName(sqlPath));
} }
_context.CompleteInput(); context.CompleteInput();
_logger.LogInformation("***** Csv input service completed *****"); _logger.LogInformation("***** Csv input service completed *****");
} }
} }

View File

@@ -2,7 +2,7 @@
"CmdOptions": { "CmdOptions": {
"InputFileType": "CSV", "InputFileType": "CSV",
"InputDir": "D:/MyDumper-ZST", "InputDir": "D:/MyDumper-ZST",
"TaskCount": 4, "TaskCount": 6,
"FlushCount": 10000, "FlushCount": 10000,
"Isutf8mb4": true, "Isutf8mb4": true,
"OldestShardKey": 23000, "OldestShardKey": 23000,
@@ -12,6 +12,7 @@
"MySqlMaster": "Server=127.0.0.1;Port=33309;UserId=root;Password=123456;Database=cferp_test;" "MySqlMaster": "Server=127.0.0.1;Port=33309;UserId=root;Password=123456;Database=cferp_test;"
}, },
"RedisCacheOptions": { "RedisCacheOptions": {
"Configuration": "localhost:6379" "Configuration": "192.168.1.246:6380",
"InstanceName" : "mes-etl:"
} }
} }