From aa7041962ac5b6de07b59728f04b938aaa4c2a22 Mon Sep 17 00:00:00 2001 From: CZY <2817212736@qq.com> Date: Sat, 10 Feb 2024 00:05:50 +0800 Subject: [PATCH] add gc interval --- .../HostedServices/TaskMonitorService.cs | 20 ++++++++++++++++++- MesETL.App/Services/DataRecordQueue.cs | 3 +++ MesETL.App/appsettings.json | 3 ++- 3 files changed, 24 insertions(+), 2 deletions(-) diff --git a/MesETL.App/HostedServices/TaskMonitorService.cs b/MesETL.App/HostedServices/TaskMonitorService.cs index 609bd8e..c594654 100644 --- a/MesETL.App/HostedServices/TaskMonitorService.cs +++ b/MesETL.App/HostedServices/TaskMonitorService.cs @@ -2,6 +2,7 @@ using System.Text; using MesETL.App.Services; using MesETL.App.Services.Loggers; +using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; namespace MesETL.App.HostedServices; @@ -15,24 +16,32 @@ public class TaskMonitorService private readonly ProcessContext _context; private readonly DataRecordQueue _producerQueue; private readonly RecordQueuePool _queuePool; + private readonly IConfiguration _configuration; private string _outputPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Log/progress.txt"); + private readonly int _gcInterval; + public TaskMonitorService(ProcessContext context, [FromKeyedServices(Const.ConstVar.Producer)] DataRecordQueue producerQueue, RecordQueuePool queuePool, - IEnumerable monitorLoggers) + IEnumerable monitorLoggers, + IConfiguration configuration) { _context = context; _producerQueue = producerQueue; _queuePool = queuePool; _monitorLoggers = monitorLoggers; + _configuration = configuration; + + _gcInterval = _configuration.GetValue("GCIntervalMilliseconds)"); } public async Task Monitor(CancellationToken stoppingToken) { var sw = Stopwatch.StartNew(); + var lastGCTime = sw.ElapsedMilliseconds; var lastTime = sw.ElapsedMilliseconds; var lastInputCount = _context.InputCount; var lastTransformCount = _context.TransformCount; @@ -74,6 +83,12 @@ public class TaskMonitorService var inputSpeed = (inputCount - lastInputCount) / elapseTime; var transformSpeed = (transformCount - lastTransformCount) / elapseTime; var outputSpeed = (outputCount - lastOutputCount) / elapseTime; + + if(_gcInterval > 0 && time - lastGCTime > _gcInterval) + { + GC.Collect(); + lastGCTime = time; + } // _logger.LogInformation( // "Task monitor: running: {Running}, error: {Error}, completed: {Completed}, canceled: {Canceled}, outputSpeed: {Speed} records/s", @@ -88,6 +103,7 @@ public class TaskMonitorService {"| Input Queue", _producerQueue.Count.ToString() }, {"Output Queue", _queuePool.Queues.Values.Sum(queue => queue.Count).ToString()}, + {"Memory", $"{GC.GetTotalMemory(false) / 1024 / 1024} MiB"}, }); var dict = _context.TableProgress @@ -98,6 +114,8 @@ public class TaskMonitorService { sb.AppendLine($"{kv.Key}: {kv.Value}"); } + + sb.AppendLine($"LongestCharCount: {_producerQueue.LongestFieldCharCount}"); await File.WriteAllTextAsync(_outputPath, sb.ToString(), CancellationToken.None); diff --git a/MesETL.App/Services/DataRecordQueue.cs b/MesETL.App/Services/DataRecordQueue.cs index da2fa0b..a2b32c2 100644 --- a/MesETL.App/Services/DataRecordQueue.cs +++ b/MesETL.App/Services/DataRecordQueue.cs @@ -18,6 +18,8 @@ public class DataRecordQueue : IDisposable public bool IsCompleted => _queue.IsCompleted; public bool IsAddingCompleted => _queue.IsAddingCompleted; + public long LongestFieldCharCount { get; private set; } + public event Action? OnRecordWrite; public event Action? OnRecordRead; @@ -49,6 +51,7 @@ public class DataRecordQueue : IDisposable public async Task EnqueueAsync(DataRecord record) { var charCount = record.FieldCharCount; + LongestFieldCharCount = Math.Max(LongestFieldCharCount, charCount); if(_currentCharCount + charCount > _maxCharCount) await TaskExtensions.WaitUntil(() => _currentCharCount + charCount < _maxCharCount, 50); _queue.Add(record); diff --git a/MesETL.App/appsettings.json b/MesETL.App/appsettings.json index e5fe139..cd407bb 100644 --- a/MesETL.App/appsettings.json +++ b/MesETL.App/appsettings.json @@ -1,4 +1,5 @@ { + "GCIntervalMilliseconds": -1, "Logging": { "LogLevel": { "Default": "Debug" @@ -8,7 +9,7 @@ "InputDir": "D:\\Dump\\NewMockData", // Csv数据输入目录 "UseMock": false, // 使用模拟数据进行测试 "MockCountMultiplier": 1, // 模拟数据量级的乘数 - "TableIgnoreList": ["order_box_block"] // 忽略输入的表 + "TableIgnoreList": [] // 忽略输入的表 }, "Transform":{ "StrictMode": false, // 设为true时如果数据转换发生错误,立刻停止程序