query.js 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. 'use strict'
  2. var EventEmitter = require('events').EventEmitter
  3. var util = require('util')
  4. var utils = require('../utils')
  /**
   * Event-emitting query object.
   *
   * The arguments are normalized through `utils.normalizeQueryConfig` and the
   * resulting text, values, name, queryMode and callback are stored on the
   * instance. A fresh query starts in the 'new' state.
   *
   * @param {*} config - query configuration, forwarded to normalizeQueryConfig
   * @param {*} values - query values, forwarded to normalizeQueryConfig
   * @param {*} callback - completion callback, forwarded to normalizeQueryConfig
   */
  var NativeQuery = (module.exports = function (config, values, callback) {
    EventEmitter.call(this)

    const normalized = utils.normalizeQueryConfig(config, values, callback)

    this.text = normalized.text
    this.values = normalized.values
    this.name = normalized.name
    this.queryMode = normalized.queryMode
    this.callback = normalized.callback
    this.state = 'new'
    this._arrayMode = normalized.rowMode === 'array'

    // Row events are only emitted once somebody listens for 'row'.
    // They are emitted without setting singleRowMode to true, which
    // has almost no meaning because libpq reads all rows into memory
    // before returning any.
    this._emitRowEvents = false
    this.on('newListener', (event) => {
      if (event === 'row') {
        this._emitRowEvents = true
      }
    })
  })

  util.inherits(NativeQuery, EventEmitter)
  // Lookup table used by handleError: keys are the field names found in the
  // object returned by `pq.resultErrorFields()`, values are the property names
  // those fields are stored under on the Error object. A field with no entry
  // here is copied under its original name.
  /* eslint-disable quote-props */
  const errorFieldMap = {
    // general error description
    sqlState: 'code',
    statementPosition: 'position',
    messagePrimary: 'message',
    context: 'where',

    // database object names
    schemaName: 'schema',
    tableName: 'table',
    columnName: 'column',
    dataTypeName: 'dataType',
    constraintName: 'constraint',

    // source location fields
    sourceFile: 'file',
    sourceLine: 'line',
    sourceFunction: 'routine',
  }
  /**
   * Reports a failed query.
   *
   * The fields returned by `this.native.pq.resultErrorFields()` are copied onto
   * `err` (renamed through `errorFieldMap` where an entry exists), the query is
   * put into the 'error' state, and the error is then delivered to
   * `this.callback` if one is set, or emitted as an 'error' event otherwise.
   *
   * @param {Error} err - the error to decorate and report; it is mutated in place
   */
  NativeQuery.prototype.handleError = function (err) {
    // copy pq error fields into the error object
    const fields = this.native.pq.resultErrorFields()
    if (fields) {
      for (const key in fields) {
        const normalizedFieldName = errorFieldMap[key] || key
        err[normalizedFieldName] = fields[key]
      }
    }

    // Update the state BEFORE notifying, the same way the success path sets
    // 'end' before emitting. Otherwise a throwing callback, or an 'error'
    // emit with no listener (which throws), would leave the query looking
    // like it is still running.
    this.state = 'error'

    if (this.callback) {
      this.callback(err)
    } else {
      this.emit('error', err)
    }
  }
  /**
   * Thenable support: forwards both handlers to the promise returned by
   * `_getPromise()`.
   *
   * @param {Function} onSuccess - fulfillment handler
   * @param {Function} onFailure - rejection handler
   * @returns {Promise} the promise produced by chaining onto the query promise
   */
  NativeQuery.prototype.then = function (onSuccess, onFailure) {
    const queryPromise = this._getPromise()
    return queryPromise.then(onSuccess, onFailure)
  }
  /**
   * Attaches a rejection handler to the promise returned by `_getPromise()`.
   *
   * @param {Function} callback - rejection handler
   * @returns {Promise} the promise produced by chaining onto the query promise
   */
  NativeQuery.prototype.catch = function (callback) {
    const queryPromise = this._getPromise()
    return queryPromise.catch(callback)
  }
  /**
   * Lazily creates, caches and returns a promise for this query.
   *
   * The promise resolves with the payload of the first 'end' event and rejects
   * with the payload of the first 'error' event. Repeated calls return the same
   * promise instance.
   *
   * @returns {Promise} the cached promise for this query
   */
  NativeQuery.prototype._getPromise = function () {
    if (this._promise) return this._promise

    this._promise = new Promise((resolve, reject) => {
      // Use EventEmitter's `once`; there is no `_once` method on this object,
      // so calling it made the executor throw and the promise always reject
      // with a TypeError.
      this.once('end', resolve)
      this.once('error', reject)
    })

    return this._promise
  }
  /**
   * Runs this query on the given client.
   *
   * Marks the query as 'running', remembers `client.native` on `this.native`,
   * and dispatches one of:
   *   - a named query: `prepare` + `execute` the first time the name is seen
   *     in `client.namedQueries`, plain `execute` afterwards;
   *   - a query with values: `client.native.query(text, values, cb)`;
   *   - an 'extended' mode query without values: `query(text, [], cb)`;
   *   - anything else: `query(text, cb)`.
   *
   * On completion the shared `after` handler resets `client.native.arrayMode`,
   * schedules a '_done' event, and either forwards the error to `handleError`
   * or emits 'row' (when listened for) and 'end' and invokes the callback.
   *
   * @param {object} client - object exposing `native` and `namedQueries`
   * @returns {*} whatever the dispatched native call (or error handler) returns
   */
  NativeQuery.prototype.submit = function (client) {
    this.state = 'running'
    const self = this
    this.native = client.native
    client.native.arrayMode = this._arrayMode

    let after = function (err, rows, results) {
      client.native.arrayMode = false
      setImmediate(function () {
        self.emit('_done')
      })

      // handle possible query error
      if (err) {
        return self.handleError(err)
      }

      // emit row events for each row in the result
      if (self._emitRowEvents) {
        if (results.length > 1) {
          // several results: rows is a list of row lists, one per result
          rows.forEach((rowOfRows, i) => {
            rowOfRows.forEach((row) => {
              self.emit('row', row, results[i])
            })
          })
        } else {
          rows.forEach(function (row) {
            self.emit('row', row, results)
          })
        }
      }

      // handle successful result
      self.state = 'end'
      self.emit('end', results)
      if (self.callback) {
        self.callback(null, results)
      }
    }

    if (process.domain) {
      after = process.domain.bind(after)
    }

    // named query
    if (this.name) {
      if (this.name.length > 63) {
        /* eslint-disable no-console */
        console.error('Warning! Postgres only supports 63 characters for query names.')
        console.error('You supplied %s (%s)', this.name, this.name.length)
        console.error('This can cause conflicts and silent errors executing queries')
        /* eslint-enable no-console */
      }

      // Validate values the same way the unnamed branch does, so a non-array
      // is reported through the normal error path instead of throwing a
      // TypeError from `.map` below.
      if (this.values && !Array.isArray(this.values)) {
        const err = new Error('Query values must be an array')
        return after(err)
      }
      const values = (this.values || []).map(utils.prepareValue)

      // check if the client has already executed this named query
      // if so...just execute it again - skip the planning phase
      if (client.namedQueries[this.name]) {
        if (this.text && client.namedQueries[this.name] !== this.text) {
          const err = new Error(`Prepared statements must be unique - '${this.name}' was used for a different statement`)
          return after(err)
        }
        return client.native.execute(this.name, values, after)
      }

      // plan the named query the first time, then execute it
      return client.native.prepare(this.name, this.text, values.length, function (err) {
        if (err) return after(err)
        client.namedQueries[self.name] = self.text
        return self.native.execute(self.name, values, after)
      })
    } else if (this.values) {
      if (!Array.isArray(this.values)) {
        const err = new Error('Query values must be an array')
        return after(err)
      }
      const vals = this.values.map(utils.prepareValue)
      client.native.query(this.text, vals, after)
    } else if (this.queryMode === 'extended') {
      client.native.query(this.text, [], after)
    } else {
      client.native.query(this.text, after)
    }
  }