execute_operation.js 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.executeOperation = void 0;
  4. const error_1 = require("../error");
  5. const read_preference_1 = require("../read_preference");
  6. const server_selection_1 = require("../sdam/server_selection");
  7. const utils_1 = require("../utils");
  8. const operation_1 = require("./operation");
  9. const MMAPv1_RETRY_WRITES_ERROR_CODE = error_1.MONGODB_ERROR_CODES.IllegalOperation;
  10. const MMAPv1_RETRY_WRITES_ERROR_MESSAGE = 'This MongoDB deployment does not support retryable writes. Please add retryWrites=false to your connection string.';
  11. function executeOperation(client, operation, callback) {
  12. return (0, utils_1.maybeCallback)(() => executeOperationAsync(client, operation), callback);
  13. }
  14. exports.executeOperation = executeOperation;
  15. async function executeOperationAsync(client, operation) {
  16. if (!(operation instanceof operation_1.AbstractOperation)) {
  17. // TODO(NODE-3483): Extend MongoRuntimeError
  18. throw new error_1.MongoRuntimeError('This method requires a valid operation instance');
  19. }
  20. if (client.topology == null) {
  21. // Auto connect on operation
  22. if (client.s.hasBeenClosed) {
  23. throw new error_1.MongoNotConnectedError('Client must be connected before running operations');
  24. }
  25. client.s.options[Symbol.for('@@mdb.skipPingOnConnect')] = true;
  26. try {
  27. await client.connect();
  28. }
  29. finally {
  30. delete client.s.options[Symbol.for('@@mdb.skipPingOnConnect')];
  31. }
  32. }
  33. const { topology } = client;
  34. if (topology == null) {
  35. throw new error_1.MongoRuntimeError('client.connect did not create a topology but also did not throw');
  36. }
  37. // The driver sessions spec mandates that we implicitly create sessions for operations
  38. // that are not explicitly provided with a session.
  39. let session = operation.session;
  40. let owner;
  41. if (session == null) {
  42. owner = Symbol();
  43. session = client.startSession({ owner, explicit: false });
  44. }
  45. else if (session.hasEnded) {
  46. throw new error_1.MongoExpiredSessionError('Use of expired sessions is not permitted');
  47. }
  48. else if (session.snapshotEnabled && !topology.capabilities.supportsSnapshotReads) {
  49. throw new error_1.MongoCompatibilityError('Snapshot reads require MongoDB 5.0 or later');
  50. }
  51. else if (session.client !== client) {
  52. throw new error_1.MongoInvalidArgumentError('ClientSession must be from the same MongoClient');
  53. }
  54. const readPreference = operation.readPreference ?? read_preference_1.ReadPreference.primary;
  55. const inTransaction = !!session?.inTransaction();
  56. if (inTransaction && !readPreference.equals(read_preference_1.ReadPreference.primary)) {
  57. throw new error_1.MongoTransactionError(`Read preference in a transaction must be primary, not: ${readPreference.mode}`);
  58. }
  59. if (session?.isPinned && session.transaction.isCommitted && !operation.bypassPinningCheck) {
  60. session.unpin();
  61. }
  62. let selector;
  63. if (operation.hasAspect(operation_1.Aspect.MUST_SELECT_SAME_SERVER)) {
  64. // GetMore and KillCursor operations must always select the same server, but run through
  65. // server selection to potentially force monitor checks if the server is
  66. // in an unknown state.
  67. selector = (0, server_selection_1.sameServerSelector)(operation.server?.description);
  68. }
  69. else if (operation.trySecondaryWrite) {
  70. // If operation should try to write to secondary use the custom server selector
  71. // otherwise provide the read preference.
  72. selector = (0, server_selection_1.secondaryWritableServerSelector)(topology.commonWireVersion, readPreference);
  73. }
  74. else {
  75. selector = readPreference;
  76. }
  77. const server = await topology.selectServerAsync(selector, { session });
  78. if (session == null) {
  79. // No session also means it is not retryable, early exit
  80. return operation.execute(server, undefined);
  81. }
  82. if (!operation.hasAspect(operation_1.Aspect.RETRYABLE)) {
  83. // non-retryable operation, early exit
  84. try {
  85. return await operation.execute(server, session);
  86. }
  87. finally {
  88. if (session?.owner != null && session.owner === owner) {
  89. await session.endSession().catch(() => null);
  90. }
  91. }
  92. }
  93. const willRetryRead = topology.s.options.retryReads && !inTransaction && operation.canRetryRead;
  94. const willRetryWrite = topology.s.options.retryWrites &&
  95. !inTransaction &&
  96. (0, utils_1.supportsRetryableWrites)(server) &&
  97. operation.canRetryWrite;
  98. const hasReadAspect = operation.hasAspect(operation_1.Aspect.READ_OPERATION);
  99. const hasWriteAspect = operation.hasAspect(operation_1.Aspect.WRITE_OPERATION);
  100. const willRetry = (hasReadAspect && willRetryRead) || (hasWriteAspect && willRetryWrite);
  101. if (hasWriteAspect && willRetryWrite) {
  102. operation.options.willRetryWrite = true;
  103. session.incrementTransactionNumber();
  104. }
  105. try {
  106. return await operation.execute(server, session);
  107. }
  108. catch (operationError) {
  109. if (willRetry && operationError instanceof error_1.MongoError) {
  110. return await retryOperation(operation, operationError, {
  111. session,
  112. topology,
  113. selector
  114. });
  115. }
  116. throw operationError;
  117. }
  118. finally {
  119. if (session?.owner != null && session.owner === owner) {
  120. await session.endSession().catch(() => null);
  121. }
  122. }
  123. }
  124. async function retryOperation(operation, originalError, { session, topology, selector }) {
  125. const isWriteOperation = operation.hasAspect(operation_1.Aspect.WRITE_OPERATION);
  126. const isReadOperation = operation.hasAspect(operation_1.Aspect.READ_OPERATION);
  127. if (isWriteOperation && originalError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
  128. throw new error_1.MongoServerError({
  129. message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
  130. errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
  131. originalError
  132. });
  133. }
  134. if (isWriteOperation && !(0, error_1.isRetryableWriteError)(originalError)) {
  135. throw originalError;
  136. }
  137. if (isReadOperation && !(0, error_1.isRetryableReadError)(originalError)) {
  138. throw originalError;
  139. }
  140. if (originalError instanceof error_1.MongoNetworkError &&
  141. session.isPinned &&
  142. !session.inTransaction() &&
  143. operation.hasAspect(operation_1.Aspect.CURSOR_CREATING)) {
  144. // If we have a cursor and the initial command fails with a network error,
  145. // we can retry it on another connection. So we need to check it back in, clear the
  146. // pool for the service id, and retry again.
  147. session.unpin({ force: true, forceClear: true });
  148. }
  149. // select a new server, and attempt to retry the operation
  150. const server = await topology.selectServerAsync(selector, { session });
  151. if (isWriteOperation && !(0, utils_1.supportsRetryableWrites)(server)) {
  152. throw new error_1.MongoUnexpectedServerResponseError('Selected server does not support retryable writes');
  153. }
  154. try {
  155. return await operation.execute(server, session);
  156. }
  157. catch (retryError) {
  158. if (retryError instanceof error_1.MongoError &&
  159. retryError.hasErrorLabel(error_1.MongoErrorLabel.NoWritesPerformed)) {
  160. throw originalError;
  161. }
  162. throw retryError;
  163. }
  164. }
  165. //# sourceMappingURL=execute_operation.js.map