vasync.js 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891
  1. /*
  2. * vasync.js: utilities for observable asynchronous control flow
  3. */
  4. var mod_assert = require('assert');
  5. var mod_events = require('events');
  6. var mod_util = require('util');
  7. var mod_verror = require('verror');
  /*
   * Public interface.  Each name assigned here refers to a function declared
   * further down in this file.
   */

  /* Running several functions concurrently or one after another. */
  exports.parallel = parallel;
  exports.forEachParallel = forEachParallel;
  exports.pipeline = pipeline;
  exports.tryEach = tryEach;
  exports.forEachPipeline = forEachPipeline;

  /* Asynchronous array filtering. */
  exports.filter = filter;
  exports.filterLimit = filterLimit;
  exports.filterSeries = filterSeries;

  /* Looping, work queues, barriers and value-passing chains. */
  exports.whilst = whilst;
  exports.queue = queue;
  exports.queuev = queuev;
  exports.barrier = barrier;
  exports.waterfall = waterfall;
  /*
   * Fallback for runtimes that do not provide setImmediate: emulate it with a
   * zero-delay setTimeout, forwarding any extra arguments to the function.
   */
  if (!global.setImmediate) {
      global.setImmediate = function (func) {
          var extra = Array.prototype.slice.call(arguments, 1);
          var timerArgs = [ func, 0 ].concat(extra);
          setTimeout.apply(this, timerArgs);
      };
  }
  /*
   * Returns true when "obj" has no enumerable properties (own or inherited),
   * false otherwise.  This is a local copy of the jsprim helper: pulling in
   * jsprim drags in many dependencies.  If more of jsprim is ever needed here,
   * depend on it again and delete this function.
   */
  function isEmpty(obj)
  {
      var empty = true;
      var key;

      /* A single enumerable key is enough to decide; stop right away. */
      for (key in obj) {
          empty = false;
          break;
      }

      return (empty);
  }
  /*
   * parallel(args, callback): invoke every function in "args.funcs" right
   * away, each with a single callback(err, result) argument, and merge the
   * outcomes.  See README.md for details.
   *
   * The returned object (also handed to "callback" as its second argument)
   * has these fields:
   *
   *    operations  one record per function: func, funcname, status
   *                ('pending', 'ok' or 'fail'), plus err and result once the
   *                function has completed
   *
   *    successes   results of the functions that succeeded, in completion
   *                order
   *
   *    ndone       number of functions that have completed
   *
   *    nerrors     number of functions that failed
   *
   * "callback" is invoked once, after the last function completes: with a
   * MultiError wrapping every failure if there was at least one, otherwise
   * with null.  With an empty "funcs" list it is invoked on a later tick.
   */
  function parallel(args, callback)
  {
      mod_assert.equal(typeof (args), 'object', '"args" must be an object');
      mod_assert.ok(Array.isArray(args['funcs']),
          '"args.funcs" must be specified and must be an array');
      mod_assert.equal(typeof (callback), 'function',
          'callback argument must be specified and must be a function');

      /* Work on a private copy so later changes by the caller don't matter. */
      var tasks = args['funcs'].slice(0);
      var ntasks = tasks.length;
      var state = {
          'operations': new Array(ntasks),
          'successes': [],
          'ndone': 0,
          'nerrors': 0
      };
      var idx;

      if (ntasks === 0) {
          setImmediate(function () { callback(null, state); });
          return (state);
      }

      /* Everything has completed: gather the failures and report. */
      function finish()
      {
          var failures = [];

          state['operations'].forEach(function (op) {
              if (op['status'] == 'fail')
                  failures.push(op['err']);
          });

          if (failures.length > 0)
              callback(new mod_verror.MultiError(failures), state);
          else
              callback(null, state);
      }

      /* Builds the completion callback handed to one particular function. */
      function completionFor(op)
      {
          return (function (err, result) {
              /* Fails if this function already reported completion. */
              mod_assert.equal(op['status'], 'pending');

              op['err'] = err;
              op['result'] = result;

              if (err) {
                  op['status'] = 'fail';
                  state['nerrors']++;
              } else {
                  op['status'] = 'ok';
                  state['successes'].push(result);
              }

              state['ndone']++;
              if (state['ndone'] >= ntasks)
                  finish();
          });
      }

      idx = 0;
      while (idx < ntasks) {
          state['operations'][idx] = {
              'func': tasks[idx],
              'funcname': tasks[idx].name || '(anon)',
              'status': 'pending'
          };
          tasks[idx](completionFor(state['operations'][idx]));
          idx++;
      }

      return (state);
  }
  /*
   * forEachParallel(args, callback): the same as parallel(), but described as
   * one function applied to N inputs instead of N separate functions.  "args"
   * must have the following fields:
   *
   *    func    asynchronous function to invoke on each input value; it is
   *            called as func(input, callback)
   *
   *    inputs  array of input values
   *
   * Returns whatever parallel() returns.
   */
  function forEachParallel(args, callback)
  {
      mod_assert.equal(typeof (args), 'object', '"args" must be an object');
      mod_assert.equal(typeof (args['func']), 'function',
          '"args.func" must be specified and must be a function');
      mod_assert.ok(Array.isArray(args['inputs']),
          '"args.inputs" must be specified and must be an array');

      var worker = args['func'];

      /* Wraps one input value into a function of the shape parallel() runs. */
      function bindInput(input)
      {
          return (function (subcallback) {
              return (worker(input, subcallback));
          });
      }

      var wrapped = args['inputs'].map(bindInput);

      return (parallel({ 'funcs': wrapped }, callback));
  }
  /*
   * pipeline(args, callback): like parallel, but the functions are invoked one
   * at a time, in order, and the run stops at the first function that reports
   * an error.  Arguments include:
   *
   *    funcs   array of functions to invoke in sequence
   *
   *    arg     first argument to each pipeline function
   *
   * The work is delegated to waterfall_impl(), whose return value is returned.
   */
  function pipeline(args, callback)
  {
      mod_assert.equal(typeof (args), 'object', '"args" must be an object');
      mod_assert.ok(Array.isArray(args['funcs']),
          '"args.funcs" must be specified and must be an array');

      var settings = {};

      /* Copy the list so the caller can't change it while we are running. */
      settings['funcs'] = args['funcs'].slice(0);
      settings['callback'] = callback;
      settings['args'] = { impl: 'pipeline', uarg: args['arg'] };
      settings['stop_when'] = 'error';
      settings['res_type'] = 'rv';

      return (waterfall_impl(settings));
  }
  /*
   * tryEach(funcs, callback): run the functions in "funcs" one at a time until
   * one of them succeeds.  "callback" may be left out entirely, but if an
   * argument is supplied in that position it must be a function.
   *
   * The work is delegated to waterfall_impl() in its 'tryEach' mode (stop on
   * the first success, 'array' style results); its return value is returned.
   */
  function tryEach(funcs, callback)
  {
      mod_assert.ok(Array.isArray(funcs),
          '"funcs" must be specified and must be an array');
      mod_assert.ok(arguments.length == 1 || typeof (callback) == 'function',
          '"callback" must be a function');

      var settings = {};

      settings['funcs'] = funcs.slice(0);
      settings['callback'] = callback;
      settings['args'] = { impl: 'tryEach' };
      settings['stop_when'] = 'success';
      settings['res_type'] = 'array';

      return (waterfall_impl(settings));
  }
  /*
   * forEachPipeline(args, callback): the same as pipeline(), but described as
   * one function applied to N inputs instead of N separate functions.  "args"
   * must have the following fields:
   *
   *    func    asynchronous function to invoke on each input value; it is
   *            called as func(input, callback)
   *
   *    inputs  array of input values
   *
   * Returns whatever pipeline() returns.
   */
  function forEachPipeline(args, callback) {
      mod_assert.equal(typeof (args), 'object', '"args" must be an object');
      mod_assert.equal(typeof (args['func']), 'function',
          '"args.func" must be specified and must be a function');
      mod_assert.ok(Array.isArray(args['inputs']),
          '"args.inputs" must be specified and must be an array');
      mod_assert.equal(typeof (callback), 'function',
          'callback argument must be specified and must be a function');

      var worker = args['func'];

      /*
       * pipeline() calls each stage as stage(arg, callback).  No "arg" is
       * passed along here, so the first parameter is ignored and the bound
       * input value is used instead.
       */
      function bindInput(input)
      {
          return (function (_, subcallback) {
              return (worker(input, subcallback));
          });
      }

      var stages = args['inputs'].map(bindInput);

      return (pipeline({'funcs': stages}, callback));
  }
  /*
   * async.js compatible filter, filterLimit, and filterSeries.  Each takes an
   * input array, optionally a limit, and a single asynchronous function used
   * to decide which elements to keep, and calls back with a new filtered
   * array.  This is effectively an asynchronous version of
   * Array.prototype.filter.
   *
   * filter() is filterLimit() with no bound on the number of elements being
   * examined at once.
   */
  function filter(inputs, filterFunc, callback) {
      var unbounded = Infinity;

      return (filterLimit(inputs, unbounded, filterFunc, callback));
  }
  /*
   * filterSeries() is filterLimit() with a limit of one: elements are
   * examined strictly one at a time.
   */
  function filterSeries(inputs, filterFunc, callback) {
      var oneAtATime = 1;

      return (filterLimit(inputs, oneAtATime, filterFunc, callback));
  }
  /*
   * filterLimit(inputs, limit, filterFunc, callback): asynchronous filter that
   * examines at most "limit" elements of "inputs" at a time.
   *
   *    inputs      array of elements to examine
   *
   *    limit       number (not NaN) used as the concurrency of the internal
   *                work queue
   *
   *    filterFunc  called as filterFunc(elem, cb); it must call cb(err, ans)
   *                exactly once, where a truthy "ans" keeps the element
   *
   *    callback    called as callback(err) if any filterFunc reported an
   *                error, otherwise as callback(null, kept) where "kept"
   *                holds the retained elements in their original order
   *
   * Returns the work queue driving the operation.
   */
  function filterLimit(inputs, limit, filterFunc, callback) {
      mod_assert.ok(Array.isArray(inputs),
          '"inputs" must be specified and must be an array');
      mod_assert.equal(typeof (limit), 'number',
          '"limit" must be a number');
      mod_assert.equal(isNaN(limit), false,
          '"limit" must be a number');
      mod_assert.equal(typeof (filterFunc), 'function',
          '"filterFunc" must be specified and must be a function');
      mod_assert.equal(typeof (callback), 'function',
          '"callback" argument must be specified as a function');

      var failures = [];
      var workq = queue(processInput, limit);
      var verdicts = [];

      function processInput(job, next) {
          /*
           * Once any filterFunc has failed there is no point in examining
           * further elements: complete the remaining jobs without doing
           * anything.
           */
          if (failures.length > 0) {
              next();
              return;
          }

          filterFunc(job.elem, function inputFiltered(err, ans) {
              /*
               * A verdict already recorded at this index means filterFunc
               * called back a second time for the same element.
               */
              if (verdicts.hasOwnProperty(job.idx)) {
                  throw (new mod_verror.VError(
                      'vasync.filter*: filterFunc idx %d ' +
                      'invoked its callback twice', job.idx));
              }

              /*
               * Remember the element together with its (boolean) verdict,
               * stored at the element's original index; the actual
               * filtering happens once every job has finished.
               */
              verdicts[job.idx] = {
                  elem: job.elem,
                  ans: !!ans
              };

              /*
               * Recording an error makes all later jobs no-ops and causes
               * the error to be reported to the user's callback.
               */
              if (err)
                  failures.push(err);

              next();
          });
      }

      workq.once('end', function queueDrained() {
          var kept;

          if (failures.length > 0) {
              callback(mod_verror.errorFromList(failures));
              return;
          }

          /*
           * "verdicts" is ordered like "inputs" and each entry looks like:
           *
           *    {
           *        "ans": <true|false>,
           *        "elem": <original input element>
           *    }
           *
           * Keep the elements whose verdict is true.
           */
          kept = [];
          verdicts.forEach(function filterFalseInputs(verdict) {
              if (verdict.ans)
                  kept.push(verdict.elem);
          });

          verdicts = kept;
          callback(null, verdicts);
      });

      inputs.forEach(function iterateInput(elem, idx) {
          /* Carry the index along so the original order can be restored. */
          workq.push({
              elem: elem,
              idx: idx
          });
      });

      workq.close();

      return (workq);
  }
  /*
   * async-compatible "whilst" function, with a few notable exceptions/addons.
   *
   * 1. More strict typing of arguments (functions *must* be supplied).
   * 2. A callback function is required, not optional.
   * 3. An object is returned, not undefined.
   *
   * whilst(testFunc, iterateFunc, callback) repeatedly calls testFunc() and,
   * as long as it returns a truthy value, calls iterateFunc(cb).  The loop
   * ends when testFunc() returns a falsy value or when iterateFunc reports an
   * error; "callback" is then invoked with the arguments of the most recent
   * iterateFunc completion (or with no arguments if there was none).
   *
   * The returned object exposes:
   *
   *    finished    true once "callback" has been invoked
   *
   *    iterations  number of times iterateFunc has been started
   */
  function whilst(testFunc, iterateFunc, callback) {
      mod_assert.equal(typeof (testFunc), 'function',
          '"testFunc" must be specified and must be a function');
      mod_assert.equal(typeof (iterateFunc), 'function',
          '"iterateFunc" must be specified and must be a function');
      mod_assert.equal(typeof (callback), 'function',
          '"callback" argument must be specified as a function');

      /* Handle returned to the caller for inspecting this invocation. */
      var status = {
          'finished': false,
          'iterations': 0
      };

      /* Arguments of the latest iterateFunc completion, if any. */
      var lastArgs = [];

      function finish() {
          mod_assert.ok(!status.finished, 'whilst already finished');
          status.finished = true;
          callback.apply(this, lastArgs);
      }

      function step() {
          /* A falsy test result ends the loop. */
          if (!testFunc()) {
              finish();
              return;
          }

          /* Count the iteration after the test but before running it. */
          status.iterations++;

          iterateFunc(function whilstIteration(err) {
              lastArgs = Array.prototype.slice.call(arguments);

              /* An error ends the loop; otherwise go around again. */
              if (err)
                  finish();
              else
                  setImmediate(step);
          });
      }

      /* The first test always happens on a later tick. */
      setImmediate(step);

      return (status);
  }
  /*
   * async-compatible "queue" function: creates a WorkQueue that runs "worker"
   * on each pushed task with at most "concurrency" tasks in progress.
   */
  function queue(worker, concurrency)
  {
      var settings = {
          'worker': worker,
          'concurrency': concurrency
      };

      return (new WorkQueue(settings));
  }
  /*
   * Like queue(), but takes the WorkQueue options as a single object with
   * "worker" and "concurrency" properties.
   */
  function queuev(args)
  {
      var workq = new WorkQueue(args);

      return (workq);
  }
  /*
   * WorkQueue: runs a worker function over pushed tasks, keeping at most
   * "concurrency" tasks in progress at any time.  "args" must have:
   *
   *    worker       function invoked as worker(task, callback); it must call
   *                 the callback exactly once when the task is complete
   *
   *    concurrency  positive integer (Infinity is accepted)
   *
   * A WorkQueue is an EventEmitter and emits 'saturated', 'empty', 'drain' and
   * 'end'.  For compatibility with "async", functions assigned to the
   * "saturated", "empty" and "drain" fields are also invoked just before the
   * event of the same name is emitted.
   */
  function WorkQueue(args)
  {
      mod_assert.ok(args.hasOwnProperty('worker'));
      mod_assert.equal(typeof (args['worker']), 'function');
      mod_assert.ok(args.hasOwnProperty('concurrency'));
      mod_assert.equal(typeof (args['concurrency']), 'number');
      mod_assert.equal(Math.floor(args['concurrency']), args['concurrency']);
      mod_assert.ok(args['concurrency'] > 0);

      mod_events.EventEmitter.call(this);

      this.nextid = 0;
      this.worker = args['worker'];
      this.worker_name = args['worker'].name || 'anon';
      this.npending = 0;      /* number of tasks handed to the worker */
      this.pending = {};      /* in-progress entries, indexed by id */
      this.queued = [];       /* entries waiting for a free slot */
      this.closed = false;    /* close() was called: no more pushes */
      this.ended = false;     /* 'end' has been emitted */
      this.killed = false;    /* kill() was called */

      /* user-settable fields inherited from "async" interface */
      this.concurrency = args['concurrency'];
      this.saturated = undefined;
      this.empty = undefined;
      this.drain = undefined;
  }

  mod_util.inherits(WorkQueue, mod_events.EventEmitter);

  /*
   * push(tasks, callback): enqueue a single task or an array of tasks.
   * "callback", if given, is invoked with the worker's completion arguments
   * once for each task.  Returns the id of the task, or an array of ids when
   * an array was pushed.  Throws if the queue has been closed.
   */
  WorkQueue.prototype.push = function (tasks, callback)
  {
      if (!Array.isArray(tasks))
          return (this.pushOne(tasks, callback));

      var wq = this;
      return (tasks.map(function (task) {
          return (wq.pushOne(task, callback));
      }));
  };

  /*
   * updateConcurrency(concurrency): change the concurrency limit and start
   * any queued tasks that now fit.  Throws if the queue has been closed.
   */
  WorkQueue.prototype.updateConcurrency = function (concurrency)
  {
      if (this.closed)
          throw new mod_verror.VError(
              'update concurrency invoked after queue closed');
      this.concurrency = concurrency;
      this.dispatchNext();
  };

  /*
   * close(): stop accepting new tasks.  'end' is emitted once all accepted
   * tasks have completed.  Calling close() again has no effect.
   */
  WorkQueue.prototype.close = function ()
  {
      var wq = this;

      if (wq.closed)
          return;
      wq.closed = true;

      /*
       * If the queue is already empty, just fire the "end" event on the
       * next tick.
       */
      if (wq.npending === 0 && wq.queued.length === 0) {
          setImmediate(function () {
              if (!wq.ended) {
                  wq.ended = true;
                  wq.emit('end');
              }
          });
      }
  };

  /* private: enqueue one task and return its id */
  WorkQueue.prototype.pushOne = function (task, callback)
  {
      if (this.closed)
          throw new mod_verror.VError('push invoked after queue closed');

      var id = ++this.nextid;
      var entry = { 'id': id, 'task': task, 'callback': callback };

      this.queued.push(entry);
      this.dispatchNext();
      return (id);
  };

  /*
   * private: start as many queued tasks as the concurrency limit allows, or,
   * when nothing is queued or in progress, announce that the queue drained.
   */
  WorkQueue.prototype.dispatchNext = function ()
  {
      var wq = this;

      if (wq.npending === 0 && wq.queued.length === 0) {
          if (wq.drain)
              wq.drain();
          wq.emit('drain');
          /*
           * The queue is closed; emit the final "end"
           * event before we come to rest:
           */
          if (wq.closed) {
              wq.ended = true;
              wq.emit('end');
          }
      } else if (wq.queued.length > 0) {
          while (wq.queued.length > 0 && wq.npending < wq.concurrency) {
              var next = wq.queued.shift();
              wq.dispatch(next);
              if (wq.queued.length === 0) {
                  if (wq.empty)
                      wq.empty();
                  wq.emit('empty');
              }
          }
      }
  };

  /* private: mark one entry as in progress and schedule its worker call */
  WorkQueue.prototype.dispatch = function (entry)
  {
      var wq = this;

      mod_assert.ok(!this.pending.hasOwnProperty(entry['id']));
      mod_assert.ok(this.npending < this.concurrency);
      mod_assert.ok(!this.ended);

      this.npending++;
      this.pending[entry['id']] = entry;

      if (this.npending === this.concurrency) {
          if (this.saturated)
              this.saturated();
          this.emit('saturated');
      }

      /*
       * We invoke the worker function on the next tick so that callers can
       * always assume that the callback is NOT invoked during the call to
       * push() even if the queue is not at capacity.  It also avoids O(n)
       * stack usage when used with synchronous worker functions.
       */
      setImmediate(function () {
          wq.worker(entry['task'], function (err) {
              /*
               * The entry leaves "pending" the first time the worker
               * reports completion.  A second report would push
               * "npending" below its true value, breaking the concurrency
               * limit and the drain/end accounting, so treat it as a
               * programmer error.
               */
              if (!wq.pending.hasOwnProperty(entry['id'])) {
                  throw (new mod_verror.VError(
                      'vasync.queue: worker "%s" invoked its callback ' +
                      'twice for task id %d', wq.worker_name,
                      entry['id']));
              }

              --wq.npending;
              delete (wq.pending[entry['id']]);

              if (entry['callback'])
                  entry['callback'].apply(null, arguments);

              wq.dispatchNext();
          });
      });
  };

  /* length(): number of tasks waiting to start (not those in progress) */
  WorkQueue.prototype.length = function ()
  {
      return (this.queued.length);
  };

  /*
   * kill(): discard all tasks that have not started yet, clear the "drain"
   * hook and close the queue.  Tasks already in progress are not interrupted.
   */
  WorkQueue.prototype.kill = function ()
  {
      this.killed = true;
      this.queued = [];
      this.drain = undefined;
      this.close();
  };
  /*
   * Barriers coordinate multiple concurrent operations.  barrier(args)
   * creates a new Barrier, passing "args" through to the constructor.
   */
  function barrier(args)
  {
      var created = new Barrier(args);

      return (created);
  }
  /*
   * Barrier: tracks a set of named in-progress operations and emits 'drain'
   * when the set becomes empty.  "args" is optional and may contain:
   *
   *    nrecent   number of completed operations to remember in "recent".
   *              A missing or zero value selects the default of 10; a
   *              negative value turns the record off.
   */
  function Barrier(args)
  {
      mod_assert.ok(!args || !args['nrecent'] ||
          typeof (args['nrecent']) == 'number',
          '"nrecent" must have type "number"');

      mod_events.EventEmitter.call(this);

      var limit = 10;
      if (args && args['nrecent'])
          limit = args['nrecent'];

      if (limit > 0) {
          this.nrecent = limit;
          this.recent = [];
      }

      this.pending = {};          /* operation name -> start time (ms) */
      this.scheduled = false;     /* a 'drain' check is already queued */
  }

  mod_util.inherits(Barrier, mod_events.EventEmitter);

  /*
   * start(name): record that the operation "name" has begun.  The name must
   * not already be pending.
   */
  Barrier.prototype.start = function (name)
  {
      var alreadyPending = this.pending.hasOwnProperty(name);

      mod_assert.ok(!alreadyPending,
          'operation "' + name + '" is already pending');
      this.pending[name] = Date.now();
  };

  /*
   * done(name): record that the pending operation "name" has completed, and
   * arrange for 'drain' to be emitted if nothing is left pending.
   */
  Barrier.prototype.done = function (name)
  {
      var barrier = this;

      mod_assert.ok(this.pending.hasOwnProperty(name),
          'operation "' + name + '" is not pending');

      if (this.recent) {
          this.recent.push({
              'name': name,
              'start': this.pending[name],
              'done': Date.now()
          });

          /* Keep only the newest "nrecent" records. */
          if (this.recent.length > this.nrecent)
              this.recent.shift();
      }

      delete (this.pending[name]);

      /*
       * Once the last pending operation completes we owe the user a
       * "drain" event.  Most code copes poorly with events delivered while
       * it is still running, so the event is deferred to the next tick.
       *
       * The "scheduled" flag keeps us from queueing a second check (and so
       * a second "drain") when the user starts and finishes another
       * operation within the same tick.
       */
      if (this.scheduled || !isEmpty(this.pending))
          return;

      this.scheduled = true;

      setImmediate(function () {
          barrier.scheduled = false;

          /*
           * The user may have started a new operation in the meantime,
           * in which case there is nothing to announce.
           */
          if (isEmpty(barrier.pending))
              barrier.emit('drain');
      });
  };
  /*
   * waterfall([ funcs ], callback): invoke each of the asynchronous functions
   * "funcs" in series.  Each function receives whatever values the previous
   * function emitted (nothing for the first function), followed by the
   * callback it must invoke on completion.  That callback must be invoked
   * exactly once, whether the function succeeded or failed.  As is
   * conventional in Node, its first argument indicates an error (if
   * non-null); the remaining arguments are handed to the next function in
   * the chain.
   *
   * If any function fails (i.e., calls its callback with an Error), the
   * remaining functions are not invoked and "callback" is invoked with the
   * error.
   *
   * waterfall() and pipeline() differ only in the arguments given to each
   * function in the chain: pipeline() always passes the same argument
   * followed by the callback, while waterfall() passes the values emitted by
   * the previous function followed by the callback.
   *
   * "callback" may be left out entirely, but if an argument is supplied in
   * that position it must be a function.
   */
  function waterfall(funcs, callback)
  {
      mod_assert.ok(Array.isArray(funcs),
          '"funcs" must be specified and must be an array');
      mod_assert.ok(arguments.length == 1 || typeof (callback) == 'function',
          '"callback" must be a function');

      var settings = {};

      settings['funcs'] = funcs.slice(0);
      settings['callback'] = callback;
      settings['args'] = { impl: 'waterfall' };
      settings['stop_when'] = 'error';
      settings['res_type'] = 'values';

      return (waterfall_impl(settings));
  }
  /*
   * This function is used to implement vasync-functions that need to execute a
   * list of functions in a sequence, but differ in how they make use of the
   * intermediate callbacks and final callback, as well as under what
   * conditions they stop executing the functions in the list.  Examples of
   * such functions are `pipeline`, `waterfall`, and `tryEach`.  See the
   * documentation for those functions to see how they operate.
   *
   * This function's behavior is influenced via the `opts` object that we pass
   * in.  This object has the following layout:
   *
   *    {
   *        'funcs': array of functions
   *        'callback': the final callback (may be omitted)
   *        'args': {
   *            'impl': 'pipeline' or 'tryEach' or 'waterfall'
   *            'uarg': the arg passed to each func for 'pipeline'
   *        }
   *        'stop_when': 'error' or 'success'
   *        'res_type': 'values' or 'array' or 'rv'
   *    }
   *
   * In the object, 'res_type' is used to indicate what the type of the result
   * value(s) is that we pass to the final callback.  We secondarily use
   * 'args.impl' to adjust this behavior in an implementation-specific way.
   * For example, 'tryEach' only returns an array if it has more than 1 result
   * passed to the final callback.  Otherwise, it passes a solitary value to
   * the final callback.
   *
   * In case it's not clear, 'rv' in the `res_type` member is just the
   * result-value that we also return.  This is the convention in functions
   * that originated in `vasync` (pipeline), but not in functions that
   * originated in `async` (waterfall, tryEach).
   *
   * Returns the result-value object, which has the fields:
   *
   *    operations  one record per function: func, funcname, status
   *                ('waiting', 'pending', 'ok' or 'fail'), plus err and
   *                result/results once the function has completed
   *
   *    successes   results of the functions that succeeded
   *
   *    ndone       number of functions that have completed
   *
   *    nerrors     number of functions that failed
   *
   * Throws a VError (or fails an assertion) from inside a function's callback
   * if that callback is invoked more than once.
   */
  function waterfall_impl(opts)
  {
      mod_assert.ok(typeof (opts) === 'object');

      var rv, current, next;
      var funcs = opts.funcs;
      var callback = opts.callback;

      mod_assert.ok(Array.isArray(funcs),
          '"opts.funcs" must be specified and must be an array');
      mod_assert.ok(arguments.length === 1,
          'Function "waterfall_impl" must take only 1 arg');
      mod_assert.ok(opts.res_type === 'values' ||
          opts.res_type === 'array' || opts.res_type === 'rv',
          '"opts.res_type" must either be "values", "array", or "rv"');
      mod_assert.ok(opts.stop_when === 'error' ||
          opts.stop_when === 'success',
          '"opts.stop_when" must either be "error" or "success"');
      mod_assert.ok(opts.args.impl === 'pipeline' ||
          opts.args.impl === 'waterfall' || opts.args.impl === 'tryEach',
          '"opts.args.impl" must be "pipeline", "waterfall", or "tryEach"');

      /*
       * 'opts.args.uarg' is deliberately not validated.  pipeline() forwards
       * the caller's "arg" as-is, and callers such as forEachPipeline() do
       * not supply one, so an undefined 'uarg' is a supported input: each
       * function then simply receives undefined as its first argument.
       */

      rv = {
          'operations': funcs.map(function (func) {
              return ({
                  'func': func,
                  'funcname': func.name || '(anon)',
                  'status': 'waiting'
              });
          }),
          'successes': [],
          'ndone': 0,
          'nerrors': 0
      };

      /* Nothing to run: report success on a later tick. */
      if (funcs.length === 0) {
          if (callback)
              setImmediate(function () {
                  var res = (opts.args.impl === 'pipeline') ? rv
                      : undefined;
                  callback(null, res);
              });
          return (rv);
      }

      /*
       * Completion callback for the function at index "idx" (bound below with
       * next.bind(null, idx)).  Any arguments after "err" are the values that
       * function emitted.
       */
      next = function (idx, err) {
          /*
           * Note that nfunc_args contains the args we will pass to the
           * next func in the func-list the user gave us.  Except for
           * 'tryEach', which passes cb's.  However, it will pass
           * 'nfunc_args' to its final callback -- see below.
           */
          var res_key, nfunc_args, entry, nextentry;

          if (err === undefined)
              err = null;

          /* "current" has moved on: this function already called back. */
          if (idx !== current) {
              throw (new mod_verror.VError(
                  'vasync.waterfall: function %d ("%s") invoked ' +
                  'its callback twice', idx,
                  rv['operations'][idx].funcname));
          }

          mod_assert.equal(idx, rv['ndone'],
              'idx should be equal to ndone');
          entry = rv['operations'][rv['ndone']++];

          if (opts.args.impl === 'tryEach' ||
              opts.args.impl === 'waterfall') {
              nfunc_args = Array.prototype.slice.call(arguments, 2);
              res_key = 'results';
              entry['results'] = nfunc_args;
          } else if (opts.args.impl === 'pipeline') {
              nfunc_args = [ opts.args.uarg ];
              res_key = 'result';
              entry['result'] = arguments[2];
          }

          mod_assert.equal(entry['status'], 'pending',
              'status should be pending');
          entry['status'] = err ? 'fail' : 'ok';
          entry['err'] = err;

          if (err) {
              rv['nerrors']++;
          } else {
              rv['successes'].push(entry[res_key]);
          }

          if ((opts.stop_when === 'error' && err) ||
              (opts.stop_when === 'success' &&
              rv['successes'].length > 0) ||
              rv['ndone'] === funcs.length) {
              /* We are finished: report in the shape "res_type" asks for. */
              if (callback) {
                  if (opts.res_type === 'values' ||
                      (opts.res_type === 'array' &&
                      nfunc_args.length <= 1)) {
                      nfunc_args.unshift(err);
                      callback.apply(null, nfunc_args);
                  } else if (opts.res_type === 'array') {
                      callback(err, nfunc_args);
                  } else if (opts.res_type === 'rv') {
                      callback(err, rv);
                  }
              }
          } else {
              nextentry = rv['operations'][rv['ndone']];
              nextentry['status'] = 'pending';
              current++;
              nfunc_args.push(next.bind(null, current));
              setImmediate(function () {
                  var nfunc = nextentry['func'];
                  /*
                   * At first glance it may seem like this branch
                   * is superfluous with the code above that
                   * branches on `opts.args.impl`.  It may also
                   * seem like calling `nfunc.apply` is
                   * sufficient for both cases (after all we
                   * pushed `next.bind(null, current)` to the
                   * `nfunc_args` array), before we call
                   * `setImmediate()`.  However, this is not the
                   * case, because the interface exposed by
                   * tryEach is different from the others.  The
                   * others pass argument(s) from task to task.
                   * tryEach passes nothing but a callback
                   * (`next.bind` below).  However, the callback
                   * itself _can_ be called with one or more
                   * results, which we collect into `nfunc_args`
                   * using the aforementioned `opts.args.impl`
                   * branch above, and which we pass to the
                   * callback via the `opts.res_type` branch
                   * above (where res_type is set to 'array').
                   */
                  if (opts.args.impl !== 'tryEach') {
                      nfunc.apply(null, nfunc_args);
                  } else {
                      nfunc(next.bind(null, current));
                  }
              });
          }
      };

      /* The first function is invoked synchronously. */
      rv['operations'][0]['status'] = 'pending';
      current = 0;
      if (opts.args.impl !== 'pipeline') {
          funcs[0](next.bind(null, current));
      } else {
          funcs[0](opts.args.uarg, next.bind(null, current));
      }

      return (rv);
  }