sessions.ts 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052
  1. import { promisify } from 'util';
  2. import { Binary, type Document, Long, type Timestamp } from './bson';
  3. import type { CommandOptions, Connection } from './cmap/connection';
  4. import { ConnectionPoolMetrics } from './cmap/metrics';
  5. import { isSharded } from './cmap/wire_protocol/shared';
  6. import { PINNED, UNPINNED } from './constants';
  7. import type { AbstractCursor } from './cursor/abstract_cursor';
  8. import {
  9. type AnyError,
  10. MongoAPIError,
  11. MongoCompatibilityError,
  12. MONGODB_ERROR_CODES,
  13. type MongoDriverError,
  14. MongoError,
  15. MongoErrorLabel,
  16. MongoExpiredSessionError,
  17. MongoInvalidArgumentError,
  18. MongoRuntimeError,
  19. MongoServerError,
  20. MongoTransactionError,
  21. MongoWriteConcernError
  22. } from './error';
  23. import type { MongoClient, MongoOptions } from './mongo_client';
  24. import { TypedEventEmitter } from './mongo_types';
  25. import { executeOperation } from './operations/execute_operation';
  26. import { RunAdminCommandOperation } from './operations/run_command';
  27. import { ReadConcernLevel } from './read_concern';
  28. import { ReadPreference } from './read_preference';
  29. import { _advanceClusterTime, type ClusterTime, TopologyType } from './sdam/common';
  30. import {
  31. isTransactionCommand,
  32. Transaction,
  33. type TransactionOptions,
  34. TxnState
  35. } from './transactions';
  36. import {
  37. ByteUtils,
  38. calculateDurationInMs,
  39. type Callback,
  40. commandSupportsReadConcern,
  41. isPromiseLike,
  42. List,
  43. maxWireVersion,
  44. now,
  45. uuidV4
  46. } from './utils';
  47. import { WriteConcern } from './write_concern';
/**
 * Lowest topology wire version at which `ClientSession.startTransaction` allows a
 * transaction on a sharded topology. Below it, `startTransaction` throws a
 * `MongoCompatibilityError` stating that sharded transactions need MongoDB 4.2+.
 */
const minWireVersionForShardedTransactions = 8;
/**
 * Options accepted when constructing a `ClientSession`.
 * @public
 */
export interface ClientSessionOptions {
  /**
   * Whether causal consistency should be enabled on this session.
   * When omitted, it is enabled only for explicit sessions that are not snapshot sessions.
   */
  causalConsistency?: boolean;
  /**
   * Whether all read operations should be read from the same snapshot for this session
   * (NOTE: not compatible with `causalConsistency=true`; the constructor throws a
   * `MongoInvalidArgumentError` when both are `true`).
   */
  snapshot?: boolean;
  /** The default TransactionOptions to use for transactions started on this session. */
  defaultTransactionOptions?: TransactionOptions;
  /**
   * Copied onto the session's `owner` property.
   * @internal
   */
  owner?: symbol | AbstractCursor;
  /**
   * Marks the session as explicit. Explicit sessions acquire their server session in the
   * constructor; otherwise it is acquired lazily on first access of `serverSession`.
   * @internal
   */
  explicit?: boolean;
  /**
   * Initial value for the session's `clusterTime`.
   * @internal
   */
  initialClusterTime?: ClusterTime;
}
/**
 * Callback run by `ClientSession.withTransaction`. It receives the session the
 * transaction was started on and must return a Promise; a non-thenable return value
 * makes `withTransaction` reject with a `MongoInvalidArgumentError`.
 * @public
 */
export type WithTransactionCallback<T = any> = (session: ClientSession) => Promise<T>;
/**
 * Events emitted by a `ClientSession`.
 * @public
 */
export type ClientSessionEvents = {
  /** Emitted by `endSession()` right after `hasEnded` is set, with the session itself. */
  ended(session: ClientSession): void;
};
/**
 * Key of the `ServerSession` backing a `ClientSession` (`null` until one is acquired).
 * @internal
 */
const kServerSession = Symbol('serverSession');
/**
 * Key of the optional snapshot `Timestamp` slot declared on `ClientSession`.
 * @internal
 */
const kSnapshotTime = Symbol('snapshotTime');
/**
 * Key of the flag recording that the session was created with `snapshot: true`.
 * @internal
 */
const kSnapshotEnabled = Symbol('snapshotEnabled');
/**
 * Key of the `Connection` currently pinned to the session, if any.
 * @internal
 */
const kPinnedConnection = Symbol('pinnedConnection');
/** @internal Accumulates total number of increments to add to txnNumber when applying session to command */
const kTxnNumberIncrement = Symbol('txnNumberIncrement');
/**
 * Options for `ClientSession.endSession`; they are also the options accepted by
 * `maybeClearPinnedConnection`.
 * @public
 */
export interface EndSessionOptions {
  /**
   * An optional error which caused the call to end this session
   * @internal
   */
  error?: AnyError;
  /**
   * When truthy, a pinned connection is checked back into its pool even if `error` is set.
   */
  force?: boolean;
  /**
   * When truthy, the pool is additionally cleared for the pinned connection's `serviceId`
   * after the connection has been checked in.
   */
  forceClear?: boolean;
}
/**
 * A class representing a client session on the server
 *
 * Wraps a pooled `ServerSession` together with the client-side state of the session:
 * cluster/operation times, the current `Transaction`, and (in load balanced mode) the
 * connection pinned to the session.
 *
 * NOTE: not meant to be instantiated directly.
 * @public
 */
export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
  /** @internal */
  client: MongoClient;
  /** @internal */
  sessionPool: ServerSessionPool;
  /** `true` once `endSession()` has completed its release step. */
  hasEnded: boolean;
  /** Client-level options; the last fallback for transaction read/write concern and read preference. */
  clientOptions?: MongoOptions;
  /** Feature flags for this session; `causalConsistency` is resolved in the constructor. */
  supports: { causalConsistency: boolean };
  /** Latest cluster time known to this session; seeded from `options.initialClusterTime`. */
  clusterTime?: ClusterTime;
  /** Latest operation time recorded through `advanceOperationTime`. */
  operationTime?: Timestamp;
  /** Whether the session was created as explicit (`options.explicit`). */
  explicit: boolean;
  /** @internal */
  owner?: symbol | AbstractCursor;
  /** Shallow copy of `options.defaultTransactionOptions`, consulted by `startTransaction`. */
  defaultTransactionOptions: TransactionOptions;
  /** State of the current transaction; replaced on every `startTransaction` call. */
  transaction: Transaction;
  /** @internal */
  [kServerSession]: ServerSession | null;
  /** @internal */
  [kSnapshotTime]?: Timestamp;
  /** @internal */
  [kSnapshotEnabled] = false;
  /** @internal */
  [kPinnedConnection]?: Connection;
  /** @internal */
  [kTxnNumberIncrement]: number;

  /**
   * Create a client session.
   * @internal
   * @param client - The current client
   * @param sessionPool - The server session pool (Internal Class)
   * @param options - Optional settings
   * @param clientOptions - Optional settings provided when creating a MongoClient
   * @throws MongoRuntimeError when `client` is missing or `sessionPool` is not a `ServerSessionPool`
   * @throws MongoInvalidArgumentError when both `snapshot` and `causalConsistency` are `true`
   */
  constructor(
    client: MongoClient,
    sessionPool: ServerSessionPool,
    options: ClientSessionOptions,
    clientOptions?: MongoOptions
  ) {
    super();
    if (client == null) {
      // TODO(NODE-3483)
      throw new MongoRuntimeError('ClientSession requires a MongoClient');
    }
    if (sessionPool == null || !(sessionPool instanceof ServerSessionPool)) {
      // TODO(NODE-3483)
      throw new MongoRuntimeError('ClientSession requires a ServerSessionPool');
    }
    options = options ?? {};
    if (options.snapshot === true) {
      this[kSnapshotEnabled] = true;
      if (options.causalConsistency === true) {
        throw new MongoInvalidArgumentError(
          'Properties "causalConsistency" and "snapshot" are mutually exclusive'
        );
      }
    }
    this.client = client;
    this.sessionPool = sessionPool;
    this.hasEnded = false;
    this.clientOptions = clientOptions;
    this.explicit = !!options.explicit;
    // explicit sessions take a server session immediately; implicit ones wait until
    // the `serverSession` getter is first read
    this[kServerSession] = this.explicit ? this.sessionPool.acquire() : null;
    this[kTxnNumberIncrement] = 0;
    const defaultCausalConsistencyValue = this.explicit && options.snapshot !== true;
    this.supports = {
      // if we can enable causal consistency, do so by default
      causalConsistency: options.causalConsistency ?? defaultCausalConsistencyValue
    };
    this.clusterTime = options.initialClusterTime;
    this.operationTime = undefined;
    this.owner = options.owner;
    this.defaultTransactionOptions = Object.assign({}, options.defaultTransactionOptions);
    this.transaction = new Transaction();
  }

  /** The server id associated with this session (`undefined` while no server session is held) */
  get id(): ServerSessionId | undefined {
    return this[kServerSession]?.id;
  }

  /**
   * The `ServerSession` backing this session, acquired from the pool on first access
   * when the session does not hold one yet.
   *
   * @throws MongoRuntimeError when no server session is held and the session is
   * explicit, or is an implicit session that has already ended
   */
  get serverSession(): ServerSession {
    let serverSession = this[kServerSession];
    if (serverSession == null) {
      if (this.explicit) {
        throw new MongoRuntimeError('Unexpected null serverSession for an explicit session');
      }
      if (this.hasEnded) {
        throw new MongoRuntimeError('Unexpected null serverSession for an ended implicit session');
      }
      serverSession = this.sessionPool.acquire();
      this[kServerSession] = serverSession;
    }
    return serverSession;
  }

  /** Whether or not this session is configured for snapshot reads */
  get snapshotEnabled(): boolean {
    return this[kSnapshotEnabled];
  }

  /** Whether the client's topology description is of type `LoadBalanced` */
  get loadBalanced(): boolean {
    return this.client.topology?.description.type === TopologyType.LoadBalanced;
  }

  /** @internal */
  get pinnedConnection(): Connection | undefined {
    return this[kPinnedConnection];
  }

  /**
   * Pins `conn` to this session and emits `PINNED` on it, tagged with the TXN metric when
   * a transaction is active and the CURSOR metric otherwise.
   * @internal
   * @throws TypeError when a connection is already pinned
   */
  pin(conn: Connection): void {
    if (this[kPinnedConnection]) {
      throw TypeError('Cannot pin multiple connections to the same session');
    }
    this[kPinnedConnection] = conn;
    conn.emit(
      PINNED,
      this.inTransaction() ? ConnectionPoolMetrics.TXN : ConnectionPoolMetrics.CURSOR
    );
  }

  /**
   * Releases whatever this session is pinned to: the pinned connection in load balanced
   * mode (via `maybeClearPinnedConnection`), otherwise the transaction's pinned server.
   * @internal
   */
  unpin(options?: { force?: boolean; forceClear?: boolean; error?: AnyError }): void {
    if (this.loadBalanced) {
      return maybeClearPinnedConnection(this, options);
    }
    this.transaction.unpinServer();
  }

  /** Whether a connection (load balanced mode) or a server (otherwise) is pinned */
  get isPinned(): boolean {
    return this.loadBalanced ? !!this[kPinnedConnection] : this.transaction.isPinned;
  }

  /**
   * Ends this session on the server
   *
   * Aborts an active transaction, returns the server session to the pool, marks the
   * session as ended and emits `ended`. Errors raised by those steps are swallowed.
   * Finally any pinned connection is released; `force` defaults to `true` for that step.
   *
   * @param options - Optional settings, forwarded to the pinned-connection cleanup
   */
  async endSession(options?: EndSessionOptions): Promise<void> {
    try {
      if (this.inTransaction()) {
        await this.abortTransaction();
      }
      if (!this.hasEnded) {
        const serverSession = this[kServerSession];
        if (serverSession != null) {
          // release the server session back to the pool
          this.sessionPool.release(serverSession);
          // Make sure a new serverSession never makes it onto this ClientSession
          Object.defineProperty(this, kServerSession, {
            value: ServerSession.clone(serverSession),
            writable: false
          });
        }
        // mark the session as ended, and emit a signal
        this.hasEnded = true;
        this.emit('ended', this);
      }
    } catch {
      // spec indicates that we should ignore all errors for `endSessions`
    } finally {
      // caller-supplied options are spread last, so an explicit `force` overrides the default
      maybeClearPinnedConnection(this, { force: true, ...options });
    }
  }

  /**
   * Advances the operationTime for a ClientSession.
   *
   * The stored value only changes when none is set yet or the new one is greater.
   *
   * @param operationTime - the `BSON.Timestamp` of the operation type it is desired to advance to
   */
  advanceOperationTime(operationTime: Timestamp): void {
    if (this.operationTime == null) {
      this.operationTime = operationTime;
      return;
    }
    if (operationTime.greaterThan(this.operationTime)) {
      this.operationTime = operationTime;
    }
  }

  /**
   * Advances the clusterTime for a ClientSession to the provided clusterTime of another ClientSession
   *
   * @param clusterTime - the $clusterTime returned by the server from another session in the form of a document containing the `BSON.Timestamp` clusterTime and signature
   * @throws MongoInvalidArgumentError when the input is not an object, its `clusterTime`
   * is not a BSON Timestamp, or its `signature` lacks a Binary `hash` or an acceptable `keyId`
   */
  advanceClusterTime(clusterTime: ClusterTime): void {
    if (!clusterTime || typeof clusterTime !== 'object') {
      throw new MongoInvalidArgumentError('input cluster time must be an object');
    }
    if (!clusterTime.clusterTime || clusterTime.clusterTime._bsontype !== 'Timestamp') {
      throw new MongoInvalidArgumentError(
        'input cluster time "clusterTime" property must be a valid BSON Timestamp'
      );
    }
    if (
      !clusterTime.signature ||
      clusterTime.signature.hash?._bsontype !== 'Binary' ||
      (typeof clusterTime.signature.keyId !== 'bigint' &&
        typeof clusterTime.signature.keyId !== 'number' &&
        clusterTime.signature.keyId?._bsontype !== 'Long') // apparently we decode the key to number?
    ) {
      throw new MongoInvalidArgumentError(
        'input cluster time must have a valid "signature" property with BSON Binary hash and BSON Long keyId'
      );
    }
    _advanceClusterTime(this, clusterTime);
  }

  /**
   * Used to determine if this session equals another
   *
   * Two sessions are equal when both hold a server session id and the id bytes match.
   *
   * @param session - The session to compare to
   */
  equals(session: ClientSession): boolean {
    if (!(session instanceof ClientSession)) {
      return false;
    }
    if (this.id == null || session.id == null) {
      return false;
    }
    return ByteUtils.equals(this.id.id.buffer, session.id.id.buffer);
  }

  /**
   * Increment the transaction number on the internal ServerSession
   *
   * @privateRemarks
   * This helper increments a value stored on the client session that will be
   * added to the serverSession's txnNumber upon applying it to a command.
   * This is because the serverSession is lazily acquired after a connection is obtained
   */
  incrementTransactionNumber(): void {
    this[kTxnNumberIncrement] += 1;
  }

  /** @returns whether this session is currently in a transaction or not */
  inTransaction(): boolean {
    return this.transaction.isActive;
  }

  /**
   * Starts a new transaction with the given options.
   *
   * Read concern, write concern and read preference each fall back from `options` to
   * `defaultTransactionOptions` to the client options; `maxCommitTimeMS` falls back from
   * `options` to `defaultTransactionOptions` only.
   *
   * @param options - Options for the transaction
   * @throws MongoCompatibilityError on snapshot sessions, or on sharded topologies whose
   * max wire version is below the minimum for sharded transactions
   * @throws MongoTransactionError when a transaction is already in progress
   */
  startTransaction(options?: TransactionOptions): void {
    if (this[kSnapshotEnabled]) {
      throw new MongoCompatibilityError('Transactions are not supported in snapshot sessions');
    }
    if (this.inTransaction()) {
      throw new MongoTransactionError('Transaction already in progress');
    }
    // a pin left over from a committed transaction is dropped before starting a new one
    if (this.isPinned && this.transaction.isCommitted) {
      this.unpin();
    }
    const topologyMaxWireVersion = maxWireVersion(this.client.topology);
    if (
      isSharded(this.client.topology) &&
      topologyMaxWireVersion != null &&
      topologyMaxWireVersion < minWireVersionForShardedTransactions
    ) {
      throw new MongoCompatibilityError(
        'Transactions are not supported on sharded clusters in MongoDB < 4.2.'
      );
    }
    // increment txnNumber
    this.incrementTransactionNumber();
    // create transaction state
    this.transaction = new Transaction({
      readConcern:
        options?.readConcern ??
        this.defaultTransactionOptions.readConcern ??
        this.clientOptions?.readConcern,
      writeConcern:
        options?.writeConcern ??
        this.defaultTransactionOptions.writeConcern ??
        this.clientOptions?.writeConcern,
      readPreference:
        options?.readPreference ??
        this.defaultTransactionOptions.readPreference ??
        this.clientOptions?.readPreference,
      maxCommitTimeMS: options?.maxCommitTimeMS ?? this.defaultTransactionOptions.maxCommitTimeMS
    });
    this.transaction.transition(TxnState.STARTING_TRANSACTION);
  }

  /**
   * Commits the currently active transaction in this session.
   */
  async commitTransaction(): Promise<void> {
    return endTransactionAsync(this, 'commitTransaction');
  }

  /**
   * Aborts the currently active transaction in this session.
   */
  async abortTransaction(): Promise<void> {
    return endTransactionAsync(this, 'abortTransaction');
  }

  /**
   * This is here to ensure that ClientSession is never serialized to BSON.
   */
  toBSON(): never {
    throw new MongoRuntimeError('ClientSession cannot be serialized to BSON.');
  }

  /**
   * Starts a transaction and runs a provided function, ensuring the commitTransaction is always attempted when all operations run in the function have completed.
   *
   * **IMPORTANT:** This method requires the user to return a Promise, and `await` all operations.
   *
   * @remarks
   * This function:
   * - If all operations successfully complete and the `commitTransaction` operation is successful, then this function will return the result of the provided function.
   * - If the transaction is unable to complete or an error is thrown from within the provided function, then this function will throw an error.
   * - If the transaction is manually aborted within the provided function it will not throw.
   * - May be called multiple times if the driver needs to attempt to retry the operations.
   *
   * Checkout a descriptive example here:
   * @see https://www.mongodb.com/blog/post/quick-start-nodejs--mongodb--how-to-implement-transactions
   *
   * @param fn - callback to run within a transaction
   * @param options - optional settings for the transaction
   * @returns The value the promise returned by `fn` resolved to
   */
  async withTransaction<T = any>(
    fn: WithTransactionCallback<T>,
    options?: TransactionOptions
  ): Promise<T> {
    // the start time bounds how long retries may continue
    const startTime = now();
    return attemptTransaction(this, startTime, fn, options);
  }
}
/**
 * Retry budget for `withTransaction`: retries of the callback or of the commit are only
 * attempted while the time elapsed since the first attempt (as measured by
 * `calculateDurationInMs`) is below this value.
 */
const MAX_WITH_TRANSACTION_TIMEOUT = 120000;
/**
 * Server error code names for which `isUnknownTransactionCommitResult` does not report
 * the commit result as unknown (unless the error is also a max-time-expired error).
 */
const NON_DETERMINISTIC_WRITE_CONCERN_ERRORS = new Set([
  'CannotSatisfyWriteConcern',
  'UnknownReplWriteConcern',
  'UnsatisfiableWriteConcern'
]);
  418. function hasNotTimedOut(startTime: number, max: number) {
  419. return calculateDurationInMs(startTime) < max;
  420. }
  421. function isUnknownTransactionCommitResult(err: MongoError) {
  422. const isNonDeterministicWriteConcernError =
  423. err instanceof MongoServerError &&
  424. err.codeName &&
  425. NON_DETERMINISTIC_WRITE_CONCERN_ERRORS.has(err.codeName);
  426. return (
  427. isMaxTimeMSExpiredError(err) ||
  428. (!isNonDeterministicWriteConcernError &&
  429. err.code !== MONGODB_ERROR_CODES.UnsatisfiableWriteConcern &&
  430. err.code !== MONGODB_ERROR_CODES.UnknownReplWriteConcern)
  431. );
  432. }
  433. export function maybeClearPinnedConnection(
  434. session: ClientSession,
  435. options?: EndSessionOptions
  436. ): void {
  437. // unpin a connection if it has been pinned
  438. const conn = session[kPinnedConnection];
  439. const error = options?.error;
  440. if (
  441. session.inTransaction() &&
  442. error &&
  443. error instanceof MongoError &&
  444. error.hasErrorLabel(MongoErrorLabel.TransientTransactionError)
  445. ) {
  446. return;
  447. }
  448. const topology = session.client.topology;
  449. // NOTE: the spec talks about what to do on a network error only, but the tests seem to
  450. // to validate that we don't unpin on _all_ errors?
  451. if (conn && topology != null) {
  452. const servers = Array.from(topology.s.servers.values());
  453. const loadBalancer = servers[0];
  454. if (options?.error == null || options?.force) {
  455. loadBalancer.pool.checkIn(conn);
  456. conn.emit(
  457. UNPINNED,
  458. session.transaction.state !== TxnState.NO_TRANSACTION
  459. ? ConnectionPoolMetrics.TXN
  460. : ConnectionPoolMetrics.CURSOR
  461. );
  462. if (options?.forceClear) {
  463. loadBalancer.pool.clear({ serviceId: conn.serviceId });
  464. }
  465. }
  466. session[kPinnedConnection] = undefined;
  467. }
  468. }
  469. function isMaxTimeMSExpiredError(err: MongoError) {
  470. if (err == null || !(err instanceof MongoServerError)) {
  471. return false;
  472. }
  473. return (
  474. err.code === MONGODB_ERROR_CODES.MaxTimeMSExpired ||
  475. (err.writeConcernError && err.writeConcernError.code === MONGODB_ERROR_CODES.MaxTimeMSExpired)
  476. );
  477. }
  478. function attemptTransactionCommit<T>(
  479. session: ClientSession,
  480. startTime: number,
  481. fn: WithTransactionCallback<T>,
  482. result: any,
  483. options: TransactionOptions
  484. ): Promise<T> {
  485. return session.commitTransaction().then(
  486. () => result,
  487. (err: MongoError) => {
  488. if (
  489. err instanceof MongoError &&
  490. hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT) &&
  491. !isMaxTimeMSExpiredError(err)
  492. ) {
  493. if (err.hasErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult)) {
  494. return attemptTransactionCommit(session, startTime, fn, result, options);
  495. }
  496. if (err.hasErrorLabel(MongoErrorLabel.TransientTransactionError)) {
  497. return attemptTransaction(session, startTime, fn, options);
  498. }
  499. }
  500. throw err;
  501. }
  502. );
  503. }
/**
 * Transaction states that, when observed after the `withTransaction` callback has
 * resolved, mean the callback already ended the transaction itself, so no commit is
 * attempted.
 */
const USER_EXPLICIT_TXN_END_STATES = new Set<TxnState>([
  TxnState.NO_TRANSACTION,
  TxnState.TRANSACTION_COMMITTED,
  TxnState.TRANSACTION_ABORTED
]);
  509. function userExplicitlyEndedTransaction(session: ClientSession) {
  510. return USER_EXPLICIT_TXN_END_STATES.has(session.transaction.state);
  511. }
  512. function attemptTransaction<T>(
  513. session: ClientSession,
  514. startTime: number,
  515. fn: WithTransactionCallback<T>,
  516. options: TransactionOptions = {}
  517. ): Promise<any> {
  518. session.startTransaction(options);
  519. let promise;
  520. try {
  521. promise = fn(session);
  522. } catch (err) {
  523. promise = Promise.reject(err);
  524. }
  525. if (!isPromiseLike(promise)) {
  526. session.abortTransaction().catch(() => null);
  527. return Promise.reject(
  528. new MongoInvalidArgumentError('Function provided to `withTransaction` must return a Promise')
  529. );
  530. }
  531. return promise.then(
  532. result => {
  533. if (userExplicitlyEndedTransaction(session)) {
  534. return result;
  535. }
  536. return attemptTransactionCommit(session, startTime, fn, result, options);
  537. },
  538. err => {
  539. function maybeRetryOrThrow(err: MongoError): Promise<any> {
  540. if (
  541. err instanceof MongoError &&
  542. err.hasErrorLabel(MongoErrorLabel.TransientTransactionError) &&
  543. hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT)
  544. ) {
  545. return attemptTransaction(session, startTime, fn, options);
  546. }
  547. if (isMaxTimeMSExpiredError(err)) {
  548. err.addErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult);
  549. }
  550. throw err;
  551. }
  552. if (session.inTransaction()) {
  553. return session.abortTransaction().then(() => maybeRetryOrThrow(err));
  554. }
  555. return maybeRetryOrThrow(err);
  556. }
  557. );
  558. }
/**
 * Promise-returning form of `endTransaction`, produced with `util.promisify`.
 * The cast narrows the callback signature to the error-first shape `promisify` expects;
 * the resulting promise rejects when `endTransaction` passes an error to its callback.
 */
const endTransactionAsync = promisify(
  endTransaction as (
    session: ClientSession,
    commandName: 'abortTransaction' | 'commitTransaction',
    callback: (error: Error) => void
  ) => void
);
/**
 * Commits or aborts the session's current transaction, reporting through `callback`.
 *
 * The function first validates the transaction state and short-circuits the cases that
 * need no server round trip. Otherwise it builds the command (write concern, `maxTimeMS`
 * for commits, recovery token) and runs it against the primary as an admin command. A
 * first failure carrying the `RetryableWriteError` label is retried exactly once.
 *
 * @param session - session owning the transaction
 * @param commandName - which command to run
 * @param callback - called with no error on success; an abort that reached the server
 * always reports success
 */
function endTransaction(
  session: ClientSession,
  commandName: 'abortTransaction' | 'commitTransaction',
  callback: Callback<void>
) {
  // handle any initial problematic cases
  const txnState = session.transaction.state;
  if (txnState === TxnState.NO_TRANSACTION) {
    callback(new MongoTransactionError('No transaction started'));
    return;
  }
  if (commandName === 'commitTransaction') {
    if (
      txnState === TxnState.STARTING_TRANSACTION ||
      txnState === TxnState.TRANSACTION_COMMITTED_EMPTY
    ) {
      // the transaction was never started, we can safely exit here
      session.transaction.transition(TxnState.TRANSACTION_COMMITTED_EMPTY);
      callback();
      return;
    }
    if (txnState === TxnState.TRANSACTION_ABORTED) {
      callback(
        new MongoTransactionError('Cannot call commitTransaction after calling abortTransaction')
      );
      return;
    }
  } else {
    if (txnState === TxnState.STARTING_TRANSACTION) {
      // the transaction was never started, we can safely exit here
      session.transaction.transition(TxnState.TRANSACTION_ABORTED);
      callback();
      return;
    }
    if (txnState === TxnState.TRANSACTION_ABORTED) {
      callback(new MongoTransactionError('Cannot call abortTransaction twice'));
      return;
    }
    if (
      txnState === TxnState.TRANSACTION_COMMITTED ||
      txnState === TxnState.TRANSACTION_COMMITTED_EMPTY
    ) {
      callback(
        new MongoTransactionError('Cannot call abortTransaction after calling commitTransaction')
      );
      return;
    }
  }
  // construct and send the command
  const command: Document = { [commandName]: 1 };
  // apply a writeConcern if specified
  let writeConcern;
  if (session.transaction.options.writeConcern) {
    writeConcern = Object.assign({}, session.transaction.options.writeConcern);
  } else if (session.clientOptions && session.clientOptions.writeConcern) {
    writeConcern = { w: session.clientOptions.writeConcern.w };
  }
  // the transaction was already committed once, so this is a repeated commit: force
  // w: 'majority' and default wtimeoutMS to 10000 unless the write concern sets its own
  if (txnState === TxnState.TRANSACTION_COMMITTED) {
    writeConcern = Object.assign({ wtimeoutMS: 10000 }, writeConcern, { w: 'majority' });
  }
  if (writeConcern) {
    WriteConcern.apply(command, writeConcern);
  }
  if (commandName === 'commitTransaction' && session.transaction.options.maxTimeMS) {
    Object.assign(command, { maxTimeMS: session.transaction.options.maxTimeMS });
  }
  // final handler: runs after the first attempt, or after the single retry
  function commandHandler(error?: Error) {
    if (commandName !== 'commitTransaction') {
      session.transaction.transition(TxnState.TRANSACTION_ABORTED);
      if (session.loadBalanced) {
        maybeClearPinnedConnection(session, { force: false });
      }
      // The spec indicates that we should ignore all errors on `abortTransaction`
      return callback();
    }
    // the state moves to committed whether or not the command reported an error
    session.transaction.transition(TxnState.TRANSACTION_COMMITTED);
    if (error instanceof MongoError) {
      if (
        error.hasErrorLabel(MongoErrorLabel.RetryableWriteError) ||
        error instanceof MongoWriteConcernError ||
        isMaxTimeMSExpiredError(error)
      ) {
        if (isUnknownTransactionCommitResult(error)) {
          error.addErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult);
          // per txns spec, must unpin session in this case
          session.unpin({ error });
        }
      } else if (error.hasErrorLabel(MongoErrorLabel.TransientTransactionError)) {
        session.unpin({ error });
      }
    }
    callback(error);
  }
  if (session.transaction.recoveryToken) {
    command.recoveryToken = session.transaction.recoveryToken;
  }
  // send the command
  executeOperation(
    session.client,
    new RunAdminCommandOperation(command, {
      session,
      readPreference: ReadPreference.primary,
      bypassPinningCheck: true
    }),
    error => {
      if (command.abortTransaction) {
        // always unpin on abort regardless of command outcome
        session.unpin();
      }
      if (error instanceof MongoError && error.hasErrorLabel(MongoErrorLabel.RetryableWriteError)) {
        // SPEC-1185: apply majority write concern when retrying commitTransaction
        if (command.commitTransaction) {
          // per txns spec, must unpin session in this case
          session.unpin({ force: true });
          command.writeConcern = Object.assign({ wtimeout: 10000 }, command.writeConcern, {
            w: 'majority'
          });
        }
        // single retry: its outcome goes straight to commandHandler, so there is no second retry
        return executeOperation(
          session.client,
          new RunAdminCommandOperation(command, {
            session,
            readPreference: ReadPreference.primary,
            bypassPinningCheck: true
          }),
          commandHandler
        );
      }
      commandHandler(error);
    }
  );
}
/**
 * Identifier of a server session: a document whose `id` field is a BSON `Binary`
 * (`ServerSession` generates it as a UUID-subtype binary).
 * @public
 */
export type ServerSessionId = { id: Binary };
  700. /**
  701. * Reflects the existence of a session on the server. Can be reused by the session pool.
  702. * WARNING: not meant to be instantiated directly. For internal use only.
  703. * @public
  704. */
  705. export class ServerSession {
  706. id: ServerSessionId;
  707. lastUse: number;
  708. txnNumber: number;
  709. isDirty: boolean;
  710. /** @internal */
  711. constructor() {
  712. this.id = { id: new Binary(uuidV4(), Binary.SUBTYPE_UUID) };
  713. this.lastUse = now();
  714. this.txnNumber = 0;
  715. this.isDirty = false;
  716. }
  717. /**
  718. * Determines if the server session has timed out.
  719. *
  720. * @param sessionTimeoutMinutes - The server's "logicalSessionTimeoutMinutes"
  721. */
  722. hasTimedOut(sessionTimeoutMinutes: number): boolean {
  723. // Take the difference of the lastUse timestamp and now, which will result in a value in
  724. // milliseconds, and then convert milliseconds to minutes to compare to `sessionTimeoutMinutes`
  725. const idleTimeMinutes = Math.round(
  726. ((calculateDurationInMs(this.lastUse) % 86400000) % 3600000) / 60000
  727. );
  728. return idleTimeMinutes > sessionTimeoutMinutes - 1;
  729. }
  730. /**
  731. * @internal
  732. * Cloning meant to keep a readable reference to the server session data
  733. * after ClientSession has ended
  734. */
  735. static clone(serverSession: ServerSession): Readonly<ServerSession> {
  736. const arrayBuffer = new ArrayBuffer(16);
  737. const idBytes = Buffer.from(arrayBuffer);
  738. idBytes.set(serverSession.id.id.buffer);
  739. const id = new Binary(idBytes, serverSession.id.id.sub_type);
  740. // Manual prototype construction to avoid modifying the constructor of this class
  741. return Object.setPrototypeOf(
  742. {
  743. id: { id },
  744. lastUse: serverSession.lastUse,
  745. txnNumber: serverSession.txnNumber,
  746. isDirty: serverSession.isDirty
  747. },
  748. ServerSession.prototype
  749. );
  750. }
  751. }
  /**
   * Keeps released Server Sessions around so they can be handed out again.
   * For internal use only
   * @internal
   */
  export class ServerSessionPool {
    client: MongoClient;
    sessions: List<ServerSession>;

    /**
     * @param client - The client whose topology supplies the session timeout
     *   and load-balanced settings consulted by the pool
     * @throws MongoRuntimeError when `client` is `null` or `undefined`
     */
    constructor(client: MongoClient) {
      if (client == null) {
        throw new MongoRuntimeError('ServerSessionPool requires a MongoClient');
      }

      this.client = client;
      this.sessions = new List<ServerSession>();
    }

    /**
     * Acquire a Server Session from the pool.
     *
     * Sessions are taken from the front of the pool one at a time. A session
     * that has timed out is dropped, unless the topology is load balanced, in
     * which case any pooled session is accepted. The first acceptable session
     * is returned; when the pool runs empty a new ServerSession is created.
     * The timeout defaults to 10 minutes when the topology does not report one.
     */
    acquire(): ServerSession {
      const timeoutMinutes = this.client.topology?.logicalSessionTimeoutMinutes ?? 10;

      while (this.sessions.length > 0) {
        const candidate = this.sessions.shift();
        if (candidate == null) {
          continue;
        }

        const usable =
          !!this.client.topology?.loadBalanced || !candidate.hasTimedOut(timeoutMinutes);
        if (usable) {
          return candidate;
        }
        // stale session: already removed from the pool by shift(), keep looking
      }

      // Nothing valid came out of the pool
      return new ServerSession();
    }

    /**
     * Release a session to the session pool.
     *
     * With a falsy timeout (e.g. `0`) the session is only pooled when the
     * topology is load balanced, and nothing else happens. Otherwise all
     * timed-out sessions are pruned from the pool, and the released session is
     * put at the front of the pool provided it has neither timed out nor been
     * marked dirty.
     *
     * @param session - The session to release to the pool
     */
    release(session: ServerSession): void {
      const timeoutMinutes = this.client.topology?.logicalSessionTimeoutMinutes ?? 10;

      if (!timeoutMinutes) {
        if (this.client.topology?.loadBalanced) {
          this.sessions.unshift(session);
        }
        return;
      }

      this.sessions.prune(pooled => pooled.hasTimedOut(timeoutMinutes));

      if (session.hasTimedOut(timeoutMinutes) || session.isDirty) {
        // expired or dirty sessions are not returned to the pool
        return;
      }

      this.sessions.unshift(session);
    }
  }
  /**
   * Optionally decorate a command with session-specific keys.
   *
   * Depending on the session and options this may set `lsid`, `txnNumber`,
   * `readConcern`, `autocommit` and `startTransaction` on the command, update
   * the server session's `lastUse` and `txnNumber`, and transition the
   * session's transaction state.
   *
   * @param session - the session tracking transaction state
   * @param command - the command to decorate (mutated in place)
   * @param options - Optional settings passed to calling operation
   * @returns an error describing why the session could not be applied, or
   *   `undefined` when the command was handled without error
   *
   * @internal
   */
  export function applySession(
    session: ClientSession,
    command: Document,
    options: CommandOptions
  ): MongoDriverError | undefined {
    if (session.hasEnded) {
      return new MongoExpiredSessionError();
    }

    // Reading `serverSession` may acquire one
    const serverSession = session.serverSession;
    if (serverSession == null) {
      return new MongoRuntimeError('Unable to acquire server session');
    }

    // Unacknowledged writes are never decorated; an explicit session on such a
    // write is an error (SPEC-1019)
    if (options.writeConcern?.w === 0) {
      return session && session.explicit
        ? new MongoAPIError('Cannot have explicit session with unacknowledged writes')
        : undefined;
    }

    // Record the use of this session and attach its `lsid`
    serverSession.lastUse = now();
    command.lsid = serverSession.id;

    const touchesTransaction = session.inTransaction() || isTransactionCommand(command);
    const retryableWrite = !!options.willRetryWrite;

    if (retryableWrite || touchesTransaction) {
      // Fold the pending increment into the server session, then reset it
      serverSession.txnNumber += session[kTxnNumberIncrement];
      session[kTxnNumberIncrement] = 0;
      // TODO(NODE-2674): Preserve int64 sent from MongoDB
      command.txnNumber = Long.fromNumber(serverSession.txnNumber);
    }

    if (touchesTransaction) {
      // Transaction-specific decoration.
      // `autocommit` must always be false to differentiate from retryable writes
      command.autocommit = false;

      if (session.transaction.state !== TxnState.STARTING_TRANSACTION) {
        return;
      }

      // First command of the transaction
      session.transaction.transition(TxnState.TRANSACTION_IN_PROGRESS);
      command.startTransaction = true;

      const txnReadConcern =
        session.transaction.options.readConcern || session?.clientOptions?.readConcern;
      if (txnReadConcern) {
        command.readConcern = txnReadConcern;
      }

      if (session.supports.causalConsistency && session.operationTime) {
        command.readConcern = command.readConcern || {};
        Object.assign(command.readConcern, { afterClusterTime: session.operationTime });
      }

      return;
    }

    // Not in a transaction and not a transaction command
    if (session.transaction.state !== TxnState.NO_TRANSACTION) {
      session.transaction.transition(TxnState.NO_TRANSACTION);
    }

    const wantsCausalRead =
      session.supports.causalConsistency &&
      session.operationTime &&
      commandSupportsReadConcern(command);

    if (wantsCausalRead) {
      command.readConcern = command.readConcern || {};
      Object.assign(command.readConcern, { afterClusterTime: session.operationTime });
    } else if (session[kSnapshotEnabled]) {
      command.readConcern = command.readConcern || { level: ReadConcernLevel.snapshot };
      if (session[kSnapshotTime] != null) {
        Object.assign(command.readConcern, { atClusterTime: session[kSnapshotTime] });
      }
    }

    return;
  }
  /**
   * Updates session state from a server response document.
   *
   * - `$clusterTime`, when present, is passed to `_advanceClusterTime`.
   * - `operationTime` advances the session's operation time when the session
   *   supports causal consistency.
   * - `recoveryToken` is stored on the transaction while the session is in a
   *   transaction.
   * - For snapshot sessions with no snapshot time recorded yet, the
   *   `atClusterTime` found in the response becomes the snapshot time.
   *
   * @param session - the session to update
   * @param document - the server response
   */
  export function updateSessionFromResponse(session: ClientSession, document: Document): void {
    const clusterTime = document.$clusterTime;
    if (clusterTime) {
      _advanceClusterTime(session, clusterTime);
    }

    const operationTime = document.operationTime;
    if (operationTime && session?.supports.causalConsistency) {
      session.advanceOperationTime(operationTime);
    }

    const recoveryToken = document.recoveryToken;
    if (recoveryToken && session?.inTransaction()) {
      session.transaction._recoveryToken = recoveryToken;
    }

    // Only snapshot sessions that have not captured a snapshot time yet go further
    if (!session?.[kSnapshotEnabled] || session[kSnapshotTime] != null) {
      return;
    }

    // find and aggregate commands return atClusterTime on the cursor
    // distinct includes it in the response body
    const snapshotTime = document.cursor?.atClusterTime || document.atClusterTime;
    if (snapshotTime) {
      session[kSnapshotTime] = snapshotTime;
    }
  }