client.js 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622
  1. // @ts-check
  2. 'use strict'
  3. const assert = require('node:assert')
  4. const net = require('node:net')
  5. const http = require('node:http')
  6. const util = require('../core/util.js')
  7. const { channels } = require('../core/diagnostics.js')
  8. const Request = require('../core/request.js')
  9. const DispatcherBase = require('./dispatcher-base')
  10. const {
  11. InvalidArgumentError,
  12. InformationalError,
  13. ClientDestroyedError
  14. } = require('../core/errors.js')
  15. const buildConnector = require('../core/connect.js')
  16. const {
  17. kUrl,
  18. kServerName,
  19. kClient,
  20. kBusy,
  21. kConnect,
  22. kResuming,
  23. kRunning,
  24. kPending,
  25. kSize,
  26. kQueue,
  27. kConnected,
  28. kConnecting,
  29. kNeedDrain,
  30. kKeepAliveDefaultTimeout,
  31. kHostHeader,
  32. kPendingIdx,
  33. kRunningIdx,
  34. kError,
  35. kPipelining,
  36. kKeepAliveTimeoutValue,
  37. kMaxHeadersSize,
  38. kKeepAliveMaxTimeout,
  39. kKeepAliveTimeoutThreshold,
  40. kHeadersTimeout,
  41. kBodyTimeout,
  42. kStrictContentLength,
  43. kConnector,
  44. kMaxRedirections,
  45. kMaxRequests,
  46. kCounter,
  47. kClose,
  48. kDestroy,
  49. kDispatch,
  50. kInterceptors,
  51. kLocalAddress,
  52. kMaxResponseSize,
  53. kOnError,
  54. kHTTPContext,
  55. kMaxConcurrentStreams,
  56. kResume
  57. } = require('../core/symbols.js')
  58. const connectH1 = require('./client-h1.js')
  59. const connectH2 = require('./client-h2.js')
  60. let deprecatedInterceptorWarned = false
  61. const kClosedResolve = Symbol('kClosedResolve')
  62. const noop = () => {}
  63. function getPipelining (client) {
  64. return client[kPipelining] ?? client[kHTTPContext]?.defaultPipelining ?? 1
  65. }
  66. /**
  67. * @type {import('../../types/client.js').default}
  68. */
  69. class Client extends DispatcherBase {
  70. /**
  71. *
  72. * @param {string|URL} url
  73. * @param {import('../../types/client.js').Client.Options} options
  74. */
  75. constructor (url, {
  76. interceptors,
  77. maxHeaderSize,
  78. headersTimeout,
  79. socketTimeout,
  80. requestTimeout,
  81. connectTimeout,
  82. bodyTimeout,
  83. idleTimeout,
  84. keepAlive,
  85. keepAliveTimeout,
  86. maxKeepAliveTimeout,
  87. keepAliveMaxTimeout,
  88. keepAliveTimeoutThreshold,
  89. socketPath,
  90. pipelining,
  91. tls,
  92. strictContentLength,
  93. maxCachedSessions,
  94. maxRedirections,
  95. connect,
  96. maxRequestsPerClient,
  97. localAddress,
  98. maxResponseSize,
  99. autoSelectFamily,
  100. autoSelectFamilyAttemptTimeout,
  101. // h2
  102. maxConcurrentStreams,
  103. allowH2
  104. } = {}) {
  105. super()
  106. if (keepAlive !== undefined) {
  107. throw new InvalidArgumentError('unsupported keepAlive, use pipelining=0 instead')
  108. }
  109. if (socketTimeout !== undefined) {
  110. throw new InvalidArgumentError('unsupported socketTimeout, use headersTimeout & bodyTimeout instead')
  111. }
  112. if (requestTimeout !== undefined) {
  113. throw new InvalidArgumentError('unsupported requestTimeout, use headersTimeout & bodyTimeout instead')
  114. }
  115. if (idleTimeout !== undefined) {
  116. throw new InvalidArgumentError('unsupported idleTimeout, use keepAliveTimeout instead')
  117. }
  118. if (maxKeepAliveTimeout !== undefined) {
  119. throw new InvalidArgumentError('unsupported maxKeepAliveTimeout, use keepAliveMaxTimeout instead')
  120. }
  121. if (maxHeaderSize != null && !Number.isFinite(maxHeaderSize)) {
  122. throw new InvalidArgumentError('invalid maxHeaderSize')
  123. }
  124. if (socketPath != null && typeof socketPath !== 'string') {
  125. throw new InvalidArgumentError('invalid socketPath')
  126. }
  127. if (connectTimeout != null && (!Number.isFinite(connectTimeout) || connectTimeout < 0)) {
  128. throw new InvalidArgumentError('invalid connectTimeout')
  129. }
  130. if (keepAliveTimeout != null && (!Number.isFinite(keepAliveTimeout) || keepAliveTimeout <= 0)) {
  131. throw new InvalidArgumentError('invalid keepAliveTimeout')
  132. }
  133. if (keepAliveMaxTimeout != null && (!Number.isFinite(keepAliveMaxTimeout) || keepAliveMaxTimeout <= 0)) {
  134. throw new InvalidArgumentError('invalid keepAliveMaxTimeout')
  135. }
  136. if (keepAliveTimeoutThreshold != null && !Number.isFinite(keepAliveTimeoutThreshold)) {
  137. throw new InvalidArgumentError('invalid keepAliveTimeoutThreshold')
  138. }
  139. if (headersTimeout != null && (!Number.isInteger(headersTimeout) || headersTimeout < 0)) {
  140. throw new InvalidArgumentError('headersTimeout must be a positive integer or zero')
  141. }
  142. if (bodyTimeout != null && (!Number.isInteger(bodyTimeout) || bodyTimeout < 0)) {
  143. throw new InvalidArgumentError('bodyTimeout must be a positive integer or zero')
  144. }
  145. if (connect != null && typeof connect !== 'function' && typeof connect !== 'object') {
  146. throw new InvalidArgumentError('connect must be a function or an object')
  147. }
  148. if (maxRedirections != null && (!Number.isInteger(maxRedirections) || maxRedirections < 0)) {
  149. throw new InvalidArgumentError('maxRedirections must be a positive number')
  150. }
  151. if (maxRequestsPerClient != null && (!Number.isInteger(maxRequestsPerClient) || maxRequestsPerClient < 0)) {
  152. throw new InvalidArgumentError('maxRequestsPerClient must be a positive number')
  153. }
  154. if (localAddress != null && (typeof localAddress !== 'string' || net.isIP(localAddress) === 0)) {
  155. throw new InvalidArgumentError('localAddress must be valid string IP address')
  156. }
  157. if (maxResponseSize != null && (!Number.isInteger(maxResponseSize) || maxResponseSize < -1)) {
  158. throw new InvalidArgumentError('maxResponseSize must be a positive number')
  159. }
  160. if (
  161. autoSelectFamilyAttemptTimeout != null &&
  162. (!Number.isInteger(autoSelectFamilyAttemptTimeout) || autoSelectFamilyAttemptTimeout < -1)
  163. ) {
  164. throw new InvalidArgumentError('autoSelectFamilyAttemptTimeout must be a positive number')
  165. }
  166. // h2
  167. if (allowH2 != null && typeof allowH2 !== 'boolean') {
  168. throw new InvalidArgumentError('allowH2 must be a valid boolean value')
  169. }
  170. if (maxConcurrentStreams != null && (typeof maxConcurrentStreams !== 'number' || maxConcurrentStreams < 1)) {
  171. throw new InvalidArgumentError('maxConcurrentStreams must be a positive integer, greater than 0')
  172. }
  173. if (typeof connect !== 'function') {
  174. connect = buildConnector({
  175. ...tls,
  176. maxCachedSessions,
  177. allowH2,
  178. socketPath,
  179. timeout: connectTimeout,
  180. ...(autoSelectFamily ? { autoSelectFamily, autoSelectFamilyAttemptTimeout } : undefined),
  181. ...connect
  182. })
  183. }
  184. if (interceptors?.Client && Array.isArray(interceptors.Client)) {
  185. this[kInterceptors] = interceptors.Client
  186. if (!deprecatedInterceptorWarned) {
  187. deprecatedInterceptorWarned = true
  188. process.emitWarning('Client.Options#interceptor is deprecated. Use Dispatcher#compose instead.', {
  189. code: 'UNDICI-CLIENT-INTERCEPTOR-DEPRECATED'
  190. })
  191. }
  192. } else {
  193. this[kInterceptors] = [createRedirectInterceptor({ maxRedirections })]
  194. }
  195. this[kUrl] = util.parseOrigin(url)
  196. this[kConnector] = connect
  197. this[kPipelining] = pipelining != null ? pipelining : 1
  198. this[kMaxHeadersSize] = maxHeaderSize || http.maxHeaderSize
  199. this[kKeepAliveDefaultTimeout] = keepAliveTimeout == null ? 4e3 : keepAliveTimeout
  200. this[kKeepAliveMaxTimeout] = keepAliveMaxTimeout == null ? 600e3 : keepAliveMaxTimeout
  201. this[kKeepAliveTimeoutThreshold] = keepAliveTimeoutThreshold == null ? 2e3 : keepAliveTimeoutThreshold
  202. this[kKeepAliveTimeoutValue] = this[kKeepAliveDefaultTimeout]
  203. this[kServerName] = null
  204. this[kLocalAddress] = localAddress != null ? localAddress : null
  205. this[kResuming] = 0 // 0, idle, 1, scheduled, 2 resuming
  206. this[kNeedDrain] = 0 // 0, idle, 1, scheduled, 2 resuming
  207. this[kHostHeader] = `host: ${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}\r\n`
  208. this[kBodyTimeout] = bodyTimeout != null ? bodyTimeout : 300e3
  209. this[kHeadersTimeout] = headersTimeout != null ? headersTimeout : 300e3
  210. this[kStrictContentLength] = strictContentLength == null ? true : strictContentLength
  211. this[kMaxRedirections] = maxRedirections
  212. this[kMaxRequests] = maxRequestsPerClient
  213. this[kClosedResolve] = null
  214. this[kMaxResponseSize] = maxResponseSize > -1 ? maxResponseSize : -1
  215. this[kMaxConcurrentStreams] = maxConcurrentStreams != null ? maxConcurrentStreams : 100 // Max peerConcurrentStreams for a Node h2 server
  216. this[kHTTPContext] = null
  217. // kQueue is built up of 3 sections separated by
  218. // the kRunningIdx and kPendingIdx indices.
  219. // | complete | running | pending |
  220. // ^ kRunningIdx ^ kPendingIdx ^ kQueue.length
  221. // kRunningIdx points to the first running element.
  222. // kPendingIdx points to the first pending element.
  223. // This implements a fast queue with an amortized
  224. // time of O(1).
  225. this[kQueue] = []
  226. this[kRunningIdx] = 0
  227. this[kPendingIdx] = 0
  228. this[kResume] = (sync) => resume(this, sync)
  229. this[kOnError] = (err) => onError(this, err)
  230. }
  231. get pipelining () {
  232. return this[kPipelining]
  233. }
  234. set pipelining (value) {
  235. this[kPipelining] = value
  236. this[kResume](true)
  237. }
  238. get [kPending] () {
  239. return this[kQueue].length - this[kPendingIdx]
  240. }
  241. get [kRunning] () {
  242. return this[kPendingIdx] - this[kRunningIdx]
  243. }
  244. get [kSize] () {
  245. return this[kQueue].length - this[kRunningIdx]
  246. }
  247. get [kConnected] () {
  248. return !!this[kHTTPContext] && !this[kConnecting] && !this[kHTTPContext].destroyed
  249. }
  250. get [kBusy] () {
  251. return Boolean(
  252. this[kHTTPContext]?.busy(null) ||
  253. (this[kSize] >= (getPipelining(this) || 1)) ||
  254. this[kPending] > 0
  255. )
  256. }
  257. /* istanbul ignore: only used for test */
  258. [kConnect] (cb) {
  259. connect(this)
  260. this.once('connect', cb)
  261. }
  262. [kDispatch] (opts, handler) {
  263. const origin = opts.origin || this[kUrl].origin
  264. const request = new Request(origin, opts, handler)
  265. this[kQueue].push(request)
  266. if (this[kResuming]) {
  267. // Do nothing.
  268. } else if (util.bodyLength(request.body) == null && util.isIterable(request.body)) {
  269. // Wait a tick in case stream/iterator is ended in the same tick.
  270. this[kResuming] = 1
  271. queueMicrotask(() => resume(this))
  272. } else {
  273. this[kResume](true)
  274. }
  275. if (this[kResuming] && this[kNeedDrain] !== 2 && this[kBusy]) {
  276. this[kNeedDrain] = 2
  277. }
  278. return this[kNeedDrain] < 2
  279. }
  280. async [kClose] () {
  281. // TODO: for H2 we need to gracefully flush the remaining enqueued
  282. // request and close each stream.
  283. return new Promise((resolve) => {
  284. if (this[kSize]) {
  285. this[kClosedResolve] = resolve
  286. } else {
  287. resolve(null)
  288. }
  289. })
  290. }
  291. async [kDestroy] (err) {
  292. return new Promise((resolve) => {
  293. const requests = this[kQueue].splice(this[kPendingIdx])
  294. for (let i = 0; i < requests.length; i++) {
  295. const request = requests[i]
  296. util.errorRequest(this, request, err)
  297. }
  298. const callback = () => {
  299. if (this[kClosedResolve]) {
  300. // TODO (fix): Should we error here with ClientDestroyedError?
  301. this[kClosedResolve]()
  302. this[kClosedResolve] = null
  303. }
  304. resolve(null)
  305. }
  306. if (this[kHTTPContext]) {
  307. this[kHTTPContext].destroy(err, callback)
  308. this[kHTTPContext] = null
  309. } else {
  310. queueMicrotask(callback)
  311. }
  312. this[kResume]()
  313. })
  314. }
  315. }
  316. const createRedirectInterceptor = require('../interceptor/redirect-interceptor.js')
  317. function onError (client, err) {
  318. if (
  319. client[kRunning] === 0 &&
  320. err.code !== 'UND_ERR_INFO' &&
  321. err.code !== 'UND_ERR_SOCKET'
  322. ) {
  323. // Error is not caused by running request and not a recoverable
  324. // socket error.
  325. assert(client[kPendingIdx] === client[kRunningIdx])
  326. const requests = client[kQueue].splice(client[kRunningIdx])
  327. for (let i = 0; i < requests.length; i++) {
  328. const request = requests[i]
  329. util.errorRequest(client, request, err)
  330. }
  331. assert(client[kSize] === 0)
  332. }
  333. }
  334. /**
  335. * @param {Client} client
  336. * @returns
  337. */
  338. async function connect (client) {
  339. assert(!client[kConnecting])
  340. assert(!client[kHTTPContext])
  341. let { host, hostname, protocol, port } = client[kUrl]
  342. // Resolve ipv6
  343. if (hostname[0] === '[') {
  344. const idx = hostname.indexOf(']')
  345. assert(idx !== -1)
  346. const ip = hostname.substring(1, idx)
  347. assert(net.isIP(ip))
  348. hostname = ip
  349. }
  350. client[kConnecting] = true
  351. if (channels.beforeConnect.hasSubscribers) {
  352. channels.beforeConnect.publish({
  353. connectParams: {
  354. host,
  355. hostname,
  356. protocol,
  357. port,
  358. version: client[kHTTPContext]?.version,
  359. servername: client[kServerName],
  360. localAddress: client[kLocalAddress]
  361. },
  362. connector: client[kConnector]
  363. })
  364. }
  365. try {
  366. const socket = await new Promise((resolve, reject) => {
  367. client[kConnector]({
  368. host,
  369. hostname,
  370. protocol,
  371. port,
  372. servername: client[kServerName],
  373. localAddress: client[kLocalAddress]
  374. }, (err, socket) => {
  375. if (err) {
  376. reject(err)
  377. } else {
  378. resolve(socket)
  379. }
  380. })
  381. })
  382. if (client.destroyed) {
  383. util.destroy(socket.on('error', noop), new ClientDestroyedError())
  384. return
  385. }
  386. assert(socket)
  387. try {
  388. client[kHTTPContext] = socket.alpnProtocol === 'h2'
  389. ? await connectH2(client, socket)
  390. : await connectH1(client, socket)
  391. } catch (err) {
  392. socket.destroy().on('error', noop)
  393. throw err
  394. }
  395. client[kConnecting] = false
  396. socket[kCounter] = 0
  397. socket[kMaxRequests] = client[kMaxRequests]
  398. socket[kClient] = client
  399. socket[kError] = null
  400. if (channels.connected.hasSubscribers) {
  401. channels.connected.publish({
  402. connectParams: {
  403. host,
  404. hostname,
  405. protocol,
  406. port,
  407. version: client[kHTTPContext]?.version,
  408. servername: client[kServerName],
  409. localAddress: client[kLocalAddress]
  410. },
  411. connector: client[kConnector],
  412. socket
  413. })
  414. }
  415. client.emit('connect', client[kUrl], [client])
  416. } catch (err) {
  417. if (client.destroyed) {
  418. return
  419. }
  420. client[kConnecting] = false
  421. if (channels.connectError.hasSubscribers) {
  422. channels.connectError.publish({
  423. connectParams: {
  424. host,
  425. hostname,
  426. protocol,
  427. port,
  428. version: client[kHTTPContext]?.version,
  429. servername: client[kServerName],
  430. localAddress: client[kLocalAddress]
  431. },
  432. connector: client[kConnector],
  433. error: err
  434. })
  435. }
  436. if (err.code === 'ERR_TLS_CERT_ALTNAME_INVALID') {
  437. assert(client[kRunning] === 0)
  438. while (client[kPending] > 0 && client[kQueue][client[kPendingIdx]].servername === client[kServerName]) {
  439. const request = client[kQueue][client[kPendingIdx]++]
  440. util.errorRequest(client, request, err)
  441. }
  442. } else {
  443. onError(client, err)
  444. }
  445. client.emit('connectionError', client[kUrl], [client], err)
  446. }
  447. client[kResume]()
  448. }
  449. function emitDrain (client) {
  450. client[kNeedDrain] = 0
  451. client.emit('drain', client[kUrl], [client])
  452. }
  453. function resume (client, sync) {
  454. if (client[kResuming] === 2) {
  455. return
  456. }
  457. client[kResuming] = 2
  458. _resume(client, sync)
  459. client[kResuming] = 0
  460. if (client[kRunningIdx] > 256) {
  461. client[kQueue].splice(0, client[kRunningIdx])
  462. client[kPendingIdx] -= client[kRunningIdx]
  463. client[kRunningIdx] = 0
  464. }
  465. }
  466. function _resume (client, sync) {
  467. while (true) {
  468. if (client.destroyed) {
  469. assert(client[kPending] === 0)
  470. return
  471. }
  472. if (client[kClosedResolve] && !client[kSize]) {
  473. client[kClosedResolve]()
  474. client[kClosedResolve] = null
  475. return
  476. }
  477. if (client[kHTTPContext]) {
  478. client[kHTTPContext].resume()
  479. }
  480. if (client[kBusy]) {
  481. client[kNeedDrain] = 2
  482. } else if (client[kNeedDrain] === 2) {
  483. if (sync) {
  484. client[kNeedDrain] = 1
  485. queueMicrotask(() => emitDrain(client))
  486. } else {
  487. emitDrain(client)
  488. }
  489. continue
  490. }
  491. if (client[kPending] === 0) {
  492. return
  493. }
  494. if (client[kRunning] >= (getPipelining(client) || 1)) {
  495. return
  496. }
  497. const request = client[kQueue][client[kPendingIdx]]
  498. if (client[kUrl].protocol === 'https:' && client[kServerName] !== request.servername) {
  499. if (client[kRunning] > 0) {
  500. return
  501. }
  502. client[kServerName] = request.servername
  503. client[kHTTPContext]?.destroy(new InformationalError('servername changed'), () => {
  504. client[kHTTPContext] = null
  505. resume(client)
  506. })
  507. }
  508. if (client[kConnecting]) {
  509. return
  510. }
  511. if (!client[kHTTPContext]) {
  512. connect(client)
  513. return
  514. }
  515. if (client[kHTTPContext].destroyed) {
  516. return
  517. }
  518. if (client[kHTTPContext].busy(request)) {
  519. return
  520. }
  521. if (!request.aborted && client[kHTTPContext].write(request)) {
  522. client[kPendingIdx]++
  523. } else {
  524. client[kQueue].splice(client[kPendingIdx], 1)
  525. }
  526. }
  527. }
  528. module.exports = Client