multTasks #2
@ -12,11 +12,9 @@ public static partial class DumpDataHelper
|
||||
private static partial Regex MatchBrackets();
|
||||
|
||||
|
||||
public static async Task<string[]> GetCsvHeadersFromSqlFileAsync(string txt)
|
||||
public static string[] GetCsvHeadersFromSqlFileAsync(string txt)
|
||||
{
|
||||
//var txt = await File.ReadAllTextAsync(filePath);
|
||||
var match = MatchBrackets().Match(txt);
|
||||
|
||||
return ParseHeader(match.ValueSpan);
|
||||
}
|
||||
|
||||
@ -60,9 +58,8 @@ public static partial class DumpDataHelper
|
||||
return filePath[(firstDotIdx+1)..secondDotIdx].ToString();
|
||||
}
|
||||
|
||||
public static async Task<string[]> GetCsvFileNamesFromSqlFileAsync(string txt,Regex regex)
|
||||
public static string[] GetCsvFileNamesFromSqlFileAsync(string txt,Regex regex)
|
||||
{
|
||||
//var txt = await File.ReadAllTextAsync(filePath);
|
||||
var matches = regex.Matches(txt);
|
||||
return matches.Select(match => match.ValueSpan[1..^1].ToString()).ToArray();
|
||||
}
|
||||
|
@ -5,5 +5,5 @@ namespace ConsoleApp2.HostedServices.Abstractions;
|
||||
|
||||
public interface IOutputService
|
||||
{
|
||||
public Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue consumerQueue, ProcessContext context, CancellationToken cancellationToken);
|
||||
public void ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue consumerQueue, ProcessContext context, CancellationToken cancellationToken);
|
||||
}
|
@ -27,7 +27,7 @@ public class InputService : IInputService
|
||||
_context = context;
|
||||
}
|
||||
|
||||
public async Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue producerQueue, ProcessContext context,CancellationToken cancellationToken)
|
||||
public async Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue producerQueue, ProcessContext context, CancellationToken cancellationToken)
|
||||
{
|
||||
var inputDir = _dataInputOptions.Value.InputDir;
|
||||
_logger.LogInformation("***** Csv input service start, working dir: {InputDir}, thread id: {ThreadId} *****", inputDir, Environment.CurrentManagedThreadId);
|
||||
@ -42,14 +42,17 @@ public class InputService : IInputService
|
||||
{
|
||||
_logger.LogInformation("Working table: {tableName}", tableName);
|
||||
var source = _dataInputOptions.Value.CreateSource?.Invoke(tableName);
|
||||
await source.DoEnqueue((record) =>
|
||||
if (source != null)
|
||||
{
|
||||
_context.AddInput();
|
||||
producerQueue.Enqueue(record);
|
||||
count++;
|
||||
await source.DoEnqueue((record) =>
|
||||
{
|
||||
_context.AddInput();
|
||||
producerQueue.Enqueue(record);
|
||||
count++;
|
||||
|
||||
});
|
||||
if (_context.GetExceptions().Count > 0)
|
||||
});
|
||||
}
|
||||
if (!_context.GetExceptions().IsEmpty)
|
||||
{
|
||||
_logger.LogInformation("***** Csv input service is canceled *****");
|
||||
return;
|
||||
|
@ -5,14 +5,15 @@ using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
namespace ConsoleApp2.HostedServices;
|
||||
|
||||
public class MainHostedService : BackgroundService
|
||||
public class MainHostedService : IHostedService
|
||||
{
|
||||
private readonly ILogger _logger;
|
||||
private readonly IInputService _input;
|
||||
private readonly ITransformService _transform;
|
||||
private readonly IOutputService _output;
|
||||
private readonly ProcessContext _context;
|
||||
|
||||
private readonly Timer? _bigTableTimer=null;
|
||||
private readonly Timer? _smallTableTimer=null;
|
||||
public MainHostedService(ILogger<MainHostedService> logger, IInputService input, ITransformService transform, IOutputService output, ProcessContext context)
|
||||
{
|
||||
_logger = logger;
|
||||
@ -22,15 +23,15 @@ public class MainHostedService : BackgroundService
|
||||
_context = context;
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
public Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
var taskFun = (TasksOptions taskOp, DataRecordQueue producerQueue, DataRecordQueue consumerQueue, ProcessContext context) =>
|
||||
var taskFun = (TasksOptions taskOp, DataRecordQueue producerQueue, DataRecordQueue consumerQueue, ProcessContext context,Timer? timer) =>
|
||||
{
|
||||
var inputTask = Task.Factory.StartNew(async () =>
|
||||
Task.Factory.StartNew(async () =>
|
||||
{
|
||||
try
|
||||
{
|
||||
await _input.ExecuteAsync(taskOp, producerQueue, context, stoppingToken);
|
||||
await _input.ExecuteAsync(taskOp, producerQueue, context, cancellationToken);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
@ -39,11 +40,11 @@ public class MainHostedService : BackgroundService
|
||||
}
|
||||
|
||||
});
|
||||
var transformTask = Task.Factory.StartNew(async () =>
|
||||
Task.Factory.StartNew(async () =>
|
||||
{
|
||||
try
|
||||
{
|
||||
await _transform.ExecuteAsync(taskOp, producerQueue, consumerQueue, context, stoppingToken);
|
||||
await _transform.ExecuteAsync(taskOp, producerQueue, consumerQueue, context, cancellationToken);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
@ -52,11 +53,15 @@ public class MainHostedService : BackgroundService
|
||||
}
|
||||
|
||||
});
|
||||
var outputTask = Task.Factory.StartNew(async () =>
|
||||
Task.Factory.StartNew(() =>
|
||||
{
|
||||
try
|
||||
{
|
||||
await _output.ExecuteAsync(taskOp, consumerQueue, context,stoppingToken);
|
||||
timer = new Timer((object? state) =>
|
||||
{
|
||||
_output.ExecuteAsync(taskOp, consumerQueue, context, cancellationToken);
|
||||
},null, TimeSpan.Zero,TimeSpan.FromSeconds(0.5));
|
||||
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
@ -69,18 +74,19 @@ public class MainHostedService : BackgroundService
|
||||
|
||||
var bigTablesDic = new Dictionary<string, TableInfo>
|
||||
{
|
||||
{"order",new TableInfo{SimulaRowCount=5019216 }},
|
||||
{"order_block_plan",new TableInfo{SimulaRowCount=2725553 }},//CreateTime < 202301的删除
|
||||
{"order_block_plan_result",new TableInfo{SimulaRowCount=1174096 }},
|
||||
{"order_box_block",new TableInfo{SimulaRowCount=29755672 }},
|
||||
{"order_item",new TableInfo{SimulaRowCount=1345520079 }},
|
||||
{"simple_plan_order",new TableInfo{SimulaRowCount=351470 }},//CreateTime < 202301的删除
|
||||
};
|
||||
taskFun(new TasksOptions { TableInfoConfig = bigTablesDic, OutPutOptions = new OutPutOptions { FlushCount = 10000, OutPutTaskCount = 2 } },
|
||||
new DataRecordQueue(), new DataRecordQueue(),new ProcessContext());
|
||||
var bigTableContext = new ProcessContext();
|
||||
var bigTableOptions = new TasksOptions { TableInfoConfig = bigTablesDic, OutPutOptions = new OutPutOptions { FlushCount = 20000, OutPutTaskCount = 2 } };
|
||||
taskFun(bigTableOptions, new DataRecordQueue(), new DataRecordQueue(), bigTableContext,_bigTableTimer);
|
||||
var smallTablesDic = new Dictionary<string, TableInfo>
|
||||
{
|
||||
{"machine",new TableInfo{SimulaRowCount=14655 }},
|
||||
{"order",new TableInfo{SimulaRowCount=5019216 }},
|
||||
{"order_data_block",new TableInfo{SimulaRowCount=731800334 }},
|
||||
{"order_data_goods",new TableInfo{SimulaRowCount=25803671 }},
|
||||
{"order_data_parts",new TableInfo{SimulaRowCount=468517543 }},
|
||||
@ -104,7 +110,16 @@ public class MainHostedService : BackgroundService
|
||||
{"work_shift",new TableInfo{SimulaRowCount=59 }},
|
||||
{"work_time",new TableInfo{SimulaRowCount=62 }},
|
||||
};
|
||||
var smallTableContext = new ProcessContext();
|
||||
taskFun(new TasksOptions { TableInfoConfig = smallTablesDic, OutPutOptions = new OutPutOptions { FlushCount = 20000, OutPutTaskCount = 4 } },
|
||||
new DataRecordQueue(), new DataRecordQueue(), new ProcessContext());
|
||||
new DataRecordQueue(), new DataRecordQueue(), smallTableContext,_smallTableTimer);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -1,11 +1,9 @@
|
||||
using ConsoleApp2.Const;
|
||||
|
||||
using ConsoleApp2.HostedServices.Abstractions;
|
||||
using ConsoleApp2.Options;
|
||||
using ConsoleApp2.Services;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
|
||||
namespace ConsoleApp2.HostedServices;
|
||||
|
||||
/// <summary>
|
||||
@ -34,41 +32,33 @@ public class OutputService : IOutputService
|
||||
_transformOptions = transformOptions;
|
||||
_errorRecorder = errorRecorder;
|
||||
}
|
||||
|
||||
public async Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue consumerQueue, ProcessContext context,CancellationToken cancellationToken)
|
||||
private int _runingTaskCount;
|
||||
public int RuningTaskCount
|
||||
{
|
||||
_logger.LogInformation("***** Mysql output service started *****");
|
||||
_taskManager.CreateTasks(async () =>
|
||||
get => _runingTaskCount;
|
||||
}
|
||||
public void DoTask() => Interlocked.Increment(ref _runingTaskCount);
|
||||
public void FinishTask() => Interlocked.Decrement(ref _runingTaskCount);
|
||||
public void ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue consumerQueue, ProcessContext context, CancellationToken cancellationToken)
|
||||
{
|
||||
if (context.IsTransformCompleted == false && consumerQueue.Count < tasksOptions.OutPutOptions.FlushCount) return;
|
||||
if (RuningTaskCount >= tasksOptions.OutPutOptions.OutPutTaskCount ) return;
|
||||
var records = new List<DataRecord>();
|
||||
|
||||
for (int i = 0; i < tasksOptions.OutPutOptions.FlushCount; i++)
|
||||
{
|
||||
var records = new List<DataRecord>();
|
||||
while (!context.IsTransformCompleted || consumerQueue.Count > 0)
|
||||
{
|
||||
if (!consumerQueue.TryDequeue(out var record)) continue;
|
||||
records.Add(record);
|
||||
//_logger.LogInformation(@"*****OutputCount: {count} *****",count);
|
||||
if (records.Count >= tasksOptions.OutPutOptions.FlushCount)
|
||||
{
|
||||
await FlushAsync(records);
|
||||
records.Clear();
|
||||
}
|
||||
if (_context.GetExceptions().Count>0)
|
||||
{
|
||||
_logger.LogInformation("***** Csv output thread is canceled *****");
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (records.Count > 0)
|
||||
if (consumerQueue.TryDequeue(out var record)) records.Add(record);
|
||||
else break;
|
||||
}
|
||||
if (records.Count > 0)
|
||||
{
|
||||
ThreadPool.QueueUserWorkItem(async (queueState) =>
|
||||
{
|
||||
DoTask();
|
||||
await FlushAsync(records);
|
||||
records.Clear();
|
||||
_logger.LogInformation("***** Mysql output thread completed *****");
|
||||
}
|
||||
}, tasksOptions.OutPutOptions.OutPutTaskCount);
|
||||
|
||||
await _taskManager.WaitAll();
|
||||
//_context.CompleteOutput();
|
||||
_logger.LogInformation(@"***** Mysql output service completed *****");
|
||||
|
||||
FinishTask();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private async Task FlushAsync(IEnumerable<DataRecord> records)
|
||||
|
@ -1,83 +0,0 @@
|
||||
using ConsoleApp2.HostedServices.Abstractions;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
using ConsoleApp2.Const;
|
||||
using ConsoleApp2.Options;
|
||||
using ConsoleApp2.Services;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using System.Reflection.PortableExecutable;
|
||||
using System.Collections.Concurrent;
|
||||
using ConsoleApp2.SimulationService;
|
||||
|
||||
namespace ConsoleApp2.HostedServices
|
||||
{
|
||||
public class TestInputService : IInputService
|
||||
{
|
||||
private readonly ILogger _logger;
|
||||
private readonly IOptions<CsvOptions> _csvOptions;
|
||||
private readonly DataRecordQueue _producerQueue;
|
||||
private readonly ProcessContext _context;
|
||||
public TestInputService(ILogger<TestInputService> logger,
|
||||
IOptions<CsvOptions> csvOptions,
|
||||
[FromKeyedServices(ProcessStep.Producer)] DataRecordQueue producerQueue,
|
||||
ProcessContext context)
|
||||
{
|
||||
_logger = logger;
|
||||
_csvOptions = csvOptions;
|
||||
_producerQueue = producerQueue;
|
||||
_context = context;
|
||||
}
|
||||
public async Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue producerQueue, ProcessContext context, CancellationToken cancellationToken)
|
||||
{
|
||||
var tableName = "order_item";
|
||||
var headers = new string[] { "ID","OrderNo","ItemNo","ItemType","RoomID","BoxID","DataID","PlanID","PackageID","Num","CompanyID","ShardKey" };
|
||||
var dataCount = 1200000000L;
|
||||
var tempCount = 80000;
|
||||
var tempRecords=new List<DataRecord>();
|
||||
var comanyID = 1;
|
||||
short[] shareKeys = { 23040, 23070, 23100, 24000, 24040, 24070, 24100, 25000, 25040, 25070, 25100 };
|
||||
int[] companyIds = { 1, 2, 3, 4 };
|
||||
var sk = shareKeys.First();
|
||||
var companyID = companyIds.First();
|
||||
|
||||
var shareKeyInterval = 20000;
|
||||
var getShareKeyTimes = 0;
|
||||
var getCompanyIDTimes = 0;
|
||||
var shareKeyIntervalCount = 0;
|
||||
for (long i = 1; i <= dataCount; i++)
|
||||
{
|
||||
shareKeyIntervalCount++;
|
||||
if (shareKeyIntervalCount > shareKeyInterval) {
|
||||
sk=DataHelper.GetShareKey(getShareKeyTimes);
|
||||
getShareKeyTimes++;
|
||||
shareKeyIntervalCount = 0;
|
||||
}
|
||||
var fields = new string[] { i.ToString(), "20220104020855", (220105981029 + i).ToString(), "1", "482278", "482279", "3768774", "0", "0", "1", companyID.ToString(), sk.ToString() };
|
||||
var record = new DataRecord(fields, tableName, headers, comanyID);
|
||||
tempRecords.Add(record);
|
||||
if (tempRecords.Count >= tempCount)
|
||||
{
|
||||
foreach (var rc in tempRecords)
|
||||
{
|
||||
_context.AddInput();
|
||||
_producerQueue.Enqueue(rc);
|
||||
if (cancellationToken.IsCancellationRequested)
|
||||
return;
|
||||
}
|
||||
tempRecords.Clear();
|
||||
companyID = DataHelper. GetCompanyId(getCompanyIDTimes);
|
||||
getCompanyIDTimes++;
|
||||
}
|
||||
}
|
||||
|
||||
_context.CompleteInput();
|
||||
_logger.LogInformation("***** Csv input service completed *****");
|
||||
}
|
||||
|
||||
}
|
||||
}
|
@ -19,65 +19,70 @@ public class TransformService : ITransformService
|
||||
private readonly IOptions<DataTransformOptions> _options;
|
||||
private readonly ProcessContext _context;
|
||||
private readonly IDistributedCache _cache;
|
||||
private readonly TaskManager _taskManager;
|
||||
|
||||
|
||||
public TransformService(ILogger<TransformService> logger,
|
||||
public TransformService(ILogger<TransformService> logger,
|
||||
IOptions<DataTransformOptions> options,
|
||||
ProcessContext context,
|
||||
IDistributedCache cache,
|
||||
TaskManager taskManager)
|
||||
IDistributedCache cache)
|
||||
{
|
||||
_logger = logger;
|
||||
_options = options;
|
||||
_context = context;
|
||||
_cache = cache;
|
||||
_taskManager = taskManager;
|
||||
}
|
||||
|
||||
public async Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue producerQueue, DataRecordQueue consumerQueue, ProcessContext context, CancellationToken cancellationToken)
|
||||
{
|
||||
_logger.LogInformation("***** Data transform service started, thread id: {ThreadId} *****", Environment.CurrentManagedThreadId);
|
||||
|
||||
_taskManager.CreateTasks(async () =>
|
||||
while ((!context.IsInputCompleted || producerQueue.Count > 0))
|
||||
{
|
||||
while ((!context.IsInputCompleted || producerQueue.Count > 0))
|
||||
if (_context.GetExceptions().Count > 0)
|
||||
{
|
||||
if (_context.GetExceptions().Count > 0)
|
||||
{
|
||||
_logger.LogInformation("***** Csv transform service is canceled *****");
|
||||
return;
|
||||
}
|
||||
if (!producerQueue.TryDequeue(out var record)) continue;
|
||||
_logger.LogInformation("***** Csv transform service is canceled *****");
|
||||
return;
|
||||
}
|
||||
if (!producerQueue.TryDequeue(out var record)) continue;
|
||||
|
||||
//过滤不要的record
|
||||
if (await _options.Value.RecordFilter?.Invoke(record, _cache) == false) continue;
|
||||
record.Database = _options.Value.DatabaseFilter?.Invoke(record);
|
||||
//修改record
|
||||
_options.Value.RecordModify?.Invoke(record);
|
||||
//缓存record
|
||||
await _options.Value.RecordCache?.Invoke(record, _cache);
|
||||
//替换record
|
||||
var replaceRecord = await _options.Value.RecordReplace?.Invoke(record, _cache);
|
||||
if (replaceRecord != null)
|
||||
//过滤不要的record
|
||||
|
||||
if (_options.Value.RecordFilter != null)
|
||||
{
|
||||
var result = await _options.Value.RecordFilter.Invoke(record, _cache);
|
||||
if (result == false) continue;
|
||||
}
|
||||
record.Database = _options.Value.DatabaseFilter?.Invoke(record);
|
||||
//修改record
|
||||
_options.Value.RecordModify?.Invoke(record);
|
||||
//缓存record
|
||||
if (_options.Value.RecordCache != null)
|
||||
{
|
||||
await _options.Value.RecordCache.Invoke(record, _cache);
|
||||
}
|
||||
//替换record
|
||||
if (_options.Value.RecordReplace != null)
|
||||
{
|
||||
var result = await _options.Value.RecordReplace.Invoke(record, _cache);
|
||||
if (result != null)
|
||||
{
|
||||
record = replaceRecord;
|
||||
}
|
||||
consumerQueue.Enqueue(record);
|
||||
_context.AddTransform();
|
||||
//数据增加
|
||||
var addRecords = _options.Value.RecordAdd?.Invoke(record);
|
||||
if (addRecords != null && addRecords.Count > 0)
|
||||
{
|
||||
foreach (var rc in addRecords)
|
||||
{
|
||||
consumerQueue.Enqueue(rc);
|
||||
_context.AddTransform();
|
||||
}
|
||||
record = result;
|
||||
}
|
||||
}
|
||||
context.CompleteTransform();
|
||||
},tasksOptions.TransformTaskCount,cancellationToken);
|
||||
consumerQueue.Enqueue(record);
|
||||
_context.AddTransform();
|
||||
//数据增加
|
||||
var addRecords = _options.Value.RecordAdd?.Invoke(record);
|
||||
if (addRecords != null && addRecords.Count > 0)
|
||||
{
|
||||
foreach (var rc in addRecords)
|
||||
{
|
||||
consumerQueue.Enqueue(rc);
|
||||
_context.AddTransform();
|
||||
}
|
||||
}
|
||||
}
|
||||
context.CompleteTransform();
|
||||
|
||||
_logger.LogInformation("***** Data transformation service completed *****");
|
||||
}
|
||||
}
|
@ -23,7 +23,7 @@ public class VoidOutputService : IOutputService
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
public Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue consumerQueue, ProcessContext context, CancellationToken cancellationToken)
|
||||
public void ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue consumerQueue, ProcessContext context, CancellationToken cancellationToken)
|
||||
{
|
||||
_logger.LogInformation("***** Void output service started, thread id: {ThreadId} *****", Environment.CurrentManagedThreadId);
|
||||
while (!_context.IsTransformCompleted || _consumerQueue.Count > 0)
|
||||
@ -34,6 +34,5 @@ public class VoidOutputService : IOutputService
|
||||
|
||||
_context.CompleteOutput();
|
||||
_logger.LogInformation("***** Void output service completed *****");
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
@ -7,11 +7,8 @@ namespace ConsoleApp2.Options
|
||||
{
|
||||
|
||||
public string InputDir { get; set; } = "./MyDumper";
|
||||
|
||||
public int TaskCount { get; set; } = 16;
|
||||
|
||||
public int FlushCount { get; set; } = 20000;
|
||||
|
||||
public bool IsMock { get; set; } = false;
|
||||
public string NoFilterTables { get; set; }="";//不需要过滤的表列表
|
||||
public bool Isutf8mb4 { get; set; } = true;
|
||||
|
||||
public short OldestShardKey { get; set; } = 23010;
|
||||
|
@ -13,6 +13,7 @@ public enum ColumnType
|
||||
|
||||
public class DataTransformOptions
|
||||
{
|
||||
|
||||
public Func<DataRecord, string>? DatabaseFilter { get; set; }
|
||||
|
||||
public Func<string, string>? TransformBinary { get; set; }//Binary转字符串方法
|
||||
|
@ -19,6 +19,6 @@ namespace ConsoleApp2.Options
|
||||
public class OutPutOptions
|
||||
{
|
||||
public int FlushCount { get; set; } = 10000;
|
||||
public int OutPutTaskCount { get; set; } = 2;
|
||||
public int OutPutTaskCount { get; set; } = 1;
|
||||
}
|
||||
}
|
||||
|
@ -14,7 +14,6 @@ using Serilog;
|
||||
using Microsoft.Extensions.Caching.Distributed;
|
||||
using Serilog.Events;
|
||||
|
||||
|
||||
await RunProgram();
|
||||
return;
|
||||
|
||||
@ -57,53 +56,12 @@ async Task RunProgram()
|
||||
|
||||
//}, "请输入单次插入的行数(默认为20000):");
|
||||
|
||||
ThreadPool.SetMaxThreads(200, 200);
|
||||
ThreadPool.SetMaxThreads(8, 4);
|
||||
var host = Host.CreateApplicationBuilder(args);
|
||||
var commandOptions = host.Configuration.GetSection("CmdOptions").Get<CommandOptions>() ?? new CommandOptions();
|
||||
Console.WriteLine($"InputDir:{commandOptions?.InputDir}");
|
||||
|
||||
if (commandOptions == null) throw new ArgumentNullException("commandOptions is null");
|
||||
var oldestTime = DateTime.ParseExact(commandOptions.OldestTime, "yyyyMM", System.Globalization.DateTimeFormatInfo.InvariantInfo);
|
||||
//host.Services.Configure<InputTableOptions>(option =>
|
||||
//{
|
||||
// option.TableInfoConfig = new Dictionary<string, TableInfo>
|
||||
// {
|
||||
|
||||
// //order_block_plan_item从order_item表查询,然后程序插入
|
||||
// //order_package_item从order_item表查询,然后程序插入
|
||||
// //order_patch_detail生产没有这个表,不处理
|
||||
|
||||
|
||||
// {"machine",new TableInfo{SimulaRowCount=14655 }},
|
||||
// {"order",new TableInfo{SimulaRowCount=5019216 }},
|
||||
// {"order_block_plan",new TableInfo{SimulaRowCount=2725553 }},//CreateTime < 202301的删除
|
||||
// {"order_block_plan_result",new TableInfo{SimulaRowCount=1174096 }},
|
||||
// {"order_box_block",new TableInfo{SimulaRowCount=29755672 }},
|
||||
// {"order_data_block",new TableInfo{SimulaRowCount=731800334 }},
|
||||
// {"order_data_goods",new TableInfo{SimulaRowCount=25803671 }},
|
||||
// {"order_data_parts",new TableInfo{SimulaRowCount=468517543 }},
|
||||
// {"order_item",new TableInfo{SimulaRowCount=1345520079 }},
|
||||
// {"order_module",new TableInfo{SimulaRowCount=103325385 }},
|
||||
// {"order_module_extra",new TableInfo{SimulaRowCount=54361321 }},
|
||||
// {"order_module_item",new TableInfo{SimulaRowCount=69173339 }},
|
||||
// {"order_package",new TableInfo{SimulaRowCount=16196195 }},
|
||||
// {"order_process",new TableInfo{SimulaRowCount=3892685 }},//orderNo < 202301的
|
||||
// {"order_process_step",new TableInfo{SimulaRowCount=8050349 }},//orderNo < 202301的删除
|
||||
// {"order_process_step_item",new TableInfo{SimulaRowCount=14538058 }},//orderNo < 202301的删除
|
||||
// {"order_scrap_board",new TableInfo{SimulaRowCount=123998 }},
|
||||
// {"process_group",new TableInfo{SimulaRowCount=1253 }},
|
||||
// {"process_info",new TableInfo{SimulaRowCount=7839 }},
|
||||
// {"process_item_exp",new TableInfo{SimulaRowCount=28 }},
|
||||
// {"process_schdule_capacity",new TableInfo{SimulaRowCount=39736 }},
|
||||
// {"process_step_efficiency",new TableInfo{SimulaRowCount=8 }},
|
||||
// {"report_template",new TableInfo{SimulaRowCount=7337 }},
|
||||
// {"simple_package",new TableInfo{SimulaRowCount=130436 }},//orderNo < 202301的删除
|
||||
// {"simple_plan_order",new TableInfo{SimulaRowCount=351470 }},//CreateTime < 202301的删除
|
||||
// {"sys_config",new TableInfo{SimulaRowCount=2296 }},
|
||||
// {"work_calendar",new TableInfo{SimulaRowCount=11 }},
|
||||
// {"work_shift",new TableInfo{SimulaRowCount=59 }},
|
||||
// {"work_time",new TableInfo{SimulaRowCount=62 }},
|
||||
// };
|
||||
//});
|
||||
host.Services.Configure<CsvOptions>(option =>
|
||||
{
|
||||
option.Delimiter = ",";
|
||||
@ -122,13 +80,17 @@ async Task RunProgram()
|
||||
|
||||
host.Services.Configure<DataTransformOptions>(options =>
|
||||
{
|
||||
if (commandOptions.IsMock) return;
|
||||
options.DatabaseFilter = record => "cferp_test";
|
||||
|
||||
options.TransformBinary = field => commandOptions != null && commandOptions.Isutf8mb4 ? $"_utf8mb4 0x{field}" : $"0x{field}";
|
||||
var noFilterTables = commandOptions.NoFilterTables.Split(",");
|
||||
//数据过滤
|
||||
options.RecordFilter = async (record, cache) =>
|
||||
{
|
||||
//var index = Array.IndexOf(record.Headers, "ShardKey");
|
||||
|
||||
if (noFilterTables.Contains(record.TableName)) return true;
|
||||
|
||||
if (record.TryGetField("ShardKey", out var skStr))
|
||||
{
|
||||
short.TryParse(skStr, out var sk);
|
||||
@ -150,7 +112,7 @@ async Task RunProgram()
|
||||
if (dt < oldestTime) return false;
|
||||
|
||||
}
|
||||
catch (Exception ex)
|
||||
catch (Exception)
|
||||
{
|
||||
return false;//订单号转换失败,跳过
|
||||
|
||||
@ -176,20 +138,24 @@ async Task RunProgram()
|
||||
if (record.TableName == "order_process_step" || record.TableName == "order_process_step_item")
|
||||
{
|
||||
//如果缓存中不存在OrderProcessID,则丢弃
|
||||
//if(record.TryGetField("OrderProcessID",out var orderProcessID))
|
||||
|
||||
|
||||
var value = await cache.GetStringAsync(record.GetCacheKey("OrderProcessID"));
|
||||
if (string.IsNullOrEmpty(value))return false;
|
||||
if(record.TryGetField("OrderProcessID",out string orderProcessID))
|
||||
{
|
||||
var value = await cache.GetStringAsync($"order_process_{orderProcessID}");
|
||||
if (string.IsNullOrEmpty(value)) return false;
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
if (record.TableName == "order_block_plan_result" )
|
||||
{
|
||||
//如果缓存中不存在ID,则丢弃(ID 对应order_block_plan中的ID)
|
||||
|
||||
var value = await cache.GetStringAsync(record.GetCacheKey("ID"));
|
||||
if (record.TryGetField("ID", out string id))
|
||||
{
|
||||
var value = await cache.GetStringAsync($"order_block_plan_{id}");
|
||||
if (string.IsNullOrEmpty(value)) return false;
|
||||
|
||||
}
|
||||
}
|
||||
return true;
|
||||
|
||||
@ -292,7 +258,7 @@ async Task RunProgram()
|
||||
var headers = new List<string>(record.Headers);
|
||||
var fields =new List<string>(record.Fields);
|
||||
headers.Add("CompanyID");
|
||||
var companyidResult =await cache.GetStringAsync(record.GetCacheKey("ID"));
|
||||
var companyidResult =await cache.GetStringAsync($"order_block_plan_{id}");
|
||||
|
||||
_ = int.TryParse(companyidResult, out var companyid);
|
||||
fields.Add(companyid.ToString());
|
||||
@ -308,7 +274,7 @@ async Task RunProgram()
|
||||
var headers = new List<string>(record.Headers);
|
||||
var fields = new List<string>(record.Fields);
|
||||
headers.Add("CompanyID");
|
||||
var companyidResult = await cache.GetStringAsync(record.GetCacheKey("OrderNo"));
|
||||
var companyidResult = await cache.GetStringAsync($"order_{orderNo}");
|
||||
_ = int.TryParse(companyidResult, out var cpid);
|
||||
fields.Add(cpid.ToString());
|
||||
return new DataRecord(fields.ToArray(), record.TableName, headers.ToArray(), cpid);
|
||||
@ -350,7 +316,7 @@ async Task RunProgram()
|
||||
{
|
||||
if (record.TryGetField("OrderProcessID",out var processID))
|
||||
{
|
||||
var shardKey =await cache.GetStringAsync(record.GetCacheKey("OrderProcessID"));
|
||||
var shardKey =await cache.GetStringAsync($"order_process_{processID}");
|
||||
var headers = new List<string>(record.Headers);
|
||||
var fields = new List<string>(record.Fields);
|
||||
headers.Add("ShardKey");
|
||||
@ -449,13 +415,14 @@ async Task RunProgram()
|
||||
// Password = "123456",
|
||||
// MaximumPoolSize = 50, // 这个值应当小于 max_connections
|
||||
//}.ConnectionString;
|
||||
options.ConnectionString = new MySqlConnectionStringBuilder(host.Configuration.GetConnectionString("MySqlMaster"))
|
||||
options.ConnectionString = new MySqlConnectionStringBuilder(host.Configuration.GetConnectionString("MySqlMaster")??"")
|
||||
{
|
||||
CharacterSet = "utf8",
|
||||
AllowUserVariables = true,
|
||||
IgnoreCommandTransaction = true,
|
||||
TreatTinyAsBoolean = false,
|
||||
MaximumPoolSize = 50
|
||||
MaximumPoolSize = 50,
|
||||
SslMode = MySqlSslMode.None,
|
||||
}.ConnectionString;
|
||||
});
|
||||
|
||||
@ -478,7 +445,8 @@ async Task RunProgram()
|
||||
|
||||
host.Services.AddHostedService<MainHostedService>();
|
||||
host.Services.AddHostedService<TaskMonitorService>();
|
||||
host.Services.AddSingleton<IInputService, InputService>();
|
||||
if(commandOptions.IsMock)host.Services.AddSingleton<IInputService,InputService>();
|
||||
else host.Services.AddSingleton<IInputService, InputService>();
|
||||
host.Services.AddSingleton<ITransformService, TransformService>();
|
||||
host.Services.AddSingleton<IOutputService, OutputService>();
|
||||
var redisOptions = host.Configuration.GetSection("RedisCacheOptions").Get<RedisCacheOptions>() ?? new RedisCacheOptions();
|
||||
|
@ -16,10 +16,10 @@ public class CsvSource:IDataSource
|
||||
//protected readonly StreamReader _reader;
|
||||
private readonly ILogger? _logger;
|
||||
protected readonly string _tableName;
|
||||
protected string? _sqlFilePath;
|
||||
protected string _sqlFilePath=string.Empty;
|
||||
protected readonly string? _sqlFileText;
|
||||
protected string[]? headers;
|
||||
protected string[]? csvFiles;
|
||||
protected string[] headers=Array.Empty<string>();
|
||||
protected string[] csvFiles = Array.Empty<string>();
|
||||
public string? CurrentRaw { get; protected set; }
|
||||
public string Delimiter { get; private set; }
|
||||
public char QuoteChar { get; private set; }
|
||||
@ -31,9 +31,6 @@ public class CsvSource:IDataSource
|
||||
_logger = logger;
|
||||
Delimiter = delimiter;
|
||||
QuoteChar = quoteChar;
|
||||
string pattern = $"^.*\\.{tableName}\\..*\\.sql$";
|
||||
_sqlFilePath = Directory.GetFiles(_inputDir).FirstOrDefault(s => Regex.Match(s, pattern).Success);
|
||||
|
||||
|
||||
}
|
||||
|
||||
@ -129,9 +126,11 @@ public class CsvSource:IDataSource
|
||||
}
|
||||
public virtual async Task GetHeaderAndCsvFiles()
|
||||
{
|
||||
string pattern = $"^.*\\.{_tableName}\\..*\\.sql$";
|
||||
_sqlFilePath = Directory.GetFiles(_inputDir).FirstOrDefault(s => Regex.Match(s, pattern).Success) ?? "";
|
||||
var text = await File.ReadAllTextAsync(_sqlFilePath);
|
||||
headers = await DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text);
|
||||
csvFiles = await DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text, new Regex(@"'.+\.dat.zst'"));
|
||||
headers = DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text);
|
||||
csvFiles = DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text, new Regex(@"'.+\.dat.zst'"));
|
||||
|
||||
}
|
||||
public virtual async Task DoEnqueue(Action<DataRecord> action)
|
||||
@ -140,9 +139,9 @@ public class CsvSource:IDataSource
|
||||
foreach (var file in csvFiles)
|
||||
{
|
||||
var filePath= Path.Combine(_inputDir, file);
|
||||
using (var fs = File.OpenRead(filePath))
|
||||
using var fs = File.OpenRead(filePath);
|
||||
{
|
||||
using (StreamReader sr = new StreamReader(fs))
|
||||
using StreamReader sr = new (fs);
|
||||
{
|
||||
while (!sr.EndOfStream)
|
||||
{
|
||||
@ -164,9 +163,9 @@ public class CsvSource:IDataSource
|
||||
if (file != null)
|
||||
{
|
||||
var filePath = Path.Combine(_inputDir, file);
|
||||
using (var fs = File.OpenRead(filePath))
|
||||
using var fs = File.OpenRead(filePath);
|
||||
{
|
||||
using (StreamReader sr = new StreamReader(fs))
|
||||
using StreamReader sr = new(fs);
|
||||
{
|
||||
var line = await sr.ReadLineAsync();
|
||||
var fields = ParseRow2(line, QuoteChar, Delimiter);
|
||||
|
@ -11,34 +11,31 @@ namespace ConsoleApp2.Services;
|
||||
[Obsolete]
|
||||
public class JsvSource:IDataSource
|
||||
{
|
||||
private readonly string _inputDir;
|
||||
private readonly JsvStringSerializer _jsv;
|
||||
private readonly StreamReader _reader;
|
||||
//private readonly string _inputDir;
|
||||
//private readonly JsvStringSerializer _jsv;
|
||||
//private readonly StreamReader? _reader;
|
||||
// ReSharper disable once PrivateFieldCanBeConvertedToLocalVariable
|
||||
private readonly ILogger? _logger;
|
||||
private readonly string _tableName;
|
||||
//private readonly ILogger? _logger;
|
||||
//private readonly string _tableName;
|
||||
|
||||
public DataRecord Current { get; protected set; } = null!;
|
||||
public string[]? Headers { get; }
|
||||
public bool EndOfSource => _reader.EndOfStream;
|
||||
//public bool EndOfSource => _reader.EndOfStream;
|
||||
|
||||
public JsvSource(string inputDir,string tableName, ILogger? logger = null)
|
||||
{
|
||||
_inputDir = inputDir;
|
||||
_tableName = tableName;
|
||||
_jsv = new JsvStringSerializer();
|
||||
// _reader = new StreamReader(filePath);
|
||||
//Headers = headers;
|
||||
_logger = logger;
|
||||
// _logger?.LogInformation("Reading file: {FilePath}", filePath);
|
||||
//_tableName = DumpDataHelper.GetTableName(filePath);
|
||||
//_inputDir = inputDir;
|
||||
//_tableName = tableName;
|
||||
//_jsv = new JsvStringSerializer();
|
||||
//_logger = logger;
|
||||
}
|
||||
public async Task DoEnqueue(Action<DataRecord> action)
|
||||
public Task DoEnqueue(Action<DataRecord> action)
|
||||
{
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
_reader.Dispose();
|
||||
// _reader?.Dispose();
|
||||
}
|
||||
}
|
@ -1,4 +1,6 @@
|
||||
namespace ConsoleApp2.Services;
|
||||
using System.Collections.Concurrent;
|
||||
|
||||
namespace ConsoleApp2.Services;
|
||||
|
||||
/// <summary>
|
||||
/// 处理上下文类,标识处理进度
|
||||
@ -8,7 +10,7 @@ public class ProcessContext
|
||||
private int _inputCount;
|
||||
private int _transformCount;
|
||||
private int _outputCount;
|
||||
private IList<Exception> _exceptionList = new List<Exception>();
|
||||
private ConcurrentBag<Exception> _exceptionList = new ConcurrentBag<Exception>();
|
||||
public bool IsInputCompleted { get; private set; }
|
||||
public bool IsTransformCompleted { get; private set; }
|
||||
public bool IsOutputCompleted { get; private set; }
|
||||
@ -34,7 +36,7 @@ public class ProcessContext
|
||||
{
|
||||
_exceptionList.Add(ex);
|
||||
}
|
||||
public IList<Exception> GetExceptions()
|
||||
public ConcurrentBag<Exception> GetExceptions()
|
||||
{
|
||||
return _exceptionList;
|
||||
}
|
||||
|
@ -1,6 +1,5 @@
|
||||
using ConsoleApp2.Helpers;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using System.IO;
|
||||
using System.Text.RegularExpressions;
|
||||
using ZstdSharp;
|
||||
namespace ConsoleApp2.Services
|
||||
@ -10,22 +9,19 @@ namespace ConsoleApp2.Services
|
||||
public ZstSource(string inputDir, string tableName, string delimiter = ",", char quoteChar = '"',
|
||||
ILogger? logger = null) : base(inputDir, tableName, delimiter = ",", quoteChar = '"', logger = null)
|
||||
{
|
||||
//throw new Exception("aaa");
|
||||
string pattern = $"^.*\\.{tableName}\\..*\\.sql.zst$";
|
||||
_sqlFilePath = Directory.GetFiles(_inputDir).FirstOrDefault(s => Regex.Match(s, pattern).Success);
|
||||
|
||||
}
|
||||
private async Task<string> DecompressFile(string filePath)
|
||||
private static async Task<string> DecompressFile(string filePath)
|
||||
{
|
||||
using (var input = File.OpenRead(filePath))
|
||||
using var input = File.OpenRead(filePath);
|
||||
{
|
||||
using (var decopress = new DecompressionStream(input))
|
||||
using var decopress = new DecompressionStream(input);
|
||||
{
|
||||
|
||||
var ms = new MemoryStream();
|
||||
decopress.CopyTo(ms);
|
||||
ms.Seek(0, SeekOrigin.Begin);
|
||||
StreamReader reader = new StreamReader(ms);
|
||||
StreamReader reader = new(ms);
|
||||
var text = await reader.ReadToEndAsync();
|
||||
return text;
|
||||
|
||||
@ -34,9 +30,11 @@ namespace ConsoleApp2.Services
|
||||
}
|
||||
public override async Task GetHeaderAndCsvFiles()
|
||||
{
|
||||
string pattern = $"^.*\\.{_tableName}\\..*\\.sql.zst$";
|
||||
_sqlFilePath = Directory.GetFiles(_inputDir).FirstOrDefault(s => Regex.Match(s, pattern).Success) ?? "";
|
||||
var text = await DecompressFile(_sqlFilePath);
|
||||
headers=await DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text);
|
||||
csvFiles=await DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text, new Regex(@"'.+\.dat.zst'"));
|
||||
headers= DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text);
|
||||
csvFiles= DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text, new Regex(@"'.+\.dat.zst'"));
|
||||
|
||||
}
|
||||
public override async Task DoEnqueue(Action<DataRecord> action)
|
||||
@ -45,11 +43,11 @@ namespace ConsoleApp2.Services
|
||||
foreach (var file in csvFiles)
|
||||
{
|
||||
var filePath = Path.Combine(_inputDir, file);
|
||||
using (var input = File.OpenRead(filePath))
|
||||
using var input = File.OpenRead(filePath);
|
||||
{
|
||||
using (var decopress = new DecompressionStream(input))
|
||||
using var decopress = new DecompressionStream(input);
|
||||
{
|
||||
using( var reader = new StreamReader(decopress))
|
||||
using var reader = new StreamReader(decopress);
|
||||
{
|
||||
while (!reader.EndOfStream)
|
||||
{
|
||||
@ -67,15 +65,15 @@ namespace ConsoleApp2.Services
|
||||
public override async Task<DataRecord?> GetTestRecord()
|
||||
{
|
||||
await GetHeaderAndCsvFiles();
|
||||
var file = csvFiles.FirstOrDefault();
|
||||
var file = csvFiles?.FirstOrDefault();
|
||||
if (file != null)
|
||||
{
|
||||
var filePath = Path.Combine(_inputDir, file);
|
||||
using (var input = File.OpenRead(filePath))
|
||||
using var input = File.OpenRead(filePath);
|
||||
{
|
||||
using (var decopress = new DecompressionStream(input))
|
||||
using var decopress = new DecompressionStream(input);
|
||||
{
|
||||
using (var reader = new StreamReader(decopress))
|
||||
using var reader = new StreamReader(decopress);
|
||||
{
|
||||
var line = await reader.ReadLineAsync();
|
||||
var fields = ParseRow2(line, QuoteChar, Delimiter);
|
||||
@ -88,9 +86,5 @@ namespace ConsoleApp2.Services
|
||||
}
|
||||
return null;
|
||||
}
|
||||
public void Dispose()
|
||||
{
|
||||
//_reader.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -16,17 +16,15 @@ namespace ConsoleApp2.SimulationService
|
||||
{
|
||||
private readonly ILogger _logger;
|
||||
private readonly IOptions<DataInputOptions> _dataInputOptions;
|
||||
private readonly DataRecordQueue _producerQueue;
|
||||
|
||||
private readonly ProcessContext _context;
|
||||
|
||||
public SimulationInputService(ILogger<InputService> logger,
|
||||
IOptions<DataInputOptions> dataInputOptions,
|
||||
[FromKeyedServices(ProcessStep.Producer)] DataRecordQueue producerQueue,
|
||||
ProcessContext context)
|
||||
{
|
||||
_logger = logger;
|
||||
_dataInputOptions = dataInputOptions;
|
||||
_producerQueue = producerQueue;
|
||||
_context = context;
|
||||
}
|
||||
public async Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue producerQueue, ProcessContext context, CancellationToken cancellationToken)
|
||||
@ -41,93 +39,97 @@ namespace ConsoleApp2.SimulationService
|
||||
}
|
||||
foreach (var tableName in tasksOptions.TableInfoConfig.Keys)
|
||||
{
|
||||
var dataCount = tasksOptions.TableInfoConfig[tableName].SimulaRowCount;//当前表要生成的总数据量
|
||||
var companyTotallCount = 1000;//当前表每个公司生成的总数据量
|
||||
var tempRecords = new List<DataRecord>();
|
||||
var sk = DataHelper.shareKeys.First();
|
||||
var companyID = DataHelper.companyIds.First();
|
||||
_logger.LogInformation("Working table: {tableName}", tableName);
|
||||
|
||||
var shareKeyInterval = 20000;//每个sharekey的数据量
|
||||
var getShareKeyTimes = 0;//sharekey生成的次数,每生成一次,改变sharekey的值
|
||||
var getCompanyIDTimes = 0;//公司生成的次数,每生成一次,改变companyID的值
|
||||
var shareKeyIntervalCount = 0;
|
||||
var dataCount = tasksOptions.TableInfoConfig[tableName].SimulaRowCount;//当前表要生成的总数据量
|
||||
var companyTotallCount = 1000;//当前表每个公司生成的总数据量
|
||||
var tempRecords = new List<DataRecord>();
|
||||
var sk = DataHelper.shareKeys.First();
|
||||
var companyID = DataHelper.companyIds.First();
|
||||
|
||||
var source = _dataInputOptions.Value.CreateSource?.Invoke(tableName);
|
||||
var testRecord =await source.GetTestRecord();
|
||||
for (long i = 1; i <= dataCount; i++)
|
||||
var shareKeyInterval = 20000;//每个sharekey的数据量
|
||||
var getShareKeyTimes = 0;//sharekey生成的次数,每生成一次,改变sharekey的值
|
||||
var getCompanyIDTimes = 0;//公司生成的次数,每生成一次,改变companyID的值
|
||||
var shareKeyIntervalCount = 0;
|
||||
|
||||
var source = _dataInputOptions.Value.CreateSource?.Invoke(tableName);
|
||||
if (source == null) throw new NullReferenceException($"create table source:{tableName} failed!");
|
||||
var testRecord = await source.GetTestRecord();
|
||||
if(testRecord == null) throw new NullReferenceException($"create testRecord failed, tableName:{tableName}");
|
||||
for (long i = 1; i <= dataCount; i++)
|
||||
{
|
||||
shareKeyIntervalCount++;
|
||||
if (shareKeyIntervalCount > shareKeyInterval)
|
||||
{
|
||||
shareKeyIntervalCount++;
|
||||
if (shareKeyIntervalCount > shareKeyInterval)
|
||||
sk = DataHelper.GetShareKey(getShareKeyTimes);
|
||||
getShareKeyTimes++;
|
||||
shareKeyIntervalCount = 0;
|
||||
}
|
||||
var fields = new string[testRecord.Fields.Length];
|
||||
Array.Copy(testRecord.Fields, fields, testRecord.Fields.Length);
|
||||
var record = new DataRecord(fields, testRecord.TableName, testRecord.Headers, companyID);
|
||||
//更新record的ID、OrderNo,ShardKey值
|
||||
if (record.Headers.Contains("ID"))
|
||||
{
|
||||
var index = Array.IndexOf(record.Headers, "ID");
|
||||
if (index > -1)
|
||||
{
|
||||
sk = DataHelper.GetShareKey(getShareKeyTimes);
|
||||
getShareKeyTimes++;
|
||||
shareKeyIntervalCount = 0;
|
||||
}
|
||||
var fields = new string[testRecord.Fields.Length];
|
||||
Array.Copy(testRecord.Fields, fields, testRecord.Fields.Length);
|
||||
var record = new DataRecord(fields, testRecord.TableName, testRecord.Headers, companyID);
|
||||
//更新record的ID、OrderNo,ShardKey值
|
||||
if (record.Headers.Contains("ID"))
|
||||
{
|
||||
var index = Array.IndexOf(record.Headers, "ID");
|
||||
if (index > -1)
|
||||
{
|
||||
record.Fields[index] = i.ToString();
|
||||
}
|
||||
}
|
||||
if (record.TableName == "order_box_block" && record.Headers.Contains("BoxID"))
|
||||
{
|
||||
var index = Array.IndexOf(record.Headers, "BoxID");
|
||||
if (index > -1)
|
||||
{
|
||||
record.Fields[index] = i.ToString();
|
||||
}
|
||||
}
|
||||
if ((record.TableName == "order_block_plan_item" || record.TableName == "order_package_item") && record.Headers.Contains("ItemID"))
|
||||
{
|
||||
var index = Array.IndexOf(record.Headers, "ItemID");
|
||||
if (index > -1)
|
||||
{
|
||||
record.Fields[index] = i.ToString();
|
||||
}
|
||||
}
|
||||
if (record.TableName == "order" && record.Headers.Contains("OrderNo"))
|
||||
{
|
||||
var index = Array.IndexOf(record.Headers, "OrderNo");
|
||||
if (index > -1)
|
||||
{
|
||||
record.Fields[index] = i.ToString();
|
||||
}
|
||||
}
|
||||
if (record.Headers.Contains("ShardKey"))
|
||||
{
|
||||
var index = Array.IndexOf(record.Headers, "ShardKey");
|
||||
if (index > -1)
|
||||
{
|
||||
record.Fields[index] = sk.ToString();
|
||||
}
|
||||
}
|
||||
tempRecords.Add(record);
|
||||
if (tempRecords.Count >= companyTotallCount || i >= dataCount - 1)
|
||||
{
|
||||
foreach (var rc in tempRecords)
|
||||
{
|
||||
_context.AddInput();
|
||||
_producerQueue.Enqueue(rc);
|
||||
if (cancellationToken.IsCancellationRequested)
|
||||
return;
|
||||
}
|
||||
tempRecords.Clear();
|
||||
companyID = DataHelper.GetCompanyId(getCompanyIDTimes);
|
||||
getCompanyIDTimes++;
|
||||
record.Fields[index] = i.ToString();
|
||||
}
|
||||
}
|
||||
_logger.LogInformation("table:'{tableName}' simulation input completed", tableName);
|
||||
if (record.TableName == "order_box_block" && record.Headers.Contains("BoxID"))
|
||||
{
|
||||
var index = Array.IndexOf(record.Headers, "BoxID");
|
||||
if (index > -1)
|
||||
{
|
||||
record.Fields[index] = i.ToString();
|
||||
}
|
||||
}
|
||||
if ((record.TableName == "order_block_plan_item" || record.TableName == "order_package_item") && record.Headers.Contains("ItemID"))
|
||||
{
|
||||
var index = Array.IndexOf(record.Headers, "ItemID");
|
||||
if (index > -1)
|
||||
{
|
||||
record.Fields[index] = i.ToString();
|
||||
}
|
||||
}
|
||||
if (record.TableName == "order" && record.Headers.Contains("OrderNo"))
|
||||
{
|
||||
var index = Array.IndexOf(record.Headers, "OrderNo");
|
||||
if (index > -1)
|
||||
{
|
||||
record.Fields[index] = i.ToString();
|
||||
}
|
||||
}
|
||||
if (record.Headers.Contains("ShardKey"))
|
||||
{
|
||||
var index = Array.IndexOf(record.Headers, "ShardKey");
|
||||
if (index > -1)
|
||||
{
|
||||
record.Fields[index] = sk.ToString();
|
||||
}
|
||||
}
|
||||
tempRecords.Add(record);
|
||||
if (tempRecords.Count >= companyTotallCount || i >= dataCount - 1)
|
||||
{
|
||||
foreach (var rc in tempRecords)
|
||||
{
|
||||
_context.AddInput();
|
||||
producerQueue.Enqueue(rc);
|
||||
if (cancellationToken.IsCancellationRequested)
|
||||
return;
|
||||
}
|
||||
tempRecords.Clear();
|
||||
companyID = DataHelper.GetCompanyId(getCompanyIDTimes);
|
||||
getCompanyIDTimes++;
|
||||
}
|
||||
}
|
||||
_logger.LogInformation("table:'{tableName}' simulation input completed", tableName);
|
||||
//}
|
||||
//_logger.LogInformation("File '{File}' input completed", Path.GetFileName(sqlPath));
|
||||
}
|
||||
|
||||
_context.CompleteInput();
|
||||
context.CompleteInput();
|
||||
_logger.LogInformation("***** Csv input service completed *****");
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user