query.js 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. 'use strict'
  2. const { EventEmitter } = require('events')
  3. const Result = require('./result')
  4. const utils = require('./utils')
  5. class Query extends EventEmitter {
  6. constructor(config, values, callback) {
  7. super()
  8. config = utils.normalizeQueryConfig(config, values, callback)
  9. this.text = config.text
  10. this.values = config.values
  11. this.rows = config.rows
  12. this.types = config.types
  13. this.name = config.name
  14. this.queryMode = config.queryMode
  15. this.binary = config.binary
  16. // use unique portal name each time
  17. this.portal = config.portal || ''
  18. this.callback = config.callback
  19. this._rowMode = config.rowMode
  20. if (process.domain && config.callback) {
  21. this.callback = process.domain.bind(config.callback)
  22. }
  23. this._result = new Result(this._rowMode, this.types)
  24. // potential for multiple results
  25. this._results = this._result
  26. this._canceledDueToError = false
  27. }
  28. requiresPreparation() {
  29. if (this.queryMode === 'extended') {
  30. return true
  31. }
  32. // named queries must always be prepared
  33. if (this.name) {
  34. return true
  35. }
  36. // always prepare if there are max number of rows expected per
  37. // portal execution
  38. if (this.rows) {
  39. return true
  40. }
  41. // don't prepare empty text queries
  42. if (!this.text) {
  43. return false
  44. }
  45. // prepare if there are values
  46. if (!this.values) {
  47. return false
  48. }
  49. return this.values.length > 0
  50. }
  51. _checkForMultirow() {
  52. // if we already have a result with a command property
  53. // then we've already executed one query in a multi-statement simple query
  54. // turn our results into an array of results
  55. if (this._result.command) {
  56. if (!Array.isArray(this._results)) {
  57. this._results = [this._result]
  58. }
  59. this._result = new Result(this._rowMode, this._result._types)
  60. this._results.push(this._result)
  61. }
  62. }
  63. // associates row metadata from the supplied
  64. // message with this query object
  65. // metadata used when parsing row results
  66. handleRowDescription(msg) {
  67. this._checkForMultirow()
  68. this._result.addFields(msg.fields)
  69. this._accumulateRows = this.callback || !this.listeners('row').length
  70. }
  71. handleDataRow(msg) {
  72. let row
  73. if (this._canceledDueToError) {
  74. return
  75. }
  76. try {
  77. row = this._result.parseRow(msg.fields)
  78. } catch (err) {
  79. this._canceledDueToError = err
  80. return
  81. }
  82. this.emit('row', row, this._result)
  83. if (this._accumulateRows) {
  84. this._result.addRow(row)
  85. }
  86. }
  87. handleCommandComplete(msg, connection) {
  88. this._checkForMultirow()
  89. this._result.addCommandComplete(msg)
  90. // need to sync after each command complete of a prepared statement
  91. // if we were using a row count which results in multiple calls to _getRows
  92. if (this.rows) {
  93. connection.sync()
  94. }
  95. }
  96. // if a named prepared statement is created with empty query text
  97. // the backend will send an emptyQuery message but *not* a command complete message
  98. // since we pipeline sync immediately after execute we don't need to do anything here
  99. // unless we have rows specified, in which case we did not pipeline the intial sync call
  100. handleEmptyQuery(connection) {
  101. if (this.rows) {
  102. connection.sync()
  103. }
  104. }
  105. handleError(err, connection) {
  106. // need to sync after error during a prepared statement
  107. if (this._canceledDueToError) {
  108. err = this._canceledDueToError
  109. this._canceledDueToError = false
  110. }
  111. // if callback supplied do not emit error event as uncaught error
  112. // events will bubble up to node process
  113. if (this.callback) {
  114. return this.callback(err)
  115. }
  116. this.emit('error', err)
  117. }
  118. handleReadyForQuery(con) {
  119. if (this._canceledDueToError) {
  120. return this.handleError(this._canceledDueToError, con)
  121. }
  122. if (this.callback) {
  123. try {
  124. this.callback(null, this._results)
  125. } catch (err) {
  126. process.nextTick(() => {
  127. throw err
  128. })
  129. }
  130. }
  131. this.emit('end', this._results)
  132. }
  133. submit(connection) {
  134. if (typeof this.text !== 'string' && typeof this.name !== 'string') {
  135. return new Error('A query must have either text or a name. Supplying neither is unsupported.')
  136. }
  137. const previous = connection.parsedStatements[this.name]
  138. if (this.text && previous && this.text !== previous) {
  139. return new Error(`Prepared statements must be unique - '${this.name}' was used for a different statement`)
  140. }
  141. if (this.values && !Array.isArray(this.values)) {
  142. return new Error('Query values must be an array')
  143. }
  144. if (this.requiresPreparation()) {
  145. this.prepare(connection)
  146. } else {
  147. connection.query(this.text)
  148. }
  149. return null
  150. }
  151. hasBeenParsed(connection) {
  152. return this.name && connection.parsedStatements[this.name]
  153. }
  154. handlePortalSuspended(connection) {
  155. this._getRows(connection, this.rows)
  156. }
  157. _getRows(connection, rows) {
  158. connection.execute({
  159. portal: this.portal,
  160. rows: rows,
  161. })
  162. // if we're not reading pages of rows send the sync command
  163. // to indicate the pipeline is finished
  164. if (!rows) {
  165. connection.sync()
  166. } else {
  167. // otherwise flush the call out to read more rows
  168. connection.flush()
  169. }
  170. }
  171. // http://developer.postgresql.org/pgdocs/postgres/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY
  172. prepare(connection) {
  173. // TODO refactor this poor encapsulation
  174. if (!this.hasBeenParsed(connection)) {
  175. connection.parse({
  176. text: this.text,
  177. name: this.name,
  178. types: this.types,
  179. })
  180. }
  181. // because we're mapping user supplied values to
  182. // postgres wire protocol compatible values it could
  183. // throw an exception, so try/catch this section
  184. try {
  185. connection.bind({
  186. portal: this.portal,
  187. statement: this.name,
  188. values: this.values,
  189. binary: this.binary,
  190. valueMapper: utils.prepareValue,
  191. })
  192. } catch (err) {
  193. this.handleError(err, connection)
  194. return
  195. }
  196. connection.describe({
  197. type: 'P',
  198. name: this.portal || '',
  199. })
  200. this._getRows(connection, this.rows)
  201. }
  202. handleCopyInResponse(connection) {
  203. connection.sendCopyFail('No source stream defined')
  204. }
  205. // eslint-disable-next-line no-unused-vars
  206. handleCopyData(msg, connection) {
  207. // noop
  208. }
  209. }
  210. module.exports = Query