Commit e1b3617

lmeyerov and claude authored
feat(gfql): two-tier pass execution — Tier 1 structural + Tier 2 fixed-point loop (#1189) (#1191)
* feat(gfql): two-tier pass execution — Tier 1 structural + Tier 2 fixed-point loop (#1189)

  Extend PassManager to support two explicit pass tiers:
  - Tier 1 (structural): each pass runs exactly once in configured order
  - Tier 2 (rewrite): all passes repeat in a fixed-point loop until a full sweep produces no changes, bounded by max_iterations (default 100)

  PassResult gains `changed: bool = True` for Tier 2 convergence signalling. Default True preserves backward compatibility for passes that predate two-tier semantics.

  Add UnnestApply as Tier 1 structural pass: rewrites non-correlated Apply nodes (correlation_vars == frozenset()) to Join(join_type="cross"), exposing them to downstream join-ordering passes. Correlated Apply nodes are left untouched.

  Wire PredicatePushdownPass as the first Tier 2 rewrite rule; set changed=pushed > 0 for correct convergence detection. Update DEFAULT_LOGICAL_PASSES and DEFAULT_TIER2_PASSES accordingly and wire both into _run_logical_pass_pipeline in gfql_unified.py.

  19 new tests cover tier-1 ordering, tier-2 fixed-point convergence, verifier failure propagation in both tiers, max_iterations breach, UnnestApply rewrites, and backward-compat single-arg PassManager call.

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(gfql): address wave-1 review findings for two-tier pass manager (#1189)

  - Remove duplicate DEFAULT_LOGICAL_PASSES/DEFAULT_TIER2_PASSES sentinel names from manager.py (inline as ()) to prevent silent empty-pass imports from the manager module; populated defaults remain in passes/__init__.py
  - Add comment to lowering.py explaining compilation-time vs runtime pass pipeline split and why double-application of PredicatePushdownPass is safe
  - Add output_schema preservation test for UnnestApply rewrite
  - Add combined smoke test: default PassManager config (UnnestApply T1 + PredicatePushdown T2) runs without error on a plain plan

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(gfql): wave-2 polish — Tier 1 API in lowering.py, real integration test (#1189)

  - lowering.py: use tier1_passes= (single-execution) instead of tier2_passes= to match the "single pass at compilation time" intent; update comment
  - test_unnest_apply.py: replace smoke test with real integration test that builds Apply(Filter(PatternMatch)) and asserts UnnestApply rewrites the outer Apply to Join and PredicatePushdownPass pushes the predicate into PatternMatch

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* refactor(gfql/passes): remove redundant compilation-time predicate pushdown from lowering.py

  The PassManager call that applied PredicatePushdownPass at compile time was added before the runtime pass pipeline in gfql_unified.py was fully wired. Now that DEFAULT_TIER2_PASSES includes PredicatePushdownPass and _run_logical_pass_pipeline runs on every query execution, the compilation-time application is redundant and can be removed.

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(gfql/passes): accumulate Tier 2 integer metadata across iterations + document slot enumeration

  Tier 2 metadata previously overwrote per iteration, so final metadata showed pushed_predicates=0 after convergence (last iteration always pushes nothing). Now integer fields are summed across iterations so the final metadata reflects cumulative work. Also documents that the _unnest_tree child-slot tuple is exhaustive over all current LogicalPlan subclasses.

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(gfql/tests): update test_lowering to reflect runtime-only predicate pushdown

  The compilation step (lowering.py) no longer applies PredicatePushdownPass; it now happens in the runtime pass pipeline via DEFAULT_TIER2_PASSES. Update the test to assert the correct post-compilation state: the plan contains a Filter node for the WHERE predicate (not yet pushed into PatternMatch.predicates).

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
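The two-tier control flow described in the commit message can be sketched with a toy model. This is a simplified illustration, not the library's code: "plans" are plain ints, passes are functions, and `Result` stands in for `PassResult` (including its backward-compatible `changed=True` default).

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class Result:
    plan: int
    changed: bool = True  # default True mirrors PassResult's backward-compat choice


Pass = Callable[[int], Result]


def run_two_tier(plan: int, tier1: List[Pass], tier2: List[Pass], max_iterations: int = 100) -> int:
    # Tier 1: each structural pass runs exactly once, in configured order.
    for p in tier1:
        plan = p(plan).plan
    # Tier 2: sweep all rewrite rules until a full sweep changes nothing.
    for _ in range(max_iterations):
        any_changed = False
        for p in tier2:
            r = p(plan)
            plan = r.plan
            any_changed = any_changed or r.changed
        if not any_changed:
            return plan  # fixed point reached
    raise RuntimeError(f"no convergence after {max_iterations} iterations")


def add_one_once(n: int) -> Result:  # Tier 1 structural pass
    return Result(n + 1)


def halve(n: int) -> Result:  # Tier 2 rewrite: converges once the value is odd
    return Result(n // 2, changed=True) if n % 2 == 0 else Result(n, changed=False)


print(run_two_tier(39, tier1=[add_one_once], tier2=[halve]))  # 39 -> 40 -> 20 -> 10 -> 5
```

The final sweep on 5 reports `changed=False`, which is exactly the convergence signal the real `PassManager` relies on.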
1 parent 23fb3a9, commit e1b3617

10 files changed

Lines changed: 424 additions & 16 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -32,6 +32,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
 - **GFQL / Cypher public API**: `compile_cypher()`, `compile_cypher_query()`, `CompiledCypherQuery`, `CompiledCypherUnionQuery`, and `CompiledCypherProcedureCall` are deprecated and **scheduled for removal in a future release**. All emit `DeprecationWarning` at use. Migrate to `g.gfql(..., language="cypher")` for execution or `cypher_to_gfql()` / `gfql_from_cypher()` for chain translation. Tracked in #1169.
 
 ### Added
+- **GFQL / two-tier pass execution**: Extended `PassManager` to support two explicit pass tiers: Tier 1 structural passes run once in configured order; Tier 2 rewrite rules run in a fixed-point loop until a full sweep makes no changes or `max_iterations` is exceeded. `PassResult` gains a `changed: bool` field (default `True` for backward compatibility) used by the convergence check. Added `UnnestApply` as the first Tier 1 structural pass — rewrites non-correlated `Apply` nodes (empty `correlation_vars`) to `Join(join_type="cross")`, exposing them to downstream join-ordering passes; correlated Apply nodes are preserved. `PredicatePushdownPass` is wired as the first Tier 2 rewrite rule and now sets `changed=pushed > 0` for correct convergence. `DEFAULT_LOGICAL_PASSES` and `DEFAULT_TIER2_PASSES` are populated accordingly and wired into `gfql()` execution. 19 new unit tests across `test_pass_manager.py` and `test_unnest_apply.py` (#1189).
 - **GFQL / pass framework skeleton**: Added `graphistry/compute/gfql/passes/` with `LogicalPass`, `PassResult`, and deterministic `PassManager.run()` sequencing. The pass manager now invokes IR `verify()` after each pass and fails fast on invalid pass output. Wired a new logical-pass pipeline hook into `gfql()` execution between logical-plan and physical-planner stages using a default no-op pass configuration to preserve runtime behavior. Added focused tests for pass ordering, verifier-failure propagation, and runtime pipeline hook invocation (`test_pass_manager.py`, `test_runtime_physical_cutover.py`) (#1180).
 - **GFQL / predicate pushdown safety**: Added `graphistry/compute/gfql/ir/pushdown_safety.py` with three reusable utilities for `PredicatePushdownPass`: `is_null_rejecting(pred, null_extended_aliases)` — conservative syntactic heuristic returning True when a predicate references a null-extended alias (OPTIONAL MATCH) and does not use a null-safe form (IS NULL, IS NOT NULL, COALESCE, NULLIF); `is_null_safe` — inverse; `with_barrier_blocks_pushdown(scope_stack, pred_refs)` — returns True when a WITH-clause `ScopeFrame` prevents backward predicate movement for the given reference set. All three exported from `ir/__init__.py`. 41 unit tests (#1181).
 - **GFQL / predicate pushdown rewrite**: Added `PredicatePushdownPass` implementation in `graphistry/compute/gfql/passes/predicate_pushdown.py` and wired it into logical planning route execution. The pass rewrites `Filter(input=PatternMatch(...))` by pushing safe predicates into `PatternMatch.predicates`, keeps residual filters for partial-push cases, and blocks null-rejecting pushdown into optional arms using existing safety helpers. Added focused pass tests and a lowering-route integration assertion (`test_predicate_pushdown_pass.py`, `cypher/test_lowering.py`) (#1187).
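The `is_null_rejecting` heuristic in the safety entry above can be illustrated with a toy string-level version. This is only a sketch of the rule as described ("references a null-extended alias and uses no null-safe form") — the real utility operates on IR predicate objects, so the signature and token matching here are illustrative assumptions.

```python
from typing import Set

# Null-safe forms named in the changelog entry; matched textually in this toy.
NULL_SAFE_TOKENS = ("IS NULL", "IS NOT NULL", "COALESCE", "NULLIF")


def is_null_rejecting(pred: str, null_extended_aliases: Set[str]) -> bool:
    """Toy sketch: True when the predicate mentions a null-extended alias
    (e.g. from OPTIONAL MATCH) and uses none of the null-safe forms.
    Conservative: may flag predicates that are actually null-safe."""
    refs_optional = any(f"{alias}." in pred for alias in null_extended_aliases)
    null_safe = any(token in pred.upper() for token in NULL_SAFE_TOKENS)
    return refs_optional and not null_safe


print(is_null_rejecting("b.age > 5", {"b"}))      # plain comparison on optional alias
print(is_null_rejecting("b.age IS NULL", {"b"}))  # null-safe form
print(is_null_rejecting("a.age > 5", {"b"}))      # no optional alias referenced
```

A null-rejecting predicate must not be pushed below the null-extending operator, since it would filter out the null-extended rows the outer join is supposed to preserve.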

graphistry/compute/gfql/cypher/lowering.py

Lines changed: 0 additions & 2 deletions
@@ -47,7 +47,6 @@
 from graphistry.compute.gfql.ir.types import ScalarType
 from graphistry.compute.gfql.ir.verifier import verify as verify_logical_plan
 from graphistry.compute.gfql.logical_planner import LogicalPlanner
-from graphistry.compute.gfql.passes import PassManager, PredicatePushdownPass
 from graphistry.compute.predicates.ASTPredicate import ASTPredicate
 from graphistry.compute.predicates.comparison import eq, ge, gt, isna, le, lt, ne, notna
 from graphistry.compute.predicates.is_in import is_in
@@ -8335,7 +8334,6 @@ def _logical_plan_route_for_query(
         logical_plan = LogicalPlanner(
             allow_unknown_match_aliases=allow_unknown_match_aliases
         ).plan(bound_ir, ctx)
-        logical_plan = PassManager((PredicatePushdownPass(),)).run(logical_plan, ctx).plan
     except GFQLValidationError as exc:
         return None, str(exc.message)
     _verify_selected_logical_plan(logical_plan)
graphistry/compute/gfql/passes/__init__.py

Lines changed: 10 additions & 1 deletion

@@ -1,12 +1,21 @@
 """Logical plan pass framework."""
 
-from .manager import DEFAULT_LOGICAL_PASSES, LogicalPass, PassManager, PassResult
+from .manager import LogicalPass, PassManager, PassResult
 from .predicate_pushdown import PredicatePushdownPass
+from .unnest_apply import UnnestApply
+
+# Tier 1: structural passes that run once in order.
+DEFAULT_LOGICAL_PASSES = (UnnestApply(),)
+
+# Tier 2: rewrite rules that run in a fixed-point loop until convergence.
+DEFAULT_TIER2_PASSES = (PredicatePushdownPass(),)
 
 __all__ = [
     "DEFAULT_LOGICAL_PASSES",
+    "DEFAULT_TIER2_PASSES",
     "LogicalPass",
     "PassManager",
     "PassResult",
     "PredicatePushdownPass",
+    "UnnestApply",
 ]

graphistry/compute/gfql/passes/manager.py

Lines changed: 74 additions & 7 deletions
@@ -2,7 +2,7 @@
 from __future__ import annotations
 
 from dataclasses import dataclass, field
-from typing import Dict, Optional, Protocol, Sequence, Tuple
+from typing import Dict, Protocol, Sequence, Tuple
 
 from graphistry.compute.exceptions import ErrorCode, GFQLValidationError
 from graphistry.compute.gfql.ir.compilation import CompilerError, PlanContext
@@ -16,6 +16,9 @@ class PassResult:
 
     plan: LogicalPlan
     metadata: Dict[str, object] = field(default_factory=dict)
+    # Tier 2 convergence signal: set False when the pass made no changes.
+    # Defaults to True so passes that predate two-tier semantics are conservative.
+    changed: bool = True
 
 
 class LogicalPass(Protocol):
@@ -25,22 +28,42 @@ class LogicalPass(Protocol):
 
     def run(self, plan: LogicalPlan, ctx: PlanContext) -> PassResult:
         """Transform a logical plan and return a PassResult."""
+        ...
 
 
-DEFAULT_LOGICAL_PASSES: Tuple[LogicalPass, ...] = ()
+_DEFAULT_MAX_ITERATIONS = 100
 
 
 class PassManager:
-    """Sequential pass runner with verifier guards after each pass."""
-
-    def __init__(self, passes: Sequence[LogicalPass] = DEFAULT_LOGICAL_PASSES) -> None:
-        self._passes: Tuple[LogicalPass, ...] = tuple(passes)
+    """Two-tier pass runner with verifier guards after each pass.
+
+    Tier 1 (*tier1_passes*): each pass runs exactly once in configured order.
+    Tier 2 (*tier2_passes*): all passes run repeatedly in a fixed-point loop
+    until a full sweep produces no changes (every pass returns
+    ``PassResult.changed=False``). Bounded by *max_iterations* to guarantee
+    termination.
+
+    Populated defaults (``DEFAULT_LOGICAL_PASSES``, ``DEFAULT_TIER2_PASSES``)
+    are defined in the package ``__init__`` to avoid circular imports.
+    """
+
+    def __init__(
+        self,
+        tier1_passes: Sequence[LogicalPass] = (),
+        tier2_passes: Sequence[LogicalPass] = (),
+        *,
+        max_iterations: int = _DEFAULT_MAX_ITERATIONS,
+    ) -> None:
+        self._tier1: Tuple[LogicalPass, ...] = tuple(tier1_passes)
+        self._tier2: Tuple[LogicalPass, ...] = tuple(tier2_passes)
+        self._max_iterations = max_iterations
 
     def run(self, logical_plan: LogicalPlan, ctx: PlanContext) -> PassResult:
         current = logical_plan
         merged_metadata: Dict[str, object] = {}
 
-        for logical_pass in self._passes:
+        # --- Tier 1: structural passes, each runs exactly once ---
+        for logical_pass in self._tier1:
             result = logical_pass.run(current, ctx)
             current = result.plan
             if result.metadata:
@@ -49,9 +72,42 @@ def run(self, logical_plan: LogicalPlan, ctx: PlanContext) -> PassResult:
             if diagnostics:
                 raise _verification_error(logical_pass.name, diagnostics)
 
+        # --- Tier 2: rewrite rules, fixed-point loop ---
+        if self._tier2:
+            for _ in range(self._max_iterations):
+                any_changed = False
+                for logical_pass in self._tier2:
+                    result = logical_pass.run(current, ctx)
+                    current = result.plan
+                    if result.metadata:
+                        _accumulate_metadata(merged_metadata, logical_pass.name, result.metadata)
+                    if result.changed:
+                        any_changed = True
+                    diagnostics = verify(current)
+                    if diagnostics:
+                        raise _verification_error(logical_pass.name, diagnostics)
+                if not any_changed:
+                    break
+            else:
+                raise _convergence_error(self._max_iterations)
+
         return PassResult(plan=current, metadata=merged_metadata)
 
 
+def _accumulate_metadata(
+    merged: Dict[str, object], pass_name: str, new: Dict[str, object]
+) -> None:
+    existing = merged.get(pass_name)
+    if isinstance(existing, dict):
+        acc: Dict[str, object] = dict(existing)
+        for k, v in new.items():
+            prev = acc.get(k)
+            acc[k] = prev + v if isinstance(prev, int) and isinstance(v, int) else v  # type: ignore[operator]
+        merged[pass_name] = acc
+    else:
+        merged[pass_name] = dict(new)
+
+
 def _verification_error(pass_name: str, diagnostics: Sequence[CompilerError]) -> GFQLValidationError:
     message = "; ".join(error.message for error in diagnostics[:3])
     if len(diagnostics) > 3:
@@ -64,3 +120,14 @@ def _verification_error(pass_name: str, diagnostics: Sequence[CompilerError]) ->
         suggestion=message or "Ensure pass output satisfies LogicalPlan verifier invariants.",
         language="cypher",
     )
+
+
+def _convergence_error(max_iterations: int) -> GFQLValidationError:
+    return GFQLValidationError(
+        ErrorCode.E108,
+        f"Tier 2 pass loop did not converge after {max_iterations} iterations",
+        field="pass",
+        value="tier2",
+        suggestion="Check that Tier 2 passes converge and set PassResult.changed=False when unchanged.",
+        language="cypher",
+    )
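The integer-summing merge in `_accumulate_metadata` above can be exercised standalone. This is the same logic lifted out of the diff (renamed without the leading underscore for the example), showing why cumulative metadata survives the final no-op Tier 2 iteration:

```python
from typing import Dict


def accumulate_metadata(merged: Dict[str, object], pass_name: str, new: Dict[str, object]) -> None:
    """Merge per-iteration pass metadata: integer fields sum across
    iterations; non-integer fields overwrite."""
    existing = merged.get(pass_name)
    if isinstance(existing, dict):
        acc: Dict[str, object] = dict(existing)
        for k, v in new.items():
            prev = acc.get(k)
            acc[k] = prev + v if isinstance(prev, int) and isinstance(v, int) else v
        merged[pass_name] = acc
    else:
        merged[pass_name] = dict(new)


merged: Dict[str, object] = {}
# Three Tier 2 iterations: two that push predicates, then the converging no-op sweep.
accumulate_metadata(merged, "predicate_pushdown", {"pushed_predicates": 2})
accumulate_metadata(merged, "predicate_pushdown", {"pushed_predicates": 1})
accumulate_metadata(merged, "predicate_pushdown", {"pushed_predicates": 0})
print(merged["predicate_pushdown"])  # {'pushed_predicates': 3}
```

Without the summing behavior, the last call would overwrite the total with 0 — exactly the bug the commit fixes.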

graphistry/compute/gfql/passes/predicate_pushdown.py

Lines changed: 1 addition & 0 deletions
@@ -30,6 +30,7 @@ def run(self, plan: LogicalPlan, ctx) -> PassResult:  # noqa: ANN001
                 "pushed_predicates": pushed,
                 "residual_predicates": residual,
             },
+            changed=pushed > 0,
         )
 
 
graphistry/compute/gfql/passes/unnest_apply.py

Lines changed: 63 additions & 0 deletions

@@ -0,0 +1,63 @@
+"""UnnestApply: Tier 1 structural pass eliminating non-correlated Apply operators.
+
+A non-correlated Apply (``correlation_vars == frozenset()``) is semantically
+equivalent to a cross join because the subquery does not reference any variable
+from the outer input. Rewriting to ``Join(join_type="cross")`` exposes the
+shape to downstream join-ordering and predicate-pushdown passes.
+
+Correlated Apply operators are left untouched.
+"""
+from __future__ import annotations
+
+from dataclasses import replace
+from typing import Any, Tuple, cast
+
+from graphistry.compute.gfql.ir.logical_plan import Apply, Join, LogicalPlan
+from graphistry.compute.gfql.passes.manager import LogicalPass, PassResult
+
+
+class UnnestApply:
+    """Rewrite non-correlated Apply nodes to cross Join nodes."""
+
+    name = "unnest_apply"
+
+    def run(self, plan: LogicalPlan, ctx: Any) -> PassResult:  # noqa: ANN401
+        _ = ctx
+        rewritten, count = _unnest_tree(plan)
+        return PassResult(
+            plan=rewritten,
+            metadata={"unnested": count},
+            changed=count > 0,
+        )
+
+
+def _unnest_tree(plan: LogicalPlan) -> Tuple[LogicalPlan, int]:
+    count = 0
+    children_updates = {}
+    # Exhaustive list of plan-child slot names across all LogicalPlan subclasses (logical_plan.py).
+    for slot in ("input", "left", "right", "subquery"):
+        child = getattr(plan, slot, None)
+        if isinstance(child, LogicalPlan):
+            rewritten_child, child_count = _unnest_tree(child)
+            count += child_count
+            if rewritten_child is not child:
+                children_updates[slot] = rewritten_child
+
+    current: LogicalPlan = (
+        cast(LogicalPlan, replace(cast(Any, plan), **children_updates))
+        if children_updates
+        else plan
+    )
+
+    if isinstance(current, Apply) and not current.correlation_vars:
+        joined = Join(
+            op_id=current.op_id,
+            output_schema=current.output_schema,
+            left=current.input,
+            right=current.subquery,
+            condition=None,
+            join_type="cross",
+        )
+        return joined, count + 1
+
+    return current, count
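The rewrite above can be demonstrated on a toy IR. The node classes here are simplified stand-ins for the real `LogicalPlan` subclasses (no `op_id`/`output_schema`, and the toy only recurses through `Apply` children), so treat the shapes as illustrative:

```python
from dataclasses import dataclass, replace
from typing import FrozenSet


@dataclass(frozen=True)
class Node:
    pass


@dataclass(frozen=True)
class Scan(Node):
    name: str


@dataclass(frozen=True)
class Apply(Node):
    input: Node
    subquery: Node
    correlation_vars: FrozenSet[str] = frozenset()


@dataclass(frozen=True)
class Join(Node):
    left: Node
    right: Node
    join_type: str = "cross"


def unnest(node: Node) -> Node:
    """Rewrite non-correlated Apply to a cross Join; keep correlated Apply."""
    if isinstance(node, Apply):
        # Rewrite children bottom-up first, then check this node.
        node = replace(node, input=unnest(node.input), subquery=unnest(node.subquery))
        if not node.correlation_vars:  # non-correlated -> cross join
            return Join(left=node.input, right=node.subquery)
    return node


print(type(unnest(Apply(Scan("a"), Scan("b")))).__name__)  # Join
print(type(unnest(Apply(Scan("a"), Scan("b"), frozenset({"x"})))).__name__)  # Apply
```

As in the real pass, the eligibility test is purely structural (`correlation_vars` empty), so correlated subqueries pass through unchanged.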

graphistry/compute/gfql_unified.py

Lines changed: 3 additions & 3 deletions
@@ -49,7 +49,7 @@
     SamePathExecutorWrapper,
     WavefrontExecutorWrapper,
 )
-from graphistry.compute.gfql.passes import DEFAULT_LOGICAL_PASSES, PassManager
+from graphistry.compute.gfql.passes import DEFAULT_LOGICAL_PASSES, DEFAULT_TIER2_PASSES, PassManager
 from graphistry.compute.gfql.row.pipeline import is_row_pipeline_call
 from graphistry.compute.typing import DataFrameT, SeriesT
 from graphistry.compute.util.generate_safe_column_name import generate_safe_column_name
@@ -682,8 +682,8 @@ def _execute_compiled_query_non_union(
 
 
 def _run_logical_pass_pipeline(logical_plan: LogicalPlan, ctx: PlanContext) -> LogicalPlan:
-    """Run logical pass pipeline with default no-op pass configuration."""
-    return PassManager(DEFAULT_LOGICAL_PASSES).run(logical_plan, ctx).plan
+    """Run logical pass pipeline: Tier 1 structural passes then Tier 2 fixed-point rewrite loop."""
+    return PassManager(DEFAULT_LOGICAL_PASSES, DEFAULT_TIER2_PASSES).run(logical_plan, ctx).plan
 
 
 def _execute_compiled_query_via_physical_plan(

graphistry/tests/compute/gfql/cypher/test_lowering.py

Lines changed: 6 additions & 3 deletions
@@ -786,7 +786,9 @@ def test_logical_plan_route_for_query_allows_unknown_alias_match_shape_when_opte
     assert defer_reason is None
 
 
-def test_logical_plan_route_for_query_pushes_where_predicate_into_pattern_match() -> None:
+def test_logical_plan_route_for_query_emits_filter_for_where_predicate() -> None:
+    # Compilation emits a Filter node for the WHERE clause; predicate pushdown into
+    # PatternMatch.predicates happens later in the runtime pass pipeline (gfql_unified.py).
     query = _parse_query("MATCH (a)-[r]->(b) WHERE r.weight > 5 RETURN b")
     bound_ir = FrontendBinder().bind(query, PlanContext())
 
@@ -803,10 +805,11 @@ def _walk(node):  # noqa: ANN001, ANN202
             yield from _walk(child)
 
     nodes = list(_walk(logical_plan))
+    # Predicate is in a Filter node — not yet pushed into PatternMatch
+    assert any(isinstance(node, Filter) and "alias='r'" in node.predicate.expression for node in nodes)
     pattern_nodes = [node for node in nodes if isinstance(node, PatternMatch)]
     assert pattern_nodes
-    assert any("alias='r'" in pred.expression for pred in pattern_nodes[0].predicates)
-    assert not any(isinstance(node, Filter) and "alias='r'" in node.predicate.expression for node in nodes)
+    assert not any("alias='r'" in pred.expression for pred in pattern_nodes[0].predicates)
 
 
 def test_compiled_query_sets_logical_plan_route_for_call_shape() -> None:
