subscribe.mjs 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. import { devAssert } from '../jsutils/devAssert.mjs';
  2. import { inspect } from '../jsutils/inspect.mjs';
  3. import { isAsyncIterable } from '../jsutils/isAsyncIterable.mjs';
  4. import { addPath, pathToArray } from '../jsutils/Path.mjs';
  5. import { GraphQLError } from '../error/GraphQLError.mjs';
  6. import { locatedError } from '../error/locatedError.mjs';
  7. import { collectFields } from './collectFields.mjs';
  8. import {
  9. assertValidExecutionArguments,
  10. buildExecutionContext,
  11. buildResolveInfo,
  12. execute,
  13. getFieldDef,
  14. } from './execute.mjs';
  15. import { mapAsyncIterator } from './mapAsyncIterator.mjs';
  16. import { getArgumentValues } from './values.mjs';
  /**
   * Implements the "Subscribe" algorithm described in the GraphQL specification.
   *
   * Returns a Promise which resolves to either an AsyncIterator (if successful)
   * or an ExecutionResult (error). The promise will be rejected if the schema or
   * other arguments to this function are invalid, or if the resolved event stream
   * is not an async iterable.
   *
   * If the client-provided arguments to this function do not result in a
   * compliant subscription, a GraphQL Response (ExecutionResult) with
   * descriptive errors and no data will be returned.
   *
   * If the source stream could not be created due to faulty subscription
   * resolver logic or underlying systems, the promise will resolve to a single
   * ExecutionResult containing `errors` and no `data`.
   *
   * If the operation succeeded, the promise resolves to an AsyncIterator, which
   * yields a stream of ExecutionResults representing the response stream.
   *
   * Accepts a single object of named arguments. Calling with two or more
   * positional arguments trips the v15 -> v16 migration assertion below.
   */
  export async function subscribe(args) {
    // Temporary for v15 to v16 migration. Remove in v17
    if (arguments.length >= 2) {
      devAssert(
        false,
        'graphql@16 dropped long-deprecated support for positional arguments, please pass an object instead.',
      );
    }

    const sourceOrResult = await createSourceEventStream(args);

    if (isAsyncIterable(sourceOrResult)) {
      // For each payload yielded from a subscription, map it over the normal
      // GraphQL `execute` function, with `payload` as the rootValue.
      // This implements the "MapSourceToResponseEvent" algorithm described in
      // the GraphQL specification. The `execute` function provides the
      // "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
      // "ExecuteQuery" algorithm, for which `execute` is also used.
      return mapAsyncIterator(sourceOrResult, (payload) =>
        execute({ ...args, rootValue: payload }),
      );
    }

    // Not a stream: hand back whatever createSourceEventStream produced
    // (an object carrying `errors`) without touching it.
    return sourceOrResult;
  }
  /**
   * Normalizes the raw argument list of `createSourceEventStream` into a single
   * named-arguments object.
   *
   * If the first raw argument is an object (or function) that has a `document`
   * property, it is treated as the named-arguments object and returned as-is
   * (same reference, no copy). Otherwise the raw arguments are interpreted
   * positionally, in the order: schema, document, rootValue, contextValue,
   * variableValues, operationName, subscribeFieldResolver.
   *
   * @param {ArrayLike<unknown>} args - The raw arguments as received by the caller.
   * @returns {object} The named-arguments object.
   */
  function toNormalizedArgs(args) {
    const firstArg = args[0];
    // The `in` operator throws a TypeError when its right-hand side is a
    // primitive, so only probe values that can actually carry properties.
    // A truthy primitive (e.g. a string passed where a schema was expected)
    // falls through to the positional form, letting later argument validation
    // report the real problem instead of an opaque TypeError from here.
    const canHaveProperties =
      (typeof firstArg === 'object' && firstArg !== null) ||
      typeof firstArg === 'function';
    if (canHaveProperties && 'document' in firstArg) {
      return firstArg;
    }
    return {
      schema: firstArg,
      // FIXME: when underlying TS bug fixed, see https://github.com/microsoft/TypeScript/issues/31613
      document: args[1],
      rootValue: args[2],
      contextValue: args[3],
      variableValues: args[4],
      operationName: args[5],
      subscribeFieldResolver: args[6],
    };
  }
  /**
   * Implements the "CreateSourceEventStream" algorithm described in the
   * GraphQL specification, resolving the subscription source event stream.
   *
   * Returns a Promise which resolves to either an AsyncIterable (if successful)
   * or an ExecutionResult (error). The promise will be rejected if the schema or
   * other arguments to this function are invalid, or if the resolved event stream
   * is not an async iterable.
   *
   * If the client-provided arguments to this function do not result in a
   * compliant subscription, a GraphQL Response (ExecutionResult) with
   * descriptive errors and no data will be returned.
   *
   * If the source stream could not be created due to faulty subscription
   * resolver logic or underlying systems, the promise will resolve to a single
   * ExecutionResult containing `errors` and no `data`.
   *
   * If the operation succeeded, the promise resolves to the AsyncIterable for the
   * event stream returned by the resolver.
   *
   * A Source Event Stream represents a sequence of events, each of which triggers
   * a GraphQL execution for that event.
   *
   * This may be useful when hosting the stateful subscription service in a
   * different process or machine than the stateless GraphQL execution engine,
   * or otherwise separating these two steps. For more on this, see the
   * "Supporting Subscriptions at Scale" information in the GraphQL specification.
   *
   * The raw arguments are normalized by `toNormalizedArgs`, so both a single
   * named-arguments object and a positional argument list are understood here.
   */
  export async function createSourceEventStream(...rawArgs) {
    const args = toNormalizedArgs(rawArgs);

    // If arguments are missing or incorrectly typed, this is an internal
    // developer mistake which should throw an early error.
    const { schema, document, variableValues } = args;
    assertValidExecutionArguments(schema, document, variableValues);

    // If a valid execution context cannot be created due to incorrect arguments,
    // a "Response" with only errors is returned.
    const exeContext = buildExecutionContext(args);
    const contextWasBuilt = 'schema' in exeContext;
    if (!contextWasBuilt) {
      // Without a `schema` key, the value is reported as the list of errors.
      return { errors: exeContext };
    }

    try {
      const eventStream = await executeSubscription(exeContext);

      if (isAsyncIterable(eventStream)) {
        return eventStream;
      }

      // The field did not produce an event stream. This is a plain Error, so
      // the handler below re-throws it rather than reporting it as a result.
      throw new Error(
        'Subscription field must return Async Iterable. ' +
          `Received: ${inspect(eventStream)}.`,
      );
    } catch (error) {
      // Anything that is not a GraphQLError is treated as a system-class error
      // and propagated to the caller as a rejection.
      if (!(error instanceof GraphQLError)) {
        throw error;
      }

      // A GraphQLError is reported as an ExecutionResult, containing only
      // errors and no data.
      return { errors: [error] };
    }
  }
  /**
   * Resolves the event stream for the root field of a subscription operation.
   *
   * Looks up the schema's subscription root type, collects the operation's root
   * fields, and takes the first collected entry as the subscription field. The
   * field's own `subscribe` function is called when it is set; otherwise the
   * execution context's `subscribeFieldResolver` is used.
   *
   * Throws a GraphQLError when the schema has no subscription type or when the
   * field is not defined on it. Anything thrown while building the arguments or
   * running the resolver, and any Error instance the resolver resolves to, is
   * passed through `locatedError` with the field nodes and response path before
   * being thrown.
   *
   * NOTE(review): an empty set of collected root fields would fail on the
   * destructuring below - presumably validation prevents that; confirm.
   */
  async function executeSubscription(exeContext) {
    const { schema, fragments, operation, variableValues, rootValue } =
      exeContext;

    const subscriptionType = schema.getSubscriptionType();
    if (subscriptionType == null) {
      throw new GraphQLError(
        'Schema is not configured to execute subscription operation.',
        { nodes: operation },
      );
    }

    const collectedRootFields = collectFields(
      schema,
      fragments,
      variableValues,
      subscriptionType,
      operation.selectionSet,
    );
    // Only the first collected root field is used as the subscription field.
    const rootEntries = [...collectedRootFields.entries()];
    const [responseName, fieldNodes] = rootEntries[0];
    const firstFieldNode = fieldNodes[0];

    const fieldDef = getFieldDef(schema, subscriptionType, firstFieldNode);
    if (!fieldDef) {
      const fieldName = firstFieldNode.name.value;
      throw new GraphQLError(
        `The subscription field "${fieldName}" is not defined.`,
        { nodes: fieldNodes },
      );
    }

    const path = addPath(undefined, responseName, subscriptionType.name);
    const info = buildResolveInfo(
      exeContext,
      fieldDef,
      fieldNodes,
      subscriptionType,
      path,
    );

    try {
      // Implements the "ResolveFieldEventStream" algorithm from GraphQL specification.
      // It differs from "ResolveFieldValue" due to providing a different `resolveFn`.

      // Build a JS object of arguments from the field.arguments AST, using the
      // variables scope to fulfill any variable references.
      const args = getArgumentValues(fieldDef, firstFieldNode, variableValues);

      // The resolve function's optional third argument is a context value that
      // is provided to every resolve function within an execution. It is commonly
      // used to represent an authenticated user, or request-specific caches.
      const contextValue = exeContext.contextValue;

      // Call the `subscribe()` resolver or the default resolver to produce an
      // AsyncIterable yielding raw payloads.
      const fieldSubscribe = fieldDef.subscribe;
      const resolveFn =
        fieldSubscribe === null || fieldSubscribe === undefined
          ? exeContext.subscribeFieldResolver
          : fieldSubscribe;

      const eventStream = await resolveFn(rootValue, args, contextValue, info);

      // A resolver that resolves to an Error instance is treated the same as
      // one that threw it.
      if (eventStream instanceof Error) {
        throw eventStream;
      }
      return eventStream;
    } catch (error) {
      throw locatedError(error, fieldNodes, pathToArray(path));
    }
  }
  196. }