api-request.js 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. 'use strict'
  2. const assert = require('node:assert')
  3. const { Readable } = require('./readable')
  4. const { InvalidArgumentError, RequestAbortedError } = require('../core/errors')
  5. const util = require('../core/util')
  6. const { getResolveErrorBodyCallback } = require('./util')
  7. const { AsyncResource } = require('node:async_hooks')
  8. class RequestHandler extends AsyncResource {
  9. constructor (opts, callback) {
  10. if (!opts || typeof opts !== 'object') {
  11. throw new InvalidArgumentError('invalid opts')
  12. }
  13. const { signal, method, opaque, body, onInfo, responseHeaders, throwOnError, highWaterMark } = opts
  14. try {
  15. if (typeof callback !== 'function') {
  16. throw new InvalidArgumentError('invalid callback')
  17. }
  18. if (highWaterMark && (typeof highWaterMark !== 'number' || highWaterMark < 0)) {
  19. throw new InvalidArgumentError('invalid highWaterMark')
  20. }
  21. if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
  22. throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget')
  23. }
  24. if (method === 'CONNECT') {
  25. throw new InvalidArgumentError('invalid method')
  26. }
  27. if (onInfo && typeof onInfo !== 'function') {
  28. throw new InvalidArgumentError('invalid onInfo callback')
  29. }
  30. super('UNDICI_REQUEST')
  31. } catch (err) {
  32. if (util.isStream(body)) {
  33. util.destroy(body.on('error', util.nop), err)
  34. }
  35. throw err
  36. }
  37. this.method = method
  38. this.responseHeaders = responseHeaders || null
  39. this.opaque = opaque || null
  40. this.callback = callback
  41. this.res = null
  42. this.abort = null
  43. this.body = body
  44. this.trailers = {}
  45. this.context = null
  46. this.onInfo = onInfo || null
  47. this.throwOnError = throwOnError
  48. this.highWaterMark = highWaterMark
  49. this.signal = signal
  50. this.reason = null
  51. this.removeAbortListener = null
  52. if (util.isStream(body)) {
  53. body.on('error', (err) => {
  54. this.onError(err)
  55. })
  56. }
  57. if (this.signal) {
  58. if (this.signal.aborted) {
  59. this.reason = this.signal.reason ?? new RequestAbortedError()
  60. } else {
  61. this.removeAbortListener = util.addAbortListener(this.signal, () => {
  62. this.reason = this.signal.reason ?? new RequestAbortedError()
  63. if (this.res) {
  64. util.destroy(this.res, this.reason)
  65. } else if (this.abort) {
  66. this.abort(this.reason)
  67. }
  68. if (this.removeAbortListener) {
  69. this.res?.off('close', this.removeAbortListener)
  70. this.removeAbortListener()
  71. this.removeAbortListener = null
  72. }
  73. })
  74. }
  75. }
  76. }
  77. onConnect (abort, context) {
  78. if (this.reason) {
  79. abort(this.reason)
  80. return
  81. }
  82. assert(this.callback)
  83. this.abort = abort
  84. this.context = context
  85. }
  86. onHeaders (statusCode, rawHeaders, resume, statusMessage) {
  87. const { callback, opaque, abort, context, responseHeaders, highWaterMark } = this
  88. const headers = responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
  89. if (statusCode < 200) {
  90. if (this.onInfo) {
  91. this.onInfo({ statusCode, headers })
  92. }
  93. return
  94. }
  95. const parsedHeaders = responseHeaders === 'raw' ? util.parseHeaders(rawHeaders) : headers
  96. const contentType = parsedHeaders['content-type']
  97. const contentLength = parsedHeaders['content-length']
  98. const res = new Readable({
  99. resume,
  100. abort,
  101. contentType,
  102. contentLength: this.method !== 'HEAD' && contentLength
  103. ? Number(contentLength)
  104. : null,
  105. highWaterMark
  106. })
  107. if (this.removeAbortListener) {
  108. res.on('close', this.removeAbortListener)
  109. }
  110. this.callback = null
  111. this.res = res
  112. if (callback !== null) {
  113. if (this.throwOnError && statusCode >= 400) {
  114. this.runInAsyncScope(getResolveErrorBodyCallback, null,
  115. { callback, body: res, contentType, statusCode, statusMessage, headers }
  116. )
  117. } else {
  118. this.runInAsyncScope(callback, null, null, {
  119. statusCode,
  120. headers,
  121. trailers: this.trailers,
  122. opaque,
  123. body: res,
  124. context
  125. })
  126. }
  127. }
  128. }
  129. onData (chunk) {
  130. return this.res.push(chunk)
  131. }
  132. onComplete (trailers) {
  133. util.parseHeaders(trailers, this.trailers)
  134. this.res.push(null)
  135. }
  136. onError (err) {
  137. const { res, callback, body, opaque } = this
  138. if (callback) {
  139. // TODO: Does this need queueMicrotask?
  140. this.callback = null
  141. queueMicrotask(() => {
  142. this.runInAsyncScope(callback, null, err, { opaque })
  143. })
  144. }
  145. if (res) {
  146. this.res = null
  147. // Ensure all queued handlers are invoked before destroying res.
  148. queueMicrotask(() => {
  149. util.destroy(res, err)
  150. })
  151. }
  152. if (body) {
  153. this.body = null
  154. util.destroy(body, err)
  155. }
  156. if (this.removeAbortListener) {
  157. res?.off('close', this.removeAbortListener)
  158. this.removeAbortListener()
  159. this.removeAbortListener = null
  160. }
  161. }
  162. }
  163. function request (opts, callback) {
  164. if (callback === undefined) {
  165. return new Promise((resolve, reject) => {
  166. request.call(this, opts, (err, data) => {
  167. return err ? reject(err) : resolve(data)
  168. })
  169. })
  170. }
  171. try {
  172. this.dispatch(opts, new RequestHandler(opts, callback))
  173. } catch (err) {
  174. if (typeof callback !== 'function') {
  175. throw err
  176. }
  177. const opaque = opts?.opaque
  178. queueMicrotask(() => callback(err, { opaque }))
  179. }
  180. }
  181. module.exports = request
  182. module.exports.RequestHandler = RequestHandler