
//filter will reemit the data if cb(err,pass) pass is truthy

// reduce is more tricky
// maybe we want to group the reductions or emit progress updates occasionally
// the most basic reduce just emits one 'data' event after it has received 'end'

var Stream = require('stream').Stream
  , es = exports
  , through = require('through')
  , from = require('from')
  , duplex = require('duplexer')
  , map = require('map-stream')
  , pause = require('pause-stream')
  , split = require('split')
  , pipeline = require('stream-combiner')
  , immediately = global.setImmediate || process.nextTick;
es.Stream = Stream //re-export Stream from core
es.through = through
es.from = from
es.duplex = duplex
es.map = map
es.pause = pause
es.split = split
es.pipeline = es.connect = es.pipe = pipeline
// merge / concat
//
// combine multiple streams into a single stream.
// will emit end only once

es.concat = //actually this should be called concat
es.merge = function (/*streams...*/) {
  var toMerge = [].slice.call(arguments)
  if (toMerge.length === 1 && (toMerge[0] instanceof Array)) {
    toMerge = toMerge[0] //handle array as arguments object
  }
  var stream = new Stream()
  stream.setMaxListeners(0) // allow adding more than 11 streams
  var endCount = 0
  stream.writable = stream.readable = true

  if (toMerge.length) {
    toMerge.forEach(function (e) {
      e.pipe(stream, {end: false})
      var ended = false
      e.on('end', function () {
        if(ended) return
        ended = true
        endCount ++
        if(endCount == toMerge.length)
          stream.emit('end')
      })
    })
  } else {
    process.nextTick(function () {
      stream.emit('end')
    })
  }

  stream.write = function (data) {
    this.emit('data', data)
  }
  stream.destroy = function () {
    toMerge.forEach(function (e) {
      if(e.destroy) e.destroy()
    })
  }
  return stream
}
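
// example (a sketch, not part of the module; assumes `a` and `b` are
// readable streams, e.g. created with es.readArray):
//   es.merge(a, b).pipe(es.writeArray(console.log))
//   es.merge([a, b]) // passing a single array of streams works too
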
// writable stream, collects all events into an array
// and calls back when 'end' occurs
// mainly I'm using this to test the other functions

es.collect =
es.writeArray = function (done) {
  if ('function' !== typeof done)
    throw new Error('function writeArray (done): done must be function')

  var a = new Stream ()
    , array = [], isDone = false
  a.write = function (l) {
    array.push(l)
  }
  a.end = function () {
    isDone = true
    done(null, array)
  }
  a.writable = true
  a.readable = false
  a.destroy = function () {
    a.writable = a.readable = false
    if(isDone) return
    done(new Error('destroyed before end'), array)
  }
  return a
}
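
// example (sketch): collect everything a source emits into an array.
//   es.readArray([1, 2, 3]).pipe(es.writeArray(function (err, array) {
//     console.log(array) // [1, 2, 3]
//   }))
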
//return a Stream that reads the elements of an array
//respecting pause() and resume()

es.readArray = function (array) {
  var stream = new Stream()
    , i = 0
    , paused = false
    , ended = false

  stream.readable = true
  stream.writable = false

  if(!Array.isArray(array))
    throw new Error('event-stream.read expects an array')

  stream.resume = function () {
    if(ended) return
    paused = false
    var l = array.length
    while(i < l && !paused && !ended) {
      stream.emit('data', array[i++])
    }
    if(i == l && !ended)
      ended = true, stream.readable = false, stream.emit('end')
  }
  process.nextTick(stream.resume)
  stream.pause = function () {
    paused = true
  }
  stream.destroy = function () {
    ended = true
    stream.emit('close')
  }
  return stream
}
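
// example (sketch): emit each element of an array as a 'data' event.
//   es.readArray(['a', 'b', 'c']).pipe(es.log('item'))
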
//
// readable (asyncFunction)
// return a stream that calls an async function while the stream is not paused.
//
// the function must take: (count, callback)
//

es.readable =
function (func, continueOnError) {
  var stream = new Stream()
    , i = 0
    , paused = false
    , ended = false
    , reading = false

  stream.readable = true
  stream.writable = false

  if('function' !== typeof func)
    throw new Error('event-stream.readable expects async function')

  stream.on('end', function () { ended = true })

  function get (err, data) {
    if(err) {
      stream.emit('error', err)
      if(!continueOnError) stream.emit('end')
    } else if (arguments.length > 1)
      stream.emit('data', data)

    immediately(function () {
      if(ended || paused || reading) return
      try {
        reading = true
        func.call(stream, i++, function () {
          reading = false
          get.apply(null, arguments)
        })
      } catch (err) {
        stream.emit('error', err)
      }
    })
  }
  stream.resume = function () {
    paused = false
    get()
  }
  process.nextTick(get)
  stream.pause = function () {
    paused = true
  }
  stream.destroy = function () {
    stream.emit('end')
    stream.emit('close')
    ended = true
  }
  return stream
}
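
// example (sketch): emit the numbers 0..4, then end. `count` is how many
// times the function has been called; calling back with a second argument
// emits it as 'data'.
//   es.readable(function (count, callback) {
//     if (count === 5) return this.emit('end')
//     callback(null, count)
//   }).pipe(es.stringify()).pipe(process.stdout)
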
//
// map sync
//

es.mapSync = function (sync) {
  return es.through(function write(data) {
    var mappedData
    try {
      mappedData = sync(data)
    } catch (err) {
      return this.emit('error', err)
    }
    if (mappedData !== undefined)
      this.emit('data', mappedData)
  })
}
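
// example (sketch): double each number; returning undefined drops a chunk.
//   es.readArray([1, 2, 3]).pipe(es.mapSync(function (n) { return n * 2 }))
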
//
// filterSync
//

es.filterSync = function (test) {
  return es.through(function(data){
    var s = this
    if (test(data)) {
      s.queue(data)
    }
  });
}
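
// example (sketch): keep only even numbers.
//   es.readArray([1, 2, 3, 4])
//     .pipe(es.filterSync(function (n) { return n % 2 === 0 }))
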
//
// flatmapSync
//

es.flatmapSync = function (mapper) {
  return es.through(function(data) {
    var s = this
    data.forEach(function(e) {
      s.queue(mapper(e))
    })
  })
}
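
// example (sketch): each incoming chunk must itself be an array; its
// elements are mapped and emitted individually.
//   es.readArray([[1, 2], [3]])
//     .pipe(es.flatmapSync(function (n) { return n + 1 }))
//   // emits 2, 3, 4 as separate 'data' events
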
//
// log just prints out what is coming through the stream, for debugging
//

es.log = function (name) {
  return es.through(function (data) {
    if(name) console.error(name, data)
    else console.error(data)
    this.emit('data', data)
  })
}
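
// example (sketch): tag everything passing through with a label.
//   es.readArray(['a', 'b']).pipe(es.log('debug:'))
//   // prints "debug: a" then "debug: b" to stderr; data passes through unchanged
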
//
// child -- pipe through a child process
//

es.child = function (child) {
  return es.duplex(child.stdin, child.stdout)
}
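
// example (sketch, assumes a POSIX `sed` is on the PATH):
//   var spawn = require('child_process').spawn
//   process.stdin
//     .pipe(es.child(spawn('sed', ['s/foo/bar/g'])))
//     .pipe(process.stdout)
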
//
// parse
//
// must be used after es.split() to ensure that each chunk represents a line
// source.pipe(es.split()).pipe(es.parse())

es.parse = function (options) {
  var emitError = !!(options ? options.error : false)
  return es.through(function (data) {
    var obj
    try {
      if(data) //ignore empty lines
        obj = JSON.parse(data.toString())
    } catch (err) {
      if (emitError)
        return this.emit('error', err)
      return console.error(err, 'attempting to parse:', data)
    }
    //ignore lines that were only whitespace.
    if(obj !== undefined)
      this.emit('data', obj)
  })
}
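
// example (sketch): pass {error: true} to emit parse failures as 'error'
// events instead of logging them.
//   source.pipe(es.split()).pipe(es.parse({error: true}))
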
//
// stringify
//

es.stringify = function () {
  var Buffer = require('buffer').Buffer
  return es.mapSync(function (e){
    return JSON.stringify(Buffer.isBuffer(e) ? e.toString() : e) + '\n'
  })
}
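
// example (sketch): newline-delimited JSON out.
//   es.readArray([{a: 1}, {b: 2}]).pipe(es.stringify()).pipe(process.stdout)
//   // writes {"a":1}\n{"b":2}\n
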
//
// replace a string within a stream.
//
// warn: just splits on `from` and joins with `to`, like str.split(from).join(to).
// probably not optimal.
// for smallish responses, who cares?
// I need this for shadow-npm so it's only relatively small json files.

es.replace = function (from, to) {
  return es.pipeline(es.split(from), es.join(to))
}
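
// example (sketch): rewrite every "foo" to "bar" in a text stream.
//   process.stdin.pipe(es.replace('foo', 'bar')).pipe(process.stdout)
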
//
// join chunks with a joiner. just like Array#join
// also accepts a callback that is passed the chunks appended together
// this is still supported for legacy reasons.
//

es.join = function (str) {
  //legacy api
  if('function' === typeof str)
    return es.wait(str)

  var first = true
  return es.through(function (data) {
    if(!first)
      this.emit('data', str)
    first = false
    this.emit('data', data)
    return true
  })
}
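
// example (sketch): interleave a separator between chunks, like Array#join.
//   es.readArray(['a', 'b', 'c']).pipe(es.join(', ')).pipe(es.wait(console.log))
//   // calls back with (null, 'a, b, c')
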
//
// wait. callback when 'end' is emitted, with all chunks appended as string.
//

es.wait = function (callback) {
  var arr = []
  return es.through(function (data) { arr.push(data) },
    function () {
      var body = Buffer.isBuffer(arr[0]) ? Buffer.concat(arr)
        : arr.join('')
      this.emit('data', body)
      this.emit('end')
      if(callback) callback(null, body)
    })
}
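
// example (sketch): buffer an entire stream into one string.
//   es.readArray(['he', 'll', 'o']).pipe(es.wait(function (err, body) {
//     console.log(body) // 'hello'
//   }))
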
es.pipeable = function () {
  throw new Error('[EVENT-STREAM] es.pipeable is deprecated')
}