sessions.js 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739
  1. "use strict";
  2. var _a;
  3. Object.defineProperty(exports, "__esModule", { value: true });
  4. exports.updateSessionFromResponse = exports.applySession = exports.ServerSessionPool = exports.ServerSession = exports.maybeClearPinnedConnection = exports.ClientSession = void 0;
  5. const util_1 = require("util");
  6. const bson_1 = require("./bson");
  7. const metrics_1 = require("./cmap/metrics");
  8. const shared_1 = require("./cmap/wire_protocol/shared");
  9. const constants_1 = require("./constants");
  10. const error_1 = require("./error");
  11. const mongo_types_1 = require("./mongo_types");
  12. const execute_operation_1 = require("./operations/execute_operation");
  13. const run_command_1 = require("./operations/run_command");
  14. const read_concern_1 = require("./read_concern");
  15. const read_preference_1 = require("./read_preference");
  16. const common_1 = require("./sdam/common");
  17. const transactions_1 = require("./transactions");
  18. const utils_1 = require("./utils");
  19. const write_concern_1 = require("./write_concern");
  20. const minWireVersionForShardedTransactions = 8;
  21. /** @internal */
  22. const kServerSession = Symbol('serverSession');
  23. /** @internal */
  24. const kSnapshotTime = Symbol('snapshotTime');
  25. /** @internal */
  26. const kSnapshotEnabled = Symbol('snapshotEnabled');
  27. /** @internal */
  28. const kPinnedConnection = Symbol('pinnedConnection');
  29. /** @internal Accumulates total number of increments to add to txnNumber when applying session to command */
  30. const kTxnNumberIncrement = Symbol('txnNumberIncrement');
  31. /**
  32. * A class representing a client session on the server
  33. *
  34. * NOTE: not meant to be instantiated directly.
  35. * @public
  36. */
  37. class ClientSession extends mongo_types_1.TypedEventEmitter {
  38. /**
  39. * Create a client session.
  40. * @internal
  41. * @param client - The current client
  42. * @param sessionPool - The server session pool (Internal Class)
  43. * @param options - Optional settings
  44. * @param clientOptions - Optional settings provided when creating a MongoClient
  45. */
  46. constructor(client, sessionPool, options, clientOptions) {
  47. super();
  48. /** @internal */
  49. this[_a] = false;
  50. if (client == null) {
  51. // TODO(NODE-3483)
  52. throw new error_1.MongoRuntimeError('ClientSession requires a MongoClient');
  53. }
  54. if (sessionPool == null || !(sessionPool instanceof ServerSessionPool)) {
  55. // TODO(NODE-3483)
  56. throw new error_1.MongoRuntimeError('ClientSession requires a ServerSessionPool');
  57. }
  58. options = options ?? {};
  59. if (options.snapshot === true) {
  60. this[kSnapshotEnabled] = true;
  61. if (options.causalConsistency === true) {
  62. throw new error_1.MongoInvalidArgumentError('Properties "causalConsistency" and "snapshot" are mutually exclusive');
  63. }
  64. }
  65. this.client = client;
  66. this.sessionPool = sessionPool;
  67. this.hasEnded = false;
  68. this.clientOptions = clientOptions;
  69. this.explicit = !!options.explicit;
  70. this[kServerSession] = this.explicit ? this.sessionPool.acquire() : null;
  71. this[kTxnNumberIncrement] = 0;
  72. const defaultCausalConsistencyValue = this.explicit && options.snapshot !== true;
  73. this.supports = {
  74. // if we can enable causal consistency, do so by default
  75. causalConsistency: options.causalConsistency ?? defaultCausalConsistencyValue
  76. };
  77. this.clusterTime = options.initialClusterTime;
  78. this.operationTime = undefined;
  79. this.owner = options.owner;
  80. this.defaultTransactionOptions = Object.assign({}, options.defaultTransactionOptions);
  81. this.transaction = new transactions_1.Transaction();
  82. }
  83. /** The server id associated with this session */
  84. get id() {
  85. return this[kServerSession]?.id;
  86. }
  87. get serverSession() {
  88. let serverSession = this[kServerSession];
  89. if (serverSession == null) {
  90. if (this.explicit) {
  91. throw new error_1.MongoRuntimeError('Unexpected null serverSession for an explicit session');
  92. }
  93. if (this.hasEnded) {
  94. throw new error_1.MongoRuntimeError('Unexpected null serverSession for an ended implicit session');
  95. }
  96. serverSession = this.sessionPool.acquire();
  97. this[kServerSession] = serverSession;
  98. }
  99. return serverSession;
  100. }
  101. /** Whether or not this session is configured for snapshot reads */
  102. get snapshotEnabled() {
  103. return this[kSnapshotEnabled];
  104. }
  105. get loadBalanced() {
  106. return this.client.topology?.description.type === common_1.TopologyType.LoadBalanced;
  107. }
  108. /** @internal */
  109. get pinnedConnection() {
  110. return this[kPinnedConnection];
  111. }
  112. /** @internal */
  113. pin(conn) {
  114. if (this[kPinnedConnection]) {
  115. throw TypeError('Cannot pin multiple connections to the same session');
  116. }
  117. this[kPinnedConnection] = conn;
  118. conn.emit(constants_1.PINNED, this.inTransaction() ? metrics_1.ConnectionPoolMetrics.TXN : metrics_1.ConnectionPoolMetrics.CURSOR);
  119. }
  120. /** @internal */
  121. unpin(options) {
  122. if (this.loadBalanced) {
  123. return maybeClearPinnedConnection(this, options);
  124. }
  125. this.transaction.unpinServer();
  126. }
  127. get isPinned() {
  128. return this.loadBalanced ? !!this[kPinnedConnection] : this.transaction.isPinned;
  129. }
  130. /**
  131. * Ends this session on the server
  132. *
  133. * @param options - Optional settings. Currently reserved for future use
  134. */
  135. async endSession(options) {
  136. try {
  137. if (this.inTransaction()) {
  138. await this.abortTransaction();
  139. }
  140. if (!this.hasEnded) {
  141. const serverSession = this[kServerSession];
  142. if (serverSession != null) {
  143. // release the server session back to the pool
  144. this.sessionPool.release(serverSession);
  145. // Make sure a new serverSession never makes it onto this ClientSession
  146. Object.defineProperty(this, kServerSession, {
  147. value: ServerSession.clone(serverSession),
  148. writable: false
  149. });
  150. }
  151. // mark the session as ended, and emit a signal
  152. this.hasEnded = true;
  153. this.emit('ended', this);
  154. }
  155. }
  156. catch {
  157. // spec indicates that we should ignore all errors for `endSessions`
  158. }
  159. finally {
  160. maybeClearPinnedConnection(this, { force: true, ...options });
  161. }
  162. }
  163. /**
  164. * Advances the operationTime for a ClientSession.
  165. *
  166. * @param operationTime - the `BSON.Timestamp` of the operation type it is desired to advance to
  167. */
  168. advanceOperationTime(operationTime) {
  169. if (this.operationTime == null) {
  170. this.operationTime = operationTime;
  171. return;
  172. }
  173. if (operationTime.greaterThan(this.operationTime)) {
  174. this.operationTime = operationTime;
  175. }
  176. }
  177. /**
  178. * Advances the clusterTime for a ClientSession to the provided clusterTime of another ClientSession
  179. *
  180. * @param clusterTime - the $clusterTime returned by the server from another session in the form of a document containing the `BSON.Timestamp` clusterTime and signature
  181. */
  182. advanceClusterTime(clusterTime) {
  183. if (!clusterTime || typeof clusterTime !== 'object') {
  184. throw new error_1.MongoInvalidArgumentError('input cluster time must be an object');
  185. }
  186. if (!clusterTime.clusterTime || clusterTime.clusterTime._bsontype !== 'Timestamp') {
  187. throw new error_1.MongoInvalidArgumentError('input cluster time "clusterTime" property must be a valid BSON Timestamp');
  188. }
  189. if (!clusterTime.signature ||
  190. clusterTime.signature.hash?._bsontype !== 'Binary' ||
  191. (typeof clusterTime.signature.keyId !== 'bigint' &&
  192. typeof clusterTime.signature.keyId !== 'number' &&
  193. clusterTime.signature.keyId?._bsontype !== 'Long') // apparently we decode the key to number?
  194. ) {
  195. throw new error_1.MongoInvalidArgumentError('input cluster time must have a valid "signature" property with BSON Binary hash and BSON Long keyId');
  196. }
  197. (0, common_1._advanceClusterTime)(this, clusterTime);
  198. }
  199. /**
  200. * Used to determine if this session equals another
  201. *
  202. * @param session - The session to compare to
  203. */
  204. equals(session) {
  205. if (!(session instanceof ClientSession)) {
  206. return false;
  207. }
  208. if (this.id == null || session.id == null) {
  209. return false;
  210. }
  211. return utils_1.ByteUtils.equals(this.id.id.buffer, session.id.id.buffer);
  212. }
  213. /**
  214. * Increment the transaction number on the internal ServerSession
  215. *
  216. * @privateRemarks
  217. * This helper increments a value stored on the client session that will be
  218. * added to the serverSession's txnNumber upon applying it to a command.
  219. * This is because the serverSession is lazily acquired after a connection is obtained
  220. */
  221. incrementTransactionNumber() {
  222. this[kTxnNumberIncrement] += 1;
  223. }
  224. /** @returns whether this session is currently in a transaction or not */
  225. inTransaction() {
  226. return this.transaction.isActive;
  227. }
  228. /**
  229. * Starts a new transaction with the given options.
  230. *
  231. * @param options - Options for the transaction
  232. */
  233. startTransaction(options) {
  234. if (this[kSnapshotEnabled]) {
  235. throw new error_1.MongoCompatibilityError('Transactions are not supported in snapshot sessions');
  236. }
  237. if (this.inTransaction()) {
  238. throw new error_1.MongoTransactionError('Transaction already in progress');
  239. }
  240. if (this.isPinned && this.transaction.isCommitted) {
  241. this.unpin();
  242. }
  243. const topologyMaxWireVersion = (0, utils_1.maxWireVersion)(this.client.topology);
  244. if ((0, shared_1.isSharded)(this.client.topology) &&
  245. topologyMaxWireVersion != null &&
  246. topologyMaxWireVersion < minWireVersionForShardedTransactions) {
  247. throw new error_1.MongoCompatibilityError('Transactions are not supported on sharded clusters in MongoDB < 4.2.');
  248. }
  249. // increment txnNumber
  250. this.incrementTransactionNumber();
  251. // create transaction state
  252. this.transaction = new transactions_1.Transaction({
  253. readConcern: options?.readConcern ??
  254. this.defaultTransactionOptions.readConcern ??
  255. this.clientOptions?.readConcern,
  256. writeConcern: options?.writeConcern ??
  257. this.defaultTransactionOptions.writeConcern ??
  258. this.clientOptions?.writeConcern,
  259. readPreference: options?.readPreference ??
  260. this.defaultTransactionOptions.readPreference ??
  261. this.clientOptions?.readPreference,
  262. maxCommitTimeMS: options?.maxCommitTimeMS ?? this.defaultTransactionOptions.maxCommitTimeMS
  263. });
  264. this.transaction.transition(transactions_1.TxnState.STARTING_TRANSACTION);
  265. }
  266. /**
  267. * Commits the currently active transaction in this session.
  268. */
  269. async commitTransaction() {
  270. return endTransactionAsync(this, 'commitTransaction');
  271. }
  272. /**
  273. * Aborts the currently active transaction in this session.
  274. */
  275. async abortTransaction() {
  276. return endTransactionAsync(this, 'abortTransaction');
  277. }
  278. /**
  279. * This is here to ensure that ClientSession is never serialized to BSON.
  280. */
  281. toBSON() {
  282. throw new error_1.MongoRuntimeError('ClientSession cannot be serialized to BSON.');
  283. }
  284. /**
  285. * Runs a provided callback within a transaction, retrying either the commitTransaction operation
  286. * or entire transaction as needed (and when the error permits) to better ensure that
  287. * the transaction can complete successfully.
  288. *
  289. * **IMPORTANT:** This method requires the user to return a Promise, and `await` all operations.
  290. * Any callbacks that do not return a Promise will result in undefined behavior.
  291. *
  292. * @remarks
  293. * This function:
  294. * - Will return the command response from the final commitTransaction if every operation is successful (can be used as a truthy object)
  295. * - Will return `undefined` if the transaction is explicitly aborted with `await session.abortTransaction()`
  296. * - Will throw if one of the operations throws or `throw` statement is used inside the `withTransaction` callback
  297. *
  298. * Checkout a descriptive example here:
  299. * @see https://www.mongodb.com/developer/quickstart/node-transactions/
  300. *
  301. * @param fn - callback to run within a transaction
  302. * @param options - optional settings for the transaction
  303. * @returns A raw command response or undefined
  304. */
  305. async withTransaction(fn, options) {
  306. const startTime = (0, utils_1.now)();
  307. return attemptTransaction(this, startTime, fn, options);
  308. }
  309. }
  310. exports.ClientSession = ClientSession;
  311. _a = kSnapshotEnabled;
  312. const MAX_WITH_TRANSACTION_TIMEOUT = 120000;
  313. const NON_DETERMINISTIC_WRITE_CONCERN_ERRORS = new Set([
  314. 'CannotSatisfyWriteConcern',
  315. 'UnknownReplWriteConcern',
  316. 'UnsatisfiableWriteConcern'
  317. ]);
  318. function hasNotTimedOut(startTime, max) {
  319. return (0, utils_1.calculateDurationInMs)(startTime) < max;
  320. }
  321. function isUnknownTransactionCommitResult(err) {
  322. const isNonDeterministicWriteConcernError = err instanceof error_1.MongoServerError &&
  323. err.codeName &&
  324. NON_DETERMINISTIC_WRITE_CONCERN_ERRORS.has(err.codeName);
  325. return (isMaxTimeMSExpiredError(err) ||
  326. (!isNonDeterministicWriteConcernError &&
  327. err.code !== error_1.MONGODB_ERROR_CODES.UnsatisfiableWriteConcern &&
  328. err.code !== error_1.MONGODB_ERROR_CODES.UnknownReplWriteConcern));
  329. }
  330. function maybeClearPinnedConnection(session, options) {
  331. // unpin a connection if it has been pinned
  332. const conn = session[kPinnedConnection];
  333. const error = options?.error;
  334. if (session.inTransaction() &&
  335. error &&
  336. error instanceof error_1.MongoError &&
  337. error.hasErrorLabel(error_1.MongoErrorLabel.TransientTransactionError)) {
  338. return;
  339. }
  340. const topology = session.client.topology;
  341. // NOTE: the spec talks about what to do on a network error only, but the tests seem to
  342. // to validate that we don't unpin on _all_ errors?
  343. if (conn && topology != null) {
  344. const servers = Array.from(topology.s.servers.values());
  345. const loadBalancer = servers[0];
  346. if (options?.error == null || options?.force) {
  347. loadBalancer.pool.checkIn(conn);
  348. conn.emit(constants_1.UNPINNED, session.transaction.state !== transactions_1.TxnState.NO_TRANSACTION
  349. ? metrics_1.ConnectionPoolMetrics.TXN
  350. : metrics_1.ConnectionPoolMetrics.CURSOR);
  351. if (options?.forceClear) {
  352. loadBalancer.pool.clear({ serviceId: conn.serviceId });
  353. }
  354. }
  355. session[kPinnedConnection] = undefined;
  356. }
  357. }
  358. exports.maybeClearPinnedConnection = maybeClearPinnedConnection;
  359. function isMaxTimeMSExpiredError(err) {
  360. if (err == null || !(err instanceof error_1.MongoServerError)) {
  361. return false;
  362. }
  363. return (err.code === error_1.MONGODB_ERROR_CODES.MaxTimeMSExpired ||
  364. (err.writeConcernError && err.writeConcernError.code === error_1.MONGODB_ERROR_CODES.MaxTimeMSExpired));
  365. }
  366. function attemptTransactionCommit(session, startTime, fn, options) {
  367. return session.commitTransaction().catch((err) => {
  368. if (err instanceof error_1.MongoError &&
  369. hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT) &&
  370. !isMaxTimeMSExpiredError(err)) {
  371. if (err.hasErrorLabel(error_1.MongoErrorLabel.UnknownTransactionCommitResult)) {
  372. return attemptTransactionCommit(session, startTime, fn, options);
  373. }
  374. if (err.hasErrorLabel(error_1.MongoErrorLabel.TransientTransactionError)) {
  375. return attemptTransaction(session, startTime, fn, options);
  376. }
  377. }
  378. throw err;
  379. });
  380. }
  381. const USER_EXPLICIT_TXN_END_STATES = new Set([
  382. transactions_1.TxnState.NO_TRANSACTION,
  383. transactions_1.TxnState.TRANSACTION_COMMITTED,
  384. transactions_1.TxnState.TRANSACTION_ABORTED
  385. ]);
  386. function userExplicitlyEndedTransaction(session) {
  387. return USER_EXPLICIT_TXN_END_STATES.has(session.transaction.state);
  388. }
  389. function attemptTransaction(session, startTime, fn, options) {
  390. session.startTransaction(options);
  391. let promise;
  392. try {
  393. promise = fn(session);
  394. }
  395. catch (err) {
  396. promise = Promise.reject(err);
  397. }
  398. if (!(0, utils_1.isPromiseLike)(promise)) {
  399. session.abortTransaction().catch(() => null);
  400. throw new error_1.MongoInvalidArgumentError('Function provided to `withTransaction` must return a Promise');
  401. }
  402. return promise.then(() => {
  403. if (userExplicitlyEndedTransaction(session)) {
  404. return;
  405. }
  406. return attemptTransactionCommit(session, startTime, fn, options);
  407. }, err => {
  408. function maybeRetryOrThrow(err) {
  409. if (err instanceof error_1.MongoError &&
  410. err.hasErrorLabel(error_1.MongoErrorLabel.TransientTransactionError) &&
  411. hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT)) {
  412. return attemptTransaction(session, startTime, fn, options);
  413. }
  414. if (isMaxTimeMSExpiredError(err)) {
  415. err.addErrorLabel(error_1.MongoErrorLabel.UnknownTransactionCommitResult);
  416. }
  417. throw err;
  418. }
  419. if (session.inTransaction()) {
  420. return session.abortTransaction().then(() => maybeRetryOrThrow(err));
  421. }
  422. return maybeRetryOrThrow(err);
  423. });
  424. }
  425. const endTransactionAsync = (0, util_1.promisify)(endTransaction);
  426. function endTransaction(session, commandName, callback) {
  427. // handle any initial problematic cases
  428. const txnState = session.transaction.state;
  429. if (txnState === transactions_1.TxnState.NO_TRANSACTION) {
  430. callback(new error_1.MongoTransactionError('No transaction started'));
  431. return;
  432. }
  433. if (commandName === 'commitTransaction') {
  434. if (txnState === transactions_1.TxnState.STARTING_TRANSACTION ||
  435. txnState === transactions_1.TxnState.TRANSACTION_COMMITTED_EMPTY) {
  436. // the transaction was never started, we can safely exit here
  437. session.transaction.transition(transactions_1.TxnState.TRANSACTION_COMMITTED_EMPTY);
  438. callback();
  439. return;
  440. }
  441. if (txnState === transactions_1.TxnState.TRANSACTION_ABORTED) {
  442. callback(new error_1.MongoTransactionError('Cannot call commitTransaction after calling abortTransaction'));
  443. return;
  444. }
  445. }
  446. else {
  447. if (txnState === transactions_1.TxnState.STARTING_TRANSACTION) {
  448. // the transaction was never started, we can safely exit here
  449. session.transaction.transition(transactions_1.TxnState.TRANSACTION_ABORTED);
  450. callback();
  451. return;
  452. }
  453. if (txnState === transactions_1.TxnState.TRANSACTION_ABORTED) {
  454. callback(new error_1.MongoTransactionError('Cannot call abortTransaction twice'));
  455. return;
  456. }
  457. if (txnState === transactions_1.TxnState.TRANSACTION_COMMITTED ||
  458. txnState === transactions_1.TxnState.TRANSACTION_COMMITTED_EMPTY) {
  459. callback(new error_1.MongoTransactionError('Cannot call abortTransaction after calling commitTransaction'));
  460. return;
  461. }
  462. }
  463. // construct and send the command
  464. const command = { [commandName]: 1 };
  465. // apply a writeConcern if specified
  466. let writeConcern;
  467. if (session.transaction.options.writeConcern) {
  468. writeConcern = Object.assign({}, session.transaction.options.writeConcern);
  469. }
  470. else if (session.clientOptions && session.clientOptions.writeConcern) {
  471. writeConcern = { w: session.clientOptions.writeConcern.w };
  472. }
  473. if (txnState === transactions_1.TxnState.TRANSACTION_COMMITTED) {
  474. writeConcern = Object.assign({ wtimeoutMS: 10000 }, writeConcern, { w: 'majority' });
  475. }
  476. if (writeConcern) {
  477. write_concern_1.WriteConcern.apply(command, writeConcern);
  478. }
  479. if (commandName === 'commitTransaction' && session.transaction.options.maxTimeMS) {
  480. Object.assign(command, { maxTimeMS: session.transaction.options.maxTimeMS });
  481. }
  482. function commandHandler(error, result) {
  483. if (commandName !== 'commitTransaction') {
  484. session.transaction.transition(transactions_1.TxnState.TRANSACTION_ABORTED);
  485. if (session.loadBalanced) {
  486. maybeClearPinnedConnection(session, { force: false });
  487. }
  488. // The spec indicates that we should ignore all errors on `abortTransaction`
  489. return callback();
  490. }
  491. session.transaction.transition(transactions_1.TxnState.TRANSACTION_COMMITTED);
  492. if (error instanceof error_1.MongoError) {
  493. if (error.hasErrorLabel(error_1.MongoErrorLabel.RetryableWriteError) ||
  494. error instanceof error_1.MongoWriteConcernError ||
  495. isMaxTimeMSExpiredError(error)) {
  496. if (isUnknownTransactionCommitResult(error)) {
  497. error.addErrorLabel(error_1.MongoErrorLabel.UnknownTransactionCommitResult);
  498. // per txns spec, must unpin session in this case
  499. session.unpin({ error });
  500. }
  501. }
  502. else if (error.hasErrorLabel(error_1.MongoErrorLabel.TransientTransactionError)) {
  503. session.unpin({ error });
  504. }
  505. }
  506. callback(error, result);
  507. }
  508. if (session.transaction.recoveryToken) {
  509. command.recoveryToken = session.transaction.recoveryToken;
  510. }
  511. // send the command
  512. (0, execute_operation_1.executeOperation)(session.client, new run_command_1.RunAdminCommandOperation(undefined, command, {
  513. session,
  514. readPreference: read_preference_1.ReadPreference.primary,
  515. bypassPinningCheck: true
  516. }), (error, result) => {
  517. if (command.abortTransaction) {
  518. // always unpin on abort regardless of command outcome
  519. session.unpin();
  520. }
  521. if (error instanceof error_1.MongoError && error.hasErrorLabel(error_1.MongoErrorLabel.RetryableWriteError)) {
  522. // SPEC-1185: apply majority write concern when retrying commitTransaction
  523. if (command.commitTransaction) {
  524. // per txns spec, must unpin session in this case
  525. session.unpin({ force: true });
  526. command.writeConcern = Object.assign({ wtimeout: 10000 }, command.writeConcern, {
  527. w: 'majority'
  528. });
  529. }
  530. return (0, execute_operation_1.executeOperation)(session.client, new run_command_1.RunAdminCommandOperation(undefined, command, {
  531. session,
  532. readPreference: read_preference_1.ReadPreference.primary,
  533. bypassPinningCheck: true
  534. }), commandHandler);
  535. }
  536. commandHandler(error, result);
  537. });
  538. }
  539. /**
  540. * Reflects the existence of a session on the server. Can be reused by the session pool.
  541. * WARNING: not meant to be instantiated directly. For internal use only.
  542. * @public
  543. */
  544. class ServerSession {
  545. /** @internal */
  546. constructor() {
  547. this.id = { id: new bson_1.Binary((0, utils_1.uuidV4)(), bson_1.Binary.SUBTYPE_UUID) };
  548. this.lastUse = (0, utils_1.now)();
  549. this.txnNumber = 0;
  550. this.isDirty = false;
  551. }
  552. /**
  553. * Determines if the server session has timed out.
  554. *
  555. * @param sessionTimeoutMinutes - The server's "logicalSessionTimeoutMinutes"
  556. */
  557. hasTimedOut(sessionTimeoutMinutes) {
  558. // Take the difference of the lastUse timestamp and now, which will result in a value in
  559. // milliseconds, and then convert milliseconds to minutes to compare to `sessionTimeoutMinutes`
  560. const idleTimeMinutes = Math.round((((0, utils_1.calculateDurationInMs)(this.lastUse) % 86400000) % 3600000) / 60000);
  561. return idleTimeMinutes > sessionTimeoutMinutes - 1;
  562. }
  563. /**
  564. * @internal
  565. * Cloning meant to keep a readable reference to the server session data
  566. * after ClientSession has ended
  567. */
  568. static clone(serverSession) {
  569. const arrayBuffer = new ArrayBuffer(16);
  570. const idBytes = Buffer.from(arrayBuffer);
  571. idBytes.set(serverSession.id.id.buffer);
  572. const id = new bson_1.Binary(idBytes, serverSession.id.id.sub_type);
  573. // Manual prototype construction to avoid modifying the constructor of this class
  574. return Object.setPrototypeOf({
  575. id: { id },
  576. lastUse: serverSession.lastUse,
  577. txnNumber: serverSession.txnNumber,
  578. isDirty: serverSession.isDirty
  579. }, ServerSession.prototype);
  580. }
  581. }
  582. exports.ServerSession = ServerSession;
  583. /**
  584. * Maintains a pool of Server Sessions.
  585. * For internal use only
  586. * @internal
  587. */
  588. class ServerSessionPool {
  589. constructor(client) {
  590. if (client == null) {
  591. throw new error_1.MongoRuntimeError('ServerSessionPool requires a MongoClient');
  592. }
  593. this.client = client;
  594. this.sessions = new utils_1.List();
  595. }
  596. /**
  597. * Acquire a Server Session from the pool.
  598. * Iterates through each session in the pool, removing any stale sessions
  599. * along the way. The first non-stale session found is removed from the
  600. * pool and returned. If no non-stale session is found, a new ServerSession is created.
  601. */
  602. acquire() {
  603. const sessionTimeoutMinutes = this.client.topology?.logicalSessionTimeoutMinutes ?? 10;
  604. let session = null;
  605. // Try to obtain from session pool
  606. while (this.sessions.length > 0) {
  607. const potentialSession = this.sessions.shift();
  608. if (potentialSession != null &&
  609. (!!this.client.topology?.loadBalanced ||
  610. !potentialSession.hasTimedOut(sessionTimeoutMinutes))) {
  611. session = potentialSession;
  612. break;
  613. }
  614. }
  615. // If nothing valid came from the pool make a new one
  616. if (session == null) {
  617. session = new ServerSession();
  618. }
  619. return session;
  620. }
  621. /**
  622. * Release a session to the session pool
  623. * Adds the session back to the session pool if the session has not timed out yet.
  624. * This method also removes any stale sessions from the pool.
  625. *
  626. * @param session - The session to release to the pool
  627. */
  628. release(session) {
  629. const sessionTimeoutMinutes = this.client.topology?.logicalSessionTimeoutMinutes ?? 10;
  630. if (this.client.topology?.loadBalanced && !sessionTimeoutMinutes) {
  631. this.sessions.unshift(session);
  632. }
  633. if (!sessionTimeoutMinutes) {
  634. return;
  635. }
  636. this.sessions.prune(session => session.hasTimedOut(sessionTimeoutMinutes));
  637. if (!session.hasTimedOut(sessionTimeoutMinutes)) {
  638. if (session.isDirty) {
  639. return;
  640. }
  641. // otherwise, readd this session to the session pool
  642. this.sessions.unshift(session);
  643. }
  644. }
  645. }
  646. exports.ServerSessionPool = ServerSessionPool;
  647. /**
  648. * Optionally decorate a command with sessions specific keys
  649. *
  650. * @param session - the session tracking transaction state
  651. * @param command - the command to decorate
  652. * @param options - Optional settings passed to calling operation
  653. *
  654. * @internal
  655. */
  656. function applySession(session, command, options) {
  657. if (session.hasEnded) {
  658. return new error_1.MongoExpiredSessionError();
  659. }
  660. // May acquire serverSession here
  661. const serverSession = session.serverSession;
  662. if (serverSession == null) {
  663. return new error_1.MongoRuntimeError('Unable to acquire server session');
  664. }
  665. if (options.writeConcern?.w === 0) {
  666. if (session && session.explicit) {
  667. // Error if user provided an explicit session to an unacknowledged write (SPEC-1019)
  668. return new error_1.MongoAPIError('Cannot have explicit session with unacknowledged writes');
  669. }
  670. return;
  671. }
  672. // mark the last use of this session, and apply the `lsid`
  673. serverSession.lastUse = (0, utils_1.now)();
  674. command.lsid = serverSession.id;
  675. const inTxnOrTxnCommand = session.inTransaction() || (0, transactions_1.isTransactionCommand)(command);
  676. const isRetryableWrite = !!options.willRetryWrite;
  677. if (isRetryableWrite || inTxnOrTxnCommand) {
  678. serverSession.txnNumber += session[kTxnNumberIncrement];
  679. session[kTxnNumberIncrement] = 0;
  680. // TODO(NODE-2674): Preserve int64 sent from MongoDB
  681. command.txnNumber = bson_1.Long.fromNumber(serverSession.txnNumber);
  682. }
  683. if (!inTxnOrTxnCommand) {
  684. if (session.transaction.state !== transactions_1.TxnState.NO_TRANSACTION) {
  685. session.transaction.transition(transactions_1.TxnState.NO_TRANSACTION);
  686. }
  687. if (session.supports.causalConsistency &&
  688. session.operationTime &&
  689. (0, utils_1.commandSupportsReadConcern)(command, options)) {
  690. command.readConcern = command.readConcern || {};
  691. Object.assign(command.readConcern, { afterClusterTime: session.operationTime });
  692. }
  693. else if (session[kSnapshotEnabled]) {
  694. command.readConcern = command.readConcern || { level: read_concern_1.ReadConcernLevel.snapshot };
  695. if (session[kSnapshotTime] != null) {
  696. Object.assign(command.readConcern, { atClusterTime: session[kSnapshotTime] });
  697. }
  698. }
  699. return;
  700. }
  701. // now attempt to apply transaction-specific sessions data
  702. // `autocommit` must always be false to differentiate from retryable writes
  703. command.autocommit = false;
  704. if (session.transaction.state === transactions_1.TxnState.STARTING_TRANSACTION) {
  705. session.transaction.transition(transactions_1.TxnState.TRANSACTION_IN_PROGRESS);
  706. command.startTransaction = true;
  707. const readConcern = session.transaction.options.readConcern || session?.clientOptions?.readConcern;
  708. if (readConcern) {
  709. command.readConcern = readConcern;
  710. }
  711. if (session.supports.causalConsistency && session.operationTime) {
  712. command.readConcern = command.readConcern || {};
  713. Object.assign(command.readConcern, { afterClusterTime: session.operationTime });
  714. }
  715. }
  716. return;
  717. }
  718. exports.applySession = applySession;
  719. function updateSessionFromResponse(session, document) {
  720. if (document.$clusterTime) {
  721. (0, common_1._advanceClusterTime)(session, document.$clusterTime);
  722. }
  723. if (document.operationTime && session && session.supports.causalConsistency) {
  724. session.advanceOperationTime(document.operationTime);
  725. }
  726. if (document.recoveryToken && session && session.inTransaction()) {
  727. session.transaction._recoveryToken = document.recoveryToken;
  728. }
  729. if (session?.[kSnapshotEnabled] && session[kSnapshotTime] == null) {
  730. // find and aggregate commands return atClusterTime on the cursor
  731. // distinct includes it in the response body
  732. const atClusterTime = document.cursor?.atClusterTime || document.atClusterTime;
  733. if (atClusterTime) {
  734. session[kSnapshotTime] = atClusterTime;
  735. }
  736. }
  737. }
  738. exports.updateSessionFromResponse = updateSessionFromResponse;
  739. //# sourceMappingURL=sessions.js.map