api-pipeline.js 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. 'use strict'
  2. const {
  3. Readable,
  4. Duplex,
  5. PassThrough
  6. } = require('node:stream')
  7. const {
  8. InvalidArgumentError,
  9. InvalidReturnValueError,
  10. RequestAbortedError
  11. } = require('../core/errors')
  12. const util = require('../core/util')
  13. const { AsyncResource } = require('node:async_hooks')
  14. const { addSignal, removeSignal } = require('./abort-signal')
  15. const assert = require('node:assert')
  // Symbol used as a property key on the request/response streams in this
  // file: PipelineResponse stores its `resume` function under it, and a write
  // callback that had to wait is stored under it on the PipelineRequest until
  // PipelineRequest#_read invokes and clears it.
  16. const kResume = Symbol('resume')
  /**
   * Readable that carries the request body.
   *
   * A single deferred callback may be parked under `kResume`; it is released
   * (called once, then cleared) the next time the stream is asked for data or
   * when the stream is destroyed.
   */
  class PipelineRequest extends Readable {
    constructor () {
      super({ autoDestroy: true })

      // No deferred callback is pending on a fresh stream.
      this[kResume] = null
    }

    /**
     * Releases the parked callback, if there is one. The slot is cleared
     * before the callback runs, so each parked callback fires at most once.
     */
    _read () {
      const pending = this[kResume]
      if (!pending) {
        return
      }

      this[kResume] = null
      pending()
    }

    /**
     * Flushes any parked callback before reporting the destroy result.
     *
     * @param {Error|null} err - Error the stream is being destroyed with.
     * @param {Function} callback - Completion callback; receives `err` as-is.
     */
    _destroy (err, callback) {
      this._read()
      callback(err)
    }
  }
  /**
   * Readable that exposes the response body to the user handler.
   *
   * Every `_read` request is forwarded to the `resume` function supplied at
   * construction time.
   */
  class PipelineResponse extends Readable {
    /**
     * @param {Function} resume - Called (as a method of this stream) whenever
     *   the stream is asked for more data.
     */
    constructor (resume) {
      super({ autoDestroy: true })
      this[kResume] = resume
    }

    _read () {
      this[kResume]()
    }

    /**
     * Reports a RequestAbortedError when the stream is destroyed without an
     * error before 'end' has been emitted; otherwise passes `err` through.
     *
     * @param {Error|null} err - Error the stream is being destroyed with.
     * @param {Function} callback - Completion callback.
     */
    _destroy (err, callback) {
      const abortedEarly = !err && !this._readableState.endEmitted
      callback(abortedEarly ? new RequestAbortedError() : err)
    }
  }
  /**
   * Dispatch handler that wires three streams together:
   *
   *  - `req`: a PipelineRequest fed by writes to `ret`;
   *  - `res`: a PipelineResponse fed by `onData`/`onComplete`, handed to the
   *    user handler as `body`;
   *  - `ret`: the Duplex given back to the caller. Its writable side feeds
   *    `req`, its readable side is fed by the stream the user handler returns.
   *
   * The user handler is executed through `runInAsyncScope`, i.e. in the async
   * context of this AsyncResource.
   */
  class PipelineHandler extends AsyncResource {
    /**
     * @param {object} opts - Request options. Read here: `signal`, `method`,
     *   `opaque`, `onInfo`, `responseHeaders`, `objectMode`.
     * @param {Function} handler - Called once with
     *   `{ statusCode, headers, opaque, body, context }` for the first
     *   response whose status code is >= 200; must return an object with an
     *   `on` method (a readable stream).
     * @throws {InvalidArgumentError} If `opts` is not an object, `handler` is
     *   not a function, `signal` has neither `on` nor `addEventListener`,
     *   `method` is 'CONNECT', or `onInfo` is set but not a function.
     */
    constructor (opts, handler) {
      if (!opts || typeof opts !== 'object') {
        throw new InvalidArgumentError('invalid opts')
      }

      if (typeof handler !== 'function') {
        throw new InvalidArgumentError('invalid handler')
      }

      const { signal, method, opaque, onInfo, responseHeaders } = opts

      if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
        throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget')
      }

      if (method === 'CONNECT') {
        throw new InvalidArgumentError('invalid method')
      }

      if (onInfo && typeof onInfo !== 'function') {
        throw new InvalidArgumentError('invalid onInfo callback')
      }

      super('UNDICI_PIPELINE')

      this.opaque = opaque || null
      this.responseHeaders = responseHeaders || null
      this.handler = handler
      this.abort = null
      this.context = null
      this.onInfo = onInfo || null

      // Errors on the request stream are reported through `ret`; the no-op
      // listener only prevents an unhandled 'error' event on `req`.
      this.req = new PipelineRequest().on('error', util.nop)

      this.ret = new Duplex({
        readableObjectMode: opts.objectMode,
        autoDestroy: true,
        read: () => {
          // The consumer wants more data: un-pause the handler's stream, if
          // one has been installed and it supports resume().
          const { body } = this

          if (body?.resume) {
            body.resume()
          }
        },
        write: (chunk, encoding, callback) => {
          const { req } = this

          if (req.push(chunk, encoding) || req._readableState.destroyed) {
            callback()
          } else {
            // req.push() returned false: hold the write callback until
            // PipelineRequest#_read releases it.
            req[kResume] = callback
          }
        },
        destroy: (err, callback) => {
          const { body, req, res, ret, abort } = this

          // Destroyed without an error before the readable side emitted
          // 'end': report it as an aborted request.
          if (!err && !ret._readableState.endEmitted) {
            err = new RequestAbortedError()
          }

          if (abort && err) {
            abort()
          }

          util.destroy(body, err)
          util.destroy(req, err)
          util.destroy(res, err)

          removeSignal(this)

          callback(err)
        }
      }).on('prefinish', () => {
        const { req } = this

        // Node < 15 does not call _final in same tick.
        req.push(null)
      })

      this.res = null

      addSignal(this, signal)
    }

    /**
     * Stores the abort function and context for the request.
     *
     * If `this.reason` is already set, the request is aborted immediately
     * with that reason and nothing is stored. NOTE(review): `reason` is not
     * assigned anywhere in this class; presumably the abort-signal helper
     * sets it. Confirm against that module.
     *
     * @param {Function} abort - Aborts the in-flight request.
     * @param {*} context - Opaque value later passed to the user handler.
     */
    onConnect (abort, context) {
      const { ret, res } = this

      if (this.reason) {
        abort(this.reason)
        return
      }

      assert(!res, 'pipeline cannot be retried')
      assert(!ret.destroyed)

      this.abort = abort
      this.context = context
    }

    /**
     * Handles a response head.
     *
     * Status codes below 200 are only forwarded to `onInfo` (when set). For
     * any other status the response stream is created, the user handler is
     * invoked, and the stream it returns is connected to the readable side
     * of `ret`.
     *
     * @param {number} statusCode
     * @param {*} rawHeaders - Passed to `util.parseRawHeaders` when
     *   `responseHeaders === 'raw'`, otherwise to `util.parseHeaders`.
     * @param {Function} resume - Passed to the PipelineResponse constructor.
     * @throws {InvalidReturnValueError} If the handler's return value is
     *   falsy or has no `on` method.
     * @throws Whatever the user handler throws.
     */
    onHeaders (statusCode, rawHeaders, resume) {
      const { opaque, handler, context } = this

      if (statusCode < 200) {
        if (this.onInfo) {
          const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
          this.onInfo({ statusCode, headers })
        }
        return
      }

      this.res = new PipelineResponse(resume)

      let body
      try {
        // The handler is single-use; drop the reference before calling it.
        this.handler = null
        const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
        body = this.runInAsyncScope(handler, null, {
          statusCode,
          headers,
          opaque,
          body: this.res,
          context
        })

        // Validated inside the try so that an invalid return value gets the
        // same 'error' guard on `res` as a throwing handler does.
        if (!body || typeof body.on !== 'function') {
          throw new InvalidReturnValueError('expected Readable')
        }
      } catch (err) {
        // The handler failed and may not have attached an 'error' listener
        // to `res`; add a no-op one so a later destroy of `res` with an
        // error cannot surface as an unhandled 'error' event.
        this.res.on('error', util.nop)
        throw err
      }

      body
        .on('data', (chunk) => {
          const { ret, body } = this

          // Apply backpressure: pause the source when `ret` reports it is full.
          if (!ret.push(chunk) && body.pause) {
            body.pause()
          }
        })
        .on('error', (err) => {
          const { ret } = this

          util.destroy(ret, err)
        })
        .on('end', () => {
          const { ret } = this

          ret.push(null)
        })
        .on('close', () => {
          const { ret } = this

          // Closed before end-of-stream was signalled to `ret`: treat as abort.
          if (!ret._readableState.ended) {
            util.destroy(ret, new RequestAbortedError())
          }
        })

      this.body = body
    }

    /**
     * Pushes a response body chunk into the response stream.
     *
     * @param {*} chunk
     * @returns {boolean} The result of `res.push(chunk)`.
     */
    onData (chunk) {
      const { res } = this
      return res.push(chunk)
    }

    /**
     * Signals end-of-stream on the response stream.
     *
     * @param {*} trailers - Unused.
     */
    onComplete (trailers) {
      const { res } = this
      res.push(null)
    }

    /**
     * Tears the pipeline down by destroying `ret` with the given error.
     *
     * @param {Error} err
     */
    onError (err) {
      const { ret } = this
      this.handler = null
      util.destroy(ret, err)
    }
  }
  /**
   * Starts a pipelined request on the dispatcher this function is invoked on
   * (`this` must provide `dispatch(opts, handler)`).
   *
   * @param {object} opts - Request options; forwarded to `dispatch` with
   *   `body` replaced by the handler's request stream.
   * @param {Function} handler - User handler passed to PipelineHandler.
   * @returns {import('node:stream').Duplex} The handler's duplex stream, or,
   *   if constructing the handler or dispatching throws, a PassThrough that
   *   has already been destroyed with the thrown error.
   */
  function pipeline (opts, handler) {
    try {
      const pipelineHandler = new PipelineHandler(opts, handler)
      const dispatchOpts = { ...opts, body: pipelineHandler.req }

      this.dispatch(dispatchOpts, pipelineHandler)

      return pipelineHandler.ret
    } catch (err) {
      // Surface synchronous failures through a stream instead of throwing.
      const failed = new PassThrough()
      return failed.destroy(err)
    }
  }
  199. module.exports = pipeline