
Commit 76743bf

feat(scoped-sd): wire cleaned_* into merged_sd + Codex P2 fix (#789)
* test(scoped-sd): failing tests for cleaned_* scope + Codex P2 invalidation

  Adds four tests pinning the deferred items on #785:

  - ``test_cleaned_keys_appear_when_cleaning_active``: clean scope SD must
    be layered into ``merged_sd`` with a ``cleaned_*`` prefix when cleaning
    ops are active.
  - ``test_cleaning_only_does_not_emit_filtered_keys``: the broken
    ``filter_active`` gate currently mislabels cleaning-affected stats as
    ``filtered_*`` when no search filter is active. The right gate is on
    chain-shape diff (``filt != clean``).
  - ``test_filter_and_clean_both_emit_correctly``: both scopes layered
    without cross-talk; ``cleaned_null_count`` reflects the clean df,
    ``filtered_null_count`` reflects the search-nulled df.
  - ``test_analysis_klasses_change_invalidates_scoped_sd``: Codex P2 pin: a
    swap of ``analysis_klasses`` must invalidate the per-scope SD cache so
    the new stat klass's keys surface in ``merged_sd``.

  All four are expected to fail on this commit; fixes follow.

* feat(scoped-sd): wire cleaned_* into merged_sd + chain-shape gates + Codex P2

  A. ``_merged_sd``: reads the clean scope's SD from the keyed cache and
     layers ``cleaned_*`` keys between the raw bare keys and the
     ``filtered_*`` keys. Adds ``clean_sd_key`` to the observed set so the
     observer fires after ``_populate_sd_cache`` has updated the pointer.

     Replaces the broken ``filter_active = filt_sd_key != raw_sd_key`` gate
     (which fired whenever cleaning was active, mislabelling
     cleaning-affected stats as ``filtered_*``) with chain-shape diffs:

         chains = split_chain_by_scope(self.operations)
         cleaning_active = chains['clean'] != chains['raw']
         filter_active = chains['filt'] != chains['clean']

     ``filtered_*`` now only fires when the filt chain extends the clean
     chain, that is, when a real quick-command op was added.

  B. ``_scope_cache_key``: includes ``id(self.analysis_klasses)`` in the
     ``extra`` arg passed to ``hash_chain``. Pins Codex P2: a klass-list
     swap with an unchanged op chain now produces a distinct cache key, so
     the scope SD is recomputed against the new klasses.

  The previously-failing ``test_cleaning_only_does_not_emit_filtered_keys``
  (the gate-bug pin from the failing-tests commit) now passes naturally; no
  test changes in this commit.
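
The gate change described in item A can be illustrated with a minimal, standalone sketch. It does not call the real ``split_chain_by_scope``; the ``chains`` dicts and the op encodings (``['safe_int', 'a']``, ``['search', 'foo']``) are hypothetical stand-ins, and the old gate is approximated by comparing chains directly rather than the hashed ``*_sd_key`` values.

```python
def scope_gates(chains):
    """Return (cleaning_active, filter_active) from per-scope op chains.

    `chains` is assumed to be a dict with 'raw', 'clean' and 'filt' keys,
    mirroring how the commit indexes the result of split_chain_by_scope.
    """
    cleaning_active = chains['clean'] != chains['raw']
    filter_active = chains['filt'] != chains['clean']
    return cleaning_active, filter_active


# Hypothetical op encodings, for illustration only.
clean_op = ['safe_int', 'a']
search_op = ['search', 'foo']

# Cleaning on, no search filter: the filt chain equals the clean chain.
cleaning_only = {'raw': [], 'clean': [clean_op], 'filt': [clean_op]}

# Cleaning on and a search filter on: the filt chain extends the clean chain.
both = {'raw': [], 'clean': [clean_op], 'filt': [clean_op, search_op]}

# Approximation of the old gate: comparing filt against raw fires even
# though no search filter is present in the cleaning-only state.
old_gate_cleaning_only = cleaning_only['filt'] != cleaning_only['raw']

print(scope_gates(cleaning_only))  # (True, False)
print(scope_gates(both))           # (True, True)
print(old_gate_cleaning_only)      # True
```

Comparing each scope against its immediate predecessor is what keeps the two prefixes independent: ``cleaned_*`` depends only on raw vs. clean, and ``filtered_*`` only on clean vs. filt.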
1 parent 8c6210a commit 76743bf

2 files changed

Lines changed: 155 additions & 53 deletions

buckaroo/dataflow/dataflow.py

Lines changed: 40 additions & 23 deletions
@@ -405,21 +405,23 @@ def setup_options_from_analysis(self):
     df_data_dict = Any({'empty':[]}).tag(sync=True)
 
 
-    @observe('summary_sd', 'processed_result', 'filt_sd_key')
+    @observe('summary_sd', 'processed_result', 'clean_sd_key', 'filt_sd_key')
     @exception_protect('merged_sd-protector')
     def _merged_sd(self, change):
         # Bare keys come from the raw scope's SD (computed on
-        # sampled_df). ``filtered_*`` keys are layered on top from the
-        # filt scope's SD when the filter is active. Scope SDs are read
-        # from the keyed cache (#783) — the cache observer
-        # ``_populate_sd_cache`` is what computes and stores them; this
-        # observer just assembles the wire shape #777's `?key` JS
-        # consumes.
+        # sampled_df). ``cleaned_*`` keys are layered on top from the
+        # clean scope's SD when cleaning is active; ``filtered_*`` keys
+        # are layered on top from the filt scope's SD when a search
+        # filter is active. Scope SDs are read from the keyed cache
+        # (#783) — the cache observer ``_populate_sd_cache`` is what
+        # computes and stores them; this observer just assembles the
+        # wire shape #777's `?key` JS consumes.
         #
-        # filt_sd_key is in the observed set so this fires after
-        # ``_populate_sd_cache`` has updated the pointer (which it
-        # always does, even on a pure cache hit) — guarantees the
-        # cache lookups below see the right keys for the current state.
+        # clean_sd_key and filt_sd_key are in the observed set so this
+        # fires after ``_populate_sd_cache`` has updated the pointers
+        # (which it always does, even on a pure cache hit) — guarantees
+        # the cache lookups below see the right keys for the current
+        # state.
 
         # Resolve scope SDs. Falls back to summary_sd / cleaned_sd
         # for pre-cache-population states (initial startup, the brief
@@ -428,17 +430,23 @@ def _merged_sd(self, change):
         raw_sd = cache.get(self.raw_sd_key) if self.raw_sd_key else None
         if raw_sd is None:
             raw_sd = self.summary_sd or {}
+        clean_sd = cache.get(self.clean_sd_key) if self.clean_sd_key else None
+        if clean_sd is None:
+            clean_sd = self.cleaned_sd or {}
         filt_sd = cache.get(self.filt_sd_key) if self.filt_sd_key else None
         if filt_sd is None:
             filt_sd = self.summary_sd or {}
 
-        # ``filtered_*`` keys reflect "search filter applied on top of
-        # cleaning", so the gate is "filt chain has ops the clean chain
-        # doesn't" — i.e. at least one quick-command op is present. Keying
-        # off ``filt_sd_key != raw_sd_key`` would also fire for
-        # cleaning-only states, mislabelling cleaned stats as filtered
-        # until the deferred ``cleaned_*`` scope lands.
+        # Gate each prefixed layer on a chain-shape diff between scopes.
+        # ``cleaned_*`` fires when the clean chain has ops the raw chain
+        # doesn't (cleaning is on); ``filtered_*`` fires when the filt
+        # chain has ops the clean chain doesn't (at least one
+        # quick-command op is present, i.e. a search filter is on).
+        # Keying off ``filt_sd_key != raw_sd_key`` would also fire
+        # ``filtered_*`` for cleaning-only states, mislabelling cleaned
+        # stats as filtered.
         chains = split_chain_by_scope(self.operations)
+        cleaning_active = chains['clean'] != chains['raw']
         filter_active = chains['filt'] != chains['clean']
 
         if self.processed_df is None:
@@ -451,6 +459,13 @@ def _merged_sd(self, change):
         intermediate_sd = merge_sds(rewritten_init_sd, self.cleaned_sd, raw_sd)
         base = merge_sd_overrides(intermediate_sd, self.processed_df, self.processed_sd)
 
+        # Layer ``cleaned_*`` keys on top when cleaning is active.
+        if cleaning_active and clean_sd:
+            for col, stats in clean_sd.items():
+                col_dict = base.setdefault(col, {})
+                for stat_name, val in stats.items():
+                    col_dict[f'cleaned_{stat_name}'] = val
+
         # Layer ``filtered_*`` keys on top when a filter is active.
         if filter_active and filt_sd:
             for col, stats in filt_sd.items():
@@ -505,8 +520,9 @@ def _scope_cache_key(self, chain):
 
         Includes the op chain *and* an identifier for the source
         dataframe (``id(sampled_df)``) *and* the post-processing method
-        — all three are inputs to the scope df, and a cache hit must
-        mean "same SD-producing inputs" not just "same chain".
+        *and* the analysis-klasses identity — all four are inputs to
+        the scope's SD, and a cache hit must mean "same SD-producing
+        inputs" not just "same chain".
 
         - sampled_df identity addresses codex P1 on #783: a ``raw_df``
           swap with an unchanged chain must invalidate.
@@ -515,13 +531,14 @@ def _scope_cache_key(self, chain):
           post-processing replaces the df entirely (e.g. ``hide_post``
           → ``SENTINEL_DF``), the raw scope's SD must reflect that
           new df, not the pre-post-processing one.
-
-        analysis_klasses is *not* included here; that's a separate
-        invariant (codex P2, deferred — see follow-up issue).
+        - analysis_klasses identity addresses codex P2 on #783: a
+          klass-list swap with an unchanged chain must invalidate so
+          new stat klasses surface in ``merged_sd``.
         """
         sampled_id = id(self.sampled_df) if self.sampled_df is not None else 0
         pp = self.post_processing_method or ''
-        return hash_chain(chain, extra=f"{sampled_id}|{pp}")
+        klasses_id = id(self.analysis_klasses)
+        return hash_chain(chain, extra=f"{sampled_id}|{pp}|{klasses_id}")
 
     @observe('summary_sd', 'operations', 'analysis_klasses')
     @exception_protect('sd-cache-protector')
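
The prefixed layering that ``_merged_sd`` performs in the diff above can be sketched in isolation. This is a minimal illustration, not the project's code: ``layer_prefixed`` is a hypothetical helper name, the stat values are made up, and the ``filtered_*`` loop is assumed to mirror the ``cleaned_*`` loop, since only its first lines are visible in the hunk.

```python
def layer_prefixed(base, scope_sd, prefix):
    """Copy every stat in scope_sd into base under `prefix + stat_name`.

    Bare (unprefixed) keys already in `base` are left untouched, so the
    raw scope's stats survive alongside the prefixed layers.
    """
    for col, stats in scope_sd.items():
        col_dict = base.setdefault(col, {})
        for stat_name, val in stats.items():
            col_dict[f'{prefix}{stat_name}'] = val
    return base


# Made-up summary dicts for a single column 'a'.
raw_sd = {'a': {'mean': 0, 'null_count': 0}}
clean_sd = {'a': {'mean': 30.0, 'null_count': 0}}
filt_sd = {'a': {'mean': 40.0, 'null_count': 3}}

# Start from a copy of the raw stats, then add each layer in order.
merged = {col: dict(stats) for col, stats in raw_sd.items()}
layer_prefixed(merged, clean_sd, 'cleaned_')
layer_prefixed(merged, filt_sd, 'filtered_')

print(sorted(merged['a']))
```

Because each layer writes only keys carrying its own prefix, the order of the two calls does not affect the result, which matches the "no cross-talk" expectation in the tests.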

tests/unit/dataflow/scoped_summary_stats_test.py

Lines changed: 115 additions & 30 deletions
@@ -112,36 +112,6 @@ def test_bare_mean_is_raw_not_filtered():
     )
 
 
-def test_cleaning_only_does_not_emit_filtered_keys():
-    """Cleaning ops in the chain (but no search/quick-command) must NOT
-    cause ``filtered_*`` keys to appear. ``filtered_*`` semantically means
-    "search filter is active"; a key-inequality gate (filt_sd_key !=
-    raw_sd_key) would mislabel cleaning-affected stats as filtered until
-    the deferred ``cleaned_*`` scope lands. The gate must be on the
-    chains themselves: filt != clean.
-    """
-    df = pd.DataFrame({'a': ['10', '20', '30', '40', '50'],
-                       'b': ['foo', 'bar', 'foo', 'baz', 'foo']})
-    dfc = ScopedDataflow(df)
-    dfc.cleaning_method = 'default'
-
-    clean_chain = [op for op in (dfc.operations or [])
-                   if isinstance(op, list) and len(op) > 0]
-    assert len(clean_chain) > 0, (
-        "precondition: cleaning_method='default' should have produced "
-        "cleaning ops for a numeric-string column"
-    )
-
-    sd = dfc.merged_sd
-    filtered_keys = [k for k in sd.get('a', {}) if k.startswith('filtered_')]
-    assert filtered_keys == [], (
-        f"cleaning-only state must not emit filtered_* keys; got "
-        f"{filtered_keys}. The `filter_active` gate is firing on "
-        f"filt_sd_key != raw_sd_key instead of on the chain-shape "
-        f"difference between filt and clean."
-    )
-
-
 def test_raw_df_change_invalidates_scoped_sd():
     """Codex P1 from #783: the cache key was derived only from the op chain,
     so a ``raw_df`` swap with the same (empty) chain reused stale entries.
@@ -166,3 +136,118 @@ def test_raw_df_change_invalidates_scoped_sd():
         f"mean (400.0); got {dfc.merged_sd['a']['mean']} — likely a stale "
         f"cache entry keyed only by the (unchanged) op chain"
     )
+
+
+def test_cleaned_keys_appear_when_cleaning_active():
+    """When ``cleaning_method`` produces auto-clean ops, the clean scope's
+    SD must be layered into ``merged_sd`` with a ``cleaned_*`` prefix.
+
+    Column 'a' is numeric-string. ``safe_int`` casts it to a UInt8 column,
+    so the clean scope's ``mean`` is 30.0 (computed on ints) while the
+    raw scope's ``mean`` is the string-column fallback (0).
+    """
+    df = pd.DataFrame({'a': ['10', '20', '30', '40', '50'],
+                       'b': ['foo', 'bar', 'foo', 'baz', 'foo']})
+    dfc = ScopedDataflow(df)
+    dfc.cleaning_method = 'default'
+
+    sd = dfc.merged_sd
+    assert 'cleaned_mean' in sd['a'], (
+        f"cleaning active: `cleaned_mean` should be emitted alongside raw "
+        f"`mean`; got keys {sorted(sd['a'].keys())}"
+    )
+    assert sd['a']['cleaned_mean'] == 30.0, (
+        f"`cleaned_mean` should be the int-cast mean (30.0); got "
+        f"{sd['a']['cleaned_mean']}"
+    )
+
+
+def test_cleaning_only_does_not_emit_filtered_keys():
+    """The pre-#785 ``filter_active`` gate was keyed on
+    ``filt_sd_key != raw_sd_key``, which fires whenever the clean chain is
+    non-empty — even with no search filter. The right gate is on chain
+    shape: ``filtered_*`` only when ``filt`` differs from ``clean``.
+
+    With cleaning active but no quick-command args, ``merged_sd`` must
+    have ``cleaned_*`` keys and NO ``filtered_*`` keys.
+    """
+    df = pd.DataFrame({'a': ['10', '20', '30', '40', '50'],
+                       'b': ['foo', 'bar', 'foo', 'baz', 'foo']})
+    dfc = ScopedDataflow(df)
+    dfc.cleaning_method = 'default'
+
+    sd = dfc.merged_sd
+    filtered_keys = [k for k in sd['a'] if k.startswith('filtered_')]
+    assert filtered_keys == [], (
+        f"cleaning-only state must not emit filtered_* keys; got "
+        f"{filtered_keys}"
+    )
+    cleaned_keys = [k for k in sd['a'] if k.startswith('cleaned_')]
+    assert cleaned_keys, (
+        "cleaning-only state should emit cleaned_* keys; got none"
+    )
+
+
+def test_filter_and_clean_both_emit_correctly():
+    """With both cleaning and a search filter active, ``merged_sd``
+    carries bare raw keys, ``cleaned_*`` keys reflecting the clean scope,
+    and ``filtered_*`` keys reflecting the filt scope. The three layers
+    do not cross-talk.
+
+    'a' is numeric-string; safe_int casts it. Search 'foo' on 'b' keeps
+    the foo rows (length 4 in raw / clean scopes, with the filt scope
+    nulling out non-foo rows in 'a' → 3 nulls).
+    """
+    df = pd.DataFrame({'a': ['10', '20', '30', '40', '50', '60', '70'],
+                       'b': ['foo', 'bar', 'foo', 'baz', 'foo', 'bar', 'foo']})
+    dfc = ScopedDataflow(df)
+    dfc.cleaning_method = 'default'
+    dfc.quick_command_args = {'search': ['foo']}
+
+    sd = dfc.merged_sd['a']
+    cleaned_keys = [k for k in sd if k.startswith('cleaned_')]
+    filtered_keys = [k for k in sd if k.startswith('filtered_')]
+    assert cleaned_keys, f"both-active: cleaned_* keys missing; got {sorted(sd.keys())}"
+    assert filtered_keys, f"both-active: filtered_* keys missing; got {sorted(sd.keys())}"
+
+    # Cross-talk check: the filt scope nulls out non-foo rows in 'a' (3
+    # nulls), while the clean scope leaves all 7 rows intact (0 nulls).
+    assert sd['cleaned_null_count'] == 0, (
+        f"cleaned_null_count should reflect the clean scope (0); got "
+        f"{sd['cleaned_null_count']}"
+    )
+    assert sd['filtered_null_count'] == 3, (
+        f"filtered_null_count should reflect the filt scope (3 nulls); "
+        f"got {sd['filtered_null_count']}"
+    )
+
+
+def test_analysis_klasses_change_invalidates_scoped_sd():
+    """Codex P2 from #783: ``_scope_cache_key`` was hashed from chain +
+    sampled_df + post_processing_method only. Two dataflows with the
+    same df + chain but different ``analysis_klasses`` would collide on
+    the same cache key, so a klass swap left stale SD blobs in the
+    cache. Including ``id(analysis_klasses)`` in the cache key must
+    produce distinct keys for distinct klass lists.
+
+    Asserted at the ``_scope_cache_key`` level because ``analysis_klasses``
+    is a plain class attribute (not a traitlet) on ``DataFlow`` — setting
+    it on the instance doesn't fire observers, so the merged_sd-level
+    behavior can't be exercised end-to-end without an unrelated
+    architectural change. The cache-key contract is the load-bearing
+    invariant.
+    """
+    df = pd.DataFrame({'a': [10, 20, 30, 40, 50],
+                       'b': ['foo', 'bar', 'foo', 'baz', 'foo']})
+    dfc1 = ScopedDataflow(df)
+    key1 = dfc1._scope_cache_key([])
+
+    dfc2 = ScopedDataflow(df)
+    dfc2.analysis_klasses = [StylingAnalysis, DefaultSummaryStats, CleaningGenOps]
+    key2 = dfc2._scope_cache_key([])
+
+    assert key1 != key2, (
+        f"scope cache key must differ when analysis_klasses differs; "
+        f"got the same key {key1} for both — likely the cache key still "
+        f"omits analysis_klasses identity (Codex P2)"
+    )
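
The cache-key contract that the last test pins can be reproduced without buckaroo. The sketch below is an assumption-laden stand-in: ``toy_hash_chain`` and ``toy_scope_cache_key`` are hypothetical names, and the real ``hash_chain`` may serialize and hash differently. Only the shape of the ``extra`` string (``sampled_id|pp|klasses_id``) is taken from the diff.

```python
import hashlib
import json


def toy_hash_chain(chain, extra=''):
    """Hypothetical stand-in for hash_chain: hash the op chain plus extra."""
    payload = json.dumps(chain, sort_keys=True) + '|' + extra
    return hashlib.sha256(payload.encode('utf-8')).hexdigest()


def toy_scope_cache_key(chain, sampled_df, post_processing_method, analysis_klasses):
    """Mirror the key recipe from the diff using object identities."""
    sampled_id = id(sampled_df) if sampled_df is not None else 0
    pp = post_processing_method or ''
    klasses_id = id(analysis_klasses)
    return toy_hash_chain(chain, extra=f"{sampled_id}|{pp}|{klasses_id}")


df_stand_in = object()   # placeholder for a dataframe; only its id is used
klasses_a = ['StatsA']   # placeholder klass lists
klasses_b = ['StatsA']   # equal contents, but a distinct list object

key_a = toy_scope_cache_key([], df_stand_in, '', klasses_a)
key_b = toy_scope_cache_key([], df_stand_in, '', klasses_b)
key_a_again = toy_scope_cache_key([], df_stand_in, '', klasses_a)

# In-place mutation keeps the same list object, so the id is unchanged.
klasses_a.append('StatsB')
key_a_mutated = toy_scope_cache_key([], df_stand_in, '', klasses_a)

print(key_a != key_b)          # True: swapping the list changes the key
print(key_a == key_a_again)    # True: same object, same key
print(key_a == key_a_mutated)  # True: in-place edits are not detected
```

An identity-based key distinguishes a swapped list, which is the case the commit targets, but it would not notice an in-place mutation of the same list. In CPython, ``id`` values are also only unique among objects that are alive at the same time.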
