api-stream.js 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. 'use strict'
  2. const assert = require('node:assert')
  3. const { finished, PassThrough } = require('node:stream')
  4. const { InvalidArgumentError, InvalidReturnValueError } = require('../core/errors')
  5. const util = require('../core/util')
  6. const { getResolveErrorBodyCallback } = require('./util')
  7. const { AsyncResource } = require('node:async_hooks')
  8. const { addSignal, removeSignal } = require('./abort-signal')
  9. class StreamHandler extends AsyncResource {
  10. constructor (opts, factory, callback) {
  11. if (!opts || typeof opts !== 'object') {
  12. throw new InvalidArgumentError('invalid opts')
  13. }
  14. const { signal, method, opaque, body, onInfo, responseHeaders, throwOnError } = opts
  15. try {
  16. if (typeof callback !== 'function') {
  17. throw new InvalidArgumentError('invalid callback')
  18. }
  19. if (typeof factory !== 'function') {
  20. throw new InvalidArgumentError('invalid factory')
  21. }
  22. if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
  23. throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget')
  24. }
  25. if (method === 'CONNECT') {
  26. throw new InvalidArgumentError('invalid method')
  27. }
  28. if (onInfo && typeof onInfo !== 'function') {
  29. throw new InvalidArgumentError('invalid onInfo callback')
  30. }
  31. super('UNDICI_STREAM')
  32. } catch (err) {
  33. if (util.isStream(body)) {
  34. util.destroy(body.on('error', util.nop), err)
  35. }
  36. throw err
  37. }
  38. this.responseHeaders = responseHeaders || null
  39. this.opaque = opaque || null
  40. this.factory = factory
  41. this.callback = callback
  42. this.res = null
  43. this.abort = null
  44. this.context = null
  45. this.trailers = null
  46. this.body = body
  47. this.onInfo = onInfo || null
  48. this.throwOnError = throwOnError || false
  49. if (util.isStream(body)) {
  50. body.on('error', (err) => {
  51. this.onError(err)
  52. })
  53. }
  54. addSignal(this, signal)
  55. }
  56. onConnect (abort, context) {
  57. if (this.reason) {
  58. abort(this.reason)
  59. return
  60. }
  61. assert(this.callback)
  62. this.abort = abort
  63. this.context = context
  64. }
  65. onHeaders (statusCode, rawHeaders, resume, statusMessage) {
  66. const { factory, opaque, context, callback, responseHeaders } = this
  67. const headers = responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
  68. if (statusCode < 200) {
  69. if (this.onInfo) {
  70. this.onInfo({ statusCode, headers })
  71. }
  72. return
  73. }
  74. this.factory = null
  75. let res
  76. if (this.throwOnError && statusCode >= 400) {
  77. const parsedHeaders = responseHeaders === 'raw' ? util.parseHeaders(rawHeaders) : headers
  78. const contentType = parsedHeaders['content-type']
  79. res = new PassThrough()
  80. this.callback = null
  81. this.runInAsyncScope(getResolveErrorBodyCallback, null,
  82. { callback, body: res, contentType, statusCode, statusMessage, headers }
  83. )
  84. } else {
  85. if (factory === null) {
  86. return
  87. }
  88. res = this.runInAsyncScope(factory, null, {
  89. statusCode,
  90. headers,
  91. opaque,
  92. context
  93. })
  94. if (
  95. !res ||
  96. typeof res.write !== 'function' ||
  97. typeof res.end !== 'function' ||
  98. typeof res.on !== 'function'
  99. ) {
  100. throw new InvalidReturnValueError('expected Writable')
  101. }
  102. // TODO: Avoid finished. It registers an unnecessary amount of listeners.
  103. finished(res, { readable: false }, (err) => {
  104. const { callback, res, opaque, trailers, abort } = this
  105. this.res = null
  106. if (err || !res.readable) {
  107. util.destroy(res, err)
  108. }
  109. this.callback = null
  110. this.runInAsyncScope(callback, null, err || null, { opaque, trailers })
  111. if (err) {
  112. abort()
  113. }
  114. })
  115. }
  116. res.on('drain', resume)
  117. this.res = res
  118. const needDrain = res.writableNeedDrain !== undefined
  119. ? res.writableNeedDrain
  120. : res._writableState?.needDrain
  121. return needDrain !== true
  122. }
  123. onData (chunk) {
  124. const { res } = this
  125. return res ? res.write(chunk) : true
  126. }
  127. onComplete (trailers) {
  128. const { res } = this
  129. removeSignal(this)
  130. if (!res) {
  131. return
  132. }
  133. this.trailers = util.parseHeaders(trailers)
  134. res.end()
  135. }
  136. onError (err) {
  137. const { res, callback, opaque, body } = this
  138. removeSignal(this)
  139. this.factory = null
  140. if (res) {
  141. this.res = null
  142. util.destroy(res, err)
  143. } else if (callback) {
  144. this.callback = null
  145. queueMicrotask(() => {
  146. this.runInAsyncScope(callback, null, err, { opaque })
  147. })
  148. }
  149. if (body) {
  150. this.body = null
  151. util.destroy(body, err)
  152. }
  153. }
  154. }
  155. function stream (opts, factory, callback) {
  156. if (callback === undefined) {
  157. return new Promise((resolve, reject) => {
  158. stream.call(this, opts, factory, (err, data) => {
  159. return err ? reject(err) : resolve(data)
  160. })
  161. })
  162. }
  163. try {
  164. this.dispatch(opts, new StreamHandler(opts, factory, callback))
  165. } catch (err) {
  166. if (typeof callback !== 'function') {
  167. throw err
  168. }
  169. const opaque = opts?.opaque
  170. queueMicrotask(() => callback(err, { opaque }))
  171. }
  172. }
  173. module.exports = stream