1
0

load-balancer-pick-first.ts 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621
  1. /*
  2. * Copyright 2019 gRPC authors.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. *
  16. */
  17. import {
  18. LoadBalancer,
  19. ChannelControlHelper,
  20. TypedLoadBalancingConfig,
  21. registerDefaultLoadBalancerType,
  22. registerLoadBalancerType,
  23. createChildChannelControlHelper,
  24. } from './load-balancer';
  25. import { ConnectivityState } from './connectivity-state';
  26. import {
  27. QueuePicker,
  28. Picker,
  29. PickArgs,
  30. CompletePickResult,
  31. PickResultType,
  32. UnavailablePicker,
  33. } from './picker';
  34. import { Endpoint, SubchannelAddress, subchannelAddressToString } from './subchannel-address';
  35. import * as logging from './logging';
  36. import { LogVerbosity } from './constants';
  37. import {
  38. SubchannelInterface,
  39. ConnectivityStateListener,
  40. HealthListener,
  41. } from './subchannel-interface';
  42. import { isTcpSubchannelAddress } from './subchannel-address';
  43. import { isIPv6 } from 'net';
  44. import { ChannelOptions } from './channel-options';
  45. import { ChannelCredentials } from './channel-credentials';
  /** Tracer name under which this module's debug output is logged. */
  const TRACER_NAME = 'pick_first';

  /**
   * Emit a DEBUG-level trace message tagged with this module's tracer name.
   * @param text The message to log.
   */
  function trace(text: string): void {
    logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
  }

  /** Name under which the pick_first load balancing policy is registered. */
  const TYPE_NAME = 'pick_first';

  /**
   * How long to wait, in milliseconds, after starting a connection attempt on
   * one subchannel before starting an attempt on the next subchannel in the
   * list, as part of the Happy Eyeballs algorithm.
   */
  const CONNECTION_DELAY_INTERVAL_MS = 250;
  /**
   * Parsed configuration for the pick_first load balancing policy. The only
   * setting is whether the address list is shuffled before connecting.
   */
  export class PickFirstLoadBalancingConfig implements TypedLoadBalancingConfig {
    constructor(private readonly shuffleAddressList: boolean) {}

    /** @returns The registered name of the policy this config belongs to. */
    getLoadBalancerName(): string {
      return TYPE_NAME;
    }

    /** @returns This config as a JSON-style object keyed by the policy name. */
    toJsonObject(): object {
      const settings = { shuffleAddressList: this.shuffleAddressList };
      return { [TYPE_NAME]: settings };
    }

    /** @returns Whether the address list should be shuffled. */
    getShuffleAddressList(): boolean {
      return this.shuffleAddressList;
    }

    /**
     * Build a config from a parsed JSON object.
     * @param obj The parsed JSON config object.
     * @returns A config whose shuffle flag is set only when
     *     `obj.shuffleAddressList` is exactly `true`.
     * @throws Error if `shuffleAddressList` is present but is not a boolean.
     */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    static createFromJson(obj: any): PickFirstLoadBalancingConfig {
      const hasField = 'shuffleAddressList' in obj;
      if (hasField && typeof obj.shuffleAddressList !== 'boolean') {
        throw new Error(
          'pick_first config field shuffleAddressList must be a boolean if provided'
        );
      }
      const shuffle = obj.shuffleAddressList === true;
      return new PickFirstLoadBalancingConfig(shuffle);
    }
  }
  /**
   * Picker handed out by a `PickFirstLoadBalancer` once it has a picked
   * subchannel. Every pick completes immediately with that same subchannel.
   */
  class PickFirstPicker implements Picker {
    constructor(private subchannel: SubchannelInterface) {}

    /**
     * @param pickArgs Ignored; the result does not depend on the call.
     * @returns A COMPLETE pick result for the wrapped subchannel, with no
     *     status and no call start/end hooks.
     */
    pick(pickArgs: PickArgs): CompletePickResult {
      const result: CompletePickResult = {
        pickResultType: PickResultType.COMPLETE,
        subchannel: this.subchannel,
        status: null,
        onCallStarted: null,
        onCallEnded: null,
      };
      return result;
    }
  }
  /**
   * Bookkeeping entry for one subchannel in the list the load balancer is
   * trying to connect to.
   */
  interface SubchannelChild {
    /** The subchannel itself. */
    subchannel: SubchannelInterface;
    /**
     * Whether this subchannel has been seen in TRANSIENT_FAILURE since the
     * flag was last reset.
     */
    hasReportedTransientFailure: boolean;
  }
  /**
   * Return a new array with the elements of the input array in a random order,
   * using a Fisher–Yates shuffle driven by `Math.random()`. The input array is
   * not modified.
   * @param list The input array
   * @returns A shuffled copy of `list`
   */
  export function shuffled<T>(list: T[]): T[] {
    const result = list.slice();
    /* Walk down to index 1 inclusive. Stopping at `i > 1` would skip the final
     * swap between positions 0 and 1, so a two-element list would never be
     * reordered and longer lists would be biased. */
    for (let i = result.length - 1; i > 0; i--) {
      const j = Math.floor(Math.random() * (i + 1));
      [result[i], result[j]] = [result[j], result[i]];
    }
    return result;
  }
  /**
   * Interleave the addresses in addressList by address family, in the spirit
   * of the Happy Eyeballs address sorting described in RFC 8305 section 4.
   *
   * Addresses are split into two groups: TCP addresses whose host is an IPv6
   * literal, and everything else. The group that the first input address
   * belongs to goes first; the output then alternates between the two groups,
   * preserving the relative order within each group. Once one group is
   * exhausted, the remainder of the other is appended.
   * @param addressList The addresses to reorder. Not modified.
   * @returns A new array containing the same addresses, interleaved by family.
   */
  function interleaveAddressFamilies(
    addressList: SubchannelAddress[]
  ): SubchannelAddress[] {
    // Nothing to interleave, and no first element to inspect below.
    if (addressList.length === 0) {
      return [];
    }
    const isV6 = (address: SubchannelAddress): boolean =>
      isTcpSubchannelAddress(address) && isIPv6(address.host);

    const ipv6Addresses: SubchannelAddress[] = [];
    const otherAddresses: SubchannelAddress[] = [];
    for (const address of addressList) {
      (isV6(address) ? ipv6Addresses : otherAddresses).push(address);
    }

    // The family of the first address decides which group leads.
    const ipv6First = isV6(addressList[0]);
    const firstList = ipv6First ? ipv6Addresses : otherAddresses;
    const secondList = ipv6First ? otherAddresses : ipv6Addresses;

    const result: SubchannelAddress[] = [];
    const rounds = Math.max(firstList.length, secondList.length);
    for (let i = 0; i < rounds; i++) {
      if (i < firstList.length) {
        result.push(firstList[i]);
      }
      if (i < secondList.length) {
        result.push(secondList[i]);
      }
    }
    return result;
  }
  /**
   * Internal channel option. When set, the pick_first balancer reports
   * TRANSIENT_FAILURE while its picked subchannel is unhealthy.
   */
  const REPORT_HEALTH_STATUS_OPTION_NAME =
    'grpc-node.internal.pick-first.report_health_status';

  /**
   * Load balancer that attempts to connect to each backend in the address list
   * in order, and picks the first one that connects, using it for every
   * request.
   */
  export class PickFirstLoadBalancer implements LoadBalancer {
    /**
     * The list of subchannels this load balancer is currently attempting to
     * connect to.
     */
    private children: SubchannelChild[] = [];
    /**
     * The current connectivity state of the load balancer.
     */
    private currentState: ConnectivityState = ConnectivityState.IDLE;
    /**
     * The index within the `children` array of the subchannel with the most
     * recently started connection attempt.
     */
    private currentSubchannelIndex = 0;
    /**
     * The currently picked subchannel used for making calls, or null if no
     * subchannel is picked.
     */
    private currentPick: SubchannelInterface | null = null;
    /**
     * Listener callback attached to each subchannel in the `children` list
     * while establishing a connection, and to the picked subchannel afterwards.
     * The keepalive time argument is not used.
     */
    private subchannelStateListener: ConnectivityStateListener = (
      subchannel,
      previousState,
      newState,
      keepaliveTime,
      errorMessage
    ) => {
      this.onSubchannelStateUpdate(
        subchannel,
        previousState,
        newState,
        errorMessage
      );
    };

    /**
     * Health listener attached to the picked subchannel. Any health change
     * triggers a recomputation of the reported state.
     */
    private pickedSubchannelHealthListener: HealthListener = () =>
      this.calculateAndReportNewState();
    /**
     * Timer tracking when to start a connection attempt on the next subchannel
     * in the list.
     */
    private connectionDelayTimeout: NodeJS.Timeout;

    /**
     * The LB policy enters sticky TRANSIENT_FAILURE mode when all
     * subchannels have failed to connect at least once, and it stays in that
     * mode until a connection attempt is successful. While in sticky TF mode,
     * the LB policy continuously attempts to connect to all of its subchannels.
     */
    private stickyTransientFailureMode = false;

    /**
     * Whether an unhealthy picked subchannel is reported as TRANSIENT_FAILURE.
     * Read from the channel options in the constructor.
     */
    private reportHealthStatus: boolean;

    /**
     * The most recent error reported by any subchannel as it transitioned to
     * TRANSIENT_FAILURE.
     */
    private lastError: string | null = null;

    /**
     * The interleaved address list from the most recent `updateAddressList`
     * call, kept so that `exitIdle` can reconnect to it.
     */
    private latestAddressList: SubchannelAddress[] | null = null;

    /**
     * @param channelControlHelper `ChannelControlHelper` instance provided by
     *     this load balancer's owner.
     * @param credentials Not used by this class.
     * @param options Channel options; only the internal health-reporting
     *     option is read.
     */
    constructor(
      private readonly channelControlHelper: ChannelControlHelper,
      credentials: ChannelCredentials,
      options: ChannelOptions
    ) {
      /* Initialize the field with a timer that is cancelled immediately, so
       * that it always holds a value that can be passed to clearTimeout. */
      this.connectionDelayTimeout = setTimeout(() => {}, 0);
      clearTimeout(this.connectionDelayTimeout);
      this.reportHealthStatus = options[REPORT_HEALTH_STATUS_OPTION_NAME];
    }

    /**
     * @returns true if every child has its transient-failure flag set. This is
     *     also true when there are no children.
     */
    private allChildrenHaveReportedTF(): boolean {
      return this.children.every(child => child.hasReportedTransientFailure);
    }

    /**
     * Clear the transient-failure flag on every child.
     */
    private resetChildrenReportedTF() {
      /* A plain loop is required here. Array.prototype.every stops at the
       * first callback that returns a falsy value, and an assignment of
       * `false` evaluates to `false`, so using it would reset only the first
       * child. */
      for (const child of this.children) {
        child.hasReportedTransientFailure = false;
      }
    }

    /**
     * Derive the balancer's connectivity state and picker from its current
     * fields and report them through `updateState`:
     * - with a picked subchannel: READY, or TRANSIENT_FAILURE if health
     *   reporting is enabled and the pick is unhealthy;
     * - with no pick and no children: IDLE;
     * - otherwise TRANSIENT_FAILURE in sticky TF mode, else CONNECTING.
     */
    private calculateAndReportNewState() {
      if (this.currentPick) {
        if (this.reportHealthStatus && !this.currentPick.isHealthy()) {
          this.updateState(
            ConnectivityState.TRANSIENT_FAILURE,
            new UnavailablePicker({
              details: `Picked subchannel ${this.currentPick.getAddress()} is unhealthy`,
            })
          );
        } else {
          this.updateState(
            ConnectivityState.READY,
            new PickFirstPicker(this.currentPick)
          );
        }
      } else if (this.children.length === 0) {
        this.updateState(ConnectivityState.IDLE, new QueuePicker(this));
      } else {
        if (this.stickyTransientFailureMode) {
          this.updateState(
            ConnectivityState.TRANSIENT_FAILURE,
            new UnavailablePicker({
              details: `No connection established. Last error: ${this.lastError}`,
            })
          );
        } else {
          this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this));
        }
      }
    }

    /** Ask the owner of this balancer to re-resolve the target. */
    private requestReresolution() {
      this.channelControlHelper.requestReresolution();
    }

    /**
     * If every child has reported TRANSIENT_FAILURE, request re-resolution,
     * reset the per-child flags, and make sure the balancer is in sticky TF
     * mode. On first entry into that mode every child is told to start
     * connecting. Does nothing if some child has not yet failed.
     */
    private maybeEnterStickyTransientFailureMode() {
      if (!this.allChildrenHaveReportedTF()) {
        return;
      }
      this.requestReresolution();
      this.resetChildrenReportedTF();
      if (this.stickyTransientFailureMode) {
        this.calculateAndReportNewState();
        return;
      }
      this.stickyTransientFailureMode = true;
      for (const { subchannel } of this.children) {
        subchannel.startConnecting();
      }
      this.calculateAndReportNewState();
    }

    /**
     * Detach from the currently picked subchannel, if any: remove both
     * listeners and the channelz child, release the reference, and clear
     * `currentPick`.
     */
    private removeCurrentPick() {
      if (this.currentPick !== null) {
        this.currentPick.removeConnectivityStateListener(this.subchannelStateListener);
        this.channelControlHelper.removeChannelzChild(
          this.currentPick.getChannelzRef()
        );
        this.currentPick.removeHealthStateWatcher(
          this.pickedSubchannelHealthListener
        );
        // Unref last, to avoid triggering listeners
        this.currentPick.unref();
        this.currentPick = null;
      }
    }

    /**
     * Handle a connectivity state change reported by a subchannel.
     *
     * If the subchannel is the current pick, any state other than READY drops
     * the pick and reports the recomputed state. Otherwise the matching child
     * is located: READY picks it, while TRANSIENT_FAILURE records the failure
     * and error, possibly enters sticky TF mode, and, if this was the most
     * recently started attempt, moves on to the next child. In every
     * matching-child case the subchannel is then asked to start connecting.
     * @param subchannel The subchannel whose state changed.
     * @param previousState The state it left (unused).
     * @param newState The state it entered.
     * @param errorMessage Optional error associated with the transition.
     */
    private onSubchannelStateUpdate(
      subchannel: SubchannelInterface,
      previousState: ConnectivityState,
      newState: ConnectivityState,
      errorMessage?: string
    ) {
      if (this.currentPick?.realSubchannelEquals(subchannel)) {
        if (newState !== ConnectivityState.READY) {
          this.removeCurrentPick();
          this.calculateAndReportNewState();
        }
        return;
      }
      for (const [index, child] of this.children.entries()) {
        if (subchannel.realSubchannelEquals(child.subchannel)) {
          if (newState === ConnectivityState.READY) {
            this.pickSubchannel(child.subchannel);
          }
          if (newState === ConnectivityState.TRANSIENT_FAILURE) {
            child.hasReportedTransientFailure = true;
            if (errorMessage) {
              this.lastError = errorMessage;
            }
            this.maybeEnterStickyTransientFailureMode();
            if (index === this.currentSubchannelIndex) {
              this.startNextSubchannelConnecting(index + 1);
            }
          }
          child.subchannel.startConnecting();
          return;
        }
      }
    }

    /**
     * Cancel the pending connection delay timer and start a connection attempt
     * on the first child at or after `startIndex` that is IDLE or CONNECTING.
     * If there is no such child, check whether sticky TF mode should be
     * entered.
     * @param startIndex The index into the `children` list to start from.
     */
    private startNextSubchannelConnecting(startIndex: number) {
      clearTimeout(this.connectionDelayTimeout);
      for (const [index, child] of this.children.entries()) {
        if (index >= startIndex) {
          const subchannelState = child.subchannel.getConnectivityState();
          if (
            subchannelState === ConnectivityState.IDLE ||
            subchannelState === ConnectivityState.CONNECTING
          ) {
            this.startConnecting(index);
            return;
          }
        }
      }
      this.maybeEnterStickyTransientFailureMode();
    }

    /**
     * Have a single subchannel in the `children` list start connecting, and
     * arm the delay timer that moves on to the next subchannel.
     * @param subchannelIndex The index into the `children` list.
     */
    private startConnecting(subchannelIndex: number) {
      clearTimeout(this.connectionDelayTimeout);
      this.currentSubchannelIndex = subchannelIndex;
      if (
        this.children[subchannelIndex].subchannel.getConnectivityState() ===
        ConnectivityState.IDLE
      ) {
        trace(
          'Start connecting to subchannel with address ' +
            this.children[subchannelIndex].subchannel.getAddress()
        );
        /* The attempt is deferred to the next tick; the optional chaining
         * covers the case where the children list was replaced in between. */
        process.nextTick(() => {
          this.children[subchannelIndex]?.subchannel.startConnecting();
        });
      }
      this.connectionDelayTimeout = setTimeout(() => {
        this.startNextSubchannelConnecting(subchannelIndex + 1);
      }, CONNECTION_DELAY_INTERVAL_MS);
      // Do not let this timer keep the process alive, where unref exists.
      this.connectionDelayTimeout.unref?.();
    }

    /**
     * Declare that the specified subchannel should be used to make requests.
     * This functions the same independent of whether subchannel is a member of
     * this.children and whether it is equal to this.currentPick.
     * Prerequisite: subchannel.getConnectivityState() === READY.
     * @param subchannel
     */
    private pickSubchannel(subchannel: SubchannelInterface) {
      trace('Pick subchannel with address ' + subchannel.getAddress());
      this.stickyTransientFailureMode = false;
      /* Ref before removeCurrentPick and resetSubchannelList to avoid the
       * refcount dropping to 0 during this process. */
      subchannel.ref();
      this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef());
      this.removeCurrentPick();
      this.resetSubchannelList();
      subchannel.addConnectivityStateListener(this.subchannelStateListener);
      subchannel.addHealthStateWatcher(this.pickedSubchannelHealthListener);
      this.currentPick = subchannel;
      clearTimeout(this.connectionDelayTimeout);
      this.calculateAndReportNewState();
    }

    /**
     * Record the new state, trace the transition, and forward the state and
     * picker to the channel control helper.
     * @param newState The connectivity state to report.
     * @param picker The picker to report alongside it.
     */
    private updateState(newState: ConnectivityState, picker: Picker) {
      trace(
        ConnectivityState[this.currentState] +
          ' -> ' +
          ConnectivityState[newState]
      );
      this.currentState = newState;
      this.channelControlHelper.updateState(newState, picker);
    }

    /**
     * Release every child subchannel and empty the `children` list, resetting
     * the current subchannel index to 0.
     */
    private resetSubchannelList() {
      for (const child of this.children) {
        /* Always remove the connectivity state listener. If the subchannel is
           getting picked, it will be re-added then. */
        child.subchannel.removeConnectivityStateListener(
          this.subchannelStateListener
        );
        /* Refs are counted independently for the children list and the
         * currentPick, so we call unref whether or not the child is the
         * currentPick. Channelz child references are also refcounted, so
         * removeChannelzChild can be handled the same way. */
        child.subchannel.unref();
        this.channelControlHelper.removeChannelzChild(
          child.subchannel.getChannelzRef()
        );
      }
      this.currentSubchannelIndex = 0;
      this.children = [];
    }

    /**
     * Create a subchannel for every address and start connecting to them in
     * order. If any of the new subchannels is already READY it is picked
     * immediately and nothing else happens.
     * @param addressList The addresses to connect to, in priority order.
     */
    private connectToAddressList(addressList: SubchannelAddress[]) {
      trace('connectToAddressList([' + addressList.map(address => subchannelAddressToString(address)) + '])');
      const newChildrenList = addressList.map(address => ({
        subchannel: this.channelControlHelper.createSubchannel(address, {}, null),
        hasReportedTransientFailure: false,
      }));
      for (const { subchannel } of newChildrenList) {
        if (subchannel.getConnectivityState() === ConnectivityState.READY) {
          this.pickSubchannel(subchannel);
          return;
        }
      }
      /* Ref each subchannel before resetting the list, to ensure that
       * subchannels shared between the list don't drop to 0 refs during the
       * transition. */
      for (const { subchannel } of newChildrenList) {
        subchannel.ref();
        this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef());
      }
      this.resetSubchannelList();
      this.children = newChildrenList;
      for (const { subchannel } of this.children) {
        subchannel.addConnectivityStateListener(this.subchannelStateListener);
      }
      // Children that are already failing count as having reported it.
      for (const child of this.children) {
        if (
          child.subchannel.getConnectivityState() ===
          ConnectivityState.TRANSIENT_FAILURE
        ) {
          child.hasReportedTransientFailure = true;
        }
      }
      this.startNextSubchannelConnecting(0);
      this.calculateAndReportNewState();
    }

    /**
     * Accept a new endpoint list and start connecting to it. Updates whose
     * config is not a `PickFirstLoadBalancingConfig` are ignored.
     * @param endpointList The endpoints to connect to.
     * @param lbConfig The load balancing config for this update.
     * @throws Error if the endpoints contain no addresses at all.
     */
    updateAddressList(
      endpointList: Endpoint[],
      lbConfig: TypedLoadBalancingConfig
    ): void {
      if (!(lbConfig instanceof PickFirstLoadBalancingConfig)) {
        return;
      }
      /* Previously, an update would be discarded if it was identical to the
       * previous update, to minimize churn. Now the DNS resolver is
       * rate-limited, so that is less of a concern. */
      if (lbConfig.getShuffleAddressList()) {
        endpointList = shuffled(endpointList);
      }
      // Flatten the per-endpoint address lists into one list.
      const rawAddressList = ([] as SubchannelAddress[]).concat(
        ...endpointList.map(endpoint => endpoint.addresses)
      );
      trace('updateAddressList([' + rawAddressList.map(address => subchannelAddressToString(address)) + '])');
      if (rawAddressList.length === 0) {
        throw new Error('No addresses in endpoint list passed to pick_first');
      }
      const addressList = interleaveAddressFamilies(rawAddressList);
      this.latestAddressList = addressList;
      this.connectToAddressList(addressList);
    }

    /**
     * If the balancer is IDLE and has previously received an address list,
     * start connecting to that list again.
     */
    exitIdle() {
      if (
        this.currentState === ConnectivityState.IDLE &&
        this.latestAddressList
      ) {
        this.connectToAddressList(this.latestAddressList);
      }
    }

    resetBackoff() {
      /* The pick first load balancer does not have a connection backoff, so this
       * does nothing */
    }

    /**
     * Release all subchannels held by this balancer and cancel the pending
     * connection delay timer.
     */
    destroy() {
      this.resetSubchannelList();
      this.removeCurrentPick();
      /* Without this, a timer armed by startConnecting could still fire after
       * destruction and report state or request re-resolution through the
       * channel control helper. */
      clearTimeout(this.connectionDelayTimeout);
    }

    /** @returns The registered name of this load balancing policy. */
    getTypeName(): string {
      return TYPE_NAME;
    }
  }
  /** Config used for every leaf balancer: pick_first without shuffling. */
  const LEAF_CONFIG = new PickFirstLoadBalancingConfig(false);

  /**
   * Leaf load balancer for a single endpoint. It wraps a
   * `PickFirstLoadBalancer` (with health status reporting switched on) behind
   * a smaller API, and remembers the most recent state and picker that the
   * wrapped balancer reported.
   */
  export class LeafLoadBalancer {
    private pickFirstBalancer: PickFirstLoadBalancer;
    private latestState: ConnectivityState = ConnectivityState.IDLE;
    private latestPicker: Picker;

    /**
     * @param endpoint The endpoint this balancer connects to.
     * @param channelControlHelper Helper of the owning balancer; state updates
     *     are recorded locally and then forwarded to it.
     * @param credentials Passed through to the wrapped balancer.
     * @param options Channel options, passed through to the wrapped balancer
     *     with the health status reporting option forced to true.
     */
    constructor(
      private endpoint: Endpoint,
      channelControlHelper: ChannelControlHelper,
      credentials: ChannelCredentials,
      options: ChannelOptions
    ) {
      const recordingHelper = createChildChannelControlHelper(
        channelControlHelper,
        {
          updateState: (state, picker) => {
            // Cache first, then pass the update on to the parent.
            this.latestState = state;
            this.latestPicker = picker;
            channelControlHelper.updateState(state, picker);
          },
        }
      );
      const leafOptions = { ...options, [REPORT_HEALTH_STATUS_OPTION_NAME]: true };
      this.pickFirstBalancer = new PickFirstLoadBalancer(
        recordingHelper,
        credentials,
        leafOptions
      );
      // Until the wrapped balancer reports something, queue all picks.
      this.latestPicker = new QueuePicker(this.pickFirstBalancer);
    }

    /** Hand the current endpoint to the wrapped balancer to connect to. */
    startConnecting(): void {
      this.pickFirstBalancer.updateAddressList([this.endpoint], LEAF_CONFIG);
    }

    /**
     * Replace the endpoint associated with this LeafLoadBalancer. While the
     * latest recorded state is IDLE the endpoint is only stored; in any other
     * state the new endpoint is passed to the wrapped balancer right away.
     * @param newEndpoint
     */
    updateEndpoint(newEndpoint: Endpoint): void {
      this.endpoint = newEndpoint;
      if (this.latestState === ConnectivityState.IDLE) {
        return;
      }
      this.startConnecting();
    }

    /** @returns The most recently reported connectivity state. */
    getConnectivityState(): ConnectivityState {
      return this.latestState;
    }

    /** @returns The most recently reported picker. */
    getPicker(): Picker {
      return this.latestPicker;
    }

    /** @returns The endpoint currently associated with this balancer. */
    getEndpoint(): Endpoint {
      return this.endpoint;
    }

    /** Forward an exit-idle request to the wrapped balancer. */
    exitIdle(): void {
      this.pickFirstBalancer.exitIdle();
    }

    /** Destroy the wrapped balancer. */
    destroy(): void {
      this.pickFirstBalancer.destroy();
    }
  }
  /**
   * Register the pick_first policy (its balancer and config classes) under its
   * type name, and make it the default load balancer type.
   */
  export function setup(): void {
    registerLoadBalancerType(TYPE_NAME, PickFirstLoadBalancer, PickFirstLoadBalancingConfig);
    registerDefaultLoadBalancerType(TYPE_NAME);
  }