123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129 |
- 'use strict'
- const { InvalidArgumentError } = require('../core/errors')
- const { kClients, kRunning, kClose, kDestroy, kDispatch, kInterceptors } = require('../core/symbols')
- const DispatcherBase = require('./dispatcher-base')
- const Pool = require('./pool')
- const Client = require('./client')
- const util = require('../core/util')
- const createRedirectInterceptor = require('../interceptor/redirect-interceptor')
- const kOnConnect = Symbol('onConnect')
- const kOnDisconnect = Symbol('onDisconnect')
- const kOnConnectionError = Symbol('onConnectionError')
- const kMaxRedirections = Symbol('maxRedirections')
- const kOnDrain = Symbol('onDrain')
- const kFactory = Symbol('factory')
- const kOptions = Symbol('options')
- function defaultFactory (origin, opts) {
- return opts && opts.connections === 1
- ? new Client(origin, opts)
- : new Pool(origin, opts)
- }
- class Agent extends DispatcherBase {
- constructor ({ factory = defaultFactory, maxRedirections = 0, connect, ...options } = {}) {
- super()
- if (typeof factory !== 'function') {
- throw new InvalidArgumentError('factory must be a function.')
- }
- if (connect != null && typeof connect !== 'function' && typeof connect !== 'object') {
- throw new InvalidArgumentError('connect must be a function or an object')
- }
- if (!Number.isInteger(maxRedirections) || maxRedirections < 0) {
- throw new InvalidArgumentError('maxRedirections must be a positive number')
- }
- if (connect && typeof connect !== 'function') {
- connect = { ...connect }
- }
- this[kInterceptors] = options.interceptors?.Agent && Array.isArray(options.interceptors.Agent)
- ? options.interceptors.Agent
- : [createRedirectInterceptor({ maxRedirections })]
- this[kOptions] = { ...util.deepClone(options), connect }
- this[kOptions].interceptors = options.interceptors
- ? { ...options.interceptors }
- : undefined
- this[kMaxRedirections] = maxRedirections
- this[kFactory] = factory
- this[kClients] = new Map()
- this[kOnDrain] = (origin, targets) => {
- this.emit('drain', origin, [this, ...targets])
- }
- this[kOnConnect] = (origin, targets) => {
- this.emit('connect', origin, [this, ...targets])
- }
- this[kOnDisconnect] = (origin, targets, err) => {
- this.emit('disconnect', origin, [this, ...targets], err)
- }
- this[kOnConnectionError] = (origin, targets, err) => {
- this.emit('connectionError', origin, [this, ...targets], err)
- }
- }
- get [kRunning] () {
- let ret = 0
- for (const client of this[kClients].values()) {
- ret += client[kRunning]
- }
- return ret
- }
- [kDispatch] (opts, handler) {
- let key
- if (opts.origin && (typeof opts.origin === 'string' || opts.origin instanceof URL)) {
- key = String(opts.origin)
- } else {
- throw new InvalidArgumentError('opts.origin must be a non-empty string or URL.')
- }
- let dispatcher = this[kClients].get(key)
- if (!dispatcher) {
- dispatcher = this[kFactory](opts.origin, this[kOptions])
- .on('drain', this[kOnDrain])
- .on('connect', this[kOnConnect])
- .on('disconnect', this[kOnDisconnect])
- .on('connectionError', this[kOnConnectionError])
- // This introduces a tiny memory leak, as dispatchers are never removed from the map.
- // TODO(mcollina): remove te timer when the client/pool do not have any more
- // active connections.
- this[kClients].set(key, dispatcher)
- }
- return dispatcher.dispatch(opts, handler)
- }
- async [kClose] () {
- const closePromises = []
- for (const client of this[kClients].values()) {
- closePromises.push(client.close())
- }
- this[kClients].clear()
- await Promise.all(closePromises)
- }
- async [kDestroy] (err) {
- const destroyPromises = []
- for (const client of this[kClients].values()) {
- destroyPromises.push(client.destroy(err))
- }
- this[kClients].clear()
- await Promise.all(destroyPromises)
- }
- }
- module.exports = Agent
|