Skip to content

Commit cc0eb77

Browse files
committed
Add e2e tests
1 parent c43869d commit cc0eb77

3 files changed

Lines changed: 323 additions & 1 deletion

File tree

tests/test_local_estimators.py

Lines changed: 105 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,72 @@
11
import unittest
22
import numpy as np
3-
from sklearn.linear_model import LogisticRegression
3+
from sklearn.linear_model import LogisticRegression, LinearRegression
44
from dte_adj import SimpleLocalDistributionEstimator, AdjustedLocalDistributionEstimator
55

6+
# Seed NumPy's global random state at import time; generate_data() below draws
# from this shared state, so results also depend on how many random numbers
# earlier tests in the same process have already consumed.
np.random.seed(123)
7+
8+
9+
def generate_data(n=1000, S=4):
    """Simulate a stratified experiment with imperfect take-up and count outcomes.

    Args:
        n: Number of observations to draw.
        S: Number of equal-width bins of W over [0, 1] used as strata.

    Returns:
        dict with the keys
        "W" (uniform covariate the strata are built from),
        "X" (an (n, 20) array of standard-normal covariates),
        "Z" (0/1 assignment drawn separately inside each stratum),
        "D" (treatment actually received given Z),
        "Y" (observed outcome, drawn from a Poisson distribution) and
        "strata" (stratum label of every observation).
    """
    # Model constants: outcome intercepts, take-up shifts and noise scales
    # for the assigned / unassigned arms respectively.
    intercept_treated, intercept_control = 4, 1
    shift_assigned, shift_unassigned = 1, -1
    scale_assigned, scale_unassigned = 3, 3

    # NOTE: the random draws below happen in a fixed order (W, X, Z per
    # stratum, noise, Poisson outcome) so a given seed always yields the
    # same data set.
    stratifier = np.random.uniform(0, 1, n)
    upper_edges = np.linspace(0, 1, S + 1)[1:]
    stratum_of = np.digitize(stratifier, upper_edges)

    covariates = np.random.randn(n, 20)

    # Bernoulli(0.5) assignment, drawn stratum by stratum.
    assignment = np.zeros(n)
    for label in range(S):
        members = stratum_of == label
        assignment[members] = np.random.binomial(
            1, 0.5, size=np.count_nonzero(members)
        )

    # Shared outcome component b(X, W).
    baseline = (
        np.sin(np.pi * covariates[:, 0] * covariates[:, 1])
        + 2 * (covariates[:, 2] - 0.5) ** 2
        + covariates[:, 3]
        + 0.5 * covariates[:, 4]
        + 0.1 * stratifier
    )
    # Take-up index c(X, W).
    takeup_index = 0.1 * (
        covariates[:, 0] + np.log(1 + np.exp(covariates[:, 1])) + stratifier
    )

    noise = np.random.randn(n)

    # Potential outcomes Y(0), Y(1) share the same baseline and noise.
    outcome_control = intercept_control + baseline + noise
    outcome_treated = intercept_treated + baseline + noise

    # Potential take-up D(0), D(1); anyone who takes the treatment when
    # unassigned also takes it when assigned.
    takes_if_unassigned = (
        shift_unassigned + takeup_index > scale_unassigned * noise
    ).astype(int)
    takes_if_assigned = np.where(
        takes_if_unassigned == 0,
        (shift_assigned + takeup_index > scale_assigned * noise).astype(int),
        1,
    )

    # Observed take-up and latent outcome implied by the realised assignment.
    received = takes_if_assigned * assignment + takes_if_unassigned * (1 - assignment)
    latent = outcome_treated * received + outcome_control * (1 - received)

    # Discretise: the observed outcome is a Poisson count with mean |latent|.
    observed = np.random.poisson(np.abs(latent))

    return {
        "W": stratifier,
        "X": covariates,
        "Z": assignment,
        "D": received,
        "Y": observed,
        "strata": stratum_of,
    }
69+
670

771
class TestLocalEstimators(unittest.TestCase):
872
def setUp(self):
@@ -232,3 +296,43 @@ def test_adjusted_local_estimator_predict_lpte(self):
232296
self.assertTrue(np.all(lower_bound <= upper_bound))
233297
self.assertTrue(np.all(lower_bound <= beta))
234298
self.assertTrue(np.all(beta <= upper_bound))
299+
300+
301+
class TestE2E(unittest.TestCase):
    """End-to-end check of the local distribution estimators on simulated data."""

    def test_e2e(self):
        """Fit both estimators on 3000 simulated rows and compare their DTEs.

        The DTE is evaluated at the 10th, 20th, ..., 90th percentiles of the
        observed outcome. The test asserts that every point estimate is
        negative, that each interval has positive width, and that the adjusted
        estimator's intervals are strictly narrower than the simple ones.

        NOTE(review): the simulated data comes from NumPy's global random
        state, which this test does not reseed, so the exact draw depends on
        what ran before it in the same process.
        """
        # Arrange
        data = generate_data(n=3000)
        # D must be the simulated treatment receipt ("D"), not the continuous
        # stratification covariate ("W").
        X, D, Y, Z, S = data["X"], data["D"], data["Y"], data["Z"], data["strata"]
        locations = np.array([np.percentile(Y, p) for p in range(10, 91, 10)])
        simple_estimator = SimpleLocalDistributionEstimator()
        adjusted_estimator = AdjustedLocalDistributionEstimator(LinearRegression())

        # Act
        simple_estimator.fit(X, Z, D, Y, S)
        adjusted_estimator.fit(X, Z, D, Y, S)

        simple_dte, simple_lower_bound, simple_upper_bound = (
            simple_estimator.predict_dte(1, 0, locations)
        )
        adjusted_dte, adjusted_lower_bound, adjusted_upper_bound = (
            adjusted_estimator.predict_dte(1, 0, locations)
        )

        # Assert
        np.testing.assert_(np.all(simple_dte < 0), "Not all values are negative")
        np.testing.assert_(np.all(adjusted_dte < 0), "Not all values are negative")
        np.testing.assert_(
            np.all(simple_lower_bound < simple_upper_bound),
            "Upper bound is less than lower bound",
        )
        np.testing.assert_(
            np.all(adjusted_lower_bound < adjusted_upper_bound),
            "Upper bound is less than lower bound",
        )
        np.testing.assert_(
            np.all(
                adjusted_upper_bound - adjusted_lower_bound
                < simple_upper_bound - simple_lower_bound
            ),
            "Adjusted estimator does not have narrower intervals",
        )

tests/test_simple_estimator.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
from sklearn.linear_model import LogisticRegression
55
from dte_adj import SimpleDistributionEstimator, AdjustedDistributionEstimator
66

7+
# Seed NumPy's global random state at import time; this affects every
# np.random call made after this module is loaded.
np.random.seed(123)
8+
79

810
def generate_data(n, d_x=100, rho=0.5):
911
"""
Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
import unittest
2+
import numpy as np
3+
from sklearn.linear_model import LogisticRegression
4+
from dte_adj import (
5+
SimpleStratifiedDistributionEstimator,
6+
AdjustedStratifiedDistributionEstimator,
7+
)
8+
9+
10+
def generate_data(n=1000, S=4, d=2, discrete=False):
    """Simulate a stratified, within-stratum balanced randomised experiment.

    Args:
        n: Number of observations.
        S: Number of equal-width bins of Z over [0, 1] used as strata.
        d: Accepted for interface compatibility but currently ignored; the
            covariate dimension is fixed at 20 (see the note in the body).
        discrete: When True, replace the continuous outcome with a Poisson
            count whose mean is 0.2 * |Y|.

    Returns:
        dict with keys "W" (0/1 treatment, n_s // 2 treated units in each
        stratum of size n_s), "X" ((n, 20) standard-normal covariates),
        "Z" (uniform stratification variable), "Y" (outcome) and "strata"
        (stratum label per observation).
    """
    # NOTE(review): the ``d`` argument is overridden here, so callers cannot
    # change the covariate dimension. The outcome model below reads columns
    # 0-4 of X, so honouring the default d=2 would fail; kept as-is to
    # preserve existing behaviour.
    d = 20

    Z = np.random.uniform(0, 1, n)

    # Interior bin edges only, so labels run from 0 to S - 1.
    S_i = np.digitize(Z, np.linspace(0, 1, S + 1)[1:-1])

    X = np.random.multivariate_normal(mean=np.zeros(d), cov=np.eye(d), size=n)

    W = np.zeros(n, dtype=int)
    for s in np.unique(S_i):
        idx = np.where(S_i == s)[0]
        n_s = len(idx)
        # Treat a random half of the stratum. Shuffling ``W[idx]`` in place
        # would have no effect, because fancy indexing returns a copy; pick
        # the treated positions from a permutation of the indices instead.
        treated = np.random.permutation(idx)[: n_s // 2]
        W[treated] = 1

    b_X = (
        np.sin(np.pi * X[:, 0] * X[:, 1])
        + 2 * (X[:, 2] - 0.5) ** 2
        + X[:, 3]
        + 0.5 * X[:, 4]
    )
    c_X = 0.1 * (X[:, 0] + np.log(1 + np.exp(X[:, 1])))

    gamma = 0.1
    u = np.random.normal(0, 1, n)

    # Outcome: baseline + heterogeneous treatment effect + stratifier + noise.
    Y = b_X + c_X * W + gamma * Z + u
    if discrete:
        Y = np.random.poisson(0.2 * np.abs(Y))

    return {"W": W, "X": X, "Z": Z, "Y": Y, "strata": S_i}
43+
44+
45+
class TestStratifiedEstimators(unittest.TestCase):
    """Unit tests for the simple and adjusted stratified distribution estimators."""

    # ------------------------------------------------------------------ fixtures

    def setUp(self):
        """Create a reproducible continuous-outcome data set for each test."""
        np.random.seed(42)
        fixture = generate_data(n=1000, S=4, d=20, discrete=False)
        self.X = fixture["X"]
        self.W = fixture["W"]
        self.Y = fixture["Y"]
        self.strata = fixture["strata"]
        # 20 evenly spaced evaluation points spanning the observed outcomes.
        self.locations = np.linspace(self.Y.min(), self.Y.max(), 20)

    def _fitted_simple(self):
        """Return a simple estimator already fitted on the fixture data."""
        model = SimpleStratifiedDistributionEstimator()
        model.fit(self.X, self.W, self.Y, self.strata)
        return model

    def _new_adjusted(self):
        """Return an unfitted adjusted estimator (logistic base model, 3 folds)."""
        learner = LogisticRegression(max_iter=1000, random_state=42)
        return AdjustedStratifiedDistributionEstimator(learner, folds=3)

    def _fitted_adjusted(self):
        """Return an adjusted estimator already fitted on the fixture data."""
        model = self._new_adjusted()
        model.fit(self.X, self.W, self.Y, self.strata)
        return model

    # ---------------------------------------------------------- assertion helpers

    def _assert_inputs_stored(self, model):
        """Check that fit() kept the four input arrays on the estimator."""
        stored = (
            ("covariates", self.X),
            ("treatment_arms", self.W),
            ("outcomes", self.Y),
            ("strata", self.strata),
        )
        for attribute, expected in stored:
            self.assertTrue(np.array_equal(getattr(model, attribute), expected))

    def _assert_shapes(self, arrays, shape):
        """Check that every array in ``arrays`` has exactly ``shape``."""
        for array in arrays:
            self.assertEqual(array.shape, shape)

    def _assert_point_inside(self, lower, point, upper):
        """Check lower <= point <= upper element-wise."""
        self.assertTrue(np.all(lower <= point))
        self.assertTrue(np.all(point <= upper))

    # -------------------------------------------------------- simple estimator

    def test_simple_stratified_estimator_fit(self):
        """fit() returns a simple estimator and stores its inputs."""
        estimator = SimpleStratifiedDistributionEstimator()
        fitted = estimator.fit(self.X, self.W, self.Y, self.strata)

        self.assertIsInstance(fitted, SimpleStratifiedDistributionEstimator)
        self._assert_inputs_stored(estimator)

    def test_simple_stratified_estimator_predict_dte(self):
        """predict_dte() yields one bracketed estimate per location."""
        estimator = self._fitted_simple()

        query = dict(
            target_treatment_arm=1,
            control_treatment_arm=0,
            locations=self.locations,
            alpha=0.05,
        )
        dte, lower_bound, upper_bound = estimator.predict_dte(**query)

        self._assert_shapes((dte, lower_bound, upper_bound), self.locations.shape)
        self._assert_point_inside(lower_bound, dte, upper_bound)

    def test_simple_stratified_estimator_predict_pte(self):
        """predict_pte() yields one ordered interval per gap between locations."""
        estimator = self._fitted_simple()

        query = dict(
            target_treatment_arm=1,
            control_treatment_arm=0,
            locations=self.locations,
            alpha=0.05,
        )
        pte, lower_bound, upper_bound = estimator.predict_pte(**query)

        n_intervals = len(self.locations) - 1
        self._assert_shapes((pte, lower_bound, upper_bound), (n_intervals,))
        self.assertTrue(np.all(lower_bound <= upper_bound))

    def test_simple_stratified_estimator_predict_qte(self):
        """predict_qte() yields one ordered interval per requested quantile."""
        estimator = self._fitted_simple()

        quantiles = np.array([0.25, 0.5, 0.75])
        query = dict(
            target_treatment_arm=1,
            control_treatment_arm=0,
            quantiles=quantiles,
            n_bootstrap=50,
        )
        qte, lower_bound, upper_bound = estimator.predict_qte(**query)

        self._assert_shapes((qte, lower_bound, upper_bound), quantiles.shape)
        self.assertTrue(np.all(lower_bound <= upper_bound))

    # ------------------------------------------------------ adjusted estimator

    def test_adjusted_stratified_estimator_fit(self):
        """fit() returns an adjusted estimator, stores inputs and keeps folds."""
        estimator = self._new_adjusted()
        fitted = estimator.fit(self.X, self.W, self.Y, self.strata)

        self.assertIsInstance(fitted, AdjustedStratifiedDistributionEstimator)
        self._assert_inputs_stored(estimator)
        self.assertEqual(estimator.folds, 3)

    def test_adjusted_stratified_estimator_predict_dte(self):
        """predict_dte() with moment variance yields bracketed estimates."""
        estimator = self._fitted_adjusted()

        query = dict(
            target_treatment_arm=1,
            control_treatment_arm=0,
            locations=self.locations,
            alpha=0.05,
            variance_type="moment",
        )
        dte, lower_bound, upper_bound = estimator.predict_dte(**query)

        self._assert_shapes((dte, lower_bound, upper_bound), self.locations.shape)
        self._assert_point_inside(lower_bound, dte, upper_bound)

    def test_adjusted_stratified_estimator_predict_pte(self):
        """predict_pte() with moment variance yields ordered intervals."""
        estimator = self._fitted_adjusted()

        query = dict(
            target_treatment_arm=1,
            control_treatment_arm=0,
            locations=self.locations,
            alpha=0.05,
            variance_type="moment",
        )
        pte, lower_bound, upper_bound = estimator.predict_pte(**query)

        n_intervals = len(self.locations) - 1
        self._assert_shapes((pte, lower_bound, upper_bound), (n_intervals,))
        self.assertTrue(np.all(lower_bound <= upper_bound))

    def test_adjusted_stratified_estimator_predict_qte(self):
        """predict_qte() yields one ordered interval per requested quantile."""
        estimator = self._fitted_adjusted()

        quantiles = np.array([0.25, 0.5, 0.75])
        query = dict(
            target_treatment_arm=1,
            control_treatment_arm=0,
            quantiles=quantiles,
            n_bootstrap=50,
        )
        qte, lower_bound, upper_bound = estimator.predict_qte(**query)

        self._assert_shapes((qte, lower_bound, upper_bound), quantiles.shape)
        self.assertTrue(np.all(lower_bound <= upper_bound))

    # ------------------------------------------------------------- edge cases

    def test_discrete_outcomes(self):
        """The simple estimator handles count outcomes on an integer grid."""
        sample = generate_data(n=1000, S=4, d=20, discrete=True)

        estimator = SimpleStratifiedDistributionEstimator()
        estimator.fit(sample["X"], sample["W"], sample["Y"], sample["strata"])

        # Evaluate at every integer from 0 up to the largest observed count.
        grid = np.arange(0, sample["Y"].max() + 1)
        dte, lower, upper = estimator.predict_dte(1, 0, grid)

        self.assertEqual(dte.shape, grid.shape)
        self.assertTrue(np.all(lower <= upper))

    def test_invalid_input_shapes(self):
        """fit() raises ValueError when X has fewer rows than the other inputs."""
        estimator = SimpleStratifiedDistributionEstimator()

        truncated_X = self.X[:-10]

        self.assertRaises(
            ValueError, estimator.fit, truncated_X, self.W, self.Y, self.strata
        )

    def test_different_alpha_values(self):
        """A larger alpha gives strictly narrower intervals at every location."""
        estimator = self._fitted_simple()

        subset = self.locations[:10]

        _, lower_005, upper_005 = estimator.predict_dte(1, 0, subset, alpha=0.05)
        _, lower_010, upper_010 = estimator.predict_dte(1, 0, subset, alpha=0.10)

        narrow = upper_010 - lower_010
        wide = upper_005 - lower_005

        self.assertTrue(np.all(narrow < wide))

0 commit comments

Comments
 (0)