index.js 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429
  1. "use strict";
  2. var __importDefault = (this && this.__importDefault) || function (mod) {
  3. return (mod && mod.__esModule) ? mod : { "default": mod };
  4. };
  5. Object.defineProperty(exports, "__esModule", { value: true });
  6. exports.ApolloServerPluginSubscriptionCallback = void 0;
  7. const async_retry_1 = __importDefault(require("async-retry"));
  8. const graphql_1 = require("graphql");
  9. const node_fetch_1 = __importDefault(require("node-fetch"));
  10. const errorNormalize_js_1 = require("../../errorNormalize.js");
  11. const HeaderMap_js_1 = require("../../utils/HeaderMap.js");
  12. function ApolloServerPluginSubscriptionCallback(options = Object.create(null)) {
  13. const subscriptionManager = new SubscriptionManager(options);
  14. const logger = options.logger
  15. ? prefixedLogger(options.logger, 'SubscriptionCallback')
  16. : undefined;
  17. return {
  18. async requestDidStart({ request }) {
  19. const subscriptionExtension = request?.extensions?.subscription;
  20. if (!subscriptionExtension)
  21. return;
  22. let { callbackUrl, subscriptionId: id, verifier, heartbeatIntervalMs, } = subscriptionExtension;
  23. callbackUrl = callbackUrl || subscriptionExtension.callback_url;
  24. id = id || subscriptionExtension.subscription_id;
  25. heartbeatIntervalMs =
  26. heartbeatIntervalMs ??
  27. subscriptionExtension.heartbeat_interval_ms ??
  28. 5000;
  29. return {
  30. async responseForOperation() {
  31. logger?.debug('Received new subscription request', id);
  32. return {
  33. http: {
  34. status: 200,
  35. headers: new HeaderMap_js_1.HeaderMap([['content-type', 'application/json']]),
  36. },
  37. body: {
  38. kind: 'single',
  39. singleResult: {
  40. data: null,
  41. },
  42. },
  43. };
  44. },
  45. async willSendResponse({ request, schema, document, contextValue, operationName, response, }) {
  46. try {
  47. await subscriptionManager.checkRequest({
  48. callbackUrl,
  49. id,
  50. verifier,
  51. });
  52. }
  53. catch (e) {
  54. const graphqlError = (0, errorNormalize_js_1.ensureGraphQLError)(e);
  55. logger?.error(`\`check\` request failed: ${graphqlError.message}`, id);
  56. if (response.body.kind === 'single') {
  57. response.body.singleResult.errors = [graphqlError];
  58. response.http.status = 500;
  59. }
  60. return;
  61. }
  62. subscriptionManager.initHeartbeat({
  63. callbackUrl,
  64. id,
  65. verifier,
  66. heartbeatIntervalMs,
  67. });
  68. logger?.debug(`Starting graphql-js subscription`, id);
  69. let subscription;
  70. try {
  71. subscription = await (0, graphql_1.subscribe)({
  72. schema,
  73. document: document,
  74. variableValues: request.variables,
  75. contextValue: contextValue,
  76. operationName: operationName,
  77. });
  78. }
  79. catch (e) {
  80. const graphqlError = (0, errorNormalize_js_1.ensureGraphQLError)(e);
  81. logger?.error(`Programming error: graphql-js subscribe() threw unexpectedly! Please report this bug to Apollo. The error was: ${e}`, id);
  82. subscriptionManager.completeRequest({
  83. errors: [graphqlError],
  84. callbackUrl,
  85. id,
  86. verifier,
  87. });
  88. return;
  89. }
  90. if ('errors' in subscription) {
  91. logger?.error(`graphql-js subscription unsuccessful: [\n\t${subscription.errors
  92. ?.map((e) => e.message)
  93. .join(',\n\t')}\n]`, id);
  94. try {
  95. subscriptionManager.completeRequest({
  96. errors: subscription.errors,
  97. callbackUrl,
  98. id,
  99. verifier,
  100. });
  101. }
  102. catch (e) {
  103. logger?.error(`\`complete\` request failed: ${e}`, id);
  104. }
  105. }
  106. else if (isAsyncIterable(subscription)) {
  107. logger?.debug('graphql-js subscription successful', id);
  108. subscriptionManager.startConsumingSubscription({
  109. subscription,
  110. callbackUrl,
  111. id,
  112. verifier,
  113. });
  114. }
  115. logger?.debug(`Responding to original subscription request`, id);
  116. },
  117. };
  118. },
  119. async serverWillStart() {
  120. return {
  121. async drainServer() {
  122. logger?.debug('Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals');
  123. await subscriptionManager.cleanup();
  124. logger?.debug('Successfully cleaned up outstanding subscriptions and heartbeat intervals.');
  125. },
  126. };
  127. },
  128. };
  129. }
  130. exports.ApolloServerPluginSubscriptionCallback = ApolloServerPluginSubscriptionCallback;
  131. function isAsyncIterable(value) {
  132. return value && typeof value[Symbol.asyncIterator] === 'function';
  133. }
  134. class SubscriptionManager {
  135. constructor(options) {
  136. this.requestsInFlight = new Set();
  137. this.subscriptionInfoByCallbackUrl = new Map();
  138. this.maxConsecutiveHeartbeatFailures =
  139. options.maxConsecutiveHeartbeatFailures ?? 5;
  140. this.retryConfig = {
  141. retries: 5,
  142. minTimeout: 100,
  143. maxTimeout: 1000,
  144. ...options.retry,
  145. };
  146. this.logger = options.logger
  147. ? prefixedLogger(options.logger, 'SubscriptionManager')
  148. : undefined;
  149. }
  150. async retryFetch({ url, action, id, verifier, payload, errors, headers, }) {
  151. let response;
  152. try {
  153. const maybeWithErrors = errors?.length ? ` with errors` : '';
  154. this.logger?.debug(`Sending \`${action}\` request to router` + maybeWithErrors, id);
  155. return (0, async_retry_1.default)(async (bail) => {
  156. response = (0, node_fetch_1.default)(url, {
  157. method: 'POST',
  158. headers: {
  159. 'content-type': 'application/json',
  160. ...headers,
  161. },
  162. body: JSON.stringify({
  163. kind: 'subscription',
  164. action,
  165. id,
  166. verifier,
  167. ...(payload && { payload }),
  168. ...(errors?.length && { errors }),
  169. }),
  170. });
  171. this.requestsInFlight.add(response);
  172. const result = await response;
  173. if (!result.ok) {
  174. if (result.status >= 500) {
  175. throw new Error(`\`${action}\` request failed with unexpected status code: ${result.status}`);
  176. }
  177. else {
  178. if (result.status === 404) {
  179. this.logger?.debug(`\`${action}\` request received 404, terminating subscription`, id);
  180. }
  181. else {
  182. const errMsg = `\`${action}\` request failed with unexpected status code: ${result.status}, terminating subscription`;
  183. this.logger?.debug(errMsg, id);
  184. bail(new Error(errMsg));
  185. }
  186. this.terminateSubscription(id, url);
  187. return result;
  188. }
  189. }
  190. this.logger?.debug(`\`${action}\` request successful`, id);
  191. return result;
  192. }, {
  193. ...this.retryConfig,
  194. onRetry: (e, attempt) => {
  195. this.requestsInFlight.delete(response);
  196. this.logger?.warn(`Retrying \`${action}\` request (attempt ${attempt}) due to error: ${e.message}`, id);
  197. this.retryConfig?.onRetry?.(e, attempt);
  198. },
  199. });
  200. }
  201. finally {
  202. this.requestsInFlight.delete(response);
  203. }
  204. }
  205. async checkRequest({ callbackUrl, id, verifier, }) {
  206. return this.retryFetch({
  207. url: callbackUrl,
  208. action: 'check',
  209. id,
  210. verifier,
  211. headers: { 'subscription-protocol': 'callback/1.0' },
  212. });
  213. }
  214. initHeartbeat({ callbackUrl, id, verifier, heartbeatIntervalMs, }) {
  215. if (!this.subscriptionInfoByCallbackUrl.has(callbackUrl)) {
  216. this.subscriptionInfoByCallbackUrl.set(callbackUrl, {});
  217. }
  218. if (heartbeatIntervalMs === 0) {
  219. this.logger?.debug(`Heartbeat disabled for ${callbackUrl}`, id);
  220. return;
  221. }
  222. this.logger?.debug(`Starting new heartbeat interval for ${callbackUrl}`, id);
  223. let consecutiveHeartbeatFailureCount = 0;
  224. const heartbeatInterval = setInterval(async () => {
  225. let heartbeatRequest;
  226. let resolveHeartbeatPromise;
  227. const heartbeatPromise = new Promise((r) => {
  228. resolveHeartbeatPromise = r;
  229. });
  230. const existingSubscriptionInfo = this.subscriptionInfoByCallbackUrl.get(callbackUrl);
  231. if (!existingSubscriptionInfo?.heartbeat) {
  232. clearInterval(heartbeatInterval);
  233. this.logger?.error(`Programming error: Heartbeat interval unexpectedly missing for ${callbackUrl}. This is probably a bug in Apollo Server.`);
  234. return;
  235. }
  236. const existingHeartbeat = existingSubscriptionInfo.heartbeat;
  237. const { queue } = existingHeartbeat;
  238. queue.push(heartbeatPromise);
  239. if (queue.length > 1) {
  240. const requestBeforeMe = queue[existingHeartbeat?.queue.length - 2];
  241. await requestBeforeMe;
  242. }
  243. try {
  244. this.logger?.debug(`Sending \`check\` request to ${callbackUrl} for ID: ${id}`);
  245. heartbeatRequest = (0, node_fetch_1.default)(callbackUrl, {
  246. method: 'POST',
  247. body: JSON.stringify({
  248. kind: 'subscription',
  249. action: 'check',
  250. id,
  251. verifier,
  252. }),
  253. headers: {
  254. 'content-type': 'application/json',
  255. 'subscription-protocol': 'callback/1.0',
  256. },
  257. });
  258. this.requestsInFlight.add(heartbeatRequest);
  259. const result = await heartbeatRequest;
  260. this.logger?.debug(`Heartbeat received response for ID: ${id}`);
  261. if (result.ok) {
  262. this.logger?.debug(`Heartbeat request successful, ID: ${id}`);
  263. }
  264. else if (result.status === 400) {
  265. this.logger?.debug(`Heartbeat request received invalid ID: ${id}`);
  266. this.terminateSubscription(id, callbackUrl);
  267. }
  268. else if (result.status === 404) {
  269. this.logger?.debug(`Heartbeat request received invalid ID: ${id}`);
  270. this.terminateSubscription(id, callbackUrl);
  271. }
  272. else {
  273. throw new Error(`Unexpected status code: ${result.status}`);
  274. }
  275. consecutiveHeartbeatFailureCount = 0;
  276. }
  277. catch (e) {
  278. const err = (0, errorNormalize_js_1.ensureError)(e);
  279. this.logger?.error(`Heartbeat request failed (${++consecutiveHeartbeatFailureCount} consecutive): ${err.message}`, existingHeartbeat.id);
  280. if (consecutiveHeartbeatFailureCount >=
  281. this.maxConsecutiveHeartbeatFailures) {
  282. this.logger?.error(`Heartbeat request failed ${consecutiveHeartbeatFailureCount} times, terminating subscriptions and heartbeat interval: ${err.message}`, existingHeartbeat.id);
  283. this.terminateSubscription(id, callbackUrl);
  284. }
  285. return;
  286. }
  287. finally {
  288. if (heartbeatRequest) {
  289. this.requestsInFlight.delete(heartbeatRequest);
  290. }
  291. existingHeartbeat?.queue.shift();
  292. resolveHeartbeatPromise();
  293. }
  294. }, heartbeatIntervalMs);
  295. const subscriptionInfo = this.subscriptionInfoByCallbackUrl.get(callbackUrl);
  296. subscriptionInfo.heartbeat = {
  297. interval: heartbeatInterval,
  298. id,
  299. verifier,
  300. queue: [],
  301. };
  302. }
  303. terminateSubscription(id, callbackUrl) {
  304. this.logger?.debug(`Terminating subscriptions for ID: ${id}`);
  305. const subscriptionInfo = this.subscriptionInfoByCallbackUrl.get(callbackUrl);
  306. if (!subscriptionInfo) {
  307. this.logger?.error(`No subscriptions found for ${callbackUrl}, skipping termination`);
  308. return;
  309. }
  310. const { subscription, heartbeat } = subscriptionInfo;
  311. if (subscription) {
  312. subscription.cancelled = true;
  313. subscription.asyncIter?.return();
  314. }
  315. if (heartbeat) {
  316. this.logger?.debug(`Terminating heartbeat interval for ${callbackUrl}`);
  317. clearInterval(heartbeat.interval);
  318. }
  319. this.subscriptionInfoByCallbackUrl.delete(callbackUrl);
  320. }
  321. startConsumingSubscription({ subscription, callbackUrl, id, verifier, }) {
  322. const self = this;
  323. const subscriptionObject = {
  324. asyncIter: subscription,
  325. cancelled: false,
  326. async startConsumingSubscription() {
  327. self.logger?.debug(`Listening to graphql-js subscription`, id);
  328. try {
  329. for await (const payload of subscription) {
  330. if (this.cancelled) {
  331. self.logger?.debug(`Subscription already cancelled, ignoring current and future payloads`, id);
  332. return;
  333. }
  334. try {
  335. await self.retryFetch({
  336. url: callbackUrl,
  337. action: 'next',
  338. id,
  339. verifier,
  340. payload,
  341. });
  342. }
  343. catch (e) {
  344. const originalError = (0, errorNormalize_js_1.ensureError)(e);
  345. self.logger?.error(`\`next\` request failed, terminating subscription: ${originalError.message}`, id);
  346. self.terminateSubscription(id, callbackUrl);
  347. }
  348. }
  349. self.logger?.debug(`Subscription completed without errors`, id);
  350. await this.completeSubscription();
  351. }
  352. catch (e) {
  353. const error = (0, errorNormalize_js_1.ensureGraphQLError)(e);
  354. self.logger?.error(`Generator threw an error, terminating subscription: ${error.message}`, id);
  355. this.completeSubscription([error]);
  356. }
  357. },
  358. async completeSubscription(errors) {
  359. if (this.cancelled)
  360. return;
  361. this.cancelled = true;
  362. try {
  363. await self.completeRequest({
  364. callbackUrl,
  365. id,
  366. verifier,
  367. ...(errors && { errors }),
  368. });
  369. }
  370. catch (e) {
  371. const error = (0, errorNormalize_js_1.ensureError)(e);
  372. self.logger?.error(`\`complete\` request failed: ${error.message}`, id);
  373. }
  374. finally {
  375. self.terminateSubscription(id, callbackUrl);
  376. }
  377. },
  378. };
  379. subscriptionObject.startConsumingSubscription();
  380. const subscriptionInfo = this.subscriptionInfoByCallbackUrl.get(callbackUrl);
  381. if (!subscriptionInfo) {
  382. this.logger?.error(`No existing heartbeat found for ${callbackUrl}, skipping subscription`);
  383. }
  384. else {
  385. subscriptionInfo.subscription = subscriptionObject;
  386. }
  387. }
  388. async completeRequest({ errors, callbackUrl, id, verifier, }) {
  389. return this.retryFetch({
  390. url: callbackUrl,
  391. action: 'complete',
  392. id,
  393. verifier,
  394. errors,
  395. });
  396. }
  397. collectAllSubscriptions() {
  398. return Array.from(this.subscriptionInfoByCallbackUrl.values()).reduce((subscriptions, { subscription }) => {
  399. if (subscription) {
  400. subscriptions.push(subscription);
  401. }
  402. return subscriptions;
  403. }, []);
  404. }
  405. async cleanup() {
  406. await Promise.allSettled(Array.from(this.subscriptionInfoByCallbackUrl.values()).map(async ({ heartbeat }) => {
  407. clearInterval(heartbeat?.interval);
  408. await heartbeat?.queue[heartbeat.queue.length - 1];
  409. }));
  410. await Promise.allSettled(this.collectAllSubscriptions()
  411. .filter((s) => !s.cancelled)
  412. .map((s) => s.completeSubscription()));
  413. await Promise.allSettled(this.requestsInFlight.values());
  414. }
  415. }
  416. function prefixedLogger(logger, prefix) {
  417. function log(level) {
  418. return function (message, id) {
  419. logger[level](`${prefix}${id ? `[${id}]` : ''}: ${message}`);
  420. };
  421. }
  422. return {
  423. debug: log('debug'),
  424. error: log('error'),
  425. info: log('info'),
  426. warn: log('warn'),
  427. };
  428. }
  429. //# sourceMappingURL=index.js.map