srv_polling.ts 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. import * as dns from 'dns';
  2. import { clearTimeout, setTimeout } from 'timers';
  3. import { MongoRuntimeError } from '../error';
  4. import { TypedEventEmitter } from '../mongo_types';
  5. import { HostAddress, matchesParentDomain } from '../utils';
  6. /**
  7. * @internal
  8. * @category Event
  9. */
  10. export class SrvPollingEvent {
  11. srvRecords: dns.SrvRecord[];
  12. constructor(srvRecords: dns.SrvRecord[]) {
  13. this.srvRecords = srvRecords;
  14. }
  15. hostnames(): Set<string> {
  16. return new Set(this.srvRecords.map(r => HostAddress.fromSrvRecord(r).toString()));
  17. }
  18. }
  19. /** @internal */
  20. export interface SrvPollerOptions {
  21. srvServiceName: string;
  22. srvMaxHosts: number;
  23. srvHost: string;
  24. heartbeatFrequencyMS: number;
  25. }
  26. /** @internal */
  27. export type SrvPollerEvents = {
  28. srvRecordDiscovery(event: SrvPollingEvent): void;
  29. };
  30. /** @internal */
  31. export class SrvPoller extends TypedEventEmitter<SrvPollerEvents> {
  32. srvHost: string;
  33. rescanSrvIntervalMS: number;
  34. heartbeatFrequencyMS: number;
  35. haMode: boolean;
  36. generation: number;
  37. srvMaxHosts: number;
  38. srvServiceName: string;
  39. _timeout?: NodeJS.Timeout;
  40. /** @event */
  41. static readonly SRV_RECORD_DISCOVERY = 'srvRecordDiscovery' as const;
  42. constructor(options: SrvPollerOptions) {
  43. super();
  44. if (!options || !options.srvHost) {
  45. throw new MongoRuntimeError('Options for SrvPoller must exist and include srvHost');
  46. }
  47. this.srvHost = options.srvHost;
  48. this.srvMaxHosts = options.srvMaxHosts ?? 0;
  49. this.srvServiceName = options.srvServiceName ?? 'mongodb';
  50. this.rescanSrvIntervalMS = 60000;
  51. this.heartbeatFrequencyMS = options.heartbeatFrequencyMS ?? 10000;
  52. this.haMode = false;
  53. this.generation = 0;
  54. this._timeout = undefined;
  55. }
  56. get srvAddress(): string {
  57. return `_${this.srvServiceName}._tcp.${this.srvHost}`;
  58. }
  59. get intervalMS(): number {
  60. return this.haMode ? this.heartbeatFrequencyMS : this.rescanSrvIntervalMS;
  61. }
  62. start(): void {
  63. if (!this._timeout) {
  64. this.schedule();
  65. }
  66. }
  67. stop(): void {
  68. if (this._timeout) {
  69. clearTimeout(this._timeout);
  70. this.generation += 1;
  71. this._timeout = undefined;
  72. }
  73. }
  74. // TODO(NODE-4994): implement new logging logic for SrvPoller failures
  75. schedule(): void {
  76. if (this._timeout) {
  77. clearTimeout(this._timeout);
  78. }
  79. this._timeout = setTimeout(() => {
  80. this._poll().catch(() => null);
  81. }, this.intervalMS);
  82. }
  83. success(srvRecords: dns.SrvRecord[]): void {
  84. this.haMode = false;
  85. this.schedule();
  86. this.emit(SrvPoller.SRV_RECORD_DISCOVERY, new SrvPollingEvent(srvRecords));
  87. }
  88. failure(): void {
  89. this.haMode = true;
  90. this.schedule();
  91. }
  92. async _poll(): Promise<void> {
  93. const generation = this.generation;
  94. let srvRecords;
  95. try {
  96. srvRecords = await dns.promises.resolveSrv(this.srvAddress);
  97. } catch (dnsError) {
  98. this.failure();
  99. return;
  100. }
  101. if (generation !== this.generation) {
  102. return;
  103. }
  104. const finalAddresses: dns.SrvRecord[] = [];
  105. for (const record of srvRecords) {
  106. if (matchesParentDomain(record.name, this.srvHost)) {
  107. finalAddresses.push(record);
  108. }
  109. }
  110. if (!finalAddresses.length) {
  111. this.failure();
  112. return;
  113. }
  114. this.success(finalAddresses);
  115. }
  116. }