server-interceptors.js 28 KB


  1. "use strict";
  2. /*
  3. * Copyright 2024 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. Object.defineProperty(exports, "__esModule", { value: true });
  19. exports.getServerInterceptingCall = exports.BaseServerInterceptingCall = exports.ServerInterceptingCall = exports.ResponderBuilder = exports.isInterceptingServerListener = exports.ServerListenerBuilder = void 0;
  20. const metadata_1 = require("./metadata");
  21. const constants_1 = require("./constants");
  22. const http2 = require("http2");
  23. const error_1 = require("./error");
  24. const zlib = require("zlib");
  25. const stream_decoder_1 = require("./stream-decoder");
  26. const logging = require("./logging");
  /* Tracer name under which every log line from this module is emitted. */
  const TRACER_NAME = 'server_call';
  /**
   * Write a DEBUG-verbosity trace line tagged with this module's tracer name.
   * @param text The message to log.
   */
  function trace(text) {
    const verbosity = constants_1.LogVerbosity.DEBUG;
    logging.trace(verbosity, TRACER_NAME, text);
  }
  /**
   * Fluent builder for a server listener object. Each `with*` method stores
   * one handler and returns the builder so calls can be chained; `build()`
   * produces a plain object holding whatever handlers were supplied (handlers
   * that were never set stay `undefined`).
   */
  class ServerListenerBuilder {
    constructor() {
      // Every slot starts out unset; the assignment order fixes key order.
      Object.assign(this, {
        metadata: undefined,
        message: undefined,
        halfClose: undefined,
        cancel: undefined,
      });
    }
    /** Store the handler exposed as `onReceiveMetadata`. */
    withOnReceiveMetadata(onReceiveMetadata) {
      this.metadata = onReceiveMetadata;
      return this;
    }
    /** Store the handler exposed as `onReceiveMessage`. */
    withOnReceiveMessage(onReceiveMessage) {
      this.message = onReceiveMessage;
      return this;
    }
    /** Store the handler exposed as `onReceiveHalfClose`. */
    withOnReceiveHalfClose(onReceiveHalfClose) {
      this.halfClose = onReceiveHalfClose;
      return this;
    }
    /** Store the handler exposed as `onCancel`. */
    withOnCancel(onCancel) {
      this.cancel = onCancel;
      return this;
    }
    /**
     * @returns A new listener object carrying the handlers collected so far.
     */
    build() {
      const { metadata, message, halfClose, cancel } = this;
      return {
        onReceiveMetadata: metadata,
        onReceiveMessage: message,
        onReceiveHalfClose: halfClose,
        onCancel: cancel,
      };
    }
  }
  63. exports.ServerListenerBuilder = ServerListenerBuilder;
  /**
   * Tell whether a listener is the "intercepting" flavour, recognised by an
   * `onReceiveMetadata` handler that declares exactly one parameter (no
   * `next` continuation).
   * @param listener The listener object to inspect.
   * @returns `true` when `onReceiveMetadata` is defined and has arity 1.
   */
  function isInterceptingServerListener(listener) {
    const metadataHandler = listener.onReceiveMetadata;
    if (metadataHandler === undefined) {
      return false;
    }
    return metadataHandler.length === 1;
  }
  68. exports.isInterceptingServerListener = isInterceptingServerListener;
  /**
   * Adapts a continuation-style listener (`listener`, whose handlers receive
   * a `next` callback) in front of a plain listener (`nextListener`).
   *
   * Interceptor handlers may complete asynchronously, so this class keeps
   * events in order: a message that finishes while metadata is still being
   * processed is held back, and a half-close that finishes while metadata or
   * a message is still being processed is held back, until the earlier event
   * has been forwarded.
   */
  class InterceptingServerListenerImpl {
    constructor(listener, nextListener) {
      this.listener = listener;
      this.nextListener = nextListener;
      /**
       * Once the call is cancelled, ignore all other events.
       */
      this.cancelled = false;
      this.processingMetadata = false;
      this.hasPendingMessage = false;
      this.pendingMessage = null;
      this.processingMessage = false;
      this.hasPendingHalfClose = false;
    }
    /** Forward the held-back message, if there is one. */
    processPendingMessage() {
      if (!this.hasPendingMessage) {
        return;
      }
      this.nextListener.onReceiveMessage(this.pendingMessage);
      this.pendingMessage = null;
      this.hasPendingMessage = false;
    }
    /** Forward the held-back half-close, if there is one. */
    processPendingHalfClose() {
      if (!this.hasPendingHalfClose) {
        return;
      }
      this.nextListener.onReceiveHalfClose();
      this.hasPendingHalfClose = false;
    }
    /**
     * Run metadata through the interceptor, then forward it followed by any
     * message / half-close that was waiting on it.
     */
    onReceiveMetadata(metadata) {
      if (this.cancelled) {
        return;
      }
      this.processingMetadata = true;
      const forwardMetadata = interceptedMetadata => {
        this.processingMetadata = false;
        if (this.cancelled) {
          return;
        }
        this.nextListener.onReceiveMetadata(interceptedMetadata);
        this.processPendingMessage();
        this.processPendingHalfClose();
      };
      this.listener.onReceiveMetadata(metadata, forwardMetadata);
    }
    /**
     * Run a message through the interceptor; forward it immediately unless
     * metadata is still in flight, in which case it is stored for later.
     */
    onReceiveMessage(message) {
      if (this.cancelled) {
        return;
      }
      this.processingMessage = true;
      const forwardMessage = msg => {
        this.processingMessage = false;
        if (this.cancelled) {
          return;
        }
        if (!this.processingMetadata) {
          this.nextListener.onReceiveMessage(msg);
          this.processPendingHalfClose();
          return;
        }
        this.pendingMessage = msg;
        this.hasPendingMessage = true;
      };
      this.listener.onReceiveMessage(message, forwardMessage);
    }
    /**
     * Run the half-close through the interceptor; forward it only when no
     * metadata or message is still being processed.
     */
    onReceiveHalfClose() {
      if (this.cancelled) {
        return;
      }
      const forwardHalfClose = () => {
        if (this.cancelled) {
          return;
        }
        const earlierEventInFlight = this.processingMetadata || this.processingMessage;
        if (earlierEventInFlight) {
          this.hasPendingHalfClose = true;
          return;
        }
        this.nextListener.onReceiveHalfClose();
      };
      this.listener.onReceiveHalfClose(forwardHalfClose);
    }
    /** Mark the call cancelled and notify both listeners, interceptor first. */
    onCancel() {
      this.cancelled = true;
      this.listener.onCancel();
      this.nextListener.onCancel();
    }
  }
  /**
   * Fluent builder for a responder object. Each `with*` method records one
   * handler and returns the builder for chaining; `build()` returns a plain
   * object with the recorded handlers (unset ones remain `undefined`).
   */
  class ResponderBuilder {
    constructor() {
      // All handler slots begin unset; assignment order fixes key order.
      Object.assign(this, {
        start: undefined,
        metadata: undefined,
        message: undefined,
        status: undefined,
      });
    }
    /** Record the handler exposed as `start`. */
    withStart(start) {
      this.start = start;
      return this;
    }
    /** Record the handler exposed as `sendMetadata`. */
    withSendMetadata(sendMetadata) {
      this.metadata = sendMetadata;
      return this;
    }
    /** Record the handler exposed as `sendMessage`. */
    withSendMessage(sendMessage) {
      this.message = sendMessage;
      return this;
    }
    /** Record the handler exposed as `sendStatus`. */
    withSendStatus(sendStatus) {
      this.status = sendStatus;
      return this;
    }
    /**
     * @returns A new responder object carrying the handlers recorded so far.
     */
    build() {
      const { start, metadata, message, status } = this;
      return {
        start: start,
        sendMetadata: metadata,
        sendMessage: message,
        sendStatus: status,
      };
    }
  }
  185. exports.ResponderBuilder = ResponderBuilder;
  /* Pass-through listener handlers: each one hands its input straight to the
   * `next` continuation, and `onCancel` does nothing. Used to fill in any
   * handler an interceptor's listener leaves out. */
  const defaultServerListener = {
    onReceiveMetadata(metadata, next) {
      next(metadata);
    },
    onReceiveMessage(message, next) {
      next(message);
    },
    onReceiveHalfClose(next) {
      next();
    },
    onCancel() { },
  };
  /* Pass-through responder handlers: each one immediately invokes `next`
   * with the value it was given (`start` invokes it with no arguments).
   * Used to fill in any handler a supplied responder leaves out. */
  const defaultResponder = {
    start(next) {
      next();
    },
    sendMetadata(metadata, next) {
      next(metadata);
    },
    sendMessage(message, next) {
      next(message);
    },
    sendStatus(status, next) {
      next(status);
    },
  };
  /**
   * One link in the server interceptor chain. Outbound operations
   * (`sendMetadata`, `sendMessage`, `sendStatus`) are passed through the
   * supplied responder, whose handlers receive a `next` continuation and may
   * complete asynchronously, before being forwarded to `nextCall`.
   *
   * Ordering is preserved across asynchronous handlers: a message that
   * finishes while metadata is still being processed is held until the
   * metadata has been forwarded, and a status that finishes while metadata or
   * a message is still being processed is held until those have been
   * forwarded.
   */
  class ServerInterceptingCall {
    /**
     * @param nextCall The call object that operations are forwarded to.
     * @param responder Optional object with any of `start`, `sendMetadata`,
     *     `sendMessage`, `sendStatus`; missing (null/undefined) handlers fall
     *     back to the pass-through defaults.
     */
    constructor(nextCall, responder) {
      this.nextCall = nextCall;
      this.processingMetadata = false;
      this.processingMessage = false;
      this.pendingMessage = null;
      this.pendingMessageCallback = null;
      this.pendingStatus = null;
      const provided = responder == null ? {} : responder;
      this.responder = {
        start: provided.start != null ? provided.start : defaultResponder.start,
        sendMetadata: provided.sendMetadata != null ? provided.sendMetadata : defaultResponder.sendMetadata,
        sendMessage: provided.sendMessage != null ? provided.sendMessage : defaultResponder.sendMessage,
        sendStatus: provided.sendStatus != null ? provided.sendStatus : defaultResponder.sendStatus,
      };
    }
    /** Forward the held-back message (if any) to the next call. */
    processPendingMessage() {
      if (this.pendingMessageCallback) {
        this.nextCall.sendMessage(this.pendingMessage, this.pendingMessageCallback);
        this.pendingMessage = null;
        this.pendingMessageCallback = null;
      }
    }
    /** Forward the held-back status (if any) to the next call. */
    processPendingStatus() {
      if (this.pendingStatus) {
        this.nextCall.sendStatus(this.pendingStatus);
        this.pendingStatus = null;
      }
    }
    /**
     * Start the call. The responder's `start` handler may supply a listener
     * with continuation-style handlers; any handler it omits is replaced by
     * the pass-through default, and the result is wrapped so that inbound
     * events reach `listener` after interception.
     * @param listener The listener that receives the intercepted events.
     */
    start(listener) {
      this.responder.start(interceptedListener => {
        const provided = interceptedListener == null ? {} : interceptedListener;
        const fullInterceptedListener = {
          onReceiveMetadata: provided.onReceiveMetadata != null ? provided.onReceiveMetadata : defaultServerListener.onReceiveMetadata,
          onReceiveMessage: provided.onReceiveMessage != null ? provided.onReceiveMessage : defaultServerListener.onReceiveMessage,
          onReceiveHalfClose: provided.onReceiveHalfClose != null ? provided.onReceiveHalfClose : defaultServerListener.onReceiveHalfClose,
          onCancel: provided.onCancel != null ? provided.onCancel : defaultServerListener.onCancel,
        };
        const finalInterceptingListener = new InterceptingServerListenerImpl(fullInterceptedListener, listener);
        this.nextCall.start(finalInterceptingListener);
      });
    }
    /**
     * Intercept and forward response metadata, then flush any message and
     * status that were waiting for the metadata to go out first.
     */
    sendMetadata(metadata) {
      this.processingMetadata = true;
      this.responder.sendMetadata(metadata, interceptedMetadata => {
        this.processingMetadata = false;
        this.nextCall.sendMetadata(interceptedMetadata);
        this.processPendingMessage();
        this.processPendingStatus();
      });
    }
    /**
     * Intercept and forward a response message.
     * @param message The message to send.
     * @param callback Passed on to `nextCall.sendMessage` with the message.
     */
    sendMessage(message, callback) {
      this.processingMessage = true;
      this.responder.sendMessage(message, interceptedMessage => {
        this.processingMessage = false;
        if (this.processingMetadata) {
          // Metadata must go first; its continuation flushes this message
          // and then any pending status.
          this.pendingMessage = interceptedMessage;
          this.pendingMessageCallback = callback;
        }
        else {
          this.nextCall.sendMessage(interceptedMessage, callback);
          // A status may have been queued while this message was still being
          // intercepted; nothing else would flush it, so do it here.
          this.processPendingStatus();
        }
      });
    }
    /**
     * Intercept and forward the final status, holding it back while metadata
     * or a message is still being processed so that it is sent last.
     */
    sendStatus(status) {
      this.responder.sendStatus(status, interceptedStatus => {
        if (this.processingMetadata || this.processingMessage) {
          this.pendingStatus = interceptedStatus;
        }
        else {
          this.nextCall.sendStatus(interceptedStatus);
        }
      });
    }
    /** Delegates to `nextCall.startRead()`. */
    startRead() {
      this.nextCall.startRead();
    }
    /** @returns Whatever `nextCall.getPeer()` returns. */
    getPeer() {
      return this.nextCall.getPeer();
    }
    /** @returns Whatever `nextCall.getDeadline()` returns. */
    getDeadline() {
      return this.nextCall.getDeadline();
    }
    /** @returns Whatever `nextCall.getHost()` returns. */
    getHost() {
      return this.nextCall.getHost();
    }
  }
  299. exports.ServerInterceptingCall = ServerInterceptingCall;
  /* Header names used by the gRPC wire protocol handling below. */
  const GRPC_ACCEPT_ENCODING_HEADER = 'grpc-accept-encoding';
  const GRPC_ENCODING_HEADER = 'grpc-encoding';
  const GRPC_MESSAGE_HEADER = 'grpc-message';
  const GRPC_STATUS_HEADER = 'grpc-status';
  const GRPC_TIMEOUT_HEADER = 'grpc-timeout';
  /* A timeout value is 1 to 8 digits followed by a single unit letter. The
   * pattern is anchored so the whole value has to match: without the anchors
   * a 9-digit value such as "123456789S" would match from its second digit
   * and be read as a different timeout, and values with leading or trailing
   * garbage would be accepted. Group 1 is the number, group 2 the unit. */
  const DEADLINE_REGEX = /^(\d{1,8})\s*([HMSmun])$/;
  /* Milliseconds per timeout unit letter. */
  const deadlineUnitsToMs = {
    H: 3600000,
    M: 60000,
    S: 1000,
    m: 1,
    u: 0.001,
    n: 0.000001,
  };
  const defaultCompressionHeaders = {
    // TODO(cjihrig): Remove these encoding headers from the default response
    // once compression is integrated.
    [GRPC_ACCEPT_ENCODING_HEADER]: 'identity,deflate,gzip',
    [GRPC_ENCODING_HEADER]: 'identity',
  };
  /* Headers included in every response: HTTP 200 and the gRPC content type. */
  const defaultResponseHeaders = {
    [http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_OK,
    [http2.constants.HTTP2_HEADER_CONTENT_TYPE]: 'application/grpc+proto',
  };
  /* Options passed to `stream.respond()` when sending response headers;
   * `waitForTrailers` makes the stream emit 'wantTrailers' so the status can
   * be sent as trailers. */
  const defaultResponseOptions = {
    waitForTrailers: true,
  };
  /**
   * The innermost call object of the server interceptor chain: it talks
   * directly to the HTTP/2 stream.
   *
   * Inbound: data frames are decoded into length-prefixed messages,
   * decompressed, deserialized with `handler.deserialize`, queued, and handed
   * to the listener one at a time in response to `startRead()`.
   * Outbound: `sendMetadata` / `sendMessage` / `sendStatus` write response
   * headers, length-prefixed messages and trailers to the stream.
   */
  class BaseServerInterceptingCall {
    /**
     * @param stream The HTTP/2 stream carrying this call.
     * @param headers The request headers received on the stream.
     * @param callEventTracker Optional tracker notified of stream/call end
     *     and of messages sent and received; may be null/undefined.
     * @param handler Provides `path`, `serialize` and `deserialize`.
     * @param options Channel options; `grpc.max_send_message_length` and
     *     `grpc.max_receive_message_length` are honoured when present.
     */
    constructor(stream, headers, callEventTracker, handler, options) {
      this.stream = stream;
      this.callEventTracker = callEventTracker;
      this.handler = handler;
      this.listener = null;
      this.deadlineTimer = null;
      this.deadline = Infinity;
      this.maxSendMessageSize = constants_1.DEFAULT_MAX_SEND_MESSAGE_LENGTH;
      this.maxReceiveMessageSize = constants_1.DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH;
      this.cancelled = false;
      this.metadataSent = false;
      this.wantTrailers = false;
      this.cancelNotified = false;
      this.incomingEncoding = 'identity';
      this.readQueue = [];
      this.isReadPending = false;
      this.receivedHalfClose = false;
      this.streamEnded = false;
      this.stream.once('error', (err) => {
        /* We need an error handler to avoid uncaught error event exceptions, but
         * there is nothing we can reasonably do here. Any error event should
         * have a corresponding close event, which handles emitting the cancelled
         * event. And the stream is now in a bad state, so we can't reasonably
         * expect to be able to send an error over it. */
      });
      this.stream.once('close', () => {
        trace('Request to method ' +
          (this.handler == null ? undefined : this.handler.path) +
          ' stream closed with rstCode ' +
          this.stream.rstCode);
        // If no status was sent before the stream closed, report the call as
        // cancelled to the tracker.
        if (this.callEventTracker && !this.streamEnded) {
          this.streamEnded = true;
          this.callEventTracker.onStreamEnd(false);
          this.callEventTracker.onCallEnd({
            code: constants_1.Status.CANCELLED,
            details: 'Stream closed before sending status',
            metadata: null,
          });
        }
        this.notifyOnCancel();
      });
      this.stream.on('data', (data) => {
        this.handleDataFrame(data);
      });
      // Attaching a 'data' listener starts the flow; pause until startRead().
      this.stream.pause();
      this.stream.on('end', () => {
        this.handleEndEvent();
      });
      if ('grpc.max_send_message_length' in options) {
        this.maxSendMessageSize = options['grpc.max_send_message_length'];
      }
      if ('grpc.max_receive_message_length' in options) {
        this.maxReceiveMessageSize = options['grpc.max_receive_message_length'];
      }
      const authority = headers[':authority'];
      this.host = authority != null ? authority : headers.host;
      this.decoder = new stream_decoder_1.StreamDecoder(this.maxReceiveMessageSize);
      const metadata = metadata_1.Metadata.fromHttp2Headers(headers);
      if (logging.isTracerEnabled(TRACER_NAME)) {
        trace('Request to ' +
          this.handler.path +
          ' received headers ' +
          JSON.stringify(metadata.toJSON()));
      }
      const timeoutHeader = metadata.get(GRPC_TIMEOUT_HEADER);
      if (timeoutHeader.length > 0) {
        this.handleTimeoutHeader(timeoutHeader[0]);
      }
      const encodingHeader = metadata.get(GRPC_ENCODING_HEADER);
      if (encodingHeader.length > 0) {
        this.incomingEncoding = encodingHeader[0];
      }
      // Remove several headers that should not be propagated to the application
      metadata.remove(GRPC_TIMEOUT_HEADER);
      metadata.remove(GRPC_ENCODING_HEADER);
      metadata.remove(GRPC_ACCEPT_ENCODING_HEADER);
      metadata.remove(http2.constants.HTTP2_HEADER_ACCEPT_ENCODING);
      metadata.remove(http2.constants.HTTP2_HEADER_TE);
      metadata.remove(http2.constants.HTTP2_HEADER_CONTENT_TYPE);
      this.metadata = metadata;
    }
    /**
     * Parse the timeout header, record the absolute deadline and arm the
     * deadline timer. An unparseable value ends the call with INTERNAL.
     * @param timeoutHeader The raw header value.
     */
    handleTimeoutHeader(timeoutHeader) {
      const match = timeoutHeader.toString().match(DEADLINE_REGEX);
      if (match === null) {
        const status = {
          code: constants_1.Status.INTERNAL,
          details: `Invalid ${GRPC_TIMEOUT_HEADER} value "${timeoutHeader}"`,
          metadata: null,
        };
        // Wait for the constructor to complete before sending the error.
        process.nextTick(() => {
          this.sendStatus(status);
        });
        return;
      }
      /* Truncate to whole milliseconds with Math.trunc rather than `| 0`:
       * the bitwise form works on 32-bit integers, so any timeout above
       * 2^31-1 ms (about 24.8 days, e.g. "1000H") would wrap around, usually
       * to a negative value, and the call would fail immediately. */
      const timeout = Math.trunc(+match[1] * deadlineUnitsToMs[match[2]]);
      this.deadline = Date.now() + timeout;
      this.scheduleDeadlineTimer();
    }
    /**
     * Arm `deadlineTimer` so that DEADLINE_EXCEEDED is sent once
     * `this.deadline` is reached. `setTimeout` cannot represent delays above
     * 2^31-1 ms, so longer waits are split into chunks of at most that
     * length, re-arming until the remaining time fits in a single timer.
     */
    scheduleDeadlineTimer() {
      const MAX_TIMER_DELAY_MS = 2147483647;
      const remaining = this.deadline - Date.now();
      if (remaining > MAX_TIMER_DELAY_MS) {
        this.deadlineTimer = setTimeout(() => {
          this.scheduleDeadlineTimer();
        }, MAX_TIMER_DELAY_MS);
        return;
      }
      this.deadlineTimer = setTimeout(() => {
        const status = {
          code: constants_1.Status.DEADLINE_EXCEEDED,
          details: 'Deadline exceeded',
          metadata: null,
        };
        this.sendStatus(status);
      }, Math.max(remaining, 0));
    }
    /**
     * @returns `true` when the call is cancelled, detecting (and recording)
     *     a stream that is already destroyed or closed.
     */
    checkCancelled() {
      /* In some cases the stream can become destroyed before the close event
       * fires. That creates a race condition that this check works around */
      if (!this.cancelled && (this.stream.destroyed || this.stream.closed)) {
        this.notifyOnCancel();
        this.cancelled = true;
      }
      return this.cancelled;
    }
    /**
     * Mark the call cancelled exactly once: notify the listener on the next
     * tick, stop the deadline timer and let remaining inbound data drain.
     */
    notifyOnCancel() {
      if (this.cancelNotified) {
        return;
      }
      this.cancelNotified = true;
      this.cancelled = true;
      process.nextTick(() => {
        // The listener is only set once start() has been called.
        if (this.listener != null) {
          this.listener.onCancel();
        }
      });
      if (this.deadlineTimer) {
        clearTimeout(this.deadlineTimer);
      }
      // Flush incoming data frames
      this.stream.resume();
    }
    /**
     * A server handler can start sending messages without explicitly sending
     * metadata. In that case, we need to send headers before sending any
     * messages. This function does that if necessary.
     */
    maybeSendMetadata() {
      if (!this.metadataSent) {
        this.sendMetadata(new metadata_1.Metadata());
      }
    }
    /**
     * Serialize a message to a length-delimited byte string: one flag byte,
     * a 4-byte big-endian length, then the serialized payload.
     * @param value The message to pass to `handler.serialize`.
     * @returns The framed message buffer.
     */
    serializeMessage(value) {
      const messageBuffer = this.handler.serialize(value);
      const byteLength = messageBuffer.byteLength;
      const output = Buffer.allocUnsafe(byteLength + 5);
      /* Note: response compression is currently not supported, so this
       * compressed bit is always 0. */
      output.writeUInt8(0, 0);
      output.writeUInt32BE(byteLength, 1);
      messageBuffer.copy(output, 5);
      return output;
    }
    /**
     * Strip the 5-byte frame prefix and undo the given encoding.
     * @param message The framed message (prefix included).
     * @param encoding 'identity', 'deflate' or 'gzip'.
     * @returns The payload directly for 'identity'; otherwise a promise for
     *     the decompressed payload. The promise rejects with a
     *     `{code, details}` status object when the encoding is unsupported,
     *     the decompressed size exceeds `maxReceiveMessageSize` (unless that
     *     limit is -1), or the payload cannot be decompressed.
     */
    decompressMessage(message, encoding) {
      const messageContents = message.subarray(5);
      if (encoding === 'identity') {
        return messageContents;
      }
      else if (encoding === 'deflate' || encoding === 'gzip') {
        let decompresser;
        if (encoding === 'deflate') {
          decompresser = zlib.createInflate();
        }
        else {
          decompresser = zlib.createGunzip();
        }
        return new Promise((resolve, reject) => {
          let totalLength = 0;
          const messageParts = [];
          decompresser.on('data', (chunk) => {
            messageParts.push(chunk);
            totalLength += chunk.byteLength;
            if (this.maxReceiveMessageSize !== -1 && totalLength > this.maxReceiveMessageSize) {
              decompresser.destroy();
              reject({
                code: constants_1.Status.RESOURCE_EXHAUSTED,
                details: `Received message that decompresses to a size larger than ${this.maxReceiveMessageSize}`
              });
            }
          });
          decompresser.on('end', () => {
            resolve(Buffer.concat(messageParts));
          });
          /* A corrupt payload makes the zlib stream emit 'error'. Without a
           * listener that event is thrown as an uncaught exception; turn it
           * into a failed call instead. */
          decompresser.on('error', (error) => {
            reject({
              code: constants_1.Status.INTERNAL,
              details: `Failed to decompress message: ${error.message}`,
            });
          });
          decompresser.write(messageContents);
          decompresser.end();
        });
      }
      else {
        return Promise.reject({
          code: constants_1.Status.UNIMPLEMENTED,
          details: `Received message compressed with unsupported encoding "${encoding}"`,
        });
      }
    }
    /**
     * Decompress and deserialize one queued message in place, mark it
     * READABLE and try to deliver it. Failures end the call via sendStatus.
     * @param queueEntry A read-queue entry whose type is 'COMPRESSED'.
     * @throws Error (as a rejection) if the entry is not 'COMPRESSED'.
     */
    async decompressAndMaybePush(queueEntry) {
      if (queueEntry.type !== 'COMPRESSED') {
        throw new Error(`Invalid queue entry type: ${queueEntry.type}`);
      }
      // The first byte of the frame is the per-message compressed flag.
      const compressed = queueEntry.compressedMessage.readUInt8(0) === 1;
      const compressedMessageEncoding = compressed
        ? this.incomingEncoding
        : 'identity';
      let decompressedMessage;
      try {
        decompressedMessage = await this.decompressMessage(queueEntry.compressedMessage, compressedMessageEncoding);
      }
      catch (err) {
        // decompressMessage rejects with a ready-made status object.
        this.sendStatus(err);
        return;
      }
      try {
        queueEntry.parsedMessage = this.handler.deserialize(decompressedMessage);
      }
      catch (err) {
        this.sendStatus({
          code: constants_1.Status.INTERNAL,
          details: `Error deserializing request: ${err.message}`,
        });
        return;
      }
      queueEntry.type = 'READABLE';
      this.maybePushNextMessage();
    }
    /**
     * Deliver the entry at the head of the read queue to the listener when a
     * read is pending and that entry is ready (not still 'COMPRESSED').
     */
    maybePushNextMessage() {
      if (this.listener &&
        this.isReadPending &&
        this.readQueue.length > 0 &&
        this.readQueue[0].type !== 'COMPRESSED') {
        this.isReadPending = false;
        const nextQueueEntry = this.readQueue.shift();
        if (nextQueueEntry.type === 'READABLE') {
          this.listener.onReceiveMessage(nextQueueEntry.parsedMessage);
        }
        else {
          // nextQueueEntry.type === 'HALF_CLOSE'
          this.listener.onReceiveHalfClose();
        }
      }
    }
    /**
     * Feed a data frame to the decoder and queue every complete message it
     * yields; a decoder error ends the call with RESOURCE_EXHAUSTED.
     * @param data The bytes received from the stream.
     */
    handleDataFrame(data) {
      if (this.checkCancelled()) {
        return;
      }
      trace('Request to ' +
        this.handler.path +
        ' received data frame of size ' +
        data.length);
      let rawMessages;
      try {
        rawMessages = this.decoder.write(data);
      }
      catch (e) {
        this.sendStatus({ code: constants_1.Status.RESOURCE_EXHAUSTED, details: e.message });
        return;
      }
      for (const messageBytes of rawMessages) {
        // Stop reading until the next startRead() asks for more.
        this.stream.pause();
        const queueEntry = {
          type: 'COMPRESSED',
          compressedMessage: messageBytes,
          parsedMessage: null,
        };
        this.readQueue.push(queueEntry);
        this.decompressAndMaybePush(queueEntry);
        if (this.callEventTracker != null) {
          this.callEventTracker.addMessageReceived();
        }
      }
    }
    /** Queue a half-close marker once the request stream has ended. */
    handleEndEvent() {
      this.readQueue.push({
        type: 'HALF_CLOSE',
        compressedMessage: null,
        parsedMessage: null,
      });
      this.receivedHalfClose = true;
      this.maybePushNextMessage();
    }
    /**
     * Register the listener and hand it the request metadata. Does nothing
     * if the call is already cancelled.
     */
    start(listener) {
      trace('Request to ' + this.handler.path + ' start called');
      if (this.checkCancelled()) {
        return;
      }
      this.listener = listener;
      listener.onReceiveMetadata(this.metadata);
    }
    /**
     * Send the response headers (at most once per call), merging the default
     * response and compression headers with the given metadata.
     * @param metadata Custom metadata to include; may be falsy.
     */
    sendMetadata(metadata) {
      if (this.checkCancelled()) {
        return;
      }
      if (this.metadataSent) {
        return;
      }
      this.metadataSent = true;
      const custom = metadata ? metadata.toHttp2Headers() : null;
      const headers = Object.assign(Object.assign(Object.assign({}, defaultResponseHeaders), defaultCompressionHeaders), custom);
      this.stream.respond(headers, defaultResponseOptions);
    }
    /**
     * Serialize and write one response message, sending headers first if
     * needed. Serialization failure, an oversized message or a write error
     * ends the call with an error status instead.
     * @param message The message to serialize.
     * @param callback Invoked (with no arguments) after a successful write.
     */
    sendMessage(message, callback) {
      if (this.checkCancelled()) {
        return;
      }
      let response;
      try {
        response = this.serializeMessage(message);
      }
      catch (e) {
        this.sendStatus({
          code: constants_1.Status.INTERNAL,
          details: `Error serializing response: ${(0, error_1.getErrorMessage)(e)}`,
          metadata: null,
        });
        return;
      }
      // The limit applies to the payload, so discount the 5-byte prefix.
      if (this.maxSendMessageSize !== -1 &&
        response.length - 5 > this.maxSendMessageSize) {
        this.sendStatus({
          code: constants_1.Status.RESOURCE_EXHAUSTED,
          details: `Sent message larger than max (${response.length} vs. ${this.maxSendMessageSize})`,
          metadata: null,
        });
        return;
      }
      this.maybeSendMetadata();
      trace('Request to ' +
        this.handler.path +
        ' sent data frame of size ' +
        response.length);
      this.stream.write(response, error => {
        if (error) {
          this.sendStatus({
            code: constants_1.Status.INTERNAL,
            details: `Error writing message: ${(0, error_1.getErrorMessage)(error)}`,
            metadata: null,
          });
          return;
        }
        if (this.callEventTracker != null) {
          this.callEventTracker.addMessageSent();
        }
        callback();
      });
    }
    /**
     * End the call with the given status. If headers were already sent the
     * status goes out as trailers after the stream is ended; otherwise a
     * trailers-only response is sent. The call is treated as finished (and
     * the listener's onCancel scheduled) afterwards.
     * @param status Object with `code`, `details` and optional `metadata`.
     */
    sendStatus(status) {
      if (this.checkCancelled()) {
        return;
      }
      trace('Request to method ' +
        (this.handler == null ? undefined : this.handler.path) +
        ' ended with status code: ' +
        constants_1.Status[status.code] +
        ' details: ' +
        status.details);
      if (this.metadataSent) {
        if (!this.wantTrailers) {
          this.wantTrailers = true;
          this.stream.once('wantTrailers', () => {
            if (this.callEventTracker && !this.streamEnded) {
              this.streamEnded = true;
              this.callEventTracker.onStreamEnd(true);
              this.callEventTracker.onCallEnd(status);
            }
            const trailersToSend = Object.assign({ [GRPC_STATUS_HEADER]: status.code, [GRPC_MESSAGE_HEADER]: encodeURI(status.details) }, status.metadata == null ? undefined : status.metadata.toHttp2Headers());
            this.stream.sendTrailers(trailersToSend);
            this.notifyOnCancel();
          });
          this.stream.end();
        }
        else {
          // Trailers are already scheduled by an earlier status.
          this.notifyOnCancel();
        }
      }
      else {
        if (this.callEventTracker && !this.streamEnded) {
          this.streamEnded = true;
          this.callEventTracker.onStreamEnd(true);
          this.callEventTracker.onCallEnd(status);
        }
        // Trailers-only response
        const trailersToSend = Object.assign(Object.assign({ [GRPC_STATUS_HEADER]: status.code, [GRPC_MESSAGE_HEADER]: encodeURI(status.details) }, defaultResponseHeaders), status.metadata == null ? undefined : status.metadata.toHttp2Headers());
        this.stream.respond(trailersToSend, { endStream: true });
        this.notifyOnCancel();
      }
    }
    /**
     * Request the next inbound event: deliver a queued entry if one is
     * ready, otherwise resume the stream (unless it has already ended).
     */
    startRead() {
      trace('Request to ' + this.handler.path + ' startRead called');
      if (this.checkCancelled()) {
        return;
      }
      this.isReadPending = true;
      if (this.readQueue.length === 0) {
        if (!this.receivedHalfClose) {
          this.stream.resume();
        }
      }
      else {
        this.maybePushNextMessage();
      }
    }
    /**
     * @returns "address:port" of the remote socket, just the address when no
     *     port is known, or 'unknown' when no remote address is available.
     */
    getPeer() {
      const session = this.stream.session;
      const socket = session == null ? undefined : session.socket;
      if (socket == null ? undefined : socket.remoteAddress) {
        if (socket.remotePort) {
          return `${socket.remoteAddress}:${socket.remotePort}`;
        }
        else {
          return socket.remoteAddress;
        }
      }
      else {
        return 'unknown';
      }
    }
    /** @returns The deadline as a millisecond timestamp, or Infinity if none. */
    getDeadline() {
      return this.deadline;
    }
    /** @returns The `:authority` header value, falling back to `host`. */
    getHost() {
      return this.host;
    }
  }
  755. exports.BaseServerInterceptingCall = BaseServerInterceptingCall;
  /**
   * Build the call object for an incoming request: create the base call on
   * the HTTP/2 stream, then wrap it with each interceptor in array order, so
   * the last interceptor in the list produces the outermost call.
   * @param interceptors Functions of `(methodDefinition, call)` returning a call.
   * @param stream The HTTP/2 stream for the request.
   * @param headers The request headers.
   * @param callEventTracker Passed through to the base call.
   * @param handler Supplies `path`, `type`, `deserialize` and `serialize`.
   * @param options Passed through to the base call.
   * @returns The outermost call, or the base call when there are no interceptors.
   */
  function getServerInterceptingCall(interceptors, stream, headers, callEventTracker, handler, options) {
    const clientStreams = handler.type === 'clientStream' || handler.type === 'bidi';
    const serverStreams = handler.type === 'serverStream' || handler.type === 'bidi';
    const methodDefinition = {
      path: handler.path,
      requestStream: clientStreams,
      responseStream: serverStreams,
      requestDeserialize: handler.deserialize,
      responseSerialize: handler.serialize,
    };
    const innermostCall = new BaseServerInterceptingCall(stream, headers, callEventTracker, handler, options);
    const wrapWith = (wrappedCall, interceptor) => interceptor(methodDefinition, wrappedCall);
    return interceptors.reduce(wrapWith, innermostCall);
  }
  769. exports.getServerInterceptingCall = getServerInterceptingCall;
  770. //# sourceMappingURL=server-interceptors.js.map