flow.executor.ts 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. // flow.executor.ts
  2. import { FlowWorkflow, FlowStatus } from './flow.workflow';
  3. import { FlowTask } from './flow.task';
  4. import { Subject } from 'rxjs';
  /**
   * Runs the tasks of a {@link FlowWorkflow} strictly one after another and
   * reports what happens through RxJS subjects.
   *
   * All tasks share one data object (`sharedData`): it is handed to a task
   * before the task executes and read back from the task afterwards.
   *
   * Status transitions announced on `statusChange$`:
   * `running` (start / retryFromTask) -> `success` (all tasks done) or
   * `failed` (a task threw and no retry is left); `idle` after `reset()`.
   */
  export class FlowExecutor {
    /** Workflow whose `taskList` is executed; unset until `setWorkflow` is called. */
    public workflow?: FlowWorkflow;
    /** Index into `workflow.taskList` of the task that runs next (or that failed). */
    private currentTaskIndex = 0;
    private _status: FlowStatus = 'idle';
    /** Automatic retries already spent on the current task. */
    private retryCount = 0;

    /** Data object shared by every task of the workflow. */
    public sharedData: Record<string, any> = {};

    // Event streams.
    /** Emits a task right before it is executed (also on every retry). */
    public taskStart$ = new Subject<FlowTask>();
    /** Emits a task after it executed and reported status `'success'`. */
    public taskSuccess$ = new Subject<FlowTask>();
    /** Emits every time a task's execution throws, including attempts that will be retried. */
    public taskFailure$ = new Subject<{ task: FlowTask; error: Error }>();
    /** Emits the executor status whenever it is set. */
    public statusChange$ = new Subject<FlowStatus>();
    /** Emits `currentProgress` (0..1) whenever the current task index moves. */
    public progressUpdate$ = new Subject<number>();

    /**
     * @param maxRetries Maximum automatic retries per task (only used when `autoRetry` is true).
     * @param autoRetry  When true, a task that throws is re-executed up to `maxRetries` times.
     */
    constructor(
      public maxRetries = 3,
      public autoRetry = false
    ) { }

    /**
     * Attaches a workflow, hands the shared data object to each of its tasks
     * and resets the executor to the first task.
     */
    setWorkflow(workflow: FlowWorkflow): void {
      this.workflow = workflow;
      for (const task of workflow.taskList) {
        task.setSharedData(this.sharedData);
      }
      this.reset();
    }

    /** Returns the current shared data object (not a copy). */
    getSharedData(): Record<string, any> {
      return this.sharedData;
    }

    /**
     * Shallow-merges `data` into the shared data. A new object is created, so
     * every task of the current workflow is re-pointed at it.
     */
    updateSharedData(data: Record<string, any>): void {
      this.sharedData = { ...this.sharedData, ...data };
      // Re-assign on every task so tasks added after setWorkflow are covered too.
      this.workflow?.taskList.forEach(task => {
        task.data = this.sharedData;
      });
    }

    /**
     * Starts (or continues) execution from the current task index.
     *
     * @throws Error when no workflow has been set.
     */
    async start(): Promise<void> {
      if (!this.workflow) throw new Error('工作流未设置');
      this.setStatus('running');
      await this.executeNextTask();
    }

    /**
     * Re-runs the workflow starting at `taskIndex`, with a fresh retry budget.
     *
     * @throws Error when no workflow is set or `taskIndex` is out of range.
     */
    async retryFromTask(taskIndex: number): Promise<void> {
      if (!this.workflow || taskIndex < 0 || taskIndex >= this.workflow.taskList.length) {
        throw new Error('无效的任务索引');
      }
      this.currentTaskIndex = taskIndex;
      this.retryCount = 0;
      this.progressUpdate$.next(this.currentProgress);
      this.setStatus('running');
      await this.executeNextTask();
    }

    /** Index of the task that failed, or `null` when the executor is not in the `failed` state. */
    get failedTaskIndex(): number | null {
      return this._status === 'failed' ? this.currentTaskIndex : null;
    }

    /**
     * Executes tasks from `currentTaskIndex` onwards until the list is
     * exhausted, a task fails for good, or a task stops without succeeding.
     *
     * Implemented as a loop (rather than one recursive call per task/retry) so
     * long workflows do not build a chain of nested pending promises.
     */
    private async executeNextTask(): Promise<void> {
      for (;;) {
        // Re-read the workflow every round so a workflow swapped in mid-run is honoured.
        const workflow = this.workflow;
        if (!workflow || this.currentTaskIndex >= workflow.taskList.length) {
          this.setStatus('success');
          return;
        }

        const task = workflow.taskList[this.currentTaskIndex];
        task.data = this.sharedData;

        try {
          this.taskStart$.next(task);
          await task.execute();
          // The task may have replaced its data object; adopt whatever it holds now.
          this.sharedData = task.data;

          // Only advance when the task reports success. Any other status
          // (e.g. the task went back to idle because the user cancelled) stops
          // the run without marking the executor as failed.
          // NOTE(review): the executor status stays 'running' in that case — confirm this is intended.
          if (task.status !== 'success') {
            return;
          }

          this.taskSuccess$.next(task);
          this.currentTaskIndex++;
          this.retryCount = 0;
          this.progressUpdate$.next(this.currentProgress);
        } catch (error) {
          console.error('任务执行错误:', error);
          // A task may throw a non-Error value; normalise so subscribers always get a real Error.
          const failure = error instanceof Error ? error : new Error(String(error));
          this.taskFailure$.next({ task, error: failure });

          if (this.autoRetry && this.retryCount < this.maxRetries) {
            this.retryCount++;
            continue; // run the same task again
          }
          this.setStatus('failed');
          return;
        }
      }
    }

    /** Current executor status. */
    get status(): FlowStatus {
      return this._status;
    }

    /**
     * Fraction of tasks completed, in the range 0..1.
     * Returns 0 when no workflow is set or the workflow has no tasks
     * (avoids the NaN that 0 / 0 would produce).
     */
    get currentProgress(): number {
      const total = this.workflow?.taskList.length ?? 0;
      return total === 0 ? 0 : this.currentTaskIndex / total;
    }

    /**
     * Rewinds to the first task, clears the retry counter and returns to `idle`.
     * Subscribers are notified only when the status or the position actually changed.
     */
    reset(): void {
      const statusChanged = this._status !== 'idle';
      const positionChanged = this.currentTaskIndex !== 0;

      this.currentTaskIndex = 0;
      this.retryCount = 0;
      this._status = 'idle';

      if (statusChanged) this.statusChange$.next(this._status);
      if (positionChanged) this.progressUpdate$.next(this.currentProgress);
    }

    /** Stores the new status and announces it on `statusChange$`. */
    private setStatus(status: FlowStatus): void {
      this._status = status;
      this.statusChange$.next(status);
    }
  }