call-interface.ts 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. /*
  2. * Copyright 2022 gRPC authors.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. *
  16. */
  17. import { CallCredentials } from './call-credentials';
  18. import { Status } from './constants';
  19. import { Deadline } from './deadline';
  20. import { Metadata } from './metadata';
  21. import { ServerSurfaceCall } from './server-call';
  /**
   * Full set of per-call options carried alongside a call stream.
   *
   * NOTE(review): the meaning of each field is not visible in this file; the
   * per-field notes below only restate the declared types.
   */
  export interface CallStreamOptions {
    /** Deadline value, as defined by the `./deadline` module. */
    deadline: Deadline;
    /** Numeric flags; presumably a bit mask — confirm against callers. */
    flags: number;
    /** Host string for the call. */
    host: string;
    /** Server-side call this call is associated with, or `null`. */
    parentCall: ServerSurfaceCall | null;
  }
  /** {@link CallStreamOptions} with every field made optional. */
  export type PartialCallStreamOptions = Partial<CallStreamOptions>;
  /**
   * Status of a call: a status code, a human-readable detail string and the
   * metadata that accompanies the status.
   */
  export interface StatusObject {
    /** Status code from the `Status` enumeration in `./constants`. */
    code: Status;
    /** Free-form text accompanying the code. */
    details: string;
    /** Metadata attached to the status. */
    metadata: Metadata;
  }
  /**
   * A {@link StatusObject} whose `code` and `details` are required but whose
   * `metadata` may be omitted, `null` or `undefined`.
   */
  export type PartialStatusObject = Pick<StatusObject, 'code' | 'details'> & {
    metadata?: Metadata | null | undefined;
  };
  /**
   * Flag values for message writes. Each member occupies a distinct bit, so
   * members can be combined with bitwise OR into a single number.
   */
  export const enum WriteFlags {
    BufferHint = 1 << 0,
    NoCompress = 1 << 1,
    WriteThrough = 1 << 2,
  }
  /** A message buffer together with optional numeric write flags. */
  export interface WriteObject {
    /** The message bytes. */
    message: Buffer;
    /** Optional flags; presumably a combination of `WriteFlags` — confirm. */
    flags?: number;
  }
  /**
   * Callback-style metadata handler: receives the metadata and a `next`
   * continuation to call with the (possibly replaced) metadata.
   */
  export interface MetadataListener {
    (metadata: Metadata, next: (metadata: Metadata) => void): void;
  }
  /**
   * Callback-style message handler: receives a message and a `next`
   * continuation to call with the (possibly replaced) message.
   */
  export interface MessageListener {
    // Messages are untyped at this layer.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    (message: any, next: (message: any) => void): void;
  }
  /**
   * Callback-style status handler: receives the status and a `next`
   * continuation to call with the (possibly replaced) status.
   */
  export interface StatusListener {
    (status: StatusObject, next: (status: StatusObject) => void): void;
  }
  /**
   * A listener that supplies all three callback-style handlers: one for
   * metadata, one for messages and one for the status.
   */
  export interface FullListener {
    /** Handler for received metadata. */
    onReceiveMetadata: MetadataListener;
    /** Handler for a received message. */
    onReceiveMessage: MessageListener;
    /** Handler for the received status. */
    onReceiveStatus: StatusListener;
  }
  /** A {@link FullListener} in which any of the handlers may be omitted. */
  export type Listener = Partial<FullListener>;
  /**
   * Receiver for the responses to a call. Unlike the callback-style handlers
   * of `FullListener`, each method takes only the received value and has no
   * `next` continuation.
   */
  export interface InterceptingListener {
    /** Called with received metadata. */
    onReceiveMetadata(metadata: Metadata): void;
    /** Called with a received message. */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    onReceiveMessage(message: any): void;
    /** Called with the received status. */
    onReceiveStatus(status: StatusObject): void;
  }
  /**
   * Type guard distinguishing an `InterceptingListener` from a callback-style
   * `Listener`.
   *
   * The check is by arity: the listener must define `onReceiveMetadata`, and
   * that function must declare exactly one parameter (the callback-style
   * handler declares two: the value and `next`).
   *
   * @param listener The listener to classify.
   * @returns `true` when `onReceiveMetadata` is defined and has length 1.
   */
  export function isInterceptingListener(
    listener: Listener | InterceptingListener
  ): listener is InterceptingListener {
    const metadataHandler = listener.onReceiveMetadata;
    if (metadataHandler === undefined) {
      return false;
    }
    return metadataHandler.length === 1;
  }
  /**
   * Adapts a callback-style `FullListener` to the `InterceptingListener`
   * interface, forwarding whatever the wrapped handlers pass to their `next`
   * continuation on to `nextListener`.
   *
   * The wrapped handlers may invoke `next` asynchronously, so this class
   * keeps the forwarding order sane:
   * - a message whose handler finishes while metadata is still being
   *   processed is held until the metadata has been forwarded;
   * - a status whose handler finishes while metadata or a message is still
   *   being processed is held until both are done, and is then forwarded
   *   exactly once.
   */
  export class InterceptingListenerImpl implements InterceptingListener {
    /** True between receiving metadata and its handler calling `next`. */
    private processingMetadata = false;
    /** True while `pendingMessage` holds a message waiting on metadata. */
    private hasPendingMessage = false;
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    private pendingMessage: any;
    /** True between receiving a message and its handler calling `next`. */
    private processingMessage = false;
    /** Status held back until metadata/message processing has finished. */
    private pendingStatus: StatusObject | null = null;

    /**
     * @param listener Callback-style handlers to run on each received value.
     * @param nextListener Receiver of the values the handlers pass to `next`.
     */
    constructor(
      private listener: FullListener,
      private nextListener: InterceptingListener
    ) {}

    /** Forwards the held message, if any, clearing the slot first. */
    private processPendingMessage() {
      if (!this.hasPendingMessage) {
        return;
      }
      const heldMessage = this.pendingMessage;
      // Clear before forwarding so the message cannot be delivered twice.
      this.pendingMessage = null;
      this.hasPendingMessage = false;
      this.nextListener.onReceiveMessage(heldMessage);
    }

    /**
     * Forwards the held status, if any, once nothing else is in flight.
     * The slot is cleared before forwarding so the status is delivered at
     * most once.
     */
    private processPendingStatus() {
      if (!this.pendingStatus) {
        return;
      }
      // A message or metadata still being processed must reach the next
      // listener before the status does; its continuation will retry.
      if (this.processingMetadata || this.processingMessage) {
        return;
      }
      const heldStatus = this.pendingStatus;
      this.pendingStatus = null;
      this.nextListener.onReceiveStatus(heldStatus);
    }

    onReceiveMetadata(metadata: Metadata): void {
      this.processingMetadata = true;
      this.listener.onReceiveMetadata(metadata, processedMetadata => {
        this.processingMetadata = false;
        this.nextListener.onReceiveMetadata(processedMetadata);
        // Release anything that was waiting for the metadata to go first.
        this.processPendingMessage();
        this.processPendingStatus();
      });
    }

    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    onReceiveMessage(message: any): void {
      /* The handler may call `next` asynchronously; a status that arrives in
       * the meantime is held until this message has been forwarded. */
      this.processingMessage = true;
      this.listener.onReceiveMessage(message, processedMessage => {
        this.processingMessage = false;
        if (this.processingMetadata) {
          // Metadata has not been forwarded yet; hold the message for it.
          this.pendingMessage = processedMessage;
          this.hasPendingMessage = true;
        } else {
          this.nextListener.onReceiveMessage(processedMessage);
          this.processPendingStatus();
        }
      });
    }

    onReceiveStatus(status: StatusObject): void {
      this.listener.onReceiveStatus(status, processedStatus => {
        if (this.processingMetadata || this.processingMessage) {
          // Something is still in flight; its continuation forwards this.
          this.pendingStatus = processedStatus;
        } else {
          this.nextListener.onReceiveStatus(processedStatus);
        }
      });
    }
  }
  /**
   * Completion callback for a write. It may be called with no argument,
   * with `null`, or with an `Error`.
   */
  export interface WriteCallback {
    (error?: Error | null): void;
  }
  /** Optional per-message data passed along with a message being sent. */
  export interface MessageContext {
    /** Optional callback for the write of this message. */
    callback?: WriteCallback;
    /** Optional numeric flags for this message. */
    flags?: number;
  }
  /**
   * Operations available on a call object.
   *
   * NOTE(review): implementations live outside this file; the per-method
   * notes restate the signatures and should be confirmed against them.
   */
  export interface Call {
    /** Cancels the call with the given status code and detail text. */
    cancelWithStatus(status: Status, details: string): void;
    /** Returns a string identifying the peer of this call. */
    getPeer(): string;
    /** Starts the call with the given metadata and response listener. */
    start(metadata: Metadata, listener: InterceptingListener): void;
    /** Sends a message buffer together with its per-message context. */
    sendMessageWithContext(context: MessageContext, message: Buffer): void;
    /** Starts a read on the call. */
    startRead(): void;
    /** Half-closes the call. */
    halfClose(): void;
    /** Returns the number assigned to this call. */
    getCallNumber(): number;
    /** Sets the call credentials to use for this call. */
    setCredentials(credentials: CallCredentials): void;
  }
  /** Something that can report deadline information as a list of strings. */
  export interface DeadlineInfoProvider {
    /** Returns the deadline information strings. */
    getDeadlineInfo(): string[];
  }