123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264 |
- 'use strict'
- const Result = require('pg/lib/result.js')
- const prepare = require('pg/lib/utils.js').prepareValue
- const EventEmitter = require('events').EventEmitter
- const util = require('util')
- let nextUniqueID = 1 // concept borrowed from org.postgresql.core.v3.QueryExecutorImpl
- class Cursor extends EventEmitter {
- constructor(text, values, config) {
- super()
- this._conf = config || {}
- this.text = text
- this.values = values ? values.map(prepare) : null
- this.connection = null
- this._queue = []
- this.state = 'initialized'
- this._result = new Result(this._conf.rowMode, this._conf.types)
- this._Promise = this._conf.Promise || global.Promise
- this._cb = null
- this._rows = null
- this._portal = null
- this._ifNoData = this._ifNoData.bind(this)
- this._rowDescription = this._rowDescription.bind(this)
- }
- _ifNoData() {
- this.state = 'idle'
- this._shiftQueue()
- if (this.connection) {
- this.connection.removeListener('rowDescription', this._rowDescription)
- }
- }
- _rowDescription() {
- if (this.connection) {
- this.connection.removeListener('noData', this._ifNoData)
- }
- }
- submit(connection) {
- this.state = 'submitted'
- this.connection = connection
- this._portal = 'C_' + nextUniqueID++
- const con = connection
- con.parse(
- {
- text: this.text,
- },
- true
- )
- con.bind(
- {
- portal: this._portal,
- values: this.values,
- },
- true
- )
- con.describe(
- {
- type: 'P',
- name: this._portal, // AWS Redshift requires a portal name
- },
- true
- )
- con.flush()
- if (this._conf.types) {
- this._result._getTypeParser = this._conf.types.getTypeParser
- }
- con.once('noData', this._ifNoData)
- con.once('rowDescription', this._rowDescription)
- }
- _shiftQueue() {
- if (this._queue.length) {
- this._getRows.apply(this, this._queue.shift())
- }
- }
- _closePortal() {
- if (this.state === 'done') return
- // because we opened a named portal to stream results
- // we need to close the same named portal. Leaving a named portal
- // open can lock tables for modification if inside a transaction.
- // see https://github.com/brianc/node-pg-cursor/issues/56
- this.connection.close({ type: 'P', name: this._portal })
- // If we've received an error we already sent a sync message.
- // do not send another sync as it triggers another readyForQuery message.
- if (this.state !== 'error') {
- this.connection.sync()
- }
- this.state = 'done'
- }
- handleRowDescription(msg) {
- this._result.addFields(msg.fields)
- this.state = 'idle'
- this._shiftQueue()
- }
- handleDataRow(msg) {
- const row = this._result.parseRow(msg.fields)
- this.emit('row', row, this._result)
- this._rows.push(row)
- }
- _sendRows() {
- this.state = 'idle'
- setImmediate(() => {
- const cb = this._cb
- // remove callback before calling it
- // because likely a new one will be added
- // within the call to this callback
- this._cb = null
- if (cb) {
- this._result.rows = this._rows
- cb(null, this._rows, this._result)
- }
- this._rows = []
- })
- }
- handleCommandComplete(msg) {
- this._result.addCommandComplete(msg)
- this._closePortal()
- }
- handlePortalSuspended() {
- this._sendRows()
- }
- handleReadyForQuery() {
- this._sendRows()
- this.state = 'done'
- this.emit('end', this._result)
- }
- handleEmptyQuery() {
- this.connection.sync()
- }
- handleError(msg) {
- // If this cursor has already closed, don't try to handle the error.
- if (this.state === 'done') return
- // If we're in an initialized state we've never been submitted
- // and don't have a connection instance reference yet.
- // This can happen if you queue a stream and close the client before
- // the client has submitted the stream. In this scenario we don't have
- // a connection so there's nothing to unsubscribe from.
- if (this.state !== 'initialized') {
- this.connection.removeListener('noData', this._ifNoData)
- this.connection.removeListener('rowDescription', this._rowDescription)
- // call sync to trigger a readyForQuery
- this.connection.sync()
- }
- this.state = 'error'
- this._error = msg
- // satisfy any waiting callback
- if (this._cb) {
- this._cb(msg)
- }
- // dispatch error to all waiting callbacks
- for (let i = 0; i < this._queue.length; i++) {
- const queuedCallback = this._queue[i][1]
- queuedCallback.call(this, msg)
- }
- this._queue.length = 0
- if (this.listenerCount('error') > 0) {
- // only dispatch error events if we have a listener
- this.emit('error', msg)
- }
- }
- _getRows(rows, cb) {
- this.state = 'busy'
- this._cb = cb
- this._rows = []
- const msg = {
- portal: this._portal,
- rows: rows,
- }
- this.connection.execute(msg, true)
- this.connection.flush()
- }
- // users really shouldn't be calling 'end' here and terminating a connection to postgres
- // via the low level connection.end api
- end(cb) {
- if (this.state !== 'initialized') {
- this.connection.sync()
- }
- this.connection.once('end', cb)
- this.connection.end()
- }
- close(cb) {
- let promise
- if (!cb) {
- promise = new this._Promise((resolve, reject) => {
- cb = (err) => (err ? reject(err) : resolve())
- })
- }
- if (!this.connection || this.state === 'done') {
- setImmediate(cb)
- return promise
- }
- this._closePortal()
- this.connection.once('readyForQuery', function () {
- cb()
- })
- // Return the promise (or undefined)
- return promise
- }
- read(rows, cb) {
- let promise
- if (!cb) {
- promise = new this._Promise((resolve, reject) => {
- cb = (err, rows) => (err ? reject(err) : resolve(rows))
- })
- }
- if (this.state === 'idle' || this.state === 'submitted') {
- this._getRows(rows, cb)
- } else if (this.state === 'busy' || this.state === 'initialized') {
- this._queue.push([rows, cb])
- } else if (this.state === 'error') {
- setImmediate(() => cb(this._error))
- } else if (this.state === 'done') {
- setImmediate(() => cb(null, []))
- } else {
- throw new Error('Unknown state: ' + this.state)
- }
- // Return the promise (or undefined)
- return promise
- }
- }
- Cursor.prototype.end = util.deprecate(
- Cursor.prototype.end,
- 'Cursor.end is deprecated. Call end on the client itself to end a connection to the database.'
- )
- module.exports = Cursor
|