client-interceptors.ts 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577
  1. /*
  2. * Copyright 2019 gRPC authors.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. *
  16. */
  17. import { Metadata } from './metadata';
  18. import {
  19. StatusObject,
  20. Listener,
  21. MetadataListener,
  22. MessageListener,
  23. StatusListener,
  24. FullListener,
  25. InterceptingListener,
  26. InterceptingListenerImpl,
  27. isInterceptingListener,
  28. MessageContext,
  29. Call,
  30. } from './call-interface';
  31. import { Status } from './constants';
  32. import { Channel } from './channel';
  33. import { CallOptions } from './client';
  34. import { ClientMethodDefinition } from './make-client';
  35. import { getErrorMessage } from './error';
/**
 * Error class associated with passing both interceptors and interceptor
 * providers to a client constructor or as call options.
 *
 * `getInterceptingCall` throws it when the client-level or the call-level
 * configuration supplies both kinds at once.
 */
export class InterceptorConfigurationError extends Error {
  /**
   * @param message Human-readable description of the configuration conflict.
   */
  constructor(message: string) {
    super(message);
    // Report the subclass name instead of the generic 'Error'.
    this.name = 'InterceptorConfigurationError';
    // Passing the constructor as the second argument keeps this
    // constructor's own frame out of the captured stack trace.
    Error.captureStackTrace(this, InterceptorConfigurationError);
  }
}
/**
 * Signature of a requester's `start` hook.
 *
 * InterceptingCall.start invokes it with the outgoing metadata, a fully
 * populated intercepting listener, and a `next` callback. Whatever metadata
 * and listener the hook passes to `next` are what InterceptingCall forwards
 * to the next call's `start`.
 */
export interface MetadataRequester {
  (
    metadata: Metadata,
    listener: InterceptingListener,
    next: (
      metadata: Metadata,
      listener: InterceptingListener | Listener
    ) => void
  ): void;
}
/**
 * Signature of a requester's `sendMessage` hook.
 *
 * InterceptingCall.sendMessageWithContext invokes it with the outgoing
 * message and a `next` callback that receives the message to pass on.
 */
export interface MessageRequester {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  (message: any, next: (message: any) => void): void;
}
/**
 * Signature of a requester's `halfClose` hook. InterceptingCall.halfClose
 * invokes it with a `next` callback; the half-close is only passed on to the
 * next call once the hook calls `next`.
 */
export interface CloseRequester {
  (next: () => void): void;
}
/**
 * Signature of a requester's `cancel` hook. InterceptingCall.cancelWithStatus
 * invokes it with a `next` callback; the cancellation is only passed on to
 * the next call once the hook calls `next`.
 */
export interface CancelRequester {
  (next: () => void): void;
}
/**
 * An object with methods for intercepting and modifying outgoing call operations.
 * Every hook is required here; see `Requester` for the partial form.
 */
export interface FullRequester {
  /** Hook run when the call is started. */
  start: MetadataRequester;
  /** Hook run for each outgoing message. */
  sendMessage: MessageRequester;
  /** Hook run when the call is half-closed. */
  halfClose: CloseRequester;
  /** Hook run when the call is cancelled. */
  cancel: CancelRequester;
}
/**
 * A FullRequester in which any hook may be omitted. InterceptingCall replaces
 * each omitted hook with a pass-through default.
 */
export type Requester = Partial<FullRequester>;
  77. export class ListenerBuilder {
  78. private metadata: MetadataListener | undefined = undefined;
  79. private message: MessageListener | undefined = undefined;
  80. private status: StatusListener | undefined = undefined;
  81. withOnReceiveMetadata(onReceiveMetadata: MetadataListener): this {
  82. this.metadata = onReceiveMetadata;
  83. return this;
  84. }
  85. withOnReceiveMessage(onReceiveMessage: MessageListener): this {
  86. this.message = onReceiveMessage;
  87. return this;
  88. }
  89. withOnReceiveStatus(onReceiveStatus: StatusListener): this {
  90. this.status = onReceiveStatus;
  91. return this;
  92. }
  93. build(): Listener {
  94. return {
  95. onReceiveMetadata: this.metadata,
  96. onReceiveMessage: this.message,
  97. onReceiveStatus: this.status,
  98. };
  99. }
  100. }
  101. export class RequesterBuilder {
  102. private start: MetadataRequester | undefined = undefined;
  103. private message: MessageRequester | undefined = undefined;
  104. private halfClose: CloseRequester | undefined = undefined;
  105. private cancel: CancelRequester | undefined = undefined;
  106. withStart(start: MetadataRequester): this {
  107. this.start = start;
  108. return this;
  109. }
  110. withSendMessage(sendMessage: MessageRequester): this {
  111. this.message = sendMessage;
  112. return this;
  113. }
  114. withHalfClose(halfClose: CloseRequester): this {
  115. this.halfClose = halfClose;
  116. return this;
  117. }
  118. withCancel(cancel: CancelRequester): this {
  119. this.cancel = cancel;
  120. return this;
  121. }
  122. build(): Requester {
  123. return {
  124. start: this.start,
  125. sendMessage: this.message,
  126. halfClose: this.halfClose,
  127. cancel: this.cancel,
  128. };
  129. }
  130. }
  131. /**
  132. * A Listener with a default pass-through implementation of each method. Used
  133. * for filling out Listeners with some methods omitted.
  134. */
  135. const defaultListener: FullListener = {
  136. onReceiveMetadata: (metadata, next) => {
  137. next(metadata);
  138. },
  139. onReceiveMessage: (message, next) => {
  140. next(message);
  141. },
  142. onReceiveStatus: (status, next) => {
  143. next(status);
  144. },
  145. };
  146. /**
  147. * A Requester with a default pass-through implementation of each method. Used
  148. * for filling out Requesters with some methods omitted.
  149. */
  150. const defaultRequester: FullRequester = {
  151. start: (metadata, listener, next) => {
  152. next(metadata, listener);
  153. },
  154. sendMessage: (message, next) => {
  155. next(message);
  156. },
  157. halfClose: next => {
  158. next();
  159. },
  160. cancel: next => {
  161. next();
  162. },
  163. };
/**
 * The options object handed to interceptors: the caller's CallOptions plus
 * the definition of the method being invoked.
 */
export interface InterceptorOptions extends CallOptions {
  /** Definition of the method this call targets. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  method_definition: ClientMethodDefinition<any, any>;
}
/**
 * The set of operations every call in the interceptor chain exposes. It is
 * implemented both by InterceptingCall (one link of the chain) and by the
 * bottom-of-chain calls that wrap the channel's Call.
 */
export interface InterceptingCallInterface {
  /** Cancels the call with the given status code and details. */
  cancelWithStatus(status: Status, details: string): void;
  /** Returns the peer string reported by the call. */
  getPeer(): string;
  /** Starts the call with the given metadata and an optional, possibly partial, listener. */
  start(metadata: Metadata, listener?: Partial<InterceptingListener>): void;
  /** Sends a message together with its per-message context. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessageWithContext(context: MessageContext, message: any): void;
  /** Sends a message; the implementations in this file use an empty context. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sendMessage(message: any): void;
  /** Requests a read from the call. */
  startRead(): void;
  /** Half-closes the call. */
  halfClose(): void;
}
  179. export class InterceptingCall implements InterceptingCallInterface {
  180. /**
  181. * The requester that this InterceptingCall uses to modify outgoing operations
  182. */
  183. private requester: FullRequester;
  184. /**
  185. * Indicates that metadata has been passed to the requester's start
  186. * method but it has not been passed to the corresponding next callback
  187. */
  188. private processingMetadata = false;
  189. /**
  190. * Message context for a pending message that is waiting for
  191. */
  192. private pendingMessageContext: MessageContext | null = null;
  193. private pendingMessage: any;
  194. /**
  195. * Indicates that a message has been passed to the requester's sendMessage
  196. * method but it has not been passed to the corresponding next callback
  197. */
  198. private processingMessage = false;
  199. /**
  200. * Indicates that a status was received but could not be propagated because
  201. * a message was still being processed.
  202. */
  203. private pendingHalfClose = false;
  204. constructor(
  205. private nextCall: InterceptingCallInterface,
  206. requester?: Requester
  207. ) {
  208. if (requester) {
  209. this.requester = {
  210. start: requester.start ?? defaultRequester.start,
  211. sendMessage: requester.sendMessage ?? defaultRequester.sendMessage,
  212. halfClose: requester.halfClose ?? defaultRequester.halfClose,
  213. cancel: requester.cancel ?? defaultRequester.cancel,
  214. };
  215. } else {
  216. this.requester = defaultRequester;
  217. }
  218. }
  219. cancelWithStatus(status: Status, details: string) {
  220. this.requester.cancel(() => {
  221. this.nextCall.cancelWithStatus(status, details);
  222. });
  223. }
  224. getPeer() {
  225. return this.nextCall.getPeer();
  226. }
  227. private processPendingMessage() {
  228. if (this.pendingMessageContext) {
  229. this.nextCall.sendMessageWithContext(
  230. this.pendingMessageContext,
  231. this.pendingMessage
  232. );
  233. this.pendingMessageContext = null;
  234. this.pendingMessage = null;
  235. }
  236. }
  237. private processPendingHalfClose() {
  238. if (this.pendingHalfClose) {
  239. this.nextCall.halfClose();
  240. }
  241. }
  242. start(
  243. metadata: Metadata,
  244. interceptingListener?: Partial<InterceptingListener>
  245. ): void {
  246. const fullInterceptingListener: InterceptingListener = {
  247. onReceiveMetadata:
  248. interceptingListener?.onReceiveMetadata?.bind(interceptingListener) ??
  249. (metadata => {}),
  250. onReceiveMessage:
  251. interceptingListener?.onReceiveMessage?.bind(interceptingListener) ??
  252. (message => {}),
  253. onReceiveStatus:
  254. interceptingListener?.onReceiveStatus?.bind(interceptingListener) ??
  255. (status => {}),
  256. };
  257. this.processingMetadata = true;
  258. this.requester.start(metadata, fullInterceptingListener, (md, listener) => {
  259. this.processingMetadata = false;
  260. let finalInterceptingListener: InterceptingListener;
  261. if (isInterceptingListener(listener)) {
  262. finalInterceptingListener = listener;
  263. } else {
  264. const fullListener: FullListener = {
  265. onReceiveMetadata:
  266. listener.onReceiveMetadata ?? defaultListener.onReceiveMetadata,
  267. onReceiveMessage:
  268. listener.onReceiveMessage ?? defaultListener.onReceiveMessage,
  269. onReceiveStatus:
  270. listener.onReceiveStatus ?? defaultListener.onReceiveStatus,
  271. };
  272. finalInterceptingListener = new InterceptingListenerImpl(
  273. fullListener,
  274. fullInterceptingListener
  275. );
  276. }
  277. this.nextCall.start(md, finalInterceptingListener);
  278. this.processPendingMessage();
  279. this.processPendingHalfClose();
  280. });
  281. }
  282. // eslint-disable-next-line @typescript-eslint/no-explicit-any
  283. sendMessageWithContext(context: MessageContext, message: any): void {
  284. this.processingMessage = true;
  285. this.requester.sendMessage(message, finalMessage => {
  286. this.processingMessage = false;
  287. if (this.processingMetadata) {
  288. this.pendingMessageContext = context;
  289. this.pendingMessage = message;
  290. } else {
  291. this.nextCall.sendMessageWithContext(context, finalMessage);
  292. this.processPendingHalfClose();
  293. }
  294. });
  295. }
  296. // eslint-disable-next-line @typescript-eslint/no-explicit-any
  297. sendMessage(message: any): void {
  298. this.sendMessageWithContext({}, message);
  299. }
  300. startRead(): void {
  301. this.nextCall.startRead();
  302. }
  303. halfClose(): void {
  304. this.requester.halfClose(() => {
  305. if (this.processingMetadata || this.processingMessage) {
  306. this.pendingHalfClose = true;
  307. } else {
  308. this.nextCall.halfClose();
  309. }
  310. });
  311. }
  312. }
  313. function getCall(channel: Channel, path: string, options: CallOptions): Call {
  314. const deadline = options.deadline ?? Infinity;
  315. const host = options.host;
  316. const parent = options.parent ?? null;
  317. const propagateFlags = options.propagate_flags;
  318. const credentials = options.credentials;
  319. const call = channel.createCall(path, deadline, host, parent, propagateFlags);
  320. if (credentials) {
  321. call.setCredentials(credentials);
  322. }
  323. return call;
  324. }
  325. /**
  326. * InterceptingCall implementation that directly owns the underlying Call
  327. * object and handles serialization and deseraizliation.
  328. */
  329. class BaseInterceptingCall implements InterceptingCallInterface {
  330. constructor(
  331. protected call: Call,
  332. // eslint-disable-next-line @typescript-eslint/no-explicit-any
  333. protected methodDefinition: ClientMethodDefinition<any, any>
  334. ) {}
  335. cancelWithStatus(status: Status, details: string): void {
  336. this.call.cancelWithStatus(status, details);
  337. }
  338. getPeer(): string {
  339. return this.call.getPeer();
  340. }
  341. // eslint-disable-next-line @typescript-eslint/no-explicit-any
  342. sendMessageWithContext(context: MessageContext, message: any): void {
  343. let serialized: Buffer;
  344. try {
  345. serialized = this.methodDefinition.requestSerialize(message);
  346. } catch (e) {
  347. this.call.cancelWithStatus(
  348. Status.INTERNAL,
  349. `Request message serialization failure: ${getErrorMessage(e)}`
  350. );
  351. return;
  352. }
  353. this.call.sendMessageWithContext(context, serialized);
  354. }
  355. // eslint-disable-next-line @typescript-eslint/no-explicit-any
  356. sendMessage(message: any) {
  357. this.sendMessageWithContext({}, message);
  358. }
  359. start(
  360. metadata: Metadata,
  361. interceptingListener?: Partial<InterceptingListener>
  362. ): void {
  363. let readError: StatusObject | null = null;
  364. this.call.start(metadata, {
  365. onReceiveMetadata: metadata => {
  366. interceptingListener?.onReceiveMetadata?.(metadata);
  367. },
  368. onReceiveMessage: message => {
  369. // eslint-disable-next-line @typescript-eslint/no-explicit-any
  370. let deserialized: any;
  371. try {
  372. deserialized = this.methodDefinition.responseDeserialize(message);
  373. } catch (e) {
  374. readError = {
  375. code: Status.INTERNAL,
  376. details: `Response message parsing error: ${getErrorMessage(e)}`,
  377. metadata: new Metadata(),
  378. };
  379. this.call.cancelWithStatus(readError.code, readError.details);
  380. return;
  381. }
  382. interceptingListener?.onReceiveMessage?.(deserialized);
  383. },
  384. onReceiveStatus: status => {
  385. if (readError) {
  386. interceptingListener?.onReceiveStatus?.(readError);
  387. } else {
  388. interceptingListener?.onReceiveStatus?.(status);
  389. }
  390. },
  391. });
  392. }
  393. startRead() {
  394. this.call.startRead();
  395. }
  396. halfClose(): void {
  397. this.call.halfClose();
  398. }
  399. }
  400. /**
  401. * BaseInterceptingCall with special-cased behavior for methods with unary
  402. * responses.
  403. */
  404. class BaseUnaryInterceptingCall
  405. extends BaseInterceptingCall
  406. implements InterceptingCallInterface
  407. {
  408. // eslint-disable-next-line @typescript-eslint/no-explicit-any
  409. constructor(call: Call, methodDefinition: ClientMethodDefinition<any, any>) {
  410. super(call, methodDefinition);
  411. }
  412. start(metadata: Metadata, listener?: Partial<InterceptingListener>): void {
  413. let receivedMessage = false;
  414. const wrapperListener: InterceptingListener = {
  415. onReceiveMetadata:
  416. listener?.onReceiveMetadata?.bind(listener) ?? (metadata => {}),
  417. // eslint-disable-next-line @typescript-eslint/no-explicit-any
  418. onReceiveMessage: (message: any) => {
  419. receivedMessage = true;
  420. listener?.onReceiveMessage?.(message);
  421. },
  422. onReceiveStatus: (status: StatusObject) => {
  423. if (!receivedMessage) {
  424. listener?.onReceiveMessage?.(null);
  425. }
  426. listener?.onReceiveStatus?.(status);
  427. },
  428. };
  429. super.start(metadata, wrapperListener);
  430. this.call.startRead();
  431. }
  432. }
/**
 * BaseInterceptingCall with special-cased behavior for methods with streaming
 * responses.
 *
 * The class body is empty: every operation is inherited from
 * BaseInterceptingCall unchanged.
 */
class BaseStreamingInterceptingCall
  extends BaseInterceptingCall
  implements InterceptingCallInterface {}
  440. function getBottomInterceptingCall(
  441. channel: Channel,
  442. options: InterceptorOptions,
  443. // eslint-disable-next-line @typescript-eslint/no-explicit-any
  444. methodDefinition: ClientMethodDefinition<any, any>
  445. ) {
  446. const call = getCall(channel, methodDefinition.path, options);
  447. if (methodDefinition.responseStream) {
  448. return new BaseStreamingInterceptingCall(call, methodDefinition);
  449. } else {
  450. return new BaseUnaryInterceptingCall(call, methodDefinition);
  451. }
  452. }
/**
 * Function an interceptor calls to obtain the next call in the chain, given
 * the (possibly modified) options.
 */
export interface NextCall {
  (options: InterceptorOptions): InterceptingCallInterface;
}
/**
 * A client interceptor: receives the call options and a `nextCall` factory
 * and returns the InterceptingCall to use for this link of the chain.
 */
export interface Interceptor {
  (options: InterceptorOptions, nextCall: NextCall): InterceptingCall;
}
/**
 * Produces an interceptor for a given method definition.
 * getInterceptingCall discards falsy results, so at runtime a provider may
 * return nothing to opt out for a method.
 */
export interface InterceptorProvider {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  (methodDefinition: ClientMethodDefinition<any, any>): Interceptor;
}
/**
 * Interceptor configuration gathered from the client constructor and from
 * the individual call. Within one level, interceptors and providers are
 * mutually exclusive; call-level entries replace client-level ones.
 */
export interface InterceptorArguments {
  /** Interceptors passed to the client constructor. */
  clientInterceptors: Interceptor[];
  /** Interceptor providers passed to the client constructor. */
  clientInterceptorProviders: InterceptorProvider[];
  /** Interceptors passed as call options. */
  callInterceptors: Interceptor[];
  /** Interceptor providers passed as call options. */
  callInterceptorProviders: InterceptorProvider[];
}
  469. export function getInterceptingCall(
  470. interceptorArgs: InterceptorArguments,
  471. // eslint-disable-next-line @typescript-eslint/no-explicit-any
  472. methodDefinition: ClientMethodDefinition<any, any>,
  473. options: CallOptions,
  474. channel: Channel
  475. ): InterceptingCallInterface {
  476. if (
  477. interceptorArgs.clientInterceptors.length > 0 &&
  478. interceptorArgs.clientInterceptorProviders.length > 0
  479. ) {
  480. throw new InterceptorConfigurationError(
  481. 'Both interceptors and interceptor_providers were passed as options ' +
  482. 'to the client constructor. Only one of these is allowed.'
  483. );
  484. }
  485. if (
  486. interceptorArgs.callInterceptors.length > 0 &&
  487. interceptorArgs.callInterceptorProviders.length > 0
  488. ) {
  489. throw new InterceptorConfigurationError(
  490. 'Both interceptors and interceptor_providers were passed as call ' +
  491. 'options. Only one of these is allowed.'
  492. );
  493. }
  494. let interceptors: Interceptor[] = [];
  495. // Interceptors passed to the call override interceptors passed to the client constructor
  496. if (
  497. interceptorArgs.callInterceptors.length > 0 ||
  498. interceptorArgs.callInterceptorProviders.length > 0
  499. ) {
  500. interceptors = ([] as Interceptor[])
  501. .concat(
  502. interceptorArgs.callInterceptors,
  503. interceptorArgs.callInterceptorProviders.map(provider =>
  504. provider(methodDefinition)
  505. )
  506. )
  507. .filter(interceptor => interceptor);
  508. // Filter out falsy values when providers return nothing
  509. } else {
  510. interceptors = ([] as Interceptor[])
  511. .concat(
  512. interceptorArgs.clientInterceptors,
  513. interceptorArgs.clientInterceptorProviders.map(provider =>
  514. provider(methodDefinition)
  515. )
  516. )
  517. .filter(interceptor => interceptor);
  518. // Filter out falsy values when providers return nothing
  519. }
  520. const interceptorOptions = Object.assign({}, options, {
  521. method_definition: methodDefinition,
  522. });
  523. /* For each interceptor in the list, the nextCall function passed to it is
  524. * based on the next interceptor in the list, using a nextCall function
  525. * constructed with the following interceptor in the list, and so on. The
  526. * initialValue, which is effectively at the end of the list, is a nextCall
  527. * function that invokes getBottomInterceptingCall, the result of which
  528. * handles (de)serialization and also gets the underlying call from the
  529. * channel. */
  530. const getCall: NextCall = interceptors.reduceRight<NextCall>(
  531. (nextCall: NextCall, nextInterceptor: Interceptor) => {
  532. return currentOptions => nextInterceptor(currentOptions, nextCall);
  533. },
  534. (finalOptions: InterceptorOptions) =>
  535. getBottomInterceptingCall(channel, finalOptions, methodDefinition)
  536. );
  537. return getCall(interceptorOptions);
  538. }