server_selection.js 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.readPreferenceServerSelector = exports.secondaryWritableServerSelector = exports.sameServerSelector = exports.writableServerSelector = exports.MIN_SECONDARY_WRITE_WIRE_VERSION = void 0;
  4. const error_1 = require("../error");
  5. const read_preference_1 = require("../read_preference");
  6. const common_1 = require("./common");
  7. // max staleness constants
  8. const IDLE_WRITE_PERIOD = 10000;
  9. const SMALLEST_MAX_STALENESS_SECONDS = 90;
  10. // Minimum version to try writes on secondaries.
  11. exports.MIN_SECONDARY_WRITE_WIRE_VERSION = 13;
  12. /**
  13. * Returns a server selector that selects for writable servers
  14. */
  15. function writableServerSelector() {
  16. return (topologyDescription, servers) => latencyWindowReducer(topologyDescription, servers.filter((s) => s.isWritable));
  17. }
  18. exports.writableServerSelector = writableServerSelector;
  19. /**
  20. * The purpose of this selector is to select the same server, only
  21. * if it is in a state that it can have commands sent to it.
  22. */
  23. function sameServerSelector(description) {
  24. return (topologyDescription, servers) => {
  25. if (!description)
  26. return [];
  27. // Filter the servers to match the provided description only if
  28. // the type is not unknown.
  29. return servers.filter(sd => {
  30. return sd.address === description.address && sd.type !== common_1.ServerType.Unknown;
  31. });
  32. };
  33. }
  34. exports.sameServerSelector = sameServerSelector;
  35. /**
  36. * Returns a server selector that uses a read preference to select a
  37. * server potentially for a write on a secondary.
  38. */
  39. function secondaryWritableServerSelector(wireVersion, readPreference) {
  40. // If server version < 5.0, read preference always primary.
  41. // If server version >= 5.0...
  42. // - If read preference is supplied, use that.
  43. // - If no read preference is supplied, use primary.
  44. if (!readPreference ||
  45. !wireVersion ||
  46. (wireVersion && wireVersion < exports.MIN_SECONDARY_WRITE_WIRE_VERSION)) {
  47. return readPreferenceServerSelector(read_preference_1.ReadPreference.primary);
  48. }
  49. return readPreferenceServerSelector(readPreference);
  50. }
  51. exports.secondaryWritableServerSelector = secondaryWritableServerSelector;
  52. /**
  53. * Reduces the passed in array of servers by the rules of the "Max Staleness" specification
  54. * found here: https://github.com/mongodb/specifications/blob/master/source/max-staleness/max-staleness.rst
  55. *
  56. * @param readPreference - The read preference providing max staleness guidance
  57. * @param topologyDescription - The topology description
  58. * @param servers - The list of server descriptions to be reduced
  59. * @returns The list of servers that satisfy the requirements of max staleness
  60. */
  61. function maxStalenessReducer(readPreference, topologyDescription, servers) {
  62. if (readPreference.maxStalenessSeconds == null || readPreference.maxStalenessSeconds < 0) {
  63. return servers;
  64. }
  65. const maxStaleness = readPreference.maxStalenessSeconds;
  66. const maxStalenessVariance = (topologyDescription.heartbeatFrequencyMS + IDLE_WRITE_PERIOD) / 1000;
  67. if (maxStaleness < maxStalenessVariance) {
  68. throw new error_1.MongoInvalidArgumentError(`Option "maxStalenessSeconds" must be at least ${maxStalenessVariance} seconds`);
  69. }
  70. if (maxStaleness < SMALLEST_MAX_STALENESS_SECONDS) {
  71. throw new error_1.MongoInvalidArgumentError(`Option "maxStalenessSeconds" must be at least ${SMALLEST_MAX_STALENESS_SECONDS} seconds`);
  72. }
  73. if (topologyDescription.type === common_1.TopologyType.ReplicaSetWithPrimary) {
  74. const primary = Array.from(topologyDescription.servers.values()).filter(primaryFilter)[0];
  75. return servers.reduce((result, server) => {
  76. const stalenessMS = server.lastUpdateTime -
  77. server.lastWriteDate -
  78. (primary.lastUpdateTime - primary.lastWriteDate) +
  79. topologyDescription.heartbeatFrequencyMS;
  80. const staleness = stalenessMS / 1000;
  81. const maxStalenessSeconds = readPreference.maxStalenessSeconds ?? 0;
  82. if (staleness <= maxStalenessSeconds) {
  83. result.push(server);
  84. }
  85. return result;
  86. }, []);
  87. }
  88. if (topologyDescription.type === common_1.TopologyType.ReplicaSetNoPrimary) {
  89. if (servers.length === 0) {
  90. return servers;
  91. }
  92. const sMax = servers.reduce((max, s) => s.lastWriteDate > max.lastWriteDate ? s : max);
  93. return servers.reduce((result, server) => {
  94. const stalenessMS = sMax.lastWriteDate - server.lastWriteDate + topologyDescription.heartbeatFrequencyMS;
  95. const staleness = stalenessMS / 1000;
  96. const maxStalenessSeconds = readPreference.maxStalenessSeconds ?? 0;
  97. if (staleness <= maxStalenessSeconds) {
  98. result.push(server);
  99. }
  100. return result;
  101. }, []);
  102. }
  103. return servers;
  104. }
  105. /**
  106. * Determines whether a server's tags match a given set of tags
  107. *
  108. * @param tagSet - The requested tag set to match
  109. * @param serverTags - The server's tags
  110. */
  111. function tagSetMatch(tagSet, serverTags) {
  112. const keys = Object.keys(tagSet);
  113. const serverTagKeys = Object.keys(serverTags);
  114. for (let i = 0; i < keys.length; ++i) {
  115. const key = keys[i];
  116. if (serverTagKeys.indexOf(key) === -1 || serverTags[key] !== tagSet[key]) {
  117. return false;
  118. }
  119. }
  120. return true;
  121. }
  122. /**
  123. * Reduces a set of server descriptions based on tags requested by the read preference
  124. *
  125. * @param readPreference - The read preference providing the requested tags
  126. * @param servers - The list of server descriptions to reduce
  127. * @returns The list of servers matching the requested tags
  128. */
  129. function tagSetReducer(readPreference, servers) {
  130. if (readPreference.tags == null ||
  131. (Array.isArray(readPreference.tags) && readPreference.tags.length === 0)) {
  132. return servers;
  133. }
  134. for (let i = 0; i < readPreference.tags.length; ++i) {
  135. const tagSet = readPreference.tags[i];
  136. const serversMatchingTagset = servers.reduce((matched, server) => {
  137. if (tagSetMatch(tagSet, server.tags))
  138. matched.push(server);
  139. return matched;
  140. }, []);
  141. if (serversMatchingTagset.length) {
  142. return serversMatchingTagset;
  143. }
  144. }
  145. return [];
  146. }
  147. /**
  148. * Reduces a list of servers to ensure they fall within an acceptable latency window. This is
  149. * further specified in the "Server Selection" specification, found here:
  150. * https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.rst
  151. *
  152. * @param topologyDescription - The topology description
  153. * @param servers - The list of servers to reduce
  154. * @returns The servers which fall within an acceptable latency window
  155. */
  156. function latencyWindowReducer(topologyDescription, servers) {
  157. const low = servers.reduce((min, server) => min === -1 ? server.roundTripTime : Math.min(server.roundTripTime, min), -1);
  158. const high = low + topologyDescription.localThresholdMS;
  159. return servers.reduce((result, server) => {
  160. if (server.roundTripTime <= high && server.roundTripTime >= low)
  161. result.push(server);
  162. return result;
  163. }, []);
  164. }
  165. // filters
  166. function primaryFilter(server) {
  167. return server.type === common_1.ServerType.RSPrimary;
  168. }
  169. function secondaryFilter(server) {
  170. return server.type === common_1.ServerType.RSSecondary;
  171. }
  172. function nearestFilter(server) {
  173. return server.type === common_1.ServerType.RSSecondary || server.type === common_1.ServerType.RSPrimary;
  174. }
  175. function knownFilter(server) {
  176. return server.type !== common_1.ServerType.Unknown;
  177. }
  178. function loadBalancerFilter(server) {
  179. return server.type === common_1.ServerType.LoadBalancer;
  180. }
  181. /**
  182. * Returns a function which selects servers based on a provided read preference
  183. *
  184. * @param readPreference - The read preference to select with
  185. */
  186. function readPreferenceServerSelector(readPreference) {
  187. if (!readPreference.isValid()) {
  188. throw new error_1.MongoInvalidArgumentError('Invalid read preference specified');
  189. }
  190. return (topologyDescription, servers) => {
  191. const commonWireVersion = topologyDescription.commonWireVersion;
  192. if (commonWireVersion &&
  193. readPreference.minWireVersion &&
  194. readPreference.minWireVersion > commonWireVersion) {
  195. throw new error_1.MongoCompatibilityError(`Minimum wire version '${readPreference.minWireVersion}' required, but found '${commonWireVersion}'`);
  196. }
  197. if (topologyDescription.type === common_1.TopologyType.LoadBalanced) {
  198. return servers.filter(loadBalancerFilter);
  199. }
  200. if (topologyDescription.type === common_1.TopologyType.Unknown) {
  201. return [];
  202. }
  203. if (topologyDescription.type === common_1.TopologyType.Single ||
  204. topologyDescription.type === common_1.TopologyType.Sharded) {
  205. return latencyWindowReducer(topologyDescription, servers.filter(knownFilter));
  206. }
  207. const mode = readPreference.mode;
  208. if (mode === read_preference_1.ReadPreference.PRIMARY) {
  209. return servers.filter(primaryFilter);
  210. }
  211. if (mode === read_preference_1.ReadPreference.PRIMARY_PREFERRED) {
  212. const result = servers.filter(primaryFilter);
  213. if (result.length) {
  214. return result;
  215. }
  216. }
  217. const filter = mode === read_preference_1.ReadPreference.NEAREST ? nearestFilter : secondaryFilter;
  218. const selectedServers = latencyWindowReducer(topologyDescription, tagSetReducer(readPreference, maxStalenessReducer(readPreference, topologyDescription, servers.filter(filter))));
  219. if (mode === read_preference_1.ReadPreference.SECONDARY_PREFERRED && selectedServers.length === 0) {
  220. return servers.filter(primaryFilter);
  221. }
  222. return selectedServers;
  223. };
  224. }
  225. exports.readPreferenceServerSelector = readPreferenceServerSelector;
  226. //# sourceMappingURL=server_selection.js.map