query.js 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. 'use strict';
  2. const process = require('process');
  3. const Timers = require('timers');
  4. const Readable = require('stream').Readable;
  5. const Command = require('./command.js');
  6. const Packets = require('../packets/index.js');
  7. const getTextParser = require('../parsers/text_parser.js');
  8. const ServerStatus = require('../constants/server_status.js');
  9. const EmptyPacket = new Packets.Packet(0, Buffer.allocUnsafe(4), 0, 4);
  10. // http://dev.mysql.com/doc/internals/en/com-query.html
  11. class Query extends Command {
  12. constructor(options, callback) {
  13. super();
  14. this.sql = options.sql;
  15. this.values = options.values;
  16. this._queryOptions = options;
  17. this.namedPlaceholders = options.namedPlaceholders || false;
  18. this.onResult = callback;
  19. this.timeout = options.timeout;
  20. this.queryTimeout = null;
  21. this._fieldCount = 0;
  22. this._rowParser = null;
  23. this._fields = [];
  24. this._rows = [];
  25. this._receivedFieldsCount = 0;
  26. this._resultIndex = 0;
  27. this._localStream = null;
  28. this._unpipeStream = function () { };
  29. this._streamFactory = options.infileStreamFactory;
  30. this._connection = null;
  31. }
  32. then() {
  33. const err =
  34. "You have tried to call .then(), .catch(), or invoked await on the result of query that is not a promise, which is a programming error. Try calling con.promise().query(), or require('mysql2/promise') instead of 'mysql2' for a promise-compatible version of the query interface. To learn how to use async/await or Promises check out documentation at https://sidorares.github.io/node-mysql2/docs#using-promise-wrapper, or the mysql2 documentation at https://sidorares.github.io/node-mysql2/docs/documentation/promise-wrapper";
  35. // eslint-disable-next-line
  36. console.log(err);
  37. throw new Error(err);
  38. }
  39. /* eslint no-unused-vars: ["error", { "argsIgnorePattern": "^_" }] */
  40. start(_packet, connection) {
  41. if (connection.config.debug) {
  42. // eslint-disable-next-line
  43. console.log(' Sending query command: %s', this.sql);
  44. }
  45. this._connection = connection;
  46. this.options = Object.assign({}, connection.config, this._queryOptions);
  47. this._setTimeout();
  48. const cmdPacket = new Packets.Query(
  49. this.sql,
  50. connection.config.charsetNumber
  51. );
  52. connection.writePacket(cmdPacket.toPacket(1));
  53. return Query.prototype.resultsetHeader;
  54. }
  55. done() {
  56. this._unpipeStream();
  57. // if all ready timeout, return null directly
  58. if (this.timeout && !this.queryTimeout) {
  59. return null;
  60. }
  61. // else clear timer
  62. if (this.queryTimeout) {
  63. Timers.clearTimeout(this.queryTimeout);
  64. this.queryTimeout = null;
  65. }
  66. if (this.onResult) {
  67. let rows, fields;
  68. if (this._resultIndex === 0) {
  69. rows = this._rows[0];
  70. fields = this._fields[0];
  71. } else {
  72. rows = this._rows;
  73. fields = this._fields;
  74. }
  75. if (fields) {
  76. process.nextTick(() => {
  77. this.onResult(null, rows, fields);
  78. });
  79. } else {
  80. process.nextTick(() => {
  81. this.onResult(null, rows);
  82. });
  83. }
  84. }
  85. return null;
  86. }
  87. doneInsert(rs) {
  88. if (this._localStreamError) {
  89. if (this.onResult) {
  90. this.onResult(this._localStreamError, rs);
  91. } else {
  92. this.emit('error', this._localStreamError);
  93. }
  94. return null;
  95. }
  96. this._rows.push(rs);
  97. this._fields.push(void 0);
  98. this.emit('fields', void 0);
  99. this.emit('result', rs);
  100. if (rs.serverStatus & ServerStatus.SERVER_MORE_RESULTS_EXISTS) {
  101. this._resultIndex++;
  102. return this.resultsetHeader;
  103. }
  104. return this.done();
  105. }
  106. resultsetHeader(packet, connection) {
  107. const rs = new Packets.ResultSetHeader(packet, connection);
  108. this._fieldCount = rs.fieldCount;
  109. if (connection.config.debug) {
  110. // eslint-disable-next-line
  111. console.log(
  112. ` Resultset header received, expecting ${rs.fieldCount} column definition packets`
  113. );
  114. }
  115. if (this._fieldCount === 0) {
  116. return this.doneInsert(rs);
  117. }
  118. if (this._fieldCount === null) {
  119. return this._streamLocalInfile(connection, rs.infileName);
  120. }
  121. this._receivedFieldsCount = 0;
  122. this._rows.push([]);
  123. this._fields.push([]);
  124. return this.readField;
  125. }
  126. _streamLocalInfile(connection, path) {
  127. if (this._streamFactory) {
  128. this._localStream = this._streamFactory(path);
  129. } else {
  130. this._localStreamError = new Error(
  131. `As a result of LOCAL INFILE command server wants to read ${path} file, but as of v2.0 you must provide streamFactory option returning ReadStream.`
  132. );
  133. connection.writePacket(EmptyPacket);
  134. return this.infileOk;
  135. }
  136. const onConnectionError = () => {
  137. this._unpipeStream();
  138. };
  139. const onDrain = () => {
  140. this._localStream.resume();
  141. };
  142. const onPause = () => {
  143. this._localStream.pause();
  144. };
  145. const onData = function (data) {
  146. const dataWithHeader = Buffer.allocUnsafe(data.length + 4);
  147. data.copy(dataWithHeader, 4);
  148. connection.writePacket(
  149. new Packets.Packet(0, dataWithHeader, 0, dataWithHeader.length)
  150. );
  151. };
  152. const onEnd = () => {
  153. connection.removeListener('error', onConnectionError);
  154. connection.writePacket(EmptyPacket);
  155. };
  156. const onError = err => {
  157. this._localStreamError = err;
  158. connection.removeListener('error', onConnectionError);
  159. connection.writePacket(EmptyPacket);
  160. };
  161. this._unpipeStream = () => {
  162. connection.stream.removeListener('pause', onPause);
  163. connection.stream.removeListener('drain', onDrain);
  164. this._localStream.removeListener('data', onData);
  165. this._localStream.removeListener('end', onEnd);
  166. this._localStream.removeListener('error', onError);
  167. };
  168. connection.stream.on('pause', onPause);
  169. connection.stream.on('drain', onDrain);
  170. this._localStream.on('data', onData);
  171. this._localStream.on('end', onEnd);
  172. this._localStream.on('error', onError);
  173. connection.once('error', onConnectionError);
  174. return this.infileOk;
  175. }
  176. readField(packet, connection) {
  177. this._receivedFieldsCount++;
  178. // Often there is much more data in the column definition than in the row itself
  179. // If you set manually _fields[0] to array of ColumnDefinition's (from previous call)
  180. // you can 'cache' result of parsing. Field packets still received, but ignored in that case
  181. // this is the reason _receivedFieldsCount exist (otherwise we could just use current length of fields array)
  182. if (this._fields[this._resultIndex].length !== this._fieldCount) {
  183. const field = new Packets.ColumnDefinition(
  184. packet,
  185. connection.clientEncoding
  186. );
  187. this._fields[this._resultIndex].push(field);
  188. if (connection.config.debug) {
  189. /* eslint-disable no-console */
  190. console.log(' Column definition:');
  191. console.log(` name: ${field.name}`);
  192. console.log(` type: ${field.columnType}`);
  193. console.log(` flags: ${field.flags}`);
  194. /* eslint-enable no-console */
  195. }
  196. }
  197. // last field received
  198. if (this._receivedFieldsCount === this._fieldCount) {
  199. const fields = this._fields[this._resultIndex];
  200. this.emit('fields', fields);
  201. this._rowParser = new (getTextParser(fields, this.options, connection.config))(fields);
  202. return Query.prototype.fieldsEOF;
  203. }
  204. return Query.prototype.readField;
  205. }
  206. fieldsEOF(packet, connection) {
  207. // check EOF
  208. if (!packet.isEOF()) {
  209. return connection.protocolError('Expected EOF packet');
  210. }
  211. return this.row;
  212. }
  213. /* eslint no-unused-vars: ["error", { "argsIgnorePattern": "^_" }] */
  214. row(packet, _connection) {
  215. if (packet.isEOF()) {
  216. const status = packet.eofStatusFlags();
  217. const moreResults = status & ServerStatus.SERVER_MORE_RESULTS_EXISTS;
  218. if (moreResults) {
  219. this._resultIndex++;
  220. return Query.prototype.resultsetHeader;
  221. }
  222. return this.done();
  223. }
  224. let row;
  225. try {
  226. row = this._rowParser.next(
  227. packet,
  228. this._fields[this._resultIndex],
  229. this.options
  230. );
  231. } catch (err) {
  232. this._localStreamError = err;
  233. return this.doneInsert(null);
  234. }
  235. if (this.onResult) {
  236. this._rows[this._resultIndex].push(row);
  237. } else {
  238. this.emit('result', row, this._resultIndex);
  239. }
  240. return Query.prototype.row;
  241. }
  242. infileOk(packet, connection) {
  243. const rs = new Packets.ResultSetHeader(packet, connection);
  244. return this.doneInsert(rs);
  245. }
  246. stream(options) {
  247. options = options || {};
  248. options.objectMode = true;
  249. const stream = new Readable(options);
  250. stream._read = () => {
  251. this._connection && this._connection.resume();
  252. };
  253. this.on('result', (row, resultSetIndex) => {
  254. if (!stream.push(row)) {
  255. this._connection.pause();
  256. }
  257. stream.emit('result', row, resultSetIndex); // replicate old emitter
  258. });
  259. this.on('error', err => {
  260. stream.emit('error', err); // Pass on any errors
  261. });
  262. this.on('end', () => {
  263. stream.push(null); // pushing null, indicating EOF
  264. });
  265. this.on('fields', fields => {
  266. stream.emit('fields', fields); // replicate old emitter
  267. });
  268. stream.on('end', () => {
  269. stream.emit('close');
  270. });
  271. return stream;
  272. }
  273. _setTimeout() {
  274. if (this.timeout) {
  275. const timeoutHandler = this._handleTimeoutError.bind(this);
  276. this.queryTimeout = Timers.setTimeout(
  277. timeoutHandler,
  278. this.timeout
  279. );
  280. }
  281. }
  282. _handleTimeoutError() {
  283. if (this.queryTimeout) {
  284. Timers.clearTimeout(this.queryTimeout);
  285. this.queryTimeout = null;
  286. }
  287. const err = new Error('Query inactivity timeout');
  288. err.errorno = 'PROTOCOL_SEQUENCE_TIMEOUT';
  289. err.code = 'PROTOCOL_SEQUENCE_TIMEOUT';
  290. err.syscall = 'query';
  291. if (this.onResult) {
  292. this.onResult(err);
  293. } else {
  294. this.emit('error', err);
  295. }
  296. }
  297. }
  298. Query.prototype.catch = Query.prototype.then;
  299. module.exports = Query;