// flow-executor.ts import { FlowWorkflow, FlowStatus } from './flow.workflow'; import { FlowTask } from './flow.task'; import { Subject } from 'rxjs'; export class FlowExecutor { public workflow?: FlowWorkflow; private currentTaskIndex = 0; private _status: FlowStatus = 'idle'; private retryCount = 0; // 事件系统 public taskStart$ = new Subject(); public taskSuccess$ = new Subject(); public taskFailure$ = new Subject<{ task: FlowTask; error: Error }>(); public statusChange$ = new Subject(); public progressUpdate$ = new Subject(); constructor( public maxRetries = 3, public autoRetry = false ) {} setWorkflow(workflow: FlowWorkflow) { this.workflow = workflow; this.reset(); } async start() { if (!this.workflow) throw new Error('工作流未设置'); this._status = 'running'; this.statusChange$.next(this._status); await this.executeNextTask(); } // 方法:从指定任务索引重新执行 async retryFromTask(taskIndex: number) { if (!this.workflow || taskIndex < 0 || taskIndex >= this.workflow.taskList.length) { throw new Error('无效的任务索引'); } this.currentTaskIndex = taskIndex; this.retryCount = 0; this._status = 'running'; this.statusChange$.next(this._status); await this.executeNextTask(); } // 获取当前失败的任务索引 get failedTaskIndex(): number | null { return this._status === 'failed' ? this.currentTaskIndex : null; } private async executeNextTask() { if (!this.workflow || this.currentTaskIndex >= this.workflow.taskList.length) { this._status = 'success'; this.statusChange$.next(this._status); return; } const task = this.workflow.taskList[this.currentTaskIndex]; try { this.taskStart$.next(task); await task.execute(); this.taskSuccess$.next(task); this.currentTaskIndex++; this.retryCount = 0; await this.executeNextTask(); } catch (error) { this.taskFailure$.next({ task, error: error as Error }); if (this.autoRetry && this.retryCount < this.maxRetries) { this.retryCount++; await this.executeNextTask(); } else { this._status = 'failed'; this.statusChange$.next(this._status); } } } get status() { return this._status; } get currentProgress() { if (!this.workflow) return 0; return this.currentTaskIndex / this.workflow.taskList.length; } reset() { this.currentTaskIndex = 0; this.retryCount = 0; this._status = 'idle'; } }