|
- // @ts-check
- 'use strict'
- const assert = require('node:assert')
- const net = require('node:net')
- const http = require('node:http')
- const util = require('../core/util.js')
- const { channels } = require('../core/diagnostics.js')
- const Request = require('../core/request.js')
- const DispatcherBase = require('./dispatcher-base')
- const {
- InvalidArgumentError,
- InformationalError,
- ClientDestroyedError
- } = require('../core/errors.js')
- const buildConnector = require('../core/connect.js')
- const {
- kUrl,
- kServerName,
- kClient,
- kBusy,
- kConnect,
- kResuming,
- kRunning,
- kPending,
- kSize,
- kQueue,
- kConnected,
- kConnecting,
- kNeedDrain,
- kKeepAliveDefaultTimeout,
- kHostHeader,
- kPendingIdx,
- kRunningIdx,
- kError,
- kPipelining,
- kKeepAliveTimeoutValue,
- kMaxHeadersSize,
- kKeepAliveMaxTimeout,
- kKeepAliveTimeoutThreshold,
- kHeadersTimeout,
- kBodyTimeout,
- kStrictContentLength,
- kConnector,
- kMaxRedirections,
- kMaxRequests,
- kCounter,
- kClose,
- kDestroy,
- kDispatch,
- kInterceptors,
- kLocalAddress,
- kMaxResponseSize,
- kOnError,
- kHTTPContext,
- kMaxConcurrentStreams,
- kResume
- } = require('../core/symbols.js')
- const connectH1 = require('./client-h1.js')
- const connectH2 = require('./client-h2.js')
- let deprecatedInterceptorWarned = false // flipped to true the first time the interceptor deprecation warning is emitted, so it is emitted only once
- const kClosedResolve = Symbol('kClosedResolve') // module-private key under which a Client keeps the resolver of a pending close
- const noop = () => {} // shared do-nothing callback; attached as an 'error' listener to sockets that are being destroyed
/**
 * Resolves the pipelining factor that applies to a client.
 *
 * Candidates are tried in order and the first one that is neither `null`
 * nor `undefined` wins:
 *   1. the value stored on the client under `kPipelining`,
 *   2. `defaultPipelining` of the client's current HTTP context (if any),
 *   3. the constant 1.
 *
 * @param {object} client Object exposing the `kPipelining` and `kHTTPContext` slots.
 * @returns {*} The selected pipelining value.
 */
function getPipelining (client) {
  const configured = client[kPipelining]
  if (configured != null) {
    return configured
  }

  const context = client[kHTTPContext]
  const contextDefault = context == null ? undefined : context.defaultPipelining
  return contextDefault != null ? contextDefault : 1
}
/**
 * Dispatcher bound to a single origin. Dispatched requests are appended to an
 * internal queue and handed to the current HTTP context from that queue.
 *
 * @type {import('../../types/client.js').default}
 */
class Client extends DispatcherBase {
  /**
   * Validates the options and initialises the client's internal state.
   * The constructor itself does not open a connection.
   *
   * @param {string|URL} url Origin of the server; parsed with `util.parseOrigin`.
   * @param {import('../../types/client.js').Client.Options} options
   * @throws {InvalidArgumentError} If a removed option is passed or an option has an invalid value.
   */
  constructor (url, {
    interceptors,
    maxHeaderSize,
    headersTimeout,
    socketTimeout,
    requestTimeout,
    connectTimeout,
    bodyTimeout,
    idleTimeout,
    keepAlive,
    keepAliveTimeout,
    maxKeepAliveTimeout,
    keepAliveMaxTimeout,
    keepAliveTimeoutThreshold,
    socketPath,
    pipelining,
    tls,
    strictContentLength,
    maxCachedSessions,
    maxRedirections,
    connect,
    maxRequestsPerClient,
    localAddress,
    maxResponseSize,
    autoSelectFamily,
    autoSelectFamilyAttemptTimeout,
    // h2
    maxConcurrentStreams,
    allowH2
  } = {}) {
    super()

    // Options that are no longer supported: fail loudly and point at the replacement.
    if (keepAlive !== undefined) {
      throw new InvalidArgumentError('unsupported keepAlive, use pipelining=0 instead')
    }

    if (socketTimeout !== undefined) {
      throw new InvalidArgumentError('unsupported socketTimeout, use headersTimeout & bodyTimeout instead')
    }

    if (requestTimeout !== undefined) {
      throw new InvalidArgumentError('unsupported requestTimeout, use headersTimeout & bodyTimeout instead')
    }

    if (idleTimeout !== undefined) {
      throw new InvalidArgumentError('unsupported idleTimeout, use keepAliveTimeout instead')
    }

    if (maxKeepAliveTimeout !== undefined) {
      throw new InvalidArgumentError('unsupported maxKeepAliveTimeout, use keepAliveMaxTimeout instead')
    }

    // Value validation of the supported options.
    if (maxHeaderSize != null && !Number.isFinite(maxHeaderSize)) {
      throw new InvalidArgumentError('invalid maxHeaderSize')
    }

    if (socketPath != null && typeof socketPath !== 'string') {
      throw new InvalidArgumentError('invalid socketPath')
    }

    if (connectTimeout != null && (!Number.isFinite(connectTimeout) || connectTimeout < 0)) {
      throw new InvalidArgumentError('invalid connectTimeout')
    }

    if (keepAliveTimeout != null && (!Number.isFinite(keepAliveTimeout) || keepAliveTimeout <= 0)) {
      throw new InvalidArgumentError('invalid keepAliveTimeout')
    }

    if (keepAliveMaxTimeout != null && (!Number.isFinite(keepAliveMaxTimeout) || keepAliveMaxTimeout <= 0)) {
      throw new InvalidArgumentError('invalid keepAliveMaxTimeout')
    }

    if (keepAliveTimeoutThreshold != null && !Number.isFinite(keepAliveTimeoutThreshold)) {
      throw new InvalidArgumentError('invalid keepAliveTimeoutThreshold')
    }

    if (headersTimeout != null && (!Number.isInteger(headersTimeout) || headersTimeout < 0)) {
      throw new InvalidArgumentError('headersTimeout must be a positive integer or zero')
    }

    if (bodyTimeout != null && (!Number.isInteger(bodyTimeout) || bodyTimeout < 0)) {
      throw new InvalidArgumentError('bodyTimeout must be a positive integer or zero')
    }

    if (connect != null && typeof connect !== 'function' && typeof connect !== 'object') {
      throw new InvalidArgumentError('connect must be a function or an object')
    }

    if (maxRedirections != null && (!Number.isInteger(maxRedirections) || maxRedirections < 0)) {
      throw new InvalidArgumentError('maxRedirections must be a positive number')
    }

    if (maxRequestsPerClient != null && (!Number.isInteger(maxRequestsPerClient) || maxRequestsPerClient < 0)) {
      throw new InvalidArgumentError('maxRequestsPerClient must be a positive number')
    }

    if (localAddress != null && (typeof localAddress !== 'string' || net.isIP(localAddress) === 0)) {
      throw new InvalidArgumentError('localAddress must be valid string IP address')
    }

    if (maxResponseSize != null && (!Number.isInteger(maxResponseSize) || maxResponseSize < -1)) {
      throw new InvalidArgumentError('maxResponseSize must be a positive number')
    }

    if (
      autoSelectFamilyAttemptTimeout != null &&
      (!Number.isInteger(autoSelectFamilyAttemptTimeout) || autoSelectFamilyAttemptTimeout < -1)
    ) {
      throw new InvalidArgumentError('autoSelectFamilyAttemptTimeout must be a positive number')
    }

    // h2
    if (allowH2 != null && typeof allowH2 !== 'boolean') {
      throw new InvalidArgumentError('allowH2 must be a valid boolean value')
    }

    // Number.isInteger is used instead of a bare `typeof === 'number'` test so
    // that NaN, Infinity and fractional values are rejected as well; the
    // message has always promised "a positive integer", and `NaN < 1` is false,
    // so NaN used to slip through.
    if (maxConcurrentStreams != null && (!Number.isInteger(maxConcurrentStreams) || maxConcurrentStreams < 1)) {
      throw new InvalidArgumentError('maxConcurrentStreams must be a positive integer, greater than 0')
    }

    // A non-function `connect` (object or nullish) is treated as extra options
    // for the default connector; later spreads override earlier keys.
    if (typeof connect !== 'function') {
      connect = buildConnector({
        ...tls,
        maxCachedSessions,
        allowH2,
        socketPath,
        timeout: connectTimeout,
        ...(autoSelectFamily ? { autoSelectFamily, autoSelectFamilyAttemptTimeout } : undefined),
        ...connect
      })
    }

    if (interceptors?.Client && Array.isArray(interceptors.Client)) {
      this[kInterceptors] = interceptors.Client
      // Warn only once per process, no matter how many clients are created.
      if (!deprecatedInterceptorWarned) {
        deprecatedInterceptorWarned = true
        process.emitWarning('Client.Options#interceptor is deprecated. Use Dispatcher#compose instead.', {
          code: 'UNDICI-CLIENT-INTERCEPTOR-DEPRECATED'
        })
      }
    } else {
      this[kInterceptors] = [createRedirectInterceptor({ maxRedirections })]
    }

    this[kUrl] = util.parseOrigin(url)
    this[kConnector] = connect
    this[kPipelining] = pipelining ?? 1
    // `||` on purpose: a maxHeaderSize of 0 also falls back to Node's default.
    this[kMaxHeadersSize] = maxHeaderSize || http.maxHeaderSize
    this[kKeepAliveDefaultTimeout] = keepAliveTimeout ?? 4e3
    this[kKeepAliveMaxTimeout] = keepAliveMaxTimeout ?? 600e3
    this[kKeepAliveTimeoutThreshold] = keepAliveTimeoutThreshold ?? 2e3
    this[kKeepAliveTimeoutValue] = this[kKeepAliveDefaultTimeout]
    this[kServerName] = null
    this[kLocalAddress] = localAddress ?? null
    this[kResuming] = 0 // 0: idle, 1: scheduled, 2: resuming
    this[kNeedDrain] = 0 // 0: idle, 1: 'drain' emission scheduled, 2: drain needed
    this[kHostHeader] = `host: ${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}\r\n`
    this[kBodyTimeout] = bodyTimeout ?? 300e3
    this[kHeadersTimeout] = headersTimeout ?? 300e3
    this[kStrictContentLength] = strictContentLength ?? true
    this[kMaxRedirections] = maxRedirections
    this[kMaxRequests] = maxRequestsPerClient
    this[kClosedResolve] = null
    // Anything that is not greater than -1 (including undefined) becomes -1.
    this[kMaxResponseSize] = maxResponseSize > -1 ? maxResponseSize : -1
    this[kMaxConcurrentStreams] = maxConcurrentStreams ?? 100 // Max peerConcurrentStreams for a Node h2 server
    this[kHTTPContext] = null

    // kQueue is built up of 3 sections separated by
    // the kRunningIdx and kPendingIdx indices.
    // |   complete   |   running   |   pending   |
    //                ^ kRunningIdx ^ kPendingIdx ^ kQueue.length
    // kRunningIdx points to the first running element.
    // kPendingIdx points to the first pending element.
    // This implements a fast queue with an amortized
    // time of O(1).

    this[kQueue] = []
    this[kRunningIdx] = 0
    this[kPendingIdx] = 0

    // Bound helpers stored on the instance so other code can trigger them
    // through the symbol slots.
    this[kResume] = (sync) => resume(this, sync)
    this[kOnError] = (err) => onError(this, err)
  }

  /** The pipelining value currently stored on the client. */
  get pipelining () {
    return this[kPipelining]
  }

  /** Stores a new pipelining value and synchronously re-runs the resume loop. */
  set pipelining (value) {
    this[kPipelining] = value
    this[kResume](true)
  }

  /** Number of queue entries at or after `kPendingIdx`. */
  get [kPending] () {
    return this[kQueue].length - this[kPendingIdx]
  }

  /** Number of queue entries between `kRunningIdx` and `kPendingIdx`. */
  get [kRunning] () {
    return this[kPendingIdx] - this[kRunningIdx]
  }

  /** Number of queue entries at or after `kRunningIdx` (running plus pending). */
  get [kSize] () {
    return this[kQueue].length - this[kRunningIdx]
  }

  /** True when an HTTP context exists, is not destroyed, and no connect is in progress. */
  get [kConnected] () {
    return !!this[kHTTPContext] && !this[kConnecting] && !this[kHTTPContext].destroyed
  }

  /**
   * True when the HTTP context reports busy, when the queue size has reached
   * the pipelining limit (at least 1), or when requests are still pending.
   */
  get [kBusy] () {
    return Boolean(
      this[kHTTPContext]?.busy(null) ||
      (this[kSize] >= (getPipelining(this) || 1)) ||
      this[kPending] > 0
    )
  }

  /* istanbul ignore: only used for test */
  [kConnect] (cb) {
    connect(this)
    this.once('connect', cb)
  }

  /**
   * Wraps the options in a Request, queues it and kicks the resume loop.
   *
   * @param {object} opts Request options; `opts.origin` overrides the client's origin when set.
   * @param {object} handler Handler passed through to the Request.
   * @returns {boolean} `true` while `kNeedDrain` is below 2, `false` once a drain is needed.
   */
  [kDispatch] (opts, handler) {
    const origin = opts.origin || this[kUrl].origin
    const request = new Request(origin, opts, handler)

    this[kQueue].push(request)
    if (this[kResuming]) {
      // Do nothing.
    } else if (util.bodyLength(request.body) == null && util.isIterable(request.body)) {
      // Wait a tick in case stream/iterator is ended in the same tick.
      this[kResuming] = 1
      queueMicrotask(() => resume(this))
    } else {
      this[kResume](true)
    }

    if (this[kResuming] && this[kNeedDrain] !== 2 && this[kBusy]) {
      this[kNeedDrain] = 2
    }

    return this[kNeedDrain] < 2
  }

  /**
   * Resolves immediately when the queue holds no running or pending requests;
   * otherwise parks the resolver in `kClosedResolve` until the queue is empty
   * or the client is destroyed.
   *
   * @returns {Promise<null|undefined>}
   */
  async [kClose] () {
    // TODO: for H2 we need to gracefully flush the remaining enqueued
    // request and close each stream.
    return new Promise((resolve) => {
      if (this[kSize]) {
        this[kClosedResolve] = resolve
      } else {
        resolve(null)
      }
    })
  }

  /**
   * Fails every pending request with `err`, destroys the HTTP context (if
   * any) and resolves once that has completed.
   *
   * @param {Error} err Error used to fail pending requests and destroy the context.
   * @returns {Promise<null>}
   */
  async [kDestroy] (err) {
    return new Promise((resolve) => {
      // Only the pending tail is removed here; running requests stay queued.
      const requests = this[kQueue].splice(this[kPendingIdx])
      for (let i = 0; i < requests.length; i++) {
        const request = requests[i]
        util.errorRequest(this, request, err)
      }

      const callback = () => {
        if (this[kClosedResolve]) {
          // TODO (fix): Should we error here with ClientDestroyedError?
          this[kClosedResolve]()
          this[kClosedResolve] = null
        }
        resolve(null)
      }

      if (this[kHTTPContext]) {
        this[kHTTPContext].destroy(err, callback)
        this[kHTTPContext] = null
      } else {
        queueMicrotask(callback)
      }

      this[kResume]()
    })
  }
}
- const createRedirectInterceptor = require('../interceptor/redirect-interceptor.js')
/**
 * Fails all queued requests with `err` when the error is neither tied to a
 * running request nor one of the recoverable codes.
 *
 * Nothing happens when the client has running requests, or when `err.code`
 * is `'UND_ERR_INFO'` or `'UND_ERR_SOCKET'`.
 *
 * @param {object} client Client whose queue is inspected and drained.
 * @param {Error & { code?: string }} err Error passed to every failed request.
 */
function onError (client, err) {
  if (client[kRunning] !== 0) {
    return
  }

  if (err.code === 'UND_ERR_INFO' || err.code === 'UND_ERR_SOCKET') {
    return
  }

  // Error is not caused by running request and not a recoverable
  // socket error.

  assert(client[kPendingIdx] === client[kRunningIdx])

  const failed = client[kQueue].splice(client[kRunningIdx])
  for (const request of failed) {
    util.errorRequest(client, request, err)
  }

  assert(client[kSize] === 0)
}
/**
 * Opens a socket through the client's connector, wraps it in an HTTP/2 or
 * HTTP/1 context depending on the socket's ALPN protocol, publishes the
 * diagnostics-channel events and finally re-runs the client's resume loop.
 *
 * On failure the pending requests are errored: for
 * `ERR_TLS_CERT_ALTNAME_INVALID` only the leading pending requests that share
 * the client's current servername, otherwise via `onError`.
 *
 * @param {Client} client Must not be connecting and must not have an HTTP context yet.
 * @returns {Promise<void>}
 */
async function connect (client) {
  assert(!client[kConnecting])
  assert(!client[kHTTPContext])

  let { host, hostname, protocol, port } = client[kUrl]

  // Resolve ipv6: strip the surrounding brackets from the hostname.
  if (hostname[0] === '[') {
    const closingBracket = hostname.indexOf(']')
    assert(closingBracket !== -1)

    const address = hostname.substring(1, closingBracket)
    assert(net.isIP(address))

    hostname = address
  }

  // Evaluated at each publish site, so `version` reflects the HTTP context
  // that exists at that moment.
  const describeConnection = () => ({
    host,
    hostname,
    protocol,
    port,
    version: client[kHTTPContext]?.version,
    servername: client[kServerName],
    localAddress: client[kLocalAddress]
  })

  client[kConnecting] = true

  if (channels.beforeConnect.hasSubscribers) {
    channels.beforeConnect.publish({
      connectParams: describeConnection(),
      connector: client[kConnector]
    })
  }

  try {
    const socket = await new Promise((resolve, reject) => {
      const connectorOptions = {
        host,
        hostname,
        protocol,
        port,
        servername: client[kServerName],
        localAddress: client[kLocalAddress]
      }

      client[kConnector](connectorOptions, (connectErr, connectedSocket) => {
        if (connectErr) {
          reject(connectErr)
          return
        }
        resolve(connectedSocket)
      })
    })

    // The client was destroyed while the connector was working: drop the socket.
    if (client.destroyed) {
      util.destroy(socket.on('error', noop), new ClientDestroyedError())
      return
    }

    assert(socket)

    try {
      if (socket.alpnProtocol === 'h2') {
        client[kHTTPContext] = await connectH2(client, socket)
      } else {
        client[kHTTPContext] = await connectH1(client, socket)
      }
    } catch (contextErr) {
      socket.destroy().on('error', noop)
      throw contextErr
    }

    client[kConnecting] = false

    socket[kCounter] = 0
    socket[kMaxRequests] = client[kMaxRequests]
    socket[kClient] = client
    socket[kError] = null

    if (channels.connected.hasSubscribers) {
      channels.connected.publish({
        connectParams: describeConnection(),
        connector: client[kConnector],
        socket
      })
    }

    client.emit('connect', client[kUrl], [client])
  } catch (err) {
    if (client.destroyed) {
      return
    }

    client[kConnecting] = false

    if (channels.connectError.hasSubscribers) {
      channels.connectError.publish({
        connectParams: describeConnection(),
        connector: client[kConnector],
        error: err
      })
    }

    if (err.code === 'ERR_TLS_CERT_ALTNAME_INVALID') {
      assert(client[kRunning] === 0)
      // Fail only the leading pending requests that target the current servername.
      while (client[kPending] > 0 && client[kQueue][client[kPendingIdx]].servername === client[kServerName]) {
        const request = client[kQueue][client[kPendingIdx]++]
        util.errorRequest(client, request, err)
      }
    } else {
      onError(client, err)
    }

    client.emit('connectionError', client[kUrl], [client], err)
  }

  client[kResume]()
}
/**
 * Resets the client's drain state to 0 and emits `'drain'` with the client's
 * origin and a one-element array containing the client.
 *
 * @param {object} client Client (event emitter) to reset and notify.
 */
function emitDrain (client) {
  client[kNeedDrain] = 0

  const origin = client[kUrl]
  const targets = [client]
  client.emit('drain', origin, targets)
}
/**
 * Runs the resume loop for a client unless it is already running, then
 * compacts the request queue.
 *
 * `kResuming` acts as a re-entrancy guard: while it is 2, nested calls return
 * immediately. The guard is reset in a `finally` block so that an exception
 * thrown by `_resume` (which still propagates to the caller) cannot leave the
 * client stuck in the "resuming" state, where every later call would be a
 * no-op.
 *
 * @param {object} client Client whose queue should be processed.
 * @param {boolean} [sync] Forwarded unchanged to `_resume`.
 */
function resume (client, sync) {
  if (client[kResuming] === 2) {
    return
  }

  client[kResuming] = 2
  try {
    _resume(client, sync)
  } finally {
    client[kResuming] = 0
  }

  // Drop the completed prefix of the queue once it has grown past 256
  // entries and shift both indices accordingly.
  if (client[kRunningIdx] > 256) {
    client[kQueue].splice(0, client[kRunningIdx])
    client[kPendingIdx] -= client[kRunningIdx]
    client[kRunningIdx] = 0
  }
}
/**
 * Core scheduling loop of the client. Each iteration either stops (nothing
 * to do, or something must complete first) or hands the first pending
 * request to the HTTP context.
 *
 * @param {object} client Client whose queue is processed.
 * @param {boolean} [sync] When truthy, a due `'drain'` event is deferred to a
 *   microtask instead of being emitted inline.
 */
function _resume (client, sync) {
  for (;;) {
    if (client.destroyed) {
      assert(client[kPending] === 0)
      return
    }

    // A close is waiting and the queue has emptied: complete it.
    if (client[kClosedResolve] && !client[kSize]) {
      client[kClosedResolve]()
      client[kClosedResolve] = null
      return
    }

    if (client[kHTTPContext]) {
      client[kHTTPContext].resume()
    }

    if (client[kBusy]) {
      client[kNeedDrain] = 2
    } else if (client[kNeedDrain] === 2) {
      if (sync) {
        client[kNeedDrain] = 1
        queueMicrotask(() => emitDrain(client))
      } else {
        emitDrain(client)
      }
      // Re-evaluate everything after the drain notification.
      continue
    }

    // Nothing pending, or the pipelining limit (at least 1) is reached.
    if (client[kPending] === 0 || client[kRunning] >= (getPipelining(client) || 1)) {
      return
    }

    const request = client[kQueue][client[kPendingIdx]]

    const servernameChanged =
      client[kUrl].protocol === 'https:' &&
      client[kServerName] !== request.servername

    if (servernameChanged) {
      // Wait for running requests to finish before switching servername.
      if (client[kRunning] > 0) {
        return
      }

      client[kServerName] = request.servername
      client[kHTTPContext]?.destroy(new InformationalError('servername changed'), () => {
        client[kHTTPContext] = null
        resume(client)
      })
    }

    if (client[kConnecting]) {
      return
    }

    if (!client[kHTTPContext]) {
      connect(client)
      return
    }

    // The context is re-read from the client each time on purpose: the calls
    // above may have replaced or cleared it.
    if (client[kHTTPContext].destroyed || client[kHTTPContext].busy(request)) {
      return
    }

    const accepted = !request.aborted && client[kHTTPContext].write(request)
    if (accepted) {
      client[kPendingIdx]++
    } else {
      // Aborted or rejected by the context: drop it from the queue.
      client[kQueue].splice(client[kPendingIdx], 1)
    }
  }
}
- module.exports = Client
|