sessions.js 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737
  1. "use strict";
  2. var _a;
  3. Object.defineProperty(exports, "__esModule", { value: true });
  4. exports.updateSessionFromResponse = exports.applySession = exports.ServerSessionPool = exports.ServerSession = exports.maybeClearPinnedConnection = exports.ClientSession = void 0;
  5. const util_1 = require("util");
  6. const bson_1 = require("./bson");
  7. const metrics_1 = require("./cmap/metrics");
  8. const shared_1 = require("./cmap/wire_protocol/shared");
  9. const constants_1 = require("./constants");
  10. const error_1 = require("./error");
  11. const mongo_types_1 = require("./mongo_types");
  12. const execute_operation_1 = require("./operations/execute_operation");
  13. const run_command_1 = require("./operations/run_command");
  14. const read_concern_1 = require("./read_concern");
  15. const read_preference_1 = require("./read_preference");
  16. const common_1 = require("./sdam/common");
  17. const transactions_1 = require("./transactions");
  18. const utils_1 = require("./utils");
  19. const write_concern_1 = require("./write_concern");
  20. const minWireVersionForShardedTransactions = 8; // startTransaction rejects sharded topologies whose max wire version is below this (its error text says MongoDB < 4.2)
  21. /** @internal Symbol key under which a ClientSession stores its ServerSession (null until acquired for implicit sessions) */
  22. const kServerSession = Symbol('serverSession');
  23. /** @internal Symbol key for the atClusterTime captured from the first response of a snapshot session */
  24. const kSnapshotTime = Symbol('snapshotTime');
  25. /** @internal Symbol key for the flag marking a session as configured for snapshot reads */
  26. const kSnapshotEnabled = Symbol('snapshotEnabled');
  27. /** @internal Symbol key for the connection currently pinned to a session */
  28. const kPinnedConnection = Symbol('pinnedConnection');
  29. /** @internal Accumulates total number of increments to add to txnNumber when applying session to command */
  30. const kTxnNumberIncrement = Symbol('txnNumberIncrement');
  31. /**
  32. * A class representing a client session on the server
  33. *
  34. * NOTE: not meant to be instantiated directly.
  35. * @public
  36. */
  37. class ClientSession extends mongo_types_1.TypedEventEmitter {
  38. /**
  39. * Create a client session.
  40. * @internal
  41. * @param client - The current client
  42. * @param sessionPool - The server session pool (Internal Class)
  43. * @param options - Optional settings
  44. * @param clientOptions - Optional settings provided when creating a MongoClient
  45. */
  46. constructor(client, sessionPool, options, clientOptions) {
  47. super();
  48. /** @internal */
  49. this[_a] = false;
  50. if (client == null) {
  51. // TODO(NODE-3483)
  52. throw new error_1.MongoRuntimeError('ClientSession requires a MongoClient');
  53. }
  54. if (sessionPool == null || !(sessionPool instanceof ServerSessionPool)) {
  55. // TODO(NODE-3483)
  56. throw new error_1.MongoRuntimeError('ClientSession requires a ServerSessionPool');
  57. }
  58. options = options ?? {};
  59. if (options.snapshot === true) {
  60. this[kSnapshotEnabled] = true;
  61. if (options.causalConsistency === true) {
  62. throw new error_1.MongoInvalidArgumentError('Properties "causalConsistency" and "snapshot" are mutually exclusive');
  63. }
  64. }
  65. this.client = client;
  66. this.sessionPool = sessionPool;
  67. this.hasEnded = false;
  68. this.clientOptions = clientOptions;
  69. this.explicit = !!options.explicit;
  70. this[kServerSession] = this.explicit ? this.sessionPool.acquire() : null;
  71. this[kTxnNumberIncrement] = 0;
  72. const defaultCausalConsistencyValue = this.explicit && options.snapshot !== true;
  73. this.supports = {
  74. // if we can enable causal consistency, do so by default
  75. causalConsistency: options.causalConsistency ?? defaultCausalConsistencyValue
  76. };
  77. this.clusterTime = options.initialClusterTime;
  78. this.operationTime = undefined;
  79. this.owner = options.owner;
  80. this.defaultTransactionOptions = Object.assign({}, options.defaultTransactionOptions);
  81. this.transaction = new transactions_1.Transaction();
  82. }
  83. /** The server id associated with this session */
  84. get id() {
  85. return this[kServerSession]?.id;
  86. }
  87. get serverSession() {
  88. let serverSession = this[kServerSession];
  89. if (serverSession == null) {
  90. if (this.explicit) {
  91. throw new error_1.MongoRuntimeError('Unexpected null serverSession for an explicit session');
  92. }
  93. if (this.hasEnded) {
  94. throw new error_1.MongoRuntimeError('Unexpected null serverSession for an ended implicit session');
  95. }
  96. serverSession = this.sessionPool.acquire();
  97. this[kServerSession] = serverSession;
  98. }
  99. return serverSession;
  100. }
  101. /** Whether or not this session is configured for snapshot reads */
  102. get snapshotEnabled() {
  103. return this[kSnapshotEnabled];
  104. }
  105. get loadBalanced() {
  106. return this.client.topology?.description.type === common_1.TopologyType.LoadBalanced;
  107. }
  108. /** @internal */
  109. get pinnedConnection() {
  110. return this[kPinnedConnection];
  111. }
  112. /** @internal */
  113. pin(conn) {
  114. if (this[kPinnedConnection]) {
  115. throw TypeError('Cannot pin multiple connections to the same session');
  116. }
  117. this[kPinnedConnection] = conn;
  118. conn.emit(constants_1.PINNED, this.inTransaction() ? metrics_1.ConnectionPoolMetrics.TXN : metrics_1.ConnectionPoolMetrics.CURSOR);
  119. }
  120. /** @internal */
  121. unpin(options) {
  122. if (this.loadBalanced) {
  123. return maybeClearPinnedConnection(this, options);
  124. }
  125. this.transaction.unpinServer();
  126. }
  127. get isPinned() {
  128. return this.loadBalanced ? !!this[kPinnedConnection] : this.transaction.isPinned;
  129. }
  130. /**
  131. * Ends this session on the server
  132. *
  133. * @param options - Optional settings. Currently reserved for future use
  134. */
  135. async endSession(options) {
  136. try {
  137. if (this.inTransaction()) {
  138. await this.abortTransaction();
  139. }
  140. if (!this.hasEnded) {
  141. const serverSession = this[kServerSession];
  142. if (serverSession != null) {
  143. // release the server session back to the pool
  144. this.sessionPool.release(serverSession);
  145. // Make sure a new serverSession never makes it onto this ClientSession
  146. Object.defineProperty(this, kServerSession, {
  147. value: ServerSession.clone(serverSession),
  148. writable: false
  149. });
  150. }
  151. // mark the session as ended, and emit a signal
  152. this.hasEnded = true;
  153. this.emit('ended', this);
  154. }
  155. }
  156. catch {
  157. // spec indicates that we should ignore all errors for `endSessions`
  158. }
  159. finally {
  160. maybeClearPinnedConnection(this, { force: true, ...options });
  161. }
  162. }
  163. /**
  164. * Advances the operationTime for a ClientSession.
  165. *
  166. * @param operationTime - the `BSON.Timestamp` of the operation type it is desired to advance to
  167. */
  168. advanceOperationTime(operationTime) {
  169. if (this.operationTime == null) {
  170. this.operationTime = operationTime;
  171. return;
  172. }
  173. if (operationTime.greaterThan(this.operationTime)) {
  174. this.operationTime = operationTime;
  175. }
  176. }
  177. /**
  178. * Advances the clusterTime for a ClientSession to the provided clusterTime of another ClientSession
  179. *
  180. * @param clusterTime - the $clusterTime returned by the server from another session in the form of a document containing the `BSON.Timestamp` clusterTime and signature
  181. */
  182. advanceClusterTime(clusterTime) {
  183. if (!clusterTime || typeof clusterTime !== 'object') {
  184. throw new error_1.MongoInvalidArgumentError('input cluster time must be an object');
  185. }
  186. if (!clusterTime.clusterTime || clusterTime.clusterTime._bsontype !== 'Timestamp') {
  187. throw new error_1.MongoInvalidArgumentError('input cluster time "clusterTime" property must be a valid BSON Timestamp');
  188. }
  189. if (!clusterTime.signature ||
  190. clusterTime.signature.hash?._bsontype !== 'Binary' ||
  191. (typeof clusterTime.signature.keyId !== 'bigint' &&
  192. typeof clusterTime.signature.keyId !== 'number' &&
  193. clusterTime.signature.keyId?._bsontype !== 'Long') // apparently we decode the key to number?
  194. ) {
  195. throw new error_1.MongoInvalidArgumentError('input cluster time must have a valid "signature" property with BSON Binary hash and BSON Long keyId');
  196. }
  197. (0, common_1._advanceClusterTime)(this, clusterTime);
  198. }
  199. /**
  200. * Used to determine if this session equals another
  201. *
  202. * @param session - The session to compare to
  203. */
  204. equals(session) {
  205. if (!(session instanceof ClientSession)) {
  206. return false;
  207. }
  208. if (this.id == null || session.id == null) {
  209. return false;
  210. }
  211. return utils_1.ByteUtils.equals(this.id.id.buffer, session.id.id.buffer);
  212. }
  213. /**
  214. * Increment the transaction number on the internal ServerSession
  215. *
  216. * @privateRemarks
  217. * This helper increments a value stored on the client session that will be
  218. * added to the serverSession's txnNumber upon applying it to a command.
  219. * This is because the serverSession is lazily acquired after a connection is obtained
  220. */
  221. incrementTransactionNumber() {
  222. this[kTxnNumberIncrement] += 1;
  223. }
  224. /** @returns whether this session is currently in a transaction or not */
  225. inTransaction() {
  226. return this.transaction.isActive;
  227. }
  228. /**
  229. * Starts a new transaction with the given options.
  230. *
  231. * @param options - Options for the transaction
  232. */
  233. startTransaction(options) {
  234. if (this[kSnapshotEnabled]) {
  235. throw new error_1.MongoCompatibilityError('Transactions are not supported in snapshot sessions');
  236. }
  237. if (this.inTransaction()) {
  238. throw new error_1.MongoTransactionError('Transaction already in progress');
  239. }
  240. if (this.isPinned && this.transaction.isCommitted) {
  241. this.unpin();
  242. }
  243. const topologyMaxWireVersion = (0, utils_1.maxWireVersion)(this.client.topology);
  244. if ((0, shared_1.isSharded)(this.client.topology) &&
  245. topologyMaxWireVersion != null &&
  246. topologyMaxWireVersion < minWireVersionForShardedTransactions) {
  247. throw new error_1.MongoCompatibilityError('Transactions are not supported on sharded clusters in MongoDB < 4.2.');
  248. }
  249. // increment txnNumber
  250. this.incrementTransactionNumber();
  251. // create transaction state
  252. this.transaction = new transactions_1.Transaction({
  253. readConcern: options?.readConcern ??
  254. this.defaultTransactionOptions.readConcern ??
  255. this.clientOptions?.readConcern,
  256. writeConcern: options?.writeConcern ??
  257. this.defaultTransactionOptions.writeConcern ??
  258. this.clientOptions?.writeConcern,
  259. readPreference: options?.readPreference ??
  260. this.defaultTransactionOptions.readPreference ??
  261. this.clientOptions?.readPreference,
  262. maxCommitTimeMS: options?.maxCommitTimeMS ?? this.defaultTransactionOptions.maxCommitTimeMS
  263. });
  264. this.transaction.transition(transactions_1.TxnState.STARTING_TRANSACTION);
  265. }
  266. /**
  267. * Commits the currently active transaction in this session.
  268. */
  269. async commitTransaction() {
  270. return endTransactionAsync(this, 'commitTransaction');
  271. }
  272. /**
  273. * Aborts the currently active transaction in this session.
  274. */
  275. async abortTransaction() {
  276. return endTransactionAsync(this, 'abortTransaction');
  277. }
  278. /**
  279. * This is here to ensure that ClientSession is never serialized to BSON.
  280. */
  281. toBSON() {
  282. throw new error_1.MongoRuntimeError('ClientSession cannot be serialized to BSON.');
  283. }
  284. /**
  285. * Starts a transaction and runs a provided function, ensuring the commitTransaction is always attempted when all operations run in the function have completed.
  286. *
  287. * **IMPORTANT:** This method requires the user to return a Promise, and `await` all operations.
  288. *
  289. * @remarks
  290. * This function:
  291. * - If all operations successfully complete and the `commitTransaction` operation is successful, then this function will return the result of the provided function.
  292. * - If the transaction is unable to complete or an error is thrown from within the provided function, then this function will throw an error.
  293. * - If the transaction is manually aborted within the provided function it will not throw.
  294. * - May be called multiple times if the driver needs to attempt to retry the operations.
  295. *
  296. * Checkout a descriptive example here:
  297. * @see https://www.mongodb.com/blog/post/quick-start-nodejs--mongodb--how-to-implement-transactions
  298. *
  299. * @param fn - callback to run within a transaction
  300. * @param options - optional settings for the transaction
  301. * @returns A raw command response or undefined
  302. */
  303. async withTransaction(fn, options) {
  304. const startTime = (0, utils_1.now)();
  305. return attemptTransaction(this, startTime, fn, options);
  306. }
  307. }
  308. exports.ClientSession = ClientSession;
  309. _a = kSnapshotEnabled;
  310. const MAX_WITH_TRANSACTION_TIMEOUT = 120000; // retry budget for withTransaction, compared against calculateDurationInMs(startTime)
  311. const NON_DETERMINISTIC_WRITE_CONCERN_ERRORS = new Set([ // server error codeNames for which a failed commit is NOT labelled UnknownTransactionCommitResult
  312. 'CannotSatisfyWriteConcern',
  313. 'UnknownReplWriteConcern',
  314. 'UnsatisfiableWriteConcern'
  315. ]);
  316. function hasNotTimedOut(startTime, max) {
  317. return (0, utils_1.calculateDurationInMs)(startTime) < max;
  318. }
  319. function isUnknownTransactionCommitResult(err) {
  320. const isNonDeterministicWriteConcernError = err instanceof error_1.MongoServerError &&
  321. err.codeName &&
  322. NON_DETERMINISTIC_WRITE_CONCERN_ERRORS.has(err.codeName);
  323. return (isMaxTimeMSExpiredError(err) ||
  324. (!isNonDeterministicWriteConcernError &&
  325. err.code !== error_1.MONGODB_ERROR_CODES.UnsatisfiableWriteConcern &&
  326. err.code !== error_1.MONGODB_ERROR_CODES.UnknownReplWriteConcern));
  327. }
  328. function maybeClearPinnedConnection(session, options) {
  329. // unpin a connection if it has been pinned
  330. const conn = session[kPinnedConnection];
  331. const error = options?.error;
  332. if (session.inTransaction() &&
  333. error &&
  334. error instanceof error_1.MongoError &&
  335. error.hasErrorLabel(error_1.MongoErrorLabel.TransientTransactionError)) {
  336. return;
  337. }
  338. const topology = session.client.topology;
  339. // NOTE: the spec talks about what to do on a network error only, but the tests seem to
  340. // to validate that we don't unpin on _all_ errors?
  341. if (conn && topology != null) {
  342. const servers = Array.from(topology.s.servers.values());
  343. const loadBalancer = servers[0];
  344. if (options?.error == null || options?.force) {
  345. loadBalancer.pool.checkIn(conn);
  346. conn.emit(constants_1.UNPINNED, session.transaction.state !== transactions_1.TxnState.NO_TRANSACTION
  347. ? metrics_1.ConnectionPoolMetrics.TXN
  348. : metrics_1.ConnectionPoolMetrics.CURSOR);
  349. if (options?.forceClear) {
  350. loadBalancer.pool.clear({ serviceId: conn.serviceId });
  351. }
  352. }
  353. session[kPinnedConnection] = undefined;
  354. }
  355. }
  356. exports.maybeClearPinnedConnection = maybeClearPinnedConnection;
  357. function isMaxTimeMSExpiredError(err) {
  358. if (err == null || !(err instanceof error_1.MongoServerError)) {
  359. return false;
  360. }
  361. return (err.code === error_1.MONGODB_ERROR_CODES.MaxTimeMSExpired ||
  362. (err.writeConcernError && err.writeConcernError.code === error_1.MONGODB_ERROR_CODES.MaxTimeMSExpired));
  363. }
  364. function attemptTransactionCommit(session, startTime, fn, result, options) {
  365. return session.commitTransaction().then(() => result, (err) => {
  366. if (err instanceof error_1.MongoError &&
  367. hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT) &&
  368. !isMaxTimeMSExpiredError(err)) {
  369. if (err.hasErrorLabel(error_1.MongoErrorLabel.UnknownTransactionCommitResult)) {
  370. return attemptTransactionCommit(session, startTime, fn, result, options);
  371. }
  372. if (err.hasErrorLabel(error_1.MongoErrorLabel.TransientTransactionError)) {
  373. return attemptTransaction(session, startTime, fn, options);
  374. }
  375. }
  376. throw err;
  377. });
  378. }
  379. const USER_EXPLICIT_TXN_END_STATES = new Set([
  380. transactions_1.TxnState.NO_TRANSACTION,
  381. transactions_1.TxnState.TRANSACTION_COMMITTED,
  382. transactions_1.TxnState.TRANSACTION_ABORTED
  383. ]);
  384. function userExplicitlyEndedTransaction(session) {
  385. return USER_EXPLICIT_TXN_END_STATES.has(session.transaction.state);
  386. }
  387. function attemptTransaction(session, startTime, fn, options = {}) {
  388. session.startTransaction(options);
  389. let promise;
  390. try {
  391. promise = fn(session);
  392. }
  393. catch (err) {
  394. promise = Promise.reject(err);
  395. }
  396. if (!(0, utils_1.isPromiseLike)(promise)) {
  397. session.abortTransaction().catch(() => null);
  398. return Promise.reject(new error_1.MongoInvalidArgumentError('Function provided to `withTransaction` must return a Promise'));
  399. }
  400. return promise.then(result => {
  401. if (userExplicitlyEndedTransaction(session)) {
  402. return result;
  403. }
  404. return attemptTransactionCommit(session, startTime, fn, result, options);
  405. }, err => {
  406. function maybeRetryOrThrow(err) {
  407. if (err instanceof error_1.MongoError &&
  408. err.hasErrorLabel(error_1.MongoErrorLabel.TransientTransactionError) &&
  409. hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT)) {
  410. return attemptTransaction(session, startTime, fn, options);
  411. }
  412. if (isMaxTimeMSExpiredError(err)) {
  413. err.addErrorLabel(error_1.MongoErrorLabel.UnknownTransactionCommitResult);
  414. }
  415. throw err;
  416. }
  417. if (session.inTransaction()) {
  418. return session.abortTransaction().then(() => maybeRetryOrThrow(err));
  419. }
  420. return maybeRetryOrThrow(err);
  421. });
  422. }
  423. const endTransactionAsync = (0, util_1.promisify)(endTransaction);
  424. function endTransaction(session, commandName, callback) {
  425. // handle any initial problematic cases
  426. const txnState = session.transaction.state;
  427. if (txnState === transactions_1.TxnState.NO_TRANSACTION) {
  428. callback(new error_1.MongoTransactionError('No transaction started'));
  429. return;
  430. }
  431. if (commandName === 'commitTransaction') {
  432. if (txnState === transactions_1.TxnState.STARTING_TRANSACTION ||
  433. txnState === transactions_1.TxnState.TRANSACTION_COMMITTED_EMPTY) {
  434. // the transaction was never started, we can safely exit here
  435. session.transaction.transition(transactions_1.TxnState.TRANSACTION_COMMITTED_EMPTY);
  436. callback();
  437. return;
  438. }
  439. if (txnState === transactions_1.TxnState.TRANSACTION_ABORTED) {
  440. callback(new error_1.MongoTransactionError('Cannot call commitTransaction after calling abortTransaction'));
  441. return;
  442. }
  443. }
  444. else {
  445. if (txnState === transactions_1.TxnState.STARTING_TRANSACTION) {
  446. // the transaction was never started, we can safely exit here
  447. session.transaction.transition(transactions_1.TxnState.TRANSACTION_ABORTED);
  448. callback();
  449. return;
  450. }
  451. if (txnState === transactions_1.TxnState.TRANSACTION_ABORTED) {
  452. callback(new error_1.MongoTransactionError('Cannot call abortTransaction twice'));
  453. return;
  454. }
  455. if (txnState === transactions_1.TxnState.TRANSACTION_COMMITTED ||
  456. txnState === transactions_1.TxnState.TRANSACTION_COMMITTED_EMPTY) {
  457. callback(new error_1.MongoTransactionError('Cannot call abortTransaction after calling commitTransaction'));
  458. return;
  459. }
  460. }
  461. // construct and send the command
  462. const command = { [commandName]: 1 };
  463. // apply a writeConcern if specified
  464. let writeConcern;
  465. if (session.transaction.options.writeConcern) {
  466. writeConcern = Object.assign({}, session.transaction.options.writeConcern);
  467. }
  468. else if (session.clientOptions && session.clientOptions.writeConcern) {
  469. writeConcern = { w: session.clientOptions.writeConcern.w };
  470. }
  471. if (txnState === transactions_1.TxnState.TRANSACTION_COMMITTED) {
  472. writeConcern = Object.assign({ wtimeoutMS: 10000 }, writeConcern, { w: 'majority' });
  473. }
  474. if (writeConcern) {
  475. write_concern_1.WriteConcern.apply(command, writeConcern);
  476. }
  477. if (commandName === 'commitTransaction' && session.transaction.options.maxTimeMS) {
  478. Object.assign(command, { maxTimeMS: session.transaction.options.maxTimeMS });
  479. }
  480. function commandHandler(error) {
  481. if (commandName !== 'commitTransaction') {
  482. session.transaction.transition(transactions_1.TxnState.TRANSACTION_ABORTED);
  483. if (session.loadBalanced) {
  484. maybeClearPinnedConnection(session, { force: false });
  485. }
  486. // The spec indicates that we should ignore all errors on `abortTransaction`
  487. return callback();
  488. }
  489. session.transaction.transition(transactions_1.TxnState.TRANSACTION_COMMITTED);
  490. if (error instanceof error_1.MongoError) {
  491. if (error.hasErrorLabel(error_1.MongoErrorLabel.RetryableWriteError) ||
  492. error instanceof error_1.MongoWriteConcernError ||
  493. isMaxTimeMSExpiredError(error)) {
  494. if (isUnknownTransactionCommitResult(error)) {
  495. error.addErrorLabel(error_1.MongoErrorLabel.UnknownTransactionCommitResult);
  496. // per txns spec, must unpin session in this case
  497. session.unpin({ error });
  498. }
  499. }
  500. else if (error.hasErrorLabel(error_1.MongoErrorLabel.TransientTransactionError)) {
  501. session.unpin({ error });
  502. }
  503. }
  504. callback(error);
  505. }
  506. if (session.transaction.recoveryToken) {
  507. command.recoveryToken = session.transaction.recoveryToken;
  508. }
  509. // send the command
  510. (0, execute_operation_1.executeOperation)(session.client, new run_command_1.RunAdminCommandOperation(command, {
  511. session,
  512. readPreference: read_preference_1.ReadPreference.primary,
  513. bypassPinningCheck: true
  514. }), error => {
  515. if (command.abortTransaction) {
  516. // always unpin on abort regardless of command outcome
  517. session.unpin();
  518. }
  519. if (error instanceof error_1.MongoError && error.hasErrorLabel(error_1.MongoErrorLabel.RetryableWriteError)) {
  520. // SPEC-1185: apply majority write concern when retrying commitTransaction
  521. if (command.commitTransaction) {
  522. // per txns spec, must unpin session in this case
  523. session.unpin({ force: true });
  524. command.writeConcern = Object.assign({ wtimeout: 10000 }, command.writeConcern, {
  525. w: 'majority'
  526. });
  527. }
  528. return (0, execute_operation_1.executeOperation)(session.client, new run_command_1.RunAdminCommandOperation(command, {
  529. session,
  530. readPreference: read_preference_1.ReadPreference.primary,
  531. bypassPinningCheck: true
  532. }), commandHandler);
  533. }
  534. commandHandler(error);
  535. });
  536. }
  537. /**
  538. * Reflects the existence of a session on the server. Can be reused by the session pool.
  539. * WARNING: not meant to be instantiated directly. For internal use only.
  540. * @public
  541. */
  542. class ServerSession {
  543. /** @internal */
  544. constructor() {
  545. this.id = { id: new bson_1.Binary((0, utils_1.uuidV4)(), bson_1.Binary.SUBTYPE_UUID) };
  546. this.lastUse = (0, utils_1.now)();
  547. this.txnNumber = 0;
  548. this.isDirty = false;
  549. }
  550. /**
  551. * Determines if the server session has timed out.
  552. *
  553. * @param sessionTimeoutMinutes - The server's "logicalSessionTimeoutMinutes"
  554. */
  555. hasTimedOut(sessionTimeoutMinutes) {
  556. // Take the difference of the lastUse timestamp and now, which will result in a value in
  557. // milliseconds, and then convert milliseconds to minutes to compare to `sessionTimeoutMinutes`
  558. const idleTimeMinutes = Math.round((((0, utils_1.calculateDurationInMs)(this.lastUse) % 86400000) % 3600000) / 60000);
  559. return idleTimeMinutes > sessionTimeoutMinutes - 1;
  560. }
  561. /**
  562. * @internal
  563. * Cloning meant to keep a readable reference to the server session data
  564. * after ClientSession has ended
  565. */
  566. static clone(serverSession) {
  567. const arrayBuffer = new ArrayBuffer(16);
  568. const idBytes = Buffer.from(arrayBuffer);
  569. idBytes.set(serverSession.id.id.buffer);
  570. const id = new bson_1.Binary(idBytes, serverSession.id.id.sub_type);
  571. // Manual prototype construction to avoid modifying the constructor of this class
  572. return Object.setPrototypeOf({
  573. id: { id },
  574. lastUse: serverSession.lastUse,
  575. txnNumber: serverSession.txnNumber,
  576. isDirty: serverSession.isDirty
  577. }, ServerSession.prototype);
  578. }
  579. }
  580. exports.ServerSession = ServerSession;
  581. /**
  582. * Maintains a pool of Server Sessions.
  583. * For internal use only
  584. * @internal
  585. */
  586. class ServerSessionPool {
  587. constructor(client) {
  588. if (client == null) {
  589. throw new error_1.MongoRuntimeError('ServerSessionPool requires a MongoClient');
  590. }
  591. this.client = client;
  592. this.sessions = new utils_1.List();
  593. }
  594. /**
  595. * Acquire a Server Session from the pool.
  596. * Iterates through each session in the pool, removing any stale sessions
  597. * along the way. The first non-stale session found is removed from the
  598. * pool and returned. If no non-stale session is found, a new ServerSession is created.
  599. */
  600. acquire() {
  601. const sessionTimeoutMinutes = this.client.topology?.logicalSessionTimeoutMinutes ?? 10;
  602. let session = null;
  603. // Try to obtain from session pool
  604. while (this.sessions.length > 0) {
  605. const potentialSession = this.sessions.shift();
  606. if (potentialSession != null &&
  607. (!!this.client.topology?.loadBalanced ||
  608. !potentialSession.hasTimedOut(sessionTimeoutMinutes))) {
  609. session = potentialSession;
  610. break;
  611. }
  612. }
  613. // If nothing valid came from the pool make a new one
  614. if (session == null) {
  615. session = new ServerSession();
  616. }
  617. return session;
  618. }
  619. /**
  620. * Release a session to the session pool
  621. * Adds the session back to the session pool if the session has not timed out yet.
  622. * This method also removes any stale sessions from the pool.
  623. *
  624. * @param session - The session to release to the pool
  625. */
  626. release(session) {
  627. const sessionTimeoutMinutes = this.client.topology?.logicalSessionTimeoutMinutes ?? 10;
  628. if (this.client.topology?.loadBalanced && !sessionTimeoutMinutes) {
  629. this.sessions.unshift(session);
  630. }
  631. if (!sessionTimeoutMinutes) {
  632. return;
  633. }
  634. this.sessions.prune(session => session.hasTimedOut(sessionTimeoutMinutes));
  635. if (!session.hasTimedOut(sessionTimeoutMinutes)) {
  636. if (session.isDirty) {
  637. return;
  638. }
  639. // otherwise, readd this session to the session pool
  640. this.sessions.unshift(session);
  641. }
  642. }
  643. }
  644. exports.ServerSessionPool = ServerSessionPool;
  645. /**
  646. * Optionally decorate a command with sessions specific keys
  647. *
  648. * @param session - the session tracking transaction state
  649. * @param command - the command to decorate
  650. * @param options - Optional settings passed to calling operation
  651. *
  652. * @internal
  653. */
  654. function applySession(session, command, options) {
  655. if (session.hasEnded) {
  656. return new error_1.MongoExpiredSessionError();
  657. }
  658. // May acquire serverSession here
  659. const serverSession = session.serverSession;
  660. if (serverSession == null) {
  661. return new error_1.MongoRuntimeError('Unable to acquire server session');
  662. }
  663. if (options.writeConcern?.w === 0) {
  664. if (session && session.explicit) {
  665. // Error if user provided an explicit session to an unacknowledged write (SPEC-1019)
  666. return new error_1.MongoAPIError('Cannot have explicit session with unacknowledged writes');
  667. }
  668. return;
  669. }
  670. // mark the last use of this session, and apply the `lsid`
  671. serverSession.lastUse = (0, utils_1.now)();
  672. command.lsid = serverSession.id;
  673. const inTxnOrTxnCommand = session.inTransaction() || (0, transactions_1.isTransactionCommand)(command);
  674. const isRetryableWrite = !!options.willRetryWrite;
  675. if (isRetryableWrite || inTxnOrTxnCommand) {
  676. serverSession.txnNumber += session[kTxnNumberIncrement];
  677. session[kTxnNumberIncrement] = 0;
  678. // TODO(NODE-2674): Preserve int64 sent from MongoDB
  679. command.txnNumber = bson_1.Long.fromNumber(serverSession.txnNumber);
  680. }
  681. if (!inTxnOrTxnCommand) {
  682. if (session.transaction.state !== transactions_1.TxnState.NO_TRANSACTION) {
  683. session.transaction.transition(transactions_1.TxnState.NO_TRANSACTION);
  684. }
  685. if (session.supports.causalConsistency &&
  686. session.operationTime &&
  687. (0, utils_1.commandSupportsReadConcern)(command)) {
  688. command.readConcern = command.readConcern || {};
  689. Object.assign(command.readConcern, { afterClusterTime: session.operationTime });
  690. }
  691. else if (session[kSnapshotEnabled]) {
  692. command.readConcern = command.readConcern || { level: read_concern_1.ReadConcernLevel.snapshot };
  693. if (session[kSnapshotTime] != null) {
  694. Object.assign(command.readConcern, { atClusterTime: session[kSnapshotTime] });
  695. }
  696. }
  697. return;
  698. }
  699. // now attempt to apply transaction-specific sessions data
  700. // `autocommit` must always be false to differentiate from retryable writes
  701. command.autocommit = false;
  702. if (session.transaction.state === transactions_1.TxnState.STARTING_TRANSACTION) {
  703. session.transaction.transition(transactions_1.TxnState.TRANSACTION_IN_PROGRESS);
  704. command.startTransaction = true;
  705. const readConcern = session.transaction.options.readConcern || session?.clientOptions?.readConcern;
  706. if (readConcern) {
  707. command.readConcern = readConcern;
  708. }
  709. if (session.supports.causalConsistency && session.operationTime) {
  710. command.readConcern = command.readConcern || {};
  711. Object.assign(command.readConcern, { afterClusterTime: session.operationTime });
  712. }
  713. }
  714. return;
  715. }
  716. exports.applySession = applySession;
  717. function updateSessionFromResponse(session, document) {
  718. if (document.$clusterTime) {
  719. (0, common_1._advanceClusterTime)(session, document.$clusterTime);
  720. }
  721. if (document.operationTime && session && session.supports.causalConsistency) {
  722. session.advanceOperationTime(document.operationTime);
  723. }
  724. if (document.recoveryToken && session && session.inTransaction()) {
  725. session.transaction._recoveryToken = document.recoveryToken;
  726. }
  727. if (session?.[kSnapshotEnabled] && session[kSnapshotTime] == null) {
  728. // find and aggregate commands return atClusterTime on the cursor
  729. // distinct includes it in the response body
  730. const atClusterTime = document.cursor?.atClusterTime || document.atClusterTime;
  731. if (atClusterTime) {
  732. session[kSnapshotTime] = atClusterTime;
  733. }
  734. }
  735. }
  736. exports.updateSessionFromResponse = updateSessionFromResponse;
  737. //# sourceMappingURL=sessions.js.map