receiver.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424
  1. 'use strict'
  2. const { Writable } = require('node:stream')
  3. const assert = require('node:assert')
  4. const { parserStates, opcodes, states, emptyBuffer, sentCloseFrameState } = require('./constants')
  5. const { kReadyState, kSentClose, kResponse, kReceivedClose } = require('./symbols')
  6. const { channels } = require('../../core/diagnostics')
  7. const {
  8. isValidStatusCode,
  9. isValidOpcode,
  10. failWebsocketConnection,
  11. websocketMessageReceived,
  12. utf8Decode,
  13. isControlFrame,
  14. isTextBinaryFrame,
  15. isContinuationFrame
  16. } = require('./util')
  17. const { WebsocketFrameSend } = require('./frame')
  18. const { closeWebSocketConnection } = require('./connection')
  19. const { PerMessageDeflate } = require('./permessage-deflate')
  20. // This code was influenced by ws released under the MIT license.
  21. // Copyright (c) 2011 Einar Otto Stangvik <einaros@gmail.com>
  22. // Copyright (c) 2013 Arnout Kazemier and contributors
  23. // Copyright (c) 2016 Luigi Pinca and contributors
  /**
   * Incremental parser for incoming WebSocket frames.
   *
   * Bytes are fed in through the Writable interface (`_write`). The parser
   * buffers them and walks a small state machine (`parserStates`): frame
   * header -> optional extended payload length -> payload. Completed data
   * messages are handed to `websocketMessageReceived`; control frames are
   * handled by `parseControlFrame`; protocol violations are reported through
   * `failWebsocketConnection`.
   */
  class ByteParser extends Writable {
    /** Chunks received through `_write` that have not been consumed yet. */
    #buffers = []
    /** Number of buffered, not yet consumed bytes across `#buffers`. */
    #byteOffset = 0
    /** Whether `run` should keep iterating its state machine. */
    #loop = false

    /** Current state of the frame state machine. */
    #state = parserStates.INFO

    /** Header fields of the frame currently being parsed (plus `closeInfo`). */
    #info = {}
    /** Payloads of the data frames that make up the message in progress. */
    #fragments = []

    /** @type {Map<string, PerMessageDeflate>} */
    #extensions

    /**
     * @param {object} ws the WebSocket this parser reports to
     * @param {Map<string, any>|null|undefined} extensions negotiated
     *   extensions; when it contains 'permessage-deflate', that entry is
     *   replaced by a PerMessageDeflate instance built from the map.
     */
    constructor (ws, extensions) {
      super()

      this.ws = ws
      this.#extensions = extensions == null ? new Map() : extensions

      if (this.#extensions.has('permessage-deflate')) {
        this.#extensions.set('permessage-deflate', new PerMessageDeflate(extensions))
      }
    }

    /**
     * Buffers the chunk and (re)starts the parsing loop.
     * @param {Buffer} chunk
     * @param {() => void} callback
     */
    _write (chunk, _, callback) {
      this.#buffers.push(chunk)
      this.#byteOffset += chunk.length
      this.#loop = true

      this.run(callback)
    }

    /**
     * Runs whenever a new chunk is received.
     * Callback is called whenever there are no more chunks buffering,
     * or not enough bytes are buffered to parse.
     *
     * Several failure paths, a received close frame, and a failed
     * decompression return WITHOUT invoking the callback.
     * @param {() => void} callback
     */
    run (callback) {
      while (this.#loop) {
        if (this.#state === parserStates.INFO) {
          // If there aren't enough bytes to parse the payload length, etc.
          if (this.#byteOffset < 2) {
            return callback()
          }

          const buffer = this.consume(2)
          const fin = (buffer[0] & 0x80) !== 0
          const opcode = buffer[0] & 0x0F
          const masked = (buffer[1] & 0x80) === 0x80

          // Only the first frame of a fragmented message counts as
          // "fragmented" here; continuation frames never do.
          const fragmented = !fin && opcode !== opcodes.CONTINUATION
          const payloadLength = buffer[1] & 0x7F

          const rsv1 = buffer[0] & 0x40
          const rsv2 = buffer[0] & 0x20
          const rsv3 = buffer[0] & 0x10

          if (!isValidOpcode(opcode)) {
            failWebsocketConnection(this.ws, 'Invalid opcode received')
            return callback()
          }

          if (masked) {
            failWebsocketConnection(this.ws, 'Frame cannot be masked')
            return callback()
          }

          // MUST be 0 unless an extension is negotiated that defines meanings
          // for non-zero values.  If a nonzero value is received and none of
          // the negotiated extensions defines the meaning of such a nonzero
          // value, the receiving endpoint MUST _Fail the WebSocket
          // Connection_.
          // This document allocates the RSV1 bit of the WebSocket header for
          // PMCEs and calls the bit the "Per-Message Compressed" bit.  On a
          // WebSocket connection where a PMCE is in use, this bit indicates
          // whether a message is compressed or not.
          if (rsv1 !== 0 && !this.#extensions.has('permessage-deflate')) {
            failWebsocketConnection(this.ws, 'Expected RSV1 to be clear.')
            return
          }

          if (rsv2 !== 0 || rsv3 !== 0) {
            failWebsocketConnection(this.ws, 'RSV1, RSV2, RSV3 must be clear')
            return
          }

          if (fragmented && !isTextBinaryFrame(opcode)) {
            // Only text and binary frames can be fragmented
            failWebsocketConnection(this.ws, 'Invalid frame type was fragmented.')
            return
          }

          // If we are already parsing a text/binary frame and do not receive either
          // a continuation frame or close frame, fail the connection.
          if (isTextBinaryFrame(opcode) && this.#fragments.length > 0) {
            failWebsocketConnection(this.ws, 'Expected continuation frame')
            return
          }

          if (this.#info.fragmented && fragmented) {
            // A fragmented frame can't be fragmented itself
            // NOTE(review): the message text does not describe this condition.
            failWebsocketConnection(this.ws, 'Fragmented frame exceeded 125 bytes.')
            return
          }

          // "All control frames MUST have a payload length of 125 bytes or less
          // and MUST NOT be fragmented."
          if ((payloadLength > 125 || fragmented) && isControlFrame(opcode)) {
            failWebsocketConnection(this.ws, 'Control frame either too large or fragmented')
            return
          }

          if (isContinuationFrame(opcode) && this.#fragments.length === 0 && !this.#info.compressed) {
            failWebsocketConnection(this.ws, 'Unexpected continuation frame')
            return
          }

          // 0-125 is the literal length; 126 and 127 announce a 16-bit or
          // 64-bit extended length field following the header.
          if (payloadLength <= 125) {
            this.#info.payloadLength = payloadLength
            this.#state = parserStates.READ_DATA
          } else if (payloadLength === 126) {
            this.#state = parserStates.PAYLOADLENGTH_16
          } else if (payloadLength === 127) {
            this.#state = parserStates.PAYLOADLENGTH_64
          }

          // The message type and compression flag are only taken from the
          // first (text/binary) frame so that continuation and control
          // frames do not overwrite them.
          if (isTextBinaryFrame(opcode)) {
            this.#info.binaryType = opcode
            this.#info.compressed = rsv1 !== 0
          }

          this.#info.opcode = opcode
          this.#info.masked = masked
          this.#info.fin = fin
          this.#info.fragmented = fragmented
        } else if (this.#state === parserStates.PAYLOADLENGTH_16) {
          if (this.#byteOffset < 2) {
            return callback()
          }

          const buffer = this.consume(2)

          this.#info.payloadLength = buffer.readUInt16BE(0)
          this.#state = parserStates.READ_DATA
        } else if (this.#state === parserStates.PAYLOADLENGTH_64) {
          if (this.#byteOffset < 8) {
            return callback()
          }

          const buffer = this.consume(8)
          const upper = buffer.readUInt32BE(0)
          const lower = buffer.readUInt32BE(4)

          // 2^31 is the maximum bytes an arraybuffer can contain
          // on 32-bit systems. Although, on 64-bit systems, this is
          // 2^53-1 bytes.
          // https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Errors/Invalid_array_length
          // https://source.chromium.org/chromium/chromium/src/+/main:v8/src/common/globals.h;drc=1946212ac0100668f14eb9e2843bdd846e510a1e;bpv=1;bpt=1;l=1275
          // https://source.chromium.org/chromium/chromium/src/+/main:v8/src/objects/js-array-buffer.h;l=34;drc=1946212ac0100668f14eb9e2843bdd846e510a1e
          //
          // The length is the 64-bit value upper * 2^32 + lower. Anything
          // with a non-zero upper word, or a lower word above 2^31 - 1, is
          // larger than the limit above and is rejected, so the accepted
          // length is always exactly `lower`.
          if (upper !== 0 || lower > 2 ** 31 - 1) {
            failWebsocketConnection(this.ws, 'Received payload length > 2^31 bytes.')
            return
          }

          this.#info.payloadLength = lower
          this.#state = parserStates.READ_DATA
        } else if (this.#state === parserStates.READ_DATA) {
          if (this.#byteOffset < this.#info.payloadLength) {
            return callback()
          }

          const body = this.consume(this.#info.payloadLength)

          if (isControlFrame(this.#info.opcode)) {
            // parseControlFrame returns false when parsing must stop
            // (close frame received or connection failed).
            this.#loop = this.parseControlFrame(body)
            this.#state = parserStates.INFO
          } else {
            if (!this.#info.compressed) {
              this.#fragments.push(body)

              // If the frame is not fragmented, a message has been received.
              // If the frame is fragmented, it will terminate with a fin bit set
              // and an opcode of 0 (continuation), therefore we handle that when
              // parsing continuation frames, not here.
              if (!this.#info.fragmented && this.#info.fin) {
                const fullMessage = Buffer.concat(this.#fragments)
                websocketMessageReceived(this.ws, this.#info.binaryType, fullMessage)
                this.#fragments.length = 0
              }

              this.#state = parserStates.INFO
            } else {
              this.#extensions.get('permessage-deflate').decompress(body, this.#info.fin, (error, data) => {
                if (error) {
                  // Use the UTF-8 byte length of the reason instead of its
                  // UTF-16 length; the two are equal for ASCII messages.
                  // NOTE(review): assumes the 4th argument is the reason's
                  // byte length - confirm against closeWebSocketConnection.
                  closeWebSocketConnection(this.ws, 1007, error.message, Buffer.byteLength(error.message))
                  return
                }

                this.#fragments.push(data)

                if (!this.#info.fin) {
                  // More frames of this message follow; resume parsing.
                  this.#state = parserStates.INFO
                  this.#loop = true
                  this.run(callback)
                  return
                }

                websocketMessageReceived(this.ws, this.#info.binaryType, Buffer.concat(this.#fragments))

                this.#loop = true
                this.#state = parserStates.INFO
                this.#fragments.length = 0
                this.run(callback)
              })

              // Pause the loop; the decompress callback above re-enters
              // run() with the same stream callback once it has a result.
              this.#loop = false
              break
            }
          }
        }
      }
    }

    /**
     * Take n bytes from the buffered Buffers
     * @param {number} n
     * @returns {Buffer}
     * @throws {Error} if fewer than n bytes are currently buffered
     */
    consume (n) {
      if (n > this.#byteOffset) {
        throw new Error('Called consume() before buffers satiated.')
      } else if (n === 0) {
        return emptyBuffer
      }

      // Fast path: the first chunk is exactly the requested size, so it can
      // be returned without copying.
      if (this.#buffers[0].length === n) {
        this.#byteOffset -= this.#buffers[0].length
        return this.#buffers.shift()
      }

      const buffer = Buffer.allocUnsafe(n)
      let offset = 0

      while (offset !== n) {
        const next = this.#buffers[0]
        const { length } = next

        if (length + offset === n) {
          // This chunk completes the request exactly.
          buffer.set(this.#buffers.shift(), offset)
          break
        } else if (length + offset > n) {
          // Only part of this chunk is needed; keep the remainder queued.
          buffer.set(next.subarray(0, n - offset), offset)
          this.#buffers[0] = next.subarray(n - offset)
          break
        } else {
          // The whole chunk is needed and more must follow.
          buffer.set(this.#buffers.shift(), offset)
          offset += next.length
        }
      }

      this.#byteOffset -= n

      return buffer
    }

    /**
     * Decodes the payload of a close frame.
     * @param {Buffer} data close frame payload; must not be exactly 1 byte
     * @returns {{ code: number|undefined, reason: string, error: boolean }}
     *   `error` is true when the status code or the UTF-8 reason is invalid;
     *   `code`/`reason` then hold the values to close the connection with.
     */
    parseCloseBody (data) {
      assert(data.length !== 1)

      // https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.5
      /** @type {number|undefined} */
      let code

      if (data.length >= 2) {
        // _The WebSocket Connection Close Code_ is
        // defined as the status code (Section 7.4) contained in the first Close
        // control frame received by the application
        code = data.readUInt16BE(0)
      }

      if (code !== undefined && !isValidStatusCode(code)) {
        return { code: 1002, reason: 'Invalid status code', error: true }
      }

      // https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.6
      /** @type {Buffer} */
      let reason = data.subarray(2)

      // Remove BOM
      if (reason[0] === 0xEF && reason[1] === 0xBB && reason[2] === 0xBF) {
        reason = reason.subarray(3)
      }

      try {
        reason = utf8Decode(reason)
      } catch {
        return { code: 1007, reason: 'Invalid UTF-8', error: true }
      }

      return { code, reason, error: false }
    }

    /**
     * Parses control frames.
     * @param {Buffer} body
     * @returns {boolean} true if the parsing loop should continue, false if
     *   it must stop (close frame received or connection failed)
     */
    parseControlFrame (body) {
      const { opcode, payloadLength } = this.#info

      if (opcode === opcodes.CLOSE) {
        if (payloadLength === 1) {
          failWebsocketConnection(this.ws, 'Received close frame with a 1-byte body.')
          return false
        }

        this.#info.closeInfo = this.parseCloseBody(body)

        if (this.#info.closeInfo.error) {
          const { code, reason } = this.#info.closeInfo

          closeWebSocketConnection(this.ws, code, reason, reason.length)
          failWebsocketConnection(this.ws, reason)
          return false
        }

        if (this.ws[kSentClose] !== sentCloseFrameState.SENT) {
          // If an endpoint receives a Close frame and did not previously send a
          // Close frame, the endpoint MUST send a Close frame in response.  (When
          // sending a Close frame in response, the endpoint typically echos the
          // status code it received.)
          let echoBody = emptyBuffer
          if (this.#info.closeInfo.code) {
            echoBody = Buffer.allocUnsafe(2)
            echoBody.writeUInt16BE(this.#info.closeInfo.code, 0)
          }
          const closeFrame = new WebsocketFrameSend(echoBody)

          this.ws[kResponse].socket.write(
            closeFrame.createFrame(opcodes.CLOSE),
            (err) => {
              // Only record the close frame as sent once the write succeeded.
              if (!err) {
                this.ws[kSentClose] = sentCloseFrameState.SENT
              }
            }
          )
        }

        // Upon either sending or receiving a Close control frame, it is said
        // that _The WebSocket Closing Handshake is Started_ and that the
        // WebSocket connection is in the CLOSING state.
        this.ws[kReadyState] = states.CLOSING
        this.ws[kReceivedClose] = true

        return false
      } else if (opcode === opcodes.PING) {
        // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
        // response, unless it already received a Close frame.
        // A Pong frame sent in response to a Ping frame must have identical
        // "Application data"

        if (!this.ws[kReceivedClose]) {
          const frame = new WebsocketFrameSend(body)

          this.ws[kResponse].socket.write(frame.createFrame(opcodes.PONG))

          if (channels.ping.hasSubscribers) {
            channels.ping.publish({
              payload: body
            })
          }
        }
      } else if (opcode === opcodes.PONG) {
        // A Pong frame MAY be sent unsolicited.  This serves as a
        // unidirectional heartbeat.  A response to an unsolicited Pong frame is
        // not expected.

        if (channels.pong.hasSubscribers) {
          channels.pong.publish({
            payload: body
          })
        }
      }

      return true
    }

    /**
     * Result of `parseCloseBody` for the most recently parsed close frame,
     * or undefined if no close frame has been parsed.
     */
    get closingInfo () {
      return this.#info.closeInfo
    }
  }
  349. module.exports = {
  350. ByteParser
  351. }