diff --git a/codecarbon/core/emissions.py b/codecarbon/core/emissions.py index 99426b981..7b26c7c84 100644 --- a/codecarbon/core/emissions.py +++ b/codecarbon/core/emissions.py @@ -25,6 +25,7 @@ def __init__( co2_signal_api_token: Optional[ str ] = None, # Deprecated, for backward compatibility + force_carbon_intensity_g_co2e_kwh: Optional[float] = None, ): self._data_source = data_source @@ -38,6 +39,7 @@ def __init__( electricitymaps_api_token = co2_signal_api_token self._electricitymaps_api_token = electricitymaps_api_token + self._force_carbon_intensity_g_co2e_kwh = force_carbon_intensity_g_co2e_kwh def get_cloud_emissions( self, energy: Energy, cloud: CloudMetadata, geo: GeoMetadata = None @@ -50,6 +52,12 @@ def get_cloud_emissions( :return: CO2 emissions in kg """ + if self._force_carbon_intensity_g_co2e_kwh is not None: + logger.info( + f"Using forced carbon intensity for cloud emissions: {self._force_carbon_intensity_g_co2e_kwh} gCO2e/kWh" + ) + return energy.kWh * (self._force_carbon_intensity_g_co2e_kwh / 1000.0) + df: pd.DataFrame = self._data_source.get_cloud_emissions_data() try: emissions_per_kWh: EmissionsPerKWh = EmissionsPerKWh.from_g_per_kWh( @@ -138,6 +146,12 @@ def get_private_infra_emissions(self, energy: Energy, geo: GeoMetadata) -> float :param geo: Country and region metadata :return: CO2 emissions in kg """ + if self._force_carbon_intensity_g_co2e_kwh is not None: + logger.debug( + f"Using forced carbon intensity for private infrastructure emissions: {self._force_carbon_intensity_g_co2e_kwh} gCO2e/kWh" + ) + return energy.kWh * (self._force_carbon_intensity_g_co2e_kwh / 1000.0) + if self._electricitymaps_api_token: try: emissions = electricitymaps_api.get_emissions( diff --git a/codecarbon/emissions_tracker.py b/codecarbon/emissions_tracker.py index 862eba2b4..ad712c9f0 100644 --- a/codecarbon/emissions_tracker.py +++ b/codecarbon/emissions_tracker.py @@ -195,6 +195,7 @@ def __init__( force_ram_power: Optional[int] = _sentinel, pue: Optional[float] = _sentinel, wue: Optional[float] = _sentinel, + force_carbon_intensity_g_co2e_kwh: Optional[float] = _sentinel, force_mode_cpu_load: Optional[bool] = _sentinel, allow_multiple_runs: Optional[bool] = _sentinel, rapl_include_dram: Optional[bool] = _sentinel, @@ -259,11 +260,13 @@ def __init__( then RAM power (W) = Number of RAM Slots × 5 Watts. :param pue: PUE (Power Usage Effectiveness) of the data center where the experiment is being run. + :param wue: WUE (Water Usage Effectiveness) of the data center. Units of L/kWh: + litres of water consumed per kilowatt-hour of electricity consumed. + :param force_carbon_intensity_g_co2e_kwh: Override grid carbon intensity + in gCO2e/kWh for emissions calculations. :param force_mode_cpu_load: Force the addition of a CPU in MODE_CPU_LOAD :param allow_multiple_runs: Allow multiple CodeCarbon instances on the same machine. Defaults to True since v3 (was False in v2). - :param wue: WUE (Water Usage Effectiveness) of the data center. Units of L/kWh: - litres of water consumed per kilowatt-hour of electricity consumed. :param rapl_include_dram: Include DRAM (memory) power in RAPL measurements on Linux, defaults to False. When True, measures CPU package + DRAM. Only affects systems where RAPL exposes separate DRAM domains. @@ -276,6 +279,31 @@ def __init__( # logger.info("base tracker init") self._external_conf = get_hierarchical_config() + self._set_from_conf( + force_carbon_intensity_g_co2e_kwh, + "force_carbon_intensity_g_co2e_kwh", + None, + float, + ) + parsed_intensity = None + if self._force_carbon_intensity_g_co2e_kwh is not None: + try: + value = float(self._force_carbon_intensity_g_co2e_kwh) + if value >= 0: + parsed_intensity = value + else: + logger.warning( + f"Invalid value for force_carbon_intensity_g_co2e_kwh: '{self._force_carbon_intensity_g_co2e_kwh}'. " + "It must be a non-negative number. Using default calculation methods." + ) + except (ValueError, TypeError): + logger.warning( + f"Invalid value for force_carbon_intensity_g_co2e_kwh: '{self._force_carbon_intensity_g_co2e_kwh}'. " + "It must be a numeric value. Using default calculation methods." + ) + self._force_carbon_intensity_g_co2e_kwh = parsed_intensity + self._conf["force_carbon_intensity_g_co2e_kwh"] = parsed_intensity + self.force_carbon_intensity_g_co2e_kwh = parsed_intensity self._set_from_conf(allow_multiple_runs, "allow_multiple_runs", True, bool) if self._allow_multiple_runs: logger.warning( @@ -353,6 +381,11 @@ def __init__( experiment_id, "experiment_id", "5b0fa12a-3dd7-45bb-9766-cc326314d9f1" ) + if self.force_carbon_intensity_g_co2e_kwh is not None: + logger.info( + f"Using forced carbon intensity: {self.force_carbon_intensity_g_co2e_kwh} gCO2e/kWh." + ) + assert self._tracking_mode in ["machine", "process"] set_logger_level(self._log_level) set_logger_format(self._logger_preamble) @@ -446,7 +479,9 @@ def __init__( self._conf["provider"] = cloud.provider self._emissions: Emissions = Emissions( - self._data_source, self._electricitymaps_api_token + self._data_source, + self._electricitymaps_api_token, + force_carbon_intensity_g_co2e_kwh=self.force_carbon_intensity_g_co2e_kwh, ) self._init_output_methods(api_key=self._api_key) @@ -1310,6 +1345,7 @@ def track_emissions( force_ram_power: Optional[int] = _sentinel, pue: Optional[float] = _sentinel, wue: Optional[float] = _sentinel, + force_carbon_intensity_g_co2e_kwh: Optional[float] = _sentinel, allow_multiple_runs: Optional[bool] = _sentinel, rapl_include_dram: Optional[bool] = _sentinel, rapl_prefer_psys: Optional[bool] = _sentinel, @@ -1392,6 +1428,8 @@ def track_emissions( :param pue: PUE (Power Usage Effectiveness) of the data center. :param wue: WUE (Water Usage Effectiveness) of the data center. Units of L/kWh: litres of water consumed per kilowatt-hour of electricity consumed. + :param force_carbon_intensity_g_co2e_kwh: Override grid carbon intensity + in gCO2e/kWh for emissions calculations. :param rapl_include_dram: Include DRAM in RAPL measurements on Linux (default: False). When True, measures CPU package + DRAM. :param rapl_prefer_psys: Prefer psys over package domains for RAPL on Linux @@ -1447,6 +1485,7 @@ def wrapped_fn(*args, **kwargs): force_ram_power=force_ram_power, pue=pue, wue=wue, + force_carbon_intensity_g_co2e_kwh=force_carbon_intensity_g_co2e_kwh, allow_multiple_runs=allow_multiple_runs, rapl_include_dram=rapl_include_dram, rapl_prefer_psys=rapl_prefer_psys, @@ -1481,6 +1520,7 @@ def wrapped_fn(*args, **kwargs): force_ram_power=force_ram_power, pue=pue, wue=wue, + force_carbon_intensity_g_co2e_kwh=force_carbon_intensity_g_co2e_kwh, allow_multiple_runs=allow_multiple_runs, rapl_include_dram=rapl_include_dram, rapl_prefer_psys=rapl_prefer_psys, diff --git a/examples/pue.py b/examples/pue.py index 1d5fac7c5..63faa64e6 100644 --- a/examples/pue.py +++ b/examples/pue.py @@ -6,6 +6,7 @@ @track_emissions( measure_power_secs=3, pue=2, + force_carbon_intensity_g_co2e_kwh=1, log_level="DEBUG", ) def train_model(): diff --git a/tests/test_config.py b/tests/test_config.py index 181913c6c..ef66f66f8 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -11,7 +11,11 @@ parse_env_config, parse_gpu_ids, ) -from codecarbon.emissions_tracker import EmissionsTracker +from codecarbon.emissions_tracker import ( + EmissionsTracker, + OfflineEmissionsTracker, + track_emissions, +) from codecarbon.external.hardware import GPU from tests.testutils import get_custom_mock_open @@ -199,6 +203,7 @@ def test_full_hierarchy(self): force_ram_power=50.5 output_dir=ERROR:not overwritten save_to_file=ERROR:not overwritten + force_carbon_intensity_g_co2e_kwh=123.4 """ ) local_conf = dedent( @@ -225,9 +230,105 @@ def test_full_hierarchy(self): self.assertEqual(tracker._emissions_endpoint, "http://testhost:2000") self.assertEqual(tracker._gpu_ids, ["0", "1"]) self.assertEqual(tracker._electricitymaps_api_token, "signal-token") + self.assertEqual(tracker.force_carbon_intensity_g_co2e_kwh, 123.4) self.assertEqual(tracker._project_name, "test-project") self.assertTrue(tracker._save_to_file) + def test_force_carbon_intensity_constructor_overrides_config(self): + global_conf = dedent( + """\ + [codecarbon] + force_carbon_intensity_g_co2e_kwh=123.4 + """ + ) + + with patch("builtins.open", new_callable=get_custom_mock_open(global_conf, "")): + with patch("os.path.exists", return_value=True): + tracker = EmissionsTracker( + force_carbon_intensity_g_co2e_kwh=456.7, + save_to_file=False, + allow_multiple_runs=True, + ) + + self.assertEqual(tracker.force_carbon_intensity_g_co2e_kwh, 456.7) + self.assertEqual(tracker._conf["force_carbon_intensity_g_co2e_kwh"], 456.7) + + def test_offline_tracker_accepts_force_carbon_intensity_parameter(self): + with patch("builtins.open", new_callable=get_custom_mock_open("", "")): + with patch("os.path.exists", return_value=True): + tracker = OfflineEmissionsTracker( + country_iso_code="FRA", + force_carbon_intensity_g_co2e_kwh=0, + save_to_file=False, + allow_multiple_runs=True, + ) + + self.assertEqual(tracker.force_carbon_intensity_g_co2e_kwh, 0.0) + + def test_force_carbon_intensity_rejects_negative_parameter(self): + with patch("builtins.open", new_callable=get_custom_mock_open("", "")): + with patch("os.path.exists", return_value=True): + tracker = EmissionsTracker( + force_carbon_intensity_g_co2e_kwh=-1, + save_to_file=False, + allow_multiple_runs=True, + ) + + self.assertIsNone(tracker.force_carbon_intensity_g_co2e_kwh) + self.assertIsNone(tracker._conf["force_carbon_intensity_g_co2e_kwh"]) + + def test_force_carbon_intensity_rejects_non_numeric_parameter(self): + with patch("builtins.open", new_callable=get_custom_mock_open("", "")): + with patch("os.path.exists", return_value=True): + tracker = EmissionsTracker( + force_carbon_intensity_g_co2e_kwh="invalid", + save_to_file=False, + allow_multiple_runs=True, + ) + + self.assertIsNone(tracker.force_carbon_intensity_g_co2e_kwh) + self.assertIsNone(tracker._conf["force_carbon_intensity_g_co2e_kwh"]) + + def test_track_emissions_forwards_force_carbon_intensity_parameter(self): + with patch("codecarbon.emissions_tracker.EmissionsTracker") as tracker_class: + + @track_emissions( + force_carbon_intensity_g_co2e_kwh=321.0, + save_to_file=False, + ) + def tracked_function(): + return "success" + + self.assertEqual(tracked_function(), "success") + + tracker_class.assert_called_once() + self.assertEqual( + tracker_class.call_args.kwargs["force_carbon_intensity_g_co2e_kwh"], + 321.0, + ) + + def test_track_emissions_forwards_force_carbon_intensity_to_offline_tracker(self): + with patch( + "codecarbon.emissions_tracker.OfflineEmissionsTracker" + ) as tracker_class: + + @track_emissions( + offline=True, + country_iso_code="FRA", + force_carbon_intensity_g_co2e_kwh=321.0, + save_to_file=False, + ) + def tracked_function(): + return "success" + + self.assertEqual(tracked_function(), "success") + + tracker_class.assert_called_once() + self.assertEqual( + tracker_class.call_args.kwargs["force_carbon_intensity_g_co2e_kwh"], + 321.0, + ) + @mock.patch.dict( os.environ, { diff --git a/tests/test_emissions.py b/tests/test_emissions.py index 2e57a190c..323b8dc1f 100644 --- a/tests/test_emissions.py +++ b/tests/test_emissions.py @@ -174,6 +174,32 @@ def test_get_emissions_PRIVATE_INFRA_unknown_country(self): assert isinstance(emissions, float) self.assertAlmostEqual(emissions, 0.475, places=2) + @patch("codecarbon.core.electricitymaps_api.get_emissions") + def test_private_infra_uses_forced_intensity_when_set(self, mocked_get_emissions): + emissions_calculator = Emissions( + self._data_source, force_carbon_intensity_g_co2e_kwh=50.0 + ) + + emissions = emissions_calculator.get_private_infra_emissions( + Energy.from_energy(kWh=2), + GeoMetadata(country_iso_code="CAN", country_name="Canada"), + ) + + self.assertAlmostEqual(emissions, 0.1, places=6) + mocked_get_emissions.assert_not_called() + + def test_cloud_uses_forced_intensity_when_set(self): + emissions_calculator = Emissions( + self._data_source, force_carbon_intensity_g_co2e_kwh=100.0 + ) + + emissions = emissions_calculator.get_cloud_emissions( + Energy.from_energy(kWh=2), + CloudMetadata(provider="aws", region="us-east-1"), + ) + + self.assertAlmostEqual(emissions, 0.2, places=6) + def test_get_emissions_PRIVATE_INFRA_NORDIC_REGION(self): # WHEN # Test Nordic region (Sweden SE2)