using System.Diagnostics;
using System.Text;
using MesETL.App.Services;
using MesETL.App.Services.Loggers;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

namespace MesETL.App.HostedServices;

/// <summary>
/// Task monitor. Periodically samples the counters exposed by <see cref="ProcessContext"/>,
/// reports throughput, queue sizes and per-table progress to every registered
/// <see cref="ITaskMonitorLogger"/>, and mirrors the table progress into a text file.
/// </summary>
public class TaskMonitorService
{
    /// <summary>Pause between two monitoring passes, in milliseconds.</summary>
    private const int ReportIntervalMilliseconds = 5000;

    private readonly IEnumerable<ITaskMonitorLogger> _monitorLoggers;
    private readonly ProcessContext _context;
    private readonly DataRecordQueue _producerQueue;
    private readonly RecordQueuePool _queuePool;
    private readonly IConfiguration _configuration;
    private readonly string _outputPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Log/progress.txt");

    /// <summary>
    /// Minimum time between forced garbage collections, in milliseconds.
    /// A value of 0 or less disables forced collections.
    /// </summary>
    private readonly int _gcInterval;

    /// <summary>
    /// Creates the monitor and reads the <c>GCIntervalMilliseconds</c> setting from configuration.
    /// </summary>
    /// <param name="context">Shared processing context providing counters and completion flags.</param>
    /// <param name="producerQueue">Queue registered under the producer key; its size is reported as the input queue.</param>
    /// <param name="queuePool">Pool of queues whose combined size is reported as the output queue.</param>
    /// <param name="monitorLoggers">Loggers that receive every status report.</param>
    /// <param name="configuration">Application configuration.</param>
    public TaskMonitorService(ProcessContext context,
        [FromKeyedServices(Const.ConstVar.Producer)] DataRecordQueue producerQueue,
        RecordQueuePool queuePool,
        IEnumerable<ITaskMonitorLogger> monitorLoggers,
        IConfiguration configuration)
    {
        _context = context;
        _producerQueue = producerQueue;
        _queuePool = queuePool;
        _monitorLoggers = monitorLoggers;
        _configuration = configuration;
        // The key previously carried a stray ')' and therefore never matched,
        // which left the interval at 0 and the periodic GC permanently disabled.
        _gcInterval = _configuration.GetValue<int>("GCIntervalMilliseconds");
    }

    /// <summary>
    /// Runs the monitoring loop: one report pass, then a pause of
    /// <see cref="ReportIntervalMilliseconds"/>, repeated until cancellation is requested
    /// or input, transform and output are all completed. Once completion is first observed,
    /// one extra pass is executed so the final counters get reported.
    /// </summary>
    /// <param name="stoppingToken">
    /// Token that stops the loop. It is also passed to <see cref="Task.Delay(int, CancellationToken)"/>,
    /// so a cancellation during the pause surfaces as an <see cref="OperationCanceledException"/>.
    /// </param>
    /// <returns>A task that completes when monitoring ends.</returns>
    public async Task Monitor(CancellationToken stoppingToken)
    {
        var sw = Stopwatch.StartNew();
        var lastGCTime = sw.ElapsedMilliseconds;
        var lastTime = sw.ElapsedMilliseconds;
        var lastInputCount = _context.InputCount;
        var lastTransformCount = _context.TransformCount;
        var lastOutputCount = _context.OutputCount;

        // finalPassScheduled: completion has been seen once and the extra pass was requested.
        // skipCancellationCheck: the extra pass runs even if cancellation was requested meanwhile.
        var finalPassScheduled = false;
        var skipCancellationCheck = false;

        while (skipCancellationCheck || !stoppingToken.IsCancellationRequested)
        {
            skipCancellationCheck = false;

            var time = sw.ElapsedMilliseconds;
            var inputCount = _context.InputCount;
            var transformCount = _context.TransformCount;
            var outputCount = _context.OutputCount;

            // The very first pass can start within the same millisecond as the baseline;
            // report a speed of 0 instead of dividing by zero (NaN / Infinity).
            var elapsedSeconds = (time - lastTime) / 1000f;
            var inputSpeed = elapsedSeconds > 0 ? (inputCount - lastInputCount) / elapsedSeconds : 0f;
            var transformSpeed = elapsedSeconds > 0 ? (transformCount - lastTransformCount) / elapsedSeconds : 0f;
            var outputSpeed = elapsedSeconds > 0 ? (outputCount - lastOutputCount) / elapsedSeconds : 0f;

            if (_gcInterval > 0 && time - lastGCTime > _gcInterval)
            {
                GC.Collect();
                lastGCTime = time;
            }

            foreach (var logger in _monitorLoggers)
            {
                logger.LogStatus("Monitor: Progress status", new Dictionary<string, string>
                {
                    {"Input", _context.IsInputCompleted ? "OK" : $"{inputSpeed:F2}/s"},
                    {"Transform", _context.IsTransformCompleted ? "OK" : $"{transformSpeed:F2}/s"},
                    {"Output", _context.IsOutputCompleted ? "OK" : $"{outputSpeed:F2}/s"},

                    {"| Input Queue", _producerQueue.Count.ToString()},
                    {"Output Queue", _queuePool.Queues.Values.Sum(queue => queue.Count).ToString()},
                    {"Memory", $"{GC.GetTotalMemory(false) / 1024 / 1024} MiB"},
                });

                var dict = _context.TableProgress
                    .ToDictionary(kv => kv.Key, kv => kv.Value.ToString());
                logger.LogStatus("Monitor: Table output progress", dict, ITaskMonitorLogger.LogLevel.Progress);

                logger.LogStatus("Monitor: Process count", new Dictionary<string, string>
                {
                    {"Input", inputCount.ToString()},
                    {"Transform", transformCount.ToString()},
                    {"Output", outputCount.ToString()}
                }, ITaskMonitorLogger.LogLevel.Progress);
            }

            // Written once per pass rather than once per logger.
            await WriteProgressFileAsync();

            await Task.Delay(ReportIntervalMilliseconds, stoppingToken);

            lastTime = time;
            lastInputCount = inputCount;
            lastTransformCount = transformCount;
            lastOutputCount = outputCount;

            if (_context is
                {
                    IsInputCompleted: true,
                    IsTransformCompleted: true,
                    IsOutputCompleted: true
                })
            {
                if (finalPassScheduled)
                {
                    break;
                }

                finalPassScheduled = true;
                skipCancellationCheck = true;
            }
        }
    }

    /// <summary>
    /// Writes the current per-table progress and the longest field character count
    /// to the progress file, creating the target directory when it does not exist.
    /// </summary>
    private async Task WriteProgressFileAsync()
    {
        var sb = new StringBuilder("Table Progress: \n");
        foreach (var kv in _context.TableProgress)
        {
            sb.AppendLine($"{kv.Key}: {kv.Value}");
        }

        sb.AppendLine($"LongestCharCount: {_producerQueue.LongestFieldCharCount}");

        // File.WriteAllTextAsync does not create missing directories.
        var directory = Path.GetDirectoryName(_outputPath);
        if (!string.IsNullOrEmpty(directory))
        {
            Directory.CreateDirectory(directory);
        }

        // Deliberately not cancellable so a report that has started is always written completely.
        await File.WriteAllTextAsync(_outputPath, sb.ToString(), CancellationToken.None);
    }
}