retrier.mjs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475
  1. /**
  2. * @fileoverview A utility for retrying failed async method calls.
  3. */
  4. /* global setTimeout, clearTimeout */
  5. //-----------------------------------------------------------------------------
  6. // Constants
  7. //-----------------------------------------------------------------------------
  8. const MAX_TASK_TIMEOUT = 60000;
  9. const MAX_TASK_DELAY = 100;
  10. const MAX_CONCURRENCY = 1000;
  11. //-----------------------------------------------------------------------------
  12. // Helpers
  13. //-----------------------------------------------------------------------------
  14. /**
  15. * Logs a message to the console if the DEBUG environment variable is set.
  16. * @param {string} message The message to log.
  17. * @returns {void}
  18. */
  19. function debug(message) {
  20. if (globalThis?.process?.env.DEBUG === "@hwc/retry") {
  21. console.debug(message);
  22. }
  23. }
  24. /*
  25. * The following logic has been extracted from graceful-fs.
  26. *
  27. * The ISC License
  28. *
  29. * Copyright (c) 2011-2023 Isaac Z. Schlueter, Ben Noordhuis, and Contributors
  30. *
  31. * Permission to use, copy, modify, and/or distribute this software for any
  32. * purpose with or without fee is hereby granted, provided that the above
  33. * copyright notice and this permission notice appear in all copies.
  34. *
  35. * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  36. * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  37. * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  38. * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  39. * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  40. * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR
  41. * IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  42. */
  43. /**
  44. * Checks if it is time to retry a task based on the timestamp and last attempt time.
  45. * @param {RetryTask} task The task to check.
  46. * @param {number} maxDelay The maximum delay for the queue.
  47. * @returns {boolean} true if it is time to retry, false otherwise.
  48. */
  49. function isTimeToRetry(task, maxDelay) {
  50. const timeSinceLastAttempt = Date.now() - task.lastAttempt;
  51. const timeSinceStart = Math.max(task.lastAttempt - task.timestamp, 1);
  52. const desiredDelay = Math.min(timeSinceStart * 1.2, maxDelay);
  53. return timeSinceLastAttempt >= desiredDelay;
  54. }
  55. /**
  56. * Checks if it is time to bail out based on the given timestamp.
  57. * @param {RetryTask} task The task to check.
  58. * @param {number} timeout The timeout for the queue.
  59. * @returns {boolean} true if it is time to bail, false otherwise.
  60. */
  61. function isTimeToBail(task, timeout) {
  62. return task.age > timeout;
  63. }
  64. /**
  65. * Creates a new promise with resolve and reject functions.
  66. * @returns {{promise:Promise<any>, resolve:(value:any) => any, reject: (value:any) => any}} A new promise.
  67. */
  68. function createPromise() {
  69. if (Promise.withResolvers) {
  70. return Promise.withResolvers();
  71. }
  72. let resolve, reject;
  73. const promise = new Promise((res, rej) => {
  74. resolve = res;
  75. reject = rej;
  76. });
  77. if (resolve === undefined || reject === undefined) {
  78. throw new Error("Promise executor did not initialize resolve or reject.");
  79. }
  80. return { promise, resolve, reject };
  81. }
  82. /**
  83. * A class to represent a task in the retry queue.
  84. */
  85. class RetryTask {
  86. /**
  87. * The unique ID for the task.
  88. * @type {string}
  89. */
  90. id = Math.random().toString(36).slice(2);
  91. /**
  92. * The function to call.
  93. * @type {Function}
  94. */
  95. fn;
  96. /**
  97. * The error that was thrown.
  98. * @type {Error}
  99. */
  100. error;
  101. /**
  102. * The timestamp of the task.
  103. * @type {number}
  104. */
  105. timestamp = Date.now();
  106. /**
  107. * The timestamp of the last attempt.
  108. * @type {number}
  109. */
  110. lastAttempt = this.timestamp;
  111. /**
  112. * The resolve function for the promise.
  113. * @type {Function}
  114. */
  115. resolve;
  116. /**
  117. * The reject function for the promise.
  118. * @type {Function}
  119. */
  120. reject;
  121. /**
  122. * The AbortSignal to monitor for cancellation.
  123. * @type {AbortSignal|undefined}
  124. */
  125. signal;
  126. /**
  127. * Creates a new instance.
  128. * @param {Function} fn The function to call.
  129. * @param {Error} error The error that was thrown.
  130. * @param {Function} resolve The resolve function for the promise.
  131. * @param {Function} reject The reject function for the promise.
  132. * @param {AbortSignal|undefined} signal The AbortSignal to monitor for cancellation.
  133. */
  134. constructor(fn, error, resolve, reject, signal) {
  135. this.fn = fn;
  136. this.error = error;
  137. this.timestamp = Date.now();
  138. this.lastAttempt = Date.now();
  139. this.resolve = resolve;
  140. this.reject = reject;
  141. this.signal = signal;
  142. }
  143. /**
  144. * Gets the age of the task.
  145. * @returns {number} The age of the task in milliseconds.
  146. * @readonly
  147. */
  148. get age() {
  149. return Date.now() - this.timestamp;
  150. }
  151. }
  152. //-----------------------------------------------------------------------------
  153. // Exports
  154. //-----------------------------------------------------------------------------
  155. /**
  156. * A class that manages a queue of retry jobs.
  157. */
  158. class Retrier {
  159. /**
  160. * Represents the queue for processing tasks.
  161. * @type {Array<RetryTask>}
  162. */
  163. #retrying = [];
  164. /**
  165. * Represents the queue for pending tasks.
  166. * @type {Array<Function>}
  167. */
  168. #pending = [];
  169. /**
  170. * The number of tasks currently being processed.
  171. * @type {number}
  172. */
  173. #working = 0;
  174. /**
  175. * The timeout for the queue.
  176. * @type {number}
  177. */
  178. #timeout;
  179. /**
  180. * The maximum delay for the queue.
  181. * @type {number}
  182. */
  183. #maxDelay;
  184. /**
  185. * The setTimeout() timer ID.
  186. * @type {NodeJS.Timeout|undefined}
  187. */
  188. #timerId;
  189. /**
  190. * The function to call.
  191. * @type {Function}
  192. */
  193. #check;
  194. /**
  195. * The maximum number of concurrent tasks.
  196. * @type {number}
  197. */
  198. #concurrency;
  199. /**
  200. * Creates a new instance.
  201. * @param {Function} check The function to call.
  202. * @param {object} [options] The options for the instance.
  203. * @param {number} [options.timeout] The timeout for the queue.
  204. * @param {number} [options.maxDelay] The maximum delay for the queue.
  205. * @param {number} [options.concurrency] The maximum number of concurrent tasks.
  206. */
  207. constructor(check, { timeout = MAX_TASK_TIMEOUT, maxDelay = MAX_TASK_DELAY, concurrency = MAX_CONCURRENCY } = {}) {
  208. if (typeof check !== "function") {
  209. throw new Error("Missing function to check errors");
  210. }
  211. this.#check = check;
  212. this.#timeout = timeout;
  213. this.#maxDelay = maxDelay;
  214. this.#concurrency = concurrency;
  215. }
  216. /**
  217. * Gets the number of tasks waiting to be retried.
  218. * @returns {number} The number of tasks in the retry queue.
  219. */
  220. get retrying() {
  221. return this.#retrying.length;
  222. }
  223. /**
  224. * Gets the number of tasks waiting to be processed in the pending queue.
  225. * @returns {number} The number of tasks in the pending queue.
  226. */
  227. get pending() {
  228. return this.#pending.length;
  229. }
  230. /**
  231. * Gets the number of tasks currently being processed.
  232. * @returns {number} The number of tasks currently being processed.
  233. */
  234. get working() {
  235. return this.#working;
  236. }
  237. /**
  238. * Calls the function and retries if it fails.
  239. * @param {Function} fn The function to call.
  240. * @param {Object} options The options for the job.
  241. * @param {AbortSignal} [options.signal] The AbortSignal to monitor for cancellation.
  242. * @param {Promise<any>} options.promise The promise to return when the function settles.
  243. * @param {Function} options.resolve The resolve function for the promise.
  244. * @param {Function} options.reject The reject function for the promise.
  245. * @returns {Promise<any>} A promise that resolves when the function is
  246. * called successfully.
  247. */
  248. #call(fn, { signal, promise, resolve, reject }) {
  249. let result;
  250. try {
  251. result = fn();
  252. } catch (/** @type {any} */ error) {
  253. reject(new Error(`Synchronous error: ${error.message}`, { cause: error }));
  254. return promise;
  255. }
  256. // if the result is not a promise then reject an error
  257. if (!result || typeof result.then !== "function") {
  258. reject(new Error("Result is not a promise."));
  259. return promise;
  260. }
  261. this.#working++;
  262. promise.finally(() => {
  263. this.#working--;
  264. this.#processPending();
  265. });
  266. // call the original function and catch any ENFILE or EMFILE errors
  267. // @ts-ignore because we know it's any
  268. return Promise.resolve(result)
  269. .then(value => {
  270. debug("Function called successfully without retry.");
  271. resolve(value);
  272. return promise;
  273. })
  274. .catch(error => {
  275. if (!this.#check(error)) {
  276. reject(error);
  277. return promise;
  278. }
  279. const task = new RetryTask(fn, error, resolve, reject, signal);
  280. debug(`Function failed, queuing for retry with task ${task.id}.`);
  281. this.#retrying.push(task);
  282. signal?.addEventListener("abort", () => {
  283. debug(`Task ${task.id} was aborted due to AbortSignal.`);
  284. reject(signal.reason);
  285. });
  286. this.#processQueue();
  287. return promise;
  288. });
  289. }
  290. /**
  291. * Adds a new retry job to the queue.
  292. * @param {Function} fn The function to call.
  293. * @param {object} [options] The options for the job.
  294. * @param {AbortSignal} [options.signal] The AbortSignal to monitor for cancellation.
  295. * @returns {Promise<any>} A promise that resolves when the queue is
  296. * processed.
  297. */
  298. retry(fn, { signal } = {}) {
  299. signal?.throwIfAborted();
  300. const { promise, resolve, reject } = createPromise();
  301. this.#pending.push(() => this.#call(fn, { signal, promise, resolve, reject }));
  302. this.#processPending();
  303. return promise;
  304. }
  305. /**
  306. * Processes the pending queue and the retry queue.
  307. * @returns {void}
  308. */
  309. #processAll() {
  310. if (this.pending) {
  311. this.#processPending();
  312. }
  313. if (this.retrying) {
  314. this.#processQueue();
  315. }
  316. }
  317. /**
  318. * Processes the pending queue to see which tasks can be started.
  319. * @returns {void}
  320. */
  321. #processPending() {
  322. debug(`Processing pending tasks: ${this.pending} pending, ${this.working} working.`);
  323. const available = this.#concurrency - this.working;
  324. if (available <= 0) {
  325. return;
  326. }
  327. const count = Math.min(this.pending, available);
  328. for (let i = 0; i < count; i++) {
  329. const task = this.#pending.shift();
  330. task?.();
  331. }
  332. debug(`Processed pending tasks: ${this.pending} pending, ${this.working} working.`);
  333. }
  334. /**
  335. * Processes the queue.
  336. * @returns {void}
  337. */
  338. #processQueue() {
  339. // clear any timer because we're going to check right now
  340. clearTimeout(this.#timerId);
  341. this.#timerId = undefined;
  342. debug(`Processing retry queue: ${this.retrying} retrying, ${this.working} working.`);
  343. const processAgain = () => {
  344. this.#timerId = setTimeout(() => this.#processAll(), 0);
  345. };
  346. // if there's nothing in the queue, we're done
  347. const task = this.#retrying.shift();
  348. if (!task) {
  349. debug("Queue is empty, exiting.");
  350. if (this.pending) {
  351. processAgain();
  352. }
  353. return;
  354. }
  355. // if it's time to bail, then bail
  356. if (isTimeToBail(task, this.#timeout)) {
  357. debug(`Task ${task.id} was abandoned due to timeout.`);
  358. task.reject(task.error);
  359. processAgain();
  360. return;
  361. }
  362. // if it's not time to retry, then wait and try again
  363. if (!isTimeToRetry(task, this.#maxDelay)) {
  364. debug(`Task ${task.id} is not ready to retry, skipping.`);
  365. this.#retrying.push(task);
  366. processAgain();
  367. return;
  368. }
  369. // otherwise, try again
  370. task.lastAttempt = Date.now();
  371. // Promise.resolve needed in case it's a thenable but not a Promise
  372. Promise.resolve(task.fn())
  373. // @ts-ignore because we know it's any
  374. .then(result => {
  375. debug(`Task ${task.id} succeeded after ${task.age}ms.`);
  376. task.resolve(result);
  377. })
  378. // @ts-ignore because we know it's any
  379. .catch(error => {
  380. if (!this.#check(error)) {
  381. debug(`Task ${task.id} failed with non-retryable error: ${error.message}.`);
  382. task.reject(error);
  383. return;
  384. }
  385. // update the task timestamp and push to back of queue to try again
  386. task.lastAttempt = Date.now();
  387. this.#retrying.push(task);
  388. debug(`Task ${task.id} failed, requeueing to try again.`);
  389. })
  390. .finally(() => {
  391. this.#processAll();
  392. });
  393. }
  394. }
  395. export { Retrier };