subchannel.ts 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516
  1. /*
  2. * Copyright 2019 gRPC authors.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. *
  16. */
  17. import { ChannelCredentials } from './channel-credentials';
  18. import { Metadata } from './metadata';
  19. import { ChannelOptions } from './channel-options';
  20. import { ConnectivityState } from './connectivity-state';
  21. import { BackoffTimeout, BackoffOptions } from './backoff-timeout';
  22. import * as logging from './logging';
  23. import { LogVerbosity, Status } from './constants';
  24. import { GrpcUri, uriToString } from './uri-parser';
  25. import {
  26. SubchannelAddress,
  27. subchannelAddressToString,
  28. } from './subchannel-address';
  29. import {
  30. SubchannelRef,
  31. ChannelzTrace,
  32. ChannelzChildrenTracker,
  33. ChannelzChildrenTrackerStub,
  34. SubchannelInfo,
  35. registerChannelzSubchannel,
  36. ChannelzCallTracker,
  37. ChannelzCallTrackerStub,
  38. unregisterChannelzRef,
  39. ChannelzTraceStub,
  40. } from './channelz';
  41. import {
  42. ConnectivityStateListener,
  43. SubchannelInterface,
  44. } from './subchannel-interface';
  45. import { SubchannelCallInterceptingListener } from './subchannel-call';
  46. import { SubchannelCall } from './subchannel-call';
  47. import { CallEventTracker, SubchannelConnector, Transport } from './transport';
  48. const TRACER_NAME = 'subchannel';
  49. /* setInterval and setTimeout only accept signed 32 bit integers. JS doesn't
  50. * have a constant for the max signed 32 bit integer, so this is a simple way
  51. * to calculate it */
  52. const KEEPALIVE_MAX_TIME_MS = ~(1 << 31);
  53. export class Subchannel {
  54. /**
  55. * The subchannel's current connectivity state. Invariant: `session` === `null`
  56. * if and only if `connectivityState` is IDLE or TRANSIENT_FAILURE.
  57. */
  58. private connectivityState: ConnectivityState = ConnectivityState.IDLE;
  59. /**
  60. * The underlying http2 session used to make requests.
  61. */
  62. private transport: Transport | null = null;
  63. /**
  64. * Indicates that the subchannel should transition from TRANSIENT_FAILURE to
  65. * CONNECTING instead of IDLE when the backoff timeout ends.
  66. */
  67. private continueConnecting = false;
  68. /**
  69. * A list of listener functions that will be called whenever the connectivity
  70. * state changes. Will be modified by `addConnectivityStateListener` and
  71. * `removeConnectivityStateListener`
  72. */
  73. private stateListeners: Set<ConnectivityStateListener> = new Set();
  74. private backoffTimeout: BackoffTimeout;
  75. private keepaliveTime: number;
  76. /**
  77. * Tracks channels and subchannel pools with references to this subchannel
  78. */
  79. private refcount = 0;
  80. /**
  81. * A string representation of the subchannel address, for logging/tracing
  82. */
  83. private subchannelAddressString: string;
  84. // Channelz info
  85. private readonly channelzEnabled: boolean = true;
  86. private channelzRef: SubchannelRef;
  87. private channelzTrace: ChannelzTrace | ChannelzTraceStub;
  88. private callTracker: ChannelzCallTracker | ChannelzCallTrackerStub;
  89. private childrenTracker:
  90. | ChannelzChildrenTracker
  91. | ChannelzChildrenTrackerStub;
  92. // Channelz socket info
  93. private streamTracker: ChannelzCallTracker | ChannelzCallTrackerStub;
  94. /**
  95. * A class representing a connection to a single backend.
  96. * @param channelTarget The target string for the channel as a whole
  97. * @param subchannelAddress The address for the backend that this subchannel
  98. * will connect to
  99. * @param options The channel options, plus any specific subchannel options
  100. * for this subchannel
  101. * @param credentials The channel credentials used to establish this
  102. * connection
  103. */
  104. constructor(
  105. private channelTarget: GrpcUri,
  106. private subchannelAddress: SubchannelAddress,
  107. private options: ChannelOptions,
  108. private credentials: ChannelCredentials,
  109. private connector: SubchannelConnector
  110. ) {
  111. const backoffOptions: BackoffOptions = {
  112. initialDelay: options['grpc.initial_reconnect_backoff_ms'],
  113. maxDelay: options['grpc.max_reconnect_backoff_ms'],
  114. };
  115. this.backoffTimeout = new BackoffTimeout(() => {
  116. this.handleBackoffTimer();
  117. }, backoffOptions);
  118. this.backoffTimeout.unref();
  119. this.subchannelAddressString = subchannelAddressToString(subchannelAddress);
  120. this.keepaliveTime = options['grpc.keepalive_time_ms'] ?? -1;
  121. if (options['grpc.enable_channelz'] === 0) {
  122. this.channelzEnabled = false;
  123. this.channelzTrace = new ChannelzTraceStub();
  124. this.callTracker = new ChannelzCallTrackerStub();
  125. this.childrenTracker = new ChannelzChildrenTrackerStub();
  126. this.streamTracker = new ChannelzCallTrackerStub();
  127. } else {
  128. this.channelzTrace = new ChannelzTrace();
  129. this.callTracker = new ChannelzCallTracker();
  130. this.childrenTracker = new ChannelzChildrenTracker();
  131. this.streamTracker = new ChannelzCallTracker();
  132. }
  133. this.channelzRef = registerChannelzSubchannel(
  134. this.subchannelAddressString,
  135. () => this.getChannelzInfo(),
  136. this.channelzEnabled
  137. );
  138. this.channelzTrace.addTrace('CT_INFO', 'Subchannel created');
  139. this.trace(
  140. 'Subchannel constructed with options ' +
  141. JSON.stringify(options, undefined, 2)
  142. );
  143. credentials._ref();
  144. }
  145. private getChannelzInfo(): SubchannelInfo {
  146. return {
  147. state: this.connectivityState,
  148. trace: this.channelzTrace,
  149. callTracker: this.callTracker,
  150. children: this.childrenTracker.getChildLists(),
  151. target: this.subchannelAddressString,
  152. };
  153. }
  154. private trace(text: string): void {
  155. logging.trace(
  156. LogVerbosity.DEBUG,
  157. TRACER_NAME,
  158. '(' +
  159. this.channelzRef.id +
  160. ') ' +
  161. this.subchannelAddressString +
  162. ' ' +
  163. text
  164. );
  165. }
  166. private refTrace(text: string): void {
  167. logging.trace(
  168. LogVerbosity.DEBUG,
  169. 'subchannel_refcount',
  170. '(' +
  171. this.channelzRef.id +
  172. ') ' +
  173. this.subchannelAddressString +
  174. ' ' +
  175. text
  176. );
  177. }
  178. private handleBackoffTimer() {
  179. if (this.continueConnecting) {
  180. this.transitionToState(
  181. [ConnectivityState.TRANSIENT_FAILURE],
  182. ConnectivityState.CONNECTING
  183. );
  184. } else {
  185. this.transitionToState(
  186. [ConnectivityState.TRANSIENT_FAILURE],
  187. ConnectivityState.IDLE
  188. );
  189. }
  190. }
  191. /**
  192. * Start a backoff timer with the current nextBackoff timeout
  193. */
  194. private startBackoff() {
  195. this.backoffTimeout.runOnce();
  196. }
  197. private stopBackoff() {
  198. this.backoffTimeout.stop();
  199. this.backoffTimeout.reset();
  200. }
  201. private startConnectingInternal() {
  202. let options = this.options;
  203. if (options['grpc.keepalive_time_ms']) {
  204. const adjustedKeepaliveTime = Math.min(
  205. this.keepaliveTime,
  206. KEEPALIVE_MAX_TIME_MS
  207. );
  208. options = { ...options, 'grpc.keepalive_time_ms': adjustedKeepaliveTime };
  209. }
  210. this.connector
  211. .connect(this.subchannelAddress, this.credentials, options)
  212. .then(
  213. transport => {
  214. if (
  215. this.transitionToState(
  216. [ConnectivityState.CONNECTING],
  217. ConnectivityState.READY
  218. )
  219. ) {
  220. this.transport = transport;
  221. if (this.channelzEnabled) {
  222. this.childrenTracker.refChild(transport.getChannelzRef());
  223. }
  224. transport.addDisconnectListener(tooManyPings => {
  225. this.transitionToState(
  226. [ConnectivityState.READY],
  227. ConnectivityState.IDLE
  228. );
  229. if (tooManyPings && this.keepaliveTime > 0) {
  230. this.keepaliveTime *= 2;
  231. logging.log(
  232. LogVerbosity.ERROR,
  233. `Connection to ${uriToString(this.channelTarget)} at ${
  234. this.subchannelAddressString
  235. } rejected by server because of excess pings. Increasing ping interval to ${
  236. this.keepaliveTime
  237. } ms`
  238. );
  239. }
  240. });
  241. } else {
  242. /* If we can't transition from CONNECTING to READY here, we will
  243. * not be using this transport, so release its resources. */
  244. transport.shutdown();
  245. }
  246. },
  247. error => {
  248. this.transitionToState(
  249. [ConnectivityState.CONNECTING],
  250. ConnectivityState.TRANSIENT_FAILURE,
  251. `${error}`
  252. );
  253. }
  254. );
  255. }
  256. /**
  257. * Initiate a state transition from any element of oldStates to the new
  258. * state. If the current connectivityState is not in oldStates, do nothing.
  259. * @param oldStates The set of states to transition from
  260. * @param newState The state to transition to
  261. * @returns True if the state changed, false otherwise
  262. */
  263. private transitionToState(
  264. oldStates: ConnectivityState[],
  265. newState: ConnectivityState,
  266. errorMessage?: string
  267. ): boolean {
  268. if (oldStates.indexOf(this.connectivityState) === -1) {
  269. return false;
  270. }
  271. if (errorMessage) {
  272. this.trace(
  273. ConnectivityState[this.connectivityState] +
  274. ' -> ' +
  275. ConnectivityState[newState] +
  276. ' with error "' + errorMessage + '"'
  277. );
  278. } else {
  279. this.trace(
  280. ConnectivityState[this.connectivityState] +
  281. ' -> ' +
  282. ConnectivityState[newState]
  283. );
  284. }
  285. if (this.channelzEnabled) {
  286. this.channelzTrace.addTrace(
  287. 'CT_INFO',
  288. 'Connectivity state change to ' + ConnectivityState[newState]
  289. );
  290. }
  291. const previousState = this.connectivityState;
  292. this.connectivityState = newState;
  293. switch (newState) {
  294. case ConnectivityState.READY:
  295. this.stopBackoff();
  296. break;
  297. case ConnectivityState.CONNECTING:
  298. this.startBackoff();
  299. this.startConnectingInternal();
  300. this.continueConnecting = false;
  301. break;
  302. case ConnectivityState.TRANSIENT_FAILURE:
  303. if (this.channelzEnabled && this.transport) {
  304. this.childrenTracker.unrefChild(this.transport.getChannelzRef());
  305. }
  306. this.transport?.shutdown();
  307. this.transport = null;
  308. /* If the backoff timer has already ended by the time we get to the
  309. * TRANSIENT_FAILURE state, we want to immediately transition out of
  310. * TRANSIENT_FAILURE as though the backoff timer is ending right now */
  311. if (!this.backoffTimeout.isRunning()) {
  312. process.nextTick(() => {
  313. this.handleBackoffTimer();
  314. });
  315. }
  316. break;
  317. case ConnectivityState.IDLE:
  318. if (this.channelzEnabled && this.transport) {
  319. this.childrenTracker.unrefChild(this.transport.getChannelzRef());
  320. }
  321. this.transport?.shutdown();
  322. this.transport = null;
  323. break;
  324. default:
  325. throw new Error(`Invalid state: unknown ConnectivityState ${newState}`);
  326. }
  327. for (const listener of this.stateListeners) {
  328. listener(this, previousState, newState, this.keepaliveTime, errorMessage);
  329. }
  330. return true;
  331. }
  332. ref() {
  333. this.refTrace('refcount ' + this.refcount + ' -> ' + (this.refcount + 1));
  334. this.refcount += 1;
  335. }
  336. unref() {
  337. this.refTrace('refcount ' + this.refcount + ' -> ' + (this.refcount - 1));
  338. this.refcount -= 1;
  339. if (this.refcount === 0) {
  340. this.channelzTrace.addTrace('CT_INFO', 'Shutting down');
  341. unregisterChannelzRef(this.channelzRef);
  342. this.credentials._unref();
  343. process.nextTick(() => {
  344. this.transitionToState(
  345. [ConnectivityState.CONNECTING, ConnectivityState.READY],
  346. ConnectivityState.IDLE
  347. );
  348. });
  349. }
  350. }
  351. unrefIfOneRef(): boolean {
  352. if (this.refcount === 1) {
  353. this.unref();
  354. return true;
  355. }
  356. return false;
  357. }
  358. createCall(
  359. metadata: Metadata,
  360. host: string,
  361. method: string,
  362. listener: SubchannelCallInterceptingListener
  363. ): SubchannelCall {
  364. if (!this.transport) {
  365. throw new Error('Cannot create call, subchannel not READY');
  366. }
  367. let statsTracker: Partial<CallEventTracker>;
  368. if (this.channelzEnabled) {
  369. this.callTracker.addCallStarted();
  370. this.streamTracker.addCallStarted();
  371. statsTracker = {
  372. onCallEnd: status => {
  373. if (status.code === Status.OK) {
  374. this.callTracker.addCallSucceeded();
  375. } else {
  376. this.callTracker.addCallFailed();
  377. }
  378. },
  379. };
  380. } else {
  381. statsTracker = {};
  382. }
  383. return this.transport.createCall(
  384. metadata,
  385. host,
  386. method,
  387. listener,
  388. statsTracker
  389. );
  390. }
  391. /**
  392. * If the subchannel is currently IDLE, start connecting and switch to the
  393. * CONNECTING state. If the subchannel is current in TRANSIENT_FAILURE,
  394. * the next time it would transition to IDLE, start connecting again instead.
  395. * Otherwise, do nothing.
  396. */
  397. startConnecting() {
  398. process.nextTick(() => {
  399. /* First, try to transition from IDLE to connecting. If that doesn't happen
  400. * because the state is not currently IDLE, check if it is
  401. * TRANSIENT_FAILURE, and if so indicate that it should go back to
  402. * connecting after the backoff timer ends. Otherwise do nothing */
  403. if (
  404. !this.transitionToState(
  405. [ConnectivityState.IDLE],
  406. ConnectivityState.CONNECTING
  407. )
  408. ) {
  409. if (this.connectivityState === ConnectivityState.TRANSIENT_FAILURE) {
  410. this.continueConnecting = true;
  411. }
  412. }
  413. });
  414. }
  415. /**
  416. * Get the subchannel's current connectivity state.
  417. */
  418. getConnectivityState() {
  419. return this.connectivityState;
  420. }
  421. /**
  422. * Add a listener function to be called whenever the subchannel's
  423. * connectivity state changes.
  424. * @param listener
  425. */
  426. addConnectivityStateListener(listener: ConnectivityStateListener) {
  427. this.stateListeners.add(listener);
  428. }
  429. /**
  430. * Remove a listener previously added with `addConnectivityStateListener`
  431. * @param listener A reference to a function previously passed to
  432. * `addConnectivityStateListener`
  433. */
  434. removeConnectivityStateListener(listener: ConnectivityStateListener) {
  435. this.stateListeners.delete(listener);
  436. }
  437. /**
  438. * Reset the backoff timeout, and immediately start connecting if in backoff.
  439. */
  440. resetBackoff() {
  441. process.nextTick(() => {
  442. this.backoffTimeout.reset();
  443. this.transitionToState(
  444. [ConnectivityState.TRANSIENT_FAILURE],
  445. ConnectivityState.CONNECTING
  446. );
  447. });
  448. }
  449. getAddress(): string {
  450. return this.subchannelAddressString;
  451. }
  452. getChannelzRef(): SubchannelRef {
  453. return this.channelzRef;
  454. }
  455. isHealthy(): boolean {
  456. return true;
  457. }
  458. addHealthStateWatcher(listener: (healthy: boolean) => void): void {
  459. // Do nothing with the listener
  460. }
  461. removeHealthStateWatcher(listener: (healthy: boolean) => void): void {
  462. // Do nothing with the listener
  463. }
  464. getRealSubchannel(): this {
  465. return this;
  466. }
  467. realSubchannelEquals(other: SubchannelInterface): boolean {
  468. return other.getRealSubchannel() === this;
  469. }
  470. throttleKeepalive(newKeepaliveTime: number) {
  471. if (newKeepaliveTime > this.keepaliveTime) {
  472. this.keepaliveTime = newKeepaliveTime;
  473. }
  474. }
  475. }