using System.Collections.Concurrent;

namespace MesETL.App.Services;

/// <summary>
/// Processing context that tracks the progress of the input, transform and output stages.
/// </summary>
/// <remarks>
/// The record counters are updated through <see cref="Interlocked"/> and the per-table
/// progress through atomic <see cref="ConcurrentDictionary{TKey,TValue}"/> operations.
/// </remarks>
public class ProcessContext
{
    private bool _hasException;
    private long _inputCount;
    private long _transformCount;
    private long _outputCount;

    // NOTE(review): the generic arguments here and on TableProgress were missing in the
    // reviewed text; <string, (long input, long output)> is reconstructed from usage
    // (string table names, tuple elements named input/output) - confirm the element type.
    private readonly ConcurrentDictionary<string, (long input, long output)> _tableProgress = new();

    /// <summary>True once <see cref="AddException"/> has been called at least once.</summary>
    public bool HasException => _hasException;

    /// <summary>Set to true by <see cref="CompleteInput"/>.</summary>
    public bool IsInputCompleted { get; private set; }

    /// <summary>Set to true by <see cref="CompleteTransform"/>.</summary>
    public bool IsTransformCompleted { get; private set; }

    /// <summary>Set to true by <see cref="CompleteOutput"/>.</summary>
    public bool IsOutputCompleted { get; private set; }

    /// <summary>Input counter.</summary>
    public long InputCount
    {
        // Interlocked.Read keeps the 64-bit read atomic on 32-bit runtimes as well.
        get => Interlocked.Read(ref _inputCount);
        set => Interlocked.Exchange(ref _inputCount, value);
    }

    /// <summary>Transform counter.</summary>
    public long TransformCount
    {
        get => Interlocked.Read(ref _transformCount);
        set => Interlocked.Exchange(ref _transformCount, value);
    }

    /// <summary>Output counter.</summary>
    public long OutputCount
    {
        get => Interlocked.Read(ref _outputCount);
        set => Interlocked.Exchange(ref _outputCount, value);
    }

    /// <summary>Maximum memory usage value; stored and returned as-is by this class.</summary>
    public long MaxMemoryUsage { get; set; }

    /// <summary>Read-only view of the per-table progress: table name -> (input, output) counts.</summary>
    public IReadOnlyDictionary<string, (long input, long output)> TableProgress => _tableProgress;

    /// <summary>Marks the input stage as completed.</summary>
    public void CompleteInput() => IsInputCompleted = true;

    /// <summary>Marks the transform stage as completed.</summary>
    public void CompleteTransform() => IsTransformCompleted = true;

    /// <summary>Marks the output stage as completed.</summary>
    public void CompleteOutput() => IsOutputCompleted = true;

    /// <summary>
    /// Records that an exception occurred. The exception itself is not stored;
    /// for now only a flag is set.
    /// </summary>
    /// <param name="e">The exception that occurred (currently unused).</param>
    /// <returns>Always <c>true</c>.</returns>
    public bool AddException(Exception e) => _hasException = true;

    /// <summary>Increments the input counter by one.</summary>
    public void AddInput() => Interlocked.Increment(ref _inputCount);

    /// <summary>Adds <paramref name="count"/> to the input counter.</summary>
    public void AddInput(int count) => Interlocked.Add(ref _inputCount, count);

    /// <summary>Increments the transform counter by one.</summary>
    public void AddTransform() => Interlocked.Increment(ref _transformCount);

    /// <summary>Adds <paramref name="count"/> to the transform counter.</summary>
    public void AddTransform(int count) => Interlocked.Add(ref _transformCount, count);

    /// <summary>Increments the output counter by one.</summary>
    public void AddOutput() => Interlocked.Increment(ref _outputCount);

    /// <summary>Adds <paramref name="count"/> to the output counter.</summary>
    public void AddOutput(int count) => Interlocked.Add(ref _outputCount, count);

    /// <summary>
    /// Adds <paramref name="count"/> to the input count of <paramref name="table"/>,
    /// creating the entry with an output count of zero if it does not exist yet.
    /// </summary>
    /// <param name="table">Table name used as the progress key.</param>
    /// <param name="count">Amount to add to the table's input count.</param>
    public void AddTableInput(string table, int count)
    {
        // AddOrUpdate applies the change atomically. A separate TryAdd / read / write
        // sequence could drop concurrent increments or overwrite a concurrently updated
        // output value. The update delegate may be invoked more than once under
        // contention, so it must stay free of side effects.
        _tableProgress.AddOrUpdate(
            table,
            (input: count, output: 0),
            (_, current) => (current.input + count, current.output));
    }

    /// <summary>
    /// Adds <paramref name="count"/> to the output count of <paramref name="table"/>,
    /// creating the entry with an input count of zero if it does not exist yet.
    /// </summary>
    /// <param name="table">Table name used as the progress key.</param>
    /// <param name="count">Amount to add to the table's output count.</param>
    public void AddTableOutput(string table, int count)
    {
        _tableProgress.AddOrUpdate(
            table,
            (input: 0, output: count),
            (_, current) => (current.input, current.output + count));
    }
}