Skip to content

Commit 3af4e56

Browse files
rioloc and claude committed
feat: add SubprocessDriver for CRD-based agent execution
Introduce SubprocessDriver — a new AgentDriver that manages the OpenShift Proposal CR lifecycle via the oc/kubectl CLI. The driver builds and applies Proposal CRs, polls status conditions until a terminal state is reached, handles auto-approval via ProposalApproval resources, and cleans up on completion.

Key design decisions:
- Works directly with CRD conditions (the stable API contract) instead of replicating the operator's internal DerivePhase() logic
- TerminalOutcome enum for driver-level terminal states
- TurnData extended with proposal_spec, proposal_status, description, and expected_proposal_status fields (backward-compatible, all Optional)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 4c91ba8 commit 3af4e56

3 files changed

Lines changed: 973 additions & 0 deletions

File tree

src/lightspeed_evaluation/core/models/data.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,21 @@ class TurnData(StreamingMetricsMixin):
9999
default=None, description="Path to verify script for script-based evaluation"
100100
)
101101

102+
# Subprocess driver fields
103+
description: Optional[str] = Field(
104+
default=None, description="Human-readable label for reports"
105+
)
106+
proposal_spec: Optional[dict[str, Any]] = Field(
107+
default=None, description="Inline proposal spec for CRD-based agents"
108+
)
109+
expected_proposal_status: Optional[dict[str, Any]] = Field(
110+
default=None,
111+
description="Expected proposal status for assertion metrics",
112+
)
113+
proposal_status: Optional[dict[str, Any]] = Field(
114+
default=None, description="Raw CRD status populated by SubprocessDriver"
115+
)
116+
102117
# Set of turn metrics that don't pass the validation to ignore them later
103118
_invalid_metrics: set[str] = set()
104119

Lines changed: 296 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,296 @@
1+
"""Subprocess-based agent driver for CRD lifecycle management."""
2+
3+
from __future__ import annotations
4+
5+
import json
6+
import logging
7+
import os
8+
import shutil
9+
import subprocess
10+
import time
11+
import uuid
12+
from enum import StrEnum
13+
from typing import Any, Literal, Optional
14+
15+
from pydantic import BaseModel, ConfigDict, Field
16+
17+
from lightspeed_evaluation.core.models import TurnData
18+
from lightspeed_evaluation.core.system.exceptions import ConfigurationError
19+
from lightspeed_evaluation.pipeline.evaluation.driver import AgentDriver
20+
21+
logger = logging.getLogger(__name__)
22+
23+
24+
class TerminalOutcome(StrEnum):
25+
"""Terminal outcomes for a Proposal CR lifecycle.
26+
27+
These are driver-level labels, not CRD API values. The CRD exposes
28+
conditions with statuses that the driver interprets:
29+
30+
- Analyzed: True = analysis succeeded, False = failed, Unknown = in progress
31+
- Executed: True = execution succeeded, False = failed, Unknown = in progress
32+
- Verified: True = verification passed, False = failed, Unknown = in progress
33+
- Denied: True = user denied a step (terminal)
34+
- Escalated: True = escalation complete (terminal), False = failed, Unknown = in progress
35+
36+
Special reason: RetryingExecution (Verified=False triggers retry, not failure).
37+
"""
38+
39+
COMPLETED = "Completed"
40+
FAILED = "Failed"
41+
DENIED = "Denied"
42+
ESCALATED = "Escalated"
43+
44+
45+
# Identity of the Proposal custom resource managed by the driver.
CRD_GROUP = "agentic.openshift.io"
CRD_VERSION = "v1alpha1"
CRD_KIND = "Proposal"
CRD_PLURAL = "proposals"
# Value written to ``apiVersion`` in manifests: "<group>/<version>".
CRD_API_VERSION = "/".join((CRD_GROUP, CRD_VERSION))

# Passed as ``timeout`` (seconds) to every ``subprocess.run`` CLI invocation.
CLI_COMMAND_TIMEOUT = 30
52+
53+
54+
class SubprocessAgentConfig(BaseModel):
    """Settings accepted by the subprocess-based CRD agent driver.

    Unknown keys are rejected (``extra="forbid"``).
    """

    model_config = ConfigDict(extra="forbid")

    # Discriminator value identifying this driver kind.
    type: Literal["subprocess"] = Field(default="subprocess")
    # Namespace in which Proposal / ProposalApproval resources are created.
    namespace: str
    # When true, the driver applies a ProposalApproval once analysis succeeds.
    auto_approve: bool = Field(default=True)
    # When true, the Proposal CR is deleted after each turn.
    cleanup_proposals: bool = Field(default=True)
    # Overall polling budget per turn, in seconds; must be positive.
    timeout: int = Field(900, gt=0)
    # Sleep between status polls, in seconds; must be positive.
    poll_interval: int = Field(2, gt=0)
65+
66+
67+
class SubprocessDriver(AgentDriver):
    """Driver that manages Proposal CR lifecycle via oc/kubectl CLI.

    For every turn the driver applies one Proposal custom resource, polls
    its ``.status.conditions`` until a terminal outcome or the configured
    timeout, optionally applies a ProposalApproval resource once analysis
    has succeeded, and deletes the Proposal afterwards when
    ``cleanup_proposals`` is enabled.

    CLI failures (timeout, missing binary, non-zero exit) never propagate as
    exceptions out of ``execute_turn``; they are reported through the
    returned error message.
    """

    # Kubernetes limits DNS-subdomain object names to 253 characters.
    _MAX_CR_NAME_LENGTH = 253
    # Synthetic exit codes used when the CLI process itself could not finish.
    _RC_TIMEOUT = 124
    _RC_NOT_RUNNABLE = 127

    def __init__(self, config: dict[str, Any]) -> None:
        """Initialize the subprocess driver.

        Args:
            config: Raw driver configuration, validated against
                ``SubprocessAgentConfig``.
        """
        super().__init__(config)
        self._config = SubprocessAgentConfig.model_validate(config)
        self._cli = self._resolve_cli()

    def validate_config(self, config: dict[str, Any]) -> None:
        """Validate subprocess driver configuration.

        Args:
            config: Raw driver configuration to check.

        Raises:
            ConfigurationError: If neither ``oc`` nor ``kubectl`` is on PATH.
        """
        SubprocessAgentConfig.model_validate(config)
        if not shutil.which("oc") and not shutil.which("kubectl"):
            raise ConfigurationError("Neither 'oc' nor 'kubectl' found on PATH")

    def execute_turn(
        self, turn_data: TurnData, conversation_id: Optional[str] = None
    ) -> tuple[Optional[str], Optional[str]]:
        """Execute a Proposal CR lifecycle for a single turn.

        Steps:
            1. Build the Proposal CR from ``turn_data`` and apply it.
            2. Poll ``.status.conditions`` every ``poll_interval`` seconds.
            3. Auto-approve (if enabled) once ``Analyzed`` is ``True``.
            4. Stop on a terminal outcome, a status-read error, or timeout.
            5. Delete the Proposal CR if ``cleanup_proposals`` is enabled.

        On a terminal outcome ``turn_data.response`` and
        ``turn_data.proposal_status`` are set in place; they are left
        untouched on apply failure, status-read failure, or timeout.

        Args:
            turn_data: Turn whose ``query`` / ``proposal_spec`` drive the CR.
            conversation_id: Optional id embedded in the generated CR name.

        Returns:
            ``(error_message, None)``. ``error_message`` is ``None`` only when
            the proposal completed successfully.
        """
        cr_name = self._make_cr_name(conversation_id)
        proposal_spec = turn_data.proposal_spec or {}

        result = self._apply(self._build_proposal_cr(turn_data, cr_name))
        if result.returncode != 0:
            return (
                f"Failed to apply Proposal CR: {result.stderr.strip()}",
                None,
            )

        # A single finally-clause covers every exit path after the CR exists,
        # including unexpected exceptions raised while polling.
        try:
            outcome, status_dict, error = self._poll_until_terminal(
                cr_name, proposal_spec
            )
        finally:
            self._cleanup(cr_name)

        if error is not None:
            return (error, None)

        turn_data.response = self._extract_summary(status_dict)
        turn_data.proposal_status = status_dict

        if outcome == TerminalOutcome.COMPLETED:
            return (None, None)
        return (
            f"Proposal '{cr_name}' terminated with outcome: {outcome}",
            None,
        )

    def _poll_until_terminal(
        self, cr_name: str, proposal_spec: dict[str, Any]
    ) -> tuple[Optional[TerminalOutcome], dict[str, Any], Optional[str]]:
        """Poll the Proposal until it is terminal, unreadable, or timed out.

        Returns:
            ``(outcome, last_status, error)``. Exactly one of ``outcome`` and
            ``error`` is not ``None``.
        """
        approved = False
        status_dict: dict[str, Any] = {}
        start = time.monotonic()

        while time.monotonic() - start < self._config.timeout:
            time.sleep(self._config.poll_interval)
            status_dict, err = self._get_status(cr_name)
            if err:
                return None, status_dict, err
            # ``conditions`` may be absent or null before the operator reacts.
            conditions = status_dict.get("conditions") or []

            if (
                self._config.auto_approve
                and not approved
                and self._should_approve(conditions)
            ):
                approval = self._apply(self._build_approval_cr(cr_name, proposal_spec))
                if approval.returncode == 0:
                    approved = True
                else:
                    # Leave ``approved`` unset so the (idempotent) apply is
                    # retried on the next poll instead of stalling silently.
                    logger.warning(
                        "Failed to apply ProposalApproval for '%s': %s",
                        cr_name,
                        approval.stderr.strip(),
                    )

            outcome = self._is_terminal(conditions, proposal_spec)
            if outcome is not None:
                return outcome, status_dict, None

        return (
            None,
            status_dict,
            f"Timeout after {self._config.timeout}s for '{cr_name}'",
        )

    @classmethod
    def _make_cr_name(cls, conversation_id: Optional[str]) -> str:
        """Build a unique CR name that is valid as a Kubernetes object name.

        The conversation id is lower-cased, every character outside
        ``[a-z0-9-]`` becomes ``-``, and it is truncated so the full name
        stays within the 253-character limit.
        """
        suffix = uuid.uuid4().hex[:8]
        if not conversation_id:
            return f"eval-{suffix}"
        budget = cls._MAX_CR_NAME_LENGTH - len("eval-") - len("-") - len(suffix)
        safe_id = "".join(
            ch if (ch.isascii() and ch.isalnum()) or ch == "-" else "-"
            for ch in conversation_id.lower()
        )[:budget]
        return f"eval-{safe_id}-{suffix}"

    @staticmethod
    def _resolve_cli() -> str:
        """Resolve oc or kubectl binary path (``oc`` preferred, "" if neither)."""
        return shutil.which("oc") or shutil.which("kubectl") or ""

    def _run_cli(
        self,
        args: list[str],
        stdin: Optional[str] = None,
    ) -> subprocess.CompletedProcess[str]:
        """Run a CLI command and return the result.

        Never raises for CLI-level problems: a missing binary, an OS error
        while spawning, or a command exceeding ``CLI_COMMAND_TIMEOUT`` is
        reported as a ``CompletedProcess`` with a non-zero ``returncode`` and
        an explanatory ``stderr``, so callers only need to check the code.

        Args:
            args: Arguments appended after the resolved CLI binary.
            stdin: Optional text piped to the process.
        """
        command = [self._cli, *args]
        if not self._cli:
            return subprocess.CompletedProcess(
                command,
                self._RC_NOT_RUNNABLE,
                stdout="",
                stderr="Neither 'oc' nor 'kubectl' found on PATH",
            )
        try:
            return subprocess.run(
                command,
                input=stdin,
                text=True,
                capture_output=True,
                env=os.environ.copy(),
                timeout=CLI_COMMAND_TIMEOUT,
                check=False,
            )
        except subprocess.TimeoutExpired:
            return subprocess.CompletedProcess(
                command,
                self._RC_TIMEOUT,
                stdout="",
                stderr=f"CLI command timed out after {CLI_COMMAND_TIMEOUT}s",
            )
        except OSError as exc:
            return subprocess.CompletedProcess(
                command,
                self._RC_NOT_RUNNABLE,
                stdout="",
                stderr=f"Failed to run '{self._cli}': {exc}",
            )

    def _apply(self, manifest: dict[str, Any]) -> subprocess.CompletedProcess[str]:
        """Apply a CR manifest via stdin (serialized as JSON)."""
        return self._run_cli(["apply", "-f", "-"], stdin=json.dumps(manifest))

    def _get_status(self, cr_name: str) -> tuple[dict[str, Any], Optional[str]]:
        """Get Proposal CR status.

        Returns:
            ``(status, None)`` on success, where ``status`` is the CR's
            ``status`` object or ``{}`` when it is absent, null, or not an
            object; ``({}, error_message)`` when the CLI call fails or its
            output is not valid JSON.
        """
        result = self._run_cli(
            [
                "get",
                CRD_PLURAL,
                cr_name,
                "-n",
                self._config.namespace,
                "-o",
                "json",
            ]
        )
        if result.returncode != 0:
            return {}, f"Failed to get status for '{cr_name}': {result.stderr.strip()}"
        try:
            cr = json.loads(result.stdout)
        except json.JSONDecodeError as exc:
            return {}, f"Failed to parse status JSON for '{cr_name}': {exc}"
        # A freshly created CR may have no status, or an explicit null.
        status = cr.get("status") if isinstance(cr, dict) else None
        return (status if isinstance(status, dict) else {}), None

    def _delete(self, cr_name: str) -> None:
        """Delete a Proposal CR, logging a warning if the CLI reports failure."""
        result = self._run_cli(
            [
                "delete",
                CRD_PLURAL,
                cr_name,
                "-n",
                self._config.namespace,
                "--ignore-not-found",
            ]
        )
        if result.returncode != 0:
            logger.warning(
                "Failed to delete Proposal CR '%s': %s",
                cr_name,
                result.stderr.strip(),
            )

    def _cleanup(self, cr_name: str) -> None:
        """Delete the Proposal CR if cleanup is enabled.

        Best effort: any exception is logged and swallowed so cleanup can
        never mask the result of the turn.
        """
        # NOTE(review): only the Proposal is deleted; whether the operator
        # garbage-collects the matching ProposalApproval is not visible here.
        if not self._config.cleanup_proposals:
            return
        try:
            self._delete(cr_name)
        except Exception:  # pylint: disable=broad-exception-caught
            logger.warning(
                "Failed to clean up Proposal CR '%s'", cr_name, exc_info=True
            )

    def _build_proposal_cr(self, turn_data: TurnData, cr_name: str) -> dict[str, Any]:
        """Build Proposal CR manifest from TurnData.

        ``spec.request`` defaults to the turn's query; keys from
        ``turn_data.proposal_spec`` are merged on top (and may override it).
        An empty ``analysis`` section is added when none was supplied.
        """
        spec: dict[str, Any] = {"request": turn_data.query}
        if turn_data.proposal_spec:
            spec.update(turn_data.proposal_spec)
        spec.setdefault("analysis", {})
        return {
            "apiVersion": CRD_API_VERSION,
            "kind": CRD_KIND,
            "metadata": {
                "name": cr_name,
                "namespace": self._config.namespace,
            },
            "spec": spec,
        }

    def _build_approval_cr(
        self, cr_name: str, proposal_spec: dict[str, Any]
    ) -> dict[str, Any]:
        """Build ProposalApproval CR manifest.

        Analysis is always approved; Execution (option 0) and Verification
        stages are approved only when the proposal spec declares them.
        """
        stages: list[dict[str, Any]] = [
            {"type": "Analysis", "decision": "Approved"},
        ]
        if "execution" in proposal_spec:
            stages.append(
                {
                    "type": "Execution",
                    "decision": "Approved",
                    "execution": {"option": 0},
                }
            )
        if "verification" in proposal_spec:
            stages.append({"type": "Verification", "decision": "Approved"})
        return {
            "apiVersion": CRD_API_VERSION,
            "kind": "ProposalApproval",
            "metadata": {
                "name": cr_name,
                "namespace": self._config.namespace,
            },
            "spec": {"stages": stages},
        }

    @staticmethod
    def _should_approve(conditions: list[dict[str, Any]]) -> bool:
        """Check if conditions indicate the proposal is ready for approval.

        True once a condition of type ``Analyzed`` has status ``"True"``.
        Conditions without a ``type`` key are ignored.
        """
        by_type = {c["type"]: c for c in conditions if "type" in c}
        return by_type.get("Analyzed", {}).get("status") == "True"

    @staticmethod
    def _is_terminal(
        conditions: list[dict[str, Any]], proposal_spec: dict[str, Any]
    ) -> Optional[TerminalOutcome]:
        """Check if conditions indicate a terminal state.

        Precedence: Denied, then Escalated, then any condition with status
        ``"False"`` (unless its reason is ``RetryingExecution``), then success
        of the last stage the spec asks for (Verified > Executed > Analyzed).

        Returns:
            The terminal outcome, or ``None`` while still in progress.
        """
        # Conditions without a ``type`` cannot be looked up by name; they are
        # still inspected by the failure scan below.
        by_type = {c["type"]: c for c in conditions if "type" in c}
        if by_type.get("Denied", {}).get("status") == "True":
            return TerminalOutcome.DENIED
        if by_type.get("Escalated", {}).get("status") == "True":
            return TerminalOutcome.ESCALATED
        # NOTE(review): this treats *any* False condition as failure,
        # including an explicit Denied=False — confirm against the operator.
        for c in conditions:
            if c.get("status") == "False" and c.get("reason") != "RetryingExecution":
                return TerminalOutcome.FAILED
        if "verification" in proposal_spec:
            last = "Verified"
        elif "execution" in proposal_spec:
            last = "Executed"
        else:
            last = "Analyzed"
        if by_type.get(last, {}).get("status") == "True":
            return TerminalOutcome.COMPLETED
        return None

    @staticmethod
    def _extract_summary(status_dict: dict[str, Any]) -> str:
        """Extract a human-readable summary from analysis results.

        Joins the non-empty ``message`` of every condition with ``"; "``;
        returns ``"No summary available"`` when there is none.
        """
        conditions = status_dict.get("conditions") or []
        messages = [c["message"] for c in conditions if c.get("message")]
        return "; ".join(messages) if messages else "No summary available"

0 commit comments

Comments
 (0)