abstract_cursor.js 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.assertUninitialized = exports.AbstractCursor = exports.CURSOR_FLAGS = void 0;
  4. const stream_1 = require("stream");
  5. const util_1 = require("util");
  6. const bson_1 = require("../bson");
  7. const error_1 = require("../error");
  8. const mongo_types_1 = require("../mongo_types");
  9. const execute_operation_1 = require("../operations/execute_operation");
  10. const get_more_1 = require("../operations/get_more");
  11. const kill_cursors_1 = require("../operations/kill_cursors");
  12. const read_concern_1 = require("../read_concern");
  13. const read_preference_1 = require("../read_preference");
  14. const sessions_1 = require("../sessions");
  15. const utils_1 = require("../utils");
  16. /** @internal */
  17. const kId = Symbol('id');
  18. /** @internal */
  19. const kDocuments = Symbol('documents');
  20. /** @internal */
  21. const kServer = Symbol('server');
  22. /** @internal */
  23. const kNamespace = Symbol('namespace');
  24. /** @internal */
  25. const kClient = Symbol('client');
  26. /** @internal */
  27. const kSession = Symbol('session');
  28. /** @internal */
  29. const kOptions = Symbol('options');
  30. /** @internal */
  31. const kTransform = Symbol('transform');
  32. /** @internal */
  33. const kInitialized = Symbol('initialized');
  34. /** @internal */
  35. const kClosed = Symbol('closed');
  36. /** @internal */
  37. const kKilled = Symbol('killed');
  38. /** @internal */
  39. const kInit = Symbol('kInit');
  40. /** @public */
  41. exports.CURSOR_FLAGS = [
  42. 'tailable',
  43. 'oplogReplay',
  44. 'noCursorTimeout',
  45. 'awaitData',
  46. 'exhaust',
  47. 'partial'
  48. ];
  /**
   * Shared cursor machinery: option handling, document buffering, iteration helpers,
   * streaming and lifecycle management (initialize, rewind, close).
   *
   * This class calls `this._initialize(session, callback)` from its internal `kInit`
   * method but does not define it, so concrete cursors are expected to supply it.
   *
   * @public
   */
  class AbstractCursor extends mongo_types_1.TypedEventEmitter {
      /**
       * @internal
       * @param client - the client that owns this cursor; must report `s.isMongoClient`
       * @param namespace - namespace the cursor initially targets
       * @param options - cursor options; only recognised, correctly typed values are kept
       * @throws MongoRuntimeError when `client` is not a MongoClient
       */
      constructor(client, namespace, options = {}) {
          super();
          if (!client.s.isMongoClient) {
              throw new error_1.MongoRuntimeError('Cursor must be constructed with MongoClient');
          }
          this[kClient] = client;
          this[kNamespace] = namespace;
          this[kId] = null;
          this[kDocuments] = new utils_1.List();
          this[kInitialized] = false;
          this[kClosed] = false;
          this[kKilled] = false;
          this[kOptions] = {
              // Anything that is not a ReadPreference instance falls back to primary.
              readPreference: options.readPreference && options.readPreference instanceof read_preference_1.ReadPreference
                  ? options.readPreference
                  : read_preference_1.ReadPreference.primary,
              ...(0, bson_1.pluckBSONSerializeOptions)(options)
          };
          const readConcern = read_concern_1.ReadConcern.fromOptions(options);
          if (readConcern) {
              this[kOptions].readConcern = readConcern;
          }
          if (typeof options.batchSize === 'number') {
              this[kOptions].batchSize = options.batchSize;
          }
          // we check for undefined specifically here to allow falsy values
          // eslint-disable-next-line no-restricted-syntax
          if (options.comment !== undefined) {
              this[kOptions].comment = options.comment;
          }
          if (typeof options.maxTimeMS === 'number') {
              this[kOptions].maxTimeMS = options.maxTimeMS;
          }
          if (typeof options.maxAwaitTimeMS === 'number') {
              this[kOptions].maxAwaitTimeMS = options.maxAwaitTimeMS;
          }
          if (options.session instanceof sessions_1.ClientSession) {
              this[kSession] = options.session;
          }
          else {
              // No usable session supplied: start an implicit one owned by this cursor.
              this[kSession] = this[kClient].startSession({ owner: this, explicit: false });
          }
      }
      /** The cursor id, or `undefined` while no id has been assigned. */
      get id() {
          return this[kId] ?? undefined;
      }
      /**
       * True when the cursor id is zero, or the cursor has been closed or killed.
       * @internal
       */
      get isDead() {
          return (this[kId]?.isZero() ?? false) || this[kClosed] || this[kKilled];
      }
      /** @internal */
      get client() {
          return this[kClient];
      }
      /** @internal */
      get server() {
          return this[kServer];
      }
      get namespace() {
          return this[kNamespace];
      }
      get readPreference() {
          return this[kOptions].readPreference;
      }
      get readConcern() {
          return this[kOptions].readConcern;
      }
      /** @internal */
      get session() {
          return this[kSession];
      }
      set session(clientSession) {
          this[kSession] = clientSession;
      }
      /** @internal */
      get cursorOptions() {
          return this[kOptions];
      }
      get closed() {
          return this[kClosed];
      }
      get killed() {
          return this[kKilled];
      }
      /** Whether the client's topology (if any) reports being load balanced. */
      get loadBalanced() {
          return !!this[kClient].topology?.loadBalanced;
      }
      /** Returns current buffered documents length */
      bufferedCount() {
          return this[kDocuments].length;
      }
      /**
       * Removes and returns buffered documents.
       *
       * @param number - maximum number of documents to take; defaults to everything buffered
       * @returns the documents taken from the buffer (nullish entries are skipped)
       */
      readBufferedDocuments(number) {
          const bufferedDocs = [];
          const documentsToRead = Math.min(number ?? this[kDocuments].length, this[kDocuments].length);
          for (let count = 0; count < documentsToRead; count++) {
              const document = this[kDocuments].shift();
              if (document != null) {
                  bufferedDocs.push(document);
              }
          }
          return bufferedDocs;
      }
      /**
       * Async iteration over the cursor. The cursor is closed when iteration finishes,
       * including when the consumer leaves a `for await` loop early.
       */
      async *[Symbol.asyncIterator]() {
          if (this.closed) {
              return;
          }
          try {
              while (true) {
                  const document = await this.next();
                  // Intentional strict null check, because users can map cursors to falsey values.
                  // We allow mapping to all values except for null.
                  // eslint-disable-next-line no-restricted-syntax
                  if (document === null) {
                      if (!this.closed) {
                          const message = 'Cursor returned a `null` document, but the cursor is not exhausted. Mapping documents to `null` is not supported in the cursor transform.';
                          await cleanupCursorAsync(this, { needsToEmitClosed: true }).catch(() => null);
                          throw new error_1.MongoAPIError(message);
                      }
                      break;
                  }
                  yield document;
                  // Identity comparison: matches only once the shared `Long.ZERO` sentinel has
                  // been assigned to the id (done by initialization and by cursor cleanup).
                  if (this[kId] === bson_1.Long.ZERO) {
                      // Cursor exhausted
                      break;
                  }
              }
          }
          finally {
              // Only close the cursor if it has not already been closed. This finally clause handles
              // the case when a user would break out of a for await of loop early.
              if (!this.closed) {
                  await this.close().catch(() => null);
              }
          }
      }
      /**
       * Exposes the cursor as an object-mode readable stream.
       *
       * @param options - optional settings; `options.transform` maps each document before
       * it is emitted
       * @returns the cursor stream, or - when a transform is given - a Transform stream fed by it
       */
      stream(options) {
          const readable = new ReadableCursorStream(this);
          if (!options?.transform) {
              return readable;
          }
          const transform = options.transform;
          const transformedStream = readable.pipe(new stream_1.Transform({
              objectMode: true,
              highWaterMark: 1,
              transform(chunk, _, callback) {
                  try {
                      const transformed = transform(chunk);
                      callback(undefined, transformed);
                  }
                  catch (err) {
                      callback(err);
                  }
              }
          }));
          // `pipe()` does not forward errors from the source to the destination. Callers only
          // ever hold the destination, so re-emit source errors there; otherwise a cursor
          // failure could not be handled by the consumer of the returned stream.
          readable.on('error', err => transformedStream.emit('error', err));
          return transformedStream;
      }
      /**
       * Resolves to `true` when another document can be read. May fetch from the server;
       * a fetched document is put back at the front of the buffer untransformed.
       */
      async hasNext() {
          if (this[kId] === bson_1.Long.ZERO) {
              return false;
          }
          if (this[kDocuments].length !== 0) {
              return true;
          }
          const doc = await next(this, { blocking: true, transform: false });
          if (doc) {
              this[kDocuments].unshift(doc);
              return true;
          }
          return false;
      }
      /** Get the next available document from the cursor, returns null if no more documents are available. */
      async next() {
          if (this[kId] === bson_1.Long.ZERO) {
              throw new error_1.MongoCursorExhaustedError();
          }
          return next(this, { blocking: true, transform: true });
      }
      /**
       * Try to get the next available document from the cursor or `null` if an empty batch is returned
       */
      async tryNext() {
          if (this[kId] === bson_1.Long.ZERO) {
              throw new error_1.MongoCursorExhaustedError();
          }
          return next(this, { blocking: false, transform: true });
      }
      /**
       * Iterates over all the documents for this cursor using the iterator, callback pattern.
       *
       * If the iterator returns `false`, iteration will stop.
       *
       * @param iterator - The iteration callback.
       * @deprecated - Will be removed in a future release. Use for await...of instead.
       */
      async forEach(iterator) {
          if (typeof iterator !== 'function') {
              throw new error_1.MongoInvalidArgumentError('Argument "iterator" must be a function');
          }
          for await (const document of this) {
              const result = iterator(document);
              if (result === false) {
                  break;
              }
          }
      }
      /**
       * Marks the cursor closed and runs cursor cleanup. The `close` event is requested only
       * when the cursor was not already marked closed.
       */
      async close() {
          const needsToEmitClosed = !this[kClosed];
          this[kClosed] = true;
          await cleanupCursorAsync(this, { needsToEmitClosed });
      }
      /**
       * Returns an array of documents. The caller is responsible for making sure that there
       * is enough memory to store the results. Note that the array only contains partial
       * results when this cursor had been previously accessed. In that case,
       * cursor.rewind() can be used to reset the cursor.
       */
      async toArray() {
          const array = [];
          for await (const document of this) {
              array.push(document);
          }
          return array;
      }
      /**
       * Add a cursor flag to the cursor
       *
       * @param flag - The flag to set, must be one of the `CURSOR_FLAGS`:
       * ['tailable', 'oplogReplay', 'noCursorTimeout', 'awaitData', 'exhaust', 'partial'].
       * @param value - The flag boolean value.
       * @throws MongoCursorInUseError when the cursor has already been initialized
       * @throws MongoInvalidArgumentError for an unknown flag or a non-boolean value
       */
      addCursorFlag(flag, value) {
          assertUninitialized(this);
          if (!exports.CURSOR_FLAGS.includes(flag)) {
              throw new error_1.MongoInvalidArgumentError(`Flag ${flag} is not one of ${exports.CURSOR_FLAGS}`);
          }
          if (typeof value !== 'boolean') {
              throw new error_1.MongoInvalidArgumentError(`Flag ${flag} must be a boolean value`);
          }
          this[kOptions][flag] = value;
          return this;
      }
      /**
       * Map all documents using the provided function
       * If there is a transform set on the cursor, that will be called first and the result passed to
       * this function's transform.
       *
       * @remarks
       *
       * **Note** Cursors use `null` internally to indicate that there are no more documents in the cursor. Providing a mapping
       * function that maps values to `null` will result in the cursor closing itself before it has finished iterating
       * all documents. This will **not** result in a memory leak, just surprising behavior. For example:
       *
       * ```typescript
       * const cursor = collection.find({});
       * cursor.map(() => null);
       *
       * const documents = await cursor.toArray();
       * // documents is always [], regardless of how many documents are in the collection.
       * ```
       *
       * Other falsey values are allowed:
       *
       * ```typescript
       * const cursor = collection.find({});
       * cursor.map(() => '');
       *
       * const documents = await cursor.toArray();
       * // documents is now an array of empty strings
       * ```
       *
       * **Note for Typescript Users:** adding a transform changes the return type of the iteration of this cursor,
       * it **does not** return a new instance of a cursor. This means when calling map,
       * you should always assign the result to a new variable in order to get a correctly typed cursor variable.
       * Take note of the following example:
       *
       * @example
       * ```typescript
       * const cursor: FindCursor<Document> = coll.find();
       * const mappedCursor: FindCursor<number> = cursor.map(doc => Object.keys(doc).length);
       * const keyCounts: number[] = await mappedCursor.toArray(); // cursor.toArray() still returns Document[]
       * ```
       * @param transform - The mapping transformation method.
       */
      map(transform) {
          assertUninitialized(this);
          const oldTransform = this[kTransform]; // TODO(NODE-3283): Improve transform typing
          if (oldTransform) {
              // Compose: the previously registered transform runs first.
              this[kTransform] = doc => {
                  return transform(oldTransform(doc));
              };
          }
          else {
              this[kTransform] = transform;
          }
          return this;
      }
      /**
       * Set the ReadPreference for the cursor.
       *
       * @param readPreference - The new read preference for the cursor, as a
       * ReadPreference instance or a string understood by `ReadPreference.fromString`.
       * @throws MongoInvalidArgumentError for any other kind of value
       */
      withReadPreference(readPreference) {
          assertUninitialized(this);
          if (readPreference instanceof read_preference_1.ReadPreference) {
              this[kOptions].readPreference = readPreference;
          }
          else if (typeof readPreference === 'string') {
              this[kOptions].readPreference = read_preference_1.ReadPreference.fromString(readPreference);
          }
          else {
              throw new error_1.MongoInvalidArgumentError(`Invalid read preference: ${readPreference}`);
          }
          return this;
      }
      /**
       * Set the ReadConcern for the cursor.
       *
       * @param readConcern - The new read concern for the cursor. It is resolved through
       * `ReadConcern.fromOptions`; when that yields nothing the current value is kept.
       */
      withReadConcern(readConcern) {
          assertUninitialized(this);
          const resolvedReadConcern = read_concern_1.ReadConcern.fromOptions({ readConcern });
          if (resolvedReadConcern) {
              this[kOptions].readConcern = resolvedReadConcern;
          }
          return this;
      }
      /**
       * Set a maxTimeMS on the cursor query, allowing for hard timeout limits on queries (Only supported on MongoDB 2.6 or higher)
       *
       * @param value - Number of milliseconds to wait before aborting the query.
       */
      maxTimeMS(value) {
          assertUninitialized(this);
          if (typeof value !== 'number') {
              throw new error_1.MongoInvalidArgumentError('Argument for maxTimeMS must be a number');
          }
          this[kOptions].maxTimeMS = value;
          return this;
      }
      /**
       * Set the batch size for the cursor.
       *
       * @param value - The number of documents to return per batch. See {@link https://www.mongodb.com/docs/manual/reference/command/find/|find command documentation}.
       * @throws MongoTailableCursorError when the `tailable` flag is set
       * @throws MongoInvalidArgumentError when `value` is not a number
       */
      batchSize(value) {
          assertUninitialized(this);
          if (this[kOptions].tailable) {
              throw new error_1.MongoTailableCursorError('Tailable cursor does not support batchSize');
          }
          if (typeof value !== 'number') {
              throw new error_1.MongoInvalidArgumentError('Operation "batchSize" requires an integer');
          }
          this[kOptions].batchSize = value;
          return this;
      }
      /**
       * Rewind this cursor to its uninitialized state. Any options that are present on the cursor will
       * remain in effect. Iterating this cursor will cause new queries to be sent to the server, even
       * if the resultant data has already been retrieved by this cursor.
       */
      rewind() {
          if (!this[kInitialized]) {
              return;
          }
          this[kId] = null;
          this[kDocuments].clear();
          this[kClosed] = false;
          this[kKilled] = false;
          this[kInitialized] = false;
          const session = this[kSession];
          if (session) {
              // We only want to end this session if we created it, and it hasn't ended yet
              if (session.explicit === false) {
                  if (!session.hasEnded) {
                      session.endSession().catch(() => null);
                  }
                  this[kSession] = this.client.startSession({ owner: this, explicit: false });
              }
          }
      }
      /**
       * Issues a getMore for this cursor against the recorded server.
       *
       * @internal
       * @param batchSize - batch size to request
       * @param callback - node-style callback handed to `executeOperation`
       */
      _getMore(batchSize, callback) {
          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
          const getMoreOperation = new get_more_1.GetMoreOperation(this[kNamespace], this[kId], this[kServer], {
              ...this[kOptions],
              session: this[kSession],
              batchSize
          });
          (0, execute_operation_1.executeOperation)(this[kClient], getMoreOperation, callback);
      }
      /**
       * @internal
       *
       * This function is exposed for the unified test runner's createChangeStream
       * operation. We cannot refactor to use the abstract _initialize method without
       * a significant refactor.
       *
       * Runs `_initialize`, records the server, cursor id, namespace and first batch from the
       * result, marks the cursor initialized, and cleans it up on error or when already dead.
       */
      [kInit](callback) {
          this._initialize(this[kSession], (error, state) => {
              if (state) {
                  const response = state.response;
                  this[kServer] = state.server;
                  if (response.cursor) {
                      // TODO(NODE-2674): Preserve int64 sent from MongoDB
                      this[kId] =
                          typeof response.cursor.id === 'number'
                              ? bson_1.Long.fromNumber(response.cursor.id)
                              : typeof response.cursor.id === 'bigint'
                                  ? bson_1.Long.fromBigInt(response.cursor.id)
                                  : response.cursor.id;
                      if (response.cursor.ns) {
                          this[kNamespace] = (0, utils_1.ns)(response.cursor.ns);
                      }
                      this[kDocuments].pushMany(response.cursor.firstBatch);
                  }
                  // When server responses return without a cursor document, we close this cursor
                  // and return the raw server response. This is often the case for explain commands
                  // for example
                  if (this[kId] == null) {
                      this[kId] = bson_1.Long.ZERO;
                      // TODO(NODE-3286): ExecutionResult needs to accept a generic parameter
                      this[kDocuments].push(state.response);
                  }
              }
              // the cursor is now initialized, even if an error occurred or it is dead
              this[kInitialized] = true;
              if (error) {
                  return cleanupCursor(this, { error }, () => callback(error, undefined));
              }
              if (this.isDead) {
                  return cleanupCursor(this, undefined, () => callback());
              }
              callback();
          });
      }
  }
  /** @event */
  AbstractCursor.CLOSE = 'close';
  exports.AbstractCursor = AbstractCursor;
  /**
   * Produces the next document of `cursor`, initializing the cursor and issuing getMore
   * requests as required.
   *
   * @param cursor - the cursor on which to call `next`
   * @param blocking - when `true`, getMore requests are repeated until a document arrives
   * or the cursor is dead. When `false`, at most one getMore is issued per call and an
   * empty batch yields `null` straight away.
   * @param transform - if true, the cursor's transform function is applied to the result document (if the transform exists)
   * @returns the next document in the cursor, or `null`. With `blocking: true` a `null` result
   * means the cursor was closed or is dead with nothing buffered. With `blocking: false` it may
   * also mean that the getMore just issued returned no documents.
   * @throws whatever initialization, the getMore or the transform throws; the cursor is cleaned
   * up before a getMore or transform error is rethrown
   */
  async function next(cursor, { blocking, transform }) {
      if (cursor.closed) {
          return null;
      }
      do {
          if (cursor[kId] == null) {
              // No cursor id yet: run the initial command first.
              await (0, util_1.promisify)(cursor[kInit].bind(cursor))();
          }
          if (cursor[kDocuments].length !== 0) {
              const doc = cursor[kDocuments].shift();
              if (doc != null && transform && cursor[kTransform]) {
                  try {
                      return cursor[kTransform](doc);
                  }
                  catch (error) {
                      // `cleanupCursorAsync` should never throw, but if it does we want to throw the original
                      // error instead.
                      await cleanupCursorAsync(cursor, { error, needsToEmitClosed: true }).catch(() => null);
                      throw error;
                  }
              }
              return doc;
          }
          if (cursor.isDead) {
              // if the cursor is dead, we clean it up
              // cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
              // and we should surface the error
              await cleanupCursorAsync(cursor, {});
              return null;
          }
          // otherwise need to call getMore
          // A falsy batch size (unset or 0) falls back to 1000.
          const batchSize = cursor[kOptions].batchSize || 1000;
          try {
              const response = await (0, util_1.promisify)(cursor._getMore.bind(cursor))(batchSize);
              if (response) {
                  // Normalize number / bigint ids to Long; other values are stored as received.
                  const cursorId = typeof response.cursor.id === 'number'
                      ? bson_1.Long.fromNumber(response.cursor.id)
                      : typeof response.cursor.id === 'bigint'
                          ? bson_1.Long.fromBigInt(response.cursor.id)
                          : response.cursor.id;
                  cursor[kDocuments].pushMany(response.cursor.nextBatch);
                  cursor[kId] = cursorId;
              }
          }
          catch (error) {
              // `cleanupCursorAsync` should never throw, but if it does we want to throw the original
              // error instead.
              await cleanupCursorAsync(cursor, { error }).catch(() => null);
              throw error;
          }
          if (cursor.isDead) {
              // If we successfully received a response from a cursor BUT the cursor indicates that it is exhausted,
              // we intentionally clean up the cursor to release its session back into the pool before the cursor
              // is iterated. This prevents a cursor that is exhausted on the server from holding
              // onto a session indefinitely until the AbstractCursor is iterated.
              //
              // cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
              // and we should surface the error
              await cleanupCursorAsync(cursor, {});
          }
          if (cursor[kDocuments].length === 0 && blocking === false) {
              return null;
          }
      } while (!cursor.isDead || cursor[kDocuments].length !== 0);
      return null;
  }
  /** Promise-returning form of `cleanupCursor`. */
  const cleanupCursorAsync = (0, util_1.promisify)(cleanupCursor);
  /**
   * Releases the resources held by a cursor: kills the server-side cursor when one is still
   * open, then ends or unpins the session, emitting `close` where appropriate.
   *
   * @param cursor - the cursor to clean up
   * @param options - optional `error` (the failure that triggered cleanup) and
   * `needsToEmitClosed` (overrides the default of "emit only when the buffer is empty")
   * @param callback - invoked without arguments once cleanup has finished
   */
  function cleanupCursor(cursor, options, callback) {
      const cursorId = cursor[kId];
      const cursorNs = cursor[kNamespace];
      const server = cursor[kServer];
      const session = cursor[kSession];
      const error = options?.error;
      // Cursors only emit closed events once the client-side cursor has been exhausted fully or there
      // was an error. Notably, when the server returns a cursor id of 0 and a non-empty batch, we
      // cleanup the cursor but don't emit a `close` event.
      const needsToEmitClosed = options?.needsToEmitClosed ?? cursor[kDocuments].length === 0;
      // Ends a session owned by this cursor (then runs `done`), or unpins the connection of a
      // foreign session that is not in a transaction and runs `done` synchronously.
      const releaseSession = done => {
          if (session) {
              if (session.owner === cursor) {
                  session.endSession({ error }).finally(() => {
                      done();
                  });
                  return;
              }
              if (!session.inTransaction()) {
                  (0, sessions_1.maybeClearPinnedConnection)(session, { error });
              }
          }
          return done();
      };
      // Final step for cursors that were (or would have been) killed: `close` is always
      // emitted here, after the session has been released.
      const completeCleanup = () => releaseSession(() => {
          cursor.emit(AbstractCursor.CLOSE);
          return callback();
      });
      // On a load balanced network error the kill is skipped entirely.
      if (error && cursor.loadBalanced && error instanceof error_1.MongoNetworkError) {
          return completeCleanup();
      }
      if (cursorId == null || server == null || cursorId.isZero() || cursorNs == null) {
          // Nothing is open on the server, so no kill is needed.
          if (needsToEmitClosed) {
              cursor[kClosed] = true;
              cursor[kId] = bson_1.Long.ZERO;
              cursor.emit(AbstractCursor.CLOSE);
          }
          return releaseSession(callback);
      }
      cursor[kKilled] = true;
      // The session is treated as optional everywhere else in this function, so guard here
      // too rather than throwing before the callback can run.
      if (session?.hasEnded) {
          return completeCleanup();
      }
      // Best effort: a failed kill must not prevent the rest of the cleanup.
      (0, execute_operation_1.executeOperation)(cursor[kClient], new kill_cursors_1.KillCursorsOperation(cursorId, cursorNs, server, { session }))
          .catch(() => null)
          .finally(completeCleanup);
  }
  /**
   * Guards configuration methods that are only valid before the cursor's first command.
   *
   * @internal
   * @param cursor - the cursor to check
   * @throws MongoCursorInUseError when the cursor's initialized flag is set
   */
  function assertUninitialized(cursor) {
      if (cursor[kInitialized]) {
          throw new error_1.MongoCursorInUseError();
      }
  }
  exports.assertUninitialized = assertUninitialized;
  /**
   * Object-mode readable stream that pulls documents from a cursor one at a time
   * (`highWaterMark: 1`). Destroying the stream closes the cursor.
   */
  class ReadableCursorStream extends stream_1.Readable {
      /**
       * @param cursor - the cursor to read from; it is closed when the stream is destroyed
       */
      constructor(cursor) {
          super({
              objectMode: true,
              autoDestroy: false,
              highWaterMark: 1
          });
          // True while a read loop is running, so repeated `_read` calls do not start another.
          this._readInProgress = false;
          this._cursor = cursor;
      }
      // eslint-disable-next-line @typescript-eslint/no-unused-vars
      _read(size) {
          if (!this._readInProgress) {
              this._readInProgress = true;
              this._readNext();
          }
      }
      /** Closes the cursor; a failure to close replaces the error reported to the stream. */
      _destroy(error, callback) {
          this._cursor.close().then(() => callback(error), closeError => callback(closeError));
      }
      /**
       * Fetches one document and pushes it, continuing for as long as the stream accepts
       * more data. A nullish result ends the stream.
       */
      _readNext() {
          next(this._cursor, { blocking: true, transform: true }).then(result => {
              if (result == null) {
                  this.push(null);
              }
              else if (this.destroyed) {
                  this._cursor.close().catch(() => null);
              }
              else {
                  if (this.push(result)) {
                      return this._readNext();
                  }
                  this._readInProgress = false;
              }
          }, err => {
              // A rejection is not guaranteed to be an Error (a user transform may throw anything),
              // so read the message defensively. Throwing inside this handler would leave an
              // unhandled rejection and a stream that is never destroyed.
              const message = typeof err?.message === 'string' ? err.message : '';
              // NOTE: This is questionable, but we have a test backing the behavior. It seems the
              //       desired behavior is that a stream ends cleanly when a user explicitly closes
              //       a client during iteration. Alternatively, we could do the "right" thing and
              //       propagate the error message by removing this special case.
              if (/server is closed/.test(message)) {
                  this._cursor.close().catch(() => null);
                  return this.push(null);
              }
              // NOTE: This is also perhaps questionable. The rationale here is that these errors tend
              //       to be "operation was interrupted", where a cursor has been closed but there is an
              //       active getMore in-flight. This used to check if the cursor was killed but once
              //       that changed to happen in cleanup legitimate errors would not destroy the
              //       stream. There are change streams test specifically test these cases.
              if (/operation was interrupted/.test(message)) {
                  return this.push(null);
              }
              // NOTE: The two above checks on the message of the error will cause a null to be pushed
              //       to the stream, thus closing the stream before the destroy call happens. This means
              //       that either of those error messages on a change stream will not get a proper
              //       'error' event to be emitted (the error passed to destroy). Change stream resumability
              //       relies on that error event to be emitted to create its new cursor and thus was not
              //       working on 4.4 servers because the error emitted on failover was "interrupted at
              //       shutdown" while on 5.0+ it is "The server is in quiesce mode and will shut down".
              //       See NODE-4475.
              return this.destroy(err);
          });
      }
  }
  //# sourceMappingURL=abstract_cursor.js.map