sender.js 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602
  1. /* eslint no-unused-vars: ["error", { "varsIgnorePattern": "^Duplex" }] */
  2. 'use strict';
  3. const { Duplex } = require('stream');
  4. const { randomFillSync } = require('crypto');
  5. const PerMessageDeflate = require('./permessage-deflate');
  6. const { EMPTY_BUFFER, kWebSocket, NOOP } = require('./constants');
  7. const { isBlob, isValidStatusCode } = require('./validation');
  8. const { mask: applyMask, toBuffer } = require('./buffer-util');
  9. const kByteLength = Symbol('kByteLength');
  10. const maskBuffer = Buffer.alloc(4);
  11. const RANDOM_POOL_SIZE = 8 * 1024;
  12. let randomPool;
  13. let randomPoolPointer = RANDOM_POOL_SIZE;
  14. const DEFAULT = 0;
  15. const DEFLATING = 1;
  16. const GET_BLOB_DATA = 2;
  17. /**
  18. * HyBi Sender implementation.
  19. */
  20. class Sender {
  21. /**
  22. * Creates a Sender instance.
  23. *
  24. * @param {Duplex} socket The connection socket
  25. * @param {Object} [extensions] An object containing the negotiated extensions
  26. * @param {Function} [generateMask] The function used to generate the masking
  27. * key
  28. */
  29. constructor(socket, extensions, generateMask) {
  30. this._extensions = extensions || {};
  31. if (generateMask) {
  32. this._generateMask = generateMask;
  33. this._maskBuffer = Buffer.alloc(4);
  34. }
  35. this._socket = socket;
  36. this._firstFragment = true;
  37. this._compress = false;
  38. this._bufferedBytes = 0;
  39. this._queue = [];
  40. this._state = DEFAULT;
  41. this.onerror = NOOP;
  42. this[kWebSocket] = undefined;
  43. }
  44. /**
  45. * Frames a piece of data according to the HyBi WebSocket protocol.
  46. *
  47. * @param {(Buffer|String)} data The data to frame
  48. * @param {Object} options Options object
  49. * @param {Boolean} [options.fin=false] Specifies whether or not to set the
  50. * FIN bit
  51. * @param {Function} [options.generateMask] The function used to generate the
  52. * masking key
  53. * @param {Boolean} [options.mask=false] Specifies whether or not to mask
  54. * `data`
  55. * @param {Buffer} [options.maskBuffer] The buffer used to store the masking
  56. * key
  57. * @param {Number} options.opcode The opcode
  58. * @param {Boolean} [options.readOnly=false] Specifies whether `data` can be
  59. * modified
  60. * @param {Boolean} [options.rsv1=false] Specifies whether or not to set the
  61. * RSV1 bit
  62. * @return {(Buffer|String)[]} The framed data
  63. * @public
  64. */
  65. static frame(data, options) {
  66. let mask;
  67. let merge = false;
  68. let offset = 2;
  69. let skipMasking = false;
  70. if (options.mask) {
  71. mask = options.maskBuffer || maskBuffer;
  72. if (options.generateMask) {
  73. options.generateMask(mask);
  74. } else {
  75. if (randomPoolPointer === RANDOM_POOL_SIZE) {
  76. /* istanbul ignore else */
  77. if (randomPool === undefined) {
  78. //
  79. // This is lazily initialized because server-sent frames must not
  80. // be masked so it may never be used.
  81. //
  82. randomPool = Buffer.alloc(RANDOM_POOL_SIZE);
  83. }
  84. randomFillSync(randomPool, 0, RANDOM_POOL_SIZE);
  85. randomPoolPointer = 0;
  86. }
  87. mask[0] = randomPool[randomPoolPointer++];
  88. mask[1] = randomPool[randomPoolPointer++];
  89. mask[2] = randomPool[randomPoolPointer++];
  90. mask[3] = randomPool[randomPoolPointer++];
  91. }
  92. skipMasking = (mask[0] | mask[1] | mask[2] | mask[3]) === 0;
  93. offset = 6;
  94. }
  95. let dataLength;
  96. if (typeof data === 'string') {
  97. if (
  98. (!options.mask || skipMasking) &&
  99. options[kByteLength] !== undefined
  100. ) {
  101. dataLength = options[kByteLength];
  102. } else {
  103. data = Buffer.from(data);
  104. dataLength = data.length;
  105. }
  106. } else {
  107. dataLength = data.length;
  108. merge = options.mask && options.readOnly && !skipMasking;
  109. }
  110. let payloadLength = dataLength;
  111. if (dataLength >= 65536) {
  112. offset += 8;
  113. payloadLength = 127;
  114. } else if (dataLength > 125) {
  115. offset += 2;
  116. payloadLength = 126;
  117. }
  118. const target = Buffer.allocUnsafe(merge ? dataLength + offset : offset);
  119. target[0] = options.fin ? options.opcode | 0x80 : options.opcode;
  120. if (options.rsv1) target[0] |= 0x40;
  121. target[1] = payloadLength;
  122. if (payloadLength === 126) {
  123. target.writeUInt16BE(dataLength, 2);
  124. } else if (payloadLength === 127) {
  125. target[2] = target[3] = 0;
  126. target.writeUIntBE(dataLength, 4, 6);
  127. }
  128. if (!options.mask) return [target, data];
  129. target[1] |= 0x80;
  130. target[offset - 4] = mask[0];
  131. target[offset - 3] = mask[1];
  132. target[offset - 2] = mask[2];
  133. target[offset - 1] = mask[3];
  134. if (skipMasking) return [target, data];
  135. if (merge) {
  136. applyMask(data, mask, target, offset, dataLength);
  137. return [target];
  138. }
  139. applyMask(data, mask, data, 0, dataLength);
  140. return [target, data];
  141. }
  142. /**
  143. * Sends a close message to the other peer.
  144. *
  145. * @param {Number} [code] The status code component of the body
  146. * @param {(String|Buffer)} [data] The message component of the body
  147. * @param {Boolean} [mask=false] Specifies whether or not to mask the message
  148. * @param {Function} [cb] Callback
  149. * @public
  150. */
  151. close(code, data, mask, cb) {
  152. let buf;
  153. if (code === undefined) {
  154. buf = EMPTY_BUFFER;
  155. } else if (typeof code !== 'number' || !isValidStatusCode(code)) {
  156. throw new TypeError('First argument must be a valid error code number');
  157. } else if (data === undefined || !data.length) {
  158. buf = Buffer.allocUnsafe(2);
  159. buf.writeUInt16BE(code, 0);
  160. } else {
  161. const length = Buffer.byteLength(data);
  162. if (length > 123) {
  163. throw new RangeError('The message must not be greater than 123 bytes');
  164. }
  165. buf = Buffer.allocUnsafe(2 + length);
  166. buf.writeUInt16BE(code, 0);
  167. if (typeof data === 'string') {
  168. buf.write(data, 2);
  169. } else {
  170. buf.set(data, 2);
  171. }
  172. }
  173. const options = {
  174. [kByteLength]: buf.length,
  175. fin: true,
  176. generateMask: this._generateMask,
  177. mask,
  178. maskBuffer: this._maskBuffer,
  179. opcode: 0x08,
  180. readOnly: false,
  181. rsv1: false
  182. };
  183. if (this._state !== DEFAULT) {
  184. this.enqueue([this.dispatch, buf, false, options, cb]);
  185. } else {
  186. this.sendFrame(Sender.frame(buf, options), cb);
  187. }
  188. }
  189. /**
  190. * Sends a ping message to the other peer.
  191. *
  192. * @param {*} data The message to send
  193. * @param {Boolean} [mask=false] Specifies whether or not to mask `data`
  194. * @param {Function} [cb] Callback
  195. * @public
  196. */
  197. ping(data, mask, cb) {
  198. let byteLength;
  199. let readOnly;
  200. if (typeof data === 'string') {
  201. byteLength = Buffer.byteLength(data);
  202. readOnly = false;
  203. } else if (isBlob(data)) {
  204. byteLength = data.size;
  205. readOnly = false;
  206. } else {
  207. data = toBuffer(data);
  208. byteLength = data.length;
  209. readOnly = toBuffer.readOnly;
  210. }
  211. if (byteLength > 125) {
  212. throw new RangeError('The data size must not be greater than 125 bytes');
  213. }
  214. const options = {
  215. [kByteLength]: byteLength,
  216. fin: true,
  217. generateMask: this._generateMask,
  218. mask,
  219. maskBuffer: this._maskBuffer,
  220. opcode: 0x09,
  221. readOnly,
  222. rsv1: false
  223. };
  224. if (isBlob(data)) {
  225. if (this._state !== DEFAULT) {
  226. this.enqueue([this.getBlobData, data, false, options, cb]);
  227. } else {
  228. this.getBlobData(data, false, options, cb);
  229. }
  230. } else if (this._state !== DEFAULT) {
  231. this.enqueue([this.dispatch, data, false, options, cb]);
  232. } else {
  233. this.sendFrame(Sender.frame(data, options), cb);
  234. }
  235. }
  236. /**
  237. * Sends a pong message to the other peer.
  238. *
  239. * @param {*} data The message to send
  240. * @param {Boolean} [mask=false] Specifies whether or not to mask `data`
  241. * @param {Function} [cb] Callback
  242. * @public
  243. */
  244. pong(data, mask, cb) {
  245. let byteLength;
  246. let readOnly;
  247. if (typeof data === 'string') {
  248. byteLength = Buffer.byteLength(data);
  249. readOnly = false;
  250. } else if (isBlob(data)) {
  251. byteLength = data.size;
  252. readOnly = false;
  253. } else {
  254. data = toBuffer(data);
  255. byteLength = data.length;
  256. readOnly = toBuffer.readOnly;
  257. }
  258. if (byteLength > 125) {
  259. throw new RangeError('The data size must not be greater than 125 bytes');
  260. }
  261. const options = {
  262. [kByteLength]: byteLength,
  263. fin: true,
  264. generateMask: this._generateMask,
  265. mask,
  266. maskBuffer: this._maskBuffer,
  267. opcode: 0x0a,
  268. readOnly,
  269. rsv1: false
  270. };
  271. if (isBlob(data)) {
  272. if (this._state !== DEFAULT) {
  273. this.enqueue([this.getBlobData, data, false, options, cb]);
  274. } else {
  275. this.getBlobData(data, false, options, cb);
  276. }
  277. } else if (this._state !== DEFAULT) {
  278. this.enqueue([this.dispatch, data, false, options, cb]);
  279. } else {
  280. this.sendFrame(Sender.frame(data, options), cb);
  281. }
  282. }
  283. /**
  284. * Sends a data message to the other peer.
  285. *
  286. * @param {*} data The message to send
  287. * @param {Object} options Options object
  288. * @param {Boolean} [options.binary=false] Specifies whether `data` is binary
  289. * or text
  290. * @param {Boolean} [options.compress=false] Specifies whether or not to
  291. * compress `data`
  292. * @param {Boolean} [options.fin=false] Specifies whether the fragment is the
  293. * last one
  294. * @param {Boolean} [options.mask=false] Specifies whether or not to mask
  295. * `data`
  296. * @param {Function} [cb] Callback
  297. * @public
  298. */
  299. send(data, options, cb) {
  300. const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
  301. let opcode = options.binary ? 2 : 1;
  302. let rsv1 = options.compress;
  303. let byteLength;
  304. let readOnly;
  305. if (typeof data === 'string') {
  306. byteLength = Buffer.byteLength(data);
  307. readOnly = false;
  308. } else if (isBlob(data)) {
  309. byteLength = data.size;
  310. readOnly = false;
  311. } else {
  312. data = toBuffer(data);
  313. byteLength = data.length;
  314. readOnly = toBuffer.readOnly;
  315. }
  316. if (this._firstFragment) {
  317. this._firstFragment = false;
  318. if (
  319. rsv1 &&
  320. perMessageDeflate &&
  321. perMessageDeflate.params[
  322. perMessageDeflate._isServer
  323. ? 'server_no_context_takeover'
  324. : 'client_no_context_takeover'
  325. ]
  326. ) {
  327. rsv1 = byteLength >= perMessageDeflate._threshold;
  328. }
  329. this._compress = rsv1;
  330. } else {
  331. rsv1 = false;
  332. opcode = 0;
  333. }
  334. if (options.fin) this._firstFragment = true;
  335. const opts = {
  336. [kByteLength]: byteLength,
  337. fin: options.fin,
  338. generateMask: this._generateMask,
  339. mask: options.mask,
  340. maskBuffer: this._maskBuffer,
  341. opcode,
  342. readOnly,
  343. rsv1
  344. };
  345. if (isBlob(data)) {
  346. if (this._state !== DEFAULT) {
  347. this.enqueue([this.getBlobData, data, this._compress, opts, cb]);
  348. } else {
  349. this.getBlobData(data, this._compress, opts, cb);
  350. }
  351. } else if (this._state !== DEFAULT) {
  352. this.enqueue([this.dispatch, data, this._compress, opts, cb]);
  353. } else {
  354. this.dispatch(data, this._compress, opts, cb);
  355. }
  356. }
  357. /**
  358. * Gets the contents of a blob as binary data.
  359. *
  360. * @param {Blob} blob The blob
  361. * @param {Boolean} [compress=false] Specifies whether or not to compress
  362. * the data
  363. * @param {Object} options Options object
  364. * @param {Boolean} [options.fin=false] Specifies whether or not to set the
  365. * FIN bit
  366. * @param {Function} [options.generateMask] The function used to generate the
  367. * masking key
  368. * @param {Boolean} [options.mask=false] Specifies whether or not to mask
  369. * `data`
  370. * @param {Buffer} [options.maskBuffer] The buffer used to store the masking
  371. * key
  372. * @param {Number} options.opcode The opcode
  373. * @param {Boolean} [options.readOnly=false] Specifies whether `data` can be
  374. * modified
  375. * @param {Boolean} [options.rsv1=false] Specifies whether or not to set the
  376. * RSV1 bit
  377. * @param {Function} [cb] Callback
  378. * @private
  379. */
  380. getBlobData(blob, compress, options, cb) {
  381. this._bufferedBytes += options[kByteLength];
  382. this._state = GET_BLOB_DATA;
  383. blob
  384. .arrayBuffer()
  385. .then((arrayBuffer) => {
  386. if (this._socket.destroyed) {
  387. const err = new Error(
  388. 'The socket was closed while the blob was being read'
  389. );
  390. //
  391. // `callCallbacks` is called in the next tick to ensure that errors
  392. // that might be thrown in the callbacks behave like errors thrown
  393. // outside the promise chain.
  394. //
  395. process.nextTick(callCallbacks, this, err, cb);
  396. return;
  397. }
  398. this._bufferedBytes -= options[kByteLength];
  399. const data = toBuffer(arrayBuffer);
  400. if (!compress) {
  401. this._state = DEFAULT;
  402. this.sendFrame(Sender.frame(data, options), cb);
  403. this.dequeue();
  404. } else {
  405. this.dispatch(data, compress, options, cb);
  406. }
  407. })
  408. .catch((err) => {
  409. //
  410. // `onError` is called in the next tick for the same reason that
  411. // `callCallbacks` above is.
  412. //
  413. process.nextTick(onError, this, err, cb);
  414. });
  415. }
  416. /**
  417. * Dispatches a message.
  418. *
  419. * @param {(Buffer|String)} data The message to send
  420. * @param {Boolean} [compress=false] Specifies whether or not to compress
  421. * `data`
  422. * @param {Object} options Options object
  423. * @param {Boolean} [options.fin=false] Specifies whether or not to set the
  424. * FIN bit
  425. * @param {Function} [options.generateMask] The function used to generate the
  426. * masking key
  427. * @param {Boolean} [options.mask=false] Specifies whether or not to mask
  428. * `data`
  429. * @param {Buffer} [options.maskBuffer] The buffer used to store the masking
  430. * key
  431. * @param {Number} options.opcode The opcode
  432. * @param {Boolean} [options.readOnly=false] Specifies whether `data` can be
  433. * modified
  434. * @param {Boolean} [options.rsv1=false] Specifies whether or not to set the
  435. * RSV1 bit
  436. * @param {Function} [cb] Callback
  437. * @private
  438. */
  439. dispatch(data, compress, options, cb) {
  440. if (!compress) {
  441. this.sendFrame(Sender.frame(data, options), cb);
  442. return;
  443. }
  444. const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
  445. this._bufferedBytes += options[kByteLength];
  446. this._state = DEFLATING;
  447. perMessageDeflate.compress(data, options.fin, (_, buf) => {
  448. if (this._socket.destroyed) {
  449. const err = new Error(
  450. 'The socket was closed while data was being compressed'
  451. );
  452. callCallbacks(this, err, cb);
  453. return;
  454. }
  455. this._bufferedBytes -= options[kByteLength];
  456. this._state = DEFAULT;
  457. options.readOnly = false;
  458. this.sendFrame(Sender.frame(buf, options), cb);
  459. this.dequeue();
  460. });
  461. }
  462. /**
  463. * Executes queued send operations.
  464. *
  465. * @private
  466. */
  467. dequeue() {
  468. while (this._state === DEFAULT && this._queue.length) {
  469. const params = this._queue.shift();
  470. this._bufferedBytes -= params[3][kByteLength];
  471. Reflect.apply(params[0], this, params.slice(1));
  472. }
  473. }
  474. /**
  475. * Enqueues a send operation.
  476. *
  477. * @param {Array} params Send operation parameters.
  478. * @private
  479. */
  480. enqueue(params) {
  481. this._bufferedBytes += params[3][kByteLength];
  482. this._queue.push(params);
  483. }
  484. /**
  485. * Sends a frame.
  486. *
  487. * @param {Buffer[]} list The frame to send
  488. * @param {Function} [cb] Callback
  489. * @private
  490. */
  491. sendFrame(list, cb) {
  492. if (list.length === 2) {
  493. this._socket.cork();
  494. this._socket.write(list[0]);
  495. this._socket.write(list[1], cb);
  496. this._socket.uncork();
  497. } else {
  498. this._socket.write(list[0], cb);
  499. }
  500. }
  501. }
  502. module.exports = Sender;
  503. /**
  504. * Calls queued callbacks with an error.
  505. *
  506. * @param {Sender} sender The `Sender` instance
  507. * @param {Error} err The error to call the callbacks with
  508. * @param {Function} [cb] The first callback
  509. * @private
  510. */
  511. function callCallbacks(sender, err, cb) {
  512. if (typeof cb === 'function') cb(err);
  513. for (let i = 0; i < sender._queue.length; i++) {
  514. const params = sender._queue[i];
  515. const callback = params[params.length - 1];
  516. if (typeof callback === 'function') callback(err);
  517. }
  518. }
  519. /**
  520. * Handles a `Sender` error.
  521. *
  522. * @param {Sender} sender The `Sender` instance
  523. * @param {Error} err The error
  524. * @param {Function} [cb] The first pending callback
  525. * @private
  526. */
  527. function onError(sender, err, cb) {
  528. callCallbacks(sender, err, cb);
  529. sender.onerror(err);
  530. }