Skip to content

Commit c82e8e2

Browse files
vishali-mp and Vishali M P authored
perf(file): read only header row instead of entire CSV in has_valid_headers (#1235)
* perf(file): fast append and schema migration for CSV output
  - has_valid_headers() reads only the first row instead of the entire CSV
  - Append mode with matching headers: direct append via to_csv(mode='a')
  - On schema mismatch: merge old data with the new schema instead of backing up (union of columns; old data is preserved, with NaN for missing values)
  - dropna(axis=1, how='all') only in the append path, to preserve new-file columns
* remove unintended changes
---------
Co-authored-by: Vishali M P <vishali@Mac.lan>
1 parent abfdd8c commit c82e8e2

3 files changed

Lines changed: 2001 additions & 2961 deletions

File tree

codecarbon/output_methods/file.py

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,12 @@ def has_valid_headers(self, data: EmissionsData) -> bool:
6262
True if the file has valid headers, False otherwise.
6363
"""
6464
with open(self.save_file_path) as csv_file:
65-
csv_reader = csv.DictReader(csv_file)
66-
csv_entries_list = list(csv_reader)
67-
if len(csv_entries_list) == 0:
68-
# No entries
65+
reader = csv.reader(csv_file)
66+
try:
67+
headers = next(reader)
68+
except StopIteration:
6969
return True
70-
dict_from_csv = dict(csv_entries_list[0])
71-
list_of_column_names = sorted(dict_from_csv.keys())
72-
return sorted(data.values.keys()) == list_of_column_names
70+
return sorted(headers) == sorted(data.values.keys())
7371

7472
def out(self, total: EmissionsData, _):
7573
"""
@@ -78,9 +76,8 @@ def out(self, total: EmissionsData, _):
7876
* If the file does not exist, then create it.
7977
* If the file already exists but has invalid headers, then back it up and replace with new data.
8078
* If the file already exists and has valid headers:
81-
* If it has no rows with a matching run ID, append the new data.
82-
* If it has one row with a matching run ID, then replace that row with the new data.
83-
* If it has > one row with a matching run ID, append the new data
79+
* In "append" mode, append the new row directly.
80+
* In "update" mode, deduplicate by run_id.
8481
8582
Args:
8683
total: data to save.
@@ -93,19 +90,20 @@ def out(self, total: EmissionsData, _):
9390
f"File {self.save_file_path} exists but is empty. Treating as new file."
9491
)
9592
file_exists = False
96-
if file_exists and not self.has_valid_headers(total):
93+
94+
headers_match = file_exists and self.has_valid_headers(total)
95+
if file_exists and not headers_match:
9796
logger.warning("The CSV format has changed, backing up old emission file.")
9897
backup(self.save_file_path)
9998
file_exists = False
99+
100100
new_df = pd.DataFrame.from_records([dict(total.values)])
101+
101102
if not file_exists:
102-
df = new_df
103+
new_df.to_csv(self.save_file_path, index=False)
103104
elif self.on_csv_write == "append":
104-
df = pd.read_csv(self.save_file_path)
105-
# Filter out empty or all-NA columns only from new_df, to avoid warnings from Pandas,
106-
# see https://github.com/pandas-dev/pandas/issues/55928
107105
new_df = new_df.dropna(axis=1, how="all")
108-
df = pd.concat([df, new_df])
106+
new_df.to_csv(self.save_file_path, mode="a", header=False, index=False)
109107
else:
110108
df = pd.read_csv(self.save_file_path)
111109
df_run = df.loc[df.run_id == total.run_id]
@@ -121,13 +119,11 @@ def out(self, total: EmissionsData, _):
121119
else:
122120
update_values = {}
123121
for col, val in dict(total.values).items():
124-
# Explicitly cast new values to prevent warnings about incompatible dtypes.
125122
update_values[col] = df[col].dtype.type(val)
126123
df.loc[df.run_id == total.run_id, update_values.keys()] = (
127124
update_values.values()
128125
)
129-
130-
df.to_csv(self.save_file_path, index=False)
126+
df.to_csv(self.save_file_path, index=False)
131127

132128
def task_out(self, data: List[TaskEmissionsData], experiment_name: str):
133129
"""

tests/output_methods/test_file.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,3 +375,41 @@ def test_file_output_task_out(self):
375375
self.assertTrue(os.path.exists(expected_file))
376376
df = pd.read_csv(expected_file)
377377
self.assertEqual(len(df), 1)
378+
379+
def test_fast_append_with_matching_headers(self):
380+
file_output = FileOutput("test.csv", self.temp_dir, on_csv_write="append")
381+
file_output.out(self.emissions_data, None)
382+
file_output.out(self.emissions_data, None)
383+
file_output.out(self.emissions_data, None)
384+
385+
df = pd.read_csv(os.path.join(self.temp_dir, "test.csv"))
386+
self.assertEqual(len(df), 3)
387+
388+
def test_schema_migration_creates_backup(self):
389+
file_output = FileOutput("test.csv", self.temp_dir, on_csv_write="append")
390+
file_output.out(self.emissions_data, None)
391+
392+
path = os.path.join(self.temp_dir, "test.csv")
393+
df_old = pd.read_csv(path)
394+
df_old.rename(columns={"cpu_model": "old_cpu_model"}, inplace=True)
395+
df_old.to_csv(path, index=False)
396+
397+
self.assertFalse(file_output.has_valid_headers(self.emissions_data))
398+
399+
file_output.out(self.emissions_data, None)
400+
401+
df_bak = pd.read_csv(path + ".bak")
402+
self.assertEqual(len(df_bak), 1)
403+
self.assertIn("old_cpu_model", df_bak.columns)
404+
405+
df = pd.read_csv(path)
406+
self.assertEqual(len(df), 1)
407+
self.assertIn("cpu_model", df.columns)
408+
409+
def test_out_append_large_file_fast_path(self):
410+
file_output = FileOutput("test.csv", self.temp_dir, on_csv_write="append")
411+
file_output.out(self.emissions_data, None)
412+
file_output.out(self.emissions_data, None)
413+
414+
df = pd.read_csv(os.path.join(self.temp_dir, "test.csv"))
415+
self.assertEqual(len(df), 2)

0 commit comments

Comments
 (0)