Compare commits
No commits in common. "77a3909160bdbd77bb429aa940f7395db5cb82d2" and "b20c56640f32edc412918fa6604d0709caad1e7d" have entirely different histories.
77a3909160
...
b20c56640f
@ -114,7 +114,6 @@ public class MainHostedService : BackgroundService
|
|||||||
{
|
{
|
||||||
var connStr = _databaseOptions.Value.ConnectionString
|
var connStr = _databaseOptions.Value.ConnectionString
|
||||||
?? throw new ApplicationException("分库配置中没有配置数据库");
|
?? throw new ApplicationException("分库配置中没有配置数据库");
|
||||||
_logger.LogWarning("已开启MySQL延迟写入功能并禁用重做日志,请注意数据安全");
|
|
||||||
if (enable)
|
if (enable)
|
||||||
{
|
{
|
||||||
await DatabaseHelper.NonQueryAsync(connStr,
|
await DatabaseHelper.NonQueryAsync(connStr,
|
||||||
|
@ -56,18 +56,7 @@ public class OutputService : IOutputService
|
|||||||
if (!dbTasks.ContainsKey(db))
|
if (!dbTasks.ContainsKey(db))
|
||||||
{
|
{
|
||||||
dbTasks.Add(db, await dbTaskManager.CreateTaskAsync(
|
dbTasks.Add(db, await dbTaskManager.CreateTaskAsync(
|
||||||
async () =>
|
async () => await StartDatabaseWorker(db, queue, ct), ct));
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
await StartDatabaseWorker(db, queue, ct);
|
|
||||||
}
|
|
||||||
catch (Exception e)
|
|
||||||
{
|
|
||||||
_logger.LogError(e, "输出线程发生错误");
|
|
||||||
_queuePool.RemoveQueue(db);
|
|
||||||
}
|
|
||||||
}, ct));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -164,6 +153,6 @@ public class OutputService : IOutputService
|
|||||||
_context.AddOutput(value);
|
_context.AddOutput(value);
|
||||||
_context.AddTableOutput(key, value);
|
_context.AddTableOutput(key, value);
|
||||||
}
|
}
|
||||||
// _logger.LogTrace("输出任务:刷新了 {Count} 条记录", tableOutput.Values.Sum(i => i));
|
_logger.LogTrace("输出任务:刷新了 {Count} 条记录", tableOutput.Values.Sum(i => i));
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -48,92 +48,26 @@ public class TransformService : ITransformService
|
|||||||
|
|
||||||
public async Task ExecuteAsync(CancellationToken cancellationToken)
|
public async Task ExecuteAsync(CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
_logger.LogInformation("***** 数据转换服务已启动 *****");
|
_logger.LogInformation("***** 数据转换服务已启动, 当前线程ID: {ThreadId} *****", Environment.CurrentManagedThreadId);
|
||||||
|
|
||||||
|
// var tasks = new List<Task>();
|
||||||
|
// for (int i = 0; i < 4; i++)
|
||||||
|
// {
|
||||||
|
// tasks.Add(Task.Run(TransformWorker, cancellationToken));
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// await Task.WhenAll(tasks);
|
||||||
|
await TransformWorker();
|
||||||
|
|
||||||
await TransformWorker2();
|
|
||||||
|
|
||||||
_context.CompleteTransform();
|
|
||||||
_logger.LogInformation("***** 数据转换服务执行完毕 *****");
|
_logger.LogInformation("***** 数据转换服务执行完毕 *****");
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task TransformWorker(DataRecordQueue queue)
|
public async Task TransformWorker()
|
||||||
{
|
{
|
||||||
while (!_context.IsInputCompleted || _producerQueue.Count > 0)
|
while (!_context.IsInputCompleted || _producerQueue.Count > 0)
|
||||||
{
|
{
|
||||||
if (!_producerQueue.TryDequeue(out var record))
|
if (!_producerQueue.TryDequeue(out var record))
|
||||||
{
|
{
|
||||||
await Task.Delay(100);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
var context = new DataTransformContext(record, _cache, _logger, _services);
|
|
||||||
if (_options.Value.EnableFilter)
|
|
||||||
{
|
|
||||||
// 数据过滤
|
|
||||||
var filter = _options.Value.RecordFilter;
|
|
||||||
if (filter is not null && await filter(context) == false) continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (_options.Value.EnableReplacer)
|
|
||||||
{
|
|
||||||
// 数据替换
|
|
||||||
var replacer = _options.Value.RecordModify;
|
|
||||||
if (replacer is not null)
|
|
||||||
{
|
|
||||||
record = await replacer(context);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 字段缓存
|
|
||||||
var cacher = _options.Value.RecordCache;
|
|
||||||
if(cacher is not null)
|
|
||||||
await cacher.Invoke(context);
|
|
||||||
|
|
||||||
//计算需要分流的数据库
|
|
||||||
var dbFilter = _options.Value.DatabaseFilter
|
|
||||||
?? throw new ApplicationException("未配置数据库过滤器");
|
|
||||||
record.Database = dbFilter(record);
|
|
||||||
|
|
||||||
if (_options.Value.EnableReBuilder)
|
|
||||||
{
|
|
||||||
//数据重建
|
|
||||||
var addRecords = _options.Value.RecordReBuild?.Invoke(context);
|
|
||||||
if (addRecords is { Count: > 0 })
|
|
||||||
{
|
|
||||||
foreach (var rc in addRecords)
|
|
||||||
{
|
|
||||||
if(dbFilter is not null)
|
|
||||||
rc.Database = dbFilter.Invoke(record);
|
|
||||||
await queue.EnqueueAsync(rc);
|
|
||||||
_context.AddTransform();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
await queue.EnqueueAsync(record);
|
|
||||||
_context.AddTransform();
|
|
||||||
}
|
|
||||||
catch (Exception e)
|
|
||||||
{
|
|
||||||
_context.AddException(e);
|
|
||||||
var errorRecorder = _errorRecorderFactory.CreateTransform();
|
|
||||||
await errorRecorder.LogErrorRecordAsync(record, e);
|
|
||||||
if (!_options.Value.StrictMode)
|
|
||||||
_logger.LogError(e, "数据转换时发生错误");
|
|
||||||
else throw;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task TransformWorker2()
|
|
||||||
{
|
|
||||||
while (!_context.IsInputCompleted || _producerQueue.Count > 0)
|
|
||||||
{
|
|
||||||
if (!_producerQueue.TryDequeue(out var record))
|
|
||||||
{
|
|
||||||
await Task.Delay(100);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -124,8 +124,6 @@ async Task RunProgram()
|
|||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
#if USE_TEST_DB
|
|
||||||
// 测试环境的OrderExtra表没有分区,故按照SharedKey清理
|
|
||||||
// 清理ShardKey < 24010的
|
// 清理ShardKey < 24010的
|
||||||
case TableNames.OrderExtra:
|
case TableNames.OrderExtra:
|
||||||
{
|
{
|
||||||
@ -135,8 +133,7 @@ async Task RunProgram()
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
// 清理(Status != 0 || Deleted = 1) && ID前四位 < 2401的
|
// 清理(Status != 0 || Deleted = 1) && ID前四位 < 2401的
|
||||||
case TableNames.OrderScrapBoard:
|
case TableNames.OrderScrapBoard:
|
||||||
{
|
{
|
||||||
@ -217,6 +214,16 @@ async Task RunProgram()
|
|||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
// 移除ViewFileName列
|
||||||
|
case TableNames.OrderModule:
|
||||||
|
{
|
||||||
|
#if USE_TEST_DB
|
||||||
|
if (record.HeaderExists("ViewFileName"))
|
||||||
|
#endif
|
||||||
|
record.RemoveField("ViewFileName");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
#if USE_TEST_DB
|
||||||
// 删除ID列,让数据库自行递增
|
// 删除ID列,让数据库自行递增
|
||||||
// TODO: 数据表改进,删除ID列或是替换为流水号
|
// TODO: 数据表改进,删除ID列或是替换为流水号
|
||||||
case TableNames.ProcessStepEfficiency:
|
case TableNames.ProcessStepEfficiency:
|
||||||
@ -229,18 +236,17 @@ async Task RunProgram()
|
|||||||
record.RemoveField("ID");
|
record.RemoveField("ID");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TableNames.SysConfig:
|
|
||||||
{
|
|
||||||
record.RemoveField("Key");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
#if USE_TEST_DB
|
|
||||||
// 测试环境忽略PlaceData列,生产环境会提前将其移除
|
// 测试环境忽略PlaceData列,生产环境会提前将其移除
|
||||||
case TableNames.SimplePlanOrder:
|
case TableNames.SimplePlanOrder:
|
||||||
{
|
{
|
||||||
record.RemoveField("PlaceData");
|
record.RemoveField("PlaceData");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case TableNames.SysConfig:
|
||||||
|
{
|
||||||
|
record.RemoveField("Key");
|
||||||
|
break;
|
||||||
|
}
|
||||||
#endif
|
#endif
|
||||||
default: break;
|
default: break;
|
||||||
}
|
}
|
||||||
@ -342,15 +348,13 @@ async Task RunProgram()
|
|||||||
host.Services.AddLogging(builder =>
|
host.Services.AddLogging(builder =>
|
||||||
{
|
{
|
||||||
builder.ClearProviders();
|
builder.ClearProviders();
|
||||||
var logger = new LoggerConfiguration()
|
builder.AddSerilog(new LoggerConfiguration()
|
||||||
.MinimumLevel.Verbose()
|
|
||||||
.WriteTo.Console()
|
.WriteTo.Console()
|
||||||
.WriteTo.File(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"./Log/Error/{ErrorRecorder.UID}.log"),
|
.WriteTo.File(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"./Log/Error/{ErrorRecorder.UID}.log"),
|
||||||
restrictedToMinimumLevel: LogEventLevel.Error)
|
restrictedToMinimumLevel:LogEventLevel.Error)
|
||||||
// .WriteTo.File("./Log/Info/{ErrorRecorder.UID}.log", restrictedToMinimumLevel:LogEventLevel.Information) //性能考虑暂不使用
|
// .WriteTo.File("./Log/Info/{ErrorRecorder.UID}.log", restrictedToMinimumLevel:LogEventLevel.Information) //性能考虑暂不使用
|
||||||
.CreateLogger();
|
.CreateLogger()
|
||||||
builder.AddSerilog(logger);
|
);
|
||||||
Log.Logger = logger;
|
|
||||||
});
|
});
|
||||||
|
|
||||||
host.Services.AddDataSourceFactory();
|
host.Services.AddDataSourceFactory();
|
||||||
@ -368,7 +372,7 @@ async Task RunProgram()
|
|||||||
host.Services.AddHostedService<MainHostedService>();
|
host.Services.AddHostedService<MainHostedService>();
|
||||||
host.Services.AddSingleton<IInputService, FileInputService>();
|
host.Services.AddSingleton<IInputService, FileInputService>();
|
||||||
host.Services.AddSingleton<ITransformService, TransformService>();
|
host.Services.AddSingleton<ITransformService, TransformService>();
|
||||||
host.Services.AddSingleton<IOutputService, VoidOutputService>();
|
host.Services.AddSingleton<IOutputService, OutputService>();
|
||||||
host.Services.AddSingleton<TaskMonitorService>();
|
host.Services.AddSingleton<TaskMonitorService>();
|
||||||
// host.Services.AddRedisCache(redisOptions);
|
// host.Services.AddRedisCache(redisOptions);
|
||||||
host.Services.AddSingleton<ICacher, MemoryCache>();
|
host.Services.AddSingleton<ICacher, MemoryCache>();
|
||||||
|
@ -64,7 +64,7 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
|
|||||||
if (_recordCache.Count == 0)
|
if (_recordCache.Count == 0)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
await using var cmd = _conn.CreateCommand();
|
var cmd = _conn.CreateCommand();
|
||||||
cmd.CommandTimeout = 0;
|
cmd.CommandTimeout = 0;
|
||||||
|
|
||||||
try
|
try
|
||||||
@ -255,13 +255,11 @@ public partial class MySqlDestination : IDisposable, IAsyncDisposable
|
|||||||
{
|
{
|
||||||
_conn.Close();
|
_conn.Close();
|
||||||
_conn.Dispose();
|
_conn.Dispose();
|
||||||
_recordCache.Clear();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public async ValueTask DisposeAsync()
|
public async ValueTask DisposeAsync()
|
||||||
{
|
{
|
||||||
await _conn.CloseAsync();
|
await _conn.CloseAsync();
|
||||||
await _conn.DisposeAsync();
|
await _conn.DisposeAsync();
|
||||||
_recordCache.Clear();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -1,4 +1,3 @@
|
|||||||
using System.Collections.Concurrent;
|
|
||||||
using System.Diagnostics;
|
using System.Diagnostics;
|
||||||
using System.Runtime.CompilerServices;
|
using System.Runtime.CompilerServices;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
@ -12,7 +11,7 @@ namespace MesETL.App.Services.Seq;
|
|||||||
public class SeqService
|
public class SeqService
|
||||||
{
|
{
|
||||||
private readonly string _connectionString;
|
private readonly string _connectionString;
|
||||||
private readonly ConcurrentDictionary<SeqConfig, long> _cachedSequence;
|
private readonly Dictionary<SeqConfig, long> _cachedSequence;
|
||||||
|
|
||||||
public IReadOnlyDictionary<SeqConfig, long> CachedSequence => _cachedSequence;
|
public IReadOnlyDictionary<SeqConfig, long> CachedSequence => _cachedSequence;
|
||||||
|
|
||||||
@ -25,7 +24,7 @@ public class SeqService
|
|||||||
};
|
};
|
||||||
_connectionString = builder.ConnectionString;
|
_connectionString = builder.ConnectionString;
|
||||||
|
|
||||||
_cachedSequence = new ConcurrentDictionary<SeqConfig, long>();
|
_cachedSequence = new Dictionary<SeqConfig, long>();
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task<long> UpdateSequenceID(string name,int step,long max,bool recycle, int add)
|
private async Task<long> UpdateSequenceID(string name,int step,long max,bool recycle, int add)
|
||||||
@ -100,9 +99,9 @@ public class SeqService
|
|||||||
/// 移除一个缓存的流水号
|
/// 移除一个缓存的流水号
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="config"></param>
|
/// <param name="config"></param>
|
||||||
public bool RemoveCachedSeq(SeqConfig config)
|
public void RemoveCachedSeq(SeqConfig config)
|
||||||
{
|
{
|
||||||
return _cachedSequence.Remove(config, out _);
|
_cachedSequence.Remove(config);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
@ -1,5 +1,4 @@
|
|||||||
using Serilog;
|
using ApplicationException = System.ApplicationException;
|
||||||
using ApplicationException = System.ApplicationException;
|
|
||||||
using TaskExtensions = MesETL.Shared.Helper.TaskExtensions;
|
using TaskExtensions = MesETL.Shared.Helper.TaskExtensions;
|
||||||
|
|
||||||
namespace MesETL.App.Services;
|
namespace MesETL.App.Services;
|
||||||
@ -38,8 +37,6 @@ public class TaskManager
|
|||||||
{
|
{
|
||||||
var task = Task.Run(async () =>
|
var task = Task.Run(async () =>
|
||||||
{
|
{
|
||||||
// Log.Logger.Verbose("[任务管理器] 新的任务已创建");
|
|
||||||
Interlocked.Increment(ref _runningTaskCount);
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
await func();
|
await func();
|
||||||
@ -48,13 +45,13 @@ public class TaskManager
|
|||||||
catch(Exception ex)
|
catch(Exception ex)
|
||||||
{
|
{
|
||||||
OnException?.Invoke(ex);
|
OnException?.Invoke(ex);
|
||||||
Log.Logger.Error(ex, "[任务管理器] 执行任务时出错");
|
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
Interlocked.Decrement(ref _runningTaskCount);
|
Interlocked.Decrement(ref _runningTaskCount);
|
||||||
}
|
}
|
||||||
}, cancellationToken);
|
}, cancellationToken);
|
||||||
|
Interlocked.Increment(ref _runningTaskCount);
|
||||||
return task;
|
return task;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -62,10 +59,8 @@ public class TaskManager
|
|||||||
{
|
{
|
||||||
var task = Task.Factory.StartNew(async obj => // 性能考虑,这个lambda中不要捕获任何外部变量!
|
var task = Task.Factory.StartNew(async obj => // 性能考虑,这个lambda中不要捕获任何外部变量!
|
||||||
{
|
{
|
||||||
// Log.Logger.Verbose("[任务管理器] 新的任务已创建");
|
|
||||||
if (obj is not Tuple<Func<object?, Task>, object?> tuple)
|
if (obj is not Tuple<Func<object?, Task>, object?> tuple)
|
||||||
throw new ApplicationException("这个异常不该出现");
|
throw new ApplicationException("这个异常不该出现");
|
||||||
Interlocked.Increment(ref _runningTaskCount);
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
await tuple.Item1(tuple.Item2);
|
await tuple.Item1(tuple.Item2);
|
||||||
@ -74,13 +69,13 @@ public class TaskManager
|
|||||||
catch(Exception ex)
|
catch(Exception ex)
|
||||||
{
|
{
|
||||||
OnException?.Invoke(ex);
|
OnException?.Invoke(ex);
|
||||||
Log.Logger.Error(ex, "[任务管理器] 执行任务时出错");
|
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
Interlocked.Decrement(ref _runningTaskCount);
|
Interlocked.Decrement(ref _runningTaskCount);
|
||||||
}
|
}
|
||||||
}, Tuple.Create(func, arg), cancellationToken).Unwrap();
|
}, Tuple.Create(func, arg), cancellationToken).Unwrap();
|
||||||
|
Interlocked.Increment(ref _runningTaskCount);
|
||||||
return task;
|
return task;
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -1,10 +1,10 @@
|
|||||||
{
|
{
|
||||||
"MemoryThreshold": 6,
|
"MemoryThreshold": 8,
|
||||||
"GCIntervalMilliseconds": -1,
|
"GCIntervalMilliseconds": -1,
|
||||||
"UnsafeVariable": true,
|
"UnsafeVariable": false,
|
||||||
"Logging": {
|
"Logging": {
|
||||||
"LogLevel": {
|
"LogLevel": {
|
||||||
"Default": "Trace"
|
"Default": "Debug"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"Input":{
|
"Input":{
|
||||||
@ -33,8 +33,8 @@
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
"RecordQueue":{
|
"RecordQueue":{
|
||||||
"ProducerQueueLength": 20000, // 输入队列最大长度
|
"ProducerQueueLength": 50000, // 输入队列最大长度
|
||||||
"ConsumerQueueLength": 20000, // 每个输出队列最大长度
|
"ConsumerQueueLength": 10000, // 每个输出队列最大长度
|
||||||
"MaxByteCount": 3221225472 // 队列最大字节数
|
"MaxByteCount": 3221225472 // 队列最大字节数
|
||||||
},
|
},
|
||||||
"RedisCache": {
|
"RedisCache": {
|
||||||
@ -44,7 +44,7 @@
|
|||||||
"TenantDb": // 分库配置
|
"TenantDb": // 分库配置
|
||||||
{
|
{
|
||||||
"TenantKey" : "CompanyID",
|
"TenantKey" : "CompanyID",
|
||||||
"UseDbGroup": "mock",
|
"UseDbGroup": "prod",
|
||||||
"DbGroups": {
|
"DbGroups": {
|
||||||
"test": {
|
"test": {
|
||||||
"cferp_test_1": 1000,
|
"cferp_test_1": 1000,
|
||||||
@ -58,20 +58,6 @@
|
|||||||
"mesdb_4": 15000,
|
"mesdb_4": 15000,
|
||||||
"mesdb_5": 20000,
|
"mesdb_5": 20000,
|
||||||
"mesdb_6": 2147483647
|
"mesdb_6": 2147483647
|
||||||
},
|
|
||||||
"mock":{
|
|
||||||
"mesdb_1": 5000,
|
|
||||||
"mesdb_2": 10000,
|
|
||||||
"mesdb_3": 15000,
|
|
||||||
"mesdb_4": 20000,
|
|
||||||
"mesdb_5": 2147483647
|
|
||||||
},
|
|
||||||
"mock_void":{
|
|
||||||
"mesdb_1_void": 5000,
|
|
||||||
"mesdb_2_void": 10000,
|
|
||||||
"mesdb_3_void": 15000,
|
|
||||||
"mesdb_4_void": 20000,
|
|
||||||
"mesdb_5_void": 2147483647
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -177,6 +177,7 @@ public class DatabaseToolBox
|
|||||||
[InlineData("mesdb_3")]
|
[InlineData("mesdb_3")]
|
||||||
[InlineData("mesdb_4")]
|
[InlineData("mesdb_4")]
|
||||||
[InlineData("mesdb_5")]
|
[InlineData("mesdb_5")]
|
||||||
|
[InlineData("mesdb_6")]
|
||||||
public async Task TruncateAllTable(string database)
|
public async Task TruncateAllTable(string database)
|
||||||
{
|
{
|
||||||
var tables = await DatabaseHelper.QueryTableAsync(ConnStr,
|
var tables = await DatabaseHelper.QueryTableAsync(ConnStr,
|
||||||
|
@ -1,8 +1,6 @@
|
|||||||
using System.Runtime;
|
|
||||||
using MesETL.App.Const;
|
using MesETL.App.Const;
|
||||||
using MesETL.App.HostedServices.Abstractions;
|
using MesETL.App.HostedServices.Abstractions;
|
||||||
using MesETL.App.Services;
|
using MesETL.App.Services;
|
||||||
using Microsoft.Extensions.Configuration;
|
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
@ -15,16 +13,14 @@ public class MockInputService : IInputService
|
|||||||
private readonly ProcessContext _context;
|
private readonly ProcessContext _context;
|
||||||
private readonly IOptions<MockInputOptions> _options;
|
private readonly IOptions<MockInputOptions> _options;
|
||||||
private readonly ILogger _logger;
|
private readonly ILogger _logger;
|
||||||
private readonly long _memoryThreshold;
|
|
||||||
|
|
||||||
public MockInputService([FromKeyedServices(ConstVar.Producer)]DataRecordQueue producerQueue, ProcessContext context, IOptions<MockInputOptions> options,
|
public MockInputService([FromKeyedServices(ConstVar.Producer)]DataRecordQueue producerQueue, ProcessContext context, IOptions<MockInputOptions> options,
|
||||||
ILogger<MockInputService> logger, IConfiguration configuration)
|
ILogger<MockInputService> logger)
|
||||||
{
|
{
|
||||||
_producerQueue = producerQueue;
|
_producerQueue = producerQueue;
|
||||||
_context = context;
|
_context = context;
|
||||||
_options = options;
|
_options = options;
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
_memoryThreshold = (long)(configuration.GetValue<double>("MemoryThreshold", 8) * 1024 * 1024 * 1024);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task ExecuteAsync(CancellationToken cancellationToken)
|
public async Task ExecuteAsync(CancellationToken cancellationToken)
|
||||||
@ -36,13 +32,6 @@ public class MockInputService : IInputService
|
|||||||
|
|
||||||
for (int i = 0; i < options.Amount; i++)
|
for (int i = 0; i < options.Amount; i++)
|
||||||
{
|
{
|
||||||
if (GC.GetTotalMemory(false) > _memoryThreshold)
|
|
||||||
{
|
|
||||||
_logger.LogWarning("内存使用率过高,暂缓输入");
|
|
||||||
GCSettings.LargeObjectHeapCompactionMode = GCLargeObjectHeapCompactionMode.CompactOnce;
|
|
||||||
GC.Collect();
|
|
||||||
await Task.Delay(3000, cancellationToken);
|
|
||||||
}
|
|
||||||
var ctx = new TableMockContext()
|
var ctx = new TableMockContext()
|
||||||
{
|
{
|
||||||
Index = i,
|
Index = i,
|
||||||
|
File diff suppressed because one or more lines are too long
@ -27,23 +27,21 @@ async Task RunProgram()
|
|||||||
{
|
{
|
||||||
ThreadPool.SetMaxThreads(200, 200);
|
ThreadPool.SetMaxThreads(200, 200);
|
||||||
var host = Host.CreateApplicationBuilder(args);
|
var host = Host.CreateApplicationBuilder(args);
|
||||||
|
|
||||||
var inputOptions = host.Configuration.GetRequiredSection("Input").Get<DataInputOptions>()
|
var dbGroup = new Dictionary<string, int>()
|
||||||
?? throw new ApplicationException("缺少Input配置");
|
{
|
||||||
|
{ "mesdb_1", 5000 },
|
||||||
var transformOptions = host.Configuration.GetRequiredSection("Transform").Get<DataTransformOptions>()
|
{ "mesdb_2", 10000 },
|
||||||
?? throw new ApplicationException("缺少Transform配置");
|
{ "mesdb_3", 15000 },
|
||||||
|
{ "mesdb_4", 20000 },
|
||||||
var outputOptions = host.Configuration.GetRequiredSection("Output").Get<DatabaseOutputOptions>()
|
{ "mesdb_5", 2147483647 },
|
||||||
?? throw new ApplicationException("缺少Output配置");
|
};
|
||||||
|
|
||||||
var tenantDbSection = host.Configuration.GetRequiredSection("TenantDb");
|
|
||||||
var tenantDbOptions = new TenantDbOptions()
|
var tenantDbOptions = new TenantDbOptions()
|
||||||
{
|
{
|
||||||
TenantKey = tenantDbSection.GetValue<string>(nameof(TenantDbOptions.TenantKey)) ??
|
TenantKey = "CompanyID",
|
||||||
throw new ApplicationException("分库配置缺少分库键TenantKey"),
|
DbGroup = dbGroup,
|
||||||
UseDbGroup = tenantDbSection.GetValue<string>(nameof(TenantDbOptions.UseDbGroup)) ??
|
UseDbGroup = "Prod",
|
||||||
throw new ApplicationException("分库配置缺少使用分库组UseDbGroup")
|
|
||||||
};
|
};
|
||||||
host.Services.Configure<TenantDbOptions>(options =>
|
host.Services.Configure<TenantDbOptions>(options =>
|
||||||
{
|
{
|
||||||
@ -51,10 +49,7 @@ async Task RunProgram()
|
|||||||
options.DbGroup = tenantDbOptions.DbGroup;
|
options.DbGroup = tenantDbOptions.DbGroup;
|
||||||
options.UseDbGroup = tenantDbOptions.UseDbGroup;
|
options.UseDbGroup = tenantDbOptions.UseDbGroup;
|
||||||
});
|
});
|
||||||
tenantDbOptions.DbGroup = tenantDbSection.GetRequiredSection($"DbGroups:{tenantDbOptions.UseDbGroup}")
|
|
||||||
.Get<Dictionary<string, int>>()
|
|
||||||
?? throw new ApplicationException($"分库配置无法解析分库组{tenantDbOptions.UseDbGroup},请检查配置");
|
|
||||||
|
|
||||||
host.Services.Configure<MockInputOptions>(options =>
|
host.Services.Configure<MockInputOptions>(options =>
|
||||||
{
|
{
|
||||||
const float Multiplexer = 0.01F;
|
const float Multiplexer = 0.01F;
|
||||||
@ -211,11 +206,6 @@ async Task RunProgram()
|
|||||||
|
|
||||||
host.Services.Configure<DataTransformOptions>(options =>
|
host.Services.Configure<DataTransformOptions>(options =>
|
||||||
{
|
{
|
||||||
options.StrictMode = transformOptions.StrictMode;
|
|
||||||
options.EnableFilter = transformOptions.EnableFilter;
|
|
||||||
options.EnableReplacer = transformOptions.EnableReplacer;
|
|
||||||
options.EnableReBuilder = transformOptions.EnableReBuilder;
|
|
||||||
|
|
||||||
options.DatabaseFilter = record =>
|
options.DatabaseFilter = record =>
|
||||||
{
|
{
|
||||||
var companyId = int.Parse(record[tenantDbOptions.TenantKey]); // 每个实体都应存在CompanyID,否则异常
|
var companyId = int.Parse(record[tenantDbOptions.TenantKey]); // 每个实体都应存在CompanyID,否则异常
|
||||||
@ -225,16 +215,11 @@ async Task RunProgram()
|
|||||||
|
|
||||||
host.Services.Configure<DatabaseOutputOptions>(options =>
|
host.Services.Configure<DatabaseOutputOptions>(options =>
|
||||||
{
|
{
|
||||||
options.ConnectionString = outputOptions.ConnectionString;
|
options.ConnectionString = "Server=192.168.1.246;Port=3333;UserId=root;Password=123456;";
|
||||||
options.FlushCount = outputOptions.FlushCount;
|
options.FlushCount = 10000;
|
||||||
options.MaxAllowedPacket = outputOptions.MaxAllowedPacket / 2;
|
options.MaxAllowedPacket = 67108864;
|
||||||
options.MaxDatabaseOutputTask = outputOptions.MaxDatabaseOutputTask;
|
options.MaxDatabaseOutputTask = 4;
|
||||||
options.TreatJsonAsHex = outputOptions.TreatJsonAsHex;
|
|
||||||
options.NoOutput = outputOptions.NoOutput;
|
|
||||||
options.ForUpdate = outputOptions.ForUpdate;
|
|
||||||
|
|
||||||
// 配置列的类型以便于在输出时区分二进制内容
|
|
||||||
// Prod server
|
|
||||||
options.ColumnTypeConfig = new Dictionary<string, ColumnType>
|
options.ColumnTypeConfig = new Dictionary<string, ColumnType>
|
||||||
{
|
{
|
||||||
{ "machine.Settings", ColumnType.Text },
|
{ "machine.Settings", ColumnType.Text },
|
||||||
@ -268,15 +253,14 @@ async Task RunProgram()
|
|||||||
host.Services.AddLogging(builder =>
|
host.Services.AddLogging(builder =>
|
||||||
{
|
{
|
||||||
builder.ClearProviders();
|
builder.ClearProviders();
|
||||||
var logger = new LoggerConfiguration()
|
builder.AddSerilog(new LoggerConfiguration()
|
||||||
.MinimumLevel.Verbose()
|
.MinimumLevel.Debug()
|
||||||
.WriteTo.Console()
|
.WriteTo.Console()
|
||||||
.WriteTo.File(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"./Log/Error/{ErrorRecorder.UID}.log"),
|
.WriteTo.File(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"./Log/Error/{ErrorRecorder.UID}.log"),
|
||||||
restrictedToMinimumLevel: LogEventLevel.Error)
|
restrictedToMinimumLevel: LogEventLevel.Error)
|
||||||
// .WriteTo.File("./Log/Info/{ErrorRecorder.UID}.log", restrictedToMinimumLevel:LogEventLevel.Information) //性能考虑暂不使用
|
// .WriteTo.File("./Log/Info/{ErrorRecorder.UID}.log", restrictedToMinimumLevel:LogEventLevel.Information) //性能考虑暂不使用
|
||||||
.CreateLogger();
|
.CreateLogger()
|
||||||
builder.AddSerilog(logger);
|
);
|
||||||
Log.Logger = logger;
|
|
||||||
});
|
});
|
||||||
|
|
||||||
host.Services.AddDataSourceFactory();
|
host.Services.AddDataSourceFactory();
|
||||||
@ -287,7 +271,7 @@ async Task RunProgram()
|
|||||||
var consLen = host.Configuration.GetRequiredSection("RecordQueue").GetValue<int>("ConsumerQueueLength");
|
var consLen = host.Configuration.GetRequiredSection("RecordQueue").GetValue<int>("ConsumerQueueLength");
|
||||||
var maxCharCount = host.Configuration.GetRequiredSection("RecordQueue").GetValue<long>("MaxByteCount") / 2;
|
var maxCharCount = host.Configuration.GetRequiredSection("RecordQueue").GetValue<long>("MaxByteCount") / 2;
|
||||||
host.Services.AddKeyedSingleton<DataRecordQueue>(ConstVar.Producer, new DataRecordQueue(prodLen, maxCharCount));
|
host.Services.AddKeyedSingleton<DataRecordQueue>(ConstVar.Producer, new DataRecordQueue(prodLen, maxCharCount));
|
||||||
host.Services.AddRecordQueuePool(tenantDbOptions.DbGroup.Keys
|
host.Services.AddRecordQueuePool(dbGroup.Keys
|
||||||
.Select(key => (key: key, queue: new DataRecordQueue(consLen, maxCharCount))).ToArray());
|
.Select(key => (key: key, queue: new DataRecordQueue(consLen, maxCharCount))).ToArray());
|
||||||
// host.Services.AddSingleton<ITaskMonitorLogger, CacheTaskMonitorLogger>();
|
// host.Services.AddSingleton<ITaskMonitorLogger, CacheTaskMonitorLogger>();
|
||||||
host.Services.AddSingleton<ITaskMonitorLogger, LoggerTaskMonitorLogger>();
|
host.Services.AddSingleton<ITaskMonitorLogger, LoggerTaskMonitorLogger>();
|
||||||
|
Loading…
Reference in New Issue
Block a user