subscribe.js 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. 'use strict';
  2. Object.defineProperty(exports, '__esModule', {
  3. value: true,
  4. });
  5. exports.createSourceEventStream = createSourceEventStream;
  6. exports.subscribe = subscribe;
  7. var _devAssert = require('../jsutils/devAssert.js');
  8. var _inspect = require('../jsutils/inspect.js');
  9. var _isAsyncIterable = require('../jsutils/isAsyncIterable.js');
  10. var _Path = require('../jsutils/Path.js');
  11. var _GraphQLError = require('../error/GraphQLError.js');
  12. var _locatedError = require('../error/locatedError.js');
  13. var _collectFields = require('./collectFields.js');
  14. var _execute = require('./execute.js');
  15. var _mapAsyncIterator = require('./mapAsyncIterator.js');
  16. var _values = require('./values.js');
  17. /**
  18. * Implements the "Subscribe" algorithm described in the GraphQL specification.
  19. *
  20. * Returns a Promise which resolves to either an AsyncIterator (if successful)
  21. * or an ExecutionResult (error). The promise will be rejected if the schema or
  22. * other arguments to this function are invalid, or if the resolved event stream
  23. * is not an async iterable.
  24. *
  25. * If the client-provided arguments to this function do not result in a
  26. * compliant subscription, a GraphQL Response (ExecutionResult) with
  27. * descriptive errors and no data will be returned.
  28. *
  29. * If the source stream could not be created due to faulty subscription
  30. * resolver logic or underlying systems, the promise will resolve to a single
  31. * ExecutionResult containing `errors` and no `data`.
  32. *
  33. * If the operation succeeded, the promise resolves to an AsyncIterator, which
  34. * yields a stream of ExecutionResults representing the response stream.
  35. *
  36. * Accepts either an object with named arguments, or individual arguments.
  37. */
  38. async function subscribe(args) {
  39. // Temporary for v15 to v16 migration. Remove in v17
  40. arguments.length < 2 ||
  41. (0, _devAssert.devAssert)(
  42. false,
  43. 'graphql@16 dropped long-deprecated support for positional arguments, please pass an object instead.',
  44. );
  45. const resultOrStream = await createSourceEventStream(args);
  46. if (!(0, _isAsyncIterable.isAsyncIterable)(resultOrStream)) {
  47. return resultOrStream;
  48. } // For each payload yielded from a subscription, map it over the normal
  49. // GraphQL `execute` function, with `payload` as the rootValue.
  50. // This implements the "MapSourceToResponseEvent" algorithm described in
  51. // the GraphQL specification. The `execute` function provides the
  52. // "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
  53. // "ExecuteQuery" algorithm, for which `execute` is also used.
  54. const mapSourceToResponse = (payload) =>
  55. (0, _execute.execute)({ ...args, rootValue: payload }); // Map every source value to a ExecutionResult value as described above.
  56. return (0, _mapAsyncIterator.mapAsyncIterator)(
  57. resultOrStream,
  58. mapSourceToResponse,
  59. );
  60. }
  61. function toNormalizedArgs(args) {
  62. const firstArg = args[0];
  63. if (firstArg && 'document' in firstArg) {
  64. return firstArg;
  65. }
  66. return {
  67. schema: firstArg,
  68. // FIXME: when underlying TS bug fixed, see https://github.com/microsoft/TypeScript/issues/31613
  69. document: args[1],
  70. rootValue: args[2],
  71. contextValue: args[3],
  72. variableValues: args[4],
  73. operationName: args[5],
  74. subscribeFieldResolver: args[6],
  75. };
  76. }
  77. /**
  78. * Implements the "CreateSourceEventStream" algorithm described in the
  79. * GraphQL specification, resolving the subscription source event stream.
  80. *
  81. * Returns a Promise which resolves to either an AsyncIterable (if successful)
  82. * or an ExecutionResult (error). The promise will be rejected if the schema or
  83. * other arguments to this function are invalid, or if the resolved event stream
  84. * is not an async iterable.
  85. *
  86. * If the client-provided arguments to this function do not result in a
  87. * compliant subscription, a GraphQL Response (ExecutionResult) with
  88. * descriptive errors and no data will be returned.
  89. *
  90. * If the the source stream could not be created due to faulty subscription
  91. * resolver logic or underlying systems, the promise will resolve to a single
  92. * ExecutionResult containing `errors` and no `data`.
  93. *
  94. * If the operation succeeded, the promise resolves to the AsyncIterable for the
  95. * event stream returned by the resolver.
  96. *
  97. * A Source Event Stream represents a sequence of events, each of which triggers
  98. * a GraphQL execution for that event.
  99. *
  100. * This may be useful when hosting the stateful subscription service in a
  101. * different process or machine than the stateless GraphQL execution engine,
  102. * or otherwise separating these two steps. For more on this, see the
  103. * "Supporting Subscriptions at Scale" information in the GraphQL specification.
  104. */
  105. async function createSourceEventStream(...rawArgs) {
  106. const args = toNormalizedArgs(rawArgs);
  107. const { schema, document, variableValues } = args; // If arguments are missing or incorrectly typed, this is an internal
  108. // developer mistake which should throw an early error.
  109. (0, _execute.assertValidExecutionArguments)(schema, document, variableValues); // If a valid execution context cannot be created due to incorrect arguments,
  110. // a "Response" with only errors is returned.
  111. const exeContext = (0, _execute.buildExecutionContext)(args); // Return early errors if execution context failed.
  112. if (!('schema' in exeContext)) {
  113. return {
  114. errors: exeContext,
  115. };
  116. }
  117. try {
  118. const eventStream = await executeSubscription(exeContext); // Assert field returned an event stream, otherwise yield an error.
  119. if (!(0, _isAsyncIterable.isAsyncIterable)(eventStream)) {
  120. throw new Error(
  121. 'Subscription field must return Async Iterable. ' +
  122. `Received: ${(0, _inspect.inspect)(eventStream)}.`,
  123. );
  124. }
  125. return eventStream;
  126. } catch (error) {
  127. // If it GraphQLError, report it as an ExecutionResult, containing only errors and no data.
  128. // Otherwise treat the error as a system-class error and re-throw it.
  129. if (error instanceof _GraphQLError.GraphQLError) {
  130. return {
  131. errors: [error],
  132. };
  133. }
  134. throw error;
  135. }
  136. }
  137. async function executeSubscription(exeContext) {
  138. const { schema, fragments, operation, variableValues, rootValue } =
  139. exeContext;
  140. const rootType = schema.getSubscriptionType();
  141. if (rootType == null) {
  142. throw new _GraphQLError.GraphQLError(
  143. 'Schema is not configured to execute subscription operation.',
  144. {
  145. nodes: operation,
  146. },
  147. );
  148. }
  149. const rootFields = (0, _collectFields.collectFields)(
  150. schema,
  151. fragments,
  152. variableValues,
  153. rootType,
  154. operation.selectionSet,
  155. );
  156. const [responseName, fieldNodes] = [...rootFields.entries()][0];
  157. const fieldDef = (0, _execute.getFieldDef)(schema, rootType, fieldNodes[0]);
  158. if (!fieldDef) {
  159. const fieldName = fieldNodes[0].name.value;
  160. throw new _GraphQLError.GraphQLError(
  161. `The subscription field "${fieldName}" is not defined.`,
  162. {
  163. nodes: fieldNodes,
  164. },
  165. );
  166. }
  167. const path = (0, _Path.addPath)(undefined, responseName, rootType.name);
  168. const info = (0, _execute.buildResolveInfo)(
  169. exeContext,
  170. fieldDef,
  171. fieldNodes,
  172. rootType,
  173. path,
  174. );
  175. try {
  176. var _fieldDef$subscribe;
  177. // Implements the "ResolveFieldEventStream" algorithm from GraphQL specification.
  178. // It differs from "ResolveFieldValue" due to providing a different `resolveFn`.
  179. // Build a JS object of arguments from the field.arguments AST, using the
  180. // variables scope to fulfill any variable references.
  181. const args = (0, _values.getArgumentValues)(
  182. fieldDef,
  183. fieldNodes[0],
  184. variableValues,
  185. ); // The resolve function's optional third argument is a context value that
  186. // is provided to every resolve function within an execution. It is commonly
  187. // used to represent an authenticated user, or request-specific caches.
  188. const contextValue = exeContext.contextValue; // Call the `subscribe()` resolver or the default resolver to produce an
  189. // AsyncIterable yielding raw payloads.
  190. const resolveFn =
  191. (_fieldDef$subscribe = fieldDef.subscribe) !== null &&
  192. _fieldDef$subscribe !== void 0
  193. ? _fieldDef$subscribe
  194. : exeContext.subscribeFieldResolver;
  195. const eventStream = await resolveFn(rootValue, args, contextValue, info);
  196. if (eventStream instanceof Error) {
  197. throw eventStream;
  198. }
  199. return eventStream;
  200. } catch (error) {
  201. throw (0, _locatedError.locatedError)(
  202. error,
  203. fieldNodes,
  204. (0, _Path.pathToArray)(path),
  205. );
  206. }
  207. }