Skip to content

Commit 2c44e1b

Browse files
authored
feat(bench): Added a separation between opensre logic and benchmarking framework (Tracer-Cloud#2745)
* separation between opensre and benchmarking framework
1 parent f5f8d07 commit 2c44e1b

24 files changed

Lines changed: 1020 additions & 169 deletions

app/agent/investigation.py

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,33 @@
7777
class ConnectedInvestigationAgent:
7878
"""ReAct loop scoped to the tools enabled by connected integrations."""
7979

80+
def _should_accept_conclusion(
81+
self,
82+
*,
83+
evidence_count: int, # noqa: ARG002 — used by overrides
84+
iteration: int, # noqa: ARG002 — used by overrides
85+
) -> tuple[bool, str | None]:
86+
"""Hook: decide what to do when the LLM stops requesting tools.
87+
88+
Returns ``(accept_conclusion, nudge)``:
89+
- ``(True, None)`` — accept the LLM's choice, exit the loop. Default.
90+
- ``(False, "...")`` — reject the bail, inject the nudge string as a
91+
user message, continue the loop. ``MAX_INVESTIGATION_LOOPS`` still
92+
caps the worst case so a stubborn model can't infinite-loop.
93+
94+
**Contract:** ``(False, None)`` is invalid and raises ``ValueError`` at
95+
the call site. Rejecting the conclusion without providing a nudge
96+
would spin the loop on an unchanged message history until the outer
97+
iteration cap, silently burning the token budget. The type system
98+
allows ``str | None`` so subclasses can use a single return type;
99+
the runtime guard enforces the actual contract.
100+
101+
Default returns ``(True, None)`` — production agents accept whatever
102+
the LLM decides. Subclasses can override to enforce minimum-evidence
103+
floors, structured-stage progression, or other termination policies.
104+
"""
105+
return True, None
106+
80107
def run(
81108
self,
82109
state: dict[str, Any],
@@ -203,8 +230,28 @@ def _record_tool_end(tc: ToolCall, output: Any) -> None:
203230
messages.append(_build_assistant_msg(llm, response))
204231

205232
if not response.has_tool_calls:
206-
logger.debug("[agent] no tool calls — done after %d iterations", iteration + 1)
207-
break
233+
accept, nudge = self._should_accept_conclusion(
234+
evidence_count=len(evidence_entries),
235+
iteration=iteration,
236+
)
237+
if accept:
238+
logger.debug("[agent] no tool calls — done after %d iterations", iteration + 1)
239+
break
240+
# Contract: rejecting the conclusion (accept=False) MUST
241+
# come with a nudge so the next LLM call sees new context.
242+
# Without one the loop would spin on an unchanged message
243+
# history until MAX_INVESTIGATION_LOOPS, silently burning
244+
# the entire token budget without making progress. Failing
245+
# fast keeps buggy hook overrides loud rather than expensive.
246+
if nudge is None:
247+
raise ValueError(
248+
f"{type(self).__name__}._should_accept_conclusion returned "
249+
"(False, None) — a nudge string is required when rejecting "
250+
"the conclusion, otherwise the LLM will loop on an unchanged "
251+
"message history until MAX_INVESTIGATION_LOOPS."
252+
)
253+
messages.append({"role": "user", "content": nudge})
254+
continue
208255

209256
# Emit tool_start for each pending call before executing
210257
for tc in response.tool_calls:
@@ -347,8 +394,8 @@ def _enforce_context_budget(
347394
348395
No-op on the happy path: the estimate covers messages + system + tools
349396
in one pass and returns under the ceiling for normal investigations.
350-
Only fires on long CloudOpsBench cases where unbounded tool history
351-
has pushed the prompt past the model's limit.
397+
Only fires on long investigations where unbounded tool history has
398+
pushed the prompt past the model's limit.
352399
"""
353400
while _estimate_message_tokens(messages, system=system, tools=tools) > _TOKEN_BUDGET_CEILING:
354401
if not _trim_oldest_tool_pair(messages):

app/agent/llm_invoke_errors.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ def classify_llm_invoke_failure(exc: BaseException) -> LLMInvokeFailure | None:
6868
6969
Returns ``None`` to signal the caller should re-raise. In particular,
7070
:class:`LLMCreditExhaustedError` is intentionally NOT classified — it
71-
represents a non-recoverable billing condition that the bench runner
72-
(and production agent) must halt on, not wrap into a degraded result.
71+
represents a non-recoverable billing condition that callers must halt
72+
on, not wrap into a degraded result.
7373
"""
7474
from app.integrations.llm_cli.errors import (
7575
CLIAuthenticationRequired,

app/pipeline/pipeline.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,16 @@
44

55
import logging
66
from datetime import UTC, datetime
7-
from typing import Any, cast
7+
from typing import TYPE_CHECKING, Any, cast
88

99
from app.state import AgentState
1010

11+
if TYPE_CHECKING:
12+
# Type-only import — avoids paying the agent module's heavy import cost
13+
# at pipeline load while still letting static type-checkers validate
14+
# ``agent_class`` injections.
15+
from app.agent.investigation import ConnectedInvestigationAgent
16+
1117
logger = logging.getLogger(__name__)
1218

1319

@@ -133,11 +139,20 @@ def log_query(query: str, window: dict[str, Any]) -> dict[str, Any]:
133139
return {"configurable": {"upstream_evidence_provider": provider}}
134140

135141

136-
def run_connected_investigation(state: AgentState) -> AgentState:
142+
def run_connected_investigation(
143+
state: AgentState,
144+
*,
145+
agent_class: type[ConnectedInvestigationAgent] | None = None,
146+
) -> AgentState:
137147
"""Resolve connected integrations → parse alert → agent loop → deliver.
138148
139149
All steps mutate a shared state dict. Each step returns a dict of updates
140150
which are merged in. Pure function: inputs in, state out.
151+
152+
``agent_class``: optional override for the investigation agent class.
153+
Defaults to :class:`ConnectedInvestigationAgent`. Callers that need a
154+
custom termination policy, structured-stage progression, or other
155+
agent-level extensions can pass a subclass instead.
141156
"""
142157
from app.agent.context import resolve_integrations
143158
from app.agent.extract import extract_alert
@@ -146,6 +161,7 @@ def run_connected_investigation(state: AgentState) -> AgentState:
146161
from app.delivery import deliver
147162
from app.utils.sentry_sdk import capture_exception
148163

164+
agent_class = agent_class or ConnectedInvestigationAgent
149165
state_any = cast(dict[str, Any], state)
150166

151167
try:
@@ -155,7 +171,7 @@ def run_connected_investigation(state: AgentState) -> AgentState:
155171
if state_any.get("is_noise"):
156172
return cast(AgentState, state_any)
157173

158-
_merge(state_any, ConnectedInvestigationAgent().run(state_any))
174+
_merge(state_any, agent_class().run(state_any))
159175
_merge(
160176
state_any,
161177
node_correlate_upstream(

app/pipeline/runners.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,20 @@
99
import threading
1010
from collections.abc import AsyncIterator, Callable
1111
from dataclasses import dataclass
12-
from typing import Any, cast
12+
from typing import TYPE_CHECKING, Any, cast
1313

1414
from app.remote.stream import StreamEvent
1515
from app.state import AgentState, make_initial_state
1616
from app.types.config import NodeConfig
1717
from app.utils.errors import report_and_reraise
1818
from app.utils.sentry_sdk import init_sentry
1919

20+
if TYPE_CHECKING:
21+
# Type-only — avoids paying the agent module's heavy import cost at
22+
# runner load while still letting static type-checkers validate
23+
# ``agent_class`` injections.
24+
from app.agent.investigation import ConnectedInvestigationAgent
25+
2026
logger = logging.getLogger(__name__)
2127

2228
# Serializes temporary render_report monkeypatches when multiple streaming
@@ -80,6 +86,7 @@ def run_investigation(
8086
openclaw_context: dict[str, Any] | None = None,
8187
opensre_evaluate: bool = False,
8288
investigation_metadata: tuple[str, str, str] | None = None,
89+
agent_class: type[ConnectedInvestigationAgent] | None = None,
8390
) -> AgentState:
8491
"""Run the investigation from a raw alert payload. Pure function: inputs in, state out.
8592
@@ -90,6 +97,10 @@ def run_investigation(
9097
FixtureGrafanaBackend should be injected without real credential resolution.
9198
investigation_metadata: Optional ``(alert_name, pipeline_name, severity)`` for
9299
initial state; avoids copying those fields onto ``raw_alert``.
100+
agent_class: Optional override for the investigation agent class. Defaults
101+
to ``ConnectedInvestigationAgent``. Callers that need a custom
102+
termination policy, structured-stage progression, or other
103+
agent-level extensions can pass a subclass instead.
93104
"""
94105
init_sentry(entrypoint="pipeline")
95106
from app.pipeline.pipeline import run_connected_investigation as _run
@@ -109,7 +120,7 @@ def run_investigation(
109120
message="run_investigation failed",
110121
tags={"surface": "pipeline", "component": "app.pipeline.runners"},
111122
):
112-
return _run(initial)
123+
return _run(initial, agent_class=agent_class)
113124

114125

115126
def run_chat(state: AgentState, _config: NodeConfig | None = None) -> AgentState:

app/services/agent_llm_client.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,10 @@ def _rate_limit_sleep_seconds(err: BaseException, fallback_backoff: float) -> fl
5656
jitter ``Uniform(0, fallback_backoff)`` when no hint is available — same
5757
pattern AWS and the Anthropic/OpenAI SDKs use for transient errors.
5858
59-
Always adds ±10% jitter even on the server hint: with multiple bench
60-
workers, four clients all sleeping for *exactly* the suggested 94ms
61-
would still wake up in lockstep and re-trigger the same TPM bucket.
59+
Always adds ±10% jitter even on the server hint: with multiple
60+
concurrent callers, clients all sleeping for *exactly* the suggested
61+
94ms would still wake up in lockstep and re-trigger the same TPM
62+
bucket.
6263
6364
Logs which branch produced the sleep duration so operators can audit
6465
whether the Retry-After path is actually firing (most OpenAI 429s

app/tools/EKSListClustersTool/__init__.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,6 @@
1616

1717

1818
def _eks_available(sources: dict[str, dict]) -> bool:
19-
# In CloudOpsBench replay mode the EKS surface is served by the case
20-
# snapshot via CloudOpsBenchK8sTools. Exposing the real EKS tools too
21-
# would have the agent attempt sts:AssumeRole against placeholder ARNs
22-
# like arn:aws:iam::placeholder:role/placeholder, which always fails.
23-
backend = (sources.get("eks") or {}).get("_backend")
24-
if getattr(backend, "is_cloudopsbench_backend", False):
25-
return False
2619
return bool(sources.get("eks", {}).get("connection_verified"))
2720

2821

app/tools/registry.py

Lines changed: 69 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import inspect
77
import logging
88
import pkgutil
9+
import threading
910
from functools import lru_cache
1011
from types import ModuleType
1112

@@ -25,6 +26,44 @@
2526
"utils",
2627
}
2728

29+
# Extension point: callers outside ``app.tools.*`` (e.g. test suites,
30+
# external benchmark harnesses, downstream integrators) can register
31+
# additional tool packages by calling
32+
# :func:`register_external_tool_package`. Registered packages are walked
33+
# the same way as :mod:`app.tools` — each top-level submodule is imported
34+
# and any ``@tool``-decorated callables are picked up.
35+
#
36+
# Production stays clean: with no external registrations, the registry
37+
# discovers only ``app.tools.*``. The list is *not* persisted across
38+
# processes — every fresh import of opensre starts with zero externals.
39+
_external_tool_packages: list[ModuleType] = []
40+
_external_registration_lock = threading.Lock()
41+
42+
43+
def register_external_tool_package(package: ModuleType) -> None:
44+
"""Register an additional tool package for registry discovery.
45+
46+
Call before any ``get_registered_tools()`` consumer in the same
47+
process. The registry cache is cleared so the new package's tools
48+
appear on the next lookup.
49+
50+
Idempotent and thread-safe: concurrent callers registering the same
51+
package (e.g. multiple workers in a ``ThreadPoolExecutor`` each
52+
importing a bench package) won't add duplicate entries that would
53+
otherwise produce noisy ``Duplicate tool name`` warnings on every
54+
subsequent registry walk.
55+
56+
Production code does NOT call this — it's a hook for test suites
57+
and external integrators that ship their own tools but want them
58+
routed through opensre's agent loop.
59+
"""
60+
with _external_registration_lock:
61+
if package in _external_tool_packages:
62+
return
63+
_external_tool_packages.append(package)
64+
clear_tool_registry_cache()
65+
66+
2867
# Preserve the current chat surface while the repo migrates toward explicit
2968
# per-tool surface metadata.
3069
_LEGACY_CHAT_TOOL_NAMES = {
@@ -46,9 +85,9 @@
4685
}
4786

4887

49-
def _iter_tool_module_names() -> list[str]:
88+
def _iter_tool_module_names(package: ModuleType) -> list[str]:
5089
module_names: list[str] = []
51-
for module_info in pkgutil.iter_modules(tools_package.__path__):
90+
for module_info in pkgutil.iter_modules(package.__path__):
5291
if module_info.name in _SKIP_MODULE_NAMES:
5392
continue
5493
if module_info.name.startswith("_") or module_info.name.endswith("_test"):
@@ -57,8 +96,8 @@ def _iter_tool_module_names() -> list[str]:
5796
return sorted(module_names)
5897

5998

60-
def _import_tool_module(module_name: str) -> ModuleType:
61-
return importlib.import_module(f"{tools_package.__name__}.{module_name}")
99+
def _import_tool_module(package: ModuleType, module_name: str) -> ModuleType:
100+
return importlib.import_module(f"{package.__name__}.{module_name}")
62101

63102

64103
def _candidate_belongs_to_module(candidate: object, module_name: str) -> bool:
@@ -122,29 +161,35 @@ def _collect_registered_tools_from_module(module: ModuleType) -> list[Registered
122161
def _load_registry_snapshot() -> tuple[RegisteredTool, ...]:
123162
tools_by_name: dict[str, RegisteredTool] = {}
124163

125-
for module_name in _iter_tool_module_names():
126-
try:
127-
module = _import_tool_module(module_name)
128-
except ModuleNotFoundError as exc:
129-
logger.warning("[tools] Skipping %s: %s", module_name, exc)
130-
continue
131-
except Exception as exc:
132-
logger.warning(
133-
"[tools] Skipping %s due to import failure: %s",
134-
module_name,
135-
exc,
136-
exc_info=True,
137-
)
138-
continue
139-
140-
for tool in _collect_registered_tools_from_module(module):
141-
if tool.name in tools_by_name:
164+
# Walk the canonical tools package, then any externally-registered
165+
# packages in the order they were registered. First definition of a
166+
# given tool name wins; duplicates are logged and skipped.
167+
packages: list[ModuleType] = [tools_package, *_external_tool_packages]
168+
for package in packages:
169+
for module_name in _iter_tool_module_names(package):
170+
try:
171+
module = _import_tool_module(package, module_name)
172+
except ModuleNotFoundError as exc:
173+
logger.warning("[tools] Skipping %s.%s: %s", package.__name__, module_name, exc)
174+
continue
175+
except Exception as exc:
142176
logger.warning(
143-
"[tools] Duplicate tool name '%s' across modules; keeping first definition",
144-
tool.name,
177+
"[tools] Skipping %s.%s due to import failure: %s",
178+
package.__name__,
179+
module_name,
180+
exc,
181+
exc_info=True,
145182
)
146183
continue
147-
tools_by_name[tool.name] = tool
184+
185+
for tool in _collect_registered_tools_from_module(module):
186+
if tool.name in tools_by_name:
187+
logger.warning(
188+
"[tools] Duplicate tool name '%s' across modules; keeping first definition",
189+
tool.name,
190+
)
191+
continue
192+
tools_by_name[tool.name] = tool
148193

149194
return tuple(sorted(tools_by_name.values(), key=lambda tool: tool.name))
150195

app/tools/utils/availability.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,14 @@ def eks_available_or_backend(sources: dict[str, dict]) -> bool:
2020
support continue to use the narrower check in
2121
``app.tools.EKSListClustersTool._eks_available``.
2222
23-
Exception: in CloudOpsBench replay mode the EKS surface is served by the
24-
case snapshot via CloudOpsBenchK8sTools (GetResources, GetClusterConfiguration,
25-
etc.). The CloudOpsBenchReplayBackend does not implement the EKS tool API
26-
(list_pods, get_pod_logs, ...), so exposing these EKS tools would have the
27-
agent call methods that don't exist on the backend.
23+
The ``_backend`` slot is reserved for fixture backends that implement
24+
the EKS tool API (``list_pods``, ``get_pod_logs``, ...). Other backend
25+
types that speak different protocols should be placed in their own
26+
distinct source slots and are invisible to this check — the real EKS
27+
tools stay deactivated for those modes.
2828
"""
2929
eks = sources.get("eks", {})
30-
backend = eks.get("_backend")
31-
if getattr(backend, "is_cloudopsbench_backend", False):
32-
return False
33-
return bool(eks.get("connection_verified") or backend)
30+
return bool(eks.get("connection_verified") or eks.get("_backend"))
3431

3532

3633
def datadog_available_or_backend(sources: dict[str, dict]) -> bool:

0 commit comments

Comments
 (0)