diff --git a/experimenter/experimenter/jetstream/client.py b/experimenter/experimenter/jetstream/client.py index 0289e6689..a1e32cca0 100644 --- a/experimenter/experimenter/jetstream/client.py +++ b/experimenter/experimenter/jetstream/client.py @@ -105,6 +105,12 @@ def get_monitoring_data(): return load_data_from_gcs(str(path)) +def get_enrollment_funnel_data(): + filename = "enrollment_funnel_v1_latest.json" + path = Path(ENROLLMENT_COUNTS_FOLDER, filename) + return load_data_from_gcs(str(path)) + + def get_results_metrics_map( data: JetstreamData, primary_outcome_slugs: list[str], diff --git a/experimenter/experimenter/jetstream/tasks.py b/experimenter/experimenter/jetstream/tasks.py index b1fbe7e2e..2dc1b0396 100644 --- a/experimenter/experimenter/jetstream/tasks.py +++ b/experimenter/experimenter/jetstream/tasks.py @@ -11,6 +11,7 @@ from experimenter.experiments.constants import NimbusConstants from experimenter.experiments.models import NimbusChangeLog, NimbusExperiment from experimenter.jetstream.client import ( + get_enrollment_funnel_data, get_experiment_data, get_monitoring_data, get_population_sizing_data, @@ -144,6 +145,14 @@ def fetch_monitoring_data(): return alert_data = data.get("v1") + + try: + funnel_data = get_enrollment_funnel_data() + funnel_by_slug = funnel_data.get("v1", {}) if funnel_data else {} + except Exception as e: + logger.warning(f"Could not fetch enrollment funnel data: {e}") + funnel_by_slug = {} + updated_count = 0 for exp_slug, monitoring_data in alert_data.items(): @@ -153,11 +162,17 @@ def fetch_monitoring_data(): status=NimbusConstants.Status.LIVE, ) - # Only update if data has changed - if experiment.monitoring_data != monitoring_data: - experiment.monitoring_data = monitoring_data + merged = { + **monitoring_data, + "enrollment_funnel": funnel_by_slug.get(exp_slug, []), + } + + if experiment.monitoring_data != merged: + experiment.monitoring_data = merged experiment.monitoring_data_updated_at = timezone.now() - experiment.save() + experiment.save( + update_fields=["monitoring_data", "monitoring_data_updated_at"] + ) generate_nimbus_changelog( experiment, get_kinto_user(), diff --git a/experimenter/experimenter/jetstream/tests/test_tasks.py b/experimenter/experimenter/jetstream/tests/test_tasks.py index e30df21c0..d7c62959f 100644 --- a/experimenter/experimenter/jetstream/tests/test_tasks.py +++ b/experimenter/experimenter/jetstream/tests/test_tasks.py @@ -13,7 +13,11 @@ from experimenter.experiments.models import NimbusChangeLog, NimbusExperiment from experimenter.experiments.tests.factories import NimbusExperimentFactory from experimenter.jetstream import tasks -from experimenter.jetstream.client import get_data, get_monitoring_data +from experimenter.jetstream.client import ( + get_data, + get_enrollment_funnel_data, + get_monitoring_data, +) from experimenter.jetstream.models import AnalysisWindow, Group from experimenter.jetstream.tests import mock_valid_outcomes from experimenter.jetstream.tests.constants import ( @@ -3441,7 +3445,32 @@ def mock_monitoring_data(request): return data -@pytest.mark.usefixtures("mock_monitoring_data") +@pytest.fixture +def mock_funnel_entries(request): + data = [ + { + "app_name": "firefox_desktop", + "branch": "control", + "status": "Enrolled", + "reason": "Qualified", + "conflict_slug": None, + "client_count": 750000, + }, + { + "app_name": "firefox_desktop", + "branch": None, + "status": "NotEnrolled", + "reason": "NotTargeted", + "conflict_slug": None, + "client_count": 5000000, + }, + ] + if request.instance: + request.instance.funnel_entries = data + return data + + +@pytest.mark.usefixtures("mock_monitoring_data", "mock_funnel_entries") class TestFetchMonitoringDataTask(TestCase): def setUp(self): super().setUp() @@ -3449,6 +3478,15 @@ def setUp(self): self.mock_get_monitoring_data = patcher.start() self.addCleanup(patcher.stop) + self._funnel_patcher = patch( + "experimenter.jetstream.tasks.get_enrollment_funnel_data" + ) + self.mock_get_funnel_data = self._funnel_patcher.start() + self.mock_get_funnel_data.return_value = {"v1": {}} + + def tearDown(self): + self._funnel_patcher.stop() + @parameterized.expand([(None,), ({},)]) def test_fetch_monitoring_data_no_data(self, return_value): self.mock_get_monitoring_data.return_value = return_value @@ -3469,9 +3507,62 @@ def test_fetch_monitoring_data_updates_live_experiment(self): tasks.fetch_monitoring_data() experiment.refresh_from_db() - self.assertEqual(experiment.monitoring_data, self.monitoring_data) + self.assertEqual( + experiment.monitoring_data, + {**self.monitoring_data, "enrollment_funnel": []}, + ) self.assertIsNotNone(experiment.monitoring_data_updated_at) + def test_fetch_monitoring_data_merges_funnel_data(self): + experiment = NimbusExperimentFactory.create( + status=NimbusExperiment.Status.LIVE, + monitoring_data={}, + ) + self.mock_get_monitoring_data.return_value = { + "v1": {experiment.slug: self.monitoring_data} + } + self.mock_get_funnel_data.return_value = { + "v1": {experiment.slug: self.funnel_entries} + } + + tasks.fetch_monitoring_data() + + experiment.refresh_from_db() + self.assertEqual( + experiment.monitoring_data, + {**self.monitoring_data, "enrollment_funnel": self.funnel_entries}, + ) + + def test_fetch_monitoring_data_funnel_defaults_to_empty_list_when_missing(self): + experiment = NimbusExperimentFactory.create( + status=NimbusExperiment.Status.LIVE, + monitoring_data={}, + ) + self.mock_get_monitoring_data.return_value = { + "v1": {experiment.slug: self.monitoring_data} + } + self.mock_get_funnel_data.return_value = {"v1": {}} + + tasks.fetch_monitoring_data() + + experiment.refresh_from_db() + self.assertEqual(experiment.monitoring_data["enrollment_funnel"], []) + + def test_fetch_monitoring_data_continues_if_funnel_fetch_fails(self): + experiment = NimbusExperimentFactory.create( + status=NimbusExperiment.Status.LIVE, + monitoring_data={}, + ) + self.mock_get_monitoring_data.return_value = { + "v1": {experiment.slug: self.monitoring_data} + } + self.mock_get_funnel_data.side_effect = Exception("GCS unavailable") + + tasks.fetch_monitoring_data() + + experiment.refresh_from_db() + self.assertEqual(experiment.monitoring_data["enrollment_funnel"], []) + @parameterized.expand( [ (NimbusExperiment.Status.DRAFT,), @@ -3558,8 +3649,8 @@ def test_fetch_monitoring_data_updates_multiple_experiments(self): exp1.refresh_from_db() exp2.refresh_from_db() - self.assertEqual(exp1.monitoring_data, data1) - self.assertEqual(exp2.monitoring_data, data2) + self.assertEqual(exp1.monitoring_data, {**data1, "enrollment_funnel": []}) + self.assertEqual(exp2.monitoring_data, {**data2, "enrollment_funnel": []}) def test_fetch_monitoring_data_fatal_error(self): self.mock_get_monitoring_data.side_effect = Exception("GCS connection failed") @@ -3586,7 +3677,9 @@ def test_get_monitoring_data_success(self, mock_load): result = get_monitoring_data() self.assertEqual(result, data) - mock_load.assert_called_once() + mock_load.assert_called_once_with( + "enrollment_counts/enrollment_counts_latest.json" + ) @patch("experimenter.jetstream.client.load_data_from_gcs") def test_get_monitoring_data_no_data(self, mock_load): @@ -3598,7 +3691,37 @@ def test_get_monitoring_data_no_data(self, mock_load): @patch("experimenter.jetstream.client.load_data_from_gcs") def test_get_monitoring_data_exception(self, mock_load): - mock_load.side_effect = Exception("GCS error") + mock_load.side_effect = RuntimeError("GCS error") - with self.assertRaises(Exception): + with self.assertRaises(RuntimeError): get_monitoring_data() + + +@pytest.mark.usefixtures("mock_funnel_entries") +class TestGetEnrollmentFunnelData(TestCase): + @patch("experimenter.jetstream.client.load_data_from_gcs") + def test_get_enrollment_funnel_data_success(self, mock_load): + data = {"v1": {"experiment-1": self.funnel_entries}} + mock_load.return_value = data + + result = get_enrollment_funnel_data() + + self.assertEqual(result, data) + mock_load.assert_called_once_with( + "enrollment_counts/enrollment_funnel_v1_latest.json" + ) + + @patch("experimenter.jetstream.client.load_data_from_gcs") + def test_get_enrollment_funnel_data_no_data(self, mock_load): + mock_load.return_value = None + + result = get_enrollment_funnel_data() + + self.assertIsNone(result) + + @patch("experimenter.jetstream.client.load_data_from_gcs") + def test_get_enrollment_funnel_data_exception(self, mock_load): + mock_load.side_effect = RuntimeError("GCS error") + + with self.assertRaises(RuntimeError): + get_enrollment_funnel_data()