feat:处理器初步实现---有接上了新优化,回显需要再看下

This commit is contained in:
2025-07-09 16:36:26 +08:00
parent 92b49c7035
commit 595675a08a
42 changed files with 70546 additions and 1993 deletions

248
src/device.ts Normal file
View File

@@ -0,0 +1,248 @@
// 回调函数类型定义
type ProcessCallback<T, R> = (
input: T,
next: (input: T) => R | Promise<R>,
context?: any
) => R | Promise<R>;
// 模块配置类型
type ModuleConfig = Record<string, any>;
// 回调函数类型
type ModuleCallback<T, R> = (result: R, input?: T) => void | Promise<void>;
// 扩展后的功能模块接口
export interface ProcessorModule<T, R> {
// 主处理函数(可回调)
process?: ProcessCallback<T, R>;
// 直接是处理函数(无回调)
handle?: (input: T, next?: (input: T) => R | Promise<R>, context?: Record<string, any>) => R | Promise<R>;
// 模块名称(用于标识和排序)
moduleName?: string;
// 模块配置
config?: ModuleConfig;
getConfigList?: () => ModuleConfig
// 设置配置的方法
setConfig?: (config: ModuleConfig) => void;
// 前置回调(在模块处理前执行)
before?: ModuleCallback<T, R>;
// 后置回调(在模块处理后执行)
after?: ModuleCallback<T, R>;
// 错误处理回调
onError?: (error: unknown, input?: T) => void | Promise<void>;
}
export interface Processor<T, R> {
// 注册模块
use(module: ProcessorModule<T, R> | ProcessorModule<T, R>[]): this;
// 调整模块顺序
reorderModules(moduleNames: string[]): this;
// 执行处理流程
process(input: T): Promise<R>;
// 获取当前模块列表
getModules(): ProcessorModule<T, R>[];
// 新增方法:更新模块配置
updateModuleConfig(moduleName: string, config: ModuleConfig): this;
}
// 处理器集合接口
export interface ProcessorCollection<T, R> {
// 注册处理器
registerProcessor(name: string, processor: Processor<T, R>): this;
// 切换当前处理器
useProcessor(name: string): Processor<T, R>;
// 获取处理器
getProcessor(name: string): Processor<T, R> | undefined;
}
/** 流程管理器 处理器内组件的执行流程管理器
*
* 负责管理 组件执行顺序 和执行模块
*/
export class StepControllerProcessor<T, R> implements Processor<T, R> {
private modules: ProcessorModule<T, R>[] = [];
private modulesMap = new Map<string, ProcessorModule<T, R>>();
use(module: ProcessorModule<T, R> | ProcessorModule<T, R>[]): this {
const modules = Array.isArray(module) ? module : [module];
modules.forEach(m => {
if (m.moduleName) {
this.modulesMap.set(m.moduleName, m);
}
this.modules.push(m);
});
return this;
}
reorderModules(moduleNames: string[]): this {
const orderedModules = moduleNames
.map(name => this.modulesMap.get(name))
.filter(Boolean) as ProcessorModule<T, R>[];
const remainingModules = this.modules.filter(
m => !m.moduleName || !moduleNames.includes(m.moduleName)
);
this.modules = [...orderedModules, ...remainingModules];
return this;
}
updateModuleConfig(moduleName: string, config: ModuleConfig): this {
const module = this.modulesMap.get(moduleName);
if (module && module.setConfig) {
module.setConfig(config);
} else if (module) {
module.config = { ...module.config, ...config };
}
return this;
}
private async executeModule(
module: ProcessorModule<T, R>,
input: T,
next: (input: T) => Promise<R>,
context: Record<string, any>
): Promise<R> {
try {
// 执行前置回调
if (module.before) {
await module.before(input, input);
}
// 执行主处理逻辑(支持两种风格)
let result: R;
if (module.process) {
// 回调风格
const processResult = module.process(input, next, context);
result = processResult instanceof Promise ? await processResult : processResult;
} else if (module.handle) {
// 传统风格
const handleResult = module.handle(input, next, context);
result = handleResult instanceof Promise ? await handleResult : handleResult;
} else {
// 默认直接调用 next
result = await next(input);
}
// 执行后置回调
if (module.after) {
await module.after(result, input);
}
return result;
} catch (error) {
// 执行错误处理
if (module.onError) {
await module.onError(error, input);
// 即使出错也继续流程(除非抛出)
return await next(input);
}
throw error;
}
}
async process(input: T): Promise<R> {
if (this.modules.length === 0) {
throw new Error("No modules registered");
}
let currentIndex = 0;
const modules = this.modules;
const context: Record<string, any> = {};
const executeNext = async (currentInput: T): Promise<R> => {
const currentModule = modules[currentIndex++];
if (!currentModule) {
return currentInput as unknown as R;
}
// 创建 next 函数
const next = async (nextInput: T): Promise<R> => {
return executeNext(nextInput);
};
return this.executeModule(currentModule, currentInput, next, context);
};
return executeNext(input);
}
// async process(input: T): Promise<R> {
// if (this.modules.length === 0) {
// throw new Error("No modules registered");
// }
// let currentIndex = 0;
// const modules = this.modules;
// const context: Record<string, any> = {}; // 共享上下文
// const executeNext = async (currentInput: T): Promise<R> => {
// const currentModule = modules[currentIndex++];
// if (!currentModule) {
// return currentInput as unknown as R;
// }
// try {
// // 执行前置回调
// if (currentModule.before) {
// await currentModule.before(currentInput, currentInput);
// }
// // 执行主处理函数
// const next = async (nextInput: T): Promise<R> => {
// return executeNext(nextInput);
// };
// let result: R;
// const processResult = currentModule.process(currentInput, next, context);
// if (processResult instanceof Promise) {
// result = await processResult;
// } else {
// result = processResult;
// }
// // 执行后置回调
// if (currentModule.after) {
// await currentModule.after(result, currentInput);
// }
// return result;
// } catch (error) {
// // 执行错误处理
// if (currentModule.onError) {
// await currentModule.onError(error, currentInput);
// } else {
// throw error; // 如果没有错误处理,则向上抛出
// }
// // 根据错误处理结果决定是否继续
// return currentInput as unknown as R;
// }
// };
// return executeNext(input);
// }
getModules(): ProcessorModule<T, R>[] {
return [...this.modules];
}
}