index.js 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422
  1. import retry from 'async-retry';
  2. import { subscribe } from 'graphql';
  3. import fetch from 'node-fetch';
  4. import { ensureError, ensureGraphQLError } from '../../errorNormalize.js';
  5. import { HeaderMap } from '../../utils/HeaderMap.js';
  6. export function ApolloServerPluginSubscriptionCallback(options = Object.create(null)) {
  7. const subscriptionManager = new SubscriptionManager(options);
  8. const logger = options.logger
  9. ? prefixedLogger(options.logger, 'SubscriptionCallback')
  10. : undefined;
  11. return {
  12. async requestDidStart({ request }) {
  13. const subscriptionExtension = request?.extensions?.subscription;
  14. if (!subscriptionExtension)
  15. return;
  16. let { callbackUrl, subscriptionId: id, verifier, heartbeatIntervalMs, } = subscriptionExtension;
  17. callbackUrl = callbackUrl || subscriptionExtension.callback_url;
  18. id = id || subscriptionExtension.subscription_id;
  19. heartbeatIntervalMs =
  20. heartbeatIntervalMs ??
  21. subscriptionExtension.heartbeat_interval_ms ??
  22. 5000;
  23. return {
  24. async responseForOperation() {
  25. logger?.debug('Received new subscription request', id);
  26. return {
  27. http: {
  28. status: 200,
  29. headers: new HeaderMap([['content-type', 'application/json']]),
  30. },
  31. body: {
  32. kind: 'single',
  33. singleResult: {
  34. data: null,
  35. },
  36. },
  37. };
  38. },
  39. async willSendResponse({ request, schema, document, contextValue, operationName, response, }) {
  40. try {
  41. await subscriptionManager.checkRequest({
  42. callbackUrl,
  43. id,
  44. verifier,
  45. });
  46. }
  47. catch (e) {
  48. const graphqlError = ensureGraphQLError(e);
  49. logger?.error(`\`check\` request failed: ${graphqlError.message}`, id);
  50. if (response.body.kind === 'single') {
  51. response.body.singleResult.errors = [graphqlError];
  52. response.http.status = 500;
  53. }
  54. return;
  55. }
  56. subscriptionManager.initHeartbeat({
  57. callbackUrl,
  58. id,
  59. verifier,
  60. heartbeatIntervalMs,
  61. });
  62. logger?.debug(`Starting graphql-js subscription`, id);
  63. let subscription;
  64. try {
  65. subscription = await subscribe({
  66. schema,
  67. document: document,
  68. variableValues: request.variables,
  69. contextValue: contextValue,
  70. operationName: operationName,
  71. });
  72. }
  73. catch (e) {
  74. const graphqlError = ensureGraphQLError(e);
  75. logger?.error(`Programming error: graphql-js subscribe() threw unexpectedly! Please report this bug to Apollo. The error was: ${e}`, id);
  76. subscriptionManager.completeRequest({
  77. errors: [graphqlError],
  78. callbackUrl,
  79. id,
  80. verifier,
  81. });
  82. return;
  83. }
  84. if ('errors' in subscription) {
  85. logger?.error(`graphql-js subscription unsuccessful: [\n\t${subscription.errors
  86. ?.map((e) => e.message)
  87. .join(',\n\t')}\n]`, id);
  88. try {
  89. subscriptionManager.completeRequest({
  90. errors: subscription.errors,
  91. callbackUrl,
  92. id,
  93. verifier,
  94. });
  95. }
  96. catch (e) {
  97. logger?.error(`\`complete\` request failed: ${e}`, id);
  98. }
  99. }
  100. else if (isAsyncIterable(subscription)) {
  101. logger?.debug('graphql-js subscription successful', id);
  102. subscriptionManager.startConsumingSubscription({
  103. subscription,
  104. callbackUrl,
  105. id,
  106. verifier,
  107. });
  108. }
  109. logger?.debug(`Responding to original subscription request`, id);
  110. },
  111. };
  112. },
  113. async serverWillStart() {
  114. return {
  115. async drainServer() {
  116. logger?.debug('Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals');
  117. await subscriptionManager.cleanup();
  118. logger?.debug('Successfully cleaned up outstanding subscriptions and heartbeat intervals.');
  119. },
  120. };
  121. },
  122. };
  123. }
  124. function isAsyncIterable(value) {
  125. return value && typeof value[Symbol.asyncIterator] === 'function';
  126. }
  127. class SubscriptionManager {
  128. constructor(options) {
  129. this.requestsInFlight = new Set();
  130. this.subscriptionInfoByCallbackUrl = new Map();
  131. this.maxConsecutiveHeartbeatFailures =
  132. options.maxConsecutiveHeartbeatFailures ?? 5;
  133. this.retryConfig = {
  134. retries: 5,
  135. minTimeout: 100,
  136. maxTimeout: 1000,
  137. ...options.retry,
  138. };
  139. this.logger = options.logger
  140. ? prefixedLogger(options.logger, 'SubscriptionManager')
  141. : undefined;
  142. }
  143. async retryFetch({ url, action, id, verifier, payload, errors, headers, }) {
  144. let response;
  145. try {
  146. const maybeWithErrors = errors?.length ? ` with errors` : '';
  147. this.logger?.debug(`Sending \`${action}\` request to router` + maybeWithErrors, id);
  148. return retry(async (bail) => {
  149. response = fetch(url, {
  150. method: 'POST',
  151. headers: {
  152. 'content-type': 'application/json',
  153. ...headers,
  154. },
  155. body: JSON.stringify({
  156. kind: 'subscription',
  157. action,
  158. id,
  159. verifier,
  160. ...(payload && { payload }),
  161. ...(errors?.length && { errors }),
  162. }),
  163. });
  164. this.requestsInFlight.add(response);
  165. const result = await response;
  166. if (!result.ok) {
  167. if (result.status >= 500) {
  168. throw new Error(`\`${action}\` request failed with unexpected status code: ${result.status}`);
  169. }
  170. else {
  171. if (result.status === 404) {
  172. this.logger?.debug(`\`${action}\` request received 404, terminating subscription`, id);
  173. }
  174. else {
  175. const errMsg = `\`${action}\` request failed with unexpected status code: ${result.status}, terminating subscription`;
  176. this.logger?.debug(errMsg, id);
  177. bail(new Error(errMsg));
  178. }
  179. this.terminateSubscription(id, url);
  180. return result;
  181. }
  182. }
  183. this.logger?.debug(`\`${action}\` request successful`, id);
  184. return result;
  185. }, {
  186. ...this.retryConfig,
  187. onRetry: (e, attempt) => {
  188. this.requestsInFlight.delete(response);
  189. this.logger?.warn(`Retrying \`${action}\` request (attempt ${attempt}) due to error: ${e.message}`, id);
  190. this.retryConfig?.onRetry?.(e, attempt);
  191. },
  192. });
  193. }
  194. finally {
  195. this.requestsInFlight.delete(response);
  196. }
  197. }
  198. async checkRequest({ callbackUrl, id, verifier, }) {
  199. return this.retryFetch({
  200. url: callbackUrl,
  201. action: 'check',
  202. id,
  203. verifier,
  204. headers: { 'subscription-protocol': 'callback/1.0' },
  205. });
  206. }
  207. initHeartbeat({ callbackUrl, id, verifier, heartbeatIntervalMs, }) {
  208. if (!this.subscriptionInfoByCallbackUrl.has(callbackUrl)) {
  209. this.subscriptionInfoByCallbackUrl.set(callbackUrl, {});
  210. }
  211. if (heartbeatIntervalMs === 0) {
  212. this.logger?.debug(`Heartbeat disabled for ${callbackUrl}`, id);
  213. return;
  214. }
  215. this.logger?.debug(`Starting new heartbeat interval for ${callbackUrl}`, id);
  216. let consecutiveHeartbeatFailureCount = 0;
  217. const heartbeatInterval = setInterval(async () => {
  218. let heartbeatRequest;
  219. let resolveHeartbeatPromise;
  220. const heartbeatPromise = new Promise((r) => {
  221. resolveHeartbeatPromise = r;
  222. });
  223. const existingSubscriptionInfo = this.subscriptionInfoByCallbackUrl.get(callbackUrl);
  224. if (!existingSubscriptionInfo?.heartbeat) {
  225. clearInterval(heartbeatInterval);
  226. this.logger?.error(`Programming error: Heartbeat interval unexpectedly missing for ${callbackUrl}. This is probably a bug in Apollo Server.`);
  227. return;
  228. }
  229. const existingHeartbeat = existingSubscriptionInfo.heartbeat;
  230. const { queue } = existingHeartbeat;
  231. queue.push(heartbeatPromise);
  232. if (queue.length > 1) {
  233. const requestBeforeMe = queue[existingHeartbeat?.queue.length - 2];
  234. await requestBeforeMe;
  235. }
  236. try {
  237. this.logger?.debug(`Sending \`check\` request to ${callbackUrl} for ID: ${id}`);
  238. heartbeatRequest = fetch(callbackUrl, {
  239. method: 'POST',
  240. body: JSON.stringify({
  241. kind: 'subscription',
  242. action: 'check',
  243. id,
  244. verifier,
  245. }),
  246. headers: {
  247. 'content-type': 'application/json',
  248. 'subscription-protocol': 'callback/1.0',
  249. },
  250. });
  251. this.requestsInFlight.add(heartbeatRequest);
  252. const result = await heartbeatRequest;
  253. this.logger?.debug(`Heartbeat received response for ID: ${id}`);
  254. if (result.ok) {
  255. this.logger?.debug(`Heartbeat request successful, ID: ${id}`);
  256. }
  257. else if (result.status === 400) {
  258. this.logger?.debug(`Heartbeat request received invalid ID: ${id}`);
  259. this.terminateSubscription(id, callbackUrl);
  260. }
  261. else if (result.status === 404) {
  262. this.logger?.debug(`Heartbeat request received invalid ID: ${id}`);
  263. this.terminateSubscription(id, callbackUrl);
  264. }
  265. else {
  266. throw new Error(`Unexpected status code: ${result.status}`);
  267. }
  268. consecutiveHeartbeatFailureCount = 0;
  269. }
  270. catch (e) {
  271. const err = ensureError(e);
  272. this.logger?.error(`Heartbeat request failed (${++consecutiveHeartbeatFailureCount} consecutive): ${err.message}`, existingHeartbeat.id);
  273. if (consecutiveHeartbeatFailureCount >=
  274. this.maxConsecutiveHeartbeatFailures) {
  275. this.logger?.error(`Heartbeat request failed ${consecutiveHeartbeatFailureCount} times, terminating subscriptions and heartbeat interval: ${err.message}`, existingHeartbeat.id);
  276. this.terminateSubscription(id, callbackUrl);
  277. }
  278. return;
  279. }
  280. finally {
  281. if (heartbeatRequest) {
  282. this.requestsInFlight.delete(heartbeatRequest);
  283. }
  284. existingHeartbeat?.queue.shift();
  285. resolveHeartbeatPromise();
  286. }
  287. }, heartbeatIntervalMs);
  288. const subscriptionInfo = this.subscriptionInfoByCallbackUrl.get(callbackUrl);
  289. subscriptionInfo.heartbeat = {
  290. interval: heartbeatInterval,
  291. id,
  292. verifier,
  293. queue: [],
  294. };
  295. }
  296. terminateSubscription(id, callbackUrl) {
  297. this.logger?.debug(`Terminating subscriptions for ID: ${id}`);
  298. const subscriptionInfo = this.subscriptionInfoByCallbackUrl.get(callbackUrl);
  299. if (!subscriptionInfo) {
  300. this.logger?.error(`No subscriptions found for ${callbackUrl}, skipping termination`);
  301. return;
  302. }
  303. const { subscription, heartbeat } = subscriptionInfo;
  304. if (subscription) {
  305. subscription.cancelled = true;
  306. subscription.asyncIter?.return();
  307. }
  308. if (heartbeat) {
  309. this.logger?.debug(`Terminating heartbeat interval for ${callbackUrl}`);
  310. clearInterval(heartbeat.interval);
  311. }
  312. this.subscriptionInfoByCallbackUrl.delete(callbackUrl);
  313. }
  314. startConsumingSubscription({ subscription, callbackUrl, id, verifier, }) {
  315. const self = this;
  316. const subscriptionObject = {
  317. asyncIter: subscription,
  318. cancelled: false,
  319. async startConsumingSubscription() {
  320. self.logger?.debug(`Listening to graphql-js subscription`, id);
  321. try {
  322. for await (const payload of subscription) {
  323. if (this.cancelled) {
  324. self.logger?.debug(`Subscription already cancelled, ignoring current and future payloads`, id);
  325. return;
  326. }
  327. try {
  328. await self.retryFetch({
  329. url: callbackUrl,
  330. action: 'next',
  331. id,
  332. verifier,
  333. payload,
  334. });
  335. }
  336. catch (e) {
  337. const originalError = ensureError(e);
  338. self.logger?.error(`\`next\` request failed, terminating subscription: ${originalError.message}`, id);
  339. self.terminateSubscription(id, callbackUrl);
  340. }
  341. }
  342. self.logger?.debug(`Subscription completed without errors`, id);
  343. await this.completeSubscription();
  344. }
  345. catch (e) {
  346. const error = ensureGraphQLError(e);
  347. self.logger?.error(`Generator threw an error, terminating subscription: ${error.message}`, id);
  348. this.completeSubscription([error]);
  349. }
  350. },
  351. async completeSubscription(errors) {
  352. if (this.cancelled)
  353. return;
  354. this.cancelled = true;
  355. try {
  356. await self.completeRequest({
  357. callbackUrl,
  358. id,
  359. verifier,
  360. ...(errors && { errors }),
  361. });
  362. }
  363. catch (e) {
  364. const error = ensureError(e);
  365. self.logger?.error(`\`complete\` request failed: ${error.message}`, id);
  366. }
  367. finally {
  368. self.terminateSubscription(id, callbackUrl);
  369. }
  370. },
  371. };
  372. subscriptionObject.startConsumingSubscription();
  373. const subscriptionInfo = this.subscriptionInfoByCallbackUrl.get(callbackUrl);
  374. if (!subscriptionInfo) {
  375. this.logger?.error(`No existing heartbeat found for ${callbackUrl}, skipping subscription`);
  376. }
  377. else {
  378. subscriptionInfo.subscription = subscriptionObject;
  379. }
  380. }
  381. async completeRequest({ errors, callbackUrl, id, verifier, }) {
  382. return this.retryFetch({
  383. url: callbackUrl,
  384. action: 'complete',
  385. id,
  386. verifier,
  387. errors,
  388. });
  389. }
  390. collectAllSubscriptions() {
  391. return Array.from(this.subscriptionInfoByCallbackUrl.values()).reduce((subscriptions, { subscription }) => {
  392. if (subscription) {
  393. subscriptions.push(subscription);
  394. }
  395. return subscriptions;
  396. }, []);
  397. }
  398. async cleanup() {
  399. await Promise.allSettled(Array.from(this.subscriptionInfoByCallbackUrl.values()).map(async ({ heartbeat }) => {
  400. clearInterval(heartbeat?.interval);
  401. await heartbeat?.queue[heartbeat.queue.length - 1];
  402. }));
  403. await Promise.allSettled(this.collectAllSubscriptions()
  404. .filter((s) => !s.cancelled)
  405. .map((s) => s.completeSubscription()));
  406. await Promise.allSettled(this.requestsInFlight.values());
  407. }
  408. }
  409. function prefixedLogger(logger, prefix) {
  410. function log(level) {
  411. return function (message, id) {
  412. logger[level](`${prefix}${id ? `[${id}]` : ''}: ${message}`);
  413. };
  414. }
  415. return {
  416. debug: log('debug'),
  417. error: log('error'),
  418. info: log('info'),
  419. warn: log('warn'),
  420. };
  421. }
  422. //# sourceMappingURL=index.js.map