Skip to content

Commit 6112a7d

Browse files
authored
Implements CSV rotation for job log files (#842)
This addresses a rare issue in which the team occasionally encountered an `OSError` because the CSV files had grown too large. This feature adds a mechanism to automatically rotate job log CSV files (`initiated_jobs.csv` and `completed_jobs.csv`) once they exceed a predefined maximum number of lines (default 10,000). This prevents these files from growing indefinitely, which improves performance during file operations and helps manage disk space. When a rotation occurs, the existing file is renamed with a timestamp suffix, archiving its content, and a new file is then implicitly created for ongoing logs. Includes dedicated unit tests to ensure the rotation logic functions as expected under various conditions, along with some minor test-data refactoring and housekeeping.
2 parents 3ca902b + 5ccf810 commit 6112a7d

5 files changed

Lines changed: 103 additions & 56 deletions

File tree

arc/job/adapter.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -662,6 +662,32 @@ def determine_run_time(self):
662662
else:
663663
self.run_time = None
664664

665+
@staticmethod
666+
def _rotate_csv_if_needed(csv_path: str, max_lines: int = 10000, line_count: Optional[int] = None) -> None:
667+
"""
668+
Rotate a CSV file if it reaches or exceeds ``max_lines`` lines (including the header).
669+
The archived file is renamed with a timestamp suffix in the same directory.
670+
A fresh file (with headers) will be created by the caller as needed.
671+
672+
Args:
673+
csv_path (str): The path to the CSV file.
674+
max_lines (int): The maximum number of lines before rotation is triggered.
675+
line_count (int, optional): Pre-computed line count to avoid re-reading the file.
676+
"""
677+
if not os.path.isfile(csv_path):
678+
return
679+
if line_count is None:
680+
with open(csv_path, 'r') as f:
681+
line_count = sum(1 for _ in f)
682+
if line_count >= max_lines:
683+
local_time = datetime.datetime.now().strftime("%d_%m_%y")
684+
base, ext = os.path.splitext(csv_path)
685+
archive_path = f"{base}.old.{local_time}{ext}"
686+
try:
687+
os.rename(csv_path, archive_path)
688+
except FileNotFoundError:
689+
pass # Another process already rotated the file.
690+
665691
def _set_job_number(self):
666692
"""
667693
Used as the entry number in the database, as well as the job name on the server.
@@ -686,12 +712,16 @@ def _set_job_number(self):
686712
writer.writerow(row)
687713
with open(csv_path, 'r') as f:
688714
reader = csv.reader(f, dialect='excel')
715+
line_count = 0
689716
job_num = 0
690717
for _ in reader:
718+
line_count += 1
691719
job_num += 1
692720
if job_num == 100000:
693721
job_num = 0
694722
self.job_num = job_num
723+
# Rotate after counting to avoid a second full read of the file.
724+
self._rotate_csv_if_needed(csv_path, max_lines=10000, line_count=line_count)
695725
# 2. Set other related attributes job_name and job_server_name.
696726
self.job_server_name = self.job_server_name or 'a' + str(self.job_num)
697727
if self.conformer is not None and self.job_name is None:
@@ -728,6 +758,7 @@ def write_completed_job_to_csv_file(self):
728758
self.determine_job_status()
729759
local_arc_path_ = local_arc_path if os.path.isdir(local_arc_path) else ARC_PATH
730760
csv_path = os.path.join(local_arc_path_, 'completed_jobs.csv')
761+
self._rotate_csv_if_needed(csv_path)
731762
if os.path.isfile(csv_path):
732763
# check that this is the updated version
733764
with open(csv_path, 'r') as f:

arc/job/adapter_test.py

Lines changed: 72 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66
"""
77

88
import datetime
9+
import glob
910
import math
1011
import os
12+
import tempfile
1113
import time
1214
import shutil
1315
import unittest
@@ -17,7 +19,7 @@
1719

1820
from arc.common import ARC_TESTING_PATH
1921
from arc.imports import settings
20-
from arc.job.adapter import DataPoint, JobEnum, JobTypeEnum, JobExecutionTypeEnum
22+
from arc.job.adapter import DataPoint, JobAdapter, JobEnum, JobTypeEnum, JobExecutionTypeEnum
2123
from arc.job.adapters.gaussian import GaussianAdapter
2224
from arc.level import Level
2325
from arc.species import ARCSpecies
@@ -194,28 +196,34 @@ def setUpClass(cls):
194196
species=[ARCSpecies(label='spc1', xyz=['O 0 0 1'])],
195197
testing=True,
196198
)
199+
# Copy the PBS time limit fixture into the directory structure the adapter expects.
200+
stl_dir = os.path.join(ARC_TESTING_PATH, 'test_JobAdapter_ServerTimeLimit')
201+
err_dest = os.path.join(stl_dir, 'calcs', 'Species', 'spc1', 'opt_101')
202+
os.makedirs(err_dest, exist_ok=True)
203+
shutil.copy(os.path.join(ARC_TESTING_PATH, 'server', 'pbs', 'timelimit', 'err.txt'),
204+
os.path.join(err_dest, 'err.txt'))
197205
cls.job_5 = GaussianAdapter(execution_type='queue',
198-
job_name='spc1',
206+
job_name='opt_101',
199207
job_type='opt',
200208
job_id='123456',
201209
job_num=101,
202-
job_server_name = 'server3',
210+
job_server_name='server3',
203211
level=Level(method='cbs-qb3'),
204212
project='test',
205-
project_directory=os.path.join(ARC_TESTING_PATH, 'test_JobAdapter_ServerTimeLimit'),
213+
project_directory=stl_dir,
206214
species=[ARCSpecies(label='spc1', xyz=['O 0 0 1'])],
207215
server='server3',
208216
testing=True,
209217
)
210218
cls.job_6 = GaussianAdapter(execution_type='queue',
211-
job_name='spc1',
219+
job_name='opt_101',
212220
job_type='opt',
213221
job_id='123456',
214222
job_num=101,
215-
job_server_name = 'server1',
223+
job_server_name='server1',
216224
level=Level(method='cbs-qb3'),
217225
project='test',
218-
project_directory=os.path.join(ARC_TESTING_PATH, 'test_JobAdapter_ServerTimeLimit'),
226+
project_directory=stl_dir,
219227
species=[ARCSpecies(label='spc1', xyz=['O 0 0 1'])],
220228
testing=True,
221229
queue='short_queue',
@@ -471,5 +479,62 @@ def tearDownClass(cls):
471479
shutil.rmtree(os.path.join(ARC_TESTING_PATH, 'test_JobAdapter_ServerTimeLimit'), ignore_errors=True)
472480

473481

482+
class TestRotateCSV(unittest.TestCase):
    """
    Contains unit tests for the CSV rotation logic in ``JobAdapter._rotate_csv_if_needed``.
    """

    def _make_csv(self, path, num_lines):
        """Helper to create a CSV file with a header and ``num_lines - 1`` data rows."""
        with open(path, 'w') as f:
            f.write('col1,col2\n')
            for i in range(num_lines - 1):
                f.write(f'{i},data\n')

    def test_no_rotation_below_threshold(self):
        """Test that no rotation occurs when the file is below the threshold."""
        with tempfile.TemporaryDirectory() as tmp:
            csv_path = os.path.join(tmp, 'jobs.csv')
            self._make_csv(csv_path, 10)
            JobAdapter._rotate_csv_if_needed(csv_path, max_lines=50)
            self.assertTrue(os.path.isfile(csv_path))
            self.assertEqual(glob.glob(os.path.join(tmp, 'jobs.old.*.csv')), [])

    def test_rotation_at_threshold(self):
        """Test that the file is rotated when it reaches the threshold."""
        with tempfile.TemporaryDirectory() as tmp:
            csv_path = os.path.join(tmp, 'jobs.csv')
            self._make_csv(csv_path, 50)
            JobAdapter._rotate_csv_if_needed(csv_path, max_lines=50)
            self.assertFalse(os.path.isfile(csv_path))
            archives = glob.glob(os.path.join(tmp, 'jobs.old.*.csv'))
            self.assertEqual(len(archives), 1)

    def test_no_error_for_missing_file(self):
        """Test that rotation is a no-op when the file does not exist."""
        # Use a guaranteed-nonexistent path inside a fresh temp directory instead of a
        # hard-coded '/tmp/...' path, which is not portable (e.g. Windows) and could
        # accidentally exist on the test machine.
        with tempfile.TemporaryDirectory() as tmp:
            JobAdapter._rotate_csv_if_needed(os.path.join(tmp, 'nonexistent.csv'))

    def test_multiple_rotations(self):
        """Test that multiple rotations on different days produce distinct archive files."""
        with tempfile.TemporaryDirectory() as tmp:
            csv_path = os.path.join(tmp, 'jobs.csv')
            # First rotation on "day 1".
            self._make_csv(csv_path, 50)
            with patch('arc.job.adapter.datetime') as mock_dt:
                mock_dt.datetime.now.return_value = datetime.datetime(2026, 1, 15)
                mock_dt.timedelta = datetime.timedelta
                JobAdapter._rotate_csv_if_needed(csv_path, max_lines=50)
            self.assertFalse(os.path.isfile(csv_path))
            # Second rotation on "day 2".
            self._make_csv(csv_path, 50)
            with patch('arc.job.adapter.datetime') as mock_dt:
                mock_dt.datetime.now.return_value = datetime.datetime(2026, 2, 20)
                mock_dt.timedelta = datetime.timedelta
                JobAdapter._rotate_csv_if_needed(csv_path, max_lines=50)
            self.assertFalse(os.path.isfile(csv_path))
            archives = glob.glob(os.path.join(tmp, 'jobs.old.*.csv'))
            self.assertEqual(len(archives), 2)
537+
538+
474539
if __name__ == '__main__':
475540
unittest.main(testRunner=unittest.TextTestRunner(verbosity=2))

arc/testing/test_JobAdapter_ServerTimeLimit/calcs/Species/spc1/spc1/err.txt renamed to arc/testing/server/pbs/timelimit/err.txt

File renamed without changes.

arc/testing/test_JobAdapter_ServerTimeLimit/calcs/Species/spc1/spc1/input.gjf

Lines changed: 0 additions & 12 deletions
This file was deleted.

arc/testing/test_JobAdapter_ServerTimeLimit/calcs/Species/spc1/spc1/submit.sub

Lines changed: 0 additions & 37 deletions
This file was deleted.

0 commit comments

Comments
 (0)