index.js 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. //filter will reemit the data if cb(err,pass) pass is truthy
  2. // reduce is more tricky
  3. // maybe we want to group the reductions or emit progress updates occasionally
  4. // the most basic reduce just emits one 'data' event after it has recieved 'end'
  5. var Stream = require('stream').Stream
  6. //create an event stream and apply function to each .write
  7. //emitting each response as data
  8. //unless it's an empty callback
  9. module.exports = function (mapper, opts) {
  10. var stream = new Stream()
  11. , inputs = 0
  12. , outputs = 0
  13. , ended = false
  14. , paused = false
  15. , destroyed = false
  16. , lastWritten = 0
  17. , inNext = false
  18. opts = opts || {};
  19. var errorEventName = opts.failures ? 'failure' : 'error';
  20. // Items that are not ready to be written yet (because they would come out of
  21. // order) get stuck in a queue for later.
  22. var writeQueue = {}
  23. stream.writable = true
  24. stream.readable = true
  25. function queueData (data, number) {
  26. var nextToWrite = lastWritten + 1
  27. if (number === nextToWrite) {
  28. // If it's next, and its not undefined write it
  29. if (data !== undefined) {
  30. stream.emit.apply(stream, ['data', data])
  31. }
  32. lastWritten ++
  33. nextToWrite ++
  34. } else {
  35. // Otherwise queue it for later.
  36. writeQueue[number] = data
  37. }
  38. // If the next value is in the queue, write it
  39. if (writeQueue.hasOwnProperty(nextToWrite)) {
  40. var dataToWrite = writeQueue[nextToWrite]
  41. delete writeQueue[nextToWrite]
  42. return queueData(dataToWrite, nextToWrite)
  43. }
  44. outputs ++
  45. if(inputs === outputs) {
  46. if(paused) paused = false, stream.emit('drain') //written all the incoming events
  47. if(ended) end()
  48. }
  49. }
  50. function next (err, data, number) {
  51. if(destroyed) return
  52. inNext = true
  53. if (!err || opts.failures) {
  54. queueData(data, number)
  55. }
  56. if (err) {
  57. stream.emit.apply(stream, [ errorEventName, err ]);
  58. }
  59. inNext = false;
  60. }
  61. // Wrap the mapper function by calling its callback with the order number of
  62. // the item in the stream.
  63. function wrappedMapper (input, number, callback) {
  64. return mapper.call(null, input, function(err, data){
  65. callback(err, data, number)
  66. })
  67. }
  68. stream.write = function (data) {
  69. if(ended) throw new Error('map stream is not writable')
  70. inNext = false
  71. inputs ++
  72. try {
  73. //catch sync errors and handle them like async errors
  74. var written = wrappedMapper(data, inputs, next)
  75. paused = (written === false)
  76. return !paused
  77. } catch (err) {
  78. //if the callback has been called syncronously, and the error
  79. //has occured in an listener, throw it again.
  80. if(inNext)
  81. throw err
  82. next(err)
  83. return !paused
  84. }
  85. }
  86. function end (data) {
  87. //if end was called with args, write it,
  88. ended = true //write will emit 'end' if ended is true
  89. stream.writable = false
  90. if(data !== undefined) {
  91. return queueData(data, inputs)
  92. } else if (inputs == outputs) { //wait for processing
  93. stream.readable = false, stream.emit('end'), stream.destroy()
  94. }
  95. }
  96. stream.end = function (data) {
  97. if(ended) return
  98. end(data)
  99. }
  100. stream.destroy = function () {
  101. ended = destroyed = true
  102. stream.writable = stream.readable = paused = false
  103. process.nextTick(function () {
  104. stream.emit('close')
  105. })
  106. }
  107. stream.pause = function () {
  108. paused = true
  109. }
  110. stream.resume = function () {
  111. paused = false
  112. }
  113. return stream
  114. }