// flow-executor.ts
import { FlowWorkflow, FlowStatus } from './flow.workflow';
import { FlowTask } from './flow.task';
import { Subject } from 'rxjs';

/**
 * Runs the tasks of a {@link FlowWorkflow} one after another, in list order.
 *
 * The executor hands a shared data object to each task before it runs, reads
 * the task's `data` back afterwards, and publishes lifecycle events through
 * RxJS subjects. A task that throws is optionally retried in place, up to
 * `maxRetries` times, when `autoRetry` is enabled.
 */
export class FlowExecutor {
  public workflow?: FlowWorkflow;

  /** Index into `workflow.taskList` of the task that runs next (or that failed). */
  private currentTaskIndex = 0;
  private _status: FlowStatus = 'idle';
  /** Number of automatic retries already spent on the current task. */
  private retryCount = 0;

  /**
   * Data shared between tasks. Values are task-defined, so the value type is
   * left open to keep existing callers that read nested fields compiling.
   */
  public sharedData: Record<string, any> = {};

  // Event streams.
  /** Emits a task right before its `execute()` is called. */
  public taskStart$ = new Subject<FlowTask>();
  /** Emits a task after it finished with status `'success'`. */
  public taskSuccess$ = new Subject<FlowTask>();
  /** Emits the task and the error whenever `execute()` throws (including attempts that will be retried). */
  public taskFailure$ = new Subject<{ task: FlowTask; error: Error }>();
  /** Emits the new executor status on every status transition. */
  public statusChange$ = new Subject<FlowStatus>();
  /**
   * Emits `currentProgress` after each successfully completed task.
   * NOTE(review): payload type assumed to be the completed fraction — confirm
   * against subscribers.
   */
  public progressUpdate$ = new Subject<number>();

  /**
   * @param maxRetries Maximum number of automatic retries per task.
   * @param autoRetry When true, a throwing task is re-run automatically until
   *   `maxRetries` is exhausted.
   */
  constructor(
    public maxRetries = 3,
    public autoRetry = false
  ) {}

  /**
   * Attaches a workflow, passes the current shared data object to every task
   * via `setSharedData`, and resets the executor.
   */
  setWorkflow(workflow: FlowWorkflow): void {
    this.workflow = workflow;
    this.workflow.taskList.forEach(task => {
      task.setSharedData(this.sharedData);
    });
    this.reset();
  }

  /** Returns the current shared data object (not a copy). */
  getSharedData(): Record<string, any> {
    return this.sharedData;
  }

  /**
   * Shallow-merges `data` into the shared data. The merge produces a new
   * object, so every task's `data` is re-pointed at it.
   */
  updateSharedData(data: Record<string, any>): void {
    this.sharedData = { ...this.sharedData, ...data };
    this.workflow?.taskList.forEach(task => {
      task.data = this.sharedData;
    });
  }

  /**
   * Marks the executor as running and executes tasks starting at the current
   * task index.
   *
   * @throws Error if no workflow has been set.
   */
  async start(): Promise<void> {
    if (!this.workflow) throw new Error('工作流未设置');
    this.setStatus('running');
    await this.executeNextTask();
  }

  /**
   * Resumes execution from the given task index with a fresh retry budget.
   *
   * @param taskIndex Zero-based index into the workflow's task list.
   * @throws Error if no workflow is set or the index is out of range.
   */
  async retryFromTask(taskIndex: number): Promise<void> {
    if (!this.workflow || taskIndex < 0 || taskIndex >= this.workflow.taskList.length) {
      throw new Error('无效的任务索引');
    }
    this.currentTaskIndex = taskIndex;
    this.retryCount = 0;
    this.setStatus('running');
    await this.executeNextTask();
  }

  /** Index of the task the executor stopped on, or `null` unless the status is `'failed'`. */
  get failedTaskIndex(): number | null {
    return this._status === 'failed' ? this.currentTaskIndex : null;
  }

  /**
   * Runs tasks sequentially from `currentTaskIndex` until the list is
   * exhausted, a task fails for good, or a task ends without success.
   *
   * Implemented as a loop so that the try/catch guarding one task never wraps
   * the execution of the tasks that follow it.
   */
  private async executeNextTask(): Promise<void> {
    while (this.workflow && this.currentTaskIndex < this.workflow.taskList.length) {
      const task = this.workflow.taskList[this.currentTaskIndex];
      task.data = this.sharedData;

      try {
        this.taskStart$.next(task);
        await task.execute();
      } catch (error: unknown) {
        console.error('任务执行错误:', error);
        // Tasks may throw non-Error values; wrap them so subscribers always
        // receive a real Error instance.
        const failure = error instanceof Error ? error : new Error(String(error));
        this.taskFailure$.next({ task, error: failure });

        if (this.autoRetry && this.retryCount < this.maxRetries) {
          this.retryCount++;
          continue; // re-run the same task
        }
        this.setStatus('failed');
        return;
      }

      // Pick up the task's data object in case the task replaced it.
      this.sharedData = task.data;

      // Only advance when the task reports success. Any other status (e.g.
      // 'idle' after a user cancel) stops the run without marking a failure.
      // NOTE(review): the executor status is left as 'running' in that case,
      // as in the previous implementation — confirm this is intended.
      if (task.status !== 'success') {
        return;
      }

      this.taskSuccess$.next(task);
      this.currentTaskIndex++;
      this.retryCount = 0;
      this.progressUpdate$.next(this.currentProgress);
    }

    // No workflow, or every task has completed.
    this.setStatus('success');
  }

  /** Current executor status. */
  get status(): FlowStatus {
    return this._status;
  }

  /**
   * Fraction of tasks completed, in the range 0..1. Returns 0 when no
   * workflow is set. For an empty task list it returns 1 once the run has
   * succeeded and 0 otherwise, instead of dividing by zero.
   */
  get currentProgress(): number {
    if (!this.workflow) return 0;
    const total = this.workflow.taskList.length;
    if (total === 0) {
      return this._status === 'success' ? 1 : 0;
    }
    return this.currentTaskIndex / total;
  }

  /**
   * Rewinds to the first task, clears the retry counter and returns the
   * status to `'idle'`, notifying status subscribers if the status changed.
   */
  reset(): void {
    this.currentTaskIndex = 0;
    this.retryCount = 0;
    if (this._status !== 'idle') {
      this.setStatus('idle');
    }
  }

  /** Stores the new status and publishes it on `statusChange$`. */
  private setStatus(status: FlowStatus): void {
    this._status = status;
    this.statusChange$.next(status);
  }
}