修复流水号缓存服务的并行错误;
修复输出线程没有捕获异常的严重错误;
This commit is contained in:
parent
b20c56640f
commit
4986c60416
@ -56,7 +56,18 @@ public class OutputService : IOutputService
|
|||||||
if (!dbTasks.ContainsKey(db))
|
if (!dbTasks.ContainsKey(db))
|
||||||
{
|
{
|
||||||
dbTasks.Add(db, await dbTaskManager.CreateTaskAsync(
|
dbTasks.Add(db, await dbTaskManager.CreateTaskAsync(
|
||||||
async () => await StartDatabaseWorker(db, queue, ct), ct));
|
async () =>
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await StartDatabaseWorker(db, queue, ct);
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
_logger.LogError(e, "输出线程发生错误");
|
||||||
|
_queuePool.RemoveQueue(db);
|
||||||
|
}
|
||||||
|
}, ct));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -48,26 +48,92 @@ public class TransformService : ITransformService
|
|||||||
|
|
||||||
public async Task ExecuteAsync(CancellationToken cancellationToken)
|
public async Task ExecuteAsync(CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
_logger.LogInformation("***** 数据转换服务已启动, 当前线程ID: {ThreadId} *****", Environment.CurrentManagedThreadId);
|
_logger.LogInformation("***** 数据转换服务已启动 *****");
|
||||||
|
|
||||||
// var tasks = new List<Task>();
|
|
||||||
// for (int i = 0; i < 4; i++)
|
|
||||||
// {
|
|
||||||
// tasks.Add(Task.Run(TransformWorker, cancellationToken));
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// await Task.WhenAll(tasks);
|
|
||||||
await TransformWorker();
|
|
||||||
|
|
||||||
|
await TransformWorker2();
|
||||||
|
|
||||||
|
_context.CompleteTransform();
|
||||||
_logger.LogInformation("***** 数据转换服务执行完毕 *****");
|
_logger.LogInformation("***** 数据转换服务执行完毕 *****");
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task TransformWorker()
|
public async Task TransformWorker(DataRecordQueue queue)
|
||||||
{
|
{
|
||||||
while (!_context.IsInputCompleted || _producerQueue.Count > 0)
|
while (!_context.IsInputCompleted || _producerQueue.Count > 0)
|
||||||
{
|
{
|
||||||
if (!_producerQueue.TryDequeue(out var record))
|
if (!_producerQueue.TryDequeue(out var record))
|
||||||
{
|
{
|
||||||
|
await Task.Delay(100);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var context = new DataTransformContext(record, _cache, _logger, _services);
|
||||||
|
if (_options.Value.EnableFilter)
|
||||||
|
{
|
||||||
|
// 数据过滤
|
||||||
|
var filter = _options.Value.RecordFilter;
|
||||||
|
if (filter is not null && await filter(context) == false) continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (_options.Value.EnableReplacer)
|
||||||
|
{
|
||||||
|
// 数据替换
|
||||||
|
var replacer = _options.Value.RecordModify;
|
||||||
|
if (replacer is not null)
|
||||||
|
{
|
||||||
|
record = await replacer(context);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 字段缓存
|
||||||
|
var cacher = _options.Value.RecordCache;
|
||||||
|
if(cacher is not null)
|
||||||
|
await cacher.Invoke(context);
|
||||||
|
|
||||||
|
//计算需要分流的数据库
|
||||||
|
var dbFilter = _options.Value.DatabaseFilter
|
||||||
|
?? throw new ApplicationException("未配置数据库过滤器");
|
||||||
|
record.Database = dbFilter(record);
|
||||||
|
|
||||||
|
if (_options.Value.EnableReBuilder)
|
||||||
|
{
|
||||||
|
//数据重建
|
||||||
|
var addRecords = _options.Value.RecordReBuild?.Invoke(context);
|
||||||
|
if (addRecords is { Count: > 0 })
|
||||||
|
{
|
||||||
|
foreach (var rc in addRecords)
|
||||||
|
{
|
||||||
|
if(dbFilter is not null)
|
||||||
|
rc.Database = dbFilter.Invoke(record);
|
||||||
|
await queue.EnqueueAsync(rc);
|
||||||
|
_context.AddTransform();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
await queue.EnqueueAsync(record);
|
||||||
|
_context.AddTransform();
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
_context.AddException(e);
|
||||||
|
var errorRecorder = _errorRecorderFactory.CreateTransform();
|
||||||
|
await errorRecorder.LogErrorRecordAsync(record, e);
|
||||||
|
if (!_options.Value.StrictMode)
|
||||||
|
_logger.LogError(e, "数据转换时发生错误");
|
||||||
|
else throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task TransformWorker2()
|
||||||
|
{
|
||||||
|
while (!_context.IsInputCompleted || _producerQueue.Count > 0)
|
||||||
|
{
|
||||||
|
if (!_producerQueue.TryDequeue(out var record))
|
||||||
|
{
|
||||||
|
await Task.Delay(100);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -349,7 +349,7 @@ async Task RunProgram()
|
|||||||
{
|
{
|
||||||
builder.ClearProviders();
|
builder.ClearProviders();
|
||||||
builder.AddSerilog(new LoggerConfiguration()
|
builder.AddSerilog(new LoggerConfiguration()
|
||||||
.WriteTo.Console()
|
.WriteTo.Console().MinimumLevel.Debug()
|
||||||
.WriteTo.File(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"./Log/Error/{ErrorRecorder.UID}.log"),
|
.WriteTo.File(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"./Log/Error/{ErrorRecorder.UID}.log"),
|
||||||
restrictedToMinimumLevel:LogEventLevel.Error)
|
restrictedToMinimumLevel:LogEventLevel.Error)
|
||||||
// .WriteTo.File("./Log/Info/{ErrorRecorder.UID}.log", restrictedToMinimumLevel:LogEventLevel.Information) //性能考虑暂不使用
|
// .WriteTo.File("./Log/Info/{ErrorRecorder.UID}.log", restrictedToMinimumLevel:LogEventLevel.Information) //性能考虑暂不使用
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
using System.Collections.Concurrent;
|
||||||
using System.Diagnostics;
|
using System.Diagnostics;
|
||||||
using System.Runtime.CompilerServices;
|
using System.Runtime.CompilerServices;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
@ -11,7 +12,7 @@ namespace MesETL.App.Services.Seq;
|
|||||||
public class SeqService
|
public class SeqService
|
||||||
{
|
{
|
||||||
private readonly string _connectionString;
|
private readonly string _connectionString;
|
||||||
private readonly Dictionary<SeqConfig, long> _cachedSequence;
|
private readonly ConcurrentDictionary<SeqConfig, long> _cachedSequence;
|
||||||
|
|
||||||
public IReadOnlyDictionary<SeqConfig, long> CachedSequence => _cachedSequence;
|
public IReadOnlyDictionary<SeqConfig, long> CachedSequence => _cachedSequence;
|
||||||
|
|
||||||
@ -24,7 +25,7 @@ public class SeqService
|
|||||||
};
|
};
|
||||||
_connectionString = builder.ConnectionString;
|
_connectionString = builder.ConnectionString;
|
||||||
|
|
||||||
_cachedSequence = new Dictionary<SeqConfig, long>();
|
_cachedSequence = new ConcurrentDictionary<SeqConfig, long>();
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task<long> UpdateSequenceID(string name,int step,long max,bool recycle, int add)
|
private async Task<long> UpdateSequenceID(string name,int step,long max,bool recycle, int add)
|
||||||
@ -99,9 +100,9 @@ public class SeqService
|
|||||||
/// 移除一个缓存的流水号
|
/// 移除一个缓存的流水号
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="config"></param>
|
/// <param name="config"></param>
|
||||||
public void RemoveCachedSeq(SeqConfig config)
|
public bool RemoveCachedSeq(SeqConfig config)
|
||||||
{
|
{
|
||||||
_cachedSequence.Remove(config);
|
return _cachedSequence.Remove(config, out _);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
@ -4,7 +4,7 @@
|
|||||||
"UnsafeVariable": false,
|
"UnsafeVariable": false,
|
||||||
"Logging": {
|
"Logging": {
|
||||||
"LogLevel": {
|
"LogLevel": {
|
||||||
"Default": "Debug"
|
"Default": "Trace"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"Input":{
|
"Input":{
|
||||||
@ -33,8 +33,8 @@
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
"RecordQueue":{
|
"RecordQueue":{
|
||||||
"ProducerQueueLength": 50000, // 输入队列最大长度
|
"ProducerQueueLength": 40000, // 输入队列最大长度
|
||||||
"ConsumerQueueLength": 10000, // 每个输出队列最大长度
|
"ConsumerQueueLength": 20000, // 每个输出队列最大长度
|
||||||
"MaxByteCount": 3221225472 // 队列最大字节数
|
"MaxByteCount": 3221225472 // 队列最大字节数
|
||||||
},
|
},
|
||||||
"RedisCache": {
|
"RedisCache": {
|
||||||
|
Loading…
Reference in New Issue
Block a user