修改Redis用法,添加缓存键前缀;

This commit is contained in:
陈梓阳 2024-01-17 14:15:44 +08:00 committed by lindj
parent 08e0444055
commit 70981fb985
3 changed files with 34 additions and 42 deletions

View File

@ -2,6 +2,7 @@
using ConsoleApp2.HostedServices.Abstractions;
using ConsoleApp2.Options;
using ConsoleApp2.Services;
using Microsoft.Extensions.Caching.Distributed;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
@ -19,21 +20,22 @@ public class TransformService : ITransformService
private readonly DataRecordQueue _producerQueue;
private readonly DataRecordQueue _consumerQueue;
private readonly ProcessContext _context;
private readonly IDatabase _db;
private readonly IDistributedCache _cache;
public TransformService(ILogger<TransformService> logger,
IOptions<DataTransformOptions> options,
[FromKeyedServices(ProcessStep.Producer)]DataRecordQueue producerQueue,
[FromKeyedServices(ProcessStep.Consumer)]DataRecordQueue consumerQueue,
ProcessContext context, IDatabase db)
ProcessContext context,
IDistributedCache cache)
{
_logger = logger;
_options = options;
_producerQueue = producerQueue;
_consumerQueue = consumerQueue;
_context = context;
_db = db;
_cache = cache;
}
public async Task ExecuteAsync(CancellationToken cancellationToken)
@ -50,14 +52,14 @@ public class TransformService : ITransformService
if (!_producerQueue.TryDequeue(out var record)) continue;
//过滤不要的record
if ( await _options.Value.RecordFilter?.Invoke(record,_db) == false) continue;
if ( await _options.Value.RecordFilter?.Invoke(record, _cache) == false) continue;
record.Database = _options.Value.DatabaseFilter?.Invoke(record);
//修改record
_options.Value.RecordModify?.Invoke(record);
//缓存record
_options.Value.RecordCache?.Invoke(record, _db);
_options.Value.RecordCache?.Invoke(record, _cache);
//替换record
var replaceRecord =await _options.Value.RecordReplace?.Invoke(record, _db);
var replaceRecord =await _options.Value.RecordReplace?.Invoke(record, _cache);
if (replaceRecord != null)
{
record = replaceRecord;

View File

@ -1,4 +1,5 @@
using StackExchange.Redis;
using Microsoft.Extensions.Caching.Distributed;
using StackExchange.Redis;
namespace ConsoleApp2.Options;
@ -15,11 +16,11 @@ public class DataTransformOptions
public Func<string, string>? TransformBinary { get; set; }//Binary转字符串方法
public Func<DataRecord, IDatabase, Task<bool>>? RecordFilter { get; set; }//数据过滤方法
public Func<DataRecord, IDistributedCache, Task<bool>>? RecordFilter { get; set; }//数据过滤方法
public Action<DataRecord>? RecordModify { get; set; }//数据修改
public Func<DataRecord, IDatabase, Task<DataRecord?>>? RecordReplace { get; set; }//数据替换
public Func<DataRecord, IDistributedCache, Task<DataRecord?>>? RecordReplace { get; set; }//数据替换
public Func<DataRecord, IList<DataRecord>?>? RecordAdd { get; set; }//数据替换
public Action<DataRecord, IDatabase>? RecordCache { get; set; }//数据缓存
public Action<DataRecord, IDistributedCache>? RecordCache { get; set; }//数据缓存
/// <summary>
/// 配置导入数据的特殊列

View File

@ -1,11 +1,9 @@
using ConsoleApp2;
using ConsoleApp2.Const;
using ConsoleApp2.Helpers;
using ConsoleApp2.HostedServices;
using ConsoleApp2.HostedServices.Abstractions;
using ConsoleApp2.Options;
using ConsoleApp2.Services;
using ConsoleApp2.SimulationService;
using Microsoft.Extensions.Caching.StackExchangeRedis;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
@ -13,13 +11,9 @@ using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using MySqlConnector;
using Serilog;
using Serilog.Core;
using StackExchange.Redis;
using System.Reflection.PortableExecutable;
using Microsoft.Extensions.Caching.Distributed;
// 运行之前把Mysql max_allowed_packets 调大
// 运行之前把process_step表的外键删掉
await RunProgram();
return;
@ -133,7 +127,7 @@ async Task RunProgram()
options.TransformBinary = field => commandOptions != null && commandOptions.Isutf8mb4 ? $"_utf8mb4 0x{field}" : $"0x{field}";
//数据过滤
options.RecordFilter = async (record, db) =>
options.RecordFilter = async (record, cache) =>
{
//var index = Array.IndexOf(record.Headers, "ShardKey");
if (record.TryGetField("ShardKey", out var skStr))
@ -185,8 +179,8 @@ async Task RunProgram()
//如果缓存中不存在OrderProcessID,则丢弃
if(record.TryGetField("OrderProcessID",out var orderProcessID))
{
var value = await db.StringGetAsync(orderProcessID);
if (string.IsNullOrEmpty(value.ToString()))return false;
var value = await cache.GetStringAsync(orderProcessID);
if (string.IsNullOrEmpty(value))return false;
}
}
if (record.TableName == "order_block_plan_result" )
@ -194,8 +188,8 @@ async Task RunProgram()
//如果缓存中不存在ID,则丢弃(ID 对应order_block_plan中的ID)
if (record.TryGetField("ID", out var id))
{
var value = await db.StringGetAsync(id);
if (string.IsNullOrEmpty(value.ToString())) return false;
var value = await cache.GetStringAsync(id);
if (string.IsNullOrEmpty(value)) return false;
}
}
return true;
@ -227,7 +221,7 @@ async Task RunProgram()
};
//数据缓存
options.RecordCache = async (record, db) =>
options.RecordCache = async (record, cache) =>
{
if (record.TableName == "order")
{
@ -235,7 +229,7 @@ async Task RunProgram()
{
if (record.TryGetField("CompanyID", out var companyid))
{
await db.StringSetAsync(orderNo, companyid);
await cache.SetStringAsync(orderNo, companyid);
}
}
@ -249,14 +243,7 @@ async Task RunProgram()
if( record.TryGetField("ID", out var id))
{
try
{
await db.StringSetAsync(id, sk);
}
catch (Exception ex)
{
}
await cache.SetStringAsync(id, sk);
}
}
@ -266,12 +253,12 @@ async Task RunProgram()
if (record.TryGetField("CompanyID", out var companyid))
{
record.TryGetField("ID", out var id);
await db.StringSetAsync(id, companyid);
await cache.SetStringAsync(id, companyid);
}
}
};
//数据替换
options.RecordReplace = async (record, db) =>
options.RecordReplace = async (record, cache) =>
{
//删除数据源里simple_plan_order.ProcessState 字段和值
@ -306,7 +293,7 @@ async Task RunProgram()
var headers = new List<string>(record.Headers);
var fields =new List<string>(record.Fields);
headers.Add("CompanyID");
var companyidResult =await db.StringGetAsync(id);
var companyidResult =await cache.GetStringAsync(id);
_ = int.TryParse(companyidResult.ToString(), out var companyid);
fields.Add(companyid.ToString());
return new DataRecord(fields.ToArray(), record.TableName, headers.ToArray(), companyid);
@ -321,8 +308,8 @@ async Task RunProgram()
var headers = new List<string>(record.Headers);
var fields = new List<string>(record.Fields);
headers.Add("CompanyID");
var companyidResult = await db.StringGetAsync(orderNo);
_ = int.TryParse(companyidResult.ToString(), out var cpid);
var companyidResult = await cache.GetStringAsync(orderNo);
_ = int.TryParse(companyidResult, out var cpid);
fields.Add(cpid.ToString());
return new DataRecord(fields.ToArray(), record.TableName, headers.ToArray(), cpid);
}
@ -366,11 +353,11 @@ async Task RunProgram()
try
{
var shardKey =await db.StringGetAsync(processID);
var shardKey =await cache.GetStringAsync(processID);
var headers = new List<string>(record.Headers);
var fields = new List<string>(record.Fields);
headers.Add("ShardKey");
fields.Add(shardKey.ToString());
fields.Add(shardKey);
return new DataRecord(fields.ToArray(), record.TableName, headers.ToArray(), record.CompanyID);
}
catch (Exception ex)
@ -498,9 +485,11 @@ async Task RunProgram()
host.Services.AddSingleton<ITransformService, TransformService>();
host.Services.AddSingleton<IOutputService, OutputService>();
var redisOptions = host.Configuration.GetSection("RedisCacheOptions").Get<RedisCacheOptions>() ?? new RedisCacheOptions();
redisOptions.InstanceName = "mes-etl";
var redis = ConnectionMultiplexer.Connect(redisOptions.Configuration);
host.Services.AddSingleton(redis.GetDatabase());
host.Services.AddStackExchangeRedisCache(options =>
{
options.Configuration = redisOptions.Configuration;
options.InstanceName = "mes-etl_"; // 缓存键前缀mes-etl_
});
var app = host.Build();
await app.RunAsync();
}