Skip to content

Commit d52b310

Browse files
authored
GFQL: add scope-aware predicate pushdown barriers (#1190) (#1192)
1 parent e1b3617 commit d52b310

8 files changed

Lines changed: 115 additions & 14 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
3636
- **GFQL / pass framework skeleton**: Added `graphistry/compute/gfql/passes/` with `LogicalPass`, `PassResult`, and deterministic `PassManager.run()` sequencing. The pass manager now invokes IR `verify()` after each pass and fails fast on invalid pass output. Wired a new logical-pass pipeline hook into `gfql()` execution between logical-plan and physical-planner stages using a default no-op pass configuration to preserve runtime behavior. Added focused tests for pass ordering, verifier-failure propagation, and runtime pipeline hook invocation (`test_pass_manager.py`, `test_runtime_physical_cutover.py`) (#1180).
3737
- **GFQL / predicate pushdown safety**: Added `graphistry/compute/gfql/ir/pushdown_safety.py` with three reusable utilities for `PredicatePushdownPass`: `is_null_rejecting(pred, null_extended_aliases)` — conservative syntactic heuristic returning True when a predicate references a null-extended alias (OPTIONAL MATCH) and does not use a null-safe form (IS NULL, IS NOT NULL, COALESCE, NULLIF); `is_null_safe` — inverse; `with_barrier_blocks_pushdown(scope_stack, pred_refs)` — returns True when a WITH-clause `ScopeFrame` prevents backward predicate movement for the given reference set. All three exported from `ir/__init__.py`. 41 unit tests (#1181).
3838
- **GFQL / predicate pushdown rewrite**: Added `PredicatePushdownPass` implementation in `graphistry/compute/gfql/passes/predicate_pushdown.py` and wired it into logical planning route execution. The pass rewrites `Filter(input=PatternMatch(...))` by pushing safe predicates into `PatternMatch.predicates`, keeps residual filters for partial-push cases, and blocks null-rejecting pushdown into optional arms using existing safety helpers. Added focused pass tests and a lowering-route integration assertion (`test_predicate_pushdown_pass.py`, `cypher/test_lowering.py`) (#1187).
39+
- **GFQL / scope-aware pushdown barriers**: Threaded binder scope metadata (`BoundIR.scope_stack`) into runtime logical-pass context via `CompiledCypherExecutionExtras` and `PlanContext`, and updated `PredicatePushdownPass` to enforce `with_barrier_blocks_pushdown()` using real scope data before moving conjuncts into `PatternMatch.predicates`. Added targeted regressions for blocked vs allowed WITH-boundary movement and compile-route scope metadata threading (`test_predicate_pushdown_pass.py`, `cypher/test_lowering.py`) (#1190).
3940
- **GFQL / remote wire migration (M3 follow-up)**: `chain_remote.py` remote Cypher string path no longer imports or dispatches on `CompiledCypher*` classes for wire serialization. It now validates/serializes structural compiled-query shapes (`chain`, `graph_bindings`, `procedure_call`, `use_ref`) so remote wire payload generation is decoupled from compiler IR class identity while preserving existing Let and CALL wire formats. Added parity tests in `test_chain_remote_v2.py` for structural fake compiled-query inputs (including Let bindings/result serialization and structural union rejection) (#1168).
4041
- **GFQL / M3 compatibility-deletion gate (PR3 slice)**: Cypher public compatibility surfaces now mark `compile_cypher()` and legacy `CompiledCypher*` exports as deprecated while retaining runtime compatibility. `graphistry.compute.gfql.cypher` now serves deprecated `CompiledCypherProcedureCall`, `CompiledCypherQuery`, `CompiledCypherUnionQuery`, and `compile_cypher_query` via lazy compatibility accessors with explicit deprecation warnings; `compile_cypher()` now emits a deprecation warning and migration guidance (`g.gfql(..., language="cypher")`, `cypher_to_gfql(...)`). API docs now describe `compile_cypher()` as deprecated/internal-shape oriented. Deferred boundaries remain unchanged: remote-wire migration stays tracked in #1168 and hard removal/versioned API cleanup stays tracked in #1169 (#1174).
4142
- **GFQL / M3 runtime cutover to PhysicalPlanner**: `gfql()` compiled-query execution in `graphistry/compute/gfql_unified.py` now dispatches through `PhysicalPlanner.plan(...)` for planned logical routes, then executes via physical operator wrappers (`same_path`, `wavefront`, `row_pipeline`). Added bounded compatibility shims for currently-required lanes (`CALL`-backed compiled queries and connected OPTIONAL wavefront payloads), and explicit validation failure when a planned wavefront route lacks executable join payload metadata (no silent unmatched fallback). Added focused runtime cutover tests in `graphistry/tests/compute/gfql/test_runtime_physical_cutover.py` covering planner invocation, optional-wavefront parity, compatibility-shim continuity, and explicit wavefront mismatch failure behavior (#1173).

graphistry/compute/gfql/cypher/lowering.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
from graphistry.compute.chain import Chain
3333
from graphistry.compute.exceptions import ErrorCode, GFQLValidationError
3434
from graphistry.compute.gfql.frontends.cypher.binder import FrontendBinder
35-
from graphistry.compute.gfql.ir.bound_ir import BoundIR
35+
from graphistry.compute.gfql.ir.bound_ir import BoundIR, ScopeFrame
3636
from graphistry.compute.gfql.ir.compilation import PlanContext
3737
from graphistry.compute.gfql.ir.logical_plan import (
3838
Join as LogicalJoin,
@@ -146,6 +146,7 @@ class CompiledCypherExecutionExtras:
146146
optional_reentry: bool = False
147147
scalar_reentry_alias: Optional[str] = None
148148
scalar_reentry_columns: Tuple[str, ...] = ()
149+
scope_stack: Tuple[ScopeFrame, ...] = ()
149150
logical_plan: Optional[LogicalPlan] = None
150151
logical_plan_defer_reason: Optional[str] = None
151152

@@ -204,6 +205,10 @@ def scalar_reentry_alias(self) -> Optional[str]:
204205
def scalar_reentry_columns(self) -> Tuple[str, ...]:
205206
return () if self.execution_extras is None else self.execution_extras.scalar_reentry_columns
206207

208+
@property
209+
def scope_stack(self) -> Tuple[ScopeFrame, ...]:
210+
return () if self.execution_extras is None else self.execution_extras.scope_stack
211+
207212
@property
208213
def logical_plan(self) -> Optional[LogicalPlan]:
209214
return None if self.execution_extras is None else self.execution_extras.logical_plan
@@ -303,6 +308,7 @@ def _normalize_execution_extras(
303308
and execution_extras.optional_reentry is False
304309
and execution_extras.scalar_reentry_alias is None
305310
and execution_extras.scalar_reentry_columns == ()
311+
and execution_extras.scope_stack == ()
306312
and execution_extras.logical_plan is None
307313
and execution_extras.logical_plan_defer_reason is None
308314
):
@@ -337,6 +343,7 @@ def _execution_extras_with(
337343
optional_reentry: bool = False,
338344
scalar_reentry_alias: Optional[str] = None,
339345
scalar_reentry_columns: Tuple[str, ...] = (),
346+
scope_stack: Tuple[ScopeFrame, ...] = (),
340347
logical_plan: Optional[LogicalPlan] = None,
341348
logical_plan_defer_reason: Optional[str] = None,
342349
) -> Optional[CompiledCypherExecutionExtras]:
@@ -351,6 +358,7 @@ def _execution_extras_with(
351358
optional_reentry=optional_reentry,
352359
scalar_reentry_alias=scalar_reentry_alias,
353360
scalar_reentry_columns=scalar_reentry_columns,
361+
scope_stack=scope_stack,
354362
logical_plan=logical_plan,
355363
logical_plan_defer_reason=logical_plan_defer_reason,
356364
)
@@ -8364,6 +8372,7 @@ def _attach_logical_plan_route(
83648372
optional_reentry=result.optional_reentry,
83658373
scalar_reentry_alias=result.scalar_reentry_alias,
83668374
scalar_reentry_columns=result.scalar_reentry_columns,
8375+
scope_stack=result.scope_stack,
83678376
logical_plan=effective_logical_plan,
83688377
logical_plan_defer_reason=effective_defer_reason,
83698378
),
@@ -8430,13 +8439,30 @@ def compile_cypher_query(
84308439
compiled_bindings = ()
84318440
logical_plan: Optional[LogicalPlan] = None
84328441
logical_plan_defer_reason: Optional[str] = None
8442+
_bound_scope_stack: Tuple[ScopeFrame, ...] = ()
84338443

84348444
def _attach_graph_context(result: CompiledCypherQuery) -> CompiledCypherQuery:
84358445
out = result
84368446
if not compiled_bindings and _use_ref is None:
84378447
out = result
84388448
else:
84398449
out = replace(result, graph_bindings=compiled_bindings, use_ref=_use_ref)
8450+
out = replace(
8451+
out,
8452+
execution_extras=_execution_extras_with(
8453+
out,
8454+
connected_optional_match=out.connected_optional_match,
8455+
connected_match_join=out.connected_match_join,
8456+
query_graph=out.query_graph,
8457+
start_nodes_query=out.start_nodes_query,
8458+
optional_reentry=out.optional_reentry,
8459+
scalar_reentry_alias=out.scalar_reentry_alias,
8460+
scalar_reentry_columns=out.scalar_reentry_columns,
8461+
scope_stack=_bound_scope_stack,
8462+
logical_plan=out.logical_plan,
8463+
logical_plan_defer_reason=out.logical_plan_defer_reason,
8464+
),
8465+
)
84408466
return _attach_logical_plan_route(
84418467
out,
84428468
logical_plan=logical_plan,
@@ -8452,6 +8478,7 @@ def _attach_graph_context(result: CompiledCypherQuery) -> CompiledCypherQuery:
84528478
# Re-bind after normalization so scope and semantic metadata reflect the
84538479
# lowered query shape consumed by downstream lowering decisions.
84548480
bound_ir = FrontendBinder().bind(query, PlanContext())
8481+
_bound_scope_stack = tuple(bound_ir.scope_stack)
84558482
bound_context = _build_bound_lowering_context(bound_ir=bound_ir, params=params)
84568483
params = bound_context.params
84578484
_reject_unsupported_where_expr_forms(query)

graphistry/compute/gfql/ir/compilation.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from enum import Enum
66
from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Literal, Optional, Tuple
77

8-
from graphistry.compute.gfql.ir.bound_ir import BoundIR, SemanticTable
8+
from graphistry.compute.gfql.ir.bound_ir import BoundIR, ScopeFrame, SemanticTable
99
from graphistry.compute.gfql.ir.logical_plan import LogicalPlan
1010
from graphistry.compute.gfql.ir.query_graph import QueryGraph
1111

@@ -80,6 +80,8 @@ class PlanContext:
8080
indexes: List[IndexDescriptor] = field(default_factory=list)
8181
backend: BackendCapabilities = field(default_factory=BackendCapabilities)
8282
config: CompilerConfig = field(default_factory=CompilerConfig)
83+
# Pass-visible binder scope metadata for optimization safety checks.
84+
scope_stack: Tuple[ScopeFrame, ...] = field(default_factory=tuple)
8385

8486

8587
@dataclass(frozen=True)

graphistry/compute/gfql/ir/pushdown_safety.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
"""
1515
from __future__ import annotations
1616

17-
from typing import FrozenSet, List
17+
from typing import FrozenSet, Sequence
1818

1919
from graphistry.compute.gfql.ir.bound_ir import ScopeFrame
2020
from graphistry.compute.gfql.ir.types import BoundPredicate
@@ -96,7 +96,7 @@ def is_null_safe(
9696
# ---------------------------------------------------------------------------
9797

9898
def with_barrier_blocks_pushdown(
99-
scope_stack: List[ScopeFrame],
99+
scope_stack: Sequence[ScopeFrame],
100100
predicate_refs: FrozenSet[str],
101101
) -> bool:
102102
"""Return True if a WITH boundary in *scope_stack* blocks backward pushdown.

graphistry/compute/gfql/passes/predicate_pushdown.py

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@
88

99
import re
1010
from dataclasses import replace
11-
from typing import Any, FrozenSet, List, Tuple, cast
11+
from typing import Any, FrozenSet, List, Sequence, Tuple, cast
1212

13+
from graphistry.compute.gfql.ir.compilation import PlanContext
1314
from graphistry.compute.gfql.ir.logical_plan import Filter, LogicalPlan, PatternMatch
14-
from graphistry.compute.gfql.ir.pushdown_safety import is_null_rejecting
15+
from graphistry.compute.gfql.ir.pushdown_safety import is_null_rejecting, with_barrier_blocks_pushdown
16+
from graphistry.compute.gfql.ir.bound_ir import ScopeFrame
1517
from graphistry.compute.gfql.ir.types import BoundPredicate
1618
from graphistry.compute.gfql.passes.manager import LogicalPass, PassResult
1719

@@ -21,9 +23,11 @@ class PredicatePushdownPass(LogicalPass):
2123

2224
name = "predicate_pushdown"
2325

24-
def run(self, plan: LogicalPlan, ctx) -> PassResult: # noqa: ANN001
25-
_ = ctx
26-
rewritten, pushed, residual = _rewrite_tree(plan)
26+
def run(self, plan: LogicalPlan, ctx: PlanContext) -> PassResult:
27+
rewritten, pushed, residual = _rewrite_tree(
28+
plan,
29+
scope_stack=ctx.scope_stack,
30+
)
2731
return PassResult(
2832
plan=rewritten,
2933
metadata={
@@ -34,14 +38,21 @@ def run(self, plan: LogicalPlan, ctx) -> PassResult: # noqa: ANN001
3438
)
3539

3640

37-
def _rewrite_tree(plan: LogicalPlan) -> Tuple[LogicalPlan, int, int]:
41+
def _rewrite_tree(
42+
plan: LogicalPlan,
43+
*,
44+
scope_stack: Sequence[ScopeFrame] = (),
45+
) -> Tuple[LogicalPlan, int, int]:
3846
pushed = 0
3947
residual = 0
4048
children_updates = {}
4149
for slot in ("input", "left", "right", "subquery"):
4250
child = getattr(plan, slot, None)
4351
if isinstance(child, LogicalPlan):
44-
rewritten_child, child_pushed, child_residual = _rewrite_tree(child)
52+
rewritten_child, child_pushed, child_residual = _rewrite_tree(
53+
child,
54+
scope_stack=scope_stack,
55+
)
4556
pushed += child_pushed
4657
residual += child_residual
4758
if rewritten_child is not child:
@@ -53,13 +64,20 @@ def _rewrite_tree(plan: LogicalPlan) -> Tuple[LogicalPlan, int, int]:
5364
else plan
5465
)
5566
if isinstance(current, Filter) and isinstance(current.input, PatternMatch):
56-
rewritten_filter, local_pushed, local_residual = _push_filter_into_pattern(current)
67+
rewritten_filter, local_pushed, local_residual = _push_filter_into_pattern(
68+
current,
69+
scope_stack=scope_stack,
70+
)
5771
return rewritten_filter, pushed + local_pushed, residual + local_residual
5872

5973
return current, pushed, residual
6074

6175

62-
def _push_filter_into_pattern(filter_op: Filter) -> Tuple[LogicalPlan, int, int]:
76+
def _push_filter_into_pattern(
77+
filter_op: Filter,
78+
*,
79+
scope_stack: Sequence[ScopeFrame] = (),
80+
) -> Tuple[LogicalPlan, int, int]:
6381
assert isinstance(filter_op.input, PatternMatch)
6482
pattern = cast(PatternMatch, filter_op.input)
6583
conjuncts = _split_conjuncts(filter_op.predicate)
@@ -76,6 +94,9 @@ def _push_filter_into_pattern(filter_op: Filter) -> Tuple[LogicalPlan, int, int]
7694
expression=conjunct.expression,
7795
references=_predicate_refs_for_analysis(conjunct, candidate_aliases),
7896
)
97+
if with_barrier_blocks_pushdown(scope_stack, analysis_predicate.references):
98+
kept.append(conjunct)
99+
continue
79100
if pattern.optional and is_null_rejecting(analysis_predicate, null_extended_aliases):
80101
kept.append(conjunct)
81102
continue

graphistry/compute/gfql_unified.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -640,7 +640,7 @@ def _execute_compiled_query_non_union(
640640
start_nodes=start_nodes,
641641
)
642642

643-
ctx = PlanContext()
643+
ctx = PlanContext(scope_stack=compiled_query.scope_stack)
644644
logical_plan = _run_logical_pass_pipeline(logical_plan, ctx)
645645

646646
try:

graphistry/tests/compute/gfql/cypher/test_lowering.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -740,6 +740,14 @@ def test_compiled_query_sets_logical_plan_route_for_covered_shape() -> None:
740740
assert compiled.logical_plan_defer_reason is None
741741

742742

743+
def test_compiled_query_threads_bound_scope_stack_for_runtime_passes() -> None:
744+
compiled = _compile_query("MATCH (n:Person) RETURN n")
745+
assert compiled.scope_stack
746+
assert compiled.scope_stack[0].origin_clause.upper() == "MATCH"
747+
assert compiled.scope_stack[-1].origin_clause.upper() == "RETURN"
748+
assert "n" in compiled.scope_stack[-1].visible_vars
749+
750+
743751
def test_compiled_query_sets_logical_plan_route_for_match_scalar_return_shape() -> None:
744752
compiled = _compile_query("MATCH (n:Person) RETURN 1 AS x")
745753
assert compiled.logical_plan_route == "planned"

graphistry/tests/compute/gfql/test_predicate_pushdown_pass.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
from graphistry.compute.gfql.ir.bound_ir import ScopeFrame
34
from graphistry.compute.gfql.ir.compilation import PlanContext
45
from graphistry.compute.gfql.ir.logical_plan import Filter, NodeScan, PatternMatch, Project, RowSchema
56
from graphistry.compute.gfql.ir.types import BoundPredicate, NodeRef
@@ -151,3 +152,44 @@ def test_predicate_pushdown_does_not_cross_with_like_projection_barrier() -> Non
151152
assert isinstance(result.input, Project)
152153
assert isinstance(result.input.input, PatternMatch)
153154
assert result.input.input.predicates == []
155+
156+
157+
def test_predicate_pushdown_blocks_when_scope_metadata_reports_with_barrier() -> None:
158+
plan = Filter(
159+
op_id=3,
160+
input=PatternMatch(op_id=2, pattern={"aliases": ("n",)}, output_schema=_schema("n")),
161+
predicate=_pred("n.age > 5", frozenset({"n"})),
162+
output_schema=_schema("n"),
163+
)
164+
ctx = PlanContext(
165+
scope_stack=(
166+
ScopeFrame(visible_vars=frozenset({"n"}), schema=_schema("n"), origin_clause="MATCH"),
167+
ScopeFrame(visible_vars=frozenset({"m"}), schema=_schema("m"), origin_clause="WITH"),
168+
),
169+
)
170+
171+
result = PredicatePushdownPass().run(plan, ctx).plan
172+
173+
assert isinstance(result, Filter)
174+
assert isinstance(result.input, PatternMatch)
175+
assert result.input.predicates == []
176+
177+
178+
def test_predicate_pushdown_allows_when_scope_metadata_preserves_alias_across_with() -> None:
179+
plan = Filter(
180+
op_id=3,
181+
input=PatternMatch(op_id=2, pattern={"aliases": ("n",)}, output_schema=_schema("n")),
182+
predicate=_pred("n.age > 5", frozenset({"n"})),
183+
output_schema=_schema("n"),
184+
)
185+
ctx = PlanContext(
186+
scope_stack=(
187+
ScopeFrame(visible_vars=frozenset({"n"}), schema=_schema("n"), origin_clause="MATCH"),
188+
ScopeFrame(visible_vars=frozenset({"n"}), schema=_schema("n"), origin_clause="WITH"),
189+
),
190+
)
191+
192+
result = PredicatePushdownPass().run(plan, ctx).plan
193+
194+
assert isinstance(result, PatternMatch)
195+
assert [pred.expression for pred in result.predicates] == ["n.age > 5"]

0 commit comments

Comments
 (0)