stream.js 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. /*
  2. * Copyright (c) 2015-present, Vitaly Tomilov
  3. *
  4. * See the LICENSE file at the top-level directory of this distribution
  5. * for licensing information.
  6. *
  7. * Removal or modification of this copyright notice is prohibited.
  8. */
  9. const {Events} = require('./events');
  10. const npm = {
  11. utils: require('./utils'),
  12. text: require('./text')
  13. };
  14. ////////////////////////////////////////////
  15. // Streams query data into any destination,
  16. // with the help of pg-query-stream library.
  /**
   * Streams query data into any destination, with the help of pg-query-stream library.
   *
   * The call is validated first (driver mode, stream object, stream state, callback,
   * connection), then the `query` event is raised, the stream is submitted to the
   * client, and `initCB` is invoked so the caller can wire up the stream.
   *
   * @param {object} ctx
   * Query context. This function reads `ctx.options` (event handlers and `pgNative`),
   * `ctx.db` (whose `client` executes the stream), `ctx.dc` and `ctx.ctx`
   * (both passed through into the event context as-is).
   *
   * @param {object} qs
   * Stream object whose constructor is named `QueryStream`. Its `cursor.text` and
   * `cursor.values` are reported to event handlers as `query` and `params`.
   *
   * @param {function} initCB
   * Initialization callback, invoked synchronously with the stream returned
   * by `ctx.db.client.query(qs)`.
   *
   * @param {function} config.promise
   * Promise factory: called as `$p(executor)`, and must expose `$p.reject(reason)`.
   *
   * @returns {Promise<{processed: number, duration: number}>}
   * Resolves on the stream's `end` event with the number of `data` events received
   * and the time, in milliseconds, elapsed since `initCB` returned. Rejects on invalid
   * arguments, a missing connection, an error returned by an event notification,
   * an exception thrown by `initCB`, or the stream's `error` event.
   */
  function $stream(ctx, qs, initCB, config) {
      const $p = config.promise;

      // istanbul ignore next:
      // we do not provide code coverage for the Native Bindings specifics
      if (ctx.options.pgNative) {
          return $p.reject(new Error(npm.text.nativeStreaming));
      }

      // Stream class was renamed again, see the following issue:
      // https://github.com/brianc/node-postgres/issues/2412
      if (!qs || !qs.constructor || qs.constructor.name !== 'QueryStream') {
          // invalid or missing stream object;
          return $p.reject(new TypeError(npm.text.invalidStream));
      }

      if (qs._reading || qs._closed) {
          // stream object is in the wrong state;
          return $p.reject(new Error(npm.text.invalidStreamState));
      }

      if (typeof initCB !== 'function') {
          // parameter `initCB` must be passed as the initialization callback;
          return $p.reject(new TypeError(npm.text.invalidStreamCB));
      }

      if (!ctx.db) {
          // There is no connection to stream from. This must be detected explicitly and
          // before the `query` event: signalling it from inside getContext() could not work,
          // because the first getContext() call ran while `error` was still uninitialized,
          // and any value it stored was overwritten by the notification result anyway.
          const looseError = new Error(npm.text.looseQuery);
          Events.error(ctx.options, looseError, getContext());
          return $p.reject(looseError);
      }

      let error = Events.query(ctx.options, getContext());

      if (error) {
          error = getError(error);
          Events.error(ctx.options, error, getContext());
          return $p.reject(error);
      }

      const stream = ctx.db.client.query(qs);

      stream.on('data', onData);
      stream.on('error', onError);
      stream.on('end', onEnd);

      try {
          initCB.call(this, stream); // the stream must be initialized during the call;
      } catch (e) {
          release();
          error = getError(e);
          Events.error(ctx.options, error, getContext());
          return $p.reject(error);
      }

      const start = Date.now();

      // `resolve` and `reject` are assigned by the promise executor at the bottom of this
      // function; the stream handlers below rely on them being set by the time they fire.
      let resolve, reject, nRows = 0;

      // Counts the row and raises the `receive` event; an error returned
      // by the notification terminates the stream.
      function onData(data) {
          nRows++;
          const receiveError = Events.receive(ctx.options, [data], undefined, getContext());
          if (receiveError) {
              onError(receiveError);
          }
      }

      // Detaches the handlers, destroys the stream, reports the error and rejects.
      function onError(e) {
          release();
          stream.destroy();
          e = getError(e);
          Events.error(ctx.options, e, getContext());
          reject(e);
      }

      // Detaches the handlers and resolves with the streaming statistics.
      function onEnd() {
          release();
          resolve({
              processed: nRows, // total number of rows processed;
              duration: Date.now() - start // duration, in milliseconds;
          });
      }

      // Removes every listener this function attached to the stream.
      function release() {
          stream.removeListener('data', onData);
          stream.removeListener('error', onError);
          stream.removeListener('end', onEnd);
      }

      // Unwraps the original error from an InternalError container.
      function getError(e) {
          return e instanceof npm.utils.InternalError ? e.error : e;
      }

      // Builds the context object passed into every event notification;
      // `client` is undefined when the connection is no longer available.
      function getContext() {
          return {
              client: ctx.db ? ctx.db.client : undefined,
              dc: ctx.dc,
              query: qs.cursor.text,
              params: qs.cursor.values,
              ctx: ctx.ctx
          };
      }

      return $p((res, rej) => {
          resolve = res;
          reject = rej;
      });
  }
  107. module.exports = $stream;