123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891 |
- /*
- * vasync.js: utilities for observable asynchronous control flow
- */
- var mod_assert = require('assert');
- var mod_events = require('events');
- var mod_util = require('util');
- var mod_verror = require('verror');
- /*
- * Public interface
- */
- exports.parallel = parallel;
- exports.forEachParallel = forEachParallel;
- exports.pipeline = pipeline;
- exports.tryEach = tryEach;
- exports.forEachPipeline = forEachPipeline;
- exports.filter = filter;
- exports.filterLimit = filterLimit;
- exports.filterSeries = filterSeries;
- exports.whilst = whilst;
- exports.queue = queue;
- exports.queuev = queuev;
- exports.barrier = barrier;
- exports.waterfall = waterfall;
- if (!global.setImmediate) {
- global.setImmediate = function (func) {
- var args = Array.prototype.slice.call(arguments, 1);
- args.unshift(0);
- args.unshift(func);
- setTimeout.apply(this, args);
- };
- }
- /*
- * This is incorporated here from jsprim because jsprim ends up pulling in a lot
- * of dependencies. If we end up needing more from jsprim, though, we should
- * add it back and rip out this function.
- */
- function isEmpty(obj)
- {
- var key;
- for (key in obj)
- return (false);
- return (true);
- }
- /*
- * Given a set of functions that complete asynchronously using the standard
- * callback(err, result) pattern, invoke them all and merge the results. See
- * README.md for details.
- */
- function parallel(args, callback)
- {
- var funcs, rv, doneOne, i;
- mod_assert.equal(typeof (args), 'object', '"args" must be an object');
- mod_assert.ok(Array.isArray(args['funcs']),
- '"args.funcs" must be specified and must be an array');
- mod_assert.equal(typeof (callback), 'function',
- 'callback argument must be specified and must be a function');
- funcs = args['funcs'].slice(0);
- rv = {
- 'operations': new Array(funcs.length),
- 'successes': [],
- 'ndone': 0,
- 'nerrors': 0
- };
- if (funcs.length === 0) {
- setImmediate(function () { callback(null, rv); });
- return (rv);
- }
- doneOne = function (entry) {
- return (function (err, result) {
- mod_assert.equal(entry['status'], 'pending');
- entry['err'] = err;
- entry['result'] = result;
- entry['status'] = err ? 'fail' : 'ok';
- if (err)
- rv['nerrors']++;
- else
- rv['successes'].push(result);
- if (++rv['ndone'] < funcs.length)
- return;
- var errors = rv['operations'].filter(function (ent) {
- return (ent['status'] == 'fail');
- }).map(function (ent) { return (ent['err']); });
- if (errors.length > 0)
- callback(new mod_verror.MultiError(errors), rv);
- else
- callback(null, rv);
- });
- };
- for (i = 0; i < funcs.length; i++) {
- rv['operations'][i] = {
- 'func': funcs[i],
- 'funcname': funcs[i].name || '(anon)',
- 'status': 'pending'
- };
- funcs[i](doneOne(rv['operations'][i]));
- }
- return (rv);
- }
- /*
- * Exactly like parallel, except that the input is specified as a single
- * function to invoke on N different inputs (rather than N functions). "args"
- * must have the following fields:
- *
- * func asynchronous function to invoke on each input value
- *
- * inputs array of input values
- */
- function forEachParallel(args, callback)
- {
- var func, funcs;
- mod_assert.equal(typeof (args), 'object', '"args" must be an object');
- mod_assert.equal(typeof (args['func']), 'function',
- '"args.func" must be specified and must be a function');
- mod_assert.ok(Array.isArray(args['inputs']),
- '"args.inputs" must be specified and must be an array');
- func = args['func'];
- funcs = args['inputs'].map(function (input) {
- return (function (subcallback) {
- return (func(input, subcallback));
- });
- });
- return (parallel({ 'funcs': funcs }, callback));
- }
- /*
- * Like parallel, but invokes functions in sequence rather than in parallel
- * and aborts if any function exits with failure. Arguments include:
- *
- * funcs invoke the functions in parallel
- *
- * arg first argument to each pipeline function
- */
- function pipeline(args, callback)
- {
- mod_assert.equal(typeof (args), 'object', '"args" must be an object');
- mod_assert.ok(Array.isArray(args['funcs']),
- '"args.funcs" must be specified and must be an array');
- var opts = {
- 'funcs': args['funcs'].slice(0),
- 'callback': callback,
- 'args': { impl: 'pipeline', uarg: args['arg'] },
- 'stop_when': 'error',
- 'res_type': 'rv'
- };
- return (waterfall_impl(opts));
- }
- function tryEach(funcs, callback)
- {
- mod_assert.ok(Array.isArray(funcs),
- '"funcs" must be specified and must be an array');
- mod_assert.ok(arguments.length == 1 || typeof (callback) == 'function',
- '"callback" must be a function');
- var opts = {
- 'funcs': funcs.slice(0),
- 'callback': callback,
- 'args': { impl: 'tryEach' },
- 'stop_when': 'success',
- 'res_type': 'array'
- };
- return (waterfall_impl(opts));
- }
- /*
- * Exactly like pipeline, except that the input is specified as a single
- * function to invoke on N different inputs (rather than N functions). "args"
- * must have the following fields:
- *
- * func asynchronous function to invoke on each input value
- *
- * inputs array of input values
- */
- function forEachPipeline(args, callback) {
- mod_assert.equal(typeof (args), 'object', '"args" must be an object');
- mod_assert.equal(typeof (args['func']), 'function',
- '"args.func" must be specified and must be a function');
- mod_assert.ok(Array.isArray(args['inputs']),
- '"args.inputs" must be specified and must be an array');
- mod_assert.equal(typeof (callback), 'function',
- 'callback argument must be specified and must be a function');
- var func = args['func'];
- var funcs = args['inputs'].map(function (input) {
- return (function (_, subcallback) {
- return (func(input, subcallback));
- });
- });
- return (pipeline({'funcs': funcs}, callback));
- }
- /*
- * async.js compatible filter, filterLimit, and filterSeries. Takes an input
- * array, optionally a limit, and a single function to filter an array and will
- * callback with a new filtered array. This is effectively an asynchronous
- * version of Array.prototype.filter.
- */
- function filter(inputs, filterFunc, callback) {
- return (filterLimit(inputs, Infinity, filterFunc, callback));
- }
- function filterSeries(inputs, filterFunc, callback) {
- return (filterLimit(inputs, 1, filterFunc, callback));
- }
- function filterLimit(inputs, limit, filterFunc, callback) {
- mod_assert.ok(Array.isArray(inputs),
- '"inputs" must be specified and must be an array');
- mod_assert.equal(typeof (limit), 'number',
- '"limit" must be a number');
- mod_assert.equal(isNaN(limit), false,
- '"limit" must be a number');
- mod_assert.equal(typeof (filterFunc), 'function',
- '"filterFunc" must be specified and must be a function');
- mod_assert.equal(typeof (callback), 'function',
- '"callback" argument must be specified as a function');
- var errors = [];
- var q = queue(processInput, limit);
- var results = [];
- function processInput(input, cb) {
- /*
- * If the errors array has any members, an error was
- * encountered in a previous invocation of filterFunc, so all
- * future filtering will be skipped.
- */
- if (errors.length > 0) {
- cb();
- return;
- }
- filterFunc(input.elem, function inputFiltered(err, ans) {
- /*
- * We ensure here that a filterFunc callback is only
- * ever invoked once.
- */
- if (results.hasOwnProperty(input.idx)) {
- throw (new mod_verror.VError(
- 'vasync.filter*: filterFunc idx %d ' +
- 'invoked its callback twice', input.idx));
- }
- /*
- * The original element, as well as the answer "ans"
- * (truth value) is stored to later be filtered when
- * all outstanding jobs are finished.
- */
- results[input.idx] = {
- elem: input.elem,
- ans: !!ans
- };
- /*
- * Any error encountered while filtering will result in
- * all future operations being skipped, and the error
- * object being returned in the users callback.
- */
- if (err) {
- errors.push(err);
- cb();
- return;
- }
- cb();
- });
- }
- q.once('end', function queueDrained() {
- if (errors.length > 0) {
- callback(mod_verror.errorFromList(errors));
- return;
- }
- /*
- * results is now an array of objects in the same order of the
- * inputs array, where each object looks like:
- *
- * {
- * "ans": <true|false>,
- * "elem": <original input element>
- * }
- *
- * we filter out elements that have a false "ans" value, and
- * then map the array to contain only the input elements.
- */
- results = results.filter(function filterFalseInputs(input) {
- return (input.ans);
- }).map(function mapInputElements(input) {
- return (input.elem);
- });
- callback(null, results);
- });
- inputs.forEach(function iterateInput(elem, idx) {
- /*
- * We retain the array index to ensure that order is
- * maintained.
- */
- q.push({
- elem: elem,
- idx: idx
- });
- });
- q.close();
- return (q);
- }
- /*
- * async-compatible "whilst" function, with a few notable exceptions/addons.
- *
- * 1. More strict typing of arguments (functions *must* be supplied).
- * 2. A callback function is required, not optional.
- * 3. An object is returned, not undefined.
- */
- function whilst(testFunc, iterateFunc, callback) {
- mod_assert.equal(typeof (testFunc), 'function',
- '"testFunc" must be specified and must be a function');
- mod_assert.equal(typeof (iterateFunc), 'function',
- '"iterateFunc" must be specified and must be a function');
- mod_assert.equal(typeof (callback), 'function',
- '"callback" argument must be specified as a function');
- /*
- * The object returned to the caller that provides a read-only
- * interface to introspect this specific invocation of "whilst".
- */
- var o = {
- 'finished': false,
- 'iterations': 0
- };
- /*
- * Store the last set of arguments from the final call to "iterateFunc".
- * The arguments will be passed to the final callback when an error is
- * encountered or when the testFunc returns false.
- */
- var args = [];
- function iterate() {
- var shouldContinue = testFunc();
- if (!shouldContinue) {
- /*
- * The test condition is false - break out of the loop.
- */
- done();
- return;
- }
- /* Bump iterations after testFunc but before iterateFunc. */
- o.iterations++;
- iterateFunc(function whilstIteration(err) {
- /* Store the latest set of arguments seen. */
- args = Array.prototype.slice.call(arguments);
- /* Any error with iterateFunc will break the loop. */
- if (err) {
- done();
- return;
- }
- /* Try again. */
- setImmediate(iterate);
- });
- }
- function done() {
- mod_assert.ok(!o.finished, 'whilst already finished');
- o.finished = true;
- callback.apply(this, args);
- }
- setImmediate(iterate);
- return (o);
- }
- /*
- * async-compatible "queue" function.
- */
- function queue(worker, concurrency)
- {
- return (new WorkQueue({
- 'worker': worker,
- 'concurrency': concurrency
- }));
- }
- function queuev(args)
- {
- return (new WorkQueue(args));
- }
- function WorkQueue(args)
- {
- mod_assert.ok(args.hasOwnProperty('worker'));
- mod_assert.equal(typeof (args['worker']), 'function');
- mod_assert.ok(args.hasOwnProperty('concurrency'));
- mod_assert.equal(typeof (args['concurrency']), 'number');
- mod_assert.equal(Math.floor(args['concurrency']), args['concurrency']);
- mod_assert.ok(args['concurrency'] > 0);
- mod_events.EventEmitter.call(this);
- this.nextid = 0;
- this.worker = args['worker'];
- this.worker_name = args['worker'].name || 'anon';
- this.npending = 0;
- this.pending = {};
- this.queued = [];
- this.closed = false;
- this.ended = false;
- /* user-settable fields inherited from "async" interface */
- this.concurrency = args['concurrency'];
- this.saturated = undefined;
- this.empty = undefined;
- this.drain = undefined;
- }
- mod_util.inherits(WorkQueue, mod_events.EventEmitter);
- WorkQueue.prototype.push = function (tasks, callback)
- {
- if (!Array.isArray(tasks))
- return (this.pushOne(tasks, callback));
- var wq = this;
- return (tasks.map(function (task) {
- return (wq.pushOne(task, callback));
- }));
- };
- WorkQueue.prototype.updateConcurrency = function (concurrency)
- {
- if (this.closed)
- throw new mod_verror.VError(
- 'update concurrency invoked after queue closed');
- this.concurrency = concurrency;
- this.dispatchNext();
- };
- WorkQueue.prototype.close = function ()
- {
- var wq = this;
- if (wq.closed)
- return;
- wq.closed = true;
- /*
- * If the queue is already empty, just fire the "end" event on the
- * next tick.
- */
- if (wq.npending === 0 && wq.queued.length === 0) {
- setImmediate(function () {
- if (!wq.ended) {
- wq.ended = true;
- wq.emit('end');
- }
- });
- }
- };
- /* private */
- WorkQueue.prototype.pushOne = function (task, callback)
- {
- if (this.closed)
- throw new mod_verror.VError('push invoked after queue closed');
- var id = ++this.nextid;
- var entry = { 'id': id, 'task': task, 'callback': callback };
- this.queued.push(entry);
- this.dispatchNext();
- return (id);
- };
- /* private */
- WorkQueue.prototype.dispatchNext = function ()
- {
- var wq = this;
- if (wq.npending === 0 && wq.queued.length === 0) {
- if (wq.drain)
- wq.drain();
- wq.emit('drain');
- /*
- * The queue is closed; emit the final "end"
- * event before we come to rest:
- */
- if (wq.closed) {
- wq.ended = true;
- wq.emit('end');
- }
- } else if (wq.queued.length > 0) {
- while (wq.queued.length > 0 && wq.npending < wq.concurrency) {
- var next = wq.queued.shift();
- wq.dispatch(next);
- if (wq.queued.length === 0) {
- if (wq.empty)
- wq.empty();
- wq.emit('empty');
- }
- }
- }
- };
- WorkQueue.prototype.dispatch = function (entry)
- {
- var wq = this;
- mod_assert.ok(!this.pending.hasOwnProperty(entry['id']));
- mod_assert.ok(this.npending < this.concurrency);
- mod_assert.ok(!this.ended);
- this.npending++;
- this.pending[entry['id']] = entry;
- if (this.npending === this.concurrency) {
- if (this.saturated)
- this.saturated();
- this.emit('saturated');
- }
- /*
- * We invoke the worker function on the next tick so that callers can
- * always assume that the callback is NOT invoked during the call to
- * push() even if the queue is not at capacity. It also avoids O(n)
- * stack usage when used with synchronous worker functions.
- */
- setImmediate(function () {
- wq.worker(entry['task'], function (err) {
- --wq.npending;
- delete (wq.pending[entry['id']]);
- if (entry['callback'])
- entry['callback'].apply(null, arguments);
- wq.dispatchNext();
- });
- });
- };
- WorkQueue.prototype.length = function ()
- {
- return (this.queued.length);
- };
- WorkQueue.prototype.kill = function ()
- {
- this.killed = true;
- this.queued = [];
- this.drain = undefined;
- this.close();
- };
- /*
- * Barriers coordinate multiple concurrent operations.
- */
- function barrier(args)
- {
- return (new Barrier(args));
- }
- function Barrier(args)
- {
- mod_assert.ok(!args || !args['nrecent'] ||
- typeof (args['nrecent']) == 'number',
- '"nrecent" must have type "number"');
- mod_events.EventEmitter.call(this);
- var nrecent = args && args['nrecent'] ? args['nrecent'] : 10;
- if (nrecent > 0) {
- this.nrecent = nrecent;
- this.recent = [];
- }
- this.pending = {};
- this.scheduled = false;
- }
- mod_util.inherits(Barrier, mod_events.EventEmitter);
- Barrier.prototype.start = function (name)
- {
- mod_assert.ok(!this.pending.hasOwnProperty(name),
- 'operation "' + name + '" is already pending');
- this.pending[name] = Date.now();
- };
- Barrier.prototype.done = function (name)
- {
- mod_assert.ok(this.pending.hasOwnProperty(name),
- 'operation "' + name + '" is not pending');
- if (this.recent) {
- this.recent.push({
- 'name': name,
- 'start': this.pending[name],
- 'done': Date.now()
- });
- if (this.recent.length > this.nrecent)
- this.recent.shift();
- }
- delete (this.pending[name]);
- /*
- * If we executed at least one operation and we're now empty, we should
- * emit "drain". But most code doesn't deal well with events being
- * processed while they're executing, so we actually schedule this event
- * for the next tick.
- *
- * We use the "scheduled" flag to avoid emitting multiple "drain" events
- * on consecutive ticks if the user starts and ends another task during
- * this tick.
- */
- if (!isEmpty(this.pending) || this.scheduled)
- return;
- this.scheduled = true;
- var self = this;
- setImmediate(function () {
- self.scheduled = false;
- /*
- * It's also possible that the user has started another task on
- * the previous tick, in which case we really shouldn't emit
- * "drain".
- */
- if (isEmpty(self.pending))
- self.emit('drain');
- });
- };
- /*
- * waterfall([ funcs ], callback): invoke each of the asynchronous functions
- * "funcs" in series. Each function is passed any values emitted by the
- * previous function (none for the first function), followed by the callback to
- * invoke upon completion. This callback must be invoked exactly once,
- * regardless of success or failure. As conventional in Node, the first
- * argument to the callback indicates an error (if non-null). Subsequent
- * arguments are passed to the next function in the "funcs" chain.
- *
- * If any function fails (i.e., calls its callback with an Error), then the
- * remaining functions are not invoked and "callback" is invoked with the error.
- *
- * The only difference between waterfall() and pipeline() are the arguments
- * passed to each function in the chain. pipeline() always passes the same
- * argument followed by the callback, while waterfall() passes whatever values
- * were emitted by the previous function followed by the callback.
- */
- function waterfall(funcs, callback)
- {
- mod_assert.ok(Array.isArray(funcs),
- '"funcs" must be specified and must be an array');
- mod_assert.ok(arguments.length == 1 || typeof (callback) == 'function',
- '"callback" must be a function');
- var opts = {
- 'funcs': funcs.slice(0),
- 'callback': callback,
- 'args': { impl: 'waterfall' },
- 'stop_when': 'error',
- 'res_type': 'values'
- };
- return (waterfall_impl(opts));
- }
- /*
- * This function is used to implement vasync-functions that need to execute a
- * list of functions in a sequence, but differ in how they make use of the
- * intermediate callbacks and finall callback, as well as under what conditions
- * they stop executing the functions in the list. Examples of such functions
- * are `pipeline`, `waterfall`, and `tryEach`. See the documentation for those
- * functions to see how they operate.
- *
- * This function's behavior is influenced via the `opts` object that we pass
- * in. This object has the following layout:
- *
- * {
- * 'funcs': array of functions
- * 'callback': the final callback
- * 'args': {
- * 'impl': 'pipeline' or 'tryEach' or 'waterfall'
- * 'uarg': the arg passed to each func for 'pipeline'
- * }
- * 'stop_when': 'error' or 'success'
- * 'res_type': 'values' or 'arrays' or 'rv'
- * }
- *
- * In the object, 'res_type' is used to indicate what the type of the result
- * values(s) is that we pass to the final callback. We secondarily use
- * 'args.impl' to adjust this behavior in an implementation-specific way. For
- * example, 'tryEach' only returns an array if it has more than 1 result passed
- * to the final callback. Otherwise, it passes a solitary value to the final
- * callback.
- *
- * In case it's not clear, 'rv' in the `res_type` member, is just the
- * result-value that we also return. This is the convention in functions that
- * originated in `vasync` (pipeline), but not in functions that originated in
- * `async` (waterfall, tryEach).
- */
- function waterfall_impl(opts)
- {
- mod_assert.ok(typeof (opts) === 'object');
- var rv, current, next;
- var funcs = opts.funcs;
- var callback = opts.callback;
- mod_assert.ok(Array.isArray(funcs),
- '"opts.funcs" must be specified and must be an array');
- mod_assert.ok(arguments.length == 1,
- 'Function "waterfall_impl" must take only 1 arg');
- mod_assert.ok(opts.res_type === 'values' ||
- opts.res_type === 'array' || opts.res_type == 'rv',
- '"opts.res_type" must either be "values", "array", or "rv"');
- mod_assert.ok(opts.stop_when === 'error' ||
- opts.stop_when === 'success',
- '"opts.stop_when" must either be "error" or "success"');
- mod_assert.ok(opts.args.impl === 'pipeline' ||
- opts.args.impl === 'waterfall' || opts.args.impl === 'tryEach',
- '"opts.args.impl" must be "pipeline", "waterfall", or "tryEach"');
- if (opts.args.impl === 'pipeline') {
- mod_assert.ok(typeof (opts.args.uarg) !== undefined,
- '"opts.args.uarg" should be defined when pipeline is used');
- }
- rv = {
- 'operations': funcs.map(function (func) {
- return ({
- 'func': func,
- 'funcname': func.name || '(anon)',
- 'status': 'waiting'
- });
- }),
- 'successes': [],
- 'ndone': 0,
- 'nerrors': 0
- };
- if (funcs.length === 0) {
- if (callback)
- setImmediate(function () {
- var res = (opts.args.impl === 'pipeline') ? rv
- : undefined;
- callback(null, res);
- });
- return (rv);
- }
- next = function (idx, err) {
- /*
- * Note that nfunc_args contains the args we will pass to the
- * next func in the func-list the user gave us. Except for
- * 'tryEach', which passes cb's. However, it will pass
- * 'nfunc_args' to its final callback -- see below.
- */
- var res_key, nfunc_args, entry, nextentry;
- if (err === undefined)
- err = null;
- if (idx != current) {
- throw (new mod_verror.VError(
- 'vasync.waterfall: function %d ("%s") invoked ' +
- 'its callback twice', idx,
- rv['operations'][idx].funcname));
- }
- mod_assert.equal(idx, rv['ndone'],
- 'idx should be equal to ndone');
- entry = rv['operations'][rv['ndone']++];
- if (opts.args.impl === 'tryEach' ||
- opts.args.impl === 'waterfall') {
- nfunc_args = Array.prototype.slice.call(arguments, 2);
- res_key = 'results';
- entry['results'] = nfunc_args;
- } else if (opts.args.impl === 'pipeline') {
- nfunc_args = [ opts.args.uarg ];
- res_key = 'result';
- entry['result'] = arguments[2];
- }
- mod_assert.equal(entry['status'], 'pending',
- 'status should be pending');
- entry['status'] = err ? 'fail' : 'ok';
- entry['err'] = err;
- if (err) {
- rv['nerrors']++;
- } else {
- rv['successes'].push(entry[res_key]);
- }
- if ((opts.stop_when === 'error' && err) ||
- (opts.stop_when === 'success' &&
- rv['successes'].length > 0) ||
- rv['ndone'] == funcs.length) {
- if (callback) {
- if (opts.res_type === 'values' ||
- (opts.res_type === 'array' &&
- nfunc_args.length <= 1)) {
- nfunc_args.unshift(err);
- callback.apply(null, nfunc_args);
- } else if (opts.res_type === 'array') {
- callback(err, nfunc_args);
- } else if (opts.res_type === 'rv') {
- callback(err, rv);
- }
- }
- } else {
- nextentry = rv['operations'][rv['ndone']];
- nextentry['status'] = 'pending';
- current++;
- nfunc_args.push(next.bind(null, current));
- setImmediate(function () {
- var nfunc = nextentry['func'];
- /*
- * At first glance it may seem like this branch
- * is superflous with the code above that
- * branches on `opts.args.impl`. It may also
- * seem like calling `nfunc.apply` is
- * sufficient for both cases (after all we
- * pushed `next.bind(null, current)` to the
- * `nfunc_args` array), before we call
- * `setImmediate()`. However, this is not the
- * case, because the interface exposed by
- * tryEach is different from the others. The
- * others pass argument(s) from task to task.
- * tryEach passes nothing but a callback
- * (`next.bind` below). However, the callback
- * itself _can_ be called with one or more
- * results, which we collect into `nfunc_args`
- * using the aformentioned `opts.args.impl`
- * branch above, and which we pass to the
- * callback via the `opts.res_type` branch
- * above (where res_type is set to 'array').
- */
- if (opts.args.impl !== 'tryEach') {
- nfunc.apply(null, nfunc_args);
- } else {
- nfunc(next.bind(null, current));
- }
- });
- }
- };
- rv['operations'][0]['status'] = 'pending';
- current = 0;
- if (opts.args.impl !== 'pipeline') {
- funcs[0](next.bind(null, current));
- } else {
- funcs[0](opts.args.uarg, next.bind(null, current));
- }
- return (rv);
- }
|