12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052 |
- import { promisify } from 'util';
- import { Binary, type Document, Long, type Timestamp } from './bson';
- import type { CommandOptions, Connection } from './cmap/connection';
- import { ConnectionPoolMetrics } from './cmap/metrics';
- import { isSharded } from './cmap/wire_protocol/shared';
- import { PINNED, UNPINNED } from './constants';
- import type { AbstractCursor } from './cursor/abstract_cursor';
- import {
- type AnyError,
- MongoAPIError,
- MongoCompatibilityError,
- MONGODB_ERROR_CODES,
- type MongoDriverError,
- MongoError,
- MongoErrorLabel,
- MongoExpiredSessionError,
- MongoInvalidArgumentError,
- MongoRuntimeError,
- MongoServerError,
- MongoTransactionError,
- MongoWriteConcernError
- } from './error';
- import type { MongoClient, MongoOptions } from './mongo_client';
- import { TypedEventEmitter } from './mongo_types';
- import { executeOperation } from './operations/execute_operation';
- import { RunAdminCommandOperation } from './operations/run_command';
- import { ReadConcernLevel } from './read_concern';
- import { ReadPreference } from './read_preference';
- import { _advanceClusterTime, type ClusterTime, TopologyType } from './sdam/common';
- import {
- isTransactionCommand,
- Transaction,
- type TransactionOptions,
- TxnState
- } from './transactions';
- import {
- ByteUtils,
- calculateDurationInMs,
- type Callback,
- commandSupportsReadConcern,
- isPromiseLike,
- List,
- maxWireVersion,
- now,
- uuidV4
- } from './utils';
- import { WriteConcern } from './write_concern';
- /**
- * Minimum wire version a sharded topology must report before `ClientSession.startTransaction`
- * allows a transaction; below it a MongoCompatibilityError mentioning MongoDB < 4.2 is thrown.
- */
- const minWireVersionForShardedTransactions = 8;
- /**
- * Options accepted by the `ClientSession` constructor.
- * @public
- */
- export interface ClientSessionOptions {
- /** Whether causal consistency should be enabled on this session */
- causalConsistency?: boolean;
- /** Whether all read operations should be read from the same snapshot for this session (NOTE: not compatible with `causalConsistency=true`) */
- snapshot?: boolean;
- /** The default TransactionOptions to use for transactions started on this session. */
- defaultTransactionOptions?: TransactionOptions;
- /**
- * Copied verbatim onto `ClientSession.owner`.
- * @internal
- */
- owner?: symbol | AbstractCursor;
- /**
- * When truthy the session acquires its server session from the pool in the constructor, and
- * causal consistency defaults to enabled unless `snapshot` is true.
- * @internal
- */
- explicit?: boolean;
- /**
- * Initial value for `ClientSession.clusterTime`.
- * @internal
- */
- initialClusterTime?: ClusterTime;
- }
- /**
- * Callback executed by `ClientSession.withTransaction`. It receives the session the transaction
- * was started on and must return a Promise; a non-promise return value is rejected with a
- * MongoInvalidArgumentError.
- * @public
- */
- export type WithTransactionCallback<T = any> = (session: ClientSession) => Promise<T>;
- /**
- * Events emitted by a `ClientSession`. `ended` is emitted with the session itself when
- * `endSession` marks the session as ended.
- * @public
- */
- export type ClientSessionEvents = {
- ended(session: ClientSession): void;
- };
- /** @internal Key of the ServerSession backing a ClientSession; null until one is acquired */
- const kServerSession = Symbol('serverSession');
- /** @internal Key of the `atClusterTime` captured from the first response seen by a snapshot session */
- const kSnapshotTime = Symbol('snapshotTime');
- /** @internal Key of the flag recording that the session was created with `snapshot: true` */
- const kSnapshotEnabled = Symbol('snapshotEnabled');
- /** @internal Key of the connection currently pinned to the session, if any */
- const kPinnedConnection = Symbol('pinnedConnection');
- /** @internal Accumulates total number of increments to add to txnNumber when applying session to command */
- const kTxnNumberIncrement = Symbol('txnNumberIncrement');
- /**
- * Options accepted by `ClientSession.endSession` and forwarded to `maybeClearPinnedConnection`.
- * @public
- */
- export interface EndSessionOptions {
- /**
- * An optional error which caused the call to end this session
- * @internal
- */
- error?: AnyError;
- /** Check the pinned connection back into its pool even when `error` is set */
- force?: boolean;
- /** After the pinned connection is checked in, also clear the pool for that connection's `serviceId` */
- forceClear?: boolean;
- }
- /**
- * A class representing a client session on the server
- *
- * NOTE: not meant to be instantiated directly.
- * @public
- */
- export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
- /** @internal */
- client: MongoClient;
- /** @internal */
- sessionPool: ServerSessionPool;
- /** Set to true by `endSession` once the session has been ended */
- hasEnded: boolean;
- /** Client options; the last fallback for read concern, write concern and read preference of new transactions */
- clientOptions?: MongoOptions;
- /** Feature flags of this session; `causalConsistency` defaults to true only for explicit, non-snapshot sessions */
- supports: { causalConsistency: boolean };
- /** Initialised from `options.initialClusterTime` */
- clusterTime?: ClusterTime;
- /** Latest operation time recorded through `advanceOperationTime`; never moves backwards there */
- operationTime?: Timestamp;
- /** Mirrors `options.explicit`; explicit sessions acquire their server session in the constructor */
- explicit: boolean;
- /** @internal */
- owner?: symbol | AbstractCursor;
- /** Shallow copy of `options.defaultTransactionOptions` */
- defaultTransactionOptions: TransactionOptions;
- /** State of the current (or most recent) transaction; replaced by every `startTransaction` call */
- transaction: Transaction;
- /** @internal */
- [kServerSession]: ServerSession | null;
- /** @internal */
- [kSnapshotTime]?: Timestamp;
- /** @internal */
- [kSnapshotEnabled] = false;
- /** @internal */
- [kPinnedConnection]?: Connection;
- /** @internal */
- [kTxnNumberIncrement]: number;
- /**
- * Create a client session.
- * @internal
- * @param client - The current client
- * @param sessionPool - The server session pool (Internal Class)
- * @param options - Optional settings
- * @param clientOptions - Optional settings provided when creating a MongoClient
- * @throws MongoRuntimeError if `client` is nullish or `sessionPool` is not a ServerSessionPool
- * @throws MongoInvalidArgumentError if both `snapshot` and `causalConsistency` are true
- */
- constructor(
- client: MongoClient,
- sessionPool: ServerSessionPool,
- options: ClientSessionOptions,
- clientOptions?: MongoOptions
- ) {
- super();
- if (client == null) {
- // TODO(NODE-3483)
- throw new MongoRuntimeError('ClientSession requires a MongoClient');
- }
- if (sessionPool == null || !(sessionPool instanceof ServerSessionPool)) {
- // TODO(NODE-3483)
- throw new MongoRuntimeError('ClientSession requires a ServerSessionPool');
- }
- options = options ?? {};
- if (options.snapshot === true) {
- this[kSnapshotEnabled] = true;
- if (options.causalConsistency === true) {
- throw new MongoInvalidArgumentError(
- 'Properties "causalConsistency" and "snapshot" are mutually exclusive'
- );
- }
- }
- this.client = client;
- this.sessionPool = sessionPool;
- this.hasEnded = false;
- this.clientOptions = clientOptions;
- this.explicit = !!options.explicit;
- // explicit sessions take a server session right away; implicit ones acquire lazily via `serverSession`
- this[kServerSession] = this.explicit ? this.sessionPool.acquire() : null;
- this[kTxnNumberIncrement] = 0;
- const defaultCausalConsistencyValue = this.explicit && options.snapshot !== true;
- this.supports = {
- // if we can enable causal consistency, do so by default
- causalConsistency: options.causalConsistency ?? defaultCausalConsistencyValue
- };
- this.clusterTime = options.initialClusterTime;
- this.operationTime = undefined;
- this.owner = options.owner;
- this.defaultTransactionOptions = Object.assign({}, options.defaultTransactionOptions);
- this.transaction = new Transaction();
- }
- /** The server id associated with this session */
- get id(): ServerSessionId | undefined {
- return this[kServerSession]?.id;
- }
- /**
- * The server session backing this client session. For an implicit session that has not ended,
- * the first access acquires one from the session pool and stores it.
- *
- * @throws MongoRuntimeError if no server session is present on an explicit session or on an ended implicit session
- */
- get serverSession(): ServerSession {
- let serverSession = this[kServerSession];
- if (serverSession == null) {
- if (this.explicit) {
- throw new MongoRuntimeError('Unexpected null serverSession for an explicit session');
- }
- if (this.hasEnded) {
- throw new MongoRuntimeError('Unexpected null serverSession for an ended implicit session');
- }
- serverSession = this.sessionPool.acquire();
- this[kServerSession] = serverSession;
- }
- return serverSession;
- }
- /** Whether or not this session is configured for snapshot reads */
- get snapshotEnabled(): boolean {
- return this[kSnapshotEnabled];
- }
- /** Whether the client's topology description is of type LoadBalanced */
- get loadBalanced(): boolean {
- return this.client.topology?.description.type === TopologyType.LoadBalanced;
- }
- /** @internal */
- get pinnedConnection(): Connection | undefined {
- return this[kPinnedConnection];
- }
- /**
- * Pins `conn` to this session and emits PINNED on the connection, tagged with the TXN metric
- * when a transaction is active and the CURSOR metric otherwise.
- *
- * @throws TypeError if a connection is already pinned
- * @internal
- */
- pin(conn: Connection): void {
- if (this[kPinnedConnection]) {
- throw TypeError('Cannot pin multiple connections to the same session');
- }
- this[kPinnedConnection] = conn;
- conn.emit(
- PINNED,
- this.inTransaction() ? ConnectionPoolMetrics.TXN : ConnectionPoolMetrics.CURSOR
- );
- }
- /**
- * Releases whatever this session is pinned to: in load balanced mode the pinned connection
- * (via `maybeClearPinnedConnection`, which receives `options`), otherwise the transaction's pinned server.
- * @internal
- */
- unpin(options?: { force?: boolean; forceClear?: boolean; error?: AnyError }): void {
- if (this.loadBalanced) {
- return maybeClearPinnedConnection(this, options);
- }
- this.transaction.unpinServer();
- }
- /** Whether a connection (load balanced mode) or a server (otherwise) is currently pinned */
- get isPinned(): boolean {
- return this.loadBalanced ? !!this[kPinnedConnection] : this.transaction.isPinned;
- }
- /**
- * Ends this session on the server
- *
- * Aborts an active transaction, releases the server session back to the pool, marks the session
- * as ended and emits `ended`. Errors raised along the way are swallowed; the pinned connection
- * cleanup always runs afterwards.
- *
- * @param options - Optional settings, forwarded to the pinned connection cleanup (`force` defaults to true there)
- */
- async endSession(options?: EndSessionOptions): Promise<void> {
- try {
- if (this.inTransaction()) {
- await this.abortTransaction();
- }
- if (!this.hasEnded) {
- const serverSession = this[kServerSession];
- if (serverSession != null) {
- // release the server session back to the pool
- this.sessionPool.release(serverSession);
- // Make sure a new serverSession never makes it onto this ClientSession
- Object.defineProperty(this, kServerSession, {
- value: ServerSession.clone(serverSession),
- writable: false
- });
- }
- // mark the session as ended, and emit a signal
- this.hasEnded = true;
- this.emit('ended', this);
- }
- } catch {
- // spec indicates that we should ignore all errors for `endSessions`
- } finally {
- // `options` is spread last, so a caller-supplied `force` overrides the default of true
- maybeClearPinnedConnection(this, { force: true, ...options });
- }
- }
- /**
- * Advances the operationTime for a ClientSession.
- *
- * The stored value is only replaced when it is unset or when `operationTime` is greater than it.
- *
- * @param operationTime - the `BSON.Timestamp` operation time to advance to
- */
- advanceOperationTime(operationTime: Timestamp): void {
- if (this.operationTime == null) {
- this.operationTime = operationTime;
- return;
- }
- if (operationTime.greaterThan(this.operationTime)) {
- this.operationTime = operationTime;
- }
- }
- /**
- * Advances the clusterTime for a ClientSession to the provided clusterTime of another ClientSession
- *
- * @param clusterTime - the $clusterTime returned by the server from another session in the form of a document containing the `BSON.Timestamp` clusterTime and signature
- * @throws MongoInvalidArgumentError if `clusterTime` is not an object, lacks a BSON Timestamp `clusterTime`, or lacks a valid `signature`
- */
- advanceClusterTime(clusterTime: ClusterTime): void {
- if (!clusterTime || typeof clusterTime !== 'object') {
- throw new MongoInvalidArgumentError('input cluster time must be an object');
- }
- if (!clusterTime.clusterTime || clusterTime.clusterTime._bsontype !== 'Timestamp') {
- throw new MongoInvalidArgumentError(
- 'input cluster time "clusterTime" property must be a valid BSON Timestamp'
- );
- }
- if (
- !clusterTime.signature ||
- clusterTime.signature.hash?._bsontype !== 'Binary' ||
- (typeof clusterTime.signature.keyId !== 'bigint' &&
- typeof clusterTime.signature.keyId !== 'number' &&
- clusterTime.signature.keyId?._bsontype !== 'Long') // apparently we decode the key to number?
- ) {
- throw new MongoInvalidArgumentError(
- 'input cluster time must have a valid "signature" property with BSON Binary hash and BSON Long keyId'
- );
- }
- _advanceClusterTime(this, clusterTime);
- }
- /**
- * Used to determine if this session equals another
- *
- * Returns false when `session` is not a ClientSession or when either side has no server
- * session id; otherwise compares the bytes of the two ids.
- *
- * @param session - The session to compare to
- */
- equals(session: ClientSession): boolean {
- if (!(session instanceof ClientSession)) {
- return false;
- }
- if (this.id == null || session.id == null) {
- return false;
- }
- return ByteUtils.equals(this.id.id.buffer, session.id.id.buffer);
- }
- /**
- * Increment the transaction number on the internal ServerSession
- *
- * @privateRemarks
- * This helper increments a value stored on the client session that will be
- * added to the serverSession's txnNumber upon applying it to a command.
- * This is because the serverSession is lazily acquired after a connection is obtained
- */
- incrementTransactionNumber(): void {
- this[kTxnNumberIncrement] += 1;
- }
- /** @returns whether this session is currently in a transaction or not */
- inTransaction(): boolean {
- return this.transaction.isActive;
- }
- /**
- * Starts a new transaction with the given options.
- *
- * Read concern, write concern and read preference are resolved from `options`, then
- * `defaultTransactionOptions`, then `clientOptions`; `maxCommitTimeMS` from the first two only.
- *
- * @param options - Options for the transaction
- * @throws MongoCompatibilityError on a snapshot session, or on a sharded topology whose max wire version is below the supported minimum
- * @throws MongoTransactionError if a transaction is already in progress
- */
- startTransaction(options?: TransactionOptions): void {
- if (this[kSnapshotEnabled]) {
- throw new MongoCompatibilityError('Transactions are not supported in snapshot sessions');
- }
- if (this.inTransaction()) {
- throw new MongoTransactionError('Transaction already in progress');
- }
- // a pin left over from a committed transaction is released before the new one starts
- if (this.isPinned && this.transaction.isCommitted) {
- this.unpin();
- }
- const topologyMaxWireVersion = maxWireVersion(this.client.topology);
- if (
- isSharded(this.client.topology) &&
- topologyMaxWireVersion != null &&
- topologyMaxWireVersion < minWireVersionForShardedTransactions
- ) {
- throw new MongoCompatibilityError(
- 'Transactions are not supported on sharded clusters in MongoDB < 4.2.'
- );
- }
- // increment txnNumber
- this.incrementTransactionNumber();
- // create transaction state
- this.transaction = new Transaction({
- readConcern:
- options?.readConcern ??
- this.defaultTransactionOptions.readConcern ??
- this.clientOptions?.readConcern,
- writeConcern:
- options?.writeConcern ??
- this.defaultTransactionOptions.writeConcern ??
- this.clientOptions?.writeConcern,
- readPreference:
- options?.readPreference ??
- this.defaultTransactionOptions.readPreference ??
- this.clientOptions?.readPreference,
- maxCommitTimeMS: options?.maxCommitTimeMS ?? this.defaultTransactionOptions.maxCommitTimeMS
- });
- this.transaction.transition(TxnState.STARTING_TRANSACTION);
- }
- /**
- * Commits the currently active transaction in this session.
- */
- async commitTransaction(): Promise<void> {
- return endTransactionAsync(this, 'commitTransaction');
- }
- /**
- * Aborts the currently active transaction in this session.
- */
- async abortTransaction(): Promise<void> {
- return endTransactionAsync(this, 'abortTransaction');
- }
- /**
- * This is here to ensure that ClientSession is never serialized to BSON.
- */
- toBSON(): never {
- throw new MongoRuntimeError('ClientSession cannot be serialized to BSON.');
- }
- /**
- * Starts a transaction and runs a provided function, ensuring the commitTransaction is always attempted when all operations run in the function have completed.
- *
- * **IMPORTANT:** This method requires the user to return a Promise, and `await` all operations.
- *
- * @remarks
- * This function:
- * - If all operations successfully complete and the `commitTransaction` operation is successful, then this function will return the result of the provided function.
- * - If the transaction is unable to complete or an error is thrown from within the provided function, then this function will throw an error.
- * - If the transaction is manually aborted within the provided function it will not throw.
- * - May be called multiple times if the driver needs to attempt to retry the operations.
- *
- * Checkout a descriptive example here:
- * @see https://www.mongodb.com/blog/post/quick-start-nodejs--mongodb--how-to-implement-transactions
- *
- * @param fn - callback to run within a transaction
- * @param options - optional settings for the transaction
- * @returns The value the promise returned by `fn` resolved to
- */
- async withTransaction<T = any>(
- fn: WithTransactionCallback<T>,
- options?: TransactionOptions
- ): Promise<T> {
- // the start time is captured once so that all retries share the same overall time budget
- const startTime = now();
- return attemptTransaction(this, startTime, fn, options);
- }
- }
- /** Time budget for `withTransaction` retries, compared against `calculateDurationInMs(startTime)` */
- const MAX_WITH_TRANSACTION_TIMEOUT = 120000;
- /** Server error code names for which a failed commit is NOT treated as an unknown commit result */
- const NON_DETERMINISTIC_WRITE_CONCERN_ERRORS = new Set([
- 'CannotSatisfyWriteConcern',
- 'UnknownReplWriteConcern',
- 'UnsatisfiableWriteConcern'
- ]);
- /**
-  * Reports whether less than `max` has elapsed since `startTime`, as measured by
-  * `calculateDurationInMs`.
-  */
- function hasNotTimedOut(startTime: number, max: number) {
-   const elapsed = calculateDurationInMs(startTime);
-   return elapsed < max;
- }
- /**
-  * Decides whether a failed commit should be labelled as having an unknown result.
-  *
-  * A MaxTimeMSExpired error always qualifies. Any other error qualifies unless it is a server
-  * error carrying one of the NON_DETERMINISTIC_WRITE_CONCERN_ERRORS code names, or its code is
-  * UnsatisfiableWriteConcern or UnknownReplWriteConcern.
-  */
- function isUnknownTransactionCommitResult(err: MongoError) {
-   if (isMaxTimeMSExpiredError(err)) {
-     return true;
-   }
-   const hasExcludedCodeName =
-     err instanceof MongoServerError &&
-     err.codeName &&
-     NON_DETERMINISTIC_WRITE_CONCERN_ERRORS.has(err.codeName);
-   if (hasExcludedCodeName) {
-     return false;
-   }
-   const hasExcludedCode =
-     err.code === MONGODB_ERROR_CODES.UnsatisfiableWriteConcern ||
-     err.code === MONGODB_ERROR_CODES.UnknownReplWriteConcern;
-   return !hasExcludedCode;
- }
- /**
-  * Releases the connection pinned to `session`, if there is one.
-  *
-  * Nothing happens while the session is in a transaction and `options.error` is a MongoError
-  * labelled TransientTransactionError. Otherwise, when a connection is pinned and the client has
-  * a topology, the pin is always removed from the session; the connection itself is only checked
-  * back in (and UNPINNED emitted) when there is no error or `options.force` is set.
-  *
-  * @param session - the session whose pinned connection should be released
-  * @param options - error / force / forceClear settings controlling the release
-  */
- export function maybeClearPinnedConnection(
-   session: ClientSession,
-   options?: EndSessionOptions
- ): void {
-   const pinned = session[kPinnedConnection];
-   const error = options?.error;
-
-   const keepPinned =
-     session.inTransaction() &&
-     error &&
-     error instanceof MongoError &&
-     error.hasErrorLabel(MongoErrorLabel.TransientTransactionError);
-   if (keepPinned) {
-     return;
-   }
-
-   const topology = session.client.topology;
-   // NOTE: the spec talks about what to do on a network error only, but the tests seem
-   // to validate that we don't unpin on _all_ errors?
-   if (!pinned || topology == null) {
-     return;
-   }
-
-   // the first server known to the topology is used as the load balancer
-   const [loadBalancer] = Array.from(topology.s.servers.values());
-
-   if (error == null || options?.force) {
-     loadBalancer.pool.checkIn(pinned);
-     const metric =
-       session.transaction.state !== TxnState.NO_TRANSACTION
-         ? ConnectionPoolMetrics.TXN
-         : ConnectionPoolMetrics.CURSOR;
-     pinned.emit(UNPINNED, metric);
-
-     if (options?.forceClear) {
-       loadBalancer.pool.clear({ serviceId: pinned.serviceId });
-     }
-   }
-
-   // the session forgets the connection even when it was not checked in above
-   session[kPinnedConnection] = undefined;
- }
- /**
-  * Tells whether `err` is a server error whose own code, or whose write concern error code, is
-  * MaxTimeMSExpired. Anything that is not a MongoServerError (including null/undefined) is not.
-  */
- function isMaxTimeMSExpiredError(err: MongoError) {
-   // `instanceof` is false for null and undefined, so no separate nullish check is needed
-   if (!(err instanceof MongoServerError)) {
-     return false;
-   }
-   if (err.code === MONGODB_ERROR_CODES.MaxTimeMSExpired) {
-     return true;
-   }
-   return err.writeConcernError && err.writeConcernError.code === MONGODB_ERROR_CODES.MaxTimeMSExpired;
- }
- /**
-  * Commits the transaction started by `withTransaction` and resolves with `result`.
-  *
-  * A failed commit is retried only for a MongoError that is not MaxTimeMSExpired, and only while
-  * the overall time budget has not run out: UnknownTransactionCommitResult repeats just the
-  * commit, TransientTransactionError reruns the whole transaction. Every other failure is rethrown.
-  */
- function attemptTransactionCommit<T>(
-   session: ClientSession,
-   startTime: number,
-   fn: WithTransactionCallback<T>,
-   result: any,
-   options: TransactionOptions
- ): Promise<T> {
-   const onCommitFailure = (err: MongoError) => {
-     const retryAllowed =
-       err instanceof MongoError &&
-       hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT) &&
-       !isMaxTimeMSExpiredError(err);
-     if (!retryAllowed) {
-       throw err;
-     }
-     if (err.hasErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult)) {
-       return attemptTransactionCommit(session, startTime, fn, result, options);
-     }
-     if (err.hasErrorLabel(MongoErrorLabel.TransientTransactionError)) {
-       return attemptTransaction(session, startTime, fn, options);
-     }
-     throw err;
-   };
-
-   return session.commitTransaction().then(() => result, onCommitFailure);
- }
- /** Transaction states in which `withTransaction` must not attempt a commit of its own */
- const USER_EXPLICIT_TXN_END_STATES = new Set<TxnState>([
-   TxnState.NO_TRANSACTION,
-   TxnState.TRANSACTION_COMMITTED,
-   TxnState.TRANSACTION_ABORTED
- ]);
- /** Whether the session's transaction is already in one of the USER_EXPLICIT_TXN_END_STATES */
- function userExplicitlyEndedTransaction(session: ClientSession) {
-   const { state } = session.transaction;
-   return USER_EXPLICIT_TXN_END_STATES.has(state);
- }
- /**
-  * Runs one attempt of `withTransaction`: starts a transaction, invokes `fn`, then commits unless
-  * the callback already ended the transaction itself.
-  *
-  * When `fn` fails, an active transaction is aborted first; the attempt is then repeated if the
-  * error is a MongoError labelled TransientTransactionError and the time budget is not exhausted.
-  * Errors thrown synchronously by `startTransaction` propagate synchronously.
-  */
- function attemptTransaction<T>(
-   session: ClientSession,
-   startTime: number,
-   fn: WithTransactionCallback<T>,
-   options: TransactionOptions = {}
- ): Promise<any> {
-   session.startTransaction(options);
-
-   let pending;
-   try {
-     pending = fn(session);
-   } catch (err) {
-     // a synchronous throw from the callback is handled like a rejected promise
-     pending = Promise.reject(err);
-   }
-
-   if (!isPromiseLike(pending)) {
-     // best-effort abort; its outcome is deliberately ignored
-     session.abortTransaction().catch(() => null);
-     return Promise.reject(
-       new MongoInvalidArgumentError('Function provided to `withTransaction` must return a Promise')
-     );
-   }
-
-   const retryOrRethrow = (err: MongoError): Promise<any> => {
-     const retryable =
-       err instanceof MongoError &&
-       err.hasErrorLabel(MongoErrorLabel.TransientTransactionError) &&
-       hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT);
-     if (retryable) {
-       return attemptTransaction(session, startTime, fn, options);
-     }
-     if (isMaxTimeMSExpiredError(err)) {
-       err.addErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult);
-     }
-     throw err;
-   };
-
-   return pending.then(
-     result =>
-       userExplicitlyEndedTransaction(session)
-         ? result
-         : attemptTransactionCommit(session, startTime, fn, result, options),
-     err =>
-       session.inTransaction()
-         ? session.abortTransaction().then(() => retryOrRethrow(err))
-         : retryOrRethrow(err)
-   );
- }
- /**
- * Promise-returning form of `endTransaction`, built with `util.promisify`; it backs
- * `ClientSession.commitTransaction` and `ClientSession.abortTransaction`. The cast narrows the
- * callback signature of `endTransaction` to the shape passed to `promisify`.
- */
- const endTransactionAsync = promisify(
- endTransaction as (
- session: ClientSession,
- commandName: 'abortTransaction' | 'commitTransaction',
- callback: (error: Error) => void
- ) => void
- );
- /**
- * Callback-style implementation of committing or aborting the session's transaction.
- *
- * The current transaction state is validated first; several states complete (or fail) without
- * sending anything. Otherwise the command is built with the applicable write concern, maxTimeMS
- * (commit only) and recovery token, and executed through a RunAdminCommandOperation with a
- * primary read preference. A failure labelled RetryableWriteError causes exactly one retry.
- *
- * @param session - the session whose transaction is being ended
- * @param commandName - which of the two commands to run
- * @param callback - invoked once; never receives an error for `abortTransaction` after the command was sent
- */
- function endTransaction(
- session: ClientSession,
- commandName: 'abortTransaction' | 'commitTransaction',
- callback: Callback<void>
- ) {
- // handle any initial problematic cases
- const txnState = session.transaction.state;
- if (txnState === TxnState.NO_TRANSACTION) {
- callback(new MongoTransactionError('No transaction started'));
- return;
- }
- if (commandName === 'commitTransaction') {
- if (
- txnState === TxnState.STARTING_TRANSACTION ||
- txnState === TxnState.TRANSACTION_COMMITTED_EMPTY
- ) {
- // the transaction was never started, we can safely exit here
- session.transaction.transition(TxnState.TRANSACTION_COMMITTED_EMPTY);
- callback();
- return;
- }
- if (txnState === TxnState.TRANSACTION_ABORTED) {
- callback(
- new MongoTransactionError('Cannot call commitTransaction after calling abortTransaction')
- );
- return;
- }
- } else {
- if (txnState === TxnState.STARTING_TRANSACTION) {
- // the transaction was never started, we can safely exit here
- session.transaction.transition(TxnState.TRANSACTION_ABORTED);
- callback();
- return;
- }
- if (txnState === TxnState.TRANSACTION_ABORTED) {
- callback(new MongoTransactionError('Cannot call abortTransaction twice'));
- return;
- }
- if (
- txnState === TxnState.TRANSACTION_COMMITTED ||
- txnState === TxnState.TRANSACTION_COMMITTED_EMPTY
- ) {
- callback(
- new MongoTransactionError('Cannot call abortTransaction after calling commitTransaction')
- );
- return;
- }
- }
- // construct and send the command
- const command: Document = { [commandName]: 1 };
- // apply a writeConcern if specified
- let writeConcern;
- if (session.transaction.options.writeConcern) {
- writeConcern = Object.assign({}, session.transaction.options.writeConcern);
- } else if (session.clientOptions && session.clientOptions.writeConcern) {
- writeConcern = { w: session.clientOptions.writeConcern.w };
- }
- // only a repeated commitTransaction reaches this point in the COMMITTED state (abort returned above):
- // force w: 'majority' and default wtimeoutMS to 10000 unless the write concern already sets it
- if (txnState === TxnState.TRANSACTION_COMMITTED) {
- writeConcern = Object.assign({ wtimeoutMS: 10000 }, writeConcern, { w: 'majority' });
- }
- if (writeConcern) {
- WriteConcern.apply(command, writeConcern);
- }
- if (commandName === 'commitTransaction' && session.transaction.options.maxTimeMS) {
- Object.assign(command, { maxTimeMS: session.transaction.options.maxTimeMS });
- }
- // final handler: records the resulting transaction state, labels commit errors and reports to the caller
- function commandHandler(error?: Error) {
- if (commandName !== 'commitTransaction') {
- session.transaction.transition(TxnState.TRANSACTION_ABORTED);
- if (session.loadBalanced) {
- maybeClearPinnedConnection(session, { force: false });
- }
- // The spec indicates that we should ignore all errors on `abortTransaction`
- return callback();
- }
- // the state becomes COMMITTED whether or not the commit command succeeded
- session.transaction.transition(TxnState.TRANSACTION_COMMITTED);
- if (error instanceof MongoError) {
- if (
- error.hasErrorLabel(MongoErrorLabel.RetryableWriteError) ||
- error instanceof MongoWriteConcernError ||
- isMaxTimeMSExpiredError(error)
- ) {
- if (isUnknownTransactionCommitResult(error)) {
- error.addErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult);
- // per txns spec, must unpin session in this case
- session.unpin({ error });
- }
- } else if (error.hasErrorLabel(MongoErrorLabel.TransientTransactionError)) {
- session.unpin({ error });
- }
- }
- callback(error);
- }
- if (session.transaction.recoveryToken) {
- command.recoveryToken = session.transaction.recoveryToken;
- }
- // send the command
- executeOperation(
- session.client,
- new RunAdminCommandOperation(command, {
- session,
- readPreference: ReadPreference.primary,
- bypassPinningCheck: true
- }),
- error => {
- if (command.abortTransaction) {
- // always unpin on abort regardless of command outcome
- session.unpin();
- }
- // a retryable failure is retried exactly once; the retry reports straight to commandHandler
- if (error instanceof MongoError && error.hasErrorLabel(MongoErrorLabel.RetryableWriteError)) {
- // SPEC-1185: apply majority write concern when retrying commitTransaction
- if (command.commitTransaction) {
- // per txns spec, must unpin session in this case
- session.unpin({ force: true });
- command.writeConcern = Object.assign({ wtimeout: 10000 }, command.writeConcern, {
- w: 'majority'
- });
- }
- return executeOperation(
- session.client,
- new RunAdminCommandOperation(command, {
- session,
- readPreference: ReadPreference.primary,
- bypassPinningCheck: true
- }),
- commandHandler
- );
- }
- commandHandler(error);
- }
- );
- }
- /**
- * Identifier document of a server session: an object wrapping the session's BSON Binary id.
- * @public
- */
- export type ServerSessionId = { id: Binary };
- /**
-  * Reflects the existence of a session on the server. Can be reused by the session pool.
-  * WARNING: not meant to be instantiated directly. For internal use only.
-  * @public
-  */
- export class ServerSession {
-   /** Identifier document wrapping a UUID-subtype Binary generated in the constructor */
-   id: ServerSessionId;
-   /** Value of `now()` at the last recorded use of this session */
-   lastUse: number;
-   /** Transaction number counter; starts at 0 */
-   txnNumber: number;
-   /** Dirty sessions are not returned to the pool by `ServerSessionPool.release` */
-   isDirty: boolean;
-
-   /** @internal */
-   constructor() {
-     this.id = { id: new Binary(uuidV4(), Binary.SUBTYPE_UUID) };
-     this.lastUse = now();
-     this.txnNumber = 0;
-     this.isDirty = false;
-   }
-
-   /**
-    * Determines if the server session has timed out.
-    *
-    * A session counts as timed out when its idle time, rounded to whole minutes, is within one
-    * minute of (or beyond) `sessionTimeoutMinutes`.
-    *
-    * @param sessionTimeoutMinutes - The server's "logicalSessionTimeoutMinutes"
-    */
-   hasTimedOut(sessionTimeoutMinutes: number): boolean {
-     // Convert the full idle duration from milliseconds to minutes. The duration must not be
-     // reduced modulo an hour or a day first: that keeps only the minutes-within-the-hour part,
-     // so a session idle for e.g. 65 minutes would be measured as 5 minutes and look fresh.
-     const idleTimeMinutes = Math.round(calculateDurationInMs(this.lastUse) / 60000);
-     return idleTimeMinutes > sessionTimeoutMinutes - 1;
-   }
-
-   /**
-    * @internal
-    * Cloning meant to keep a readable reference to the server session data
-    * after ClientSession has ended
-    */
-   static clone(serverSession: ServerSession): Readonly<ServerSession> {
-     // copy the id bytes into a fresh 16-byte buffer so the clone does not share memory with the original
-     const arrayBuffer = new ArrayBuffer(16);
-     const idBytes = Buffer.from(arrayBuffer);
-     idBytes.set(serverSession.id.id.buffer);
-     const id = new Binary(idBytes, serverSession.id.id.sub_type);
-     // Manual prototype construction to avoid modifying the constructor of this class
-     return Object.setPrototypeOf(
-       {
-         id: { id },
-         lastUse: serverSession.lastUse,
-         txnNumber: serverSession.txnNumber,
-         isDirty: serverSession.isDirty
-       },
-       ServerSession.prototype
-     );
-   }
- }
- /**
-  * Maintains a pool of Server Sessions.
-  * For internal use only
-  * @internal
-  */
- export class ServerSessionPool {
-   client: MongoClient;
-   sessions: List<ServerSession>;
-
-   /**
-    * @param client - the client whose topology supplies the session timeout
-    * @throws MongoRuntimeError if `client` is nullish
-    */
-   constructor(client: MongoClient) {
-     if (client == null) {
-       throw new MongoRuntimeError('ServerSessionPool requires a MongoClient');
-     }
-     this.sessions = new List<ServerSession>();
-     this.client = client;
-   }
-
-   /**
-    * Acquire a Server Session from the pool.
-    * Sessions are taken from the front of the pool one at a time; stale ones are dropped and the
-    * first usable one is returned. In load balanced mode pooled sessions are never considered
-    * stale. When the pool runs out, a brand new ServerSession is returned.
-    */
-   acquire(): ServerSession {
-     const timeoutMinutes = this.client.topology?.logicalSessionTimeoutMinutes ?? 10;
-
-     while (this.sessions.length > 0) {
-       const candidate = this.sessions.shift();
-       if (candidate == null) {
-         continue;
-       }
-       const usable =
-         !!this.client.topology?.loadBalanced || !candidate.hasTimedOut(timeoutMinutes);
-       if (usable) {
-         return candidate;
-       }
-     }
-
-     // nothing valid came from the pool
-     return new ServerSession();
-   }
-
-   /**
-    * Release a session to the session pool
-    * Stale sessions are pruned from the pool first; the released session is put at the front of
-    * the pool only if it has not timed out and is not dirty.
-    *
-    * @param session - The session to release to the pool
-    */
-   release(session: ServerSession): void {
-     const timeoutMinutes = this.client.topology?.logicalSessionTimeoutMinutes ?? 10;
-
-     if (!timeoutMinutes) {
-       // without a usable timeout only load balanced topologies keep the session
-       if (this.client.topology?.loadBalanced) {
-         this.sessions.unshift(session);
-       }
-       return;
-     }
-
-     this.sessions.prune(pooled => pooled.hasTimedOut(timeoutMinutes));
-
-     if (session.hasTimedOut(timeoutMinutes) || session.isDirty) {
-       return;
-     }
-     this.sessions.unshift(session);
-   }
- }
- /**
-  * Optionally decorate a command with sessions specific keys
-  *
-  * Adds `lsid`, and where applicable `txnNumber`, `autocommit`, `startTransaction` and a
-  * `readConcern` carrying `afterClusterTime` / `atClusterTime`. Read concern objects are never
-  * modified in place: whenever a time field has to be added, `command.readConcern` is replaced
-  * by a copy, so read concern objects shared with the transaction options, the client options or
-  * the caller are left untouched.
-  *
-  * @param session - the session tracking transaction state
-  * @param command - the command to decorate
-  * @param options - Optional settings passed to calling operation
-  * @returns an error describing why the session cannot be applied, or undefined on success
-  *
-  * @internal
-  */
- export function applySession(
-   session: ClientSession,
-   command: Document,
-   options: CommandOptions
- ): MongoDriverError | undefined {
-   if (session.hasEnded) {
-     return new MongoExpiredSessionError();
-   }
-
-   // May acquire serverSession here
-   const serverSession = session.serverSession;
-   if (serverSession == null) {
-     return new MongoRuntimeError('Unable to acquire server session');
-   }
-
-   if (options.writeConcern?.w === 0) {
-     if (session.explicit) {
-       // Error if user provided an explicit session to an unacknowledged write (SPEC-1019)
-       return new MongoAPIError('Cannot have explicit session with unacknowledged writes');
-     }
-     return;
-   }
-
-   // mark the last use of this session, and apply the `lsid`
-   serverSession.lastUse = now();
-   command.lsid = serverSession.id;
-
-   const inTxnOrTxnCommand = session.inTransaction() || isTransactionCommand(command);
-   const isRetryableWrite = !!options.willRetryWrite;
-
-   if (isRetryableWrite || inTxnOrTxnCommand) {
-     // flush the increments accumulated on the client session into the server session
-     serverSession.txnNumber += session[kTxnNumberIncrement];
-     session[kTxnNumberIncrement] = 0;
-     // TODO(NODE-2674): Preserve int64 sent from MongoDB
-     command.txnNumber = Long.fromNumber(serverSession.txnNumber);
-   }
-
-   if (!inTxnOrTxnCommand) {
-     if (session.transaction.state !== TxnState.NO_TRANSACTION) {
-       session.transaction.transition(TxnState.NO_TRANSACTION);
-     }
-
-     if (
-       session.supports.causalConsistency &&
-       session.operationTime &&
-       commandSupportsReadConcern(command)
-     ) {
-       // copy instead of mutating: the existing readConcern object may be shared
-       command.readConcern = Object.assign({}, command.readConcern, {
-         afterClusterTime: session.operationTime
-       });
-     } else if (session[kSnapshotEnabled]) {
-       command.readConcern = command.readConcern || { level: ReadConcernLevel.snapshot };
-       if (session[kSnapshotTime] != null) {
-         // copy instead of mutating: the existing readConcern object may be shared
-         command.readConcern = Object.assign({}, command.readConcern, {
-           atClusterTime: session[kSnapshotTime]
-         });
-       }
-     }
-     return;
-   }
-
-   // now attempt to apply transaction-specific sessions data
-   // `autocommit` must always be false to differentiate from retryable writes
-   command.autocommit = false;
-
-   if (session.transaction.state === TxnState.STARTING_TRANSACTION) {
-     session.transaction.transition(TxnState.TRANSACTION_IN_PROGRESS);
-     command.startTransaction = true;
-
-     const readConcern =
-       session.transaction.options.readConcern || session.clientOptions?.readConcern;
-     if (readConcern) {
-       command.readConcern = readConcern;
-     }
-
-     if (session.supports.causalConsistency && session.operationTime) {
-       // `readConcern` above is owned by the transaction or client options, so it must not
-       // receive `afterClusterTime` itself
-       command.readConcern = Object.assign({}, command.readConcern, {
-         afterClusterTime: session.operationTime
-       });
-     }
-   }
-   return;
- }
- /**
-  * Feeds the session-related fields of a server response back into the session: cluster time,
-  * operation time (causally consistent sessions only), the transaction recovery token (only
-  * while in a transaction) and, for snapshot sessions without a snapshot time yet, `atClusterTime`.
-  *
-  * @param session - the session to update
-  * @param document - the server response
-  */
- export function updateSessionFromResponse(session: ClientSession, document: Document): void {
-   if (document.$clusterTime) {
-     _advanceClusterTime(session, document.$clusterTime);
-   }
-
-   if (document.operationTime && session?.supports.causalConsistency) {
-     session.advanceOperationTime(document.operationTime);
-   }
-
-   if (document.recoveryToken && session?.inTransaction()) {
-     session.transaction._recoveryToken = document.recoveryToken;
-   }
-
-   const needsSnapshotTime = session?.[kSnapshotEnabled] && session[kSnapshotTime] == null;
-   if (!needsSnapshotTime) {
-     return;
-   }
-
-   // find and aggregate commands return atClusterTime on the cursor
-   // distinct includes it in the response body
-   const atClusterTime = document.cursor?.atClusterTime || document.atClusterTime;
-   if (atClusterTime) {
-     session[kSnapshotTime] = atClusterTime;
-   }
- }
|