// index.js
  1. export default async function pMap(
  2. iterable,
  3. mapper,
  4. {
  5. concurrency = Number.POSITIVE_INFINITY,
  6. stopOnError = true,
  7. signal,
  8. } = {},
  9. ) {
  10. return new Promise((resolve_, reject_) => {
  11. if (iterable[Symbol.iterator] === undefined && iterable[Symbol.asyncIterator] === undefined) {
  12. throw new TypeError(`Expected \`input\` to be either an \`Iterable\` or \`AsyncIterable\`, got (${typeof iterable})`);
  13. }
  14. if (typeof mapper !== 'function') {
  15. throw new TypeError('Mapper function is required');
  16. }
  17. if (!((Number.isSafeInteger(concurrency) && concurrency >= 1) || concurrency === Number.POSITIVE_INFINITY)) {
  18. throw new TypeError(`Expected \`concurrency\` to be an integer from 1 and up or \`Infinity\`, got \`${concurrency}\` (${typeof concurrency})`);
  19. }
  20. const result = [];
  21. const errors = [];
  22. const skippedIndexesMap = new Map();
  23. let isRejected = false;
  24. let isResolved = false;
  25. let isIterableDone = false;
  26. let resolvingCount = 0;
  27. let currentIndex = 0;
  28. const iterator = iterable[Symbol.iterator] === undefined ? iterable[Symbol.asyncIterator]() : iterable[Symbol.iterator]();
  29. const signalListener = () => {
  30. reject(signal.reason);
  31. };
  32. const cleanup = () => {
  33. signal?.removeEventListener('abort', signalListener);
  34. };
  35. const resolve = value => {
  36. resolve_(value);
  37. cleanup();
  38. };
  39. const reject = reason => {
  40. isRejected = true;
  41. isResolved = true;
  42. reject_(reason);
  43. cleanup();
  44. };
  45. if (signal) {
  46. if (signal.aborted) {
  47. reject(signal.reason);
  48. }
  49. signal.addEventListener('abort', signalListener, {once: true});
  50. }
  51. const next = async () => {
  52. if (isResolved) {
  53. return;
  54. }
  55. const nextItem = await iterator.next();
  56. const index = currentIndex;
  57. currentIndex++;
  58. // Note: `iterator.next()` can be called many times in parallel.
  59. // This can cause multiple calls to this `next()` function to
  60. // receive a `nextItem` with `done === true`.
  61. // The shutdown logic that rejects/resolves must be protected
  62. // so it runs only one time as the `skippedIndex` logic is
  63. // non-idempotent.
  64. if (nextItem.done) {
  65. isIterableDone = true;
  66. if (resolvingCount === 0 && !isResolved) {
  67. if (!stopOnError && errors.length > 0) {
  68. reject(new AggregateError(errors)); // eslint-disable-line unicorn/error-message
  69. return;
  70. }
  71. isResolved = true;
  72. if (skippedIndexesMap.size === 0) {
  73. resolve(result);
  74. return;
  75. }
  76. const pureResult = [];
  77. // Support multiple `pMapSkip`'s.
  78. for (const [index, value] of result.entries()) {
  79. if (skippedIndexesMap.get(index) === pMapSkip) {
  80. continue;
  81. }
  82. pureResult.push(value);
  83. }
  84. resolve(pureResult);
  85. }
  86. return;
  87. }
  88. resolvingCount++;
  89. // Intentionally detached
  90. (async () => {
  91. try {
  92. const element = await nextItem.value;
  93. if (isResolved) {
  94. return;
  95. }
  96. const value = await mapper(element, index);
  97. // Use Map to stage the index of the element.
  98. if (value === pMapSkip) {
  99. skippedIndexesMap.set(index, value);
  100. }
  101. result[index] = value;
  102. resolvingCount--;
  103. await next();
  104. } catch (error) {
  105. if (stopOnError) {
  106. reject(error);
  107. } else {
  108. errors.push(error);
  109. resolvingCount--;
  110. // In that case we can't really continue regardless of `stopOnError` state
  111. // since an iterable is likely to continue throwing after it throws once.
  112. // If we continue calling `next()` indefinitely we will likely end up
  113. // in an infinite loop of failed iteration.
  114. try {
  115. await next();
  116. } catch (error) {
  117. reject(error);
  118. }
  119. }
  120. }
  121. })();
  122. };
  123. // Create the concurrent runners in a detached (non-awaited)
  124. // promise. We need this so we can await the `next()` calls
  125. // to stop creating runners before hitting the concurrency limit
  126. // if the iterable has already been marked as done.
  127. // NOTE: We *must* do this for async iterators otherwise we'll spin up
  128. // infinite `next()` calls by default and never start the event loop.
  129. (async () => {
  130. for (let index = 0; index < concurrency; index++) {
  131. try {
  132. // eslint-disable-next-line no-await-in-loop
  133. await next();
  134. } catch (error) {
  135. reject(error);
  136. break;
  137. }
  138. if (isIterableDone || isRejected) {
  139. break;
  140. }
  141. }
  142. })();
  143. });
  144. }
  145. export function pMapIterable(
  146. iterable,
  147. mapper,
  148. {
  149. concurrency = Number.POSITIVE_INFINITY,
  150. backpressure = concurrency,
  151. } = {},
  152. ) {
  153. if (iterable[Symbol.iterator] === undefined && iterable[Symbol.asyncIterator] === undefined) {
  154. throw new TypeError(`Expected \`input\` to be either an \`Iterable\` or \`AsyncIterable\`, got (${typeof iterable})`);
  155. }
  156. if (typeof mapper !== 'function') {
  157. throw new TypeError('Mapper function is required');
  158. }
  159. if (!((Number.isSafeInteger(concurrency) && concurrency >= 1) || concurrency === Number.POSITIVE_INFINITY)) {
  160. throw new TypeError(`Expected \`concurrency\` to be an integer from 1 and up or \`Infinity\`, got \`${concurrency}\` (${typeof concurrency})`);
  161. }
  162. if (!((Number.isSafeInteger(backpressure) && backpressure >= concurrency) || backpressure === Number.POSITIVE_INFINITY)) {
  163. throw new TypeError(`Expected \`backpressure\` to be an integer from \`concurrency\` (${concurrency}) and up or \`Infinity\`, got \`${backpressure}\` (${typeof backpressure})`);
  164. }
  165. return {
  166. async * [Symbol.asyncIterator]() {
  167. const iterator = iterable[Symbol.asyncIterator] === undefined ? iterable[Symbol.iterator]() : iterable[Symbol.asyncIterator]();
  168. const promises = [];
  169. let runningMappersCount = 0;
  170. let isDone = false;
  171. let index = 0;
  172. function trySpawn() {
  173. if (isDone || !(runningMappersCount < concurrency && promises.length < backpressure)) {
  174. return;
  175. }
  176. const promise = (async () => {
  177. const {done, value} = await iterator.next();
  178. if (done) {
  179. return {done: true};
  180. }
  181. runningMappersCount++;
  182. // Spawn if still below concurrency and backpressure limit
  183. trySpawn();
  184. try {
  185. const returnValue = await mapper(await value, index++);
  186. runningMappersCount--;
  187. if (returnValue === pMapSkip) {
  188. const index = promises.indexOf(promise);
  189. if (index > 0) {
  190. promises.splice(index, 1);
  191. }
  192. }
  193. // Spawn if still below backpressure limit and just dropped below concurrency limit
  194. trySpawn();
  195. return {done: false, value: returnValue};
  196. } catch (error) {
  197. isDone = true;
  198. return {error};
  199. }
  200. })();
  201. promises.push(promise);
  202. }
  203. trySpawn();
  204. while (promises.length > 0) {
  205. const {error, done, value} = await promises[0]; // eslint-disable-line no-await-in-loop
  206. promises.shift();
  207. if (error) {
  208. throw error;
  209. }
  210. if (done) {
  211. return;
  212. }
  213. // Spawn if just dropped below backpressure limit and below the concurrency limit
  214. trySpawn();
  215. if (value === pMapSkip) {
  216. continue;
  217. }
  218. yield value;
  219. }
  220. },
  221. };
  222. }
  223. export const pMapSkip = Symbol('skip');