From e62aa03ee5c73a982b7b30491a15f44f61b06661 Mon Sep 17 00:00:00 2001 From: janrth Date: Mon, 30 Mar 2026 15:57:27 +0200 Subject: [PATCH 1/4] Add partition_by support for lag transforms across fit, predict, CV, and AutoML --- mlforecast/core.py | 410 +++++++++++++++++++++++++++++++++-- mlforecast/lag_transforms.py | 78 +++++-- tests/test_auto.py | 68 +++++- tests/test_core.py | 369 +++++++++++++++++++++++++++++++ tests/test_forecast.py | 75 +++++++ 5 files changed, 965 insertions(+), 35 deletions(-) diff --git a/mlforecast/core.py b/mlforecast/core.py index b0cf0e18..0c0505ab 100644 --- a/mlforecast/core.py +++ b/mlforecast/core.py @@ -17,6 +17,7 @@ List, Mapping, Optional, + Sequence, Tuple, Union, ) @@ -26,6 +27,7 @@ import numpy as np import pandas as pd import utilsforecast.processing as ufp +from coreforecast.grouped_array import GroupedArray as CoreGroupedArray from sklearn.base import BaseEstimator from sklearn.pipeline import Pipeline from utilsforecast.compat import ( @@ -132,6 +134,10 @@ def _as_tuple(x): return (x,) +def _dedupe_preserve_order(items: Iterable[str]) -> List[str]: + return list(dict.fromkeys(items)) + + Freq = Union[int, str] Lags = Iterable[int] LagTransform = Union[Callable, Tuple[Callable, Any]] @@ -241,6 +247,13 @@ def __init__( namer=lag_transforms_namer, ) self.ga: GroupedArray + self._partition_states: Dict[ + Tuple[str, Tuple[str, ...], Tuple[str, ...]], Dict[str, Any] + ] = {} + self._current_partition_keys: Optional[ + Dict[Tuple[str, Tuple[str, ...], Tuple[str, ...]], DFType] + ] = None + self._current_step_x: Optional[DFType] = None def _get_core_lag_tfms(self) -> Dict[str, _BaseLagTransform]: return { @@ -251,7 +264,9 @@ def _get_global_tfms(self) -> Dict[str, _BaseLagTransform]: return { k: v for k, v in self.transforms.items() - if isinstance(v, _BaseLagTransform) and getattr(v, "global_", False) + if isinstance(v, _BaseLagTransform) + and getattr(v, "global_", False) + and not getattr(v, "partition_by", None) } def _get_group_tfms(self) -> Dict[Tuple[str, ...], Dict[str, _BaseLagTransform]]: @@ -260,12 +275,40 @@ def _get_group_tfms(self) -> Dict[Tuple[str, ...], Dict[str, _BaseLagTransform]] if not isinstance(tfm, _BaseLagTransform): continue groupby = getattr(tfm, "groupby", None) - if not groupby: + if not groupby or getattr(tfm, "partition_by", None): continue key = tuple(groupby) grouped.setdefault(key, {})[name] = tfm return grouped + def _get_partition_tfms( + self, + ) -> Dict[Tuple[str, Tuple[str, ...], Tuple[str, ...]], Dict[str, _BaseLagTransform]]: + grouped: Dict[ + Tuple[str, Tuple[str, ...], Tuple[str, ...]], + Dict[str, _BaseLagTransform], + ] = {} + for name, tfm in self.transforms.items(): + if not isinstance(tfm, _BaseLagTransform): + continue + partition_by = getattr(tfm, "partition_by", None) + if not partition_by: + continue + if getattr(tfm, "global_", False): + mode = "global" + group_cols: Tuple[str, ...] = () + else: + groupby = getattr(tfm, "groupby", None) + if groupby: + mode = "group" + group_cols = tuple(groupby) + else: + mode = "local" + group_cols = () + key = (mode, group_cols, tuple(partition_by)) + grouped.setdefault(key, {})[name] = tfm + return grouped + def _get_local_tfms( self, transforms: Mapping[str, Union[Tuple[Any, ...], _BaseLagTransform]], @@ -276,12 +319,206 @@ def _get_local_tfms( continue if isinstance(tfm, _BaseLagTransform) and getattr(tfm, "groupby", None): continue + if isinstance(tfm, _BaseLagTransform) and getattr(tfm, "partition_by", None): + continue local[name] = tfm return local + def _get_partition_key_cols( + self, + mode: str, + group_cols: Sequence[str], + partition_cols: Sequence[str], + ) -> List[str]: + if mode == "local": + cols = [self.id_col, *partition_cols] + elif mode == "group": + cols = [*group_cols, *partition_cols] + elif mode == "global": + cols = [*partition_cols] + else: + raise ValueError(f"Unknown partition mode: {mode}") + return _dedupe_preserve_order(cols) + + def _build_partition_bucket_df( + self, + df: DFType, + mode: str, + group_cols: Sequence[str], + partition_cols: Sequence[str], + ) -> DFType: + key_cols = self._get_partition_key_cols(mode, group_cols, partition_cols) + if mode == "local": + bucket_df = df[key_cols + [self.time_col, self.target_col]] + bucket_df = ufp.sort(bucket_df, by=key_cols + [self.time_col]) + return ufp.drop_index_if_pandas(bucket_df) + bucket_df = ufp.group_by_agg( + df[key_cols + [self.time_col, self.target_col]], + key_cols + [self.time_col], + {self.target_col: "sum"}, + maintain_order=True, + ) + bucket_df = ufp.sort(bucket_df, by=key_cols + [self.time_col]) + return ufp.drop_index_if_pandas(bucket_df) + + def _add_bucket_id( + self, data: DFType, cols: Sequence[str] + ) -> Tuple[DFType, DFType]: + cols = list(cols) + if isinstance(data, pd.DataFrame): + groups = data[cols].drop_duplicates().reset_index(drop=True) + groups["_bucket_id"] = np.arange(len(groups), dtype=np.int64) + data = data.merge(groups, on=cols, how="left") + else: + groups = data.select(cols).unique(maintain_order=True) + groups = groups.with_row_index(name="_bucket_id") + data = data.join(groups, on=cols, how="left") + return ufp.drop_index_if_pandas(data), ufp.drop_index_if_pandas(groups) + + def _lookup_bucket_ids( + self, data: DFType, groups: DFType, cols: Sequence[str] + ) -> np.ndarray: + cols = list(cols) + if isinstance(data, pd.DataFrame): + joined = data[cols].merge(groups, on=cols, how="left") + bucket_ids = joined["_bucket_id"].to_numpy() + else: + # Avoid categorical cache mismatches when partition keys come from + # different polars sources during update/predict. + left = data.select([pl.col(col).cast(pl.String).alias(col) for col in cols]) + right = groups.select( + [pl.col(col).cast(pl.String).alias(col) for col in cols] + + [pl.col("_bucket_id")] + ) + joined = left.join(right, on=cols, how="left") + bucket_ids = joined["_bucket_id"].to_numpy() + missing = pd.isna(bucket_ids) + out = np.full(bucket_ids.shape[0], -1, dtype=np.int64) + if (~missing).any(): + out[~missing] = bucket_ids[~missing].astype(np.int64, copy=False) + return out + + def _get_partition_context(self, step_df: Optional[DFType]) -> Optional[DFType]: + partition_states = getattr(self, "_partition_states", {}) + if not partition_states: + return None + required_cols = {self.id_col} + for state in partition_states.values(): + required_cols.update(state["key_cols"]) + columns = {} + missing = [] + step_columns = set() if step_df is None else set(step_df.columns) + for col in required_cols: + if step_df is not None and col in step_columns: + columns[col] = step_df[col] + elif col in self.static_features_.columns: + columns[col] = self.static_features_[col] + else: + missing.append(col) + if missing: + missing_cols = ", ".join(sorted(missing)) + raise ValueError( + "Partition lag transforms require the following columns in the input " + f"data or future exogenous dataframe: {missing_cols}." + ) + if isinstance(self.static_features_, pd.DataFrame): + return pd.DataFrame(columns) + return pl_DataFrame(columns) + + def _ensure_partition_bucket_ids( + self, state: Dict[str, Any], keys_df: DFType + ) -> np.ndarray: + key_cols = state["key_cols"] + bucket_ids = self._lookup_bucket_ids(keys_df, state["groups"], key_cols) + missing_mask = bucket_ids < 0 + if not missing_mask.any(): + return bucket_ids + + if isinstance(keys_df, pd.DataFrame): + missing_groups = ( + keys_df.loc[missing_mask, key_cols].drop_duplicates().reset_index(drop=True) + ) + start = len(state["groups"]) + missing_groups["_bucket_id"] = np.arange( + start, start + len(missing_groups), dtype=np.int64 + ) + state["groups"] = pd.concat([state["groups"], missing_groups], ignore_index=True) + else: + missing_groups = ( + keys_df.filter(pl.Series(missing_mask)) + .select(key_cols) + .unique(maintain_order=True) + ) + start = state["groups"].height + missing_groups = missing_groups.with_row_index( + name="_bucket_id", offset=start + ) + state["groups"] = pl.concat([state["groups"], missing_groups], how="vertical") + return self._lookup_bucket_ids(keys_df, state["groups"], key_cols) + + def _compute_partition_features( + self, step_df: Optional[DFType] + ) -> Dict[str, np.ndarray]: + partition_states = getattr(self, "_partition_states", {}) + if not partition_states: + self._current_partition_keys = None + return {} + context = self._get_partition_context(step_df) + assert context is not None + self._current_partition_keys = {} + n_series = len(self.uids) + features: Dict[str, np.ndarray] = {} + for state_key, state in partition_states.items(): + core_ga = CoreGroupedArray(state["ga"].data, state["ga"].indptr) + updates = {} + for name, tfm in state["tfms"].items(): + fresh_tfm = copy.deepcopy(tfm)._set_core_tfm(tfm._core_tfm.lag) + fresh_tfm.transform(core_ga) + updates[name] = fresh_tfm.update(core_ga) + bucket_ids = self._lookup_bucket_ids( + context, state["groups"], state["key_cols"] + ) + self._current_partition_keys[state_key] = context[state["key_cols"]] + valid = bucket_ids >= 0 + for name, vals in updates.items(): + out = np.full(n_series, np.nan, dtype=float) + if valid.any(): + out[valid] = vals[bucket_ids[valid]] + features[name] = out + return features + + def _update_partition_states(self, new: np.ndarray) -> None: + partition_states = getattr(self, "_partition_states", {}) + if not partition_states: + return + current_partition_keys = self._current_partition_keys + if current_partition_keys is None: + raise ValueError("Partition lag transforms require partition keys before updating.") + new_arr = np.asarray(new, dtype=self.ga.data.dtype) + for state_key, state in partition_states.items(): + keys_df = current_partition_keys[state_key] + old_n_buckets = len(state["groups"]) + bucket_ids = self._ensure_partition_bucket_ids(state, keys_df) + new_n_buckets = len(state["groups"]) + bucket_sums = np.zeros(new_n_buckets, dtype=self.ga.data.dtype) + new_sizes = np.zeros(new_n_buckets, dtype=np.int32) + np.add.at(bucket_sums, bucket_ids, new_arr) + new_sizes[np.unique(bucket_ids)] = 1 + new_groups = np.zeros(new_n_buckets, dtype=bool) + if new_n_buckets > old_n_buckets: + new_groups[old_n_buckets:] = True + new_values = bucket_sums[new_sizes.astype(bool)] + state["ga"] = state["ga"].append_several( + new_sizes=new_sizes, + new_values=new_values, + new_groups=new_groups, + ) + def _check_aligned_ends(self) -> None: """Check that all series end at the same timestamp when using global/group transforms.""" - if not (self._get_global_tfms() or self._get_group_tfms()): + partition_tfms = self._get_partition_tfms() + needs_alignment = any(mode in {"global", "group"} for mode, _, _ in partition_tfms) + if not (self._get_global_tfms() or self._get_group_tfms() or needs_alignment): return if isinstance(self.last_dates, pd.Index): aligned = self.last_dates.nunique() == 1 @@ -402,7 +639,16 @@ def _fit( self._global_times = global_df[time_col] to_drop = [id_col, time_col, target_col] if static_features is None: - static_features = [c for c in df.columns if c not in [time_col, target_col]] + partition_cols = { + col + for _, _, partition_cols in self._get_partition_tfms().keys() + for col in partition_cols + } + static_features = [ + c + for c in df.columns + if c not in [time_col, target_col] and c not in partition_cols + ] elif id_col not in static_features: static_features = [id_col, *static_features] else: # static_features defined and contain id_col @@ -434,17 +680,71 @@ def _fit( self.features_order_ = [c for c in df.columns if c not in to_drop] + [ f for f in self.features if f not in df.columns ] + df_for_special = df + if self.target_transforms is not None: + transformed_target = ga.data + if self._restore_idxs is not None: + transformed_target = transformed_target[self._restore_idxs] + # Keep the caller's dataframe untouched when building aggregated or + # partitioned states from the transformed target. + df_for_special = ufp.copy_if_pandas(df, deep=True) + df_for_special = ufp.assign_columns( + df_for_special, target_col, transformed_target + ) + + self._partition_states = {} + partition_tfms = self._get_partition_tfms() + if partition_tfms: + for (mode, group_cols, partition_cols), tfms in partition_tfms.items(): + for col in partition_cols: + if col not in df.columns: + raise ValueError( + f"Partition column '{col}' not found in dataframe." + ) + if mode == "group": + missing = [ + c + for c in group_cols + if c not in self.static_features_.columns + ] + if missing: + raise ValueError( + "Groupby columns must be static features. " + f"Missing from static_features: {missing}." + ) + bucket_df = self._build_partition_bucket_df( + df_for_special, mode, group_cols, partition_cols + ) + key_cols = self._get_partition_key_cols( + mode, group_cols, partition_cols + ) + bucket_df, groups = self._add_bucket_id(bucket_df, key_cols) + if isinstance(bucket_df, pd.DataFrame): + process_df = bucket_df[["_bucket_id", time_col, target_col]] + else: + process_df = bucket_df.select(["_bucket_id", time_col, target_col]) + processed = ufp.process_df( + process_df, + id_col="_bucket_id", + time_col=time_col, + target_col=target_col, + ) + if processed.sort_idxs is not None: + bucket_df = ufp.take_rows(bucket_df, processed.sort_idxs) + bucket_df = ufp.drop_index_if_pandas(bucket_df) + self._partition_states[(mode, group_cols, partition_cols)] = { + "mode": mode, + "group_cols": list(group_cols), + "partition_cols": list(partition_cols), + "key_cols": key_cols, + "tfms": tfms, + "ga": GroupedArray(processed.data[:, 0], processed.indptr), + "df": bucket_df, + "groups": groups, + } self._group_states: Dict[Tuple[str, ...], Dict[str, Any]] = {} group_tfms = self._get_group_tfms() if group_tfms: - if self.target_transforms is not None: - transformed_target = ga.data - if self._restore_idxs is not None: - transformed_target = transformed_target[self._restore_idxs] - df_for_group = ufp.assign_columns(df, target_col, transformed_target) - else: - df_for_group = df - def _add_group_id(data, cols): if isinstance(data, pd.DataFrame): groups = data[cols].drop_duplicates().reset_index(drop=True) @@ -475,7 +775,7 @@ def _map_group_id(data, groups, cols): f"Missing from static_features: {missing}." ) group_df = ufp.group_by_agg( - df_for_group[group_cols_list + [time_col, target_col]], + df_for_special[group_cols_list + [time_col, target_col]], group_cols_list + [time_col], {target_col: "sum"}, maintain_order=True, @@ -635,6 +935,32 @@ def _transform( ) for name in feature_cols: features[name] = joined[name].to_numpy() + if self._partition_states: + for state in self._partition_states.values(): + bucket_df = state["df"] + bucket_vals = state["ga"].apply_transforms( + transforms=state["tfms"], updates_only=False + ) + key_cols = state["key_cols"] + feature_cols = list(bucket_vals.keys()) + if isinstance(df, pd.DataFrame): + join_df = bucket_df[key_cols + [self.time_col]].copy() + for name, vals in bucket_vals.items(): + join_df[name] = vals + joined = df[key_cols + [self.time_col]].merge( + join_df, on=key_cols + [self.time_col], how="left" + ) + for name in feature_cols: + features[name] = joined[name].to_numpy() + else: + join_df = bucket_df.select(key_cols + [self.time_col]) + for name, vals in bucket_vals.items(): + join_df = join_df.with_columns(pl.Series(name=name, values=vals)) + joined = df.select(key_cols + [self.time_col]).join( + join_df, on=key_cols + [self.time_col], how="left" + ) + for name in feature_cols: + features[name] = joined[name].to_numpy() # filter out the features that already exist in df to avoid overwriting them features = {k: v for k, v in features.items() if k not in df} if self._restore_idxs is not None: @@ -942,8 +1268,9 @@ def _update_y(self, new: np.ndarray) -> None: group_sums = np.zeros(n_groups, dtype=self.ga.data.dtype) np.add.at(group_sums, group_idx, new_arr) state["ga"] = state["ga"].append(group_sums) + self._update_partition_states(new_arr) - def _update_features(self) -> DataFrame: + def _update_features(self, step_df: Optional[DFType] = None) -> DataFrame: """Compute the current values of all the features using the latest values of the time series.""" self.curr_dates: Union[pd.Index, pl_Series] = ufp.offset_times( self.curr_dates, self.freq, 1 @@ -968,6 +1295,7 @@ def _update_features(self) -> DataFrame: group_idx = state["group_idx"] for name, vals in updates.items(): features[name] = vals[group_idx] + features.update(self._compute_partition_features(step_df)) for feature in self.date_features: feat_name, feat_vals = self._compute_date_feature(self.curr_dates, feature) @@ -1010,7 +1338,7 @@ def _get_predictions(self) -> DataFrame: return df def _get_features_for_next_step(self, X_df=None): - new_x = self._update_features() + X = None if X_df is not None: n_series = len(self.uids) h = X_df.shape[0] // n_series # how many timestamps per series @@ -1019,6 +1347,12 @@ def _get_features_for_next_step(self, X_df=None): rows = np.arange(row_offset, X_df.shape[0], h) X = ufp.take_rows(X_df, rows) X = ufp.drop_index_if_pandas(X) + self._current_step_x = X + new_x = self._update_features(step_df=X) + if X is not None: + X = ufp.drop_columns( + X, [c for c in [self.id_col, self.time_col] if c in X.columns] + ) new_x = ufp.horizontal_concat([new_x, X]) if isinstance(new_x, pd.DataFrame): nulls = new_x.isnull().any() @@ -1040,7 +1374,9 @@ def _backup(self) -> Iterator[None]: ga = copy.copy(self.ga) # if these save state (like ExpandingMean) they'll get modified by the updates lag_tfms = copy.deepcopy(self.transforms) + target_tfms = copy.deepcopy(self.target_transforms) group_states = copy.deepcopy(getattr(self, "_group_states", {})) + partition_states = copy.deepcopy(getattr(self, "_partition_states", {})) global_ga = copy.copy(getattr(self, "_global_ga", None)) global_times = copy.copy(getattr(self, "_global_times", None)) try: @@ -1048,11 +1384,14 @@ def _backup(self) -> Iterator[None]: finally: self.ga = ga self.transforms = lag_tfms + self.target_transforms = target_tfms if global_ga is not None or global_times is not None: self._global_ga = global_ga self._global_times = global_times if group_states: self._group_states = group_states + if partition_states: + self._partition_states = partition_states def _predict_setup(self) -> None: # TODO: move to utils @@ -1063,6 +1402,8 @@ def _predict_setup(self) -> None: self.test_dates: List[Union[pd.Index, pl_Series]] = [] self.y_pred = [] self._h = 0 + self._current_partition_keys = None + self._current_step_x = None def _predict_recursive( self, @@ -1371,7 +1712,9 @@ def update(self, df: DataFrame, validate_new_data: bool = False) -> None: values = df[self.target_col].to_numpy() values = values.astype(self.ga.data.dtype, copy=False) self._check_aligned_ends() - if self._get_global_tfms() or self._get_group_tfms(): + partition_tfms = self._get_partition_tfms() + has_partition_agg = any(mode in {"global", "group"} for mode, _, _ in partition_tfms) + if self._get_global_tfms() or self._get_group_tfms() or has_partition_agg: if isinstance(df, pd.DataFrame): expected_ids = pd.Index(uids).union(pd.Index(new_ids)) expected_count = len(expected_ids) @@ -1563,3 +1906,38 @@ def _attach_group_id(data, groups, cols): .to_numpy() ) state["group_idx"] = series_group_id.astype(np.int64, copy=False) + if self._partition_states: + for state in self._partition_states.values(): + old_n_buckets = len(state["groups"]) + bucket_df = self._build_partition_bucket_df( + df, + state["mode"], + state["group_cols"], + state["partition_cols"], + ) + bucket_ids = self._ensure_partition_bucket_ids(state, bucket_df) + if isinstance(bucket_df, pd.DataFrame): + bucket_df = bucket_df.copy() + bucket_df["_bucket_id"] = bucket_ids + else: + bucket_df = bucket_df.with_columns( + pl.Series(name="_bucket_id", values=bucket_ids) + ) + bucket_df = ufp.sort(bucket_df, by=["_bucket_id", self.time_col]) + id_counts = ufp.counts_by_id(bucket_df, "_bucket_id") + if isinstance(state["groups"], pd.DataFrame): + all_ids = state["groups"][["_bucket_id"]].copy() + else: + all_ids = state["groups"].select("_bucket_id") + sizes = ufp.join(all_ids, id_counts, on="_bucket_id", how="left") + sizes = ufp.fill_null(sizes, {"counts": 0}) + sizes = ufp.sort(sizes, by="_bucket_id") + new_groups = sizes["_bucket_id"].to_numpy() >= old_n_buckets + bucket_values = bucket_df[self.target_col].to_numpy().astype( + self.ga.data.dtype, copy=False + ) + state["ga"] = state["ga"].append_several( + new_sizes=sizes["counts"].to_numpy().astype(np.int32), + new_values=bucket_values, + new_groups=new_groups, + ) diff --git a/mlforecast/lag_transforms.py b/mlforecast/lag_transforms.py index c7108740..3d9ce0f3 100644 --- a/mlforecast/lag_transforms.py +++ b/mlforecast/lag_transforms.py @@ -34,16 +34,16 @@ def _pascal2camel(pascal_str: str) -> str: return re.sub(r"(? "_BaseLagTransform": init_args.pop("global_", None) init_args.pop("global", None) init_args.pop("groupby", None) + init_args.pop("partition_by", None) self._core_tfm = getattr(core_tfms, self.__class__.__name__)( lag=lag, **init_args ) @@ -66,19 +67,26 @@ def _set_core_tfm(self, lag: int) -> "_BaseLagTransform": def _get_name(self, lag: int) -> str: init_params = self._get_init_signature() - prefix = "" + prefix_parts = [] groupby = getattr(self, "groupby", None) + partition_by = getattr(self, "partition_by", None) if getattr(self, "global_", False): - prefix = "global_" - elif groupby: + prefix_parts.append("global") + if groupby: group_str = "__".join(groupby) - prefix = f"groupby_{group_str}_" + prefix_parts.append(f"groupby_{group_str}") + if partition_by: + partition_str = "__".join(partition_by) + prefix_parts.append(f"partition_by_{partition_str}") + prefix = "_".join(prefix_parts) + if prefix: + prefix += "_" result = f"{prefix}{_pascal2camel(self.__class__.__name__)}_lag{lag}" changed_params = [ f"{name}{getattr(self, name)}" for name, arg in init_params.items() if arg.default != getattr(self, name) - and name not in {"global_", "groupby"} + and name not in {"global_", "groupby", "partition_by"} ] if changed_params: result += "_" + "_".join(changed_params) @@ -140,6 +148,7 @@ def __init__( min_samples: Optional[int] = None, global_: bool = False, groupby: Optional[Sequence[str]] = None, + partition_by: Optional[Sequence[str]] = None, **kwargs, ): """ @@ -151,17 +160,22 @@ def __init__( Requires all series to end at the same timestamp. Defaults to False. groupby (Sequence[str], optional): Column names to group by before computing the statistic. Columns must be static features. Mutually exclusive with `global_`. Defaults to None. + partition_by (Sequence[str], optional): Column names used to partition observations + before computing the statistic. Defaults to None. """ if "global" in kwargs: global_ = kwargs.pop("global") if "groupby" in kwargs: groupby = kwargs.pop("groupby") + if "partition_by" in kwargs: + partition_by = kwargs.pop("partition_by") if kwargs: raise TypeError(f"Unexpected keyword arguments: {list(kwargs)}") self.window_size = window_size self.min_samples = min_samples self.global_ = global_ - self.groupby = _normalize_groupby(groupby) + self.groupby = _normalize_columns(groupby) + self.partition_by = _normalize_columns(partition_by) if self.global_ and self.groupby: raise ValueError("`global_` and `groupby` can't be used together.") @@ -190,6 +204,7 @@ def __init__( min_samples: Optional[int] = None, global_: bool = False, groupby: Optional[Sequence[str]] = None, + partition_by: Optional[Sequence[str]] = None, **kwargs, ): super().__init__( @@ -197,6 +212,7 @@ def __init__( min_samples=min_samples, global_=global_, groupby=groupby, + partition_by=partition_by, **kwargs, ) self.p = p @@ -221,6 +237,7 @@ def __init__( min_samples: Optional[int] = None, global_: bool = False, groupby: Optional[Sequence[str]] = None, + partition_by: Optional[Sequence[str]] = None, **kwargs, ): """ @@ -233,18 +250,23 @@ def __init__( Requires all series to end at the same timestamp. Defaults to False. groupby (Sequence[str], optional): Column names to group by before computing the statistic. Columns must be static features. Mutually exclusive with `global_`. Defaults to None. + partition_by (Sequence[str], optional): Column names used to partition observations + before computing the statistic. Defaults to None. """ if "global" in kwargs: global_ = kwargs.pop("global") if "groupby" in kwargs: groupby = kwargs.pop("groupby") + if "partition_by" in kwargs: + partition_by = kwargs.pop("partition_by") if kwargs: raise TypeError(f"Unexpected keyword arguments: {list(kwargs)}") self.season_length = season_length self.window_size = window_size self.min_samples = min_samples self.global_ = global_ - self.groupby = _normalize_groupby(groupby) + self.groupby = _normalize_columns(groupby) + self.partition_by = _normalize_columns(partition_by) if self.global_ and self.groupby: raise ValueError("`global_` and `groupby` can't be used together.") @@ -274,6 +296,7 @@ def __init__( min_samples: Optional[int] = None, global_: bool = False, groupby: Optional[Sequence[str]] = None, + partition_by: Optional[Sequence[str]] = None, **kwargs, ): super().__init__( @@ -282,6 +305,7 @@ def __init__( min_samples=min_samples, global_=global_, groupby=groupby, + partition_by=partition_by, **kwargs, ) self.p = p @@ -301,16 +325,20 @@ def __init__( self, global_: bool = False, groupby: Optional[Sequence[str]] = None, + partition_by: Optional[Sequence[str]] = None, **kwargs, ): if "global" in kwargs: global_ = kwargs.pop("global") if "groupby" in kwargs: groupby = kwargs.pop("groupby") + if "partition_by" in kwargs: + partition_by = kwargs.pop("partition_by") if kwargs: raise TypeError(f"Unexpected keyword arguments: {list(kwargs)}") self.global_ = global_ - self.groupby = _normalize_groupby(groupby) + self.groupby = _normalize_columns(groupby) + self.partition_by = _normalize_columns(partition_by) if self.global_ and self.groupby: raise ValueError("`global_` and `groupby` can't be used together.") @@ -337,9 +365,15 @@ def __init__( p: float, global_: bool = False, groupby: Optional[Sequence[str]] = None, + partition_by: Optional[Sequence[str]] = None, **kwargs, ): - super().__init__(global_=global_, groupby=groupby, **kwargs) + super().__init__( + global_=global_, + groupby=groupby, + partition_by=partition_by, + **kwargs, + ) self.p = p @property @@ -363,17 +397,21 @@ def __init__( alpha: float, global_: bool = False, groupby: Optional[Sequence[str]] = None, + partition_by: Optional[Sequence[str]] = None, **kwargs, ): if "global" in kwargs: global_ = kwargs.pop("global") if "groupby" in kwargs: groupby = kwargs.pop("groupby") + if "partition_by" in kwargs: + partition_by = kwargs.pop("partition_by") if kwargs: raise TypeError(f"Unexpected keyword arguments: {list(kwargs)}") self.alpha = alpha self.global_ = global_ - self.groupby = _normalize_groupby(groupby) + self.groupby = _normalize_columns(groupby) + self.partition_by = _normalize_columns(partition_by) if self.global_ and self.groupby: raise ValueError("`global_` and `groupby` can't be used together.") @@ -395,6 +433,7 @@ def __init__(self, tfm: _BaseLagTransform, n: int): self.n = n self.global_ = getattr(tfm, "global_", False) self.groupby = getattr(tfm, "groupby", None) + self.partition_by = getattr(tfm, "partition_by", None) def _get_name(self, lag: int) -> str: return self.tfm._get_name(lag + self.n) @@ -428,12 +467,17 @@ def __init__( global_2 = getattr(tfm2, "global_", False) groupby_1 = getattr(tfm1, "groupby", None) groupby_2 = getattr(tfm2, "groupby", None) + partition_by_1 = getattr(tfm1, "partition_by", None) + partition_by_2 = getattr(tfm2, "partition_by", None) if global_1 != global_2: raise ValueError("Can't combine transforms with different global_ settings.") if (groupby_1 or groupby_2) and groupby_1 != groupby_2: raise ValueError("Can't combine transforms with different groupby settings.") + if (partition_by_1 or partition_by_2) and partition_by_1 != partition_by_2: + raise ValueError("Can't combine transforms with different partition_by settings.") self.global_ = global_1 self.groupby = groupby_1 + self.partition_by = partition_by_1 def _set_core_tfm(self, lag: int) -> "Combine": self.tfm1 = copy.deepcopy(self.tfm1)._set_core_tfm(lag) diff --git a/tests/test_auto.py b/tests/test_auto.py index 049ac276..d7099a18 100644 --- a/tests/test_auto.py +++ b/tests/test_auto.py @@ -1,11 +1,13 @@ import time +import numpy as np import optuna import pandas as pd import polars as pl import pytest from datasetsforecast.m4 import M4, M4Info from sklearn.compose import ColumnTransformer +from sklearn.ensemble import HistGradientBoostingRegressor from sklearn.linear_model import Ridge from sklearn.pipeline import make_pipeline from sklearn.preprocessing import OneHotEncoder @@ -14,10 +16,11 @@ AutoLightGBM, AutoMLForecast, AutoModel, - AutoRidge, PredictionIntervals, ridge_space, ) +from mlforecast.lag_transforms import ExpandingMean +from mlforecast.utils import generate_daily_series from .conftest import assert_raises_with_message @@ -38,6 +41,31 @@ def weekly_data(): return train, valid, M4Info[group] +def _make_partition_series(n_series: int = 4, length: int = 50) -> pd.DataFrame: + df = generate_daily_series( + n_series=n_series, + min_length=length, + max_length=length, + n_static_features=0, + ) + brand_map = { + uid: i % 2 + for i, uid in enumerate(df["unique_id"].cat.categories) + } + df["brand"] = df["unique_id"].map(brand_map).astype("int8") + df["promo"] = (df["ds"].dt.dayofweek >= 5).astype("int8") + return df + + +def _make_partition_future(df: pd.DataFrame, h: int) -> pd.DataFrame: + last_dates = df.groupby("unique_id", observed=True)["ds"].max().reset_index() + future = last_dates.loc[last_dates.index.repeat(h)].copy() + future["step"] = np.tile(np.arange(1, h + 1), last_dates.shape[0]) + future["ds"] = future["ds"] + pd.to_timedelta(future["step"], unit="D") + future["promo"] = (future["ds"].dt.dayofweek >= 5).astype("int8") + return future[["unique_id", "ds", "promo"]].reset_index(drop=True) + + def test_automlforecast_pipeline(weekly_data): train, valid, info = weekly_data h = info.horizon @@ -76,6 +104,42 @@ def test_automlforecast_pipeline(weekly_data): assert not preds.empty fitted_vals = auto_mlf.forecast_fitted_values(level=[95]) assert not fitted_vals.empty + + +def test_automlforecast_with_partition_by_transform(): + df = _make_partition_series() + h = 5 + + auto_mlf = AutoMLForecast( + freq="D", + init_config=lambda trial: { + "lags": [1, 2, 7], + "lag_transforms": { + 1: [ExpandingMean(groupby=["brand"], partition_by=["promo"])] + }, + }, + fit_config=lambda trial: {"static_features": ["brand"], "dropna": False}, + models={ + "hgb": AutoModel( + HistGradientBoostingRegressor(random_state=0, max_depth=3), + lambda trial: {}, + ) + }, + num_threads=1, + ) + + auto_mlf.fit( + df=df, + n_windows=1, + h=h, + num_samples=1, + optimize_kwargs={}, + ) + preds = auto_mlf.predict(h, X_df=_make_partition_future(df, h)) + + assert not preds.empty + assert preds.shape[0] == df["unique_id"].nunique() * h + assert "hgb" in preds.columns def test_automlforecast_weight_col(weekly_data): @@ -313,4 +377,4 @@ def init_config(trial): preds_b = automl_b.predict(h=h) assert preds_a.columns.tolist() == preds_b.columns.tolist() - assert (preds_a["ridge"].to_numpy() == preds_b["ridge"].to_numpy()).all() \ No newline at end of file + assert (preds_a["ridge"].to_numpy() == preds_b["ridge"].to_numpy()).all() diff --git a/tests/test_core.py b/tests/test_core.py index cb474e0b..8f8f3791 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -47,6 +47,39 @@ def _expanding_mean(x): def _rolling_mean(x, window_size): return pd.Series(x).rolling(window_size).mean().to_numpy() + +def _make_partition_df(engine, include_brand=False): + data = { + "unique_id": ["a", "a", "a", "a", "b", "b", "b", "b"], + "ds": [1, 2, 3, 4, 1, 2, 3, 4], + "y": [1, 2, 3, 4, 10, 20, 30, 40], + "promo": [True, True, False, True, False, True, False, True], + } + if include_brand: + data["brand"] = ["x"] * 8 + if engine == "polars": + return pl.DataFrame(data).with_columns(pl.col("unique_id").cast(pl.Categorical)) + return pd.DataFrame(data) + + +def _make_partition_future_df(engine, include_brand=False): + data = { + "unique_id": ["a", "a", "b", "b"], + "ds": [5, 6, 5, 6], + "promo": [False, True, True, False], + } + if include_brand: + data["brand"] = ["x"] * 4 + if engine == "polars": + return pl.DataFrame(data).with_columns(pl.col("unique_id").cast(pl.Categorical)) + return pd.DataFrame(data) + + +def _maybe_to_pandas(df): + if isinstance(df, pl.DataFrame): + return df.to_pandas() + return df + def test_build_function_transform_name(): assert _build_function_transform_name(_expanding_mean, 1) == "_expanding_mean_lag1" assert _build_function_transform_name(_rolling_mean, 2, 7) == "_rolling_mean_lag2_window_size7" @@ -674,6 +707,342 @@ def test_group_lag_transform(engine): ) +@pytest.mark.parametrize("engine", ["pandas", "polars"]) +def test_partition_lag_transform(engine): + df = _make_partition_df(engine) + tfm = RollingMean(3, min_samples=1, partition_by=["promo"]) + ts = TimeSeries(freq=1, lag_transforms={1: [tfm]}) + prep = ts.fit_transform( + df, + id_col="unique_id", + time_col="ds", + target_col="y", + dropna=False, + static_features=[], + ) + expected_by_key = { + ("a", 1): np.nan, + ("a", 2): 1.0, + ("a", 3): np.nan, + ("a", 4): 1.5, + ("b", 1): np.nan, + ("b", 2): np.nan, + ("b", 3): 10.0, + ("b", 4): 20.0, + } + if engine == "polars": + expected = np.array( + [ + expected_by_key[(uid, ds)] + for uid, ds in zip(prep["unique_id"].to_list(), prep["ds"].to_list()) + ], + dtype=float, + ) + else: + expected = np.array( + [expected_by_key[(uid, ds)] for uid, ds in zip(prep["unique_id"], prep["ds"])], + dtype=float, + ) + col = tfm._get_name(1) + np.testing.assert_allclose(prep[col].to_numpy(), expected, equal_nan=True) + + +@pytest.mark.parametrize("engine", ["pandas", "polars"]) +def test_group_partition_lag_transform(engine): + df = _make_partition_df(engine, include_brand=True) + tfm = RollingMean(2, min_samples=1, groupby=["brand"], partition_by=["promo"]) + ts = TimeSeries(freq=1, lag_transforms={1: [tfm]}) + prep = ts.fit_transform( + df, + id_col="unique_id", + time_col="ds", + target_col="y", + dropna=False, + static_features=["brand"], + ) + expected_by_key = { + ("a", 1): np.nan, + ("a", 2): 1.0, + ("a", 3): 10.0, + ("a", 4): 11.5, + ("b", 1): np.nan, + ("b", 2): 1.0, + ("b", 3): 10.0, + ("b", 4): 11.5, + } + if engine == "polars": + expected = np.array( + [ + expected_by_key[(uid, ds)] + for uid, ds in zip(prep["unique_id"].to_list(), prep["ds"].to_list()) + ], + dtype=float, + ) + else: + expected = np.array( + [expected_by_key[(uid, ds)] for uid, ds in zip(prep["unique_id"], prep["ds"])], + dtype=float, + ) + col = tfm._get_name(1) + np.testing.assert_allclose(prep[col].to_numpy(), expected, equal_nan=True) + + +@pytest.mark.parametrize("engine", ["pandas", "polars"]) +def test_global_partition_lag_transform(engine): + df = _make_partition_df(engine) + tfm = RollingMean(2, min_samples=1, global_=True, partition_by=["promo"]) + ts = TimeSeries(freq=1, lag_transforms={1: [tfm]}) + prep = ts.fit_transform( + df, + id_col="unique_id", + time_col="ds", + target_col="y", + dropna=False, + ) + expected_by_key = { + ("a", 1): np.nan, + ("a", 2): 1.0, + ("a", 3): 10.0, + ("a", 4): 11.5, + ("b", 1): np.nan, + ("b", 2): 1.0, + ("b", 3): 10.0, + ("b", 4): 11.5, + } + if engine == "polars": + expected = np.array( + [ + expected_by_key[(uid, ds)] + for uid, ds in zip(prep["unique_id"].to_list(), prep["ds"].to_list()) + ], + dtype=float, + ) + else: + expected = np.array( + [expected_by_key[(uid, ds)] for uid, ds in zip(prep["unique_id"], prep["ds"])], + dtype=float, + ) + col = tfm._get_name(1) + np.testing.assert_allclose(prep[col].to_numpy(), expected, equal_nan=True) + + +@pytest.mark.parametrize("engine", ["pandas", "polars"]) +def test_partition_lag_transform_predict(engine): + df = _make_partition_df(engine) + future = _make_partition_future_df(engine) + tfm = ExpandingMean(partition_by=["promo"]) + ts = TimeSeries(freq=1, lag_transforms={1: [tfm]}) + ts.fit_transform( + df, + id_col="unique_id", + time_col="ds", + target_col="y", + dropna=False, + static_features=[], + ) + feats = SaveFeatures() + ts.predict({"y": A()}, 2, X_df=future, before_predict_callback=feats) + features = _maybe_to_pandas(feats.get_features(with_step=True)) + col = tfm._get_name(1) + np.testing.assert_allclose( + features.loc[features["step"].eq(0), col].to_numpy(), + np.array([np.nan, 30.0]), + equal_nan=True, + ) + np.testing.assert_allclose( + features.loc[features["step"].eq(1), col].to_numpy(), + np.array([2.3333333333333335, 20.0]), + equal_nan=True, + ) + + +@pytest.mark.parametrize("engine", ["pandas", "polars"]) +def test_group_partition_lag_transform_predict(engine): + df = _make_partition_df(engine, include_brand=True) + future = _make_partition_future_df(engine) + tfm = ExpandingMean(groupby=["brand"], partition_by=["promo"]) + ts = TimeSeries(freq=1, lag_transforms={1: [tfm]}) + ts.fit_transform( + df, + id_col="unique_id", + time_col="ds", + target_col="y", + dropna=False, + static_features=["brand"], + ) + feats = SaveFeatures() + ts.predict({"y": A()}, 2, X_df=future, before_predict_callback=feats) + features = _maybe_to_pandas(feats.get_features(with_step=True)) + col = tfm._get_name(1) + np.testing.assert_allclose( + features.loc[features["step"].eq(0), col].to_numpy(), + np.array([21.5, 22.333333333333332]), + equal_nan=True, + ) + np.testing.assert_allclose( + features.loc[features["step"].eq(1), col].to_numpy(), + np.array([16.75, 14.333333333333334]), + equal_nan=True, + ) + + +@pytest.mark.parametrize("engine", ["pandas", "polars"]) +def test_global_partition_lag_transform_predict(engine): + df = _make_partition_df(engine) + future = _make_partition_future_df(engine) + tfm = ExpandingMean(global_=True, partition_by=["promo"]) + ts = TimeSeries(freq=1, lag_transforms={1: [tfm]}) + ts.fit_transform( + df, + id_col="unique_id", + time_col="ds", + target_col="y", + dropna=False, + static_features=[], + ) + feats = SaveFeatures() + ts.predict({"y": A()}, 2, X_df=future, before_predict_callback=feats) + features = _maybe_to_pandas(feats.get_features(with_step=True)) + col = tfm._get_name(1) + np.testing.assert_allclose( + features.loc[features["step"].eq(0), col].to_numpy(), + np.array([21.5, 22.333333333333332]), + equal_nan=True, + ) + np.testing.assert_allclose( + features.loc[features["step"].eq(1), col].to_numpy(), + np.array([16.75, 14.333333333333334]), + equal_nan=True, + ) + + +@pytest.mark.parametrize("engine", ["pandas", "polars"]) +def test_partition_lag_transform_update(engine): + df = _make_partition_df(engine) + if engine == "polars": + update_df = pl.DataFrame( + { + "unique_id": ["a", "b"], + "ds": [5, 5], + "y": [5, 50], + "promo": [False, True], + } + ).with_columns(pl.col("unique_id").cast(pl.Categorical)) + step_df = pl.DataFrame( + { + "unique_id": ["a", "b"], + "ds": [6, 6], + "promo": [True, False], + } + ).with_columns(pl.col("unique_id").cast(pl.Categorical)) + else: + update_df = pd.DataFrame( + { + "unique_id": ["a", "b"], + "ds": [5, 5], + "y": [5, 50], + "promo": [False, True], + } + ) + step_df = pd.DataFrame( + { + "unique_id": ["a", "b"], + "ds": [6, 6], + "promo": [True, False], + } + ) + tfm = ExpandingMean(partition_by=["promo"]) + ts = TimeSeries(freq=1, lag_transforms={1: [tfm]}) + ts.fit_transform( + df, + id_col="unique_id", + time_col="ds", + target_col="y", + dropna=False, + static_features=[], + ) + ts.update(update_df) + ts._predict_setup() + feats = ts._get_features_for_next_step(step_df) + col = tfm._get_name(1) + np.testing.assert_allclose( + feats[col].to_numpy(), + np.array([2.3333333333333335, 20.0]), + equal_nan=True, + ) + + +@pytest.mark.parametrize("engine", ["pandas", "polars"]) +@pytest.mark.parametrize( + ("tfm", "fit_kwargs", "expected"), + [ + ( + ExpandingMean(groupby=["brand"], partition_by=["promo"]), + {"static_features": ["brand"]}, + np.array([29.25, 16.0]), + ), + ( + ExpandingMean(global_=True, partition_by=["promo"]), + {"static_features": []}, + np.array([29.25, 16.0]), + ), + ], +) +def test_aggregated_partition_lag_transform_update(engine, tfm, fit_kwargs, expected): + include_brand = getattr(tfm, "groupby", None) is not None + df = _make_partition_df(engine, include_brand=include_brand) + if engine == "polars": + update_data = { + "unique_id": ["a", "b"], + "ds": [5, 5], + "y": [5, 50], + "promo": [False, True], + } + if include_brand: + update_data["brand"] = ["x", "x"] + update_df = pl.DataFrame(update_data).with_columns( + pl.col("unique_id").cast(pl.Categorical) + ) + step_df = pl.DataFrame( + { + "unique_id": ["a", "b"], + "ds": [6, 6], + "promo": [True, False], + } + ).with_columns(pl.col("unique_id").cast(pl.Categorical)) + else: + update_data = { + "unique_id": ["a", "b"], + "ds": [5, 5], + "y": [5, 50], + "promo": [False, True], + } + if include_brand: + update_data["brand"] = ["x", "x"] + update_df = pd.DataFrame(update_data) + step_df = pd.DataFrame( + { + "unique_id": ["a", "b"], + "ds": [6, 6], + "promo": [True, False], + } + ) + ts = TimeSeries(freq=1, lag_transforms={1: [tfm]}) + ts.fit_transform( + df, + id_col="unique_id", + time_col="ds", + target_col="y", + dropna=False, + **fit_kwargs, + ) + ts.update(update_df) + ts._predict_setup() + feats = ts._get_features_for_next_step(step_df) + col = tfm._get_name(1) + np.testing.assert_allclose(feats[col].to_numpy(), expected, equal_nan=True) + + @pytest.mark.parametrize("engine", ["pandas", "polars"]) def test_global_lag_transform_requires_aligned_ends(engine): if engine == "polars": diff --git a/tests/test_forecast.py b/tests/test_forecast.py index ee7349f4..0f353d1d 100644 --- a/tests/test_forecast.py +++ b/tests/test_forecast.py @@ -12,6 +12,7 @@ import utilsforecast.processing as ufp import xgboost as xgb from sklearn import set_config +from sklearn.ensemble import HistGradientBoostingRegressor from sklearn.linear_model import LinearRegression from utilsforecast.feature_engineering import fourier, time_features from utilsforecast.processing import match_if_categorical @@ -99,6 +100,37 @@ def predictions(fitted_fcst): return fitted_fcst.predict(horizon) +def _make_partition_series( + n_series: int = 4, length: int = 50, uid_prefix: str = "" +) -> pd.DataFrame: + df = generate_daily_series( + n_series=n_series, + min_length=length, + max_length=length, + n_static_features=0, + ) + if uid_prefix: + df["unique_id"] = df["unique_id"].cat.rename_categories( + {c: f"{uid_prefix}{c}" for c in df["unique_id"].cat.categories} + ) + brand_map = { + uid: i % 2 + for i, uid in enumerate(df["unique_id"].cat.categories) + } + df["brand"] = df["unique_id"].map(brand_map).astype(np.int8) + df["promo"] = (df["ds"].dt.dayofweek >= 5).astype(np.int8) + return df + + +def _make_partition_future(df: pd.DataFrame, h: int) -> pd.DataFrame: + last_dates = df.groupby("unique_id", observed=True)["ds"].max().reset_index() + future = last_dates.loc[last_dates.index.repeat(h)].copy() + future["step"] = np.tile(np.arange(1, h + 1), last_dates.shape[0]) + future["ds"] = future["ds"] + pd.to_timedelta(future["step"], unit="D") + future["promo"] = (future["ds"].dt.dayofweek >= 5).astype(np.int8) + return future[["unique_id", "ds", "promo"]].reset_index(drop=True) + + def test_missing_future(fcst, setup_forecast_data): df, train, _ = setup_forecast_data train2 = train.copy() @@ -231,6 +263,29 @@ def test_prediction_intervals_monotonicity(predictions_w_intervals): assert monotonic_count == len(predictions_w_intervals) +def test_partition_by_cross_validation_refit_false(): + df = _make_partition_series() + fcst = MLForecast( + models=[HistGradientBoostingRegressor(random_state=0, max_depth=3)], + freq="D", + lags=[1, 2, 7], + lag_transforms={1: [ExpandingMean(groupby=["brand"], partition_by=["promo"])]}, + ) + + cv_results = fcst.cross_validation( + df, + n_windows=2, + h=5, + static_features=["brand"], + refit=False, + dropna=False, + ) + + assert "HistGradientBoostingRegressor" in cv_results.columns + assert cv_results.shape[0] == df["unique_id"].nunique() * 2 * 5 + assert cv_results["HistGradientBoostingRegressor"].notna().all() + + def test_indexed_data_datetime_ds(): # test indexed data, datetime ds fcst_test = MLForecast( @@ -1780,3 +1835,23 @@ def test_transfer_learning_with_sparse_horizons(): # Only horizons 3 and 10 should be returned assert preds.shape[0] == n_series * 2 assert set(preds["unique_id"].unique()) == set(train_b["unique_id"].unique()) + + +def test_transfer_learning_with_partition_by_group_transform(): + train_a = _make_partition_series(n_series=3, length=60) + train_b = _make_partition_series(n_series=2, length=60, uid_prefix="new_") + future_b = _make_partition_future(train_b, h=5) + + fcst = MLForecast( + models=[HistGradientBoostingRegressor(random_state=0, max_depth=3)], + freq="D", + lags=[1, 2, 7], + lag_transforms={1: [ExpandingMean(groupby=["brand"], partition_by=["promo"])]}, + ) + fcst.fit(train_a, static_features=["brand"], dropna=False) + + preds = fcst.predict(h=5, new_df=train_b, X_df=future_b) + + assert preds.shape[0] == train_b["unique_id"].nunique() * 5 + assert set(preds["unique_id"].unique()) == set(train_b["unique_id"].unique()) + assert preds["HistGradientBoostingRegressor"].notna().all() From 6a26ef562548f7441a722f9cfe50eb411536c384 Mon Sep 17 00:00:00 2001 From: janrth Date: Tue, 31 Mar 2026 23:43:10 +0200 Subject: [PATCH 2/4] Fix partition-by edge cases and regressions --- mlforecast/core.py | 45 ++++++++++------ mlforecast/lag_transforms.py | 26 +++++++++ tests/test_auto.py | 26 +++++---- tests/test_core.py | 101 +++++++++++++++++++++++++++++++++++ 4 files changed, 174 insertions(+), 24 deletions(-) diff --git a/mlforecast/core.py b/mlforecast/core.py index 0c0505ab..dffc8217 100644 --- a/mlforecast/core.py +++ b/mlforecast/core.py @@ -309,6 +309,11 @@ def _get_partition_tfms( grouped.setdefault(key, {})[name] = tfm return grouped + def _has_aggregated_partition_tfms(self) -> bool: + return any( + mode in {"global", "group"} for mode, _, _ in self._get_partition_tfms() + ) + def _get_local_tfms( self, transforms: Mapping[str, Union[Tuple[Any, ...], _BaseLagTransform]], @@ -472,7 +477,9 @@ def _compute_partition_features( core_ga = CoreGroupedArray(state["ga"].data, state["ga"].indptr) updates = {} for name, tfm in state["tfms"].items(): - fresh_tfm = copy.deepcopy(tfm)._set_core_tfm(tfm._core_tfm.lag) + fresh_tfm = copy.deepcopy(tfm)._set_core_tfm( + tfm._get_configured_lag() + ) fresh_tfm.transform(core_ga) updates[name] = fresh_tfm.update(core_ga) bucket_ids = self._lookup_bucket_ids( @@ -516,9 +523,11 @@ def _update_partition_states(self, new: np.ndarray) -> None: def _check_aligned_ends(self) -> None: """Check that all series end at the same timestamp when using global/group transforms.""" - partition_tfms = self._get_partition_tfms() - needs_alignment = any(mode in {"global", "group"} for mode, _, _ in partition_tfms) - if not (self._get_global_tfms() or self._get_group_tfms() or needs_alignment): + if not ( + self._get_global_tfms() + or self._get_group_tfms() + or self._has_aggregated_partition_tfms() + ): return if isinstance(self.last_dates, pd.Index): aligned = self.last_dates.nunique() == 1 @@ -637,17 +646,19 @@ def _fit( self._global_times = pd.Index(global_df[time_col]) else: self._global_times = global_df[time_col] + partition_tfms = self._get_partition_tfms() + group_tfms = self._get_group_tfms() to_drop = [id_col, time_col, target_col] if static_features is None: - partition_cols = { + partition_feature_cols = { col - for _, _, partition_cols in self._get_partition_tfms().keys() - for col in partition_cols + for _, _, partition_cols_key in partition_tfms.keys() + for col in partition_cols_key } static_features = [ c for c in df.columns - if c not in [time_col, target_col] and c not in partition_cols + if c not in [time_col, target_col] and c not in partition_feature_cols ] elif id_col not in static_features: static_features = [id_col, *static_features] @@ -681,7 +692,7 @@ def _fit( f for f in self.features if f not in df.columns ] df_for_special = df - if self.target_transforms is not None: + if self.target_transforms is not None and (partition_tfms or group_tfms): transformed_target = ga.data if self._restore_idxs is not None: transformed_target = transformed_target[self._restore_idxs] @@ -693,7 +704,6 @@ def _fit( ) self._partition_states = {} - partition_tfms = self._get_partition_tfms() if partition_tfms: for (mode, group_cols, partition_cols), tfms in partition_tfms.items(): for col in partition_cols: @@ -743,7 +753,6 @@ def _fit( "groups": groups, } self._group_states: Dict[Tuple[str, ...], Dict[str, Any]] = {} - group_tfms = self._get_group_tfms() if group_tfms: def _add_group_id(data, cols): if isinstance(data, pd.DataFrame): @@ -1578,7 +1587,11 @@ def predict( X_df: Optional[DFType] = None, ids: Optional[List[str]] = None, ) -> DFType: - if ids is not None and (self._get_global_tfms() or self._get_group_tfms()): + if ids is not None and ( + self._get_global_tfms() + or self._get_group_tfms() + or self._has_aggregated_partition_tfms() + ): raise ValueError( "Cannot use `ids` with global or group lag transforms. " "These transforms require forecasting all series together." @@ -1712,9 +1725,11 @@ def update(self, df: DataFrame, validate_new_data: bool = False) -> None: values = df[self.target_col].to_numpy() values = values.astype(self.ga.data.dtype, copy=False) self._check_aligned_ends() - partition_tfms = self._get_partition_tfms() - has_partition_agg = any(mode in {"global", "group"} for mode, _, _ in partition_tfms) - if self._get_global_tfms() or self._get_group_tfms() or has_partition_agg: + if ( + self._get_global_tfms() + or self._get_group_tfms() + or self._has_aggregated_partition_tfms() + ): if isinstance(df, pd.DataFrame): expected_ids = pd.Index(uids).union(pd.Index(new_ids)) expected_count = len(expected_ids) diff --git a/mlforecast/lag_transforms.py b/mlforecast/lag_transforms.py index 3d9ce0f3..10798213 100644 --- a/mlforecast/lag_transforms.py +++ b/mlforecast/lag_transforms.py @@ -111,6 +111,9 @@ def stack(transforms: Sequence["_BaseLagTransform"]) -> "_BaseLagTransform": ) return out + def _get_configured_lag(self) -> int: + return self._core_tfm.lag + @property def _lag(self): return self._core_tfm.lag - 1 @@ -443,6 +446,9 @@ def _set_core_tfm(self, lag: int) -> "Offset": self._core_tfm = self.tfm._core_tfm return self + def _get_configured_lag(self) -> int: + return self.tfm._get_configured_lag() - self.n + @property def update_samples(self) -> int: return self.tfm.update_samples + self.n @@ -495,6 +501,26 @@ def transform(self, ga: CoreGroupedArray) -> np.ndarray: def update(self, ga: CoreGroupedArray) -> np.ndarray: return self.operator(self.tfm1.update(ga), self.tfm2.update(ga)) + def take(self, idxs: np.ndarray) -> "Combine": + out = copy.deepcopy(self) + out.tfm1 = self.tfm1.take(idxs) + out.tfm2 = self.tfm2.take(idxs) + return out + + @staticmethod + def stack(transforms: Sequence["Combine"]) -> "Combine": + out = copy.deepcopy(transforms[0]) + out.tfm1 = transforms[0].tfm1.stack([tfm.tfm1 for tfm in transforms]) + out.tfm2 = transforms[0].tfm2.stack([tfm.tfm2 for tfm in transforms]) + return out + + def _get_configured_lag(self) -> int: + lag1 = self.tfm1._get_configured_lag() + lag2 = self.tfm2._get_configured_lag() + if lag1 != lag2: + raise ValueError("Combined transforms must share the same configured lag.") + return lag1 + @property def update_samples(self): return max(self.tfm1.update_samples, self.tfm2.update_samples) diff --git a/tests/test_auto.py b/tests/test_auto.py index d7099a18..6cc16188 100644 --- a/tests/test_auto.py +++ b/tests/test_auto.py @@ -16,6 +16,7 @@ AutoLightGBM, AutoMLForecast, AutoModel, + AutoRidge, PredictionIntervals, ridge_space, ) @@ -66,6 +67,10 @@ def _make_partition_future(df: pd.DataFrame, h: int) -> pd.DataFrame: return future[["unique_id", "ds", "promo"]].reset_index(drop=True) +def _ridge_init_config(trial): # noqa: ARG001 + return {"lags": [1, 2, 4]} + + def test_automlforecast_pipeline(weekly_data): train, valid, info = weekly_data h = info.horizon @@ -211,12 +216,11 @@ def test_automlforecast_errors_and_warnings(): def test_polars_input_compatibility(weekly_data): train, _, info = weekly_data h = info.horizon - season_length = info.seasonality train_pl = pl.from_pandas(train.astype({"unique_id": "str"})) auto_mlf = AutoMLForecast( freq=1, - season_length=season_length, + init_config=_ridge_init_config, models={"ridge": AutoRidge()}, num_threads=2, ) @@ -239,12 +243,11 @@ def test_polars_input_compatibility(weekly_data): def test_step_size_impact(weekly_data): train, _, info = weekly_data h = info.horizon - season_length = info.seasonality train_pl = pl.from_pandas(train.astype({"unique_id": "str"})) base = AutoMLForecast( freq=1, - season_length=season_length, + init_config=_ridge_init_config, models={"ridge": AutoRidge()}, num_threads=2, ) @@ -260,7 +263,7 @@ def test_step_size_impact(weekly_data): ) base2 = AutoMLForecast( freq=1, - season_length=season_length, + init_config=_ridge_init_config, models={"ridge": AutoRidge()}, num_threads=2, ) @@ -282,7 +285,6 @@ def test_step_size_impact(weekly_data): def test_nonstandard_column_names(weekly_data): train, _, info = weekly_data h = info.horizon - season_length = info.seasonality fit_kwargs = dict( n_windows=2, @@ -292,7 +294,9 @@ def test_nonstandard_column_names(weekly_data): optimize_kwargs={"timeout": 60}, ) model = AutoMLForecast( - freq=1, season_length=season_length, models={"ridge": AutoRidge()} + freq=1, + init_config=_ridge_init_config, + models={"ridge": AutoRidge()}, ) preds = model.fit(train, **fit_kwargs).predict(5) @@ -314,9 +318,13 @@ def test_nonstandard_column_names(weekly_data): def test_input_size_speedup(weekly_data): train, _, info = weekly_data h = info.horizon - season_length = info.seasonality model = AutoMLForecast( - freq=1, season_length=season_length, models={"ridge": AutoRidge()} + freq=1, + init_config=lambda trial: { # noqa: ARG005 + "lags": list(range(1, 25)), + "lag_transforms": {1: [ExpandingMean()]}, + }, + models={"ridge": AutoRidge()}, ) fit_kwargs = dict( n_windows=3, diff --git a/tests/test_core.py b/tests/test_core.py index 8f8f3791..11310e9d 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1,4 +1,5 @@ import copy +import operator import datetime import tempfile import warnings @@ -18,7 +19,9 @@ _name_models, ) from mlforecast.lag_transforms import ( + Combine, ExpandingMean, + Offset, RollingMean, RollingQuantile, RollingStd, @@ -856,6 +859,33 @@ def test_partition_lag_transform_predict(engine): ) +@pytest.mark.parametrize("engine", ["pandas", "polars"]) +def test_partition_combine_lag_transform_predict(engine): + df = _make_partition_df(engine) + future = _make_partition_future_df(engine) + tfm = Combine( + ExpandingMean(partition_by=["promo"]), + Offset(ExpandingMean(partition_by=["promo"]), 1), + operator.add, + ) + ts = TimeSeries(freq=1, lag_transforms={1: [tfm]}) + ts.fit_transform( + df, + id_col="unique_id", + time_col="ds", + target_col="y", + dropna=False, + static_features=[], + ) + feats = SaveFeatures() + ts.predict({"y": A()}, 2, X_df=future, before_predict_callback=feats) + features = _maybe_to_pandas(feats.get_features(with_step=True)) + col = tfm._get_name(1) + + assert col in features.columns + assert not features[col].isna().all() + + @pytest.mark.parametrize("engine", ["pandas", "polars"]) def test_group_partition_lag_transform_predict(engine): df = _make_partition_df(engine, include_brand=True) @@ -916,6 +946,77 @@ def test_global_partition_lag_transform_predict(engine): ) +@pytest.mark.parametrize( + "engine, tfm, include_brand, static_features", + [ + ( + "pandas", + ExpandingMean(groupby=["brand"], partition_by=["promo"]), + True, + ["brand"], + ), + ( + "polars", + ExpandingMean(groupby=["brand"], partition_by=["promo"]), + True, + ["brand"], + ), + ( + "pandas", + ExpandingMean(global_=True, partition_by=["promo"]), + False, + [], + ), + ( + "polars", + ExpandingMean(global_=True, partition_by=["promo"]), + False, + [], + ), + ], +) +def test_aggregated_partition_lag_transform_predict_ids_error( + engine, tfm, include_brand, static_features +): + df = _make_partition_df(engine, include_brand=include_brand) + future = _make_partition_future_df(engine, include_brand=include_brand) + ts = TimeSeries(freq=1, lag_transforms={1: [tfm]}) + ts.fit_transform( + df, + id_col="unique_id", + time_col="ds", + target_col="y", + dropna=False, + static_features=static_features, + ) + + with pytest.raises( + ValueError, + match="Cannot use `ids` with global or group lag transforms", + ): + ts.predict({"y": A()}, 2, X_df=future, ids=["a"]) + + +@pytest.mark.parametrize("engine", ["pandas", "polars"]) +def test_local_partition_lag_transform_predict_ids(engine): + df = _make_partition_df(engine) + future = _make_partition_future_df(engine) + tfm = ExpandingMean(partition_by=["promo"]) + ts = TimeSeries(freq=1, lag_transforms={1: [tfm]}) + ts.fit_transform( + df, + id_col="unique_id", + time_col="ds", + target_col="y", + dropna=False, + static_features=[], + ) + + preds = ts.predict({"y": A()}, 2, X_df=future, ids=["a"]) + preds = _maybe_to_pandas(preds) + assert preds["unique_id"].unique().tolist() == ["a"] + + @pytest.mark.parametrize("engine", ["pandas", "polars"]) def test_partition_lag_transform_update(engine): df = _make_partition_df(engine) From d22290ae0a9864a747c5623fad18b7e968bb0eb2 Mon Sep 17 00:00:00 2001 From: janrth Date: Wed, 1 Apr 2026 00:00:11 +0200 Subject: [PATCH 3/4] remove duplicates --- mlforecast/lag_transforms.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/mlforecast/lag_transforms.py b/mlforecast/lag_transforms.py index af7ea31a..10798213 100644 --- a/mlforecast/lag_transforms.py +++ b/mlforecast/lag_transforms.py @@ -524,16 +524,3 @@ def _get_configured_lag(self) -> int: @property def update_samples(self): return max(self.tfm1.update_samples, self.tfm2.update_samples) - - def take(self, idxs: np.ndarray) -> "Combine": - out = copy.deepcopy(self) - out.tfm1 = self.tfm1.take(idxs) - out.tfm2 = self.tfm2.take(idxs) - return out - - @staticmethod - def stack(transforms: Sequence["Combine"]) -> "Combine": - out = copy.deepcopy(transforms[0]) - out.tfm1 = transforms[0].tfm1.stack([tfm.tfm1 for tfm in transforms]) - out.tfm2 = transforms[0].tfm2.stack([tfm.tfm2 for tfm in transforms]) - return out From 0b735a8a39a4715657270d1e7a0b84094d032d77 Mon Sep 17 00:00:00 2001 From: Simone Zanin Date: Wed, 22 Apr 2026 14:16:38 +0200 Subject: [PATCH 4/4] fix: added new implementation to handle groupby and global_ modes for partition_by window aggregations --- mlforecast/core.py | 121 ++++++++++++++++++----- mlforecast/lag_transforms.py | 137 +++++++++++++++++++++++++- tests/test_core.py | 186 ++++++++++++++++++++++++++--------- 3 files changed, 371 insertions(+), 73 deletions(-) diff --git a/mlforecast/core.py b/mlforecast/core.py index 3ca56bca..47bb08f9 100644 --- a/mlforecast/core.py +++ b/mlforecast/core.py @@ -358,13 +358,14 @@ def _build_partition_bucket_df( bucket_df = df[key_cols + [self.time_col, self.target_col]] bucket_df = ufp.sort(bucket_df, by=key_cols + [self.time_col]) return ufp.drop_index_if_pandas(bucket_df) - bucket_df = ufp.group_by_agg( - df[key_cols + [self.time_col, self.target_col]], - key_cols + [self.time_col], - {self.target_col: "sum"}, - maintain_order=True, + # Keep individual rows (one per original series observation) so the transform + # sees each observation separately rather than a timestamp-level aggregate. + # id_col is included to enable a unique (id_col, time_col) join-back. + keep_cols = _dedupe_preserve_order( + [self.id_col] + key_cols + [self.time_col, self.target_col] ) - bucket_df = ufp.sort(bucket_df, by=key_cols + [self.time_col]) + bucket_df = df[keep_cols] + bucket_df = ufp.sort(bucket_df, by=key_cols + [self.time_col, self.id_col]) return ufp.drop_index_if_pandas(bucket_df) def _add_bucket_id( @@ -508,14 +509,22 @@ def _update_partition_states(self, new: np.ndarray) -> None: old_n_buckets = len(state["groups"]) bucket_ids = self._ensure_partition_bucket_ids(state, keys_df) new_n_buckets = len(state["groups"]) - bucket_sums = np.zeros(new_n_buckets, dtype=self.ga.data.dtype) - new_sizes = np.zeros(new_n_buckets, dtype=np.int32) - np.add.at(bucket_sums, bucket_ids, new_arr) - new_sizes[np.unique(bucket_ids)] = 1 new_groups = np.zeros(new_n_buckets, dtype=bool) if new_n_buckets > old_n_buckets: new_groups[old_n_buckets:] = True - new_values = bucket_sums[new_sizes.astype(bool)] + if state["mode"] == "local": + bucket_sums = np.zeros(new_n_buckets, dtype=self.ga.data.dtype) + new_sizes = np.zeros(new_n_buckets, dtype=np.int32) + np.add.at(bucket_sums, bucket_ids, new_arr) + new_sizes[np.unique(bucket_ids)] = 1 + new_values = bucket_sums[new_sizes.astype(bool)] + else: + # Append each series value individually; sort by bucket_id so + # new_values is laid out bucket-by-bucket for append_several. + sort_order = np.argsort(bucket_ids, kind="stable") + new_values = new_arr[sort_order] + new_sizes = np.zeros(new_n_buckets, dtype=np.int32) + np.add.at(new_sizes, bucket_ids[sort_order], 1) state["ga"] = state["ga"].append_several( new_sizes=new_sizes, new_values=new_values, @@ -761,14 +770,33 @@ def _fit( mode, group_cols, partition_cols ) bucket_df, groups = self._add_bucket_id(bucket_df, key_cols) + if mode != "local": + # Timestamps can repeat within a bucket (multiple series at same ds), + # so use a sequential position per bucket as the time dimension. + if isinstance(bucket_df, pd.DataFrame): + bucket_df = bucket_df.copy() + bucket_df["_bucket_pos"] = ( + bucket_df.groupby("_bucket_id", sort=False) + .cumcount() + .astype(np.int64) + ) + else: + bucket_df = bucket_df.with_columns( + pl.int_range(pl.len()).over("_bucket_id").alias("_bucket_pos") + ) + proc_time_col = "_bucket_pos" + join_cols = [self.id_col, time_col] + else: + proc_time_col = time_col + join_cols = key_cols + [time_col] if isinstance(bucket_df, pd.DataFrame): - process_df = bucket_df[["_bucket_id", time_col, target_col]] + process_df = bucket_df[["_bucket_id", proc_time_col, target_col]] else: - process_df = bucket_df.select(["_bucket_id", time_col, target_col]) + process_df = bucket_df.select(["_bucket_id", proc_time_col, target_col]) processed = ufp.process_df( process_df, id_col="_bucket_id", - time_col=time_col, + time_col=proc_time_col, target_col=target_col, ) if processed.sort_idxs is not None: @@ -779,6 +807,7 @@ def _fit( "group_cols": list(group_cols), "partition_cols": list(partition_cols), "key_cols": key_cols, + "join_cols": join_cols, "tfms": tfms, "ga": GroupedArray(processed.data[:, 0], processed.indptr), "df": bucket_df, @@ -979,26 +1008,68 @@ def _transform( if self._partition_states: for state in self._partition_states.values(): bucket_df = state["df"] - bucket_vals = state["ga"].apply_transforms( - transforms=state["tfms"], updates_only=False - ) - key_cols = state["key_cols"] + mode = state["mode"] + tfms = state["tfms"] + bucket_vals: Dict[str, np.ndarray] = {} + if mode != "local": + # Dispatch each transform via _compute_bucket_feature. + # Transforms that implement that method (e.g. _RollingBase) return + # their own RANGE-based feature array directly. Transforms that + # return None fall back to position-based GroupedArray computation + # followed by same-timestamp correction: all observations sharing + # (bucket_id, ts) receive the first-position value, which was + # computed from strictly earlier timestamps only. + bid_arr = bucket_df["_bucket_id"].to_numpy() + ts_arr = bucket_df[self.time_col].to_numpy() + y_arr = bucket_df[self.target_col].to_numpy().astype(float) + fallback_tfms = {} + for name, tfm in tfms.items(): + if isinstance(tfm, _BaseLagTransform): + computed = tfm._compute_bucket_feature(bid_arr, ts_arr, y_arr) + if computed is not None: + bucket_vals[name] = computed + continue + fallback_tfms[name] = tfm + if fallback_tfms: + pos_vals = state["ga"].apply_transforms( + transforms=fallback_tfms, updates_only=False + ) + for name in list(pos_vals): + vals = pos_vals[name].copy() + i, n_rows = 0, len(vals) + while i < n_rows: + j = i + 1 + while ( + j < n_rows + and bid_arr[j] == bid_arr[i] + and ts_arr[j] == ts_arr[i] + ): + j += 1 + if j > i + 1: + vals[i + 1 : j] = vals[i] + i = j + bucket_vals[name] = vals + else: + bucket_vals = state["ga"].apply_transforms( + transforms=tfms, updates_only=False + ) + join_cols = state["join_cols"] feature_cols = list(bucket_vals.keys()) if isinstance(df, pd.DataFrame): - join_df = bucket_df[key_cols + [self.time_col]].copy() + join_df = bucket_df[join_cols].copy() for name, vals in bucket_vals.items(): join_df[name] = vals - joined = df[key_cols + [self.time_col]].merge( - join_df, on=key_cols + [self.time_col], how="left" + joined = df[join_cols].merge( + join_df, on=join_cols, how="left" ) for name in feature_cols: features[name] = joined[name].to_numpy() else: - join_df = bucket_df.select(key_cols + [self.time_col]) + join_df = bucket_df.select(join_cols) for name, vals in bucket_vals.items(): join_df = join_df.with_columns(pl.Series(name=name, values=vals)) - joined = df.select(key_cols + [self.time_col]).join( - join_df, on=key_cols + [self.time_col], how="left" + joined = df.select(join_cols).join( + join_df, on=join_cols, how="left" ) for name in feature_cols: features[name] = joined[name].to_numpy() @@ -2021,7 +2092,7 @@ def _attach_group_id(data, groups, cols): bucket_df = bucket_df.with_columns( pl.Series(name="_bucket_id", values=bucket_ids) ) - bucket_df = ufp.sort(bucket_df, by=["_bucket_id", self.time_col]) + bucket_df = ufp.sort(bucket_df, by=["_bucket_id", self.time_col, self.id_col]) id_counts = ufp.counts_by_id(bucket_df, "_bucket_id") if isinstance(state["groups"], pd.DataFrame): all_ids = state["groups"][["_bucket_id"]].copy() diff --git a/mlforecast/lag_transforms.py b/mlforecast/lag_transforms.py index 10798213..febc97b1 100644 --- a/mlforecast/lag_transforms.py +++ b/mlforecast/lag_transforms.py @@ -92,6 +92,36 @@ def _get_name(self, lag: int) -> str: result += "_" + "_".join(changed_params) return result + def _compute_bucket_feature( + self, + bid_arr: np.ndarray, # noqa: ARG002 + ts_arr: np.ndarray, # noqa: ARG002 + y_arr: np.ndarray, # noqa: ARG002 + ) -> Optional[np.ndarray]: + """Compute the feature for a non-local (group/global) partition bucket. + + Called during ``_transform`` when ``partition_by`` transforms run in group + or global mode. ``bid_arr``, ``ts_arr``, ``y_arr`` are aligned numpy arrays + over the sorted bucket DataFrame (ordered by bucket_id, timestamp, id). + + Returns a feature array of length ``len(bid_arr)``, or ``None`` to fall back + to the default: position-based GroupedArray transform followed by + same-timestamp correction (all observations sharing a ``(bucket_id, ts)`` + receive the feature value of the first observation in that group, which was + computed from strictly earlier timestamps only). + + The default ``None`` is correct for unbounded (expanding) transforms because + position-based expanding over observations sorted by timestamp is equivalent + to a timestamp-based expanding window. It is also used as a temporary + fallback for transforms whose RANGE semantics have not yet been implemented + (e.g. :class:`_Seasonal_RollingBase`, :class:`ExponentiallyWeightedMean`). + + Subclasses override this method when the position-based fallback produces + semantically wrong results — primarily bounded rolling windows + (:class:`_RollingBase`) where multiple series can share a timestamp. + """ + return None + def transform(self, ga: CoreGroupedArray) -> np.ndarray: return self._core_tfm.transform(ga) @@ -187,16 +217,112 @@ def update_samples(self) -> int: return self._lag + self.window_size -class RollingMean(_RollingBase): ... + def _compute_bucket_feature( + self, + bid_arr: np.ndarray, + ts_arr: np.ndarray, + y_arr: np.ndarray, + ) -> np.ndarray: + """RANGE-based rolling for non-local partition modes. + + For each row at ``(bucket_id, ds=T)`` collects all observations in the same + bucket with ``ts ∈ [T - lag - window_size + 1, T - lag]`` and applies + :meth:`_window_stat`. This matches SQL + ``RANGE BETWEEN (lag + window_size - 1) PRECEDING AND lag PRECEDING``. + + The default loop is O(n × w). Subclasses may override this method with a + more efficient algorithm when the statistic supports it (e.g. ``RollingMean`` + uses cumulative sums for O(n log n) performance). + """ + lag = self._core_tfm.lag + w = self.window_size + min_samples = self.min_samples if self.min_samples is not None else w + n = len(bid_arr) + result = np.empty(n) + result[:] = np.nan + for bid in np.unique(bid_arr): + idxs = np.where(bid_arr == bid)[0] + ts_b = ts_arr[idxs] + y_b = y_arr[idxs] + unique_ts, inv = np.unique(ts_b, return_inverse=True) + feat_u = np.full(len(unique_ts), np.nan) + for k, T in enumerate(unique_ts): + lower, upper = T - lag - w + 1, T - lag + mask = (ts_b >= lower) & (ts_b <= upper) + vals = y_b[mask] + if len(vals) >= min_samples: + feat_u[k] = self._window_stat(vals) + result[idxs] = feat_u[inv] + return result + def _window_stat(self, vals: np.ndarray) -> float: + """Compute the statistic over ``vals``, the individual observations in the window. -class RollingStd(_RollingBase): ... + Subclasses must implement this; it is called by :meth:`_compute_bucket_feature`. + """ + raise NotImplementedError( + f"{self.__class__.__name__} must implement `_window_stat` to support " + "RANGE-based rolling in non-local partition modes." + ) -class RollingMin(_RollingBase): ... +class RollingMean(_RollingBase): + def _compute_bucket_feature( + self, + bid_arr: np.ndarray, + ts_arr: np.ndarray, + y_arr: np.ndarray, + ) -> np.ndarray: + """O(m log m) override using cumulative per-bucket sums, where m = unique timestamps. + + Computes each feature value once per unique timestamp in the bucket, then + broadcasts back to all rows sharing that timestamp via the ``inv`` index + from ``np.unique``. + """ + lag = self._core_tfm.lag + w = self.window_size + min_samples = self.min_samples if self.min_samples is not None else w + n = len(bid_arr) + result = np.empty(n) + result[:] = np.nan + for bid in np.unique(bid_arr): + idxs = np.where(bid_arr == bid)[0] + ts = ts_arr[idxs] + y = y_arr[idxs] + unique_ts, inv, counts = np.unique( + ts, return_inverse=True, return_counts=True + ) + ts_sums = np.bincount(inv, weights=y, minlength=len(unique_ts)) + cum_sum = np.cumsum(ts_sums) + cum_cnt = np.cumsum(counts).astype(float) + upper_ts_u = unique_ts - lag + lower_ts_u = unique_ts - lag - w + upper_idxs = np.searchsorted(unique_ts, upper_ts_u, side="right") - 1 + lower_idxs = np.searchsorted(unique_ts, lower_ts_u, side="right") - 1 + upper_sum = np.where(upper_idxs >= 0, cum_sum[upper_idxs], 0.0) + upper_cnt = np.where(upper_idxs >= 0, cum_cnt[upper_idxs], 0.0) + lower_sum = np.where(lower_idxs >= 0, cum_sum[lower_idxs], 0.0) + lower_cnt = np.where(lower_idxs >= 0, cum_cnt[lower_idxs], 0.0) + win_sum = upper_sum - lower_sum + win_cnt = upper_cnt - lower_cnt + feat_u = np.where(win_cnt >= min_samples, win_sum / win_cnt, np.nan) + result[idxs] = feat_u[inv] + return result + +class RollingStd(_RollingBase): + def _window_stat(self, vals: np.ndarray) -> float: + return float(np.std(vals, ddof=1)) if len(vals) > 1 else np.nan -class RollingMax(_RollingBase): ... + +class RollingMin(_RollingBase): + def _window_stat(self, vals: np.ndarray) -> float: + return float(np.min(vals)) + + +class RollingMax(_RollingBase): + def _window_stat(self, vals: np.ndarray) -> float: + return float(np.max(vals)) class RollingQuantile(_RollingBase): @@ -229,6 +355,9 @@ def _set_core_tfm(self, lag: int): ) return self + def _window_stat(self, vals: np.ndarray) -> float: + return float(np.quantile(vals, self.p)) + class _Seasonal_RollingBase(_BaseLagTransform): """Rolling statistic over seasonal periods""" diff --git a/tests/test_core.py b/tests/test_core.py index 11310e9d..4443c3a0 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -59,7 +59,13 @@ def _make_partition_df(engine, include_brand=False): "promo": [True, True, False, True, False, True, False, True], } if include_brand: - data["brand"] = ["x"] * 8 + # Series c (brand=y) is added alongside b so brand=y has two series, + # making group tests actually aggregate across series within the same brand. + data["unique_id"] = data["unique_id"] + ["c", "c", "c", "c"] + data["ds"] = data["ds"] + [1, 2, 3, 4] + data["y"] = data["y"] + [5, 15, 25, 35] + data["promo"] = data["promo"] + [True, True, True, False] + data["brand"] = ["x", "x", "x", "x", "y", "y", "y", "y", "y", "y", "y", "y"] if engine == "polars": return pl.DataFrame(data).with_columns(pl.col("unique_id").cast(pl.Categorical)) return pd.DataFrame(data) @@ -72,7 +78,10 @@ def _make_partition_future_df(engine, include_brand=False): "promo": [False, True, True, False], } if include_brand: - data["brand"] = ["x"] * 4 + data["unique_id"] = data["unique_id"] + ["c", "c"] + data["ds"] = data["ds"] + [5, 6] + data["promo"] = data["promo"] + [False, True] + data["brand"] = ["x", "x", "y", "y", "y", "y"] if engine == "polars": return pl.DataFrame(data).with_columns(pl.col("unique_id").cast(pl.Categorical)) return pd.DataFrame(data) @@ -763,15 +772,30 @@ def test_group_partition_lag_transform(engine): dropna=False, static_features=["brand"], ) + # brand=x (series a only): + # (x,True)=[a@1(1), a@2(2), a@4(4)] (x,False)=[a@3(3)] + # brand=y (series b + c): + # (y,True) sorted by (ds,id): c@1(5), b@2(20), c@2(15), c@3(25), b@4(40) + # (y,False) sorted by (ds,id): b@1(10), b@3(30), c@4(35) + # RANGE-based RollingMean(window=2, lag=1): at ds=T include all obs in bucket + # with ts in [T-2, T-1]. + # (x,True): a@2→[1]→1.0; a@4→range[2,3]→{ds=2:2}→2.0 + # (y,True): b@2,c@2→range[0,1]→{ds=1:5}→5.0; c@3→range[1,2]→{5,20,15}→40/3 + # b@4→range[2,3]→{ds=2:20,15;ds=3:25}→20.0 + # (y,False): b@3→range[1,2]→{ds=1:10}→10.0; c@4→range[2,3]→{ds=3:30}→30.0 expected_by_key = { ("a", 1): np.nan, ("a", 2): 1.0, - ("a", 3): 10.0, - ("a", 4): 11.5, + ("a", 3): np.nan, + ("a", 4): 2.0, # (x,True) range[2,3]: only a@ds=2=2 → mean=2.0 ("b", 1): np.nan, - ("b", 2): 1.0, + ("b", 2): 5.0, # (y,True) range[0,1]: c@ds=1=5 → mean=5.0 ("b", 3): 10.0, - ("b", 4): 11.5, + ("b", 4): 20.0, # (y,True) range[2,3]: {20,15,25} → mean=60/3=20.0 + ("c", 1): np.nan, + ("c", 2): 5.0, # same (bucket,ds) as b@2 → same feature=5.0 + ("c", 3): 40/3, # (y,True) range[1,2]: {5,20,15} → mean=40/3 + ("c", 4): 30.0, # (y,False) range[2,3]: only b@ds=3=30 → mean=30.0 } if engine == "polars": expected = np.array( @@ -802,15 +826,22 @@ def test_global_partition_lag_transform(engine): target_col="y", dropna=False, ) + # Global mode: individual obs sorted by (ds, id); RANGE PRECEDING correction applied + # so same-timestamp obs share the same feature value (all see strictly prior data). + # (True) sorted: a@ds=1(1), a@ds=2(2), b@ds=2(20), a@ds=4(4), b@ds=4(40) + # (False) sorted: b@ds=1(10), a@ds=3(3), b@ds=3(30) + # RollingMean(2,1) sequential then corrected for ties: + # (True): [NaN, 1.0, 1.0, 11.0, 11.0] ← ds=2 pair both 1.0; ds=4 pair both 11.0 + # (False): [NaN, 10.0, 10.0] ← ds=3 pair both 10.0 expected_by_key = { ("a", 1): np.nan, ("a", 2): 1.0, ("a", 3): 10.0, - ("a", 4): 11.5, + ("a", 4): 11.0, # (True) pos=3: rolling_mean([2, 20]) = 11.0 ("b", 1): np.nan, - ("b", 2): 1.0, - ("b", 3): 10.0, - ("b", 4): 11.5, + ("b", 2): 1.0, # same ds=2 group as a: both see only a@ds=1=1 as prior → 1.0 + ("b", 3): 10.0, # same ds=3 group as a: both see only b@ds=1=10 as prior → 10.0 + ("b", 4): 11.0, # same ds=4 group as a: both see [1,2,20] as prior → 11.0 } if engine == "polars": expected = np.array( @@ -889,7 +920,19 @@ def test_partition_combine_lag_transform_predict(engine): @pytest.mark.parametrize("engine", ["pandas", "polars"]) def test_group_partition_lag_transform_predict(engine): df = _make_partition_df(engine, include_brand=True) - future = _make_partition_future_df(engine) + # future must include all 3 series (a, b, c); brand is static so not required here + if engine == "polars": + future = pl.DataFrame({ + "unique_id": ["a", "a", "b", "b", "c", "c"], + "ds": [5, 6, 5, 6, 5, 6], + "promo": [False, True, True, False, False, True], + }).with_columns(pl.col("unique_id").cast(pl.Categorical)) + else: + future = pd.DataFrame({ + "unique_id": ["a", "a", "b", "b", "c", "c"], + "ds": [5, 6, 5, 6, 5, 6], + "promo": [False, True, True, False, False, True], + }) tfm = ExpandingMean(groupby=["brand"], partition_by=["promo"]) ts = TimeSeries(freq=1, lag_transforms={1: [tfm]}) ts.fit_transform( @@ -904,14 +947,21 @@ def test_group_partition_lag_transform_predict(engine): ts.predict({"y": A()}, 2, X_df=future, before_predict_callback=feats) features = _maybe_to_pandas(feats.get_features(with_step=True)) col = tfm._get_name(1) + # brand=x (only a): (x,False)=[3] → NaN (single element, coreforecast returns NaN) + # brand=y (b + c): (y,True)=[5,20,15,25,40] → 21.0; (y,False)=[10,30,35] → 25.0 + # Step 0: a@promo=False, b@promo=True, c@promo=False np.testing.assert_allclose( features.loc[features["step"].eq(0), col].to_numpy(), - np.array([21.5, 22.333333333333332]), + np.array([np.nan, 21.0, 25.0]), equal_nan=True, ) + # After step 0: a→(x,False), b→(y,True), c→(y,False) each append pred=0 + # Step 1: a@promo=True→(x,True)=[1,2,4]→7/3 + # b@promo=False→(y,False)=[10,30,35,0]→18.75 + # c@promo=True→(y,True)=[5,20,15,25,40,0]→17.5 np.testing.assert_allclose( features.loc[features["step"].eq(1), col].to_numpy(), - np.array([16.75, 14.333333333333334]), + np.array([7 / 3, 18.75, 17.5]), equal_nan=True, ) @@ -934,14 +984,18 @@ def test_global_partition_lag_transform_predict(engine): ts.predict({"y": A()}, 2, X_df=future, before_predict_callback=feats) features = _maybe_to_pandas(feats.get_features(with_step=True)) col = tfm._get_name(1) + # (True) bucket=[1,2,20,4,40], (False) bucket=[10,3,30] + # Step 0: a@promo=False → update=mean([10,3,30])=43/3; b@promo=True → update=mean([1,2,20,4,40])=67/5 np.testing.assert_allclose( features.loc[features["step"].eq(0), col].to_numpy(), - np.array([21.5, 22.333333333333332]), + np.array([43 / 3, 67 / 5]), equal_nan=True, ) + # After step 0: a(promo=F,pred=0) appends to (False)→[10,3,30,0]; b(promo=T,pred=0)→(True)→[1,2,20,4,40,0] + # Step 1: a@promo=True → update=mean([1,2,20,4,40,0])=67/6; b@promo=False → update=mean([10,3,30,0])=43/4 np.testing.assert_allclose( features.loc[features["step"].eq(1), col].to_numpy(), - np.array([16.75, 14.333333333333334]), + np.array([67 / 6, 43 / 4]), equal_nan=True, ) @@ -1078,14 +1132,24 @@ def test_partition_lag_transform_update(engine): ("tfm", "fit_kwargs", "expected"), [ ( + # brand=x (a only): (x,True)=[1,2,4] → update=7/3 + # brand=y (b+c): update_df has b@promo=T, c@promo=T → (y,True) gets [50,100] + # (y,False)=[10,30,35] unchanged (no promo=F updates from b or c) + # At step 6: a@promo=True → (x,True)=[1,2,4] → 7/3 + # b@promo=False → (y,False)=[10,30,35] → mean=25.0 + # c@promo=False → (y,False)=[10,30,35] → mean=25.0 ExpandingMean(groupby=["brand"], partition_by=["promo"]), {"static_features": ["brand"]}, - np.array([29.25, 16.0]), + np.array([7 / 3, 25.0, 25.0]), ), ( + # Individual obs: (True)=[1,2,20,4,40] + b's update(50) → [1,2,20,4,40,50] + # (False)=[10,3,30] + a's update(5) → [10,3,30,5] + # At step 6: a@promo=True → mean([1,2,20,4,40,50])=117/6=19.5 + # b@promo=False → mean([10,3,30,5])=48/4=12.0 ExpandingMean(global_=True, partition_by=["promo"]), {"static_features": []}, - np.array([29.25, 16.0]), + np.array([19.5, 12.0]), ), ], ) @@ -1093,41 +1157,75 @@ def test_aggregated_partition_lag_transform_update(engine, tfm, fit_kwargs, expe include_brand = getattr(tfm, "groupby", None) is not None df = _make_partition_df(engine, include_brand=include_brand) if engine == "polars": - update_data = { - "unique_id": ["a", "b"], - "ds": [5, 5], - "y": [5, 50], - "promo": [False, True], - } if include_brand: - update_data["brand"] = ["x", "x"] + # Model has 3 series (a, b, c); group transforms require all to be updated + update_data = { + "unique_id": ["a", "b", "c"], + "ds": [5, 5, 5], + "y": [5, 50, 100], + "promo": [False, True, True], + "brand": ["x", "y", "y"], + } + else: + update_data = { + "unique_id": ["a", "b"], + "ds": [5, 5], + "y": [5, 50], + "promo": [False, True], + } update_df = pl.DataFrame(update_data).with_columns( pl.col("unique_id").cast(pl.Categorical) ) - step_df = pl.DataFrame( - { - "unique_id": ["a", "b"], - "ds": [6, 6], - "promo": [True, False], - } - ).with_columns(pl.col("unique_id").cast(pl.Categorical)) + if include_brand: + # c queries (y,False) at step 6 — same bucket as b, expected value = 25.0 + step_df = pl.DataFrame( + { + "unique_id": ["a", "b", "c"], + "ds": [6, 6, 6], + "promo": [True, False, False], + } + ).with_columns(pl.col("unique_id").cast(pl.Categorical)) + else: + step_df = pl.DataFrame( + { + "unique_id": ["a", "b"], + "ds": [6, 6], + "promo": [True, False], + } + ).with_columns(pl.col("unique_id").cast(pl.Categorical)) else: - update_data = { - "unique_id": ["a", "b"], - "ds": [5, 5], - "y": [5, 50], - "promo": [False, True], - } if include_brand: - update_data["brand"] = ["x", "x"] - update_df = pd.DataFrame(update_data) - step_df = pd.DataFrame( - { + update_data = { + "unique_id": ["a", "b", "c"], + "ds": [5, 5, 5], + "y": [5, 50, 100], + "promo": [False, True, True], + "brand": ["x", "y", "y"], + } + else: + update_data = { "unique_id": ["a", "b"], - "ds": [6, 6], - "promo": [True, False], + "ds": [5, 5], + "y": [5, 50], + "promo": [False, True], } - ) + update_df = pd.DataFrame(update_data) + if include_brand: + step_df = pd.DataFrame( + { + "unique_id": ["a", "b", "c"], + "ds": [6, 6, 6], + "promo": [True, False, False], + } + ) + else: + step_df = pd.DataFrame( + { + "unique_id": ["a", "b"], + "ds": [6, 6], + "promo": [True, False], + } + ) ts = TimeSeries(freq=1, lag_transforms={1: [tfm]}) ts.fit_transform( df,