server.ts 64 KB


  1. /*
  2. * Copyright 2019 gRPC authors.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. *
  16. */
  17. import * as http2 from 'http2';
  18. import * as util from 'util';
  19. import { ServiceError } from './call';
  20. import { Status, LogVerbosity } from './constants';
  21. import { Deserialize, Serialize, ServiceDefinition } from './make-client';
  22. import { Metadata } from './metadata';
  23. import {
  24. BidiStreamingHandler,
  25. ClientStreamingHandler,
  26. HandleCall,
  27. Handler,
  28. HandlerType,
  29. sendUnaryData,
  30. ServerDuplexStream,
  31. ServerDuplexStreamImpl,
  32. ServerReadableStream,
  33. ServerStreamingHandler,
  34. ServerUnaryCall,
  35. ServerWritableStream,
  36. ServerWritableStreamImpl,
  37. UnaryHandler,
  38. ServerErrorResponse,
  39. ServerStatusResponse,
  40. serverErrorToStatus,
  41. } from './server-call';
  42. import { SecureContextWatcher, ServerCredentials } from './server-credentials';
  43. import { ChannelOptions } from './channel-options';
  44. import {
  45. createResolver,
  46. ResolverListener,
  47. mapUriDefaultScheme,
  48. } from './resolver';
  49. import * as logging from './logging';
  50. import {
  51. SubchannelAddress,
  52. isTcpSubchannelAddress,
  53. subchannelAddressToString,
  54. stringToSubchannelAddress,
  55. } from './subchannel-address';
  56. import {
  57. GrpcUri,
  58. combineHostPort,
  59. parseUri,
  60. splitHostPort,
  61. uriToString,
  62. } from './uri-parser';
  63. import {
  64. ChannelzCallTracker,
  65. ChannelzCallTrackerStub,
  66. ChannelzChildrenTracker,
  67. ChannelzChildrenTrackerStub,
  68. ChannelzTrace,
  69. ChannelzTraceStub,
  70. registerChannelzServer,
  71. registerChannelzSocket,
  72. ServerInfo,
  73. ServerRef,
  74. SocketInfo,
  75. SocketRef,
  76. TlsInfo,
  77. unregisterChannelzRef,
  78. } from './channelz';
  79. import { CipherNameAndProtocol, TLSSocket } from 'tls';
  80. import {
  81. ServerInterceptingCallInterface,
  82. ServerInterceptor,
  83. getServerInterceptingCall,
  84. } from './server-interceptors';
  85. import { PartialStatusObject } from './call-interface';
  86. import { CallEventTracker } from './transport';
  87. import { Socket } from 'net';
  88. import { Duplex } from 'stream';
  89. const UNLIMITED_CONNECTION_AGE_MS = ~(1 << 31);
  90. const KEEPALIVE_MAX_TIME_MS = ~(1 << 31);
  91. const KEEPALIVE_TIMEOUT_MS = 20000;
  92. const MAX_CONNECTION_IDLE_MS = ~(1 << 31);
  93. const { HTTP2_HEADER_PATH } = http2.constants;
  94. const TRACER_NAME = 'server';
  95. const kMaxAge = Buffer.from('max_age');
  96. type AnyHttp2Server = http2.Http2Server | http2.Http2SecureServer;
  97. interface BindResult {
  98. port: number;
  99. count: number;
  100. errors: string[];
  101. }
  102. interface SingleAddressBindResult {
  103. port: number;
  104. error?: string;
  105. }
  106. function noop(): void {}
  107. /**
  108. * Decorator to wrap a class method with util.deprecate
  109. * @param message The message to output if the deprecated method is called
  110. * @returns
  111. */
  112. function deprecate(message: string) {
  113. return function <This, Args extends any[], Return>(
  114. target: (this: This, ...args: Args) => Return,
  115. context: ClassMethodDecoratorContext<
  116. This,
  117. (this: This, ...args: Args) => Return
  118. >
  119. ) {
  120. return util.deprecate(target, message);
  121. };
  122. }
  123. function getUnimplementedStatusResponse(
  124. methodName: string
  125. ): PartialStatusObject {
  126. return {
  127. code: Status.UNIMPLEMENTED,
  128. details: `The server does not implement the method ${methodName}`,
  129. };
  130. }
  131. /* eslint-disable @typescript-eslint/no-explicit-any */
  132. type UntypedUnaryHandler = UnaryHandler<any, any>;
  133. type UntypedClientStreamingHandler = ClientStreamingHandler<any, any>;
  134. type UntypedServerStreamingHandler = ServerStreamingHandler<any, any>;
  135. type UntypedBidiStreamingHandler = BidiStreamingHandler<any, any>;
  136. export type UntypedHandleCall = HandleCall<any, any>;
  137. type UntypedHandler = Handler<any, any>;
  138. export interface UntypedServiceImplementation {
  139. [name: string]: UntypedHandleCall;
  140. }
  141. function getDefaultHandler(handlerType: HandlerType, methodName: string) {
  142. const unimplementedStatusResponse =
  143. getUnimplementedStatusResponse(methodName);
  144. switch (handlerType) {
  145. case 'unary':
  146. return (
  147. call: ServerUnaryCall<any, any>,
  148. callback: sendUnaryData<any>
  149. ) => {
  150. callback(unimplementedStatusResponse as ServiceError, null);
  151. };
  152. case 'clientStream':
  153. return (
  154. call: ServerReadableStream<any, any>,
  155. callback: sendUnaryData<any>
  156. ) => {
  157. callback(unimplementedStatusResponse as ServiceError, null);
  158. };
  159. case 'serverStream':
  160. return (call: ServerWritableStream<any, any>) => {
  161. call.emit('error', unimplementedStatusResponse);
  162. };
  163. case 'bidi':
  164. return (call: ServerDuplexStream<any, any>) => {
  165. call.emit('error', unimplementedStatusResponse);
  166. };
  167. default:
  168. throw new Error(`Invalid handlerType ${handlerType}`);
  169. }
  170. }
  171. interface ChannelzSessionInfo {
  172. ref: SocketRef;
  173. streamTracker: ChannelzCallTracker | ChannelzCallTrackerStub;
  174. messagesSent: number;
  175. messagesReceived: number;
  176. keepAlivesSent: number;
  177. lastMessageSentTimestamp: Date | null;
  178. lastMessageReceivedTimestamp: Date | null;
  179. }
  180. /**
  181. * Information related to a single invocation of bindAsync. This should be
  182. * tracked in a map keyed by target string, normalized with a pass through
  183. * parseUri -> mapUriDefaultScheme -> uriToString. If the target has a port
  184. * number and the port number is 0, the target string is modified with the
  185. * concrete bound port.
  186. */
  187. interface BoundPort {
  188. /**
  189. * The key used to refer to this object in the boundPorts map.
  190. */
  191. mapKey: string;
  192. /**
  193. * The target string, passed through parseUri -> mapUriDefaultScheme. Used
  194. * to determine the final key when the port number is 0.
  195. */
  196. originalUri: GrpcUri;
  197. /**
  198. * If there is a pending bindAsync operation, this is a promise that resolves
  199. * with the port number when that operation succeeds. If there is no such
  200. * operation pending, this is null.
  201. */
  202. completionPromise: Promise<number> | null;
  203. /**
  204. * The port number that was actually bound. Populated only after
  205. * completionPromise resolves.
  206. */
  207. portNumber: number;
  208. /**
  209. * Set by unbind if called while pending is true.
  210. */
  211. cancelled: boolean;
  212. /**
  213. * The credentials object passed to the original bindAsync call.
  214. */
  215. credentials: ServerCredentials;
  216. /**
  217. * The set of servers associated with this listening port. A target string
  218. * that expands to multiple addresses will result in multiple listening
  219. * servers.
  220. */
  221. listeningServers: Set<AnyHttp2Server>;
  222. }
  223. /**
  224. * Should be in a map keyed by AnyHttp2Server.
  225. */
  226. interface Http2ServerInfo {
  227. channelzRef: SocketRef;
  228. sessions: Set<http2.ServerHttp2Session>;
  229. }
  230. interface SessionIdleTimeoutTracker {
  231. activeStreams: number;
  232. lastIdle: number;
  233. timeout: NodeJS.Timeout;
  234. onClose: (session: http2.ServerHttp2Session) => void | null;
  235. }
  236. export interface ServerOptions extends ChannelOptions {
  237. interceptors?: ServerInterceptor[];
  238. }
  239. export interface ConnectionInjector {
  240. injectConnection(connection: Duplex): void;
  241. drain(graceTimeMs: number): void;
  242. destroy(): void;
  243. }
  244. export class Server {
  245. private boundPorts: Map<string, BoundPort> = new Map();
  246. private http2Servers: Map<AnyHttp2Server, Http2ServerInfo> = new Map();
  247. private sessionIdleTimeouts = new Map<
  248. http2.ServerHttp2Session,
  249. SessionIdleTimeoutTracker
  250. >();
  251. private handlers: Map<string, UntypedHandler> = new Map<
  252. string,
  253. UntypedHandler
  254. >();
  255. private sessions = new Map<http2.ServerHttp2Session, ChannelzSessionInfo>();
  256. /**
  257. * This field only exists to ensure that the start method throws an error if
  258. * it is called twice, as it did previously.
  259. */
  260. private started = false;
  261. private shutdown = false;
  262. private options: ServerOptions;
  263. private serverAddressString = 'null';
  264. // Channelz Info
  265. private readonly channelzEnabled: boolean = true;
  266. private channelzRef: ServerRef;
  267. private channelzTrace: ChannelzTrace | ChannelzTraceStub;
  268. private callTracker: ChannelzCallTracker | ChannelzCallTrackerStub;
  269. private listenerChildrenTracker:
  270. | ChannelzChildrenTracker
  271. | ChannelzChildrenTrackerStub;
  272. private sessionChildrenTracker:
  273. | ChannelzChildrenTracker
  274. | ChannelzChildrenTrackerStub;
  275. private readonly maxConnectionAgeMs: number;
  276. private readonly maxConnectionAgeGraceMs: number;
  277. private readonly keepaliveTimeMs: number;
  278. private readonly keepaliveTimeoutMs: number;
  279. private readonly sessionIdleTimeout: number;
  280. private readonly interceptors: ServerInterceptor[];
  281. /**
  282. * Options that will be used to construct all Http2Server instances for this
  283. * Server.
  284. */
  285. private commonServerOptions: http2.ServerOptions;
  286. constructor(options?: ServerOptions) {
  287. this.options = options ?? {};
  288. if (this.options['grpc.enable_channelz'] === 0) {
  289. this.channelzEnabled = false;
  290. this.channelzTrace = new ChannelzTraceStub();
  291. this.callTracker = new ChannelzCallTrackerStub();
  292. this.listenerChildrenTracker = new ChannelzChildrenTrackerStub();
  293. this.sessionChildrenTracker = new ChannelzChildrenTrackerStub();
  294. } else {
  295. this.channelzTrace = new ChannelzTrace();
  296. this.callTracker = new ChannelzCallTracker();
  297. this.listenerChildrenTracker = new ChannelzChildrenTracker();
  298. this.sessionChildrenTracker = new ChannelzChildrenTracker();
  299. }
  300. this.channelzRef = registerChannelzServer(
  301. 'server',
  302. () => this.getChannelzInfo(),
  303. this.channelzEnabled
  304. );
  305. this.channelzTrace.addTrace('CT_INFO', 'Server created');
  306. this.maxConnectionAgeMs =
  307. this.options['grpc.max_connection_age_ms'] ?? UNLIMITED_CONNECTION_AGE_MS;
  308. this.maxConnectionAgeGraceMs =
  309. this.options['grpc.max_connection_age_grace_ms'] ??
  310. UNLIMITED_CONNECTION_AGE_MS;
  311. this.keepaliveTimeMs =
  312. this.options['grpc.keepalive_time_ms'] ?? KEEPALIVE_MAX_TIME_MS;
  313. this.keepaliveTimeoutMs =
  314. this.options['grpc.keepalive_timeout_ms'] ?? KEEPALIVE_TIMEOUT_MS;
  315. this.sessionIdleTimeout =
  316. this.options['grpc.max_connection_idle_ms'] ?? MAX_CONNECTION_IDLE_MS;
  317. this.commonServerOptions = {
  318. maxSendHeaderBlockLength: Number.MAX_SAFE_INTEGER,
  319. };
  320. if ('grpc-node.max_session_memory' in this.options) {
  321. this.commonServerOptions.maxSessionMemory =
  322. this.options['grpc-node.max_session_memory'];
  323. } else {
  324. /* By default, set a very large max session memory limit, to effectively
  325. * disable enforcement of the limit. Some testing indicates that Node's
  326. * behavior degrades badly when this limit is reached, so we solve that
  327. * by disabling the check entirely. */
  328. this.commonServerOptions.maxSessionMemory = Number.MAX_SAFE_INTEGER;
  329. }
  330. if ('grpc.max_concurrent_streams' in this.options) {
  331. this.commonServerOptions.settings = {
  332. maxConcurrentStreams: this.options['grpc.max_concurrent_streams'],
  333. };
  334. }
  335. this.interceptors = this.options.interceptors ?? [];
  336. this.trace('Server constructed');
  337. }
  338. private getChannelzInfo(): ServerInfo {
  339. return {
  340. trace: this.channelzTrace,
  341. callTracker: this.callTracker,
  342. listenerChildren: this.listenerChildrenTracker.getChildLists(),
  343. sessionChildren: this.sessionChildrenTracker.getChildLists(),
  344. };
  345. }
  346. private getChannelzSessionInfo(
  347. session: http2.ServerHttp2Session
  348. ): SocketInfo {
  349. const sessionInfo = this.sessions.get(session)!;
  350. const sessionSocket = session.socket;
  351. const remoteAddress = sessionSocket.remoteAddress
  352. ? stringToSubchannelAddress(
  353. sessionSocket.remoteAddress,
  354. sessionSocket.remotePort
  355. )
  356. : null;
  357. const localAddress = sessionSocket.localAddress
  358. ? stringToSubchannelAddress(
  359. sessionSocket.localAddress!,
  360. sessionSocket.localPort
  361. )
  362. : null;
  363. let tlsInfo: TlsInfo | null;
  364. if (session.encrypted) {
  365. const tlsSocket: TLSSocket = sessionSocket as TLSSocket;
  366. const cipherInfo: CipherNameAndProtocol & { standardName?: string } =
  367. tlsSocket.getCipher();
  368. const certificate = tlsSocket.getCertificate();
  369. const peerCertificate = tlsSocket.getPeerCertificate();
  370. tlsInfo = {
  371. cipherSuiteStandardName: cipherInfo.standardName ?? null,
  372. cipherSuiteOtherName: cipherInfo.standardName ? null : cipherInfo.name,
  373. localCertificate:
  374. certificate && 'raw' in certificate ? certificate.raw : null,
  375. remoteCertificate:
  376. peerCertificate && 'raw' in peerCertificate
  377. ? peerCertificate.raw
  378. : null,
  379. };
  380. } else {
  381. tlsInfo = null;
  382. }
  383. const socketInfo: SocketInfo = {
  384. remoteAddress: remoteAddress,
  385. localAddress: localAddress,
  386. security: tlsInfo,
  387. remoteName: null,
  388. streamsStarted: sessionInfo.streamTracker.callsStarted,
  389. streamsSucceeded: sessionInfo.streamTracker.callsSucceeded,
  390. streamsFailed: sessionInfo.streamTracker.callsFailed,
  391. messagesSent: sessionInfo.messagesSent,
  392. messagesReceived: sessionInfo.messagesReceived,
  393. keepAlivesSent: sessionInfo.keepAlivesSent,
  394. lastLocalStreamCreatedTimestamp: null,
  395. lastRemoteStreamCreatedTimestamp:
  396. sessionInfo.streamTracker.lastCallStartedTimestamp,
  397. lastMessageSentTimestamp: sessionInfo.lastMessageSentTimestamp,
  398. lastMessageReceivedTimestamp: sessionInfo.lastMessageReceivedTimestamp,
  399. localFlowControlWindow: session.state.localWindowSize ?? null,
  400. remoteFlowControlWindow: session.state.remoteWindowSize ?? null,
  401. };
  402. return socketInfo;
  403. }
  404. private trace(text: string): void {
  405. logging.trace(
  406. LogVerbosity.DEBUG,
  407. TRACER_NAME,
  408. '(' + this.channelzRef.id + ') ' + text
  409. );
  410. }
  411. private keepaliveTrace(text: string): void {
  412. logging.trace(
  413. LogVerbosity.DEBUG,
  414. 'keepalive',
  415. '(' + this.channelzRef.id + ') ' + text
  416. );
  417. }
  418. addProtoService(): never {
  419. throw new Error('Not implemented. Use addService() instead');
  420. }
  421. addService(
  422. service: ServiceDefinition,
  423. implementation: UntypedServiceImplementation
  424. ): void {
  425. if (
  426. service === null ||
  427. typeof service !== 'object' ||
  428. implementation === null ||
  429. typeof implementation !== 'object'
  430. ) {
  431. throw new Error('addService() requires two objects as arguments');
  432. }
  433. const serviceKeys = Object.keys(service);
  434. if (serviceKeys.length === 0) {
  435. throw new Error('Cannot add an empty service to a server');
  436. }
  437. serviceKeys.forEach(name => {
  438. const attrs = service[name];
  439. let methodType: HandlerType;
  440. if (attrs.requestStream) {
  441. if (attrs.responseStream) {
  442. methodType = 'bidi';
  443. } else {
  444. methodType = 'clientStream';
  445. }
  446. } else {
  447. if (attrs.responseStream) {
  448. methodType = 'serverStream';
  449. } else {
  450. methodType = 'unary';
  451. }
  452. }
  453. let implFn = implementation[name];
  454. let impl;
  455. if (implFn === undefined && typeof attrs.originalName === 'string') {
  456. implFn = implementation[attrs.originalName];
  457. }
  458. if (implFn !== undefined) {
  459. impl = implFn.bind(implementation);
  460. } else {
  461. impl = getDefaultHandler(methodType, name);
  462. }
  463. const success = this.register(
  464. attrs.path,
  465. impl as UntypedHandleCall,
  466. attrs.responseSerialize,
  467. attrs.requestDeserialize,
  468. methodType
  469. );
  470. if (success === false) {
  471. throw new Error(`Method handler for ${attrs.path} already provided.`);
  472. }
  473. });
  474. }
  475. removeService(service: ServiceDefinition): void {
  476. if (service === null || typeof service !== 'object') {
  477. throw new Error('removeService() requires object as argument');
  478. }
  479. const serviceKeys = Object.keys(service);
  480. serviceKeys.forEach(name => {
  481. const attrs = service[name];
  482. this.unregister(attrs.path);
  483. });
  484. }
  485. bind(port: string, creds: ServerCredentials): never {
  486. throw new Error('Not implemented. Use bindAsync() instead');
  487. }
  488. private registerListenerToChannelz(boundAddress: SubchannelAddress) {
  489. return registerChannelzSocket(
  490. subchannelAddressToString(boundAddress),
  491. () => {
  492. return {
  493. localAddress: boundAddress,
  494. remoteAddress: null,
  495. security: null,
  496. remoteName: null,
  497. streamsStarted: 0,
  498. streamsSucceeded: 0,
  499. streamsFailed: 0,
  500. messagesSent: 0,
  501. messagesReceived: 0,
  502. keepAlivesSent: 0,
  503. lastLocalStreamCreatedTimestamp: null,
  504. lastRemoteStreamCreatedTimestamp: null,
  505. lastMessageSentTimestamp: null,
  506. lastMessageReceivedTimestamp: null,
  507. localFlowControlWindow: null,
  508. remoteFlowControlWindow: null,
  509. };
  510. },
  511. this.channelzEnabled
  512. );
  513. }
  514. private createHttp2Server(credentials: ServerCredentials) {
  515. let http2Server: http2.Http2Server | http2.Http2SecureServer;
  516. if (credentials._isSecure()) {
  517. const credentialsSettings = credentials._getSettings();
  518. const secureServerOptions: http2.SecureServerOptions = {
  519. ...this.commonServerOptions,
  520. ...credentialsSettings,
  521. enableTrace: this.options['grpc-node.tls_enable_trace'] === 1
  522. };
  523. let areCredentialsValid = credentialsSettings !== null;
  524. http2Server = http2.createSecureServer(secureServerOptions);
  525. http2Server.on('connection', (socket: Socket) => {
  526. if (!areCredentialsValid) {
  527. socket.destroy();
  528. }
  529. });
  530. http2Server.on('secureConnection', (socket: TLSSocket) => {
  531. /* These errors need to be handled by the user of Http2SecureServer,
  532. * according to https://github.com/nodejs/node/issues/35824 */
  533. socket.on('error', (e: Error) => {
  534. this.trace(
  535. 'An incoming TLS connection closed with error: ' + e.message
  536. );
  537. });
  538. });
  539. const credsWatcher: SecureContextWatcher = options => {
  540. if (options) {
  541. (http2Server as http2.Http2SecureServer).setSecureContext(options);
  542. }
  543. areCredentialsValid = options !== null;
  544. }
  545. credentials._addWatcher(credsWatcher);
  546. http2Server.on('close', () => {
  547. credentials._removeWatcher(credsWatcher);
  548. });
  549. } else {
  550. http2Server = http2.createServer(this.commonServerOptions);
  551. }
  552. http2Server.setTimeout(0, noop);
  553. this._setupHandlers(http2Server, credentials._getInterceptors());
  554. return http2Server;
  555. }
  556. private bindOneAddress(
  557. address: SubchannelAddress,
  558. boundPortObject: BoundPort
  559. ): Promise<SingleAddressBindResult> {
  560. this.trace('Attempting to bind ' + subchannelAddressToString(address));
  561. const http2Server = this.createHttp2Server(boundPortObject.credentials);
  562. return new Promise<SingleAddressBindResult>((resolve, reject) => {
  563. const onError = (err: Error) => {
  564. this.trace(
  565. 'Failed to bind ' +
  566. subchannelAddressToString(address) +
  567. ' with error ' +
  568. err.message
  569. );
  570. resolve({
  571. port: 'port' in address ? address.port : 1,
  572. error: err.message,
  573. });
  574. };
  575. http2Server.once('error', onError);
  576. http2Server.listen(address, () => {
  577. const boundAddress = http2Server.address()!;
  578. let boundSubchannelAddress: SubchannelAddress;
  579. if (typeof boundAddress === 'string') {
  580. boundSubchannelAddress = {
  581. path: boundAddress,
  582. };
  583. } else {
  584. boundSubchannelAddress = {
  585. host: boundAddress.address,
  586. port: boundAddress.port,
  587. };
  588. }
  589. const channelzRef = this.registerListenerToChannelz(
  590. boundSubchannelAddress
  591. );
  592. this.listenerChildrenTracker.refChild(channelzRef);
  593. this.http2Servers.set(http2Server, {
  594. channelzRef: channelzRef,
  595. sessions: new Set(),
  596. });
  597. boundPortObject.listeningServers.add(http2Server);
  598. this.trace(
  599. 'Successfully bound ' +
  600. subchannelAddressToString(boundSubchannelAddress)
  601. );
  602. resolve({
  603. port:
  604. 'port' in boundSubchannelAddress ? boundSubchannelAddress.port : 1,
  605. });
  606. http2Server.removeListener('error', onError);
  607. });
  608. });
  609. }
  610. private async bindManyPorts(
  611. addressList: SubchannelAddress[],
  612. boundPortObject: BoundPort
  613. ): Promise<BindResult> {
  614. if (addressList.length === 0) {
  615. return {
  616. count: 0,
  617. port: 0,
  618. errors: [],
  619. };
  620. }
  621. if (isTcpSubchannelAddress(addressList[0]) && addressList[0].port === 0) {
  622. /* If binding to port 0, first try to bind the first address, then bind
  623. * the rest of the address list to the specific port that it binds. */
  624. const firstAddressResult = await this.bindOneAddress(
  625. addressList[0],
  626. boundPortObject
  627. );
  628. if (firstAddressResult.error) {
  629. /* If the first address fails to bind, try the same operation starting
  630. * from the second item in the list. */
  631. const restAddressResult = await this.bindManyPorts(
  632. addressList.slice(1),
  633. boundPortObject
  634. );
  635. return {
  636. ...restAddressResult,
  637. errors: [firstAddressResult.error, ...restAddressResult.errors],
  638. };
  639. } else {
  640. const restAddresses = addressList
  641. .slice(1)
  642. .map(address =>
  643. isTcpSubchannelAddress(address)
  644. ? { host: address.host, port: firstAddressResult.port }
  645. : address
  646. );
  647. const restAddressResult = await Promise.all(
  648. restAddresses.map(address =>
  649. this.bindOneAddress(address, boundPortObject)
  650. )
  651. );
  652. const allResults = [firstAddressResult, ...restAddressResult];
  653. return {
  654. count: allResults.filter(result => result.error === undefined).length,
  655. port: firstAddressResult.port,
  656. errors: allResults
  657. .filter(result => result.error)
  658. .map(result => result.error!),
  659. };
  660. }
  661. } else {
  662. const allResults = await Promise.all(
  663. addressList.map(address =>
  664. this.bindOneAddress(address, boundPortObject)
  665. )
  666. );
  667. return {
  668. count: allResults.filter(result => result.error === undefined).length,
  669. port: allResults[0].port,
  670. errors: allResults
  671. .filter(result => result.error)
  672. .map(result => result.error!),
  673. };
  674. }
  675. }
  676. private async bindAddressList(
  677. addressList: SubchannelAddress[],
  678. boundPortObject: BoundPort
  679. ): Promise<number> {
  680. const bindResult = await this.bindManyPorts(addressList, boundPortObject);
  681. if (bindResult.count > 0) {
  682. if (bindResult.count < addressList.length) {
  683. logging.log(
  684. LogVerbosity.INFO,
  685. `WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved`
  686. );
  687. }
  688. return bindResult.port;
  689. } else {
  690. const errorString = `No address added out of total ${addressList.length} resolved`;
  691. logging.log(LogVerbosity.ERROR, errorString);
  692. throw new Error(
  693. `${errorString} errors: [${bindResult.errors.join(',')}]`
  694. );
  695. }
  696. }
  697. private resolvePort(port: GrpcUri): Promise<SubchannelAddress[]> {
  698. return new Promise<SubchannelAddress[]>((resolve, reject) => {
  699. const resolverListener: ResolverListener = {
  700. onSuccessfulResolution: (
  701. endpointList,
  702. serviceConfig,
  703. serviceConfigError
  704. ) => {
  705. // We only want one resolution result. Discard all future results
  706. resolverListener.onSuccessfulResolution = () => {};
  707. const addressList = ([] as SubchannelAddress[]).concat(
  708. ...endpointList.map(endpoint => endpoint.addresses)
  709. );
  710. if (addressList.length === 0) {
  711. reject(new Error(`No addresses resolved for port ${port}`));
  712. return;
  713. }
  714. resolve(addressList);
  715. },
  716. onError: error => {
  717. reject(new Error(error.details));
  718. },
  719. };
  720. const resolver = createResolver(port, resolverListener, this.options);
  721. resolver.updateResolution();
  722. });
  723. }
  724. private async bindPort(
  725. port: GrpcUri,
  726. boundPortObject: BoundPort
  727. ): Promise<number> {
  728. const addressList = await this.resolvePort(port);
  729. if (boundPortObject.cancelled) {
  730. this.completeUnbind(boundPortObject);
  731. throw new Error('bindAsync operation cancelled by unbind call');
  732. }
  733. const portNumber = await this.bindAddressList(addressList, boundPortObject);
  734. if (boundPortObject.cancelled) {
  735. this.completeUnbind(boundPortObject);
  736. throw new Error('bindAsync operation cancelled by unbind call');
  737. }
  738. return portNumber;
  739. }
  740. private normalizePort(port: string): GrpcUri {
  741. const initialPortUri = parseUri(port);
  742. if (initialPortUri === null) {
  743. throw new Error(`Could not parse port "${port}"`);
  744. }
  745. const portUri = mapUriDefaultScheme(initialPortUri);
  746. if (portUri === null) {
  747. throw new Error(`Could not get a default scheme for port "${port}"`);
  748. }
  749. return portUri;
  750. }
  751. bindAsync(
  752. port: string,
  753. creds: ServerCredentials,
  754. callback: (error: Error | null, port: number) => void
  755. ): void {
  756. if (this.shutdown) {
  757. throw new Error('bindAsync called after shutdown');
  758. }
  759. if (typeof port !== 'string') {
  760. throw new TypeError('port must be a string');
  761. }
  762. if (creds === null || !(creds instanceof ServerCredentials)) {
  763. throw new TypeError('creds must be a ServerCredentials object');
  764. }
  765. if (typeof callback !== 'function') {
  766. throw new TypeError('callback must be a function');
  767. }
  768. this.trace('bindAsync port=' + port);
  769. const portUri = this.normalizePort(port);
  770. const deferredCallback = (error: Error | null, port: number) => {
  771. process.nextTick(() => callback(error, port));
  772. };
  773. /* First, if this port is already bound or that bind operation is in
  774. * progress, use that result. */
  775. let boundPortObject = this.boundPorts.get(uriToString(portUri));
  776. if (boundPortObject) {
  777. if (!creds._equals(boundPortObject.credentials)) {
  778. deferredCallback(
  779. new Error(`${port} already bound with incompatible credentials`),
  780. 0
  781. );
  782. return;
  783. }
  784. /* If that operation has previously been cancelled by an unbind call,
  785. * uncancel it. */
  786. boundPortObject.cancelled = false;
  787. if (boundPortObject.completionPromise) {
  788. boundPortObject.completionPromise.then(
  789. portNum => callback(null, portNum),
  790. error => callback(error as Error, 0)
  791. );
  792. } else {
  793. deferredCallback(null, boundPortObject.portNumber);
  794. }
  795. return;
  796. }
  797. boundPortObject = {
  798. mapKey: uriToString(portUri),
  799. originalUri: portUri,
  800. completionPromise: null,
  801. cancelled: false,
  802. portNumber: 0,
  803. credentials: creds,
  804. listeningServers: new Set(),
  805. };
  806. const splitPort = splitHostPort(portUri.path);
  807. const completionPromise = this.bindPort(portUri, boundPortObject);
  808. boundPortObject.completionPromise = completionPromise;
  809. /* If the port number is 0, defer populating the map entry until after the
  810. * bind operation completes and we have a specific port number. Otherwise,
  811. * populate it immediately. */
  812. if (splitPort?.port === 0) {
  813. completionPromise.then(
  814. portNum => {
  815. const finalUri: GrpcUri = {
  816. scheme: portUri.scheme,
  817. authority: portUri.authority,
  818. path: combineHostPort({ host: splitPort.host, port: portNum }),
  819. };
  820. boundPortObject!.mapKey = uriToString(finalUri);
  821. boundPortObject!.completionPromise = null;
  822. boundPortObject!.portNumber = portNum;
  823. this.boundPorts.set(boundPortObject!.mapKey, boundPortObject!);
  824. callback(null, portNum);
  825. },
  826. error => {
  827. callback(error, 0);
  828. }
  829. );
  830. } else {
  831. this.boundPorts.set(boundPortObject.mapKey, boundPortObject);
  832. completionPromise.then(
  833. portNum => {
  834. boundPortObject!.completionPromise = null;
  835. boundPortObject!.portNumber = portNum;
  836. callback(null, portNum);
  837. },
  838. error => {
  839. callback(error, 0);
  840. }
  841. );
  842. }
  843. }
  844. private registerInjectorToChannelz() {
  845. return registerChannelzSocket(
  846. 'injector',
  847. () => {
  848. return {
  849. localAddress: null,
  850. remoteAddress: null,
  851. security: null,
  852. remoteName: null,
  853. streamsStarted: 0,
  854. streamsSucceeded: 0,
  855. streamsFailed: 0,
  856. messagesSent: 0,
  857. messagesReceived: 0,
  858. keepAlivesSent: 0,
  859. lastLocalStreamCreatedTimestamp: null,
  860. lastRemoteStreamCreatedTimestamp: null,
  861. lastMessageSentTimestamp: null,
  862. lastMessageReceivedTimestamp: null,
  863. localFlowControlWindow: null,
  864. remoteFlowControlWindow: null,
  865. };
  866. },
  867. this.channelzEnabled
  868. );
  869. }
  870. createConnectionInjector(credentials: ServerCredentials): ConnectionInjector {
  871. if (credentials === null || !(credentials instanceof ServerCredentials)) {
  872. throw new TypeError('creds must be a ServerCredentials object');
  873. }
  874. const server = this.createHttp2Server(credentials);
  875. const channelzRef = this.registerInjectorToChannelz();
  876. if (this.channelzEnabled) {
  877. this.listenerChildrenTracker.refChild(channelzRef);
  878. }
  879. const sessionsSet: Set<http2.ServerHttp2Session> = new Set();
  880. this.http2Servers.set(server, {
  881. channelzRef: channelzRef,
  882. sessions: sessionsSet
  883. });
  884. return {
  885. injectConnection: (connection: Duplex) => {
  886. server.emit('connection', connection);
  887. },
  888. drain: (graceTimeMs: number) => {
  889. for (const session of sessionsSet) {
  890. this.closeSession(session);
  891. }
  892. setTimeout(() => {
  893. for (const session of sessionsSet) {
  894. session.destroy(http2.constants.NGHTTP2_CANCEL as any);
  895. }
  896. }, graceTimeMs).unref?.();
  897. },
  898. destroy: () => {
  899. this.closeServer(server)
  900. for (const session of sessionsSet) {
  901. this.closeSession(session);
  902. }
  903. }
  904. };
  905. }
  906. private closeServer(server: AnyHttp2Server, callback?: () => void) {
  907. this.trace(
  908. 'Closing server with address ' + JSON.stringify(server.address())
  909. );
  910. const serverInfo = this.http2Servers.get(server);
  911. server.close(() => {
  912. if (serverInfo) {
  913. this.listenerChildrenTracker.unrefChild(serverInfo.channelzRef);
  914. unregisterChannelzRef(serverInfo.channelzRef);
  915. }
  916. this.http2Servers.delete(server);
  917. callback?.();
  918. });
  919. }
  920. private closeSession(
  921. session: http2.ServerHttp2Session,
  922. callback?: () => void
  923. ) {
  924. this.trace('Closing session initiated by ' + session.socket?.remoteAddress);
  925. const sessionInfo = this.sessions.get(session);
  926. const closeCallback = () => {
  927. if (sessionInfo) {
  928. this.sessionChildrenTracker.unrefChild(sessionInfo.ref);
  929. unregisterChannelzRef(sessionInfo.ref);
  930. }
  931. callback?.();
  932. };
  933. if (session.closed) {
  934. queueMicrotask(closeCallback);
  935. } else {
  936. session.close(closeCallback);
  937. }
  938. }
  939. private completeUnbind(boundPortObject: BoundPort) {
  940. for (const server of boundPortObject.listeningServers) {
  941. const serverInfo = this.http2Servers.get(server);
  942. this.closeServer(server, () => {
  943. boundPortObject.listeningServers.delete(server);
  944. });
  945. if (serverInfo) {
  946. for (const session of serverInfo.sessions) {
  947. this.closeSession(session);
  948. }
  949. }
  950. }
  951. this.boundPorts.delete(boundPortObject.mapKey);
  952. }
  953. /**
  954. * Unbind a previously bound port, or cancel an in-progress bindAsync
  955. * operation. If port 0 was bound, only the actual bound port can be
  956. * unbound. For example, if bindAsync was called with "localhost:0" and the
  957. * bound port result was 54321, it can be unbound as "localhost:54321".
  958. * @param port
  959. */
  960. unbind(port: string): void {
  961. this.trace('unbind port=' + port);
  962. const portUri = this.normalizePort(port);
  963. const splitPort = splitHostPort(portUri.path);
  964. if (splitPort?.port === 0) {
  965. throw new Error('Cannot unbind port 0');
  966. }
  967. const boundPortObject = this.boundPorts.get(uriToString(portUri));
  968. if (boundPortObject) {
  969. this.trace(
  970. 'unbinding ' +
  971. boundPortObject.mapKey +
  972. ' originally bound as ' +
  973. uriToString(boundPortObject.originalUri)
  974. );
  975. /* If the bind operation is pending, the cancelled flag will trigger
  976. * the unbind operation later. */
  977. if (boundPortObject.completionPromise) {
  978. boundPortObject.cancelled = true;
  979. } else {
  980. this.completeUnbind(boundPortObject);
  981. }
  982. }
  983. }
  984. /**
  985. * Gracefully close all connections associated with a previously bound port.
  986. * After the grace time, forcefully close all remaining open connections.
  987. *
  988. * If port 0 was bound, only the actual bound port can be
  989. * drained. For example, if bindAsync was called with "localhost:0" and the
  990. * bound port result was 54321, it can be drained as "localhost:54321".
  991. * @param port
  992. * @param graceTimeMs
  993. * @returns
  994. */
  995. drain(port: string, graceTimeMs: number): void {
  996. this.trace('drain port=' + port + ' graceTimeMs=' + graceTimeMs);
  997. const portUri = this.normalizePort(port);
  998. const splitPort = splitHostPort(portUri.path);
  999. if (splitPort?.port === 0) {
  1000. throw new Error('Cannot drain port 0');
  1001. }
  1002. const boundPortObject = this.boundPorts.get(uriToString(portUri));
  1003. if (!boundPortObject) {
  1004. return;
  1005. }
  1006. const allSessions: Set<http2.Http2Session> = new Set();
  1007. for (const http2Server of boundPortObject.listeningServers) {
  1008. const serverEntry = this.http2Servers.get(http2Server);
  1009. if (serverEntry) {
  1010. for (const session of serverEntry.sessions) {
  1011. allSessions.add(session);
  1012. this.closeSession(session, () => {
  1013. allSessions.delete(session);
  1014. });
  1015. }
  1016. }
  1017. }
  1018. /* After the grace time ends, send another goaway to all remaining sessions
  1019. * with the CANCEL code. */
  1020. setTimeout(() => {
  1021. for (const session of allSessions) {
  1022. session.destroy(http2.constants.NGHTTP2_CANCEL as any);
  1023. }
  1024. }, graceTimeMs).unref?.();
  1025. }
  1026. forceShutdown(): void {
  1027. for (const boundPortObject of this.boundPorts.values()) {
  1028. boundPortObject.cancelled = true;
  1029. }
  1030. this.boundPorts.clear();
  1031. // Close the server if it is still running.
  1032. for (const server of this.http2Servers.keys()) {
  1033. this.closeServer(server);
  1034. }
  1035. // Always destroy any available sessions. It's possible that one or more
  1036. // tryShutdown() calls are in progress. Don't wait on them to finish.
  1037. this.sessions.forEach((channelzInfo, session) => {
  1038. this.closeSession(session);
  1039. // Cast NGHTTP2_CANCEL to any because TypeScript doesn't seem to
  1040. // recognize destroy(code) as a valid signature.
  1041. // eslint-disable-next-line @typescript-eslint/no-explicit-any
  1042. session.destroy(http2.constants.NGHTTP2_CANCEL as any);
  1043. });
  1044. this.sessions.clear();
  1045. unregisterChannelzRef(this.channelzRef);
  1046. this.shutdown = true;
  1047. }
  1048. register<RequestType, ResponseType>(
  1049. name: string,
  1050. handler: HandleCall<RequestType, ResponseType>,
  1051. serialize: Serialize<ResponseType>,
  1052. deserialize: Deserialize<RequestType>,
  1053. type: string
  1054. ): boolean {
  1055. if (this.handlers.has(name)) {
  1056. return false;
  1057. }
  1058. this.handlers.set(name, {
  1059. func: handler,
  1060. serialize,
  1061. deserialize,
  1062. type,
  1063. path: name,
  1064. } as UntypedHandler);
  1065. return true;
  1066. }
  1067. unregister(name: string): boolean {
  1068. return this.handlers.delete(name);
  1069. }
  1070. /**
  1071. * @deprecated No longer needed as of version 1.10.x
  1072. */
  1073. @deprecate(
  1074. 'Calling start() is no longer necessary. It can be safely omitted.'
  1075. )
  1076. start(): void {
  1077. if (
  1078. this.http2Servers.size === 0 ||
  1079. [...this.http2Servers.keys()].every(server => !server.listening)
  1080. ) {
  1081. throw new Error('server must be bound in order to start');
  1082. }
  1083. if (this.started === true) {
  1084. throw new Error('server is already started');
  1085. }
  1086. this.started = true;
  1087. }
  1088. tryShutdown(callback: (error?: Error) => void): void {
  1089. const wrappedCallback = (error?: Error) => {
  1090. unregisterChannelzRef(this.channelzRef);
  1091. callback(error);
  1092. };
  1093. let pendingChecks = 0;
  1094. function maybeCallback(): void {
  1095. pendingChecks--;
  1096. if (pendingChecks === 0) {
  1097. wrappedCallback();
  1098. }
  1099. }
  1100. this.shutdown = true;
  1101. for (const [serverKey, server] of this.http2Servers.entries()) {
  1102. pendingChecks++;
  1103. const serverString = server.channelzRef.name;
  1104. this.trace('Waiting for server ' + serverString + ' to close');
  1105. this.closeServer(serverKey, () => {
  1106. this.trace('Server ' + serverString + ' finished closing');
  1107. maybeCallback();
  1108. });
  1109. for (const session of server.sessions.keys()) {
  1110. pendingChecks++;
  1111. const sessionString = session.socket?.remoteAddress;
  1112. this.trace('Waiting for session ' + sessionString + ' to close');
  1113. this.closeSession(session, () => {
  1114. this.trace('Session ' + sessionString + ' finished closing');
  1115. maybeCallback();
  1116. });
  1117. }
  1118. }
  1119. if (pendingChecks === 0) {
  1120. wrappedCallback();
  1121. }
  1122. }
  1123. addHttp2Port(): never {
  1124. throw new Error('Not yet implemented');
  1125. }
  1126. /**
  1127. * Get the channelz reference object for this server. The returned value is
  1128. * garbage if channelz is disabled for this server.
  1129. * @returns
  1130. */
  1131. getChannelzRef() {
  1132. return this.channelzRef;
  1133. }
  1134. private _verifyContentType(
  1135. stream: http2.ServerHttp2Stream,
  1136. headers: http2.IncomingHttpHeaders
  1137. ): boolean {
  1138. const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE];
  1139. if (
  1140. typeof contentType !== 'string' ||
  1141. !contentType.startsWith('application/grpc')
  1142. ) {
  1143. stream.respond(
  1144. {
  1145. [http2.constants.HTTP2_HEADER_STATUS]:
  1146. http2.constants.HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE,
  1147. },
  1148. { endStream: true }
  1149. );
  1150. return false;
  1151. }
  1152. return true;
  1153. }
  1154. private _retrieveHandler(path: string): Handler<any, any> | null {
  1155. this.trace(
  1156. 'Received call to method ' +
  1157. path +
  1158. ' at address ' +
  1159. this.serverAddressString
  1160. );
  1161. const handler = this.handlers.get(path);
  1162. if (handler === undefined) {
  1163. this.trace(
  1164. 'No handler registered for method ' +
  1165. path +
  1166. '. Sending UNIMPLEMENTED status.'
  1167. );
  1168. return null;
  1169. }
  1170. return handler;
  1171. }
  1172. private _respondWithError(
  1173. err: PartialStatusObject,
  1174. stream: http2.ServerHttp2Stream,
  1175. channelzSessionInfo: ChannelzSessionInfo | null = null
  1176. ) {
  1177. const trailersToSend = {
  1178. 'grpc-status': err.code ?? Status.INTERNAL,
  1179. 'grpc-message': err.details,
  1180. [http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_OK,
  1181. [http2.constants.HTTP2_HEADER_CONTENT_TYPE]: 'application/grpc+proto',
  1182. ...err.metadata?.toHttp2Headers(),
  1183. };
  1184. stream.respond(trailersToSend, { endStream: true });
  1185. this.callTracker.addCallFailed();
  1186. channelzSessionInfo?.streamTracker.addCallFailed();
  1187. }
  1188. private _channelzHandler(
  1189. extraInterceptors: ServerInterceptor[],
  1190. stream: http2.ServerHttp2Stream,
  1191. headers: http2.IncomingHttpHeaders
  1192. ) {
  1193. // for handling idle timeout
  1194. this.onStreamOpened(stream);
  1195. const channelzSessionInfo = this.sessions.get(
  1196. stream.session as http2.ServerHttp2Session
  1197. );
  1198. this.callTracker.addCallStarted();
  1199. channelzSessionInfo?.streamTracker.addCallStarted();
  1200. if (!this._verifyContentType(stream, headers)) {
  1201. this.callTracker.addCallFailed();
  1202. channelzSessionInfo?.streamTracker.addCallFailed();
  1203. return;
  1204. }
  1205. const path = headers[HTTP2_HEADER_PATH] as string;
  1206. const handler = this._retrieveHandler(path);
  1207. if (!handler) {
  1208. this._respondWithError(
  1209. getUnimplementedStatusResponse(path),
  1210. stream,
  1211. channelzSessionInfo
  1212. );
  1213. return;
  1214. }
  1215. const callEventTracker: CallEventTracker = {
  1216. addMessageSent: () => {
  1217. if (channelzSessionInfo) {
  1218. channelzSessionInfo.messagesSent += 1;
  1219. channelzSessionInfo.lastMessageSentTimestamp = new Date();
  1220. }
  1221. },
  1222. addMessageReceived: () => {
  1223. if (channelzSessionInfo) {
  1224. channelzSessionInfo.messagesReceived += 1;
  1225. channelzSessionInfo.lastMessageReceivedTimestamp = new Date();
  1226. }
  1227. },
  1228. onCallEnd: status => {
  1229. if (status.code === Status.OK) {
  1230. this.callTracker.addCallSucceeded();
  1231. } else {
  1232. this.callTracker.addCallFailed();
  1233. }
  1234. },
  1235. onStreamEnd: success => {
  1236. if (channelzSessionInfo) {
  1237. if (success) {
  1238. channelzSessionInfo.streamTracker.addCallSucceeded();
  1239. } else {
  1240. channelzSessionInfo.streamTracker.addCallFailed();
  1241. }
  1242. }
  1243. },
  1244. };
  1245. const call = getServerInterceptingCall(
  1246. [...extraInterceptors, ...this.interceptors],
  1247. stream,
  1248. headers,
  1249. callEventTracker,
  1250. handler,
  1251. this.options
  1252. );
  1253. if (!this._runHandlerForCall(call, handler)) {
  1254. this.callTracker.addCallFailed();
  1255. channelzSessionInfo?.streamTracker.addCallFailed();
  1256. call.sendStatus({
  1257. code: Status.INTERNAL,
  1258. details: `Unknown handler type: ${handler.type}`,
  1259. });
  1260. }
  1261. }
  1262. private _streamHandler(
  1263. extraInterceptors: ServerInterceptor[],
  1264. stream: http2.ServerHttp2Stream,
  1265. headers: http2.IncomingHttpHeaders
  1266. ) {
  1267. // for handling idle timeout
  1268. this.onStreamOpened(stream);
  1269. if (this._verifyContentType(stream, headers) !== true) {
  1270. return;
  1271. }
  1272. const path = headers[HTTP2_HEADER_PATH] as string;
  1273. const handler = this._retrieveHandler(path);
  1274. if (!handler) {
  1275. this._respondWithError(
  1276. getUnimplementedStatusResponse(path),
  1277. stream,
  1278. null
  1279. );
  1280. return;
  1281. }
  1282. const call = getServerInterceptingCall(
  1283. [...extraInterceptors, ...this.interceptors],
  1284. stream,
  1285. headers,
  1286. null,
  1287. handler,
  1288. this.options
  1289. );
  1290. if (!this._runHandlerForCall(call, handler)) {
  1291. call.sendStatus({
  1292. code: Status.INTERNAL,
  1293. details: `Unknown handler type: ${handler.type}`,
  1294. });
  1295. }
  1296. }
  1297. private _runHandlerForCall(
  1298. call: ServerInterceptingCallInterface,
  1299. handler:
  1300. | UntypedUnaryHandler
  1301. | UntypedClientStreamingHandler
  1302. | UntypedServerStreamingHandler
  1303. | UntypedBidiStreamingHandler
  1304. ): boolean {
  1305. const { type } = handler;
  1306. if (type === 'unary') {
  1307. handleUnary(call, handler);
  1308. } else if (type === 'clientStream') {
  1309. handleClientStreaming(call, handler);
  1310. } else if (type === 'serverStream') {
  1311. handleServerStreaming(call, handler);
  1312. } else if (type === 'bidi') {
  1313. handleBidiStreaming(call, handler);
  1314. } else {
  1315. return false;
  1316. }
  1317. return true;
  1318. }
  1319. private _setupHandlers(
  1320. http2Server: http2.Http2Server | http2.Http2SecureServer,
  1321. extraInterceptors: ServerInterceptor[]
  1322. ): void {
  1323. if (http2Server === null) {
  1324. return;
  1325. }
  1326. const serverAddress = http2Server.address();
  1327. let serverAddressString = 'null';
  1328. if (serverAddress) {
  1329. if (typeof serverAddress === 'string') {
  1330. serverAddressString = serverAddress;
  1331. } else {
  1332. serverAddressString = serverAddress.address + ':' + serverAddress.port;
  1333. }
  1334. }
  1335. this.serverAddressString = serverAddressString;
  1336. const handler = this.channelzEnabled
  1337. ? this._channelzHandler
  1338. : this._streamHandler;
  1339. const sessionHandler = this.channelzEnabled
  1340. ? this._channelzSessionHandler(http2Server)
  1341. : this._sessionHandler(http2Server);
  1342. http2Server.on('stream', handler.bind(this, extraInterceptors));
  1343. http2Server.on('session', sessionHandler);
  1344. }
  1345. private _sessionHandler(
  1346. http2Server: http2.Http2Server | http2.Http2SecureServer
  1347. ) {
  1348. return (session: http2.ServerHttp2Session) => {
  1349. this.http2Servers.get(http2Server)?.sessions.add(session);
  1350. let connectionAgeTimer: NodeJS.Timeout | null = null;
  1351. let connectionAgeGraceTimer: NodeJS.Timeout | null = null;
  1352. let keepaliveTimer: NodeJS.Timeout | null = null;
  1353. let sessionClosedByServer = false;
  1354. const idleTimeoutObj = this.enableIdleTimeout(session);
  1355. if (this.maxConnectionAgeMs !== UNLIMITED_CONNECTION_AGE_MS) {
  1356. // Apply a random jitter within a +/-10% range
  1357. const jitterMagnitude = this.maxConnectionAgeMs / 10;
  1358. const jitter = Math.random() * jitterMagnitude * 2 - jitterMagnitude;
  1359. connectionAgeTimer = setTimeout(() => {
  1360. sessionClosedByServer = true;
  1361. this.trace(
  1362. 'Connection dropped by max connection age: ' +
  1363. session.socket?.remoteAddress
  1364. );
  1365. try {
  1366. session.goaway(
  1367. http2.constants.NGHTTP2_NO_ERROR,
  1368. ~(1 << 31),
  1369. kMaxAge
  1370. );
  1371. } catch (e) {
  1372. // The goaway can't be sent because the session is already closed
  1373. session.destroy();
  1374. return;
  1375. }
  1376. session.close();
  1377. /* Allow a grace period after sending the GOAWAY before forcibly
  1378. * closing the connection. */
  1379. if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) {
  1380. connectionAgeGraceTimer = setTimeout(() => {
  1381. session.destroy();
  1382. }, this.maxConnectionAgeGraceMs);
  1383. connectionAgeGraceTimer.unref?.();
  1384. }
  1385. }, this.maxConnectionAgeMs + jitter);
  1386. connectionAgeTimer.unref?.();
  1387. }
  1388. const clearKeepaliveTimeout = () => {
  1389. if (keepaliveTimer) {
  1390. clearTimeout(keepaliveTimer);
  1391. keepaliveTimer = null;
  1392. }
  1393. };
  1394. const canSendPing = () => {
  1395. return (
  1396. !session.destroyed &&
  1397. this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS &&
  1398. this.keepaliveTimeMs > 0
  1399. );
  1400. };
  1401. /* eslint-disable-next-line prefer-const */
  1402. let sendPing: () => void; // hoisted for use in maybeStartKeepalivePingTimer
  1403. const maybeStartKeepalivePingTimer = () => {
  1404. if (!canSendPing()) {
  1405. return;
  1406. }
  1407. this.keepaliveTrace(
  1408. 'Starting keepalive timer for ' + this.keepaliveTimeMs + 'ms'
  1409. );
  1410. keepaliveTimer = setTimeout(() => {
  1411. clearKeepaliveTimeout();
  1412. sendPing();
  1413. }, this.keepaliveTimeMs);
  1414. keepaliveTimer.unref?.();
  1415. };
  1416. sendPing = () => {
  1417. if (!canSendPing()) {
  1418. return;
  1419. }
  1420. this.keepaliveTrace(
  1421. 'Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms'
  1422. );
  1423. let pingSendError = '';
  1424. try {
  1425. const pingSentSuccessfully = session.ping(
  1426. (err: Error | null, duration: number, payload: Buffer) => {
  1427. clearKeepaliveTimeout();
  1428. if (err) {
  1429. this.keepaliveTrace('Ping failed with error: ' + err.message);
  1430. sessionClosedByServer = true;
  1431. session.close();
  1432. } else {
  1433. this.keepaliveTrace('Received ping response');
  1434. maybeStartKeepalivePingTimer();
  1435. }
  1436. }
  1437. );
  1438. if (!pingSentSuccessfully) {
  1439. pingSendError = 'Ping returned false';
  1440. }
  1441. } catch (e) {
  1442. // grpc/grpc-node#2139
  1443. pingSendError =
  1444. (e instanceof Error ? e.message : '') || 'Unknown error';
  1445. }
  1446. if (pingSendError) {
  1447. this.keepaliveTrace('Ping send failed: ' + pingSendError);
  1448. this.trace(
  1449. 'Connection dropped due to ping send error: ' + pingSendError
  1450. );
  1451. sessionClosedByServer = true;
  1452. session.close();
  1453. return;
  1454. }
  1455. keepaliveTimer = setTimeout(() => {
  1456. clearKeepaliveTimeout();
  1457. this.keepaliveTrace('Ping timeout passed without response');
  1458. this.trace('Connection dropped by keepalive timeout');
  1459. sessionClosedByServer = true;
  1460. session.close();
  1461. }, this.keepaliveTimeoutMs);
  1462. keepaliveTimer.unref?.();
  1463. };
  1464. maybeStartKeepalivePingTimer();
  1465. session.on('close', () => {
  1466. if (!sessionClosedByServer) {
  1467. this.trace(
  1468. `Connection dropped by client ${session.socket?.remoteAddress}`
  1469. );
  1470. }
  1471. if (connectionAgeTimer) {
  1472. clearTimeout(connectionAgeTimer);
  1473. }
  1474. if (connectionAgeGraceTimer) {
  1475. clearTimeout(connectionAgeGraceTimer);
  1476. }
  1477. clearKeepaliveTimeout();
  1478. if (idleTimeoutObj !== null) {
  1479. clearTimeout(idleTimeoutObj.timeout);
  1480. this.sessionIdleTimeouts.delete(session);
  1481. }
  1482. this.http2Servers.get(http2Server)?.sessions.delete(session);
  1483. });
  1484. };
  1485. }
  1486. private _channelzSessionHandler(
  1487. http2Server: http2.Http2Server | http2.Http2SecureServer
  1488. ) {
  1489. return (session: http2.ServerHttp2Session) => {
  1490. const channelzRef = registerChannelzSocket(
  1491. session.socket?.remoteAddress ?? 'unknown',
  1492. this.getChannelzSessionInfo.bind(this, session),
  1493. this.channelzEnabled
  1494. );
  1495. const channelzSessionInfo: ChannelzSessionInfo = {
  1496. ref: channelzRef,
  1497. streamTracker: new ChannelzCallTracker(),
  1498. messagesSent: 0,
  1499. messagesReceived: 0,
  1500. keepAlivesSent: 0,
  1501. lastMessageSentTimestamp: null,
  1502. lastMessageReceivedTimestamp: null,
  1503. };
  1504. this.http2Servers.get(http2Server)?.sessions.add(session);
  1505. this.sessions.set(session, channelzSessionInfo);
  1506. const clientAddress = `${session.socket.remoteAddress}:${session.socket.remotePort}`;
  1507. this.channelzTrace.addTrace(
  1508. 'CT_INFO',
  1509. 'Connection established by client ' + clientAddress
  1510. );
  1511. this.trace('Connection established by client ' + clientAddress);
  1512. this.sessionChildrenTracker.refChild(channelzRef);
  1513. let connectionAgeTimer: NodeJS.Timeout | null = null;
  1514. let connectionAgeGraceTimer: NodeJS.Timeout | null = null;
  1515. let keepaliveTimeout: NodeJS.Timeout | null = null;
  1516. let sessionClosedByServer = false;
  1517. const idleTimeoutObj = this.enableIdleTimeout(session);
  1518. if (this.maxConnectionAgeMs !== UNLIMITED_CONNECTION_AGE_MS) {
  1519. // Apply a random jitter within a +/-10% range
  1520. const jitterMagnitude = this.maxConnectionAgeMs / 10;
  1521. const jitter = Math.random() * jitterMagnitude * 2 - jitterMagnitude;
  1522. connectionAgeTimer = setTimeout(() => {
  1523. sessionClosedByServer = true;
  1524. this.channelzTrace.addTrace(
  1525. 'CT_INFO',
  1526. 'Connection dropped by max connection age from ' + clientAddress
  1527. );
  1528. try {
  1529. session.goaway(
  1530. http2.constants.NGHTTP2_NO_ERROR,
  1531. ~(1 << 31),
  1532. kMaxAge
  1533. );
  1534. } catch (e) {
  1535. // The goaway can't be sent because the session is already closed
  1536. session.destroy();
  1537. return;
  1538. }
  1539. session.close();
  1540. /* Allow a grace period after sending the GOAWAY before forcibly
  1541. * closing the connection. */
  1542. if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) {
  1543. connectionAgeGraceTimer = setTimeout(() => {
  1544. session.destroy();
  1545. }, this.maxConnectionAgeGraceMs);
  1546. connectionAgeGraceTimer.unref?.();
  1547. }
  1548. }, this.maxConnectionAgeMs + jitter);
  1549. connectionAgeTimer.unref?.();
  1550. }
  1551. const clearKeepaliveTimeout = () => {
  1552. if (keepaliveTimeout) {
  1553. clearTimeout(keepaliveTimeout);
  1554. keepaliveTimeout = null;
  1555. }
  1556. };
  1557. const canSendPing = () => {
  1558. return (
  1559. !session.destroyed &&
  1560. this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS &&
  1561. this.keepaliveTimeMs > 0
  1562. );
  1563. };
  1564. /* eslint-disable-next-line prefer-const */
  1565. let sendPing: () => void; // hoisted for use in maybeStartKeepalivePingTimer
  1566. const maybeStartKeepalivePingTimer = () => {
  1567. if (!canSendPing()) {
  1568. return;
  1569. }
  1570. this.keepaliveTrace(
  1571. 'Starting keepalive timer for ' + this.keepaliveTimeMs + 'ms'
  1572. );
  1573. keepaliveTimeout = setTimeout(() => {
  1574. clearKeepaliveTimeout();
  1575. sendPing();
  1576. }, this.keepaliveTimeMs);
  1577. keepaliveTimeout.unref?.();
  1578. };
  1579. sendPing = () => {
  1580. if (!canSendPing()) {
  1581. return;
  1582. }
  1583. this.keepaliveTrace(
  1584. 'Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms'
  1585. );
  1586. let pingSendError = '';
  1587. try {
  1588. const pingSentSuccessfully = session.ping(
  1589. (err: Error | null, duration: number, payload: Buffer) => {
  1590. clearKeepaliveTimeout();
  1591. if (err) {
  1592. this.keepaliveTrace('Ping failed with error: ' + err.message);
  1593. this.channelzTrace.addTrace(
  1594. 'CT_INFO',
  1595. 'Connection dropped due to error of a ping frame ' +
  1596. err.message +
  1597. ' return in ' +
  1598. duration
  1599. );
  1600. sessionClosedByServer = true;
  1601. session.close();
  1602. } else {
  1603. this.keepaliveTrace('Received ping response');
  1604. maybeStartKeepalivePingTimer();
  1605. }
  1606. }
  1607. );
  1608. if (!pingSentSuccessfully) {
  1609. pingSendError = 'Ping returned false';
  1610. }
  1611. } catch (e) {
  1612. // grpc/grpc-node#2139
  1613. pingSendError =
  1614. (e instanceof Error ? e.message : '') || 'Unknown error';
  1615. }
  1616. if (pingSendError) {
  1617. this.keepaliveTrace('Ping send failed: ' + pingSendError);
  1618. this.channelzTrace.addTrace(
  1619. 'CT_INFO',
  1620. 'Connection dropped due to ping send error: ' + pingSendError
  1621. );
  1622. sessionClosedByServer = true;
  1623. session.close();
  1624. return;
  1625. }
  1626. channelzSessionInfo.keepAlivesSent += 1;
  1627. keepaliveTimeout = setTimeout(() => {
  1628. clearKeepaliveTimeout();
  1629. this.keepaliveTrace('Ping timeout passed without response');
  1630. this.channelzTrace.addTrace(
  1631. 'CT_INFO',
  1632. 'Connection dropped by keepalive timeout from ' + clientAddress
  1633. );
  1634. sessionClosedByServer = true;
  1635. session.close();
  1636. }, this.keepaliveTimeoutMs);
  1637. keepaliveTimeout.unref?.();
  1638. };
  1639. maybeStartKeepalivePingTimer();
  1640. session.on('close', () => {
  1641. if (!sessionClosedByServer) {
  1642. this.channelzTrace.addTrace(
  1643. 'CT_INFO',
  1644. 'Connection dropped by client ' + clientAddress
  1645. );
  1646. }
  1647. this.sessionChildrenTracker.unrefChild(channelzRef);
  1648. unregisterChannelzRef(channelzRef);
  1649. if (connectionAgeTimer) {
  1650. clearTimeout(connectionAgeTimer);
  1651. }
  1652. if (connectionAgeGraceTimer) {
  1653. clearTimeout(connectionAgeGraceTimer);
  1654. }
  1655. clearKeepaliveTimeout();
  1656. if (idleTimeoutObj !== null) {
  1657. clearTimeout(idleTimeoutObj.timeout);
  1658. this.sessionIdleTimeouts.delete(session);
  1659. }
  1660. this.http2Servers.get(http2Server)?.sessions.delete(session);
  1661. this.sessions.delete(session);
  1662. });
  1663. };
  1664. }
  1665. private enableIdleTimeout(
  1666. session: http2.ServerHttp2Session
  1667. ): SessionIdleTimeoutTracker | null {
  1668. if (this.sessionIdleTimeout >= MAX_CONNECTION_IDLE_MS) {
  1669. return null;
  1670. }
  1671. const idleTimeoutObj: SessionIdleTimeoutTracker = {
  1672. activeStreams: 0,
  1673. lastIdle: Date.now(),
  1674. onClose: this.onStreamClose.bind(this, session),
  1675. timeout: setTimeout(
  1676. this.onIdleTimeout,
  1677. this.sessionIdleTimeout,
  1678. this,
  1679. session
  1680. ),
  1681. };
  1682. idleTimeoutObj.timeout.unref?.();
  1683. this.sessionIdleTimeouts.set(session, idleTimeoutObj);
  1684. const { socket } = session;
  1685. this.trace(
  1686. 'Enable idle timeout for ' +
  1687. socket.remoteAddress +
  1688. ':' +
  1689. socket.remotePort
  1690. );
  1691. return idleTimeoutObj;
  1692. }
  1693. private onIdleTimeout(
  1694. this: undefined,
  1695. ctx: Server,
  1696. session: http2.ServerHttp2Session
  1697. ) {
  1698. const { socket } = session;
  1699. const sessionInfo = ctx.sessionIdleTimeouts.get(session);
  1700. // if it is called while we have activeStreams - timer will not be rescheduled
  1701. // until last active stream is closed, then it will call .refresh() on the timer
  1702. // important part is to not clearTimeout(timer) or it becomes unusable
  1703. // for future refreshes
  1704. if (
  1705. sessionInfo !== undefined &&
  1706. sessionInfo.activeStreams === 0
  1707. ) {
  1708. if (Date.now() - sessionInfo.lastIdle >= ctx.sessionIdleTimeout) {
  1709. ctx.trace(
  1710. 'Session idle timeout triggered for ' +
  1711. socket?.remoteAddress +
  1712. ':' +
  1713. socket?.remotePort +
  1714. ' last idle at ' +
  1715. sessionInfo.lastIdle
  1716. );
  1717. ctx.closeSession(session);
  1718. } else {
  1719. sessionInfo.timeout.refresh();
  1720. }
  1721. }
  1722. }
  1723. private onStreamOpened(stream: http2.ServerHttp2Stream) {
  1724. const session = stream.session as http2.ServerHttp2Session;
  1725. const idleTimeoutObj = this.sessionIdleTimeouts.get(session);
  1726. if (idleTimeoutObj) {
  1727. idleTimeoutObj.activeStreams += 1;
  1728. stream.once('close', idleTimeoutObj.onClose);
  1729. }
  1730. }
  1731. private onStreamClose(session: http2.ServerHttp2Session) {
  1732. const idleTimeoutObj = this.sessionIdleTimeouts.get(session);
  1733. if (idleTimeoutObj) {
  1734. idleTimeoutObj.activeStreams -= 1;
  1735. if (idleTimeoutObj.activeStreams === 0) {
  1736. idleTimeoutObj.lastIdle = Date.now();
  1737. idleTimeoutObj.timeout.refresh();
  1738. this.trace(
  1739. 'Session onStreamClose' +
  1740. session.socket?.remoteAddress +
  1741. ':' +
  1742. session.socket?.remotePort +
  1743. ' at ' +
  1744. idleTimeoutObj.lastIdle
  1745. );
  1746. }
  1747. }
  1748. }
  1749. }
  1750. async function handleUnary<RequestType, ResponseType>(
  1751. call: ServerInterceptingCallInterface,
  1752. handler: UnaryHandler<RequestType, ResponseType>
  1753. ): Promise<void> {
  1754. let stream: ServerUnaryCall<RequestType, ResponseType>;
  1755. function respond(
  1756. err: ServerErrorResponse | ServerStatusResponse | null,
  1757. value?: ResponseType | null,
  1758. trailer?: Metadata,
  1759. flags?: number
  1760. ) {
  1761. if (err) {
  1762. call.sendStatus(serverErrorToStatus(err, trailer));
  1763. return;
  1764. }
  1765. call.sendMessage(value, () => {
  1766. call.sendStatus({
  1767. code: Status.OK,
  1768. details: 'OK',
  1769. metadata: trailer ?? null,
  1770. });
  1771. });
  1772. }
  1773. let requestMetadata: Metadata;
  1774. let requestMessage: RequestType | null = null;
  1775. call.start({
  1776. onReceiveMetadata(metadata) {
  1777. requestMetadata = metadata;
  1778. call.startRead();
  1779. },
  1780. onReceiveMessage(message) {
  1781. if (requestMessage) {
  1782. call.sendStatus({
  1783. code: Status.UNIMPLEMENTED,
  1784. details: `Received a second request message for server streaming method ${handler.path}`,
  1785. metadata: null,
  1786. });
  1787. return;
  1788. }
  1789. requestMessage = message;
  1790. call.startRead();
  1791. },
  1792. onReceiveHalfClose() {
  1793. if (!requestMessage) {
  1794. call.sendStatus({
  1795. code: Status.UNIMPLEMENTED,
  1796. details: `Received no request message for server streaming method ${handler.path}`,
  1797. metadata: null,
  1798. });
  1799. return;
  1800. }
  1801. stream = new ServerWritableStreamImpl(
  1802. handler.path,
  1803. call,
  1804. requestMetadata,
  1805. requestMessage
  1806. );
  1807. try {
  1808. handler.func(stream, respond);
  1809. } catch (err) {
  1810. call.sendStatus({
  1811. code: Status.UNKNOWN,
  1812. details: `Server method handler threw error ${
  1813. (err as Error).message
  1814. }`,
  1815. metadata: null,
  1816. });
  1817. }
  1818. },
  1819. onCancel() {
  1820. if (stream) {
  1821. stream.cancelled = true;
  1822. stream.emit('cancelled', 'cancelled');
  1823. }
  1824. },
  1825. });
  1826. }
  1827. function handleClientStreaming<RequestType, ResponseType>(
  1828. call: ServerInterceptingCallInterface,
  1829. handler: ClientStreamingHandler<RequestType, ResponseType>
  1830. ): void {
  1831. let stream: ServerReadableStream<RequestType, ResponseType>;
  1832. function respond(
  1833. err: ServerErrorResponse | ServerStatusResponse | null,
  1834. value?: ResponseType | null,
  1835. trailer?: Metadata,
  1836. flags?: number
  1837. ) {
  1838. if (err) {
  1839. call.sendStatus(serverErrorToStatus(err, trailer));
  1840. return;
  1841. }
  1842. call.sendMessage(value, () => {
  1843. call.sendStatus({
  1844. code: Status.OK,
  1845. details: 'OK',
  1846. metadata: trailer ?? null,
  1847. });
  1848. });
  1849. }
  1850. call.start({
  1851. onReceiveMetadata(metadata) {
  1852. stream = new ServerDuplexStreamImpl(handler.path, call, metadata);
  1853. try {
  1854. handler.func(stream, respond);
  1855. } catch (err) {
  1856. call.sendStatus({
  1857. code: Status.UNKNOWN,
  1858. details: `Server method handler threw error ${
  1859. (err as Error).message
  1860. }`,
  1861. metadata: null,
  1862. });
  1863. }
  1864. },
  1865. onReceiveMessage(message) {
  1866. stream.push(message);
  1867. },
  1868. onReceiveHalfClose() {
  1869. stream.push(null);
  1870. },
  1871. onCancel() {
  1872. if (stream) {
  1873. stream.cancelled = true;
  1874. stream.emit('cancelled', 'cancelled');
  1875. stream.destroy();
  1876. }
  1877. },
  1878. });
  1879. }
  1880. function handleServerStreaming<RequestType, ResponseType>(
  1881. call: ServerInterceptingCallInterface,
  1882. handler: ServerStreamingHandler<RequestType, ResponseType>
  1883. ): void {
  1884. let stream: ServerWritableStream<RequestType, ResponseType>;
  1885. let requestMetadata: Metadata;
  1886. let requestMessage: RequestType | null = null;
  1887. call.start({
  1888. onReceiveMetadata(metadata) {
  1889. requestMetadata = metadata;
  1890. call.startRead();
  1891. },
  1892. onReceiveMessage(message) {
  1893. if (requestMessage) {
  1894. call.sendStatus({
  1895. code: Status.UNIMPLEMENTED,
  1896. details: `Received a second request message for server streaming method ${handler.path}`,
  1897. metadata: null,
  1898. });
  1899. return;
  1900. }
  1901. requestMessage = message;
  1902. call.startRead();
  1903. },
  1904. onReceiveHalfClose() {
  1905. if (!requestMessage) {
  1906. call.sendStatus({
  1907. code: Status.UNIMPLEMENTED,
  1908. details: `Received no request message for server streaming method ${handler.path}`,
  1909. metadata: null,
  1910. });
  1911. return;
  1912. }
  1913. stream = new ServerWritableStreamImpl(
  1914. handler.path,
  1915. call,
  1916. requestMetadata,
  1917. requestMessage
  1918. );
  1919. try {
  1920. handler.func(stream);
  1921. } catch (err) {
  1922. call.sendStatus({
  1923. code: Status.UNKNOWN,
  1924. details: `Server method handler threw error ${
  1925. (err as Error).message
  1926. }`,
  1927. metadata: null,
  1928. });
  1929. }
  1930. },
  1931. onCancel() {
  1932. if (stream) {
  1933. stream.cancelled = true;
  1934. stream.emit('cancelled', 'cancelled');
  1935. stream.destroy();
  1936. }
  1937. },
  1938. });
  1939. }
  1940. function handleBidiStreaming<RequestType, ResponseType>(
  1941. call: ServerInterceptingCallInterface,
  1942. handler: BidiStreamingHandler<RequestType, ResponseType>
  1943. ): void {
  1944. let stream: ServerDuplexStream<RequestType, ResponseType>;
  1945. call.start({
  1946. onReceiveMetadata(metadata) {
  1947. stream = new ServerDuplexStreamImpl(handler.path, call, metadata);
  1948. try {
  1949. handler.func(stream);
  1950. } catch (err) {
  1951. call.sendStatus({
  1952. code: Status.UNKNOWN,
  1953. details: `Server method handler threw error ${
  1954. (err as Error).message
  1955. }`,
  1956. metadata: null,
  1957. });
  1958. }
  1959. },
  1960. onReceiveMessage(message) {
  1961. stream.push(message);
  1962. },
  1963. onReceiveHalfClose() {
  1964. stream.push(null);
  1965. },
  1966. onCancel() {
  1967. if (stream) {
  1968. stream.cancelled = true;
  1969. stream.emit('cancelled', 'cancelled');
  1970. stream.destroy();
  1971. }
  1972. },
  1973. });
  1974. }