diff --git a/buckaroo/dataflow/dataflow.py b/buckaroo/dataflow/dataflow.py index 3c2790f89..642297843 100644 --- a/buckaroo/dataflow/dataflow.py +++ b/buckaroo/dataflow/dataflow.py @@ -81,6 +81,16 @@ def __init__(self, raw_df): autocleaning_klass = SentinelAutocleaning autoclean_conf = tuple() + # Generic per-scope stat skiplist. Maps scope name → set of stat + # names that should NOT run when computing that scope's SD. Used + # to keep expensive stats (e.g. ``histogram`` on xorq, where per- + # column engine queries dominate latency) out of the hot path + # while still rendering on the cached raw scope. Honoured by + # ``_summary_sd`` (filt scope) and ``_populate_sd_cache`` (raw + + # clean scopes). Default empty = run everything; XorqDataflow + # overrides this to skip histograms on filt + clean. + skip_stats_by_scope: TDict[str, set] = {} + command_config = Dict({}).tag(sync=True) operation_results = Dict({'transformed_df':None, 'generated_py_code': ""}) @@ -237,7 +247,7 @@ def processed_sd(self) -> SDType: return self.processed_result[1] return {} - def _get_summary_sd(self, df:pd.DataFrame) -> Tuple[SDType, TAny]: + def _get_summary_sd(self, df:pd.DataFrame, skip_stat_names=None) -> Tuple[SDType, TAny]: analysis_klasses = self.analysis_klasses if analysis_klasses == "foo": return {'some-col': {'foo':8}}, {} @@ -264,7 +274,20 @@ def _summary_sd(self, change): if (id(df), id(klasses)) == self._summary_sd_cache_key: return self._summary_sd_cache_key = (id(df), id(klasses)) - result_summary_sd, errs = self._get_summary_sd(df) + # filt scope: ``summary_sd`` is what gets stored under the filt + # cache key in ``_populate_sd_cache``, so apply the filt skip + # here when a filter is active. We can't use ``operations`` for + # the filter check — it hasn't been updated yet in the cascade + # (``_operation_result`` sets ``self.cleaned`` first, triggering + # this observer via ``processed_result``, *then* sets + # ``self.operations``). ``quick_command_args`` is the upstream + # trait, set before ``_operation_result`` runs. + filter_active = bool(self.quick_command_args) + filt_skip = ( + self.skip_stats_by_scope.get('filt') + if (filter_active and self.skip_stats_by_scope) + else None) + result_summary_sd, errs = self._get_summary_sd(df, skip_stat_names=filt_skip) self.summary_sd = result_summary_sd self.errs = errs @@ -500,13 +523,35 @@ def _compute_scope_df(self, scope: str): return base return pp_result[0] if pp_result else base - def _scope_cache_key(self, chain): + def _effective_skip(self, scope, chains): + """The skip list to apply for ``scope`` given the current + ``chains`` shape. Returns ``None`` when the scope is effectively + identical to its parent scope (no filter active → filt is filt- + of-clean; no cleaning → clean is just raw). In those degenerate + cases applying a separate per-scope skip would create a phantom + distinct cache entry, triggering unnecessary recomputes (e.g. + lazy postprocessor re-runs from ``_compute_scope_df``).""" + if not self.skip_stats_by_scope: + return None + if scope == 'raw': + return self.skip_stats_by_scope.get('raw') + if scope == 'clean': + if chains['clean'] == chains['raw']: + return None + return self.skip_stats_by_scope.get('clean') + if scope == 'filt': + if chains['filt'] == chains['clean']: + return None + return self.skip_stats_by_scope.get('filt') + return None + + def _scope_cache_key(self, chain, scope=None, chains=None): """Hash that identifies a scope's SD-input identity. Includes the op chain *and* an identifier for the source dataframe (``id(sampled_df)``) *and* the post-processing method - — all three are inputs to the scope df, and a cache hit must - mean "same SD-producing inputs" not just "same chain". + *and* the per-scope effective skip — a cache hit must mean + "same SD-producing inputs" not just "same chain". - sampled_df identity addresses codex P1 on #783: a ``raw_df`` swap with an unchanged chain must invalidate. @@ -515,13 +560,19 @@ def _scope_cache_key(self, chain): post-processing replaces the df entirely (e.g. ``hide_post`` → ``SENTINEL_DF``), the raw scope's SD must reflect that new df, not the pre-post-processing one. + - effective skip: keeps the no-filter / no-cleaning case + collapsing raw + clean + filt under one key (see + ``_effective_skip``) while still segregating when the skip + actually applies. analysis_klasses is *not* included here; that's a separate invariant (codex P2, deferred — see follow-up issue). """ sampled_id = id(self.sampled_df) if self.sampled_df is not None else 0 pp = self.post_processing_method or '' - return hash_chain(chain, extra=f"{sampled_id}|{pp}") + scope_skip = self._effective_skip(scope, chains) if (scope and chains is not None) else None + skip_part = '|'.join(sorted(scope_skip)) if scope_skip else '' + return hash_chain(chain, extra=f"{sampled_id}|{pp}|{skip_part}") @observe('summary_sd', 'operations', 'analysis_klasses') @exception_protect('sd-cache-protector') @@ -548,7 +599,7 @@ def _populate_sd_cache(self, _change): if self.processed_df is None: return chains = split_chain_by_scope(self.operations) - keys = {scope: self._scope_cache_key(chain) + keys = {scope: self._scope_cache_key(chain, scope=scope, chains=chains) for scope, chain in chains.items()} new_cache = dict(self.summary_stats_cache) cache_grew = False @@ -565,7 +616,8 @@ def _populate_sd_cache(self, _change): scope_df = self._compute_scope_df(scope) if scope_df is None: continue - sd, _errs = self._get_summary_sd(scope_df) + scope_skip = self._effective_skip(scope, chains) + sd, _errs = self._get_summary_sd(scope_df, skip_stat_names=scope_skip) new_cache[keys[scope]] = sd cache_grew = True @@ -605,11 +657,16 @@ def _build_error_dataframe(self, e): ### start summary stats block #TAny closer to some error type @override - def _get_summary_sd(self, processed_df:pd.DataFrame) -> Tuple[SDType, TDict[str, TAny]]: + def _get_summary_sd(self, processed_df:pd.DataFrame, skip_stat_names=None) -> Tuple[SDType, TDict[str, TAny]]: + # ``skip_stat_names`` is threaded through from the per-scope + # ``skip_stats_by_scope`` config — lets a dataflow declare + # "don't run stat X on scope Y" without touching analysis_klasses + # globally. DfStatsV2 / XorqDfStatsV2 forward this to the + # underlying pipeline. stats = self.DFStatsClass( processed_df, self.analysis_klasses, - self.df_name, debug=self.debug) + self.df_name, debug=self.debug, skip_stat_names=skip_stat_names) sdf = stats.sdf if stats.errs: if self.debug: diff --git a/buckaroo/pluggable_analysis_framework/analysis_management.py b/buckaroo/pluggable_analysis_framework/analysis_management.py index 4c5c77434..80f0624e0 100644 --- a/buckaroo/pluggable_analysis_framework/analysis_management.py +++ b/buckaroo/pluggable_analysis_framework/analysis_management.py @@ -286,7 +286,12 @@ def verify_analysis_objects(kls, col_analysis_objs:AObjs): kls.ap_class(col_analysis_objs) def __init__(self, df_stats_df:pd.DataFrame, col_analysis_objs:AObjs, - operating_df_name:str=None, debug:bool=False) -> None: + operating_df_name:str=None, debug:bool=False, skip_stat_names=None) -> None: + # ``skip_stat_names`` is accepted for API parity with DfStatsV2 / + # XorqDfStatsV2 (it's threaded through from the dataflow's + # ``skip_stats_by_scope`` config). The v1 ``AnalysisPipeline`` + # doesn't support per-stat skipping, so we strip at the output + # level instead. self.df = self.get_operating_df(df_stats_df, force_full_eval=False) self.col_order = self.df.columns self.ap = self.ap_class(col_analysis_objs) @@ -294,6 +299,11 @@ def __init__(self, df_stats_df:pd.DataFrame, col_analysis_objs:AObjs, self.debug = debug self.sdf, self.errs = self.ap.process_df(self.df, self.debug) + if skip_stat_names: + for col_stats in self.sdf.values(): + for k in list(col_stats.keys()): + if k in skip_stat_names: + del col_stats[k] if self.errs: output_full_reproduce(self.errs, self.sdf, operating_df_name) diff --git a/buckaroo/pluggable_analysis_framework/df_stats_v2.py b/buckaroo/pluggable_analysis_framework/df_stats_v2.py index ac428fe93..cbfa2b2cb 100644 --- a/buckaroo/pluggable_analysis_framework/df_stats_v2.py +++ b/buckaroo/pluggable_analysis_framework/df_stats_v2.py @@ -40,15 +40,17 @@ def verify_analysis_objects(cls, col_analysis_objs: AObjs) -> None: cls.ap_class(col_analysis_objs) def __init__(self, df_stats_df: pd.DataFrame, col_analysis_objs: AObjs, operating_df_name: str = None, - debug: bool = False) -> None: + debug: bool = False, skip_stat_names=None) -> None: self.df = self.get_operating_df(df_stats_df, force_full_eval=False) self.col_order = self.df.columns self.ap = self.ap_class(col_analysis_objs) self.operating_df_name = operating_df_name self.debug = debug - # Process using v1-compatible output format - self.sdf, self.errs = self.ap.process_df_v1_compat(self.df, self.debug) + # ``skip_stat_names`` is the per-scope skiplist threaded through + # from the dataflow's ``skip_stats_by_scope`` config. + self.sdf, self.errs = self.ap.process_df_v1_compat(self.df, self.debug, + skip_stat_names=skip_stat_names) self.stat_errors = [] if self.errs: @@ -94,10 +96,12 @@ def get_operating_df(self, df): return df.sample(n=min(50_000, rows), seed=42) return df - def __init__(self, df, col_analysis_objs, operating_df_name=None, debug=False): + def __init__(self, df, col_analysis_objs, operating_df_name=None, debug=False, + skip_stat_names=None): self.df = self.get_operating_df(df) self.ap = StatPipeline(col_analysis_objs, unit_test=False) - self.sdf, self.errs = self.ap.process_df_v1_compat(self.df, debug) + self.sdf, self.errs = self.ap.process_df_v1_compat(self.df, debug, + skip_stat_names=skip_stat_names) self.stat_errors = [] if self.errs: output_full_reproduce(self.errs, self.sdf, operating_df_name) diff --git a/buckaroo/pluggable_analysis_framework/stat_pipeline.py b/buckaroo/pluggable_analysis_framework/stat_pipeline.py index 5f4bec8f4..2308cdbb0 100644 --- a/buckaroo/pluggable_analysis_framework/stat_pipeline.py +++ b/buckaroo/pluggable_analysis_framework/stat_pipeline.py @@ -234,13 +234,19 @@ def __init__(self, stat_funcs: list, unit_test: bool = True, record_timings: boo self._unit_test_result = self.unit_test() def process_column(self, column_name: str, column_dtype, raw_series=None, sampled_series=None, raw_dataframe=None, - initial_stats: Optional[Dict[str, Any]] = None) -> Tuple[Dict[str, Any], List[StatError]]: + initial_stats: Optional[Dict[str, Any]] = None, + skip_stat_names=None) -> Tuple[Dict[str, Any], List[StatError]]: """Process a single column through the stat DAG. 1. Filters stat functions by column dtype - 2. Executes in topological order with Ok/Err accumulator - 3. Returns (plain_dict, errors) + 2. Filters by ``skip_stat_names`` — explicit per-stat-name + skiplist threaded through from the dataflow's + ``skip_stats_by_scope`` config (e.g. histograms off on + filt/clean for xorq) + 3. Executes in topological order with Ok/Err accumulator + 4. Returns (plain_dict, errors) """ + skip_stat_names = skip_stat_names or set() # Build column-specific DAG (filters by dtype) external = set(self.EXTERNAL_KEYS) if initial_stats: @@ -255,6 +261,8 @@ def process_column(self, column_name: str, column_dtype, raw_series=None, sample accumulator[k] = Ok(v) record_timings = self.record_timings for sf in column_funcs: + if sf.name in skip_stat_names: + continue if record_timings: t0 = time.perf_counter() _execute_stat_func(sf, accumulator, column_name, raw_series=raw_series, sampled_series=sampled_series, @@ -268,11 +276,26 @@ def process_column(self, column_name: str, column_dtype, raw_series=None, sample for sk in sf.provides: col_key_to_func[sk.name] = sf - return resolve_accumulator(accumulator, column_name, col_key_to_func) - - def process_df(self, df: pd.DataFrame, debug: bool = False) -> Tuple[SDType, List[StatError]]: + result, errors = resolve_accumulator(accumulator, column_name, col_key_to_func) + # Output-level strip — covers v1 ColAnalysis wrappers where one + # ``StatFunc.name`` (e.g. ``DefaultSummaryStats__series``) + # provides many keys (``mean``, ``max``, ...). The input-level + # skip above can't tell which wrapper to drop; the output-level + # strip removes any key the caller asked to skip regardless of + # which producing stat func it came from. + if skip_stat_names: + for k in list(result.keys()): + if k in skip_stat_names: + del result[k] + return result, errors + + def process_df(self, df: pd.DataFrame, debug: bool = False, + skip_stat_names=None) -> Tuple[SDType, List[StatError]]: """Process all columns of a DataFrame. + ``skip_stat_names`` is the per-stat-name skiplist threaded + through from the dataflow's ``skip_stats_by_scope`` config. + Returns: (summary_dict, all_errors) where summary_dict is SDType-compatible (column_name -> {stat_name -> value}). @@ -292,14 +315,16 @@ def process_df(self, df: pd.DataFrame, debug: bool = False) -> Tuple[SDType, Lis col_result, col_errors = self.process_column(column_name=rewritten_col_name, column_dtype=col_dtype, raw_series=ser, sampled_series=ser, raw_dataframe=df, - initial_stats={'orig_col_name': orig_col_name, 'rewritten_col_name': rewritten_col_name}) + initial_stats={'orig_col_name': orig_col_name, 'rewritten_col_name': rewritten_col_name}, + skip_stat_names=skip_stat_names) summary[rewritten_col_name] = col_result all_errors.extend(col_errors) return summary, all_errors - def process_df_v1_compat(self, df: pd.DataFrame, debug: bool = False) -> Tuple[SDType, ErrDict]: + def process_df_v1_compat(self, df: pd.DataFrame, debug: bool = False, + skip_stat_names=None) -> Tuple[SDType, ErrDict]: """Process DataFrame with v1-compatible error format. Returns (SDType, ErrDict) matching the v1 AnalysisPipeline interface. @@ -307,7 +332,7 @@ def process_df_v1_compat(self, df: pd.DataFrame, debug: bool = False) -> Tuple[S that mixes ColAnalysis subclasses into the input list — DataFlow, autocleaning, server.data_loading, polars_buckaroo). """ - summary, errors = self.process_df(df, debug=debug) + summary, errors = self.process_df(df, debug=debug, skip_stat_names=skip_stat_names) # Convert StatError list to v1 ErrDict format errs: ErrDict = {} diff --git a/buckaroo/pluggable_analysis_framework/xorq_stat_pipeline.py b/buckaroo/pluggable_analysis_framework/xorq_stat_pipeline.py index 54720f9af..69ce1cb9d 100644 --- a/buckaroo/pluggable_analysis_framework/xorq_stat_pipeline.py +++ b/buckaroo/pluggable_analysis_framework/xorq_stat_pipeline.py @@ -165,7 +165,12 @@ def unit_test(self) -> Tuple[bool, List[StatError]]: finally: self.backend = saved_backend - def process_table(self, table) -> Tuple[SDType, List[StatError]]: + def process_table(self, table, skip_stat_names=None) -> Tuple[SDType, List[StatError]]: + """Run the pipeline. ``skip_stat_names`` is an explicit per- + stat-name skiplist — used by the dataflow's + ``skip_stats_by_scope`` config (e.g. histograms on filt/clean + scopes for xorq).""" + skip_stat_names = skip_stat_names or set() schema = table.schema() columns = list(table.columns) @@ -188,6 +193,8 @@ def process_table(self, table) -> Tuple[SDType, List[StatError]]: for sf in self.ordered_stat_funcs: if not _is_batch_func(sf): continue + if sf.name in skip_stat_names: + continue xorq_col_param = next(r.name for r in sf.requires if r.type is XorqColumn) for col in columns: col_dtype = schema[col] @@ -253,6 +260,10 @@ def process_table(self, table) -> Tuple[SDType, List[StatError]]: # (typically the batch-phase stats). if sf.provides and all(sk.name in col_accum for sk in sf.provides): continue + # Per-scope name skiplist (e.g. ``histogram`` not run + # on filt/clean scopes for xorq). + if sf.name in skip_stat_names: + continue _execute_stat_func(sf, col_accum, col, raw_series=None, sampled_series=None, raw_dataframe=None, xorq_expr=table, xorq_execute=self._execute) @@ -306,14 +317,15 @@ def add_stat(self, stat_func_or_class) -> Tuple[bool, List[StatError]]: return True, [] - def process_table_v1_compat(self, table) -> Tuple[SDType, ErrDict]: + def process_table_v1_compat(self, table, skip_stat_names=None) -> Tuple[SDType, ErrDict]: """Run process_table and convert errors to v1 ErrDict shape. Used by XorqDfStatsV2 / DataFlow consumers expecting the same ``{(col, stat): (Exception, kls)}`` shape that AnalysisPipeline - produced. + produced. ``skip_stat_names`` threads through the per-scope + skiplist. """ - summary, errors = self.process_table(table) + summary, errors = self.process_table(table, skip_stat_names=skip_stat_names) errs: ErrDict = {} for se in errors: kls = _find_v1_class(se.stat_func, self._original_inputs) if se.stat_func else None @@ -346,7 +358,8 @@ def verify_analysis_objects(cls, objs): # (issue #709). DAG validation still runs as part of __init__. XorqStatPipeline(objs, unit_test=False) - def __init__(self, table, col_analysis_objs, operating_df_name=None, debug=False): + def __init__(self, table, col_analysis_objs, operating_df_name=None, debug=False, + skip_stat_names=None): self.table = table # Skip the unit_test PERVERSE_DF run on each widget construction — # it doubles the SQL query count (issue #709). The DAG-validation @@ -355,7 +368,11 @@ def __init__(self, table, col_analysis_objs, operating_df_name=None, debug=False self.ap = XorqStatPipeline(col_analysis_objs, unit_test=False) self.operating_df_name = operating_df_name self.debug = debug - self.sdf, self.errs = self.ap.process_table_v1_compat(self.table) + # skip_stat_names is the per-scope "don't run this stat" filter + # the dataflow passes when e.g. histograms are excluded from the + # filt/clean scopes. None means run everything. + self.sdf, self.errs = self.ap.process_table_v1_compat(self.table, + skip_stat_names=skip_stat_names) self.stat_errors = [] if self.errs: output_full_reproduce(self.errs, self.sdf, operating_df_name) diff --git a/buckaroo/xorq_buckaroo.py b/buckaroo/xorq_buckaroo.py index 052dc72b4..8a0ac3a8c 100644 --- a/buckaroo/xorq_buckaroo.py +++ b/buckaroo/xorq_buckaroo.py @@ -138,8 +138,19 @@ class XorqDataflow(CustomizableDataflow): 2. ``_get_summary_sd`` re-keys the summary dict from original column names (what ``XorqStatPipeline`` produces) to the rewritten ``a, b, c`` names that ``pd_to_obj`` and the styling layer expect. + + Histogram is also skipped on the filt + clean scopes via + ``skip_stats_by_scope``. On xorq the per-column histogram queries + dominate state_change latency on remote tables — running them + three times per state_change (once per scope) is wasteful when + only the raw histogram is rendered. The bare ``histogram`` key + in ``merged_sd`` still comes from the raw scope; ``filtered_*`` + and ``cleaned_*`` simply omit a histogram entry. Clear the class + attribute on a subclass to opt back in. """ + skip_stats_by_scope = {'filt': {'histogram'}, 'clean': {'histogram'}} + def populate_df_meta(self) -> None: if self.processed_df is None: self.df_meta = { @@ -155,7 +166,7 @@ def populate_df_meta(self) -> None: 'rows_shown': rows_shown, 'total_rows': _expr_count(self.orig_df)} - def _get_summary_sd(self, processed_df): + def _get_summary_sd(self, processed_df, skip_stat_names=None): if _is_pandas(processed_df): # The error path (and any postprocessor that returns a pandas # DataFrame) doesn't go through XorqStatPipeline. Return a @@ -166,7 +177,7 @@ def _get_summary_sd(self, processed_df): 'orig_col_name': orig_col, 'rewritten_col_name': rewritten_col} return empty, {} - sdf, errs = super()._get_summary_sd(processed_df) + sdf, errs = super()._get_summary_sd(processed_df, skip_stat_names=skip_stat_names) rewritten = {} for orig_col, rewritten_col in old_col_new_col(processed_df): col_meta = dict(sdf.get(orig_col, {})) diff --git a/tests/unit/dataflow/scoped_summary_stats_test.py b/tests/unit/dataflow/scoped_summary_stats_test.py index 55c7951df..fba9bd4bb 100644 --- a/tests/unit/dataflow/scoped_summary_stats_test.py +++ b/tests/unit/dataflow/scoped_summary_stats_test.py @@ -142,6 +142,91 @@ def test_cleaning_only_does_not_emit_filtered_keys(): ) +class _SkipFiltMeanDataflow(ScopedDataflow): + """ScopedDataflow with the filt scope configured to skip the ``mean`` + stat. Used to pin the ``skip_stats_by_scope`` plumbing on the cheap + pandas DefaultSummaryStats path.""" + + skip_stats_by_scope = {'filt': {'mean'}} + + +class _SkipRawAndCleanMeanDataflow(ScopedDataflow): + skip_stats_by_scope = {'raw': {'mean'}, 'clean': {'mean'}} + + +def test_skip_stats_by_scope_excludes_named_stat_from_filt(): + """When ``skip_stats_by_scope = {'filt': {'mean'}}`` and a filter is + active, the dataflow must NOT compute ``mean`` on the filt scope — + so ``filtered_mean`` is absent from merged_sd. Other filtered_* + stats (e.g. ``filtered_length``) are unaffected. The bare raw + ``mean`` (from the raw scope) is still present. + + Equivalent contract used by XorqDataflow to skip the expensive + per-column histogram queries on filt/clean scopes while still + rendering bare histograms from raw.""" + df = pd.DataFrame({'a': [10, 20, 30, 40, 50], + 'b': ['foo', 'bar', 'foo', 'baz', 'foo']}) + dfc = _SkipFiltMeanDataflow(df) + dfc.quick_command_args = {'search': ['foo']} + + sd = dfc.merged_sd + assert 'mean' in sd['a'], ( + "bare raw `mean` must still be present — skip list only applies " + "to filt scope, not raw" + ) + assert 'filtered_length' in sd['a'], ( + "precondition: filtered_* layering must be active " + "(non-skipped stats should still produce filtered_* keys)" + ) + assert 'filtered_mean' not in sd['a'], ( + f"filtered_mean must be absent when skip_stats_by_scope says " + f"to skip `mean` on filt scope; got {sd['a'].get('filtered_mean')!r}. " + f"merged_sd['a'] keys: {sorted(sd['a'].keys())}" + ) + + +def test_skip_stats_by_scope_excludes_from_raw_and_clean(): + """``skip_stats_by_scope`` with raw/clean entries must keep those + stats out of the cached raw and clean scope SDs — so the bare + ``mean`` key (which comes from the raw scope) is absent. The filt + scope still computes ``mean``, so ``filtered_mean`` shows up when + a filter is active.""" + df = pd.DataFrame({'a': [10, 20, 30, 40, 50], + 'b': ['foo', 'bar', 'foo', 'baz', 'foo']}) + dfc = _SkipRawAndCleanMeanDataflow(df) + dfc.quick_command_args = {'search': ['foo']} + + sd = dfc.merged_sd + assert 'mean' not in sd['a'], ( + f"bare `mean` must be absent when skip_stats_by_scope skips it " + f"on raw/clean scopes; got {sd['a'].get('mean')!r}. " + f"merged_sd['a'] keys: {sorted(sd['a'].keys())}" + ) + assert 'filtered_mean' in sd['a'], ( + "filt scope is not in the skip dict, so filtered_mean should " + "still appear when filter is active" + ) + + +def test_skip_stats_by_scope_default_empty_runs_all_stats(): + """Without any ``skip_stats_by_scope`` override (default {}), behaviour + must be unchanged: every scope computes every stat, all filtered_* + keys appear when filter is active.""" + df = pd.DataFrame({'a': [10, 20, 30, 40, 50], + 'b': ['foo', 'bar', 'foo', 'baz', 'foo']}) + dfc = ScopedDataflow(df) + assert dfc.skip_stats_by_scope == {}, ( + "default skip_stats_by_scope on the base CustomizableDataflow " + "should be empty so existing dataflows keep their behaviour" + ) + dfc.quick_command_args = {'search': ['foo']} + sd = dfc.merged_sd + assert 'mean' in sd['a'] + assert 'filtered_mean' in sd['a'] + assert 'length' in sd['a'] + assert 'filtered_length' in sd['a'] + + def test_raw_df_change_invalidates_scoped_sd(): """Codex P1 from #783: the cache key was derived only from the op chain, so a ``raw_df`` swap with the same (empty) chain reused stale entries. diff --git a/tests/unit/test_xorq_buckaroo_widget.py b/tests/unit/test_xorq_buckaroo_widget.py index ee786be6b..7f5bf9cb7 100644 --- a/tests/unit/test_xorq_buckaroo_widget.py +++ b/tests/unit/test_xorq_buckaroo_widget.py @@ -182,6 +182,44 @@ def count(self): del first +class TestSkipStatsByScope: + """XorqDataflow opts out of computing the histogram stat on the + filt and clean scopes — those scope SDs feed ``filtered_histogram`` + and ``cleaned_histogram`` keys that the user has chosen to not + render. On xorq each histogram is N per-column engine queries, so + the skiplist saves a multiple of state_change latency. + + The polars / pandas dataflows leave ``skip_stats_by_scope`` at its + default ``{}`` — those backends keep computing filt/clean histograms + because materialising the scope df is cheap relative to the + per-column query cost on xorq.""" + + def test_xorq_dataflow_skips_histogram_on_filt_and_clean(self): + from buckaroo.xorq_buckaroo import XorqDataflow + + assert XorqDataflow.skip_stats_by_scope == { + 'filt': {'histogram'}, 'clean': {'histogram'}}, ( + "XorqDataflow.skip_stats_by_scope must skip the `histogram` " + "stat on the filt and clean scopes; got " + f"{XorqDataflow.skip_stats_by_scope!r}" + ) + + def test_polars_dataflow_keeps_histogram(self): + """Polars side keeps the default empty skiplist — histograms + compute on every scope.""" + import polars as pl + + from buckaroo.polars_buckaroo import PolarsBuckarooWidget + + df = pl.DataFrame({'a': [1, 2, 3, 4, 5]}) + w = PolarsBuckarooWidget(df) + assert w.dataflow.skip_stats_by_scope == {}, ( + "Polars dataflow must keep skip_stats_by_scope at the " + "default empty dict; got " + f"{w.dataflow.skip_stats_by_scope!r}" + ) + + class TestInstantiation: def test_smoke(self): XorqBuckarooWidget(_expr())