// topology.js (27 KB)
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.ServerCapabilities = exports.Topology = void 0;
  4. const timers_1 = require("timers");
  5. const util_1 = require("util");
  6. const connection_string_1 = require("../connection_string");
  7. const constants_1 = require("../constants");
  8. const error_1 = require("../error");
  9. const mongo_types_1 = require("../mongo_types");
  10. const read_preference_1 = require("../read_preference");
  11. const utils_1 = require("../utils");
  12. const common_1 = require("./common");
  13. const events_1 = require("./events");
  14. const server_1 = require("./server");
  15. const server_description_1 = require("./server_description");
  16. const server_selection_1 = require("./server_selection");
  17. const srv_polling_1 = require("./srv_polling");
  18. const topology_description_1 = require("./topology_description");
  // Module-level state shared by every Topology created in this process.
  // Incremented each time a Topology is constructed; the pre-increment value becomes its id.
  let globalTopologyCounter = 0;
  // Lifecycle state machine: each key is a current state, each value lists the
  // states that may be entered from it.
  const stateTransition = (0, utils_1.makeStateMachine)({
      [common_1.STATE_CLOSED]: [
          common_1.STATE_CLOSED,
          common_1.STATE_CONNECTING
      ],
      [common_1.STATE_CONNECTING]: [
          common_1.STATE_CONNECTING,
          common_1.STATE_CLOSING,
          common_1.STATE_CONNECTED,
          common_1.STATE_CLOSED
      ],
      [common_1.STATE_CONNECTED]: [
          common_1.STATE_CONNECTED,
          common_1.STATE_CLOSING,
          common_1.STATE_CLOSED
      ],
      [common_1.STATE_CLOSING]: [
          common_1.STATE_CLOSING,
          common_1.STATE_CLOSED
      ]
  });
  /**
   * Flag set on a wait queue member once its server selection timer has fired.
   * @internal
   */
  const kCancelled = Symbol('cancelled');
  /**
   * Key under which a Topology stores its list of pending server selections.
   * @internal
   */
  const kWaitQueue = Symbol('waitQueue');
  /**
   * A container of server instances representing a connection to a MongoDB topology.
   * @internal
   */
  class Topology extends mongo_types_1.TypedEventEmitter {
      /**
       * @param client - the client that owns this topology; kept on `this.client`
       * @param seeds - a host string, a `HostAddress`, or an array of either
       * @param options - topology options; when nullish (expected only in tests) a
       *   `localhost:27017` host plus the default connection-string options and
       *   feature flags are used instead
       * @throws MongoRuntimeError when a seed is neither a string nor a `HostAddress`
       */
      constructor(client, seeds, options) {
          super();
          this.client = client;
          // promise-returning flavour of `selectServer`
          this.selectServerAsync = (0, util_1.promisify)((selector, options, callback) => this.selectServer(selector, options, callback));
          // Options should only be undefined in tests, MongoClient will always have defined options
          options = options ?? {
              hosts: [utils_1.HostAddress.fromString('localhost:27017')],
              ...Object.fromEntries(connection_string_1.DEFAULT_OPTIONS.entries()),
              ...Object.fromEntries(connection_string_1.FEATURE_FLAGS.entries())
          };
          // normalize `seeds` to an array before validating each entry
          if (typeof seeds === 'string') {
              seeds = [utils_1.HostAddress.fromString(seeds)];
          }
          else if (!Array.isArray(seeds)) {
              seeds = [seeds];
          }
          const seedlist = [];
          for (const seed of seeds) {
              if (typeof seed === 'string') {
                  seedlist.push(utils_1.HostAddress.fromString(seed));
              }
              else if (seed instanceof utils_1.HostAddress) {
                  seedlist.push(seed);
              }
              else {
                  // FIXME(NODE-3483): May need to be a MongoParseError
                  throw new error_1.MongoRuntimeError(`Topology cannot be constructed from ${JSON.stringify(seed)}`);
              }
          }
          const topologyType = topologyTypeFromOptions(options);
          const topologyId = globalTopologyCounter++;
          // honour srvMaxHosts: only when it is a positive number smaller than the
          // seedlist do we pick a random subset of that size
          const selectedHosts = options.srvMaxHosts == null ||
              options.srvMaxHosts === 0 ||
              options.srvMaxHosts >= seedlist.length
              ? seedlist
              : (0, utils_1.shuffle)(seedlist, options.srvMaxHosts);
          const serverDescriptions = new Map();
          for (const hostAddress of selectedHosts) {
              serverDescriptions.set(hostAddress.toString(), new server_description_1.ServerDescription(hostAddress));
          }
          this[kWaitQueue] = new utils_1.List();
          this.s = {
              // the id of this topology
              id: topologyId,
              // passed in options
              options,
              // initial seedlist of servers to connect to
              seedlist,
              // initial state
              state: common_1.STATE_CLOSED,
              // the topology description
              description: new topology_description_1.TopologyDescription(topologyType, serverDescriptions, options.replicaSet, undefined, undefined, undefined, options),
              serverSelectionTimeoutMS: options.serverSelectionTimeoutMS,
              heartbeatFrequencyMS: options.heartbeatFrequencyMS,
              minHeartbeatFrequencyMS: options.minHeartbeatFrequencyMS,
              // a map of server instances to normalized addresses
              servers: new Map(),
              credentials: options.credentials,
              clusterTime: undefined,
              // timer management
              connectionTimers: new Set(),
              // bound once so the very same function can later be removed as a listener
              detectShardedTopology: ev => this.detectShardedTopology(ev),
              detectSrvRecords: ev => this.detectSrvRecords(ev)
          };
          if (options.srvHost && !options.loadBalanced) {
              this.s.srvPoller =
                  options.srvPoller ??
                      new srv_polling_1.SrvPoller({
                          heartbeatFrequencyMS: this.s.heartbeatFrequencyMS,
                          srvHost: options.srvHost,
                          srvMaxHosts: options.srvMaxHosts,
                          srvServiceName: options.srvServiceName
                      });
              this.on(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectShardedTopology);
          }
      }
      /**
       * Listener for topology description changes: when the type becomes Sharded and
       * SRV record discovery is not being listened to yet, subscribes to the SRV
       * poller and starts it.
       *
       * @param event - event carrying `previousDescription` and `newDescription`
       */
      detectShardedTopology(event) {
          const previousType = event.previousDescription.type;
          const newType = event.newDescription.type;
          const transitionToSharded = previousType !== common_1.TopologyType.Sharded && newType === common_1.TopologyType.Sharded;
          const srvListeners = this.s.srvPoller?.listeners(srv_polling_1.SrvPoller.SRV_RECORD_DISCOVERY);
          const listeningToSrvPolling = !!srvListeners?.includes(this.s.detectSrvRecords);
          if (transitionToSharded && !listeningToSrvPolling) {
              this.s.srvPoller?.on(srv_polling_1.SrvPoller.SRV_RECORD_DISCOVERY, this.s.detectSrvRecords);
              this.s.srvPoller?.start();
          }
      }
      /**
       * Applies an SRV polling event to the topology description. When the
       * description object changes, the server list is reconciled and a
       * TOPOLOGY_DESCRIPTION_CHANGED event is emitted.
       *
       * @param ev - the SRV polling event
       */
      detectSrvRecords(ev) {
          const previousTopologyDescription = this.s.description;
          this.s.description = this.s.description.updateFromSrvPollingEvent(ev, this.s.options.srvMaxHosts);
          if (this.s.description === previousTopologyDescription) {
              // Nothing changed, so return
              return;
          }
          updateServers(this);
          this.emit(Topology.TOPOLOGY_DESCRIPTION_CHANGED, new events_1.TopologyDescriptionChangedEvent(this.s.id, previousTopologyDescription, this.s.description));
      }
      /**
       * @returns A `TopologyDescription` for this topology
       */
      get description() {
          return this.s.description;
      }
      /** @returns the `loadBalanced` option this topology was created with */
      get loadBalanced() {
          return this.s.options.loadBalanced;
      }
      /** @returns a new `ServerCapabilities` built from `lastHello()` */
      get capabilities() {
          return new ServerCapabilities(this.lastHello());
      }
      /**
       * Creates and connects a server for every known description, then runs a
       * server selection to decide whether the topology counts as connected.
       *
       * @param options - optional settings; `readPreference` picks the selector
       *   (primary when absent). May be omitted, with the callback passed first.
       * @param callback - called with an error, or with `(undefined, topology)` on
       *   success. Without a callback, errors are emitted as `Topology.ERROR`.
       */
      connect(options, callback) {
          if (typeof options === 'function')
              (callback = options), (options = {});
          options = options ?? {};
          if (this.s.state === common_1.STATE_CONNECTED) {
              if (typeof callback === 'function') {
                  callback();
              }
              return;
          }
          stateTransition(this, common_1.STATE_CONNECTING);
          // emit SDAM monitoring events
          this.emit(Topology.TOPOLOGY_OPENING, new events_1.TopologyOpeningEvent(this.s.id));
          // emit an event for the topology change
          this.emit(Topology.TOPOLOGY_DESCRIPTION_CHANGED, new events_1.TopologyDescriptionChangedEvent(this.s.id, new topology_description_1.TopologyDescription(common_1.TopologyType.Unknown), // initial is always Unknown
          this.s.description));
          // connect all known servers, then attempt server selection to connect
          const serverDescriptions = Array.from(this.s.description.servers.values());
          this.s.servers = new Map(serverDescriptions.map(serverDescription => [
              serverDescription.address,
              createAndConnectServer(this, serverDescription)
          ]));
          // In load balancer mode we need to fake a server description getting
          // emitted from the monitor, since the monitor doesn't exist.
          if (this.s.options.loadBalanced) {
              for (const description of serverDescriptions) {
                  const newDescription = new server_description_1.ServerDescription(description.hostAddress, undefined, {
                      loadBalanced: this.s.options.loadBalanced
                  });
                  this.serverUpdateHandler(newDescription);
              }
          }
          const exitWithError = (error) => callback ? callback(error) : this.emit(Topology.ERROR, error);
          // shared success path: flip the state, announce it, then hand back the topology
          const markConnected = () => {
              stateTransition(this, common_1.STATE_CONNECTED);
              this.emit(Topology.OPEN, this);
              this.emit(Topology.CONNECT, this);
              callback?.(undefined, this);
          };
          const readPreference = options.readPreference ?? read_preference_1.ReadPreference.primary;
          this.selectServer((0, server_selection_1.readPreferenceServerSelector)(readPreference), options, (err, server) => {
              if (err) {
                  return this.close({ force: false }, () => exitWithError(err));
              }
              // TODO: NODE-2471
              const skipPingOnConnect = this.s.options[Symbol.for('@@mdb.skipPingOnConnect')] === true;
              if (!skipPingOnConnect && server && this.s.credentials) {
                  // with credentials configured, a ping must succeed before we report connected
                  server.command((0, utils_1.ns)('admin.$cmd'), { ping: 1 }, {}, pingError => {
                      if (pingError) {
                          return exitWithError(pingError);
                      }
                      markConnected();
                  });
                  return;
              }
              markConnected();
          });
      }
      /**
       * Destroys every server, fails pending server selections with a
       * `MongoTopologyClosedError`, stops SRV polling and emits TOPOLOGY_CLOSED.
       * Does nothing except invoke the callback when already closed or closing.
       *
       * @param options - `{ force }` is forwarded to each server's destruction; defaults to `{ force: false }`
       * @param callback - invoked once closing has finished
       */
      close(options, callback) {
          options = options ?? { force: false };
          if (this.s.state === common_1.STATE_CLOSED || this.s.state === common_1.STATE_CLOSING) {
              return callback?.();
          }
          const destroyedServers = Array.from(this.s.servers.values(), server => {
              return (0, util_1.promisify)(destroyServer)(server, this, { force: !!options?.force });
          });
          Promise.all(destroyedServers)
              .then(() => {
              this.s.servers.clear();
              stateTransition(this, common_1.STATE_CLOSING);
              drainWaitQueue(this[kWaitQueue], new error_1.MongoTopologyClosedError());
              (0, common_1.drainTimerQueue)(this.s.connectionTimers);
              if (this.s.srvPoller) {
                  this.s.srvPoller.stop();
                  this.s.srvPoller.removeListener(srv_polling_1.SrvPoller.SRV_RECORD_DISCOVERY, this.s.detectSrvRecords);
              }
              this.removeListener(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectShardedTopology);
              stateTransition(this, common_1.STATE_CLOSED);
              // emit an event for close
              this.emit(Topology.TOPOLOGY_CLOSED, new events_1.TopologyClosedEvent(this.s.id));
          })
              .finally(() => callback?.());
      }
      /**
       * Selects a server according to the selection predicate provided.
       * The outcome is delivered through `callback`; nothing is returned.
       *
       * @param selector - a selector function, a read preference mode string, or a
       *   `ReadPreference`; anything else falls back to `options.readPreference`
       *   (primary when that is absent too)
       * @param options - Optional settings related to server selection
       *   (`serverSelectionTimeoutMS`, `session`, `readPreference`)
       * @param callback - called with `(error)` or `(undefined, server)`
       */
      selectServer(selector, options, callback) {
          let serverSelector;
          if (typeof selector !== 'function') {
              if (typeof selector === 'string') {
                  serverSelector = (0, server_selection_1.readPreferenceServerSelector)(read_preference_1.ReadPreference.fromString(selector));
              }
              else {
                  let readPreference;
                  if (selector instanceof read_preference_1.ReadPreference) {
                      readPreference = selector;
                  }
                  else {
                      // `options` is optional: only translate and read it when it was supplied,
                      // otherwise this path used to throw a TypeError
                      if (options != null) {
                          read_preference_1.ReadPreference.translate(options);
                      }
                      readPreference = options?.readPreference || read_preference_1.ReadPreference.primary;
                  }
                  serverSelector = (0, server_selection_1.readPreferenceServerSelector)(readPreference);
              }
          }
          else {
              serverSelector = selector;
          }
          // caller-supplied options win over the topology-wide selection timeout
          options = Object.assign({}, { serverSelectionTimeoutMS: this.s.serverSelectionTimeoutMS }, options);
          const isSharded = this.description.type === common_1.TopologyType.Sharded;
          const session = options.session;
          const transaction = session && session.transaction;
          // a sharded transaction that already has a server keeps using it
          if (isSharded && transaction && transaction.server) {
              callback(undefined, transaction.server);
              return;
          }
          const waitQueueMember = {
              serverSelector,
              transaction,
              callback
          };
          const serverSelectionTimeoutMS = options.serverSelectionTimeoutMS;
          if (serverSelectionTimeoutMS) {
              waitQueueMember.timer = (0, timers_1.setTimeout)(() => {
                  // mark as cancelled so queue processing skips this member from now on
                  waitQueueMember[kCancelled] = true;
                  waitQueueMember.timer = undefined;
                  const timeoutError = new error_1.MongoServerSelectionError(`Server selection timed out after ${serverSelectionTimeoutMS} ms`, this.description);
                  waitQueueMember.callback(timeoutError);
              }, serverSelectionTimeoutMS);
          }
          this[kWaitQueue].push(waitQueueMember);
          processWaitQueue(this);
      }
      /**
       * Update the internal TopologyDescription with a ServerDescription
       *
       * @param serverDescription - The server to update in the internal list of server descriptions
       */
      serverUpdateHandler(serverDescription) {
          if (!this.s.description.hasServer(serverDescription.address)) {
              return;
          }
          // ignore this server update if its from an outdated topologyVersion
          if (isStaleServerDescription(this.s.description, serverDescription)) {
              return;
          }
          // these will be used for monitoring events later
          const previousTopologyDescription = this.s.description;
          const previousServerDescription = this.s.description.servers.get(serverDescription.address);
          if (!previousServerDescription) {
              return;
          }
          // Driver Sessions Spec: "Whenever a driver receives a cluster time from
          // a server it MUST compare it to the current highest seen cluster time
          // for the deployment. If the new cluster time is higher than the
          // highest seen cluster time it MUST become the new highest seen cluster
          // time. Two cluster times are compared using only the BsonTimestamp
          // value of the clusterTime embedded field."
          const clusterTime = serverDescription.$clusterTime;
          if (clusterTime) {
              (0, common_1._advanceClusterTime)(this, clusterTime);
          }
          // If we already know all the information contained in this updated description, then
          // we don't need to emit SDAM events, but still need to update the description, in order
          // to keep client-tracked attributes like last update time and round trip time up to date
          const equalDescriptions = previousServerDescription.equals(serverDescription);
          // first update the TopologyDescription
          this.s.description = this.s.description.update(serverDescription);
          if (this.s.description.compatibilityError) {
              this.emit(Topology.ERROR, new error_1.MongoCompatibilityError(this.s.description.compatibilityError));
              return;
          }
          // emit monitoring events for this change
          if (!equalDescriptions) {
              const newDescription = this.s.description.servers.get(serverDescription.address);
              if (newDescription) {
                  this.emit(Topology.SERVER_DESCRIPTION_CHANGED, new events_1.ServerDescriptionChangedEvent(this.s.id, serverDescription.address, previousServerDescription, newDescription));
              }
          }
          // update server list from updated descriptions
          updateServers(this, serverDescription);
          // attempt to resolve any outstanding server selection attempts
          if (this[kWaitQueue].length > 0) {
              processWaitQueue(this);
          }
          if (!equalDescriptions) {
              this.emit(Topology.TOPOLOGY_DESCRIPTION_CHANGED, new events_1.TopologyDescriptionChangedEvent(this.s.id, previousTopologyDescription, this.s.description));
          }
      }
      /**
       * Performs no authentication work; only reports success to the callback.
       *
       * @param credentials - ignored; may be omitted, with the callback passed first
       * @param callback - called with `(undefined, true)` when it is a function
       */
      auth(credentials, callback) {
          if (typeof credentials === 'function')
              (callback = credentials), (credentials = undefined);
          if (typeof callback === 'function')
              callback(undefined, true);
      }
      /** @returns the `metadata` option this topology was created with */
      get clientMetadata() {
          return this.s.options.metadata;
      }
      /** @returns true while the topology state is CONNECTED */
      isConnected() {
          return this.s.state === common_1.STATE_CONNECTED;
      }
      /** @returns true while the topology state is CLOSED */
      isDestroyed() {
          return this.s.state === common_1.STATE_CLOSED;
      }
      // NOTE: There are many places in code where we explicitly check the last hello
      // to do feature support detection. This should be done any other way, but for
      // now we will just return the first hello seen, which should suffice.
      /**
       * @returns `{}` when no servers are known; otherwise the first server
       *   description whose type is not Unknown, or, failing that, an object
       *   holding only `maxWireVersion` (the topology's common wire version)
       */
      lastHello() {
          const serverDescriptions = Array.from(this.description.servers.values());
          if (serverDescriptions.length === 0)
              return {};
          const firstKnown = serverDescriptions.find((sd) => sd.type !== common_1.ServerType.Unknown);
          return firstKnown || { maxWireVersion: this.description.commonWireVersion };
      }
      /** @returns the `commonWireVersion` of the current topology description */
      get commonWireVersion() {
          return this.description.commonWireVersion;
      }
      /** @returns the `logicalSessionTimeoutMinutes` of the current topology description */
      get logicalSessionTimeoutMinutes() {
          return this.description.logicalSessionTimeoutMinutes;
      }
      /** The cluster time currently stored for this topology. */
      get clusterTime() {
          return this.s.clusterTime;
      }
      set clusterTime(clusterTime) {
          this.s.clusterTime = clusterTime;
      }
  }
  /** @event */
  Topology.SERVER_OPENING = constants_1.SERVER_OPENING;
  /** @event */
  Topology.SERVER_CLOSED = constants_1.SERVER_CLOSED;
  /** @event */
  Topology.SERVER_DESCRIPTION_CHANGED = constants_1.SERVER_DESCRIPTION_CHANGED;
  /** @event */
  Topology.TOPOLOGY_OPENING = constants_1.TOPOLOGY_OPENING;
  /** @event */
  Topology.TOPOLOGY_CLOSED = constants_1.TOPOLOGY_CLOSED;
  /** @event */
  Topology.TOPOLOGY_DESCRIPTION_CHANGED = constants_1.TOPOLOGY_DESCRIPTION_CHANGED;
  /** @event */
  Topology.ERROR = constants_1.ERROR;
  /** @event */
  Topology.OPEN = constants_1.OPEN;
  /** @event */
  Topology.CONNECT = constants_1.CONNECT;
  /** @event */
  Topology.CLOSE = constants_1.CLOSE;
  /** @event */
  Topology.TIMEOUT = constants_1.TIMEOUT;
  exports.Topology = Topology;
  /**
   * Tears down one server: drops its local listeners right away, destroys it,
   * and once destruction completes announces SERVER_CLOSED on the topology and
   * drops the relayed-event listeners as well.
   *
   * @param server - the server to destroy
   * @param topology - the topology on which SERVER_CLOSED is emitted
   * @param options - passed to `server.destroy`; `{ force: false }` when nullish
   * @param callback - optional; called with no arguments after destruction
   */
  function destroyServer(server, topology, options, callback) {
      const detach = events => {
          for (const eventName of events) {
              server.removeAllListeners(eventName);
          }
      };
      const destroyOptions = options ?? { force: false };
      detach(constants_1.LOCAL_SERVER_EVENTS);
      server.destroy(destroyOptions, () => {
          const closedEvent = new events_1.ServerClosedEvent(topology.s.id, server.description.address);
          topology.emit(Topology.SERVER_CLOSED, closedEvent);
          // relay listeners go last so nothing forwarded during destruction is lost
          detach(constants_1.SERVER_RELAY_EVENTS);
          if (typeof callback !== 'function') {
              return;
          }
          callback();
      });
  }
  /**
   * Derives the initial TopologyType from the options, checking in order:
   * `directConnection` (Single), `replicaSet` (ReplicaSetNoPrimary),
   * `loadBalanced` (LoadBalanced). Anything else, including nullish options,
   * yields Unknown.
   */
  function topologyTypeFromOptions(options) {
      const kinds = common_1.TopologyType;
      if (options == null) {
          return kinds.Unknown;
      }
      return options.directConnection
          ? kinds.Single
          : options.replicaSet
              ? kinds.ReplicaSetNoPrimary
              : options.loadBalanced
                  ? kinds.LoadBalanced
                  : kinds.Unknown;
  }
  /**
   * Builds a Server for the given description, wires its events into the
   * topology and starts connecting it.
   *
   * @param topology - The topology that owns the new server and receives its events
   * @param serverDescription - The description of the server to create and connect
   * @returns the newly created server
   */
  function createAndConnectServer(topology, serverDescription) {
      const openingEvent = new events_1.ServerOpeningEvent(topology.s.id, serverDescription.address);
      topology.emit(Topology.SERVER_OPENING, openingEvent);
      const created = new server_1.Server(topology, serverDescription, topology.s.options);
      // re-emit every relayed server event on the topology under the same name
      for (const relayedEvent of constants_1.SERVER_RELAY_EVENTS) {
          const forward = (payload) => topology.emit(relayedEvent, payload);
          created.on(relayedEvent, forward);
      }
      // fresh descriptions from the server feed the topology's update handler
      const onDescription = description => topology.serverUpdateHandler(description);
      created.on(server_1.Server.DESCRIPTION_RECEIVED, onDescription);
      created.connect();
      return created;
  }
  /**
   * Reconciles the topology's Server instances with its current description:
   * refreshes the server the incoming description belongs to, creates servers for
   * newly described addresses, and destroys servers that are no longer described.
   *
   * @param topology - Topology to update.
   * @param incomingServerDescription - New server description (optional).
   */
  function updateServers(topology, incomingServerDescription) {
      // step 1: push the incoming description onto the matching tracked server
      const tracked = incomingServerDescription
          ? topology.s.servers.get(incomingServerDescription.address)
          : undefined;
      if (tracked) {
          tracked.s.description = incomingServerDescription;
          const failure = incomingServerDescription.error;
          const mustResetPool = failure instanceof error_1.MongoError &&
              failure.hasErrorLabel(error_1.MongoErrorLabel.ResetPool);
          if (mustResetPool) {
              tracked.pool.clear({
                  interruptInUseConnections: failure.hasErrorLabel(error_1.MongoErrorLabel.InterruptInUseConnections)
              });
          }
          else if (failure == null) {
              // the pool becomes ready for data-bearing servers, or for any known
              // server type when the topology is Single
              const topologyIsSingle = topology.s.description.type === common_1.TopologyType.Single;
              const poolUsable = incomingServerDescription.isDataBearing ||
                  (incomingServerDescription.type !== common_1.ServerType.Unknown && topologyIsSingle);
              if (poolUsable) {
                  tracked.pool.ready();
              }
          }
      }
      // step 2: create servers for described addresses we do not track yet
      for (const described of topology.description.servers.values()) {
          if (topology.s.servers.has(described.address)) {
              continue;
          }
          topology.s.servers.set(described.address, createAndConnectServer(topology, described));
      }
      // step 3: drop and destroy servers whose address is no longer described
      for (const [address, staleServer] of topology.s.servers) {
          if (topology.description.hasServer(address)) {
              continue;
          }
          topology.s.servers.delete(address);
          // prepare server for garbage collection
          if (staleServer) {
              destroyServer(staleServer, topology);
          }
      }
  }
  /**
   * Empties the wait queue, stopping each member's timer and failing every
   * member that has not already been cancelled.
   *
   * @param queue - the queue of pending server selections
   * @param err - the error handed to each remaining callback
   */
  function drainWaitQueue(queue, err) {
      while (queue.length > 0) {
          const pending = queue.shift();
          if (pending) {
              if (pending.timer) {
                  (0, timers_1.clearTimeout)(pending.timer);
              }
              // cancelled members were already called back when their timer fired
              const alreadyNotified = Boolean(pending[kCancelled]);
              if (!alreadyNotified) {
                  pending.callback(err);
              }
          }
      }
  }
  /**
   * Tries to satisfy every server selection currently waiting on the topology.
   *
   * Each member queued at the time of the call is examined once:
   * - cancelled members are dropped;
   * - a selector that throws fails the member with that error;
   * - a selector that matches nothing puts the member back on the queue;
   * - otherwise a server is picked (for several matches, two are sampled and the
   *   one with the lower operation count is preferred) and handed to the callback.
   *
   * If members are still waiting afterwards, every server is asked to run a check soon.
   *
   * @param topology - the topology whose wait queue is processed
   */
  function processWaitQueue(topology) {
      if (topology.s.state === common_1.STATE_CLOSED) {
          drainWaitQueue(topology[kWaitQueue], new error_1.MongoTopologyClosedError());
          return;
      }
      const isSharded = topology.description.type === common_1.TopologyType.Sharded;
      const serverDescriptions = Array.from(topology.description.servers.values());
      // snapshot the length: members re-queued below must not be revisited in this pass
      const membersToProcess = topology[kWaitQueue].length;
      for (let i = 0; i < membersToProcess; ++i) {
          const waitQueueMember = topology[kWaitQueue].shift();
          if (!waitQueueMember) {
              continue;
          }
          if (waitQueueMember[kCancelled]) {
              continue;
          }
          let selectedDescriptions;
          try {
              const serverSelector = waitQueueMember.serverSelector;
              selectedDescriptions = serverSelector
                  ? serverSelector(topology.description, serverDescriptions)
                  : serverDescriptions;
          }
          catch (e) {
              if (waitQueueMember.timer) {
                  (0, timers_1.clearTimeout)(waitQueueMember.timer);
              }
              waitQueueMember.callback(e);
              continue;
          }
          let selectedServer;
          if (selectedDescriptions.length === 0) {
              // nothing suitable yet: keep waiting for a later description update
              topology[kWaitQueue].push(waitQueueMember);
              continue;
          }
          else if (selectedDescriptions.length === 1) {
              selectedServer = topology.s.servers.get(selectedDescriptions[0].address);
          }
          else {
              const descriptions = (0, utils_1.shuffle)(selectedDescriptions, 2);
              const server1 = topology.s.servers.get(descriptions[0].address);
              const server2 = topology.s.servers.get(descriptions[1].address);
              selectedServer =
                  server1 && server2 && server1.s.operationCount < server2.s.operationCount
                      ? server1
                      : server2;
          }
          if (!selectedServer) {
              // Stop the selection timer before reporting the failure; otherwise it would
              // fire later and invoke the same callback a second time with a timeout error.
              if (waitQueueMember.timer) {
                  (0, timers_1.clearTimeout)(waitQueueMember.timer);
              }
              waitQueueMember.callback(new error_1.MongoServerSelectionError('server selection returned a server description but the server was not found in the topology', topology.description));
              return;
          }
          const transaction = waitQueueMember.transaction;
          if (isSharded && transaction && transaction.isActive && selectedServer) {
              transaction.pinServer(selectedServer);
          }
          if (waitQueueMember.timer) {
              (0, timers_1.clearTimeout)(waitQueueMember.timer);
          }
          waitQueueMember.callback(undefined, selectedServer);
      }
      if (topology[kWaitQueue].length > 0) {
          // ensure all server monitors attempt monitoring soon
          for (const [, server] of topology.s.servers) {
              process.nextTick(function scheduleServerCheck() {
                  return server.requestCheck();
              });
          }
      }
  }
  /**
   * Tells whether an incoming server description is older than the one the
   * topology already holds for the same address, judged by `compareTopologyVersion`.
   *
   * @param topologyDescription - the current topology description
   * @param incomingServerDescription - the description that just arrived
   * @returns true when the stored topologyVersion compares greater than the incoming one
   */
  function isStaleServerDescription(topologyDescription, incomingServerDescription) {
      const { address, topologyVersion: incomingVersion } = incomingServerDescription;
      const known = topologyDescription.servers.get(address);
      const ordering = (0, server_description_1.compareTopologyVersion)(known?.topologyVersion, incomingVersion);
      return ordering > 0;
  }
  /**
   * Feature flags derived from the wire version range reported in a hello response.
   * @public
   */
  class ServerCapabilities {
      /**
       * @param hello - object that may carry `minWireVersion` and `maxWireVersion`;
       *   missing or falsy values, and a nullish `hello`, are treated as 0
       */
      constructor(hello) {
          // optional chaining keeps a null/undefined hello from throwing
          this.minWireVersion = hello?.minWireVersion || 0;
          this.maxWireVersion = hello?.maxWireVersion || 0;
      }
      /** True when maxWireVersion is at least 1. */
      get hasAggregationCursor() {
          return this.maxWireVersion >= 1;
      }
      /** True when maxWireVersion is at least 2. */
      get hasWriteCommands() {
          return this.maxWireVersion >= 2;
      }
      /** True whenever minWireVersion is non-negative. */
      get hasTextSearch() {
          return this.minWireVersion >= 0;
      }
      /** True when maxWireVersion is at least 1. */
      get hasAuthCommands() {
          return this.maxWireVersion >= 1;
      }
      /** True when maxWireVersion is at least 3. */
      get hasListCollectionsCommand() {
          return this.maxWireVersion >= 3;
      }
      /** True when maxWireVersion is at least 3. */
      get hasListIndexesCommand() {
          return this.maxWireVersion >= 3;
      }
      /** True when maxWireVersion is at least 13. */
      get supportsSnapshotReads() {
          return this.maxWireVersion >= 13;
      }
      /** True when maxWireVersion is at least 5. */
      get commandsTakeWriteConcern() {
          return this.maxWireVersion >= 5;
      }
      /** True when maxWireVersion is at least 5. */
      get commandsTakeCollation() {
          return this.maxWireVersion >= 5;
      }
  }
  620. exports.ServerCapabilities = ServerCapabilities;
  621. //# sourceMappingURL=topology.js.map