stream.js 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. import { pickRunnableConfigKeys } from "../runnables/config.js";
  2. import { AsyncLocalStorageProviderSingleton } from "../singletons/index.js";
  3. import { raceWithSignal } from "./signal.js";
  4. /*
  5. * Support async iterator syntax for ReadableStreams in all environments.
  6. * Source: https://github.com/MattiasBuelens/web-streams-polyfill/pull/122#issuecomment-1627354490
  7. */
  /**
   * A `ReadableStream` that also implements the async iterator protocol, so it
   * can be consumed with `for await (const chunk of stream)` in environments
   * whose `ReadableStream` does not provide `Symbol.asyncIterator` itself.
   *
   * A reader is acquired lazily on first use and its lock is released as soon as
   * the stream finishes, errors, or iteration is abandoned.
   */
  export class IterableReadableStream extends ReadableStream {
    /**
     * Accepts exactly the arguments of the `ReadableStream` constructor.
     */
    constructor(...args) {
      super(...args);
      // Reader created on demand by `ensureReader()`.
      Object.defineProperty(this, "reader", {
        enumerable: true,
        configurable: true,
        writable: true,
        value: void 0,
      });
    }

    /**
     * Acquires a reader (locking the stream) unless one was already acquired.
     */
    ensureReader() {
      if (!this.reader) {
        this.reader = this.getReader();
      }
    }

    /**
     * Reads the next chunk.
     *
     * @returns {Promise<{done: boolean, value: *}>} Iterator result for the chunk,
     *   or `{ done: true, value: undefined }` once the stream is closed.
     * @throws Rethrows whatever `reader.read()` rejects with, after releasing
     *   the reader lock.
     */
    async next() {
      this.ensureReader();
      try {
        const result = await this.reader.read();
        if (result.done) {
          this.reader.releaseLock(); // release lock when stream becomes closed
          return {
            done: true,
            value: undefined,
          };
        }
        return {
          done: false,
          value: result.value,
        };
      } catch (e) {
        this.reader.releaseLock(); // release lock when stream becomes errored
        throw e;
      }
    }

    /**
     * Ends iteration early (e.g. `break` inside `for await`): cancels the stream
     * through the reader and releases the lock.
     *
     * @returns {Promise<{done: true, value: undefined}>}
     */
    async return() {
      this.ensureReader();
      // If wrapped in a Node stream, cancel is already called.
      if (this.locked) {
        const cancelPromise = this.reader.cancel(); // cancel first, but don't await yet
        this.reader.releaseLock(); // release lock first
        await cancelPromise; // now await it
      }
      return { done: true, value: undefined };
    }

    /**
     * Aborts iteration with an error: cancels the stream, releases the lock and
     * rethrows `e`.
     *
     * @param {*} e Error to rethrow once the stream has been cancelled.
     * @throws Always throws `e`.
     */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    async throw(e) {
      this.ensureReader();
      if (this.locked) {
        const cancelPromise = this.reader.cancel(); // cancel first, but don't await yet
        this.reader.releaseLock(); // release lock first
        await cancelPromise; // now await it
      }
      throw e;
    }

    /** The stream is its own async iterator. */
    [Symbol.asyncIterator]() {
      return this;
    }

    /** Explicit-resource-management hook; equivalent to calling `return()`. */
    // eslint-disable-next-line @typescript-eslint/ban-ts-comment
    // @ts-ignore Not present in Node 18 types, required in latest Node 22
    async [Symbol.asyncDispose]() {
      await this.return();
    }

    /**
     * Wraps an existing `ReadableStream` so it can be iterated.
     *
     * All chunks are pumped from `stream` into the new stream from `start`, so
     * the source is read eagerly rather than on demand.
     *
     * @param {ReadableStream} stream Source stream; it is locked by this call.
     * @returns {IterableReadableStream}
     */
    static fromReadableStream(stream) {
      // From https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_streams#reading_the_stream
      const reader = stream.getReader();
      return new IterableReadableStream({
        start(controller) {
          return pump();
          function pump() {
            return reader.read().then(({ done, value }) => {
              // When no more data needs to be consumed, close the stream
              if (done) {
                controller.close();
                return;
              }
              // Enqueue the next data chunk into our target stream
              controller.enqueue(value);
              return pump();
            });
          }
        },
        cancel() {
          // Only the lock is released; the source stream itself is not cancelled.
          reader.releaseLock();
        },
      });
    }

    /**
     * Exposes an async generator as an iterable stream. One value is pulled from
     * the generator each time the stream asks for more data.
     *
     * @param {AsyncGenerator} generator Source of chunks.
     * @returns {IterableReadableStream}
     */
    static fromAsyncGenerator(generator) {
      return new IterableReadableStream({
        async pull(controller) {
          const { value, done } = await generator.next();
          // When no more data needs to be consumed, close the stream
          if (done) {
            controller.close();
            // Stop here: enqueueing on a closed controller throws a TypeError.
            return;
          }
          // Enqueue unconditionally: an `else if (value)` guard would hang the
          // stream when a falsy value (e.g. an empty string) is pulled.
          controller.enqueue(value);
        },
        async cancel(reason) {
          await generator.return(reason);
        },
      });
    }
  }
  /**
   * Splits one async iterator into `length` independent async generators that
   * each yield every value of the source.
   *
   * Each consumer has its own queue. Whenever a consumer finds its queue empty
   * it pulls one result from the source and appends it to every queue, so
   * slower consumers see the same values later.
   *
   * @param {AsyncIterator} iter Source iterator (only `next()` is used).
   * @param {number} [length=2] Number of consumers to create.
   * @returns {AsyncGenerator[]} One async generator per consumer.
   */
  export function atee(iter, length = 2) {
    const queues = Array.from({ length }, () => []);

    async function* drain(own) {
      for (;;) {
        if (own.length === 0) {
          // Nothing buffered for this consumer: advance the shared source and
          // fan the result out to all consumers, including this one.
          const step = await iter.next();
          queues.forEach((queue) => {
            queue.push(step);
          });
          continue;
        }
        if (own[0].done) {
          // The terminal result is left in place, so the source is never pulled again.
          return;
        }
        yield own.shift().value;
      }
    }

    return queues.map((queue) => drain(queue));
  }
  /**
   * Combines two chunks of the same general kind.
   *
   * - two arrays: array concatenation
   * - two strings: string concatenation
   * - two numbers: addition
   * - `first` is an object (or function) with a `concat` method: `first.concat(second)`
   * - two non-null objects: shallow copy of `first` with each entry of `second`
   *   merged in. A key present in both is merged recursively unless the existing
   *   value is an array, in which case the value from `second` replaces it.
   *
   * @param {*} first Left-hand chunk.
   * @param {*} second Right-hand chunk.
   * @returns {*} The combined chunk.
   * @throws {Error} `Cannot concat <typeof first> and <typeof second>` when the
   *   pair cannot be combined, including primitives of different types and `null`.
   */
  export function concat(first, second) {
    if (Array.isArray(first) && Array.isArray(second)) {
      return first.concat(second);
    }
    if (typeof first === "string" && typeof second === "string") {
      return first + second;
    }
    if (typeof first === "number" && typeof second === "number") {
      return first + second;
    }
    // The `in` operator throws a TypeError on primitives and null, so only probe
    // for a `concat` method on values that can actually carry properties.
    const firstHoldsProps =
      first !== null &&
      (typeof first === "object" || typeof first === "function");
    if (
      firstHoldsProps &&
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      "concat" in first &&
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      typeof first.concat === "function"
    ) {
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      return first.concat(second);
    }
    // `typeof null === "object"`, so null is excluded explicitly on both sides.
    if (
      first !== null &&
      second !== null &&
      typeof first === "object" &&
      typeof second === "object"
    ) {
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      const chunk = { ...first };
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      for (const [key, value] of Object.entries(second)) {
        if (key in chunk && !Array.isArray(chunk[key])) {
          chunk[key] = concat(chunk[key], value);
        } else {
          chunk[key] = value;
        }
      }
      return chunk;
    }
    throw new Error(`Cannot concat ${typeof first} and ${typeof second}`);
  }
  /**
   * Wraps an async generator so that its first value is requested eagerly at
   * construction time, and exposes a `setup` promise that settles once that
   * first value is available.
   *
   * Every pull from the wrapped generator goes through
   * `AsyncLocalStorageProviderSingleton.runWithConfig` with the picked runnable
   * config keys (presumably so the generator body runs inside that config's
   * async-local-storage context; confirm against `../singletons/index.js`).
   */
  export class AsyncGeneratorWithSetup {
    /**
     * @param {object} params
     * @param {AsyncGenerator} params.generator Generator to wrap.
     * @param {Function} [params.startSetup] Called with the first iterator result;
     *   its (possibly async) return value becomes the value of `setup`.
     * @param {object} [params.config] Runnable config; also the fallback source
     *   of the abort signal.
     * @param {AbortSignal} [params.signal] Abort signal checked on every `next()`.
     */
    constructor(params) {
      const fieldDefaults = [
        ["generator", undefined],
        ["setup", undefined],
        ["config", undefined],
        ["signal", undefined],
        ["firstResult", undefined],
        ["firstResultUsed", false],
      ];
      for (const [field, initial] of fieldDefaults) {
        Object.defineProperty(this, field, {
          enumerable: true,
          configurable: true,
          writable: true,
          value: initial,
        });
      }
      this.generator = params.generator;
      this.config = params.config;
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      this.signal = params.signal ?? this.config?.signal;
      // `setup` settles only once the first iterator value is available. When
      // several generators are piped together this makes their setup happen in
      // logical order, i.e. in the order in which each one's input arrives.
      this.setup = new Promise((resolve, reject) => {
        const primeFirstResult = async () => {
          this.firstResult = params.generator.next();
          if (!params.startSetup) {
            this.firstResult.then((_result) => resolve(undefined), reject);
            return;
          }
          this.firstResult.then(params.startSetup).then(resolve, reject);
        };
        void AsyncLocalStorageProviderSingleton.runWithConfig(
          pickRunnableConfigKeys(params.config),
          primeFirstResult,
          true,
        );
      });
    }

    /**
     * Returns the next iterator result. The first call hands back the result
     * that was already requested in the constructor; later calls pull from the
     * generator, racing against the abort signal when one is set.
     *
     * @throws Whatever `signal.throwIfAborted()` throws if the signal is aborted.
     */
    async next(...args) {
      this.signal?.throwIfAborted();
      if (!this.firstResultUsed) {
        this.firstResultUsed = true;
        return this.firstResult;
      }
      const pickedConfig = pickRunnableConfigKeys(this.config);
      const advance = this.signal
        ? async () => raceWithSignal(this.generator.next(...args), this.signal)
        : async () => this.generator.next(...args);
      return AsyncLocalStorageProviderSingleton.runWithConfig(
        pickedConfig,
        advance,
        true,
      );
    }

    /** Forwards to the wrapped generator's `return`. */
    async return(value) {
      return this.generator.return(value);
    }

    /** Forwards to the wrapped generator's `throw`. */
    async throw(e) {
      return this.generator.throw(e);
    }

    /** The wrapper is its own async iterator. */
    [Symbol.asyncIterator]() {
      return this;
    }

    /** Explicit-resource-management hook; equivalent to calling `return()`. */
    // eslint-disable-next-line @typescript-eslint/ban-ts-comment
    // @ts-ignore Not present in Node 18 types, required in latest Node 22
    async [Symbol.asyncDispose]() {
      await this.return();
    }
  }
  /**
   * Wraps `generator` in an `AsyncGeneratorWithSetup`, waits for its setup to
   * finish, then hands the wrapped generator to `to`.
   *
   * @param {Function} to Called as `to(wrappedGenerator, setup, ...args)`.
   * @param {AsyncGenerator} generator Generator to wrap.
   * @param {Function} [startSetup] Forwarded to `AsyncGeneratorWithSetup`.
   * @param {AbortSignal} [signal] Forwarded to `AsyncGeneratorWithSetup`.
   * @param {...*} args Extra arguments passed through to `to`.
   * @returns {Promise<{output: *, setup: *}>} The return value of `to` and the
   *   resolved setup value.
   */
  export async function pipeGeneratorWithSetup(to, generator, startSetup, signal, ...args) {
    const wrapped = new AsyncGeneratorWithSetup({ generator, startSetup, signal });
    const setup = await wrapped.setup;
    const output = to(wrapped, setup, ...args);
    return { output, setup };
  }