topology.js 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.ServerCapabilities = exports.Topology = void 0;
  4. const util_1 = require("util");
  5. const connection_string_1 = require("../connection_string");
  6. const constants_1 = require("../constants");
  7. const error_1 = require("../error");
  8. const mongo_types_1 = require("../mongo_types");
  9. const read_preference_1 = require("../read_preference");
  10. const utils_1 = require("../utils");
  11. const common_1 = require("./common");
  12. const events_1 = require("./events");
  13. const server_1 = require("./server");
  14. const server_description_1 = require("./server_description");
  15. const server_selection_1 = require("./server_selection");
  16. const srv_polling_1 = require("./srv_polling");
  17. const topology_description_1 = require("./topology_description");
  // Module-level state shared by every Topology created from this module.
  // Running counter: each Topology constructor takes the current value as its id
  // and then increments it.
  let globalTopologyCounter = 0;
  // Table handed to makeStateMachine, keyed by a state; each value lists states
  // (presumably the states that may legally follow the key -- see utils.makeStateMachine).
  const stateTransition = (0, utils_1.makeStateMachine)({
      [common_1.STATE_CLOSED]: [
          common_1.STATE_CLOSED,
          common_1.STATE_CONNECTING
      ],
      [common_1.STATE_CONNECTING]: [
          common_1.STATE_CONNECTING,
          common_1.STATE_CLOSING,
          common_1.STATE_CONNECTED,
          common_1.STATE_CLOSED
      ],
      [common_1.STATE_CONNECTED]: [
          common_1.STATE_CONNECTED,
          common_1.STATE_CLOSING,
          common_1.STATE_CLOSED
      ],
      [common_1.STATE_CLOSING]: [
          common_1.STATE_CLOSING,
          common_1.STATE_CLOSED
      ]
  });
  /**
   * Key set to `true` on a wait-queue member once its selection timeout has fired.
   * @internal
   */
  const kCancelled = Symbol('cancelled');
  /**
   * Key under which a Topology stores its queue of pending server selections.
   * @internal
   */
  const kWaitQueue = Symbol('waitQueue');
  /**
   * A container of server instances representing a connection to a MongoDB topology.
   * @internal
   */
  class Topology extends mongo_types_1.TypedEventEmitter {
      /**
       * @param client - stored on the instance as `this.client`
       * @param seeds - a host string, a `HostAddress`, or an array mixing both
       * @param options - topology options; when nullish a `localhost:27017` default set is built
       * @throws MongoRuntimeError when a seed is neither a string nor a `HostAddress`
       */
      constructor(client, seeds, options) {
          super();
          this.client = client;
          this.selectServerAsync = (0, util_1.promisify)((selector, options, callback) => this.selectServer(selector, options, callback));
          // Options should only be undefined in tests, MongoClient will always have defined options
          options = options ?? {
              hosts: [utils_1.HostAddress.fromString('localhost:27017')],
              ...Object.fromEntries(connection_string_1.DEFAULT_OPTIONS.entries()),
              ...Object.fromEntries(connection_string_1.FEATURE_FLAGS.entries())
          };
          if (typeof seeds === 'string') {
              seeds = [utils_1.HostAddress.fromString(seeds)];
          }
          else if (!Array.isArray(seeds)) {
              seeds = [seeds];
          }
          const seedlist = [];
          for (const seed of seeds) {
              if (typeof seed === 'string') {
                  seedlist.push(utils_1.HostAddress.fromString(seed));
              }
              else if (seed instanceof utils_1.HostAddress) {
                  seedlist.push(seed);
              }
              else {
                  // FIXME(NODE-3483): May need to be a MongoParseError
                  throw new error_1.MongoRuntimeError(`Topology cannot be constructed from ${JSON.stringify(seed)}`);
              }
          }
          const topologyType = topologyTypeFromOptions(options);
          const topologyId = globalTopologyCounter++;
          // Only cut the seedlist down when srvMaxHosts is a positive number smaller than the list
          const selectedHosts = options.srvMaxHosts == null ||
              options.srvMaxHosts === 0 ||
              options.srvMaxHosts >= seedlist.length
              ? seedlist
              : (0, utils_1.shuffle)(seedlist, options.srvMaxHosts);
          const serverDescriptions = new Map();
          for (const hostAddress of selectedHosts) {
              serverDescriptions.set(hostAddress.toString(), new server_description_1.ServerDescription(hostAddress));
          }
          this[kWaitQueue] = new utils_1.List();
          this.s = {
              // the id of this topology
              id: topologyId,
              // passed in options
              options,
              // initial seedlist of servers to connect to
              seedlist,
              // initial state
              state: common_1.STATE_CLOSED,
              // the topology description
              description: new topology_description_1.TopologyDescription(topologyType, serverDescriptions, options.replicaSet, undefined, undefined, undefined, options),
              serverSelectionTimeoutMS: options.serverSelectionTimeoutMS,
              heartbeatFrequencyMS: options.heartbeatFrequencyMS,
              minHeartbeatFrequencyMS: options.minHeartbeatFrequencyMS,
              // a map of server instances to normalized addresses
              servers: new Map(),
              credentials: options?.credentials,
              clusterTime: undefined,
              // timer management
              connectionTimers: new Set(),
              // bound once so the very same function can be removed again in close()
              detectShardedTopology: ev => this.detectShardedTopology(ev),
              detectSrvRecords: ev => this.detectSrvRecords(ev)
          };
          if (options.srvHost && !options.loadBalanced) {
              this.s.srvPoller =
                  options.srvPoller ??
                      new srv_polling_1.SrvPoller({
                          heartbeatFrequencyMS: this.s.heartbeatFrequencyMS,
                          srvHost: options.srvHost,
                          srvMaxHosts: options.srvMaxHosts,
                          srvServiceName: options.srvServiceName
                      });
              this.on(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectShardedTopology);
          }
      }
      /**
       * Starts SRV polling the first time the topology type changes to Sharded.
       *
       * @param event - a topology-description-changed event carrying `previousDescription` and `newDescription`
       */
      detectShardedTopology(event) {
          const previousType = event.previousDescription.type;
          const newType = event.newDescription.type;
          const transitionToSharded = previousType !== common_1.TopologyType.Sharded && newType === common_1.TopologyType.Sharded;
          const srvListeners = this.s.srvPoller?.listeners(srv_polling_1.SrvPoller.SRV_RECORD_DISCOVERY);
          // avoid registering the discovery listener twice
          const listeningToSrvPolling = !!srvListeners?.includes(this.s.detectSrvRecords);
          if (transitionToSharded && !listeningToSrvPolling) {
              this.s.srvPoller?.on(srv_polling_1.SrvPoller.SRV_RECORD_DISCOVERY, this.s.detectSrvRecords);
              this.s.srvPoller?.start();
          }
      }
      /**
       * Applies an SRV polling event to the topology description; when the description
       * object changed, reconciles the server instances and emits
       * TOPOLOGY_DESCRIPTION_CHANGED.
       *
       * @param ev - the SRV polling event
       */
      detectSrvRecords(ev) {
          const previousTopologyDescription = this.s.description;
          this.s.description = this.s.description.updateFromSrvPollingEvent(ev, this.s.options.srvMaxHosts);
          if (this.s.description === previousTopologyDescription) {
              // Nothing changed, so return
              return;
          }
          updateServers(this);
          this.emit(Topology.TOPOLOGY_DESCRIPTION_CHANGED, new events_1.TopologyDescriptionChangedEvent(this.s.id, previousTopologyDescription, this.s.description));
      }
      /**
       * @returns A `TopologyDescription` for this topology
       */
      get description() {
          return this.s.description;
      }
      /** @returns the `loadBalanced` option this topology was created with */
      get loadBalanced() {
          return this.s.options.loadBalanced;
      }
      /** @returns a fresh `ServerCapabilities` built from `lastHello()` */
      get capabilities() {
          return new ServerCapabilities(this.lastHello());
      }
      /**
       * Creates and connects a server for every known description, then selects a
       * server (and pings it when credentials are configured) before marking the
       * topology connected.
       *
       * On failure the topology is closed and the error is passed to `callback`,
       * or emitted as `Topology.ERROR` when no callback was given.
       *
       * @param options - optional settings; `readPreference` picks the selector (default: primary)
       * @param callback - invoked with `(error)` on failure or `(undefined, topology)` on success
       */
      connect(options, callback) {
          if (typeof options === 'function') {
              callback = options;
              options = {};
          }
          options = options ?? {};
          if (this.s.state === common_1.STATE_CONNECTED) {
              if (typeof callback === 'function') {
                  callback();
              }
              return;
          }
          stateTransition(this, common_1.STATE_CONNECTING);
          // emit SDAM monitoring events
          this.emit(Topology.TOPOLOGY_OPENING, new events_1.TopologyOpeningEvent(this.s.id));
          // emit an event for the topology change
          this.emit(Topology.TOPOLOGY_DESCRIPTION_CHANGED, new events_1.TopologyDescriptionChangedEvent(this.s.id, new topology_description_1.TopologyDescription(common_1.TopologyType.Unknown), // initial is always Unknown
          this.s.description));
          // connect all known servers, then attempt server selection to connect
          const serverDescriptions = Array.from(this.s.description.servers.values());
          this.s.servers = new Map(serverDescriptions.map(serverDescription => [
              serverDescription.address,
              createAndConnectServer(this, serverDescription)
          ]));
          // In load balancer mode we need to fake a server description getting
          // emitted from the monitor, since the monitor doesn't exist.
          if (this.s.options.loadBalanced) {
              for (const description of serverDescriptions) {
                  const newDescription = new server_description_1.ServerDescription(description.hostAddress, undefined, {
                      loadBalanced: this.s.options.loadBalanced
                  });
                  this.serverUpdateHandler(newDescription);
              }
          }
          const exitWithError = (error) => callback ? callback(error) : this.emit(Topology.ERROR, error);
          // Shared success path: flip the state, announce it, then hand the topology back
          const markConnected = () => {
              stateTransition(this, common_1.STATE_CONNECTED);
              this.emit(Topology.OPEN, this);
              this.emit(Topology.CONNECT, this);
              callback?.(undefined, this);
          };
          const readPreference = options.readPreference ?? read_preference_1.ReadPreference.primary;
          this.selectServer((0, server_selection_1.readPreferenceServerSelector)(readPreference), options, (err, server) => {
              if (err) {
                  return this.close({ force: false }, () => exitWithError(err));
              }
              // TODO: NODE-2471
              const skipPingOnConnect = this.s.options[Symbol.for('@@mdb.skipPingOnConnect')] === true;
              if (!skipPingOnConnect && server && this.s.credentials) {
                  server.command((0, utils_1.ns)('admin.$cmd'), { ping: 1 }, {}, pingError => {
                      if (pingError) {
                          // Tear down the servers created above, exactly like the selection
                          // failure path, instead of leaving them running in STATE_CONNECTING.
                          return this.close({ force: false }, () => exitWithError(pingError));
                      }
                      markConnected();
                  });
                  return;
              }
              markConnected();
          });
      }
      /**
       * Destroys every server, fails all queued server selections, stops SRV polling
       * and moves the topology to the closed state. Does nothing but invoke
       * `callback` when the topology is already closed or closing.
       *
       * @param options - `force` is forwarded to each server's destruction (default false)
       * @param callback - invoked without arguments once the close attempt has settled
       */
      close(options, callback) {
          options = options ?? { force: false };
          if (this.s.state === common_1.STATE_CLOSED || this.s.state === common_1.STATE_CLOSING) {
              return callback?.();
          }
          // NOTE(review): the state only becomes CLOSING once every server is destroyed,
          // so a second close() issued in the meantime also passes the guard above -- confirm
          // whether callers can overlap close() calls.
          const destroyedServers = Array.from(this.s.servers.values(), server => {
              return (0, util_1.promisify)(destroyServer)(server, this, { force: !!options?.force });
          });
          Promise.all(destroyedServers)
              .then(() => {
              this.s.servers.clear();
              stateTransition(this, common_1.STATE_CLOSING);
              drainWaitQueue(this[kWaitQueue], new error_1.MongoTopologyClosedError());
              (0, common_1.drainTimerQueue)(this.s.connectionTimers);
              if (this.s.srvPoller) {
                  this.s.srvPoller.stop();
                  this.s.srvPoller.removeListener(srv_polling_1.SrvPoller.SRV_RECORD_DISCOVERY, this.s.detectSrvRecords);
              }
              this.removeListener(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectShardedTopology);
              stateTransition(this, common_1.STATE_CLOSED);
              // emit an event for close
              this.emit(Topology.TOPOLOGY_CLOSED, new events_1.TopologyClosedEvent(this.s.id));
          })
              .finally(() => callback?.());
      }
      /**
       * Selects a server according to the selection predicate provided
       *
       * @param selector - A selector function, a read preference mode string, or a `ReadPreference`; any other value falls back to `options.readPreference` or primary
       * @param options - Optional settings related to server selection (`serverSelectionTimeoutMS`, `session`, `readPreference`)
       * @param callback - Invoked with `(error)` or `(undefined, server)`; the server is delivered through this callback, not as a return value
       */
      selectServer(selector, options, callback) {
          let serverSelector;
          if (typeof selector !== 'function') {
              if (typeof selector === 'string') {
                  serverSelector = (0, server_selection_1.readPreferenceServerSelector)(read_preference_1.ReadPreference.fromString(selector));
              }
              else {
                  let readPreference;
                  if (selector instanceof read_preference_1.ReadPreference) {
                      readPreference = selector;
                  }
                  else {
                      read_preference_1.ReadPreference.translate(options);
                      readPreference = options.readPreference || read_preference_1.ReadPreference.primary;
                  }
                  serverSelector = (0, server_selection_1.readPreferenceServerSelector)(readPreference);
              }
          }
          else {
              serverSelector = selector;
          }
          // caller-supplied options win over the topology-wide timeout default
          options = Object.assign({}, { serverSelectionTimeoutMS: this.s.serverSelectionTimeoutMS }, options);
          const isSharded = this.description.type === common_1.TopologyType.Sharded;
          const session = options.session;
          const transaction = session && session.transaction;
          // a sharded transaction that already has a server keeps using it
          if (isSharded && transaction && transaction.server) {
              callback(undefined, transaction.server);
              return;
          }
          const waitQueueMember = {
              serverSelector,
              transaction,
              callback,
              timeoutController: new utils_1.TimeoutController(options.serverSelectionTimeoutMS)
          };
          waitQueueMember.timeoutController.signal.addEventListener('abort', () => {
              // flag the member so queue processing skips it instead of calling back twice
              waitQueueMember[kCancelled] = true;
              waitQueueMember.timeoutController.clear();
              const timeoutError = new error_1.MongoServerSelectionError(`Server selection timed out after ${options.serverSelectionTimeoutMS} ms`, this.description);
              waitQueueMember.callback(timeoutError);
          });
          this[kWaitQueue].push(waitQueueMember);
          processWaitQueue(this);
      }
      /**
       * Update the internal TopologyDescription with a ServerDescription
       *
       * @param serverDescription - The server to update in the internal list of server descriptions
       */
      serverUpdateHandler(serverDescription) {
          if (!this.s.description.hasServer(serverDescription.address)) {
              return;
          }
          // ignore this server update if its from an outdated topologyVersion
          if (isStaleServerDescription(this.s.description, serverDescription)) {
              return;
          }
          // these will be used for monitoring events later
          const previousTopologyDescription = this.s.description;
          const previousServerDescription = this.s.description.servers.get(serverDescription.address);
          if (!previousServerDescription) {
              return;
          }
          // Driver Sessions Spec: "Whenever a driver receives a cluster time from
          // a server it MUST compare it to the current highest seen cluster time
          // for the deployment. If the new cluster time is higher than the
          // highest seen cluster time it MUST become the new highest seen cluster
          // time. Two cluster times are compared using only the BsonTimestamp
          // value of the clusterTime embedded field."
          const clusterTime = serverDescription.$clusterTime;
          if (clusterTime) {
              (0, common_1._advanceClusterTime)(this, clusterTime);
          }
          // If we already know all the information contained in this updated description, then
          // we don't need to emit SDAM events, but still need to update the description, in order
          // to keep client-tracked attributes like last update time and round trip time up to date
          const equalDescriptions = previousServerDescription.equals(serverDescription);
          // first update the TopologyDescription
          this.s.description = this.s.description.update(serverDescription);
          if (this.s.description.compatibilityError) {
              this.emit(Topology.ERROR, new error_1.MongoCompatibilityError(this.s.description.compatibilityError));
              return;
          }
          // emit monitoring events for this change
          if (!equalDescriptions) {
              const newDescription = this.s.description.servers.get(serverDescription.address);
              if (newDescription) {
                  this.emit(Topology.SERVER_DESCRIPTION_CHANGED, new events_1.ServerDescriptionChangedEvent(this.s.id, serverDescription.address, previousServerDescription, newDescription));
              }
          }
          // update server list from updated descriptions
          updateServers(this, serverDescription);
          // attempt to resolve any outstanding server selection attempts
          if (this[kWaitQueue].length > 0) {
              processWaitQueue(this);
          }
          if (!equalDescriptions) {
              this.emit(Topology.TOPOLOGY_DESCRIPTION_CHANGED, new events_1.TopologyDescriptionChangedEvent(this.s.id, previousTopologyDescription, this.s.description));
          }
      }
      /**
       * No-op kept for callers that still invoke it: the credentials are ignored and
       * the callback, when given, is invoked with `(undefined, true)`.
       *
       * @param credentials - ignored; may be the callback itself
       * @param callback - optional completion callback
       */
      auth(credentials, callback) {
          if (typeof credentials === 'function') {
              callback = credentials;
              credentials = undefined;
          }
          if (typeof callback === 'function') {
              callback(undefined, true);
          }
      }
      /** @returns the `metadata` option this topology was created with */
      get clientMetadata() {
          return this.s.options.metadata;
      }
      /** @returns true only while the state is STATE_CONNECTED */
      isConnected() {
          return this.s.state === common_1.STATE_CONNECTED;
      }
      /** @returns true only while the state is STATE_CLOSED */
      isDestroyed() {
          return this.s.state === common_1.STATE_CLOSED;
      }
      // NOTE: There are many places in code where we explicitly check the last hello
      // to do feature support detection. This should be done any other way, but for
      // now we will just return the first hello seen, which should suffice.
      /**
       * @returns `{}` when no servers are known; otherwise the first server description
       * whose type is not Unknown, or `{ maxWireVersion }` taken from the topology's
       * common wire version when every server is still Unknown
       */
      lastHello() {
          const serverDescriptions = Array.from(this.description.servers.values());
          if (serverDescriptions.length === 0) {
              return {};
          }
          const firstKnown = serverDescriptions.find((sd) => sd.type !== common_1.ServerType.Unknown);
          return firstKnown || { maxWireVersion: this.description.commonWireVersion };
      }
      /** @returns the `commonWireVersion` of the current topology description */
      get commonWireVersion() {
          return this.description.commonWireVersion;
      }
      /** @returns the `logicalSessionTimeoutMinutes` of the current topology description */
      get logicalSessionTimeoutMinutes() {
          return this.description.logicalSessionTimeoutMinutes;
      }
      /** @returns the cluster time currently stored on this topology */
      get clusterTime() {
          return this.s.clusterTime;
      }
      set clusterTime(clusterTime) {
          this.s.clusterTime = clusterTime;
      }
  }
  /** @event */
  Topology.SERVER_OPENING = constants_1.SERVER_OPENING;
  /** @event */
  Topology.SERVER_CLOSED = constants_1.SERVER_CLOSED;
  /** @event */
  Topology.SERVER_DESCRIPTION_CHANGED = constants_1.SERVER_DESCRIPTION_CHANGED;
  /** @event */
  Topology.TOPOLOGY_OPENING = constants_1.TOPOLOGY_OPENING;
  /** @event */
  Topology.TOPOLOGY_CLOSED = constants_1.TOPOLOGY_CLOSED;
  /** @event */
  Topology.TOPOLOGY_DESCRIPTION_CHANGED = constants_1.TOPOLOGY_DESCRIPTION_CHANGED;
  /** @event */
  Topology.ERROR = constants_1.ERROR;
  /** @event */
  Topology.OPEN = constants_1.OPEN;
  /** @event */
  Topology.CONNECT = constants_1.CONNECT;
  /** @event */
  Topology.CLOSE = constants_1.CLOSE;
  /** @event */
  Topology.TIMEOUT = constants_1.TIMEOUT;
  exports.Topology = Topology;
  /**
   * Destroys a server and strips its event listeners.
   *
   * Listeners for the local server events are removed before destruction starts;
   * listeners for the relayed events are removed only after the server reports
   * that it is destroyed and SERVER_CLOSED has been emitted on the topology.
   *
   * @param server - the server instance to destroy
   * @param topology - the topology that emits the SERVER_CLOSED event
   * @param options - forwarded to `server.destroy`; `{ force: false }` when nullish
   * @param callback - optional, invoked without arguments once everything is done
   */
  function destroyServer(server, topology, options, callback) {
      const destroyOptions = options ?? { force: false };
      const detachAll = (eventNames) => {
          for (const eventName of eventNames) {
              server.removeAllListeners(eventName);
          }
      };
      detachAll(constants_1.LOCAL_SERVER_EVENTS);
      server.destroy(destroyOptions, () => {
          const closedEvent = new events_1.ServerClosedEvent(topology.s.id, server.description.address);
          topology.emit(Topology.SERVER_CLOSED, closedEvent);
          detachAll(constants_1.SERVER_RELAY_EVENTS);
          if (typeof callback !== 'function') {
              return;
          }
          callback();
      });
  }
  /**
   * Predicts the initial TopologyType from the connection options.
   *
   * The options are checked in a fixed order and the first truthy one wins:
   * `directConnection`, then `replicaSet`, then `loadBalanced`.
   *
   * @param options - topology options; may be null or undefined
   * @returns the matching TopologyType, or `TopologyType.Unknown` when none is set
   */
  function topologyTypeFromOptions(options) {
      const precedence = [
          ['directConnection', common_1.TopologyType.Single],
          ['replicaSet', common_1.TopologyType.ReplicaSetNoPrimary],
          ['loadBalanced', common_1.TopologyType.LoadBalanced]
      ];
      for (const [optionName, topologyType] of precedence) {
          if (options?.[optionName]) {
              return topologyType;
          }
      }
      return common_1.TopologyType.Unknown;
  }
  /**
   * Builds a server instance for a description, wires its events into the
   * topology and starts connecting it.
   *
   * SERVER_OPENING is emitted on the topology before the server is constructed.
   * Every event in SERVER_RELAY_EVENTS is re-emitted on the topology, and each
   * received description is passed to `topology.serverUpdateHandler`.
   *
   * @param topology - The topology that this server belongs to
   * @param serverDescription - The description for the server to initialize and connect to
   * @returns the newly created server, after `connect()` has been called on it
   */
  function createAndConnectServer(topology, serverDescription) {
      const openingEvent = new events_1.ServerOpeningEvent(topology.s.id, serverDescription.address);
      topology.emit(Topology.SERVER_OPENING, openingEvent);
      const created = new server_1.Server(topology, serverDescription, topology.s.options);
      for (const relayedEvent of constants_1.SERVER_RELAY_EVENTS) {
          const relay = (payload) => topology.emit(relayedEvent, payload);
          created.on(relayedEvent, relay);
      }
      const forwardDescription = (received) => topology.serverUpdateHandler(received);
      created.on(server_1.Server.DESCRIPTION_RECEIVED, forwardDescription);
      created.connect();
      return created;
  }
  /**
   * Reconciles the topology's server instances with its current description.
   *
   * 1. When an incoming description belongs to a known server, stores it on that
   *    server and either clears the pool (error labelled ResetPool) or, when there
   *    is no error, marks the pool ready for data-bearing servers and for any
   *    non-Unknown server in a Single topology.
   * 2. Creates and connects a server for every described address without an instance.
   * 3. Removes and destroys every server instance whose address is no longer described.
   *
   * @param topology - Topology to update.
   * @param incomingServerDescription - New server description (optional).
   */
  function updateServers(topology, incomingServerDescription) {
      const instances = topology.s.servers;
      // update the internal server's description
      const target = incomingServerDescription && instances.has(incomingServerDescription.address)
          ? instances.get(incomingServerDescription.address)
          : undefined;
      if (target) {
          target.s.description = incomingServerDescription;
          const { error } = incomingServerDescription;
          if (error instanceof error_1.MongoError && error.hasErrorLabel(error_1.MongoErrorLabel.ResetPool)) {
              target.pool.clear({
                  interruptInUseConnections: error.hasErrorLabel(error_1.MongoErrorLabel.InterruptInUseConnections)
              });
          }
          else if (error == null) {
              const inSingleTopology = topology.s.description.type === common_1.TopologyType.Single;
              const poolIsUsable = incomingServerDescription.isDataBearing ||
                  (incomingServerDescription.type !== common_1.ServerType.Unknown && inSingleTopology);
              if (poolIsUsable) {
                  target.pool.ready();
              }
          }
      }
      // add new servers for all descriptions we currently don't know about locally
      for (const described of topology.description.servers.values()) {
          if (instances.has(described.address)) {
              continue;
          }
          instances.set(described.address, createAndConnectServer(topology, described));
      }
      // for all servers no longer known, remove their descriptions and destroy their instances
      for (const [address, instance] of instances) {
          if (topology.description.hasServer(address)) {
              continue;
          }
          instances.delete(address);
          // prepare server for garbage collection
          if (instance) {
              destroyServer(instance, topology);
          }
      }
  }
  /**
   * Empties a server-selection wait queue, failing every pending member.
   *
   * Each member's timeout is cleared; members already flagged as cancelled are
   * dropped without invoking their callback.
   *
   * @param queue - the wait queue; must expose `length` and `shift()`
   * @param err - the error handed to each remaining member's callback
   */
  function drainWaitQueue(queue, err) {
      while (queue.length > 0) {
          const pending = queue.shift();
          if (pending) {
              pending.timeoutController.clear();
              const alreadyNotified = pending[kCancelled];
              if (!alreadyNotified) {
                  pending.callback(err);
              }
          }
      }
  }
  /**
   * Attempts to satisfy every pending server selection with the current topology
   * description.
   *
   * - A closed topology fails all members with MongoTopologyClosedError.
   * - Cancelled (timed out) members are dropped.
   * - A selector that throws fails that member only.
   * - Members whose selector matches nothing are pushed back onto the queue.
   * - With several matches, two are sampled and the one with the lower
   *   operation count is used.
   * - If members remain queued afterwards, every server is asked for a check on
   *   the next tick.
   *
   * @param topology - the topology whose wait queue should be processed
   */
  function processWaitQueue(topology) {
      if (topology.s.state === common_1.STATE_CLOSED) {
          drainWaitQueue(topology[kWaitQueue], new error_1.MongoTopologyClosedError());
          return;
      }
      const isSharded = topology.description.type === common_1.TopologyType.Sharded;
      const serverDescriptions = Array.from(topology.description.servers.values());
      // snapshot the length: members that cannot be served yet are re-queued below
      // and must not be visited again during this pass
      const membersToProcess = topology[kWaitQueue].length;
      for (let i = 0; i < membersToProcess; ++i) {
          const waitQueueMember = topology[kWaitQueue].shift();
          if (!waitQueueMember) {
              continue;
          }
          if (waitQueueMember[kCancelled]) {
              continue;
          }
          let selectedDescriptions;
          try {
              const serverSelector = waitQueueMember.serverSelector;
              selectedDescriptions = serverSelector
                  ? serverSelector(topology.description, serverDescriptions)
                  : serverDescriptions;
          }
          catch (e) {
              waitQueueMember.timeoutController.clear();
              waitQueueMember.callback(e);
              continue;
          }
          let selectedServer;
          if (selectedDescriptions.length === 0) {
              topology[kWaitQueue].push(waitQueueMember);
              continue;
          }
          else if (selectedDescriptions.length === 1) {
              selectedServer = topology.s.servers.get(selectedDescriptions[0].address);
          }
          else {
              const descriptions = (0, utils_1.shuffle)(selectedDescriptions, 2);
              const server1 = topology.s.servers.get(descriptions[0].address);
              const server2 = topology.s.servers.get(descriptions[1].address);
              selectedServer =
                  server1 && server2 && server1.s.operationCount < server2.s.operationCount
                      ? server1
                      : server2;
          }
          if (!selectedServer) {
              // Stop the selection timer first: the member has already left the queue,
              // so a later timeout would otherwise invoke its callback a second time.
              waitQueueMember.timeoutController.clear();
              waitQueueMember.callback(new error_1.MongoServerSelectionError('server selection returned a server description but the server was not found in the topology', topology.description));
              return;
          }
          const transaction = waitQueueMember.transaction;
          if (isSharded && transaction && transaction.isActive) {
              transaction.pinServer(selectedServer);
          }
          waitQueueMember.timeoutController.clear();
          waitQueueMember.callback(undefined, selectedServer);
      }
      if (topology[kWaitQueue].length > 0) {
          // ensure all server monitors attempt monitoring soon
          for (const [, server] of topology.s.servers) {
              process.nextTick(function scheduleServerCheck() {
                  return server.requestCheck();
              });
          }
      }
  }
  /**
   * Tells whether an incoming server description is older than the one already
   * stored for the same address.
   *
   * @param topologyDescription - the current topology description
   * @param incomingServerDescription - the description that just arrived
   * @returns true when `compareTopologyVersion(stored, incoming)` is greater than zero
   */
  function isStaleServerDescription(topologyDescription, incomingServerDescription) {
      const { address, topologyVersion: incomingVersion } = incomingServerDescription;
      // undefined when the address is not (yet) part of the topology description
      const storedVersion = topologyDescription.servers.get(address)?.topologyVersion;
      const ordering = (0, server_description_1.compareTopologyVersion)(storedVersion, incomingVersion);
      return ordering > 0;
  }
  /**
   * Feature flags derived from the wire version range reported in a hello
   * response. Every getter compares the stored wire versions against a fixed
   * threshold each time it is read.
   * @public
   */
  class ServerCapabilities {
      /**
       * @param hello - object providing `minWireVersion` / `maxWireVersion`;
       * a missing or falsy value is stored as 0
       */
      constructor(hello) {
          const { minWireVersion, maxWireVersion } = hello;
          this.minWireVersion = minWireVersion || 0;
          this.maxWireVersion = maxWireVersion || 0;
      }
      /** True when maxWireVersion is at least 1. */
      get hasAggregationCursor() {
          return this.maxWireVersion >= 1;
      }
      /** True when maxWireVersion is at least 2. */
      get hasWriteCommands() {
          return this.maxWireVersion >= 2;
      }
      /** True when minWireVersion is at least 0 (this is the only getter keyed on the minimum). */
      get hasTextSearch() {
          return this.minWireVersion >= 0;
      }
      /** True when maxWireVersion is at least 1. */
      get hasAuthCommands() {
          return this.maxWireVersion >= 1;
      }
      /** True when maxWireVersion is at least 3. */
      get hasListCollectionsCommand() {
          return this.maxWireVersion >= 3;
      }
      /** True when maxWireVersion is at least 3. */
      get hasListIndexesCommand() {
          return this.maxWireVersion >= 3;
      }
      /** True when maxWireVersion is at least 13. */
      get supportsSnapshotReads() {
          return this.maxWireVersion >= 13;
      }
      /** True when maxWireVersion is at least 5. */
      get commandsTakeWriteConcern() {
          return this.maxWireVersion >= 5;
      }
      /** True when maxWireVersion is at least 5. */
      get commandsTakeCollation() {
          return this.maxWireVersion >= 5;
      }
  }
  611. exports.ServerCapabilities = ServerCapabilities;
  612. //# sourceMappingURL=topology.js.map