Skip to content

Commit eca114e

Browse files
authored
Merge pull request #853 from mlco2/fix/intermittent-zero-task-energy
Fix: Prevent intermittent zero energy reporting for tasks
2 parents e5489e2 + c18ff0f commit eca114e

4 files changed

Lines changed: 222 additions & 11 deletions

File tree

.pre-commit-config.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
repos:
22
- repo: https://github.com/pycqa/isort
3-
rev: 5.13.2
3+
rev: 6.0.1
44
hooks:
55
- id: isort
66
args: ["--filter-files"]
77
- repo: https://github.com/psf/black
8-
rev: 24.8.0
8+
rev: 25.1.0
99
hooks:
1010
- id: black
1111
args: [--safe]
1212
- repo: https://github.com/PyCQA/flake8
13-
rev: 7.1.1
13+
rev: 7.2.0
1414
hooks:
1515
- id: flake8
1616
args: ["--config=.flake8"]

codecarbon/emissions_tracker.py

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,7 @@ def __init__(
317317
self._task_stop_measurement_values = {}
318318
self._tasks: Dict[str, Task] = {}
319319
self._active_task: Optional[str] = None
320+
self._active_task_emissions_at_start: Optional[EmissionsData] = None
320321

321322
# Tracking mode detection
322323
ressource_tracker = ResourceTracker(self)
@@ -474,7 +475,7 @@ def start_task(self, task_name=None) -> None:
474475
self._scheduler_monitor_power.start()
475476

476477
if self._active_task:
477-
logger.info("A task is already under measure")
478+
logger.warning("A task is already under measure")
478479
return
479480
if not task_name:
480481
task_name = uuid.uuid4().__str__()
@@ -484,8 +485,13 @@ def start_task(self, task_name=None) -> None:
484485
# Read initial energy for hardware
485486
for hardware in self._hardware:
486487
hardware.start()
487-
_ = self._prepare_emissions_data()
488-
_ = self._compute_emissions_delta(_)
488+
prepared_data_for_task_start = self._prepare_emissions_data()
489+
self._active_task_emissions_at_start = dataclasses.replace(
490+
prepared_data_for_task_start
491+
)
492+
# Pass the very same snapshot to _compute_emissions_delta so the tracker-wide
493+
# delta state is updated from exactly the data captured as this task's baseline.
494+
self._compute_emissions_delta(prepared_data_for_task_start)
489495

490496
self._tasks.update(
491497
{
@@ -506,20 +512,51 @@ def stop_task(self, task_name: str = None) -> EmissionsData:
506512
self._scheduler_monitor_power.stop()
507513

508514
task_name = task_name if task_name else self._active_task
515+
if self._tasks.get(task_name) is None:
516+
logger.warning("stop_task : No active task to stop.")
517+
return None
509518
self._measure_power_and_energy()
519+
emissions_data = (
520+
self._prepare_emissions_data()
521+
) # This is emissions_data_at_stop
522+
523+
if self._active_task_emissions_at_start is None:
524+
logger.error(
525+
f"Task {task_name}: _active_task_emissions_at_start was None. "
526+
"This indicates an issue, possibly start_task was not called or was corrupted. "
527+
"Reporting zero delta for this task to avoid errors."
528+
)
529+
emissions_data_delta = dataclasses.replace(emissions_data)
530+
# Zero out energy fields for the delta
531+
emissions_data_delta.emissions = 0.0
532+
emissions_data_delta.emissions_rate = 0.0
533+
emissions_data_delta.cpu_energy = 0.0
534+
emissions_data_delta.gpu_energy = 0.0
535+
emissions_data_delta.ram_energy = 0.0
536+
emissions_data_delta.energy_consumed = 0.0
537+
else:
538+
emissions_data_delta = dataclasses.replace(emissions_data)
539+
emissions_data_delta.compute_delta_emission(
540+
self._active_task_emissions_at_start
541+
)
510542

511-
emissions_data = self._prepare_emissions_data()
512-
emissions_data_delta = self._compute_emissions_delta(emissions_data)
543+
# Update global _previous_emissions state using the current totals at task stop.
544+
self._compute_emissions_delta(emissions_data)
513545

514546
task_duration = Time.from_seconds(
515547
time.perf_counter() - self._tasks[task_name].start_time
516548
)
517549

550+
# task_emission_data is the final delta object to be returned and stored
518551
task_emission_data = emissions_data_delta
519-
task_emission_data.duration = task_duration.seconds
552+
task_emission_data.duration = (
553+
task_duration.seconds
554+
) # Set the correct duration for the task
555+
520556
self._tasks[task_name].emissions_data = task_emission_data
521557
self._tasks[task_name].is_active = False
522558
self._active_task = None
559+
self._active_task_emissions_at_start = None # Clear task-specific start data
523560

524561
return task_emission_data
525562

@@ -625,7 +662,8 @@ def _persist_data(
625662

626663
def _prepare_emissions_data(self) -> EmissionsData:
627664
"""
628-
:delta: If 'True', return only the delta comsumption since the last call.
665+
Prepare the emissions data to be sent to the API or written to a file.
666+
:return: EmissionsData object with the total emissions data.
629667
"""
630668
cloud: CloudMetadata = self._get_cloud_metadata()
631669
duration: Time = Time.from_seconds(time.perf_counter() - self._start_time)
@@ -688,9 +726,14 @@ def _prepare_emissions_data(self) -> EmissionsData:
688726
return total_emissions
689727

690728
def _compute_emissions_delta(self, total_emissions: EmissionsData) -> EmissionsData:
691-
delta_emissions: EmissionsData = total_emissions
729+
"""
730+
Compute the delta emissions since the last call to this method.
731+
:param total_emissions: The total emissions data to compute the delta from.
732+
:return: EmissionsData with the delta emissions.
733+
"""
692734
if self._previous_emissions is None:
693735
self._previous_emissions = total_emissions
736+
delta_emissions: EmissionsData = total_emissions
694737
else:
695738
# Create a copy
696739
delta_emissions = dataclasses.replace(total_emissions)

examples/task_loop_same_task.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import time
2+
3+
from codecarbon import EmissionsTracker
4+
5+
tracker = EmissionsTracker(
6+
project_name="ZeroEnergyTestLoop",
7+
measure_power_secs=1, # Or your desired interval
8+
log_level="debug", # Set to debug to get all codecarbon logs + our new ones
9+
)
10+
11+
12+
def busy_task(duration_secs=4):
13+
print(f" Task: Starting busy work for ~{duration_secs} seconds...")
14+
start_time = time.perf_counter()
15+
while time.perf_counter() - start_time < duration_secs:
16+
# Simulate some CPU work
17+
# for _ in range(100000): # Adjust complexity as needed
18+
# pass
19+
time.sleep(2)
20+
end_time = time.perf_counter()
21+
print(f" Task: Finished busy work in {end_time - start_time:.2f} seconds.")
22+
23+
24+
max_rounds = 20 # Safety break for the loop
25+
26+
print("Starting tracking loop. Will stop if energy_consumed is 0.0 for a task.")
27+
28+
try:
29+
for current_round in range(max_rounds):
30+
print(f"Round {current_round + 1}:")
31+
task_name = f"round_{current_round + 1}_task"
32+
33+
tracker.start_task(task_name)
34+
print(f" Tracker: Started task '{task_name}'")
35+
36+
busy_task(duration_secs=1) # Requests ~1 s of work; busy_task sleeps in 2 s steps, so this takes ~2 s
37+
38+
emissions_data = tracker.stop_task()
39+
print(f" Tracker: Stopped task '{task_name}'")
40+
41+
if emissions_data:
42+
print(f" EmissionsData for {task_name}:")
43+
print(f" Duration: {emissions_data.duration:.4f}s")
44+
print(f" CPU Energy: {emissions_data.cpu_energy:.6f} kWh")
45+
print(f" GPU Energy: {emissions_data.gpu_energy:.6f} kWh")
46+
print(f" RAM Energy: {emissions_data.ram_energy:.6f} kWh")
47+
print(
48+
f" Total Energy Consumed: {emissions_data.energy_consumed:.6f} kWh"
49+
)
50+
print(f" Emissions: {emissions_data.emissions:.6f} kg CO2eq")
51+
52+
if emissions_data.energy_consumed == 0.0:
53+
print("###########################################################")
54+
print(
55+
f"INFO: energy_consumed is 0.0 in round {current_round + 1}. Stopping loop."
56+
)
57+
print("###########################################################")
58+
break
59+
else:
60+
print(f" WARNING: tracker.stop_task() returned None for {task_name}")
61+
62+
# Small pause between rounds, can be adjusted or removed
63+
time.sleep(1)
64+
65+
else: # Executed if the loop completes without break
66+
print(
67+
f"Loop completed {max_rounds} rounds without encountering zero energy consumption."
68+
)
69+
70+
except Exception as e:
71+
print(f"An error occurred: {e}")
72+
finally:
73+
tracker.stop_task()
74+
print("Script finished.")

tests/test_emissions_tracker.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import requests
1111
import responses
1212

13+
from codecarbon.core.units import Energy, Power
1314
from codecarbon.emissions_tracker import (
1415
EmissionsTracker,
1516
OfflineEmissionsTracker,
@@ -407,6 +408,99 @@ def test_carbon_tracker_online_context_manager_TWO_GPU_PRIVATE_INFRA_CANADA(
407408
self.assertIsInstance(tracker.final_emissions, float)
408409
self.assertAlmostEqual(tracker.final_emissions, 6.262572537957655e-05, places=2)
409410

411+
@mock.patch(
412+
"codecarbon.external.ram.RAM.measure_power_and_energy"
413+
) # RAM is patched in codecarbon.external.ram
414+
@mock.patch(
415+
"codecarbon.external.hardware.CPU.measure_power_and_energy"
416+
) # CPU is patched in codecarbon.external.hardware
417+
def test_task_energy_with_live_update_interference(
418+
self,
419+
mock_cpu_measure, # Method decorator (innermost)
420+
mock_ram_measure, # Method decorator (outermost)
421+
mock_setup_intel_cli, # Class decorator (innermost)
422+
mock_log_values, # Class decorator
423+
mocked_env_cloud_details, # Class decorator
424+
mocked_get_gpu_details, # Class decorator
425+
mocked_is_gpu_details_available, # Class decorator (outermost relevant one)
426+
):
427+
# --- Test Setup ---
428+
# Configure mocks to return specific, non-zero energy values
429+
cpu_energy_val_task = 0.0001
430+
ram_energy_val_task = 0.00005
431+
mock_cpu_measure.return_value = (
432+
Power.from_watts(10),
433+
Energy.from_energy(kWh=cpu_energy_val_task),
434+
)
435+
mock_ram_measure.return_value = (
436+
Power.from_watts(5),
437+
Energy.from_energy(kWh=ram_energy_val_task),
438+
)
439+
440+
tracker = EmissionsTracker(
441+
project_name="TestLiveUpdateInterference",
442+
measure_power_secs=1,
443+
api_call_interval=1, # Trigger live update on first opportunity
444+
output_handlers=[], # Clear any default handlers like FileOutput
445+
save_to_file=False, # Ensure no file is created by default
446+
save_to_api=False,
447+
# Config file is mocked by get_custom_mock_open in setUp
448+
)
449+
450+
# --- Test Logic ---
451+
tracker.start_task("my_test_task")
452+
# Simulate some work or time passing if necessary, though energy is mocked.
453+
# time.sleep(0.1) # Not strictly needed due to mocking
454+
455+
task_data = tracker.stop_task()
456+
# In stop_task:
457+
# 1. _measure_power_and_energy() is called MANUALLY.
458+
# - mock_cpu_measure and mock_ram_measure are called.
459+
# - _total_energies get cpu_energy_val_task and ram_energy_val_task added.
460+
# - _measure_occurrence becomes 1.
461+
# - Since api_call_interval is 1, _measure_occurrence >= api_call_interval now holds, so the live update path IS triggered:
462+
# - _prepare_emissions_data() called (gets totals including task energy).
463+
# - _compute_emissions_delta() called. This updates _previous_emissions.
464+
# 2. Back in stop_task, after _measure_power_and_energy():
465+
# - _prepare_emissions_data() called again (gets same totals).
466+
# - The NEW logic computes delta using _active_task_emissions_at_start.
467+
# - The global _previous_emissions is then updated again using current totals by another _compute_emissions_delta call.
468+
469+
# --- Assertions ---
470+
self.assertIsNotNone(task_data, "Task data should not be None")
471+
472+
self.assertGreater(task_data.cpu_energy, 0, "CPU energy should be non-zero")
473+
self.assertAlmostEqual(
474+
task_data.cpu_energy,
475+
cpu_energy_val_task,
476+
places=7,
477+
msg="CPU energy does not match expected task energy",
478+
)
479+
480+
self.assertGreater(task_data.ram_energy, 0, "RAM energy should be non-zero")
481+
self.assertAlmostEqual(
482+
task_data.ram_energy,
483+
ram_energy_val_task,
484+
places=7,
485+
msg="RAM energy does not match expected task energy",
486+
)
487+
488+
expected_total_energy = cpu_energy_val_task + ram_energy_val_task
489+
self.assertGreater(
490+
task_data.energy_consumed, 0, "Total energy consumed should be non-zero"
491+
)
492+
self.assertAlmostEqual(
493+
task_data.energy_consumed,
494+
expected_total_energy,
495+
places=7,
496+
msg="Total energy consumed does not match sum of components",
497+
)
498+
499+
# Verify mocks were called as expected
500+
# They are called once in _measure_power_and_energy inside stop_task
501+
mock_cpu_measure.assert_called_once()
502+
mock_ram_measure.assert_called_once()
503+
410504
@responses.activate
411505
def test_carbon_tracker_offline_context_manager(
412506
self,

0 commit comments

Comments
 (0)