using ConsoleApp2.Const;
using ConsoleApp2.HostedServices.Abstractions;
using ConsoleApp2.Options;
using ConsoleApp2.Services;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;

namespace ConsoleApp2.HostedServices;

/// <summary>
/// Data transform service: processes records after they have been imported.
/// Records are taken from the producer queue, their fields are normalized,
/// the optional hooks configured on the options object are applied, and the
/// results are pushed onto the consumer queue.
/// </summary>
public class TransformService : ITransformService
{
    // NOTE(review): ILogger / IOptions appear here without generic type arguments —
    // confirm against the project (they may have been lost before this review).
    private readonly ILogger _logger;
    private readonly IOptions _options;
    private readonly DataRecordQueue _producerQueue;
    private readonly DataRecordQueue _consumerQueue;
    private readonly ProcessContext _context;
    private readonly IDatabase _db;

    /// <summary>
    /// Creates the service.
    /// </summary>
    /// <param name="logger">Logger used for start / cancel / completion messages.</param>
    /// <param name="options">Options exposing the column-type lookup and the optional record hooks.</param>
    /// <param name="producerQueue">Queue (keyed <see cref="ProcessStep.Producer"/>) that records are read from.</param>
    /// <param name="consumerQueue">Queue (keyed <see cref="ProcessStep.Consumer"/>) that transformed records are written to.</param>
    /// <param name="context">Shared process context (input-completed flag, exception list, transform counter).</param>
    /// <param name="db">Redis database handle passed through to the filter / cache / replace hooks.</param>
    public TransformService(ILogger logger,
        IOptions options,
        [FromKeyedServices(ProcessStep.Producer)]DataRecordQueue producerQueue,
        [FromKeyedServices(ProcessStep.Consumer)]DataRecordQueue consumerQueue,
        ProcessContext context,
        IDatabase db)
    {
        _logger = logger;
        _options = options;
        _producerQueue = producerQueue;
        _consumerQueue = consumerQueue;
        _context = context;
        _db = db;
    }

    /// <summary>
    /// Drains the producer queue until input is completed and the queue is empty,
    /// transforming each record and enqueueing it on the consumer queue.
    /// Returns early, without marking the transform as completed, as soon as the
    /// context reports any exception.
    /// </summary>
    /// <param name="cancellationToken">Accepted for interface compatibility; this method does not currently observe it.</param>
    public async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("***** Data transform service started, thread id: {ThreadId} *****", Environment.CurrentManagedThreadId);

        while (!_context.IsInputCompleted || _producerQueue.Count > 0)
        {
            if (_context.GetExceptions().Count > 0)
            {
                _logger.LogInformation("***** Csv transform service is canceled *****");
                return;
            }

            // Queue is momentarily empty: loop back and poll again
            // (this spins until a record arrives or the input completes).
            if (!_producerQueue.TryDequeue(out var record))
            {
                continue;
            }

            for (var i = 0; i < record.Fields.Length; i++)
            {
                var field = record[i];

                if (field == "\\N")
                {
                    // The literal \N marker is rewritten to NULL and skips the column-type handling.
                    field = "NULL";
                }
                else
                {
                    switch (_options.Value.GetColumnType(record.TableName, record.Headers[i]))
                    {
                        case ColumnType.Text:
                            // Empty text becomes an empty quoted string; otherwise the optional
                            // TransformBinary hook may rewrite it (a null result keeps the original).
                            field = string.IsNullOrEmpty(field)
                                ? "''"
                                : (_options.Value.TransformBinary?.Invoke(field) ?? field);
                            break;
                        case ColumnType.Blob:
                        default:
                            // Blob and all other column types are passed through unchanged.
                            break;
                    }
                }

                record[i] = field;
            }

            // Filter out unwanted records. The hook is optional: awaiting the result of
            // `RecordFilter?.Invoke(...)` directly would throw NullReferenceException when
            // no filter is configured, so the delegate is null-checked first.
            var recordFilter = _options.Value.RecordFilter;
            if (recordFilter is not null && await recordFilter.Invoke(record, _db) == false)
            {
                continue;
            }

            record.Database = _options.Value.DatabaseFilter?.Invoke(record);

            // Modify the record in place.
            _options.Value.RecordModify?.Invoke(record);

            // Cache the record.
            _options.Value.RecordCache?.Invoke(record, _db);

            // Replace the record. Same null-safety concern as the filter hook above.
            var recordReplace = _options.Value.RecordReplace;
            if (recordReplace is not null)
            {
                var replaceRecord = await recordReplace.Invoke(record, _db);
                if (replaceRecord != null)
                {
                    record = replaceRecord;
                }
            }

            _consumerQueue.Enqueue(record);
            _context.AddTransform();

            // Additional records derived from this one are enqueued and counted as well.
            var addRecords = _options.Value.RecordAdd?.Invoke(record);
            if (addRecords != null && addRecords.Count > 0)
            {
                foreach (var rc in addRecords)
                {
                    _consumerQueue.Enqueue(rc);
                    _context.AddTransform();
                }
            }
        }

        _context.CompleteTransform();
        _logger.LogInformation("***** Data transformation service completed *****");
    }
}