connection.js 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. 'use strict'
  2. var net = require('net')
  3. var EventEmitter = require('events').EventEmitter
  4. const { parse, serialize } = require('pg-protocol')
  5. const { getStream, getSecureStream } = require('./stream')
  6. const flushBuffer = serialize.flush()
  7. const syncBuffer = serialize.sync()
  8. const endBuffer = serialize.end()
  9. // TODO(bmc) support binary mode at some point
  10. class Connection extends EventEmitter {
  11. constructor(config) {
  12. super()
  13. config = config || {}
  14. this.stream = config.stream || getStream(config.ssl)
  15. if (typeof this.stream === 'function') {
  16. this.stream = this.stream(config)
  17. }
  18. this._keepAlive = config.keepAlive
  19. this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis
  20. this.lastBuffer = false
  21. this.parsedStatements = {}
  22. this.ssl = config.ssl || false
  23. this._ending = false
  24. this._emitMessage = false
  25. var self = this
  26. this.on('newListener', function (eventName) {
  27. if (eventName === 'message') {
  28. self._emitMessage = true
  29. }
  30. })
  31. }
  32. connect(port, host) {
  33. var self = this
  34. this._connecting = true
  35. this.stream.setNoDelay(true)
  36. this.stream.connect(port, host)
  37. this.stream.once('connect', function () {
  38. if (self._keepAlive) {
  39. self.stream.setKeepAlive(true, self._keepAliveInitialDelayMillis)
  40. }
  41. self.emit('connect')
  42. })
  43. const reportStreamError = function (error) {
  44. // errors about disconnections should be ignored during disconnect
  45. if (self._ending && (error.code === 'ECONNRESET' || error.code === 'EPIPE')) {
  46. return
  47. }
  48. self.emit('error', error)
  49. }
  50. this.stream.on('error', reportStreamError)
  51. this.stream.on('close', function () {
  52. self.emit('end')
  53. })
  54. if (!this.ssl) {
  55. return this.attachListeners(this.stream)
  56. }
  57. this.stream.once('data', function (buffer) {
  58. var responseCode = buffer.toString('utf8')
  59. switch (responseCode) {
  60. case 'S': // Server supports SSL connections, continue with a secure connection
  61. break
  62. case 'N': // Server does not support SSL connections
  63. self.stream.end()
  64. return self.emit('error', new Error('The server does not support SSL connections'))
  65. default:
  66. // Any other response byte, including 'E' (ErrorResponse) indicating a server error
  67. self.stream.end()
  68. return self.emit('error', new Error('There was an error establishing an SSL connection'))
  69. }
  70. const options = {
  71. socket: self.stream,
  72. }
  73. if (self.ssl !== true) {
  74. Object.assign(options, self.ssl)
  75. if ('key' in self.ssl) {
  76. options.key = self.ssl.key
  77. }
  78. }
  79. var net = require('net')
  80. if (net.isIP && net.isIP(host) === 0) {
  81. options.servername = host
  82. }
  83. try {
  84. self.stream = getSecureStream(options)
  85. } catch (err) {
  86. return self.emit('error', err)
  87. }
  88. self.attachListeners(self.stream)
  89. self.stream.on('error', reportStreamError)
  90. self.emit('sslconnect')
  91. })
  92. }
  93. attachListeners(stream) {
  94. parse(stream, (msg) => {
  95. var eventName = msg.name === 'error' ? 'errorMessage' : msg.name
  96. if (this._emitMessage) {
  97. this.emit('message', msg)
  98. }
  99. this.emit(eventName, msg)
  100. })
  101. }
  102. requestSsl() {
  103. this.stream.write(serialize.requestSsl())
  104. }
  105. startup(config) {
  106. this.stream.write(serialize.startup(config))
  107. }
  108. cancel(processID, secretKey) {
  109. this._send(serialize.cancel(processID, secretKey))
  110. }
  111. password(password) {
  112. this._send(serialize.password(password))
  113. }
  114. sendSASLInitialResponseMessage(mechanism, initialResponse) {
  115. this._send(serialize.sendSASLInitialResponseMessage(mechanism, initialResponse))
  116. }
  117. sendSCRAMClientFinalMessage(additionalData) {
  118. this._send(serialize.sendSCRAMClientFinalMessage(additionalData))
  119. }
  120. _send(buffer) {
  121. if (!this.stream.writable) {
  122. return false
  123. }
  124. return this.stream.write(buffer)
  125. }
  126. query(text) {
  127. this._send(serialize.query(text))
  128. }
  129. // send parse message
  130. parse(query) {
  131. this._send(serialize.parse(query))
  132. }
  133. // send bind message
  134. bind(config) {
  135. this._send(serialize.bind(config))
  136. }
  137. // send execute message
  138. execute(config) {
  139. this._send(serialize.execute(config))
  140. }
  141. flush() {
  142. if (this.stream.writable) {
  143. this.stream.write(flushBuffer)
  144. }
  145. }
  146. sync() {
  147. this._ending = true
  148. this._send(syncBuffer)
  149. }
  150. ref() {
  151. this.stream.ref()
  152. }
  153. unref() {
  154. this.stream.unref()
  155. }
  156. end() {
  157. // 0x58 = 'X'
  158. this._ending = true
  159. if (!this._connecting || !this.stream.writable) {
  160. this.stream.end()
  161. return
  162. }
  163. return this.stream.write(endBuffer, () => {
  164. this.stream.end()
  165. })
  166. }
  167. close(msg) {
  168. this._send(serialize.close(msg))
  169. }
  170. describe(msg) {
  171. this._send(serialize.describe(msg))
  172. }
  173. sendCopyFromChunk(chunk) {
  174. this._send(serialize.copyData(chunk))
  175. }
  176. endCopyFrom() {
  177. this._send(serialize.copyDone())
  178. }
  179. sendCopyFail(msg) {
  180. this._send(serialize.copyFail(msg))
  181. }
  182. }
  183. module.exports = Connection