connection.js 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. 'use strict'
  2. var EventEmitter = require('events').EventEmitter
  3. const { parse, serialize } = require('pg-protocol')
  4. const { getStream, getSecureStream } = require('./stream')
  5. const flushBuffer = serialize.flush()
  6. const syncBuffer = serialize.sync()
  7. const endBuffer = serialize.end()
  8. // TODO(bmc) support binary mode at some point
  9. class Connection extends EventEmitter {
  10. constructor(config) {
  11. super()
  12. config = config || {}
  13. this.stream = config.stream || getStream(config.ssl)
  14. if (typeof this.stream === 'function') {
  15. this.stream = this.stream(config)
  16. }
  17. this._keepAlive = config.keepAlive
  18. this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis
  19. this.lastBuffer = false
  20. this.parsedStatements = {}
  21. this.ssl = config.ssl || false
  22. this._ending = false
  23. this._emitMessage = false
  24. var self = this
  25. this.on('newListener', function (eventName) {
  26. if (eventName === 'message') {
  27. self._emitMessage = true
  28. }
  29. })
  30. }
  31. connect(port, host) {
  32. var self = this
  33. this._connecting = true
  34. this.stream.setNoDelay(true)
  35. this.stream.connect(port, host)
  36. this.stream.once('connect', function () {
  37. if (self._keepAlive) {
  38. self.stream.setKeepAlive(true, self._keepAliveInitialDelayMillis)
  39. }
  40. self.emit('connect')
  41. })
  42. const reportStreamError = function (error) {
  43. // errors about disconnections should be ignored during disconnect
  44. if (self._ending && (error.code === 'ECONNRESET' || error.code === 'EPIPE')) {
  45. return
  46. }
  47. self.emit('error', error)
  48. }
  49. this.stream.on('error', reportStreamError)
  50. this.stream.on('close', function () {
  51. self.emit('end')
  52. })
  53. if (!this.ssl) {
  54. return this.attachListeners(this.stream)
  55. }
  56. this.stream.once('data', function (buffer) {
  57. var responseCode = buffer.toString('utf8')
  58. switch (responseCode) {
  59. case 'S': // Server supports SSL connections, continue with a secure connection
  60. break
  61. case 'N': // Server does not support SSL connections
  62. self.stream.end()
  63. return self.emit('error', new Error('The server does not support SSL connections'))
  64. default:
  65. // Any other response byte, including 'E' (ErrorResponse) indicating a server error
  66. self.stream.end()
  67. return self.emit('error', new Error('There was an error establishing an SSL connection'))
  68. }
  69. const options = {
  70. socket: self.stream,
  71. }
  72. if (self.ssl !== true) {
  73. Object.assign(options, self.ssl)
  74. if ('key' in self.ssl) {
  75. options.key = self.ssl.key
  76. }
  77. }
  78. var net = require('net')
  79. if (net.isIP && net.isIP(host) === 0) {
  80. options.servername = host
  81. }
  82. try {
  83. self.stream = getSecureStream(options)
  84. } catch (err) {
  85. return self.emit('error', err)
  86. }
  87. self.attachListeners(self.stream)
  88. self.stream.on('error', reportStreamError)
  89. self.emit('sslconnect')
  90. })
  91. }
  92. attachListeners(stream) {
  93. parse(stream, (msg) => {
  94. var eventName = msg.name === 'error' ? 'errorMessage' : msg.name
  95. if (this._emitMessage) {
  96. this.emit('message', msg)
  97. }
  98. this.emit(eventName, msg)
  99. })
  100. }
  101. requestSsl() {
  102. this.stream.write(serialize.requestSsl())
  103. }
  104. startup(config) {
  105. this.stream.write(serialize.startup(config))
  106. }
  107. cancel(processID, secretKey) {
  108. this._send(serialize.cancel(processID, secretKey))
  109. }
  110. password(password) {
  111. this._send(serialize.password(password))
  112. }
  113. sendSASLInitialResponseMessage(mechanism, initialResponse) {
  114. this._send(serialize.sendSASLInitialResponseMessage(mechanism, initialResponse))
  115. }
  116. sendSCRAMClientFinalMessage(additionalData) {
  117. this._send(serialize.sendSCRAMClientFinalMessage(additionalData))
  118. }
  119. _send(buffer) {
  120. if (!this.stream.writable) {
  121. return false
  122. }
  123. return this.stream.write(buffer)
  124. }
  125. query(text) {
  126. this._send(serialize.query(text))
  127. }
  128. // send parse message
  129. parse(query) {
  130. this._send(serialize.parse(query))
  131. }
  132. // send bind message
  133. bind(config) {
  134. this._send(serialize.bind(config))
  135. }
  136. // send execute message
  137. execute(config) {
  138. this._send(serialize.execute(config))
  139. }
  140. flush() {
  141. if (this.stream.writable) {
  142. this.stream.write(flushBuffer)
  143. }
  144. }
  145. sync() {
  146. this._ending = true
  147. this._send(syncBuffer)
  148. }
  149. ref() {
  150. this.stream.ref()
  151. }
  152. unref() {
  153. this.stream.unref()
  154. }
  155. end() {
  156. // 0x58 = 'X'
  157. this._ending = true
  158. if (!this._connecting || !this.stream.writable) {
  159. this.stream.end()
  160. return
  161. }
  162. return this.stream.write(endBuffer, () => {
  163. this.stream.end()
  164. })
  165. }
  166. close(msg) {
  167. this._send(serialize.close(msg))
  168. }
  169. describe(msg) {
  170. this._send(serialize.describe(msg))
  171. }
  172. sendCopyFromChunk(chunk) {
  173. this._send(serialize.copyData(chunk))
  174. }
  175. endCopyFrom() {
  176. this._send(serialize.copyDone())
  177. }
  178. sendCopyFail(msg) {
  179. this._send(serialize.copyFail(msg))
  180. }
  181. }
  182. module.exports = Connection