86 lines
2.9 KiB
C#
86 lines
2.9 KiB
C#
using Serilog;
|
||
using ApplicationException = System.ApplicationException;
|
||
using TaskExtensions = MesETL.Shared.Helper.TaskExtensions;
|
||
|
||
namespace MesETL.App.Services;
|
||
|
||
/// <summary>
|
||
/// 快速批量创建和等待任务
|
||
/// </summary>
|
||
public class TaskManager
|
||
{
|
||
private int _runningTaskCount;
|
||
|
||
public int RunningTaskCount => _runningTaskCount;
|
||
public int MaxTaskCount { get; }
|
||
|
||
public event Action<Exception>? OnException;
|
||
public event Action? OnTaskCompleteSuccessfully;
|
||
|
||
public TaskManager(int maxTaskCount)
|
||
{
|
||
MaxTaskCount = maxTaskCount;
|
||
}
|
||
|
||
public async ValueTask<Task> CreateTaskAsync(Func<Task> func, CancellationToken cancellationToken = default)
|
||
{
|
||
await TaskExtensions.WaitUntil(() => _runningTaskCount < MaxTaskCount, 25, cancellationToken);
|
||
return RunTask(func, cancellationToken);
|
||
}
|
||
|
||
public async ValueTask<Task> CreateTaskAsync(Func<object?, Task> func, object? arg, CancellationToken ct = default)
|
||
{
|
||
await TaskExtensions.WaitUntil(() => _runningTaskCount < MaxTaskCount, 25, ct);
|
||
return RunTaskNoClosure(func, arg, ct);
|
||
}
|
||
|
||
private Task RunTask(Func<Task> func, CancellationToken cancellationToken = default)
|
||
{
|
||
var task = Task.Run(async () =>
|
||
{
|
||
// Log.Logger.Verbose("[任务管理器] 新的任务已创建");
|
||
Interlocked.Increment(ref _runningTaskCount);
|
||
try
|
||
{
|
||
await func();
|
||
OnTaskCompleteSuccessfully?.Invoke();
|
||
}
|
||
catch(Exception ex)
|
||
{
|
||
OnException?.Invoke(ex);
|
||
Log.Logger.Error(ex, "[任务管理器] 执行任务时出错");
|
||
}
|
||
finally
|
||
{
|
||
Interlocked.Decrement(ref _runningTaskCount);
|
||
}
|
||
}, cancellationToken);
|
||
return task;
|
||
}
|
||
|
||
private Task RunTaskNoClosure(Func<object?, Task> func, object? arg, CancellationToken cancellationToken = default)
|
||
{
|
||
var task = Task.Factory.StartNew(async obj => // 性能考虑,这个lambda中不要捕获任何外部变量!
|
||
{
|
||
// Log.Logger.Verbose("[任务管理器] 新的任务已创建");
|
||
if (obj is not Tuple<Func<object?, Task>, object?> tuple)
|
||
throw new ApplicationException("这个异常不该出现");
|
||
Interlocked.Increment(ref _runningTaskCount);
|
||
try
|
||
{
|
||
await tuple.Item1(tuple.Item2);
|
||
OnTaskCompleteSuccessfully?.Invoke();
|
||
}
|
||
catch(Exception ex)
|
||
{
|
||
OnException?.Invoke(ex);
|
||
Log.Logger.Error(ex, "[任务管理器] 执行任务时出错");
|
||
}
|
||
finally
|
||
{
|
||
Interlocked.Decrement(ref _runningTaskCount);
|
||
}
|
||
}, Tuple.Create(func, arg), cancellationToken).Unwrap();
|
||
return task;
|
||
}
|
||
} |