read.js 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  /**
   * @method stream.read
   * @description
   * Consumes and processes data from a $[Readable] stream.
   *
   * It reads the entire stream, using either **paused mode** (default), or in chunks (see `options.readChunks`)
   * with support for both synchronous and asynchronous data processing.
   *
   * **NOTE:** Once the method has finished, the onus is on the caller to release the stream
   * according to its protocol.
   *
   * @param {Object} stream
   * $[Readable] stream object.
   *
   * Passing in anything else will reject with `TypeError` = `Readable stream is required.`
   *
   * @param {Function|generator} receiver
   * Data processing callback (or generator).
   *
   * Passing in anything else will reject with `TypeError` = `Invalid stream receiver.`
   *
   * Parameters:
   *  - `index` = index of the call made to the function
   *  - `data` = array of all data reads from the stream's buffer
   *  - `delay` = number of milliseconds since the last call (`undefined` when `index=0`)
   *
   * The function is called with the same `this` context as the calling method.
   *
   * It can optionally return a promise object, if data processing is asynchronous.
   * And if a promise is returned, the method will not read data from the stream again,
   * until the promise has been resolved.
   *
   * **NOTE:** When option `readChunks` is enabled, the stream is not paused while such a promise
   * is pending, so chunks emitted by the stream in the meantime are still passed into the receiver.
   *
   * The method never resolves while a call into the receiver is still in progress: if the stream
   * ends (or closes) in the meantime, the method waits for the receiver to finish first.
   *
   * If the function throws an error or returns a rejected promise, the method rejects
   * with the same error / rejection reason.
   *
   * @param {Object} [options]
   * Optional Parameters.
   *
   * @param {Boolean} [options.closable=false]
   * Instructs the method to resolve on event `close` supported by the stream, as opposed to event
   * `end` that's used by default.
   *
   * @param {Boolean} [options.readChunks=false]
   * By default, the method handles event `readable` of the stream to consume data in a simplified form,
   * item by item. If you enable this option, the method will instead handle event `data` of the stream,
   * to consume chunks of data.
   *
   * @param {Number} [options.readSize]
   * When the value is greater than 0, it sets the read size from the stream's buffer
   * when the next data is available. By default, the method uses as few reads as possible
   * to get all the data currently available in the buffer.
   *
   * The value is truncated to an integer; if the result is not greater than 0, the default is used.
   *
   * NOTE: This option is ignored when option `readChunks` is enabled.
   *
   * @returns {external:Promise}
   *
   * When finished successfully, resolves with object `{calls, reads, length, duration}`:
   *  - `calls` = number of calls made into the `receiver`
   *  - `reads` = number of successful reads from the stream
   *  - `length` = total length for all the data reads from the stream; in paused mode it is the sum
   *    of property `length` of every read (0 when the property is missing), and with `readChunks`
   *    every chunk adds 1, or the number of its elements when the chunk is an array
   *  - `duration` = number of milliseconds consumed by the method
   *
   * When it fails, the method rejects with the error/reject specified,
   * which can happen as a result of:
   *  - event `error` emitted by the stream
   *  - receiver throws an error or returns a rejected promise
   */
  function read(stream, receiver, options, config) {

      // `config` comes from the module factory: `config.promise` is called with an executor
      // to create the result promise (and exposes `reject`), while `config.utils` supplies
      // the `isReadableStream`, `wrap` and `isPromise` helpers.
      const $p = config.promise;
      const utils = config.utils;

      if (!utils.isReadableStream(stream)) {
          return $p.reject(new TypeError('Readable stream is required.'));
      }

      if (typeof receiver !== 'function') {
          return $p.reject(new TypeError('Invalid stream receiver.'));
      }

      // NOTE(review): presumably adapts generators into promise-returning functions - confirm in utils.
      receiver = utils.wrap(receiver);

      options = options || {};

      // A fractional size below 1 would truncate to 0, and requesting 0 bytes never consumes
      // anything from the buffer, so any non-positive result falls back to the default read.
      const requestedSize = options.readSize > 0 ? parseInt(options.readSize, 10) : 0;
      const readSize = requestedSize > 0 ? requestedSize : null;
      const self = this;
      const start = Date.now();
      const receiveEvent = options.readChunks ? 'data' : 'readable';

      let cbTime;        // timestamp of the previous call into the receiver
      let ready;         // the stream has signalled data that is not processed yet
      let waiting;       // paused mode only: a read/receive cycle is in progress
      let stop;          // the method has settled (resolved or rejected)
      let finished;      // the terminating event arrived while the receiver was still busy
      let pending = 0;   // number of calls into the receiver that have not completed yet
      let reads = 0, length = 0, index = 0;

      return $p((resolve, reject) => {

          function onReceive(data) {
              ready = true;
              process(data);
          }

          function onEnd() {
              if (!options.closable) {
                  success();
              }
          }

          function onClose() {
              success();
          }

          function onError(error) {
              fail(error);
          }

          stream.on(receiveEvent, onReceive);
          stream.on('end', onEnd);
          stream.on('close', onClose);
          stream.on('error', onError);

          function process(data) {
              if (!ready || stop || waiting) {
                  return;
              }
              ready = false;
              let cache;
              if (options.readChunks) {
                  cache = data;
                  // istanbul ignore else;
                  // we cannot test the else condition, as it requires a special broken stream interface.
                  if (!Array.isArray(cache)) {
                      cache = [cache];
                  }
                  length += cache.length;
                  reads++;
              } else {
                  cache = [];
                  waiting = true;
                  let page;
                  // drain everything that is currently available in the stream's buffer:
                  do {
                      page = stream.read(readSize);
                      if (page) {
                          cache.push(page);
                          // istanbul ignore next: requires a unique stream that
                          // creates objects without property `length` defined.
                          length += page.length || 0;
                          reads++;
                      }
                  } while (page);
                  if (!cache.length) {
                      waiting = false;
                      return;
                  }
              }

              const cbNow = Date.now();
              const cbDelay = index ? (cbNow - cbTime) : undefined;
              cbTime = cbNow;

              let result;
              pending++;
              try {
                  result = receiver.call(self, index++, cache, cbDelay);
              } catch (e) {
                  fail(e);
                  return;
              }

              if (utils.isPromise(result)) {
                  result
                      .then(() => {
                          onReceiverDone();
                          return null; // this dummy return is just to prevent Bluebird warnings;
                      })
                      .catch(error => {
                          fail(error);
                      });
              } else {
                  onReceiverDone();
              }
          }

          // Runs after each completed call into the receiver: picks up any data signalled
          // in the meantime, then completes a termination that had to be postponed.
          function onReceiverDone() {
              pending--;
              waiting = false;
              process();
              if (finished && !pending) {
                  success();
              }
          }

          function success() {
              if (stop) {
                  return; // already settled, e.g. `close` following `end`
              }
              if (pending) {
                  // The receiver is still working on the data; resolving now would report
                  // an incomplete result and lose a rejection it may still produce.
                  finished = true;
                  return;
              }
              stop = true;
              cleanup();
              resolve({
                  calls: index,
                  reads: reads,
                  length: length,
                  duration: Date.now() - start
              });
          }

          function fail(error) {
              if (stop) {
                  return; // already settled
              }
              stop = true;
              cleanup();
              reject(error);
          }

          function cleanup() {
              stream.removeListener(receiveEvent, onReceive);
              stream.removeListener('close', onClose);
              stream.removeListener('error', onError);
              stream.removeListener('end', onEnd);
          }
      });
  }
  182. module.exports = function (config) {
  183. return function (stream, receiver, options) {
  184. return read.call(this, stream, receiver, options, config);
  185. };
  186. };