index.ts 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. import { SocketOptions, Socket, TlsOptions } from 'cloudflare:sockets'
  2. import { EventEmitter } from 'events'
  3. /**
  4. * Wrapper around the Cloudflare built-in socket that can be used by the `Connection`.
  5. */
  6. export class CloudflareSocket extends EventEmitter {
  7. writable = false
  8. destroyed = false
  9. private _upgrading = false
  10. private _upgraded = false
  11. private _cfSocket: Socket | null = null
  12. private _cfWriter: WritableStreamDefaultWriter | null = null
  13. private _cfReader: ReadableStreamDefaultReader | null = null
  14. constructor(readonly ssl: boolean) {
  15. super()
  16. }
  17. setNoDelay() {
  18. return this
  19. }
  20. setKeepAlive() {
  21. return this
  22. }
  23. ref() {
  24. return this
  25. }
  26. unref() {
  27. return this
  28. }
  29. async connect(port: number, host: string, connectListener?: (...args: unknown[]) => void) {
  30. try {
  31. log('connecting')
  32. if (connectListener) this.once('connect', connectListener)
  33. const options: SocketOptions = this.ssl ? { secureTransport: 'starttls' } : {}
  34. const { connect } = await import('cloudflare:sockets')
  35. this._cfSocket = connect(`${host}:${port}`, options)
  36. this._cfWriter = this._cfSocket.writable.getWriter()
  37. this._addClosedHandler()
  38. this._cfReader = this._cfSocket.readable.getReader()
  39. if (this.ssl) {
  40. this._listenOnce().catch((e) => this.emit('error', e))
  41. } else {
  42. this._listen().catch((e) => this.emit('error', e))
  43. }
  44. await this._cfWriter!.ready
  45. log('socket ready')
  46. this.writable = true
  47. this.emit('connect')
  48. return this
  49. } catch (e) {
  50. this.emit('error', e)
  51. }
  52. }
  53. async _listen() {
  54. while (true) {
  55. log('awaiting receive from CF socket')
  56. const { done, value } = await this._cfReader!.read()
  57. log('CF socket received:', done, value)
  58. if (done) {
  59. log('done')
  60. break
  61. }
  62. this.emit('data', Buffer.from(value))
  63. }
  64. }
  65. async _listenOnce() {
  66. log('awaiting first receive from CF socket')
  67. const { done, value } = await this._cfReader!.read()
  68. log('First CF socket received:', done, value)
  69. this.emit('data', Buffer.from(value))
  70. }
  71. write(
  72. data: Uint8Array | string,
  73. encoding: BufferEncoding = 'utf8',
  74. callback: (...args: unknown[]) => void = () => {}
  75. ) {
  76. if (data.length === 0) return callback()
  77. if (typeof data === 'string') data = Buffer.from(data, encoding)
  78. log('sending data direct:', data)
  79. this._cfWriter!.write(data).then(
  80. () => {
  81. log('data sent')
  82. callback()
  83. },
  84. (err) => {
  85. log('send error', err)
  86. callback(err)
  87. }
  88. )
  89. return true
  90. }
  91. end(data = Buffer.alloc(0), encoding: BufferEncoding = 'utf8', callback: (...args: unknown[]) => void = () => {}) {
  92. log('ending CF socket')
  93. this.write(data, encoding, (err) => {
  94. this._cfSocket!.close()
  95. if (callback) callback(err)
  96. })
  97. return this
  98. }
  99. destroy(reason: string) {
  100. log('destroying CF socket', reason)
  101. this.destroyed = true
  102. return this.end()
  103. }
  104. startTls(options: TlsOptions) {
  105. if (this._upgraded) {
  106. // Don't try to upgrade again.
  107. this.emit('error', 'Cannot call `startTls()` more than once on a socket')
  108. return
  109. }
  110. this._cfWriter!.releaseLock()
  111. this._cfReader!.releaseLock()
  112. this._upgrading = true
  113. this._cfSocket = this._cfSocket!.startTls(options)
  114. this._cfWriter = this._cfSocket.writable.getWriter()
  115. this._cfReader = this._cfSocket.readable.getReader()
  116. this._addClosedHandler()
  117. this._listen().catch((e) => this.emit('error', e))
  118. }
  119. _addClosedHandler() {
  120. this._cfSocket!.closed.then(() => {
  121. if (!this._upgrading) {
  122. log('CF socket closed')
  123. this._cfSocket = null
  124. this.emit('close')
  125. } else {
  126. this._upgrading = false
  127. this._upgraded = true
  128. }
  129. }).catch((e) => this.emit('error', e))
  130. }
  131. }
  132. const debug = false
  133. function dump(data: unknown) {
  134. if (data instanceof Uint8Array || data instanceof ArrayBuffer) {
  135. const hex = Buffer.from(data).toString('hex')
  136. const str = new TextDecoder().decode(data)
  137. return `\n>>> STR: "${str.replace(/\n/g, '\\n')}"\n>>> HEX: ${hex}\n`
  138. } else {
  139. return data
  140. }
  141. }
  142. function log(...args: unknown[]) {
  143. debug && console.log(...args.map(dump))
  144. }