// eventsource-stream.js
  'use strict'

  const { Transform } = require('node:stream')
  const { isASCIINumber, isValidLastEventId } = require('./util')
  /**
   * UTF-8 byte-order-mark, as the three bytes it is encoded to.
   * @type {number[]} BOM
   */
  const BOM = [0xEF, 0xBB, 0xBF]

  /**
   * Byte value of U+000A LINE FEED.
   * @type {10} LF
   */
  const LF = 0x0A

  /**
   * Byte value of U+000D CARRIAGE RETURN.
   * @type {13} CR
   */
  const CR = 0x0D

  /**
   * Byte value of U+003A COLON, the field/value separator of a line.
   * @type {58} COLON
   */
  const COLON = 0x3A

  /**
   * Byte value of U+0020 SPACE; a single one is stripped after the colon.
   * @type {32} SPACE
   */
  const SPACE = 0x20
  /**
   * @typedef {object} EventSourceStreamEvent
   * @type {object}
   * @property {string} [event] The event type.
   * @property {string} [data] The data of the message.
   * @property {string} [id] A unique ID for the event.
   * @property {string} [retry] The reconnection time, in milliseconds.
   */

  /**
   * @typedef eventSourceSettings
   * @type {object}
   * @property {string} lastEventId The last event ID received from the server.
   * @property {string} origin The origin of the event source.
   * @property {number} reconnectionTime The reconnection time, in milliseconds.
   */
  /**
   * Transform stream that parses the bytes of a `text/event-stream` response
   * into events.
   *
   * Bytes are written in; for every completed event that carries a `data`
   * field an object of the shape
   * `{ type, options: { data, lastEventId, origin } }` is pushed out.
   * `id` and `retry` fields are additionally written back into the shared
   * settings object (`lastEventId` / `reconnectionTime`).
   */
  class EventSourceStream extends Transform {
    /**
     * Settings object shared with the creator of the stream. `lastEventId` and
     * `reconnectionTime` are written by `processEvent`, `origin` is only read.
     * @type {eventSourceSettings}
     */
    state = null

    /**
     * Leading byte-order-mark check. Stays `true` until enough bytes arrived to
     * decide whether the stream starts with a BOM.
     * @type {boolean}
     */
    checkBOM = true

    /**
     * `true` when the last consumed line terminator was a carriage return, so
     * that a directly following line feed is swallowed as part of the same
     * terminator.
     * @type {boolean}
     */
    crlfCheck = false

    /**
     * `true` directly after a line terminator: the next byte decides whether
     * a blank line (= end of the event) follows.
     * @type {boolean}
     */
    eventEndCheck = false

    /**
     * Bytes received but not yet consumed by the parser.
     * @type {Buffer}
     */
    buffer = null

    /**
     * Scan position inside `buffer`.
     * @type {number}
     */
    pos = 0

    /**
     * Fields collected for the event that is currently being parsed.
     * @type {EventSourceStreamEvent}
     */
    event = {
      data: undefined,
      event: undefined,
      id: undefined,
      retry: undefined
    }

    /**
     * @param {object} options Stream options, forwarded to `Transform`.
     * @param {eventSourceSettings} options.eventSourceSettings
     * @param {Function} [options.push] Replacement for `push`, receives every
     *   emitted event object.
     */
    constructor (options = {}) {
      // Enable object mode on the readable side, as the stream emits event
      // objects. A copy is passed on so the caller's options object is not
      // modified.
      super({ ...options, readableObjectMode: true })

      this.state = options.eventSourceSettings || {}
      if (options.push) {
        this.push = options.push
      }
    }

    /**
     * Appends the chunk to the internal buffer and consumes every complete
     * line in it. An incomplete trailing line stays buffered until more data
     * arrives.
     *
     * @param {Buffer} chunk
     * @param {string} _encoding
     * @param {Function} callback
     * @returns {void}
     */
    _transform (chunk, _encoding, callback) {
      if (chunk.length === 0) {
        callback()
        return
      }

      // Cache the chunk in the buffer, as the data might not be complete while
      // processing it
      // TODO: Investigate if there is a more performant way to handle
      // incoming chunks
      // see: https://github.com/nodejs/undici/issues/2630
      if (this.buffer) {
        this.buffer = Buffer.concat([this.buffer, chunk])
      } else {
        this.buffer = chunk
      }

      // Strip leading byte-order-mark if we opened the stream and started
      // the processing of the incoming data
      if (this.checkBOM) {
        switch (this.buffer.length) {
          case 1:
            // Check if the first byte is the same as the first byte of the BOM
            if (this.buffer[0] === BOM[0]) {
              // If it is, we need to wait for more data
              callback()
              return
            }
            // Set the checkBOM flag to false as we don't need to check for the
            // BOM anymore
            this.checkBOM = false

            // The buffer only contains one byte so we need to wait for more
            // data
            callback()
            return
          case 2:
            // Check if the first two bytes are the same as the first two bytes
            // of the BOM
            if (
              this.buffer[0] === BOM[0] &&
              this.buffer[1] === BOM[1]
            ) {
              // If it is, we need to wait for more data, because the third
              // byte is needed to determine if it is the BOM or not
              callback()
              return
            }

            // Set the checkBOM flag to false as we don't need to check for the
            // BOM anymore
            this.checkBOM = false
            break
          case 3:
            // Check if the first three bytes are the same as the first three
            // bytes of the BOM
            if (
              this.buffer[0] === BOM[0] &&
              this.buffer[1] === BOM[1] &&
              this.buffer[2] === BOM[2]
            ) {
              // If it is, we can drop the buffered data, as it is only the BOM
              this.buffer = Buffer.alloc(0)
              // Set the checkBOM flag to false as we don't need to check for
              // the BOM anymore
              this.checkBOM = false

              // Await more data
              callback()
              return
            }
            // If it is not the BOM, we can start processing the data
            this.checkBOM = false
            break
          default:
            // The buffer is longer than 3 bytes, so we can drop the BOM if it
            // is present
            if (
              this.buffer[0] === BOM[0] &&
              this.buffer[1] === BOM[1] &&
              this.buffer[2] === BOM[2]
            ) {
              // Remove the BOM from the buffer
              this.buffer = this.buffer.subarray(3)
            }

            // Set the checkBOM flag to false as we don't need to check for the
            // BOM anymore
            this.checkBOM = false
            break
        }
      }

      while (this.pos < this.buffer.length) {
        // If the previous line ended with an end-of-line, we need to check
        // if the next character is also an end-of-line.
        if (this.eventEndCheck) {
          // If the current character is an end-of-line, then the event
          // is finished and we can process it

          // If the previous line ended with a carriage return, we need to
          // check if the current character is a line feed and remove it
          // from the buffer.
          if (this.crlfCheck) {
            // If the current character is a line feed, we can remove it
            // from the buffer and reset the crlfCheck flag
            if (this.buffer[this.pos] === LF) {
              this.buffer = this.buffer.subarray(this.pos + 1)
              this.pos = 0
              this.crlfCheck = false

              // It is possible that the line feed is not the end of the
              // event. We need to check if the next character is an
              // end-of-line character to determine if the event is
              // finished. We simply continue the loop to check the next
              // character.

              // As we removed the line feed from the buffer and set the
              // crlfCheck flag to false, we basically don't make any
              // distinction between a line feed and a carriage return.
              continue
            }
            this.crlfCheck = false
          }

          if (this.buffer[this.pos] === LF || this.buffer[this.pos] === CR) {
            // If the current character is a carriage return, we need to
            // set the crlfCheck flag to true, as we need to check if the
            // next character is a line feed so we can remove it from the
            // buffer
            if (this.buffer[this.pos] === CR) {
              this.crlfCheck = true
            }

            this.buffer = this.buffer.subarray(this.pos + 1)
            this.pos = 0

            // `id` is compared against undefined instead of being tested for
            // truthiness: an `id` field with an empty value is meaningful, it
            // resets the last event ID.
            if (
              this.event.data !== undefined ||
              this.event.event ||
              this.event.id !== undefined ||
              this.event.retry
            ) {
              this.processEvent(this.event)
            }
            this.clearEvent()
            continue
          }

          // If the current character is not an end-of-line, then the event
          // is not finished and we have to reset the eventEndCheck flag
          this.eventEndCheck = false
          continue
        }

        // If the current character is an end-of-line, we can process the
        // line
        if (this.buffer[this.pos] === LF || this.buffer[this.pos] === CR) {
          // If the current character is a carriage return, we need to
          // set the crlfCheck flag to true, as we need to check if the
          // next character is a line feed
          if (this.buffer[this.pos] === CR) {
            this.crlfCheck = true
          }

          // In any case, we can process the line as we reached an
          // end-of-line character
          this.parseLine(this.buffer.subarray(0, this.pos), this.event)

          // Remove the processed line from the buffer
          this.buffer = this.buffer.subarray(this.pos + 1)
          // Reset the position as we removed the processed line from the
          // buffer
          this.pos = 0
          // A line was processed and this could be the end of the event. We
          // need to check if the next line is empty to determine if the event
          // is finished.
          this.eventEndCheck = true
          continue
        }

        this.pos++
      }

      callback()
    }

    /**
     * Parses a single line (without its terminator) and stores the field it
     * carries on `event`. Comment lines, unknown fields and invalid values are
     * ignored.
     *
     * @param {Buffer} line
     * @param {EventSourceStreamEvent} event
     */
    parseLine (line, event) {
      // If the line is empty (a blank line)
      // Dispatch the event, as defined below.
      // This will be handled in the _transform method
      if (line.length === 0) {
        return
      }

      // If the line starts with a U+003A COLON character (:)
      // Ignore the line.
      const colonPosition = line.indexOf(COLON)
      if (colonPosition === 0) {
        return
      }

      let field = ''
      let value = ''

      // If the line contains a U+003A COLON character (:)
      if (colonPosition !== -1) {
        // Collect the characters on the line before the first U+003A COLON
        // character (:), and let field be that string.
        // TODO: Investigate if there is a more performant way to extract the
        // field
        // see: https://github.com/nodejs/undici/issues/2630
        field = line.subarray(0, colonPosition).toString('utf8')

        // Collect the characters on the line after the first U+003A COLON
        // character (:), and let value be that string.
        // If value starts with a U+0020 SPACE character, remove it from value.
        let valueStart = colonPosition + 1
        if (line[valueStart] === SPACE) {
          ++valueStart
        }
        // TODO: Investigate if there is a more performant way to extract the
        // value
        // see: https://github.com/nodejs/undici/issues/2630
        value = line.subarray(valueStart).toString('utf8')

        // Otherwise, the string is not empty but does not contain a U+003A
        // COLON character (:)
      } else {
        // Process the field using the steps described below, using the whole
        // line as the field name, and the empty string as the field value.
        field = line.toString('utf8')
        value = ''
      }

      // Modify the event with the field name and value. The value is also
      // decoded as UTF-8
      switch (field) {
        case 'data':
          // Multiple data lines of one event are joined with a line feed
          if (event[field] === undefined) {
            event[field] = value
          } else {
            event[field] += `\n${value}`
          }
          break
        case 'retry':
          if (isASCIINumber(value)) {
            event[field] = value
          }
          break
        case 'id':
          if (isValidLastEventId(value)) {
            event[field] = value
          }
          break
        case 'event':
          if (value.length > 0) {
            event[field] = value
          }
          break
      }
    }

    /**
     * Applies a completed event: updates the reconnection time and the last
     * event ID in the shared settings and, when the event carries data, pushes
     * the event object.
     *
     * @param {EventSourceStreamEvent} event
     */
    processEvent (event) {
      if (event.retry && isASCIINumber(event.retry)) {
        this.state.reconnectionTime = parseInt(event.retry, 10)
      }

      // An empty id is a valid value and resets the last event ID, therefore
      // only an absent id (undefined) is skipped here.
      if (event.id !== undefined && isValidLastEventId(event.id)) {
        this.state.lastEventId = event.id
      }

      // only dispatch event, when data is provided
      if (event.data !== undefined) {
        this.push({
          type: event.event || 'message',
          options: {
            data: event.data,
            lastEventId: this.state.lastEventId,
            origin: this.state.origin
          }
        })
      }
    }

    /**
     * Resets the field collection for the next event.
     */
    clearEvent () {
      this.event = {
        data: undefined,
        event: undefined,
        id: undefined,
        retry: undefined
      }
    }
  }
  353. module.exports = {
  354. EventSourceStream
  355. }