123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577 |
- /*
- * Copyright 2019 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
- import { Metadata } from './metadata';
- import {
- StatusObject,
- Listener,
- MetadataListener,
- MessageListener,
- StatusListener,
- FullListener,
- InterceptingListener,
- InterceptingListenerImpl,
- isInterceptingListener,
- MessageContext,
- Call,
- } from './call-interface';
- import { Status } from './constants';
- import { Channel } from './channel';
- import { CallOptions } from './client';
- import { ClientMethodDefinition } from './make-client';
- import { getErrorMessage } from './error';
- /**
- * Error class associated with passing both interceptors and interceptor
- * providers to a client constructor or as call options.
- */
- export class InterceptorConfigurationError extends Error {
- constructor(message: string) {
- super(message);
- this.name = 'InterceptorConfigurationError';
- Error.captureStackTrace(this, InterceptorConfigurationError);
- }
- }
- export interface MetadataRequester {
- (
- metadata: Metadata,
- listener: InterceptingListener,
- next: (
- metadata: Metadata,
- listener: InterceptingListener | Listener
- ) => void
- ): void;
- }
- export interface MessageRequester {
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- (message: any, next: (message: any) => void): void;
- }
- export interface CloseRequester {
- (next: () => void): void;
- }
- export interface CancelRequester {
- (next: () => void): void;
- }
- /**
- * An object with methods for intercepting and modifying outgoing call operations.
- */
- export interface FullRequester {
- start: MetadataRequester;
- sendMessage: MessageRequester;
- halfClose: CloseRequester;
- cancel: CancelRequester;
- }
- export type Requester = Partial<FullRequester>;
- export class ListenerBuilder {
- private metadata: MetadataListener | undefined = undefined;
- private message: MessageListener | undefined = undefined;
- private status: StatusListener | undefined = undefined;
- withOnReceiveMetadata(onReceiveMetadata: MetadataListener): this {
- this.metadata = onReceiveMetadata;
- return this;
- }
- withOnReceiveMessage(onReceiveMessage: MessageListener): this {
- this.message = onReceiveMessage;
- return this;
- }
- withOnReceiveStatus(onReceiveStatus: StatusListener): this {
- this.status = onReceiveStatus;
- return this;
- }
- build(): Listener {
- return {
- onReceiveMetadata: this.metadata,
- onReceiveMessage: this.message,
- onReceiveStatus: this.status,
- };
- }
- }
- export class RequesterBuilder {
- private start: MetadataRequester | undefined = undefined;
- private message: MessageRequester | undefined = undefined;
- private halfClose: CloseRequester | undefined = undefined;
- private cancel: CancelRequester | undefined = undefined;
- withStart(start: MetadataRequester): this {
- this.start = start;
- return this;
- }
- withSendMessage(sendMessage: MessageRequester): this {
- this.message = sendMessage;
- return this;
- }
- withHalfClose(halfClose: CloseRequester): this {
- this.halfClose = halfClose;
- return this;
- }
- withCancel(cancel: CancelRequester): this {
- this.cancel = cancel;
- return this;
- }
- build(): Requester {
- return {
- start: this.start,
- sendMessage: this.message,
- halfClose: this.halfClose,
- cancel: this.cancel,
- };
- }
- }
- /**
- * A Listener with a default pass-through implementation of each method. Used
- * for filling out Listeners with some methods omitted.
- */
- const defaultListener: FullListener = {
- onReceiveMetadata: (metadata, next) => {
- next(metadata);
- },
- onReceiveMessage: (message, next) => {
- next(message);
- },
- onReceiveStatus: (status, next) => {
- next(status);
- },
- };
- /**
- * A Requester with a default pass-through implementation of each method. Used
- * for filling out Requesters with some methods omitted.
- */
- const defaultRequester: FullRequester = {
- start: (metadata, listener, next) => {
- next(metadata, listener);
- },
- sendMessage: (message, next) => {
- next(message);
- },
- halfClose: next => {
- next();
- },
- cancel: next => {
- next();
- },
- };
- export interface InterceptorOptions extends CallOptions {
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- method_definition: ClientMethodDefinition<any, any>;
- }
- export interface InterceptingCallInterface {
- cancelWithStatus(status: Status, details: string): void;
- getPeer(): string;
- start(metadata: Metadata, listener?: Partial<InterceptingListener>): void;
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- sendMessageWithContext(context: MessageContext, message: any): void;
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- sendMessage(message: any): void;
- startRead(): void;
- halfClose(): void;
- }
- export class InterceptingCall implements InterceptingCallInterface {
- /**
- * The requester that this InterceptingCall uses to modify outgoing operations
- */
- private requester: FullRequester;
- /**
- * Indicates that metadata has been passed to the requester's start
- * method but it has not been passed to the corresponding next callback
- */
- private processingMetadata = false;
- /**
- * Message context for a pending message that is waiting for
- */
- private pendingMessageContext: MessageContext | null = null;
- private pendingMessage: any;
- /**
- * Indicates that a message has been passed to the requester's sendMessage
- * method but it has not been passed to the corresponding next callback
- */
- private processingMessage = false;
- /**
- * Indicates that a status was received but could not be propagated because
- * a message was still being processed.
- */
- private pendingHalfClose = false;
- constructor(
- private nextCall: InterceptingCallInterface,
- requester?: Requester
- ) {
- if (requester) {
- this.requester = {
- start: requester.start ?? defaultRequester.start,
- sendMessage: requester.sendMessage ?? defaultRequester.sendMessage,
- halfClose: requester.halfClose ?? defaultRequester.halfClose,
- cancel: requester.cancel ?? defaultRequester.cancel,
- };
- } else {
- this.requester = defaultRequester;
- }
- }
- cancelWithStatus(status: Status, details: string) {
- this.requester.cancel(() => {
- this.nextCall.cancelWithStatus(status, details);
- });
- }
- getPeer() {
- return this.nextCall.getPeer();
- }
- private processPendingMessage() {
- if (this.pendingMessageContext) {
- this.nextCall.sendMessageWithContext(
- this.pendingMessageContext,
- this.pendingMessage
- );
- this.pendingMessageContext = null;
- this.pendingMessage = null;
- }
- }
- private processPendingHalfClose() {
- if (this.pendingHalfClose) {
- this.nextCall.halfClose();
- }
- }
- start(
- metadata: Metadata,
- interceptingListener?: Partial<InterceptingListener>
- ): void {
- const fullInterceptingListener: InterceptingListener = {
- onReceiveMetadata:
- interceptingListener?.onReceiveMetadata?.bind(interceptingListener) ??
- (metadata => {}),
- onReceiveMessage:
- interceptingListener?.onReceiveMessage?.bind(interceptingListener) ??
- (message => {}),
- onReceiveStatus:
- interceptingListener?.onReceiveStatus?.bind(interceptingListener) ??
- (status => {}),
- };
- this.processingMetadata = true;
- this.requester.start(metadata, fullInterceptingListener, (md, listener) => {
- this.processingMetadata = false;
- let finalInterceptingListener: InterceptingListener;
- if (isInterceptingListener(listener)) {
- finalInterceptingListener = listener;
- } else {
- const fullListener: FullListener = {
- onReceiveMetadata:
- listener.onReceiveMetadata ?? defaultListener.onReceiveMetadata,
- onReceiveMessage:
- listener.onReceiveMessage ?? defaultListener.onReceiveMessage,
- onReceiveStatus:
- listener.onReceiveStatus ?? defaultListener.onReceiveStatus,
- };
- finalInterceptingListener = new InterceptingListenerImpl(
- fullListener,
- fullInterceptingListener
- );
- }
- this.nextCall.start(md, finalInterceptingListener);
- this.processPendingMessage();
- this.processPendingHalfClose();
- });
- }
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- sendMessageWithContext(context: MessageContext, message: any): void {
- this.processingMessage = true;
- this.requester.sendMessage(message, finalMessage => {
- this.processingMessage = false;
- if (this.processingMetadata) {
- this.pendingMessageContext = context;
- this.pendingMessage = message;
- } else {
- this.nextCall.sendMessageWithContext(context, finalMessage);
- this.processPendingHalfClose();
- }
- });
- }
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- sendMessage(message: any): void {
- this.sendMessageWithContext({}, message);
- }
- startRead(): void {
- this.nextCall.startRead();
- }
- halfClose(): void {
- this.requester.halfClose(() => {
- if (this.processingMetadata || this.processingMessage) {
- this.pendingHalfClose = true;
- } else {
- this.nextCall.halfClose();
- }
- });
- }
- }
- function getCall(channel: Channel, path: string, options: CallOptions): Call {
- const deadline = options.deadline ?? Infinity;
- const host = options.host;
- const parent = options.parent ?? null;
- const propagateFlags = options.propagate_flags;
- const credentials = options.credentials;
- const call = channel.createCall(path, deadline, host, parent, propagateFlags);
- if (credentials) {
- call.setCredentials(credentials);
- }
- return call;
- }
- /**
- * InterceptingCall implementation that directly owns the underlying Call
- * object and handles serialization and deseraizliation.
- */
- class BaseInterceptingCall implements InterceptingCallInterface {
- constructor(
- protected call: Call,
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- protected methodDefinition: ClientMethodDefinition<any, any>
- ) {}
- cancelWithStatus(status: Status, details: string): void {
- this.call.cancelWithStatus(status, details);
- }
- getPeer(): string {
- return this.call.getPeer();
- }
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- sendMessageWithContext(context: MessageContext, message: any): void {
- let serialized: Buffer;
- try {
- serialized = this.methodDefinition.requestSerialize(message);
- } catch (e) {
- this.call.cancelWithStatus(
- Status.INTERNAL,
- `Request message serialization failure: ${getErrorMessage(e)}`
- );
- return;
- }
- this.call.sendMessageWithContext(context, serialized);
- }
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- sendMessage(message: any) {
- this.sendMessageWithContext({}, message);
- }
- start(
- metadata: Metadata,
- interceptingListener?: Partial<InterceptingListener>
- ): void {
- let readError: StatusObject | null = null;
- this.call.start(metadata, {
- onReceiveMetadata: metadata => {
- interceptingListener?.onReceiveMetadata?.(metadata);
- },
- onReceiveMessage: message => {
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- let deserialized: any;
- try {
- deserialized = this.methodDefinition.responseDeserialize(message);
- } catch (e) {
- readError = {
- code: Status.INTERNAL,
- details: `Response message parsing error: ${getErrorMessage(e)}`,
- metadata: new Metadata(),
- };
- this.call.cancelWithStatus(readError.code, readError.details);
- return;
- }
- interceptingListener?.onReceiveMessage?.(deserialized);
- },
- onReceiveStatus: status => {
- if (readError) {
- interceptingListener?.onReceiveStatus?.(readError);
- } else {
- interceptingListener?.onReceiveStatus?.(status);
- }
- },
- });
- }
- startRead() {
- this.call.startRead();
- }
- halfClose(): void {
- this.call.halfClose();
- }
- }
- /**
- * BaseInterceptingCall with special-cased behavior for methods with unary
- * responses.
- */
- class BaseUnaryInterceptingCall
- extends BaseInterceptingCall
- implements InterceptingCallInterface
- {
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- constructor(call: Call, methodDefinition: ClientMethodDefinition<any, any>) {
- super(call, methodDefinition);
- }
- start(metadata: Metadata, listener?: Partial<InterceptingListener>): void {
- let receivedMessage = false;
- const wrapperListener: InterceptingListener = {
- onReceiveMetadata:
- listener?.onReceiveMetadata?.bind(listener) ?? (metadata => {}),
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- onReceiveMessage: (message: any) => {
- receivedMessage = true;
- listener?.onReceiveMessage?.(message);
- },
- onReceiveStatus: (status: StatusObject) => {
- if (!receivedMessage) {
- listener?.onReceiveMessage?.(null);
- }
- listener?.onReceiveStatus?.(status);
- },
- };
- super.start(metadata, wrapperListener);
- this.call.startRead();
- }
- }
- /**
- * BaseInterceptingCall with special-cased behavior for methods with streaming
- * responses.
- */
- class BaseStreamingInterceptingCall
- extends BaseInterceptingCall
- implements InterceptingCallInterface {}
- function getBottomInterceptingCall(
- channel: Channel,
- options: InterceptorOptions,
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- methodDefinition: ClientMethodDefinition<any, any>
- ) {
- const call = getCall(channel, methodDefinition.path, options);
- if (methodDefinition.responseStream) {
- return new BaseStreamingInterceptingCall(call, methodDefinition);
- } else {
- return new BaseUnaryInterceptingCall(call, methodDefinition);
- }
- }
- export interface NextCall {
- (options: InterceptorOptions): InterceptingCallInterface;
- }
- export interface Interceptor {
- (options: InterceptorOptions, nextCall: NextCall): InterceptingCall;
- }
- export interface InterceptorProvider {
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- (methodDefinition: ClientMethodDefinition<any, any>): Interceptor;
- }
- export interface InterceptorArguments {
- clientInterceptors: Interceptor[];
- clientInterceptorProviders: InterceptorProvider[];
- callInterceptors: Interceptor[];
- callInterceptorProviders: InterceptorProvider[];
- }
- export function getInterceptingCall(
- interceptorArgs: InterceptorArguments,
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- methodDefinition: ClientMethodDefinition<any, any>,
- options: CallOptions,
- channel: Channel
- ): InterceptingCallInterface {
- if (
- interceptorArgs.clientInterceptors.length > 0 &&
- interceptorArgs.clientInterceptorProviders.length > 0
- ) {
- throw new InterceptorConfigurationError(
- 'Both interceptors and interceptor_providers were passed as options ' +
- 'to the client constructor. Only one of these is allowed.'
- );
- }
- if (
- interceptorArgs.callInterceptors.length > 0 &&
- interceptorArgs.callInterceptorProviders.length > 0
- ) {
- throw new InterceptorConfigurationError(
- 'Both interceptors and interceptor_providers were passed as call ' +
- 'options. Only one of these is allowed.'
- );
- }
- let interceptors: Interceptor[] = [];
- // Interceptors passed to the call override interceptors passed to the client constructor
- if (
- interceptorArgs.callInterceptors.length > 0 ||
- interceptorArgs.callInterceptorProviders.length > 0
- ) {
- interceptors = ([] as Interceptor[])
- .concat(
- interceptorArgs.callInterceptors,
- interceptorArgs.callInterceptorProviders.map(provider =>
- provider(methodDefinition)
- )
- )
- .filter(interceptor => interceptor);
- // Filter out falsy values when providers return nothing
- } else {
- interceptors = ([] as Interceptor[])
- .concat(
- interceptorArgs.clientInterceptors,
- interceptorArgs.clientInterceptorProviders.map(provider =>
- provider(methodDefinition)
- )
- )
- .filter(interceptor => interceptor);
- // Filter out falsy values when providers return nothing
- }
- const interceptorOptions = Object.assign({}, options, {
- method_definition: methodDefinition,
- });
- /* For each interceptor in the list, the nextCall function passed to it is
- * based on the next interceptor in the list, using a nextCall function
- * constructed with the following interceptor in the list, and so on. The
- * initialValue, which is effectively at the end of the list, is a nextCall
- * function that invokes getBottomInterceptingCall, the result of which
- * handles (de)serialization and also gets the underlying call from the
- * channel. */
- const getCall: NextCall = interceptors.reduceRight<NextCall>(
- (nextCall: NextCall, nextInterceptor: Interceptor) => {
- return currentOptions => nextInterceptor(currentOptions, nextCall);
- },
- (finalOptions: InterceptorOptions) =>
- getBottomInterceptingCall(channel, finalOptions, methodDefinition)
- );
- return getCall(interceptorOptions);
- }
|