// execute_operation.ts (9.1 KB)
  1. import type { Document } from '../bson';
  2. import {
  3. isRetryableReadError,
  4. isRetryableWriteError,
  5. MongoCompatibilityError,
  6. MONGODB_ERROR_CODES,
  7. MongoError,
  8. MongoErrorLabel,
  9. MongoExpiredSessionError,
  10. MongoNetworkError,
  11. MongoNotConnectedError,
  12. MongoRuntimeError,
  13. MongoServerError,
  14. MongoTransactionError,
  15. MongoUnexpectedServerResponseError
  16. } from '../error';
  17. import type { MongoClient } from '../mongo_client';
  18. import { ReadPreference } from '../read_preference';
  19. import type { Server } from '../sdam/server';
  20. import {
  21. sameServerSelector,
  22. secondaryWritableServerSelector,
  23. type ServerSelector
  24. } from '../sdam/server_selection';
  25. import type { Topology } from '../sdam/topology';
  26. import type { ClientSession } from '../sessions';
  27. import { type Callback, maybeCallback, supportsRetryableWrites } from '../utils';
  28. import { AbstractCallbackOperation, Aspect } from './operation';
  /**
   * Error code compared against a failed write's `code` before a retry is attempted.
   * NOTE(review): the name suggests this identifies MMAPv1-backed deployments — confirm.
   */
  const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;

  /** Message of the error thrown in place of the server's when the code above is matched. */
  const MMAPv1_RETRY_WRITES_ERROR_MESSAGE =
    'This MongoDB deployment does not support retryable writes. Please add retryWrites=false to your connection string.';
  /**
   * Resolves to the result type carried by an operation type, i.e. the `R` in
   * `AbstractCallbackOperation<R>`; resolves to `never` for any other type.
   */
  type ResultTypeFromOperation<TOperation> =
    TOperation extends AbstractCallbackOperation<infer TOperationResult> ? TOperationResult : never;
  /**
   * Describes the pieces involved in one executed operation.
   * @internal
   */
  export interface ExecutionResult {
    /** Server that was selected to run the operation */
    server: Server;
    /** Session the operation ran with; it may have been created implicitly */
    session?: ClientSession;
    /** Unprocessed response document returned by the server */
    response: Document;
  }
  /**
   * Runs the given operation using the given client.
   * @internal
   *
   * @remarks
   * Every operation funnels through this function, which gives the driver one place to
   * decide between callback and promise delivery and one place to implement cross-cutting
   * features such as the implicit sessions required by the Driver Sessions specification
   * when the caller did not supply a ClientSession.
   *
   * The work itself is done by `executeOperationAsync`; this wrapper only hands that
   * promise-returning call and the optional callback to `maybeCallback`.
   *
   * @param client - The client whose topology the operation is executed on
   * @param operation - The operation to execute
   * @param callback - Optional command result callback
   * @returns A promise for the operation result when no callback is supplied (per the
   * overload signatures); otherwise nothing
   */
  export function executeOperation<
    T extends AbstractCallbackOperation<TResult>,
    TResult = ResultTypeFromOperation<T>
  >(client: MongoClient, operation: T): Promise<TResult>;
  export function executeOperation<
    T extends AbstractCallbackOperation<TResult>,
    TResult = ResultTypeFromOperation<T>
  >(client: MongoClient, operation: T, callback: Callback<TResult>): void;
  export function executeOperation<
    T extends AbstractCallbackOperation<TResult>,
    TResult = ResultTypeFromOperation<T>
  >(client: MongoClient, operation: T, callback?: Callback<TResult>): Promise<TResult> | void;
  export function executeOperation<
    T extends AbstractCallbackOperation<TResult>,
    TResult = ResultTypeFromOperation<T>
  >(client: MongoClient, operation: T, callback?: Callback<TResult>): Promise<TResult> | void {
    // Deferred so that nothing runs until maybeCallback decides to invoke it.
    const runOperation = (): Promise<TResult> =>
      executeOperationAsync<T, TResult>(client, operation);

    return maybeCallback(runOperation, callback);
  }
  /**
   * Promise-based implementation behind `executeOperation`.
   *
   * In order, this function:
   * 1. rejects anything that is not an `AbstractCallbackOperation`;
   * 2. connects the client if it has no topology yet (unless it was already closed);
   * 3. uses the operation's session, or starts an implicit one owned by this call;
   * 4. validates the read preference against any active transaction;
   * 5. selects a server and executes the operation on it;
   * 6. for retryable operations, hands a failed first attempt to `retryOperation`.
   *
   * An implicit session started here is ended before this function settles on every path
   * after its creation, including failures of server selection or of any check that runs
   * before the first execution attempt. Errors from ending the session are ignored.
   *
   * @param client - The client to run the operation with
   * @param operation - The operation to execute
   * @returns The value produced by `operation.execute`
   * @throws MongoRuntimeError if `operation` is not an operation instance, or if connecting
   * did not produce a topology
   * @throws MongoNotConnectedError if the client has no topology and has already been closed
   * @throws MongoExpiredSessionError if the operation's explicit session has ended
   * @throws MongoCompatibilityError if a snapshot session is used on a topology without
   * snapshot read support
   * @throws MongoTransactionError if a non-primary read preference is used in a transaction
   */
  async function executeOperationAsync<
    T extends AbstractCallbackOperation<TResult>,
    TResult = ResultTypeFromOperation<T>
  >(client: MongoClient, operation: T): Promise<TResult> {
    if (!(operation instanceof AbstractCallbackOperation)) {
      // TODO(NODE-3483): Extend MongoRuntimeError
      throw new MongoRuntimeError('This method requires a valid operation instance');
    }

    if (client.topology == null) {
      // Auto connect on operation
      if (client.s.hasBeenClosed) {
        throw new MongoNotConnectedError('Client must be connected before running operations');
      }
      // The flag is only set for the duration of this connect call and is always removed.
      client.s.options[Symbol.for('@@mdb.skipPingOnConnect')] = true;
      try {
        await client.connect();
      } finally {
        delete client.s.options[Symbol.for('@@mdb.skipPingOnConnect')];
      }
    }

    const { topology } = client;
    if (topology == null) {
      throw new MongoRuntimeError('client.connect did not create a topology but also did not throw');
    }

    // The driver sessions spec mandates that we implicitly create sessions for operations
    // that are not explicitly provided with a session.
    let session = operation.session;
    let owner: symbol | undefined;

    if (session == null) {
      // `owner` marks the session as created by this call so only this call ends it.
      owner = Symbol();
      session = client.startSession({ owner, explicit: false });
    } else if (session.hasEnded) {
      throw new MongoExpiredSessionError('Use of expired sessions is not permitted');
    } else if (session.snapshotEnabled && !topology.capabilities.supportsSnapshotReads) {
      throw new MongoCompatibilityError('Snapshot reads require MongoDB 5.0 or later');
    }

    // From here on a session always exists. Everything below runs inside one try/finally so
    // that an implicit session is released even when a step before execution fails.
    try {
      const readPreference = operation.readPreference ?? ReadPreference.primary;
      const inTransaction = !!session?.inTransaction();

      if (inTransaction && !readPreference.equals(ReadPreference.primary)) {
        throw new MongoTransactionError(
          `Read preference in a transaction must be primary, not: ${readPreference.mode}`
        );
      }

      if (session?.isPinned && session.transaction.isCommitted && !operation.bypassPinningCheck) {
        session.unpin();
      }

      let selector: ReadPreference | ServerSelector;

      if (operation.hasAspect(Aspect.MUST_SELECT_SAME_SERVER)) {
        // GetMore and KillCursor operations must always select the same server, but run through
        // server selection to potentially force monitor checks if the server is
        // in an unknown state.
        selector = sameServerSelector(operation.server?.description);
      } else if (operation.trySecondaryWrite) {
        // If operation should try to write to secondary use the custom server selector
        // otherwise provide the read preference.
        selector = secondaryWritableServerSelector(topology.commonWireVersion, readPreference);
      } else {
        selector = readPreference;
      }

      const server = await topology.selectServerAsync(selector, { session });

      if (!operation.hasAspect(Aspect.RETRYABLE)) {
        // non-retryable operation, single attempt
        return await operation.execute(server, session);
      }

      const willRetryRead =
        topology.s.options.retryReads && !inTransaction && operation.canRetryRead;

      const willRetryWrite =
        topology.s.options.retryWrites &&
        !inTransaction &&
        supportsRetryableWrites(server) &&
        operation.canRetryWrite;

      const hasReadAspect = operation.hasAspect(Aspect.READ_OPERATION);
      const hasWriteAspect = operation.hasAspect(Aspect.WRITE_OPERATION);
      const willRetry = (hasReadAspect && willRetryRead) || (hasWriteAspect && willRetryWrite);

      if (hasWriteAspect && willRetryWrite) {
        // Flag the operation and advance the session's transaction number before the
        // first attempt of a write that may be retried.
        operation.options.willRetryWrite = true;
        session.incrementTransactionNumber();
      }

      try {
        return await operation.execute(server, session);
      } catch (operationError) {
        if (willRetry && operationError instanceof MongoError) {
          return await retryOperation(operation, operationError, {
            session,
            topology,
            selector
          });
        }
        throw operationError;
      }
    } finally {
      // Only end sessions this call created; explicit sessions belong to the caller.
      if (session?.owner != null && session.owner === owner) {
        await session.endSession().catch(() => null);
      }
    }
  }
  /**
   * Context handed to `retryOperation` for the second attempt.
   * @internal
   */
  type RetryOptions = {
    /** Session the failed attempt ran with; reused for the retry */
    session: ClientSession;
    /** Topology used to select the server for the retry */
    topology: Topology;
    /** Selector that was used for the failed attempt; reused for the retry */
    selector: ReadPreference | ServerSelector;
  };
  /**
   * Makes a single retry attempt for an operation whose first attempt failed.
   *
   * The original error is rethrown unchanged when it is not retryable for the kind of
   * operation (read and/or write) being retried. Otherwise a server is selected again with
   * the same selector and session and the operation is executed once more.
   *
   * @param operation - The operation to run again
   * @param originalError - The error raised by the first attempt
   * @param options - Session, topology and selector to use for the retry
   * @returns The value produced by the retried `operation.execute`
   * @throws MongoServerError for a write whose original error code equals
   * `MMAPv1_RETRY_WRITES_ERROR_CODE`
   * @throws MongoUnexpectedServerResponseError if the newly selected server does not support
   * retryable writes and the operation is a write
   * @throws The original error if the retry fails with an error labelled `NoWritesPerformed`;
   * otherwise the retry's own error
   */
  async function retryOperation<
    T extends AbstractCallbackOperation<TResult>,
    TResult = ResultTypeFromOperation<T>
  >(
    operation: T,
    originalError: MongoError,
    options: RetryOptions
  ): Promise<TResult> {
    const { session, topology, selector } = options;

    const retryingWrite = operation.hasAspect(Aspect.WRITE_OPERATION);
    const retryingRead = operation.hasAspect(Aspect.READ_OPERATION);

    if (retryingWrite && originalError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
      // Replace the server's error with one that tells the user how to proceed.
      throw new MongoServerError({
        message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
        errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
        originalError
      });
    }

    // Write retryability is checked first, then read retryability.
    const cannotRetry =
      (retryingWrite && !isRetryableWriteError(originalError)) ||
      (retryingRead && !isRetryableReadError(originalError));
    if (cannotRetry) {
      throw originalError;
    }

    const shouldForceUnpin =
      originalError instanceof MongoNetworkError &&
      session.isPinned &&
      !session.inTransaction() &&
      operation.hasAspect(Aspect.CURSOR_CREATING);
    if (shouldForceUnpin) {
      // If we have a cursor and the initial command fails with a network error,
      // we can retry it on another connection. So we need to check it back in, clear the
      // pool for the service id, and retry again.
      session.unpin({ force: true, forceClear: true });
    }

    // select a new server, and attempt to retry the operation
    const retryServer = await topology.selectServerAsync(selector, { session });

    if (retryingWrite && !supportsRetryableWrites(retryServer)) {
      throw new MongoUnexpectedServerResponseError(
        'Selected server does not support retryable writes'
      );
    }

    try {
      return await operation.execute(retryServer, session);
    } catch (retryError) {
      // A retry labelled NoWritesPerformed is reported as the first attempt's error.
      const noWritesPerformed =
        retryError instanceof MongoError &&
        retryError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed);
      throw noWritesPerformed ? originalError : retryError;
    }
  }