Skip to content

Commit cd69998

Browse files
committed
Return underlying AsyncIterators when execute result is returned (#2843)
# Conflicts: # src/execution/execute.ts
1 parent 8859512 commit cd69998

File tree

2 files changed

+231
-3
lines changed

2 files changed

+231
-3
lines changed

src/execution/__tests__/stream-test.ts

+199
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { expect } from 'chai';
22
import { describe, it } from 'mocha';
33

4+
import { invariant } from '../../jsutils/invariant';
45
import { isAsyncIterable } from '../../jsutils/isAsyncIterable';
56
import { parse } from '../../language/parser';
67

@@ -74,6 +75,36 @@ const query = new GraphQLObjectType({
7475
yield await Promise.resolve({});
7576
},
7677
},
78+
asyncIterableListDelayed: {
79+
type: new GraphQLList(friendType),
80+
async *resolve() {
81+
for (const friend of friends) {
82+
// pause an additional ms before yielding to allow time
83+
// for tests to return or throw before next value is processed.
84+
// eslint-disable-next-line no-await-in-loop
85+
await new Promise((r) => setTimeout(r, 1));
86+
yield friend;
87+
}
88+
},
89+
},
90+
asyncIterableListNoReturn: {
91+
type: new GraphQLList(friendType),
92+
resolve() {
93+
let i = 0;
94+
return {
95+
[Symbol.asyncIterator]: () => ({
96+
async next() {
97+
const friend = friends[i++];
98+
if (friend) {
99+
await new Promise((r) => setTimeout(r, 1));
100+
return { value: friend, done: false };
101+
}
102+
return { value: undefined, done: true };
103+
},
104+
}),
105+
};
106+
},
107+
},
77108
asyncIterableListDelayedClose: {
78109
type: new GraphQLList(friendType),
79110
async *resolve() {
@@ -697,4 +728,172 @@ describe('Execute: stream directive', () => {
697728
},
698729
]);
699730
});
731+
it('Returns underlying async iterables when dispatcher is returned', async () => {
732+
const document = parse(`
733+
query {
734+
asyncIterableListDelayed @stream(initialCount: 1) {
735+
name
736+
id
737+
}
738+
}
739+
`);
740+
const schema = new GraphQLSchema({ query });
741+
742+
const executeResult = await execute({ schema, document, rootValue: {} });
743+
invariant(isAsyncIterable(executeResult));
744+
const iterator = executeResult[Symbol.asyncIterator]();
745+
746+
const result1 = await iterator.next();
747+
expect(result1).to.deep.equal({
748+
done: false,
749+
value: {
750+
data: {
751+
asyncIterableListDelayed: [
752+
{
753+
id: '1',
754+
name: 'Luke',
755+
},
756+
],
757+
},
758+
hasNext: true,
759+
},
760+
});
761+
762+
iterator.return?.();
763+
764+
// this result had started processing before return was called
765+
const result2 = await iterator.next();
766+
expect(result2).to.deep.equal({
767+
done: false,
768+
value: {
769+
data: {
770+
id: '2',
771+
name: 'Han',
772+
},
773+
hasNext: true,
774+
path: ['asyncIterableListDelayed', 1],
775+
},
776+
});
777+
778+
// third result is not returned because async iterator has returned
779+
const result3 = await iterator.next();
780+
expect(result3).to.deep.equal({
781+
done: false,
782+
value: {
783+
hasNext: false,
784+
},
785+
});
786+
});
787+
it('Can return async iterable when underlying iterable does not have a return method', async () => {
788+
const document = parse(`
789+
query {
790+
asyncIterableListNoReturn @stream(initialCount: 1) {
791+
name
792+
id
793+
}
794+
}
795+
`);
796+
const schema = new GraphQLSchema({ query });
797+
798+
const executeResult = await execute({ schema, document, rootValue: {} });
799+
invariant(isAsyncIterable(executeResult));
800+
const iterator = executeResult[Symbol.asyncIterator]();
801+
802+
const result1 = await iterator.next();
803+
expect(result1).to.deep.equal({
804+
done: false,
805+
value: {
806+
data: {
807+
asyncIterableListNoReturn: [
808+
{
809+
id: '1',
810+
name: 'Luke',
811+
},
812+
],
813+
},
814+
hasNext: true,
815+
},
816+
});
817+
818+
iterator.return?.();
819+
820+
// this result had started processing before return was called
821+
const result2 = await iterator.next();
822+
expect(result2).to.deep.equal({
823+
done: false,
824+
value: {
825+
data: {
826+
id: '2',
827+
name: 'Han',
828+
},
829+
hasNext: true,
830+
path: ['asyncIterableListNoReturn', 1],
831+
},
832+
});
833+
834+
// third result is not returned because async iterator has returned
835+
const result3 = await iterator.next();
836+
expect(result3).to.deep.equal({
837+
done: false,
838+
value: {
839+
hasNext: false,
840+
},
841+
});
842+
});
843+
it('Returns underlying async iterables when dispatcher is thrown', async () => {
844+
const document = parse(`
845+
query {
846+
asyncIterableListDelayed @stream(initialCount: 1) {
847+
name
848+
id
849+
}
850+
}
851+
`);
852+
const schema = new GraphQLSchema({ query });
853+
854+
const executeResult = await execute({ schema, document, rootValue: {} });
855+
invariant(isAsyncIterable(executeResult));
856+
const iterator = executeResult[Symbol.asyncIterator]();
857+
858+
const result1 = await iterator.next();
859+
expect(result1).to.deep.equal({
860+
done: false,
861+
value: {
862+
data: {
863+
asyncIterableListDelayed: [
864+
{
865+
id: '1',
866+
name: 'Luke',
867+
},
868+
],
869+
},
870+
hasNext: true,
871+
},
872+
});
873+
874+
iterator.throw?.(new Error('bad'));
875+
876+
// this result had started processing before return was called
877+
const result2 = await iterator.next();
878+
expect(result2).to.deep.equal({
879+
done: false,
880+
value: {
881+
data: {
882+
id: '2',
883+
name: 'Han',
884+
},
885+
hasNext: true,
886+
path: ['asyncIterableListDelayed', 1],
887+
},
888+
});
889+
890+
// third result is not returned because async iterator has returned
891+
const result3 = await iterator.next();
892+
expect(result3).to.deep.equal({
893+
done: false,
894+
value: {
895+
hasNext: false,
896+
},
897+
});
898+
});
700899
});

src/execution/execute.ts

+32-3
Original file line numberDiff line numberDiff line change
@@ -1620,10 +1620,14 @@ interface DispatcherResult {
16201620
export class Dispatcher {
16211621
_subsequentPayloads: Array<Promise<IteratorResult<DispatcherResult, void>>>;
16221622
_initialResult?: ExecutionResult;
1623+
_iterators: Array<AsyncIterator<unknown>>;
1624+
_isDone: boolean;
16231625
_hasReturnedInitialResult: boolean;
16241626

16251627
constructor() {
16261628
this._subsequentPayloads = [];
1629+
this._iterators = [];
1630+
this._isDone = false;
16271631
this._hasReturnedInitialResult = false;
16281632
}
16291633

@@ -1692,13 +1696,16 @@ export class Dispatcher {
16921696
label?: string,
16931697
): void {
16941698
const subsequentPayloads = this._subsequentPayloads;
1699+
const iterators = this._iterators;
1700+
iterators.push(iterator);
16951701
function next(index: number) {
16961702
const fieldPath = addPath(path, index, undefined);
16971703
const patchErrors: Array<GraphQLError> = [];
16981704
subsequentPayloads.push(
16991705
iterator.next().then(
17001706
({ value: data, done }) => {
17011707
if (done) {
1708+
iterators.splice(iterators.indexOf(iterator), 1);
17021709
return { value: undefined, done: true };
17031710
}
17041711

@@ -1769,6 +1776,14 @@ export class Dispatcher {
17691776
}
17701777

17711778
_race(): Promise<IteratorResult<ExecutionPatchResult, void>> {
1779+
if (this._isDone) {
1780+
return Promise.resolve({
1781+
value: {
1782+
hasNext: false,
1783+
},
1784+
done: false,
1785+
});
1786+
}
17721787
return new Promise<{
17731788
promise: Promise<IteratorResult<DispatcherResult, void>>;
17741789
}>((resolve) => {
@@ -1828,15 +1843,29 @@ export class Dispatcher {
18281843
return this._race();
18291844
}
18301845

1831-
get(
1832-
initialResult: ExecutionResult,
1833-
): AsyncIterableIterator<AsyncExecutionResult> {
1846+
async _return(): Promise<IteratorResult<AsyncExecutionResult, void>> {
1847+
await Promise.all(this._iterators.map((iterator) => iterator.return?.()));
1848+
this._isDone = true;
1849+
return { value: undefined, done: true };
1850+
}
1851+
1852+
async _throw(
1853+
error?: unknown,
1854+
): Promise<IteratorResult<AsyncExecutionResult, void>> {
1855+
await Promise.all(this._iterators.map((iterator) => iterator.return?.()));
1856+
this._isDone = true;
1857+
return Promise.reject(error);
1858+
}
1859+
1860+
get(initialResult: ExecutionResult): AsyncGenerator<AsyncExecutionResult> {
18341861
this._initialResult = initialResult;
18351862
return {
18361863
[Symbol.asyncIterator]() {
18371864
return this;
18381865
},
18391866
next: () => this._next(),
1867+
return: () => this._return(),
1868+
throw: (error?: unknown) => this._throw(error),
18401869
};
18411870
}
18421871
}

0 commit comments

Comments
 (0)