monitor.js 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.MonitorInterval = exports.RTTPinger = exports.Monitor = void 0;
  4. const timers_1 = require("timers");
  5. const bson_1 = require("../bson");
  6. const connect_1 = require("../cmap/connect");
  7. const connection_1 = require("../cmap/connection");
  8. const constants_1 = require("../constants");
  9. const error_1 = require("../error");
  10. const mongo_types_1 = require("../mongo_types");
  11. const utils_1 = require("../utils");
  12. const common_1 = require("./common");
  13. const events_1 = require("./events");
  14. const server_1 = require("./server");
  // Symbol keys under which Monitor / RTTPinger instances keep their internal state.
  /** @internal Key for the server handed to the Monitor constructor. */
  const kServer = Symbol('server');
  /** @internal Key for the scheduler: a MonitorInterval on a Monitor, a timer handle on an RTTPinger. */
  const kMonitorId = Symbol('monitorId');
  /** @internal Key for the connection owned by a Monitor or an RTTPinger. */
  const kConnection = Symbol('connection');
  /** @internal Key for the cancellation token shared by a Monitor and its RTTPinger. */
  const kCancellationToken = Symbol('cancellationToken');
  /** @internal Key for the RTTPinger a Monitor creates while a check is awaitable. */
  const kRTTPinger = Symbol('rttPinger');
  /** @internal Key for the most recent round trip time stored on an RTTPinger. */
  const kRoundTripTime = Symbol('roundTripTime');

  // Monitor-specific states; the closing/closed states come from `common_1`.
  const STATE_IDLE = 'idle';
  const STATE_MONITORING = 'monitoring';

  // Transition table handed to `makeStateMachine`.
  // NOTE(review): presumably each key is a current state and each array lists the
  // states it may move to; confirm against `makeStateMachine` in ../utils.
  const MONITOR_STATE_TRANSITIONS = {
    [common_1.STATE_CLOSING]: [common_1.STATE_CLOSING, STATE_IDLE, common_1.STATE_CLOSED],
    [common_1.STATE_CLOSED]: [common_1.STATE_CLOSED, STATE_MONITORING],
    [STATE_IDLE]: [STATE_IDLE, STATE_MONITORING, common_1.STATE_CLOSING],
    [STATE_MONITORING]: [STATE_MONITORING, STATE_IDLE, common_1.STATE_CLOSING]
  };
  const stateTransition = (0, utils_1.makeStateMachine)(MONITOR_STATE_TRANSITIONS);

  // States in which `Monitor#requestCheck` returns without waking the interval.
  const INVALID_REQUEST_CHECK_STATES = new Set([
    common_1.STATE_CLOSING,
    common_1.STATE_CLOSED,
    STATE_MONITORING
  ]);
  /**
   * Reports whether a monitor is closed or in the middle of closing.
   *
   * @param monitor - object whose `s.state` holds the current state
   * @returns `true` when the state is `STATE_CLOSED` or `STATE_CLOSING`
   */
  function isInCloseState(monitor) {
    const { state } = monitor.s;
    return state === common_1.STATE_CLOSED || state === common_1.STATE_CLOSING;
  }
  /**
   * Monitors a single server: owns the scheduler (`MonitorInterval`) that drives
   * `monitorServer`, the dedicated monitoring connection, and the cancellation token
   * used to abort in-flight work. The current state lives in `this.s.state`.
   *
   * @internal
   */
  class Monitor extends mongo_types_1.TypedEventEmitter {
    /** The monitoring connection, or `undefined` when none is currently held. */
    get connection() {
      return this[kConnection];
    }
    /**
     * @param server - server to monitor; `description` and `pool.generation` are read here
     * @param options - monitor options, also merged into the connect options
     */
    constructor(server, options) {
      super();
      this[kServer] = server;
      this[kConnection] = undefined;
      this[kCancellationToken] = new mongo_types_1.CancellationToken();
      this[kCancellationToken].setMaxListeners(Infinity);
      this[kMonitorId] = undefined;
      this.s = { state: common_1.STATE_CLOSED };
      this.address = server.description.address;
      this.options = Object.freeze({
        connectTimeoutMS: options.connectTimeoutMS ?? 10000,
        heartbeatFrequencyMS: options.heartbeatFrequencyMS ?? 10000,
        minHeartbeatFrequencyMS: options.minHeartbeatFrequencyMS ?? 500
      });
      // TODO: refactor this to pull it directly from the pool, requires new ConnectionPool integration
      const connectOptions = {
        // defaults, overridable by the caller's options
        id: '<monitor>',
        generation: server.pool.generation,
        connectionType: connection_1.Connection,
        cancellationToken: this[kCancellationToken],
        hostAddress: server.description.hostAddress,
        ...options,
        // force BSON serialization options (always win over the caller's options)
        raw: false,
        useBigInt64: false,
        promoteLongs: true,
        promoteValues: true,
        promoteBuffers: true
      };
      // ensure no authentication is used for monitoring
      delete connectOptions.credentials;
      if (connectOptions.autoEncrypter) {
        delete connectOptions.autoEncrypter;
      }
      this.connectOptions = Object.freeze(connectOptions);
    }
    /** Starts monitoring with an immediate first check; a no-op unless the state is closed. */
    connect() {
      if (this.s.state !== common_1.STATE_CLOSED) {
        return;
      }
      const { heartbeatFrequencyMS, minHeartbeatFrequencyMS } = this.options;
      this[kMonitorId] = new MonitorInterval(monitorServer(this), {
        heartbeatFrequencyMS,
        minHeartbeatFrequencyMS,
        immediate: true
      });
    }
    /** Wakes the scheduler for an early check unless closing, closed, or already monitoring. */
    requestCheck() {
      if (!INVALID_REQUEST_CHECK_STATES.has(this.s.state)) {
        this[kMonitorId]?.wake();
      }
    }
    /**
     * Tears down the current monitoring state and schedules a fresh interval.
     * Does nothing when the monitor is closing/closed or the server description
     * carries no `topologyVersion`.
     */
    reset() {
      const topologyVersion = this[kServer].description.topologyVersion;
      if (isInCloseState(this) || topologyVersion == null) {
        return;
      }
      stateTransition(this, common_1.STATE_CLOSING);
      resetMonitorState(this);
      // back to idle so that monitoring can resume
      stateTransition(this, STATE_IDLE);
      // new interval without `immediate`: the first check runs after a normal delay
      const { heartbeatFrequencyMS, minHeartbeatFrequencyMS } = this.options;
      this[kMonitorId] = new MonitorInterval(monitorServer(this), {
        heartbeatFrequencyMS,
        minHeartbeatFrequencyMS
      });
    }
    /** Stops monitoring, releases resources, emits `'close'`, and moves to the closed state. */
    close() {
      if (isInCloseState(this)) {
        return;
      }
      stateTransition(this, common_1.STATE_CLOSING);
      resetMonitorState(this);
      // 'close' is emitted while the state is still "closing"
      this.emit('close');
      stateTransition(this, common_1.STATE_CLOSED);
    }
  }
  exports.Monitor = Monitor;
  /**
   * Releases everything a monitor holds for its current monitoring session:
   * stops the interval, closes the RTT pinger, emits `'cancel'` on the
   * cancellation token, and force-destroys the monitoring connection.
   * Each reference is cleared after its resource is released.
   *
   * @param monitor - the Monitor whose state is being reset
   */
  function resetMonitorState(monitor) {
    const interval = monitor[kMonitorId];
    if (interval != null) {
      interval.stop();
    }
    monitor[kMonitorId] = undefined;

    const pinger = monitor[kRTTPinger];
    if (pinger != null) {
      pinger.close();
    }
    monitor[kRTTPinger] = undefined;

    // signal listeners on the token before tearing down the connection
    monitor[kCancellationToken].emit('cancel');

    const monitoringConnection = monitor[kConnection];
    if (monitoringConnection != null) {
      monitoringConnection.destroy({ force: true });
    }
    monitor[kConnection] = undefined;
  }
  /**
   * Performs one heartbeat check for `monitor`.
   *
   * With a usable monitoring connection a hello (or legacy hello) command is sent;
   * otherwise a new connection is established and the `hello` found on that
   * connection is used as the result. Heartbeat started/succeeded/failed events are
   * emitted on the monitor along the way.
   *
   * @param monitor - the Monitor being checked
   * @param callback - invoked as `callback(err)` on failure or `callback(undefined, hello)`
   *   on success. It is not invoked after a reply that carries a `topologyVersion` to an
   *   awaitable check, nor when a new connection arrives while the monitor is closing/closed.
   */
  function checkServer(monitor, callback) {
    // reassigned after each awaited reply, so durations are measured from the latest start
    let start = (0, utils_1.now)();
    const topologyVersion = monitor[kServer].description.topologyVersion;
    const isAwaitable = topologyVersion != null;
    monitor.emit(server_1.Server.SERVER_HEARTBEAT_STARTED, new events_1.ServerHeartbeatStartedEvent(monitor.address, isAwaitable));

    // Shared failure path: drop the connection, report the failed heartbeat, ask for a
    // server reset with a labelled error, then hand the ORIGINAL error to the callback.
    const failureHandler = (err) => {
      monitor[kConnection]?.destroy({ force: true });
      monitor[kConnection] = undefined;
      monitor.emit(server_1.Server.SERVER_HEARTBEAT_FAILED, new events_1.ServerHeartbeatFailedEvent(monitor.address, (0, utils_1.calculateDurationInMs)(start), err, isAwaitable));
      let error = err;
      if (!(err instanceof error_1.MongoError)) {
        error = new error_1.MongoError(error_1.MongoError.buildErrorMessage(err), { cause: err });
      }
      error.addErrorLabel(error_1.MongoErrorLabel.ResetPool);
      if (error instanceof error_1.MongoNetworkTimeoutError) {
        error.addErrorLabel(error_1.MongoErrorLabel.InterruptInUseConnections);
      }
      monitor.emit('resetServer', error);
      callback(err);
    };

    const connection = monitor[kConnection];
    if (!connection || connection.closed) {
      // connecting does an implicit `hello`
      (0, connect_1.connect)(monitor.connectOptions, (err, conn) => {
        if (err) {
          monitor[kConnection] = undefined;
          failureHandler(err);
          return;
        }
        if (!conn) {
          return;
        }
        // Tell the connection that we are using the streaming protocol so that the
        // connection's message stream will only read the last hello on the buffer.
        conn.isMonitoringConnection = true;
        if (isInCloseState(monitor)) {
          conn.destroy({ force: true });
          return;
        }
        monitor[kConnection] = conn;
        monitor.emit(server_1.Server.SERVER_HEARTBEAT_SUCCEEDED, new events_1.ServerHeartbeatSucceededEvent(monitor.address, (0, utils_1.calculateDurationInMs)(start), conn.hello, false));
        callback(undefined, conn.hello);
      });
      return;
    }

    // An open monitoring connection exists: issue the hello command on it.
    const { serverApi, helloOk } = connection;
    const connectTimeoutMS = monitor.options.connectTimeoutMS;
    const maxAwaitTimeMS = monitor.options.heartbeatFrequencyMS;

    // The command name must be the first key of the command document.
    const commandName = serverApi?.version || helloOk ? 'hello' : constants_1.LEGACY_HELLO_COMMAND;
    const cmd = { [commandName]: 1 };
    if (isAwaitable && topologyVersion) {
      cmd.maxAwaitTimeMS = maxAwaitTimeMS;
      cmd.topologyVersion = makeTopologyVersion(topologyVersion);
    }

    let options;
    if (isAwaitable) {
      options = {
        // the socket timeout is extended by the await time; a falsy connect timeout yields 0
        socketTimeoutMS: connectTimeoutMS ? connectTimeoutMS + maxAwaitTimeMS : 0,
        exhaustAllowed: true
      };
    }
    else {
      options = { socketTimeoutMS: connectTimeoutMS };
    }

    // For awaitable checks the round trip time is measured by a separate pinger.
    if (isAwaitable && monitor[kRTTPinger] == null) {
      monitor[kRTTPinger] = new RTTPinger(monitor[kCancellationToken], {
        heartbeatFrequencyMS: monitor.options.heartbeatFrequencyMS,
        ...monitor.connectOptions
      });
    }

    connection.command((0, utils_1.ns)('admin.$cmd'), cmd, options, (err, hello) => {
      if (err) {
        return failureHandler(err);
      }
      if (!('isWritablePrimary' in hello)) {
        // Provide hello-style response document.
        hello.isWritablePrimary = hello[constants_1.LEGACY_HELLO_COMMAND];
      }
      const rttPinger = monitor[kRTTPinger];
      const duration = isAwaitable && rttPinger
        ? rttPinger.roundTripTime
        : (0, utils_1.calculateDurationInMs)(start);
      const awaited = isAwaitable && hello.topologyVersion != null;
      monitor.emit(server_1.Server.SERVER_HEARTBEAT_SUCCEEDED, new events_1.ServerHeartbeatSucceededEvent(monitor.address, duration, hello, awaited));
      if (awaited) {
        // streaming protocol: immediately issue another `started` event and keep
        // waiting on this command; the callback is not invoked for this reply
        monitor.emit(server_1.Server.SERVER_HEARTBEAT_STARTED, new events_1.ServerHeartbeatStartedEvent(monitor.address, true));
        start = (0, utils_1.now)();
        return;
      }
      // otherwise the "check" is complete: drop the pinger and return to the main monitor loop
      monitor[kRTTPinger]?.close();
      monitor[kRTTPinger] = undefined;
      callback(undefined, hello);
    });
  }
  /**
   * Builds the function a `MonitorInterval` runs for `monitor`.
   *
   * The returned function performs one `checkServer` pass, moving the monitor into
   * the monitoring state for its duration and back to idle afterwards (unless the
   * monitor was closed meanwhile). If a check is already in progress it only
   * schedules `callback` on the next tick.
   *
   * @param monitor - the Monitor to check
   * @returns a function taking the completion callback of one scheduled run
   */
  function monitorServer(monitor) {
    return (callback) => {
      if (monitor.s.state === STATE_MONITORING) {
        process.nextTick(callback);
        return;
      }
      stateTransition(monitor, STATE_MONITORING);

      const done = () => {
        if (!isInCloseState(monitor)) {
          stateTransition(monitor, STATE_IDLE);
        }
        callback();
      };

      checkServer(monitor, (err, hello) => {
        // an error while the server is still of Unknown type: finish without rescheduling
        if (err && monitor[kServer].description.type === common_1.ServerType.Unknown) {
          done();
          return;
        }
        // if the check indicates streaming is supported, immediately reschedule monitoring
        if (hello?.topologyVersion) {
          (0, timers_1.setTimeout)(() => {
            if (isInCloseState(monitor)) {
              return;
            }
            monitor[kMonitorId]?.wake();
          }, 0);
        }
        done();
      });
    };
  }
  /**
   * Copies a topology version into a fresh `{ processId, counter }` document whose
   * `counter` is a BSON `Long`.
   *
   * @param tv - topology version with `processId` and `counter`
   * @returns a new object; `counter` is reused when already a `Long`, otherwise
   *   converted with `Long.fromNumber`
   */
  function makeTopologyVersion(tv) {
    const { processId, counter } = tv;
    // tests mock counter as just number, but in a real situation counter should always be a Long
    // TODO(NODE-2674): Preserve int64 sent from MongoDB
    const longCounter = bson_1.Long.isLong(counter) ? counter : bson_1.Long.fromNumber(counter);
    return { processId, counter: longCounter };
  }
  /**
   * Periodically measures round trip time via `measureRoundTripTime`, using its own
   * connection. The first measurement is scheduled `heartbeatFrequencyMS` after
   * construction.
   *
   * @internal
   */
  class RTTPinger {
    /**
     * @param cancellationToken - token stored on the pinger and later copied onto `options`
     * @param options - connect options; `heartbeatFrequencyMS` sets the delay between pings
     */
    constructor(cancellationToken, options) {
      this[kConnection] = undefined;
      this[kCancellationToken] = cancellationToken;
      this[kRoundTripTime] = 0;
      this.closed = false;
      const { heartbeatFrequencyMS } = options;
      const firstPing = () => measureRoundTripTime(this, options);
      this[kMonitorId] = (0, timers_1.setTimeout)(firstPing, heartbeatFrequencyMS);
    }
    /** The most recently stored round trip time; `0` before the first successful measurement. */
    get roundTripTime() {
      return this[kRoundTripTime];
    }
    /** Marks the pinger closed, cancels the pending timer, and force-destroys its connection. */
    close() {
      this.closed = true;
      (0, timers_1.clearTimeout)(this[kMonitorId]);
      const pingConnection = this[kConnection];
      if (pingConnection != null) {
        pingConnection.destroy({ force: true });
      }
      this[kConnection] = undefined;
    }
  }
  exports.RTTPinger = RTTPinger;
  /**
   * Takes one round trip time measurement for `rttPinger` and, on success, schedules
   * the next one `options.heartbeatFrequencyMS` later.
   *
   * Without a connection the measurement is the time taken to establish one;
   * otherwise it is the time taken by a legacy hello command on the existing
   * connection. On failure the stored round trip time is reset to 0, the connection
   * (if any) is force-destroyed and forgotten, and no further measurement is
   * scheduled by this function.
   *
   * @param rttPinger - the RTTPinger whose state is updated
   * @param options - connect options; `cancellationToken` is overwritten with the
   *   pinger's token and `heartbeatFrequencyMS` is the delay until the next ping
   */
  function measureRoundTripTime(rttPinger, options) {
    const start = (0, utils_1.now)();
    options.cancellationToken = rttPinger[kCancellationToken];
    const heartbeatFrequencyMS = options.heartbeatFrequencyMS;
    if (rttPinger.closed) {
      return;
    }

    // Success path: record the duration and arm the timer for the next measurement.
    function measureAndReschedule(conn) {
      if (rttPinger.closed) {
        // the pinger was closed while the measurement was in flight
        conn?.destroy({ force: true });
        return;
      }
      if (rttPinger[kConnection] == null) {
        rttPinger[kConnection] = conn;
      }
      rttPinger[kRoundTripTime] = (0, utils_1.calculateDurationInMs)(start);
      rttPinger[kMonitorId] = (0, timers_1.setTimeout)(() => measureRoundTripTime(rttPinger, options), heartbeatFrequencyMS);
    }

    // Failure path: destroy the connection before dropping the reference so the
    // socket is not leaked (mirrors RTTPinger#close), then reset the measurement.
    function handleFailure() {
      rttPinger[kConnection]?.destroy({ force: true });
      rttPinger[kConnection] = undefined;
      rttPinger[kRoundTripTime] = 0;
    }

    const connection = rttPinger[kConnection];
    if (connection == null) {
      (0, connect_1.connect)(options, (err, conn) => {
        if (err) {
          handleFailure();
          return;
        }
        measureAndReschedule(conn);
      });
      return;
    }
    connection.command((0, utils_1.ns)('admin.$cmd'), { [constants_1.LEGACY_HELLO_COMMAND]: 1 }, undefined, err => {
      if (err) {
        handleFailure();
        return;
      }
      measureAndReschedule();
    });
  }
  /**
   * Repeatedly runs `fn` on a timer: once a run finishes (signalled through the
   * callback passed to `fn`), the next run is scheduled `heartbeatFrequencyMS`
   * later. `wake()` requests an earlier run, but never sooner than
   * `minHeartbeatFrequencyMS` after the previous run ended.
   *
   * @internal
   */
  class MonitorInterval {
    /**
     * @param fn - function to run; receives a callback it must call when finished
     * @param options - `heartbeatFrequencyMS` (default 1000), `minHeartbeatFrequencyMS`
     *   (default 500), and `immediate` to run `fn` right away instead of after a delay
     */
    constructor(fn, options = {}) {
      this.isExpeditedCallToFnScheduled = false;
      this.stopped = false;
      this.isExecutionInProgress = false;
      // initialised here and not updated anywhere in this class
      this.hasExecutedOnce = false;
      // Bound as an instance property so it can be handed to setTimeout directly.
      this._executeAndReschedule = () => {
        if (this.stopped) {
          return;
        }
        if (this.timerId) {
          (0, timers_1.clearTimeout)(this.timerId);
        }
        this.isExpeditedCallToFnScheduled = false;
        this.isExecutionInProgress = true;
        const onRunFinished = () => {
          this.lastExecutionEnded = (0, utils_1.now)();
          this.isExecutionInProgress = false;
          this._reschedule(this.heartbeatFrequencyMS);
        };
        this.fn(onRunFinished);
      };
      this.fn = fn;
      this.lastExecutionEnded = -Infinity;
      this.heartbeatFrequencyMS = options.heartbeatFrequencyMS ?? 1000;
      this.minHeartbeatFrequencyMS = options.minHeartbeatFrequencyMS ?? 500;
      if (options.immediate) {
        this._executeAndReschedule();
        return;
      }
      this._reschedule(undefined);
    }
    /** Requests a run as soon as the minimum interval since the last run allows. */
    wake() {
      const elapsedSinceLastRun = (0, utils_1.now)() - this.lastExecutionEnded;
      // TODO(NODE-4674): Add error handling and logging to the monitor
      if (elapsedSinceLastRun < 0) {
        // `now()` reported a time earlier than the recorded end of the last run: run right away
        this._executeAndReschedule();
        return;
      }
      // a run is in flight, or an expedited run is already queued
      // (debounces multiple calls to wake within the `minInterval`)
      if (this.isExecutionInProgress || this.isExpeditedCallToFnScheduled) {
        return;
      }
      // reschedule a call as soon as possible, ensuring the call never happens
      // faster than the `minInterval`
      if (elapsedSinceLastRun < this.minHeartbeatFrequencyMS) {
        this.isExpeditedCallToFnScheduled = true;
        this._reschedule(this.minHeartbeatFrequencyMS - elapsedSinceLastRun);
        return;
      }
      this._executeAndReschedule();
    }
    /** Permanently stops the interval and clears any pending timer. */
    stop() {
      this.stopped = true;
      if (this.timerId) {
        (0, timers_1.clearTimeout)(this.timerId);
        this.timerId = undefined;
      }
      this.lastExecutionEnded = -Infinity;
      this.isExpeditedCallToFnScheduled = false;
    }
    /** JSON text of the snapshot produced by `toJSON`. */
    toString() {
      return JSON.stringify(this);
    }
    /** Snapshot of the scheduling state, for debugging output. */
    toJSON() {
      const currentTime = (0, utils_1.now)();
      const timerState = this.timerId != null ? 'set' : 'cleared';
      return {
        timerId: timerState,
        lastCallTime: this.lastExecutionEnded,
        isExpeditedCheckScheduled: this.isExpeditedCallToFnScheduled,
        stopped: this.stopped,
        heartbeatFrequencyMS: this.heartbeatFrequencyMS,
        minHeartbeatFrequencyMS: this.minHeartbeatFrequencyMS,
        currentTime,
        timeSinceLastCall: currentTime - this.lastExecutionEnded
      };
    }
    /**
     * Replaces the pending timer with one firing after `ms`; a falsy `ms`
     * (including `undefined` and `0`) falls back to `heartbeatFrequencyMS`.
     */
    _reschedule(ms) {
      if (this.stopped) {
        return;
      }
      if (this.timerId) {
        (0, timers_1.clearTimeout)(this.timerId);
      }
      const delayMS = ms || this.heartbeatFrequencyMS;
      this.timerId = (0, timers_1.setTimeout)(this._executeAndReschedule, delayMS);
    }
  }
  exports.MonitorInterval = MonitorInterval;
  420. //# sourceMappingURL=monitor.js.map