// Pool.js
  1. "use strict";
  2. const EventEmitter = require("events").EventEmitter;
  3. const factoryValidator = require("./factoryValidator");
  4. const PoolOptions = require("./PoolOptions");
  5. const ResourceRequest = require("./ResourceRequest");
  6. const ResourceLoan = require("./ResourceLoan");
  7. const PooledResource = require("./PooledResource");
  8. const DefaultEvictor = require("./DefaultEvictor");
  9. const Deque = require("./Deque");
  10. const Deferred = require("./Deferred");
  11. const PriorityQueue = require("./PriorityQueue");
  12. const DequeIterator = require("./DequeIterator");
  13. const reflector = require("./utils").reflector;
  /**
   * Names of the events emitted by the pool when a factory call fails.
   * TODO: move me
   */
  const FACTORY_CREATE_ERROR = "factoryCreateError";
  const FACTORY_DESTROY_ERROR = "factoryDestroyError";
  // Emitted when factory.validate rejects or throws during test-on-borrow.
  const FACTORY_VALIDATE_ERROR = "factoryValidateError";

  /**
   * A generic resource pool.
   *
   * Resources are produced and torn down by a user-supplied `factory`; the
   * pool hands them out to callers of `acquire()` and takes them back through
   * `release()` / `destroy()`. Factory failures are reported as events
   * ("factoryCreateError", "factoryDestroyError", "factoryValidateError")
   * instead of being thrown.
   */
  class Pool extends EventEmitter {
    /**
     * Generate an Object pool with a specified `factory` and `config`.
     *
     * @param {typeof DefaultEvictor} Evictor
     * @param {typeof Deque} Deque
     * @param {typeof PriorityQueue} PriorityQueue
     * @param {Object} factory
     *   Factory to be used for generating and destroying the items.
     * @param {Function} factory.create
     *   Should create the item to be acquired and return it (or a promise
     *   for it).
     * @param {Function} factory.destroy
     *   Should gently close any resources that the item is using.
     *   Called before the item is destroyed.
     * @param {Function} factory.validate
     *   Test if a resource is still valid. Should return a promise that
     *   resolves to a boolean: true if the resource is still valid and false
     *   if it should be removed from the pool.
     * @param {Object} options
     *   Raw options, normalised through `PoolOptions`.
     */
    constructor(Evictor, Deque, PriorityQueue, factory, options) {
      super();

      factoryValidator(factory);

      this._config = new PoolOptions(options);

      // TODO: fix up this ugly glue-ing
      this._Promise = this._config.Promise;

      this._factory = factory;
      this._draining = false;
      this._started = false;

      /**
       * Holds waiting clients
       * @type {PriorityQueue}
       */
      this._waitingClientsQueue = new PriorityQueue(this._config.priorityRange);

      /**
       * Collection of promises for resource creation calls made by the pool to factory.create
       * @type {Set}
       */
      this._factoryCreateOperations = new Set();

      /**
       * Collection of promises for resource destruction calls made by the pool to factory.destroy
       * @type {Set}
       */
      this._factoryDestroyOperations = new Set();

      /**
       * A queue/stack of pooledResources awaiting acquisition
       * TODO: replace with LinkedList backed array
       * @type {Deque}
       */
      this._availableObjects = new Deque();

      /**
       * Collection of references for any resource that are undergoing validation before being acquired
       * @type {Set}
       */
      this._testOnBorrowResources = new Set();

      /**
       * Collection of references for any resource that are undergoing validation before being returned
       * @type {Set}
       */
      this._testOnReturnResources = new Set();

      /**
       * Collection of promises for any validations currently in process
       * @type {Set}
       */
      this._validationOperations = new Set();

      /**
       * All objects associated with this pool in any state (except destroyed)
       * @type {Set}
       */
      this._allObjects = new Set();

      /**
       * Loans keyed by the borrowed resource
       * @type {Map}
       */
      this._resourceLoans = new Map();

      /**
       * Infinitely looping iterator over available object
       * @type {DequeIterator}
       */
      this._evictionIterator = this._availableObjects.iterator();

      this._evictor = new Evictor();

      /**
       * handle for setTimeout for next eviction run
       * @type {(number|null)}
       */
      this._scheduledEviction = null;

      // create initial resources (if factory.min > 0)
      if (this._config.autostart === true) {
        this.start();
      }
    }

    /**
     * Invalidates a pooled resource, forgets it, and asks the factory to
     * destroy the underlying object. A failing destroy is reported through
     * the "factoryDestroyError" event. Afterwards the pool is topped back up
     * to its configured minimum.
     *
     * Note: this does NOT remove the resource from `_availableObjects`;
     * callers must have taken it out of that list already.
     *
     * @param {PooledResource} pooledResource
     * @private
     */
    _destroy(pooledResource) {
      // FIXME: do we need another state for "in destruction"?
      pooledResource.invalidate();
      this._allObjects.delete(pooledResource);
      // NOTE: this maybe very bad promise usage?
      const destroyPromise = this._factory.destroy(pooledResource.obj);
      const wrappedDestroyPromise = this._config.destroyTimeoutMillis
        ? this._Promise.resolve(this._applyDestroyTimeout(destroyPromise))
        : this._Promise.resolve(destroyPromise);

      this._trackOperation(
        wrappedDestroyPromise,
        this._factoryDestroyOperations
      ).catch(reason => {
        this.emit(FACTORY_DESTROY_ERROR, reason);
      });

      // TODO: maybe ensuring minimum pool size should live outside here
      this._ensureMinimum();
    }

    /**
     * Races a destroy operation against `destroyTimeoutMillis`.
     *
     * @param {Promise} promise result of factory.destroy
     * @return {Promise} settles like `promise`, or rejects with
     *   "destroy timed out" if the timeout elapses first
     * @private
     */
    _applyDestroyTimeout(promise) {
      let timer = null;
      const timeoutPromise = new this._Promise((resolve, reject) => {
        timer = setTimeout(() => {
          reject(new Error("destroy timed out"));
        }, this._config.destroyTimeoutMillis);
        // the pending timeout must not keep the process alive on its own
        timer.unref();
      });
      const raced = this._Promise.race([timeoutPromise, promise]);
      // Once the race is decided the timer has no further purpose; cancel it
      // rather than leaving it pending until the timeout elapses.
      const cancelTimer = () => clearTimeout(timer);
      raced.then(cancelTimer, cancelTimer);
      return raced;
    }

    /**
     * Attempt to move an available resource into test and then onto a waiting client.
     *
     * A resource whose validation resolves to `false` is destroyed. A
     * validation that rejects (or a `factory.validate` that throws) is
     * handled the same way, and the reason is additionally emitted as
     * "factoryValidateError"; without this the resource would stay "in test"
     * forever and the waiting client would never be served.
     *
     * @return {Boolean} could we move an available resource into test
     * @private
     */
    _testOnBorrow() {
      if (this._availableObjects.length < 1) {
        return false;
      }

      const pooledResource = this._availableObjects.shift();
      // Mark the resource as in test
      pooledResource.test();
      this._testOnBorrowResources.add(pooledResource);

      // Running validate inside an executor converts a synchronous throw
      // into a rejection, so both failure modes take the same path below.
      const validationPromise = new this._Promise(resolve => {
        resolve(this._factory.validate(pooledResource.obj));
      });

      this._trackOperation(validationPromise, this._validationOperations).then(
        isValid => {
          this._testOnBorrowResources.delete(pooledResource);

          if (isValid === false) {
            pooledResource.invalidate();
            this._destroy(pooledResource);
            this._dispense();
            return;
          }
          this._dispatchPooledResourceToNextWaitingClient(pooledResource);
        },
        reason => {
          // Validity is unknown: err on the safe side and discard it.
          this._testOnBorrowResources.delete(pooledResource);
          this.emit(FACTORY_VALIDATE_ERROR, reason);
          this._destroy(pooledResource);
          this._dispense();
        }
      );

      return true;
    }

    /**
     * Attempt to move an available resource to a waiting client.
     *
     * @return {Boolean} true if a resource was handed to a waiting client;
     *   false if nothing was available or no pending client could take it
     * @private
     */
    _dispatchResource() {
      if (this._availableObjects.length < 1) {
        return false;
      }

      const pooledResource = this._availableObjects.shift();
      return this._dispatchPooledResourceToNextWaitingClient(pooledResource);
    }

    /**
     * Attempt to resolve an outstanding resource request using an available resource from
     * the pool, or creating new ones
     *
     * @private
     */
    _dispense() {
      /**
       * Local variables for ease of reading/writing
       * these don't (shouldn't) change across the execution of this fn
       */
      const numWaitingClients = this._waitingClientsQueue.length;

      // If there aren't any waiting requests then there is nothing to do
      // so lets short-circuit
      if (numWaitingClients < 1) {
        return;
      }

      const resourceShortfall =
        numWaitingClients - this._potentiallyAllocableResourceCount;

      const actualNumberOfResourcesToCreate = Math.min(
        this.spareResourceCapacity,
        resourceShortfall
      );
      for (let i = 0; actualNumberOfResourcesToCreate > i; i++) {
        this._createResource();
      }

      // If we are doing test-on-borrow see how many more resources need to be moved into test
      // to help satisfy waitingClients
      if (this._config.testOnBorrow === true) {
        // how many available resources do we need to shift into test
        const desiredNumberOfResourcesToMoveIntoTest =
          numWaitingClients - this._testOnBorrowResources.size;
        const actualNumberOfResourcesToMoveIntoTest = Math.min(
          this._availableObjects.length,
          desiredNumberOfResourcesToMoveIntoTest
        );
        for (let i = 0; actualNumberOfResourcesToMoveIntoTest > i; i++) {
          this._testOnBorrow();
        }
      }

      // if we aren't testing-on-borrow then lets try to allocate what we can
      if (this._config.testOnBorrow === false) {
        const actualNumberOfResourcesToDispatch = Math.min(
          this._availableObjects.length,
          numWaitingClients
        );
        for (let i = 0; actualNumberOfResourcesToDispatch > i; i++) {
          this._dispatchResource();
        }
      }
    }

    /**
     * Dispatches a pooledResource to the next waiting client (if any) else
     * puts the PooledResource back on the available list
     *
     * @param {PooledResource} pooledResource resource to hand out
     * @return {Boolean} true if a client received the resource, false if it
     *   went back to the available list
     * @private
     */
    _dispatchPooledResourceToNextWaitingClient(pooledResource) {
      const clientResourceRequest = this._waitingClientsQueue.dequeue();
      if (
        clientResourceRequest === undefined ||
        clientResourceRequest.state !== Deferred.PENDING
      ) {
        // While we were away either all the waiting clients timed out
        // or were somehow fulfilled. put our pooledResource back.
        this._addPooledResourceToAvailableObjects(pooledResource);
        // TODO: do need to trigger anything before we leave?
        return false;
      }
      const loan = new ResourceLoan(pooledResource, this._Promise);
      this._resourceLoans.set(pooledResource.obj, loan);
      pooledResource.allocate();
      clientResourceRequest.resolve(pooledResource.obj);
      return true;
    }

    /**
     * Tracks an operation using the given set: the operation is added
     * immediately and removed again once it settles.
     *
     * @param {Promise} operation
     * @param {Set} set Set holding operations
     * @return {Promise} Promise that settles with the operation's
     *   value/reason once the operation has been removed from the set
     * @private
     */
    _trackOperation(operation, set) {
      set.add(operation);

      return operation.then(
        v => {
          set.delete(operation);
          return this._Promise.resolve(v);
        },
        e => {
          set.delete(operation);
          return this._Promise.reject(e);
        }
      );
    }

    /**
     * Asks the factory for one new resource. On success the resource is
     * registered and made available; on failure "factoryCreateError" is
     * emitted. Either way `_dispense()` runs afterwards.
     *
     * @private
     */
    _createResource() {
      // An attempt to create a resource
      const factoryPromise = this._factory.create();
      const wrappedFactoryPromise = this._Promise
        .resolve(factoryPromise)
        .then(resource => {
          const pooledResource = new PooledResource(resource);
          this._allObjects.add(pooledResource);
          this._addPooledResourceToAvailableObjects(pooledResource);
        });

      this._trackOperation(wrappedFactoryPromise, this._factoryCreateOperations)
        .then(() => {
          this._dispense();
          // Stop bluebird complaining about this side-effect only handler
          // - a promise was created in a handler but was not returned from it
          // https://goo.gl/rRqMUw
          return null;
        })
        .catch(reason => {
          this.emit(FACTORY_CREATE_ERROR, reason);
          this._dispense();
        });
    }

    /**
     * Creates as many resources as are needed to reach `config.min`.
     * Does nothing while the pool is draining.
     *
     * @private
     */
    _ensureMinimum() {
      if (this._draining === true) {
        return;
      }
      const minShortfall = this._config.min - this._count;
      for (let i = 0; i < minShortfall; i++) {
        this._createResource();
      }
    }

    /**
     * Runs one eviction pass: examines up to `numTestsPerEvictionRun`
     * available resources with the evictor and destroys those it rejects.
     *
     * @private
     */
    _evict() {
      const testsToRun = Math.min(
        this._config.numTestsPerEvictionRun,
        this._availableObjects.length
      );
      const evictionConfig = {
        softIdleTimeoutMillis: this._config.softIdleTimeoutMillis,
        idleTimeoutMillis: this._config.idleTimeoutMillis,
        min: this._config.min
      };
      for (let testsHaveRun = 0; testsHaveRun < testsToRun; ) {
        const iterationResult = this._evictionIterator.next();

        // Safety check incase we could get stuck in infinite loop because we
        // somehow emptied the array after checking its length.
        if (iterationResult.done === true && this._availableObjects.length < 1) {
          this._evictionIterator.reset();
          return;
        }
        // If this happens it should just mean we reached the end of the
        // list and can reset the cursor.
        if (iterationResult.done === true && this._availableObjects.length > 0) {
          this._evictionIterator.reset();
          continue;
        }

        const resource = iterationResult.value;

        const shouldEvict = this._evictor.evict(
          evictionConfig,
          resource,
          this._availableObjects.length
        );
        testsHaveRun++;

        if (shouldEvict === true) {
          // take it out of the _availableObjects list
          this._evictionIterator.remove();
          this._destroy(resource);
        }
      }
    }

    /**
     * Schedules the next eviction pass if `evictionRunIntervalMillis` is
     * positive. Each pass re-schedules the following one.
     *
     * @private
     */
    _scheduleEvictorRun() {
      // Start eviction if set
      if (this._config.evictionRunIntervalMillis > 0) {
        // @ts-ignore
        this._scheduledEviction = setTimeout(() => {
          this._evict();
          this._scheduleEvictorRun();
        }, this._config.evictionRunIntervalMillis).unref();
      }
    }

    /**
     * Cancels the pending eviction pass, if any.
     *
     * @private
     */
    _descheduleEvictorRun() {
      if (this._scheduledEviction) {
        clearTimeout(this._scheduledEviction);
      }
      this._scheduledEviction = null;
    }

    /**
     * Starts the pool: schedules eviction and creates the minimum number of
     * resources. A no-op if the pool is draining or was already started.
     */
    start() {
      if (this._draining === true) {
        return;
      }
      if (this._started === true) {
        return;
      }
      this._started = true;
      this._scheduleEvictorRun();
      this._ensureMinimum();
    }

    /**
     * Request a new resource. The returned promise resolves with the
     * resource once one is available.
     * TODO: should we add a seperate "acquireWithPriority" function
     *
     * @param {Number} [priority=0]
     *   Optional. Integer between 0 and (priorityRange - 1). Specifies the priority
     *   of the caller if there are no available resources. Lower numbers mean higher
     *   priority.
     *
     * @returns {Promise} rejects immediately if the pool is draining or the
     *   configured `maxWaitingClients` limit is exceeded
     */
    acquire(priority) {
      if (this._started === false && this._config.autostart === false) {
        this.start();
      }

      if (this._draining) {
        return this._Promise.reject(
          new Error("pool is draining and cannot accept work")
        );
      }

      // TODO: should we defer this check till after this event loop incase "the situation" changes in the meantime
      if (
        this.spareResourceCapacity < 1 &&
        this._availableObjects.length < 1 &&
        this._config.maxWaitingClients !== undefined &&
        this._waitingClientsQueue.length >= this._config.maxWaitingClients
      ) {
        return this._Promise.reject(
          new Error("max waitingClients count exceeded")
        );
      }

      const resourceRequest = new ResourceRequest(
        this._config.acquireTimeoutMillis,
        this._Promise
      );
      this._waitingClientsQueue.enqueue(resourceRequest, priority);
      this._dispense();

      return resourceRequest.promise;
    }

    /**
     * Acquires a resource, passes it to a user supplied function and gives it
     * back afterwards: released when `fn` succeeds, destroyed when `fn` fails.
     *
     * `fn` may return a promise or a plain value, and may throw
     * synchronously; in every case the resource is handed back to the pool.
     *
     * @param {Function} fn a function that accepts a resource and returns a
     *   promise that resolves/rejects once it has finished using the resource
     * @param {Number} [priority] forwarded to `acquire`
     * @return {Promise} settles with the outcome of `fn` once the resource
     *   has been handed back to the pool
     */
    use(fn, priority) {
      return this.acquire(priority).then(resource => {
        // Invoking fn inside an executor turns a synchronous throw into a
        // rejection and lifts non-promise return values into a promise.
        const work = new this._Promise(resolve => {
          resolve(fn(resource));
        });
        return work.then(
          result => {
            this.release(resource);
            return result;
          },
          err => {
            this.destroy(resource);
            throw err;
          }
        );
      });
    }

    /**
     * Check if resource is currently on loan from the pool
     *
     * @param {Object} resource
     *   Resource for checking.
     *
     * @returns {Boolean}
     *   True if the resource is currently on loan from this pool and false otherwise
     */
    isBorrowedResource(resource) {
      return this._resourceLoans.has(resource);
    }

    /**
     * Return the resource to the pool when it is no longer required.
     *
     * @param {Object} resource
     *   The acquired object to be put back to the pool.
     * @returns {Promise} rejects if the resource is not on loan from this pool
     */
    release(resource) {
      // check for an outstanding loan
      const loan = this._resourceLoans.get(resource);

      if (loan === undefined) {
        return this._Promise.reject(
          new Error("Resource not currently part of this pool")
        );
      }

      this._resourceLoans.delete(resource);
      loan.resolve();
      const pooledResource = loan.pooledResource;

      pooledResource.deallocate();
      this._addPooledResourceToAvailableObjects(pooledResource);

      this._dispense();
      return this._Promise.resolve();
    }

    /**
     * Request the resource to be destroyed. The factory's destroy handler
     * will also be called.
     *
     * This should be called within an acquire() block as an alternative to release().
     *
     * @param {Object} resource
     *   The acquired resource to be destroyed.
     * @returns {Promise} rejects if the resource is not on loan from this pool
     */
    destroy(resource) {
      // check for an outstanding loan
      const loan = this._resourceLoans.get(resource);

      if (loan === undefined) {
        return this._Promise.reject(
          new Error("Resource not currently part of this pool")
        );
      }

      this._resourceLoans.delete(resource);
      loan.resolve();
      const pooledResource = loan.pooledResource;

      pooledResource.deallocate();
      this._destroy(pooledResource);

      this._dispense();
      return this._Promise.resolve();
    }

    /**
     * Marks a pooled resource idle and puts it on the available list: at the
     * back when `config.fifo` is true, otherwise at the front.
     *
     * @param {PooledResource} pooledResource
     * @private
     */
    _addPooledResourceToAvailableObjects(pooledResource) {
      pooledResource.idle();
      if (this._config.fifo === true) {
        this._availableObjects.push(pooledResource);
      } else {
        this._availableObjects.unshift(pooledResource);
      }
    }

    /**
     * Disallow any new acquire calls and let the request backlog dissipate.
     * The Pool will no longer attempt to maintain a "min" number of resources
     * and will only make new resources on demand.
     * Resolves once all resource requests are fulfilled and all resources are returned to pool and available...
     * Should probably be called "drain work"
     * @returns {Promise}
     */
    drain() {
      this._draining = true;
      return this.__allResourceRequestsSettled()
        .then(() => {
          return this.__allResourcesReturned();
        })
        .then(() => {
          this._descheduleEvictorRun();
        });
    }

    /**
     * @return {Promise} settles once the request at the tail of the waiting
     *   queue has settled (immediately if nobody is waiting)
     * @private
     */
    __allResourceRequestsSettled() {
      if (this._waitingClientsQueue.length > 0) {
        // wait for last waiting client to be settled
        // FIXME: what if they can "resolve" out of order....?
        return reflector(this._waitingClientsQueue.tail.promise);
      }
      return this._Promise.resolve();
    }

    // FIXME: this is a horrific mess
    /**
     * @return {Promise} settles once every outstanding loan has settled
     * @private
     */
    __allResourcesReturned() {
      const ps = Array.from(this._resourceLoans.values())
        .map(loan => loan.promise)
        .map(reflector);
      return this._Promise.all(ps);
    }

    /**
     * Forcibly destroys all available resources regardless of timeout. Intended to be
     * invoked as part of a drain. Does not prevent the creation of new
     * resources as a result of subsequent calls to acquire.
     *
     * Note that if factory.min > 0 and the pool isn't "draining", the pool will destroy all idle resources
     * in the pool, but replace them with newly created resources up to the
     * specified factory.min value. If this is not desired, set factory.min
     * to zero before calling clear()
     *
     * @returns {Promise} settles once the pending destroy operations have settled
     */
    clear() {
      const reflectedCreatePromises = Array.from(
        this._factoryCreateOperations
      ).map(reflector);

      // wait for outstanding factory.create to complete
      return this._Promise.all(reflectedCreatePromises).then(() => {
        // Destroy existing resources. Each one is taken OFF the available
        // list first: _destroy() does not do that itself, and a destroyed
        // resource left on the list could later be handed to a client.
        // The count is fixed up front so only resources idle right now are
        // affected.
        const idleCount = this._availableObjects.length;
        for (let i = 0; i < idleCount; i++) {
          this._destroy(this._availableObjects.shift());
        }
        const reflectedDestroyPromises = Array.from(
          this._factoryDestroyOperations
        ).map(reflector);
        return reflector(this._Promise.all(reflectedDestroyPromises));
      });
    }

    /**
     * Waits until the pool is ready.
     * We define ready by checking if the number of available resources is at
     * least the minimum number defined. The check is repeated every 100ms.
     * @returns {Promise} that resolves when the minimum number is ready.
     */
    ready() {
      return new this._Promise(resolve => {
        const isReady = () => {
          if (this.available >= this.min) {
            resolve();
          } else {
            setTimeout(isReady, 100);
          }
        };

        isReady();
      });
    }

    /**
     * How many resources are available to be allocated
     * (includes resources that have not been tested and may fail validation)
     * NOTE: internal for now as the name is awful and might not be useful to anyone
     * @return {Number} number of resources the pool has to allocate
     */
    get _potentiallyAllocableResourceCount() {
      return (
        this._availableObjects.length +
        this._testOnBorrowResources.size +
        this._testOnReturnResources.size +
        this._factoryCreateOperations.size
      );
    }

    /**
     * The combined count of the currently created objects and those in the
     * process of being created
     * Does NOT include resources in the process of being destroyed
     * sort of legacy...
     * @return {Number}
     */
    get _count() {
      return this._allObjects.size + this._factoryCreateOperations.size;
    }

    /**
     * How many more resources does the pool have room for
     * @return {Number} number of resources the pool could create before hitting any limits
     */
    get spareResourceCapacity() {
      return (
        this._config.max -
        (this._allObjects.size + this._factoryCreateOperations.size)
      );
    }

    /**
     * see _count above
     * @return {Number} created resources plus those being created
     */
    get size() {
      return this._count;
    }

    /**
     * number of available resources
     * @return {Number} length of the available list
     */
    get available() {
      return this._availableObjects.length;
    }

    /**
     * number of resources that are currently acquired
     * @return {Number} number of outstanding loans
     */
    get borrowed() {
      return this._resourceLoans.size;
    }

    /**
     * number of waiting acquire calls
     * @return {Number} length of the waiting clients queue
     */
    get pending() {
      return this._waitingClientsQueue.length;
    }

    /**
     * maximum size of the pool
     * @return {Number} configured `max`
     */
    get max() {
      return this._config.max;
    }

    /**
     * minimum size of the pool
     * @return {Number} configured `min`
     */
    get min() {
      return this._config.min;
    }
  }
  657. module.exports = Pool;