Skip to content
6 changes: 6 additions & 0 deletions experimenter/experimenter/jetstream/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ def get_monitoring_data():
return load_data_from_gcs(str(path))


def get_enrollment_funnel_data():
filename = "enrollment_funnel_v1_latest.json"
path = Path(ENROLLMENT_COUNTS_FOLDER, filename)
return load_data_from_gcs(str(path))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got distracted by a meeting partway through reviewing this, so I'll just leave a quick comment for posterity: This is another spot where it'd be nice to have the data defined in the schemas package so we can validate it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯 we should definitely do schema validation, I will add that in the ticket



def get_results_metrics_map(
data: JetstreamData,
primary_outcome_slugs: list[str],
Expand Down
23 changes: 19 additions & 4 deletions experimenter/experimenter/jetstream/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from experimenter.experiments.constants import NimbusConstants
from experimenter.experiments.models import NimbusChangeLog, NimbusExperiment
from experimenter.jetstream.client import (
get_enrollment_funnel_data,
get_experiment_data,
get_monitoring_data,
get_population_sizing_data,
Expand Down Expand Up @@ -144,6 +145,14 @@ def fetch_monitoring_data():
return

alert_data = data.get("v1")

try:
funnel_data = get_enrollment_funnel_data()
funnel_by_slug = funnel_data.get("v1", {}) if funnel_data else {}
except Exception as e:
logger.warning(f"Could not fetch enrollment funnel data: {e}")
funnel_by_slug = {}

updated_count = 0

for exp_slug, monitoring_data in alert_data.items():
Expand All @@ -153,11 +162,17 @@ def fetch_monitoring_data():
status=NimbusConstants.Status.LIVE,
)

# Only update if data has changed
if experiment.monitoring_data != monitoring_data:
experiment.monitoring_data = monitoring_data
merged = {
**monitoring_data,
"enrollment_funnel": funnel_by_slug.get(exp_slug, []),
}

if experiment.monitoring_data != merged:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can call save with only_fields=("monitoring_data", "monitoring_data_updated_at") to limit the DB update

experiment.monitoring_data = merged
experiment.monitoring_data_updated_at = timezone.now()
experiment.save()
experiment.save(
update_fields=["monitoring_data", "monitoring_data_updated_at"]
)
generate_nimbus_changelog(
experiment,
get_kinto_user(),
Expand Down
139 changes: 131 additions & 8 deletions experimenter/experimenter/jetstream/tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@
from experimenter.experiments.models import NimbusChangeLog, NimbusExperiment
from experimenter.experiments.tests.factories import NimbusExperimentFactory
from experimenter.jetstream import tasks
from experimenter.jetstream.client import get_data, get_monitoring_data
from experimenter.jetstream.client import (
get_data,
get_enrollment_funnel_data,
get_monitoring_data,
)
from experimenter.jetstream.models import AnalysisWindow, Group
from experimenter.jetstream.tests import mock_valid_outcomes
from experimenter.jetstream.tests.constants import (
Expand Down Expand Up @@ -3441,14 +3445,48 @@ def mock_monitoring_data(request):
return data


@pytest.mark.usefixtures("mock_monitoring_data")
@pytest.fixture
def mock_funnel_entries(request):
data = [
{
"app_name": "firefox_desktop",
"branch": "control",
"status": "Enrolled",
"reason": "Qualified",
"conflict_slug": None,
"client_count": 750000,
},
{
"app_name": "firefox_desktop",
"branch": None,
"status": "NotEnrolled",
"reason": "NotTargeted",
"conflict_slug": None,
"client_count": 5000000,
},
]
if request.instance:
request.instance.funnel_entries = data
return data


@pytest.mark.usefixtures("mock_monitoring_data", "mock_funnel_entries")
class TestFetchMonitoringDataTask(TestCase):
def setUp(self):
super().setUp()
patcher = patch("experimenter.jetstream.tasks.get_monitoring_data")
self.mock_get_monitoring_data = patcher.start()
self.addCleanup(patcher.stop)

self._funnel_patcher = patch(
"experimenter.jetstream.tasks.get_enrollment_funnel_data"
)
self.mock_get_funnel_data = self._funnel_patcher.start()
self.mock_get_funnel_data.return_value = {"v1": {}}

def tearDown(self):
self._funnel_patcher.stop()

@parameterized.expand([(None,), ({},)])
def test_fetch_monitoring_data_no_data(self, return_value):
self.mock_get_monitoring_data.return_value = return_value
Expand All @@ -3469,9 +3507,62 @@ def test_fetch_monitoring_data_updates_live_experiment(self):
tasks.fetch_monitoring_data()

experiment.refresh_from_db()
self.assertEqual(experiment.monitoring_data, self.monitoring_data)
self.assertEqual(
experiment.monitoring_data,
{**self.monitoring_data, "enrollment_funnel": []},
)
self.assertIsNotNone(experiment.monitoring_data_updated_at)

def test_fetch_monitoring_data_merges_funnel_data(self):
experiment = NimbusExperimentFactory.create(
status=NimbusExperiment.Status.LIVE,
monitoring_data={},
)
self.mock_get_monitoring_data.return_value = {
"v1": {experiment.slug: self.monitoring_data}
}
self.mock_get_funnel_data.return_value = {
"v1": {experiment.slug: self.funnel_entries}
}

tasks.fetch_monitoring_data()

experiment.refresh_from_db()
self.assertEqual(
experiment.monitoring_data,
{**self.monitoring_data, "enrollment_funnel": self.funnel_entries},
)

def test_fetch_monitoring_data_funnel_defaults_to_empty_list_when_missing(self):
experiment = NimbusExperimentFactory.create(
status=NimbusExperiment.Status.LIVE,
monitoring_data={},
)
self.mock_get_monitoring_data.return_value = {
"v1": {experiment.slug: self.monitoring_data}
}
self.mock_get_funnel_data.return_value = {"v1": {}}

tasks.fetch_monitoring_data()

experiment.refresh_from_db()
self.assertEqual(experiment.monitoring_data["enrollment_funnel"], [])

def test_fetch_monitoring_data_continues_if_funnel_fetch_fails(self):
experiment = NimbusExperimentFactory.create(
status=NimbusExperiment.Status.LIVE,
monitoring_data={},
)
self.mock_get_monitoring_data.return_value = {
"v1": {experiment.slug: self.monitoring_data}
}
self.mock_get_funnel_data.side_effect = Exception("GCS unavailable")

tasks.fetch_monitoring_data()

experiment.refresh_from_db()
self.assertEqual(experiment.monitoring_data["enrollment_funnel"], [])

@parameterized.expand(
[
(NimbusExperiment.Status.DRAFT,),
Expand Down Expand Up @@ -3558,8 +3649,8 @@ def test_fetch_monitoring_data_updates_multiple_experiments(self):

exp1.refresh_from_db()
exp2.refresh_from_db()
self.assertEqual(exp1.monitoring_data, data1)
self.assertEqual(exp2.monitoring_data, data2)
self.assertEqual(exp1.monitoring_data, {**data1, "enrollment_funnel": []})
self.assertEqual(exp2.monitoring_data, {**data2, "enrollment_funnel": []})

def test_fetch_monitoring_data_fatal_error(self):
self.mock_get_monitoring_data.side_effect = Exception("GCS connection failed")
Expand All @@ -3586,7 +3677,9 @@ def test_get_monitoring_data_success(self, mock_load):
result = get_monitoring_data()

self.assertEqual(result, data)
mock_load.assert_called_once()
mock_load.assert_called_once_with(
"enrollment_counts/enrollment_counts_latest.json"
)

@patch("experimenter.jetstream.client.load_data_from_gcs")
def test_get_monitoring_data_no_data(self, mock_load):
Expand All @@ -3598,7 +3691,37 @@ def test_get_monitoring_data_no_data(self, mock_load):

@patch("experimenter.jetstream.client.load_data_from_gcs")
def test_get_monitoring_data_exception(self, mock_load):
mock_load.side_effect = Exception("GCS error")
mock_load.side_effect = RuntimeError("GCS error")

with self.assertRaises(Exception):
with self.assertRaises(RuntimeError):
get_monitoring_data()


@pytest.mark.usefixtures("mock_funnel_entries")
class TestGetEnrollmentFunnelData(TestCase):
@patch("experimenter.jetstream.client.load_data_from_gcs")
def test_get_enrollment_funnel_data_success(self, mock_load):
data = {"v1": {"experiment-1": self.funnel_entries}}
mock_load.return_value = data

result = get_enrollment_funnel_data()

self.assertEqual(result, data)
mock_load.assert_called_once_with(
"enrollment_counts/enrollment_funnel_v1_latest.json"
)

@patch("experimenter.jetstream.client.load_data_from_gcs")
def test_get_enrollment_funnel_data_no_data(self, mock_load):
mock_load.return_value = None

result = get_enrollment_funnel_data()

self.assertIsNone(result)

@patch("experimenter.jetstream.client.load_data_from_gcs")
def test_get_enrollment_funnel_data_exception(self, mock_load):
mock_load.side_effect = RuntimeError("GCS error")

with self.assertRaises(RuntimeError):
get_enrollment_funnel_data()
Loading