Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 59 additions & 7 deletions codecarbon/output_methods/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,29 @@
class FileOutput(BaseOutput):
"""
Saves experiment artifacts to a file

Attributes:
output_file_name: str, name of file to write to.
output_dir: str, path to directory to write to.
save_file_path: str, path to file to write to.
on_csv_write: str, "append" or "update", whether or not to append or overwrite a file if it exists.
"""

def __init__(
self, output_file_name: str, output_dir: str, on_csv_write: str = "append"
):
"""
Initialize the FileOutput object.

Args:
output_file_name: name of file to write to.
output_dir: path to directory to write to.
on_csv_write: "append" or "update", whether or not to append or overwrite a file if it exists

Raises:
ValueError: If the on_csv_write value is invalid.
OSError: If the output directory does not exist.
"""
if on_csv_write not in {"append", "update"}:
raise ValueError(
f"Unknown `on_csv_write` value: {on_csv_write}"
Expand All @@ -33,7 +51,16 @@ def __init__(
f"Emissions data (if any) will be saved to file {os.path.abspath(self.save_file_path)}"
)

def has_valid_headers(self, data: EmissionsData):
def has_valid_headers(self, data: EmissionsData) -> bool:
"""
Checks self.save_file_path has headers matching those from passed data.

Args:
data: EmissionsData object with valid headers.

Returns:
True if the file has valid headers, False otherwise.
"""
with open(self.save_file_path) as csv_file:
csv_reader = csv.DictReader(csv_file)
csv_entries_list = list(csv_reader)
Expand All @@ -44,11 +71,21 @@ def has_valid_headers(self, data: EmissionsData):
list_of_column_names = list(dict_from_csv.keys())
return list(data.values.keys()) == list_of_column_names

def out(self, total: EmissionsData, delta: EmissionsData):
def out(self, total: EmissionsData, _: EmissionsData):
"""
Save the emissions data to a CSV file.
If the file already exists, append the new data to it.
param `delta` is not used in this method.
Save the emissions data from a whole run to a CSV file.

* If the file does not exist, then create it.
* If the file already exists but has invalid headers, then back it up and replace with new data.
* If the file already exists and has valid headers:
* If it has no rows with a matching run ID, append the new data.
* If it has one row with a matching run ID, then replace that row with the new data.
* If it has more than one row with a matching run ID, append the new data.

Args:
total: data to save.


"""
file_exists: bool = os.path.isfile(self.save_file_path)
if file_exists and not self.has_valid_headers(total):
Expand All @@ -60,6 +97,10 @@ def out(self, total: EmissionsData, delta: EmissionsData):
df = new_df
elif self.on_csv_write == "append":
df = pd.read_csv(self.save_file_path)
# Filter out empty or all-NA columns, to avoid warnings from Pandas,
# see https://github.com/pandas-dev/pandas/issues/55928
df = df.dropna(axis=1, how="all")
new_df = new_df.dropna(axis=1, how="all")
df = pd.concat([df, new_df])
else:
df = pd.read_csv(self.save_file_path)
Expand All @@ -74,13 +115,22 @@ def out(self, total: EmissionsData, delta: EmissionsData):
)
df = pd.concat([df, new_df])
else:
df.at[df.run_id == total.run_id, total.values.keys()] = (
total.values.values()
update_values = {}
for col, val in dict(total.values).items():
# Explicitly cast new values to prevent warnings about incompatible dtypes.
update_values[col] = df[col].dtype.type(val)
df.loc[df.run_id == total.run_id, update_values.keys()] = (
update_values.values()
)

df.to_csv(self.save_file_path, index=False)

def task_out(self, data: List[TaskEmissionsData], experiment_name: str):
"""
Save the emissions data from a single task in an experiment run to a CSV file.

Does not attempt to back up existing files or prevent overwriting them.
"""
run_id = data[0].run_id
save_task_file_path = os.path.join(
self.output_dir, "emissions_" + experiment_name + "_" + run_id + ".csv"
Expand All @@ -90,6 +140,8 @@ def task_out(self, data: List[TaskEmissionsData], experiment_name: str):
[dict(data_point.values) for data_point in data]
)
# Filter out empty or all-NA columns, to avoid warnings from Pandas
# see https://github.com/pandas-dev/pandas/issues/55928
df = df.dropna(axis=1, how="all")
new_df = new_df.dropna(axis=1, how="all")
df = pd.concat([df, new_df], ignore_index=True)
df.to_csv(save_task_file_path, index=False)
200 changes: 200 additions & 0 deletions tests/output_methods/test_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
import os
import shutil
import tempfile
import unittest
from unittest.mock import MagicMock, patch

import pandas as pd

from codecarbon.output_methods.emissions_data import EmissionsData, TaskEmissionsData
from codecarbon.output_methods.file import FileOutput


class TestFileOutput(unittest.TestCase):
    """Tests for FileOutput, run against CSV files in a throwaway directory."""

    # ------------------------------------------------------------------
    # Fixtures and small helpers
    # ------------------------------------------------------------------

    def setUp(self):
        """Create a fresh temp directory and a fully populated EmissionsData."""
        self.temp_dir = tempfile.mkdtemp()
        run_fields = {
            "timestamp": "2023-01-01T00:00:00",
            "project_name": "test_project",
            "run_id": "test_run_id",
            "experiment_id": "test_experiment_id",
            "duration": 10,
            "emissions": 0.5,
            "emissions_rate": 0.05,
            "cpu_power": 20,
            "gpu_power": 30,
            "ram_power": 5,
            "cpu_energy": 200,
            "gpu_energy": 300,
            "ram_energy": 50,
            "energy_consumed": 550,
            "water_consumed": 0.1,
            "country_name": "Testland",
            "country_iso_code": "TS",
            "region": "Test Region",
            "cloud_provider": "Test Cloud",
            "cloud_region": "test-cloud-1",
            "os": "TestOS",
            "python_version": "3.8",
            "codecarbon_version": "2.0",
            "cpu_count": 4,
            "cpu_model": "Test CPU",
            "gpu_count": 1,
            "gpu_model": "Test GPU",
            "longitude": 0,
            "latitude": 0,
            "ram_total_size": 16,
            "tracking_mode": "machine",
            "on_cloud": "true",
            "pue": 1.5,
            "wue": 0.5,
        }
        self.emissions_data = EmissionsData(**run_fields)

    def tearDown(self):
        """Remove the temp directory and everything written into it."""
        shutil.rmtree(self.temp_dir)

    def _path(self, file_name="test.csv"):
        """Return the path of ``file_name`` inside the temp directory."""
        return os.path.join(self.temp_dir, file_name)

    def _read(self, file_name="test.csv"):
        """Load ``file_name`` from the temp directory as a DataFrame."""
        return pd.read_csv(self._path(file_name))

    def _write_run(self, file_output):
        """Call ``out`` with the fixture data; the second argument is a mock."""
        file_output.out(self.emissions_data, MagicMock())

    # ------------------------------------------------------------------
    # Construction
    # ------------------------------------------------------------------

    def test_file_output_initialization(self):
        """Constructing with an existing directory must not raise."""
        FileOutput("test.csv", self.temp_dir)

    def test_file_output_initialization_invalid_csv_write_mode(self):
        """An unknown ``on_csv_write`` value raises ValueError."""
        with self.assertRaises(ValueError):
            FileOutput("test.csv", self.temp_dir, on_csv_write="invalid_option")

    def test_file_output_initialization_invalid_dir(self):
        """A missing output directory raises OSError."""
        with self.assertRaises(OSError):
            FileOutput("test.csv", "/non/existent/dir")

    # ------------------------------------------------------------------
    # has_valid_headers
    # ------------------------------------------------------------------

    def test_has_valid_headers_success(self):
        """A file just written by ``out`` has headers matching the data."""
        writer = FileOutput("test.csv", self.temp_dir)
        self._write_run(writer)

        self.assertTrue(writer.has_valid_headers(self.emissions_data))

    def test_has_valid_headers_failure(self):
        """Renaming one column on disk makes the header check fail."""
        writer = FileOutput("test.csv", self.temp_dir)
        self._write_run(writer)

        frame = self._read()
        frame.rename(columns={"wue": "new_header"}, inplace=True)
        frame.to_csv(self._path(), index=False)

        self.assertFalse(writer.has_valid_headers(self.emissions_data))

    # ------------------------------------------------------------------
    # out
    # ------------------------------------------------------------------

    @patch("codecarbon.output_methods.file.FileOutput.has_valid_headers")
    def test_file_output_out_file_exists_invalid_headers(self, mock_has_valid_headers):
        """With invalid headers the old file is backed up and a new one written."""
        writer = FileOutput("test.csv", self.temp_dir, on_csv_write="append")
        self._write_run(writer)

        # Force the header check to fail for the second write only.
        mock_has_valid_headers.return_value = False
        self._write_run(writer)

        self.assertEqual(len(self._read("test.csv.bak")), 1)
        self.assertEqual(len(self._read()), 1)

    def test_file_output_out_update_no_file_exists(self):
        """In "update" mode a missing file is created with one row."""
        writer = FileOutput("test.csv", self.temp_dir, on_csv_write="update")
        self._write_run(writer)

        self.assertEqual(len(self._read()), 1)

    def test_file_output_out_append_no_file_exists(self):
        """In "append" mode a missing file is created with one row."""
        writer = FileOutput("test.csv", self.temp_dir, on_csv_write="append")
        self._write_run(writer)

        self.assertEqual(len(self._read()), 1)

    def test_file_output_out_append_file_exists(self):
        """In "append" mode a second write adds a second row."""
        writer = FileOutput("test.csv", self.temp_dir, on_csv_write="append")
        for _ in range(2):
            self._write_run(writer)

        self.assertEqual(len(self._read()), 2)

    def test_file_output_out_update_file_exists_no_matching_row(self):
        """In "update" mode a new run ID results in an appended row."""
        writer = FileOutput("test.csv", self.temp_dir, on_csv_write="update")
        self._write_run(writer)

        # Same fixture object, now carrying a run ID not present in the file.
        self.emissions_data.run_id = "new_test_run_id"
        self._write_run(writer)

        self.assertEqual(len(self._read()), 2)

    def test_file_output_out_update_file_exists_multiple_matching_rows(self):
        """In "update" mode duplicate run-ID rows lead to an append."""
        writer = FileOutput("test.csv", self.temp_dir, on_csv_write="update")
        self._write_run(writer)

        # Manually duplicate the single row to simulate the condition.
        existing = self._read()
        pd.concat([existing, existing]).to_csv(self._path(), index=False)

        self._write_run(writer)

        self.assertEqual(len(self._read()), 3)

    def test_file_output_out_update_file_exists_one_matchingrows(self):
        """In "update" mode a single matching row is overwritten in place."""
        writer = FileOutput("test.csv", self.temp_dir, on_csv_write="update")
        self._write_run(writer)
        self.assertEqual(self._read()["cpu_power"].iloc[0], 20)

        # Same run ID, different value: the existing row should be replaced.
        self.emissions_data.cpu_power = 2
        self._write_run(writer)
        self.assertEqual(self._read()["cpu_power"].iloc[0], 2)

    # ------------------------------------------------------------------
    # task_out
    # ------------------------------------------------------------------

    def test_file_output_task_out(self):
        """``task_out`` writes one row to a file named after experiment and run."""
        task_fields = {
            "task_name": "test_task",
            "timestamp": "2023-01-01T00:00:00",
            "project_name": "test_project",
            "run_id": "test_run_id",
            "duration": 10,
            "emissions": 0.5,
            "emissions_rate": 0.05,
            "cpu_power": 20,
            "gpu_power": 30,
            "ram_power": 5,
            "cpu_energy": 200,
            "gpu_energy": 300,
            "ram_energy": 50,
            "energy_consumed": 550,
            "water_consumed": 0.1,
            "country_name": "Testland",
            "country_iso_code": "TS",
            "region": "Test Region",
            "cloud_provider": "Test Cloud",
            "cloud_region": "test-cloud-1",
            "os": "TestOS",
            "python_version": "3.8",
            "codecarbon_version": "2.0",
            "cpu_count": 4,
            "cpu_model": "Test CPU",
            "gpu_count": 1,
            "gpu_model": "Test GPU",
            "longitude": 0,
            "latitude": 0,
            "ram_total_size": 16,
            "tracking_mode": "machine",
            "on_cloud": "true",
        }
        task_emissions_data = [TaskEmissionsData(**task_fields)]

        writer = FileOutput("test.csv", self.temp_dir)
        writer.task_out(task_emissions_data, "test_experiment")

        expected_file = self._path("emissions_test_experiment_test_run_id.csv")
        self.assertTrue(os.path.exists(expected_file))
        self.assertEqual(len(pd.read_csv(expected_file)), 1)
Loading