agent.js 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. 'use strict'
  2. const { InvalidArgumentError } = require('../core/errors')
  3. const { kClients, kRunning, kClose, kDestroy, kDispatch, kInterceptors } = require('../core/symbols')
  4. const DispatcherBase = require('./dispatcher-base')
  5. const Pool = require('./pool')
  6. const Client = require('./client')
  7. const util = require('../core/util')
  8. const createRedirectInterceptor = require('../interceptor/redirect-interceptor')
  9. const kOnConnect = Symbol('onConnect')
  10. const kOnDisconnect = Symbol('onDisconnect')
  11. const kOnConnectionError = Symbol('onConnectionError')
  12. const kMaxRedirections = Symbol('maxRedirections')
  13. const kOnDrain = Symbol('onDrain')
  14. const kFactory = Symbol('factory')
  15. const kOptions = Symbol('options')
  16. function defaultFactory (origin, opts) {
  17. return opts && opts.connections === 1
  18. ? new Client(origin, opts)
  19. : new Pool(origin, opts)
  20. }
  21. class Agent extends DispatcherBase {
  22. constructor ({ factory = defaultFactory, maxRedirections = 0, connect, ...options } = {}) {
  23. super()
  24. if (typeof factory !== 'function') {
  25. throw new InvalidArgumentError('factory must be a function.')
  26. }
  27. if (connect != null && typeof connect !== 'function' && typeof connect !== 'object') {
  28. throw new InvalidArgumentError('connect must be a function or an object')
  29. }
  30. if (!Number.isInteger(maxRedirections) || maxRedirections < 0) {
  31. throw new InvalidArgumentError('maxRedirections must be a positive number')
  32. }
  33. if (connect && typeof connect !== 'function') {
  34. connect = { ...connect }
  35. }
  36. this[kInterceptors] = options.interceptors?.Agent && Array.isArray(options.interceptors.Agent)
  37. ? options.interceptors.Agent
  38. : [createRedirectInterceptor({ maxRedirections })]
  39. this[kOptions] = { ...util.deepClone(options), connect }
  40. this[kOptions].interceptors = options.interceptors
  41. ? { ...options.interceptors }
  42. : undefined
  43. this[kMaxRedirections] = maxRedirections
  44. this[kFactory] = factory
  45. this[kClients] = new Map()
  46. this[kOnDrain] = (origin, targets) => {
  47. this.emit('drain', origin, [this, ...targets])
  48. }
  49. this[kOnConnect] = (origin, targets) => {
  50. this.emit('connect', origin, [this, ...targets])
  51. }
  52. this[kOnDisconnect] = (origin, targets, err) => {
  53. this.emit('disconnect', origin, [this, ...targets], err)
  54. }
  55. this[kOnConnectionError] = (origin, targets, err) => {
  56. this.emit('connectionError', origin, [this, ...targets], err)
  57. }
  58. }
  59. get [kRunning] () {
  60. let ret = 0
  61. for (const client of this[kClients].values()) {
  62. ret += client[kRunning]
  63. }
  64. return ret
  65. }
  66. [kDispatch] (opts, handler) {
  67. let key
  68. if (opts.origin && (typeof opts.origin === 'string' || opts.origin instanceof URL)) {
  69. key = String(opts.origin)
  70. } else {
  71. throw new InvalidArgumentError('opts.origin must be a non-empty string or URL.')
  72. }
  73. let dispatcher = this[kClients].get(key)
  74. if (!dispatcher) {
  75. dispatcher = this[kFactory](opts.origin, this[kOptions])
  76. .on('drain', this[kOnDrain])
  77. .on('connect', this[kOnConnect])
  78. .on('disconnect', this[kOnDisconnect])
  79. .on('connectionError', this[kOnConnectionError])
  80. // This introduces a tiny memory leak, as dispatchers are never removed from the map.
  81. // TODO(mcollina): remove te timer when the client/pool do not have any more
  82. // active connections.
  83. this[kClients].set(key, dispatcher)
  84. }
  85. return dispatcher.dispatch(opts, handler)
  86. }
  87. async [kClose] () {
  88. const closePromises = []
  89. for (const client of this[kClients].values()) {
  90. closePromises.push(client.close())
  91. }
  92. this[kClients].clear()
  93. await Promise.all(closePromises)
  94. }
  95. async [kDestroy] (err) {
  96. const destroyPromises = []
  97. for (const client of this[kClients].values()) {
  98. destroyPromises.push(client.destroy(err))
  99. }
  100. this[kClients].clear()
  101. await Promise.all(destroyPromises)
  102. }
  103. }
  104. module.exports = Agent