Skip to content

Commit a41a0de

Browse files
committed
fix(incremental): do not initiate non-pending execution groups
Replicates graphql/graphql-js@4b0c113
1 parent 074a7c9 commit a41a0de

File tree

3 files changed

+236
-28
lines changed

3 files changed

+236
-28
lines changed

src/graphql/execution/execute.py

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1655,12 +1655,8 @@ def executor(
16551655
defer_map,
16561656
)
16571657

1658-
should_defer_this_defer_usage_set = should_defer(
1659-
parent_defer_usages, defer_usage_set
1660-
)
1661-
1662-
if should_defer_this_defer_usage_set:
1663-
if self.enable_early_execution:
1658+
if self.enable_early_execution:
1659+
if should_defer(parent_defer_usages, defer_usage_set):
16641660

16651661
async def execute_async(
16661662
executor: Callable[
@@ -1674,18 +1670,17 @@ async def execute_async(
16741670

16751671
deferred_record.result = BoxedAwaitableOrValue(execute_async())
16761672
else:
1673+
deferred_record.result = BoxedAwaitableOrValue(executor())
1674+
else:
16771675

1678-
def execute_sync(
1679-
executor: Callable[
1680-
[], AwaitableOrValue[DeferredGroupedFieldSetResult]
1681-
] = executor,
1682-
) -> BoxedAwaitableOrValue[DeferredGroupedFieldSetResult]:
1683-
return BoxedAwaitableOrValue(executor())
1684-
1685-
deferred_record.result = execute_sync
1676+
def execute_sync(
1677+
executor: Callable[
1678+
[], AwaitableOrValue[DeferredGroupedFieldSetResult]
1679+
] = executor,
1680+
) -> BoxedAwaitableOrValue[DeferredGroupedFieldSetResult]:
1681+
return BoxedAwaitableOrValue(executor())
16861682

1687-
else:
1688-
deferred_record.result = BoxedAwaitableOrValue(executor())
1683+
deferred_record.result = execute_sync
16891684

16901685
append_record(deferred_record)
16911686

src/graphql/execution/incremental_graph.py

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,10 @@ def complete_deferred_fragment(
127127
| None
128128
):
129129
"""Complete a deferred fragment."""
130-
if deferred_fragment_record not in self._root_nodes:
131-
return None # pragma: no cover
132-
if deferred_fragment_record.deferred_grouped_field_set_records:
130+
if (
131+
deferred_fragment_record not in self._root_nodes
132+
or deferred_fragment_record.deferred_grouped_field_set_records
133+
):
133134
return None
134135
reconcilable_results = list(deferred_fragment_record.reconcilable_results)
135136
self._remove_root_node(deferred_fragment_record)
@@ -268,19 +269,17 @@ def _on_deferred_grouped_field_set(
268269
) -> None:
269270
"""Handle deferred grouped field set record."""
270271
deferred_grouped_field_set_result = deferred_grouped_field_set_record.result
271-
result = (
272-
deferred_grouped_field_set_result.value
273-
if isinstance(deferred_grouped_field_set_result, BoxedAwaitableOrValue)
274-
else deferred_grouped_field_set_result().value
275-
)
276-
if is_awaitable(result):
272+
if not isinstance(deferred_grouped_field_set_result, BoxedAwaitableOrValue):
273+
deferred_grouped_field_set_result = deferred_grouped_field_set_result()
274+
value = deferred_grouped_field_set_result.value
275+
if is_awaitable(value):
277276

278277
async def await_and_enqueue() -> None:
279-
self._enqueue(await result)
278+
self._enqueue(await value)
280279

281280
self._add_task(await_and_enqueue())
282281
else:
283-
self._enqueue(result)
282+
self._enqueue(value)
284283

285284
async def _on_stream_items(self, stream_record: StreamRecord) -> None:
286285
"""Handle stream items."""

tests/execution/test_defer.py

Lines changed: 215 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from __future__ import annotations
22

3-
from asyncio import sleep
3+
from asyncio import Event, sleep
44
from typing import TYPE_CHECKING, Any, NamedTuple, cast
55

66
import pytest
@@ -1206,6 +1206,220 @@ async def separately_emits_nested_defer_frags_var_subfields_same_prio_diff_level
12061206
},
12071207
]
12081208

1209+
async def initiates_deferred_grouped_field_sets_only_if_released_as_pending():
1210+
"""Initiates deferred grouped field sets only if released as pending
1211+
1212+
Initiates deferred grouped field sets only if they have been released
1213+
as pending.
1214+
"""
1215+
document = parse(
1216+
"""
1217+
query {
1218+
... @defer {
1219+
a {
1220+
... @defer {
1221+
b {
1222+
c { d }
1223+
}
1224+
}
1225+
}
1226+
}
1227+
... @defer {
1228+
a {
1229+
someField
1230+
... @defer {
1231+
b {
1232+
e { f }
1233+
}
1234+
}
1235+
}
1236+
}
1237+
}
1238+
"""
1239+
)
1240+
1241+
slow_field_event = Event()
1242+
c_resolver_called = False
1243+
e_resolver_called = False
1244+
1245+
async def resolve_slow_field(_info):
1246+
await slow_field_event.wait()
1247+
return "someField"
1248+
1249+
def resolve_c(_info):
1250+
nonlocal c_resolver_called
1251+
c_resolver_called = True
1252+
return {"d": "d"}
1253+
1254+
def resolve_e(_info):
1255+
nonlocal e_resolver_called
1256+
e_resolver_called = True
1257+
return {"f": "f"}
1258+
1259+
execute_result = experimental_execute_incrementally(
1260+
schema,
1261+
document,
1262+
root_value={
1263+
"a": {
1264+
"someField": resolve_slow_field,
1265+
"b": {
1266+
"c": resolve_c,
1267+
"e": resolve_e,
1268+
},
1269+
}
1270+
},
1271+
enable_early_execution=False,
1272+
)
1273+
1274+
assert isinstance(execute_result, ExperimentalIncrementalExecutionResults)
1275+
1276+
result1 = execute_result.initial_result
1277+
assert result1.formatted == {
1278+
"data": {},
1279+
"pending": [{"id": "0", "path": []}, {"id": "1", "path": []}],
1280+
"hasNext": True,
1281+
}
1282+
1283+
iterator = execute_result.subsequent_results
1284+
1285+
assert c_resolver_called is False
1286+
assert e_resolver_called is False
1287+
1288+
result2 = await anext(iterator)
1289+
assert result2.formatted == {
1290+
"pending": [{"id": "2", "path": ["a"]}],
1291+
"incremental": [
1292+
{"data": {"a": {}}, "id": "0"},
1293+
{"data": {"b": {}}, "id": "2"},
1294+
{"data": {"c": {"d": "d"}}, "id": "2", "subPath": ["b"]},
1295+
],
1296+
"completed": [{"id": "0"}, {"id": "2"}],
1297+
"hasNext": True,
1298+
}
1299+
1300+
assert c_resolver_called is True
1301+
assert e_resolver_called is False
1302+
1303+
slow_field_event.set()
1304+
1305+
result3 = await anext(iterator)
1306+
assert result3.formatted == {
1307+
"pending": [{"id": "3", "path": ["a"]}],
1308+
"incremental": [
1309+
{"data": {"someField": "someField"}, "id": "1", "subPath": ["a"]},
1310+
{"data": {"e": {"f": "f"}}, "id": "3", "subPath": ["b"]},
1311+
],
1312+
"completed": [{"id": "1"}, {"id": "3"}],
1313+
"hasNext": False,
1314+
}
1315+
1316+
assert e_resolver_called is True
1317+
1318+
with pytest.raises(StopAsyncIteration):
1319+
await anext(iterator)
1320+
1321+
async def initiates_unique_deferred_grouped_field_sets_after_sibling_defers():
1322+
"""Initiates unique deferred grouped field sets after sibling defers.
1323+
1324+
Initiates unique deferred grouped field sets after those that are common
1325+
to sibling defers.
1326+
"""
1327+
document = parse(
1328+
"""
1329+
query {
1330+
... @defer {
1331+
a {
1332+
... @defer {
1333+
b {
1334+
c { d }
1335+
}
1336+
}
1337+
}
1338+
}
1339+
... @defer {
1340+
a {
1341+
... @defer {
1342+
b {
1343+
c { d }
1344+
e { f }
1345+
}
1346+
}
1347+
}
1348+
}
1349+
}
1350+
"""
1351+
)
1352+
1353+
c_event = Event()
1354+
c_resolver_called = False
1355+
e_resolver_called = False
1356+
1357+
async def resolve_c(_info):
1358+
nonlocal c_resolver_called
1359+
c_resolver_called = True
1360+
await c_event.wait()
1361+
return {"d": "d"}
1362+
1363+
def resolve_e(_info):
1364+
nonlocal e_resolver_called
1365+
e_resolver_called = True
1366+
return {"f": "f"}
1367+
1368+
execute_result = experimental_execute_incrementally(
1369+
schema,
1370+
document,
1371+
root_value={
1372+
"a": {
1373+
"b": {
1374+
"c": resolve_c,
1375+
"e": resolve_e,
1376+
}
1377+
}
1378+
},
1379+
enable_early_execution=False,
1380+
)
1381+
1382+
assert isinstance(execute_result, ExperimentalIncrementalExecutionResults)
1383+
1384+
result1 = execute_result.initial_result
1385+
assert result1.formatted == {
1386+
"data": {},
1387+
"pending": [{"id": "0", "path": []}, {"id": "1", "path": []}],
1388+
"hasNext": True,
1389+
}
1390+
1391+
iterator = execute_result.subsequent_results
1392+
1393+
assert c_resolver_called is False
1394+
assert e_resolver_called is False
1395+
1396+
result2 = await anext(iterator)
1397+
assert result2.formatted == {
1398+
"pending": [{"id": "2", "path": ["a"]}, {"id": "3", "path": ["a"]}],
1399+
"incremental": [{"data": {"a": {}}, "id": "0"}],
1400+
"completed": [{"id": "0"}, {"id": "1"}],
1401+
"hasNext": True,
1402+
}
1403+
1404+
await sleep(0) # let resolve_c start and suspend at c_event.wait()
1405+
c_event.set()
1406+
1407+
assert c_resolver_called is True
1408+
assert e_resolver_called is False
1409+
1410+
result3 = await anext(iterator)
1411+
assert result3.formatted == {
1412+
"incremental": [
1413+
{"data": {"b": {"c": {"d": "d"}}}, "id": "2"},
1414+
{"data": {"e": {"f": "f"}}, "id": "3", "subPath": ["b"]},
1415+
],
1416+
"completed": [{"id": "2"}, {"id": "3"}],
1417+
"hasNext": False,
1418+
}
1419+
1420+
with pytest.raises(StopAsyncIteration):
1421+
await anext(iterator)
1422+
12091423
async def can_deduplicate_multiple_defers_on_the_same_object():
12101424
"""Can deduplicate multiple defers on the same object"""
12111425
document = parse(

0 commit comments

Comments
 (0)