client.js 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. 'use strict'
  2. // eslint-disable-next-line
  3. var Native
  4. try {
  5. // Wrap this `require()` in a try-catch to avoid upstream bundlers from complaining that this might not be available since it is an optional import
  6. Native = require('pg-native')
  7. } catch (e) {
  8. throw e
  9. }
  10. var TypeOverrides = require('../type-overrides')
  11. var EventEmitter = require('events').EventEmitter
  12. var util = require('util')
  13. var ConnectionParameters = require('../connection-parameters')
  14. var NativeQuery = require('./query')
  15. var Client = (module.exports = function (config) {
  16. EventEmitter.call(this)
  17. config = config || {}
  18. this._Promise = config.Promise || global.Promise
  19. this._types = new TypeOverrides(config.types)
  20. this.native = new Native({
  21. types: this._types,
  22. })
  23. this._queryQueue = []
  24. this._ending = false
  25. this._connecting = false
  26. this._connected = false
  27. this._queryable = true
  28. // keep these on the object for legacy reasons
  29. // for the time being. TODO: deprecate all this jazz
  30. var cp = (this.connectionParameters = new ConnectionParameters(config))
  31. if (config.nativeConnectionString) cp.nativeConnectionString = config.nativeConnectionString
  32. this.user = cp.user
  33. // "hiding" the password so it doesn't show up in stack traces
  34. // or if the client is console.logged
  35. Object.defineProperty(this, 'password', {
  36. configurable: true,
  37. enumerable: false,
  38. writable: true,
  39. value: cp.password,
  40. })
  41. this.database = cp.database
  42. this.host = cp.host
  43. this.port = cp.port
  44. // a hash to hold named queries
  45. this.namedQueries = {}
  46. })
  47. Client.Query = NativeQuery
  48. util.inherits(Client, EventEmitter)
  49. Client.prototype._errorAllQueries = function (err) {
  50. const enqueueError = (query) => {
  51. process.nextTick(() => {
  52. query.native = this.native
  53. query.handleError(err)
  54. })
  55. }
  56. if (this._hasActiveQuery()) {
  57. enqueueError(this._activeQuery)
  58. this._activeQuery = null
  59. }
  60. this._queryQueue.forEach(enqueueError)
  61. this._queryQueue.length = 0
  62. }
  63. // connect to the backend
  64. // pass an optional callback to be called once connected
  65. // or with an error if there was a connection error
  66. Client.prototype._connect = function (cb) {
  67. var self = this
  68. if (this._connecting) {
  69. process.nextTick(() => cb(new Error('Client has already been connected. You cannot reuse a client.')))
  70. return
  71. }
  72. this._connecting = true
  73. this.connectionParameters.getLibpqConnectionString(function (err, conString) {
  74. if (self.connectionParameters.nativeConnectionString) conString = self.connectionParameters.nativeConnectionString
  75. if (err) return cb(err)
  76. self.native.connect(conString, function (err) {
  77. if (err) {
  78. self.native.end()
  79. return cb(err)
  80. }
  81. // set internal states to connected
  82. self._connected = true
  83. // handle connection errors from the native layer
  84. self.native.on('error', function (err) {
  85. self._queryable = false
  86. self._errorAllQueries(err)
  87. self.emit('error', err)
  88. })
  89. self.native.on('notification', function (msg) {
  90. self.emit('notification', {
  91. channel: msg.relname,
  92. payload: msg.extra,
  93. })
  94. })
  95. // signal we are connected now
  96. self.emit('connect')
  97. self._pulseQueryQueue(true)
  98. cb()
  99. })
  100. })
  101. }
  102. Client.prototype.connect = function (callback) {
  103. if (callback) {
  104. this._connect(callback)
  105. return
  106. }
  107. return new this._Promise((resolve, reject) => {
  108. this._connect((error) => {
  109. if (error) {
  110. reject(error)
  111. } else {
  112. resolve()
  113. }
  114. })
  115. })
  116. }
  117. // send a query to the server
  118. // this method is highly overloaded to take
  119. // 1) string query, optional array of parameters, optional function callback
  120. // 2) object query with {
  121. // string query
  122. // optional array values,
  123. // optional function callback instead of as a separate parameter
  124. // optional string name to name & cache the query plan
  125. // optional string rowMode = 'array' for an array of results
  126. // }
  127. Client.prototype.query = function (config, values, callback) {
  128. var query
  129. var result
  130. var readTimeout
  131. var readTimeoutTimer
  132. var queryCallback
  133. if (config === null || config === undefined) {
  134. throw new TypeError('Client was passed a null or undefined query')
  135. } else if (typeof config.submit === 'function') {
  136. readTimeout = config.query_timeout || this.connectionParameters.query_timeout
  137. result = query = config
  138. // accept query(new Query(...), (err, res) => { }) style
  139. if (typeof values === 'function') {
  140. config.callback = values
  141. }
  142. } else {
  143. readTimeout = this.connectionParameters.query_timeout
  144. query = new NativeQuery(config, values, callback)
  145. if (!query.callback) {
  146. let resolveOut, rejectOut
  147. result = new this._Promise((resolve, reject) => {
  148. resolveOut = resolve
  149. rejectOut = reject
  150. }).catch(err => {
  151. Error.captureStackTrace(err);
  152. throw err;
  153. })
  154. query.callback = (err, res) => (err ? rejectOut(err) : resolveOut(res))
  155. }
  156. }
  157. if (readTimeout) {
  158. queryCallback = query.callback
  159. readTimeoutTimer = setTimeout(() => {
  160. var error = new Error('Query read timeout')
  161. process.nextTick(() => {
  162. query.handleError(error, this.connection)
  163. })
  164. queryCallback(error)
  165. // we already returned an error,
  166. // just do nothing if query completes
  167. query.callback = () => {}
  168. // Remove from queue
  169. var index = this._queryQueue.indexOf(query)
  170. if (index > -1) {
  171. this._queryQueue.splice(index, 1)
  172. }
  173. this._pulseQueryQueue()
  174. }, readTimeout)
  175. query.callback = (err, res) => {
  176. clearTimeout(readTimeoutTimer)
  177. queryCallback(err, res)
  178. }
  179. }
  180. if (!this._queryable) {
  181. query.native = this.native
  182. process.nextTick(() => {
  183. query.handleError(new Error('Client has encountered a connection error and is not queryable'))
  184. })
  185. return result
  186. }
  187. if (this._ending) {
  188. query.native = this.native
  189. process.nextTick(() => {
  190. query.handleError(new Error('Client was closed and is not queryable'))
  191. })
  192. return result
  193. }
  194. this._queryQueue.push(query)
  195. this._pulseQueryQueue()
  196. return result
  197. }
  198. // disconnect from the backend server
  199. Client.prototype.end = function (cb) {
  200. var self = this
  201. this._ending = true
  202. if (!this._connected) {
  203. this.once('connect', this.end.bind(this, cb))
  204. }
  205. var result
  206. if (!cb) {
  207. result = new this._Promise(function (resolve, reject) {
  208. cb = (err) => (err ? reject(err) : resolve())
  209. })
  210. }
  211. this.native.end(function () {
  212. self._errorAllQueries(new Error('Connection terminated'))
  213. process.nextTick(() => {
  214. self.emit('end')
  215. if (cb) cb()
  216. })
  217. })
  218. return result
  219. }
  220. Client.prototype._hasActiveQuery = function () {
  221. return this._activeQuery && this._activeQuery.state !== 'error' && this._activeQuery.state !== 'end'
  222. }
  223. Client.prototype._pulseQueryQueue = function (initialConnection) {
  224. if (!this._connected) {
  225. return
  226. }
  227. if (this._hasActiveQuery()) {
  228. return
  229. }
  230. var query = this._queryQueue.shift()
  231. if (!query) {
  232. if (!initialConnection) {
  233. this.emit('drain')
  234. }
  235. return
  236. }
  237. this._activeQuery = query
  238. query.submit(this)
  239. var self = this
  240. query.once('_done', function () {
  241. self._pulseQueryQueue()
  242. })
  243. }
  244. // attempt to cancel an in-progress query
  245. Client.prototype.cancel = function (query) {
  246. if (this._activeQuery === query) {
  247. this.native.cancel(function () {})
  248. } else if (this._queryQueue.indexOf(query) !== -1) {
  249. this._queryQueue.splice(this._queryQueue.indexOf(query), 1)
  250. }
  251. }
  252. Client.prototype.ref = function () {}
  253. Client.prototype.unref = function () {}
  254. Client.prototype.setTypeParser = function (oid, format, parseFn) {
  255. return this._types.setTypeParser(oid, format, parseFn)
  256. }
  257. Client.prototype.getTypeParser = function (oid, format) {
  258. return this._types.getTypeParser(oid, format)
  259. }