test.js 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. var tape = require('tape')
  2. var through = require('through2')
  3. var concat = require('concat-stream')
  4. var stream = require('readable-stream')
  5. var net = require('net')
  6. var duplexify = require('./')
  7. var HELLO_WORLD = (Buffer.from && Buffer.from !== Uint8Array.from)
  8. ? Buffer.from('hello world')
  9. : new Buffer('hello world')
  10. tape('passthrough', function(t) {
  11. t.plan(2)
  12. var pt = through()
  13. var dup = duplexify(pt, pt)
  14. dup.end('hello world')
  15. dup.on('finish', function() {
  16. t.ok(true, 'should finish')
  17. })
  18. dup.pipe(concat(function(data) {
  19. t.same(data.toString(), 'hello world', 'same in as out')
  20. }))
  21. })
  22. tape('passthrough + double end', function(t) {
  23. t.plan(2)
  24. var pt = through()
  25. var dup = duplexify(pt, pt)
  26. dup.end('hello world')
  27. dup.end()
  28. dup.on('finish', function() {
  29. t.ok(true, 'should finish')
  30. })
  31. dup.pipe(concat(function(data) {
  32. t.same(data.toString(), 'hello world', 'same in as out')
  33. }))
  34. })
  35. tape('async passthrough + end', function(t) {
  36. t.plan(2)
  37. var pt = through.obj({highWaterMark:1}, function(data, enc, cb) {
  38. setTimeout(function() {
  39. cb(null, data)
  40. }, 100)
  41. })
  42. var dup = duplexify(pt, pt)
  43. dup.write('hello ')
  44. dup.write('world')
  45. dup.end()
  46. dup.on('finish', function() {
  47. t.ok(true, 'should finish')
  48. })
  49. dup.pipe(concat(function(data) {
  50. t.same(data.toString(), 'hello world', 'same in as out')
  51. }))
  52. })
  53. tape('duplex', function(t) {
  54. var readExpected = ['read-a', 'read-b', 'read-c']
  55. var writeExpected = ['write-a', 'write-b', 'write-c']
  56. t.plan(readExpected.length+writeExpected.length+2)
  57. var readable = through.obj()
  58. var writable = through.obj(function(data, enc, cb) {
  59. t.same(data, writeExpected.shift(), 'onwrite should match')
  60. cb()
  61. })
  62. var dup = duplexify.obj(writable, readable)
  63. readExpected.slice().forEach(function(data) {
  64. readable.write(data)
  65. })
  66. readable.end()
  67. writeExpected.slice().forEach(function(data) {
  68. dup.write(data)
  69. })
  70. dup.end()
  71. dup.on('data', function(data) {
  72. t.same(data, readExpected.shift(), 'ondata should match')
  73. })
  74. dup.on('end', function() {
  75. t.ok(true, 'should end')
  76. })
  77. dup.on('finish', function() {
  78. t.ok(true, 'should finish')
  79. })
  80. })
  81. tape('async', function(t) {
  82. var dup = duplexify()
  83. var pt = through()
  84. dup.pipe(concat(function(data) {
  85. t.same(data.toString(), 'i was async', 'same in as out')
  86. t.end()
  87. }))
  88. dup.write('i')
  89. dup.write(' was ')
  90. dup.end('async')
  91. setTimeout(function() {
  92. dup.setWritable(pt)
  93. setTimeout(function() {
  94. dup.setReadable(pt)
  95. }, 50)
  96. }, 50)
  97. })
  98. tape('destroy', function(t) {
  99. t.plan(2)
  100. var write = through()
  101. var read = through()
  102. var dup = duplexify(write, read)
  103. write.destroy = function() {
  104. t.ok(true, 'write destroyed')
  105. }
  106. dup.on('close', function() {
  107. t.ok(true, 'close emitted')
  108. })
  109. dup.destroy()
  110. dup.destroy() // should only work once
  111. dup.end()
  112. })
  113. tape('destroy both', function(t) {
  114. t.plan(3)
  115. var write = through()
  116. var read = through()
  117. var dup = duplexify(write, read)
  118. write.destroy = function() {
  119. t.ok(true, 'write destroyed')
  120. }
  121. read.destroy = function() {
  122. t.ok(true, 'read destroyed')
  123. }
  124. dup.on('close', function() {
  125. t.ok(true, 'close emitted')
  126. })
  127. dup.destroy()
  128. dup.destroy() // should only work once
  129. })
  130. tape('bubble read errors', function(t) {
  131. t.plan(2)
  132. var write = through()
  133. var read = through()
  134. var dup = duplexify(write, read)
  135. dup.on('error', function(err) {
  136. t.same(err.message, 'read-error', 'received read error')
  137. })
  138. dup.on('close', function() {
  139. t.ok(true, 'close emitted')
  140. })
  141. read.emit('error', new Error('read-error'))
  142. write.emit('error', new Error('write-error')) // only emit first error
  143. })
  144. tape('bubble write errors', function(t) {
  145. t.plan(2)
  146. var write = through()
  147. var read = through()
  148. var dup = duplexify(write, read)
  149. dup.on('error', function(err) {
  150. t.same(err.message, 'write-error', 'received write error')
  151. })
  152. dup.on('close', function() {
  153. t.ok(true, 'close emitted')
  154. })
  155. write.emit('error', new Error('write-error'))
  156. read.emit('error', new Error('read-error')) // only emit first error
  157. })
  158. tape('bubble errors from write()', function(t) {
  159. t.plan(3)
  160. var errored = false
  161. var dup = duplexify(new stream.Writable({
  162. write: function(chunk, enc, next) {
  163. next(new Error('write-error'))
  164. }
  165. }))
  166. dup.on('error', function(err) {
  167. errored = true
  168. t.same(err.message, 'write-error', 'received write error')
  169. })
  170. dup.on('close', function() {
  171. t.pass('close emitted')
  172. t.ok(errored, 'error was emitted before close')
  173. })
  174. dup.end('123')
  175. })
  176. tape('destroy while waiting for drain', function(t) {
  177. t.plan(3)
  178. var errored = false
  179. var dup = duplexify(new stream.Writable({
  180. highWaterMark: 0,
  181. write: function() {}
  182. }))
  183. dup.on('error', function(err) {
  184. errored = true
  185. t.same(err.message, 'destroy-error', 'received destroy error')
  186. })
  187. dup.on('close', function() {
  188. t.pass('close emitted')
  189. t.ok(errored, 'error was emitted before close')
  190. })
  191. dup.write('123')
  192. dup.destroy(new Error('destroy-error'))
  193. })
  194. tape('reset writable / readable', function(t) {
  195. t.plan(3)
  196. var toUpperCase = function(data, enc, cb) {
  197. cb(null, data.toString().toUpperCase())
  198. }
  199. var passthrough = through()
  200. var upper = through(toUpperCase)
  201. var dup = duplexify(passthrough, passthrough)
  202. dup.once('data', function(data) {
  203. t.same(data.toString(), 'hello')
  204. dup.setWritable(upper)
  205. dup.setReadable(upper)
  206. dup.once('data', function(data) {
  207. t.same(data.toString(), 'HELLO')
  208. dup.once('data', function(data) {
  209. t.same(data.toString(), 'HI')
  210. t.end()
  211. })
  212. })
  213. dup.write('hello')
  214. dup.write('hi')
  215. })
  216. dup.write('hello')
  217. })
  218. tape('cork', function(t) {
  219. var passthrough = through()
  220. var dup = duplexify(passthrough, passthrough)
  221. var ok = false
  222. dup.on('prefinish', function() {
  223. dup.cork()
  224. setTimeout(function() {
  225. ok = true
  226. dup.uncork()
  227. }, 100)
  228. })
  229. dup.on('finish', function() {
  230. t.ok(ok)
  231. t.end()
  232. })
  233. dup.end()
  234. })
  235. tape('prefinish not twice', function(t) {
  236. var passthrough = through()
  237. var dup = duplexify(passthrough, passthrough)
  238. var prefinished = false
  239. dup.on('prefinish', function() {
  240. t.ok(!prefinished, 'only prefinish once')
  241. prefinished = true
  242. })
  243. dup.on('finish', function() {
  244. t.end()
  245. })
  246. dup.end()
  247. })
  248. tape('close', function(t) {
  249. var passthrough = through()
  250. var dup = duplexify(passthrough, passthrough)
  251. passthrough.emit('close')
  252. dup.on('close', function() {
  253. t.ok(true, 'should forward close')
  254. t.end()
  255. })
  256. })
  257. tape('works with node native streams (net)', function(t) {
  258. t.plan(1)
  259. var server = net.createServer(function(socket) {
  260. var dup = duplexify(socket, socket)
  261. dup.once('data', function(chunk) {
  262. t.same(chunk, HELLO_WORLD)
  263. server.close()
  264. socket.end()
  265. t.end()
  266. })
  267. })
  268. server.listen(0, function () {
  269. var socket = net.connect(server.address().port)
  270. var dup = duplexify(socket, socket)
  271. dup.write(HELLO_WORLD)
  272. })
  273. })