Skip to content

Commit aca182e

Browse files
committed
upgrade executor to non-duplicating incremental delivery format
1 parent 7d445ed commit aca182e

24 files changed

Lines changed: 5373 additions & 1487 deletions
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/**
2+
* ES6 Map with additional `add` method to accumulate items.
3+
*/
4+
export class AccumulatorMap<K, T> extends Map<K, Array<T>> {
5+
get [Symbol.toStringTag]() {
6+
return 'AccumulatorMap';
7+
}
8+
9+
add(key: K, item: T): void {
10+
const group = this.get(key);
11+
if (group === undefined) {
12+
this.set(key, [item]);
13+
} else {
14+
group.push(item);
15+
}
16+
}
17+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import { isPromise } from '@graphql-tools/utils';
2+
import type { MaybePromise } from '@graphql-tools/utils';
3+
4+
/**
5+
* A BoxedPromiseOrValue is a container for a value or promise where the value
6+
* will be updated when the promise resolves.
7+
*
8+
* A BoxedPromiseOrValue may only be used with promises whose possible
9+
* rejection has already been handled, otherwise this will lead to unhandled
10+
* promise rejections.
11+
*
12+
* @internal
13+
* */
14+
export class BoxedPromiseOrValue<T> {
15+
value: MaybePromise<T>;
16+
17+
constructor(value: MaybePromise<T>) {
18+
this.value = value;
19+
if (isPromise(value)) {
20+
value.then(resolved => {
21+
this.value = resolved;
22+
});
23+
}
24+
}
25+
}
Lines changed: 340 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,340 @@
1+
import type { GraphQLError } from 'graphql';
2+
import { isPromise } from '@graphql-tools/utils';
3+
import { BoxedPromiseOrValue } from './BoxedPromiseOrValue.js';
4+
import { promiseWithResolvers } from './promiseWithResolvers.js';
5+
import type {
6+
DeferredFragmentRecord,
7+
DeferredGroupedFieldSetRecord,
8+
IncrementalDataRecord,
9+
IncrementalDataRecordResult,
10+
ReconcilableDeferredGroupedFieldSetResult,
11+
StreamItemRecord,
12+
StreamRecord,
13+
SubsequentResultRecord,
14+
} from './types.js';
15+
import { isDeferredGroupedFieldSetRecord } from './types.js';
16+
17+
interface DeferredFragmentNode {
18+
deferredFragmentRecord: DeferredFragmentRecord;
19+
deferredGroupedFieldSetRecords: Set<DeferredGroupedFieldSetRecord>;
20+
reconcilableResults: Set<ReconcilableDeferredGroupedFieldSetResult>;
21+
children: Array<DeferredFragmentNode>;
22+
}
23+
24+
function isDeferredFragmentNode(
25+
node: DeferredFragmentNode | undefined,
26+
): node is DeferredFragmentNode {
27+
return node !== undefined;
28+
}
29+
30+
function isStreamNode(
31+
record: SubsequentResultNode | IncrementalDataRecord,
32+
): record is StreamRecord {
33+
return 'streamItemQueue' in record;
34+
}
35+
36+
type SubsequentResultNode = DeferredFragmentNode | StreamRecord;
37+
38+
/**
39+
* @internal
40+
*/
41+
export class IncrementalGraph {
42+
private _pending: Set<SubsequentResultNode>;
43+
private _deferredFragmentNodes: Map<DeferredFragmentRecord, DeferredFragmentNode>;
44+
45+
private _newPending: Set<SubsequentResultNode>;
46+
private _newIncrementalDataRecords: Set<IncrementalDataRecord>;
47+
private _completedQueue: Array<IncrementalDataRecordResult>;
48+
private _nextQueue: Array<
49+
(iterable: IteratorResult<Iterable<IncrementalDataRecordResult>>) => void
50+
>;
51+
52+
constructor() {
53+
this._pending = new Set();
54+
this._deferredFragmentNodes = new Map();
55+
this._newIncrementalDataRecords = new Set();
56+
this._newPending = new Set();
57+
this._completedQueue = [];
58+
this._nextQueue = [];
59+
}
60+
61+
addIncrementalDataRecords(incrementalDataRecords: ReadonlyArray<IncrementalDataRecord>): void {
62+
for (const incrementalDataRecord of incrementalDataRecords) {
63+
if (isDeferredGroupedFieldSetRecord(incrementalDataRecord)) {
64+
this._addDeferredGroupedFieldSetRecord(incrementalDataRecord);
65+
} else {
66+
this._addStreamRecord(incrementalDataRecord);
67+
}
68+
}
69+
}
70+
71+
addCompletedReconcilableDeferredGroupedFieldSet(
72+
reconcilableResult: ReconcilableDeferredGroupedFieldSetResult,
73+
): void {
74+
const deferredFragmentNodes: Array<DeferredFragmentNode> =
75+
reconcilableResult.deferredGroupedFieldSetRecord.deferredFragmentRecords
76+
.map(deferredFragmentRecord => this._deferredFragmentNodes.get(deferredFragmentRecord))
77+
.filter<DeferredFragmentNode>(isDeferredFragmentNode);
78+
for (const deferredFragmentNode of deferredFragmentNodes) {
79+
deferredFragmentNode.deferredGroupedFieldSetRecords.delete(
80+
reconcilableResult.deferredGroupedFieldSetRecord,
81+
);
82+
deferredFragmentNode.reconcilableResults.add(reconcilableResult);
83+
}
84+
}
85+
86+
getNewPending(): ReadonlyArray<SubsequentResultRecord> {
87+
const newPending: Array<SubsequentResultRecord> = [];
88+
for (const node of this._newPending) {
89+
if (isStreamNode(node)) {
90+
this._pending.add(node);
91+
newPending.push(node);
92+
this._newIncrementalDataRecords.add(node);
93+
} else if (node.deferredGroupedFieldSetRecords.size > 0) {
94+
for (const deferredGroupedFieldSetNode of node.deferredGroupedFieldSetRecords) {
95+
this._newIncrementalDataRecords.add(deferredGroupedFieldSetNode);
96+
}
97+
this._pending.add(node);
98+
newPending.push(node.deferredFragmentRecord);
99+
} else {
100+
for (const child of node.children) {
101+
this._newPending.add(child);
102+
}
103+
}
104+
}
105+
this._newPending.clear();
106+
107+
for (const incrementalDataRecord of this._newIncrementalDataRecords) {
108+
if (isStreamNode(incrementalDataRecord)) {
109+
this._onStreamItems(incrementalDataRecord, incrementalDataRecord.streamItemQueue);
110+
} else {
111+
const deferredGroupedFieldSetResult = incrementalDataRecord.result;
112+
const result =
113+
deferredGroupedFieldSetResult instanceof BoxedPromiseOrValue
114+
? deferredGroupedFieldSetResult.value
115+
: deferredGroupedFieldSetResult().value;
116+
117+
if (isPromise(result)) {
118+
result.then(resolved => this._enqueue(resolved));
119+
} else {
120+
this._enqueue(result);
121+
}
122+
}
123+
}
124+
this._newIncrementalDataRecords.clear();
125+
126+
return newPending;
127+
}
128+
129+
completedIncrementalData() {
130+
return {
131+
[Symbol.asyncIterator]() {
132+
return this;
133+
},
134+
next: (): Promise<IteratorResult<Iterable<IncrementalDataRecordResult>>> => {
135+
const firstResult = this._completedQueue.shift();
136+
if (firstResult !== undefined) {
137+
return Promise.resolve({
138+
value: this._yieldCurrentCompletedIncrementalData(firstResult),
139+
done: false,
140+
});
141+
}
142+
const { promise, resolve } =
143+
promiseWithResolvers<IteratorResult<Iterable<IncrementalDataRecordResult>>>();
144+
this._nextQueue.push(resolve);
145+
return promise;
146+
},
147+
return: (): Promise<IteratorResult<Iterable<IncrementalDataRecordResult>>> => {
148+
for (const resolve of this._nextQueue) {
149+
resolve({ value: undefined, done: true });
150+
}
151+
return Promise.resolve({ value: undefined, done: true });
152+
},
153+
};
154+
}
155+
156+
hasNext(): boolean {
157+
return this._pending.size > 0;
158+
}
159+
160+
completeDeferredFragment(
161+
deferredFragmentRecord: DeferredFragmentRecord,
162+
): Array<ReconcilableDeferredGroupedFieldSetResult> | undefined {
163+
const deferredFragmentNode = this._deferredFragmentNodes.get(deferredFragmentRecord);
164+
// TODO: add test case?
165+
/* c8 ignore next 3 */
166+
if (deferredFragmentNode === undefined) {
167+
return undefined;
168+
}
169+
if (deferredFragmentNode.deferredGroupedFieldSetRecords.size > 0) {
170+
return;
171+
}
172+
const reconcilableResults = Array.from(deferredFragmentNode.reconcilableResults);
173+
for (const reconcilableResult of reconcilableResults) {
174+
for (const otherDeferredFragmentRecord of reconcilableResult.deferredGroupedFieldSetRecord
175+
.deferredFragmentRecords) {
176+
const otherDeferredFragmentNode = this._deferredFragmentNodes.get(
177+
otherDeferredFragmentRecord,
178+
);
179+
if (otherDeferredFragmentNode === undefined) {
180+
continue;
181+
}
182+
otherDeferredFragmentNode.reconcilableResults.delete(reconcilableResult);
183+
}
184+
}
185+
this._removePending(deferredFragmentNode);
186+
for (const child of deferredFragmentNode.children) {
187+
this._newPending.add(child);
188+
}
189+
return reconcilableResults;
190+
}
191+
192+
removeDeferredFragment(deferredFragmentRecord: DeferredFragmentRecord): boolean {
193+
const deferredFragmentNode = this._deferredFragmentNodes.get(deferredFragmentRecord);
194+
if (deferredFragmentNode === undefined) {
195+
return false;
196+
}
197+
this._removePending(deferredFragmentNode);
198+
this._deferredFragmentNodes.delete(deferredFragmentRecord);
199+
// TODO: add test case for an erroring deferred fragment with child defers
200+
/* c8 ignore next 3 */
201+
for (const child of deferredFragmentNode.children) {
202+
this.removeDeferredFragment(child.deferredFragmentRecord);
203+
}
204+
return true;
205+
}
206+
207+
removeStream(streamRecord: StreamRecord): void {
208+
this._removePending(streamRecord);
209+
}
210+
211+
private _removePending(subsequentResultNode: SubsequentResultNode): void {
212+
this._pending.delete(subsequentResultNode);
213+
if (this._pending.size === 0) {
214+
for (const resolve of this._nextQueue) {
215+
resolve({ value: undefined, done: true });
216+
}
217+
}
218+
}
219+
220+
private _addDeferredGroupedFieldSetRecord(
221+
deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord,
222+
): void {
223+
for (const deferredFragmentRecord of deferredGroupedFieldSetRecord.deferredFragmentRecords) {
224+
const deferredFragmentNode = this._addDeferredFragmentNode(deferredFragmentRecord);
225+
if (this._pending.has(deferredFragmentNode)) {
226+
this._newIncrementalDataRecords.add(deferredGroupedFieldSetRecord);
227+
}
228+
deferredFragmentNode.deferredGroupedFieldSetRecords.add(deferredGroupedFieldSetRecord);
229+
}
230+
}
231+
232+
private _addStreamRecord(streamRecord: StreamRecord): void {
233+
this._newPending.add(streamRecord);
234+
}
235+
236+
private _addDeferredFragmentNode(
237+
deferredFragmentRecord: DeferredFragmentRecord,
238+
): DeferredFragmentNode {
239+
let deferredFragmentNode = this._deferredFragmentNodes.get(deferredFragmentRecord);
240+
if (deferredFragmentNode !== undefined) {
241+
return deferredFragmentNode;
242+
}
243+
deferredFragmentNode = {
244+
deferredFragmentRecord,
245+
deferredGroupedFieldSetRecords: new Set(),
246+
reconcilableResults: new Set(),
247+
children: [],
248+
};
249+
this._deferredFragmentNodes.set(deferredFragmentRecord, deferredFragmentNode);
250+
const parent = deferredFragmentRecord.parent;
251+
if (parent === undefined) {
252+
this._newPending.add(deferredFragmentNode);
253+
return deferredFragmentNode;
254+
}
255+
const parentNode = this._addDeferredFragmentNode(parent);
256+
parentNode.children.push(deferredFragmentNode);
257+
return deferredFragmentNode;
258+
}
259+
260+
private async _onStreamItems(
261+
streamRecord: StreamRecord,
262+
streamItemQueue: Array<StreamItemRecord>,
263+
): Promise<void> {
264+
let items: Array<unknown> = [];
265+
let errors: Array<GraphQLError> = [];
266+
let incrementalDataRecords: Array<IncrementalDataRecord> = [];
267+
let streamItemRecord: StreamItemRecord | undefined;
268+
while ((streamItemRecord = streamItemQueue.shift()) !== undefined) {
269+
let result =
270+
typeof streamItemRecord === 'function' ? streamItemRecord().value : streamItemRecord.value;
271+
if (isPromise(result)) {
272+
if (items.length > 0) {
273+
this._enqueue({
274+
streamRecord,
275+
result:
276+
// TODO add additional test case or rework for coverage
277+
errors.length > 0 /* c8 ignore start */
278+
? { items, errors } /* c8 ignore stop */
279+
: { items },
280+
incrementalDataRecords,
281+
});
282+
items = [];
283+
errors = [];
284+
incrementalDataRecords = [];
285+
}
286+
result = await result;
287+
// wait an additional tick to coalesce resolving additional promises
288+
// within the queue
289+
await Promise.resolve();
290+
}
291+
if (result.item === undefined) {
292+
if (items.length > 0) {
293+
this._enqueue({
294+
streamRecord,
295+
result: errors.length > 0 ? { items, errors } : { items },
296+
incrementalDataRecords,
297+
});
298+
}
299+
this._enqueue(
300+
result.errors === undefined
301+
? { streamRecord }
302+
: {
303+
streamRecord,
304+
errors: result.errors,
305+
},
306+
);
307+
return;
308+
}
309+
items.push(result.item);
310+
if (result.errors !== undefined) {
311+
errors.push(...result.errors);
312+
}
313+
if (result.incrementalDataRecords !== undefined) {
314+
incrementalDataRecords.push(...result.incrementalDataRecords);
315+
}
316+
}
317+
}
318+
319+
private *_yieldCurrentCompletedIncrementalData(
320+
first: IncrementalDataRecordResult,
321+
): Generator<IncrementalDataRecordResult> {
322+
yield first;
323+
let completed;
324+
while ((completed = this._completedQueue.shift()) !== undefined) {
325+
yield completed;
326+
}
327+
}
328+
329+
private _enqueue(completed: IncrementalDataRecordResult): void {
330+
const next = this._nextQueue.shift();
331+
if (next !== undefined) {
332+
next({
333+
value: this._yieldCurrentCompletedIncrementalData(completed),
334+
done: false,
335+
});
336+
return;
337+
}
338+
this._completedQueue.push(completed);
339+
}
340+
}

0 commit comments

Comments
 (0)