Skip to content

Commit ee0be10

Browse files
committed
Wire ARC TCKDB upload sweep
1 parent c4fe12a commit ee0be10

3 files changed

Lines changed: 401 additions & 4 deletions

File tree

ARC.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111

1212
from arc.common import read_yaml_file
1313
from arc.main import ARC
14+
from arc.tckdb.config import TCKDBConfig
15+
from arc.tckdb.sweep import run_upload_sweep
1416

1517

1618
def parse_command_line_arguments(command_line_args=None):
@@ -59,8 +61,32 @@ def main():
5961
input_dict['verbose'] = input_dict['verbose'] if 'verbose' in input_dict else verbose
6062
if 'project_directory' not in input_dict or not input_dict['project_directory']:
6163
input_dict['project_directory'] = project_directory
64+
65+
tckdb_config = TCKDBConfig.from_dict(input_dict.pop('tckdb', None))
66+
6267
arc_object = ARC(**input_dict)
63-
arc_object.execute()
68+
arc_object.tckdb_config = tckdb_config
69+
if tckdb_config is not None:
70+
print(f'TCKDB integration enabled: {tckdb_config.base_url}')
71+
72+
# Persistent SSH pool lives for the duration of the run; close it
73+
# explicitly on every exit path (success, error, ctrl-C) so we don't
74+
# leave paramiko Transports orphaned. Lazily instantiated on first
75+
# remote-queue job, so this is a no-op for fully-local runs.
76+
try:
77+
arc_object.execute()
78+
79+
if tckdb_config is not None:
80+
from arc.tckdb.adapter import TCKDBAdapter
81+
adapter = TCKDBAdapter(tckdb_config, project_directory=arc_object.project_directory)
82+
run_upload_sweep(
83+
adapter=adapter,
84+
project_directory=arc_object.project_directory,
85+
tckdb_config=tckdb_config,
86+
)
87+
finally:
88+
from arc.job.ssh_pool import reset_default_pool
89+
reset_default_pool()
6490

6591

6692
if __name__ == '__main__':

ARC_test.py

Lines changed: 354 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,354 @@
1+
"""Tests for the ARC.py end-of-run TCKDB upload sweep dispatcher.
2+
3+
These tests focus on the wiring between ``tckdb.upload_mode`` and the
4+
adapter method that gets called per species. They use a stub adapter so
5+
no network or live ARC objects are required.
6+
"""
7+
8+
import os
9+
import shutil
10+
import tempfile
11+
import unittest
12+
from pathlib import Path
13+
from types import SimpleNamespace
14+
15+
import yaml
16+
17+
from arc.tckdb.adapter import UploadOutcome
18+
from arc.tckdb.config import TCKDBConfig
19+
from arc.tckdb.sweep import _resolve_artifact_path, run_upload_sweep
20+
21+
22+
# --------------------------------------------------------------------------
23+
# Test doubles
24+
# --------------------------------------------------------------------------
25+
26+
27+
class _StubAdapter:
28+
"""Records which adapter method was called per species, no network."""
29+
30+
def __init__(self, *, conformer_outcome=None, bundle_outcome=None,
31+
conformer_raises=None, bundle_raises=None):
32+
self.conformer_calls = []
33+
self.bundle_calls = []
34+
self.artifact_calls = []
35+
self._conformer_outcome = conformer_outcome
36+
self._bundle_outcome = bundle_outcome
37+
self._conformer_raises = conformer_raises
38+
self._bundle_raises = bundle_raises
39+
40+
def submit_from_output(self, *, output_doc, species_record):
41+
self.conformer_calls.append(species_record.get("label"))
42+
if self._conformer_raises is not None:
43+
raise self._conformer_raises
44+
return self._conformer_outcome
45+
46+
def submit_computed_species_from_output(self, *, output_doc, species_record):
47+
self.bundle_calls.append(species_record.get("label"))
48+
if self._bundle_raises is not None:
49+
raise self._bundle_raises
50+
return self._bundle_outcome
51+
52+
def submit_artifacts_for_calculation(self, **kwargs):
53+
self.artifact_calls.append(kwargs)
54+
return None
55+
56+
57+
def _outcome(status, *, label="ethanol", error=None,
             primary=None, additional=None):
    """Return an ``UploadOutcome`` populated with label-derived placeholder values.

    ``payload_path``, ``sidecar_path`` and ``idempotency_key`` are synthesized
    from ``label``; a falsy ``additional`` is replaced by an empty list.
    """
    extra_calculations = additional if additional else []
    payload = Path(f"/tmp/{label}.payload.json")
    sidecar = Path(f"/tmp/{label}.meta.json")
    key = f"arc:test:{label}:k:abc1234567890def"
    return UploadOutcome(
        status=status,
        payload_path=payload,
        sidecar_path=sidecar,
        idempotency_key=key,
        error=error,
        primary_calculation=primary,
        additional_calculations=extra_calculations,
    )
69+
70+
71+
# --------------------------------------------------------------------------
72+
# Fixtures
73+
# --------------------------------------------------------------------------
74+
75+
76+
def _write_output_yml(project_dir: str, *, species_labels=("CCO",), with_ts=False):
    """Create ``<project_dir>/output/output.yml`` with a minimal document.

    One species record is emitted per entry in ``species_labels`` (all share
    the same placeholder chemistry); a single ``TS0`` transition-state record
    is added when ``with_ts`` is true. The document that was dumped is
    returned to the caller.
    """
    output_dir = Path(project_dir) / "output"
    output_dir.mkdir(parents=True, exist_ok=True)

    species_records = []
    for label in species_labels:
        species_records.append({
            "label": label,
            "smiles": "CCO",
            "charge": 0,
            "multiplicity": 1,
            "is_ts": False,
            "converged": True,
            "xyz": "C 0.0 0.0 0.0\nH 1.0 0.0 0.0",
            "opt_n_steps": 12,
            "opt_final_energy_hartree": -154.0,
            "ess_versions": {"opt": "Gaussian 16, Revision A.03"},
        })

    ts_records = []
    if with_ts:
        ts_records.append({"label": "TS0", "is_ts": True, "converged": True})

    doc = {
        "schema_version": "1.0",
        "project": "test_project",
        "arc_version": "0.0.0",
        "opt_level": {"method": "wb97xd", "basis": "def2-tzvp", "software": "gaussian"},
        "species": species_records,
        "transition_states": ts_records,
    }
    with open(output_dir / "output.yml", "w") as handle:
        yaml.safe_dump(doc, handle)
    return doc
107+
108+
109+
# --------------------------------------------------------------------------
110+
# Dispatch behavior
111+
# --------------------------------------------------------------------------
112+
113+
114+
class TestRunTckdbUploadSweepDispatch(unittest.TestCase):
    """Wiring tests: which adapter method gets called per upload_mode."""

    def setUp(self):
        """Create a throwaway project directory holding a minimal output.yml."""
        self.tmp = tempfile.mkdtemp(prefix="arc-sweep-test-")
        self.addCleanup(shutil.rmtree, self.tmp, ignore_errors=True)
        _write_output_yml(self.tmp)
        self.arc_object = SimpleNamespace(project_directory=self.tmp)

    def _cfg(self, **overrides):
        """Build a ``TCKDBConfig`` from test defaults, updated with ``overrides``."""
        defaults = dict(
            enabled=True,
            base_url="http://localhost:8000/api/v1",
            api_key_env="X_TCKDB_API_KEY",
        )
        defaults.update(overrides)
        return TCKDBConfig(**defaults)

    def _sweep(self, adapter, cfg):
        """Run the upload sweep against this test's project directory."""
        run_upload_sweep(
            adapter=adapter,
            project_directory=self.arc_object.project_directory,
            tckdb_config=cfg,
        )

    # ---------------- 1: missing upload_mode → conformer (default)
    def test_default_mode_uses_legacy_conformer_path(self):
        cfg = self._cfg()  # upload_mode defaults to "conformer"
        adapter = _StubAdapter(conformer_outcome=_outcome("uploaded"))
        self._sweep(adapter, cfg)
        self.assertEqual(adapter.conformer_calls, ["CCO"])
        self.assertEqual(adapter.bundle_calls, [])

    # ---------------- 2: explicit conformer
    def test_explicit_conformer_mode_uses_legacy_path(self):
        cfg = self._cfg(upload_mode="conformer")
        adapter = _StubAdapter(conformer_outcome=_outcome("uploaded"))
        self._sweep(adapter, cfg)
        self.assertEqual(adapter.conformer_calls, ["CCO"])
        self.assertEqual(adapter.bundle_calls, [])

    # ---------------- 3: computed_species → bundle path
    def test_computed_species_mode_dispatches_bundle(self):
        cfg = self._cfg(upload_mode="computed_species")
        adapter = _StubAdapter(bundle_outcome=_outcome("uploaded"))
        self._sweep(adapter, cfg)
        self.assertEqual(adapter.bundle_calls, ["CCO"])
        self.assertEqual(adapter.conformer_calls, [])

    # ---------------- 4: bundle path never calls legacy
    def test_computed_species_does_not_call_legacy_submit(self):
        # Multiple species so we'd notice any leak across iterations.
        _write_output_yml(self.tmp, species_labels=("CCO", "CO", "CC"))
        cfg = self._cfg(upload_mode="computed_species")
        adapter = _StubAdapter(bundle_outcome=_outcome("uploaded"))
        self._sweep(adapter, cfg)
        self.assertEqual(adapter.bundle_calls, ["CCO", "CO", "CC"])
        self.assertEqual(adapter.conformer_calls, [])
        # And no per-artifact sweep call: bundles inline artifacts.
        self.assertEqual(adapter.artifact_calls, [])

    # ---------------- 5: failure in bundle mode is recorded; sweep continues
    def test_computed_species_failure_continues_to_next_species(self):
        _write_output_yml(self.tmp, species_labels=("CCO", "CO"))
        cfg = self._cfg(upload_mode="computed_species")
        # The stub returns the same canned outcome on every call, so the
        # bundle method is replaced with a function that yields a different
        # outcome per call: first status=failed, then status=uploaded.
        outcomes = iter([
            _outcome("failed", label="CCO", error="HTTP 503"),
            _outcome("uploaded", label="CO"),
        ])
        adapter = _StubAdapter()

        def submit_in_sequence(*, output_doc, species_record):
            adapter.bundle_calls.append(species_record.get("label"))
            return next(outcomes)

        adapter.submit_computed_species_from_output = submit_in_sequence
        self._sweep(adapter, cfg)
        # Both species processed; first failed, second uploaded.
        self.assertEqual(adapter.bundle_calls, ["CCO", "CO"])

    # ---------------- 5b: an unhandled exception in bundle mode is caught
    def test_computed_species_exception_is_caught_and_logged(self):
        _write_output_yml(self.tmp, species_labels=("CCO", "CO"))
        cfg = self._cfg(upload_mode="computed_species")
        # Simulate an unhandled exception on the FIRST species; second
        # should still be attempted (matches conformer-mode behavior).
        call_log = []

        def fake_submit(*, output_doc, species_record):
            label = species_record.get("label")
            call_log.append(label)
            if label == "CCO":
                raise RuntimeError("boom")
            return _outcome("uploaded", label=label)

        adapter = _StubAdapter()
        adapter.submit_computed_species_from_output = fake_submit
        self._sweep(adapter, cfg)
        self.assertEqual(call_log, ["CCO", "CO"])

    # ---------------- 6: sidecar written before live upload failure (bundle)
    def test_bundle_mode_sidecar_written_before_upload_failure(self):
        # ``import unittest`` alone does not load the ``unittest.mock``
        # submodule, so import it explicitly instead of relying on some
        # other module having imported it first.
        from unittest import mock

        # This is fundamentally an adapter-level guarantee, but we verify
        # the wiring preserves it: a "failed" outcome carrying real
        # payload_path and sidecar_path values means the sweep still
        # passes those upward to the user.
        cfg = self._cfg(upload_mode="computed_species")
        sentinel_payload = Path("/tmp/sentinel.payload.json")
        sentinel_sidecar = Path("/tmp/sentinel.meta.json")
        outcome = UploadOutcome(
            status="failed",
            payload_path=sentinel_payload,
            sidecar_path=sentinel_sidecar,
            idempotency_key="arc:t:CCO:c:abc1234567890def",
            error="HTTP 503",
        )
        adapter = _StubAdapter(bundle_outcome=outcome)
        # Intercept print() to confirm the failure summary is emitted
        # (don't assert on exact text — assert on key tokens).
        with mock.patch("builtins.print") as mock_print:
            self._sweep(adapter, cfg)
        # Join every positional argument of each call so a bare ``print()``
        # (no args) cannot raise IndexError and multi-argument calls are
        # captured in full.
        printed = "\n".join(
            " ".join(str(arg) for arg in c.args)
            for c in mock_print.call_args_list
        )
        self.assertIn("computed-species bundle", printed)
        self.assertIn("failed: 1", printed)
        self.assertIn("HTTP 503", printed)
235+
236+
237+
# --------------------------------------------------------------------------
238+
# Summary-print mode awareness
239+
# --------------------------------------------------------------------------
240+
241+
242+
class TestSweepSummaryByMode(unittest.TestCase):
    """The summary line names the mode; bundle mode omits the artifact line."""

    def setUp(self):
        """Create a throwaway project directory holding a minimal output.yml."""
        self.tmp = tempfile.mkdtemp(prefix="arc-sweep-summary-")
        self.addCleanup(shutil.rmtree, self.tmp, ignore_errors=True)
        _write_output_yml(self.tmp)
        self.arc_object = SimpleNamespace(project_directory=self.tmp)

    def _run_with_mode(self, *, upload_mode, artifacts_upload=False):
        """Run the sweep in ``upload_mode`` and return everything it printed.

        ``print`` is patched for the duration of the sweep; the captured
        calls are rendered one per line, with each call's positional
        arguments joined by a single space.
        """
        # ``import unittest`` alone does not load the ``unittest.mock``
        # submodule, so import it explicitly instead of relying on some
        # other module having imported it first.
        from unittest import mock

        from arc.tckdb.config import TCKDBArtifactConfig
        cfg = TCKDBConfig(
            enabled=True, base_url="http://x", api_key_env="X",
            upload_mode=upload_mode,
            artifacts=TCKDBArtifactConfig(upload=artifacts_upload),
        )
        adapter = _StubAdapter(
            conformer_outcome=_outcome("uploaded"),
            bundle_outcome=_outcome("uploaded"),
        )
        with mock.patch("builtins.print") as mock_print:
            run_upload_sweep(
                adapter=adapter,
                project_directory=self.arc_object.project_directory,
                tckdb_config=cfg,
            )
        # Join every positional argument so a bare ``print()`` (no args)
        # cannot raise IndexError and multi-argument calls are kept whole.
        return "\n".join(
            " ".join(str(arg) for arg in c.args)
            for c in mock_print.call_args_list
        )

    def test_conformer_mode_summary_says_conformer(self):
        out = self._run_with_mode(upload_mode="conformer")
        self.assertIn("conformer/calculation", out)
        self.assertNotIn("computed-species bundle", out)

    def test_bundle_mode_summary_says_bundle(self):
        out = self._run_with_mode(upload_mode="computed_species")
        self.assertIn("computed-species bundle", out)
        self.assertNotIn("conformer/calculation", out)

    def test_bundle_mode_omits_artifact_line_even_when_enabled(self):
        # Inline artifacts mean the standalone artifact tally would mislead.
        out = self._run_with_mode(upload_mode="computed_species", artifacts_upload=True)
        self.assertNotIn("artifacts: uploaded", out)

    def test_conformer_mode_emits_artifact_line_when_enabled(self):
        out = self._run_with_mode(upload_mode="conformer", artifacts_upload=True)
        self.assertIn("artifacts:", out)
284+
285+
286+
# --------------------------------------------------------------------------
287+
# _resolve_artifact_path: prefer recorded <calc>_input over derivation
288+
# --------------------------------------------------------------------------
289+
290+
291+
class TestResolveArtifactPath(unittest.TestCase):
    """Input-deck resolution: a recorded ``<calc>_input`` value in the species
    record takes precedence; the path derived from the log location and the
    level's software is used only when that value is missing or ``None``."""

    @staticmethod
    def _gaussian_doc():
        """Return a fresh output document whose opt level uses gaussian."""
        return {"opt_level": {"software": "gaussian"}}

    @staticmethod
    def _input_path(record, doc, calc_type="opt"):
        """Resolve the ``input`` artifact path for ``calc_type``."""
        return _resolve_artifact_path(
            kind="input", calc_type=calc_type,
            species_record=record, output_doc=doc,
        )

    def test_input_kind_prefers_recorded_field(self):
        """A recorded ``opt_input`` beats the path derived from the log."""
        record = {
            "opt_log": "calcs/CH4/opt/input.log",
            "opt_input": "calcs/CH4/opt/explicit_input.gjf",  # NEW field
        }
        resolved = self._input_path(record, self._gaussian_doc())
        self.assertEqual(resolved, "calcs/CH4/opt/explicit_input.gjf")

    def test_input_kind_falls_back_to_settings_when_field_absent(self):
        """An older output.yml lacking ``<calc>_input`` resolves via settings."""
        record = {"opt_log": "/abs/calcs/CH4/opt/input.log"}
        resolved = self._input_path(record, self._gaussian_doc())
        # Derived sibling: input.gjf next to the log.
        self.assertEqual(resolved, "/abs/calcs/CH4/opt/input.gjf")

    def test_input_kind_falls_back_when_recorded_field_is_none(self):
        """A recorded ``None`` (deck wasn't kept) triggers the fallback."""
        record = {
            "opt_log": "/abs/calcs/CH4/opt/input.log",
            "opt_input": None,
        }
        resolved = self._input_path(record, self._gaussian_doc())
        self.assertEqual(resolved, "/abs/calcs/CH4/opt/input.gjf")

    def test_input_kind_per_job_picks_correct_recorded_field(self):
        """Each calc type reads its own ``<calc>_input`` field, not opt's."""
        record = {
            "opt_log": "/abs/opt.log", "opt_input": "/abs/opt_deck.gjf",
            "freq_log": "/abs/freq.log", "freq_input": "/abs/freq_deck.gjf",
            "sp_log": "/abs/sp.log", "sp_input": "/abs/sp_deck.in",  # cross-software run
        }
        doc = self._gaussian_doc()
        expected_by_calc = {
            "opt": "/abs/opt_deck.gjf",
            "freq": "/abs/freq_deck.gjf",
            "sp": "/abs/sp_deck.in",  # NOT input.gjf — sp uses its own software
        }
        for calc, expected in expected_by_calc.items():
            path = self._input_path(record, doc, calc_type=calc)
            self.assertEqual(path, expected,
                             msg=f"{calc}: expected {expected}, got {path}")
351+
352+
353+
if __name__ == "__main__":
354+
unittest.main()

0 commit comments

Comments
 (0)