sender.js 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. 'use strict'
  2. const { WebsocketFrameSend } = require('./frame')
  3. const { opcodes, sendHints } = require('./constants')
  4. const FixedQueue = require('../../dispatcher/fixed-queue')
  5. /** @type {typeof Uint8Array} */
  6. const FastBuffer = Buffer[Symbol.species]
  7. /**
  8. * @typedef {object} SendQueueNode
  9. * @property {Promise<void> | null} promise
  10. * @property {((...args: any[]) => any)} callback
  11. * @property {Buffer | null} frame
  12. */
  /**
   * Serialises outgoing WebSocket payloads onto a socket.
   *
   * Payloads whose frame can be built synchronously are written straight to
   * the socket while nothing is pending. Blob payloads have to be read
   * asynchronously first, so they are queued, and anything added while the
   * queue is draining is queued behind them instead of being written directly.
   */
  class SendQueue {
    /**
     * Entries waiting to be written, consumed by `#run` via `shift()`.
     * @type {FixedQueue}
     */
    #queue = new FixedQueue()

    /**
     * True while `#run` is draining the queue.
     * @type {boolean}
     */
    #running = false

    /** @type {import('node:net').Socket} */
    #socket

    /**
     * @param {import('node:net').Socket} socket Destination of every frame.
     */
    constructor (socket) {
      this.#socket = socket
    }

    /**
     * Sends `item`, or schedules it to be sent.
     *
     * @param {*} item Payload; for `sendHints.blob` it must expose `arrayBuffer()`.
     * @param {(...args: any[]) => any} cb Passed to `socket.write` as the write
     *   callback. It is called with the error instead when the payload of a
     *   queued entry cannot be read or written.
     * @param {*} hint One of the `sendHints` values describing `item`.
     * @returns {void}
     */
    add (item, cb, hint) {
      if (hint !== sendHints.blob) {
        const frame = createFrame(item, hint)
        if (!this.#running) {
          // fast-path: nothing is pending, write immediately
          this.#socket.write(frame, cb)
        } else {
          /** @type {SendQueueNode} */
          const node = {
            promise: null,
            callback: cb,
            frame
          }
          this.#queue.push(node)
        }
        return
      }

      /** @type {SendQueueNode} */
      const node = {
        // The frame is filled in once the blob's bytes are available; `#run`
        // awaits this promise before writing the entry.
        promise: item.arrayBuffer().then((ab) => {
          node.promise = null
          node.frame = createFrame(ab, hint)
        }),
        callback: cb,
        frame: null
      }

      this.#queue.push(node)

      if (!this.#running) {
        this.#run()
      }
    }

    /**
     * Drains the queue, waiting for each entry's pending promise before
     * writing its frame.
     *
     * A failure to prepare or write one entry is reported to that entry's
     * callback and the remaining entries are still processed. `#running` is
     * always reset, so one failed entry cannot stall every later `add`.
     *
     * @returns {Promise<void>}
     */
    async #run () {
      this.#running = true
      const queue = this.#queue
      try {
        while (!queue.isEmpty()) {
          const node = queue.shift()
          try {
            // wait pending promise
            if (node.promise !== null) {
              await node.promise
            }
            // write
            this.#socket.write(node.frame, node.callback)
          } catch (err) {
            // The frame could not be produced or written: tell the caller
            // rather than leaving an unhandled rejection behind.
            if (typeof node.callback === 'function') {
              node.callback(err)
            }
          } finally {
            // cleanup
            node.callback = node.frame = null
          }
        }
      } finally {
        this.#running = false
      }
    }
  }
  /**
   * Wraps a payload in a WebSocket frame.
   *
   * The payload is first converted with `toBuffer`; the frame is emitted with
   * the TEXT opcode when `hint` is `sendHints.string` and with the BINARY
   * opcode for every other hint.
   *
   * @param {*} data Payload in the representation described by `hint`.
   * @param {*} hint One of the `sendHints` values.
   * @returns {*} Whatever `WebsocketFrameSend#createFrame` returns
   *   (presumably a Buffer, per the `SendQueueNode` typedef — confirm in ./frame).
   */
  function createFrame (data, hint) {
    const payload = toBuffer(data, hint)
    const sender = new WebsocketFrameSend(payload)
    const isText = hint === sendHints.string
    return sender.createFrame(isText ? opcodes.TEXT : opcodes.BINARY)
  }
  /**
   * Converts an outgoing payload to a byte buffer according to its hint.
   *
   * - `sendHints.string`: the string is encoded into a new Buffer.
   * - `sendHints.arrayBuffer` / `sendHints.blob`: `data` is wrapped in a
   *   `FastBuffer` (for blobs the caller passes the already-read ArrayBuffer).
   * - `sendHints.typedArray`: a `FastBuffer` over the view's own byte range,
   *   honouring `byteOffset` and `byteLength`.
   *
   * @param {*} data Payload in the representation described by `hint`.
   * @param {*} hint One of the `sendHints` values.
   * @returns {Uint8Array} The payload bytes.
   * @throws {TypeError} If `hint` is not one of the handled `sendHints` values.
   */
  function toBuffer (data, hint) {
    switch (hint) {
      case sendHints.string:
        return Buffer.from(data)
      case sendHints.arrayBuffer:
      case sendHints.blob:
        return new FastBuffer(data)
      case sendHints.typedArray:
        return new FastBuffer(data.buffer, data.byteOffset, data.byteLength)
      default:
        // Falling through used to return undefined, which only surfaced later
        // as a frame without the intended payload; fail at the source instead.
        throw new TypeError(`Unknown send hint: ${String(hint)}`)
    }
  }
  89. module.exports = { SendQueue }