123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307 |
'use strict'

// Optional native bindings; populated by the guarded `require()` below.
// eslint-disable-next-line
let Native
try {
  // The `require()` stays inside a try/catch so upstream bundlers do not
  // complain that this optional import might be unavailable. A load failure
  // is rethrown unchanged.
  Native = require('pg-native')
} catch (loadError) {
  throw loadError
}

const TypeOverrides = require('../type-overrides')
const EventEmitter = require('events').EventEmitter
const util = require('util')
const ConnectionParameters = require('../connection-parameters')
const NativeQuery = require('./query')
/**
 * Client that runs queries through the `pg-native` bindings (`Native`).
 * Also exported as `module.exports`.
 *
 * @param {object} [config] - Optional settings. Read directly here:
 *   `Promise` (promise constructor; falls back to `global.Promise`),
 *   `types` (handed to `TypeOverrides`) and `nativeConnectionString`
 *   (copied onto the connection parameters when truthy). The whole object
 *   is also passed to `ConnectionParameters`.
 */
const Client = (module.exports = function (config) {
  EventEmitter.call(this)

  const options = config || {}

  this._Promise = options.Promise || global.Promise
  this._types = new TypeOverrides(options.types)
  this.native = new Native({ types: this._types })

  // pending queries and lifecycle flags
  this._queryQueue = []
  this._ending = false
  this._connecting = false
  this._connected = false
  this._queryable = true

  // keep these on the object for legacy reasons
  // for the time being. TODO: deprecate all this jazz
  const params = new ConnectionParameters(options)
  this.connectionParameters = params
  if (options.nativeConnectionString) {
    params.nativeConnectionString = options.nativeConnectionString
  }
  this.user = params.user

  // "hiding" the password (non-enumerable) so it doesn't show up in stack
  // traces or if the client is console.logged
  Object.defineProperty(this, 'password', {
    value: params.password,
    writable: true,
    enumerable: false,
    configurable: true,
  })

  this.database = params.database
  this.host = params.host
  this.port = params.port

  // a hash to hold named queries
  this.namedQueries = {}
})

// query class used by this client
Client.Query = NativeQuery

util.inherits(Client, EventEmitter)
/**
 * Fail the active query (if any) and every queued query with `err`.
 * The active-query slot and the queue are cleared synchronously; each
 * query's `handleError(err)` is invoked on a later tick, after the query has
 * been given this client's native handle.
 *
 * @param {Error} err - error passed to each query's `handleError`
 */
Client.prototype._errorAllQueries = function (err) {
  // active query first, then the queued ones in order
  const failing = []
  if (this._hasActiveQuery()) {
    failing.push(this._activeQuery)
    this._activeQuery = null
  }
  failing.push(...this._queryQueue)
  this._queryQueue.length = 0

  for (const query of failing) {
    process.nextTick(() => {
      query.native = this.native
      query.handleError(err)
    })
  }
}
// Connect to the backend.
// `cb` is called with no arguments once connected, or with an error if the
// connection string lookup or the native connect failed. Calling this a
// second time reports an error to `cb` on the next tick.
Client.prototype._connect = function (cb) {
  if (this._connecting) {
    process.nextTick(() => cb(new Error('Client has already been connected. You cannot reuse a client.')))
    return
  }
  this._connecting = true

  // runs once the native layer reports the outcome of the connect attempt
  const onNativeConnect = (connectErr) => {
    if (connectErr) {
      this.native.end()
      return cb(connectErr)
    }

    // set internal states to connected
    this._connected = true

    // handle connection errors from the native layer
    this.native.on('error', (nativeErr) => {
      this._queryable = false
      this._errorAllQueries(nativeErr)
      this.emit('error', nativeErr)
    })

    // re-emit native notifications under the client's field names
    this.native.on('notification', (msg) => {
      this.emit('notification', {
        channel: msg.relname,
        payload: msg.extra,
      })
    })

    // signal we are connected now
    this.emit('connect')
    this._pulseQueryQueue(true)
    cb()
  }

  this.connectionParameters.getLibpqConnectionString((lookupErr, conString) => {
    // an explicit native connection string wins over the generated one
    const override = this.connectionParameters.nativeConnectionString
    const target = override ? override : conString
    if (lookupErr) return cb(lookupErr)
    this.native.connect(target, onNativeConnect)
  })
}
/**
 * Connect to the backend.
 *
 * @param {Function} [callback] - when given, it is forwarded to `_connect`
 *   and nothing is returned.
 * @returns {Promise<void>|undefined} without a callback, a promise (built
 *   with the client's configured promise constructor) that settles with the
 *   outcome reported by `_connect`.
 */
Client.prototype.connect = function (callback) {
  if (!callback) {
    return new this._Promise((resolve, reject) => {
      this._connect((error) => {
        if (!error) {
          resolve()
          return
        }
        reject(error)
      })
    })
  }

  this._connect(callback)
}
// send a query to the server
// this method is highly overloaded to take
// 1) string query, optional array of parameters, optional function callback
// 2) object query with {
//      string query
//      optional array values,
//      optional function callback instead of as a separate parameter
//      optional string name to name & cache the query plan
//      optional string rowMode = 'array' for an array of results
//    }
// 3) an object exposing a `submit()` function, which is queued as-is
//
// Returns the passed-in object for form 3; otherwise a promise when no
// callback was supplied, or undefined when one was.
// Throws a TypeError when `config` is null or undefined.
Client.prototype.query = function (config, values, callback) {
  let query
  let result
  let readTimeout

  if (config === null || config === undefined) {
    throw new TypeError('Client was passed a null or undefined query')
  } else if (typeof config.submit === 'function') {
    // a per-query timeout takes precedence over the connection-level one
    readTimeout = config.query_timeout || this.connectionParameters.query_timeout
    result = query = config
    // accept query(new Query(...), (err, res) => { }) style
    if (typeof values === 'function') {
      config.callback = values
    }
  } else {
    readTimeout = this.connectionParameters.query_timeout
    query = new NativeQuery(config, values, callback)
    if (!query.callback) {
      // no callback supplied: expose the outcome as a promise instead
      let resolveOut, rejectOut
      result = new this._Promise((resolve, reject) => {
        resolveOut = resolve
        rejectOut = reject
      }).catch((err) => {
        // re-capture the stack on the error before rethrowing it
        Error.captureStackTrace(err)
        throw err
      })
      query.callback = (err, res) => (err ? rejectOut(err) : resolveOut(res))
    }
  }

  if (readTimeout) {
    const queryCallback = query.callback

    const readTimeoutTimer = setTimeout(() => {
      const error = new Error('Query read timeout')

      process.nextTick(() => {
        // Give the query the native handle before erroring it, exactly as
        // the other error paths of this client do; a query that timed out
        // while still waiting in the queue has not been handed one yet.
        query.native = this.native
        query.handleError(error, this.connection)
      })

      queryCallback(error)

      // we already returned an error,
      // just do nothing if query completes
      query.callback = () => {}

      // Remove from queue
      const index = this._queryQueue.indexOf(query)
      if (index > -1) {
        this._queryQueue.splice(index, 1)
      }

      this._pulseQueryQueue()
    }, readTimeout)

    // a normal completion must cancel the pending timeout first
    query.callback = (err, res) => {
      clearTimeout(readTimeoutTimer)
      queryCallback(err, res)
    }
  }

  if (!this._queryable) {
    query.native = this.native
    process.nextTick(() => {
      query.handleError(new Error('Client has encountered a connection error and is not queryable'))
    })
    return result
  }

  if (this._ending) {
    query.native = this.native
    process.nextTick(() => {
      query.handleError(new Error('Client was closed and is not queryable'))
    })
    return result
  }

  this._queryQueue.push(query)
  this._pulseQueryQueue()
  return result
}
// Disconnect from the backend server.
// Marks the client as ending, closes the native connection, fails all
// pending queries with 'Connection terminated', then (on a later tick) emits
// 'end' and calls `cb`. Without a `cb` a promise is returned instead.
Client.prototype.end = function (cb) {
  this._ending = true

  if (!this._connected) {
    // run end() again once a connection is established; the handler is bound
    // to the callback exactly as the caller passed it (possibly undefined)
    this.once('connect', this.end.bind(this, cb))
  }

  let done = cb
  let pending
  if (!done) {
    pending = new this._Promise((resolve, reject) => {
      done = (err) => (err ? reject(err) : resolve())
    })
  }

  this.native.end(() => {
    this._errorAllQueries(new Error('Connection terminated'))

    process.nextTick(() => {
      this.emit('end')
      if (done) done()
    })
  })

  return pending
}
/**
 * Tell whether a query is currently in flight.
 *
 * @returns {*} the falsy `_activeQuery` value itself when there is none;
 *   otherwise `true` unless that query's `state` is 'error' or 'end'.
 */
Client.prototype._hasActiveQuery = function () {
  const active = this._activeQuery
  if (!active) {
    return active
  }
  const state = active.state
  return !(state === 'error' || state === 'end')
}
/**
 * Start the next queued query, if the client is connected and idle.
 * When the queue is empty, 'drain' is emitted unless this is the pulse
 * issued right after the initial connection.
 *
 * @param {boolean} [initialConnection] - truthy to suppress the 'drain' event
 */
Client.prototype._pulseQueryQueue = function (initialConnection) {
  // nothing can start before we are connected or while a query is running
  if (!this._connected || this._hasActiveQuery()) {
    return
  }

  const next = this._queryQueue.shift()
  if (next) {
    this._activeQuery = next
    next.submit(this)
    // move on to the following query once this one reports '_done'
    next.once('_done', () => {
      this._pulseQueryQueue()
    })
    return
  }

  if (!initialConnection) {
    this.emit('drain')
  }
}
// Attempt to cancel a query.
// The active query is cancelled through the native layer; a query that is
// still waiting in the queue is simply removed from it. Anything else is
// ignored.
Client.prototype.cancel = function (query) {
  if (this._activeQuery === query) {
    this.native.cancel(function () {})
    return
  }

  const position = this._queryQueue.indexOf(query)
  if (position !== -1) {
    this._queryQueue.splice(position, 1)
  }
}
// `ref` is a no-op on this client: it takes no arguments and returns nothing.
Client.prototype.ref = function () {
  return undefined
}

// `unref` is likewise a no-op on this client.
Client.prototype.unref = function () {
  return undefined
}
/**
 * Register a type parser on this client's type overrides.
 * All arguments are forwarded unchanged to `this._types.setTypeParser`.
 *
 * @param {*} oid
 * @param {*} format
 * @param {*} parseFn
 * @returns {*} whatever `this._types.setTypeParser` returns
 */
Client.prototype.setTypeParser = function (oid, format, parseFn) {
  const overrides = this._types
  return overrides.setTypeParser(oid, format, parseFn)
}
/**
 * Look up a type parser in this client's type overrides.
 * Both arguments are forwarded unchanged to `this._types.getTypeParser`.
 *
 * @param {*} oid
 * @param {*} format
 * @returns {*} whatever `this._types.getTypeParser` returns
 */
Client.prototype.getTypeParser = function (oid, format) {
  const overrides = this._types
  return overrides.getTypeParser(oid, format)
}
|