balanced-pool.js 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. 'use strict'
  2. const {
  3. BalancedPoolMissingUpstreamError,
  4. InvalidArgumentError
  5. } = require('../core/errors')
  6. const {
  7. PoolBase,
  8. kClients,
  9. kNeedDrain,
  10. kAddClient,
  11. kRemoveClient,
  12. kGetDispatcher
  13. } = require('./pool-base')
  14. const Pool = require('./pool')
  15. const { kUrl, kInterceptors } = require('../core/symbols')
  16. const { parseOrigin } = require('../core/util')
  17. const kFactory = Symbol('factory')
  18. const kOptions = Symbol('options')
  19. const kGreatestCommonDivisor = Symbol('kGreatestCommonDivisor')
  20. const kCurrentWeight = Symbol('kCurrentWeight')
  21. const kIndex = Symbol('kIndex')
  22. const kWeight = Symbol('kWeight')
  23. const kMaxWeightPerServer = Symbol('kMaxWeightPerServer')
  24. const kErrorPenalty = Symbol('kErrorPenalty')
  25. /**
  26. * Calculate the greatest common divisor of two numbers by
  27. * using the Euclidean algorithm.
  28. *
  29. * @param {number} a
  30. * @param {number} b
  31. * @returns {number}
  32. */
  33. function getGreatestCommonDivisor (a, b) {
  34. if (a === 0) return b
  35. while (b !== 0) {
  36. const t = b
  37. b = a % b
  38. a = t
  39. }
  40. return a
  41. }
  42. function defaultFactory (origin, opts) {
  43. return new Pool(origin, opts)
  44. }
  45. class BalancedPool extends PoolBase {
  46. constructor (upstreams = [], { factory = defaultFactory, ...opts } = {}) {
  47. super()
  48. this[kOptions] = opts
  49. this[kIndex] = -1
  50. this[kCurrentWeight] = 0
  51. this[kMaxWeightPerServer] = this[kOptions].maxWeightPerServer || 100
  52. this[kErrorPenalty] = this[kOptions].errorPenalty || 15
  53. if (!Array.isArray(upstreams)) {
  54. upstreams = [upstreams]
  55. }
  56. if (typeof factory !== 'function') {
  57. throw new InvalidArgumentError('factory must be a function.')
  58. }
  59. this[kInterceptors] = opts.interceptors?.BalancedPool && Array.isArray(opts.interceptors.BalancedPool)
  60. ? opts.interceptors.BalancedPool
  61. : []
  62. this[kFactory] = factory
  63. for (const upstream of upstreams) {
  64. this.addUpstream(upstream)
  65. }
  66. this._updateBalancedPoolStats()
  67. }
  68. addUpstream (upstream) {
  69. const upstreamOrigin = parseOrigin(upstream).origin
  70. if (this[kClients].find((pool) => (
  71. pool[kUrl].origin === upstreamOrigin &&
  72. pool.closed !== true &&
  73. pool.destroyed !== true
  74. ))) {
  75. return this
  76. }
  77. const pool = this[kFactory](upstreamOrigin, Object.assign({}, this[kOptions]))
  78. this[kAddClient](pool)
  79. pool.on('connect', () => {
  80. pool[kWeight] = Math.min(this[kMaxWeightPerServer], pool[kWeight] + this[kErrorPenalty])
  81. })
  82. pool.on('connectionError', () => {
  83. pool[kWeight] = Math.max(1, pool[kWeight] - this[kErrorPenalty])
  84. this._updateBalancedPoolStats()
  85. })
  86. pool.on('disconnect', (...args) => {
  87. const err = args[2]
  88. if (err && err.code === 'UND_ERR_SOCKET') {
  89. // decrease the weight of the pool.
  90. pool[kWeight] = Math.max(1, pool[kWeight] - this[kErrorPenalty])
  91. this._updateBalancedPoolStats()
  92. }
  93. })
  94. for (const client of this[kClients]) {
  95. client[kWeight] = this[kMaxWeightPerServer]
  96. }
  97. this._updateBalancedPoolStats()
  98. return this
  99. }
  100. _updateBalancedPoolStats () {
  101. let result = 0
  102. for (let i = 0; i < this[kClients].length; i++) {
  103. result = getGreatestCommonDivisor(this[kClients][i][kWeight], result)
  104. }
  105. this[kGreatestCommonDivisor] = result
  106. }
  107. removeUpstream (upstream) {
  108. const upstreamOrigin = parseOrigin(upstream).origin
  109. const pool = this[kClients].find((pool) => (
  110. pool[kUrl].origin === upstreamOrigin &&
  111. pool.closed !== true &&
  112. pool.destroyed !== true
  113. ))
  114. if (pool) {
  115. this[kRemoveClient](pool)
  116. }
  117. return this
  118. }
  119. get upstreams () {
  120. return this[kClients]
  121. .filter(dispatcher => dispatcher.closed !== true && dispatcher.destroyed !== true)
  122. .map((p) => p[kUrl].origin)
  123. }
  124. [kGetDispatcher] () {
  125. // We validate that pools is greater than 0,
  126. // otherwise we would have to wait until an upstream
  127. // is added, which might never happen.
  128. if (this[kClients].length === 0) {
  129. throw new BalancedPoolMissingUpstreamError()
  130. }
  131. const dispatcher = this[kClients].find(dispatcher => (
  132. !dispatcher[kNeedDrain] &&
  133. dispatcher.closed !== true &&
  134. dispatcher.destroyed !== true
  135. ))
  136. if (!dispatcher) {
  137. return
  138. }
  139. const allClientsBusy = this[kClients].map(pool => pool[kNeedDrain]).reduce((a, b) => a && b, true)
  140. if (allClientsBusy) {
  141. return
  142. }
  143. let counter = 0
  144. let maxWeightIndex = this[kClients].findIndex(pool => !pool[kNeedDrain])
  145. while (counter++ < this[kClients].length) {
  146. this[kIndex] = (this[kIndex] + 1) % this[kClients].length
  147. const pool = this[kClients][this[kIndex]]
  148. // find pool index with the largest weight
  149. if (pool[kWeight] > this[kClients][maxWeightIndex][kWeight] && !pool[kNeedDrain]) {
  150. maxWeightIndex = this[kIndex]
  151. }
  152. // decrease the current weight every `this[kClients].length`.
  153. if (this[kIndex] === 0) {
  154. // Set the current weight to the next lower weight.
  155. this[kCurrentWeight] = this[kCurrentWeight] - this[kGreatestCommonDivisor]
  156. if (this[kCurrentWeight] <= 0) {
  157. this[kCurrentWeight] = this[kMaxWeightPerServer]
  158. }
  159. }
  160. if (pool[kWeight] >= this[kCurrentWeight] && (!pool[kNeedDrain])) {
  161. return pool
  162. }
  163. }
  164. this[kCurrentWeight] = this[kClients][maxWeightIndex][kWeight]
  165. this[kIndex] = maxWeightIndex
  166. return this[kClients][maxWeightIndex]
  167. }
  168. }
  169. module.exports = BalancedPool