index.js 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. 'use strict';
  2. var Stream = require('stream')
  3. // from
  4. //
  5. // a stream that reads from a source.
  6. // source may be an array, or a function.
  7. // from handles pause behaviour for you.
  8. module.exports =
  9. function from (source) {
  10. if(Array.isArray(source)) {
  11. var source_index = 0, source_len = source.length;
  12. return from (function (i) {
  13. if(source_index < source_len)
  14. this.emit('data', source[source_index++])
  15. else
  16. this.emit('end')
  17. return true
  18. })
  19. }
  20. var s = new Stream(), i = 0
  21. s.ended = false
  22. s.started = false
  23. s.readable = true
  24. s.writable = false
  25. s.paused = false
  26. s.ended = false
  27. s.pause = function () {
  28. s.started = true
  29. s.paused = true
  30. }
  31. function next () {
  32. s.started = true
  33. if(s.ended) return
  34. while(!s.ended && !s.paused && source.call(s, i++, function () {
  35. if(!s.ended && !s.paused)
  36. process.nextTick(next);
  37. }))
  38. ;
  39. }
  40. s.resume = function () {
  41. s.started = true
  42. s.paused = false
  43. next()
  44. }
  45. s.on('end', function () {
  46. s.ended = true
  47. s.readable = false
  48. process.nextTick(s.destroy)
  49. })
  50. s.destroy = function () {
  51. s.ended = true
  52. s.emit('close')
  53. }
  54. /*
  55. by default, the stream will start emitting at nextTick
  56. if you want, you can pause it, after pipeing.
  57. you can also resume before next tick, and that will also
  58. work.
  59. */
  60. process.nextTick(function () {
  61. if(!s.started) s.resume()
  62. })
  63. return s
  64. }