readable.js 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385
  1. // Ported from https://github.com/nodejs/undici/pull/907
  2. 'use strict'
  3. const assert = require('node:assert')
  4. const { Readable } = require('node:stream')
  5. const { RequestAbortedError, NotSupportedError, InvalidArgumentError, AbortError } = require('../core/errors')
  6. const util = require('../core/util')
  7. const { ReadableStreamFrom } = require('../core/util')
  8. const kConsume = Symbol('kConsume')
  9. const kReading = Symbol('kReading')
  10. const kBody = Symbol('kBody')
  11. const kAbort = Symbol('kAbort')
  12. const kContentType = Symbol('kContentType')
  13. const kContentLength = Symbol('kContentLength')
  14. const noop = () => {}
  15. class BodyReadable extends Readable {
  16. constructor ({
  17. resume,
  18. abort,
  19. contentType = '',
  20. contentLength,
  21. highWaterMark = 64 * 1024 // Same as nodejs fs streams.
  22. }) {
  23. super({
  24. autoDestroy: true,
  25. read: resume,
  26. highWaterMark
  27. })
  28. this._readableState.dataEmitted = false
  29. this[kAbort] = abort
  30. this[kConsume] = null
  31. this[kBody] = null
  32. this[kContentType] = contentType
  33. this[kContentLength] = contentLength
  34. // Is stream being consumed through Readable API?
  35. // This is an optimization so that we avoid checking
  36. // for 'data' and 'readable' listeners in the hot path
  37. // inside push().
  38. this[kReading] = false
  39. }
  40. destroy (err) {
  41. if (!err && !this._readableState.endEmitted) {
  42. err = new RequestAbortedError()
  43. }
  44. if (err) {
  45. this[kAbort]()
  46. }
  47. return super.destroy(err)
  48. }
  49. _destroy (err, callback) {
  50. // Workaround for Node "bug". If the stream is destroyed in same
  51. // tick as it is created, then a user who is waiting for a
  52. // promise (i.e micro tick) for installing a 'error' listener will
  53. // never get a chance and will always encounter an unhandled exception.
  54. if (!this[kReading]) {
  55. setImmediate(() => {
  56. callback(err)
  57. })
  58. } else {
  59. callback(err)
  60. }
  61. }
  62. on (ev, ...args) {
  63. if (ev === 'data' || ev === 'readable') {
  64. this[kReading] = true
  65. }
  66. return super.on(ev, ...args)
  67. }
  68. addListener (ev, ...args) {
  69. return this.on(ev, ...args)
  70. }
  71. off (ev, ...args) {
  72. const ret = super.off(ev, ...args)
  73. if (ev === 'data' || ev === 'readable') {
  74. this[kReading] = (
  75. this.listenerCount('data') > 0 ||
  76. this.listenerCount('readable') > 0
  77. )
  78. }
  79. return ret
  80. }
  81. removeListener (ev, ...args) {
  82. return this.off(ev, ...args)
  83. }
  84. push (chunk) {
  85. if (this[kConsume] && chunk !== null) {
  86. consumePush(this[kConsume], chunk)
  87. return this[kReading] ? super.push(chunk) : true
  88. }
  89. return super.push(chunk)
  90. }
  91. // https://fetch.spec.whatwg.org/#dom-body-text
  92. async text () {
  93. return consume(this, 'text')
  94. }
  95. // https://fetch.spec.whatwg.org/#dom-body-json
  96. async json () {
  97. return consume(this, 'json')
  98. }
  99. // https://fetch.spec.whatwg.org/#dom-body-blob
  100. async blob () {
  101. return consume(this, 'blob')
  102. }
  103. // https://fetch.spec.whatwg.org/#dom-body-bytes
  104. async bytes () {
  105. return consume(this, 'bytes')
  106. }
  107. // https://fetch.spec.whatwg.org/#dom-body-arraybuffer
  108. async arrayBuffer () {
  109. return consume(this, 'arrayBuffer')
  110. }
  111. // https://fetch.spec.whatwg.org/#dom-body-formdata
  112. async formData () {
  113. // TODO: Implement.
  114. throw new NotSupportedError()
  115. }
  116. // https://fetch.spec.whatwg.org/#dom-body-bodyused
  117. get bodyUsed () {
  118. return util.isDisturbed(this)
  119. }
  120. // https://fetch.spec.whatwg.org/#dom-body-body
  121. get body () {
  122. if (!this[kBody]) {
  123. this[kBody] = ReadableStreamFrom(this)
  124. if (this[kConsume]) {
  125. // TODO: Is this the best way to force a lock?
  126. this[kBody].getReader() // Ensure stream is locked.
  127. assert(this[kBody].locked)
  128. }
  129. }
  130. return this[kBody]
  131. }
  132. async dump (opts) {
  133. let limit = Number.isFinite(opts?.limit) ? opts.limit : 128 * 1024
  134. const signal = opts?.signal
  135. if (signal != null && (typeof signal !== 'object' || !('aborted' in signal))) {
  136. throw new InvalidArgumentError('signal must be an AbortSignal')
  137. }
  138. signal?.throwIfAborted()
  139. if (this._readableState.closeEmitted) {
  140. return null
  141. }
  142. return await new Promise((resolve, reject) => {
  143. if (this[kContentLength] > limit) {
  144. this.destroy(new AbortError())
  145. }
  146. const onAbort = () => {
  147. this.destroy(signal.reason ?? new AbortError())
  148. }
  149. signal?.addEventListener('abort', onAbort)
  150. this
  151. .on('close', function () {
  152. signal?.removeEventListener('abort', onAbort)
  153. if (signal?.aborted) {
  154. reject(signal.reason ?? new AbortError())
  155. } else {
  156. resolve(null)
  157. }
  158. })
  159. .on('error', noop)
  160. .on('data', function (chunk) {
  161. limit -= chunk.length
  162. if (limit <= 0) {
  163. this.destroy()
  164. }
  165. })
  166. .resume()
  167. })
  168. }
  169. }
  170. // https://streams.spec.whatwg.org/#readablestream-locked
  171. function isLocked (self) {
  172. // Consume is an implicit lock.
  173. return (self[kBody] && self[kBody].locked === true) || self[kConsume]
  174. }
  175. // https://fetch.spec.whatwg.org/#body-unusable
  176. function isUnusable (self) {
  177. return util.isDisturbed(self) || isLocked(self)
  178. }
  179. async function consume (stream, type) {
  180. assert(!stream[kConsume])
  181. return new Promise((resolve, reject) => {
  182. if (isUnusable(stream)) {
  183. const rState = stream._readableState
  184. if (rState.destroyed && rState.closeEmitted === false) {
  185. stream
  186. .on('error', err => {
  187. reject(err)
  188. })
  189. .on('close', () => {
  190. reject(new TypeError('unusable'))
  191. })
  192. } else {
  193. reject(rState.errored ?? new TypeError('unusable'))
  194. }
  195. } else {
  196. queueMicrotask(() => {
  197. stream[kConsume] = {
  198. type,
  199. stream,
  200. resolve,
  201. reject,
  202. length: 0,
  203. body: []
  204. }
  205. stream
  206. .on('error', function (err) {
  207. consumeFinish(this[kConsume], err)
  208. })
  209. .on('close', function () {
  210. if (this[kConsume].body !== null) {
  211. consumeFinish(this[kConsume], new RequestAbortedError())
  212. }
  213. })
  214. consumeStart(stream[kConsume])
  215. })
  216. }
  217. })
  218. }
  219. function consumeStart (consume) {
  220. if (consume.body === null) {
  221. return
  222. }
  223. const { _readableState: state } = consume.stream
  224. if (state.bufferIndex) {
  225. const start = state.bufferIndex
  226. const end = state.buffer.length
  227. for (let n = start; n < end; n++) {
  228. consumePush(consume, state.buffer[n])
  229. }
  230. } else {
  231. for (const chunk of state.buffer) {
  232. consumePush(consume, chunk)
  233. }
  234. }
  235. if (state.endEmitted) {
  236. consumeEnd(this[kConsume])
  237. } else {
  238. consume.stream.on('end', function () {
  239. consumeEnd(this[kConsume])
  240. })
  241. }
  242. consume.stream.resume()
  243. while (consume.stream.read() != null) {
  244. // Loop
  245. }
  246. }
  247. /**
  248. * @param {Buffer[]} chunks
  249. * @param {number} length
  250. */
  251. function chunksDecode (chunks, length) {
  252. if (chunks.length === 0 || length === 0) {
  253. return ''
  254. }
  255. const buffer = chunks.length === 1 ? chunks[0] : Buffer.concat(chunks, length)
  256. const bufferLength = buffer.length
  257. // Skip BOM.
  258. const start =
  259. bufferLength > 2 &&
  260. buffer[0] === 0xef &&
  261. buffer[1] === 0xbb &&
  262. buffer[2] === 0xbf
  263. ? 3
  264. : 0
  265. return buffer.utf8Slice(start, bufferLength)
  266. }
  267. /**
  268. * @param {Buffer[]} chunks
  269. * @param {number} length
  270. * @returns {Uint8Array}
  271. */
  272. function chunksConcat (chunks, length) {
  273. if (chunks.length === 0 || length === 0) {
  274. return new Uint8Array(0)
  275. }
  276. if (chunks.length === 1) {
  277. // fast-path
  278. return new Uint8Array(chunks[0])
  279. }
  280. const buffer = new Uint8Array(Buffer.allocUnsafeSlow(length).buffer)
  281. let offset = 0
  282. for (let i = 0; i < chunks.length; ++i) {
  283. const chunk = chunks[i]
  284. buffer.set(chunk, offset)
  285. offset += chunk.length
  286. }
  287. return buffer
  288. }
  289. function consumeEnd (consume) {
  290. const { type, body, resolve, stream, length } = consume
  291. try {
  292. if (type === 'text') {
  293. resolve(chunksDecode(body, length))
  294. } else if (type === 'json') {
  295. resolve(JSON.parse(chunksDecode(body, length)))
  296. } else if (type === 'arrayBuffer') {
  297. resolve(chunksConcat(body, length).buffer)
  298. } else if (type === 'blob') {
  299. resolve(new Blob(body, { type: stream[kContentType] }))
  300. } else if (type === 'bytes') {
  301. resolve(chunksConcat(body, length))
  302. }
  303. consumeFinish(consume)
  304. } catch (err) {
  305. stream.destroy(err)
  306. }
  307. }
  308. function consumePush (consume, chunk) {
  309. consume.length += chunk.length
  310. consume.body.push(chunk)
  311. }
  312. function consumeFinish (consume, err) {
  313. if (consume.body === null) {
  314. return
  315. }
  316. if (err) {
  317. consume.reject(err)
  318. } else {
  319. consume.resolve()
  320. }
  321. consume.type = null
  322. consume.stream = null
  323. consume.resolve = null
  324. consume.reject = null
  325. consume.length = 0
  326. consume.body = null
  327. }
  328. module.exports = { Readable: BodyReadable, chunksDecode }