flow.executor.ts 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. // flow-executor.ts
  2. import { FlowWorkflow, FlowStatus } from './flow.workflow';
  3. import { FlowTask } from './flow.task';
  4. import { Subject } from 'rxjs';
  /**
   * Runs the tasks of a workflow's `taskList` strictly one after another and
   * reports what happens through RxJS subjects.
   *
   * Status moves between 'idle', 'running', 'success' and 'failed'. A task
   * counts as failed when its `execute()` throws or rejects. When `autoRetry`
   * is enabled, the same task is re-run up to `maxRetries` additional times
   * before the run is marked 'failed'.
   *
   * NOTE: nothing here guards against calling `start()` / `retryFromTask()`
   * while a previous run is still in progress; callers must serialize runs.
   */
  export class FlowExecutor {
    /** Workflow whose `taskList` is executed; undefined until `setWorkflow()` is called. */
    public workflow?: FlowWorkflow;
    /** Index of the task that runs next (or that failed, when status is 'failed'). */
    private currentTaskIndex = 0;
    private _status: FlowStatus = 'idle';
    /** Number of automatic retries already spent on the current task. */
    private retryCount = 0;

    // Event system
    /** Emits a task right before its `execute()` is called (also on every retry). */
    public readonly taskStart$ = new Subject<FlowTask>();
    /** Emits a task after its `execute()` completed without throwing. */
    public readonly taskSuccess$ = new Subject<FlowTask>();
    /** Emits the task and the error for every failed attempt, including retried ones. */
    public readonly taskFailure$ = new Subject<{ task: FlowTask; error: Error }>();
    /** Emits the new status on every status change. */
    public readonly statusChange$ = new Subject<FlowStatus>();
    /** Emits `currentProgress` after each successfully completed task. */
    public readonly progressUpdate$ = new Subject<number>();

    /**
     * @param maxRetries Maximum number of automatic retries per task.
     * @param autoRetry  Whether a failed task is retried automatically.
     */
    constructor(
      public maxRetries = 3,
      public autoRetry = false
    ) {}

    /** Installs the workflow to run and resets the executor to its initial state. */
    setWorkflow(workflow: FlowWorkflow): void {
      this.workflow = workflow;
      this.reset();
    }

    /**
     * Runs the workflow from the current task index until every task has
     * succeeded or one has failed for good.
     *
     * @throws Error when no workflow has been set.
     */
    async start(): Promise<void> {
      if (!this.workflow) throw new Error('工作流未设置');
      // A previous failed run may have used up the retry budget; each run
      // starts with a fresh one (retryFromTask already behaves this way).
      this.retryCount = 0;
      this.setStatus('running');
      await this.executeNextTask();
    }

    /**
     * Re-runs the workflow starting at the given task index.
     *
     * @param taskIndex Zero-based index into the workflow's `taskList`.
     * @throws Error when no workflow is set or the index is not an integer
     *         inside the task list.
     */
    async retryFromTask(taskIndex: number): Promise<void> {
      if (
        !this.workflow ||
        // NaN and fractional values pass plain range comparisons, so they
        // must be rejected explicitly.
        !Number.isInteger(taskIndex) ||
        taskIndex < 0 ||
        taskIndex >= this.workflow.taskList.length
      ) {
        throw new Error('无效的任务索引');
      }
      this.currentTaskIndex = taskIndex;
      this.retryCount = 0;
      this.setStatus('running');
      await this.executeNextTask();
    }

    /** Index of the task that failed, or null when the status is not 'failed'. */
    get failedTaskIndex(): number | null {
      return this._status === 'failed' ? this.currentTaskIndex : null;
    }

    /**
     * Executes tasks sequentially from `currentTaskIndex`. Ends with status
     * 'success' once the index is past the last task, or with 'failed' when a
     * task fails and no automatic retry is left.
     */
    private async executeNextTask(): Promise<void> {
      // `this.workflow` is re-read on every iteration, as is the list length.
      while (this.workflow && this.currentTaskIndex < this.workflow.taskList.length) {
        const task = this.workflow.taskList[this.currentTaskIndex];
        this.taskStart$.next(task);
        try {
          // Only the task itself is guarded: a failure is always attributed
          // to the task that actually threw.
          await task.execute();
        } catch (error) {
          this.taskFailure$.next({ task, error: FlowExecutor.toError(error) });
          if (this.autoRetry && this.retryCount < this.maxRetries) {
            this.retryCount++;
            continue; // run the same task again
          }
          this.setStatus('failed');
          return;
        }
        this.taskSuccess$.next(task);
        this.currentTaskIndex++;
        this.retryCount = 0;
        this.progressUpdate$.next(this.currentProgress);
      }
      this.setStatus('success');
    }

    /** Current status of the executor. */
    get status(): FlowStatus {
      return this._status;
    }

    /**
     * Fraction of tasks completed, between 0 and 1. Returns 0 when no workflow
     * is set. An empty task list reports 1 once the run has succeeded and 0
     * otherwise, instead of dividing by zero.
     */
    get currentProgress(): number {
      if (!this.workflow) return 0;
      const total = this.workflow.taskList.length;
      if (total === 0) return this._status === 'success' ? 1 : 0;
      return this.currentTaskIndex / total;
    }

    /**
     * Returns to the first task with a fresh retry budget and status 'idle'.
     * The status change is emitted only when the status was not already 'idle'.
     */
    reset(): void {
      this.currentTaskIndex = 0;
      this.retryCount = 0;
      if (this._status !== 'idle') this.setStatus('idle');
    }

    /** Stores the new status and notifies `statusChange$` subscribers. */
    private setStatus(next: FlowStatus): void {
      this._status = next;
      this.statusChange$.next(next);
    }

    /** Wraps anything that is not already an Error so subscribers always receive an Error. */
    private static toError(value: unknown): Error {
      return value instanceof Error ? value : new Error(String(value));
    }
  }