Compare commits
21 Commits
dadb36b1c9
...
multTasks
Author | SHA1 | Date | |
---|---|---|---|
a169eecec5 | |||
fcc0de5b2a | |||
7235400aee | |||
7e3690a325 | |||
241f52e30f | |||
b0795f9a2c | |||
f167256082 | |||
e3f6ecbd91 | |||
45ad15a065 | |||
854111315b | |||
6ec782ec93 | |||
97e359468f | |||
1f9c9e0c13 | |||
629a4d2fb5 | |||
f4f7ff316b | |||
dda87349fd | |||
469e59628c | |||
70981fb985 | |||
08e0444055 | |||
e0df7ff4e9 | |||
1de3603afe |
@@ -25,6 +25,7 @@
|
|||||||
<PackageReference Include="Serilog" Version="3.1.2-dev-02097" />
|
<PackageReference Include="Serilog" Version="3.1.2-dev-02097" />
|
||||||
<PackageReference Include="Serilog.Extensions.Hosting" Version="8.0.0" />
|
<PackageReference Include="Serilog.Extensions.Hosting" Version="8.0.0" />
|
||||||
<PackageReference Include="Serilog.Sinks.Console" Version="5.0.1" />
|
<PackageReference Include="Serilog.Sinks.Console" Version="5.0.1" />
|
||||||
|
<PackageReference Include="Serilog.Sinks.File" Version="5.0.1-dev-00972" />
|
||||||
<PackageReference Include="ServiceStack.Text" Version="8.0.0" />
|
<PackageReference Include="ServiceStack.Text" Version="8.0.0" />
|
||||||
<PackageReference Include="ZstdSharp.Port" Version="0.7.4" />
|
<PackageReference Include="ZstdSharp.Port" Version="0.7.4" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
@@ -65,6 +65,8 @@ public class DataRecord
|
|||||||
|
|
||||||
public bool SetField(string columnName, string value) => SetField(this, columnName,value);
|
public bool SetField(string columnName, string value) => SetField(this, columnName,value);
|
||||||
|
|
||||||
|
public string GetCacheKey(string columnName) => GetCacheKey(this, columnName);
|
||||||
|
|
||||||
public bool SetField( DataRecord record,string columnName,string value)
|
public bool SetField( DataRecord record,string columnName,string value)
|
||||||
{
|
{
|
||||||
if (record.Headers is null)
|
if (record.Headers is null)
|
||||||
@@ -75,4 +77,14 @@ public class DataRecord
|
|||||||
record.Fields[idx] = value;
|
record.Fields[idx] = value;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
public string GetCacheKey(DataRecord record, string columnName)
|
||||||
|
{
|
||||||
|
if (TryGetField(record, columnName, out var value))
|
||||||
|
{
|
||||||
|
return $"{TableName}_{value}";
|
||||||
|
}else
|
||||||
|
throw new IndexOutOfRangeException($"Column name:{columnName} not found in this record.");
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
@@ -12,11 +12,9 @@ public static partial class DumpDataHelper
|
|||||||
private static partial Regex MatchBrackets();
|
private static partial Regex MatchBrackets();
|
||||||
|
|
||||||
|
|
||||||
public static async Task<string[]> GetCsvHeadersFromSqlFileAsync(string txt)
|
public static string[] GetCsvHeadersFromSqlFileAsync(string txt)
|
||||||
{
|
{
|
||||||
//var txt = await File.ReadAllTextAsync(filePath);
|
|
||||||
var match = MatchBrackets().Match(txt);
|
var match = MatchBrackets().Match(txt);
|
||||||
|
|
||||||
return ParseHeader(match.ValueSpan);
|
return ParseHeader(match.ValueSpan);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -60,9 +58,8 @@ public static partial class DumpDataHelper
|
|||||||
return filePath[(firstDotIdx+1)..secondDotIdx].ToString();
|
return filePath[(firstDotIdx+1)..secondDotIdx].ToString();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static async Task<string[]> GetCsvFileNamesFromSqlFileAsync(string txt,Regex regex)
|
public static string[] GetCsvFileNamesFromSqlFileAsync(string txt,Regex regex)
|
||||||
{
|
{
|
||||||
//var txt = await File.ReadAllTextAsync(filePath);
|
|
||||||
var matches = regex.Matches(txt);
|
var matches = regex.Matches(txt);
|
||||||
return matches.Select(match => match.ValueSpan[1..^1].ToString()).ToArray();
|
return matches.Select(match => match.ValueSpan[1..^1].ToString()).ToArray();
|
||||||
}
|
}
|
||||||
|
@@ -1,6 +1,9 @@
|
|||||||
namespace ConsoleApp2.HostedServices.Abstractions;
|
using ConsoleApp2.Options;
|
||||||
|
using ConsoleApp2.Services;
|
||||||
|
|
||||||
|
namespace ConsoleApp2.HostedServices.Abstractions;
|
||||||
|
|
||||||
public interface IInputService
|
public interface IInputService
|
||||||
{
|
{
|
||||||
public Task ExecuteAsync(CancellationToken cancellationToken);
|
public Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue producerQueue, ProcessContext context, CancellationToken cancellationToken);
|
||||||
}
|
}
|
@@ -1,6 +1,9 @@
|
|||||||
namespace ConsoleApp2.HostedServices.Abstractions;
|
using ConsoleApp2.Options;
|
||||||
|
using ConsoleApp2.Services;
|
||||||
|
|
||||||
|
namespace ConsoleApp2.HostedServices.Abstractions;
|
||||||
|
|
||||||
public interface IOutputService
|
public interface IOutputService
|
||||||
{
|
{
|
||||||
public Task ExecuteAsync(CancellationToken cancellationToken);
|
public void ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue consumerQueue, ProcessContext context, CancellationToken cancellationToken);
|
||||||
}
|
}
|
@@ -1,6 +1,9 @@
|
|||||||
namespace ConsoleApp2.HostedServices.Abstractions;
|
using ConsoleApp2.Options;
|
||||||
|
using ConsoleApp2.Services;
|
||||||
|
|
||||||
|
namespace ConsoleApp2.HostedServices.Abstractions;
|
||||||
|
|
||||||
public interface ITransformService
|
public interface ITransformService
|
||||||
{
|
{
|
||||||
public Task ExecuteAsync(CancellationToken cancellationToken);
|
public Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue producerQueue, DataRecordQueue consumerQueue, ProcessContext context, CancellationToken cancellationToken);
|
||||||
}
|
}
|
@@ -16,24 +16,18 @@ public class InputService : IInputService
|
|||||||
{
|
{
|
||||||
private readonly ILogger _logger;
|
private readonly ILogger _logger;
|
||||||
private readonly IOptions<DataInputOptions> _dataInputOptions;
|
private readonly IOptions<DataInputOptions> _dataInputOptions;
|
||||||
private readonly IOptions<InputTableOptions> _tableOptions;
|
|
||||||
private readonly DataRecordQueue _producerQueue;
|
|
||||||
private readonly ProcessContext _context;
|
private readonly ProcessContext _context;
|
||||||
|
|
||||||
public InputService(ILogger<InputService> logger,
|
public InputService(ILogger<InputService> logger,
|
||||||
IOptions<DataInputOptions> dataInputOptions,
|
IOptions<DataInputOptions> dataInputOptions,
|
||||||
IOptions<InputTableOptions> tableOptions,
|
|
||||||
[FromKeyedServices(ProcessStep.Producer)] DataRecordQueue producerQueue,
|
|
||||||
ProcessContext context)
|
ProcessContext context)
|
||||||
{
|
{
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
_dataInputOptions = dataInputOptions;
|
_dataInputOptions = dataInputOptions;
|
||||||
_tableOptions = tableOptions;
|
|
||||||
_producerQueue = producerQueue;
|
|
||||||
_context = context;
|
_context = context;
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task ExecuteAsync(CancellationToken cancellationToken)
|
public async Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue producerQueue, ProcessContext context, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
var inputDir = _dataInputOptions.Value.InputDir;
|
var inputDir = _dataInputOptions.Value.InputDir;
|
||||||
_logger.LogInformation("***** Csv input service start, working dir: {InputDir}, thread id: {ThreadId} *****", inputDir, Environment.CurrentManagedThreadId);
|
_logger.LogInformation("***** Csv input service start, working dir: {InputDir}, thread id: {ThreadId} *****", inputDir, Environment.CurrentManagedThreadId);
|
||||||
@@ -44,18 +38,21 @@ public class InputService : IInputService
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
var count = 0;
|
var count = 0;
|
||||||
foreach (var tableName in _tableOptions.Value.TableInfoConfig.Keys)
|
foreach (var tableName in tasksOptions.TableInfoConfig.Keys)
|
||||||
{
|
{
|
||||||
_logger.LogInformation("Working table: {tableName}", tableName);
|
_logger.LogInformation("Working table: {tableName}", tableName);
|
||||||
var source = _dataInputOptions.Value.CreateSource?.Invoke(tableName);
|
var source = _dataInputOptions.Value.CreateSource?.Invoke(tableName);
|
||||||
|
if (source != null)
|
||||||
|
{
|
||||||
await source.DoEnqueue((record) =>
|
await source.DoEnqueue((record) =>
|
||||||
{
|
{
|
||||||
_context.AddInput();
|
_context.AddInput();
|
||||||
_producerQueue.Enqueue(record);
|
producerQueue.Enqueue(record);
|
||||||
count++;
|
count++;
|
||||||
|
|
||||||
});
|
});
|
||||||
if (_context.GetExceptions().Count > 0)
|
}
|
||||||
|
if (!_context.GetExceptions().IsEmpty)
|
||||||
{
|
{
|
||||||
_logger.LogInformation("***** Csv input service is canceled *****");
|
_logger.LogInformation("***** Csv input service is canceled *****");
|
||||||
return;
|
return;
|
||||||
@@ -63,7 +60,7 @@ public class InputService : IInputService
|
|||||||
_logger.LogInformation("table:'{tableName}' input completed", tableName);
|
_logger.LogInformation("table:'{tableName}' input completed", tableName);
|
||||||
}
|
}
|
||||||
|
|
||||||
_context.CompleteInput();
|
context.CompleteInput();
|
||||||
_logger.LogInformation("***** Csv input service completed *****");
|
_logger.LogInformation("***** Csv input service completed *****");
|
||||||
}
|
}
|
||||||
}
|
}
|
@@ -1,19 +1,19 @@
|
|||||||
using ConsoleApp2.HostedServices.Abstractions;
|
using ConsoleApp2.HostedServices.Abstractions;
|
||||||
|
using ConsoleApp2.Options;
|
||||||
using ConsoleApp2.Services;
|
using ConsoleApp2.Services;
|
||||||
using Microsoft.Extensions.Hosting;
|
using Microsoft.Extensions.Hosting;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace ConsoleApp2.HostedServices;
|
namespace ConsoleApp2.HostedServices;
|
||||||
|
|
||||||
public class MainHostedService : BackgroundService
|
public class MainHostedService : IHostedService
|
||||||
{
|
{
|
||||||
private readonly ILogger _logger;
|
private readonly ILogger _logger;
|
||||||
private readonly IInputService _input;
|
private readonly IInputService _input;
|
||||||
private readonly ITransformService _transform;
|
private readonly ITransformService _transform;
|
||||||
private readonly IOutputService _output;
|
private readonly IOutputService _output;
|
||||||
private readonly ProcessContext _context;
|
private readonly ProcessContext _context;
|
||||||
|
private readonly Timer? _bigTableTimer=null;
|
||||||
|
private readonly Timer? _smallTableTimer=null;
|
||||||
public MainHostedService(ILogger<MainHostedService> logger, IInputService input, ITransformService transform, IOutputService output, ProcessContext context)
|
public MainHostedService(ILogger<MainHostedService> logger, IInputService input, ITransformService transform, IOutputService output, ProcessContext context)
|
||||||
{
|
{
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
@@ -23,15 +23,15 @@ public class MainHostedService : BackgroundService
|
|||||||
_context = context;
|
_context = context;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
public Task StartAsync(CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
|
var taskFun = (TasksOptions taskOp, DataRecordQueue producerQueue, DataRecordQueue consumerQueue, ProcessContext context,Timer? timer) =>
|
||||||
|
{
|
||||||
var inputTask = Task.Factory.StartNew(async () =>
|
Task.Factory.StartNew(async () =>
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
await _input.ExecuteAsync(stoppingToken);
|
await _input.ExecuteAsync(taskOp, producerQueue, context, cancellationToken);
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
@@ -40,11 +40,11 @@ public class MainHostedService : BackgroundService
|
|||||||
}
|
}
|
||||||
|
|
||||||
});
|
});
|
||||||
var transformTask = Task.Factory.StartNew(async () =>
|
Task.Factory.StartNew(async () =>
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
await _transform.ExecuteAsync(stoppingToken);
|
await _transform.ExecuteAsync(taskOp, producerQueue, consumerQueue, context, cancellationToken);
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
@@ -53,12 +53,15 @@ public class MainHostedService : BackgroundService
|
|||||||
}
|
}
|
||||||
|
|
||||||
});
|
});
|
||||||
|
Task.Factory.StartNew(() =>
|
||||||
var outputTask = Task.Factory.StartNew(async () =>
|
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
await _output.ExecuteAsync(stoppingToken);
|
timer = new Timer((object? state) =>
|
||||||
|
{
|
||||||
|
_output.ExecuteAsync(taskOp, consumerQueue, context, cancellationToken);
|
||||||
|
},null, TimeSpan.Zero,TimeSpan.FromSeconds(0.5));
|
||||||
|
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
@@ -67,7 +70,56 @@ public class MainHostedService : BackgroundService
|
|||||||
}
|
}
|
||||||
|
|
||||||
});
|
});
|
||||||
|
};
|
||||||
|
|
||||||
// await Task.Run(async () => await _output.ExecuteAsync(stoppingToken), stoppingToken);
|
var bigTablesDic = new Dictionary<string, TableInfo>
|
||||||
|
{
|
||||||
|
{"order",new TableInfo{SimulaRowCount=5019216 }},
|
||||||
|
{"order_block_plan",new TableInfo{SimulaRowCount=2725553 }},//CreateTime < 202301的删除
|
||||||
|
{"order_block_plan_result",new TableInfo{SimulaRowCount=1174096 }},
|
||||||
|
{"order_box_block",new TableInfo{SimulaRowCount=29755672 }},
|
||||||
|
{"order_item",new TableInfo{SimulaRowCount=1345520079 }},
|
||||||
|
{"simple_plan_order",new TableInfo{SimulaRowCount=351470 }},//CreateTime < 202301的删除
|
||||||
|
};
|
||||||
|
var bigTableContext = new ProcessContext();
|
||||||
|
var bigTableOptions = new TasksOptions { TableInfoConfig = bigTablesDic, OutPutOptions = new OutPutOptions { FlushCount = 20000, OutPutTaskCount = 2 } };
|
||||||
|
taskFun(bigTableOptions, new DataRecordQueue(), new DataRecordQueue(), bigTableContext,_bigTableTimer);
|
||||||
|
var smallTablesDic = new Dictionary<string, TableInfo>
|
||||||
|
{
|
||||||
|
{"machine",new TableInfo{SimulaRowCount=14655 }},
|
||||||
|
{"order_data_block",new TableInfo{SimulaRowCount=731800334 }},
|
||||||
|
{"order_data_goods",new TableInfo{SimulaRowCount=25803671 }},
|
||||||
|
{"order_data_parts",new TableInfo{SimulaRowCount=468517543 }},
|
||||||
|
{"order_module",new TableInfo{SimulaRowCount=103325385 }},
|
||||||
|
{"order_module_extra",new TableInfo{SimulaRowCount=54361321 }},
|
||||||
|
{"order_module_item",new TableInfo{SimulaRowCount=69173339 }},
|
||||||
|
{"order_package",new TableInfo{SimulaRowCount=16196195 }},
|
||||||
|
{"order_process",new TableInfo{SimulaRowCount=3892685 }},//orderNo < 202301的
|
||||||
|
{"order_process_step",new TableInfo{SimulaRowCount=8050349 }},//orderNo < 202301的删除
|
||||||
|
{"order_process_step_item",new TableInfo{SimulaRowCount=14538058 }},//orderNo < 202301的删除
|
||||||
|
{"order_scrap_board",new TableInfo{SimulaRowCount=123998 }},
|
||||||
|
{"process_group",new TableInfo{SimulaRowCount=1253 }},
|
||||||
|
{"process_info",new TableInfo{SimulaRowCount=7839 }},
|
||||||
|
{"process_item_exp",new TableInfo{SimulaRowCount=28 }},
|
||||||
|
{"process_schdule_capacity",new TableInfo{SimulaRowCount=39736 }},
|
||||||
|
{"process_step_efficiency",new TableInfo{SimulaRowCount=8 }},
|
||||||
|
{"report_template",new TableInfo{SimulaRowCount=7337 }},
|
||||||
|
{"simple_package",new TableInfo{SimulaRowCount=130436 }},//orderNo < 202301的删除
|
||||||
|
{"sys_config",new TableInfo{SimulaRowCount=2296 }},
|
||||||
|
{"work_calendar",new TableInfo{SimulaRowCount=11 }},
|
||||||
|
{"work_shift",new TableInfo{SimulaRowCount=59 }},
|
||||||
|
{"work_time",new TableInfo{SimulaRowCount=62 }},
|
||||||
|
};
|
||||||
|
var smallTableContext = new ProcessContext();
|
||||||
|
taskFun(new TasksOptions { TableInfoConfig = smallTablesDic, OutPutOptions = new OutPutOptions { FlushCount = 20000, OutPutTaskCount = 4 } },
|
||||||
|
new DataRecordQueue(), new DataRecordQueue(), smallTableContext,_smallTableTimer);
|
||||||
|
return Task.CompletedTask;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Task StopAsync(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
throw new NotImplementedException();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
@@ -1,13 +1,9 @@
|
|||||||
using ConsoleApp2.Const;
|
|
||||||
using ConsoleApp2.HostedServices.Abstractions;
|
using ConsoleApp2.HostedServices.Abstractions;
|
||||||
using ConsoleApp2.Options;
|
using ConsoleApp2.Options;
|
||||||
using ConsoleApp2.Services;
|
using ConsoleApp2.Services;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
using MySqlConnector;
|
|
||||||
using System.Threading;
|
|
||||||
|
|
||||||
namespace ConsoleApp2.HostedServices;
|
namespace ConsoleApp2.HostedServices;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -16,71 +12,61 @@ namespace ConsoleApp2.HostedServices;
|
|||||||
public class OutputService : IOutputService
|
public class OutputService : IOutputService
|
||||||
{
|
{
|
||||||
private readonly ILogger _logger;
|
private readonly ILogger _logger;
|
||||||
private readonly DataRecordQueue _consumerQueue;
|
private readonly IOptions<DatabaseOutputOptions> _outputOptions;
|
||||||
private readonly IOptions<DataTransformOptions> _transOptions;
|
private readonly IOptions<DataTransformOptions> _transformOptions;
|
||||||
private readonly IOptions<DatabaseOutputOptions> _options;
|
|
||||||
private readonly ProcessContext _context;
|
private readonly ProcessContext _context;
|
||||||
private readonly TaskManager _taskManager;
|
private readonly TaskManager _taskManager;
|
||||||
|
private readonly ErrorRecorder _errorRecorder;
|
||||||
|
|
||||||
public OutputService(ILogger<OutputService> logger,
|
public OutputService(ILogger<OutputService> logger,
|
||||||
[FromKeyedServices(ProcessStep.Consumer)] DataRecordQueue consumerQueue,
|
IOptions<DatabaseOutputOptions> outputOptions,
|
||||||
IOptions<DatabaseOutputOptions> options,
|
|
||||||
IOptions<DataTransformOptions> transOptions,
|
|
||||||
ProcessContext context,
|
ProcessContext context,
|
||||||
TaskManager taskManager)
|
TaskManager taskManager,
|
||||||
|
IOptions<DataTransformOptions> transformOptions,
|
||||||
|
ErrorRecorder errorRecorder)
|
||||||
{
|
{
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
_consumerQueue = consumerQueue;
|
_outputOptions = outputOptions;
|
||||||
_transOptions = transOptions;
|
|
||||||
_options = options;
|
|
||||||
_context = context;
|
_context = context;
|
||||||
_taskManager = taskManager;
|
_taskManager = taskManager;
|
||||||
|
_transformOptions = transformOptions;
|
||||||
|
_errorRecorder = errorRecorder;
|
||||||
}
|
}
|
||||||
|
private int _runingTaskCount;
|
||||||
public async Task ExecuteAsync(CancellationToken cancellationToken)
|
public int RuningTaskCount
|
||||||
{
|
{
|
||||||
_logger.LogInformation("***** Mysql output service started *****");
|
get => _runingTaskCount;
|
||||||
var count = 0;
|
}
|
||||||
_taskManager.CreateTasks(async () =>
|
public void DoTask() => Interlocked.Increment(ref _runingTaskCount);
|
||||||
|
public void FinishTask() => Interlocked.Decrement(ref _runingTaskCount);
|
||||||
|
public void ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue consumerQueue, ProcessContext context, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
|
if (context.IsTransformCompleted == false && consumerQueue.Count < tasksOptions.OutPutOptions.FlushCount) return;
|
||||||
|
if (RuningTaskCount >= tasksOptions.OutPutOptions.OutPutTaskCount ) return;
|
||||||
var records = new List<DataRecord>();
|
var records = new List<DataRecord>();
|
||||||
while (!_context.IsTransformCompleted || _consumerQueue.Count > 0)
|
|
||||||
|
for (int i = 0; i < tasksOptions.OutPutOptions.FlushCount; i++)
|
||||||
{
|
{
|
||||||
if (!_consumerQueue.TryDequeue(out var record)) continue;
|
if (consumerQueue.TryDequeue(out var record)) records.Add(record);
|
||||||
records.Add(record);
|
else break;
|
||||||
count++;
|
|
||||||
//_logger.LogInformation(@"*****OutputCount: {count} *****",count);
|
|
||||||
if (records.Count >= _options.Value.FlushCount)
|
|
||||||
{
|
|
||||||
await FlushAsync(records);
|
|
||||||
records.Clear();
|
|
||||||
}
|
|
||||||
if (_context.GetExceptions().Count>0)
|
|
||||||
{
|
|
||||||
_logger.LogInformation("***** Csv output thread is canceled *****");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (records.Count > 0)
|
if (records.Count > 0)
|
||||||
{
|
{
|
||||||
|
ThreadPool.QueueUserWorkItem(async (queueState) =>
|
||||||
|
{
|
||||||
|
DoTask();
|
||||||
await FlushAsync(records);
|
await FlushAsync(records);
|
||||||
records.Clear();
|
FinishTask();
|
||||||
_logger.LogInformation("***** Mysql output thread completed *****");
|
});
|
||||||
}
|
}
|
||||||
}, _options.Value.TaskCount);
|
|
||||||
|
|
||||||
await _taskManager.WaitAll();
|
|
||||||
//_context.CompleteOutput();
|
|
||||||
_logger.LogInformation(@"***** Mysql output service completed *****");
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task FlushAsync(IEnumerable<DataRecord> records)
|
private async Task FlushAsync(IEnumerable<DataRecord> records)
|
||||||
{
|
{
|
||||||
var count = 0;
|
var count = 0;
|
||||||
await using var output = new MySqlDestination(
|
await using var output = new MySqlDestination(
|
||||||
_options.Value.ConnectionString ?? throw new InvalidOperationException("Connection string is required"),
|
_outputOptions.Value.ConnectionString ?? throw new InvalidOperationException("Connection string is required"),
|
||||||
_logger, _context,true);
|
_logger, _context, _transformOptions, _errorRecorder);
|
||||||
//if (records == null || records.Count() == 0) return;
|
//if (records == null || records.Count() == 0) return;
|
||||||
//var dbName = $"cferp_test_1";
|
//var dbName = $"cferp_test_1";
|
||||||
//if (records != null && records.Count() > 0)
|
//if (records != null && records.Count() > 0)
|
||||||
@@ -102,7 +88,7 @@ public class OutputService : IOutputService
|
|||||||
await output.WriteRecordAsync(record);
|
await output.WriteRecordAsync(record);
|
||||||
count++;
|
count++;
|
||||||
}
|
}
|
||||||
await output.FlushAsync(_options.Value.MaxAllowedPacket, _transOptions);
|
await output.FlushAsync(_outputOptions.Value.MaxAllowedPacket);
|
||||||
_context.AddOutput(count);
|
_context.AddOutput(count);
|
||||||
}
|
}
|
||||||
}
|
}
|
@@ -1,83 +0,0 @@
|
|||||||
using ConsoleApp2.HostedServices.Abstractions;
|
|
||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using ConsoleApp2.Const;
|
|
||||||
using ConsoleApp2.Options;
|
|
||||||
using ConsoleApp2.Services;
|
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
|
||||||
using Microsoft.Extensions.Logging;
|
|
||||||
using Microsoft.Extensions.Options;
|
|
||||||
using System.Reflection.PortableExecutable;
|
|
||||||
using System.Collections.Concurrent;
|
|
||||||
using ConsoleApp2.SimulationService;
|
|
||||||
|
|
||||||
namespace ConsoleApp2.HostedServices
|
|
||||||
{
|
|
||||||
public class TestInputService : IInputService
|
|
||||||
{
|
|
||||||
private readonly ILogger _logger;
|
|
||||||
private readonly IOptions<CsvOptions> _csvOptions;
|
|
||||||
private readonly DataRecordQueue _producerQueue;
|
|
||||||
private readonly ProcessContext _context;
|
|
||||||
public TestInputService(ILogger<TestInputService> logger,
|
|
||||||
IOptions<CsvOptions> csvOptions,
|
|
||||||
[FromKeyedServices(ProcessStep.Producer)] DataRecordQueue producerQueue,
|
|
||||||
ProcessContext context)
|
|
||||||
{
|
|
||||||
_logger = logger;
|
|
||||||
_csvOptions = csvOptions;
|
|
||||||
_producerQueue = producerQueue;
|
|
||||||
_context = context;
|
|
||||||
}
|
|
||||||
public async Task ExecuteAsync(CancellationToken cancellationToken)
|
|
||||||
{
|
|
||||||
var tableName = "order_item";
|
|
||||||
var headers = new string[] { "ID","OrderNo","ItemNo","ItemType","RoomID","BoxID","DataID","PlanID","PackageID","Num","CompanyID","ShardKey" };
|
|
||||||
var dataCount = 1200000000L;
|
|
||||||
var tempCount = 80000;
|
|
||||||
var tempRecords=new List<DataRecord>();
|
|
||||||
var comanyID = 1;
|
|
||||||
short[] shareKeys = { 23040, 23070, 23100, 24000, 24040, 24070, 24100, 25000, 25040, 25070, 25100 };
|
|
||||||
int[] companyIds = { 1, 2, 3, 4 };
|
|
||||||
var sk = shareKeys.First();
|
|
||||||
var companyID = companyIds.First();
|
|
||||||
|
|
||||||
var shareKeyInterval = 20000;
|
|
||||||
var getShareKeyTimes = 0;
|
|
||||||
var getCompanyIDTimes = 0;
|
|
||||||
var shareKeyIntervalCount = 0;
|
|
||||||
for (long i = 1; i <= dataCount; i++)
|
|
||||||
{
|
|
||||||
shareKeyIntervalCount++;
|
|
||||||
if (shareKeyIntervalCount > shareKeyInterval) {
|
|
||||||
sk=DataHelper.GetShareKey(getShareKeyTimes);
|
|
||||||
getShareKeyTimes++;
|
|
||||||
shareKeyIntervalCount = 0;
|
|
||||||
}
|
|
||||||
var fields = new string[] { i.ToString(), "20220104020855", (220105981029 + i).ToString(), "1", "482278", "482279", "3768774", "0", "0", "1", companyID.ToString(), sk.ToString() };
|
|
||||||
var record = new DataRecord(fields, tableName, headers, comanyID);
|
|
||||||
tempRecords.Add(record);
|
|
||||||
if (tempRecords.Count >= tempCount)
|
|
||||||
{
|
|
||||||
foreach (var rc in tempRecords)
|
|
||||||
{
|
|
||||||
_context.AddInput();
|
|
||||||
_producerQueue.Enqueue(rc);
|
|
||||||
if (cancellationToken.IsCancellationRequested)
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
tempRecords.Clear();
|
|
||||||
companyID = DataHelper. GetCompanyId(getCompanyIDTimes);
|
|
||||||
getCompanyIDTimes++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_context.CompleteInput();
|
|
||||||
_logger.LogInformation("***** Csv input service completed *****");
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
@@ -2,6 +2,7 @@
|
|||||||
using ConsoleApp2.HostedServices.Abstractions;
|
using ConsoleApp2.HostedServices.Abstractions;
|
||||||
using ConsoleApp2.Options;
|
using ConsoleApp2.Options;
|
||||||
using ConsoleApp2.Services;
|
using ConsoleApp2.Services;
|
||||||
|
using Microsoft.Extensions.Caching.Distributed;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
@@ -16,81 +17,58 @@ public class TransformService : ITransformService
|
|||||||
{
|
{
|
||||||
private readonly ILogger _logger;
|
private readonly ILogger _logger;
|
||||||
private readonly IOptions<DataTransformOptions> _options;
|
private readonly IOptions<DataTransformOptions> _options;
|
||||||
private readonly DataRecordQueue _producerQueue;
|
|
||||||
private readonly DataRecordQueue _consumerQueue;
|
|
||||||
private readonly ProcessContext _context;
|
private readonly ProcessContext _context;
|
||||||
private readonly IDatabase _db;
|
private readonly IDistributedCache _cache;
|
||||||
|
|
||||||
|
|
||||||
public TransformService(ILogger<TransformService> logger,
|
public TransformService(ILogger<TransformService> logger,
|
||||||
IOptions<DataTransformOptions> options,
|
IOptions<DataTransformOptions> options,
|
||||||
[FromKeyedServices(ProcessStep.Producer)]DataRecordQueue producerQueue,
|
ProcessContext context,
|
||||||
[FromKeyedServices(ProcessStep.Consumer)]DataRecordQueue consumerQueue,
|
IDistributedCache cache)
|
||||||
ProcessContext context, IDatabase db)
|
|
||||||
{
|
{
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
_options = options;
|
_options = options;
|
||||||
_producerQueue = producerQueue;
|
|
||||||
_consumerQueue = consumerQueue;
|
|
||||||
_context = context;
|
_context = context;
|
||||||
_db = db;
|
_cache = cache;
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task ExecuteAsync(CancellationToken cancellationToken)
|
public async Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue producerQueue, DataRecordQueue consumerQueue, ProcessContext context, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
_logger.LogInformation("***** Data transform service started, thread id: {ThreadId} *****", Environment.CurrentManagedThreadId);
|
_logger.LogInformation("***** Data transform service started, thread id: {ThreadId} *****", Environment.CurrentManagedThreadId);
|
||||||
while ((!_context.IsInputCompleted || _producerQueue.Count > 0))
|
while ((!context.IsInputCompleted || producerQueue.Count > 0))
|
||||||
{
|
{
|
||||||
if (_context.GetExceptions().Count > 0)
|
if (_context.GetExceptions().Count > 0)
|
||||||
{
|
{
|
||||||
_logger.LogInformation("***** Csv transform service is canceled *****");
|
_logger.LogInformation("***** Csv transform service is canceled *****");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// var dbOptions = _options.Value.DatabaseFilter(record);
|
if (!producerQueue.TryDequeue(out var record)) continue;
|
||||||
if (!_producerQueue.TryDequeue(out var record)) continue;
|
|
||||||
|
|
||||||
for (var i = 0; i < record.Fields.Length; i++)
|
|
||||||
{
|
|
||||||
var field = record[i];
|
|
||||||
|
|
||||||
if (field == "\\N")
|
|
||||||
{
|
|
||||||
field = "NULL";
|
|
||||||
goto Escape;
|
|
||||||
}
|
|
||||||
// else if(DumpDataHelper.CheckHexField(field))
|
|
||||||
// field = $"0x{field}";
|
|
||||||
|
|
||||||
switch (_options.Value.GetColumnType(record.TableName, record.Headers[i]))
|
|
||||||
{
|
|
||||||
case ColumnType.Text:
|
|
||||||
|
|
||||||
field = string.IsNullOrEmpty(field) ? "''" : _options.Value.TransformBinary?.Invoke(field) ?? field; ;
|
|
||||||
break;
|
|
||||||
case ColumnType.Blob:
|
|
||||||
//field = string.IsNullOrEmpty(field) ? "NULL" : $"0x{field}";
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
Escape:
|
|
||||||
record[i] = field;
|
|
||||||
}
|
|
||||||
//过滤不要的record
|
//过滤不要的record
|
||||||
if ( await _options.Value.RecordFilter?.Invoke(record,_db) == false) continue;
|
|
||||||
|
if (_options.Value.RecordFilter != null)
|
||||||
|
{
|
||||||
|
var result = await _options.Value.RecordFilter.Invoke(record, _cache);
|
||||||
|
if (result == false) continue;
|
||||||
|
}
|
||||||
record.Database = _options.Value.DatabaseFilter?.Invoke(record);
|
record.Database = _options.Value.DatabaseFilter?.Invoke(record);
|
||||||
//修改record
|
//修改record
|
||||||
_options.Value.RecordModify?.Invoke(record);
|
_options.Value.RecordModify?.Invoke(record);
|
||||||
//缓存record
|
//缓存record
|
||||||
_options.Value.RecordCache?.Invoke(record, _db);
|
if (_options.Value.RecordCache != null)
|
||||||
//替换record
|
|
||||||
var replaceRecord =await _options.Value.RecordReplace?.Invoke(record, _db);
|
|
||||||
if (replaceRecord != null)
|
|
||||||
{
|
{
|
||||||
record = replaceRecord;
|
await _options.Value.RecordCache.Invoke(record, _cache);
|
||||||
}
|
}
|
||||||
_consumerQueue.Enqueue(record);
|
//替换record
|
||||||
|
if (_options.Value.RecordReplace != null)
|
||||||
|
{
|
||||||
|
var result = await _options.Value.RecordReplace.Invoke(record, _cache);
|
||||||
|
if (result != null)
|
||||||
|
{
|
||||||
|
record = result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
consumerQueue.Enqueue(record);
|
||||||
_context.AddTransform();
|
_context.AddTransform();
|
||||||
//数据增加
|
//数据增加
|
||||||
var addRecords = _options.Value.RecordAdd?.Invoke(record);
|
var addRecords = _options.Value.RecordAdd?.Invoke(record);
|
||||||
@@ -98,13 +76,13 @@ public class TransformService : ITransformService
|
|||||||
{
|
{
|
||||||
foreach (var rc in addRecords)
|
foreach (var rc in addRecords)
|
||||||
{
|
{
|
||||||
_consumerQueue.Enqueue(rc);
|
consumerQueue.Enqueue(rc);
|
||||||
_context.AddTransform();
|
_context.AddTransform();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
context.CompleteTransform();
|
||||||
|
|
||||||
_context.CompleteTransform();
|
|
||||||
_logger.LogInformation("***** Data transformation service completed *****");
|
_logger.LogInformation("***** Data transformation service completed *****");
|
||||||
}
|
}
|
||||||
}
|
}
|
@@ -1,5 +1,6 @@
|
|||||||
using ConsoleApp2.Const;
|
using ConsoleApp2.Const;
|
||||||
using ConsoleApp2.HostedServices.Abstractions;
|
using ConsoleApp2.HostedServices.Abstractions;
|
||||||
|
using ConsoleApp2.Options;
|
||||||
using ConsoleApp2.Services;
|
using ConsoleApp2.Services;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using Microsoft.Extensions.Hosting;
|
using Microsoft.Extensions.Hosting;
|
||||||
@@ -22,7 +23,7 @@ public class VoidOutputService : IOutputService
|
|||||||
_logger = logger;
|
_logger = logger;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Task ExecuteAsync(CancellationToken stoppingToken)
|
public void ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue consumerQueue, ProcessContext context, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
_logger.LogInformation("***** Void output service started, thread id: {ThreadId} *****", Environment.CurrentManagedThreadId);
|
_logger.LogInformation("***** Void output service started, thread id: {ThreadId} *****", Environment.CurrentManagedThreadId);
|
||||||
while (!_context.IsTransformCompleted || _consumerQueue.Count > 0)
|
while (!_context.IsTransformCompleted || _consumerQueue.Count > 0)
|
||||||
@@ -33,6 +34,5 @@ public class VoidOutputService : IOutputService
|
|||||||
|
|
||||||
_context.CompleteOutput();
|
_context.CompleteOutput();
|
||||||
_logger.LogInformation("***** Void output service completed *****");
|
_logger.LogInformation("***** Void output service completed *****");
|
||||||
return Task.CompletedTask;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
@@ -7,11 +7,8 @@ namespace ConsoleApp2.Options
|
|||||||
{
|
{
|
||||||
|
|
||||||
public string InputDir { get; set; } = "./MyDumper";
|
public string InputDir { get; set; } = "./MyDumper";
|
||||||
|
public bool IsMock { get; set; } = false;
|
||||||
public int TaskCount { get; set; } = 16;
|
public string NoFilterTables { get; set; }="";//不需要过滤的表列表
|
||||||
|
|
||||||
public int FlushCount { get; set; } = 20000;
|
|
||||||
|
|
||||||
public bool Isutf8mb4 { get; set; } = true;
|
public bool Isutf8mb4 { get; set; } = true;
|
||||||
|
|
||||||
public short OldestShardKey { get; set; } = 23010;
|
public short OldestShardKey { get; set; } = 23010;
|
||||||
|
@@ -1,4 +1,5 @@
|
|||||||
using StackExchange.Redis;
|
using Microsoft.Extensions.Caching.Distributed;
|
||||||
|
using StackExchange.Redis;
|
||||||
|
|
||||||
namespace ConsoleApp2.Options;
|
namespace ConsoleApp2.Options;
|
||||||
|
|
||||||
@@ -6,20 +7,22 @@ public enum ColumnType
|
|||||||
{
|
{
|
||||||
Blob,
|
Blob,
|
||||||
Text,
|
Text,
|
||||||
|
Json,
|
||||||
UnDefine,
|
UnDefine,
|
||||||
}
|
}
|
||||||
|
|
||||||
public class DataTransformOptions
|
public class DataTransformOptions
|
||||||
{
|
{
|
||||||
|
|
||||||
public Func<DataRecord, string>? DatabaseFilter { get; set; }
|
public Func<DataRecord, string>? DatabaseFilter { get; set; }
|
||||||
|
|
||||||
public Func<string, string>? TransformBinary { get; set; }//Binary转字符串方法
|
public Func<string, string>? TransformBinary { get; set; }//Binary转字符串方法
|
||||||
|
|
||||||
public Func<DataRecord, IDatabase, Task<bool>>? RecordFilter { get; set; }//数据过滤方法
|
public Func<DataRecord, IDistributedCache, Task<bool>>? RecordFilter { get; set; }//数据过滤方法
|
||||||
public Action<DataRecord>? RecordModify { get; set; }//数据修改
|
public Action<DataRecord>? RecordModify { get; set; }//数据修改
|
||||||
public Func<DataRecord, IDatabase, Task<DataRecord?>>? RecordReplace { get; set; }//数据替换
|
public Func<DataRecord, IDistributedCache, Task<DataRecord?>>? RecordReplace { get; set; }//数据替换
|
||||||
public Func<DataRecord, IList<DataRecord>?>? RecordAdd { get; set; }//数据替换
|
public Func<DataRecord, IList<DataRecord>?>? RecordAdd { get; set; }//数据替换
|
||||||
public Action<DataRecord, IDatabase>? RecordCache { get; set; }//数据缓存
|
public Func<DataRecord, IDistributedCache, Task>? RecordCache { get; set; }//数据缓存
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 配置导入数据的特殊列
|
/// 配置导入数据的特殊列
|
||||||
|
@@ -6,14 +6,5 @@ public class DatabaseOutputOptions
|
|||||||
/// 数据库连接字符串
|
/// 数据库连接字符串
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public string? ConnectionString { get; set; }
|
public string? ConnectionString { get; set; }
|
||||||
/// <summary>
|
|
||||||
/// 输出服务的任务(Task)数
|
|
||||||
/// </summary>
|
|
||||||
public int TaskCount { get; set; }
|
|
||||||
/// <summary>
|
|
||||||
/// 每个任务每次提交到数据库的记录数量(每N条构建一次SQL语句)
|
|
||||||
/// </summary>
|
|
||||||
public int FlushCount { get; set; }
|
|
||||||
|
|
||||||
public int MaxAllowedPacket { get; set; } = 64*1024*1024;
|
public int MaxAllowedPacket { get; set; } = 64*1024*1024;
|
||||||
}
|
}
|
@@ -10,8 +10,15 @@ namespace ConsoleApp2.Options
|
|||||||
{
|
{
|
||||||
public long SimulaRowCount { get; set; }//模拟的记录条数
|
public long SimulaRowCount { get; set; }//模拟的记录条数
|
||||||
}
|
}
|
||||||
public class InputTableOptions
|
public class TasksOptions
|
||||||
{
|
{
|
||||||
public Dictionary<string, TableInfo> TableInfoConfig { get; set; } = new();
|
public Dictionary<string, TableInfo> TableInfoConfig { get; set; } = new();
|
||||||
|
public int TransformTaskCount { get; set; } = 1;
|
||||||
|
public OutPutOptions OutPutOptions { get; set; }=new();
|
||||||
|
}
|
||||||
|
public class OutPutOptions
|
||||||
|
{
|
||||||
|
public int FlushCount { get; set; } = 10000;
|
||||||
|
public int OutPutTaskCount { get; set; } = 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
@@ -1,11 +1,9 @@
|
|||||||
using ConsoleApp2;
|
using ConsoleApp2;
|
||||||
using ConsoleApp2.Const;
|
using ConsoleApp2.Const;
|
||||||
using ConsoleApp2.Helpers;
|
|
||||||
using ConsoleApp2.HostedServices;
|
using ConsoleApp2.HostedServices;
|
||||||
using ConsoleApp2.HostedServices.Abstractions;
|
using ConsoleApp2.HostedServices.Abstractions;
|
||||||
using ConsoleApp2.Options;
|
using ConsoleApp2.Options;
|
||||||
using ConsoleApp2.Services;
|
using ConsoleApp2.Services;
|
||||||
using ConsoleApp2.SimulationService;
|
|
||||||
using Microsoft.Extensions.Caching.StackExchangeRedis;
|
using Microsoft.Extensions.Caching.StackExchangeRedis;
|
||||||
using Microsoft.Extensions.Configuration;
|
using Microsoft.Extensions.Configuration;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
@@ -13,13 +11,9 @@ using Microsoft.Extensions.Hosting;
|
|||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using MySqlConnector;
|
using MySqlConnector;
|
||||||
using Serilog;
|
using Serilog;
|
||||||
using Serilog.Core;
|
using Microsoft.Extensions.Caching.Distributed;
|
||||||
using StackExchange.Redis;
|
using Serilog.Events;
|
||||||
using System.Reflection.PortableExecutable;
|
|
||||||
|
|
||||||
|
|
||||||
// 运行之前把Mysql max_allowed_packets 调大
|
|
||||||
// 运行之前把process_step表的外键删掉
|
|
||||||
await RunProgram();
|
await RunProgram();
|
||||||
return;
|
return;
|
||||||
|
|
||||||
@@ -62,55 +56,12 @@ async Task RunProgram()
|
|||||||
|
|
||||||
//}, "请输入单次插入的行数(默认为20000):");
|
//}, "请输入单次插入的行数(默认为20000):");
|
||||||
|
|
||||||
ThreadPool.SetMaxThreads(200, 200);
|
ThreadPool.SetMaxThreads(8, 4);
|
||||||
var host = Host.CreateApplicationBuilder(args);
|
var host = Host.CreateApplicationBuilder(args);
|
||||||
var commandOptions = host.Configuration.GetSection("CmdOptions").Get<CommandOptions>() ?? new CommandOptions();
|
var commandOptions = host.Configuration.GetSection("CmdOptions").Get<CommandOptions>() ?? new CommandOptions();
|
||||||
Console.WriteLine($"InputDir:{commandOptions?.InputDir}");
|
Console.WriteLine($"InputDir:{commandOptions?.InputDir}");
|
||||||
Console.WriteLine($"OutPutFlushCount:{commandOptions?.FlushCount}");
|
if (commandOptions == null) throw new ArgumentNullException("commandOptions is null");
|
||||||
Console.WriteLine($"OutPutTaskCount:{commandOptions?.TaskCount}");
|
|
||||||
|
|
||||||
var oldestTime = DateTime.ParseExact(commandOptions.OldestTime, "yyyyMM", System.Globalization.DateTimeFormatInfo.InvariantInfo);
|
var oldestTime = DateTime.ParseExact(commandOptions.OldestTime, "yyyyMM", System.Globalization.DateTimeFormatInfo.InvariantInfo);
|
||||||
host.Services.Configure<InputTableOptions>(option =>
|
|
||||||
{
|
|
||||||
option.TableInfoConfig = new Dictionary<string, TableInfo>
|
|
||||||
{
|
|
||||||
|
|
||||||
//order_block_plan_item从order_item表查询,然后程序插入
|
|
||||||
//order_package_item从order_item表查询,然后程序插入
|
|
||||||
//order_patch_detail生产没有这个表,不处理
|
|
||||||
|
|
||||||
|
|
||||||
{"machine",new TableInfo{SimulaRowCount=14655 }},
|
|
||||||
{"order",new TableInfo{SimulaRowCount=5019216 }},
|
|
||||||
{"order_block_plan",new TableInfo{SimulaRowCount=2725553 }},//CreateTime < 202301的删除
|
|
||||||
{"order_block_plan_result",new TableInfo{SimulaRowCount=1174096 }},
|
|
||||||
{"order_box_block",new TableInfo{SimulaRowCount=29755672 }},
|
|
||||||
{"order_data_block",new TableInfo{SimulaRowCount=731800334 }},
|
|
||||||
{"order_data_goods",new TableInfo{SimulaRowCount=25803671 }},
|
|
||||||
{"order_data_parts",new TableInfo{SimulaRowCount=468517543 }},
|
|
||||||
{"order_item",new TableInfo{SimulaRowCount=1345520079 }},
|
|
||||||
{"order_module",new TableInfo{SimulaRowCount=103325385 }},
|
|
||||||
{"order_module_extra",new TableInfo{SimulaRowCount=54361321 }},
|
|
||||||
{"order_module_item",new TableInfo{SimulaRowCount=69173339 }},
|
|
||||||
{"order_package",new TableInfo{SimulaRowCount=16196195 }},
|
|
||||||
{"order_process",new TableInfo{SimulaRowCount=3892685 }},//orderNo < 202301的
|
|
||||||
{"order_process_step",new TableInfo{SimulaRowCount=8050349 }},//orderNo < 202301的删除
|
|
||||||
{"order_process_step_item",new TableInfo{SimulaRowCount=14538058 }},//orderNo < 202301的删除
|
|
||||||
{"order_scrap_board",new TableInfo{SimulaRowCount=123998 }},
|
|
||||||
{"process_group",new TableInfo{SimulaRowCount=1253 }},
|
|
||||||
{"process_info",new TableInfo{SimulaRowCount=7839 }},
|
|
||||||
{"process_item_exp",new TableInfo{SimulaRowCount=28 }},
|
|
||||||
{"process_schdule_capacity",new TableInfo{SimulaRowCount=39736 }},
|
|
||||||
{"process_step_efficiency",new TableInfo{SimulaRowCount=8 }},
|
|
||||||
{"report_template",new TableInfo{SimulaRowCount=7337 }},
|
|
||||||
{"simple_package",new TableInfo{SimulaRowCount=130436 }},//orderNo < 202301的删除
|
|
||||||
{"simple_plan_order",new TableInfo{SimulaRowCount=351470 }},//CreateTime < 202301的删除
|
|
||||||
{"sys_config",new TableInfo{SimulaRowCount=2296 }},
|
|
||||||
{"work_calendar",new TableInfo{SimulaRowCount=11 }},
|
|
||||||
{"work_shift",new TableInfo{SimulaRowCount=59 }},
|
|
||||||
{"work_time",new TableInfo{SimulaRowCount=62 }},
|
|
||||||
};
|
|
||||||
});
|
|
||||||
host.Services.Configure<CsvOptions>(option =>
|
host.Services.Configure<CsvOptions>(option =>
|
||||||
{
|
{
|
||||||
option.Delimiter = ",";
|
option.Delimiter = ",";
|
||||||
@@ -129,13 +80,17 @@ async Task RunProgram()
|
|||||||
|
|
||||||
host.Services.Configure<DataTransformOptions>(options =>
|
host.Services.Configure<DataTransformOptions>(options =>
|
||||||
{
|
{
|
||||||
|
if (commandOptions.IsMock) return;
|
||||||
options.DatabaseFilter = record => "cferp_test";
|
options.DatabaseFilter = record => "cferp_test";
|
||||||
|
|
||||||
options.TransformBinary = field => commandOptions != null && commandOptions.Isutf8mb4 ? $"_utf8mb4 0x{field}" : $"0x{field}";
|
options.TransformBinary = field => commandOptions != null && commandOptions.Isutf8mb4 ? $"_utf8mb4 0x{field}" : $"0x{field}";
|
||||||
|
var noFilterTables = commandOptions.NoFilterTables.Split(",");
|
||||||
//数据过滤
|
//数据过滤
|
||||||
options.RecordFilter = async (record, db) =>
|
options.RecordFilter = async (record, cache) =>
|
||||||
{
|
{
|
||||||
//var index = Array.IndexOf(record.Headers, "ShardKey");
|
|
||||||
|
if (noFilterTables.Contains(record.TableName)) return true;
|
||||||
|
|
||||||
if (record.TryGetField("ShardKey", out var skStr))
|
if (record.TryGetField("ShardKey", out var skStr))
|
||||||
{
|
{
|
||||||
short.TryParse(skStr, out var sk);
|
short.TryParse(skStr, out var sk);
|
||||||
@@ -157,7 +112,7 @@ async Task RunProgram()
|
|||||||
if (dt < oldestTime) return false;
|
if (dt < oldestTime) return false;
|
||||||
|
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception)
|
||||||
{
|
{
|
||||||
return false;//订单号转换失败,跳过
|
return false;//订单号转换失败,跳过
|
||||||
|
|
||||||
@@ -183,19 +138,23 @@ async Task RunProgram()
|
|||||||
if (record.TableName == "order_process_step" || record.TableName == "order_process_step_item")
|
if (record.TableName == "order_process_step" || record.TableName == "order_process_step_item")
|
||||||
{
|
{
|
||||||
//如果缓存中不存在OrderProcessID,则丢弃
|
//如果缓存中不存在OrderProcessID,则丢弃
|
||||||
if(record.TryGetField("OrderProcessID",out var orderProcessID))
|
|
||||||
|
if(record.TryGetField("OrderProcessID",out string orderProcessID))
|
||||||
{
|
{
|
||||||
var value = await db.StringGetAsync(orderProcessID);
|
var value = await cache.GetStringAsync($"order_process_{orderProcessID}");
|
||||||
if (string.IsNullOrEmpty(value.ToString()))return false;
|
if (string.IsNullOrEmpty(value)) return false;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
if (record.TableName == "order_block_plan_result" )
|
if (record.TableName == "order_block_plan_result" )
|
||||||
{
|
{
|
||||||
//如果缓存中不存在ID,则丢弃(ID 对应order_block_plan中的ID)
|
//如果缓存中不存在ID,则丢弃(ID 对应order_block_plan中的ID)
|
||||||
if (record.TryGetField("ID", out var id))
|
if (record.TryGetField("ID", out string id))
|
||||||
{
|
{
|
||||||
var value = await db.StringGetAsync(id);
|
var value = await cache.GetStringAsync($"order_block_plan_{id}");
|
||||||
if (string.IsNullOrEmpty(value.ToString())) return false;
|
if (string.IsNullOrEmpty(value)) return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
@@ -208,7 +167,7 @@ async Task RunProgram()
|
|||||||
{
|
{
|
||||||
if (record.TryGetField("OrderNos", out var nos))
|
if (record.TryGetField("OrderNos", out var nos))
|
||||||
{
|
{
|
||||||
if (nos.Length <= 2) record.SetField("OrderNos", "\"[]\"");
|
if (nos.Length <= 2) record.SetField("OrderNos", "");
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -218,7 +177,7 @@ async Task RunProgram()
|
|||||||
if (record.TryGetField("NextStepID", out var idStr))
|
if (record.TryGetField("NextStepID", out var idStr))
|
||||||
{
|
{
|
||||||
|
|
||||||
if (idStr == "NULL")
|
if (idStr == "\\N")
|
||||||
{
|
{
|
||||||
record.SetField("NextStepID", "0");
|
record.SetField("NextStepID", "0");
|
||||||
}
|
}
|
||||||
@@ -227,7 +186,7 @@ async Task RunProgram()
|
|||||||
|
|
||||||
};
|
};
|
||||||
//数据缓存
|
//数据缓存
|
||||||
options.RecordCache = async (record, db) =>
|
options.RecordCache = async (record, cache) =>
|
||||||
{
|
{
|
||||||
if (record.TableName == "order")
|
if (record.TableName == "order")
|
||||||
{
|
{
|
||||||
@@ -235,7 +194,7 @@ async Task RunProgram()
|
|||||||
{
|
{
|
||||||
if (record.TryGetField("CompanyID", out var companyid))
|
if (record.TryGetField("CompanyID", out var companyid))
|
||||||
{
|
{
|
||||||
await db.StringSetAsync(orderNo, companyid);
|
await cache.SetStringAsync(record.GetCacheKey("OrderNo"), companyid);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -249,14 +208,7 @@ async Task RunProgram()
|
|||||||
|
|
||||||
if( record.TryGetField("ID", out var id))
|
if( record.TryGetField("ID", out var id))
|
||||||
{
|
{
|
||||||
try
|
await cache.SetStringAsync(record.GetCacheKey("ID"), sk);
|
||||||
{
|
|
||||||
await db.StringSetAsync(id, sk);
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -266,12 +218,12 @@ async Task RunProgram()
|
|||||||
if (record.TryGetField("CompanyID", out var companyid))
|
if (record.TryGetField("CompanyID", out var companyid))
|
||||||
{
|
{
|
||||||
record.TryGetField("ID", out var id);
|
record.TryGetField("ID", out var id);
|
||||||
await db.StringSetAsync(id, companyid);
|
await cache.SetStringAsync(record.GetCacheKey("ID"), companyid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
//数据替换
|
//数据替换
|
||||||
options.RecordReplace = async (record, db) =>
|
options.RecordReplace = async (record, cache) =>
|
||||||
{
|
{
|
||||||
//删除数据源里simple_plan_order.ProcessState 字段和值
|
//删除数据源里simple_plan_order.ProcessState 字段和值
|
||||||
|
|
||||||
@@ -306,8 +258,9 @@ async Task RunProgram()
|
|||||||
var headers = new List<string>(record.Headers);
|
var headers = new List<string>(record.Headers);
|
||||||
var fields =new List<string>(record.Fields);
|
var fields =new List<string>(record.Fields);
|
||||||
headers.Add("CompanyID");
|
headers.Add("CompanyID");
|
||||||
var companyidResult =await db.StringGetAsync(id);
|
var companyidResult =await cache.GetStringAsync($"order_block_plan_{id}");
|
||||||
_ = int.TryParse(companyidResult.ToString(), out var companyid);
|
|
||||||
|
_ = int.TryParse(companyidResult, out var companyid);
|
||||||
fields.Add(companyid.ToString());
|
fields.Add(companyid.ToString());
|
||||||
return new DataRecord(fields.ToArray(), record.TableName, headers.ToArray(), companyid);
|
return new DataRecord(fields.ToArray(), record.TableName, headers.ToArray(), companyid);
|
||||||
}
|
}
|
||||||
@@ -321,8 +274,8 @@ async Task RunProgram()
|
|||||||
var headers = new List<string>(record.Headers);
|
var headers = new List<string>(record.Headers);
|
||||||
var fields = new List<string>(record.Fields);
|
var fields = new List<string>(record.Fields);
|
||||||
headers.Add("CompanyID");
|
headers.Add("CompanyID");
|
||||||
var companyidResult = await db.StringGetAsync(orderNo);
|
var companyidResult = await cache.GetStringAsync($"order_{orderNo}");
|
||||||
_ = int.TryParse(companyidResult.ToString(), out var cpid);
|
_ = int.TryParse(companyidResult, out var cpid);
|
||||||
fields.Add(cpid.ToString());
|
fields.Add(cpid.ToString());
|
||||||
return new DataRecord(fields.ToArray(), record.TableName, headers.ToArray(), cpid);
|
return new DataRecord(fields.ToArray(), record.TableName, headers.ToArray(), cpid);
|
||||||
}
|
}
|
||||||
@@ -363,21 +316,13 @@ async Task RunProgram()
|
|||||||
{
|
{
|
||||||
if (record.TryGetField("OrderProcessID",out var processID))
|
if (record.TryGetField("OrderProcessID",out var processID))
|
||||||
{
|
{
|
||||||
try
|
var shardKey =await cache.GetStringAsync($"order_process_{processID}");
|
||||||
{
|
|
||||||
|
|
||||||
var shardKey =await db.StringGetAsync(processID);
|
|
||||||
var headers = new List<string>(record.Headers);
|
var headers = new List<string>(record.Headers);
|
||||||
var fields = new List<string>(record.Fields);
|
var fields = new List<string>(record.Fields);
|
||||||
headers.Add("ShardKey");
|
headers.Add("ShardKey");
|
||||||
fields.Add(shardKey.ToString());
|
fields.Add(shardKey??"0");
|
||||||
return new DataRecord(fields.ToArray(), record.TableName, headers.ToArray(), record.CompanyID);
|
return new DataRecord(fields.ToArray(), record.TableName, headers.ToArray(), record.CompanyID);
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if(record.TableName == "order_moudle")
|
if(record.TableName == "order_moudle")
|
||||||
@@ -455,7 +400,7 @@ async Task RunProgram()
|
|||||||
{ "process_item_exp.ItemJson", ColumnType.Text },
|
{ "process_item_exp.ItemJson", ColumnType.Text },
|
||||||
{ "report_template.Template", ColumnType.Text },
|
{ "report_template.Template", ColumnType.Text },
|
||||||
{ "report_template.SourceConfig", ColumnType.Text },
|
{ "report_template.SourceConfig", ColumnType.Text },
|
||||||
{ "order_block_plan.OrderNos", ColumnType.Text },
|
{ "order_block_plan.OrderNos", ColumnType.Json },
|
||||||
{ "order_block_plan.BlockInfo", ColumnType.Text },
|
{ "order_block_plan.BlockInfo", ColumnType.Text },
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
@@ -470,36 +415,46 @@ async Task RunProgram()
|
|||||||
// Password = "123456",
|
// Password = "123456",
|
||||||
// MaximumPoolSize = 50, // 这个值应当小于 max_connections
|
// MaximumPoolSize = 50, // 这个值应当小于 max_connections
|
||||||
//}.ConnectionString;
|
//}.ConnectionString;
|
||||||
options.ConnectionString = new MySqlConnectionStringBuilder(host.Configuration.GetConnectionString("MySqlMaster"))
|
options.ConnectionString = new MySqlConnectionStringBuilder(host.Configuration.GetConnectionString("MySqlMaster")??"")
|
||||||
{
|
{
|
||||||
CharacterSet = "utf8",
|
CharacterSet = "utf8",
|
||||||
AllowUserVariables = true,
|
AllowUserVariables = true,
|
||||||
IgnoreCommandTransaction = true,
|
IgnoreCommandTransaction = true,
|
||||||
TreatTinyAsBoolean = false,
|
TreatTinyAsBoolean = false,
|
||||||
MaximumPoolSize = 50
|
MaximumPoolSize = 50,
|
||||||
|
SslMode = MySqlSslMode.None,
|
||||||
}.ConnectionString;
|
}.ConnectionString;
|
||||||
options.TaskCount = commandOptions.TaskCount;
|
|
||||||
options.FlushCount = commandOptions.FlushCount;
|
|
||||||
});
|
});
|
||||||
|
|
||||||
host.Services.AddLogging(builder =>
|
host.Services.AddLogging(builder =>
|
||||||
{
|
{
|
||||||
builder.ClearProviders();
|
builder.ClearProviders();
|
||||||
builder.AddSerilog(new LoggerConfiguration().WriteTo.Console().CreateLogger());
|
builder.AddSerilog(new LoggerConfiguration()
|
||||||
|
.WriteTo.Console()
|
||||||
|
.WriteTo.File("./log/error.log", restrictedToMinimumLevel:LogEventLevel.Error)
|
||||||
|
// .WriteTo.File("./log/info.log", restrictedToMinimumLevel:LogEventLevel.Information) //性能考虑暂不使用
|
||||||
|
.CreateLogger()
|
||||||
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
host.Services.AddSingleton<ProcessContext>();
|
host.Services.AddScoped<ProcessContext>();
|
||||||
host.Services.AddKeyedSingleton<DataRecordQueue>(ProcessStep.Producer);
|
host.Services.AddKeyedSingleton<DataRecordQueue>(ProcessStep.Producer);
|
||||||
host.Services.AddKeyedSingleton<DataRecordQueue>(ProcessStep.Consumer);
|
host.Services.AddKeyedSingleton<DataRecordQueue>(ProcessStep.Consumer);
|
||||||
host.Services.AddTransient<TaskManager>();
|
host.Services.AddTransient<TaskManager>();
|
||||||
|
host.Services.AddSingleton<ErrorRecorder>();
|
||||||
|
|
||||||
host.Services.AddHostedService<MainHostedService>();
|
host.Services.AddHostedService<MainHostedService>();
|
||||||
host.Services.AddHostedService<TaskMonitorService>();
|
host.Services.AddHostedService<TaskMonitorService>();
|
||||||
host.Services.AddSingleton<IInputService, InputService>();
|
if(commandOptions.IsMock)host.Services.AddSingleton<IInputService,InputService>();
|
||||||
|
else host.Services.AddSingleton<IInputService, InputService>();
|
||||||
host.Services.AddSingleton<ITransformService, TransformService>();
|
host.Services.AddSingleton<ITransformService, TransformService>();
|
||||||
host.Services.AddSingleton<IOutputService, OutputService>();
|
host.Services.AddSingleton<IOutputService, OutputService>();
|
||||||
var redisOptions = host.Configuration.GetSection("RedisCacheOptions").Get<RedisCacheOptions>() ?? new RedisCacheOptions();
|
var redisOptions = host.Configuration.GetSection("RedisCacheOptions").Get<RedisCacheOptions>() ?? new RedisCacheOptions();
|
||||||
var redis = ConnectionMultiplexer.Connect(redisOptions.Configuration);
|
host.Services.AddStackExchangeRedisCache(options =>
|
||||||
host.Services.AddSingleton(redis.GetDatabase());
|
{
|
||||||
|
options.Configuration = redisOptions.Configuration;
|
||||||
|
options.InstanceName = redisOptions.InstanceName;
|
||||||
|
});
|
||||||
var app = host.Build();
|
var app = host.Build();
|
||||||
await app.RunAsync();
|
await app.RunAsync();
|
||||||
}
|
}
|
@@ -16,10 +16,10 @@ public class CsvSource:IDataSource
|
|||||||
//protected readonly StreamReader _reader;
|
//protected readonly StreamReader _reader;
|
||||||
private readonly ILogger? _logger;
|
private readonly ILogger? _logger;
|
||||||
protected readonly string _tableName;
|
protected readonly string _tableName;
|
||||||
protected string? _sqlFilePath;
|
protected string _sqlFilePath=string.Empty;
|
||||||
protected readonly string? _sqlFileText;
|
protected readonly string? _sqlFileText;
|
||||||
protected string[]? headers;
|
protected string[] headers=Array.Empty<string>();
|
||||||
protected string[]? csvFiles;
|
protected string[] csvFiles = Array.Empty<string>();
|
||||||
public string? CurrentRaw { get; protected set; }
|
public string? CurrentRaw { get; protected set; }
|
||||||
public string Delimiter { get; private set; }
|
public string Delimiter { get; private set; }
|
||||||
public char QuoteChar { get; private set; }
|
public char QuoteChar { get; private set; }
|
||||||
@@ -31,9 +31,6 @@ public class CsvSource:IDataSource
|
|||||||
_logger = logger;
|
_logger = logger;
|
||||||
Delimiter = delimiter;
|
Delimiter = delimiter;
|
||||||
QuoteChar = quoteChar;
|
QuoteChar = quoteChar;
|
||||||
string pattern = $"^.*\\.{tableName}\\..*\\.sql$";
|
|
||||||
_sqlFilePath = Directory.GetFiles(_inputDir).FirstOrDefault(s => Regex.Match(s, pattern).Success);
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -129,9 +126,11 @@ public class CsvSource:IDataSource
|
|||||||
}
|
}
|
||||||
public virtual async Task GetHeaderAndCsvFiles()
|
public virtual async Task GetHeaderAndCsvFiles()
|
||||||
{
|
{
|
||||||
|
string pattern = $"^.*\\.{_tableName}\\..*\\.sql$";
|
||||||
|
_sqlFilePath = Directory.GetFiles(_inputDir).FirstOrDefault(s => Regex.Match(s, pattern).Success) ?? "";
|
||||||
var text = await File.ReadAllTextAsync(_sqlFilePath);
|
var text = await File.ReadAllTextAsync(_sqlFilePath);
|
||||||
headers = await DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text);
|
headers = DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text);
|
||||||
csvFiles = await DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text, new Regex(@"'.+\.dat.zst'"));
|
csvFiles = DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text, new Regex(@"'.+\.dat.zst'"));
|
||||||
|
|
||||||
}
|
}
|
||||||
public virtual async Task DoEnqueue(Action<DataRecord> action)
|
public virtual async Task DoEnqueue(Action<DataRecord> action)
|
||||||
@@ -140,9 +139,9 @@ public class CsvSource:IDataSource
|
|||||||
foreach (var file in csvFiles)
|
foreach (var file in csvFiles)
|
||||||
{
|
{
|
||||||
var filePath= Path.Combine(_inputDir, file);
|
var filePath= Path.Combine(_inputDir, file);
|
||||||
using (var fs = File.OpenRead(filePath))
|
using var fs = File.OpenRead(filePath);
|
||||||
{
|
{
|
||||||
using (StreamReader sr = new StreamReader(fs))
|
using StreamReader sr = new (fs);
|
||||||
{
|
{
|
||||||
while (!sr.EndOfStream)
|
while (!sr.EndOfStream)
|
||||||
{
|
{
|
||||||
@@ -164,9 +163,9 @@ public class CsvSource:IDataSource
|
|||||||
if (file != null)
|
if (file != null)
|
||||||
{
|
{
|
||||||
var filePath = Path.Combine(_inputDir, file);
|
var filePath = Path.Combine(_inputDir, file);
|
||||||
using (var fs = File.OpenRead(filePath))
|
using var fs = File.OpenRead(filePath);
|
||||||
{
|
{
|
||||||
using (StreamReader sr = new StreamReader(fs))
|
using StreamReader sr = new(fs);
|
||||||
{
|
{
|
||||||
var line = await sr.ReadLineAsync();
|
var line = await sr.ReadLineAsync();
|
||||||
var fields = ParseRow2(line, QuoteChar, Delimiter);
|
var fields = ParseRow2(line, QuoteChar, Delimiter);
|
||||||
|
104
ConsoleApp2/Services/ErrorRecorder.cs
Normal file
104
ConsoleApp2/Services/ErrorRecorder.cs
Normal file
@@ -0,0 +1,104 @@
|
|||||||
|
using System.Text;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
|
namespace ConsoleApp2.Services;
|
||||||
|
|
||||||
|
public class ErrorRecorder
|
||||||
|
{
|
||||||
|
private readonly string _outputDir = "./ErrorRecords";
|
||||||
|
private readonly ILogger _logger;
|
||||||
|
private readonly Dictionary<string, int> _logIndex = new();
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 当次执行标识
|
||||||
|
/// </summary>
|
||||||
|
private static readonly string UID = DateTime.Now.ToString("yyyy-MM-dd HH-mm-ss");
|
||||||
|
|
||||||
|
public ErrorRecorder(ILogger<ErrorRecorder> logger)
|
||||||
|
{
|
||||||
|
_logger = logger;
|
||||||
|
var dir = Path.Combine(_outputDir, UID);
|
||||||
|
if (!Directory.Exists(dir))
|
||||||
|
{
|
||||||
|
Directory.CreateDirectory(dir);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 记录已知表名发生错误的SQL
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="commandText"></param>
|
||||||
|
/// <param name="tableName"></param>
|
||||||
|
/// <param name="exception"></param>
|
||||||
|
public async Task LogErrorSqlAsync(string commandText, string tableName, Exception exception)
|
||||||
|
{
|
||||||
|
if (!_logIndex.TryGetValue(tableName, out var idx))
|
||||||
|
{
|
||||||
|
idx = 0;
|
||||||
|
_logIndex.Add(tableName, idx);
|
||||||
|
}
|
||||||
|
var filePath = Path.Combine(_outputDir, UID, $"{tableName}-{idx}.errlog");
|
||||||
|
|
||||||
|
if (File.Exists(filePath) && new FileInfo(filePath).Length > 10 * 1024 * 1024)
|
||||||
|
{
|
||||||
|
++idx;
|
||||||
|
_logIndex[tableName] = idx;
|
||||||
|
filePath = Path.Combine(_outputDir, UID, $"{tableName}-{idx}.errlog");
|
||||||
|
}
|
||||||
|
var content = $"""
|
||||||
|
/* [{DateTime.Now:yyyy-MM-dd HH:mm:ss}]
|
||||||
|
* Error occurred when export table '{tableName}':
|
||||||
|
* {exception.Message}
|
||||||
|
*/
|
||||||
|
|
||||||
|
{commandText}
|
||||||
|
|
||||||
|
|
||||||
|
""";
|
||||||
|
await File.AppendAllTextAsync(filePath, content, Encoding.UTF8);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 记录发生错误的SQL
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="commandText"></param>
|
||||||
|
/// <param name="exception"></param>
|
||||||
|
public async Task LogErrorSqlAsync(string commandText, Exception exception)
|
||||||
|
{
|
||||||
|
var filePath = Path.Combine(_outputDir, UID, "UnknownTables.errlog");
|
||||||
|
var content = $"""
|
||||||
|
/* [{DateTime.Now:yyyy-MM-dd HH:mm:ss}]
|
||||||
|
* Error occurred when export table with unknown table name:
|
||||||
|
* {exception.Message}
|
||||||
|
*/
|
||||||
|
{commandText}
|
||||||
|
|
||||||
|
|
||||||
|
""";
|
||||||
|
await File.AppendAllTextAsync(filePath, content, Encoding.UTF8);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task LogErrorRecordsAsync(IDictionary<string, DataRecord> records, Exception exception)
|
||||||
|
{
|
||||||
|
var pathDict = new Dictionary<string, string>();
|
||||||
|
foreach (var pair in records)
|
||||||
|
{
|
||||||
|
if(!pathDict.TryGetValue(pair.Key, out var path))
|
||||||
|
{
|
||||||
|
path = Path.Combine(_outputDir, UID, "ErrorRecords", $"{pair.Key}.errlog");
|
||||||
|
pathDict.Add(pair.Key, path);
|
||||||
|
}
|
||||||
|
//
|
||||||
|
await File.AppendAllTextAsync(path, string.Join(',', pair.Value.Fields));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void ClearErrorRecords()
|
||||||
|
{
|
||||||
|
_logger.LogInformation("***** Clear error records *****");
|
||||||
|
foreach (var file in Directory.GetFiles(_outputDir, "*.errlog", SearchOption.AllDirectories))
|
||||||
|
{
|
||||||
|
File.Delete(file);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@@ -11,34 +11,31 @@ namespace ConsoleApp2.Services;
|
|||||||
[Obsolete]
|
[Obsolete]
|
||||||
public class JsvSource:IDataSource
|
public class JsvSource:IDataSource
|
||||||
{
|
{
|
||||||
private readonly string _inputDir;
|
//private readonly string _inputDir;
|
||||||
private readonly JsvStringSerializer _jsv;
|
//private readonly JsvStringSerializer _jsv;
|
||||||
private readonly StreamReader _reader;
|
//private readonly StreamReader? _reader;
|
||||||
// ReSharper disable once PrivateFieldCanBeConvertedToLocalVariable
|
// ReSharper disable once PrivateFieldCanBeConvertedToLocalVariable
|
||||||
private readonly ILogger? _logger;
|
//private readonly ILogger? _logger;
|
||||||
private readonly string _tableName;
|
//private readonly string _tableName;
|
||||||
|
|
||||||
public DataRecord Current { get; protected set; } = null!;
|
public DataRecord Current { get; protected set; } = null!;
|
||||||
public string[]? Headers { get; }
|
public string[]? Headers { get; }
|
||||||
public bool EndOfSource => _reader.EndOfStream;
|
//public bool EndOfSource => _reader.EndOfStream;
|
||||||
|
|
||||||
public JsvSource(string inputDir,string tableName, ILogger? logger = null)
|
public JsvSource(string inputDir,string tableName, ILogger? logger = null)
|
||||||
{
|
{
|
||||||
_inputDir = inputDir;
|
//_inputDir = inputDir;
|
||||||
_tableName = tableName;
|
//_tableName = tableName;
|
||||||
_jsv = new JsvStringSerializer();
|
//_jsv = new JsvStringSerializer();
|
||||||
// _reader = new StreamReader(filePath);
|
//_logger = logger;
|
||||||
//Headers = headers;
|
|
||||||
_logger = logger;
|
|
||||||
// _logger?.LogInformation("Reading file: {FilePath}", filePath);
|
|
||||||
//_tableName = DumpDataHelper.GetTableName(filePath);
|
|
||||||
}
|
}
|
||||||
public async Task DoEnqueue(Action<DataRecord> action)
|
public Task DoEnqueue(Action<DataRecord> action)
|
||||||
{
|
{
|
||||||
|
return Task.CompletedTask;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void Dispose()
|
public void Dispose()
|
||||||
{
|
{
|
||||||
_reader.Dispose();
|
// _reader?.Dispose();
|
||||||
}
|
}
|
||||||
}
|
}
|
@@ -1,5 +1,6 @@
|
|||||||
using System.Reflection.Metadata;
|
using System.Data.Common;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
|
using System.Text.RegularExpressions;
|
||||||
using ConsoleApp2.Helpers;
|
using ConsoleApp2.Helpers;
|
||||||
using ConsoleApp2.Options;
|
using ConsoleApp2.Options;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
@@ -12,27 +13,31 @@ namespace ConsoleApp2.Services;
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// Mysql导出
|
/// Mysql导出
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public class MySqlDestination : IDisposable, IAsyncDisposable
|
public partial class MySqlDestination : IDisposable, IAsyncDisposable
|
||||||
{
|
{
|
||||||
private readonly Dictionary<string, IList<DataRecord>> _recordCache;
|
private readonly Dictionary<string, IList<DataRecord>> _recordCache;
|
||||||
private readonly MySqlConnection _conn;
|
private readonly MySqlConnection _conn;
|
||||||
private readonly ILogger _logger;
|
private readonly ILogger _logger;
|
||||||
private readonly bool _prettyOutput;
|
|
||||||
private readonly int _maxAllowPacket;
|
|
||||||
private readonly ProcessContext _context;
|
private readonly ProcessContext _context;
|
||||||
|
private readonly IOptions<DataTransformOptions> _transformOptions;
|
||||||
|
private readonly ErrorRecorder _errorRecorder;
|
||||||
|
|
||||||
public MySqlDestination(string connStr, ILogger logger, ProcessContext context,bool prettyOutput = false)
|
public MySqlDestination(
|
||||||
|
string connStr,
|
||||||
|
ILogger logger,
|
||||||
|
ProcessContext context,
|
||||||
|
IOptions<DataTransformOptions> transformOptions,
|
||||||
|
ErrorRecorder errorRecorder)
|
||||||
{
|
{
|
||||||
_conn = new MySqlConnection(connStr);
|
_conn = new MySqlConnection(connStr);
|
||||||
_conn.Open();
|
_conn.Open();
|
||||||
_recordCache = new Dictionary<string, IList<DataRecord>>();
|
_recordCache = new Dictionary<string, IList<DataRecord>>();
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
_context = context;
|
_context = context;
|
||||||
_prettyOutput = prettyOutput;
|
_transformOptions = transformOptions;
|
||||||
|
_errorRecorder = errorRecorder;
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Task WriteRecordAsync(DataRecord record)
|
public Task WriteRecordAsync(DataRecord record)
|
||||||
{
|
{
|
||||||
_recordCache.AddOrUpdate(record.TableName, [record], (key, value) =>
|
_recordCache.AddOrUpdate(record.TableName, [record], (key, value) =>
|
||||||
@@ -51,28 +56,44 @@ public class MySqlDestination : IDisposable, IAsyncDisposable
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task FlushAsync(int maxAllowPacket, IOptions<DataTransformOptions> transOptions)
|
public async Task FlushAsync(int maxAllowPacket)
|
||||||
{
|
{
|
||||||
if (_recordCache.Count == 0)
|
if (_recordCache.Count == 0)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
var cmd = _conn.CreateCommand();
|
var cmd = _conn.CreateCommand();
|
||||||
cmd.CommandTimeout = 3 * 60;
|
cmd.CommandTimeout = 3 * 60;
|
||||||
var excuseList = GetExcuseList(_recordCache, maxAllowPacket, transOptions, _prettyOutput);
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
var excuseList = GetExcuseList(_recordCache, maxAllowPacket).ToList();
|
||||||
foreach (var insertSql in excuseList)
|
foreach (var insertSql in excuseList)
|
||||||
{
|
{
|
||||||
cmd.CommandText = insertSql;
|
cmd.CommandText = insertSql;
|
||||||
|
try
|
||||||
|
{
|
||||||
await cmd.ExecuteNonQueryAsync();
|
await cmd.ExecuteNonQueryAsync();
|
||||||
}
|
}
|
||||||
_recordCache.Clear();
|
|
||||||
}
|
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
{
|
{
|
||||||
_logger.LogCritical(e, "Error when flushing records, sql: {Sql}", cmd.CommandText.Omit(1000));
|
_logger.LogCritical(e, "Error when flushing records, sql: {Sql}", cmd.CommandText.Omit(1000));
|
||||||
_context.AddException(e);
|
_context.AddException(e);
|
||||||
throw;
|
|
||||||
|
var match = MatchTableName().Match(cmd.CommandText);
|
||||||
|
if (match is { Success: true, Groups.Count: > 1 })
|
||||||
|
{
|
||||||
|
var tableName = match.Groups[1].Value;
|
||||||
|
await _errorRecorder.LogErrorSqlAsync(cmd.CommandText, tableName, e);
|
||||||
|
}
|
||||||
|
else await _errorRecorder.LogErrorSqlAsync(cmd.CommandText, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_recordCache.Clear();
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
_logger.LogCritical(e, "Error when serialize records, record:");
|
||||||
|
_context.AddException(e);
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
@@ -80,89 +101,102 @@ public class MySqlDestination : IDisposable, IAsyncDisposable
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static IList<string> GetExcuseList(IDictionary<string, IList<DataRecord>> tableRecords,int maxAllowPacket, IOptions<DataTransformOptions> transOptions,
|
[GeneratedRegex("INSERT INTO `([^`]+)`")]
|
||||||
bool prettyOutput = false)
|
private static partial Regex MatchTableName();
|
||||||
|
|
||||||
|
public IEnumerable<string> GetExcuseList(IDictionary<string, IList<DataRecord>> tableRecords,int maxAllowPacket)
|
||||||
{
|
{
|
||||||
var resultList = new List<string>();
|
var sb = new StringBuilder();
|
||||||
var headerSb = new StringBuilder();
|
|
||||||
var recordSb = new StringBuilder();
|
|
||||||
foreach (var (tableName, records) in tableRecords)
|
foreach (var (tableName, records) in tableRecords)
|
||||||
{
|
{
|
||||||
if (records.Count == 0)
|
if (records.Count == 0)
|
||||||
continue;
|
continue;
|
||||||
headerSb.Append($"INSERT INTO `{tableName}`(");
|
|
||||||
|
var recordIdx = 0;
|
||||||
|
StartBuild:
|
||||||
|
var noCommas = true;
|
||||||
|
|
||||||
|
// INSERT INTO ... VALUES >>>
|
||||||
|
sb.Append($"INSERT INTO `{tableName}`(");
|
||||||
for (var i = 0; i < records[0].Headers.Length; i++)
|
for (var i = 0; i < records[0].Headers.Length; i++)
|
||||||
{
|
{
|
||||||
var header = records[0].Headers[i];
|
var header = records[0].Headers[i];
|
||||||
headerSb.Append($"`{header}`");
|
sb.Append($"`{header}`");
|
||||||
if (i != records[0].Headers.Length - 1)
|
if (i != records[0].Headers.Length - 1)
|
||||||
headerSb.Append(',');
|
sb.Append(',');
|
||||||
}
|
}
|
||||||
|
|
||||||
headerSb.Append(") VALUES ");
|
sb.Append(") VALUES ");
|
||||||
if (prettyOutput)
|
|
||||||
headerSb.AppendLine();
|
|
||||||
|
|
||||||
var sbList = new List<string>();
|
// ([FIELDS]), >>>
|
||||||
var currentLength = headerSb.Length;
|
for (;recordIdx < records.Count; recordIdx++)
|
||||||
for (var i = 0; i < records.Count; i++)
|
|
||||||
{
|
{
|
||||||
var record = records[i];
|
var record = records[recordIdx];
|
||||||
|
var recordSb = new StringBuilder();
|
||||||
recordSb.Append('(');
|
recordSb.Append('(');
|
||||||
for (var j = 0; j < record.Fields.Length; j++)
|
for (var fieldIdx = 0; fieldIdx < record.Fields.Length; fieldIdx++)
|
||||||
{
|
{
|
||||||
var field = record.Fields[j];
|
var field = record.Fields[fieldIdx];
|
||||||
var header = record.Headers[j];
|
|
||||||
if (transOptions.Value.GetColumnType(record.TableName, header) ==ColumnType.Blob)
|
// 在这里处理特殊列
|
||||||
{
|
#region HandleFields
|
||||||
if (string.IsNullOrEmpty(field))
|
if (field == "\\N")
|
||||||
{
|
{
|
||||||
recordSb.Append("NULL");
|
recordSb.Append("NULL");
|
||||||
|
goto Escape;
|
||||||
}
|
}
|
||||||
else
|
|
||||||
recordSb.Append("0x"+field);
|
switch (_transformOptions.Value.GetColumnType(record.TableName, record.Headers[fieldIdx]))
|
||||||
}
|
{
|
||||||
else
|
case ColumnType.Text:
|
||||||
|
recordSb.Append(string.IsNullOrEmpty(field)
|
||||||
|
? "''"
|
||||||
|
: _transformOptions.Value.TransformBinary?.Invoke(field) ?? field);
|
||||||
|
break;
|
||||||
|
case ColumnType.Blob:
|
||||||
|
if (string.IsNullOrEmpty(field))
|
||||||
|
recordSb.Append("''");
|
||||||
|
else recordSb.Append($"0x{field}");
|
||||||
|
break;
|
||||||
|
case ColumnType.Json:
|
||||||
|
recordSb.Append(string.IsNullOrEmpty(field)
|
||||||
|
? "\"[]\""
|
||||||
|
: _transformOptions.Value.TransformBinary?.Invoke(field) ?? field);
|
||||||
|
break;
|
||||||
|
case ColumnType.UnDefine:
|
||||||
|
default:
|
||||||
recordSb.Append(field);
|
recordSb.Append(field);
|
||||||
if (j != record.Fields.Length - 1)
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
Escape:
|
||||||
|
|
||||||
|
#endregion
|
||||||
|
if (fieldIdx != record.Fields.Length - 1)
|
||||||
recordSb.Append(',');
|
recordSb.Append(',');
|
||||||
}
|
}
|
||||||
|
|
||||||
recordSb.Append(')');
|
recordSb.Append(')');
|
||||||
|
|
||||||
//if (i != records.Count - 1) // not last field
|
// 若字符数量即将大于限制,则返回SQL,清空StringBuilder,保留当前记录的索引值,然后转到StartBuild标签重新开始一轮INSERT
|
||||||
// recordSb.Append(',');
|
if (sb.Length + recordSb.Length + 1 > maxAllowPacket)
|
||||||
if (prettyOutput) recordSb.AppendLine();
|
{
|
||||||
|
sb.Append(';');
|
||||||
|
yield return sb.ToString();
|
||||||
|
sb.Clear();
|
||||||
|
goto StartBuild;
|
||||||
|
}
|
||||||
|
|
||||||
if (currentLength + recordSb.Length >= maxAllowPacket)
|
if (!noCommas)
|
||||||
{
|
sb.Append(',').AppendLine();
|
||||||
var insertSb = new StringBuilder(headerSb.ToString());
|
noCommas = false;
|
||||||
insertSb.Append(string.Join(",", sbList));
|
sb.Append(recordSb); // StringBuilder.Append(StringBuilder)不会分配多余的内存
|
||||||
insertSb.Append(";");
|
|
||||||
resultList.Add(insertSb.ToString());
|
|
||||||
insertSb.Clear();
|
|
||||||
sbList.Clear();
|
|
||||||
sbList.Add(recordSb.ToString());
|
|
||||||
currentLength = headerSb.Length + 1;//逗号长度加1
|
|
||||||
}
|
}
|
||||||
else
|
|
||||||
{
|
sb.Append(';');
|
||||||
sbList.Add(recordSb.ToString());
|
yield return sb.ToString();
|
||||||
|
sb.Clear();
|
||||||
}
|
}
|
||||||
currentLength += recordSb.Length;
|
|
||||||
recordSb.Clear();
|
|
||||||
}
|
|
||||||
if (sbList.Count > 0)
|
|
||||||
{
|
|
||||||
var insertSb = new StringBuilder(headerSb.ToString());
|
|
||||||
insertSb.Append(string.Join(",", sbList));
|
|
||||||
insertSb.Append(";");
|
|
||||||
resultList.Add(insertSb.ToString());
|
|
||||||
insertSb.Clear();
|
|
||||||
}
|
|
||||||
headerSb.Clear();
|
|
||||||
}
|
|
||||||
return resultList;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@@ -1,4 +1,6 @@
|
|||||||
namespace ConsoleApp2.Services;
|
using System.Collections.Concurrent;
|
||||||
|
|
||||||
|
namespace ConsoleApp2.Services;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 处理上下文类,标识处理进度
|
/// 处理上下文类,标识处理进度
|
||||||
@@ -8,7 +10,7 @@ public class ProcessContext
|
|||||||
private int _inputCount;
|
private int _inputCount;
|
||||||
private int _transformCount;
|
private int _transformCount;
|
||||||
private int _outputCount;
|
private int _outputCount;
|
||||||
private IList<Exception> _exceptionList = new List<Exception>();
|
private ConcurrentBag<Exception> _exceptionList = new ConcurrentBag<Exception>();
|
||||||
public bool IsInputCompleted { get; private set; }
|
public bool IsInputCompleted { get; private set; }
|
||||||
public bool IsTransformCompleted { get; private set; }
|
public bool IsTransformCompleted { get; private set; }
|
||||||
public bool IsOutputCompleted { get; private set; }
|
public bool IsOutputCompleted { get; private set; }
|
||||||
@@ -34,7 +36,7 @@ public class ProcessContext
|
|||||||
{
|
{
|
||||||
_exceptionList.Add(ex);
|
_exceptionList.Add(ex);
|
||||||
}
|
}
|
||||||
public IList<Exception> GetExceptions()
|
public ConcurrentBag<Exception> GetExceptions()
|
||||||
{
|
{
|
||||||
return _exceptionList;
|
return _exceptionList;
|
||||||
}
|
}
|
||||||
|
@@ -1,6 +1,5 @@
|
|||||||
using ConsoleApp2.Helpers;
|
using ConsoleApp2.Helpers;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using System.IO;
|
|
||||||
using System.Text.RegularExpressions;
|
using System.Text.RegularExpressions;
|
||||||
using ZstdSharp;
|
using ZstdSharp;
|
||||||
namespace ConsoleApp2.Services
|
namespace ConsoleApp2.Services
|
||||||
@@ -10,22 +9,19 @@ namespace ConsoleApp2.Services
|
|||||||
public ZstSource(string inputDir, string tableName, string delimiter = ",", char quoteChar = '"',
|
public ZstSource(string inputDir, string tableName, string delimiter = ",", char quoteChar = '"',
|
||||||
ILogger? logger = null) : base(inputDir, tableName, delimiter = ",", quoteChar = '"', logger = null)
|
ILogger? logger = null) : base(inputDir, tableName, delimiter = ",", quoteChar = '"', logger = null)
|
||||||
{
|
{
|
||||||
//throw new Exception("aaa");
|
|
||||||
string pattern = $"^.*\\.{tableName}\\..*\\.sql.zst$";
|
|
||||||
_sqlFilePath = Directory.GetFiles(_inputDir).FirstOrDefault(s => Regex.Match(s, pattern).Success);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
private async Task<string> DecompressFile(string filePath)
|
private static async Task<string> DecompressFile(string filePath)
|
||||||
{
|
{
|
||||||
using (var input = File.OpenRead(filePath))
|
using var input = File.OpenRead(filePath);
|
||||||
{
|
{
|
||||||
using (var decopress = new DecompressionStream(input))
|
using var decopress = new DecompressionStream(input);
|
||||||
{
|
{
|
||||||
|
|
||||||
var ms = new MemoryStream();
|
var ms = new MemoryStream();
|
||||||
decopress.CopyTo(ms);
|
decopress.CopyTo(ms);
|
||||||
ms.Seek(0, SeekOrigin.Begin);
|
ms.Seek(0, SeekOrigin.Begin);
|
||||||
StreamReader reader = new StreamReader(ms);
|
StreamReader reader = new(ms);
|
||||||
var text = await reader.ReadToEndAsync();
|
var text = await reader.ReadToEndAsync();
|
||||||
return text;
|
return text;
|
||||||
|
|
||||||
@@ -34,9 +30,11 @@ namespace ConsoleApp2.Services
|
|||||||
}
|
}
|
||||||
public override async Task GetHeaderAndCsvFiles()
|
public override async Task GetHeaderAndCsvFiles()
|
||||||
{
|
{
|
||||||
|
string pattern = $"^.*\\.{_tableName}\\..*\\.sql.zst$";
|
||||||
|
_sqlFilePath = Directory.GetFiles(_inputDir).FirstOrDefault(s => Regex.Match(s, pattern).Success) ?? "";
|
||||||
var text = await DecompressFile(_sqlFilePath);
|
var text = await DecompressFile(_sqlFilePath);
|
||||||
headers=await DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text);
|
headers= DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text);
|
||||||
csvFiles=await DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text, new Regex(@"'.+\.dat.zst'"));
|
csvFiles= DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text, new Regex(@"'.+\.dat.zst'"));
|
||||||
|
|
||||||
}
|
}
|
||||||
public override async Task DoEnqueue(Action<DataRecord> action)
|
public override async Task DoEnqueue(Action<DataRecord> action)
|
||||||
@@ -45,11 +43,11 @@ namespace ConsoleApp2.Services
|
|||||||
foreach (var file in csvFiles)
|
foreach (var file in csvFiles)
|
||||||
{
|
{
|
||||||
var filePath = Path.Combine(_inputDir, file);
|
var filePath = Path.Combine(_inputDir, file);
|
||||||
using (var input = File.OpenRead(filePath))
|
using var input = File.OpenRead(filePath);
|
||||||
{
|
{
|
||||||
using (var decopress = new DecompressionStream(input))
|
using var decopress = new DecompressionStream(input);
|
||||||
{
|
{
|
||||||
using( var reader = new StreamReader(decopress))
|
using var reader = new StreamReader(decopress);
|
||||||
{
|
{
|
||||||
while (!reader.EndOfStream)
|
while (!reader.EndOfStream)
|
||||||
{
|
{
|
||||||
@@ -67,15 +65,15 @@ namespace ConsoleApp2.Services
|
|||||||
public override async Task<DataRecord?> GetTestRecord()
|
public override async Task<DataRecord?> GetTestRecord()
|
||||||
{
|
{
|
||||||
await GetHeaderAndCsvFiles();
|
await GetHeaderAndCsvFiles();
|
||||||
var file = csvFiles.FirstOrDefault();
|
var file = csvFiles?.FirstOrDefault();
|
||||||
if (file != null)
|
if (file != null)
|
||||||
{
|
{
|
||||||
var filePath = Path.Combine(_inputDir, file);
|
var filePath = Path.Combine(_inputDir, file);
|
||||||
using (var input = File.OpenRead(filePath))
|
using var input = File.OpenRead(filePath);
|
||||||
{
|
{
|
||||||
using (var decopress = new DecompressionStream(input))
|
using var decopress = new DecompressionStream(input);
|
||||||
{
|
{
|
||||||
using (var reader = new StreamReader(decopress))
|
using var reader = new StreamReader(decopress);
|
||||||
{
|
{
|
||||||
var line = await reader.ReadLineAsync();
|
var line = await reader.ReadLineAsync();
|
||||||
var fields = ParseRow2(line, QuoteChar, Delimiter);
|
var fields = ParseRow2(line, QuoteChar, Delimiter);
|
||||||
@@ -88,9 +86,5 @@ namespace ConsoleApp2.Services
|
|||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
public void Dispose()
|
|
||||||
{
|
|
||||||
//_reader.Dispose();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -16,23 +16,18 @@ namespace ConsoleApp2.SimulationService
|
|||||||
{
|
{
|
||||||
private readonly ILogger _logger;
|
private readonly ILogger _logger;
|
||||||
private readonly IOptions<DataInputOptions> _dataInputOptions;
|
private readonly IOptions<DataInputOptions> _dataInputOptions;
|
||||||
private readonly IOptions<InputTableOptions> _tableOptions;
|
|
||||||
private readonly DataRecordQueue _producerQueue;
|
|
||||||
private readonly ProcessContext _context;
|
private readonly ProcessContext _context;
|
||||||
|
|
||||||
public SimulationInputService(ILogger<InputService> logger,
|
public SimulationInputService(ILogger<InputService> logger,
|
||||||
IOptions<DataInputOptions> dataInputOptions,
|
IOptions<DataInputOptions> dataInputOptions,
|
||||||
IOptions<InputTableOptions> tableOptions,
|
|
||||||
[FromKeyedServices(ProcessStep.Producer)] DataRecordQueue producerQueue,
|
|
||||||
ProcessContext context)
|
ProcessContext context)
|
||||||
{
|
{
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
_dataInputOptions = dataInputOptions;
|
_dataInputOptions = dataInputOptions;
|
||||||
_tableOptions = tableOptions;
|
|
||||||
_producerQueue = producerQueue;
|
|
||||||
_context = context;
|
_context = context;
|
||||||
}
|
}
|
||||||
public async Task ExecuteAsync(CancellationToken cancellationToken)
|
public async Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue producerQueue, ProcessContext context, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
var inputDir = _dataInputOptions.Value.InputDir;
|
var inputDir = _dataInputOptions.Value.InputDir;
|
||||||
_logger.LogInformation("***** simulation input service start, working dir: {InputDir}, thread id: {ThreadId} *****", inputDir, Environment.CurrentManagedThreadId);
|
_logger.LogInformation("***** simulation input service start, working dir: {InputDir}, thread id: {ThreadId} *****", inputDir, Environment.CurrentManagedThreadId);
|
||||||
@@ -42,9 +37,11 @@ namespace ConsoleApp2.SimulationService
|
|||||||
_logger.LogInformation("No source files found in {InputDir}", inputDir);
|
_logger.LogInformation("No source files found in {InputDir}", inputDir);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
foreach (var tableName in _tableOptions.Value.TableInfoConfig.Keys)
|
foreach (var tableName in tasksOptions.TableInfoConfig.Keys)
|
||||||
{
|
{
|
||||||
var dataCount = _tableOptions.Value.TableInfoConfig[tableName].SimulaRowCount;//当前表要生成的总数据量
|
_logger.LogInformation("Working table: {tableName}", tableName);
|
||||||
|
|
||||||
|
var dataCount = tasksOptions.TableInfoConfig[tableName].SimulaRowCount;//当前表要生成的总数据量
|
||||||
var companyTotallCount = 1000;//当前表每个公司生成的总数据量
|
var companyTotallCount = 1000;//当前表每个公司生成的总数据量
|
||||||
var tempRecords = new List<DataRecord>();
|
var tempRecords = new List<DataRecord>();
|
||||||
var sk = DataHelper.shareKeys.First();
|
var sk = DataHelper.shareKeys.First();
|
||||||
@@ -56,7 +53,9 @@ namespace ConsoleApp2.SimulationService
|
|||||||
var shareKeyIntervalCount = 0;
|
var shareKeyIntervalCount = 0;
|
||||||
|
|
||||||
var source = _dataInputOptions.Value.CreateSource?.Invoke(tableName);
|
var source = _dataInputOptions.Value.CreateSource?.Invoke(tableName);
|
||||||
|
if (source == null) throw new NullReferenceException($"create table source:{tableName} failed!");
|
||||||
var testRecord = await source.GetTestRecord();
|
var testRecord = await source.GetTestRecord();
|
||||||
|
if(testRecord == null) throw new NullReferenceException($"create testRecord failed, tableName:{tableName}");
|
||||||
for (long i = 1; i <= dataCount; i++)
|
for (long i = 1; i <= dataCount; i++)
|
||||||
{
|
{
|
||||||
shareKeyIntervalCount++;
|
shareKeyIntervalCount++;
|
||||||
@@ -116,7 +115,7 @@ namespace ConsoleApp2.SimulationService
|
|||||||
foreach (var rc in tempRecords)
|
foreach (var rc in tempRecords)
|
||||||
{
|
{
|
||||||
_context.AddInput();
|
_context.AddInput();
|
||||||
_producerQueue.Enqueue(rc);
|
producerQueue.Enqueue(rc);
|
||||||
if (cancellationToken.IsCancellationRequested)
|
if (cancellationToken.IsCancellationRequested)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -130,7 +129,7 @@ namespace ConsoleApp2.SimulationService
|
|||||||
//_logger.LogInformation("File '{File}' input completed", Path.GetFileName(sqlPath));
|
//_logger.LogInformation("File '{File}' input completed", Path.GetFileName(sqlPath));
|
||||||
}
|
}
|
||||||
|
|
||||||
_context.CompleteInput();
|
context.CompleteInput();
|
||||||
_logger.LogInformation("***** Csv input service completed *****");
|
_logger.LogInformation("***** Csv input service completed *****");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -2,7 +2,7 @@
|
|||||||
"CmdOptions": {
|
"CmdOptions": {
|
||||||
"InputFileType": "CSV",
|
"InputFileType": "CSV",
|
||||||
"InputDir": "D:/MyDumper-ZST",
|
"InputDir": "D:/MyDumper-ZST",
|
||||||
"TaskCount": 4,
|
"TaskCount": 6,
|
||||||
"FlushCount": 10000,
|
"FlushCount": 10000,
|
||||||
"Isutf8mb4": true,
|
"Isutf8mb4": true,
|
||||||
"OldestShardKey": 23000,
|
"OldestShardKey": 23000,
|
||||||
@@ -12,6 +12,7 @@
|
|||||||
"MySqlMaster": "Server=127.0.0.1;Port=33309;UserId=root;Password=123456;Database=cferp_test;"
|
"MySqlMaster": "Server=127.0.0.1;Port=33309;UserId=root;Password=123456;Database=cferp_test;"
|
||||||
},
|
},
|
||||||
"RedisCacheOptions": {
|
"RedisCacheOptions": {
|
||||||
"Configuration": "localhost:6379"
|
"Configuration": "192.168.1.246:6380",
|
||||||
|
"InstanceName" : "mes-etl:"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user