observableToAsyncIterable.js 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  /**
   * Adapts an observable to an async iterator/iterable.
   *
   * The observable is subscribed to immediately. Events that arrive while no
   * consumer is waiting are buffered and handed out, in order, by later
   * `next()` calls; `next()` calls made while the buffer is empty stay pending
   * until the next event arrives.
   *
   * - A `next(value)` notification is delivered as `{ value, done: false }`.
   * - An `error(err)` notification is delivered as a regular value of the form
   *   `{ value: { errors: [err] }, done: false }`; it does not reject and does
   *   not by itself end the iteration.
   * - A `complete()` notification ends the iteration: buffered events are still
   *   delivered first, after which every `next()` call (including calls that
   *   were already pending) resolves with `{ value: undefined, done: true }`.
   * - `return()` and `throw()` unsubscribe from the observable, drop buffered
   *   events and resolve all pending `next()` calls as done.
   *
   * @param {object} observable - Object exposing `subscribe(observer)`, where
   *   `observer` has `next`, `error` and `complete` callbacks and the returned
   *   subscription exposes `unsubscribe()`.
   * @returns {object} An object implementing `next`, `return`, `throw` and
   *   `[Symbol.asyncIterator]`.
   */
  export function observableToAsyncIterable(observable) {
    // Resolvers of next() calls that are waiting for an event.
    const pullQueue = [];
    // Events that arrived while no next() call was waiting.
    const pushQueue = [];
    // False once return()/throw() has torn the subscription down.
    let listening = true;
    // True once the completion marker has been handed to a consumer.
    let completed = false;

    const doneResult = () => ({ value: undefined, done: true });

    // Hands an event to the oldest waiting consumer, or buffers it.
    const deliver = (result) => {
      if (!listening) {
        // Nobody will ever read the buffer again; do not let it grow.
        return;
      }
      if (pullQueue.length !== 0) {
        pullQueue.shift()(result);
      } else {
        pushQueue.push(result);
      }
    };

    // Resolves every waiting next() call as done so none is left hanging.
    const resolvePendingAsDone = () => {
      for (const resolve of pullQueue) {
        resolve(doneResult());
      }
      pullQueue.length = 0;
    };

    const pushValue = (value) => {
      deliver({ value, done: false });
    };

    const pushError = (error) => {
      deliver({ value: { errors: [error] }, done: false });
    };

    const pushDone = () => {
      if (!listening) {
        return;
      }
      if (pullQueue.length !== 0) {
        // Consumers are waiting, which implies the buffer is empty: nothing
        // else will arrive for them, so finish all of them right away.
        completed = true;
        resolvePendingAsDone();
      } else {
        // Keep the marker behind the buffered events so they are seen first.
        pushQueue.push(doneResult());
      }
    };

    const pullValue = () =>
      new Promise((resolve) => {
        if (pushQueue.length !== 0) {
          // either {value: {errors: [...]}}, {value: ...} or the done marker
          const element = pushQueue.shift();
          if (element.done) {
            completed = true;
          }
          resolve(element);
        } else if (completed) {
          // The source has finished; waiting would never resolve.
          resolve(doneResult());
        } else {
          pullQueue.push(resolve);
        }
      });

    const subscription = observable.subscribe({
      next(value) {
        pushValue(value);
      },
      error(err) {
        pushError(err);
      },
      complete() {
        pushDone();
      },
    });

    // Stops listening exactly once: unsubscribes, releases waiting consumers
    // and discards buffered events.
    const emptyQueue = () => {
      if (!listening) {
        return;
      }
      listening = false;
      subscription.unsubscribe();
      resolvePendingAsDone();
      pushQueue.length = 0;
    };

    return {
      next() {
        // Deliberately independent of `this` so a detached `next` still works.
        return listening ? pullValue() : Promise.resolve(doneResult());
      },
      return() {
        emptyQueue();
        return Promise.resolve(doneResult());
      },
      throw(error) {
        emptyQueue();
        return Promise.reject(error);
      },
      [Symbol.asyncIterator]() {
        return this;
      },
    };
  }