Clean up code

lindj 2024-01-22 15:44:37 +08:00
parent 241f52e30f
commit 7e3690a325
17 changed files with 86 additions and 150 deletions

View File

@ -12,11 +12,9 @@ public static partial class DumpDataHelper
private static partial Regex MatchBrackets();
public static async Task<string[]> GetCsvHeadersFromSqlFileAsync(string txt)
public static string[] GetCsvHeadersFromSqlFileAsync(string txt)
{
//var txt = await File.ReadAllTextAsync(filePath);
var match = MatchBrackets().Match(txt);
return ParseHeader(match.ValueSpan);
}
@ -60,9 +58,8 @@ public static partial class DumpDataHelper
return filePath[(firstDotIdx+1)..secondDotIdx].ToString();
}
public static async Task<string[]> GetCsvFileNamesFromSqlFileAsync(string txt,Regex regex)
public static string[] GetCsvFileNamesFromSqlFileAsync(string txt,Regex regex)
{
//var txt = await File.ReadAllTextAsync(filePath);
var matches = regex.Matches(txt);
return matches.Select(match => match.ValueSpan[1..^1].ToString()).ToArray();
}
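
Both DumpDataHelper parsers are now synchronous; callers read (or decompress) the dump's .sql file themselves and pass the text in, as CsvSource and ZstSource do further down. A minimal usage sketch, with sqlFilePath as an assumed local variable:

// Requires: using System.Text.RegularExpressions;
var text = await File.ReadAllTextAsync(sqlFilePath);   // sqlFilePath: assumed path to the dump's .sql file
string[] headers  = DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text);
string[] csvFiles = DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text, new Regex(@"'.+\.dat.zst'"));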

View File

@ -5,5 +5,5 @@ namespace ConsoleApp2.HostedServices.Abstractions;
public interface IOutputService
{
public Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue consumerQueue, ProcessContext context, CancellationToken cancellationToken);
public void ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue consumerQueue, ProcessContext context, CancellationToken cancellationToken);
}

View File

@ -42,14 +42,17 @@ public class InputService : IInputService
{
_logger.LogInformation("Working table: {tableName}", tableName);
var source = _dataInputOptions.Value.CreateSource?.Invoke(tableName);
await source.DoEnqueue((record) =>
if (source != null)
{
_context.AddInput();
producerQueue.Enqueue(record);
count++;
await source.DoEnqueue((record) =>
{
_context.AddInput();
producerQueue.Enqueue(record);
count++;
});
if (_context.GetExceptions().Count > 0)
});
}
if (!_context.GetExceptions().IsEmpty)
{
_logger.LogInformation("***** Csv input service is canceled *****");
return;
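
InputService now tolerates tables for which no source factory is registered: CreateSource may return null, in which case the enqueue loop is skipped. A condensed sketch of the guard, assuming CreateSource yields an IDataSource as suggested by the CsvSource/JsvSource types below:

var source = _dataInputOptions.Value.CreateSource?.Invoke(tableName);   // null when the table has no source
if (source != null)
{
    await source.DoEnqueue(record =>
    {
        _context.AddInput();            // count the record for the monitor
        producerQueue.Enqueue(record);
        count++;
    });
}
if (!_context.GetExceptions().IsEmpty)  // GetExceptions() is backed by a ConcurrentBag<Exception>
{
    _logger.LogInformation("***** Csv input service is canceled *****");
    return;
}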

View File

@ -5,14 +5,15 @@ using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace ConsoleApp2.HostedServices;
public class MainHostedService : BackgroundService
public class MainHostedService : IHostedService
{
private readonly ILogger _logger;
private readonly IInputService _input;
private readonly ITransformService _transform;
private readonly IOutputService _output;
private readonly ProcessContext _context;
private readonly Timer? _bigTableTimer;
private readonly Timer? _smallTableTimer;
public MainHostedService(ILogger<MainHostedService> logger, IInputService input, ITransformService transform, IOutputService output, ProcessContext context)
{
_logger = logger;
@ -22,15 +23,15 @@ public class MainHostedService : BackgroundService
_context = context;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
public Task StartAsync(CancellationToken cancellationToken)
{
var taskFun = (TasksOptions taskOp, DataRecordQueue producerQueue, DataRecordQueue consumerQueue, ProcessContext context) =>
var taskFun = (TasksOptions taskOp, DataRecordQueue producerQueue, DataRecordQueue consumerQueue, ProcessContext context,Timer? timer) =>
{
var inputTask = Task.Factory.StartNew(async () =>
Task.Factory.StartNew(async () =>
{
try
{
await _input.ExecuteAsync(taskOp, producerQueue, context, stoppingToken);
await _input.ExecuteAsync(taskOp, producerQueue, context, cancellationToken);
}
catch (Exception ex)
{
@ -39,11 +40,11 @@ public class MainHostedService : BackgroundService
}
});
var transformTask = Task.Factory.StartNew(async () =>
Task.Factory.StartNew(async () =>
{
try
{
await _transform.ExecuteAsync(taskOp, producerQueue, consumerQueue, context, stoppingToken);
await _transform.ExecuteAsync(taskOp, producerQueue, consumerQueue, context, cancellationToken);
}
catch (Exception ex)
{
@ -52,11 +53,15 @@ public class MainHostedService : BackgroundService
}
});
var outputTask = Task.Factory.StartNew(async () =>
Task.Factory.StartNew(() =>
{
try
{
await _output.ExecuteAsync(taskOp, consumerQueue, context,stoppingToken);
timer = new Timer((object? state) =>
{
_output.ExecuteAsync(taskOp, consumerQueue, context, cancellationToken);
},null, TimeSpan.Zero,TimeSpan.FromSeconds(0.5));
}
catch (Exception ex)
{
@ -77,8 +82,8 @@ public class MainHostedService : BackgroundService
{"simple_plan_order",new TableInfo{SimulaRowCount=351470 }},//CreateTime < 202301的删除
};
var bigTableContext = new ProcessContext();
var bigTableOptions = new TasksOptions { TableInfoConfig = bigTablesDic, OutPutOptions = new OutPutOptions { FlushCount = 10000, OutPutTaskCount = 2 } };
taskFun(bigTableOptions,new DataRecordQueue(), new DataRecordQueue(), bigTableContext);
var bigTableOptions = new TasksOptions { TableInfoConfig = bigTablesDic, OutPutOptions = new OutPutOptions { FlushCount = 20000, OutPutTaskCount = 2 } };
taskFun(bigTableOptions, new DataRecordQueue(), new DataRecordQueue(), bigTableContext,_bigTableTimer);
var smallTablesDic = new Dictionary<string, TableInfo>
{
{"machine",new TableInfo{SimulaRowCount=14655 }},
@ -107,6 +112,14 @@ public class MainHostedService : BackgroundService
};
var smallTableContext = new ProcessContext();
taskFun(new TasksOptions { TableInfoConfig = smallTablesDic, OutPutOptions = new OutPutOptions { FlushCount = 20000, OutPutTaskCount = 4 } },
new DataRecordQueue(), new DataRecordQueue(), smallTableContext);
new DataRecordQueue(), new DataRecordQueue(), smallTableContext,_smallTableTimer);
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
}
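
MainHostedService switches from BackgroundService to IHostedService and drives the output stage with a System.Threading.Timer instead of a long-running awaited task. A minimal sketch of that pattern, assuming the half-second period and field names shown in the hunk above:

// The timer callback polls OutputService roughly every 500 ms;
// ExecuteAsync returns immediately when a full batch is not yet available.
timer = new Timer(_ =>
{
    _output.ExecuteAsync(taskOp, consumerQueue, context, cancellationToken);
}, null, TimeSpan.Zero, TimeSpan.FromSeconds(0.5));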

View File

@ -1,11 +1,9 @@
using ConsoleApp2.Const;

using ConsoleApp2.HostedServices.Abstractions;
using ConsoleApp2.Options;
using ConsoleApp2.Services;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace ConsoleApp2.HostedServices;
/// <summary>
@ -34,42 +32,32 @@ public class OutputService : IOutputService
_transformOptions = transformOptions;
_errorRecorder = errorRecorder;
}
public async Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue consumerQueue, ProcessContext context, CancellationToken cancellationToken)
private int _runingTaskCount;
public int RuningTaskCount
{
_logger.LogInformation("***** Mysql output service started *****");
get => _runingTaskCount;
}
public void DoTask() => Interlocked.Increment(ref _runingTaskCount);
public void FinishTask() => Interlocked.Decrement(ref _runingTaskCount);
public void ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue consumerQueue, ProcessContext context, CancellationToken cancellationToken)
{
if (context.IsTransformCompleted == false && consumerQueue.Count < tasksOptions.OutPutOptions.FlushCount) return;
if (RuningTaskCount >= tasksOptions.OutPutOptions.OutPutTaskCount ) return;
var records = new List<DataRecord>();
while (!context.IsTransformCompleted || consumerQueue.Count > 0)
{
if (!consumerQueue.TryDequeue(out var record)) continue;
records.Add(record);
if (records.Count >= tasksOptions.OutPutOptions.FlushCount)
{
var temp= new List<DataRecord>(records);
ThreadPool.QueueUserWorkItem(async (queueState) =>
{
await FlushAsync(temp);
});
records.Clear();
}
if (_context.GetExceptions().Count > 0)
{
_logger.LogInformation("***** Csv output thread is canceled *****");
return;
}
for (int i = 0; i < tasksOptions.OutPutOptions.FlushCount; i++)
{
if (consumerQueue.TryDequeue(out var record)) records.Add(record);
else break;
}
if (records.Count > 0)
{
var temp = new List<DataRecord>(records);
ThreadPool.QueueUserWorkItem(async (queueState) =>
{
await FlushAsync(temp);
DoTask();
await FlushAsync(records);
FinishTask();
});
records.Clear();
_logger.LogInformation("***** Mysql output thread completed *****");
}
}
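
Read together, the OutputService hunks turn ExecuteAsync into a synchronous method meant to be polled by the timer in MainHostedService: each call drains at most FlushCount records and hands them to the thread pool, while an Interlocked counter caps concurrent flushes at OutPutTaskCount. A condensed sketch assembled from the members shown above (not a verbatim copy of the new method):

public void ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue consumerQueue,
                         ProcessContext context, CancellationToken cancellationToken)
{
    // Wait for a full batch unless the transform stage has already completed.
    if (!context.IsTransformCompleted && consumerQueue.Count < tasksOptions.OutPutOptions.FlushCount) return;
    // Skip this tick if the configured number of flushes is already running.
    if (RuningTaskCount >= tasksOptions.OutPutOptions.OutPutTaskCount) return;

    var records = new List<DataRecord>();
    for (var i = 0; i < tasksOptions.OutPutOptions.FlushCount; i++)
    {
        if (!consumerQueue.TryDequeue(out var record)) break;
        records.Add(record);
    }
    if (records.Count == 0) return;

    ThreadPool.QueueUserWorkItem(async _ =>
    {
        DoTask();                   // Interlocked.Increment(ref _runingTaskCount)
        await FlushAsync(records);  // write the batch to MySQL
        FinishTask();               // Interlocked.Decrement(ref _runingTaskCount)
    });
}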

View File

@ -19,20 +19,17 @@ public class TransformService : ITransformService
private readonly IOptions<DataTransformOptions> _options;
private readonly ProcessContext _context;
private readonly IDistributedCache _cache;
private readonly TaskManager _taskManager;
public TransformService(ILogger<TransformService> logger,
IOptions<DataTransformOptions> options,
ProcessContext context,
IDistributedCache cache,
TaskManager taskManager)
IDistributedCache cache)
{
_logger = logger;
_options = options;
_context = context;
_cache = cache;
_taskManager = taskManager;
}
public async Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue producerQueue, DataRecordQueue consumerQueue, ProcessContext context, CancellationToken cancellationToken)

View File

@ -23,7 +23,7 @@ public class VoidOutputService : IOutputService
_logger = logger;
}
public Task ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue consumerQueue, ProcessContext context, CancellationToken cancellationToken)
public void ExecuteAsync(TasksOptions tasksOptions, DataRecordQueue consumerQueue, ProcessContext context, CancellationToken cancellationToken)
{
_logger.LogInformation("***** Void output service started, thread id: {ThreadId} *****", Environment.CurrentManagedThreadId);
while (!_context.IsTransformCompleted || _consumerQueue.Count > 0)
@ -34,6 +34,5 @@ public class VoidOutputService : IOutputService
_context.CompleteOutput();
_logger.LogInformation("***** Void output service completed *****");
return Task.CompletedTask;
}
}

View File

@ -7,11 +7,8 @@ namespace ConsoleApp2.Options
{
public string InputDir { get; set; } = "./MyDumper";
public int TaskCount { get; set; } = 16;
public int FlushCount { get; set; } = 20000;
public bool IsMock { get; set; } = false;
public string NoFilterTables { get; set; }="";//list of tables that do not need filtering
public bool Isutf8mb4 { get; set; } = true;
public short OldestShardKey { get; set; } = 23010;

View File

@ -13,6 +13,7 @@ public enum ColumnType
public class DataTransformOptions
{
public Func<DataRecord, string>? DatabaseFilter { get; set; }
public Func<string, string>? TransformBinary { get; set; }//Binary转字符串方法

View File

@ -19,6 +19,6 @@ namespace ConsoleApp2.Options
public class OutPutOptions
{
public int FlushCount { get; set; } = 10000;
public int OutPutTaskCount { get; set; } = 2;
public int OutPutTaskCount { get; set; } = 1;
}
}

View File

@ -62,47 +62,6 @@ async Task RunProgram()
Console.WriteLine($"InputDir:{commandOptions?.InputDir}");
var oldestTime = DateTime.ParseExact(commandOptions.OldestTime, "yyyyMM", System.Globalization.DateTimeFormatInfo.InvariantInfo);
//host.Services.Configure<InputTableOptions>(option =>
//{
// option.TableInfoConfig = new Dictionary<string, TableInfo>
// {
// //order_block_plan_item is queried from the order_item table and inserted by the program
// //order_package_item is queried from the order_item table and inserted by the program
// //order_patch_detail does not exist in production, so it is not processed
// {"machine",new TableInfo{SimulaRowCount=14655 }},
// {"order",new TableInfo{SimulaRowCount=5019216 }},
// {"order_block_plan",new TableInfo{SimulaRowCount=2725553 }},//CreateTime < 202301的删除
// {"order_block_plan_result",new TableInfo{SimulaRowCount=1174096 }},
// {"order_box_block",new TableInfo{SimulaRowCount=29755672 }},
// {"order_data_block",new TableInfo{SimulaRowCount=731800334 }},
// {"order_data_goods",new TableInfo{SimulaRowCount=25803671 }},
// {"order_data_parts",new TableInfo{SimulaRowCount=468517543 }},
// {"order_item",new TableInfo{SimulaRowCount=1345520079 }},
// {"order_module",new TableInfo{SimulaRowCount=103325385 }},
// {"order_module_extra",new TableInfo{SimulaRowCount=54361321 }},
// {"order_module_item",new TableInfo{SimulaRowCount=69173339 }},
// {"order_package",new TableInfo{SimulaRowCount=16196195 }},
// {"order_process",new TableInfo{SimulaRowCount=3892685 }},//orderNo < 202301的
// {"order_process_step",new TableInfo{SimulaRowCount=8050349 }},//orderNo < 202301的删除
// {"order_process_step_item",new TableInfo{SimulaRowCount=14538058 }},//orderNo < 202301的删除
// {"order_scrap_board",new TableInfo{SimulaRowCount=123998 }},
// {"process_group",new TableInfo{SimulaRowCount=1253 }},
// {"process_info",new TableInfo{SimulaRowCount=7839 }},
// {"process_item_exp",new TableInfo{SimulaRowCount=28 }},
// {"process_schdule_capacity",new TableInfo{SimulaRowCount=39736 }},
// {"process_step_efficiency",new TableInfo{SimulaRowCount=8 }},
// {"report_template",new TableInfo{SimulaRowCount=7337 }},
// {"simple_package",new TableInfo{SimulaRowCount=130436 }},//orderNo < 202301的删除
// {"simple_plan_order",new TableInfo{SimulaRowCount=351470 }},//CreateTime < 202301的删除
// {"sys_config",new TableInfo{SimulaRowCount=2296 }},
// {"work_calendar",new TableInfo{SimulaRowCount=11 }},
// {"work_shift",new TableInfo{SimulaRowCount=59 }},
// {"work_time",new TableInfo{SimulaRowCount=62 }},
// };
//});
host.Services.Configure<CsvOptions>(option =>
{
option.Delimiter = ",";
@ -121,13 +80,17 @@ async Task RunProgram()
host.Services.Configure<DataTransformOptions>(options =>
{
if (commandOptions.IsMock) return;
options.DatabaseFilter = record => "cferp_test";
options.TransformBinary = field => commandOptions != null && commandOptions.Isutf8mb4 ? $"_utf8mb4 0x{field}" : $"0x{field}";
var noFilterTables = commandOptions.NoFilterTables.Split(",");
//data filtering
options.RecordFilter = async (record, cache) =>
{
//var index = Array.IndexOf(record.Headers, "ShardKey");
if (noFilterTables.Contains(record.TableName)) return true;
if (record.TryGetField("ShardKey", out var skStr))
{
short.TryParse(skStr, out var sk);
@ -149,7 +112,7 @@ async Task RunProgram()
if (dt < oldestTime) return false;
}
catch (Exception ex)
catch (Exception)
{
return false;//order number parsing failed, skip the record
@ -175,9 +138,7 @@ async Task RunProgram()
if (record.TableName == "order_process_step" || record.TableName == "order_process_step_item")
{
//discard the record if OrderProcessID is not in the cache
//if(record.TryGetField("OrderProcessID",out var orderProcessID))
if(record.TryGetField("OrderProcessID",out string orderProcessID))
{
var value = await cache.GetStringAsync($"order_process_{orderProcessID}");
@ -195,8 +156,6 @@ async Task RunProgram()
var value = await cache.GetStringAsync($"order_block_plan_{id}");
if (string.IsNullOrEmpty(value)) return false;
}
}
return true;
@ -486,7 +445,7 @@ async Task RunProgram()
host.Services.AddHostedService<MainHostedService>();
host.Services.AddHostedService<TaskMonitorService>();
host.Services.AddSingleton<IInputService,InputService>();
host.Services.AddSingleton<IInputService,SimulationInputService>();
host.Services.AddSingleton<ITransformService, TransformService>();
host.Services.AddSingleton<IOutputService, OutputService>();
var redisOptions = host.Configuration.GetSection("RedisCacheOptions").Get<RedisCacheOptions>() ?? new RedisCacheOptions();

View File

@ -130,8 +130,8 @@ public class CsvSource:IDataSource
public virtual async Task GetHeaderAndCsvFiles()
{
var text = await File.ReadAllTextAsync(_sqlFilePath);
headers = await DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text);
csvFiles = await DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text, new Regex(@"'.+\.dat.zst'"));
headers = DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text);
csvFiles = DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text, new Regex(@"'.+\.dat.zst'"));
}
public virtual async Task DoEnqueue(Action<DataRecord> action)
@ -140,9 +140,9 @@ public class CsvSource:IDataSource
foreach (var file in csvFiles)
{
var filePath= Path.Combine(_inputDir, file);
using (var fs = File.OpenRead(filePath))
using var fs = File.OpenRead(filePath);
{
using (StreamReader sr = new StreamReader(fs))
using StreamReader sr = new (fs);
{
while (!sr.EndOfStream)
{
@ -164,9 +164,9 @@ public class CsvSource:IDataSource
if (file != null)
{
var filePath = Path.Combine(_inputDir, file);
using (var fs = File.OpenRead(filePath))
using var fs = File.OpenRead(filePath);
{
using (StreamReader sr = new StreamReader(fs))
using StreamReader sr = new(fs);
{
var line = await sr.ReadLineAsync();
var fields = ParseRow2(line, QuoteChar, Delimiter);
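
The CsvSource hunks swap the nested using statements for C# 8 using declarations. A short sketch of the resulting read loop, assuming the field names used above:

// Inside an async read method: fs and sr are disposed when the enclosing scope ends,
// so the nested using blocks and their braces are no longer needed.
using var fs = File.OpenRead(filePath);
using var sr = new StreamReader(fs);
while (!sr.EndOfStream)
{
    var line = await sr.ReadLineAsync();
    // parse the CSV row and enqueue the resulting DataRecord...
}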

View File

@ -27,14 +27,11 @@ public class JsvSource:IDataSource
_inputDir = inputDir;
_tableName = tableName;
_jsv = new JsvStringSerializer();
// _reader = new StreamReader(filePath);
//Headers = headers;
_logger = logger;
// _logger?.LogInformation("Reading file: {FilePath}", filePath);
//_tableName = DumpDataHelper.GetTableName(filePath);
}
public async Task DoEnqueue(Action<DataRecord> action)
public Task DoEnqueue(Action<DataRecord> action)
{
return Task.CompletedTask;
}
public void Dispose()

View File

@ -109,10 +109,6 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
var sb = new StringBuilder();
foreach (var (tableName, records) in tableRecords)
{
if (tableName == "order_process_step")
{
var a = 1;
}
if (records.Count == 0)
continue;

View File

@ -10,7 +10,6 @@ public class ProcessContext
private int _inputCount;
private int _transformCount;
private int _outputCount;
private int _finishedTaskCount;
private ConcurrentBag<Exception> _exceptionList = new ConcurrentBag<Exception>();
public bool IsInputCompleted { get; private set; }
public bool IsTransformCompleted { get; private set; }

View File

@ -1,6 +1,5 @@
using ConsoleApp2.Helpers;
using Microsoft.Extensions.Logging;
using System.IO;
using System.Text.RegularExpressions;
using ZstdSharp;
namespace ConsoleApp2.Services
@ -10,22 +9,21 @@ namespace ConsoleApp2.Services
public ZstSource(string inputDir, string tableName, string delimiter = ",", char quoteChar = '"',
ILogger? logger = null) : base(inputDir, tableName, delimiter = ",", quoteChar = '"', logger = null)
{
//throw new Exception("aaa");
string pattern = $"^.*\\.{tableName}\\..*\\.sql.zst$";
_sqlFilePath = Directory.GetFiles(_inputDir).FirstOrDefault(s => Regex.Match(s, pattern).Success);
}
private async Task<string> DecompressFile(string filePath)
private static async Task<string> DecompressFile(string filePath)
{
using (var input = File.OpenRead(filePath))
using var input = File.OpenRead(filePath);
{
using (var decopress = new DecompressionStream(input))
using var decopress = new DecompressionStream(input);
{
var ms = new MemoryStream();
decopress.CopyTo(ms);
ms.Seek(0, SeekOrigin.Begin);
StreamReader reader = new StreamReader(ms);
StreamReader reader = new(ms);
var text = await reader.ReadToEndAsync();
return text;
@ -35,8 +33,8 @@ namespace ConsoleApp2.Services
public override async Task GetHeaderAndCsvFiles()
{
var text = await DecompressFile(_sqlFilePath);
headers=await DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text);
csvFiles=await DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text, new Regex(@"'.+\.dat.zst'"));
headers= DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text);
csvFiles= DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text, new Regex(@"'.+\.dat.zst'"));
}
public override async Task DoEnqueue(Action<DataRecord> action)
@ -45,11 +43,11 @@ namespace ConsoleApp2.Services
foreach (var file in csvFiles)
{
var filePath = Path.Combine(_inputDir, file);
using (var input = File.OpenRead(filePath))
using var input = File.OpenRead(filePath);
{
using (var decopress = new DecompressionStream(input))
using var decopress = new DecompressionStream(input);
{
using( var reader = new StreamReader(decopress))
using var reader = new StreamReader(decopress);
{
while (!reader.EndOfStream)
{
@ -88,9 +86,5 @@ namespace ConsoleApp2.Services
}
return null;
}
public void Dispose()
{
//_reader.Dispose();
}
}
}
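
ZstSource follows the same pattern for zstd-compressed dumps: decompress with ZstdSharp's DecompressionStream, then reuse the synchronous DumpDataHelper parsers. A minimal usage sketch, with sqlFilePath as an assumed local variable:

// Requires: using ZstdSharp; and using System.Text.RegularExpressions;
using var input = File.OpenRead(sqlFilePath);
using var decompress = new DecompressionStream(input);   // streams the decompressed .sql text
using var reader = new StreamReader(decompress);
var text = await reader.ReadToEndAsync();
var headers  = DumpDataHelper.GetCsvHeadersFromSqlFileAsync(text);
var csvFiles = DumpDataHelper.GetCsvFileNamesFromSqlFileAsync(text, new Regex(@"'.+\.dat.zst'"));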

View File

@ -113,10 +113,6 @@ namespace ConsoleApp2.SimulationService
foreach (var rc in tempRecords)
{
_context.AddInput();
if(_context.InputCount== 2000000)
{
var a = 1;
}
producerQueue.Enqueue(rc);
if (cancellationToken.IsCancellationRequested)
return;