Skip to content

Commit a23590f

Browse files
Isolate the OpenAI Agents SDK, simplify compactor wiring
- Engine builds one AsyncOpenAI and installs the SDK default with use_for_tracing=False at run start; closed in stream_engine_async's outer finally so connection-pool lifecycle is deterministic. - Compaction is a pure async def compact(*, client, compaction_model, item) -> str. build_compactor / CompactorFactory / Compactor type alias are gone; AgentContext.compact_old_items takes the client directly and calls compact() with self.compaction_model. - OpenAiAgentRunner takes the client (not a compactor); the runner_protocol seam is deleted from production code (tests monkeypatch via the SDK seam). - SynthesisTool requires its injected client; lazy construction is gone. - Tests: shared probe_kit helpers updated; integration tests cover client lifecycle (normal exit + early consumer break); compaction tests monkeypatch agent_context.compact instead of build_compactor; example probes switched to duck-typed fakes and post-state assertions. Closes INF-2933 items 8 and 9.
1 parent f5699a8 commit a23590f

28 files changed

Lines changed: 654 additions & 616 deletions

engine/agents/agent_context.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
from __future__ import annotations
22

3-
from collections.abc import Awaitable, Callable
4-
from typing import TYPE_CHECKING, TypeAlias
3+
from typing import TYPE_CHECKING
4+
5+
from openai import AsyncOpenAI
56

67
from engine.agents.agent_context_items import AgentContextItem
8+
from engine.agents.compactor import compact
79
from engine.agents.prompt_templates import render_root_system_prompt
810
from engine.model_config import ModelConfig
911
from engine.models.messages import AgentMessage
1012

1113
if TYPE_CHECKING:
1214
from engine.engine_config import EngineConfig
1315

14-
Compactor: TypeAlias = Callable[[AgentContextItem], Awaitable[str]]
15-
1616

1717
class AgentContext:
1818
"""One agent's conversation memory, with compaction-aware rendering to AgentMessage.
@@ -101,7 +101,7 @@ def to_messages_array(self) -> list[AgentMessage]:
101101
"""Render stored items into provider-compatible messages, swapping in summaries for compacted items."""
102102
return [_render_item(item) for item in self.items]
103103

104-
async def compact_old_items(self, compactor: "Compactor") -> None:
104+
async def compact_old_items(self, client: AsyncOpenAI) -> None:
105105
"""Compact eligible older items in place using two independent keep-last thresholds.
106106
107107
Text messages and tool turns are tracked separately; tool turns (assistant
@@ -128,7 +128,9 @@ async def compact_old_items(self, compactor: "Compactor") -> None:
128128

129129
for idx in sorted(set(eligible)):
130130
item = self.items[idx]
131-
summary = await compactor(item)
131+
summary = await compact(
132+
client=client, compaction_model=self.compaction_model, item=item
133+
)
132134
self.items[idx] = item.model_copy(
133135
update={"is_compacted": True, "compaction_summary": summary}
134136
)

engine/agents/compactor.py

Lines changed: 24 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,65 +1,33 @@
11
from __future__ import annotations
22

3-
from collections.abc import Callable
4-
53
from openai import AsyncOpenAI, omit
64

7-
from engine.agents.agent_context import Compactor
85
from engine.agents.agent_context_items import AgentContextItem
9-
from engine.agents.agent_execution import AgentExecution
106
from engine.agents.prompt_templates import COMPACTION_SYSTEM_PROMPT
11-
from engine.engine_config import EngineConfig
12-
13-
CompactorFactory = Callable[[AgentExecution], Compactor]
14-
15-
16-
def build_compactor_factory(
17-
engine_config: EngineConfig,
18-
client: AsyncOpenAI | None = None,
19-
) -> CompactorFactory:
20-
"""Returns a factory that produces a Compactor bound to an OpenAI-compatible client.
21-
22-
The factory takes an AgentExecution (currently unused but reserved for
23-
future per-agent compaction policies) and returns a callable that the
24-
AgentContext can invoke per item it wants compacted. When ``client`` is
25-
not supplied, it is constructed from ``engine_config.model_provider`` so
26-
compaction routes through whichever OpenAI-compatible endpoint the run is
27-
configured for.
28-
"""
29-
openai_client = client
30-
31-
def factory(_execution: AgentExecution) -> Compactor:
32-
async def compact(item: AgentContextItem) -> str:
33-
nonlocal openai_client
34-
if openai_client is None:
35-
openai_client = AsyncOpenAI(
36-
base_url=engine_config.model_provider.base_url,
37-
api_key=engine_config.model_provider.api_key,
38-
default_headers=engine_config.model_provider.default_headers,
39-
)
40-
41-
user_text = _item_as_prompt(item)
42-
# Frontier models (gpt-5.x, claude-opus-4-7+, …) reject
43-
# ``temperature`` as deprecated; only forward it when
44-
# explicitly set on the compaction model.
45-
temperature = (
46-
engine_config.compaction_model.temperature
47-
if engine_config.compaction_model.temperature is not None
48-
else omit
49-
)
50-
response = await openai_client.chat.completions.create(
51-
model=engine_config.compaction_model.name,
52-
messages=[
53-
{"role": "system", "content": COMPACTION_SYSTEM_PROMPT},
54-
{"role": "user", "content": user_text},
55-
],
56-
temperature=temperature,
57-
)
58-
return (response.choices[0].message.content or "").strip()
59-
60-
return compact
61-
62-
return factory
7+
from engine.model_config import ModelConfig
8+
9+
10+
async def compact(
11+
*,
12+
client: AsyncOpenAI,
13+
compaction_model: ModelConfig,
14+
item: AgentContextItem,
15+
) -> str:
16+
"""Summarize one ``AgentContextItem`` via ``client`` using ``compaction_model``."""
17+
user_text = _item_as_prompt(item)
18+
# Frontier models (gpt-5.x, claude-opus-4-7+, …) reject ``temperature``
19+
# as deprecated; only forward it when explicitly set on the compaction
20+
# model.
21+
temperature = compaction_model.temperature if compaction_model.temperature is not None else omit
22+
response = await client.chat.completions.create(
23+
model=compaction_model.name,
24+
messages=[
25+
{"role": "system", "content": COMPACTION_SYSTEM_PROMPT},
26+
{"role": "user", "content": user_text},
27+
],
28+
temperature=temperature,
29+
)
30+
return (response.choices[0].message.content or "").strip()
6331

6432

6533
def _item_as_prompt(item: AgentContextItem) -> str:

engine/agents/engine_run_state.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,10 @@
22

33
from dataclasses import dataclass, field
44

5-
from agents import Runner
5+
from openai import AsyncOpenAI
66

77
from engine.agents.agent_execution import AgentExecution
88
from engine.agents.engine_output_bus import EngineOutputBus
9-
from engine.agents.runner_protocol import RunnerProtocol
109
from engine.engine_config import EngineConfig
1110
from engine.sandbox.sandbox import Sandbox
1211
from engine.traces.trace_store import TraceStore
@@ -17,9 +16,7 @@ class EngineRunState:
1716
"""Shared mutable state for one Engine run.
1817
1918
Holds the singleton TraceStore, output bus, and config, plus lookup tables for
20-
AgentExecutions by ``agent_id`` and by the ``tool_call_id`` that spawned them. The
21-
``runner`` field is a test seam: production uses ``agents.Runner``, probes inject
22-
a fake (see ``RunnerProtocol``).
19+
AgentExecutions by ``agent_id`` and by the ``tool_call_id`` that spawned them.
2320
2421
``sandbox`` is resolved once at run start. ``None`` means the host could not
2522
provide a working sandbox (e.g. Deno not installed or Pyodide wheels could
@@ -31,9 +28,9 @@ class EngineRunState:
3128
output_bus: EngineOutputBus
3229
config: EngineConfig
3330
sandbox: Sandbox | None
31+
openai_client: AsyncOpenAI
3432
executions_by_agent_id: dict[str, AgentExecution] = field(default_factory=dict)
3533
executions_by_tool_call_id: dict[str, AgentExecution] = field(default_factory=dict)
36-
runner: RunnerProtocol = field(default_factory=lambda: Runner)
3734

3835
def register(self, execution: AgentExecution) -> None:
3936
"""Index a newly-created AgentExecution by agent_id, and by tool_call_id when subagent."""

engine/agents/openai_agent_runner.py

Lines changed: 6 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
from collections.abc import Awaitable, Callable
55
from typing import Any
66

7-
from agents import set_default_openai_client
87
from openai import (
98
APIConnectionError,
109
APIStatusError,
@@ -13,38 +12,11 @@
1312
RateLimitError,
1413
)
1514

16-
from engine.agents.agent_context import AgentContext, Compactor
15+
from engine.agents.agent_context import AgentContext
1716
from engine.agents.agent_execution import AgentExecution
1817
from engine.agents.engine_output_bus import EngineOutputBus
1918
from engine.agents.openai_event_mapper import OpenAiEventMapper
2019
from engine.errors import EngineAgentExhaustedError, EngineAgentRefusedError
21-
from engine.model_provider_config import ModelProviderConfig
22-
23-
24-
def configure_default_sdk_client(provider: ModelProviderConfig) -> None:
25-
"""Bind the OpenAI Agents SDK's default client to the configured endpoint.
26-
27-
The SDK uses a process-global client, so this is best-effort for callers
28-
running multiple engines in one process. We only override when at least
29-
one of ``base_url`` / ``api_key`` / ``default_headers`` is set; otherwise
30-
the SDK keeps using its env-driven default.
31-
32-
``use_for_tracing=False`` keeps the SDK's tracing exporter on its
33-
default OpenAI path. Without this, redirecting model calls to a non-
34-
OpenAI provider (vLLM, Ollama, OpenRouter, etc.) also redirects
35-
tracing POSTs there — those endpoints don't speak the tracing API,
36-
causing spurious errors or silent trace loss.
37-
"""
38-
if provider.base_url is None and provider.api_key is None and provider.default_headers is None:
39-
return
40-
set_default_openai_client(
41-
AsyncOpenAI(
42-
base_url=provider.base_url,
43-
api_key=provider.api_key,
44-
default_headers=provider.default_headers,
45-
),
46-
use_for_tracing=False,
47-
)
4820

4921

5022
def _is_retriable_llm_error(exc: BaseException) -> bool:
@@ -59,7 +31,6 @@ def _is_retriable_llm_error(exc: BaseException) -> bool:
5931
MAX_CONSECUTIVE_LLM_FAILURES = 10
6032

6133
RunStreamedCallable = Callable[..., Awaitable[Any]]
62-
CompactorFactory = Callable[[AgentExecution], Compactor]
6334
logger = logging.getLogger(__name__)
6435

6536

@@ -76,15 +47,15 @@ class OpenAiAgentRunner:
7647
def __init__(
7748
self,
7849
run_streamed: RunStreamedCallable,
79-
compactor_factory: CompactorFactory,
50+
client: AsyncOpenAI,
8051
event_mapper: OpenAiEventMapper | None = None,
8152
refusal_retries: int = 0,
8253
) -> None:
8354
"""``run_streamed`` is injected so root and subagent paths can supply their own
84-
max_turns and starting agent. ``compactor_factory`` produces a per-execution
85-
compactor bound to whatever model EngineConfig pins for compaction."""
55+
max_turns and starting agent. ``client`` is the per-run AsyncOpenAI used for
56+
compaction calls."""
8657
self._run_streamed = run_streamed
87-
self._compactor_factory = compactor_factory
58+
self._client = client
8859
self._mapper = event_mapper or OpenAiEventMapper()
8960
self._refusal_retries = refusal_retries
9061

@@ -188,7 +159,7 @@ async def run(
188159
)
189160

190161
agent_execution.record_llm_success()
191-
await agent_context.compact_old_items(self._compactor_factory(agent_execution))
162+
await agent_context.compact_old_items(self._client)
192163
return
193164

194165
raise EngineAgentExhaustedError(

engine/agents/runner_protocol.py

Lines changed: 0 additions & 33 deletions
This file was deleted.

0 commit comments

Comments
 (0)