From 2b7c93d56a7091f949d18fccc8d1e1182701d375 Mon Sep 17 00:00:00 2001 From: Yashika Khurana Date: Tue, 26 May 2026 15:18:46 -0400 Subject: [PATCH 1/7] feat(nimbus): fetch and store enrollment funnel data alongside monitoring alerts (EXP-6872) Extends fetch_monitoring_data task to also pull enrollment_funnel_v1_latest.json from GCS and merge per-experiment funnel rows into monitoring_data["enrollment_funnel"]. Funnel fetch failures are handled gracefully (warning log, defaults to empty list) so alert data continues to save even when funnel data is unavailable. --- experimenter/experimenter/jetstream/client.py | 6 ++ experimenter/experimenter/jetstream/tasks.py | 19 +++- .../jetstream/tests/test_tasks.py | 86 ++++++++++++++++++- 3 files changed, 105 insertions(+), 6 deletions(-) diff --git a/experimenter/experimenter/jetstream/client.py b/experimenter/experimenter/jetstream/client.py index 0289e6689..a1e32cca0 100644 --- a/experimenter/experimenter/jetstream/client.py +++ b/experimenter/experimenter/jetstream/client.py @@ -105,6 +105,12 @@ def get_monitoring_data(): return load_data_from_gcs(str(path)) +def get_enrollment_funnel_data(): + filename = "enrollment_funnel_v1_latest.json" + path = Path(ENROLLMENT_COUNTS_FOLDER, filename) + return load_data_from_gcs(str(path)) + + def get_results_metrics_map( data: JetstreamData, primary_outcome_slugs: list[str], diff --git a/experimenter/experimenter/jetstream/tasks.py b/experimenter/experimenter/jetstream/tasks.py index b1fbe7e2e..d4782c08e 100644 --- a/experimenter/experimenter/jetstream/tasks.py +++ b/experimenter/experimenter/jetstream/tasks.py @@ -11,6 +11,7 @@ from experimenter.experiments.constants import NimbusConstants from experimenter.experiments.models import NimbusChangeLog, NimbusExperiment from experimenter.jetstream.client import ( + get_enrollment_funnel_data, get_experiment_data, get_monitoring_data, get_population_sizing_data, @@ -144,6 +145,14 @@ def fetch_monitoring_data(): return alert_data = data.get("v1") + + try: + funnel_data = get_enrollment_funnel_data() + funnel_by_slug = funnel_data.get("v1", {}) if funnel_data else {} + except Exception as e: + logger.warning(f"Could not fetch enrollment funnel data: {e}") + funnel_by_slug = {} + updated_count = 0 for exp_slug, monitoring_data in alert_data.items(): @@ -153,9 +162,13 @@ def fetch_monitoring_data(): status=NimbusConstants.Status.LIVE, ) - # Only update if data has changed - if experiment.monitoring_data != monitoring_data: - experiment.monitoring_data = monitoring_data + merged = { + **monitoring_data, + "enrollment_funnel": funnel_by_slug.get(exp_slug, []), + } + + if experiment.monitoring_data != merged: + experiment.monitoring_data = merged experiment.monitoring_data_updated_at = timezone.now() experiment.save() generate_nimbus_changelog( diff --git a/experimenter/experimenter/jetstream/tests/test_tasks.py b/experimenter/experimenter/jetstream/tests/test_tasks.py index e30df21c0..9623e7d0a 100644 --- a/experimenter/experimenter/jetstream/tests/test_tasks.py +++ b/experimenter/experimenter/jetstream/tests/test_tasks.py @@ -3441,6 +3441,26 @@ def mock_monitoring_data(request): return data +SAMPLE_FUNNEL_ENTRIES = [ + { + "app_name": "firefox_desktop", + "branch": "control", + "status": "Enrolled", + "reason": "Qualified", + "conflict_slug": None, + "client_count": 750000, + }, + { + "app_name": "firefox_desktop", + "branch": None, + "status": "NotEnrolled", + "reason": "NotTargeted", + "conflict_slug": None, + "client_count": 5000000, + }, +] + + @pytest.mark.usefixtures("mock_monitoring_data") class TestFetchMonitoringDataTask(TestCase): def setUp(self): @@ -3449,6 +3469,13 @@ def setUp(self): self.mock_get_monitoring_data = patcher.start() self.addCleanup(patcher.stop) + funnel_patcher = patch( + "experimenter.jetstream.tasks.get_enrollment_funnel_data" + ) + self.mock_get_funnel_data = funnel_patcher.start() + self.mock_get_funnel_data.return_value = {"v1": {}} + self.addCleanup(funnel_patcher.stop) + @parameterized.expand([(None,), ({},)]) def test_fetch_monitoring_data_no_data(self, return_value): self.mock_get_monitoring_data.return_value = return_value @@ -3469,9 +3496,62 @@ def test_fetch_monitoring_data_updates_live_experiment(self): tasks.fetch_monitoring_data() experiment.refresh_from_db() - self.assertEqual(experiment.monitoring_data, self.monitoring_data) + self.assertEqual( + experiment.monitoring_data, + {**self.monitoring_data, "enrollment_funnel": []}, + ) self.assertIsNotNone(experiment.monitoring_data_updated_at) + def test_fetch_monitoring_data_merges_funnel_data(self): + experiment = NimbusExperimentFactory.create( + status=NimbusExperiment.Status.LIVE, + monitoring_data={}, + ) + self.mock_get_monitoring_data.return_value = { + "v1": {experiment.slug: self.monitoring_data} + } + self.mock_get_funnel_data.return_value = { + "v1": {experiment.slug: SAMPLE_FUNNEL_ENTRIES} + } + + tasks.fetch_monitoring_data() + + experiment.refresh_from_db() + self.assertEqual( + experiment.monitoring_data, + {**self.monitoring_data, "enrollment_funnel": SAMPLE_FUNNEL_ENTRIES}, + ) + + def test_fetch_monitoring_data_funnel_defaults_to_empty_list_when_missing(self): + experiment = NimbusExperimentFactory.create( + status=NimbusExperiment.Status.LIVE, + monitoring_data={}, + ) + self.mock_get_monitoring_data.return_value = { + "v1": {experiment.slug: self.monitoring_data} + } + self.mock_get_funnel_data.return_value = {"v1": {}} + + tasks.fetch_monitoring_data() + + experiment.refresh_from_db() + self.assertEqual(experiment.monitoring_data["enrollment_funnel"], []) + + def test_fetch_monitoring_data_continues_if_funnel_fetch_fails(self): + experiment = NimbusExperimentFactory.create( + status=NimbusExperiment.Status.LIVE, + monitoring_data={}, + ) + self.mock_get_monitoring_data.return_value = { + "v1": {experiment.slug: self.monitoring_data} + } + self.mock_get_funnel_data.side_effect = Exception("GCS unavailable") + + tasks.fetch_monitoring_data() + + experiment.refresh_from_db() + self.assertEqual(experiment.monitoring_data["enrollment_funnel"], []) + @parameterized.expand( [ (NimbusExperiment.Status.DRAFT,), @@ -3558,8 +3638,8 @@ def test_fetch_monitoring_data_updates_multiple_experiments(self): exp1.refresh_from_db() exp2.refresh_from_db() - self.assertEqual(exp1.monitoring_data, data1) - self.assertEqual(exp2.monitoring_data, data2) + self.assertEqual(exp1.monitoring_data, {**data1, "enrollment_funnel": []}) + self.assertEqual(exp2.monitoring_data, {**data2, "enrollment_funnel": []}) def test_fetch_monitoring_data_fatal_error(self): self.mock_get_monitoring_data.side_effect = Exception("GCS connection failed") From c8e3d8a6f6fd9bb1939fbc84612005d3a3d7eca4 Mon Sep 17 00:00:00 2001 From: Yashika Khurana Date: Wed, 27 May 2026 10:29:32 -0400 Subject: [PATCH 2/7] chore(nimbus): refactor GCS client tests and add enrollment funnel coverage (EXP-6872) --- .../jetstream/tests/test_tasks.py | 47 ++++++++++++++++--- 1 file changed, 40 insertions(+), 7 deletions(-) diff --git a/experimenter/experimenter/jetstream/tests/test_tasks.py b/experimenter/experimenter/jetstream/tests/test_tasks.py index 9623e7d0a..7ccecfa34 100644 --- a/experimenter/experimenter/jetstream/tests/test_tasks.py +++ b/experimenter/experimenter/jetstream/tests/test_tasks.py @@ -13,7 +13,11 @@ from experimenter.experiments.models import NimbusChangeLog, NimbusExperiment from experimenter.experiments.tests.factories import NimbusExperimentFactory from experimenter.jetstream import tasks -from experimenter.jetstream.client import get_data, get_monitoring_data +from experimenter.jetstream.client import ( + get_data, + get_enrollment_funnel_data, + get_monitoring_data, +) from experimenter.jetstream.models import AnalysisWindow, Group from experimenter.jetstream.tests import mock_valid_outcomes from experimenter.jetstream.tests.constants import ( @@ -3469,9 +3473,7 @@ def setUp(self): self.mock_get_monitoring_data = patcher.start() self.addCleanup(patcher.stop) - funnel_patcher = patch( - "experimenter.jetstream.tasks.get_enrollment_funnel_data" - ) + funnel_patcher = patch("experimenter.jetstream.tasks.get_enrollment_funnel_data") self.mock_get_funnel_data = funnel_patcher.start() self.mock_get_funnel_data.return_value = {"v1": {}} self.addCleanup(funnel_patcher.stop) @@ -3666,7 +3668,9 @@ def test_get_monitoring_data_success(self, mock_load): result = get_monitoring_data() self.assertEqual(result, data) - mock_load.assert_called_once() + mock_load.assert_called_once_with( + "enrollment_counts/enrollment_counts_latest.json" + ) @patch("experimenter.jetstream.client.load_data_from_gcs") def test_get_monitoring_data_no_data(self, mock_load): @@ -3678,7 +3682,36 @@ def test_get_monitoring_data_no_data(self, mock_load): @patch("experimenter.jetstream.client.load_data_from_gcs") def test_get_monitoring_data_exception(self, mock_load): - mock_load.side_effect = Exception("GCS error") + mock_load.side_effect = RuntimeError("GCS error") - with self.assertRaises(Exception): + with self.assertRaises(RuntimeError): get_monitoring_data() + + +class TestGetEnrollmentFunnelData(TestCase): + @patch("experimenter.jetstream.client.load_data_from_gcs") + def test_get_enrollment_funnel_data_success(self, mock_load): + data = {"v1": {"experiment-1": SAMPLE_FUNNEL_ENTRIES}} + mock_load.return_value = data + + result = get_enrollment_funnel_data() + + self.assertEqual(result, data) + mock_load.assert_called_once_with( + "enrollment_counts/enrollment_funnel_v1_latest.json" + ) + + @patch("experimenter.jetstream.client.load_data_from_gcs") + def test_get_enrollment_funnel_data_no_data(self, mock_load): + mock_load.return_value = None + + result = get_enrollment_funnel_data() + + self.assertIsNone(result) + + @patch("experimenter.jetstream.client.load_data_from_gcs") + def test_get_enrollment_funnel_data_exception(self, mock_load): + mock_load.side_effect = RuntimeError("GCS error") + + with self.assertRaises(RuntimeError): + get_enrollment_funnel_data() From 737af3093557bc2894be9009ba420a71465de85b Mon Sep 17 00:00:00 2001 From: Yashika Khurana Date: Wed, 27 May 2026 10:39:57 -0400 Subject: [PATCH 3/7] chore(nimbus): convert SAMPLE_FUNNEL_ENTRIES to pytest fixture (EXP-6872) --- .../jetstream/tests/test_tasks.py | 54 ++++++++++--------- 1 file changed, 30 insertions(+), 24 deletions(-) diff --git a/experimenter/experimenter/jetstream/tests/test_tasks.py b/experimenter/experimenter/jetstream/tests/test_tasks.py index 7ccecfa34..e7e78f4d2 100644 --- a/experimenter/experimenter/jetstream/tests/test_tasks.py +++ b/experimenter/experimenter/jetstream/tests/test_tasks.py @@ -3445,27 +3445,32 @@ def mock_monitoring_data(request): return data -SAMPLE_FUNNEL_ENTRIES = [ - { - "app_name": "firefox_desktop", - "branch": "control", - "status": "Enrolled", - "reason": "Qualified", - "conflict_slug": None, - "client_count": 750000, - }, - { - "app_name": "firefox_desktop", - "branch": None, - "status": "NotEnrolled", - "reason": "NotTargeted", - "conflict_slug": None, - "client_count": 5000000, - }, -] - - -@pytest.mark.usefixtures("mock_monitoring_data") +@pytest.fixture +def mock_funnel_entries(request): + data = [ + { + "app_name": "firefox_desktop", + "branch": "control", + "status": "Enrolled", + "reason": "Qualified", + "conflict_slug": None, + "client_count": 750000, + }, + { + "app_name": "firefox_desktop", + "branch": None, + "status": "NotEnrolled", + "reason": "NotTargeted", + "conflict_slug": None, + "client_count": 5000000, + }, + ] + if request.instance: + request.instance.funnel_entries = data + return data + + +@pytest.mark.usefixtures("mock_monitoring_data", "mock_funnel_entries") class TestFetchMonitoringDataTask(TestCase): def setUp(self): super().setUp() @@ -3513,7 +3518,7 @@ def test_fetch_monitoring_data_merges_funnel_data(self): "v1": {experiment.slug: self.monitoring_data} } self.mock_get_funnel_data.return_value = { - "v1": {experiment.slug: SAMPLE_FUNNEL_ENTRIES} + "v1": {experiment.slug: self.funnel_entries} } tasks.fetch_monitoring_data() @@ -3521,7 +3526,7 @@ def test_fetch_monitoring_data_merges_funnel_data(self): experiment.refresh_from_db() self.assertEqual( experiment.monitoring_data, - {**self.monitoring_data, "enrollment_funnel": SAMPLE_FUNNEL_ENTRIES}, + {**self.monitoring_data, "enrollment_funnel": self.funnel_entries}, ) def test_fetch_monitoring_data_funnel_defaults_to_empty_list_when_missing(self): @@ -3688,10 +3693,11 @@ def test_get_monitoring_data_exception(self, mock_load): get_monitoring_data() +@pytest.mark.usefixtures("mock_funnel_entries") class TestGetEnrollmentFunnelData(TestCase): @patch("experimenter.jetstream.client.load_data_from_gcs") def test_get_enrollment_funnel_data_success(self, mock_load): - data = {"v1": {"experiment-1": SAMPLE_FUNNEL_ENTRIES}} + data = {"v1": {"experiment-1": self.funnel_entries}} mock_load.return_value = data result = get_enrollment_funnel_data() From 9e6b1f13abb1452bcb71271247b2bec796d37749 Mon Sep 17 00:00:00 2001 From: Yashika Khurana Date: Wed, 27 May 2026 11:41:14 -0400 Subject: [PATCH 4/7] Update experimenter/experimenter/jetstream/tests/test_tasks.py Co-authored-by: Beth Rennie --- experimenter/experimenter/jetstream/tests/test_tasks.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/experimenter/experimenter/jetstream/tests/test_tasks.py b/experimenter/experimenter/jetstream/tests/test_tasks.py index e7e78f4d2..b387726de 100644 --- a/experimenter/experimenter/jetstream/tests/test_tasks.py +++ b/experimenter/experimenter/jetstream/tests/test_tasks.py @@ -3478,10 +3478,12 @@ def setUp(self): self.mock_get_monitoring_data = patcher.start() self.addCleanup(patcher.stop) - funnel_patcher = patch("experimenter.jetstream.tasks.get_enrollment_funnel_data") - self.mock_get_funnel_data = funnel_patcher.start() + self._funnel_patcher = patch("experimenter.jetstream.tasks.get_enrollment_funnel_data") + self.mock_get_funnel_data = self._funnel_patcher.start() self.mock_get_funnel_data.return_value = {"v1": {}} - self.addCleanup(funnel_patcher.stop) + + def tearDown(self): + self._funnel_patcher.stop() @parameterized.expand([(None,), ({},)]) def test_fetch_monitoring_data_no_data(self, return_value): From e9e1eab9a559fdf562a74e95d5f07c7b2534a88d Mon Sep 17 00:00:00 2001 From: Yashika Khurana Date: Wed, 27 May 2026 11:41:30 -0400 Subject: [PATCH 5/7] Update experimenter/experimenter/jetstream/client.py Co-authored-by: Beth Rennie --- experimenter/experimenter/jetstream/client.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/experimenter/experimenter/jetstream/client.py b/experimenter/experimenter/jetstream/client.py index a1e32cca0..456510a2a 100644 --- a/experimenter/experimenter/jetstream/client.py +++ b/experimenter/experimenter/jetstream/client.py @@ -106,9 +106,7 @@ def get_monitoring_data(): def get_enrollment_funnel_data(): - filename = "enrollment_funnel_v1_latest.json" - path = Path(ENROLLMENT_COUNTS_FOLDER, filename) - return load_data_from_gcs(str(path)) + return load_data_from_gcs(os.path.join(ENROLLMENT_COUNTS_FOLDER, "enrollment_funnel_v1_latest.json")) def get_results_metrics_map( From 179ac679435b35b8e98d07ba17f7a6884d2ca355 Mon Sep 17 00:00:00 2001 From: Yashika Khurana Date: Wed, 27 May 2026 11:43:31 -0400 Subject: [PATCH 6/7] chore(nimbus): limit monitoring data save to only updated fields (EXP-6872) --- experimenter/experimenter/jetstream/tasks.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/experimenter/experimenter/jetstream/tasks.py b/experimenter/experimenter/jetstream/tasks.py index d4782c08e..2dc1b0396 100644 --- a/experimenter/experimenter/jetstream/tasks.py +++ b/experimenter/experimenter/jetstream/tasks.py @@ -170,7 +170,9 @@ def fetch_monitoring_data(): if experiment.monitoring_data != merged: experiment.monitoring_data = merged experiment.monitoring_data_updated_at = timezone.now() - experiment.save() + experiment.save( + update_fields=["monitoring_data", "monitoring_data_updated_at"] + ) generate_nimbus_changelog( experiment, get_kinto_user(), From 37629bcccae25f0828482038e9ae8267aea27ad0 Mon Sep 17 00:00:00 2001 From: Yashika Khurana Date: Wed, 27 May 2026 11:46:47 -0400 Subject: [PATCH 7/7] chore(nimbus): fix get_enrollment_funnel_data to use Path consistently with get_monitoring_data (EXP-6872) --- experimenter/experimenter/jetstream/client.py | 4 +++- experimenter/experimenter/jetstream/tests/test_tasks.py | 6 ++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/experimenter/experimenter/jetstream/client.py b/experimenter/experimenter/jetstream/client.py index 456510a2a..a1e32cca0 100644 --- a/experimenter/experimenter/jetstream/client.py +++ b/experimenter/experimenter/jetstream/client.py @@ -106,7 +106,9 @@ def get_monitoring_data(): def get_enrollment_funnel_data(): - return load_data_from_gcs(os.path.join(ENROLLMENT_COUNTS_FOLDER, "enrollment_funnel_v1_latest.json")) + filename = "enrollment_funnel_v1_latest.json" + path = Path(ENROLLMENT_COUNTS_FOLDER, filename) + return load_data_from_gcs(str(path)) def get_results_metrics_map( diff --git a/experimenter/experimenter/jetstream/tests/test_tasks.py b/experimenter/experimenter/jetstream/tests/test_tasks.py index b387726de..d7c62959f 100644 --- a/experimenter/experimenter/jetstream/tests/test_tasks.py +++ b/experimenter/experimenter/jetstream/tests/test_tasks.py @@ -3478,10 +3478,12 @@ def setUp(self): self.mock_get_monitoring_data = patcher.start() self.addCleanup(patcher.stop) - self._funnel_patcher = patch("experimenter.jetstream.tasks.get_enrollment_funnel_data") + self._funnel_patcher = patch( + "experimenter.jetstream.tasks.get_enrollment_funnel_data" + ) self.mock_get_funnel_data = self._funnel_patcher.start() self.mock_get_funnel_data.return_value = {"v1": {}} - + def tearDown(self): self._funnel_patcher.stop()