From dc5641b68b8f365f9d930d4febaefbdba4cd7b91 Mon Sep 17 00:00:00 2001 From: TomeHirata Date: Wed, 28 May 2025 19:40:19 +0900 Subject: [PATCH 1/3] add estimator for CAR Signed-off-by: TomeHirata --- dte_adj/__init__.py | 487 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 464 insertions(+), 23 deletions(-) diff --git a/dte_adj/__init__.py b/dte_adj/__init__.py index f6e3c02..e1a92bf 100644 --- a/dte_adj/__init__.py +++ b/dte_adj/__init__.py @@ -5,14 +5,15 @@ from abc import ABC from .util import compute_confidence_intervals -__all__ = ["SimpleDistributionEstimator", "AdjustedDistributionEstimator"] +__all__ = ["SimpleDistributionEstimator", "AdjustedDistributionEstimator", "SimpleStratifiedDistributionEstimator", "AdjustedStratifiedDistributionEstimator"] class DistributionEstimatorBase(ABC): """A mixin including several convenience functions to compute and display distribution functions.""" def __init__(self): - """Initializes the DistributionFunctionMixin. + """ + Initializes the DistributionFunctionMixin. Returns: DistributionFunctionMixin: An instance of the estimator. @@ -30,7 +31,8 @@ def predict_dte( variance_type="moment", n_bootstrap=500, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: - """Compute DTE based on the estimator for the distribution function. + """ + Compute DTE based on the estimator for the distribution function. Args: target_treatment_arm (int): The index of the treatment arm of the treatment group. @@ -63,8 +65,10 @@ def predict_pte( locations: np.ndarray, alpha: float = 0.05, variance_type="moment", + n_bootstrap=500, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: - """Compute PTE based on the estimator for the distribution function. + """ + Compute PTE based on the estimator for the distribution function. Args: target_treatment_arm (int): The index of the treatment arm of the treatment group. @@ -87,6 +91,7 @@ def predict_pte( width, alpha, variance_type, + n_bootstrap, ) def predict_qte( @@ -99,7 +104,8 @@ def predict_qte( alpha: float = 0.05, n_bootstrap=500, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: - """Compute QTE based on the estimator for the distribution function. + """ + Compute QTE based on the estimator for the distribution function. Args: target_treatment_arm (int): The index of the treatment arm of the treatment group. @@ -154,14 +160,14 @@ def _compute_dtes( n_bootstrap: int, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """Compute expected DTEs.""" - treatment_cdf, treatment_cdf_mat = self._compute_cumulative_distribution( + treatment_cdf, treatment_cdf_mat, _ = self._compute_cumulative_distribution( target_treatment_arm, locations, self.confoundings, self.treatment_arms, self.outcomes, ) - control_cdf, control_cdf_mat = self._compute_cumulative_distribution( + control_cdf, control_cdf_mat, _ = self._compute_cumulative_distribution( control_treatment_arm, locations, self.confoundings, @@ -203,9 +209,10 @@ def _compute_ptes( width: float, alpha: float, variance_type: str, + n_bootstrap: int, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """Compute expected PTEs.""" - treatment_cumulative_pre, treatment_cdf_mat_pre = ( + treatment_cumulative_pre, treatment_cdf_mat_pre, _ = ( self._compute_cumulative_distribution( target_treatment_arm, locations, @@ -214,7 +221,7 @@ def _compute_ptes( self.outcomes, ) ) - treatment_cumulative_post, treatment_cdf_mat_post = ( + treatment_cumulative_post, treatment_cdf_mat_post, _ = ( self._compute_cumulative_distribution( target_treatment_arm, locations + width, @@ -224,7 +231,7 @@ def _compute_ptes( ) ) treatment_pdf = treatment_cumulative_post - treatment_cumulative_pre - control_cumulative_pre, control_cdf_mat_pre = ( + control_cumulative_pre, control_cdf_mat_pre, _ = ( self._compute_cumulative_distribution( control_treatment_arm, locations, @@ -233,7 +240,7 @@ def _compute_ptes( self.outcomes, ) ) - control_cumulative_post, control_cdf_mat_post = ( + control_cumulative_post, control_cdf_mat_post, _ = ( self._compute_cumulative_distribution( control_treatment_arm, locations + width, @@ -265,6 +272,7 @@ def _compute_ptes( ind_control=control_treatment_arm, alpha=alpha, variance_type=variance_type, + n_bootstrap=n_bootstrap, ) return ( @@ -315,7 +323,8 @@ def find_quantile(quantile, arm): def fit( self, confoundings: np.ndarray, treatment_arms: np.ndarray, outcomes: np.ndarray ) -> "DistributionEstimatorBase": - """Train the DistributionEstimatorBase. + """ + Train the DistributionEstimatorBase. Args: confoundings (np.ndarray): Pre-treatment covariates. @@ -340,7 +349,8 @@ def fit( return self def predict(self, treatment_arm: int, locations: np.ndarray) -> np.ndarray: - """Compute cumulative distribution values. + """ + Compute cumulative distribution values. Args: treatment_arm (int): The index of the treatment arm. @@ -374,13 +384,26 @@ def _compute_cumulative_distribution( confoundings: np.ndarray, treatment_arms: np.ndarray, outcomes: np.array, - ) -> np.ndarray: - """Compute the cumulative distribution values.""" + ) -> Tuple[np.ndarray, np.ndarray]: + """ + Compute the cumulative distribution values. + + Args: + target_treatment_arm (int): The index of the treatment arm. + locations (np.ndarray): Scalar values to be used for computing the cumulative distribution. + confoundings: (np.ndarray): An array of confounding variables in the observed data. + treatment_arms (np.ndarray): An array of treatment arms in the observed data. + outcomes (np.ndarray): An array of outcomes in the observed data. + + Returns: + Tuple[np.ndarray, np.ndarray]: Estimated cumulative distribution values and cumulative distribution for each observation. + """ raise NotImplementedError() class SimpleDistributionEstimator(DistributionEstimatorBase): - """A class for computing the empirical distribution function and the distributional parameters + """ + A class for computing the empirical distribution function and the distributional parameters based on the distribution function. """ @@ -399,7 +422,7 @@ def _compute_cumulative_distribution( confoundings: np.ndarray, treatment_arms: np.ndarray, outcomes: np.array, - ) -> np.ndarray: + ) -> Tuple[np.ndarray, np.ndarray]: """Compute the cumulative distribution values. Args: @@ -410,7 +433,7 @@ def _compute_cumulative_distribution( outcomes (np.ndarray): An array of outcomes in the observed data. Returns: - np.ndarray: Estimated cumulative distribution values. + Tuple[np.ndarray, np.ndarray]: Estimated cumulative distribution values and cumulative distribution for each observation. """ unique_treatment_arm = np.unique(treatment_arms) d_confounding = {} @@ -428,7 +451,7 @@ def _compute_cumulative_distribution( cumulative_distribution[i] = ( np.searchsorted(d_outcome[target_treatment_arm], outcome, side="right") ) / len(d_outcome[target_treatment_arm]) - return cumulative_distribution, np.zeros((n_obs, n_loc)) + return cumulative_distribution, np.repeat(cumulative_distribution.reshape(1, -1), n_obs, axis=0) class AdjustedDistributionEstimator(DistributionEstimatorBase): @@ -463,18 +486,18 @@ def _compute_cumulative_distribution( confoundings: np.ndarray, treatment_arms: np.ndarray, outcomes: np.array, - ) -> np.ndarray: + ) -> Tuple[np.ndarray, np.ndarray]: """Compute the cumulative distribution values. Args: target_treatment_arm (int): The index of the treatment arm. locations (np.ndarray): Scalar values to be used for computing the cumulative distribution. confoundings: (np.ndarray): An array of confounding variables in the observed data. - treatment_arm (np.ndarray): An array of treatment arms in the observed data. - outcomes (np.ndarray): An array of outcomes in the observed data + treatment_arms (np.ndarray): An array of treatment arms in the observed data. + outcomes (np.ndarray): An array of outcomes in the observed data. Returns: - np.ndarray: Estimated cumulative distribution values. + Tuple[np.ndarray, np.ndarray]: Estimated cumulative distribution values and cumulative distribution for each observation. """ n_records = outcomes.shape[0] n_loc = locations.shape[0] @@ -555,3 +578,421 @@ def _compute_model_prediction(self, model, confoundings: np.ndarray) -> np.ndarr return probabilities[:, 1] else: return model.predict(confoundings) + + +class SimpleStratifiedDistributionEstimator(DistributionEstimatorBase): + """A class is for estimating the empirical distribution function and computing the Distributional parameters for CAR.""" + + def fit( + self, + confoundings: np.ndarray, + treatment_arms: np.ndarray, + outcomes: np.ndarray, + strata: np.ndarray, + ) -> "DistributionEstimatorBase": + """ + Train the DistributionEstimatorBase. + + Args: + confoundings (np.ndarray): Pre-treatment covariates. + treatment_arms (np.ndarray): The index of the treatment arm. + outcomes (np.ndarray): Scalar-valued observed outcome. + + Returns: + DistributionEstimatorBase: The fitted estimator. + """ + if confoundings.shape[0] != treatment_arms.shape[0]: + raise ValueError( + "The shape of confounding and treatment_arm should be same" + ) + + if confoundings.shape[0] != outcomes.shape[0]: + raise ValueError("The shape of confounding and outcome should be same") + + self.confoundings = confoundings + self.treatment_arms = treatment_arms + self.outcomes = outcomes + self.strata = strata + + return self + + def _compute_cumulative_distribution( + self, + target_treatment_arm: int, + locations: np.ndarray, + confoundings: np.ndarray, + treatment_arms: np.ndarray, + outcomes: np.array, + ) -> np.ndarray: + """ + Compute the cumulative distribution values. + + Args: + target_treatment_arm (int): The index of the treatment arm. + locations (np.ndarray): Scalar values to be used for computing the cumulative distribution. + confoundings: (np.ndarray): An array of confounding variables in the observed data. + treatment_arm (np.ndarray): An array of treatment arms in the observed data. + outcomes (np.ndarray): An array of outcomes in the observed data + + Returns: + np.ndarray: Estimated cumulative distribution values. + """ + n_records = outcomes.shape[0] + n_loc = locations.shape[0] + cumulative_distribution = np.zeros(n_loc) + superset_prediction = np.zeros((n_records, n_loc)) + prediction = np.zeros((n_records, n_loc)) + treatment_mask = treatment_arms == target_treatment_arm + confounding_in_arm = confoundings[treatment_mask] + n_records_in_arm = len(confounding_in_arm) + + strata = self.strata + s_list = np.unique(strata) + unique_treatment_arm = np.unique(treatment_arms) + s_dict = {} + for s in s_list: + s_mask = strata == s + s_dict[s] = (s_mask & treatment_mask).sum() / s_mask.sum() + n_obs = outcomes.shape[0] + n_loc = locations.shape[0] + for i, outcome in enumerate(locations): + for j in range(n_obs): + s = strata[j] + prediction[j, i] = ( + (outcomes[j] <= outcome) / s_dict[s] * treatment_mask[j] + ) + + pred = {} + for j in range(n_obs): + s = strata[j] + s_mask = s == strata + if s in pred: + superset_prediction[j] = pred[s] + else: + superset_prediction[j] = prediction[s_mask].mean(axis=0) + pred[s] = superset_prediction[j] + + for i, outcome in enumerate(locations): + for j in range(n_obs): + s = strata[j] + prediction[j, i] = ( + (outcomes[j] <= outcome) - superset_prediction[j, i] + ) / s_dict[s] * treatment_mask[j] + superset_prediction[j, i] + + return prediction.mean(axis=0), prediction, superset_prediction + + def _compute_interval_probability( + self, + target_treatment_arm: int, + locations: np.ndarray, + confoundings: np.ndarray, + treatment_arms: np.ndarray, + outcomes: np.array, + ) -> np.ndarray: + """Compute the cumulative distribution values. + + Args: + target_treatment_arm (int): The index of the treatment arm. + locations (np.ndarray): Scalar values to be used for computing the cumulative distribution. + confoundings: (np.ndarray): An array of confounding variables in the observed data. + treatment_arm (np.ndarray): An array of treatment arms in the observed data. + outcomes (np.ndarray): An array of outcomes in the observed data + + Returns: + np.ndarray: Estimated cumulative distribution values. + """ + n_records = outcomes.shape[0] + n_loc = locations.shape[0] + cumulative_distribution = np.zeros(n_loc) + superset_prediction = np.zeros((n_records, n_loc)) + prediction = np.zeros((n_records, n_loc)) + treatment_mask = treatment_arms == target_treatment_arm + confounding_in_arm = confoundings[treatment_mask] + n_records_in_arm = len(confounding_in_arm) + + strata = self.strata + s_list = np.unique(strata) + unique_treatment_arm = np.unique(treatment_arms) + s_dict = {} + for s in s_list: + s_mask = strata == s + s_dict[s] = (s_mask & treatment_mask).sum() / s_mask.sum() + n_obs = outcomes.shape[0] + n_loc = locations.shape[0] + for i, outcome in enumerate(locations): + for j in range(n_obs): + s = strata[j] + prediction[j, i] = ( + (outcomes[j] <= outcome) / s_dict[s] * treatment_mask[j] + ) + + for j in range(n_obs): + s = strata[j] + s_mask = s == strata + superset_prediction[j] = prediction[s_mask].mean(axis=0) + + for i, outcome in enumerate(locations): + for j in range(n_obs): + s = strata[j] + prediction[j, i] = ( + (outcomes[j] <= outcome) - superset_prediction[j, i] + ) / s_dict[s] * treatment_mask[j] + superset_prediction[j, i] + return prediction.mean(axis=0), superset_prediction + cdf = prediction.mean(axis=0) + return cdf[1:] - cdf[:-1], prediction[:,1:] - prediction[:,:-1], superset_prediction[:,1:] - superset_prediction[:,:-1] + + +class AdjustedStratifiedDistributionEstimator(DistributionEstimatorBase): + """A class is for estimating the adjusted distribution function and computing the Distributional parameters for CAR.""" + + def __init__(self, base_model, folds=3, is_multi_task=False): + """ + Initializes the AdjustedDistributionEstimator. + + Args: + base_model (scikit-learn estimator): The base model implementing used for conditional distribution function estimators. The model should implement fit(data, targets) and predict_proba(data). + folds (int): The number of folds for cross-fitting. + is_multi_task(bool): Whether to use multi-task learning. If True, your base model needs to support multi-task prediction (n_samples, n_features) -> (n_samples, n_targets). + + Returns: + AdjustedDistributionEstimator: An instance of the estimator. + """ + if (not hasattr(base_model, "predict")) and ( + not hasattr(base_model, "predict_proba") + ): + raise ValueError( + "Base model should implement either predict_proba or predict" + ) + self.base_model = base_model + self.folds = folds + self.is_multi_task = is_multi_task + super().__init__() + + def fit( + self, + confoundings: np.ndarray, + treatment_arms: np.ndarray, + outcomes: np.ndarray, + strata: np.ndarray, + ) -> "DistributionEstimatorBase": + """ + Train the DistributionEstimatorBase. + + Args: + confoundings (np.ndarray): Pre-treatment covariates. + treatment_arms (np.ndarray): The index of the treatment arm. + outcomes (np.ndarray): Scalar-valued observed outcome. + + Returns: + DistributionEstimatorBase: The fitted estimator. + """ + if confoundings.shape[0] != treatment_arms.shape[0]: + raise ValueError( + "The shape of confounding and treatment_arm should be same" + ) + + if confoundings.shape[0] != outcomes.shape[0]: + raise ValueError("The shape of confounding and outcome should be same") + + self.confoundings = confoundings + self.treatment_arms = treatment_arms + self.outcomes = outcomes + self.strata = strata + + return self + + def _compute_cumulative_distribution( + self, + target_treatment_arm: int, + locations: np.ndarray, + confoundings: np.ndarray, + treatment_arms: np.ndarray, + outcomes: np.array, + ) -> np.ndarray: + """ + Compute the cumulative distribution values. + + Args: + target_treatment_arm (int): The index of the treatment arm. + locations (np.ndarray): Scalar values to be used for computing the cumulative distribution. + confoundings: (np.ndarray): An array of confounding variables in the observed data. + treatment_arm (np.ndarray): An array of treatment arms in the observed data. + outcomes (np.ndarray): An array of outcomes in the observed data + + Returns: + np.ndarray: Estimated cumulative distribution values. + """ + n_records = outcomes.shape[0] + n_loc = locations.shape[0] + cumulative_distribution = np.zeros(n_loc) + superset_prediction = np.zeros((n_records, n_loc)) + prediction = np.zeros((n_records, n_loc)) + treatment_mask = treatment_arms == target_treatment_arm + confounding_in_arm = confoundings[treatment_mask] + n_records_in_arm = len(confounding_in_arm) + folds = np.random.randint(self.folds, size=n_records) + strata = self.strata + s_list = np.unique(strata) + if self.is_multi_task: + subset_prediction = np.zeros( + (n_records_in_arm, n_loc) + ) # (n_records_in_arm, n_loc) + binominal = (outcomes.reshape(-1, 1) <= locations) * 1 # (n_records, n_loc) + cdf = binominal[treatment_mask].mean(axis=0) # (n_loc) + for fold in range(self.folds): + fold_mask = (folds != fold) & treatment_mask + confounding_train = confoundings[fold_mask] + binominal_train = binominal[fold_mask] + if len(np.unique(binominal_train)) > 1: + self.model = deepcopy(self.base_model) + self.model.fit(confounding_train, binominal_train) + for s in s_list: + s_mask = strata == s + weight = (s_mask & treatment_mask).sum() / s_mask.sum() + superset_mask = (folds == fold) & s_mask + subset_test_mask = (folds == fold) & s_mask & treatment_mask + subset_train_mask = (folds != fold) & s_mask & treatment_mask + confounding_train = confoundings[subset_train_mask] + binominal_train = binominal[subset_train_mask] + + pred = self._compute_model_prediction( + self.model, confoundings[superset_mask] + ) + prediction[superset_mask] = ( + pred + + treatment_mask[superset_mask].reshape(-1, 1) + * (binominal[superset_mask] - pred) + / weight + ) + superset_prediction[superset_mask] = pred + else: + for i, location in enumerate(locations): + subset_prediction = np.zeros(n_records_in_arm) + binominal = (outcomes <= location) * 1 # (n_records) + cdf = binominal[treatment_mask].mean() + for fold in range(self.folds): + fold_mask = (folds != fold) & treatment_mask + confounding_train = confoundings[fold_mask] + binominal_train = binominal[fold_mask] + if len(np.unique(binominal_train)) > 1: + self.model = deepcopy(self.base_model) + self.model.fit(confounding_train, binominal_train) + for s in s_list: + s_mask = strata == s + weight = (s_mask & treatment_mask).sum() / s_mask.sum() + superset_mask = (folds == fold) & s_mask + subset_test_mask = (folds == fold) & s_mask & treatment_mask + subset_train_mask = (folds != fold) & s_mask & treatment_mask + confounding_train = confoundings[subset_train_mask] + binominal_train = binominal[subset_train_mask] + if len(np.unique(binominal_train)) == 1: + pred = binominal_train[0] + superset_prediction[superset_mask, i] = pred + prediction[superset_mask, i] = ( + pred + + treatment_mask[superset_mask] + * (binominal[superset_mask] - pred) + / weight + ) + continue + pred = self._compute_model_prediction( + self.model, confoundings[superset_mask] + ) + prediction[superset_mask, i] = ( + pred + + treatment_mask[superset_mask] + * (binominal[superset_mask] - pred) + / weight + ) + superset_prediction[superset_mask, i] = pred + + return prediction.mean(axis=0), prediction, superset_prediction + + + def _compute_interval_probability( + self, + target_treatment_arm: int, + locations: np.ndarray, + confoundings: np.ndarray, + treatment_arms: np.ndarray, + outcomes: np.array, + ) -> np.ndarray: + """ + Compute the cumulative distribution values. + + Args: + target_treatment_arm (int): The index of the treatment arm. + locations (np.ndarray): Scalar values to be used for computing the cumulative distribution. + confoundings: (np.ndarray): An array of confounding variables in the observed data. + treatment_arm (np.ndarray): An array of treatment arms in the observed data. + outcomes (np.ndarray): An array of outcomes in the observed data + + Returns: + np.ndarray: Estimated cumulative distribution values. + """ + n_records = outcomes.shape[0] + n_loc = locations.shape[0] + cumulative_distribution = np.zeros(n_loc-1) + superset_prediction = np.zeros((n_records, n_loc-1)) + prediction = np.zeros((n_records, n_loc-1)) + treatment_mask = treatment_arms == target_treatment_arm + confounding_in_arm = confoundings[treatment_mask] + n_records_in_arm = len(confounding_in_arm) + folds = np.random.randint(self.folds, size=n_records) + strata = self.strata + s_list = np.unique(strata) + binominals = (outcomes[:, np.newaxis] <= locations) * 1 # (n_records, n_loc) + for i in range(len(locations)-1): + subset_prediction = np.zeros(n_records_in_arm) + binominal = binominals[:, i+1]-binominals[:, i] + for fold in range(self.folds): + fold_mask = (folds != fold) & treatment_mask + confounding_train = confoundings[fold_mask] + binominal_train = binominal[fold_mask] + if len(np.unique(binominal_train)) > 1: + self.model = deepcopy(self.base_model) + self.model.fit(confounding_train, binominal_train) + for s in s_list: + s_mask = strata == s + wight = (s_mask & treatment_mask).sum() / s_mask.sum() + superset_mask = (folds == fold) & s_mask + subset_test_mask = (folds == fold) & s_mask & treatment_mask + subset_train_mask = (folds != fold) & s_mask & treatment_mask + confounding_train = confoundings[subset_train_mask] + binominal_train = binominal[subset_train_mask] + if len(np.unique(binominal_train)) == 1: + pred = binominal_train[0] + superset_prediction[superset_mask, i] = pred + prediction[superset_mask, i] = ( + pred + + treatment_mask[superset_mask] + * (binominal[superset_mask] - pred) + / wight + ) + continue + pred = self._compute_model_prediction( + self.model, confoundings[superset_mask] + ) + prediction[superset_mask, i] = ( + pred + + treatment_mask[superset_mask] + * (binominal[superset_mask] - pred) + / wight + ) + superset_prediction[superset_mask, i] = pred + + return prediction.mean(axis=0), prediction, superset_prediction + + def _compute_model_prediction(self, model, confoundings: np.ndarray) -> np.ndarray: + if hasattr(model, "predict_proba"): + if self.is_multi_task: + # suppose the shape of prediction is (n_records, n_locations) + return model.predict_proba(confoundings) + probabilities = model.predict_proba(confoundings) + if probabilities.ndim == 1: + # when the shape of prediction is (n_records) + return probabilities + # when the shape of prediction is (n_records, 2) + return probabilities[:, 1] + else: + return model.predict(confoundings) From 738a3a443671a18700b46b291cf4daac3891428c Mon Sep 17 00:00:00 2001 From: TomeHirata Date: Thu, 29 May 2025 01:50:39 +0900 Subject: [PATCH 2/3] refactor the main classes --- docs/source/conf.py | 2 +- dte_adj/__init__.py | 462 +++++++--------------- dte_adj/util.py | 26 +- pyproject.toml | 14 +- tests/test_adjusted_estimator.py | 30 +- tests/test_distribution_estimator_base.py | 48 +-- tests/test_simple_estimator.py | 20 +- 7 files changed, 225 insertions(+), 377 deletions(-) diff --git a/docs/source/conf.py b/docs/source/conf.py index dcba941..6b36da9 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -13,7 +13,7 @@ project = "dte_adj" copyright = "2024, CyberAgent, Inc." author = "CyberAgent, Inc" -release = "0.1.4" +release = "0.1.5" # -- General configuration --------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration diff --git a/dte_adj/__init__.py b/dte_adj/__init__.py index e1a92bf..0e70ec8 100644 --- a/dte_adj/__init__.py +++ b/dte_adj/__init__.py @@ -1,11 +1,16 @@ import numpy as np -from typing import Tuple +from typing import Tuple, Optional, Any from scipy.stats import norm from copy import deepcopy from abc import ABC from .util import compute_confidence_intervals -__all__ = ["SimpleDistributionEstimator", "AdjustedDistributionEstimator", "SimpleStratifiedDistributionEstimator", "AdjustedStratifiedDistributionEstimator"] +__all__ = [ + "SimpleDistributionEstimator", + "AdjustedDistributionEstimator", + "SimpleStratifiedDistributionEstimator", + "AdjustedStratifiedDistributionEstimator", +] class DistributionEstimatorBase(ABC): @@ -18,7 +23,7 @@ def __init__(self): Returns: DistributionFunctionMixin: An instance of the estimator. """ - self.confoundings = None + self.covariates = None self.outcomes = None self.treatment_arms = None @@ -98,9 +103,7 @@ def predict_qte( self, target_treatment_arm: int, control_treatment_arm: int, - quantiles: np.ndarray = np.array( - [0.1 * i for i in range(1, 10)], dtype=np.float32 - ), + quantiles: Optional[np.ndarray] = None, alpha: float = 0.05, n_bootstrap=500, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: @@ -124,7 +127,7 @@ def predict_qte( target_treatment_arm, control_treatment_arm, quantiles, - self.confoundings, + self.covariates, self.treatment_arms, self.outcomes, ) @@ -138,7 +141,7 @@ def predict_qte( target_treatment_arm, control_treatment_arm, quantiles, - self.confoundings[bootstrap_indexes], + self.covariates[bootstrap_indexes], self.treatment_arms[bootstrap_indexes], self.outcomes[bootstrap_indexes], ) @@ -160,17 +163,17 @@ def _compute_dtes( n_bootstrap: int, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """Compute expected DTEs.""" - treatment_cdf, treatment_cdf_mat, _ = self._compute_cumulative_distribution( + treatment_cdf, _, treatment_cdf_mat = self._compute_cumulative_distribution( target_treatment_arm, locations, - self.confoundings, + self.covariates, self.treatment_arms, self.outcomes, ) - control_cdf, control_cdf_mat, _ = self._compute_cumulative_distribution( + control_cdf, _, control_cdf_mat = self._compute_cumulative_distribution( control_treatment_arm, locations, - self.confoundings, + self.covariates, self.treatment_arms, self.outcomes, ) @@ -216,7 +219,7 @@ def _compute_ptes( self._compute_cumulative_distribution( target_treatment_arm, locations, - self.confoundings, + self.covariates, self.treatment_arms, self.outcomes, ) @@ -225,7 +228,7 @@ def _compute_ptes( self._compute_cumulative_distribution( target_treatment_arm, locations + width, - self.confoundings, + self.covariates, self.treatment_arms, self.outcomes, ) @@ -235,7 +238,7 @@ def _compute_ptes( self._compute_cumulative_distribution( control_treatment_arm, locations, - self.confoundings, + self.covariates, self.treatment_arms, self.outcomes, ) @@ -244,7 +247,7 @@ def _compute_ptes( self._compute_cumulative_distribution( control_treatment_arm, locations + width, - self.confoundings, + self.covariates, self.treatment_arms, self.outcomes, ) @@ -286,7 +289,7 @@ def _compute_qtes( target_treatment_arm: int, control_treatment_arm: int, quantiles: np.ndarray, - confoundings: np.ndarray, + covariates: np.ndarray, treatment_arms: np.ndarray, outcomes: np.array, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: @@ -298,10 +301,10 @@ def find_quantile(quantile, arm): result = -1 while low <= high: mid = (low + high) // 2 - val, _ = self._compute_cumulative_distribution( + val, _, _ = self._compute_cumulative_distribution( arm, np.full((1), locations[mid]), - confoundings, + covariates, treatment_arms, outcomes, ) @@ -320,34 +323,6 @@ def find_quantile(quantile, arm): return result - def fit( - self, confoundings: np.ndarray, treatment_arms: np.ndarray, outcomes: np.ndarray - ) -> "DistributionEstimatorBase": - """ - Train the DistributionEstimatorBase. - - Args: - confoundings (np.ndarray): Pre-treatment covariates. - treatment_arms (np.ndarray): The index of the treatment arm. - outcomes (np.ndarray): Scalar-valued observed outcome. - - Returns: - DistributionEstimatorBase: The fitted estimator. - """ - if confoundings.shape[0] != treatment_arms.shape[0]: - raise ValueError( - "The shape of confounding and treatment_arm should be same" - ) - - if confoundings.shape[0] != outcomes.shape[0]: - raise ValueError("The shape of confounding and outcome should be same") - - self.confoundings = confoundings - self.treatment_arms = treatment_arms - self.outcomes = outcomes - - return self - def predict(self, treatment_arm: int, locations: np.ndarray) -> np.ndarray: """ Compute cumulative distribution values. @@ -372,7 +347,7 @@ def predict(self, treatment_arm: int, locations: np.ndarray) -> np.ndarray: return self._compute_cumulative_distribution( treatment_arm, locations, - self.confoundings, + self.covariates, self.treatment_arms, self.outcomes, )[0] @@ -381,211 +356,32 @@ def _compute_cumulative_distribution( self, target_treatment_arm: int, locations: np.ndarray, - confoundings: np.ndarray, + covariates: np.ndarray, treatment_arms: np.ndarray, outcomes: np.array, - ) -> Tuple[np.ndarray, np.ndarray]: + ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Compute the cumulative distribution values. Args: target_treatment_arm (int): The index of the treatment arm. locations (np.ndarray): Scalar values to be used for computing the cumulative distribution. - confoundings: (np.ndarray): An array of confounding variables in the observed data. + covariates: (np.ndarray): An array of covariates variables in the observed data. treatment_arms (np.ndarray): An array of treatment arms in the observed data. outcomes (np.ndarray): An array of outcomes in the observed data. Returns: - Tuple[np.ndarray, np.ndarray]: Estimated cumulative distribution values and cumulative distribution for each observation. + Tuple[np.ndarray, np.ndarray, np.ndarray]: Estimated cumulative distribution values, prediction for each observation, and superset prediction for each observation. """ raise NotImplementedError() -class SimpleDistributionEstimator(DistributionEstimatorBase): - """ - A class for computing the empirical distribution function and the distributional parameters - based on the distribution function. - """ - - def __init__(self): - """Initializes the SimpleDistributionEstimator. - - Returns: - SimpleDistributionEstimator: An instance of the estimator. - """ - super().__init__() - - def _compute_cumulative_distribution( - self, - target_treatment_arm: int, - locations: np.ndarray, - confoundings: np.ndarray, - treatment_arms: np.ndarray, - outcomes: np.array, - ) -> Tuple[np.ndarray, np.ndarray]: - """Compute the cumulative distribution values. - - Args: - target_treatment_arm (int): The index of the treatment arm. - locations (np.ndarray): Scalar values to be used for computing the cumulative distribution. - confoundings: (np.ndarray): An array of confounding variables in the observed data. - treatment_arms (np.ndarray): An array of treatment arms in the observed data. - outcomes (np.ndarray): An array of outcomes in the observed data. - - Returns: - Tuple[np.ndarray, np.ndarray]: Estimated cumulative distribution values and cumulative distribution for each observation. - """ - unique_treatment_arm = np.unique(treatment_arms) - d_confounding = {} - d_outcome = {} - n_obs = outcomes.shape[0] - n_loc = locations.shape[0] - for arm in unique_treatment_arm: - selected_confounding = confoundings[treatment_arms == arm] - selected_outcome = outcomes[treatment_arms == arm] - sorted_indices = np.argsort(selected_outcome) - d_confounding[arm] = selected_confounding[sorted_indices] - d_outcome[arm] = selected_outcome[sorted_indices] - cumulative_distribution = np.zeros(locations.shape) - for i, outcome in enumerate(locations): - cumulative_distribution[i] = ( - np.searchsorted(d_outcome[target_treatment_arm], outcome, side="right") - ) / len(d_outcome[target_treatment_arm]) - return cumulative_distribution, np.repeat(cumulative_distribution.reshape(1, -1), n_obs, axis=0) - - -class AdjustedDistributionEstimator(DistributionEstimatorBase): - """A class is for estimating the adjusted distribution function and computing the Distributional parameters based on the trained conditional estimator.""" - - def __init__(self, base_model, folds=3, is_multi_task=False): - """Initializes the AdjustedDistributionEstimator. - - Args: - base_model (scikit-learn estimator): The base model implementing used for conditional distribution function estimators. The model should implement fit(data, targets) and predict_proba(data). - folds (int): The number of folds for cross-fitting. - is_multi_task(bool): Whether to use multi-task learning. If True, your base model needs to support multi-task prediction (n_samples, n_features) -> (n_samples, n_targets). - - Returns: - AdjustedDistributionEstimator: An instance of the estimator. - """ - if (not hasattr(base_model, "predict")) and ( - not hasattr(base_model, "predict_proba") - ): - raise ValueError( - "Base model should implement either predict_proba or predict" - ) - self.base_model = base_model - self.folds = folds - self.is_multi_task = is_multi_task - super().__init__() - - def _compute_cumulative_distribution( - self, - target_treatment_arm: int, - locations: np.ndarray, - confoundings: np.ndarray, - treatment_arms: np.ndarray, - outcomes: np.array, - ) -> Tuple[np.ndarray, np.ndarray]: - """Compute the cumulative distribution values. - - Args: - target_treatment_arm (int): The index of the treatment arm. - locations (np.ndarray): Scalar values to be used for computing the cumulative distribution. - confoundings: (np.ndarray): An array of confounding variables in the observed data. - treatment_arms (np.ndarray): An array of treatment arms in the observed data. - outcomes (np.ndarray): An array of outcomes in the observed data. - - Returns: - Tuple[np.ndarray, np.ndarray]: Estimated cumulative distribution values and cumulative distribution for each observation. - """ - n_records = outcomes.shape[0] - n_loc = locations.shape[0] - cumulative_distribution = np.zeros(n_loc) - superset_prediction = np.zeros((n_records, n_loc)) - treatment_mask = treatment_arms == target_treatment_arm - confounding_in_arm = confoundings[treatment_mask] - n_records_in_arm = len(confounding_in_arm) - if self.is_multi_task: - subset_prediction = np.zeros( - (n_records_in_arm, n_loc) - ) # (n_records_in_arm, n_loc) - binominal = (outcomes.reshape(-1, 1) <= locations) * 1 # (n_records, n_loc) - cdf = binominal[treatment_mask].mean(axis=0) # (n_loc) - for fold in range(self.folds): - superset_mask = np.arange(n_records) % self.folds == fold - subset_test_mask = superset_mask & treatment_mask - subset_train_mask = (~superset_mask) & treatment_mask - subset_test_mask_inner = superset_mask[treatment_mask] - confounding_train = confoundings[subset_train_mask] - binominal_train = binominal[subset_train_mask] - model = deepcopy(self.base_model) - model.fit(confounding_train, binominal_train) - subset_prediction[subset_test_mask_inner] = ( - self._compute_model_prediction( - model, confoundings[subset_test_mask] - ) - ) - superset_prediction[superset_mask] = self._compute_model_prediction( - model, confoundings[superset_mask] - ) - cumulative_distribution = ( - cdf - subset_prediction.mean(axis=0) + superset_prediction.mean(axis=0) - ) # (n_loc) - else: - for i, location in enumerate(locations): - subset_prediction = np.zeros(n_records_in_arm) - binominal = (outcomes <= location) * 1 # (n_records) - cdf = binominal[treatment_mask].mean() - for fold in range(self.folds): - superset_mask = np.arange(n_records) % self.folds == fold - subset_test_mask = superset_mask & treatment_mask - subset_train_mask = (~superset_mask) & treatment_mask - subset_test_mask_inner = superset_mask[treatment_mask] - confounding_train = confoundings[subset_train_mask] - binominal_train = binominal[subset_train_mask] - if len(np.unique(binominal_train)) == 1: - subset_prediction[subset_test_mask_inner] = binominal_train[0] - superset_prediction[superset_mask, i] = binominal_train[0] - continue - model = deepcopy(self.base_model) - model.fit(confounding_train, binominal_train) - subset_prediction[subset_test_mask_inner] = ( - self._compute_model_prediction( - model, confoundings[subset_test_mask] - ) - ) - superset_prediction[superset_mask, i] = ( - self._compute_model_prediction( - model, confoundings[superset_mask] - ) - ) - cumulative_distribution[i] = ( - cdf - subset_prediction.mean() + superset_prediction[:, i].mean() - ) - return cumulative_distribution, superset_prediction - - def _compute_model_prediction(self, model, confoundings: np.ndarray) -> np.ndarray: - if hasattr(model, "predict_proba"): - if self.is_multi_task: - # suppose the shape of prediction is (n_records, n_locations) - return model.predict_proba(confoundings) - probabilities = model.predict_proba(confoundings) - if probabilities.ndim == 1: - # when the shape of prediction is (n_records) - return probabilities - # when the shape of prediction is (n_records, 2) - return probabilities[:, 1] - else: - return model.predict(confoundings) - - class SimpleStratifiedDistributionEstimator(DistributionEstimatorBase): """A class is for estimating the empirical distribution function and computing the Distributional parameters for CAR.""" - + def fit( self, - confoundings: np.ndarray, + covariates: np.ndarray, treatment_arms: np.ndarray, outcomes: np.ndarray, strata: np.ndarray, @@ -594,22 +390,20 @@ def fit( Train the DistributionEstimatorBase. Args: - confoundings (np.ndarray): Pre-treatment covariates. + covariates (np.ndarray): Pre-treatment covariates. treatment_arms (np.ndarray): The index of the treatment arm. outcomes (np.ndarray): Scalar-valued observed outcome. Returns: DistributionEstimatorBase: The fitted estimator. """ - if confoundings.shape[0] != treatment_arms.shape[0]: - raise ValueError( - "The shape of confounding and treatment_arm should be same" - ) + if covariates.shape[0] != treatment_arms.shape[0]: + raise ValueError("The shape of covariates and treatment_arm should be same") - if confoundings.shape[0] != outcomes.shape[0]: - raise ValueError("The shape of confounding and outcome should be same") + if covariates.shape[0] != outcomes.shape[0]: + raise ValueError("The shape of covariates and outcome should be same") - self.confoundings = confoundings + self.covariates = covariates self.treatment_arms = treatment_arms self.outcomes = outcomes self.strata = strata @@ -620,7 +414,7 @@ def _compute_cumulative_distribution( self, target_treatment_arm: int, locations: np.ndarray, - confoundings: np.ndarray, + covariates: np.ndarray, treatment_arms: np.ndarray, outcomes: np.array, ) -> np.ndarray: @@ -630,7 +424,7 @@ def _compute_cumulative_distribution( Args: target_treatment_arm (int): The index of the treatment arm. locations (np.ndarray): Scalar values to be used for computing the cumulative distribution. - confoundings: (np.ndarray): An array of confounding variables in the observed data. + covariates: (np.ndarray): An array of covariates variables in the observed data. treatment_arm (np.ndarray): An array of treatment arms in the observed data. outcomes (np.ndarray): An array of outcomes in the observed data @@ -639,16 +433,12 @@ def _compute_cumulative_distribution( """ n_records = outcomes.shape[0] n_loc = locations.shape[0] - cumulative_distribution = np.zeros(n_loc) superset_prediction = np.zeros((n_records, n_loc)) prediction = np.zeros((n_records, n_loc)) treatment_mask = treatment_arms == target_treatment_arm - confounding_in_arm = confoundings[treatment_mask] - n_records_in_arm = len(confounding_in_arm) strata = self.strata s_list = np.unique(strata) - unique_treatment_arm = np.unique(treatment_arms) s_dict = {} for s in s_list: s_mask = strata == s @@ -680,12 +470,12 @@ def _compute_cumulative_distribution( ) / s_dict[s] * treatment_mask[j] + superset_prediction[j, i] return prediction.mean(axis=0), prediction, superset_prediction - + def _compute_interval_probability( self, target_treatment_arm: int, locations: np.ndarray, - confoundings: np.ndarray, + covariates: np.ndarray, treatment_arms: np.ndarray, outcomes: np.array, ) -> np.ndarray: @@ -694,7 +484,7 @@ def _compute_interval_probability( Args: target_treatment_arm (int): The index of the treatment arm. locations (np.ndarray): Scalar values to be used for computing the cumulative distribution. - confoundings: (np.ndarray): An array of confounding variables in the observed data. + covariates: (np.ndarray): An array of covariates variables in the observed data. treatment_arm (np.ndarray): An array of treatment arms in the observed data. outcomes (np.ndarray): An array of outcomes in the observed data @@ -703,16 +493,12 @@ def _compute_interval_probability( """ n_records = outcomes.shape[0] n_loc = locations.shape[0] - cumulative_distribution = np.zeros(n_loc) superset_prediction = np.zeros((n_records, n_loc)) prediction = np.zeros((n_records, n_loc)) treatment_mask = treatment_arms == target_treatment_arm - confounding_in_arm = confoundings[treatment_mask] - n_records_in_arm = len(confounding_in_arm) strata = self.strata s_list = np.unique(strata) - unique_treatment_arm = np.unique(treatment_arms) s_dict = {} for s in s_list: s_mask = strata == s @@ -739,13 +525,17 @@ def _compute_interval_probability( ) / s_dict[s] * treatment_mask[j] + superset_prediction[j, i] return prediction.mean(axis=0), superset_prediction cdf = prediction.mean(axis=0) - return cdf[1:] - cdf[:-1], prediction[:,1:] - prediction[:,:-1], superset_prediction[:,1:] - superset_prediction[:,:-1] + return ( + cdf[1:] - cdf[:-1], + prediction[:, 1:] - prediction[:, :-1], + superset_prediction[:, 1:] - superset_prediction[:, :-1], + ) class AdjustedStratifiedDistributionEstimator(DistributionEstimatorBase): """A class is for estimating the adjusted distribution function and computing the Distributional parameters for CAR.""" - def __init__(self, base_model, folds=3, is_multi_task=False): + def __init__(self, base_model: Any, folds=3, is_multi_task=False): """ Initializes the AdjustedDistributionEstimator. @@ -770,7 +560,7 @@ def __init__(self, base_model, folds=3, is_multi_task=False): def fit( self, - confoundings: np.ndarray, + covariates: np.ndarray, treatment_arms: np.ndarray, outcomes: np.ndarray, strata: np.ndarray, @@ -779,22 +569,20 @@ def fit( Train the DistributionEstimatorBase. Args: - confoundings (np.ndarray): Pre-treatment covariates. + covariates (np.ndarray): Pre-treatment covariates. treatment_arms (np.ndarray): The index of the treatment arm. outcomes (np.ndarray): Scalar-valued observed outcome. Returns: DistributionEstimatorBase: The fitted estimator. """ - if confoundings.shape[0] != treatment_arms.shape[0]: - raise ValueError( - "The shape of confounding and treatment_arm should be same" - ) + if covariates.shape[0] != treatment_arms.shape[0]: + raise ValueError("The shape of covariates and treatment_arm should be same") - if confoundings.shape[0] != outcomes.shape[0]: - raise ValueError("The shape of confounding and outcome should be same") + if covariates.shape[0] != outcomes.shape[0]: + raise ValueError("The shape of covariates and outcome should be same") - self.confoundings = confoundings + self.covariates = covariates self.treatment_arms = treatment_arms self.outcomes = outcomes self.strata = strata @@ -805,7 +593,7 @@ def _compute_cumulative_distribution( self, target_treatment_arm: int, locations: np.ndarray, - confoundings: np.ndarray, + covariates: np.ndarray, treatment_arms: np.ndarray, outcomes: np.array, ) -> np.ndarray: @@ -815,48 +603,40 @@ def _compute_cumulative_distribution( Args: target_treatment_arm (int): The index of the treatment arm. locations (np.ndarray): Scalar values to be used for computing the cumulative distribution. - confoundings: (np.ndarray): An array of confounding variables in the observed data. + covariates: (np.ndarray): An array of covariates variables in the observed data. treatment_arm (np.ndarray): An array of treatment arms in the observed data. outcomes (np.ndarray): An array of outcomes in the observed data Returns: - np.ndarray: Estimated cumulative distribution values. + Tuple[np.ndarray, np.ndarray, np.ndarray]: Estimated cumulative distribution values, prediction for each observation, and superset prediction for each observation. """ n_records = outcomes.shape[0] n_loc = locations.shape[0] - cumulative_distribution = np.zeros(n_loc) superset_prediction = np.zeros((n_records, n_loc)) prediction = np.zeros((n_records, n_loc)) treatment_mask = treatment_arms == target_treatment_arm - confounding_in_arm = confoundings[treatment_mask] - n_records_in_arm = len(confounding_in_arm) folds = np.random.randint(self.folds, size=n_records) strata = self.strata s_list = np.unique(strata) if self.is_multi_task: - subset_prediction = np.zeros( - (n_records_in_arm, n_loc) - ) # (n_records_in_arm, n_loc) binominal = (outcomes.reshape(-1, 1) <= locations) * 1 # (n_records, n_loc) - cdf = binominal[treatment_mask].mean(axis=0) # (n_loc) for fold in range(self.folds): fold_mask = (folds != fold) & treatment_mask - confounding_train = confoundings[fold_mask] + covariates_train = covariates[fold_mask] binominal_train = binominal[fold_mask] if len(np.unique(binominal_train)) > 1: self.model = deepcopy(self.base_model) - self.model.fit(confounding_train, binominal_train) + self.model.fit(covariates_train, binominal_train) for s in s_list: s_mask = strata == s weight = (s_mask & treatment_mask).sum() / s_mask.sum() superset_mask = (folds == fold) & s_mask - subset_test_mask = (folds == fold) & s_mask & treatment_mask subset_train_mask = (folds != fold) & s_mask & treatment_mask - confounding_train = confoundings[subset_train_mask] + covariates_train = covariates[subset_train_mask] binominal_train = binominal[subset_train_mask] - + pred = self._compute_model_prediction( - self.model, confoundings[superset_mask] + self.model, covariates[superset_mask] ) prediction[superset_mask] = ( pred @@ -867,23 +647,20 @@ def _compute_cumulative_distribution( superset_prediction[superset_mask] = pred else: for i, location in enumerate(locations): - subset_prediction = np.zeros(n_records_in_arm) binominal = (outcomes <= location) * 1 # (n_records) - cdf = binominal[treatment_mask].mean() for fold in range(self.folds): fold_mask = (folds != fold) & treatment_mask - confounding_train = confoundings[fold_mask] + covariates_train = covariates[fold_mask] binominal_train = binominal[fold_mask] + self.model = deepcopy(self.base_model) if len(np.unique(binominal_train)) > 1: - self.model = deepcopy(self.base_model) - self.model.fit(confounding_train, binominal_train) + self.model.fit(covariates_train, binominal_train) for s in s_list: s_mask = strata == s weight = (s_mask & treatment_mask).sum() / s_mask.sum() superset_mask = (folds == fold) & s_mask - subset_test_mask = (folds == fold) & s_mask & treatment_mask subset_train_mask = (folds != fold) & s_mask & treatment_mask - confounding_train = confoundings[subset_train_mask] + covariates_train = covariates[subset_train_mask] binominal_train = binominal[subset_train_mask] if len(np.unique(binominal_train)) == 1: pred = binominal_train[0] @@ -896,7 +673,7 @@ def _compute_cumulative_distribution( ) continue pred = self._compute_model_prediction( - self.model, confoundings[superset_mask] + self.model, covariates[superset_mask] ) prediction[superset_mask, i] = ( pred @@ -908,12 +685,11 @@ def _compute_cumulative_distribution( return prediction.mean(axis=0), prediction, superset_prediction - def _compute_interval_probability( self, target_treatment_arm: int, locations: np.ndarray, - confoundings: np.ndarray, + covariates: np.ndarray, treatment_arms: np.ndarray, outcomes: np.array, ) -> np.ndarray: @@ -923,7 +699,7 @@ def _compute_interval_probability( Args: target_treatment_arm (int): The index of the treatment arm. locations (np.ndarray): Scalar values to be used for computing the cumulative distribution. - confoundings: (np.ndarray): An array of confounding variables in the observed data. + covariates: (np.ndarray): An array of covariates variables in the observed data. treatment_arm (np.ndarray): An array of treatment arms in the observed data. outcomes (np.ndarray): An array of outcomes in the observed data @@ -932,33 +708,28 @@ def _compute_interval_probability( """ n_records = outcomes.shape[0] n_loc = locations.shape[0] - cumulative_distribution = np.zeros(n_loc-1) - superset_prediction = np.zeros((n_records, n_loc-1)) - prediction = np.zeros((n_records, n_loc-1)) + superset_prediction = np.zeros((n_records, n_loc - 1)) + prediction = np.zeros((n_records, n_loc - 1)) treatment_mask = treatment_arms == target_treatment_arm - confounding_in_arm = confoundings[treatment_mask] - n_records_in_arm = len(confounding_in_arm) folds = np.random.randint(self.folds, size=n_records) strata = self.strata s_list = np.unique(strata) binominals = (outcomes[:, np.newaxis] <= locations) * 1 # (n_records, n_loc) - for i in range(len(locations)-1): - subset_prediction = np.zeros(n_records_in_arm) - binominal = binominals[:, i+1]-binominals[:, i] + for i in range(len(locations) - 1): + binominal = binominals[:, i + 1] - binominals[:, i] for fold in range(self.folds): fold_mask = (folds != fold) & treatment_mask - confounding_train = confoundings[fold_mask] + covariates_train = covariates[fold_mask] binominal_train = binominal[fold_mask] if len(np.unique(binominal_train)) > 1: self.model = deepcopy(self.base_model) - self.model.fit(confounding_train, binominal_train) + self.model.fit(covariates_train, binominal_train) for s in s_list: s_mask = strata == s wight = (s_mask & treatment_mask).sum() / s_mask.sum() superset_mask = (folds == fold) & s_mask - subset_test_mask = (folds == fold) & s_mask & treatment_mask subset_train_mask = (folds != fold) & s_mask & treatment_mask - confounding_train = confoundings[subset_train_mask] + covariates_train = covariates[subset_train_mask] binominal_train = binominal[subset_train_mask] if len(np.unique(binominal_train)) == 1: pred = binominal_train[0] @@ -971,7 +742,7 @@ def _compute_interval_probability( ) continue pred = self._compute_model_prediction( - self.model, confoundings[superset_mask] + self.model, covariates[superset_mask] ) prediction[superset_mask, i] = ( pred @@ -983,16 +754,89 @@ def _compute_interval_probability( return prediction.mean(axis=0), prediction, superset_prediction - def _compute_model_prediction(self, model, confoundings: np.ndarray) -> np.ndarray: + def _compute_model_prediction(self, model, covariates: np.ndarray) -> np.ndarray: if hasattr(model, "predict_proba"): if self.is_multi_task: # suppose the shape of prediction is (n_records, n_locations) - return model.predict_proba(confoundings) - probabilities = model.predict_proba(confoundings) + return model.predict_proba(covariates) + probabilities = model.predict_proba(covariates) if probabilities.ndim == 1: # when the shape of prediction is (n_records) return probabilities # when the shape of prediction is (n_records, 2) return probabilities[:, 1] else: - return model.predict(confoundings) + return model.predict(covariates) + + +class SimpleDistributionEstimator(SimpleStratifiedDistributionEstimator): + """ + A class for computing the empirical distribution function and the distributional parameters + based on the distribution function. + """ + + def __init__(self): + """Initializes the SimpleDistributionEstimator. + + Returns: + SimpleDistributionEstimator: An instance of the estimator. + """ + super().__init__() + + def fit( + self, covariates: np.ndarray, treatment_arms: np.ndarray, outcomes: np.ndarray + ) -> "SimpleDistributionEstimator": + """ + Set parameters. + + Args: + covariates (np.ndarray): Pre-treatment covariates. + treatment_arms (np.ndarray): The index of the treatment arm. + outcomes (np.ndarray): Scalar-valued observed outcome. + + Returns: + SimpleDistributionEstimator: The fitted estimator. + """ + if covariates.shape[0] != treatment_arms.shape[0]: + raise ValueError("The shape of covariates and treatment_arm should be same") + + if covariates.shape[0] != outcomes.shape[0]: + raise ValueError("The shape of covariates and outcome should be same") + + self.covariates = covariates + self.treatment_arms = treatment_arms + self.outcomes = outcomes + self.strata = np.zeros(len(self.covariates)) + + return self + + +class AdjustedDistributionEstimator(AdjustedStratifiedDistributionEstimator): + """A class is for estimating the adjusted distribution function and computing the Distributional parameters based on the trained conditional estimator.""" + + def fit( + self, covariates: np.ndarray, treatment_arms: np.ndarray, outcomes: np.ndarray + ) -> "AdjustedDistributionEstimator": + """ + Set parameters. + + Args: + covariates (np.ndarray): Pre-treatment covariates. + treatment_arms (np.ndarray): The index of the treatment arm. + outcomes (np.ndarray): Scalar-valued observed outcome. + + Returns: + AdjustedDistributionEstimator: The fitted estimator. + """ + if covariates.shape[0] != treatment_arms.shape[0]: + raise ValueError("The shape of covariates and treatment_arm should be same") + + if covariates.shape[0] != outcomes.shape[0]: + raise ValueError("The shape of covariates and outcome should be same") + + self.covariates = covariates + self.treatment_arms = treatment_arms + self.outcomes = outcomes + self.strata = np.zeros(len(self.covariates)) + + return self diff --git a/dte_adj/util.py b/dte_adj/util.py index 20aa79f..b481eb1 100644 --- a/dte_adj/util.py +++ b/dte_adj/util.py @@ -25,10 +25,10 @@ def compute_confidence_intervals( vec_d (np.ndarray): Treatment indicator vector. vec_loc (np.ndarray): Locations where the distribution parameters are estimated. mat_y_u (np.ndarray): Indicator function for 1{Y⩽y}. Shape is n_obs * n_loc. - vec_prediction_target (np.ndarray): Estimated values from the conditional model for the treatment group. - vec_prediction_control (np.ndarray): Estimated values from the conditional model for the control group. - mat_entire_predictions_target (np.ndarray): Prediction of the conditional distribution estimator for target group. - mat_entire_predictions_control (np.ndarray): Prediction of the conditional distribution estimator for control group. + vec_prediction_target (np.ndarray): Unconditional estimated distributional effects for the treatment group. + vec_prediction_control (np.ndarray): Unconditional estimated distributional effects for the control group. + mat_entire_predictions_target (np.ndarray): Conditional stimated distributional effects for target group. + mat_entire_predictions_control (np.ndarray): Conditional stimated distributional effects for control group. ind_target (int): Index of the target treatment indicator. ind_control (int): Index of the control treatment indicator. alpha (float, optional): Significance level of the confidence bound. Defaults to 0.05. @@ -41,26 +41,14 @@ def compute_confidence_intervals( - np.ndarray: upper bound. """ num_obs = vec_y.shape[0] - n_loc = vec_loc.shape[0] - mat_d = np.tile(vec_d, (n_loc, 1)).T vec_dte = vec_prediction_target - vec_prediction_control - mat_dte = np.tile(vec_dte, (num_obs, 1)) num_target = (vec_d == ind_target).sum() num_control = (vec_d == ind_control).sum() + influence_function = ( - num_obs - / num_target - * (mat_d == ind_target) - * (mat_y_u - mat_entire_predictions_target) - + mat_entire_predictions_target - - num_obs - / num_control - * (mat_d == ind_control) - * (mat_y_u - mat_entire_predictions_control) - - mat_entire_predictions_control - - mat_dte - ) + mat_entire_predictions_target - mat_entire_predictions_target.mean(axis=0) + ) - (mat_entire_predictions_control - mat_entire_predictions_control.mean(axis=0)) omega = (influence_function**2).mean(axis=0) diff --git a/pyproject.toml b/pyproject.toml index 62ea970..a47db8a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,20 +1,20 @@ [build-system] -requires = ["setuptools>=42", "wheel"] +requires = ["setuptools>=77", "wheel"] build-backend = "setuptools.build_meta" [project] name = "dte_adj" -version = "0.1.4" -description = "This is a Python library for a research paper 'Estimating Distributional Treatment Effects in Randomized Experiments: Machine Learning for Variance Reduction'" +version = "0.1.5" +description = "This is a Python library for estimating distributional treatment effects" readme = "README.md" requires-python = ">=3.10" -license = { file = "LICENSE" } +license = "MIT" +license-files = ["LICENSE"] classifiers = [ "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", - "License :: OSI Approved :: MIT License", "Operating System :: OS Independent" ] dependencies = [ @@ -23,5 +23,9 @@ dependencies = [ "scipy~=1.13.1" ] +[tool.setuptools.packages.find] +where = ["."] +include = ["dte_adj", "dte_adj.*"] + [project.urls] homepage = "https://github.com/CyberAgentAILab/python-dte-adjustment" \ No newline at end of file diff --git a/tests/test_adjusted_estimator.py b/tests/test_adjusted_estimator.py index 2d5453e..59679b8 100644 --- a/tests/test_adjusted_estimator.py +++ b/tests/test_adjusted_estimator.py @@ -1,4 +1,5 @@ import unittest +from unittest.mock import patch import numpy as np from dte_adj import AdjustedDistributionEstimator from unittest.mock import MagicMock @@ -9,10 +10,10 @@ def setUp(self): base_model = MagicMock() base_model.predict_proba.side_effect = lambda x, y: x self.estimator = AdjustedDistributionEstimator(base_model, folds=2) - self.confoundings = np.zeros((20, 5)) + self.covariates = np.zeros((20, 5)) self.treatment_arms = np.hstack([np.zeros(10), np.ones(10)]) self.outcomes = np.arange(20) - self.estimator.fit(self.confoundings, self.treatment_arms, self.outcomes) + self.estimator.fit(self.covariates, self.treatment_arms, self.outcomes) def test_init_fail_incorrect_base_model(self): # Act, Assert @@ -53,34 +54,35 @@ def test_fit_fail_invalid_input(self): subject.fit(X, D, Y) self.assertEqual( str(cm.exception), - "The shape of confounding and treatment_arm should be same", + "The shape of covariates and treatment_arm should be same", ) def test_compute_cumulative_distribution(self): # Arrange mock_model = self.estimator.base_model - mock_model.predict_proba.side_effect = lambda x: np.ones((x.shape[0], 2)) * 0.5 + mock_model.predict_proba.side_effect = lambda x: np.ones((len(x), 2)) * 0.5 target_treatment_arm = 0 locations = np.arange(10) # Act - cumulative_distribution, superset_prediction = ( - self.estimator._compute_cumulative_distribution( - target_treatment_arm, - locations, - self.confoundings, - self.treatment_arms, - self.outcomes, + with patch("numpy.random.randint", return_value=np.array([0] * 10 + [1] * 10)): + cumulative_distribution, _, superset_prediction = ( + self.estimator._compute_cumulative_distribution( + target_treatment_arm, + locations, + self.covariates, + self.treatment_arms, + self.outcomes, + ) ) - ) # Assert self.assertEqual(cumulative_distribution.shape, (10,)) self.assertEqual(superset_prediction.shape, (20, 10)) - for i in range(10): + for i in range(9): self.assertAlmostEqual(cumulative_distribution[i], (i + 1) / 10, places=2) for i in range(20): - for j in range(1, 8): + for j in range(9): self.assertAlmostEqual(superset_prediction[i, j], 0.5, places=2) diff --git a/tests/test_distribution_estimator_base.py b/tests/test_distribution_estimator_base.py index c2d3230..f590ca2 100644 --- a/tests/test_distribution_estimator_base.py +++ b/tests/test_distribution_estimator_base.py @@ -7,17 +7,23 @@ def compute_cumulative_distribution( target_treatment_arms: np.ndarray, locations: np.ndarray, - confoundings: np.ndarray, + covariates: np.ndarray, treatment_arms: np.ndarray, outcomes: np.array, ) -> np.ndarray: """Mock implementation for testing purposes.""" - return np.linspace( - 0, 0.9, locations.shape[0] - ) + target_treatment_arms * 0.1, np.zeros((outcomes.shape[0], locations.shape[0])) + return ( + np.linspace(0, 0.9, locations.shape[0]) + target_treatment_arms * 0.1, + np.zeros((outcomes.shape[0], locations.shape[0])), + np.zeros((outcomes.shape[0], locations.shape[0])), + ) class MockDistributionEstimator(DistributionEstimatorBase): + """ + Mock class to implement _compute_cumulative_distribution for testing. + """ + def __init__( self, mock_compute_cumulative_distribution=compute_cumulative_distribution ): @@ -27,18 +33,22 @@ def __init__( mock_compute_cumulative_distribution ) - """Mock class to implement _compute_cumulative_distribution for testing.""" + def fit(self, covariates, treatment_arms, outcomes): + """Mock fit method to set attributes.""" + self.covariates = covariates + self.treatment_arms = treatment_arms + self.outcomes = outcomes def _compute_cumulative_distribution( self, target_treatment_arms: np.ndarray, locations: np.ndarray, - confoundings: np.ndarray, + covariates: np.ndarray, treatment_arms: np.ndarray, outcomes: np.array, ) -> np.ndarray: return self.compute_cumulative_distribution( - target_treatment_arms, locations, confoundings, treatment_arms, outcomes + target_treatment_arms, locations, covariates, treatment_arms, outcomes ) @@ -53,17 +63,17 @@ def compute_confidence_intervals(*args, **kwargs): class TestDistributionEstimatorBase(unittest.TestCase): def setUp(self): self.estimator = MockDistributionEstimator() - self.confoundings = np.zeros((20, 5)) + self.covariates = np.zeros((20, 5)) self.treatment_arms = np.hstack([np.zeros(10), np.ones(10)]) self.outcomes = np.arange(20) - self.estimator.fit(self.confoundings, self.treatment_arms, self.outcomes) + self.estimator.fit(self.covariates, self.treatment_arms, self.outcomes) def test_initialization(self): # Arrange base_estimator = MockDistributionEstimator() # Assert - self.assertIsNone(base_estimator.confoundings) + self.assertIsNone(base_estimator.covariates) self.assertIsNone(base_estimator.treatment_arms) self.assertIsNone(base_estimator.outcomes) @@ -128,28 +138,12 @@ def test_predict_qte(self): def test_fit_success(self): # Assert - self.assertTrue(np.array_equal(self.estimator.confoundings, self.confoundings)) + self.assertTrue(np.array_equal(self.estimator.covariates, self.covariates)) self.assertTrue( np.array_equal(self.estimator.treatment_arms, self.treatment_arms) ) self.assertTrue(np.array_equal(self.estimator.outcomes, self.outcomes)) - def test_fit_invalid_shapes(self): - # Arrange - confoundings_invalid = np.array([[1, 2], [3, 4]]) - treatment_arms_invalid = np.array([0, 1]) - outcomes_invalid = np.array([0.5, 0.7]) - - # Assert - with self.assertRaises(ValueError): - self.estimator.fit(confoundings_invalid, self.treatment_arms, self.outcomes) - - with self.assertRaises(ValueError): - self.estimator.fit(self.confoundings, treatment_arms_invalid, self.outcomes) - - with self.assertRaises(ValueError): - self.estimator.fit(self.confoundings, self.treatment_arms, outcomes_invalid) - def test_predict_success(self): # Arrange treatment_arm_test = 0 diff --git a/tests/test_simple_estimator.py b/tests/test_simple_estimator.py index 62d7128..80e4928 100644 --- a/tests/test_simple_estimator.py +++ b/tests/test_simple_estimator.py @@ -6,10 +6,10 @@ class TestSimpleEstimator(unittest.TestCase): def setUp(self): self.estimator = SimpleDistributionEstimator() - self.confoundings = np.zeros((20, 5)) + self.covariates = np.zeros((20, 5)) self.treatment_arms = np.hstack([np.zeros(10), np.ones(10)]) self.outcomes = np.arange(20) - self.estimator.fit(self.confoundings, self.treatment_arms, self.outcomes) + self.estimator.fit(self.covariates, self.treatment_arms, self.outcomes) def test_predict(self): # Arrange @@ -22,3 +22,19 @@ def test_predict(self): # Assert np.testing.assert_array_almost_equal(output, expected_output, decimal=2) + + def test_fit_invalid_shapes(self): + # Arrange + covariates_invalid = np.array([[1, 2], [3, 4]]) + treatment_arms_invalid = np.array([0, 1]) + outcomes_invalid = np.array([0.5, 0.7]) + + # Assert + with self.assertRaises(ValueError): + self.estimator.fit(covariates_invalid, self.treatment_arms, self.outcomes) + + with self.assertRaises(ValueError): + self.estimator.fit(self.covariates, treatment_arms_invalid, self.outcomes) + + with self.assertRaises(ValueError): + self.estimator.fit(self.covariates, self.treatment_arms, outcomes_invalid) From 6d4a08f7d1e76c1ccb72ee93cf9245dab955600b Mon Sep 17 00:00:00 2001 From: TomeHirata Date: Thu, 29 May 2025 02:03:28 +0900 Subject: [PATCH 3/3] fix ci --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index dd55c73..7d770ae 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -29,7 +29,7 @@ jobs: - name: Install dependencies run: | - pipenv install --dev + pipenv install --dev --python ${{ matrix.python-version }} - name: Lint with ruff run: |