Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,28 +1,32 @@
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional

from ldai import log
from ldai.providers import AgentResult, AgentRunner, ToolRegistry
from ldai.providers import RunnerResult, ToolRegistry
from ldai.providers.runner import Runner
from ldai.providers.types import LDAIMetrics

from ldai_openai.openai_helper import (
get_ai_usage_from_response,
get_tool_calls_from_run_items,
is_agent_tool_instance,
registry_value_to_agent_tool,
)


class OpenAIAgentRunner(AgentRunner):
class OpenAIAgentRunner(Runner):
"""
CAUTION:
This feature is experimental and should NOT be considered ready for production use.
It may change or be removed without notice and is not subject to backwards
compatibility guarantees.

AgentRunner implementation for OpenAI.
Runner implementation for a single OpenAI agent.

Executes a single agent using the OpenAI Agents SDK (``openai-agents``).
Tool calling and the agentic loop are handled internally by ``Runner.run``.
Returned by OpenAIRunnerFactory.create_agent(config, tools).
Returned by ``OpenAIRunnerFactory.create_agent(config, tools)``.

Implements the unified :class:`~ldai.providers.runner.Runner` protocol.
Requires ``openai-agents`` to be installed.
"""

Expand All @@ -39,16 +43,24 @@ def __init__(
self._instructions = instructions
self._tool_definitions = tool_definitions
self._tools = tools
self._tool_name_map: Dict[str, str] = {}

async def run(self, input: Any) -> AgentResult:
async def run(
self,
input: Any,
output_type: Optional[Dict[str, Any]] = None,
) -> RunnerResult:
"""
Run the agent with the given input string.
Run the agent with the given input.

Delegates to the OpenAI Agents SDK ``Runner.run``, which handles the
tool-calling loop internally.

:param input: The user prompt or input to the agent
:return: AgentResult with output, raw response, and aggregated metrics
:param output_type: Reserved for future structured output support;
currently ignored.
:return: :class:`RunnerResult` with ``content``, ``raw`` response, and
metrics including aggregated token usage and observed ``tool_calls``.
Comment thread
cursor[bot] marked this conversation as resolved.
"""
try:
from agents import Agent, Runner
Expand All @@ -57,7 +69,10 @@ async def run(self, input: Any) -> AgentResult:
"openai-agents is required for OpenAIAgentRunner. "
"Install it with: pip install openai-agents"
)
return AgentResult(output="", raw=None, metrics=LDAIMetrics(success=False, usage=None))
return RunnerResult(
content="",
metrics=LDAIMetrics(success=False, usage=None),
)

try:
agent_tools = self._build_agent_tools()
Expand All @@ -73,21 +88,38 @@ async def run(self, input: Any) -> AgentResult:

result = await Runner.run(agent, str(input), max_turns=25)

return AgentResult(
output=str(result.final_output),
raw=result,
tool_calls = [
ld_name
for _agent_name, tool_fn_name in get_tool_calls_from_run_items(result.new_items)
for ld_name in [self._tool_name_map.get(tool_fn_name)]
if ld_name is not None
]
Comment thread
cursor[bot] marked this conversation as resolved.

return RunnerResult(
content=str(result.final_output),
metrics=LDAIMetrics(
success=True,
usage=get_ai_usage_from_response(result),
tool_calls=tool_calls if tool_calls else None,
),
raw=result,
)
except Exception as error:
log.warning(f"OpenAI agent run failed: {error}")
return AgentResult(output="", raw=None, metrics=LDAIMetrics(success=False, usage=None))
return RunnerResult(
content="",
metrics=LDAIMetrics(success=False, usage=None),
)

def _build_agent_tools(self) -> List[Any]:
"""Build tool instances from LD tool definitions and registry."""
"""Build tool instances from LD tool definitions and registry.

Also populates ``self._tool_name_map`` so observed tool-call names
from the runtime can be translated back to their LD config keys for
metric reporting.
"""
tools = []
self._tool_name_map = {}
for td in self._tool_definitions:
if not isinstance(td, dict):
continue
Expand All @@ -97,6 +129,14 @@ def _build_agent_tools(self) -> List[Any]:

tool_fn = self._tools.get(name)
if tool_fn:
# Map runtime tool name → LD config key for metrics (function __name__
# for callables; identity for native tool instances — see get_tool_calls_from_run_items).
if is_agent_tool_instance(tool_fn):
self._tool_name_map[tool_fn.name] = name
else:
fn_name = getattr(tool_fn, '__name__', None)
if fn_name:
self._tool_name_map[fn_name] = name
tools.append(registry_value_to_agent_tool(tool_fn))
continue

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import json
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional

from ldai import LDMessage, log
from ldai.providers.model_runner import ModelRunner
from ldai.providers.types import LDAIMetrics, ModelResponse, StructuredResponse
from ldai.providers.runner import Runner
from ldai.providers.types import LDAIMetrics, RunnerResult
from openai import AsyncOpenAI

from ldai_openai.openai_helper import (
Expand All @@ -12,12 +12,15 @@
)


class OpenAIModelRunner(ModelRunner):
class OpenAIModelRunner(Runner):
"""
ModelRunner implementation for OpenAI.
Runner implementation for OpenAI chat completions.

Holds a fully-configured AsyncOpenAI client, model name, and parameters.
Returned by OpenAIConnector.create_model(config).
Returned by ``OpenAIRunnerFactory.create_model(config)``.

Implements the unified :class:`~ldai.providers.runner.Runner` protocol via
:meth:`run`.
"""

def __init__(
Expand All @@ -30,13 +33,38 @@ def __init__(
self._model_name = model_name
self._parameters = parameters

async def invoke_model(self, messages: List[LDMessage]) -> ModelResponse:
Comment thread
cursor[bot] marked this conversation as resolved.
async def run(
self,
input: Any,
output_type: Optional[Dict[str, Any]] = None,
) -> RunnerResult:
"""
Invoke the OpenAI model with an array of messages.

:param messages: Array of LDMessage objects representing the conversation
:return: ModelResponse containing the model's response and metrics
Run the OpenAI model with the given input.

:param input: A string prompt or a list of :class:`LDMessage` objects
:param output_type: Optional JSON schema dict requesting structured output.
When provided, ``parsed`` on the returned :class:`RunnerResult` is
populated with the parsed JSON document.
:return: :class:`RunnerResult` containing ``content``, ``metrics``,
``raw`` and (when ``output_type`` is set) ``parsed``.
"""
messages = self._coerce_input(input)

if output_type is not None:
return await self._run_structured(messages, output_type)
return await self._run_completion(messages)
Comment thread
cursor[bot] marked this conversation as resolved.

@staticmethod
def _coerce_input(input: Any) -> List[LDMessage]:
if isinstance(input, str):
return [LDMessage(role='user', content=input)]
if isinstance(input, list):
return input
raise TypeError(
f"Unsupported input type for OpenAIModelRunner.run: {type(input).__name__}"
)

async def _run_completion(self, messages: List[LDMessage]) -> RunnerResult:
try:
response = await self._client.chat.completions.create(
model=self._model_name,
Expand All @@ -45,40 +73,29 @@ async def invoke_model(self, messages: List[LDMessage]) -> ModelResponse:
)

metrics = get_ai_metrics_from_response(response)

content = ''
if response.choices and len(response.choices) > 0:
message = response.choices[0].message
if message and message.content:
content = message.content
content = self._extract_content(response)

if not content:
log.warning('OpenAI response has no content available')
metrics = LDAIMetrics(success=False, usage=metrics.usage)
return RunnerResult(
content='',
metrics=LDAIMetrics(success=False, usage=metrics.usage),
raw=response,
)

return ModelResponse(
message=LDMessage(role='assistant', content=content),
metrics=metrics,
)
return RunnerResult(content=content, metrics=metrics, raw=response)
except Exception as error:
log.warning(f'OpenAI model invocation failed: {error}')
return ModelResponse(
message=LDMessage(role='assistant', content=''),
return RunnerResult(
content='',
metrics=LDAIMetrics(success=False, usage=None),
)

async def invoke_structured_model(
async def _run_structured(
self,
messages: List[LDMessage],
response_structure: Dict[str, Any],
) -> StructuredResponse:
"""
Invoke the OpenAI model with structured output support.

:param messages: Array of LDMessage objects representing the conversation
:param response_structure: Dictionary defining the JSON schema for output structure
:return: StructuredResponse containing the structured data
"""
output_type: Dict[str, Any],
) -> RunnerResult:
try:
response = await self._client.chat.completions.create(
model=self._model_name,
Expand All @@ -87,43 +104,50 @@ async def invoke_structured_model(
'type': 'json_schema',
'json_schema': {
'name': 'structured_output',
'schema': response_structure,
'schema': output_type,
'strict': True,
},
},
**self._parameters,
)

metrics = get_ai_metrics_from_response(response)

content = ''
if response.choices and len(response.choices) > 0:
message = response.choices[0].message
if message and message.content:
content = message.content
content = self._extract_content(response)

if not content:
log.warning('OpenAI structured response has no content available')
return StructuredResponse(
data={},
raw_response='',
return RunnerResult(
content='',
metrics=LDAIMetrics(success=False, usage=metrics.usage),
raw=response,
)

try:
data = json.loads(content)
return StructuredResponse(data=data, raw_response=content, metrics=metrics)
parsed = json.loads(content)
return RunnerResult(
content=content,
metrics=metrics,
raw=response,
parsed=parsed,
)
except json.JSONDecodeError as parse_error:
log.warning(f'OpenAI structured response contains invalid JSON: {parse_error}')
return StructuredResponse(
data={},
raw_response=content,
return RunnerResult(
content=content,
metrics=LDAIMetrics(success=False, usage=metrics.usage),
raw=response,
)
except Exception as error:
log.warning(f'OpenAI structured model invocation failed: {error}')
return StructuredResponse(
data={},
raw_response='',
return RunnerResult(
content='',
metrics=LDAIMetrics(success=False, usage=None),
)

@staticmethod
def _extract_content(response: Any) -> str:
if response.choices and len(response.choices) > 0:
message = response.choices[0].message
if message and message.content:
return message.content
return ''
Loading
Loading