| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125 |
- // flow-executor.ts
- import { FlowWorkflow, FlowStatus } from './flow.workflow';
- import { FlowTask } from './flow.task';
- import { Subject } from 'rxjs';
- export class FlowExecutor {
- public workflow?: FlowWorkflow;
- private currentTaskIndex = 0;
- private _status: FlowStatus = 'idle';
- private retryCount = 0;
- // 共享数据
- public sharedData: Record<string, any> = {}; // 共享数据存储
- // 事件系统
- public taskStart$ = new Subject<FlowTask>();
- public taskSuccess$ = new Subject<FlowTask>();
- public taskFailure$ = new Subject<{ task: FlowTask; error: Error }>();
- public statusChange$ = new Subject<FlowStatus>();
- public progressUpdate$ = new Subject<number>();
- constructor(
- public maxRetries = 3,
- public autoRetry = false
- ) { }
- setWorkflow(workflow: FlowWorkflow) {
- this.workflow = workflow;
- // 初始化所有任务的共享数据
- this.workflow.taskList.forEach(task => {
- task.setSharedData(this.sharedData);
- });
- this.reset();
- }
- // 添加获取共享数据的方法
- getSharedData(): Record<string, any> {
- return this.sharedData;
- }
- // 添加更新共享数据的方法
- updateSharedData(data: Record<string, any>): void {
- this.sharedData = { ...this.sharedData, ...data };
- // 更新所有任务的引用(以防有新增任务)
- this.workflow?.taskList.forEach(task => {
- task.data = this.sharedData;
- });
- }
- async start() {
- if (!this.workflow) throw new Error('工作流未设置');
- this._status = 'running';
- this.statusChange$.next(this._status);
- await this.executeNextTask();
- }
- // 方法:从指定任务索引重新执行
- async retryFromTask(taskIndex: number) {
- if (!this.workflow || taskIndex < 0 || taskIndex >= this.workflow.taskList.length) {
- throw new Error('无效的任务索引');
- }
- this.currentTaskIndex = taskIndex;
- this.retryCount = 0;
- this._status = 'running';
- this.statusChange$.next(this._status);
- await this.executeNextTask();
- }
- // 获取当前失败的任务索引
- get failedTaskIndex(): number | null {
- return this._status === 'failed' ? this.currentTaskIndex : null;
- }
- private async executeNextTask() {
- if (!this.workflow || this.currentTaskIndex >= this.workflow.taskList.length) {
- this._status = 'success';
- this.statusChange$.next(this._status);
- return;
- }
- const task = this.workflow.taskList[this.currentTaskIndex];
- task.data = this.sharedData;
- try {
- this.taskStart$.next(task);
- await task.execute();
- console.log(task.title, task.data, this.sharedData)
- this.sharedData = task.data;
- // 只有当任务状态是success时才继续
- if (task.status === 'success') {
- this.taskSuccess$.next(task);
- this.currentTaskIndex++;
- this.retryCount = 0;
- await this.executeNextTask();
- }
- // 如果是idle状态(用户取消),不继续执行也不标记为失败
- } catch (error) {
- console.error('任务执行错误:', error);
- this.taskFailure$.next({ task, error: error as Error });
- if (this.autoRetry && this.retryCount < this.maxRetries) {
- this.retryCount++;
- await this.executeNextTask();
- } else {
- this._status = 'failed';
- this.statusChange$.next(this._status);
- }
- }
- }
- get status() {
- return this._status;
- }
- get currentProgress() {
- if (!this.workflow) return 0;
- return this.currentTaskIndex / this.workflow.taskList.length;
- }
- reset() {
- this.currentTaskIndex = 0;
- this.retryCount = 0;
- this._status = 'idle';
- }
- }
|