// json-stream.js (5.1 KB)
  1. const Parser = require('jsonparse')
  2. const { Minipass } = require('minipass')
  /**
   * Error type emitted by JSONStream when the underlying parser reports a
   * problem.
   *
   * The message is copied from the wrapped error, and the wrapped error itself
   * is preserved as `cause` so its own stack and properties are not lost.
   *
   * @param {Error} err - the error being wrapped; its `message` is reused
   * @param {Function} [caller] - function whose frame (and everything above
   *   it) is omitted from the captured stack trace; defaults to this
   *   constructor
   */
  class JSONStreamError extends Error {
    constructor (err, caller) {
      super(err.message, { cause: err })
      // Anchor the stack at the public entry point rather than at parser
      // internals.
      Error.captureStackTrace(this, caller || this.constructor)
    }

    // Defined as a getter on the prototype, so the name is not an own,
    // enumerable property of each instance.
    get name () {
      return 'JSONStreamError'
    }
  }
  /**
   * Test one path matcher against a key.
   *
   * @param {*} x - the matcher:
   *   - string: matches when `String(y)` equals it
   *   - anything truthy with a `test` function (e.g. a RegExp): result of
   *     `x.test(y)`
   *   - boolean or object: returned as-is (so `true` and option objects
   *     match every key, `false` and `null` match none)
   *   - function: result of `x(y)`
   *   - anything else: `false`
   * @param {*} y - the key being examined
   * @returns {*} a truthy value when the key matches, a falsy one otherwise
   */
  const check = (x, y) => {
    if (typeof x === 'string') {
      return String(y) === x
    }
    if (x && typeof x.test === 'function') {
      // A global or sticky RegExp resumes from lastIndex on every test(), so
      // reusing the same matcher across keys would give alternating results.
      // Always start from the beginning of the key.
      if (x instanceof RegExp && (x.global || x.sticky)) {
        x.lastIndex = 0
      }
      return x.test(y)
    }
    if (typeof x === 'boolean' || typeof x === 'object') {
      return x
    }
    if (typeof x === 'function') {
      return x(y)
    }
    return false
  }
  /**
   * Object-mode Minipass stream that parses JSON text written to it and
   * emits parsed values as data.
   *
   * Without a `path`, each completed (truthy) root value is emitted. With a
   * `path`, only values whose position in the document matches the path are
   * emitted, optionally transformed by `map`.
   *
   * Events emitted by this class, in addition to the data it writes:
   *   - 'header': object of key/values that did not match the path and were
   *     seen before the first match (emitted at the first match, or at end())
   *   - 'footer': object of non-matching key/values seen after the header was
   *     emitted (emitted at end())
   *   - 'error': a JSONStreamError for parser errors, or a TypeError when
   *     write() receives something other than a string or Buffer
   */
  class JSONStream extends Minipass {
    // Incremented for each matched value and reset when a root value
    // completes; nothing in this class reads it.
    #count = 0
    // Set by end(); cleared when a parser error is reported. Only used to
    // pick the stack-trace anchor in #onError.
    #ending = false
    // null until a non-matching key/value is recorded after the header went
    // out; then an object of those key/values.
    #footer = null
    // null: nothing collected; object: collected but not yet emitted;
    // false: already emitted.
    #header = null
    // Optional transform applied to each matched value.
    #map = null
    // The parser's own onToken handler, saved so #onToken can delegate to it.
    #onTokenOriginal
    #parser
    // Array of path matchers (see `check`), or null to emit root values.
    #path = null
    // Most recent value reported by the parser.
    #root = null

    /**
     * @param {object} [opts] - passed through to Minipass with `objectMode`
     *   forced to true. Defaults to `{}` so the stream can be constructed
     *   with no arguments.
     * @param {string|Array} [opts.path] - a dot-separated string, where
     *   `'$*'` becomes `{ emitKey: true }`, `'*'` becomes `true`, an empty
     *   segment becomes `{ recurse: true }`, and any other segment is kept as
     *   a literal key; or a non-empty array of matchers used as-is. Anything
     *   else means "no path".
     * @param {Function} [opts.map] - called as `map(value, actualPath)` for
     *   each matched value; a null/undefined result suppresses the value
     */
    constructor (opts = {}) {
      super({
        ...opts,
        objectMode: true,
      })
      const parser = this.#parser = new Parser()
      parser.onValue = value => this.#onValue(value)
      this.#onTokenOriginal = parser.onToken
      parser.onToken = (token, value) => this.#onToken(token, value)
      parser.onError = er => this.#onError(er)
      this.#path = typeof opts.path === 'string'
        ? opts.path.split('.').map(e =>
          e === '$*' ? { emitKey: true }
          : e === '*' ? true
          : e === '' ? { recurse: true }
          : e)
        : Array.isArray(opts.path) && opts.path.length ? opts.path
        : null
      if (typeof opts.map === 'function') {
        this.#map = opts.map
      }
    }

    /**
     * Record a key/value that did not match the path, in the header if it
     * has not been emitted yet, otherwise in the footer.
     */
    #setHeaderFooter (key, value) {
      // header has not been emitted yet
      if (this.#header !== false) {
        this.#header = this.#header || {}
        this.#header[key] = value
      }
      // footer has not been emitted yet but header has
      if (this.#footer !== false && this.#header === false) {
        this.#footer = this.#footer || {}
        this.#footer[key] = value
      }
    }

    /**
     * Parser error hook: re-emit the error as a JSONStreamError whose stack
     * is anchored at end() or write(), whichever triggered the parse.
     */
    #onError (er) {
      // error will always happen during a write() call.
      const caller = this.#ending ? this.end : this.write
      this.#ending = false
      return this.emit('error', new JSONStreamError(er, caller))
    }

    /**
     * Parser token hook: delegate to the parser's original handler, then,
     * once the parser stack is empty (a root value has completed), emit the
     * root value when no path is configured and reset per-root state.
     * A falsy root value is not emitted.
     */
    #onToken (token, value) {
      const parser = this.#parser
      this.#onTokenOriginal.call(this.#parser, token, value)
      if (parser.stack.length === 0) {
        if (this.#root) {
          const root = this.#root
          if (!this.#path) {
            super.write(root)
          }
          this.#root = null
          this.#count = 0
        }
      }
    }

    /**
     * Parser value hook: remember the value as the current root candidate
     * and, when a path is configured, walk the path against the parser stack
     * to decide whether this value should be emitted.
     */
    #onValue (value) {
      const parser = this.#parser
      // the LAST onValue encountered is the root object.
      // just overwrite it each time.
      this.#root = value
      if (!this.#path) {
        return
      }
      let i = 0 // iterates on path
      let j = 0 // iterates on stack
      let emitKey = false
      while (i < this.#path.length) {
        const key = this.#path[i]
        j++
        if (key && !key.recurse) {
          // The deepest level's key lives on the parser itself; shallower
          // levels live on the stack entries.
          const c = (j === parser.stack.length) ? parser : parser.stack[j]
          if (!c) {
            return
          }
          if (!check(key, c.key)) {
            this.#setHeaderFooter(c.key, value)
            return
          }
          emitKey = !!key.emitKey
          i++
        } else {
          // Recursive segment (or a falsy matcher): scan deeper levels until
          // one matches the matcher that follows it.
          i++
          if (i >= this.#path.length) {
            return
          }
          const nextKey = this.#path[i]
          if (!nextKey) {
            return
          }
          while (true) {
            const c = (j === parser.stack.length) ? parser : parser.stack[j]
            if (!c) {
              return
            }
            if (check(nextKey, c.key)) {
              i++
              // Object.isFrozen() is true for non-objects, so this also
              // skips the case where parser.stack[j] is undefined.
              if (!Object.isFrozen(parser.stack[j])) {
                parser.stack[j].value = null
              }
              break
            } else {
              this.#setHeaderFooter(c.key, value)
            }
            j++
          }
        }
      }
      // emit header
      if (this.#header) {
        const header = this.#header
        this.#header = false
        this.emit('header', header)
      }
      // Only emit when the whole path was consumed exactly at the current
      // depth.
      if (j !== parser.stack.length) {
        return
      }
      this.#count++
      const actualPath = parser.stack.slice(1)
        .map(e => e.key).concat([parser.key])
      if (value !== null && value !== undefined) {
        const data = this.#map ? this.#map(value, actualPath) : value
        if (data !== null && data !== undefined) {
          const emit = emitKey ? { value: data } : data
          if (emitKey) {
            emit.key = parser.key
          }
          super.write(emit)
        }
      }
      // Drop references to the emitted value from the parser's partially
      // built containers.
      if (parser.value) {
        delete parser.value[parser.key]
      }
      for (const k of parser.stack) {
        k.value = null
      }
    }

    /**
     * Feed JSON text to the parser.
     *
     * @param {string|Buffer} chunk - strings are converted with
     *   `Buffer.from(chunk, encoding)`; any other non-Buffer input causes an
     *   'error' event with a TypeError and nothing is parsed
     * @param {string} [encoding] - encoding for string chunks
     * @returns {boolean} `this.flowing` after parsing, or the result of
     *   `emit('error', ...)` for invalid input
     */
    write (chunk, encoding) {
      if (typeof chunk === 'string') {
        chunk = Buffer.from(chunk, encoding)
      } else if (!Buffer.isBuffer(chunk)) {
        return this.emit('error', new TypeError(
          'Can only parse JSON from string or buffer input'))
      }
      this.#parser.write(chunk)
      return this.flowing
    }

    /**
     * Optionally write a final chunk, emit any pending 'header' and
     * collected 'footer', then end the underlying stream.
     *
     * @param {string|Buffer} [chunk] - final chunk; skipped when falsy
     * @param {string} [encoding] - encoding for a string chunk
     * @returns {*} whatever the parent class's end() returns
     */
    end (chunk, encoding) {
      this.#ending = true
      if (chunk) {
        this.write(chunk, encoding)
      }
      const h = this.#header
      this.#header = null
      const f = this.#footer
      this.#footer = null
      // h is false when the header already went out, so it is not repeated.
      if (h) {
        this.emit('header', h)
      }
      if (f) {
        this.emit('footer', f)
      }
      return super.end()
    }

    /** The error class used for parser errors emitted by this stream. */
    static get JSONStreamError () {
      return JSONStreamError
    }

    /**
     * Convenience factory, equivalent to `new JSONStream({ path, map })`.
     */
    static parse (path, map) {
      return new JSONStream({ path, map })
    }
  }
  197. module.exports = JSONStream