141 changes: 141 additions & 0 deletions plans/0787-xorq-rows-first-coverage.md
@@ -0,0 +1,141 @@
# Follow-up: restore the dropped phase-divergence assertion on #787

> PR #787 (rows-first state_change spike) open question, called out in
> the PR body as **the most important item to resolve before
> un-gating**. The two spike tests currently assert the wire shape (two
> ``initial_state`` frames per state_change, infinite_request services
> in between) but do *not* assert that phase 2's stats payload differs
> from phase 1's. Without that assertion the spike test would still
> pass if ``recompute_summary_sd()`` were a no-op — we can't detect
> regression of the spike's whole point. The original draft of the
> test had this assertion; it was dropped because it failed on xorq.

## What "stats differ between phases" means

Phase 1 of the spike emits an ``initial_state`` *before* the analysis
pipeline runs, with ``_defer_summary_sd = True`` short-circuiting the
``_summary_sd`` observer. ``self.summary_sd`` therefore stays at its
prior value. Phase 2 lifts the gate via ``recompute_summary_sd()``,
re-runs the pipeline against the *new* ``processed_df``, and emits a
second ``initial_state``.

If the protocol works, phase 1's ``merged_sd`` is the **previous**
state's stats and phase 2's is the **current** state's stats. The
dropped assertion was something like:

```python
self.assertNotEqual(
    phase1["df_data_dict"]["all_stats"],
    phase2["df_data_dict"]["all_stats"],
)
```

The reason it failed on xorq is not yet established — the PR body
flags this as TBD. This file proposes how to investigate and what
shape the restored assertion should take.

## Hypotheses for the xorq failure

Roughly ranked:

1. **Phase 1 emits empty/initial stats on xorq.** The xorq path may
reach phase 1 before *any* ``summary_sd`` has been computed — the
prior value is the initial empty dict. Phase 2 then computes real
stats. ``phase1.stats != phase2.stats`` would pass for that case,
so this can't be the failure mode unless the *phase 2* stats are
also empty.
2. **`recompute_summary_sd()` is a no-op on xorq.** ``XorqDataflow``
inherits ``_summary_sd`` and ``_defer_summary_sd`` from
``DataFlow``, but its ``_get_summary_sd`` overrides the base
(``xorq_buckaroo.py:137``). If the overridden path early-exits when
the gate flag has already done its job, phase 2 produces nothing
new and the assertion fails because the payloads are equal.
3. **Both phases emit the same ``processed_df`` payload because xorq
evaluates ``processed_result`` lazily.** The cascade
``processed_result → summary_sd → merged_sd`` may not have fully
landed by phase 1 send time, so phase 1's broadcast already
includes phase-2-like stats. The 10ms ``call_later`` is too short
to matter.
4. **The xorq summary cache is hot.** Phase 1 and phase 2 read the
same cached SD because the cache invalidation didn't fire between
them. Less likely given the ``_defer_summary_sd`` gate, but worth
checking.

The instrumentation to disambiguate: log
``id(self.summary_sd)`` and ``self.summary_sd.get('a', {}).get('mean')``
at three points — before phase 1 broadcast, before
``recompute_summary_sd()``, after ``recompute_summary_sd()``. Run the
test once on pandas and once on xorq and diff the logs.
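
A minimal sketch of that instrumentation, assuming ``summary_sd`` is a plain dict of per-column stat dicts. The helper name ``snapshot_summary_sd``, the logger name, and the call-site placement are all hypothetical, not part of the spike:

```python
import logging

logger = logging.getLogger("rows_first_spike")  # hypothetical logger name


def snapshot_summary_sd(summary_sd, checkpoint, column="a", stat="mean"):
    """Log the identity and one probe stat of summary_sd at a named checkpoint.

    Returns the logged record so a test can compare checkpoints directly.
    """
    record = {
        "checkpoint": checkpoint,
        "sd_id": id(summary_sd),
        # Mirrors self.summary_sd.get('a', {}).get('mean') from the plan.
        "probe": summary_sd.get(column, {}).get(stat),
    }
    logger.info("summary_sd %(checkpoint)s id=%(sd_id)s probe=%(probe)r", record)
    return record


# Illustrative call sites (assumed placement inside the spike's handler):
#   snapshot_summary_sd(dataflow.summary_sd, "before_phase1_broadcast")
#   snapshot_summary_sd(dataflow.summary_sd, "before_recompute")
#   snapshot_summary_sd(dataflow.summary_sd, "after_recompute")
```

An unchanged ``sd_id`` together with an unchanged probe across the last two checkpoints would point at hypothesis 2 or 4. A probe that already holds the new value at the first checkpoint would point at hypothesis 3.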

## What the restored assertion should pin

Not just "the dicts differ." That's too coarse — it passes when phase
1 is empty *and* phase 2 is real, which masks "phase 1 is real and
phase 2 is a no-op" (the actual failure mode the assertion is meant
to catch).

The robust form: pick a stat whose value is **observable and
deterministic** under the spike's state_change, and assert both
phases:

```python
# Test setup: load 10-row dataframe, state change adds a search filter
# that keeps 4 rows.
phase1_stats = phase1["df_data_dict"]["all_stats"]
phase2_stats = phase2["df_data_dict"]["all_stats"]

# Phase 1 carries the prior (no-filter) length: 10.
self.assertEqual(phase1_stats["idx"]["length"], 10)

# Phase 2 carries the filtered length: 4.
self.assertEqual(phase2_stats["idx"]["filtered_length"], 4)
```

This pins both phases to concrete values, not just "they differ." On
xorq specifically, ``filtered_length`` is the load-bearing value —
it's the only stat the spike's state_change actually causes to
change, since the filter is what moves it.
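
One possible variant reads the *same* key from both frames, so the test observes the value actually moving. This is only a sketch: it assumes phase 1's payload also exposes ``filtered_length`` (not confirmed here), and the frames below are fabricated for illustration:

```python
def stat_at(frame, column, stat):
    """Pull one stat out of an initial_state-style frame."""
    return frame["df_data_dict"]["all_stats"][column][stat]


def phase_values(phase1, phase2, column, stat):
    """Return (phase1_value, phase2_value) for the same stat in both frames."""
    return stat_at(phase1, column, stat), stat_at(phase2, column, stat)


# Fabricated frames; real payloads carry many more keys.
phase1 = {"df_data_dict": {"all_stats": {"idx": {"filtered_length": 10}}}}
phase2 = {"df_data_dict": {"all_stats": {"idx": {"filtered_length": 4}}}}

before, after = phase_values(phase1, phase2, "idx", "filtered_length")
assert (before, after) == (10, 4)
```

Comparing the pair against concrete expected values fails both when phase 1 is empty and when phase 2 is a no-op, which a bare inequality check cannot distinguish.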

## Recommended sequence

1. **Add instrumentation** (the three log points above). Run both
pandas and xorq variants of the existing test under verbose logging
and diff. This is throwaway code and doesn't ship.
2. **Identify the specific failure mode** from the diff.
3. **Decide whether the failure is a bug in ``XorqDataflow`` or a
limitation of the spike's protocol.** If the former, fix the
dataflow (likely a missed observer override or a cache-key bug). If
the latter, the spike is xorq-incompatible and that's a design
verdict, not a test-coverage gap.
4. **Restore the assertion** in the load-bearing form above. Add it
to *both* spike tests (the second test currently only asserts wire
order, not stat divergence).

## What if the failure is structural and unfixable?

The PR body's "Open questions" section also notes inter-phase race
issues and 10ms delay tuning. If the xorq path is structurally
incompatible — e.g. xorq's lazy evaluation makes "compute stats after
emitting rows" meaningless because the rows themselves trigger the
stats compute — the right outcome is to **scope the spike to pandas
only**. The env flag would gate-by-backend (e.g.
``_ROWS_FIRST_SPIKE and isinstance(dataflow, ServerDataflow)``) and the
xorq path would keep today's single-frame rebroadcast. Document it as
a known limitation rather than blocking the spike on xorq parity.

## Out of scope here

- Inter-phase ``state_change`` race (a second state_change between
phases). Separate plan — needs a serialise / cancel-previous
strategy.
- Delay tuning for non-localhost RTT.
- Cache-hit fast path (skip phase 2 when ``filt_sd_key`` is already
cached).
- Phase-2 message type (``stats_update`` vs second ``initial_state``).
Worth doing but independent of the assertion-coverage gap this plan
addresses.

## Estimated scope

- Instrumentation + diagnosis: ~1 hour.
- Fix or scope decision: depends on root cause.
- Test restoration: ~30 lines, both spike tests.
165 changes: 165 additions & 0 deletions plans/0788-pushdown-consumer.md
@@ -0,0 +1,165 @@
# Follow-up: actually consume the `pushdown=` metadata

> PR #788 follow-up note. #788 lands the *metadata*
> (`StatFunc.pushdown: Tuple[str, ...]`) and the decorator kwarg. No
> code reads the field yet. This file maps out what "reading it" looks
> like, picks a recommended shape, and lists the open questions to
> answer before we ship the consumer.

## What's in #788

- `StatFunc.pushdown: Tuple[str, ...]`, default `()`.
- `@stat(pushdown=("xorq", "polars"))` plumbs into that field.
- Bare-string special case so `@stat(pushdown="xorq")` doesn't become
`('x','o','r','q')` (Codex P2, fixed in this branch).
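
For illustration, the bare-string special case could look roughly like the sketch below. The helper name `normalize_pushdown` is hypothetical; #788's actual implementation is not shown in this plan:

```python
from typing import Iterable, Tuple, Union


def normalize_pushdown(value: Union[str, Iterable[str], None]) -> Tuple[str, ...]:
    """Coerce the decorator kwarg into a tuple of backend ids."""
    if value is None:
        return ()
    if isinstance(value, str):
        # A bare string is itself iterable, so tuple("xorq") would
        # explode into characters; wrap it instead.
        return (value,)
    return tuple(value)
```

The `str` check has to come before the generic `tuple(...)` call, because a string satisfies the iterable path too.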

Three tests pin the metadata round-trip. Nothing reads `sf.pushdown`.

## The decision the consumer has to make

For each `(stat_func, column)` pair the pipeline is about to evaluate
on a non-pandas backend (today: xorq), choose one of:

- **Materialize-then-call.** Today's behaviour. Pull the column into a
pandas Series, call the stat function. Cost: a full column round-trip
through pandas memory for every stat.
- **Push down.** Replace the stat's body with the engine-native form
of the same operation. Cost: an extra entry in the batched
`table.aggregate(...)`. Won't work for stats whose body uses
pandas-only ops (`value_counts`, `groupby`, scipy calls, …).

`pushdown=` is the marker that says "the substitution is safe for this
function on these backends." It is *not* a per-call dispatcher — the
consumer needs to know how to produce the engine-side form.

## Two interpretations of "knows how to produce the engine-side form"

### Interp A: trust-the-author duck typing

The function body uses only ops that exist on both `pandas.Series` and
`ibis.Column`. The pipeline rebinds the `RawSeries` parameter to the
table column expression and re-runs the function. `ser.mean()` returns
a float on pandas, an ibis aggregate expression on xorq — fold the
latter into the batch aggregate.

Pros: zero code per stat. The `pushdown=` tuple is the *only* contract
the author writes.

Cons: silently wrong for ops that exist on both APIs but have
different semantics (timezone handling, null propagation, sort
stability). Hard to test exhaustively.
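
A toy illustration of the duck-typing idea, using stand-in classes rather than real `pandas.Series` or ibis columns. `EagerColumn` and `LazyColumn` are hypothetical and exist only so the sketch runs without either library:

```python
from dataclasses import dataclass
from typing import List


@dataclass
class EagerColumn:
    """Stand-in for an in-memory column: mean() returns a number."""
    values: List[float]

    def mean(self) -> float:
        return sum(self.values) / len(self.values)


@dataclass
class LazyColumn:
    """Stand-in for an engine-side column: mean() returns an expression."""
    name: str

    def mean(self) -> str:
        return f"AVG({self.name})"


def col_mean(ser):
    # The body only calls .mean(), which both stand-ins provide,
    # so the same function serves either binding.
    return ser.mean()
```

In this stand-in, a body that coerced its result eagerly (for example `float(ser.mean())`) would fail on the lazy binding. That suggests pushdown-marked bodies need to return the column operation's result unchanged.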

### Interp B: separate registered xorq form

The author writes two `@stat` functions (or one with two bodies). The
`pushdown=` tuple is the link: the consumer looks up a stat with the
same `name` in the xorq registry and prefers that one. Today the
distinction is already there in `_is_batch_func` — `XorqColumn` vs
`RawSeries`.

Pros: explicit semantics. Each backend's form is tested independently.

Cons: doubles authoring overhead. Asks the author to maintain two
forms.

**Recommendation: Interp A**, opt-in per stat. The author who flips
`pushdown=("xorq",)` is asserting "I read my body and the operations
I use are semantically identical on `ibis.Column`." That's a smaller
ask than "write a second function" and matches the existing
`XorqColumn` mechanism (which already trusts the author to return an
ibis expression).

## Recommended consumer shape

`XorqStatPipeline._build_batch_aggregate(table)` currently folds in
every `_is_batch_func` stat. Extend it to also fold in stats for which
`_pushdown_eligible_for(sf, "xorq")` returns `True`:

```python
def _pushdown_eligible_for(sf: StatFunc, backend_id: str) -> bool:
    """Stat is pushdown-marked for this backend, with all-raw deps."""
    if backend_id not in sf.pushdown:
        return False
    # Same all-raw-deps constraint as _is_batch_func: derived stats
    # can't run in the pre-aggregate phase.
    for r in sf.requires:
        if r.type in RAW_MARKER_TYPES:
            continue
        return False
    return True
```

In the batch-aggregate loop, when a stat is pushdown-eligible:

1. Synthesize a `XorqColumn` binding for every `RawSeries` /
`SampledSeries` parameter (rebind to `table[col_name]`).
2. Call the original function body. It returns an ibis expression
(because every op the body invokes is overloaded the same way).
3. Fold that expression into the batch aggregate under the stat's
`provides` key, same as `_is_batch_func` does today.
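
The three steps above can be sketched with stand-ins. `SketchStat` and `SketchColumn` are hypothetical simplifications of `StatFunc` and `table[col_name]`, and the `column|provides` key format is an assumption, not the pipeline's actual naming:

```python
from dataclasses import dataclass
from typing import Callable, Dict, Iterable, Tuple


@dataclass(frozen=True)
class SketchStat:
    """Hypothetical, trimmed-down stand-in for StatFunc."""
    provides: str
    func: Callable
    pushdown: Tuple[str, ...] = ()


class SketchColumn:
    """Stand-in for table[col_name]; methods return expression strings."""

    def __init__(self, name: str):
        self.name = name

    def mean(self) -> str:
        return f"AVG({self.name})"

    def max(self) -> str:
        return f"MAX({self.name})"


def build_pushdown_exprs(
    stats: Iterable[SketchStat],
    column_names: Iterable[str],
    backend_id: str = "xorq",
) -> Dict[str, str]:
    """Collect one expression per (column, pushdown-marked stat)."""
    stats = list(stats)
    exprs: Dict[str, str] = {}
    for col_name in column_names:
        col = SketchColumn(col_name)       # step 1: rebind the raw parameter
        for sf in stats:
            if backend_id not in sf.pushdown:
                continue                   # unmarked stats stay on the materialize path
            expr = sf.func(col)            # step 2: call the original body
            exprs[f"{col_name}|{sf.provides}"] = expr  # step 3: fold under `provides`
    return exprs
```

In the real pipeline, the resulting mapping would presumably be merged into the existing batched `table.aggregate(...)` rather than returned on its own.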

Failure mode: the function body invokes an op that doesn't exist on
`ibis.Column` (e.g. `ser.value_counts()`). The exception fires at
batch-build time, in `_unit_test_result`. The unit_test against
`PERVERSE_DF` already catches construction-time bugs; this extends
that coverage to "the pushdown-marked stat actually pushes down."

## What about `pushdown=("polars",)`?

Polars has no `XorqStatPipeline` analogue today. The metadata is
load-bearing for future expansion but unused at consume time. Wire the
consumer through a `BACKEND_REGISTRY` keyed on backend id; xorq is the
only entry today. Polars lands when there's a polars pipeline (#769
is the natural home).
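
A rough sketch of what a registry keyed on backend id might look like. The plan names `BACKEND_REGISTRY` only; the helper names and the callable signature below are assumptions:

```python
from typing import Callable, Dict, Tuple

# Maps backend id -> consumer hook. The Callable[[str], str] signature
# is a placeholder for whatever the real consumer entry point takes.
BACKEND_REGISTRY: Dict[str, Callable[[str], str]] = {}


def register_backend(backend_id: str, consumer: Callable[[str], str]) -> None:
    """Register the consumer hook for one backend id."""
    BACKEND_REGISTRY[backend_id] = consumer


def consumable_backends(pushdown: Tuple[str, ...]) -> Tuple[str, ...]:
    """Backends from a stat's pushdown tuple that have a consumer today."""
    return tuple(b for b in pushdown if b in BACKEND_REGISTRY)


# xorq is the only entry today; "polars" in a pushdown tuple is simply ignored.
register_backend("xorq", lambda stat_name: f"xorq pushdown for {stat_name}")
```

With this shape, `pushdown=("xorq", "polars")` stays valid metadata while only the xorq entry is acted on.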

## Open questions for the implementer

1. **Pipeline construction vs per-process-table eligibility.** Today
`_is_batch_func` is evaluated at pipeline-build time. Pushdown
eligibility could be the same — `pushdown` is static metadata. But
the substitution requires a `table` reference (for `table[col]`).
Defer the substitution to `process_table` and treat the metadata
check as build-time only.

2. **Backward compat with `XorqColumn`-typed stats.** If a stat
already takes `XorqColumn` *and* declares `pushdown=("xorq",)`, the
pushdown path would synthesize a re-binding even though the stat
already accepts the engine-side column. Decide: ignore pushdown on
stats that already speak xorq natively (return `False` from
`_pushdown_eligible_for` when any param is `XorqColumn`), or treat
it as a no-op.

3. **Error attribution.** A pushdown stat that fails at execute time
(because the synthesized expression is malformed, not because the
stat function body raised) should surface as a `StatError` for
that stat's name, not as a generic batch-aggregate failure.

4. **Should the unit_test cover both paths?** A stat with
`pushdown=("xorq",)` has two execution modes: pandas and xorq. The
xorq unit_test runs against `PERVERSE_DF` wrapped as a memtable;
the pandas unit_test runs against `PERVERSE_DF` directly. Both
should succeed before we trust the marker.

5. **Telemetry.** Worth logging which stats actually pushed down vs
fell back, especially during rollout. A pipeline-level counter
per `process_table` call is probably enough.
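
Questions 3 and 5 could share one loop. The sketch below attributes failures per stat and counts outcomes at *build* time only; mapping an execute-time failure of the batched aggregate back to a single stat is not covered. The `StatError` class here is a local stand-in, since the real class's constructor is not shown in this plan:

```python
from collections import Counter
from typing import Callable, Dict, List, Tuple


class StatError(Exception):
    """Local stand-in for the project's StatError (signature assumed)."""

    def __init__(self, stat_name: str, cause: BaseException):
        super().__init__(f"pushdown failed for stat {stat_name!r}: {cause!r}")
        self.stat_name = stat_name
        self.cause = cause


def build_with_attribution(
    named_builders: Dict[str, Callable[[], object]],
) -> Tuple[Dict[str, object], List[StatError], Counter]:
    """Build each stat's expression, recording failures under the stat's name."""
    exprs: Dict[str, object] = {}
    errors: List[StatError] = []
    counts: Counter = Counter()
    for name, build in named_builders.items():
        try:
            exprs[name] = build()
            counts["pushed_down"] += 1
        except Exception as exc:
            # Attribute the failure to this stat, not the whole batch.
            errors.append(StatError(name, exc))
            counts["fell_back"] += 1
    return exprs, errors, counts
```

The returned `Counter` is one candidate for the per-`process_table` telemetry mentioned in question 5.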

## Test plan for the consumer PR

- **Failing test pre-fix (one CI run):** an `@stat(pushdown=("xorq",))
def mean(ser: RawSeries) -> float: return float(ser.mean())` is
applied to an `xo.memtable`. Without the consumer, the test asserts
the stat ran via pandas materialization (probe: count of
`table.execute()` calls, or a sentinel). With the consumer, the
stat should appear in the batch aggregate (probe: inspect the
compiled aggregate's column refs).
- **Fix commit:** the `_pushdown_eligible_for` check + the
`XorqStatPipeline` integration.
- **Regression coverage:** `value_counts`-style stat marked
`pushdown=("xorq",)` raises at unit_test time (catches "author
marked a stat as pushdown-safe when it isn't").
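
The "count of `table.execute()` calls" probe could be a thin wrapper like the sketch below. `ExecuteCounter` and `FakeTable` are hypothetical; whether the real pipeline materializes through a method named `execute` on the object under test is an assumption taken from the bullet above:

```python
class ExecuteCounter:
    """Proxy that counts execute() calls and forwards everything else."""

    def __init__(self, table):
        self._table = table
        self.calls = 0

    def execute(self, *args, **kwargs):
        self.calls += 1
        return self._table.execute(*args, **kwargs)

    def __getattr__(self, name):
        # Only reached for attributes not defined on the proxy itself.
        return getattr(self._table, name)


class FakeTable:
    """Minimal stand-in so the sketch runs without an engine."""
    columns = ["a"]

    def execute(self):
        return [1, 2, 3]


probe = ExecuteCounter(FakeTable())
probe.execute()
probe.execute()
```

A pre-fix test would expect one materialization per stat, while a post-fix test would expect the pushdown-marked stat to add none.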

## Estimated scope

~150 lines of pipeline code + ~80 lines of tests. The metadata is
already in place; this is just the read path.