index.js 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. 'use strict'
  2. const Result = require('pg/lib/result.js')
  3. const prepare = require('pg/lib/utils.js').prepareValue
  4. const EventEmitter = require('events').EventEmitter
  5. const util = require('util')
  6. let nextUniqueID = 1 // concept borrowed from org.postgresql.core.v3.QueryExecutorImpl
  7. class Cursor extends EventEmitter {
  8. constructor(text, values, config) {
  9. super()
  10. this._conf = config || {}
  11. this.text = text
  12. this.values = values ? values.map(prepare) : null
  13. this.connection = null
  14. this._queue = []
  15. this.state = 'initialized'
  16. this._result = new Result(this._conf.rowMode, this._conf.types)
  17. this._Promise = this._conf.Promise || global.Promise
  18. this._cb = null
  19. this._rows = null
  20. this._portal = null
  21. this._ifNoData = this._ifNoData.bind(this)
  22. this._rowDescription = this._rowDescription.bind(this)
  23. }
  24. _ifNoData() {
  25. this.state = 'idle'
  26. this._shiftQueue()
  27. if (this.connection) {
  28. this.connection.removeListener('rowDescription', this._rowDescription)
  29. }
  30. }
  31. _rowDescription() {
  32. if (this.connection) {
  33. this.connection.removeListener('noData', this._ifNoData)
  34. }
  35. }
  36. submit(connection) {
  37. this.state = 'submitted'
  38. this.connection = connection
  39. this._portal = 'C_' + nextUniqueID++
  40. const con = connection
  41. con.parse(
  42. {
  43. text: this.text,
  44. },
  45. true
  46. )
  47. con.bind(
  48. {
  49. portal: this._portal,
  50. values: this.values,
  51. },
  52. true
  53. )
  54. con.describe(
  55. {
  56. type: 'P',
  57. name: this._portal, // AWS Redshift requires a portal name
  58. },
  59. true
  60. )
  61. con.flush()
  62. if (this._conf.types) {
  63. this._result._getTypeParser = this._conf.types.getTypeParser
  64. }
  65. con.once('noData', this._ifNoData)
  66. con.once('rowDescription', this._rowDescription)
  67. }
  68. _shiftQueue() {
  69. if (this._queue.length) {
  70. this._getRows.apply(this, this._queue.shift())
  71. }
  72. }
  73. _closePortal() {
  74. if (this.state === 'done') return
  75. // because we opened a named portal to stream results
  76. // we need to close the same named portal. Leaving a named portal
  77. // open can lock tables for modification if inside a transaction.
  78. // see https://github.com/brianc/node-pg-cursor/issues/56
  79. this.connection.close({ type: 'P', name: this._portal })
  80. // If we've received an error we already sent a sync message.
  81. // do not send another sync as it triggers another readyForQuery message.
  82. if (this.state !== 'error') {
  83. this.connection.sync()
  84. }
  85. this.state = 'done'
  86. }
  87. handleRowDescription(msg) {
  88. this._result.addFields(msg.fields)
  89. this.state = 'idle'
  90. this._shiftQueue()
  91. }
  92. handleDataRow(msg) {
  93. const row = this._result.parseRow(msg.fields)
  94. this.emit('row', row, this._result)
  95. this._rows.push(row)
  96. }
  97. _sendRows() {
  98. this.state = 'idle'
  99. setImmediate(() => {
  100. const cb = this._cb
  101. // remove callback before calling it
  102. // because likely a new one will be added
  103. // within the call to this callback
  104. this._cb = null
  105. if (cb) {
  106. this._result.rows = this._rows
  107. cb(null, this._rows, this._result)
  108. }
  109. this._rows = []
  110. })
  111. }
  112. handleCommandComplete(msg) {
  113. this._result.addCommandComplete(msg)
  114. this._closePortal()
  115. }
  116. handlePortalSuspended() {
  117. this._sendRows()
  118. }
  119. handleReadyForQuery() {
  120. this._sendRows()
  121. this.state = 'done'
  122. this.emit('end', this._result)
  123. }
  124. handleEmptyQuery() {
  125. this.connection.sync()
  126. }
  127. handleError(msg) {
  128. // If this cursor has already closed, don't try to handle the error.
  129. if (this.state === 'done') return
  130. // If we're in an initialized state we've never been submitted
  131. // and don't have a connection instance reference yet.
  132. // This can happen if you queue a stream and close the client before
  133. // the client has submitted the stream. In this scenario we don't have
  134. // a connection so there's nothing to unsubscribe from.
  135. if (this.state !== 'initialized') {
  136. this.connection.removeListener('noData', this._ifNoData)
  137. this.connection.removeListener('rowDescription', this._rowDescription)
  138. // call sync to trigger a readyForQuery
  139. this.connection.sync()
  140. }
  141. this.state = 'error'
  142. this._error = msg
  143. // satisfy any waiting callback
  144. if (this._cb) {
  145. this._cb(msg)
  146. }
  147. // dispatch error to all waiting callbacks
  148. for (let i = 0; i < this._queue.length; i++) {
  149. const queuedCallback = this._queue[i][1]
  150. queuedCallback.call(this, msg)
  151. }
  152. this._queue.length = 0
  153. if (this.listenerCount('error') > 0) {
  154. // only dispatch error events if we have a listener
  155. this.emit('error', msg)
  156. }
  157. }
  158. _getRows(rows, cb) {
  159. this.state = 'busy'
  160. this._cb = cb
  161. this._rows = []
  162. const msg = {
  163. portal: this._portal,
  164. rows: rows,
  165. }
  166. this.connection.execute(msg, true)
  167. this.connection.flush()
  168. }
  169. // users really shouldn't be calling 'end' here and terminating a connection to postgres
  170. // via the low level connection.end api
  171. end(cb) {
  172. if (this.state !== 'initialized') {
  173. this.connection.sync()
  174. }
  175. this.connection.once('end', cb)
  176. this.connection.end()
  177. }
  178. close(cb) {
  179. let promise
  180. if (!cb) {
  181. promise = new this._Promise((resolve, reject) => {
  182. cb = (err) => (err ? reject(err) : resolve())
  183. })
  184. }
  185. if (!this.connection || this.state === 'done') {
  186. setImmediate(cb)
  187. return promise
  188. }
  189. this._closePortal()
  190. this.connection.once('readyForQuery', function () {
  191. cb()
  192. })
  193. // Return the promise (or undefined)
  194. return promise
  195. }
  196. read(rows, cb) {
  197. let promise
  198. if (!cb) {
  199. promise = new this._Promise((resolve, reject) => {
  200. cb = (err, rows) => (err ? reject(err) : resolve(rows))
  201. })
  202. }
  203. if (this.state === 'idle' || this.state === 'submitted') {
  204. this._getRows(rows, cb)
  205. } else if (this.state === 'busy' || this.state === 'initialized') {
  206. this._queue.push([rows, cb])
  207. } else if (this.state === 'error') {
  208. setImmediate(() => cb(this._error))
  209. } else if (this.state === 'done') {
  210. setImmediate(() => cb(null, []))
  211. } else {
  212. throw new Error('Unknown state: ' + this.state)
  213. }
  214. // Return the promise (or undefined)
  215. return promise
  216. }
  217. }
  218. Cursor.prototype.end = util.deprecate(
  219. Cursor.prototype.end,
  220. 'Cursor.end is deprecated. Call end on the client itself to end a connection to the database.'
  221. )
  222. module.exports = Cursor