123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206 |
- /**
- * @method stream.read
- * @description
- * Consumes and processes data from a $[Readable] stream.
- *
- * It reads the entire stream, using either **paused mode** (default), or in chunks (see `options.readChunks`)
- * with support for both synchronous and asynchronous data processing.
- *
- * **NOTE:** Once the method has finished, the onus is on the caller to release the stream
- * according to its protocol.
- *
- * @param {Object} stream
- * $[Readable] stream object.
- *
- * Passing in anything else will throw `Readable stream is required.`
- *
- * @param {Function|generator} receiver
- * Data processing callback (or generator).
- *
- * Passing in anything else will throw `Invalid stream receiver.`
- *
- * Parameters:
- * - `index` = index of the call made to the function
- * - `data` = array of all data reads from the stream's buffer
- * - `delay` = number of milliseconds since the last call (`undefined` when `index=0`)
- *
- * The function is called with the same `this` context as the calling method.
- *
- * It can optionally return a promise object, if data processing is asynchronous.
- * And if a promise is returned, the method will not read data from the stream again,
- * until the promise has been resolved.
- *
- * If the function throws an error or returns a rejected promise, the method rejects
- * with the same error / rejection reason.
- *
- * @param {Object} [options]
- * Optional Parameters.
- *
- * @param {Boolean} [options.closable=false]
- * Instructs the method to resolve on event `close` supported by the stream, as opposed to event
- * `end` that's used by default.
- *
- * @param {Boolean} [options.readChunks=false]
- * By default, the method handles event `readable` of the stream to consume data in a simplified form,
- * item by item. If you enable this option, the method will instead handle event `data` of the stream,
- * to consume chunks of data.
- *
- * @param {Number} [options.readSize]
- * When the value is greater than 0, it sets the read size from the stream's buffer
- * when the next data is available. By default, the method uses as few reads as possible
- * to get all the data currently available in the buffer.
- *
- * NOTE: This option is ignored when option `readChunks` is enabled.
- *
- * @returns {external:Promise}
- *
- * When finished successfully, resolves with object `{calls, reads, length, duration}`:
- * - `calls` = number of calls made into the `receiver`
- * - `reads` = number of successful reads from the stream
- * - `length` = total length for all the data reads from the stream
- * - `duration` = number of milliseconds consumed by the method
- *
- * When it fails, the method rejects with the error/reject specified,
- * which can happen as a result of:
- * - event `error` emitted by the stream
- * - receiver throws an error or returns a rejected promise
- */
- function read(stream, receiver, options, config) {
- const $p = config.promise, utils = config.utils;
- if (!utils.isReadableStream(stream)) {
- return $p.reject(new TypeError('Readable stream is required.'));
- }
- if (typeof receiver !== 'function') {
- return $p.reject(new TypeError('Invalid stream receiver.'));
- }
- receiver = utils.wrap(receiver);
- options = options || {};
- const readSize = (options.readSize > 0) ? parseInt(options.readSize) : null,
- self = this, start = Date.now(), receiveEvent = options.readChunks ? 'data' : 'readable';
- let cbTime, ready, waiting, stop, reads = 0, length = 0, index = 0;
- return $p((resolve, reject) => {
- function onReceive(data) {
- ready = true;
- process(data);
- }
- function onEnd() {
- if (!options.closable) {
- success();
- }
- }
- function onClose() {
- success();
- }
- function onError(error) {
- fail(error);
- }
- stream.on(receiveEvent, onReceive);
- stream.on('end', onEnd);
- stream.on('close', onClose);
- stream.on('error', onError);
- function process(data) {
- if (!ready || stop || waiting) {
- return;
- }
- ready = false;
- let cache;
- if (options.readChunks) {
- cache = data;
- // istanbul ignore else;
- // we cannot test the else condition, as it requires a special broken stream interface.
- if (!Array.isArray(cache)) {
- cache = [cache];
- }
- length += cache.length;
- reads++;
- } else {
- cache = [];
- waiting = true;
- let page;
- do {
- page = stream.read(readSize);
- if (page) {
- cache.push(page);
- // istanbul ignore next: requires a unique stream that
- // creates objects without property `length` defined.
- length += page.length || 0;
- reads++;
- }
- } while (page);
- if (!cache.length) {
- waiting = false;
- return;
- }
- }
- const cbNow = Date.now(),
- cbDelay = index ? (cbNow - cbTime) : undefined;
- let result;
- cbTime = cbNow;
- try {
- result = receiver.call(self, index++, cache, cbDelay);
- } catch (e) {
- fail(e);
- return;
- }
- if (utils.isPromise(result)) {
- result
- .then(() => {
- waiting = false;
- process();
- return null; // this dummy return is just to prevent Bluebird warnings;
- })
- .catch(error => {
- fail(error);
- });
- } else {
- waiting = false;
- process();
- }
- }
- function success() {
- cleanup();
- resolve({
- calls: index,
- reads: reads,
- length: length,
- duration: Date.now() - start
- });
- }
- function fail(error) {
- stop = true;
- cleanup();
- reject(error);
- }
- function cleanup() {
- stream.removeListener(receiveEvent, onReceive);
- stream.removeListener('close', onClose);
- stream.removeListener('error', onError);
- stream.removeListener('end', onEnd);
- }
- });
- }
- module.exports = function (config) {
- return function (stream, receiver, options) {
- return read.call(this, stream, receiver, options, config);
- };
- };
|