Skip to content

Commit 82fe341

Browse files
committed
add pending changes from graphql-js
1 parent 6aa94b7 commit 82fe341

3 files changed

Lines changed: 89 additions & 110 deletions

File tree

packages/executor/src/execution/IncrementalGraph.ts

Lines changed: 29 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,7 @@ export class IncrementalGraph {
2222
private _rootNodes: Set<SubsequentResultRecord>;
2323

2424
private _completedQueue: Array<IncrementalDataRecordResult>;
25-
private _nextQueue: Array<
26-
(iterable: IteratorResult<Iterable<IncrementalDataRecordResult>>) => void
27-
>;
25+
private _nextQueue: Array<(iterable: Iterable<IncrementalDataRecordResult> | undefined) => void>;
2826

2927
constructor() {
3028
this._rootNodes = new Set();
@@ -60,46 +58,30 @@ export class IncrementalGraph {
6058
}
6159
}
6260

63-
currentCompletedIncrementalData() {
64-
return {
65-
[Symbol.iterator]() {
66-
return this;
67-
},
68-
next: (): IteratorResult<IncrementalDataRecordResult> => {
69-
const value = this._completedQueue.shift();
70-
if (value !== undefined) {
71-
return { value, done: false };
72-
}
73-
return { value: undefined, done: true };
74-
},
75-
};
61+
*currentCompletedBatch(): Generator<IncrementalDataRecordResult> {
62+
let completed;
63+
while ((completed = this._completedQueue.shift()) !== undefined) {
64+
yield completed;
65+
}
66+
if (this._rootNodes.size === 0) {
67+
for (const resolve of this._nextQueue) {
68+
resolve(undefined);
69+
}
70+
}
7671
}
7772

78-
completedIncrementalData() {
79-
return {
80-
[Symbol.asyncIterator]() {
81-
return this;
82-
},
83-
next: (): Promise<IteratorResult<Iterable<IncrementalDataRecordResult>>> => {
84-
const firstResult = this._completedQueue.shift();
85-
if (firstResult !== undefined) {
86-
return Promise.resolve({
87-
value: this._yieldCurrentCompletedIncrementalData(firstResult),
88-
done: false,
89-
});
90-
}
91-
const { promise, resolve } =
92-
createDeferred<IteratorResult<Iterable<IncrementalDataRecordResult>>>();
93-
this._nextQueue.push(resolve);
94-
return promise;
95-
},
96-
return: (): Promise<IteratorResult<Iterable<IncrementalDataRecordResult>>> => {
97-
for (const resolve of this._nextQueue) {
98-
resolve({ value: undefined, done: true });
99-
}
100-
return Promise.resolve({ value: undefined, done: true });
101-
},
102-
};
73+
nextCompletedBatch(): Promise<Iterable<IncrementalDataRecordResult> | undefined> {
74+
const { promise, resolve } = createDeferred<
75+
Iterable<IncrementalDataRecordResult> | undefined
76+
>();
77+
this._nextQueue.push(resolve);
78+
return promise;
79+
}
80+
81+
abort(): void {
82+
for (const resolve of this._nextQueue) {
83+
resolve(undefined);
84+
}
10385
}
10486

10587
hasNext(): boolean {
@@ -144,11 +126,6 @@ export class IncrementalGraph {
144126

145127
private _removePending(subsequentResultRecord: SubsequentResultRecord): void {
146128
this._rootNodes.delete(subsequentResultRecord);
147-
if (this._rootNodes.size === 0) {
148-
for (const resolve of this._nextQueue) {
149-
resolve({ value: undefined, done: true });
150-
}
151-
}
152129
}
153130

154131
private _addIncrementalDataRecords(
@@ -312,15 +289,17 @@ export class IncrementalGraph {
312289
while ((completed = this._completedQueue.shift()) !== undefined) {
313290
yield completed;
314291
}
292+
if (this._rootNodes.size === 0) {
293+
for (const resolve of this._nextQueue) {
294+
resolve(undefined);
295+
}
296+
}
315297
}
316298

317299
private _enqueue(completed: IncrementalDataRecordResult): void {
318300
const next = this._nextQueue.shift();
319301
if (next !== undefined) {
320-
next({
321-
value: this._yieldCurrentCompletedIncrementalData(completed),
322-
done: false,
323-
});
302+
next(this._yieldCurrentCompletedIncrementalData(completed));
324303
return;
325304
}
326305
this._completedQueue.push(completed);

packages/executor/src/execution/IncrementalPublisher.ts

Lines changed: 41 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,19 @@ interface SubsequentIncrementalExecutionResultContext<TData = any> {
4848
completed: Array<CompletedResult>;
4949
}
5050

51+
/**
52+
* The IncrementalPublisherState Enum tracks the state of the IncrementalPublisher, which is initialized to
53+
* "Started". When there are no more incremental results to publish, the state is set to "Completed". On the
54+
* next call to next, clean-up is potentially performed and the state is set to "Finished".
55+
*
56+
* If the IncrementalPublisher is ended early, it may be advanced directly from "Started" to "Finished".
57+
*/
58+
enum IncrementalPublisherState {
59+
Started = 1,
60+
Completed = 2,
61+
Finished = 3,
62+
}
63+
5164
/**
5265
* This class is used to publish incremental results to the client, enabling semi-concurrent
5366
* execution while preserving result order.
@@ -113,18 +126,32 @@ class IncrementalPublisher {
113126
void,
114127
void
115128
> {
116-
let isDone = false;
129+
let incrementalPublisherState: IncrementalPublisherState = IncrementalPublisherState.Started;
130+
131+
const _finish = async (): Promise<void> => {
132+
incrementalPublisherState = IncrementalPublisherState.Finished;
133+
this._incrementalGraph.abort();
134+
await this._returnAsyncIterators();
135+
};
117136

118137
this._context.signal?.addEventListener('abort', () => {
119-
this._incrementalGraph.completedIncrementalData().return();
138+
this._incrementalGraph.abort();
120139
});
121140

122141
const _next = async (): Promise<
123142
IteratorResult<SubsequentIncrementalExecutionResult<TData>, void>
124143
> => {
125-
if (isDone) {
126-
await this._returnAsyncIteratorsIgnoringErrors();
127-
return { value: undefined, done: true };
144+
switch (incrementalPublisherState) {
145+
case IncrementalPublisherState.Finished: {
146+
return { value: undefined, done: true };
147+
}
148+
case IncrementalPublisherState.Completed: {
149+
await _finish();
150+
return { value: undefined, done: true };
151+
}
152+
case IncrementalPublisherState.Started: {
153+
// continue
154+
}
128155
}
129156

130157
const context: SubsequentIncrementalExecutionResultContext<TData> = {
@@ -133,12 +160,10 @@ class IncrementalPublisher {
133160
completed: [],
134161
};
135162

136-
let currentCompletedIncrementalData =
137-
this._incrementalGraph.currentCompletedIncrementalData();
138-
const completedIncrementalData = this._incrementalGraph.completedIncrementalData();
139-
const asyncIterator = completedIncrementalData[Symbol.asyncIterator]();
163+
let batch: Iterable<IncrementalDataRecordResult> | undefined =
164+
this._incrementalGraph.currentCompletedBatch();
140165
do {
141-
for (const completedResult of currentCompletedIncrementalData) {
166+
for (const completedResult of batch) {
142167
this._handleCompletedIncrementalData(completedResult, context);
143168
}
144169

@@ -147,7 +172,7 @@ class IncrementalPublisher {
147172
const hasNext = this._incrementalGraph.hasNext();
148173

149174
if (!hasNext) {
150-
isDone = true;
175+
incrementalPublisherState = IncrementalPublisherState.Completed;
151176
}
152177

153178
const subsequentIncrementalExecutionResult: SubsequentIncrementalExecutionResult<TData> =
@@ -169,31 +194,27 @@ class IncrementalPublisher {
169194
return { value: subsequentIncrementalExecutionResult, done: false };
170195
}
171196

172-
const iteration = await asyncIterator.next();
173-
currentCompletedIncrementalData = iteration.value;
174-
} while (currentCompletedIncrementalData !== undefined);
197+
batch = await this._incrementalGraph.nextCompletedBatch();
198+
} while (batch !== undefined);
175199

176200
if (this._context.signal?.aborted) {
177201
throw this._context.signal.reason;
178202
}
179203

180-
await this._returnAsyncIteratorsIgnoringErrors();
181204
return { value: undefined, done: true };
182205
};
183206

184207
const _return = async (): Promise<
185208
IteratorResult<SubsequentIncrementalExecutionResult<TData>, void>
186209
> => {
187-
isDone = true;
188-
await this._returnAsyncIterators();
210+
await _finish();
189211
return { value: undefined, done: true };
190212
};
191213

192214
const _throw = async (
193215
error?: unknown,
194216
): Promise<IteratorResult<SubsequentIncrementalExecutionResult<TData>, void>> => {
195-
isDone = true;
196-
await this._returnAsyncIterators();
217+
await _finish();
197218
return Promise.reject(error);
198219
};
199220

@@ -400,7 +421,7 @@ class IncrementalPublisher {
400421
}
401422

402423
private async _returnAsyncIterators(): Promise<void> {
403-
await this._incrementalGraph.completedIncrementalData().return();
424+
await this._incrementalGraph.abort();
404425

405426
const cancellableStreams = this._context.cancellableStreams;
406427
if (cancellableStreams === undefined) {
@@ -414,10 +435,4 @@ class IncrementalPublisher {
414435
}
415436
await Promise.all(promises);
416437
}
417-
418-
private async _returnAsyncIteratorsIgnoringErrors(): Promise<void> {
419-
await this._returnAsyncIterators().catch(() => {
420-
// Ignore errors
421-
});
422-
}
423438
}

packages/executor/src/execution/__tests__/stream-test.ts

Lines changed: 19 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
} from 'graphql';
1111
import { createDeferred, MaybePromise } from '@graphql-tools/utils';
1212
import { expectJSON } from '../../__testUtils__/expectJSON.js';
13+
import { expectPromise } from '../../__testUtils__/expectPromise.js';
1314
import { resolveOnNextTick } from '../../__testUtils__/resolveOnNextTick.js';
1415
import { execute } from '../execute.js';
1516
import type {
@@ -1831,7 +1832,7 @@ describe('Execute: stream directive', () => {
18311832
]);
18321833
});
18331834

1834-
it('Returns iterator and ignores errors when stream payloads are filtered', async () => {
1835+
it('Returns iterator and passes through errors when stream payloads are filtered', async () => {
18351836
let returned = false;
18361837
let requested = false;
18371838
const iterable = {
@@ -1854,7 +1855,7 @@ describe('Execute: stream directive', () => {
18541855
},
18551856
return: () => {
18561857
returned = true;
1857-
// Ignores errors from return.
1858+
// This error should be passed through.
18581859
return Promise.reject(new Error('Oops'));
18591860
},
18601861
}),
@@ -1927,8 +1928,8 @@ describe('Execute: stream directive', () => {
19271928
},
19281929
});
19291930

1930-
const result3 = await iterator.next();
1931-
expectJSON(result3).toDeepEqual({ done: true, value: undefined });
1931+
const result3Promise = iterator.next();
1932+
await expectPromise(result3Promise).toRejectWith('Oops');
19321933

19331934
expect(returned).toBeTruthy();
19341935
});
@@ -2371,29 +2372,24 @@ describe('Execute: stream directive', () => {
23712372
});
23722373
it('Returns underlying async iterables when returned generator is returned', async () => {
23732374
let returned = false;
2374-
let index = 0;
23752375
const iterable = {
23762376
[Symbol.asyncIterator]: () => ({
2377-
next: () => {
2378-
const friend = friends[index++];
2379-
if (friend == null) {
2380-
return Promise.resolve({ done: true, value: undefined });
2381-
}
2382-
return Promise.resolve({ done: false, value: friend });
2383-
},
2377+
next: () =>
2378+
new Promise(() => {
2379+
/* never resolves */
2380+
}),
23842381
return: () => {
23852382
returned = true;
2383+
// This error should be passed through.
2384+
return Promise.reject(new Error('Oops'));
23862385
},
23872386
}),
23882387
};
23892388

23902389
const document = parse(/* GraphQL */ `
23912390
query {
2392-
friendList @stream(initialCount: 1) {
2391+
friendList @stream {
23932392
id
2394-
... @defer {
2395-
name
2396-
}
23972393
}
23982394
}
23992395
`);
@@ -2412,26 +2408,21 @@ describe('Execute: stream directive', () => {
24122408
const result1 = executeResult.initialResult;
24132409
expectJSON(result1).toDeepEqual({
24142410
data: {
2415-
friendList: [
2416-
{
2417-
id: '1',
2418-
},
2419-
],
2411+
friendList: [],
24202412
},
2421-
pending: [
2422-
{ id: '0', path: ['friendList', 0] },
2423-
{ id: '1', path: ['friendList'] },
2424-
],
2413+
pending: [{ id: '0', path: ['friendList'] }],
24252414
hasNext: true,
24262415
});
2416+
2417+
const result2Promise = iterator.next();
24272418
const returnPromise = iterator.return();
24282419

2429-
const result2 = await iterator.next();
2420+
const result2 = await result2Promise;
24302421
expectJSON(result2).toDeepEqual({
24312422
done: true,
24322423
value: undefined,
24332424
});
2434-
await returnPromise;
2425+
await expectPromise(returnPromise).toRejectWith('Oops');
24352426
expect(returned).toBeTruthy();
24362427
});
24372428
it('Can return async iterable when underlying iterable does not have a return method', async () => {
@@ -2553,13 +2544,7 @@ describe('Execute: stream directive', () => {
25532544
done: true,
25542545
value: undefined,
25552546
});
2556-
try {
2557-
await throwPromise; /* c8 ignore start */
2558-
// Not reachable, always throws
2559-
/* c8 ignore stop */
2560-
} catch (e) {
2561-
// ignore error
2562-
}
2547+
await expectPromise(throwPromise).toRejectWith('bad');
25632548
expect(returned).toBeTruthy();
25642549
});
25652550
});

0 commit comments

Comments
 (0)