123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 |
- /*
- * Copyright (c) 2015-present, Vitaly Tomilov
- *
- * See the LICENSE file at the top-level directory of this distribution
- * for licensing information.
- *
- * Removal or modification of this copyright notice is prohibited.
- */
- const {Events} = require('./events');
- const npm = {
- utils: require('./utils'),
- text: require('./text')
- };
- ////////////////////////////////////////////
- // Streams query data into any destination,
- // with the help of pg-query-stream library.
/**
 * Streams query data into any destination, with the help of the
 * pg-query-stream library.
 *
 * @param {object} ctx
 * Query context. This function reads `ctx.options`, `ctx.db` (its `client`),
 * `ctx.dc` and `ctx.ctx`.
 *
 * @param {object} qs
 * Stream object to execute. Its constructor must be named `QueryStream`, and it
 * must be neither reading nor closed. `qs.cursor.text` and `qs.cursor.values`
 * are reported as the query and its parameters in event contexts.
 *
 * @param {function} initCB
 * Initialization callback, invoked once as `initCB.call(this, stream)`;
 * the stream must be initialized during the call.
 *
 * @param {object} config
 * Library configuration; `config.promise` is called to create the resulting
 * promise, and `config.promise.reject` to produce immediate rejections.
 *
 * @returns {Promise}
 * Resolves with `{processed, duration}` once the stream emits `end`:
 * the number of rows received, and the time in milliseconds since the stream
 * was initialized. Rejects when the parameters are invalid, when there is no
 * connection to stream through, when an event handler reports an error, when
 * `initCB` throws, or when the stream emits `error`.
 */
function $stream(ctx, qs, initCB, config) {
    const $p = config.promise;

    // istanbul ignore next:
    // we do not provide code coverage for the Native Bindings specifics
    if (ctx.options.pgNative) {
        return $p.reject(new Error(npm.text.nativeStreaming));
    }

    // Stream class was renamed again, see the following issue:
    // https://github.com/brianc/node-postgres/issues/2412
    if (!qs || !qs.constructor || qs.constructor.name !== 'QueryStream') {
        // invalid or missing stream object;
        return $p.reject(new TypeError(npm.text.invalidStream));
    }

    if (qs._reading || qs._closed) {
        // stream object is in the wrong state;
        return $p.reject(new Error(npm.text.invalidStreamState));
    }

    if (typeof initCB !== 'function') {
        // parameter `initCB` must be passed as the initialization callback;
        return $p.reject(new TypeError(npm.text.invalidStreamCB));
    }

    if (!ctx.db) {
        // Loose request: there is no connection to stream through. This has to
        // be detected before anything touches `ctx.db.client`, and reported as
        // a regular rejection rather than a synchronous exception.
        const looseError = new Error(npm.text.looseQuery);
        Events.error(ctx.options, looseError, getContext());
        return $p.reject(looseError);
    }

    // a value returned by the `query` event notification is treated as an error;
    let error = Events.query(ctx.options, getContext());

    if (error) {
        error = getError(error);
        Events.error(ctx.options, error, getContext());
        return $p.reject(error);
    }

    const stream = ctx.db.client.query(qs);

    stream.on('data', onData);
    stream.on('error', onError);
    stream.on('end', onEnd);

    try {
        initCB.call(this, stream); // the stream must be initialized during the call;
    } catch (e) {
        release();
        error = getError(e);
        Events.error(ctx.options, error, getContext());
        return $p.reject(error);
    }

    const start = Date.now();

    // Both are assigned by the promise executor at the bottom; this relies on
    // that executor running before the first stream event is delivered.
    let resolve, reject;
    let nRows = 0;

    // Counts the row and notifies the `receive` event, failing the stream
    // if the notification reports an error.
    function onData(data) {
        nRows++;
        const receiveError = Events.receive(ctx.options, [data], undefined, getContext());
        if (receiveError) {
            onError(receiveError);
        }
    }

    // Detaches from the stream, destroys it, reports the error and rejects.
    function onError(e) {
        release();
        stream.destroy();
        e = getError(e);
        Events.error(ctx.options, e, getContext());
        reject(e);
    }

    // Detaches from the stream and resolves with the streaming statistics.
    function onEnd() {
        release();
        resolve({
            processed: nRows, // total number of rows processed;
            duration: Date.now() - start // duration, in milliseconds;
        });
    }

    // Removes every listener that this function attached to the stream.
    function release() {
        stream.removeListener('data', onData);
        stream.removeListener('error', onError);
        stream.removeListener('end', onEnd);
    }

    // Unwraps the original error from the library's internal error wrapper.
    function getError(e) {
        return e instanceof npm.utils.InternalError ? e.error : e;
    }

    // Builds the context object passed into every event notification;
    // `client` is left undefined when there is no connection.
    function getContext() {
        return {
            client: ctx.db ? ctx.db.client : undefined,
            dc: ctx.dc,
            query: qs.cursor.text,
            params: qs.cursor.values,
            ctx: ctx.ctx
        };
    }

    return $p((res, rej) => {
        resolve = res;
        reject = rej;
    });
}
- module.exports = $stream;
|