connection.js 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.hasSessionSupport = exports.CryptoConnection = exports.Connection = void 0;
  4. const timers_1 = require("timers");
  5. const util_1 = require("util");
  6. const constants_1 = require("../constants");
  7. const error_1 = require("../error");
  8. const mongo_types_1 = require("../mongo_types");
  9. const sessions_1 = require("../sessions");
  10. const utils_1 = require("../utils");
  11. const command_monitoring_events_1 = require("./command_monitoring_events");
  12. const commands_1 = require("./commands");
  13. const message_stream_1 = require("./message_stream");
  14. const stream_description_1 = require("./stream_description");
  15. const shared_1 = require("./wire_protocol/shared");
  16. /** @internal */
  17. const kStream = Symbol('stream');
  18. /** @internal */
  19. const kQueue = Symbol('queue');
  20. /** @internal */
  21. const kMessageStream = Symbol('messageStream');
  22. /** @internal */
  23. const kGeneration = Symbol('generation');
  24. /** @internal */
  25. const kLastUseTime = Symbol('lastUseTime');
  26. /** @internal */
  27. const kClusterTime = Symbol('clusterTime');
  28. /** @internal */
  29. const kDescription = Symbol('description');
  30. /** @internal */
  31. const kHello = Symbol('hello');
  32. /** @internal */
  33. const kAutoEncrypter = Symbol('autoEncrypter');
  34. /** @internal */
  35. const kDelayedTimeoutId = Symbol('delayedTimeoutId');
  36. const INVALID_QUEUE_SIZE = 'Connection internal queue contains more than 1 operation description';
  37. /** @internal */
  38. class Connection extends mongo_types_1.TypedEventEmitter {
  39. constructor(stream, options) {
  40. super();
  41. this.commandAsync = (0, util_1.promisify)((ns, cmd, options, callback) => this.command(ns, cmd, options, callback));
  42. this.id = options.id;
  43. this.address = streamIdentifier(stream, options);
  44. this.socketTimeoutMS = options.socketTimeoutMS ?? 0;
  45. this.monitorCommands = options.monitorCommands;
  46. this.serverApi = options.serverApi;
  47. this.closed = false;
  48. this[kHello] = null;
  49. this[kClusterTime] = null;
  50. this[kDescription] = new stream_description_1.StreamDescription(this.address, options);
  51. this[kGeneration] = options.generation;
  52. this[kLastUseTime] = (0, utils_1.now)();
  53. // setup parser stream and message handling
  54. this[kQueue] = new Map();
  55. this[kMessageStream] = new message_stream_1.MessageStream({
  56. ...options,
  57. maxBsonMessageSize: this.hello?.maxBsonMessageSize
  58. });
  59. this[kStream] = stream;
  60. this[kDelayedTimeoutId] = null;
  61. this[kMessageStream].on('message', message => this.onMessage(message));
  62. this[kMessageStream].on('error', error => this.onError(error));
  63. this[kStream].on('close', () => this.onClose());
  64. this[kStream].on('timeout', () => this.onTimeout());
  65. this[kStream].on('error', () => {
  66. /* ignore errors, listen to `close` instead */
  67. });
  68. // hook the message stream up to the passed in stream
  69. this[kStream].pipe(this[kMessageStream]);
  70. this[kMessageStream].pipe(this[kStream]);
  71. }
  72. get description() {
  73. return this[kDescription];
  74. }
  75. get hello() {
  76. return this[kHello];
  77. }
  78. // the `connect` method stores the result of the handshake hello on the connection
  79. set hello(response) {
  80. this[kDescription].receiveResponse(response);
  81. this[kDescription] = Object.freeze(this[kDescription]);
  82. // TODO: remove this, and only use the `StreamDescription` in the future
  83. this[kHello] = response;
  84. }
  85. // Set the whether the message stream is for a monitoring connection.
  86. set isMonitoringConnection(value) {
  87. this[kMessageStream].isMonitoringConnection = value;
  88. }
  89. get isMonitoringConnection() {
  90. return this[kMessageStream].isMonitoringConnection;
  91. }
  92. get serviceId() {
  93. return this.hello?.serviceId;
  94. }
  95. get loadBalanced() {
  96. return this.description.loadBalanced;
  97. }
  98. get generation() {
  99. return this[kGeneration] || 0;
  100. }
  101. set generation(generation) {
  102. this[kGeneration] = generation;
  103. }
  104. get idleTime() {
  105. return (0, utils_1.calculateDurationInMs)(this[kLastUseTime]);
  106. }
  107. get clusterTime() {
  108. return this[kClusterTime];
  109. }
  110. get stream() {
  111. return this[kStream];
  112. }
  113. markAvailable() {
  114. this[kLastUseTime] = (0, utils_1.now)();
  115. }
  116. onError(error) {
  117. this.cleanup(true, error);
  118. }
  119. onClose() {
  120. const message = `connection ${this.id} to ${this.address} closed`;
  121. this.cleanup(true, new error_1.MongoNetworkError(message));
  122. }
  123. onTimeout() {
  124. this[kDelayedTimeoutId] = (0, timers_1.setTimeout)(() => {
  125. const message = `connection ${this.id} to ${this.address} timed out`;
  126. const beforeHandshake = this.hello == null;
  127. this.cleanup(true, new error_1.MongoNetworkTimeoutError(message, { beforeHandshake }));
  128. }, 1).unref(); // No need for this timer to hold the event loop open
  129. }
  130. onMessage(message) {
  131. const delayedTimeoutId = this[kDelayedTimeoutId];
  132. if (delayedTimeoutId != null) {
  133. (0, timers_1.clearTimeout)(delayedTimeoutId);
  134. this[kDelayedTimeoutId] = null;
  135. }
  136. const socketTimeoutMS = this[kStream].timeout ?? 0;
  137. this[kStream].setTimeout(0);
  138. // always emit the message, in case we are streaming
  139. this.emit('message', message);
  140. let operationDescription = this[kQueue].get(message.responseTo);
  141. if (!operationDescription && this.isMonitoringConnection) {
  142. // This is how we recover when the initial hello's requestId is not
  143. // the responseTo when hello responses have been skipped:
  144. // First check if the map is of invalid size
  145. if (this[kQueue].size > 1) {
  146. this.cleanup(true, new error_1.MongoRuntimeError(INVALID_QUEUE_SIZE));
  147. }
  148. else {
  149. // Get the first orphaned operation description.
  150. const entry = this[kQueue].entries().next();
  151. if (entry.value != null) {
  152. const [requestId, orphaned] = entry.value;
  153. // If the orphaned operation description exists then set it.
  154. operationDescription = orphaned;
  155. // Remove the entry with the bad request id from the queue.
  156. this[kQueue].delete(requestId);
  157. }
  158. }
  159. }
  160. if (!operationDescription) {
  161. return;
  162. }
  163. const callback = operationDescription.cb;
  164. // SERVER-45775: For exhaust responses we should be able to use the same requestId to
  165. // track response, however the server currently synthetically produces remote requests
  166. // making the `responseTo` change on each response
  167. this[kQueue].delete(message.responseTo);
  168. if ('moreToCome' in message && message.moreToCome) {
  169. // If the operation description check above does find an orphaned
  170. // description and sets the operationDescription then this line will put one
  171. // back in the queue with the correct requestId and will resolve not being able
  172. // to find the next one via the responseTo of the next streaming hello.
  173. this[kQueue].set(message.requestId, operationDescription);
  174. this[kStream].setTimeout(socketTimeoutMS);
  175. }
  176. try {
  177. // Pass in the entire description because it has BSON parsing options
  178. message.parse(operationDescription);
  179. }
  180. catch (err) {
  181. // If this error is generated by our own code, it will already have the correct class applied
  182. // if it is not, then it is coming from a catastrophic data parse failure or the BSON library
  183. // in either case, it should not be wrapped
  184. callback(err);
  185. return;
  186. }
  187. if (message.documents[0]) {
  188. const document = message.documents[0];
  189. const session = operationDescription.session;
  190. if (session) {
  191. (0, sessions_1.updateSessionFromResponse)(session, document);
  192. }
  193. if (document.$clusterTime) {
  194. this[kClusterTime] = document.$clusterTime;
  195. this.emit(Connection.CLUSTER_TIME_RECEIVED, document.$clusterTime);
  196. }
  197. if (document.writeConcernError) {
  198. callback(new error_1.MongoWriteConcernError(document.writeConcernError, document), document);
  199. return;
  200. }
  201. if (document.ok === 0 || document.$err || document.errmsg || document.code) {
  202. callback(new error_1.MongoServerError(document));
  203. return;
  204. }
  205. }
  206. callback(undefined, message.documents[0]);
  207. }
  208. destroy(options, callback) {
  209. if (this.closed) {
  210. process.nextTick(() => callback?.());
  211. return;
  212. }
  213. if (typeof callback === 'function') {
  214. this.once('close', () => process.nextTick(() => callback()));
  215. }
  216. // load balanced mode requires that these listeners remain on the connection
  217. // after cleanup on timeouts, errors or close so we remove them before calling
  218. // cleanup.
  219. this.removeAllListeners(Connection.PINNED);
  220. this.removeAllListeners(Connection.UNPINNED);
  221. const message = `connection ${this.id} to ${this.address} closed`;
  222. this.cleanup(options.force, new error_1.MongoNetworkError(message));
  223. }
  224. /**
  225. * A method that cleans up the connection. When `force` is true, this method
  226. * forcibly destroys the socket.
  227. *
  228. * If an error is provided, any in-flight operations will be closed with the error.
  229. *
  230. * This method does nothing if the connection is already closed.
  231. */
  232. cleanup(force, error) {
  233. if (this.closed) {
  234. return;
  235. }
  236. this.closed = true;
  237. const completeCleanup = () => {
  238. for (const op of this[kQueue].values()) {
  239. op.cb(error);
  240. }
  241. this[kQueue].clear();
  242. this.emit(Connection.CLOSE);
  243. };
  244. this[kStream].removeAllListeners();
  245. this[kMessageStream].removeAllListeners();
  246. this[kMessageStream].destroy();
  247. if (force) {
  248. this[kStream].destroy();
  249. completeCleanup();
  250. return;
  251. }
  252. if (!this[kStream].writableEnded) {
  253. this[kStream].end(() => {
  254. this[kStream].destroy();
  255. completeCleanup();
  256. });
  257. }
  258. else {
  259. completeCleanup();
  260. }
  261. }
  262. command(ns, command, options, callback) {
  263. let cmd = { ...command };
  264. const readPreference = (0, shared_1.getReadPreference)(options);
  265. const shouldUseOpMsg = supportsOpMsg(this);
  266. const session = options?.session;
  267. let clusterTime = this.clusterTime;
  268. if (this.serverApi) {
  269. const { version, strict, deprecationErrors } = this.serverApi;
  270. cmd.apiVersion = version;
  271. if (strict != null)
  272. cmd.apiStrict = strict;
  273. if (deprecationErrors != null)
  274. cmd.apiDeprecationErrors = deprecationErrors;
  275. }
  276. if (hasSessionSupport(this) && session) {
  277. if (session.clusterTime &&
  278. clusterTime &&
  279. session.clusterTime.clusterTime.greaterThan(clusterTime.clusterTime)) {
  280. clusterTime = session.clusterTime;
  281. }
  282. const err = (0, sessions_1.applySession)(session, cmd, options);
  283. if (err) {
  284. return callback(err);
  285. }
  286. }
  287. else if (session?.explicit) {
  288. return callback(new error_1.MongoCompatibilityError('Current topology does not support sessions'));
  289. }
  290. // if we have a known cluster time, gossip it
  291. if (clusterTime) {
  292. cmd.$clusterTime = clusterTime;
  293. }
  294. if ((0, shared_1.isSharded)(this) && !shouldUseOpMsg && readPreference && readPreference.mode !== 'primary') {
  295. cmd = {
  296. $query: cmd,
  297. $readPreference: readPreference.toJSON()
  298. };
  299. }
  300. const commandOptions = Object.assign({
  301. numberToSkip: 0,
  302. numberToReturn: -1,
  303. checkKeys: false,
  304. // This value is not overridable
  305. secondaryOk: readPreference.secondaryOk()
  306. }, options);
  307. const message = shouldUseOpMsg
  308. ? new commands_1.Msg(ns.db, cmd, commandOptions)
  309. : new commands_1.Query(ns.db, cmd, commandOptions);
  310. try {
  311. write(this, message, commandOptions, callback);
  312. }
  313. catch (err) {
  314. callback(err);
  315. }
  316. }
  317. }
  318. /** @event */
  319. Connection.COMMAND_STARTED = constants_1.COMMAND_STARTED;
  320. /** @event */
  321. Connection.COMMAND_SUCCEEDED = constants_1.COMMAND_SUCCEEDED;
  322. /** @event */
  323. Connection.COMMAND_FAILED = constants_1.COMMAND_FAILED;
  324. /** @event */
  325. Connection.CLUSTER_TIME_RECEIVED = constants_1.CLUSTER_TIME_RECEIVED;
  326. /** @event */
  327. Connection.CLOSE = constants_1.CLOSE;
  328. /** @event */
  329. Connection.MESSAGE = constants_1.MESSAGE;
  330. /** @event */
  331. Connection.PINNED = constants_1.PINNED;
  332. /** @event */
  333. Connection.UNPINNED = constants_1.UNPINNED;
  334. exports.Connection = Connection;
  335. /** @internal */
  336. class CryptoConnection extends Connection {
  337. constructor(stream, options) {
  338. super(stream, options);
  339. this[kAutoEncrypter] = options.autoEncrypter;
  340. }
  341. /** @internal @override */
  342. command(ns, cmd, options, callback) {
  343. const autoEncrypter = this[kAutoEncrypter];
  344. if (!autoEncrypter) {
  345. return callback(new error_1.MongoMissingDependencyError('No AutoEncrypter available for encryption'));
  346. }
  347. const serverWireVersion = (0, utils_1.maxWireVersion)(this);
  348. if (serverWireVersion === 0) {
  349. // This means the initial handshake hasn't happened yet
  350. return super.command(ns, cmd, options, callback);
  351. }
  352. if (serverWireVersion < 8) {
  353. callback(new error_1.MongoCompatibilityError('Auto-encryption requires a minimum MongoDB version of 4.2'));
  354. return;
  355. }
  356. // Save sort or indexKeys based on the command being run
  357. // the encrypt API serializes our JS objects to BSON to pass to the native code layer
  358. // and then deserializes the encrypted result, the protocol level components
  359. // of the command (ex. sort) are then converted to JS objects potentially losing
  360. // import key order information. These fields are never encrypted so we can save the values
  361. // from before the encryption and replace them after encryption has been performed
  362. const sort = cmd.find || cmd.findAndModify ? cmd.sort : null;
  363. const indexKeys = cmd.createIndexes
  364. ? cmd.indexes.map((index) => index.key)
  365. : null;
  366. autoEncrypter.encrypt(ns.toString(), cmd, options).then(encrypted => {
  367. // Replace the saved values
  368. if (sort != null && (cmd.find || cmd.findAndModify)) {
  369. encrypted.sort = sort;
  370. }
  371. if (indexKeys != null && cmd.createIndexes) {
  372. for (const [offset, index] of indexKeys.entries()) {
  373. // @ts-expect-error `encrypted` is a generic "command", but we've narrowed for only `createIndexes` commands here
  374. encrypted.indexes[offset].key = index;
  375. }
  376. }
  377. super.command(ns, encrypted, options, (err, response) => {
  378. if (err || response == null) {
  379. callback(err, response);
  380. return;
  381. }
  382. autoEncrypter.decrypt(response, options).then(res => callback(undefined, res), err => callback(err));
  383. });
  384. }, err => {
  385. if (err) {
  386. callback(err, null);
  387. }
  388. });
  389. }
  390. }
  391. exports.CryptoConnection = CryptoConnection;
  392. /** @internal */
  393. function hasSessionSupport(conn) {
  394. const description = conn.description;
  395. return description.logicalSessionTimeoutMinutes != null;
  396. }
  397. exports.hasSessionSupport = hasSessionSupport;
  398. function supportsOpMsg(conn) {
  399. const description = conn.description;
  400. if (description == null) {
  401. return false;
  402. }
  403. return (0, utils_1.maxWireVersion)(conn) >= 6 && !description.__nodejs_mock_server__;
  404. }
  405. function streamIdentifier(stream, options) {
  406. if (options.proxyHost) {
  407. // If proxy options are specified, the properties of `stream` itself
  408. // will not accurately reflect what endpoint this is connected to.
  409. return options.hostAddress.toString();
  410. }
  411. const { remoteAddress, remotePort } = stream;
  412. if (typeof remoteAddress === 'string' && typeof remotePort === 'number') {
  413. return utils_1.HostAddress.fromHostPort(remoteAddress, remotePort).toString();
  414. }
  415. return (0, utils_1.uuidV4)().toString('hex');
  416. }
  417. function write(conn, command, options, callback) {
  418. options = options ?? {};
  419. const operationDescription = {
  420. requestId: command.requestId,
  421. cb: callback,
  422. session: options.session,
  423. noResponse: typeof options.noResponse === 'boolean' ? options.noResponse : false,
  424. documentsReturnedIn: options.documentsReturnedIn,
  425. // for BSON parsing
  426. useBigInt64: typeof options.useBigInt64 === 'boolean' ? options.useBigInt64 : false,
  427. promoteLongs: typeof options.promoteLongs === 'boolean' ? options.promoteLongs : true,
  428. promoteValues: typeof options.promoteValues === 'boolean' ? options.promoteValues : true,
  429. promoteBuffers: typeof options.promoteBuffers === 'boolean' ? options.promoteBuffers : false,
  430. bsonRegExp: typeof options.bsonRegExp === 'boolean' ? options.bsonRegExp : false,
  431. enableUtf8Validation: typeof options.enableUtf8Validation === 'boolean' ? options.enableUtf8Validation : true,
  432. raw: typeof options.raw === 'boolean' ? options.raw : false,
  433. started: 0
  434. };
  435. if (conn[kDescription] && conn[kDescription].compressor) {
  436. operationDescription.agreedCompressor = conn[kDescription].compressor;
  437. if (conn[kDescription].zlibCompressionLevel) {
  438. operationDescription.zlibCompressionLevel = conn[kDescription].zlibCompressionLevel;
  439. }
  440. }
  441. if (typeof options.socketTimeoutMS === 'number') {
  442. conn[kStream].setTimeout(options.socketTimeoutMS);
  443. }
  444. else if (conn.socketTimeoutMS !== 0) {
  445. conn[kStream].setTimeout(conn.socketTimeoutMS);
  446. }
  447. // if command monitoring is enabled we need to modify the callback here
  448. if (conn.monitorCommands) {
  449. conn.emit(Connection.COMMAND_STARTED, new command_monitoring_events_1.CommandStartedEvent(conn, command));
  450. operationDescription.started = (0, utils_1.now)();
  451. operationDescription.cb = (err, reply) => {
  452. // Command monitoring spec states that if ok is 1, then we must always emit
  453. // a command succeeded event, even if there's an error. Write concern errors
  454. // will have an ok: 1 in their reply.
  455. if (err && reply?.ok !== 1) {
  456. conn.emit(Connection.COMMAND_FAILED, new command_monitoring_events_1.CommandFailedEvent(conn, command, err, operationDescription.started));
  457. }
  458. else {
  459. if (reply && (reply.ok === 0 || reply.$err)) {
  460. conn.emit(Connection.COMMAND_FAILED, new command_monitoring_events_1.CommandFailedEvent(conn, command, reply, operationDescription.started));
  461. }
  462. else {
  463. conn.emit(Connection.COMMAND_SUCCEEDED, new command_monitoring_events_1.CommandSucceededEvent(conn, command, reply, operationDescription.started));
  464. }
  465. }
  466. if (typeof callback === 'function') {
  467. // Since we're passing through the reply with the write concern error now, we
  468. // need it not to be provided to the original callback in this case so
  469. // retryability does not get tricked into thinking the command actually
  470. // succeeded.
  471. callback(err, err instanceof error_1.MongoWriteConcernError ? undefined : reply);
  472. }
  473. };
  474. }
  475. if (!operationDescription.noResponse) {
  476. conn[kQueue].set(operationDescription.requestId, operationDescription);
  477. }
  478. try {
  479. conn[kMessageStream].writeCommand(command, operationDescription);
  480. }
  481. catch (e) {
  482. if (!operationDescription.noResponse) {
  483. conn[kQueue].delete(operationDescription.requestId);
  484. operationDescription.cb(e);
  485. return;
  486. }
  487. }
  488. if (operationDescription.noResponse) {
  489. operationDescription.cb();
  490. }
  491. }
  492. //# sourceMappingURL=connection.js.map