server.js 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.Server = void 0;
  4. const util_1 = require("util");
  5. const connection_1 = require("../cmap/connection");
  6. const connection_pool_1 = require("../cmap/connection_pool");
  7. const errors_1 = require("../cmap/errors");
  8. const constants_1 = require("../constants");
  9. const error_1 = require("../error");
  10. const mongo_types_1 = require("../mongo_types");
  11. const transactions_1 = require("../transactions");
  12. const utils_1 = require("../utils");
  13. const common_1 = require("./common");
  14. const monitor_1 = require("./monitor");
  15. const server_description_1 = require("./server_description");
  /**
   * State machine built by `makeStateMachine` from the table below. Each row pairs a
   * current state with the states listed as its permitted successors.
   * NOTE(review): presumably transitions outside this table are rejected — confirm in ../utils.
   */
  const stateTransition = (0, utils_1.makeStateMachine)(Object.fromEntries([
    [common_1.STATE_CLOSED, [common_1.STATE_CLOSED, common_1.STATE_CONNECTING]],
    [
      common_1.STATE_CONNECTING,
      [common_1.STATE_CONNECTING, common_1.STATE_CLOSING, common_1.STATE_CONNECTED, common_1.STATE_CLOSED]
    ],
    [common_1.STATE_CONNECTED, [common_1.STATE_CONNECTED, common_1.STATE_CLOSING, common_1.STATE_CLOSED]],
    [common_1.STATE_CLOSING, [common_1.STATE_CLOSING, common_1.STATE_CLOSED]]
  ]));
  /**
   * Property key under which a Server stores its monitor (or `null` when it has none).
   * @internal
   */
  const kMonitor = Symbol('monitor');
  /**
   * A single server of a topology. It owns the connection pool for that server and,
   * unless the topology is load balanced, a monitor whose successful heartbeats are
   * turned into new server descriptions.
   * @internal
   */
  class Server extends mongo_types_1.TypedEventEmitter {
    /**
     * Create a server
     *
     * @param topology - owning topology; read for `clusterTime` and `description.type`
     * @param description - initial server description; its `hostAddress` is passed to the pool
     * @param options - server options, also handed to the connection pool and the monitor
     */
    constructor(topology, description, options) {
      super();
      // Promise-returning wrapper around the callback based `command`.
      this.commandAsync = (0, util_1.promisify)((ns, cmd, options,
      // callback type defines Document result because result is never nullish when it succeeds, otherwise promise rejects
      callback) => this.command(ns, cmd, options, callback));
      this.serverApi = options.serverApi;
      const poolOptions = { hostAddress: description.hostAddress, ...options };
      this.topology = topology;
      this.pool = new connection_pool_1.ConnectionPool(this, poolOptions);
      this.s = {
        description,
        options,
        state: common_1.STATE_CLOSED,
        operationCount: 0
      };
      // Forward every CMAP_EVENTS / APM_EVENTS event emitted by the pool from this server.
      for (const event of [...constants_1.CMAP_EVENTS, ...constants_1.APM_EVENTS]) {
        this.pool.on(event, (e) => this.emit(event, e));
      }
      this.pool.on(connection_1.Connection.CLUSTER_TIME_RECEIVED, (clusterTime) => {
        this.clusterTime = clusterTime;
      });
      if (this.loadBalanced) {
        this[kMonitor] = null;
        // monitoring is disabled in load balancing mode
        return;
      }
      // create the monitor
      // TODO(NODE-4144): Remove new variable for type narrowing
      const monitor = new monitor_1.Monitor(this, this.s.options);
      this[kMonitor] = monitor;
      for (const event of constants_1.HEARTBEAT_EVENTS) {
        monitor.on(event, (e) => this.emit(event, e));
      }
      monitor.on('resetServer', (error) => markServerUnknown(this, error));
      monitor.on(Server.SERVER_HEARTBEAT_SUCCEEDED, (event) => {
        this.emit(Server.DESCRIPTION_RECEIVED, new server_description_1.ServerDescription(this.description.hostAddress, event.reply, {
          roundTripTime: calculateRoundTripTime(this.description.roundTripTime, event.duration)
        }));
        // The first successful heartbeat while connecting completes the connect.
        if (this.s.state === common_1.STATE_CONNECTING) {
          stateTransition(this, common_1.STATE_CONNECTED);
          this.emit(Server.CONNECT, this);
        }
      });
    }
    /** Cluster time, read from the owning topology. */
    get clusterTime() {
      return this.topology.clusterTime;
    }
    /** Stores the cluster time on the owning topology. */
    set clusterTime(clusterTime) {
      this.topology.clusterTime = clusterTime;
    }
    /** The server description currently held in `this.s`. */
    get description() {
      return this.s.description;
    }
    /** The `address` of the current server description. */
    get name() {
      return this.s.description.address;
    }
    /** The `autoEncrypter` option when it is set to a truthy value, otherwise `undefined`. */
    get autoEncrypter() {
      if (this.s.options && this.s.options.autoEncrypter) {
        return this.s.options.autoEncrypter;
      }
      return;
    }
    /** Whether the owning topology's description type is `TopologyType.LoadBalanced`. */
    get loadBalanced() {
      return this.topology.description.type === common_1.TopologyType.LoadBalanced;
    }
    /**
     * Initiate server connect. Does nothing unless the server is currently closed.
     */
    connect() {
      if (this.s.state !== common_1.STATE_CLOSED) {
        return;
      }
      stateTransition(this, common_1.STATE_CONNECTING);
      // If in load balancer mode we automatically set the server to
      // a load balancer. It never transitions out of this state and
      // has no monitor.
      if (!this.loadBalanced) {
        this[kMonitor]?.connect();
      }
      else {
        stateTransition(this, common_1.STATE_CONNECTED);
        this.emit(Server.CONNECT, this);
      }
    }
    /**
     * Destroy the server connection
     *
     * @param options - close options passed to the pool (`force` defaults to `false`);
     *   may be omitted, in which case the first argument is treated as the callback
     * @param callback - optional; invoked with the pool's close error (if any) once the
     *   pool has closed, or immediately with no arguments when the server is already closed
     */
    destroy(options, callback) {
      if (typeof options === 'function') {
        callback = options;
        options = { force: false };
      }
      options = Object.assign({}, { force: false }, options);
      if (this.s.state === common_1.STATE_CLOSED) {
        if (typeof callback === 'function') {
          callback();
        }
        return;
      }
      stateTransition(this, common_1.STATE_CLOSING);
      if (!this.loadBalanced) {
        this[kMonitor]?.close();
      }
      this.pool.close(options, err => {
        stateTransition(this, common_1.STATE_CLOSED);
        this.emit('closed');
        if (typeof callback === 'function') {
          callback(err);
        }
      });
    }
    /**
     * Immediately schedule monitoring of this server. If there is already an attempt being
     * made, this will be a no-op. Load balanced servers have no monitor, so nothing happens.
     */
    requestCheck() {
      if (!this.loadBalanced) {
        this[kMonitor]?.requestCheck();
      }
    }
    /**
     * Execute a command
     * @internal
     *
     * @param ns - namespace object; must expose a non-nullish `db` and must not be a string
     * @param cmd - the command document
     * @param options - command options; cloned before use, `session` and
     *   `omitReadPreference` are read here
     * @param callback - required; receives `(error, response)`
     * @throws MongoInvalidArgumentError when the callback is missing or the namespace is invalid
     */
    command(ns, cmd, options, callback) {
      if (callback == null) {
        throw new error_1.MongoInvalidArgumentError('Callback must be provided');
      }
      if (ns.db == null || typeof ns === 'string') {
        throw new error_1.MongoInvalidArgumentError('Namespace must not be a string');
      }
      if (this.s.state === common_1.STATE_CLOSING || this.s.state === common_1.STATE_CLOSED) {
        callback(new error_1.MongoServerClosedError());
        return;
      }
      // Clone the options
      const finalOptions = Object.assign({}, options, { wireProtocolCommand: false });
      // There are cases where we need to flag the read preference not to get sent in
      // the command, such as pre-5.0 servers attempting to perform an aggregate write
      // with a non-primary read preference. In this case the effective read preference
      // (primary) is not the same as the provided and must be removed completely.
      if (finalOptions.omitReadPreference) {
        delete finalOptions.readPreference;
      }
      const session = finalOptions.session;
      const conn = session?.pinnedConnection;
      // NOTE: This is a hack! We can't retrieve the connections used for executing an operation
      // (and prevent them from being checked back in) at the point of operation execution.
      // This should be considered as part of the work for NODE-2882
      // NOTE:
      // When incrementing operation count, it's important that we increment it before we
      // attempt to check out a connection from the pool. This ensures that operations that
      // are waiting for a connection are included in the operation count. Load balanced
      // mode will only ever have a single server, so the operation count doesn't matter.
      // Incrementing the operation count above the logic to handle load balanced mode would
      // require special logic to decrement it again, or would double increment (the load
      // balanced code makes a recursive call). Instead, we increment the count after this
      // check.
      if (this.loadBalanced && session && conn == null && isPinnableCommand(cmd, session)) {
        this.pool.checkOut((err, checkedOut) => {
          if (err) {
            return callback(err);
          }
          if (checkedOut == null) {
            // Never report success without a connection: use the same error as the
            // non-pinned path below when the pool yields neither an error nor a connection.
            return callback(new error_1.MongoRuntimeError('Failed to create connection without error'));
          }
          session.pin(checkedOut);
          // Re-enter with the session now pinned; `conn` will be set on that pass.
          this.command(ns, cmd, finalOptions, callback);
        });
        return;
      }
      this.incrementOperationCount();
      this.pool.withConnection(conn, (err, conn, cb) => {
        if (err || !conn) {
          this.decrementOperationCount();
          if (!err) {
            return cb(new error_1.MongoRuntimeError('Failed to create connection without error'));
          }
          if (!(err instanceof errors_1.PoolClearedError)) {
            this.handleError(err);
          }
          return cb(err);
        }
        conn.command(ns, cmd, finalOptions, makeOperationHandler(this, conn, cmd, finalOptions, (error, response) => {
          this.decrementOperationCount();
          cb(error, response);
        }));
      }, callback);
    }
    /**
     * Handle SDAM error
     * @internal
     *
     * @param error - the error to inspect; anything that is not a MongoError is ignored
     * @param connection - optional connection the error came from; its `serviceId` is
     *   used to clear the pool in load balanced mode
     */
    handleError(error, connection) {
      if (!(error instanceof error_1.MongoError)) {
        return;
      }
      // Explicit null check rather than truthiness: 0 is a valid generation, and an error
      // carrying it is stale as soon as the pool generation has moved past it.
      const isStaleError = error.connectionGeneration != null && error.connectionGeneration < this.pool.generation;
      if (isStaleError) {
        return;
      }
      const isNetworkNonTimeoutError = error instanceof error_1.MongoNetworkError && !(error instanceof error_1.MongoNetworkTimeoutError);
      const isNetworkTimeoutBeforeHandshakeError = (0, error_1.isNetworkErrorBeforeHandshake)(error);
      const isAuthHandshakeError = error.hasErrorLabel(error_1.MongoErrorLabel.HandshakeError);
      if (isNetworkNonTimeoutError || isNetworkTimeoutBeforeHandshakeError || isAuthHandshakeError) {
        // In load balanced mode we never mark the server as unknown and always
        // clear for the specific service id.
        if (!this.loadBalanced) {
          error.addErrorLabel(error_1.MongoErrorLabel.ResetPool);
          markServerUnknown(this, error);
        }
        else if (connection) {
          this.pool.clear({ serviceId: connection.serviceId });
        }
      }
      else if ((0, error_1.isSDAMUnrecoverableError)(error) && shouldHandleStateChangeError(this, error)) {
        const shouldClearPool = (0, utils_1.maxWireVersion)(this) <= 7 || (0, error_1.isNodeShuttingDownError)(error);
        if (this.loadBalanced && connection && shouldClearPool) {
          this.pool.clear({ serviceId: connection.serviceId });
        }
        if (!this.loadBalanced) {
          if (shouldClearPool) {
            error.addErrorLabel(error_1.MongoErrorLabel.ResetPool);
          }
          markServerUnknown(this, error);
          // Ask for a fresh check on the next tick, after the unknown description is emitted.
          process.nextTick(() => this.requestCheck());
        }
      }
    }
    /**
     * Decrement the operation count, returning the new count.
     */
    decrementOperationCount() {
      return (this.s.operationCount -= 1);
    }
    /**
     * Increment the operation count, returning the new count.
     */
    incrementOperationCount() {
      return (this.s.operationCount += 1);
    }
  }
  // Event names exposed as static members of Server, each copied from ../constants.
  Object.assign(Server, {
    /** @event */
    SERVER_HEARTBEAT_STARTED: constants_1.SERVER_HEARTBEAT_STARTED,
    /** @event */
    SERVER_HEARTBEAT_SUCCEEDED: constants_1.SERVER_HEARTBEAT_SUCCEEDED,
    /** @event */
    SERVER_HEARTBEAT_FAILED: constants_1.SERVER_HEARTBEAT_FAILED,
    /** @event */
    CONNECT: constants_1.CONNECT,
    /** @event */
    DESCRIPTION_RECEIVED: constants_1.DESCRIPTION_RECEIVED,
    /** @event */
    CLOSED: constants_1.CLOSED,
    /** @event */
    ENDED: constants_1.ENDED
  });
  exports.Server = Server;
  /**
   * Blend a new heartbeat duration into the previous round trip time using a fixed
   * weight of 0.2 for the new sample and 0.8 for the old value.
   *
   * @param oldRtt - previous round trip time; when exactly -1 the duration is returned as is
   *   (NOTE(review): -1 looks like the "no measurement yet" sentinel — confirm in ServerDescription)
   * @param duration - duration of the latest heartbeat
   * @returns the updated round trip time
   */
  function calculateRoundTripTime(oldRtt, duration) {
    const alpha = 0.2;
    return oldRtt === -1 ? duration : alpha * duration + (1 - alpha) * oldRtt;
  }
  /**
   * Emit a DESCRIPTION_RECEIVED event carrying a ServerDescription built with no
   * reply and the given error. A network error that is not a timeout also resets
   * the server's monitor first.
   *
   * @param server - the server to report on
   * @param error - the error attached to the emitted description
   */
  function markServerUnknown(server, error) {
    // Load balancer servers can never be marked unknown.
    if (server.loadBalanced) {
      return;
    }
    const isNonTimeoutNetworkError = error instanceof error_1.MongoNetworkError && !(error instanceof error_1.MongoNetworkTimeoutError);
    if (isNonTimeoutNetworkError) {
      server[kMonitor]?.reset();
    }
    const descriptionWithError = new server_description_1.ServerDescription(server.description.hostAddress, undefined, { error });
    server.emit(Server.DESCRIPTION_RECEIVED, descriptionWithError);
  }
  /**
   * Decide whether a command should cause a connection to be pinned to the session.
   *
   * @param cmd - the command document
   * @param session - the session, if any
   * @returns `false` without a session; otherwise truthy when the session is in a
   *   transaction or the command contains one of the listed command names as a key
   */
  function isPinnableCommand(cmd, session) {
    if (!session) {
      return false;
    }
    const pinnableCommandNames = ['aggregate', 'find', 'getMore', 'listCollections', 'listIndexes'];
    return session.inTransaction() || pinnableCommandNames.some(name => name in cmd);
  }
  /**
   * Check whether a connection's generation differs from the generation the pool
   * currently holds for it.
   *
   * @param pool - the pool; `serviceGenerations` is consulted (keyed by the hex string of
   *   the service id) when the connection has a `serviceId`, otherwise `generation`
   * @param connection - the connection to check
   * @returns `true` when the generations differ
   */
  function connectionIsStale(pool, connection) {
    const { serviceId } = connection;
    const expectedGeneration = serviceId
      ? pool.serviceGenerations.get(serviceId.toHexString())
      : pool.generation;
    return connection.generation !== expectedGeneration;
  }
  /**
   * Compare the topology version carried by an error with the one in the server's
   * current description.
   *
   * @param server - the server whose description supplies the current topology version
   * @param err - the error supplying the other topology version
   * @returns `true` when `compareTopologyVersion(serverVersion, errorVersion)` is negative
   *   (NOTE(review): presumably meaning the error is newer than the description — confirm)
   */
  function shouldHandleStateChangeError(server, err) {
    const errorTopologyVersion = err.topologyVersion;
    const serverTopologyVersion = server.description.topologyVersion;
    const ordering = (0, server_description_1.compareTopologyVersion)(serverTopologyVersion, errorTopologyVersion);
    return ordering < 0;
  }
  /**
   * Tell whether a command runs inside a session's transaction without being a
   * transaction command itself (as judged by `isTransactionCommand`).
   *
   * @param session - the session, if any
   * @param cmd - the command document
   * @returns the falsy `session` or falsy `inTransaction()` result when either
   *   short-circuits, otherwise a boolean
   */
  function inActiveTransaction(session, cmd) {
    if (!session) {
      return session;
    }
    const transactionState = session.inTransaction();
    if (!transactionState) {
      return transactionState;
    }
    return !(0, transactions_1.isTransactionCommand)(cmd);
  }
  /**
   * Reports whether retryable writes are turned on in the options stored on the
   * topology. Only an explicit `retryWrites: false` counts as disabled; this does
   * not check whether the server supports retryable writes.
   */
  function isRetryableWritesEnabled(topology) {
    const { retryWrites } = topology.s.options;
    return !(retryWrites === false);
  }
  /**
   * Build the callback handed to `connection.command`. The returned handler passes
   * successful results through, and for MongoError failures on a non-stale connection
   * it applies error labels, updates session state and runs the server's SDAM error
   * handling before forwarding the error.
   *
   * @param server - the server the operation ran against
   * @param connection - the connection the operation ran on
   * @param cmd - the command document
   * @param options - command options; `session` and `noResponse` are read
   * @param callback - receives `(error, result)`
   * @returns the `handleOperationResult(error, result)` handler
   */
  function makeOperationHandler(server, connection, cmd, options, callback) {
    const session = options?.session;
    // Shared pieces of the "may this error be labelled retryable?" decision.
    const retryableWritesApply = () => isRetryableWritesEnabled(server.topology) || (0, transactions_1.isTransactionCommand)(cmd);
    const outsideActiveTransaction = () => !inActiveTransaction(session, cmd);
    return function handleOperationResult(error, result) {
      // We should not swallow an error if it is present.
      const succeeded = error == null && result != null;
      if (succeeded) {
        return callback(undefined, result);
      }
      const noResponseRequested = options != null && 'noResponse' in options && options.noResponse === true;
      if (noResponseRequested) {
        return callback(undefined, null);
      }
      if (!error) {
        return callback(new error_1.MongoUnexpectedServerResponseError('Empty response with no error'));
      }
      // Node.js or some other error we have not special handling for, and errors
      // from stale connections, are forwarded untouched.
      if (!(error instanceof error_1.MongoError) || connectionIsStale(server.pool, connection)) {
        return callback(error);
      }
      const isNetworkError = error instanceof error_1.MongoNetworkError;
      if (isNetworkError) {
        if (session && !session.hasEnded && session.serverSession) {
          session.serverSession.isDirty = true;
        }
        // inActiveTransaction check handles commit and abort.
        const alreadyTransient = () => error.hasErrorLabel(error_1.MongoErrorLabel.TransientTransactionError);
        if (inActiveTransaction(session, cmd) && !alreadyTransient()) {
          error.addErrorLabel(error_1.MongoErrorLabel.TransientTransactionError);
        }
      }
      // Network errors depend on server support; other errors on the error itself.
      const serverPermitsRetry = () => isNetworkError
        ? (0, utils_1.supportsRetryableWrites)(server)
        : (0, error_1.needsRetryableWriteLabel)(error, (0, utils_1.maxWireVersion)(server));
      if (retryableWritesApply() && serverPermitsRetry() && outsideActiveTransaction()) {
        error.addErrorLabel(error_1.MongoErrorLabel.RetryableWriteError);
      }
      const mustUnpin = session &&
        session.isPinned &&
        error.hasErrorLabel(error_1.MongoErrorLabel.TransientTransactionError);
      if (mustUnpin) {
        session.unpin({ force: true });
      }
      server.handleError(error, connection);
      return callback(error);
    };
  }
  385. //# sourceMappingURL=server.js.map