1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- // flow-executor.ts
- import { FlowWorkflow, FlowStatus } from './flow.workflow';
- import { FlowTask } from './flow.task';
- import { Subject } from 'rxjs';
/**
 * Runs the tasks of a {@link FlowWorkflow} strictly one after another and
 * publishes lifecycle events through RxJS subjects.
 *
 * A run stops at the first task that fails once automatic retries (if
 * enabled) are exhausted; the executor then stays positioned on that task so
 * the run can be resumed with {@link FlowExecutor.retryFromTask}.
 */
export class FlowExecutor {
  /** Workflow currently bound to this executor, if any. */
  public workflow?: FlowWorkflow;

  /** Index of the task that is running, or that will run next. */
  private currentTaskIndex = 0;
  private _status: FlowStatus = 'idle';
  /** Automatic retries already spent on the current task. */
  private retryCount = 0;

  // Event streams
  /** Emits a task right before it is executed (also on every retry attempt). */
  public taskStart$ = new Subject<FlowTask>();
  /** Emits a task after its `execute()` call has completed without throwing. */
  public taskSuccess$ = new Subject<FlowTask>();
  /** Emits on every failed attempt, including attempts that will be retried. */
  public taskFailure$ = new Subject<{ task: FlowTask; error: Error }>();
  /** Emits whenever a run switches to `'running'`, `'success'` or `'failed'`. */
  public statusChange$ = new Subject<FlowStatus>();
  /** Emits the completed fraction (see `currentProgress`) after each successful task. */
  public progressUpdate$ = new Subject<number>();

  /**
   * @param maxRetries Maximum number of automatic retries per task.
   * @param autoRetry  Whether a failed task is retried automatically.
   */
  constructor(
    public maxRetries = 3,
    public autoRetry = false
  ) {}

  /**
   * Binds a workflow and resets the executor state via `reset()`.
   *
   * @param workflow Workflow whose `taskList` will be executed.
   */
  setWorkflow(workflow: FlowWorkflow): void {
    this.workflow = workflow;
    this.reset();
  }

  /**
   * Runs the workflow from the current task index.
   *
   * The returned promise settles once the run ends in `'success'` or
   * `'failed'`; task failures are reported through `taskFailure$` and the
   * status, not through a rejection.
   *
   * @throws Error when no workflow has been set.
   */
  async start(): Promise<void> {
    if (!this.workflow) throw new Error('工作流未设置');
    this.setStatus('running');
    await this.executeNextTask();
  }

  /**
   * Re-runs the workflow starting at the given task.
   *
   * @param taskIndex Zero-based index into the workflow's `taskList`.
   * @throws Error when no workflow is set or `taskIndex` is not an integer
   *         inside the task list bounds.
   */
  async retryFromTask(taskIndex: number): Promise<void> {
    // Number.isInteger also rejects NaN and fractional values, which slip
    // through plain range comparisons and would index the list with garbage.
    const isValidIndex =
      this.workflow !== undefined &&
      Number.isInteger(taskIndex) &&
      taskIndex >= 0 &&
      taskIndex < this.workflow.taskList.length;
    if (!isValidIndex) {
      throw new Error('无效的任务索引');
    }
    this.currentTaskIndex = taskIndex;
    this.retryCount = 0;
    this.setStatus('running');
    await this.executeNextTask();
  }

  /** Index of the task the run failed on, or `null` unless the status is `'failed'`. */
  get failedTaskIndex(): number | null {
    return this._status === 'failed' ? this.currentTaskIndex : null;
  }

  /**
   * Drives the run: executes tasks sequentially from `currentTaskIndex` until
   * the list is exhausted (`'success'`) or a task fails with no retry left
   * (`'failed'`).
   *
   * Implemented as a loop rather than recursion so an exception escaping a
   * later step can never be caught by, and attributed to, an earlier task.
   */
  private async executeNextTask(): Promise<void> {
    while (true) {
      // Re-read on every iteration: `workflow` is public and may be replaced mid-run.
      const workflow = this.workflow;
      if (!workflow || this.currentTaskIndex >= workflow.taskList.length) {
        this.setStatus('success');
        return;
      }

      const task = workflow.taskList[this.currentTaskIndex];
      try {
        this.taskStart$.next(task);
        await task.execute();
        this.taskSuccess$.next(task);
      } catch (error) {
        this.taskFailure$.next({ task, error: FlowExecutor.toError(error) });
        if (this.autoRetry && this.retryCount < this.maxRetries) {
          this.retryCount++;
          continue; // run the same task again
        }
        this.setStatus('failed');
        return;
      }

      this.currentTaskIndex++;
      this.retryCount = 0;
      this.progressUpdate$.next(this.currentProgress);
    }
  }

  /** Current run status. */
  get status(): FlowStatus {
    return this._status;
  }

  /**
   * Fraction of tasks completed, in the range 0..1.
   * Returns 0 when no workflow is set or the task list is empty (avoids 0/0 = NaN).
   */
  get currentProgress(): number {
    const total = this.workflow?.taskList.length ?? 0;
    return total === 0 ? 0 : this.currentTaskIndex / total;
  }

  /**
   * Rewinds to the first task, clears the retry counter and sets the status
   * to `'idle'`. Note: this does not emit on `statusChange$`.
   */
  reset(): void {
    this.currentTaskIndex = 0;
    this.retryCount = 0;
    this._status = 'idle';
  }

  /** Stores the new status and publishes it on `statusChange$`. */
  private setStatus(next: FlowStatus): void {
    this._status = next;
    this.statusChange$.next(next);
  }

  /** Wraps non-Error throwables so `taskFailure$` always carries a real `Error`. */
  private static toError(value: unknown): Error {
    return value instanceof Error ? value : new Error(String(value));
  }
}
|