123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422 |
- import retry from 'async-retry';
- import { subscribe } from 'graphql';
- import fetch from 'node-fetch';
- import { ensureError, ensureGraphQLError } from '../../errorNormalize.js';
- import { HeaderMap } from '../../utils/HeaderMap.js';
/**
 * Creates an Apollo Server plugin that serves subscriptions over the HTTP
 * callback protocol: the incoming request is answered immediately with
 * `{ data: null }`, and subscription events are POSTed to the callback URL
 * supplied in `request.extensions.subscription`.
 *
 * Requests without a `subscription` extension are ignored by this plugin.
 *
 * @param {object} [options] Forwarded unchanged to `SubscriptionManager`.
 * @param {object} [options.logger] Logger exposing `debug`/`error`/`info`/`warn`;
 *   when omitted nothing is logged.
 * @returns {object} Plugin object with `requestDidStart` and `serverWillStart` hooks.
 */
export function ApolloServerPluginSubscriptionCallback(options = Object.create(null)) {
  const subscriptionManager = new SubscriptionManager(options);
  const logger = options.logger
    ? prefixedLogger(options.logger, 'SubscriptionCallback')
    : undefined;
  return {
    async requestDidStart({ request }) {
      const subscriptionExtension = request?.extensions?.subscription;
      if (!subscriptionExtension) return;
      let {
        callbackUrl,
        subscriptionId: id,
        verifier,
        heartbeatIntervalMs,
      } = subscriptionExtension;
      // The snake_case spellings are accepted as fallbacks for the camelCase keys.
      callbackUrl = callbackUrl || subscriptionExtension.callback_url;
      id = id || subscriptionExtension.subscription_id;
      heartbeatIntervalMs =
        heartbeatIntervalMs ??
        subscriptionExtension.heartbeat_interval_ms ??
        5000;

      // Sends a `complete` request carrying `errors`. Callers deliberately do
      // not await the returned promise so the HTTP response is not delayed by
      // retries; any failure (rejection or synchronous throw) is logged here
      // instead of surfacing as an unhandled promise rejection.
      const completeWithErrors = async (errors) => {
        try {
          await subscriptionManager.completeRequest({
            errors,
            callbackUrl,
            id,
            verifier,
          });
        } catch (e) {
          logger?.error(`\`complete\` request failed: ${e}`, id);
        }
      };

      return {
        // Short-circuits normal execution: the subscription request itself is
        // always answered with an empty single result.
        async responseForOperation() {
          logger?.debug('Received new subscription request', id);
          return {
            http: {
              status: 200,
              headers: new HeaderMap([['content-type', 'application/json']]),
            },
            body: {
              kind: 'single',
              singleResult: {
                data: null,
              },
            },
          };
        },
        async willSendResponse({
          request,
          schema,
          document,
          contextValue,
          operationName,
          response,
        }) {
          // Verify the callback endpoint first; on failure the error is
          // reported in the HTTP response and no subscription is started.
          try {
            await subscriptionManager.checkRequest({
              callbackUrl,
              id,
              verifier,
            });
          } catch (e) {
            const graphqlError = ensureGraphQLError(e);
            logger?.error(`\`check\` request failed: ${graphqlError.message}`, id);
            if (response.body.kind === 'single') {
              response.body.singleResult.errors = [graphqlError];
              response.http.status = 500;
            }
            return;
          }
          subscriptionManager.initHeartbeat({
            callbackUrl,
            id,
            verifier,
            heartbeatIntervalMs,
          });
          logger?.debug(`Starting graphql-js subscription`, id);
          let subscription;
          try {
            subscription = await subscribe({
              schema,
              document,
              variableValues: request.variables,
              contextValue,
              operationName,
            });
          } catch (e) {
            const graphqlError = ensureGraphQLError(e);
            logger?.error(`Programming error: graphql-js subscribe() threw unexpectedly! Please report this bug to Apollo. The error was: ${e}`, id);
            void completeWithErrors([graphqlError]);
            return;
          }
          if ('errors' in subscription) {
            // The subscription failed to start: report its errors to the callback URL.
            logger?.error(`graphql-js subscription unsuccessful: [\n\t${subscription.errors
              ?.map((e) => e.message)
              .join(',\n\t')}\n]`, id);
            void completeWithErrors(subscription.errors);
          } else if (isAsyncIterable(subscription)) {
            logger?.debug('graphql-js subscription successful', id);
            subscriptionManager.startConsumingSubscription({
              subscription,
              callbackUrl,
              id,
              verifier,
            });
          }
          logger?.debug(`Responding to original subscription request`, id);
        },
      };
    },
    async serverWillStart() {
      return {
        // On shutdown, wait for the manager to finish its cleanup.
        async drainServer() {
          logger?.debug('Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals');
          await subscriptionManager.cleanup();
          logger?.debug('Successfully cleaned up outstanding subscriptions and heartbeat intervals.');
        },
      };
    },
  };
}
/**
 * Reports whether `value` implements the async-iteration protocol.
 *
 * @param {unknown} value Any value, including `null`/`undefined`.
 * @returns {boolean} `true` only when `value[Symbol.asyncIterator]` is a function.
 */
function isAsyncIterable(value) {
  // `!= null` guards both null and undefined and keeps the result a strict
  // boolean instead of leaking the falsy input back to the caller.
  return value != null && typeof value[Symbol.asyncIterator] === 'function';
}
/**
 * Tracks the callback-protocol subscriptions of one plugin instance: the
 * HTTP requests currently in flight, and, per callback URL, the running
 * heartbeat interval and the subscription consumer.
 */
class SubscriptionManager {
  /**
   * @param {object} options
   * @param {number} [options.maxConsecutiveHeartbeatFailures=5] Consecutive
   *   failed heartbeats after which the subscription is terminated.
   * @param {object} [options.retry] Overrides merged over the default retry
   *   settings passed to `retry` (`retries: 5`, `minTimeout: 100`, `maxTimeout: 1000`).
   * @param {object} [options.logger] Logger to wrap; nothing is logged when omitted.
   */
  constructor(options) {
    this.requestsInFlight = new Set();
    this.subscriptionInfoByCallbackUrl = new Map();
    this.maxConsecutiveHeartbeatFailures =
      options.maxConsecutiveHeartbeatFailures ?? 5;
    this.retryConfig = {
      retries: 5,
      minTimeout: 100,
      maxTimeout: 1000,
      ...options.retry,
    };
    this.logger = options.logger
      ? prefixedLogger(options.logger, 'SubscriptionManager')
      : undefined;
  }

  /**
   * POSTs one protocol message to `url`, retrying through `retry`.
   *
   * - status >= 500: throws, so the attempt counts as failed for `retry`.
   * - status 404: terminates the subscription and resolves with the response.
   * - any other non-ok status: terminates the subscription and bails out of
   *   retrying with an error.
   *
   * @returns {Promise<object>} The fetch response of the final attempt.
   */
  async retryFetch({ url, action, id, verifier, payload, errors, headers }) {
    let response;
    try {
      const maybeWithErrors = errors?.length ? ` with errors` : '';
      this.logger?.debug(`Sending \`${action}\` request to router${maybeWithErrors}`, id);
      // `await` is essential: returning the bare promise would run the
      // `finally` below right after the first fetch was issued, dropping the
      // request from `requestsInFlight` while it is still pending.
      return await retry(async (bail) => {
        response = fetch(url, {
          method: 'POST',
          headers: {
            'content-type': 'application/json',
            ...headers,
          },
          body: JSON.stringify({
            kind: 'subscription',
            action,
            id,
            verifier,
            ...(payload && { payload }),
            ...(errors?.length && { errors }),
          }),
        });
        this.requestsInFlight.add(response);
        const result = await response;
        if (!result.ok) {
          if (result.status >= 500) {
            throw new Error(`\`${action}\` request failed with unexpected status code: ${result.status}`);
          }
          if (result.status === 404) {
            this.logger?.debug(`\`${action}\` request received 404, terminating subscription`, id);
          } else {
            const errMsg = `\`${action}\` request failed with unexpected status code: ${result.status}, terminating subscription`;
            this.logger?.debug(errMsg, id);
            bail(new Error(errMsg));
          }
          this.terminateSubscription(id, url);
          return result;
        }
        this.logger?.debug(`\`${action}\` request successful`, id);
        return result;
      }, {
        ...this.retryConfig,
        onRetry: (e, attempt) => {
          // The failed attempt is no longer in flight.
          this.requestsInFlight.delete(response);
          this.logger?.warn(`Retrying \`${action}\` request (attempt ${attempt}) due to error: ${e.message}`, id);
          this.retryConfig?.onRetry?.(e, attempt);
        },
      });
    } finally {
      this.requestsInFlight.delete(response);
    }
  }

  /** Sends the initial `check` message, including the protocol header. */
  async checkRequest({ callbackUrl, id, verifier }) {
    return this.retryFetch({
      url: callbackUrl,
      action: 'check',
      id,
      verifier,
      headers: { 'subscription-protocol': 'callback/1.0' },
    });
  }

  /**
   * Registers `callbackUrl` and, unless `heartbeatIntervalMs` is `0`, starts
   * an interval that POSTs a `check` message every `heartbeatIntervalMs` ms.
   * Heartbeats for one URL are serialized through a queue of promises so a
   * slow request delays the next one instead of overlapping with it.
   */
  initHeartbeat({ callbackUrl, id, verifier, heartbeatIntervalMs }) {
    if (!this.subscriptionInfoByCallbackUrl.has(callbackUrl)) {
      this.subscriptionInfoByCallbackUrl.set(callbackUrl, {});
    }
    if (heartbeatIntervalMs === 0) {
      this.logger?.debug(`Heartbeat disabled for ${callbackUrl}`, id);
      return;
    }
    this.logger?.debug(`Starting new heartbeat interval for ${callbackUrl}`, id);
    let consecutiveHeartbeatFailureCount = 0;
    const heartbeatInterval = setInterval(async () => {
      let heartbeatRequest;
      let resolveHeartbeatPromise;
      const heartbeatPromise = new Promise((r) => {
        resolveHeartbeatPromise = r;
      });
      const existingSubscriptionInfo = this.subscriptionInfoByCallbackUrl.get(callbackUrl);
      if (!existingSubscriptionInfo?.heartbeat) {
        clearInterval(heartbeatInterval);
        this.logger?.error(`Programming error: Heartbeat interval unexpectedly missing for ${callbackUrl}. This is probably a bug in Apollo Server.`);
        return;
      }
      const existingHeartbeat = existingSubscriptionInfo.heartbeat;
      const { queue } = existingHeartbeat;
      queue.push(heartbeatPromise);
      if (queue.length > 1) {
        // Wait for the heartbeat queued directly before this one.
        const requestBeforeMe = queue[queue.length - 2];
        await requestBeforeMe;
      }
      try {
        this.logger?.debug(`Sending \`check\` request to ${callbackUrl} for ID: ${id}`);
        heartbeatRequest = fetch(callbackUrl, {
          method: 'POST',
          body: JSON.stringify({
            kind: 'subscription',
            action: 'check',
            id,
            verifier,
          }),
          headers: {
            'content-type': 'application/json',
            'subscription-protocol': 'callback/1.0',
          },
        });
        this.requestsInFlight.add(heartbeatRequest);
        const result = await heartbeatRequest;
        this.logger?.debug(`Heartbeat received response for ID: ${id}`);
        if (result.ok) {
          this.logger?.debug(`Heartbeat request successful, ID: ${id}`);
        } else if (result.status === 400 || result.status === 404) {
          this.logger?.debug(`Heartbeat request received invalid ID: ${id}`);
          this.terminateSubscription(id, callbackUrl);
        } else {
          throw new Error(`Unexpected status code: ${result.status}`);
        }
        consecutiveHeartbeatFailureCount = 0;
      } catch (e) {
        const err = ensureError(e);
        this.logger?.error(`Heartbeat request failed (${++consecutiveHeartbeatFailureCount} consecutive): ${err.message}`, existingHeartbeat.id);
        if (consecutiveHeartbeatFailureCount >= this.maxConsecutiveHeartbeatFailures) {
          this.logger?.error(`Heartbeat request failed ${consecutiveHeartbeatFailureCount} times, terminating subscriptions and heartbeat interval: ${err.message}`, existingHeartbeat.id);
          this.terminateSubscription(id, callbackUrl);
        }
      } finally {
        if (heartbeatRequest) {
          this.requestsInFlight.delete(heartbeatRequest);
        }
        // Leave the queue and release whichever heartbeat is waiting on this one.
        queue.shift();
        resolveHeartbeatPromise();
      }
    }, heartbeatIntervalMs);
    const subscriptionInfo = this.subscriptionInfoByCallbackUrl.get(callbackUrl);
    subscriptionInfo.heartbeat = {
      interval: heartbeatInterval,
      id,
      verifier,
      queue: [],
    };
  }

  /**
   * Cancels the consumer and the heartbeat interval registered for
   * `callbackUrl` and removes its entry. `id` is used for logging only.
   */
  terminateSubscription(id, callbackUrl) {
    this.logger?.debug(`Terminating subscriptions for ID: ${id}`);
    const subscriptionInfo = this.subscriptionInfoByCallbackUrl.get(callbackUrl);
    if (!subscriptionInfo) {
      this.logger?.error(`No subscriptions found for ${callbackUrl}, skipping termination`);
      return;
    }
    const { subscription, heartbeat } = subscriptionInfo;
    if (subscription) {
      subscription.cancelled = true;
      subscription.asyncIter?.return();
    }
    if (heartbeat) {
      this.logger?.debug(`Terminating heartbeat interval for ${callbackUrl}`);
      clearInterval(heartbeat.interval);
    }
    this.subscriptionInfoByCallbackUrl.delete(callbackUrl);
  }

  /**
   * Starts forwarding every payload of `subscription` to `callbackUrl` as a
   * `next` message and sends `complete` once the iterator finishes or throws.
   * The consumer is attached to the entry created by `initHeartbeat`; if that
   * entry is missing an error is logged.
   */
  startConsumingSubscription({ subscription, callbackUrl, id, verifier }) {
    const self = this;
    const subscriptionObject = {
      asyncIter: subscription,
      cancelled: false,
      async startConsumingSubscription() {
        self.logger?.debug(`Listening to graphql-js subscription`, id);
        try {
          for await (const payload of subscription) {
            if (this.cancelled) {
              self.logger?.debug(`Subscription already cancelled, ignoring current and future payloads`, id);
              return;
            }
            try {
              await self.retryFetch({
                url: callbackUrl,
                action: 'next',
                id,
                verifier,
                payload,
              });
            } catch (e) {
              const originalError = ensureError(e);
              self.logger?.error(`\`next\` request failed, terminating subscription: ${originalError.message}`, id);
              self.terminateSubscription(id, callbackUrl);
            }
          }
          self.logger?.debug(`Subscription completed without errors`, id);
          await this.completeSubscription();
        } catch (e) {
          const error = ensureGraphQLError(e);
          self.logger?.error(`Generator threw an error, terminating subscription: ${error.message}`, id);
          // Awaited so this promise does not float.
          await this.completeSubscription([error]);
        }
      },
      // Sends `complete` at most once, then tears the subscription down.
      async completeSubscription(errors) {
        if (this.cancelled) return;
        this.cancelled = true;
        try {
          await self.completeRequest({
            callbackUrl,
            id,
            verifier,
            ...(errors && { errors }),
          });
        } catch (e) {
          const error = ensureError(e);
          self.logger?.error(`\`complete\` request failed: ${error.message}`, id);
        } finally {
          self.terminateSubscription(id, callbackUrl);
        }
      },
    };
    // Intentionally not awaited: consumption runs in the background, and the
    // method handles its own errors.
    subscriptionObject.startConsumingSubscription();
    const subscriptionInfo = this.subscriptionInfoByCallbackUrl.get(callbackUrl);
    if (!subscriptionInfo) {
      this.logger?.error(`No existing heartbeat found for ${callbackUrl}, skipping subscription`);
    } else {
      subscriptionInfo.subscription = subscriptionObject;
    }
  }

  /** Sends a `complete` message, optionally carrying `errors`. */
  async completeRequest({ errors, callbackUrl, id, verifier }) {
    return this.retryFetch({
      url: callbackUrl,
      action: 'complete',
      id,
      verifier,
      errors,
    });
  }

  /** @returns {object[]} Every registered subscription consumer. */
  collectAllSubscriptions() {
    return Array.from(
      this.subscriptionInfoByCallbackUrl.values(),
      ({ subscription }) => subscription,
    ).filter(Boolean);
  }

  /**
   * Shutdown sequence: stop all heartbeat intervals and wait for the last
   * queued heartbeat of each, complete every subscription that is not yet
   * cancelled, then wait for all remaining in-flight requests to settle.
   */
  async cleanup() {
    await Promise.allSettled(
      Array.from(this.subscriptionInfoByCallbackUrl.values()).map(async ({ heartbeat }) => {
        clearInterval(heartbeat?.interval);
        await heartbeat?.queue[heartbeat.queue.length - 1];
      }),
    );
    await Promise.allSettled(
      this.collectAllSubscriptions()
        .filter((s) => !s.cancelled)
        .map((s) => s.completeSubscription()),
    );
    await Promise.allSettled(this.requestsInFlight.values());
  }
}
/**
 * Wraps `logger` so that each message is emitted as
 * `<prefix>[<id>]: <message>`; the bracketed id is omitted when `id` is falsy.
 *
 * @param {object} logger Target exposing `debug`, `error`, `info` and `warn`.
 * @param {string} prefix Label placed at the start of every message.
 * @returns {object} Object with `debug`, `error`, `info` and `warn`, each
 *   taking `(message, id)`.
 */
function prefixedLogger(logger, prefix) {
  const wrapped = {};
  for (const level of ['debug', 'error', 'info', 'warn']) {
    wrapped[level] = (message, id) => {
      const idTag = id ? `[${id}]` : '';
      // Looked up on `logger` at call time so the method keeps its receiver.
      logger[level](`${prefix}${idTag}: ${message}`);
    };
  }
  return wrapped;
}
- //# sourceMappingURL=index.js.map
|