client-h1.js 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370
  1. 'use strict'
  2. /* global WebAssembly */
  3. const assert = require('node:assert')
  4. const util = require('../core/util.js')
  5. const { channels } = require('../core/diagnostics.js')
  6. const timers = require('../util/timers.js')
  7. const {
  8. RequestContentLengthMismatchError,
  9. ResponseContentLengthMismatchError,
  10. RequestAbortedError,
  11. HeadersTimeoutError,
  12. HeadersOverflowError,
  13. SocketError,
  14. InformationalError,
  15. BodyTimeoutError,
  16. HTTPParserError,
  17. ResponseExceededMaxSizeError
  18. } = require('../core/errors.js')
  19. const {
  20. kUrl,
  21. kReset,
  22. kClient,
  23. kParser,
  24. kBlocking,
  25. kRunning,
  26. kPending,
  27. kSize,
  28. kWriting,
  29. kQueue,
  30. kNoRef,
  31. kKeepAliveDefaultTimeout,
  32. kHostHeader,
  33. kPendingIdx,
  34. kRunningIdx,
  35. kError,
  36. kPipelining,
  37. kSocket,
  38. kKeepAliveTimeoutValue,
  39. kMaxHeadersSize,
  40. kKeepAliveMaxTimeout,
  41. kKeepAliveTimeoutThreshold,
  42. kHeadersTimeout,
  43. kBodyTimeout,
  44. kStrictContentLength,
  45. kMaxRequests,
  46. kCounter,
  47. kMaxResponseSize,
  48. kOnError,
  49. kResume,
  50. kHTTPContext
  51. } = require('../core/symbols.js')
  52. const constants = require('../llhttp/constants.js')
  53. const EMPTY_BUF = Buffer.alloc(0)
  54. const FastBuffer = Buffer[Symbol.species]
  55. const addListener = util.addListener
  56. const removeAllListeners = util.removeAllListeners
  57. let extractBody
  58. async function lazyllhttp () {
  59. const llhttpWasmData = process.env.JEST_WORKER_ID ? require('../llhttp/llhttp-wasm.js') : undefined
  60. let mod
  61. try {
  62. mod = await WebAssembly.compile(require('../llhttp/llhttp_simd-wasm.js'))
  63. } catch (e) {
  64. /* istanbul ignore next */
  65. // We could check if the error was caused by the simd option not
  66. // being enabled, but the occurring of this other error
  67. // * https://github.com/emscripten-core/emscripten/issues/11495
  68. // got me to remove that check to avoid breaking Node 12.
  69. mod = await WebAssembly.compile(llhttpWasmData || require('../llhttp/llhttp-wasm.js'))
  70. }
  71. return await WebAssembly.instantiate(mod, {
  72. env: {
  73. /* eslint-disable camelcase */
  74. wasm_on_url: (p, at, len) => {
  75. /* istanbul ignore next */
  76. return 0
  77. },
  78. wasm_on_status: (p, at, len) => {
  79. assert(currentParser.ptr === p)
  80. const start = at - currentBufferPtr + currentBufferRef.byteOffset
  81. return currentParser.onStatus(new FastBuffer(currentBufferRef.buffer, start, len)) || 0
  82. },
  83. wasm_on_message_begin: (p) => {
  84. assert(currentParser.ptr === p)
  85. return currentParser.onMessageBegin() || 0
  86. },
  87. wasm_on_header_field: (p, at, len) => {
  88. assert(currentParser.ptr === p)
  89. const start = at - currentBufferPtr + currentBufferRef.byteOffset
  90. return currentParser.onHeaderField(new FastBuffer(currentBufferRef.buffer, start, len)) || 0
  91. },
  92. wasm_on_header_value: (p, at, len) => {
  93. assert(currentParser.ptr === p)
  94. const start = at - currentBufferPtr + currentBufferRef.byteOffset
  95. return currentParser.onHeaderValue(new FastBuffer(currentBufferRef.buffer, start, len)) || 0
  96. },
  97. wasm_on_headers_complete: (p, statusCode, upgrade, shouldKeepAlive) => {
  98. assert(currentParser.ptr === p)
  99. return currentParser.onHeadersComplete(statusCode, Boolean(upgrade), Boolean(shouldKeepAlive)) || 0
  100. },
  101. wasm_on_body: (p, at, len) => {
  102. assert(currentParser.ptr === p)
  103. const start = at - currentBufferPtr + currentBufferRef.byteOffset
  104. return currentParser.onBody(new FastBuffer(currentBufferRef.buffer, start, len)) || 0
  105. },
  106. wasm_on_message_complete: (p) => {
  107. assert(currentParser.ptr === p)
  108. return currentParser.onMessageComplete() || 0
  109. }
  110. /* eslint-enable camelcase */
  111. }
  112. })
  113. }
  114. let llhttpInstance = null
  115. let llhttpPromise = lazyllhttp()
  116. llhttpPromise.catch()
  117. let currentParser = null
  118. let currentBufferRef = null
  119. let currentBufferSize = 0
  120. let currentBufferPtr = null
  121. const USE_NATIVE_TIMER = 0
  122. const USE_FAST_TIMER = 1
  123. // Use fast timers for headers and body to take eventual event loop
  124. // latency into account.
  125. const TIMEOUT_HEADERS = 2 | USE_FAST_TIMER
  126. const TIMEOUT_BODY = 4 | USE_FAST_TIMER
  127. // Use native timers to ignore event loop latency for keep-alive
  128. // handling.
  129. const TIMEOUT_KEEP_ALIVE = 8 | USE_NATIVE_TIMER
  130. class Parser {
  131. constructor (client, socket, { exports }) {
  132. assert(Number.isFinite(client[kMaxHeadersSize]) && client[kMaxHeadersSize] > 0)
  133. this.llhttp = exports
  134. this.ptr = this.llhttp.llhttp_alloc(constants.TYPE.RESPONSE)
  135. this.client = client
  136. this.socket = socket
  137. this.timeout = null
  138. this.timeoutValue = null
  139. this.timeoutType = null
  140. this.statusCode = null
  141. this.statusText = ''
  142. this.upgrade = false
  143. this.headers = []
  144. this.headersSize = 0
  145. this.headersMaxSize = client[kMaxHeadersSize]
  146. this.shouldKeepAlive = false
  147. this.paused = false
  148. this.resume = this.resume.bind(this)
  149. this.bytesRead = 0
  150. this.keepAlive = ''
  151. this.contentLength = ''
  152. this.connection = ''
  153. this.maxResponseSize = client[kMaxResponseSize]
  154. }
  155. setTimeout (delay, type) {
  156. // If the existing timer and the new timer are of different timer type
  157. // (fast or native) or have different delay, we need to clear the existing
  158. // timer and set a new one.
  159. if (
  160. delay !== this.timeoutValue ||
  161. (type & USE_FAST_TIMER) ^ (this.timeoutType & USE_FAST_TIMER)
  162. ) {
  163. // If a timeout is already set, clear it with clearTimeout of the fast
  164. // timer implementation, as it can clear fast and native timers.
  165. if (this.timeout) {
  166. timers.clearTimeout(this.timeout)
  167. this.timeout = null
  168. }
  169. if (delay) {
  170. if (type & USE_FAST_TIMER) {
  171. this.timeout = timers.setFastTimeout(onParserTimeout, delay, new WeakRef(this))
  172. } else {
  173. this.timeout = setTimeout(onParserTimeout, delay, new WeakRef(this))
  174. this.timeout.unref()
  175. }
  176. }
  177. this.timeoutValue = delay
  178. } else if (this.timeout) {
  179. // istanbul ignore else: only for jest
  180. if (this.timeout.refresh) {
  181. this.timeout.refresh()
  182. }
  183. }
  184. this.timeoutType = type
  185. }
  186. resume () {
  187. if (this.socket.destroyed || !this.paused) {
  188. return
  189. }
  190. assert(this.ptr != null)
  191. assert(currentParser == null)
  192. this.llhttp.llhttp_resume(this.ptr)
  193. assert(this.timeoutType === TIMEOUT_BODY)
  194. if (this.timeout) {
  195. // istanbul ignore else: only for jest
  196. if (this.timeout.refresh) {
  197. this.timeout.refresh()
  198. }
  199. }
  200. this.paused = false
  201. this.execute(this.socket.read() || EMPTY_BUF) // Flush parser.
  202. this.readMore()
  203. }
  204. readMore () {
  205. while (!this.paused && this.ptr) {
  206. const chunk = this.socket.read()
  207. if (chunk === null) {
  208. break
  209. }
  210. this.execute(chunk)
  211. }
  212. }
  213. execute (data) {
  214. assert(this.ptr != null)
  215. assert(currentParser == null)
  216. assert(!this.paused)
  217. const { socket, llhttp } = this
  218. if (data.length > currentBufferSize) {
  219. if (currentBufferPtr) {
  220. llhttp.free(currentBufferPtr)
  221. }
  222. currentBufferSize = Math.ceil(data.length / 4096) * 4096
  223. currentBufferPtr = llhttp.malloc(currentBufferSize)
  224. }
  225. new Uint8Array(llhttp.memory.buffer, currentBufferPtr, currentBufferSize).set(data)
  226. // Call `execute` on the wasm parser.
  227. // We pass the `llhttp_parser` pointer address, the pointer address of buffer view data,
  228. // and finally the length of bytes to parse.
  229. // The return value is an error code or `constants.ERROR.OK`.
  230. try {
  231. let ret
  232. try {
  233. currentBufferRef = data
  234. currentParser = this
  235. ret = llhttp.llhttp_execute(this.ptr, currentBufferPtr, data.length)
  236. /* eslint-disable-next-line no-useless-catch */
  237. } catch (err) {
  238. /* istanbul ignore next: difficult to make a test case for */
  239. throw err
  240. } finally {
  241. currentParser = null
  242. currentBufferRef = null
  243. }
  244. const offset = llhttp.llhttp_get_error_pos(this.ptr) - currentBufferPtr
  245. if (ret === constants.ERROR.PAUSED_UPGRADE) {
  246. this.onUpgrade(data.slice(offset))
  247. } else if (ret === constants.ERROR.PAUSED) {
  248. this.paused = true
  249. socket.unshift(data.slice(offset))
  250. } else if (ret !== constants.ERROR.OK) {
  251. const ptr = llhttp.llhttp_get_error_reason(this.ptr)
  252. let message = ''
  253. /* istanbul ignore else: difficult to make a test case for */
  254. if (ptr) {
  255. const len = new Uint8Array(llhttp.memory.buffer, ptr).indexOf(0)
  256. message =
  257. 'Response does not match the HTTP/1.1 protocol (' +
  258. Buffer.from(llhttp.memory.buffer, ptr, len).toString() +
  259. ')'
  260. }
  261. throw new HTTPParserError(message, constants.ERROR[ret], data.slice(offset))
  262. }
  263. } catch (err) {
  264. util.destroy(socket, err)
  265. }
  266. }
  267. destroy () {
  268. assert(this.ptr != null)
  269. assert(currentParser == null)
  270. this.llhttp.llhttp_free(this.ptr)
  271. this.ptr = null
  272. this.timeout && timers.clearTimeout(this.timeout)
  273. this.timeout = null
  274. this.timeoutValue = null
  275. this.timeoutType = null
  276. this.paused = false
  277. }
  278. onStatus (buf) {
  279. this.statusText = buf.toString()
  280. }
  281. onMessageBegin () {
  282. const { socket, client } = this
  283. /* istanbul ignore next: difficult to make a test case for */
  284. if (socket.destroyed) {
  285. return -1
  286. }
  287. const request = client[kQueue][client[kRunningIdx]]
  288. if (!request) {
  289. return -1
  290. }
  291. request.onResponseStarted()
  292. }
  293. onHeaderField (buf) {
  294. const len = this.headers.length
  295. if ((len & 1) === 0) {
  296. this.headers.push(buf)
  297. } else {
  298. this.headers[len - 1] = Buffer.concat([this.headers[len - 1], buf])
  299. }
  300. this.trackHeader(buf.length)
  301. }
  302. onHeaderValue (buf) {
  303. let len = this.headers.length
  304. if ((len & 1) === 1) {
  305. this.headers.push(buf)
  306. len += 1
  307. } else {
  308. this.headers[len - 1] = Buffer.concat([this.headers[len - 1], buf])
  309. }
  310. const key = this.headers[len - 2]
  311. if (key.length === 10) {
  312. const headerName = util.bufferToLowerCasedHeaderName(key)
  313. if (headerName === 'keep-alive') {
  314. this.keepAlive += buf.toString()
  315. } else if (headerName === 'connection') {
  316. this.connection += buf.toString()
  317. }
  318. } else if (key.length === 14 && util.bufferToLowerCasedHeaderName(key) === 'content-length') {
  319. this.contentLength += buf.toString()
  320. }
  321. this.trackHeader(buf.length)
  322. }
  323. trackHeader (len) {
  324. this.headersSize += len
  325. if (this.headersSize >= this.headersMaxSize) {
  326. util.destroy(this.socket, new HeadersOverflowError())
  327. }
  328. }
  329. onUpgrade (head) {
  330. const { upgrade, client, socket, headers, statusCode } = this
  331. assert(upgrade)
  332. assert(client[kSocket] === socket)
  333. assert(!socket.destroyed)
  334. assert(!this.paused)
  335. assert((headers.length & 1) === 0)
  336. const request = client[kQueue][client[kRunningIdx]]
  337. assert(request)
  338. assert(request.upgrade || request.method === 'CONNECT')
  339. this.statusCode = null
  340. this.statusText = ''
  341. this.shouldKeepAlive = null
  342. this.headers = []
  343. this.headersSize = 0
  344. socket.unshift(head)
  345. socket[kParser].destroy()
  346. socket[kParser] = null
  347. socket[kClient] = null
  348. socket[kError] = null
  349. removeAllListeners(socket)
  350. client[kSocket] = null
  351. client[kHTTPContext] = null // TODO (fix): This is hacky...
  352. client[kQueue][client[kRunningIdx]++] = null
  353. client.emit('disconnect', client[kUrl], [client], new InformationalError('upgrade'))
  354. try {
  355. request.onUpgrade(statusCode, headers, socket)
  356. } catch (err) {
  357. util.destroy(socket, err)
  358. }
  359. client[kResume]()
  360. }
  361. onHeadersComplete (statusCode, upgrade, shouldKeepAlive) {
  362. const { client, socket, headers, statusText } = this
  363. /* istanbul ignore next: difficult to make a test case for */
  364. if (socket.destroyed) {
  365. return -1
  366. }
  367. const request = client[kQueue][client[kRunningIdx]]
  368. /* istanbul ignore next: difficult to make a test case for */
  369. if (!request) {
  370. return -1
  371. }
  372. assert(!this.upgrade)
  373. assert(this.statusCode < 200)
  374. if (statusCode === 100) {
  375. util.destroy(socket, new SocketError('bad response', util.getSocketInfo(socket)))
  376. return -1
  377. }
  378. /* this can only happen if server is misbehaving */
  379. if (upgrade && !request.upgrade) {
  380. util.destroy(socket, new SocketError('bad upgrade', util.getSocketInfo(socket)))
  381. return -1
  382. }
  383. assert(this.timeoutType === TIMEOUT_HEADERS)
  384. this.statusCode = statusCode
  385. this.shouldKeepAlive = (
  386. shouldKeepAlive ||
  387. // Override llhttp value which does not allow keepAlive for HEAD.
  388. (request.method === 'HEAD' && !socket[kReset] && this.connection.toLowerCase() === 'keep-alive')
  389. )
  390. if (this.statusCode >= 200) {
  391. const bodyTimeout = request.bodyTimeout != null
  392. ? request.bodyTimeout
  393. : client[kBodyTimeout]
  394. this.setTimeout(bodyTimeout, TIMEOUT_BODY)
  395. } else if (this.timeout) {
  396. // istanbul ignore else: only for jest
  397. if (this.timeout.refresh) {
  398. this.timeout.refresh()
  399. }
  400. }
  401. if (request.method === 'CONNECT') {
  402. assert(client[kRunning] === 1)
  403. this.upgrade = true
  404. return 2
  405. }
  406. if (upgrade) {
  407. assert(client[kRunning] === 1)
  408. this.upgrade = true
  409. return 2
  410. }
  411. assert((this.headers.length & 1) === 0)
  412. this.headers = []
  413. this.headersSize = 0
  414. if (this.shouldKeepAlive && client[kPipelining]) {
  415. const keepAliveTimeout = this.keepAlive ? util.parseKeepAliveTimeout(this.keepAlive) : null
  416. if (keepAliveTimeout != null) {
  417. const timeout = Math.min(
  418. keepAliveTimeout - client[kKeepAliveTimeoutThreshold],
  419. client[kKeepAliveMaxTimeout]
  420. )
  421. if (timeout <= 0) {
  422. socket[kReset] = true
  423. } else {
  424. client[kKeepAliveTimeoutValue] = timeout
  425. }
  426. } else {
  427. client[kKeepAliveTimeoutValue] = client[kKeepAliveDefaultTimeout]
  428. }
  429. } else {
  430. // Stop more requests from being dispatched.
  431. socket[kReset] = true
  432. }
  433. const pause = request.onHeaders(statusCode, headers, this.resume, statusText) === false
  434. if (request.aborted) {
  435. return -1
  436. }
  437. if (request.method === 'HEAD') {
  438. return 1
  439. }
  440. if (statusCode < 200) {
  441. return 1
  442. }
  443. if (socket[kBlocking]) {
  444. socket[kBlocking] = false
  445. client[kResume]()
  446. }
  447. return pause ? constants.ERROR.PAUSED : 0
  448. }
  449. onBody (buf) {
  450. const { client, socket, statusCode, maxResponseSize } = this
  451. if (socket.destroyed) {
  452. return -1
  453. }
  454. const request = client[kQueue][client[kRunningIdx]]
  455. assert(request)
  456. assert(this.timeoutType === TIMEOUT_BODY)
  457. if (this.timeout) {
  458. // istanbul ignore else: only for jest
  459. if (this.timeout.refresh) {
  460. this.timeout.refresh()
  461. }
  462. }
  463. assert(statusCode >= 200)
  464. if (maxResponseSize > -1 && this.bytesRead + buf.length > maxResponseSize) {
  465. util.destroy(socket, new ResponseExceededMaxSizeError())
  466. return -1
  467. }
  468. this.bytesRead += buf.length
  469. if (request.onData(buf) === false) {
  470. return constants.ERROR.PAUSED
  471. }
  472. }
  473. onMessageComplete () {
  474. const { client, socket, statusCode, upgrade, headers, contentLength, bytesRead, shouldKeepAlive } = this
  475. if (socket.destroyed && (!statusCode || shouldKeepAlive)) {
  476. return -1
  477. }
  478. if (upgrade) {
  479. return
  480. }
  481. assert(statusCode >= 100)
  482. assert((this.headers.length & 1) === 0)
  483. const request = client[kQueue][client[kRunningIdx]]
  484. assert(request)
  485. this.statusCode = null
  486. this.statusText = ''
  487. this.bytesRead = 0
  488. this.contentLength = ''
  489. this.keepAlive = ''
  490. this.connection = ''
  491. this.headers = []
  492. this.headersSize = 0
  493. if (statusCode < 200) {
  494. return
  495. }
  496. /* istanbul ignore next: should be handled by llhttp? */
  497. if (request.method !== 'HEAD' && contentLength && bytesRead !== parseInt(contentLength, 10)) {
  498. util.destroy(socket, new ResponseContentLengthMismatchError())
  499. return -1
  500. }
  501. request.onComplete(headers)
  502. client[kQueue][client[kRunningIdx]++] = null
  503. if (socket[kWriting]) {
  504. assert(client[kRunning] === 0)
  505. // Response completed before request.
  506. util.destroy(socket, new InformationalError('reset'))
  507. return constants.ERROR.PAUSED
  508. } else if (!shouldKeepAlive) {
  509. util.destroy(socket, new InformationalError('reset'))
  510. return constants.ERROR.PAUSED
  511. } else if (socket[kReset] && client[kRunning] === 0) {
  512. // Destroy socket once all requests have completed.
  513. // The request at the tail of the pipeline is the one
  514. // that requested reset and no further requests should
  515. // have been queued since then.
  516. util.destroy(socket, new InformationalError('reset'))
  517. return constants.ERROR.PAUSED
  518. } else if (client[kPipelining] == null || client[kPipelining] === 1) {
  519. // We must wait a full event loop cycle to reuse this socket to make sure
  520. // that non-spec compliant servers are not closing the connection even if they
  521. // said they won't.
  522. setImmediate(() => client[kResume]())
  523. } else {
  524. client[kResume]()
  525. }
  526. }
  527. }
  528. function onParserTimeout (parser) {
  529. const { socket, timeoutType, client, paused } = parser.deref()
  530. /* istanbul ignore else */
  531. if (timeoutType === TIMEOUT_HEADERS) {
  532. if (!socket[kWriting] || socket.writableNeedDrain || client[kRunning] > 1) {
  533. assert(!paused, 'cannot be paused while waiting for headers')
  534. util.destroy(socket, new HeadersTimeoutError())
  535. }
  536. } else if (timeoutType === TIMEOUT_BODY) {
  537. if (!paused) {
  538. util.destroy(socket, new BodyTimeoutError())
  539. }
  540. } else if (timeoutType === TIMEOUT_KEEP_ALIVE) {
  541. assert(client[kRunning] === 0 && client[kKeepAliveTimeoutValue])
  542. util.destroy(socket, new InformationalError('socket idle timeout'))
  543. }
  544. }
  545. async function connectH1 (client, socket) {
  546. client[kSocket] = socket
  547. if (!llhttpInstance) {
  548. llhttpInstance = await llhttpPromise
  549. llhttpPromise = null
  550. }
  551. socket[kNoRef] = false
  552. socket[kWriting] = false
  553. socket[kReset] = false
  554. socket[kBlocking] = false
  555. socket[kParser] = new Parser(client, socket, llhttpInstance)
  556. addListener(socket, 'error', function (err) {
  557. assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')
  558. const parser = this[kParser]
  559. // On Mac OS, we get an ECONNRESET even if there is a full body to be forwarded
  560. // to the user.
  561. if (err.code === 'ECONNRESET' && parser.statusCode && !parser.shouldKeepAlive) {
  562. // We treat all incoming data so for as a valid response.
  563. parser.onMessageComplete()
  564. return
  565. }
  566. this[kError] = err
  567. this[kClient][kOnError](err)
  568. })
  569. addListener(socket, 'readable', function () {
  570. const parser = this[kParser]
  571. if (parser) {
  572. parser.readMore()
  573. }
  574. })
  575. addListener(socket, 'end', function () {
  576. const parser = this[kParser]
  577. if (parser.statusCode && !parser.shouldKeepAlive) {
  578. // We treat all incoming data so far as a valid response.
  579. parser.onMessageComplete()
  580. return
  581. }
  582. util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
  583. })
  584. addListener(socket, 'close', function () {
  585. const client = this[kClient]
  586. const parser = this[kParser]
  587. if (parser) {
  588. if (!this[kError] && parser.statusCode && !parser.shouldKeepAlive) {
  589. // We treat all incoming data so far as a valid response.
  590. parser.onMessageComplete()
  591. }
  592. this[kParser].destroy()
  593. this[kParser] = null
  594. }
  595. const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))
  596. client[kSocket] = null
  597. client[kHTTPContext] = null // TODO (fix): This is hacky...
  598. if (client.destroyed) {
  599. assert(client[kPending] === 0)
  600. // Fail entire queue.
  601. const requests = client[kQueue].splice(client[kRunningIdx])
  602. for (let i = 0; i < requests.length; i++) {
  603. const request = requests[i]
  604. util.errorRequest(client, request, err)
  605. }
  606. } else if (client[kRunning] > 0 && err.code !== 'UND_ERR_INFO') {
  607. // Fail head of pipeline.
  608. const request = client[kQueue][client[kRunningIdx]]
  609. client[kQueue][client[kRunningIdx]++] = null
  610. util.errorRequest(client, request, err)
  611. }
  612. client[kPendingIdx] = client[kRunningIdx]
  613. assert(client[kRunning] === 0)
  614. client.emit('disconnect', client[kUrl], [client], err)
  615. client[kResume]()
  616. })
  617. let closed = false
  618. socket.on('close', () => {
  619. closed = true
  620. })
  621. return {
  622. version: 'h1',
  623. defaultPipelining: 1,
  624. write (...args) {
  625. return writeH1(client, ...args)
  626. },
  627. resume () {
  628. resumeH1(client)
  629. },
  630. destroy (err, callback) {
  631. if (closed) {
  632. queueMicrotask(callback)
  633. } else {
  634. socket.destroy(err).on('close', callback)
  635. }
  636. },
  637. get destroyed () {
  638. return socket.destroyed
  639. },
  640. busy (request) {
  641. if (socket[kWriting] || socket[kReset] || socket[kBlocking]) {
  642. return true
  643. }
  644. if (request) {
  645. if (client[kRunning] > 0 && !request.idempotent) {
  646. // Non-idempotent request cannot be retried.
  647. // Ensure that no other requests are inflight and
  648. // could cause failure.
  649. return true
  650. }
  651. if (client[kRunning] > 0 && (request.upgrade || request.method === 'CONNECT')) {
  652. // Don't dispatch an upgrade until all preceding requests have completed.
  653. // A misbehaving server might upgrade the connection before all pipelined
  654. // request has completed.
  655. return true
  656. }
  657. if (client[kRunning] > 0 && util.bodyLength(request.body) !== 0 &&
  658. (util.isStream(request.body) || util.isAsyncIterable(request.body) || util.isFormDataLike(request.body))) {
  659. // Request with stream or iterator body can error while other requests
  660. // are inflight and indirectly error those as well.
  661. // Ensure this doesn't happen by waiting for inflight
  662. // to complete before dispatching.
  663. // Request with stream or iterator body cannot be retried.
  664. // Ensure that no other requests are inflight and
  665. // could cause failure.
  666. return true
  667. }
  668. }
  669. return false
  670. }
  671. }
  672. }
  673. function resumeH1 (client) {
  674. const socket = client[kSocket]
  675. if (socket && !socket.destroyed) {
  676. if (client[kSize] === 0) {
  677. if (!socket[kNoRef] && socket.unref) {
  678. socket.unref()
  679. socket[kNoRef] = true
  680. }
  681. } else if (socket[kNoRef] && socket.ref) {
  682. socket.ref()
  683. socket[kNoRef] = false
  684. }
  685. if (client[kSize] === 0) {
  686. if (socket[kParser].timeoutType !== TIMEOUT_KEEP_ALIVE) {
  687. socket[kParser].setTimeout(client[kKeepAliveTimeoutValue], TIMEOUT_KEEP_ALIVE)
  688. }
  689. } else if (client[kRunning] > 0 && socket[kParser].statusCode < 200) {
  690. if (socket[kParser].timeoutType !== TIMEOUT_HEADERS) {
  691. const request = client[kQueue][client[kRunningIdx]]
  692. const headersTimeout = request.headersTimeout != null
  693. ? request.headersTimeout
  694. : client[kHeadersTimeout]
  695. socket[kParser].setTimeout(headersTimeout, TIMEOUT_HEADERS)
  696. }
  697. }
  698. }
  699. }
  700. // https://www.rfc-editor.org/rfc/rfc7230#section-3.3.2
  701. function shouldSendContentLength (method) {
  702. return method !== 'GET' && method !== 'HEAD' && method !== 'OPTIONS' && method !== 'TRACE' && method !== 'CONNECT'
  703. }
  704. function writeH1 (client, request) {
  705. const { method, path, host, upgrade, blocking, reset } = request
  706. let { body, headers, contentLength } = request
  707. // https://tools.ietf.org/html/rfc7231#section-4.3.1
  708. // https://tools.ietf.org/html/rfc7231#section-4.3.2
  709. // https://tools.ietf.org/html/rfc7231#section-4.3.5
  710. // Sending a payload body on a request that does not
  711. // expect it can cause undefined behavior on some
  712. // servers and corrupt connection state. Do not
  713. // re-use the connection for further requests.
  714. const expectsPayload = (
  715. method === 'PUT' ||
  716. method === 'POST' ||
  717. method === 'PATCH' ||
  718. method === 'QUERY' ||
  719. method === 'PROPFIND' ||
  720. method === 'PROPPATCH'
  721. )
  722. if (util.isFormDataLike(body)) {
  723. if (!extractBody) {
  724. extractBody = require('../web/fetch/body.js').extractBody
  725. }
  726. const [bodyStream, contentType] = extractBody(body)
  727. if (request.contentType == null) {
  728. headers.push('content-type', contentType)
  729. }
  730. body = bodyStream.stream
  731. contentLength = bodyStream.length
  732. } else if (util.isBlobLike(body) && request.contentType == null && body.type) {
  733. headers.push('content-type', body.type)
  734. }
  735. if (body && typeof body.read === 'function') {
  736. // Try to read EOF in order to get length.
  737. body.read(0)
  738. }
  739. const bodyLength = util.bodyLength(body)
  740. contentLength = bodyLength ?? contentLength
  741. if (contentLength === null) {
  742. contentLength = request.contentLength
  743. }
  744. if (contentLength === 0 && !expectsPayload) {
  745. // https://tools.ietf.org/html/rfc7230#section-3.3.2
  746. // A user agent SHOULD NOT send a Content-Length header field when
  747. // the request message does not contain a payload body and the method
  748. // semantics do not anticipate such a body.
  749. contentLength = null
  750. }
  751. // https://github.com/nodejs/undici/issues/2046
  752. // A user agent may send a Content-Length header with 0 value, this should be allowed.
  753. if (shouldSendContentLength(method) && contentLength > 0 && request.contentLength !== null && request.contentLength !== contentLength) {
  754. if (client[kStrictContentLength]) {
  755. util.errorRequest(client, request, new RequestContentLengthMismatchError())
  756. return false
  757. }
  758. process.emitWarning(new RequestContentLengthMismatchError())
  759. }
  760. const socket = client[kSocket]
  761. const abort = (err) => {
  762. if (request.aborted || request.completed) {
  763. return
  764. }
  765. util.errorRequest(client, request, err || new RequestAbortedError())
  766. util.destroy(body)
  767. util.destroy(socket, new InformationalError('aborted'))
  768. }
  769. try {
  770. request.onConnect(abort)
  771. } catch (err) {
  772. util.errorRequest(client, request, err)
  773. }
  774. if (request.aborted) {
  775. return false
  776. }
  777. if (method === 'HEAD') {
  778. // https://github.com/mcollina/undici/issues/258
  779. // Close after a HEAD request to interop with misbehaving servers
  780. // that may send a body in the response.
  781. socket[kReset] = true
  782. }
  783. if (upgrade || method === 'CONNECT') {
  784. // On CONNECT or upgrade, block pipeline from dispatching further
  785. // requests on this connection.
  786. socket[kReset] = true
  787. }
  788. if (reset != null) {
  789. socket[kReset] = reset
  790. }
  791. if (client[kMaxRequests] && socket[kCounter]++ >= client[kMaxRequests]) {
  792. socket[kReset] = true
  793. }
  794. if (blocking) {
  795. socket[kBlocking] = true
  796. }
  797. let header = `${method} ${path} HTTP/1.1\r\n`
  798. if (typeof host === 'string') {
  799. header += `host: ${host}\r\n`
  800. } else {
  801. header += client[kHostHeader]
  802. }
  803. if (upgrade) {
  804. header += `connection: upgrade\r\nupgrade: ${upgrade}\r\n`
  805. } else if (client[kPipelining] && !socket[kReset]) {
  806. header += 'connection: keep-alive\r\n'
  807. } else {
  808. header += 'connection: close\r\n'
  809. }
  810. if (Array.isArray(headers)) {
  811. for (let n = 0; n < headers.length; n += 2) {
  812. const key = headers[n + 0]
  813. const val = headers[n + 1]
  814. if (Array.isArray(val)) {
  815. for (let i = 0; i < val.length; i++) {
  816. header += `${key}: ${val[i]}\r\n`
  817. }
  818. } else {
  819. header += `${key}: ${val}\r\n`
  820. }
  821. }
  822. }
  823. if (channels.sendHeaders.hasSubscribers) {
  824. channels.sendHeaders.publish({ request, headers: header, socket })
  825. }
  826. /* istanbul ignore else: assertion */
  827. if (!body || bodyLength === 0) {
  828. writeBuffer(abort, null, client, request, socket, contentLength, header, expectsPayload)
  829. } else if (util.isBuffer(body)) {
  830. writeBuffer(abort, body, client, request, socket, contentLength, header, expectsPayload)
  831. } else if (util.isBlobLike(body)) {
  832. if (typeof body.stream === 'function') {
  833. writeIterable(abort, body.stream(), client, request, socket, contentLength, header, expectsPayload)
  834. } else {
  835. writeBlob(abort, body, client, request, socket, contentLength, header, expectsPayload)
  836. }
  837. } else if (util.isStream(body)) {
  838. writeStream(abort, body, client, request, socket, contentLength, header, expectsPayload)
  839. } else if (util.isIterable(body)) {
  840. writeIterable(abort, body, client, request, socket, contentLength, header, expectsPayload)
  841. } else {
  842. assert(false)
  843. }
  844. return true
  845. }
  846. function writeStream (abort, body, client, request, socket, contentLength, header, expectsPayload) {
  847. assert(contentLength !== 0 || client[kRunning] === 0, 'stream body cannot be pipelined')
  848. let finished = false
  849. const writer = new AsyncWriter({ abort, socket, request, contentLength, client, expectsPayload, header })
  850. const onData = function (chunk) {
  851. if (finished) {
  852. return
  853. }
  854. try {
  855. if (!writer.write(chunk) && this.pause) {
  856. this.pause()
  857. }
  858. } catch (err) {
  859. util.destroy(this, err)
  860. }
  861. }
  862. const onDrain = function () {
  863. if (finished) {
  864. return
  865. }
  866. if (body.resume) {
  867. body.resume()
  868. }
  869. }
  870. const onClose = function () {
  871. // 'close' might be emitted *before* 'error' for
  872. // broken streams. Wait a tick to avoid this case.
  873. queueMicrotask(() => {
  874. // It's only safe to remove 'error' listener after
  875. // 'close'.
  876. body.removeListener('error', onFinished)
  877. })
  878. if (!finished) {
  879. const err = new RequestAbortedError()
  880. queueMicrotask(() => onFinished(err))
  881. }
  882. }
  883. const onFinished = function (err) {
  884. if (finished) {
  885. return
  886. }
  887. finished = true
  888. assert(socket.destroyed || (socket[kWriting] && client[kRunning] <= 1))
  889. socket
  890. .off('drain', onDrain)
  891. .off('error', onFinished)
  892. body
  893. .removeListener('data', onData)
  894. .removeListener('end', onFinished)
  895. .removeListener('close', onClose)
  896. if (!err) {
  897. try {
  898. writer.end()
  899. } catch (er) {
  900. err = er
  901. }
  902. }
  903. writer.destroy(err)
  904. if (err && (err.code !== 'UND_ERR_INFO' || err.message !== 'reset')) {
  905. util.destroy(body, err)
  906. } else {
  907. util.destroy(body)
  908. }
  909. }
  910. body
  911. .on('data', onData)
  912. .on('end', onFinished)
  913. .on('error', onFinished)
  914. .on('close', onClose)
  915. if (body.resume) {
  916. body.resume()
  917. }
  918. socket
  919. .on('drain', onDrain)
  920. .on('error', onFinished)
  921. if (body.errorEmitted ?? body.errored) {
  922. setImmediate(() => onFinished(body.errored))
  923. } else if (body.endEmitted ?? body.readableEnded) {
  924. setImmediate(() => onFinished(null))
  925. }
  926. if (body.closeEmitted ?? body.closed) {
  927. setImmediate(onClose)
  928. }
  929. }
  930. function writeBuffer (abort, body, client, request, socket, contentLength, header, expectsPayload) {
  931. try {
  932. if (!body) {
  933. if (contentLength === 0) {
  934. socket.write(`${header}content-length: 0\r\n\r\n`, 'latin1')
  935. } else {
  936. assert(contentLength === null, 'no body must not have content length')
  937. socket.write(`${header}\r\n`, 'latin1')
  938. }
  939. } else if (util.isBuffer(body)) {
  940. assert(contentLength === body.byteLength, 'buffer body must have content length')
  941. socket.cork()
  942. socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1')
  943. socket.write(body)
  944. socket.uncork()
  945. request.onBodySent(body)
  946. if (!expectsPayload && request.reset !== false) {
  947. socket[kReset] = true
  948. }
  949. }
  950. request.onRequestSent()
  951. client[kResume]()
  952. } catch (err) {
  953. abort(err)
  954. }
  955. }
  956. async function writeBlob (abort, body, client, request, socket, contentLength, header, expectsPayload) {
  957. assert(contentLength === body.size, 'blob body must have content length')
  958. try {
  959. if (contentLength != null && contentLength !== body.size) {
  960. throw new RequestContentLengthMismatchError()
  961. }
  962. const buffer = Buffer.from(await body.arrayBuffer())
  963. socket.cork()
  964. socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1')
  965. socket.write(buffer)
  966. socket.uncork()
  967. request.onBodySent(buffer)
  968. request.onRequestSent()
  969. if (!expectsPayload && request.reset !== false) {
  970. socket[kReset] = true
  971. }
  972. client[kResume]()
  973. } catch (err) {
  974. abort(err)
  975. }
  976. }
  977. async function writeIterable (abort, body, client, request, socket, contentLength, header, expectsPayload) {
  978. assert(contentLength !== 0 || client[kRunning] === 0, 'iterator body cannot be pipelined')
  979. let callback = null
  980. function onDrain () {
  981. if (callback) {
  982. const cb = callback
  983. callback = null
  984. cb()
  985. }
  986. }
  987. const waitForDrain = () => new Promise((resolve, reject) => {
  988. assert(callback === null)
  989. if (socket[kError]) {
  990. reject(socket[kError])
  991. } else {
  992. callback = resolve
  993. }
  994. })
  995. socket
  996. .on('close', onDrain)
  997. .on('drain', onDrain)
  998. const writer = new AsyncWriter({ abort, socket, request, contentLength, client, expectsPayload, header })
  999. try {
  1000. // It's up to the user to somehow abort the async iterable.
  1001. for await (const chunk of body) {
  1002. if (socket[kError]) {
  1003. throw socket[kError]
  1004. }
  1005. if (!writer.write(chunk)) {
  1006. await waitForDrain()
  1007. }
  1008. }
  1009. writer.end()
  1010. } catch (err) {
  1011. writer.destroy(err)
  1012. } finally {
  1013. socket
  1014. .off('close', onDrain)
  1015. .off('drain', onDrain)
  1016. }
  1017. }
  1018. class AsyncWriter {
  1019. constructor ({ abort, socket, request, contentLength, client, expectsPayload, header }) {
  1020. this.socket = socket
  1021. this.request = request
  1022. this.contentLength = contentLength
  1023. this.client = client
  1024. this.bytesWritten = 0
  1025. this.expectsPayload = expectsPayload
  1026. this.header = header
  1027. this.abort = abort
  1028. socket[kWriting] = true
  1029. }
  1030. write (chunk) {
  1031. const { socket, request, contentLength, client, bytesWritten, expectsPayload, header } = this
  1032. if (socket[kError]) {
  1033. throw socket[kError]
  1034. }
  1035. if (socket.destroyed) {
  1036. return false
  1037. }
  1038. const len = Buffer.byteLength(chunk)
  1039. if (!len) {
  1040. return true
  1041. }
  1042. // We should defer writing chunks.
  1043. if (contentLength !== null && bytesWritten + len > contentLength) {
  1044. if (client[kStrictContentLength]) {
  1045. throw new RequestContentLengthMismatchError()
  1046. }
  1047. process.emitWarning(new RequestContentLengthMismatchError())
  1048. }
  1049. socket.cork()
  1050. if (bytesWritten === 0) {
  1051. if (!expectsPayload && request.reset !== false) {
  1052. socket[kReset] = true
  1053. }
  1054. if (contentLength === null) {
  1055. socket.write(`${header}transfer-encoding: chunked\r\n`, 'latin1')
  1056. } else {
  1057. socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1')
  1058. }
  1059. }
  1060. if (contentLength === null) {
  1061. socket.write(`\r\n${len.toString(16)}\r\n`, 'latin1')
  1062. }
  1063. this.bytesWritten += len
  1064. const ret = socket.write(chunk)
  1065. socket.uncork()
  1066. request.onBodySent(chunk)
  1067. if (!ret) {
  1068. if (socket[kParser].timeout && socket[kParser].timeoutType === TIMEOUT_HEADERS) {
  1069. // istanbul ignore else: only for jest
  1070. if (socket[kParser].timeout.refresh) {
  1071. socket[kParser].timeout.refresh()
  1072. }
  1073. }
  1074. }
  1075. return ret
  1076. }
  1077. end () {
  1078. const { socket, contentLength, client, bytesWritten, expectsPayload, header, request } = this
  1079. request.onRequestSent()
  1080. socket[kWriting] = false
  1081. if (socket[kError]) {
  1082. throw socket[kError]
  1083. }
  1084. if (socket.destroyed) {
  1085. return
  1086. }
  1087. if (bytesWritten === 0) {
  1088. if (expectsPayload) {
  1089. // https://tools.ietf.org/html/rfc7230#section-3.3.2
  1090. // A user agent SHOULD send a Content-Length in a request message when
  1091. // no Transfer-Encoding is sent and the request method defines a meaning
  1092. // for an enclosed payload body.
  1093. socket.write(`${header}content-length: 0\r\n\r\n`, 'latin1')
  1094. } else {
  1095. socket.write(`${header}\r\n`, 'latin1')
  1096. }
  1097. } else if (contentLength === null) {
  1098. socket.write('\r\n0\r\n\r\n', 'latin1')
  1099. }
  1100. if (contentLength !== null && bytesWritten !== contentLength) {
  1101. if (client[kStrictContentLength]) {
  1102. throw new RequestContentLengthMismatchError()
  1103. } else {
  1104. process.emitWarning(new RequestContentLengthMismatchError())
  1105. }
  1106. }
  1107. if (socket[kParser].timeout && socket[kParser].timeoutType === TIMEOUT_HEADERS) {
  1108. // istanbul ignore else: only for jest
  1109. if (socket[kParser].timeout.refresh) {
  1110. socket[kParser].timeout.refresh()
  1111. }
  1112. }
  1113. client[kResume]()
  1114. }
  1115. destroy (err) {
  1116. const { socket, client, abort } = this
  1117. socket[kWriting] = false
  1118. if (err) {
  1119. assert(client[kRunning] <= 1, 'pipeline should only contain this request')
  1120. abort(err)
  1121. }
  1122. }
  1123. }
  1124. module.exports = connectH1