client-h2.js 18 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730
  1. 'use strict'
  2. const assert = require('node:assert')
  3. const { pipeline } = require('node:stream')
  4. const util = require('../core/util.js')
  5. const {
  6. RequestContentLengthMismatchError,
  7. RequestAbortedError,
  8. SocketError,
  9. InformationalError
  10. } = require('../core/errors.js')
  11. const {
  12. kUrl,
  13. kReset,
  14. kClient,
  15. kRunning,
  16. kPending,
  17. kQueue,
  18. kPendingIdx,
  19. kRunningIdx,
  20. kError,
  21. kSocket,
  22. kStrictContentLength,
  23. kOnError,
  24. kMaxConcurrentStreams,
  25. kHTTP2Session,
  26. kResume,
  27. kSize,
  28. kHTTPContext
  29. } = require('../core/symbols.js')
  // Key under which each session stores its count of currently open streams.
  const kOpenStreams = Symbol('open streams')

  // Experimental: set to true once the one-time H2 warning has been emitted.
  let h2ExperimentalWarned = false

  /** @type {import('http2')} */
  let http2
  try {
    http2 = require('node:http2')
  } catch {
    // Loading node:http2 failed; fall back to an empty constants table so
    // the destructuring below still succeeds (every name is then undefined).
    // @ts-ignore
    http2 = { constants: {} }
  }

  // Header and pseudo-header names used when building and parsing messages.
  const {
    HTTP2_HEADER_AUTHORITY,
    HTTP2_HEADER_METHOD,
    HTTP2_HEADER_PATH,
    HTTP2_HEADER_SCHEME,
    HTTP2_HEADER_CONTENT_LENGTH,
    HTTP2_HEADER_EXPECT,
    HTTP2_HEADER_STATUS
  } = http2.constants
  /**
   * Flattens an HTTP/2 headers object into a raw header list.
   *
   * @param {Record<string, string | string[]>} headers - Header object; a
   *   value may be an array when the header occurs several times
   *   (e.g. Set-Cookie).
   * @returns {Buffer[]} Alternating name/value Buffers. An array value yields
   *   one name/value pair per element, because the headers handler expects
   *   plain name-value pairs.
   */
  function parseH2Headers (headers) {
    const rawPairs = []

    for (const headerName of Object.keys(headers)) {
      const headerValue = headers[headerName]
      // Treat single values as one-element lists so both shapes share a path.
      const values = Array.isArray(headerValue) ? headerValue : [headerValue]

      for (const item of values) {
        rawPairs.push(Buffer.from(headerName), Buffer.from(item))
      }
    }

    return rawPairs
  }
  /**
   * Creates an HTTP/2 session on top of an already-connected socket and
   * registers it on the client.
   *
   * @param {object} client - Client that will own the session; its
   *   `kSocket` and `kHTTP2Session` slots are set here.
   * @param {object} socket - Connected socket handed to `http2.connect()`
   *   through `createConnection`.
   * @returns {Promise<object>} Context object exposing `version`,
   *   `defaultPipelining`, `write`, `resume`, `destroy`, `destroyed` and
   *   `busy`.
   */
  async function connectH2 (client, socket) {
    client[kSocket] = socket

    // Emit the experimental warning at most once.
    if (!h2ExperimentalWarned) {
      h2ExperimentalWarned = true
      process.emitWarning('H2 support is experimental, expect them to change at any time.', {
        code: 'UNDICI-H2'
      })
    }

    const session = http2.connect(client[kUrl], {
      createConnection: () => socket,
      peerMaxConcurrentStreams: client[kMaxConcurrentStreams]
    })

    session[kOpenStreams] = 0
    session[kClient] = client
    session[kSocket] = socket

    util.addListener(session, 'error', onHttp2SessionError)
    util.addListener(session, 'frameError', onHttp2FrameError)
    util.addListener(session, 'end', onHttp2SessionEnd)
    util.addListener(session, 'goaway', onHTTP2GoAway)
    util.addListener(session, 'close', onSessionClose)

    session.unref()

    client[kHTTP2Session] = session
    socket[kHTTP2Session] = session

    util.addListener(socket, 'error', onSocketError)
    util.addListener(socket, 'end', onSocketEnd)
    util.addListener(socket, 'close', onSocketClose)

    // Tracks socket closure so destroy() can complete immediately afterwards.
    let closed = false
    socket.on('close', () => {
      closed = true
    })

    return {
      version: 'h2',
      defaultPipelining: Infinity,
      write (...args) {
        return writeH2(client, ...args)
      },
      resume () {
        resumeH2(client)
      },
      destroy (err, callback) {
        if (closed) {
          queueMicrotask(callback)
          return
        }
        // Destroying the socket will trigger the session close
        socket.destroy(err).on('close', callback)
      },
      get destroyed () {
        return socket.destroyed
      },
      busy () {
        return false
      }
    }

    // The handlers below are function declarations (hoisted), invoked with
    // `this` bound to the emitter they were registered on.

    // Session 'close': detach the session from its client and, when the
    // client has been destroyed, fail every request from the running index on.
    function onSessionClose () {
      const owner = this[kClient]
      const ownerSocket = owner[kSocket]
      const err = this[kSocket][kError] || this[kError] || new SocketError('closed', util.getSocketInfo(ownerSocket))

      owner[kHTTP2Session] = null

      if (!owner.destroyed) {
        return
      }

      assert(owner[kPending] === 0)

      // Fail entire queue.
      const failed = owner[kQueue].splice(owner[kRunningIdx])
      for (const request of failed) {
        util.errorRequest(owner, request, err)
      }
    }

    // Socket 'error': remember the error on the socket and forward it to the
    // client's error hook (read from `this[kClient]`).
    function onSocketError (err) {
      assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

      this[kError] = err
      this[kClient][kOnError](err)
    }

    // Socket 'end': the peer closed its side, so destroy the socket.
    function onSocketEnd () {
      util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
    }

    // Socket 'close': destroy the session, rewind the pending index to the
    // running index, emit 'disconnect' and resume the client.
    function onSocketClose () {
      const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))

      client[kSocket] = null

      const attachedSession = this[kHTTP2Session]
      if (attachedSession != null) {
        attachedSession.destroy(err)
      }

      client[kPendingIdx] = client[kRunningIdx]

      assert(client[kRunning] === 0)

      client.emit('disconnect', client[kUrl], [client], err)

      client[kResume]()
    }
  }
  /**
   * Refs or unrefs the client's socket and HTTP/2 session together.
   *
   * Does nothing unless the client has a socket whose `destroyed` flag is
   * exactly `false`. Both handles are unref'd when `client[kSize]` and
   * `client[kMaxConcurrentStreams]` are both 0, and ref'd otherwise.
   *
   * @param {object} client - Client whose socket/session are adjusted.
   */
  function resumeH2 (client) {
    const sock = client[kSocket]

    if (sock?.destroyed !== false) {
      return
    }

    const idle = client[kSize] === 0 && client[kMaxConcurrentStreams] === 0
    const op = idle ? 'unref' : 'ref'

    sock[op]()
    client[kHTTP2Session][op]()
  }
  /**
   * Session 'error' handler (`this` is the HTTP/2 session).
   *
   * Stores the error on the session's socket and passes it to the owning
   * client's error hook.
   *
   * @param {Error} err - Error emitted by the session.
   */
  function onHttp2SessionError (err) {
    assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

    const { [kSocket]: socket, [kClient]: owner } = this

    socket[kError] = err
    owner[kOnError](err)
  }
  /**
   * Session 'frameError' handler (`this` is the HTTP/2 session).
   *
   * Only frame errors reported with id 0 are handled here: they are recorded
   * on the socket and forwarded to the client's error hook. Errors for any
   * other id are ignored by this handler.
   *
   * @param {number} type - Frame type reported with the error.
   * @param {number} code - Error code reported with the error.
   * @param {number} id - Stream id reported with the error.
   */
  function onHttp2FrameError (type, code, id) {
    if (id !== 0) {
      return
    }

    const frameErr = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`)
    this[kSocket][kError] = frameErr
    this[kClient][kOnError](frameErr)
  }
  /**
   * Session 'end' handler (`this` is the HTTP/2 session).
   *
   * Destroys the session first and then its socket, both with the same
   * "other side closed" SocketError.
   */
  function onHttp2SessionEnd () {
    const sock = this[kSocket]
    const closedErr = new SocketError('other side closed', util.getSocketInfo(sock))

    this.destroy(closedErr)
    util.destroy(sock, closedErr)
  }
  /**
   * Session 'goaway' handler (`this` is the HTTP/2 session).
   *
   * This is the root cause of #3011: GOAWAY frames must be handled properly,
   * triggering the close of the session along with the socket right away.
   *
   * Detaches the socket and HTTP context from the client, destroys the
   * socket, fails the request at the head of the queue (if there is one),
   * emits 'disconnect' and resumes the client.
   *
   * @param {number} code - Error code carried by the GOAWAY frame.
   */
  function onHTTP2GoAway (code) {
    const socket = this[kSocket]
    const client = this[kClient]

    // We cannot recover, so best to close the session and the socket.
    // The socket info is taken from the session's socket; the session object
    // itself is not a socket.
    const err = this[kError] || new SocketError(`HTTP/2: "GOAWAY" frame received with code ${code}`, util.getSocketInfo(socket))

    client[kSocket] = null
    client[kHTTPContext] = null

    // NOTE(review): within this file kHTTP2Session is only assigned on the
    // client and on the socket, never on the session, so this branch looks
    // unreachable from here; kept as-is — confirm before removing.
    if (this[kHTTP2Session] != null) {
      this[kHTTP2Session].destroy(err)
      this[kHTTP2Session] = null
    }

    util.destroy(socket, err)

    // Fail head of pipeline, but only when a request actually occupies that
    // slot. Without this guard an idle connection receiving GOAWAY would pass
    // `undefined` to errorRequest and write a stray null entry past the end
    // of the queue while advancing the running index.
    if (client[kRunningIdx] < client[kQueue].length) {
      const request = client[kQueue][client[kRunningIdx]]
      client[kQueue][client[kRunningIdx]++] = null
      util.errorRequest(client, request, err)
    }

    client[kPendingIdx] = client[kRunningIdx]

    assert(client[kRunning] === 0)

    client.emit('disconnect', client[kUrl], [client], err)

    client[kResume]()
  }
  /**
   * Tells whether a Content-Length check applies to the given method.
   *
   * See https://www.rfc-editor.org/rfc/rfc7230#section-3.3.2
   *
   * @param {string} method - Request method; compared by strict equality
   *   against upper-case names.
   * @returns {boolean} `false` for GET, HEAD, OPTIONS, TRACE and CONNECT;
   *   `true` for anything else.
   */
  function shouldSendContentLength (method) {
    switch (method) {
      case 'GET':
      case 'HEAD':
      case 'OPTIONS':
      case 'TRACE':
      case 'CONNECT':
        return false
      default:
        return true
    }
  }
  /**
   * Dispatches one request over the client's HTTP/2 session.
   *
   * Builds the header block, opens a stream with `session.request()`, wires
   * the stream's events to the request's handler callbacks and starts
   * writing the body.
   *
   * @param {object} client - Client owning the session (`client[kHTTP2Session]`).
   * @param {object} request - Request to send; its `onConnect`, `onUpgrade`,
   *   `onResponseStarted`, `onHeaders`, `onData` and `onComplete` callbacks
   *   are invoked from here.
   * @returns {boolean} `false` when the request was rejected before a stream
   *   was opened (upgrade requested, aborted during `onConnect`, or a
   *   content-length mismatch with strict checking enabled); `true` otherwise.
   */
  function writeH2 (client, request) {
    const session = client[kHTTP2Session]
    const { body, method, path, host, upgrade, expectContinue, signal, headers: reqHeaders } = request

    if (upgrade) {
      util.errorRequest(client, request, new Error('Upgrade not supported for H2'))
      return false
    }

    // reqHeaders is a flat [name, value, name, value, ...] list; fold it into
    // an object, joining array values with commas.
    const headers = {}
    for (let idx = 0; idx < reqHeaders.length; idx += 2) {
      const key = reqHeaders[idx]
      const val = reqHeaders[idx + 1]

      if (!Array.isArray(val)) {
        headers[key] = val
        continue
      }

      for (const item of val) {
        if (headers[key]) {
          headers[key] += `,${item}`
        } else {
          headers[key] = item
        }
      }
    }

    /** @type {import('node:http2').ClientHttp2Stream} */
    let stream

    const { hostname, port } = client[kUrl]

    headers[HTTP2_HEADER_AUTHORITY] = host || `${hostname}${port ? `:${port}` : ''}`
    headers[HTTP2_HEADER_METHOD] = method

    // Errors the request and tears down its stream and body. A no-op once the
    // request is already aborted or completed.
    const abort = (err) => {
      if (request.aborted || request.completed) {
        return
      }

      err = err || new RequestAbortedError()

      util.errorRequest(client, request, err)

      if (stream != null) {
        util.destroy(stream, err)
      }

      // We do not destroy the socket as we can continue using the session:
      // the stream gets destroyed and the session remains to create new streams.
      util.destroy(body, err)
      client[kQueue][client[kRunningIdx]++] = null
      client[kResume]()
    }

    // Shared 'close' bookkeeping: one stream fewer, and the session is
    // unref'd once none are left open.
    const onStreamClose = () => {
      session[kOpenStreams] -= 1
      if (session[kOpenStreams] === 0) {
        session.unref()
      }
    }

    try {
      // We are already connected, streams are pending.
      // We can call on connect, and wait for abort
      request.onConnect(abort)
    } catch (err) {
      util.errorRequest(client, request, err)
    }

    if (request.aborted) {
      return false
    }

    if (method === 'CONNECT') {
      session.ref()
      // We are already connected, streams are pending; the first request
      // will create a new stream. Trigger a request to create the stream and
      // wait until the `ready` event fires.
      // endStream is disabled to allow the user to write to the stream.
      stream = session.request(headers, { endStream: false, signal })

      const onStreamReady = () => {
        request.onUpgrade(null, null, stream)
        ++session[kOpenStreams]
        client[kQueue][client[kRunningIdx]++] = null
      }

      if (stream.id && !stream.pending) {
        onStreamReady()
      } else {
        stream.once('ready', onStreamReady)
      }

      stream.once('close', onStreamClose)

      return true
    }

    // https://tools.ietf.org/html/rfc7540#section-8.3
    // :path and :scheme headers must be omitted when sending CONNECT,
    // which is why they are only set after the CONNECT branch above.
    headers[HTTP2_HEADER_PATH] = path
    headers[HTTP2_HEADER_SCHEME] = 'https'

    // https://tools.ietf.org/html/rfc7231#section-4.3.1
    // https://tools.ietf.org/html/rfc7231#section-4.3.2
    // https://tools.ietf.org/html/rfc7231#section-4.3.5

    // Sending a payload body on a request that does not
    // expect it can cause undefined behavior on some
    // servers and corrupt connection state. Do not
    // re-use the connection for further requests.
    const expectsPayload = method === 'PUT' || method === 'POST' || method === 'PATCH'

    if (body && typeof body.read === 'function') {
      // Try to read EOF in order to get length.
      body.read(0)
    }

    let contentLength = util.bodyLength(body) ?? request.contentLength

    if (contentLength === 0 || !expectsPayload) {
      // https://tools.ietf.org/html/rfc7230#section-3.3.2
      // A user agent SHOULD NOT send a Content-Length header field when
      // the request message does not contain a payload body and the method
      // semantics do not anticipate such a body.
      contentLength = null
    }

    // https://github.com/nodejs/undici/issues/2046
    // A user agent may send a Content-Length header with 0 value, this should be allowed.
    const lengthMismatch =
      shouldSendContentLength(method) &&
      contentLength > 0 &&
      request.contentLength != null &&
      request.contentLength !== contentLength

    if (lengthMismatch) {
      if (client[kStrictContentLength]) {
        util.errorRequest(client, request, new RequestContentLengthMismatchError())
        return false
      }

      // Non-strict mode only warns and carries on.
      process.emitWarning(new RequestContentLengthMismatchError())
    }

    if (contentLength != null) {
      assert(body, 'no body must not have content length')
      headers[HTTP2_HEADER_CONTENT_LENGTH] = `${contentLength}`
    }

    session.ref()

    const shouldEndStream = method === 'GET' || method === 'HEAD' || body === null

    if (expectContinue) {
      headers[HTTP2_HEADER_EXPECT] = '100-continue'
    }

    stream = session.request(headers, { endStream: shouldEndStream, signal })

    if (expectContinue) {
      // Hold the body back until the 'continue' event fires.
      stream.once('continue', writeBodyH2)
    } else {
      writeBodyH2()
    }

    // Increment counter as we have new streams open
    ++session[kOpenStreams]

    stream.once('response', (responseHeaders) => {
      const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = responseHeaders
      request.onResponseStarted()

      // Due to the stream nature, it is possible we face a race condition
      // where the stream has been assigned, but the request has been aborted:
      // the request remains in-flight and headers haven't been received yet.
      // For those scenarios, best effort is to destroy the stream immediately
      // as there's no value in keeping it open.
      if (request.aborted) {
        const err = new RequestAbortedError()
        util.errorRequest(client, request, err)
        util.destroy(stream, err)
        return
      }

      const keepFlowing = request.onHeaders(Number(statusCode), parseH2Headers(realHeaders), stream.resume.bind(stream), '')
      if (keepFlowing === false) {
        stream.pause()
      }

      stream.on('data', (chunk) => {
        if (request.onData(chunk) === false) {
          stream.pause()
        }
      })
    })

    stream.once('end', () => {
      // When state is null, it means we haven't consumed body and the stream
      // still does not have a state.
      // Present specially when using pipeline or stream
      if (stream.state?.state == null || stream.state.state < 6) {
        request.onComplete([])
      }

      if (session[kOpenStreams] === 0) {
        // Stream is closed or half-closed-remote (6), decrement counter and cleanup.
        // It does not make sense to continue working with the stream as we do
        // not yet have RST_STREAM support on client-side
        session.unref()
      }

      // abort() returns early when the request already completed above.
      abort(new InformationalError('HTTP/2: stream half-closed (remote)'))
      client[kQueue][client[kRunningIdx]++] = null
      client[kPendingIdx] = client[kRunningIdx]
      client[kResume]()
    })

    stream.once('close', onStreamClose)

    stream.once('error', function (err) {
      abort(err)
    })

    stream.once('frameError', (type, code) => {
      abort(new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`))
    })

    // TODO(HTTP/2): Support the 'aborted', 'timeout', 'push' and 'trailers'
    // stream events; none of them is handled yet.

    return true

    // Picks the writer matching the body's type. Declared as a function so it
    // is hoisted above its uses earlier in writeH2.
    function writeBodyH2 () {
      const socket = client[kSocket]

      /* istanbul ignore else: assertion */
      if (!body || contentLength === 0) {
        // Nothing to write: writeBuffer is called with a null body.
        writeBuffer(abort, stream, null, client, request, socket, contentLength, expectsPayload)
      } else if (util.isBuffer(body)) {
        writeBuffer(abort, stream, body, client, request, socket, contentLength, expectsPayload)
      } else if (util.isBlobLike(body)) {
        if (typeof body.stream === 'function') {
          writeIterable(abort, stream, body.stream(), client, request, socket, contentLength, expectsPayload)
        } else {
          writeBlob(abort, stream, body, client, request, socket, contentLength, expectsPayload)
        }
      } else if (util.isStream(body)) {
        // Note: writeStream takes its arguments in a different order.
        writeStream(abort, socket, expectsPayload, stream, body, client, request, contentLength)
      } else if (util.isIterable(body)) {
        writeIterable(abort, stream, body, client, request, socket, contentLength, expectsPayload)
      } else {
        assert(false)
      }
    }
  }
  /**
   * Writes a Buffer body (or no body at all) to an HTTP/2 stream.
   *
   * When `body` is a Buffer it is written in a single corked write, the
   * stream is ended and `request.onBodySent` is called. In every case
   * `request.onRequestSent` is called and the client is resumed. Anything
   * thrown along the way, including a failed assertion, goes to `abort`.
   *
   * @param {(err?: Error) => void} abort - Called with any error thrown here.
   * @param {object} h2stream - Stream to write to.
   * @param {Buffer|null} body - Body to send; null/undefined writes nothing.
   * @param {object} client - Owning client; `client[kResume]()` is called at the end.
   * @param {object} request - Request being sent.
   * @param {object} socket - Socket flagged with `kReset` when no payload is expected.
   * @param {number|null} contentLength - Must equal `body.byteLength` for a Buffer body.
   * @param {boolean} expectsPayload - Whether the method anticipates a payload.
   */
  function writeBuffer (abort, h2stream, body, client, request, socket, contentLength, expectsPayload) {
    try {
      const hasBufferBody = body != null && util.isBuffer(body)

      if (hasBufferBody) {
        assert(contentLength === body.byteLength, 'buffer body must have content length')
        h2stream.cork()
        h2stream.write(body)
        h2stream.uncork()
        h2stream.end()

        request.onBodySent(body)
      }

      if (!expectsPayload) {
        socket[kReset] = true
      }

      request.onRequestSent()
      client[kResume]()
    } catch (failure) {
      abort(failure)
    }
  }
  /**
   * Pipes a stream body into an HTTP/2 stream using `pipeline()`.
   *
   * Note that the argument order differs from the other write helpers.
   *
   * @param {(err?: Error) => void} abort - Called when the pipeline fails.
   * @param {object} socket - Socket flagged with `kReset` when no payload is expected.
   * @param {boolean} expectsPayload - Whether the method anticipates a payload.
   * @param {object} h2stream - Destination stream.
   * @param {object} body - Source stream.
   * @param {object} client - Owning client; resumed once the pipeline succeeds.
   * @param {object} request - Request being sent.
   * @param {number|null} contentLength - Body length; a value of 0 is only
   *   accepted while no request is running on the client.
   */
  function writeStream (abort, socket, expectsPayload, h2stream, body, client, request, contentLength) {
    assert(contentLength !== 0 || client[kRunning] === 0, 'stream body cannot be pipelined')

    // For HTTP/2, it is enough to pipe the stream
    const pipe = pipeline(body, h2stream, (err) => {
      if (err) {
        util.destroy(pipe, err)
        abort(err)
        return
      }

      util.removeAllListeners(pipe)
      request.onRequestSent()

      if (!expectsPayload) {
        socket[kReset] = true
      }

      client[kResume]()
    })

    // NOTE(review): pipeline() returns its last stream, i.e. h2stream here,
    // so this listener observes 'data' emitted by the HTTP/2 stream rather
    // than the chunks read from `body` — confirm this is intended.
    util.addListener(pipe, 'data', (chunk) => {
      request.onBodySent(chunk)
    })
  }
  /**
   * Writes a Blob-like body to an HTTP/2 stream.
   *
   * The blob is read fully through `body.arrayBuffer()`, written in a single
   * corked write, and the stream is ended. Every failure, including the
   * content-length assertion, is reported through `abort`. This function is
   * called without awaiting its promise, so an error thrown outside the
   * `try` would surface as an unhandled rejection rather than failing the
   * request.
   *
   * @param {(err?: Error) => void} abort - Called with any error raised here.
   * @param {object} h2stream - Stream to write to.
   * @param {object} body - Blob-like object exposing `size` and `arrayBuffer()`.
   * @param {object} client - Owning client; `client[kResume]()` is called on success.
   * @param {object} request - Request being sent.
   * @param {object} socket - Socket flagged with `kReset` when no payload is expected.
   * @param {number|null} contentLength - Must equal `body.size`.
   * @param {boolean} expectsPayload - Whether the method anticipates a payload.
   * @returns {Promise<void>} Always resolves; errors are routed to `abort`.
   */
  async function writeBlob (abort, h2stream, body, client, request, socket, contentLength, expectsPayload) {
    try {
      // Kept inside the try so a violated invariant aborts the request, the
      // same way writeBuffer handles its own assertion. The separate
      // mismatch check that used to follow could never fire once this
      // assertion passed, so it has been dropped.
      assert(contentLength === body.size, 'blob body must have content length')

      const buffer = Buffer.from(await body.arrayBuffer())

      h2stream.cork()
      h2stream.write(buffer)
      h2stream.uncork()
      h2stream.end()

      request.onBodySent(buffer)
      request.onRequestSent()

      if (!expectsPayload) {
        socket[kReset] = true
      }

      client[kResume]()
    } catch (err) {
      abort(err)
    }
  }
  /**
   * Writes an (async) iterable body to an HTTP/2 stream chunk by chunk.
   *
   * After a write that returns a falsy value, iteration pauses until the
   * stream emits 'drain' or 'close'. An error recorded on the socket
   * (`socket[kError]`) stops the loop. Failures inside the loop are reported
   * through `abort`, and the temporary stream listeners are removed in all
   * cases.
   *
   * @param {(err?: Error) => void} abort - Called with any error raised while writing.
   * @param {object} h2stream - Stream to write to.
   * @param {AsyncIterable|Iterable} body - Source of chunks.
   * @param {object} client - Owning client; `client[kResume]()` is called on success.
   * @param {object} request - Request being sent.
   * @param {object} socket - Socket checked for `kError`; flagged with `kReset`
   *   when no payload is expected.
   * @param {number|null} contentLength - Body length; a value of 0 is only
   *   accepted while no request is running on the client.
   * @param {boolean} expectsPayload - Whether the method anticipates a payload.
   */
  async function writeIterable (abort, h2stream, body, client, request, socket, contentLength, expectsPayload) {
    assert(contentLength !== 0 || client[kRunning] === 0, 'iterator body cannot be pipelined')

    // Resolver of the promise currently waiting for 'drain'/'close', if any.
    let pendingResolve = null

    const onDrain = () => {
      if (!pendingResolve) {
        return
      }
      // Clear the slot before calling so a re-entrant wait starts clean.
      const release = pendingResolve
      pendingResolve = null
      release()
    }

    function waitForDrain () {
      return new Promise((resolve, reject) => {
        assert(pendingResolve === null)

        const socketErr = socket[kError]
        if (socketErr) {
          reject(socketErr)
        } else {
          pendingResolve = resolve
        }
      })
    }

    h2stream
      .on('close', onDrain)
      .on('drain', onDrain)

    try {
      // It's up to the user to somehow abort the async iterable.
      for await (const chunk of body) {
        if (socket[kError]) {
          throw socket[kError]
        }

        const accepted = h2stream.write(chunk)
        request.onBodySent(chunk)

        if (!accepted) {
          await waitForDrain()
        }
      }

      h2stream.end()

      request.onRequestSent()

      if (!expectsPayload) {
        socket[kReset] = true
      }

      client[kResume]()
    } catch (err) {
      abort(err)
    } finally {
      h2stream
        .off('close', onDrain)
        .off('drain', onDrain)
    }
  }
  601. module.exports = connectH2