MES-ETL/MesETL.App/Services/TaskManager.cs

86 lines
2.9 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using Serilog;
using ApplicationException = System.ApplicationException;
using TaskExtensions = MesETL.Shared.Helper.TaskExtensions;
namespace MesETL.App.Services;
/// <summary>
/// Creates tasks in bulk while throttling how many of them are in flight at once.
/// </summary>
/// <remarks>
/// A slot is reserved <em>before</em> a task is handed to the scheduler and released when
/// its work finishes, so <see cref="RunningTaskCount"/> also includes tasks that are queued
/// but have not started executing yet. Reserving up front is what keeps the number of
/// in-flight tasks from exceeding <see cref="MaxTaskCount"/> when tasks are created faster
/// than the thread pool starts them.
/// </remarks>
public class TaskManager
{
    // Cached so that every CreateTaskAsync call does not allocate a new predicate delegate.
    private readonly Func<bool> _hasFreeSlot;

    private int _runningTaskCount;

    /// <summary>Number of slots currently reserved (tasks queued or executing).</summary>
    public int RunningTaskCount => Volatile.Read(ref _runningTaskCount);

    /// <summary>Upper bound on the number of simultaneously reserved slots.</summary>
    public int MaxTaskCount { get; }

    /// <summary>
    /// Raised with the exception thrown by a task's work (or by a
    /// <see cref="OnTaskCompleteSuccessfully"/> handler). Invoked from inside the task itself.
    /// </summary>
    public event Action<Exception>? OnException;

    /// <summary>Raised after a task's work completed without throwing. Invoked from inside the task itself.</summary>
    public event Action? OnTaskCompleteSuccessfully;

    /// <summary>Creates a manager that admits at most <paramref name="maxTaskCount"/> tasks at a time.</summary>
    /// <param name="maxTaskCount">
    /// Maximum number of in-flight tasks. The admission condition is
    /// <c>RunningTaskCount &lt; MaxTaskCount</c>, so a value below 1 can never admit a task.
    /// </param>
    public TaskManager(int maxTaskCount)
    {
        MaxTaskCount = maxTaskCount;
        _hasFreeSlot = () => Volatile.Read(ref _runningTaskCount) < MaxTaskCount;
    }

    /// <summary>
    /// Waits until a slot is free, then schedules <paramref name="func"/> on the thread pool.
    /// </summary>
    /// <param name="func">The asynchronous work to run.</param>
    /// <param name="cancellationToken">
    /// Passed to the wait for a free slot; if it is already cancelled when the work is about
    /// to start, the work is skipped and the returned task is cancelled.
    /// </param>
    /// <returns>
    /// The scheduled task. Exceptions thrown by <paramref name="func"/> are reported through
    /// <see cref="OnException"/> and the log instead of faulting this task.
    /// </returns>
    public async ValueTask<Task> CreateTaskAsync(Func<Task> func, CancellationToken cancellationToken = default)
    {
        if (!await TryAcquireSlotAsync(cancellationToken))
        {
            return Task.FromCanceled(cancellationToken);
        }

        return RunTask(func, cancellationToken);
    }

    /// <summary>
    /// Waits until a slot is free, then schedules <paramref name="func"/> with
    /// <paramref name="arg"/> on the thread pool. Passing the argument explicitly lets callers
    /// use a non-capturing delegate.
    /// </summary>
    /// <param name="func">The asynchronous work to run; receives <paramref name="arg"/>.</param>
    /// <param name="arg">State object forwarded to <paramref name="func"/>.</param>
    /// <param name="ct">
    /// Passed to the wait for a free slot; if it is already cancelled when the work is about
    /// to start, the work is skipped and the returned task is cancelled.
    /// </param>
    /// <returns>
    /// The scheduled task. Exceptions thrown by <paramref name="func"/> are reported through
    /// <see cref="OnException"/> and the log instead of faulting this task.
    /// </returns>
    public async ValueTask<Task> CreateTaskAsync(Func<object?, Task> func, object? arg, CancellationToken ct = default)
    {
        if (!await TryAcquireSlotAsync(ct))
        {
            return Task.FromCanceled(ct);
        }

        return RunTaskNoClosure(func, arg, ct);
    }

    /// <summary>
    /// Waits for a free slot and reserves it atomically.
    /// </summary>
    /// <returns>
    /// <c>true</c> when a slot was reserved; <c>false</c> when the wait returned without a
    /// free slot and the token is cancelled.
    /// </returns>
    private async ValueTask<bool> TryAcquireSlotAsync(CancellationToken cancellationToken)
    {
        while (true)
        {
            // WaitUntil is defined outside this file; the arguments are the same as before.
            // NOTE(review): presumably it throws on cancellation - the check below also
            // covers the case where it simply returns instead.
            await TaskExtensions.WaitUntil(_hasFreeSlot, 25, cancellationToken);

            if (TryReserveSlot())
            {
                return true;
            }

            if (cancellationToken.IsCancellationRequested)
            {
                return false;
            }

            // Another caller took the slot between the wait and the reservation: wait again.
        }
    }

    /// <summary>Atomically takes one slot if the limit has not been reached.</summary>
    private bool TryReserveSlot()
    {
        var observed = Volatile.Read(ref _runningTaskCount);
        return observed < MaxTaskCount
               && Interlocked.CompareExchange(ref _runningTaskCount, observed + 1, observed) == observed;
    }

    /// <summary>Gives back a slot taken by <see cref="TryReserveSlot"/>.</summary>
    private void ReleaseSlot()
    {
        Interlocked.Decrement(ref _runningTaskCount);
    }

    /// <summary>Schedules <paramref name="func"/> on the thread pool. A slot must already be reserved.</summary>
    private Task RunTask(Func<Task> func, CancellationToken cancellationToken = default)
    {
        // The static adapter lets both overloads share one execution path; the inner lambda
        // captures nothing, so only the outer Task.Run delegate closes over func and the token.
        return Task.Run(() => StartWork(static f => ((Func<Task>)f!)(), func, cancellationToken));
    }

    /// <summary>
    /// Schedules <paramref name="func"/> with <paramref name="arg"/> on the thread pool,
    /// carrying the inputs in the task state object. A slot must already be reserved.
    /// </summary>
    private Task RunTaskNoClosure(Func<object?, Task> func, object? arg, CancellationToken cancellationToken = default)
    {
        // For performance this lambda must not capture any local variable: everything it
        // needs travels in the state tuple. It does use `this`, which does not require a
        // closure object.
        // The scheduler is given explicitly so the work never lands on an ambient
        // TaskScheduler.Current. The token is not given to StartNew: cancellation before
        // start is handled in StartWork so that the reserved slot is always released.
        return Task.Factory.StartNew(
            state =>
            {
                if (state is not Tuple<Func<object?, Task>, object?, CancellationToken> work)
                {
                    ReleaseSlot();
                    throw new ApplicationException("这个异常不该出现");
                }

                return StartWork(work.Item1, work.Item2, work.Item3);
            },
            Tuple.Create(func, arg, cancellationToken),
            CancellationToken.None,
            TaskCreationOptions.DenyChildAttach,
            TaskScheduler.Default).Unwrap();
    }

    /// <summary>
    /// Entry point executed on the worker: skips the work (and releases the slot) if the token
    /// is already cancelled, otherwise runs it.
    /// </summary>
    private Task StartWork(Func<object?, Task> func, object? arg, CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            // Same outcome as a task cancelled before it started: a cancelled task,
            // no OnException notification - but the reserved slot is given back.
            ReleaseSlot();
            return Task.FromCanceled(cancellationToken);
        }

        return ExecuteAsync(func, arg);
    }

    /// <summary>
    /// Runs the work, raises the completion or exception event, logs failures and always
    /// releases the slot.
    /// </summary>
    private async Task ExecuteAsync(Func<object?, Task> func, object? arg)
    {
        try
        {
            await func(arg);
            OnTaskCompleteSuccessfully?.Invoke();
        }
        catch (Exception ex)
        {
            // Failures are reported, not rethrown: the task itself still completes.
            // An exception thrown by an OnException handler is not caught here.
            OnException?.Invoke(ex);
            Log.Logger.Error(ex, "[任务管理器] 执行任务时出错");
        }
        finally
        {
            ReleaseSlot();
        }
    }
}