observableToAsyncIterable.js 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.observableToAsyncIterable = void 0;
  /**
   * Adapts a push-based observable into a pull-based async iterator.
   *
   * Subscribes to `observable` immediately. Notifications that arrive before
   * the consumer asks for them are buffered; `next()` calls that arrive before
   * a notification are parked until one shows up.
   *
   * Contract of the returned iterator:
   * - every `next(value)` notification is yielded as `{ value, done: false }`;
   * - an `error(err)` notification is yielded as a regular value of the shape
   *   `{ errors: [err] }` (the promise is NOT rejected), after which the
   *   iterator is finished;
   * - after `complete` or `error`, buffered results are still drained in
   *   order, and every further (or still pending) `next()` settles with
   *   `{ value: undefined, done: true }` instead of waiting forever;
   * - `return()` / `throw(error)` unsubscribe from the observable, settle all
   *   pending `next()` calls as done and discard buffered results.
   *
   * @param {{ subscribe: Function }} observable - Object whose `subscribe`
   *   accepts an observer with `next`, `error` and `complete` callbacks and
   *   returns a subscription exposing `unsubscribe()`.
   * @returns {AsyncIterableIterator<*>} Async iterator over the notifications.
   */
  function observableToAsyncIterable(observable) {
    // Resolvers of `next()` promises that are waiting for a notification.
    const pullQueue = [];
    // Iterator results that arrived before anybody asked for them.
    const pushQueue = [];
    // True until the consumer closes the iterator via return()/throw().
    let listening = true;
    // True once the observable has signalled `complete` or `error`.
    let sourceEnded = false;

    // A fresh object per call so consumers can never share or mutate a result.
    const doneResult = () => ({ value: undefined, done: true });

    // Notifications are only meaningful while the consumer is still listening
    // and the source has not terminated; anything later is dropped so that
    // `pushQueue` cannot grow after nobody will ever read it again.
    const isAccepting = () => listening && !sourceEnded;

    // Hands a result to the oldest waiting consumer, or buffers it.
    const enqueue = (result) => {
      if (pullQueue.length !== 0) {
        pullQueue.shift()(result);
      } else {
        pushQueue.push(result);
      }
    };

    // Marks the source as terminated. Parked pulls imply an empty buffer and
    // nothing else will ever arrive, so all of them must be released now.
    const endSource = () => {
      sourceEnded = true;
      for (const resolve of pullQueue.splice(0)) {
        resolve(doneResult());
      }
    };

    const pullValue = () =>
      new Promise((resolve) => {
        if (pushQueue.length !== 0) {
          // Either { value: { errors: [...] } } or { value: ... }.
          resolve(pushQueue.shift());
        } else if (sourceEnded) {
          // Buffer drained and the source is finished: stay done forever.
          resolve(doneResult());
        } else {
          pullQueue.push(resolve);
        }
      });

    const subscription = observable.subscribe({
      next(value) {
        if (isAccepting()) {
          enqueue({ value, done: false });
        }
      },
      error(err) {
        if (isAccepting()) {
          // Errors travel as a payload, not as a rejection.
          enqueue({ value: { errors: [err] }, done: false });
          // An observable sends nothing after `error`, so the stream is over.
          endSource();
        }
      },
      complete() {
        if (isAccepting()) {
          endSource();
        }
      },
    });

    // Closes the iterator from the consumer side (idempotent).
    const emptyQueue = () => {
      if (listening) {
        listening = false;
        subscription.unsubscribe();
        for (const resolve of pullQueue) {
          resolve(doneResult());
        }
        pullQueue.length = 0;
        pushQueue.length = 0;
      }
    };

    return {
      next() {
        // No reliance on `this`, so a detached `next` keeps working.
        return listening ? pullValue() : Promise.resolve(doneResult());
      },
      return() {
        emptyQueue();
        return Promise.resolve(doneResult());
      },
      throw(error) {
        emptyQueue();
        return Promise.reject(error);
      },
      [Symbol.asyncIterator]() {
        return this;
      },
    };
  }
  85. exports.observableToAsyncIterable = observableToAsyncIterable;