1
1
import { inspect } from '../jsutils/inspect' ;
2
2
import { isAsyncIterable } from '../jsutils/isAsyncIterable' ;
3
+ import { isPromise } from '../jsutils/isPromise' ;
3
4
import type { Maybe } from '../jsutils/Maybe' ;
4
5
import { addPath , pathToArray } from '../jsutils/Path' ;
6
+ import type { PromiseOrValue } from '../jsutils/PromiseOrValue' ;
5
7
6
8
import { GraphQLError } from '../error/GraphQLError' ;
7
9
import { locatedError } from '../error/locatedError' ;
@@ -47,9 +49,11 @@ import { getArgumentValues } from './values';
47
49
*
48
50
* Accepts either an object with named arguments, or individual arguments.
49
51
*/
50
- export async function subscribe (
52
+ export function subscribe (
51
53
args : ExecutionArgs ,
52
- ) : Promise < AsyncGenerator < ExecutionResult , void , void > | ExecutionResult > {
54
+ ) : PromiseOrValue <
55
+ AsyncGenerator < ExecutionResult , void , void > | ExecutionResult
56
+ > {
53
57
const {
54
58
schema,
55
59
document,
@@ -61,7 +65,7 @@ export async function subscribe(
61
65
subscribeFieldResolver,
62
66
} = args ;
63
67
64
- const resultOrStream = await createSourceEventStream (
68
+ const resultOrStream = createSourceEventStream (
65
69
schema ,
66
70
document ,
67
71
rootValue ,
@@ -71,6 +75,42 @@ export async function subscribe(
71
75
subscribeFieldResolver ,
72
76
) ;
73
77
78
+ if ( isPromise ( resultOrStream ) ) {
79
+ return resultOrStream . then ( ( resolvedResultOrStream ) =>
80
+ mapSourceToResponse (
81
+ schema ,
82
+ document ,
83
+ resolvedResultOrStream ,
84
+ contextValue ,
85
+ variableValues ,
86
+ operationName ,
87
+ fieldResolver ,
88
+ ) ,
89
+ ) ;
90
+ }
91
+
92
+ return mapSourceToResponse (
93
+ schema ,
94
+ document ,
95
+ resultOrStream ,
96
+ contextValue ,
97
+ variableValues ,
98
+ operationName ,
99
+ fieldResolver ,
100
+ ) ;
101
+ }
102
+
103
+ function mapSourceToResponse (
104
+ schema : GraphQLSchema ,
105
+ document : DocumentNode ,
106
+ resultOrStream : ExecutionResult | AsyncIterable < unknown > ,
107
+ contextValue ?: unknown ,
108
+ variableValues ?: Maybe < { readonly [ variable : string ] : unknown } > ,
109
+ operationName ?: Maybe < string > ,
110
+ fieldResolver ?: Maybe < GraphQLFieldResolver < any , any > > ,
111
+ ) : PromiseOrValue <
112
+ AsyncGenerator < ExecutionResult , void , void > | ExecutionResult
113
+ > {
74
114
if ( ! isAsyncIterable ( resultOrStream ) ) {
75
115
return resultOrStream ;
76
116
}
@@ -81,7 +121,7 @@ export async function subscribe(
81
121
// the GraphQL specification. The `execute` function provides the
82
122
// "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
83
123
// "ExecuteQuery" algorithm, for which `execute` is also used.
84
- const mapSourceToResponse = ( payload : unknown ) =>
124
+ return mapAsyncIterator ( resultOrStream , ( payload : unknown ) =>
85
125
execute ( {
86
126
schema,
87
127
document,
@@ -90,10 +130,8 @@ export async function subscribe(
90
130
variableValues,
91
131
operationName,
92
132
fieldResolver,
93
- } ) ;
94
-
95
- // Map every source value to a ExecutionResult value as described above.
96
- return mapAsyncIterator ( resultOrStream , mapSourceToResponse ) ;
133
+ } ) ,
134
+ ) ;
97
135
}
98
136
99
137
/**
@@ -124,15 +162,15 @@ export async function subscribe(
124
162
* or otherwise separating these two steps. For more on this, see the
125
163
* "Supporting Subscriptions at Scale" information in the GraphQL specification.
126
164
*/
127
- export async function createSourceEventStream (
165
+ export function createSourceEventStream (
128
166
schema : GraphQLSchema ,
129
167
document : DocumentNode ,
130
168
rootValue ?: unknown ,
131
169
contextValue ?: unknown ,
132
170
variableValues ?: Maybe < { readonly [ variable : string ] : unknown } > ,
133
171
operationName ?: Maybe < string > ,
134
172
subscribeFieldResolver ?: Maybe < GraphQLFieldResolver < any , any > > ,
135
- ) : Promise < AsyncIterable < unknown > | ExecutionResult > {
173
+ ) : PromiseOrValue < AsyncIterable < unknown > | ExecutionResult > {
136
174
// If arguments are missing or incorrectly typed, this is an internal
137
175
// developer mistake which should throw an early error.
138
176
assertValidExecutionArguments ( schema , document , variableValues ) ;
@@ -155,17 +193,22 @@ export async function createSourceEventStream(
155
193
}
156
194
157
195
try {
158
- const eventStream = await executeSubscription ( exeContext ) ;
159
-
160
- // Assert field returned an event stream, otherwise yield an error.
161
- if ( ! isAsyncIterable ( eventStream ) ) {
162
- throw new Error (
163
- 'Subscription field must return Async Iterable. ' +
164
- `Received: ${ inspect ( eventStream ) } .` ,
165
- ) ;
196
+ const eventStream = executeSubscription ( exeContext ) ;
197
+
198
+ if ( isPromise ( eventStream ) ) {
199
+ return eventStream
200
+ . then ( ( resolvedEventStream ) => ensureAsyncIterable ( resolvedEventStream ) )
201
+ . then ( undefined , ( error ) => {
202
+ // If it GraphQLError, report it as an ExecutionResult, containing only errors and no data.
203
+ // Otherwise treat the error as a system-class error and re-throw it.
204
+ if ( error instanceof GraphQLError ) {
205
+ return { errors : [ error ] } ;
206
+ }
207
+ throw error ;
208
+ } ) ;
166
209
}
167
210
168
- return eventStream ;
211
+ return ensureAsyncIterable ( eventStream ) ;
169
212
} catch ( error ) {
170
213
// If it GraphQLError, report it as an ExecutionResult, containing only errors and no data.
171
214
// Otherwise treat the error as a system-class error and re-throw it.
@@ -176,9 +219,19 @@ export async function createSourceEventStream(
176
219
}
177
220
}
178
221
179
- async function executeSubscription (
180
- exeContext : ExecutionContext ,
181
- ) : Promise < unknown > {
222
+ function ensureAsyncIterable ( eventStream : unknown ) : AsyncIterable < unknown > {
223
+ // Assert field returned an event stream, otherwise yield an error.
224
+ if ( ! isAsyncIterable ( eventStream ) ) {
225
+ throw new Error (
226
+ 'Subscription field must return Async Iterable. ' +
227
+ `Received: ${ inspect ( eventStream ) } .` ,
228
+ ) ;
229
+ }
230
+
231
+ return eventStream ;
232
+ }
233
+
234
+ function executeSubscription ( exeContext : ExecutionContext ) : unknown {
182
235
const { schema, fragments, operation, variableValues, rootValue } =
183
236
exeContext ;
184
237
@@ -233,13 +286,26 @@ async function executeSubscription(
233
286
// Call the `subscribe()` resolver or the default resolver to produce an
234
287
// AsyncIterable yielding raw payloads.
235
288
const resolveFn = fieldDef . subscribe ?? exeContext . subscribeFieldResolver ;
236
- const eventStream = await resolveFn ( rootValue , args , contextValue , info ) ;
237
289
238
- if ( eventStream instanceof Error ) {
239
- throw eventStream ;
290
+ const eventStream = resolveFn ( rootValue , args , contextValue , info ) ;
291
+
292
+ if ( isPromise ( eventStream ) ) {
293
+ return eventStream
294
+ . then ( ( resolvedEventStream ) => throwReturnedError ( resolvedEventStream ) )
295
+ . then ( undefined , ( error ) => {
296
+ throw locatedError ( error , fieldNodes , pathToArray ( path ) ) ;
297
+ } ) ;
240
298
}
241
- return eventStream ;
299
+
300
+ return throwReturnedError ( eventStream ) ;
242
301
} catch ( error ) {
243
302
throw locatedError ( error , fieldNodes , pathToArray ( path ) ) ;
244
303
}
245
304
}
305
+
306
+ function throwReturnedError ( eventStream : unknown ) : unknown {
307
+ if ( eventStream instanceof Error ) {
308
+ throw eventStream ;
309
+ }
310
+ return eventStream ;
311
+ }
0 commit comments