client-h2.js 18 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744
  1. 'use strict'
  2. const assert = require('node:assert')
  3. const { pipeline } = require('node:stream')
  4. const util = require('../core/util.js')
  5. const {
  6. RequestContentLengthMismatchError,
  7. RequestAbortedError,
  8. SocketError,
  9. InformationalError
  10. } = require('../core/errors.js')
  11. const {
  12. kUrl,
  13. kReset,
  14. kClient,
  15. kRunning,
  16. kPending,
  17. kQueue,
  18. kPendingIdx,
  19. kRunningIdx,
  20. kError,
  21. kSocket,
  22. kStrictContentLength,
  23. kOnError,
  24. kMaxConcurrentStreams,
  25. kHTTP2Session,
  26. kResume,
  27. kSize,
  28. kHTTPContext
  29. } = require('../core/symbols.js')
  // Per-session counter of currently open streams; stored on the HTTP/2 session
  // object and used to decide when the session can be unref'ed again.
  const kOpenStreams = Symbol('open streams')

  // Loaded lazily from '../web/fetch/body.js' the first time a FormData-like
  // body has to be written.
  let extractBody

  // Experimental
  // Guards the "H2 support is experimental" process warning so that it is
  // emitted at most once.
  let h2ExperimentalWarned = false

  /** @type {import('http2')} */
  let http2
  try {
    http2 = require('node:http2')
  } catch {
    // `node:http2` could not be required: fall back to a stub so that reading
    // `http2.constants` does not throw.
    // @ts-ignore
    http2 = { constants: {} }
  }
  42. const {
  43. constants: {
  44. HTTP2_HEADER_AUTHORITY,
  45. HTTP2_HEADER_METHOD,
  46. HTTP2_HEADER_PATH,
  47. HTTP2_HEADER_SCHEME,
  48. HTTP2_HEADER_CONTENT_LENGTH,
  49. HTTP2_HEADER_EXPECT,
  50. HTTP2_HEADER_STATUS
  51. }
  52. } = http2
  /**
   * Flattens an HTTP/2 headers object into a flat list of buffers laid out as
   * `[name, value, name, value, ...]`.
   *
   * @param {object} headers header name mapped to a value or an array of values
   * @returns {Buffer[]} alternating header-name and header-value buffers
   */
  function parseH2Headers (headers) {
    const rawHeaders = []

    for (const name of Object.keys(headers)) {
      const value = headers[name]
      // h2 may hand over a repeated header (e.g. Set-Cookie) as an array.
      // Every value is emitted together with its header name because the
      // headers handler expects plain name-value pairs.
      const values = Array.isArray(value) ? value : [value]

      for (const entry of values) {
        rawHeaders.push(Buffer.from(name), Buffer.from(entry))
      }
    }

    return rawHeaders
  }
  /**
   * Creates an HTTP/2 session on top of the given socket, wires the session and
   * socket listeners to the client, and returns the connection context
   * (`write`, `resume`, `destroy`, `destroyed`, `busy`).
   *
   * @param {object} client client that owns the connection
   * @param {object} socket socket handed to `http2.connect` via `createConnection`
   * @returns {Promise<object>} the HTTP/2 connection context
   */
  async function connectH2 (client, socket) {
    client[kSocket] = socket

    if (!h2ExperimentalWarned) {
      h2ExperimentalWarned = true
      process.emitWarning('H2 support is experimental, expect them to change at any time.', {
        code: 'UNDICI-H2'
      })
    }

    const session = http2.connect(client[kUrl], {
      createConnection () {
        return socket
      },
      peerMaxConcurrentStreams: client[kMaxConcurrentStreams]
    })

    session[kOpenStreams] = 0
    session[kClient] = client
    session[kSocket] = socket

    util.addListener(session, 'error', onHttp2SessionError)
    util.addListener(session, 'frameError', onHttp2FrameError)
    util.addListener(session, 'end', onHttp2SessionEnd)
    util.addListener(session, 'goaway', onHTTP2GoAway)
    util.addListener(session, 'close', onSessionClose)

    session.unref()

    client[kHTTP2Session] = session
    socket[kHTTP2Session] = session

    util.addListener(socket, 'error', onSocketError)
    util.addListener(socket, 'end', onSocketEnd)
    util.addListener(socket, 'close', onSocketClose)

    // Remembers whether the socket already emitted "close" so that destroy()
    // does not wait for an event that will never come.
    let closed = false
    socket.on('close', () => {
      closed = true
    })

    return {
      version: 'h2',
      defaultPipelining: Infinity,
      write (...args) {
        return writeH2(client, ...args)
      },
      resume () {
        resumeH2(client)
      },
      destroy (err, callback) {
        if (closed) {
          queueMicrotask(callback)
          return
        }
        // Destroying the socket will trigger the session close
        socket.destroy(err).on('close', callback)
      },
      get destroyed () {
        return socket.destroyed
      },
      busy () {
        return false
      }
    }

    // Session "close": detach the session from the client and, when the client
    // is destroyed, fail every request from the running index onwards.
    function onSessionClose () {
      const owner = this[kClient]
      const ownerSocket = owner[kSocket]
      const err = this[kSocket][kError] || this[kError] || new SocketError('closed', util.getSocketInfo(ownerSocket))

      owner[kHTTP2Session] = null

      if (!owner.destroyed) {
        return
      }

      assert(owner[kPending] === 0)

      // Fail entire queue.
      for (const request of owner[kQueue].splice(owner[kRunningIdx])) {
        util.errorRequest(owner, request, err)
      }
    }

    // Socket "error": remember the error on the socket and report it to the client.
    function onSocketError (err) {
      assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

      this[kError] = err
      this[kClient][kOnError](err)
    }

    // Socket "end": the peer closed its side, destroy the socket.
    function onSocketEnd () {
      const err = new SocketError('other side closed', util.getSocketInfo(this))
      util.destroy(this, err)
    }

    // Socket "close": destroy the attached session (if any), reset the pending
    // index, notify "disconnect" and resume the client.
    function onSocketClose () {
      const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))

      client[kSocket] = null
      this[kHTTP2Session]?.destroy(err)

      client[kPendingIdx] = client[kRunningIdx]
      assert(client[kRunning] === 0)

      client.emit('disconnect', client[kUrl], [client], err)
      client[kResume]()
    }
  }
  /**
   * Refs or unrefs the client's socket and HTTP/2 session together.
   * Both are unref'ed only when the client size and the configured maximum of
   * concurrent streams are both 0; otherwise both are ref'ed. Nothing happens
   * when there is no socket or the socket is destroyed.
   *
   * @param {object} client
   */
  function resumeH2 (client) {
    const socket = client[kSocket]

    if (socket == null || socket.destroyed !== false) {
      return
    }

    const idle = client[kSize] === 0 && client[kMaxConcurrentStreams] === 0
    const action = idle ? 'unref' : 'ref'

    socket[action]()
    client[kHTTP2Session][action]()
  }
  /**
   * Session "error" listener (`this` is the HTTP/2 session): records the error
   * on the session's socket and forwards it to the client's error handler.
   *
   * @param {Error} err
   */
  function onHttp2SessionError (err) {
    assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

    const socket = this[kSocket]
    const client = this[kClient]

    socket[kError] = err
    client[kOnError](err)
  }
  /**
   * Session "frameError" listener (`this` is the HTTP/2 session).
   * Only frame errors reported with stream id 0 are handled here: they are
   * turned into an InformationalError that is stored on the socket and
   * forwarded to the client's error handler. Other ids are ignored.
   *
   * @param {number} type frame type
   * @param {number} code error code
   * @param {number} id stream id
   */
  function onHttp2FrameError (type, code, id) {
    if (id !== 0) {
      return
    }

    const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`)
    this[kSocket][kError] = err
    this[kClient][kOnError](err)
  }
  /**
   * Session "end" listener (`this` is the HTTP/2 session): destroys the session
   * and then its socket with the same "other side closed" SocketError.
   */
  function onHttp2SessionEnd () {
    const socket = this[kSocket]
    const err = new SocketError('other side closed', util.getSocketInfo(socket))

    this.destroy(err)
    util.destroy(socket, err)
  }
  /**
   * Session "goaway" listener (`this` is the HTTP/2 session).
   *
   * This is the root cause of #3011
   * We need to handle GOAWAY frames properly, and trigger the session close
   * along with the socket right away
   *
   * Detaches the socket and HTTP context from the client, destroys the socket,
   * fails the request at the head of the queue, emits "disconnect" and resumes
   * the client.
   *
   * @param {number} code error code carried by the GOAWAY frame
   */
  function onHTTP2GoAway (code) {
    const socket = this[kSocket]

    // We cannot recover, so best to close the session and the socket.
    // The socket details attached to the error must be read from the socket:
    // `this` is the session and carries no address/port information.
    const err = this[kError] || new SocketError(`HTTP/2: "GOAWAY" frame received with code ${code}`, util.getSocketInfo(socket))
    const client = this[kClient]

    client[kSocket] = null
    client[kHTTPContext] = null

    // NOTE(review): within this file kHTTP2Session is only assigned on the
    // client and on the socket, never on the session itself, so this branch
    // looks unreachable here - confirm before relying on it.
    if (this[kHTTP2Session] != null) {
      this[kHTTP2Session].destroy(err)
      this[kHTTP2Session] = null
    }

    util.destroy(socket, err)

    // Fail head of pipeline.
    if (client[kRunningIdx] < client[kQueue].length) {
      const request = client[kQueue][client[kRunningIdx]]
      client[kQueue][client[kRunningIdx]++] = null
      util.errorRequest(client, request, err)
      client[kPendingIdx] = client[kRunningIdx]
    }

    assert(client[kRunning] === 0)

    client.emit('disconnect', client[kUrl], [client], err)
    client[kResume]()
  }
  // https://www.rfc-editor.org/rfc/rfc7230#section-3.3.2
  /**
   * Tells whether a request with the given method is subject to the
   * content-length consistency check.
   *
   * @param {string} method HTTP method, compared case-sensitively
   * @returns {boolean} `false` for GET, HEAD, OPTIONS, TRACE and CONNECT, `true` otherwise
   */
  function shouldSendContentLength (method) {
    switch (method) {
      case 'GET':
      case 'HEAD':
      case 'OPTIONS':
      case 'TRACE':
      case 'CONNECT':
        return false
      default:
        return true
    }
  }
  /**
   * Sends one request over the client's HTTP/2 session: builds the header
   * block, opens a stream, wires the response/end/close/error listeners to the
   * request handlers and writes the body.
   *
   * @param {object} client client whose HTTP/2 session is used
   * @param {object} request request to send
   * @returns {boolean} `false` when the request was failed or aborted before a
   *   stream was requested (upgrade requested, aborted during `onConnect`, or
   *   content-length mismatch in strict mode), `true` otherwise
   */
  function writeH2 (client, request) {
    const session = client[kHTTP2Session]
    const { method, path, host, upgrade, expectContinue, signal, headers: reqHeaders } = request
    let { body } = request

    if (upgrade) {
      util.errorRequest(client, request, new Error('Upgrade not supported for H2'))
      return false
    }

    // `reqHeaders` is a flat [name, value, name, value, ...] list; array values
    // are joined into a single comma-separated header value.
    const headers = {}
    for (let n = 0; n < reqHeaders.length; n += 2) {
      const key = reqHeaders[n + 0]
      const val = reqHeaders[n + 1]

      if (Array.isArray(val)) {
        for (let i = 0; i < val.length; i++) {
          if (headers[key]) {
            headers[key] += `,${val[i]}`
          } else {
            headers[key] = val[i]
          }
        }
      } else {
        headers[key] = val
      }
    }

    /** @type {import('node:http2').ClientHttp2Stream} */
    let stream

    const { hostname, port } = client[kUrl]

    // Prefer the request's `host`; otherwise derive the authority from the
    // client URL, appending the port only when one is set.
    headers[HTTP2_HEADER_AUTHORITY] = host || `${hostname}${port ? `:${port}` : ''}`
    headers[HTTP2_HEADER_METHOD] = method

    // Fails the request with `err` (a RequestAbortedError when none is given),
    // destroys the stream and the body, advances the running index and resumes
    // the client. Does nothing when the request is already aborted or completed.
    const abort = (err) => {
      if (request.aborted || request.completed) {
        return
      }

      err = err || new RequestAbortedError()

      util.errorRequest(client, request, err)

      if (stream != null) {
        util.destroy(stream, err)
      }

      // We do not destroy the socket as we can continue using the session
      // the stream gets destroyed and the session remains to create new streams
      util.destroy(body, err)
      client[kQueue][client[kRunningIdx]++] = null
      client[kResume]()
    }

    try {
      // We are already connected, streams are pending.
      // We can call on connect, and wait for abort
      request.onConnect(abort)
    } catch (err) {
      util.errorRequest(client, request, err)
    }

    if (request.aborted) {
      return false
    }

    if (method === 'CONNECT') {
      session.ref()
      // We are already connected, streams are pending, first request
      // will create a new stream. We trigger a request to create the stream and wait until
      // `ready` event is triggered
      // We disabled endStream to allow the user to write to the stream
      stream = session.request(headers, { endStream: false, signal })

      // Hand the stream to the request right away when it already has an id
      // and is no longer pending; otherwise wait for "ready".
      if (stream.id && !stream.pending) {
        request.onUpgrade(null, null, stream)
        ++session[kOpenStreams]
        client[kQueue][client[kRunningIdx]++] = null
      } else {
        stream.once('ready', () => {
          request.onUpgrade(null, null, stream)
          ++session[kOpenStreams]
          client[kQueue][client[kRunningIdx]++] = null
        })
      }

      // Unref the session again once its last open stream closes.
      stream.once('close', () => {
        session[kOpenStreams] -= 1
        if (session[kOpenStreams] === 0) session.unref()
      })

      return true
    }

    // https://tools.ietf.org/html/rfc7540#section-8.3
    // :path and :scheme headers must be omitted when sending CONNECT

    headers[HTTP2_HEADER_PATH] = path
    headers[HTTP2_HEADER_SCHEME] = 'https'

    // https://tools.ietf.org/html/rfc7231#section-4.3.1
    // https://tools.ietf.org/html/rfc7231#section-4.3.2
    // https://tools.ietf.org/html/rfc7231#section-4.3.5

    // Sending a payload body on a request that does not
    // expect it can cause undefined behavior on some
    // servers and corrupt connection state. Do not
    // re-use the connection for further requests.

    const expectsPayload = (
      method === 'PUT' ||
      method === 'POST' ||
      method === 'PATCH'
    )

    if (body && typeof body.read === 'function') {
      // Try to read EOF in order to get length.
      body.read(0)
    }

    let contentLength = util.bodyLength(body)

    // FormData-like bodies are converted into a stream; the extracted content
    // type and length replace whatever was computed above.
    if (util.isFormDataLike(body)) {
      extractBody ??= require('../web/fetch/body.js').extractBody

      const [bodyStream, contentType] = extractBody(body)
      headers['content-type'] = contentType

      body = bodyStream.stream
      contentLength = bodyStream.length
    }

    // Fall back to the content length declared on the request.
    if (contentLength == null) {
      contentLength = request.contentLength
    }

    if (contentLength === 0 || !expectsPayload) {
      // https://tools.ietf.org/html/rfc7230#section-3.3.2
      // A user agent SHOULD NOT send a Content-Length header field when
      // the request message does not contain a payload body and the method
      // semantics do not anticipate such a body.

      contentLength = null
    }

    // https://github.com/nodejs/undici/issues/2046
    // A user agent may send a Content-Length header with 0 value, this should be allowed.
    if (shouldSendContentLength(method) && contentLength > 0 && request.contentLength != null && request.contentLength !== contentLength) {
      // In strict mode a mismatch fails the request; otherwise it is only
      // reported as a process warning.
      if (client[kStrictContentLength]) {
        util.errorRequest(client, request, new RequestContentLengthMismatchError())
        return false
      }

      process.emitWarning(new RequestContentLengthMismatchError())
    }

    if (contentLength != null) {
      assert(body, 'no body must not have content length')
      headers[HTTP2_HEADER_CONTENT_LENGTH] = `${contentLength}`
    }

    session.ref()

    // The stream is opened already ended for GET/HEAD and for requests whose
    // body is exactly `null`.
    const shouldEndStream = method === 'GET' || method === 'HEAD' || body === null
    if (expectContinue) {
      // With "expect: 100-continue" the body is written only once the stream
      // emits "continue".
      headers[HTTP2_HEADER_EXPECT] = '100-continue'
      stream = session.request(headers, { endStream: shouldEndStream, signal })

      stream.once('continue', writeBodyH2)
    } else {
      stream = session.request(headers, {
        endStream: shouldEndStream,
        signal
      })
      writeBodyH2()
    }

    // Increment counter as we have new streams open
    ++session[kOpenStreams]

    stream.once('response', headers => {
      // Split the :status pseudo-header from the remaining response headers.
      const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers
      request.onResponseStarted()

      // Due to the stream nature, it is possible we face a race condition
      // where the stream has been assigned, but the request has been aborted
      // the request remains in-flight and headers hasn't been received yet
      // for those scenarios, best effort is to destroy the stream immediately
      // as there's no value to keep it open.
      if (request.aborted) {
        const err = new RequestAbortedError()
        util.errorRequest(client, request, err)
        util.destroy(stream, err)
        return
      }

      // A `false` return from the handlers pauses the stream; `onHeaders` is
      // given `stream.resume` so that the consumer can restart it.
      if (request.onHeaders(Number(statusCode), parseH2Headers(realHeaders), stream.resume.bind(stream), '') === false) {
        stream.pause()
      }

      stream.on('data', (chunk) => {
        if (request.onData(chunk) === false) {
          stream.pause()
        }
      })
    })

    stream.once('end', () => {
      // When state is null, it means we haven't consumed body and the stream still do not have
      // a state.
      // Present specially when using pipeline or stream
      if (stream.state?.state == null || stream.state.state < 6) {
        request.onComplete([])
      }

      if (session[kOpenStreams] === 0) {
        // Stream is closed or half-closed-remote (6), decrement counter and cleanup
        // It does not have sense to continue working with the stream as we do not
        // have yet RST_STREAM support on client-side

        session.unref()
      }

      // `abort` returns early when the request is already aborted or completed.
      abort(new InformationalError('HTTP/2: stream half-closed (remote)'))
      client[kQueue][client[kRunningIdx]++] = null
      client[kPendingIdx] = client[kRunningIdx]
      client[kResume]()
    })

    // Unref the session again once its last open stream closes.
    stream.once('close', () => {
      session[kOpenStreams] -= 1
      if (session[kOpenStreams] === 0) {
        session.unref()
      }
    })

    stream.once('error', function (err) {
      abort(err)
    })

    stream.once('frameError', (type, code) => {
      abort(new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`))
    })

    // stream.on('aborted', () => {
    //   // TODO(HTTP/2): Support aborted
    // })

    // stream.on('timeout', () => {
    //   // TODO(HTTP/2): Support timeout
    // })

    // stream.on('push', headers => {
    //   // TODO(HTTP/2): Support push
    // })

    // stream.on('trailers', headers => {
    //   // TODO(HTTP/2): Support trailers
    // })

    return true

    // Picks the body writer that matches the body type. Declared after the
    // `return` above; it is reachable because function declarations are hoisted.
    function writeBodyH2 () {
      /* istanbul ignore else: assertion */
      if (!body || contentLength === 0) {
        // No body to send: writeBuffer is called with a `null` body.
        writeBuffer(
          abort,
          stream,
          null,
          client,
          request,
          client[kSocket],
          contentLength,
          expectsPayload
        )
      } else if (util.isBuffer(body)) {
        writeBuffer(
          abort,
          stream,
          body,
          client,
          request,
          client[kSocket],
          contentLength,
          expectsPayload
        )
      } else if (util.isBlobLike(body)) {
        // Blob-likes exposing `stream()` are written chunk by chunk; the others
        // are read through `arrayBuffer()` by writeBlob.
        if (typeof body.stream === 'function') {
          writeIterable(
            abort,
            stream,
            body.stream(),
            client,
            request,
            client[kSocket],
            contentLength,
            expectsPayload
          )
        } else {
          writeBlob(
            abort,
            stream,
            body,
            client,
            request,
            client[kSocket],
            contentLength,
            expectsPayload
          )
        }
      } else if (util.isStream(body)) {
        // Note: writeStream takes its arguments in a different order than the
        // other writers.
        writeStream(
          abort,
          client[kSocket],
          expectsPayload,
          stream,
          body,
          client,
          request,
          contentLength
        )
      } else if (util.isIterable(body)) {
        writeIterable(
          abort,
          stream,
          body,
          client,
          request,
          client[kSocket],
          contentLength,
          expectsPayload
        )
      } else {
        assert(false)
      }
    }
  }
  /**
   * Writes a buffer body (or no body at all) to the HTTP/2 stream and notifies
   * the request that it has been sent. Any error thrown on the way, including
   * a failed length assertion, is routed to `abort`.
   *
   * @param {(err?: Error) => void} abort
   * @param {object} h2stream HTTP/2 stream to write to
   * @param {Buffer|null} body buffer to send; anything else means "no body"
   * @param {object} client
   * @param {object} request
   * @param {object} socket flagged with kReset when no payload is expected
   * @param {number|null} contentLength must equal `body.byteLength` for buffers
   * @param {boolean} expectsPayload
   */
  function writeBuffer (abort, h2stream, body, client, request, socket, contentLength, expectsPayload) {
    try {
      const hasBufferBody = body != null && util.isBuffer(body)

      if (hasBufferBody) {
        assert(contentLength === body.byteLength, 'buffer body must have content length')

        // Cork so that the write is flushed as a single batch on uncork.
        h2stream.cork()
        h2stream.write(body)
        h2stream.uncork()
        h2stream.end()

        request.onBodySent(body)
      }

      if (!expectsPayload) {
        socket[kReset] = true
      }

      request.onRequestSent()
      client[kResume]()
    } catch (failure) {
      abort(failure)
    }
  }
  /**
   * Pipes a stream body into the HTTP/2 stream. Every chunk observed on the
   * pipeline's returned stream is reported through `request.onBodySent`; on
   * completion the request is marked as sent, and on failure the request is
   * aborted.
   *
   * Note that the parameter order differs from the other body writers.
   *
   * @param {(err?: Error) => void} abort
   * @param {object} socket flagged with kReset when no payload is expected
   * @param {boolean} expectsPayload
   * @param {object} h2stream HTTP/2 stream to pipe into
   * @param {object} body readable stream holding the request body
   * @param {object} client
   * @param {object} request
   * @param {number|null} contentLength
   */
  function writeStream (abort, socket, expectsPayload, h2stream, body, client, request, contentLength) {
    assert(contentLength !== 0 || client[kRunning] === 0, 'stream body cannot be pipelined')

    // For HTTP/2, is enough to pipe the stream
    const pipe = pipeline(body, h2stream, onPipelineDone)

    util.addListener(pipe, 'data', (chunk) => {
      request.onBodySent(chunk)
    })

    function onPipelineDone (err) {
      if (err) {
        util.destroy(pipe, err)
        abort(err)
        return
      }

      util.removeAllListeners(pipe)
      request.onRequestSent()

      if (!expectsPayload) {
        socket[kReset] = true
      }

      client[kResume]()
    }
  }
  /**
   * Reads a blob-like body fully through `arrayBuffer()` and writes it to the
   * HTTP/2 stream in one go.
   *
   * The returned promise is not awaited by the caller, so every failure -
   * including the length assertion - is reported through `abort` instead of
   * being left as a rejected promise.
   *
   * @param {(err?: Error) => void} abort
   * @param {object} h2stream HTTP/2 stream to write to
   * @param {object} body blob-like exposing `size` and `arrayBuffer()`
   * @param {object} client
   * @param {object} request
   * @param {object} socket flagged with kReset when no payload is expected
   * @param {number|null} contentLength must equal `body.size`
   * @param {boolean} expectsPayload
   * @returns {Promise<void>} settles once the body was written or the request aborted
   */
  async function writeBlob (abort, h2stream, body, client, request, socket, contentLength, expectsPayload) {
    try {
      // Asserted inside the try block so that a failure reaches abort().
      assert(contentLength === body.size, 'blob body must have content length')

      // Not reachable while the assertion above holds; kept as a guard.
      if (contentLength != null && contentLength !== body.size) {
        throw new RequestContentLengthMismatchError()
      }

      const buffer = Buffer.from(await body.arrayBuffer())

      // Cork so that the write is flushed as a single batch on uncork.
      h2stream.cork()
      h2stream.write(buffer)
      h2stream.uncork()
      h2stream.end()

      request.onBodySent(buffer)
      request.onRequestSent()

      if (!expectsPayload) {
        socket[kReset] = true
      }

      client[kResume]()
    } catch (err) {
      abort(err)
    }
  }
  /**
   * Writes an (async) iterable body to the HTTP/2 stream chunk by chunk,
   * waiting for "drain" (or "close") whenever `write()` reports back-pressure.
   * Errors raised while iterating or writing are routed to `abort`.
   *
   * @param {(err?: Error) => void} abort
   * @param {object} h2stream HTTP/2 stream to write to
   * @param {object} body sync or async iterable yielding the body chunks
   * @param {object} client
   * @param {object} request
   * @param {object} socket checked for kError; flagged with kReset when no payload is expected
   * @param {number|null} contentLength
   * @param {boolean} expectsPayload
   * @returns {Promise<void>}
   */
  async function writeIterable (abort, h2stream, body, client, request, socket, contentLength, expectsPayload) {
    assert(contentLength !== 0 || client[kRunning] === 0, 'iterator body cannot be pipelined')

    // Resolver of the promise currently waiting for the stream to drain.
    let pendingDrain = null

    const onDrain = () => {
      if (!pendingDrain) {
        return
      }
      const wake = pendingDrain
      pendingDrain = null
      wake()
    }

    function waitForDrain () {
      return new Promise((resolve, reject) => {
        assert(pendingDrain === null)

        if (socket[kError]) {
          reject(socket[kError])
          return
        }
        pendingDrain = resolve
      })
    }

    // "close" also wakes the writer so that it never waits on a dead stream.
    h2stream.on('close', onDrain)
    h2stream.on('drain', onDrain)

    try {
      // It's up to the user to somehow abort the async iterable.
      for await (const chunk of body) {
        if (socket[kError]) {
          throw socket[kError]
        }

        const flushed = h2stream.write(chunk)
        request.onBodySent(chunk)

        if (!flushed) {
          await waitForDrain()
        }
      }

      h2stream.end()
      request.onRequestSent()

      if (!expectsPayload) {
        socket[kReset] = true
      }

      client[kResume]()
    } catch (err) {
      abort(err)
    } finally {
      h2stream.off('close', onDrain)
      h2stream.off('drain', onDrain)
    }
  }
  612. module.exports = connectH2