123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- "use strict";
- Object.defineProperty(exports, "__esModule", { value: true });
- exports.MessageStream = void 0;
- const stream_1 = require("stream");
- const error_1 = require("../error");
- const utils_1 = require("../utils");
- const commands_1 = require("./commands");
- const compression_1 = require("./wire_protocol/compression");
- const constants_1 = require("./wire_protocol/constants");
- const MESSAGE_HEADER_SIZE = 16;
- const COMPRESSION_DETAILS_SIZE = 9;
- const kDefaultMaxBsonMessageSize = 1024 * 1024 * 16 * 4;
- const kBuffer = Symbol('buffer');
- class MessageStream extends stream_1.Duplex {
- constructor(options = {}) {
- super(options);
-
- this.isMonitoringConnection = false;
- this.maxBsonMessageSize = options.maxBsonMessageSize || kDefaultMaxBsonMessageSize;
- this[kBuffer] = new utils_1.BufferPool();
- }
- get buffer() {
- return this[kBuffer];
- }
- _write(chunk, _, callback) {
- this[kBuffer].append(chunk);
- processIncomingData(this, callback);
- }
- _read( ) {
-
-
- return;
- }
- writeCommand(command, operationDescription) {
- const agreedCompressor = operationDescription.agreedCompressor ?? 'none';
- if (agreedCompressor === 'none' || !canCompress(command)) {
- const data = command.toBin();
- this.push(Array.isArray(data) ? Buffer.concat(data) : data);
- return;
- }
-
- const concatenatedOriginalCommandBuffer = Buffer.concat(command.toBin());
- const messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE);
-
- const originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12);
- const options = {
- agreedCompressor,
- zlibCompressionLevel: operationDescription.zlibCompressionLevel ?? 0
- };
-
- (0, compression_1.compress)(options, messageToBeCompressed).then(compressedMessage => {
-
- const msgHeader = Buffer.alloc(MESSAGE_HEADER_SIZE);
- msgHeader.writeInt32LE(MESSAGE_HEADER_SIZE + COMPRESSION_DETAILS_SIZE + compressedMessage.length, 0);
- msgHeader.writeInt32LE(command.requestId, 4);
- msgHeader.writeInt32LE(0, 8);
- msgHeader.writeInt32LE(constants_1.OP_COMPRESSED, 12);
-
- const compressionDetails = Buffer.alloc(COMPRESSION_DETAILS_SIZE);
- compressionDetails.writeInt32LE(originalCommandOpCode, 0);
- compressionDetails.writeInt32LE(messageToBeCompressed.length, 4);
- compressionDetails.writeUInt8(compression_1.Compressor[agreedCompressor], 8);
- this.push(Buffer.concat([msgHeader, compressionDetails, compressedMessage]));
- }, error => {
- operationDescription.cb(error);
- });
- }
- }
- exports.MessageStream = MessageStream;
- function canCompress(command) {
- const commandDoc = command instanceof commands_1.Msg ? command.command : command.query;
- const commandName = Object.keys(commandDoc)[0];
- return !compression_1.uncompressibleCommands.has(commandName);
- }
- function processIncomingData(stream, callback) {
- const buffer = stream[kBuffer];
- const sizeOfMessage = buffer.getInt32();
- if (sizeOfMessage == null) {
- return callback();
- }
- if (sizeOfMessage < 0) {
- return callback(new error_1.MongoParseError(`Invalid message size: ${sizeOfMessage}`));
- }
- if (sizeOfMessage > stream.maxBsonMessageSize) {
- return callback(new error_1.MongoParseError(`Invalid message size: ${sizeOfMessage}, max allowed: ${stream.maxBsonMessageSize}`));
- }
- if (sizeOfMessage > buffer.length) {
- return callback();
- }
- const message = buffer.read(sizeOfMessage);
- const messageHeader = {
- length: message.readInt32LE(0),
- requestId: message.readInt32LE(4),
- responseTo: message.readInt32LE(8),
- opCode: message.readInt32LE(12)
- };
- const monitorHasAnotherHello = () => {
- if (stream.isMonitoringConnection) {
-
- const sizeOfMessage = buffer.getInt32();
- if (sizeOfMessage != null && sizeOfMessage <= buffer.length) {
- return true;
- }
- }
- return false;
- };
- let ResponseType = messageHeader.opCode === constants_1.OP_MSG ? commands_1.BinMsg : commands_1.Response;
- if (messageHeader.opCode !== constants_1.OP_COMPRESSED) {
- const messageBody = message.subarray(MESSAGE_HEADER_SIZE);
-
-
-
- if (monitorHasAnotherHello()) {
- return processIncomingData(stream, callback);
- }
- stream.emit('message', new ResponseType(message, messageHeader, messageBody));
- if (buffer.length >= 4) {
- return processIncomingData(stream, callback);
- }
- return callback();
- }
- messageHeader.fromCompressed = true;
- messageHeader.opCode = message.readInt32LE(MESSAGE_HEADER_SIZE);
- messageHeader.length = message.readInt32LE(MESSAGE_HEADER_SIZE + 4);
- const compressorID = message[MESSAGE_HEADER_SIZE + 8];
- const compressedBuffer = message.slice(MESSAGE_HEADER_SIZE + 9);
-
- ResponseType = messageHeader.opCode === constants_1.OP_MSG ? commands_1.BinMsg : commands_1.Response;
- (0, compression_1.decompress)(compressorID, compressedBuffer).then(messageBody => {
- if (messageBody.length !== messageHeader.length) {
- return callback(new error_1.MongoDecompressionError('Message body and message header must be the same length'));
- }
-
-
-
- if (monitorHasAnotherHello()) {
- return processIncomingData(stream, callback);
- }
- stream.emit('message', new ResponseType(message, messageHeader, messageBody));
- if (buffer.length >= 4) {
- return processIncomingData(stream, callback);
- }
- return callback();
- }, error => {
- return callback(error);
- });
- }
|