Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integrations/openrouter/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ classifiers = [
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
]
dependencies = ["haystack-ai>=2.22.0"]
dependencies = ["haystack-ai>=2.30.0"]

[project.urls]
Documentation = "https://github.com/deepset-ai/haystack-core-integrations/tree/main/integrations/openrouter#readme"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,105 @@
#
# SPDX-License-Identifier: Apache-2.0

import json
from typing import Any

from haystack import component, default_to_dict, logging
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage, StreamingCallbackT
from haystack.components.generators.chat.openai import _check_finish_reason
from haystack.components.generators.utils import _normalize_messages, _serialize_object
from haystack.dataclasses import (
ChatMessage,
ReasoningContent,
StreamingCallbackT,
ToolCall,
select_streaming_callback,
)
from haystack.tools import ToolsType, _check_duplicate_tool_names, flatten_tools_or_toolsets, serialize_tools_or_toolset
from haystack.utils import serialize_callable
from haystack.utils.auth import Secret
from openai.types.chat import ChatCompletion, ParsedChatCompletion
from openai.types.chat.chat_completion import Choice

logger = logging.getLogger(__name__)


def _extract_reasoning(message: Any) -> ReasoningContent | None:
"""Extract reasoning content from an OpenRouter API response message."""
# OpenRouter attaches reasoning content as extra attributes on the standard OpenAI SDK message,
# so we read them with getattr rather than relying on typed fields.
reasoning_text = getattr(message, "reasoning", None) or ""
raw_details = getattr(message, "reasoning_details", None) or []

if not reasoning_text and not raw_details:
return None

details = []
for d in raw_details:
if isinstance(d, dict):
details.append(d)
elif hasattr(d, "model_dump"):
details.append(d.model_dump())
else:
details.append(vars(d))

# Some models only return structured details without a flat `reasoning` string, so we
# reconstruct the text from the known detail types.
if not reasoning_text and details:
parts = []
for d in details:
dtype = d.get("type", "")
if dtype == "reasoning.text":
parts.append(d.get("text", ""))
elif dtype == "reasoning.summary":
parts.append(d.get("summary", ""))
reasoning_text = "".join(parts)

extra = {}
if details:
extra["reasoning_details"] = details

return ReasoningContent(reasoning_text=reasoning_text, extra=extra)


def _convert_openrouter_completion_to_chat_message(
completion: ChatCompletion | ParsedChatCompletion, choice: Choice
) -> ChatMessage:
"""Convert an OpenRouter chat completion to a ChatMessage, including reasoning content."""
message = choice.message
text = message.content
tool_calls = []
if message.tool_calls:
for tc in message.tool_calls:
func = getattr(tc, "function", None)
if func is None:
continue
try:
arguments = json.loads(func.arguments)
tool_calls.append(ToolCall(id=tc.id, tool_name=func.name, arguments=arguments))
except json.JSONDecodeError:
logger.warning(
"OpenRouter returned a malformed JSON string for tool call arguments. "
"Tool call ID: {_id}, Tool name: {_name}, Arguments: {_arguments}",
_id=tc.id,
_name=func.name,
_arguments=func.arguments,
)

logprobs = _serialize_object(choice.logprobs) if choice.logprobs else None
meta = {
"model": completion.model,
"index": choice.index,
"finish_reason": choice.finish_reason,
"usage": _serialize_object(completion.usage),
}
if logprobs:
meta["logprobs"] = logprobs

reasoning = _extract_reasoning(message)
return ChatMessage.from_assistant(text=text, tool_calls=tool_calls, meta=meta, reasoning=reasoning)


@component
class OpenRouterChatGenerator(OpenAIChatGenerator):
"""
Expand All @@ -26,9 +113,12 @@ class OpenRouterChatGenerator(OpenAIChatGenerator):
parameter in `run` method.

Key Features and Compatibility:
- **Primary Compatibility**: Designed to work seamlessly with the OpenRouter chat completion endpoint.
- **Primary Compatibility**: Compatible with the OpenRouter chat completion endpoint.
- **Streaming Support**: Supports streaming responses from the OpenRouter chat completion endpoint.
- **Customizability**: Supports all parameters supported by the OpenRouter chat completion endpoint.
- **Reasoning Support**: Extracts reasoning/thinking content from models that support it
(e.g., DeepSeek R1, Claude with extended thinking) and stores it in the `ReasoningContent`
field on `ChatMessage`. Reasoning content is only captured for non-streaming requests.

This component uses the ChatMessage format for structuring both input and output,
ensuring coherent and contextually relevant responses in chat-based text generation scenarios.
Expand All @@ -40,20 +130,20 @@ class OpenRouterChatGenerator(OpenAIChatGenerator):

Usage example:
```python
from haystack_integrations.components.generators.openrouter import OpenRouterChatGenerator
from haystack_integrations.components.generators.openrouter import (
OpenRouterChatGenerator,
)
from haystack.dataclasses import ChatMessage

messages = [ChatMessage.from_user("What's Natural Language Processing?")]

client = OpenRouterChatGenerator()
client = OpenRouterChatGenerator(
model="deepseek/deepseek-r1",
generation_kwargs={"reasoning": {"effort": "high"}},
)
response = client.run(messages)
print(response)

>>{'replies': [ChatMessage(_content='Natural Language Processing (NLP) is a branch of artificial intelligence
>>that focuses on enabling computers to understand, interpret, and generate human language in a way that is
>>meaningful and useful.', _role=<ChatRole.ASSISTANT: 'assistant'>, _name=None,
>>_meta={'model': 'openai/gpt-5-mini', 'index': 0, 'finish_reason': 'stop',
>>'usage': {'prompt_tokens': 15, 'completion_tokens': 36, 'total_tokens': 51}})]}
print(response["replies"][0].reasoning) # Access reasoning content
print(response["replies"][0].text) # Access final answer
```
"""

Expand Down Expand Up @@ -98,14 +188,11 @@ def __init__(
events as they become available, with the stream terminated by a data: [DONE] message.
- `safe_prompt`: Whether to inject a safety prompt before all conversations.
- `random_seed`: The seed to use for random sampling.
- `reasoning`: A dict to configure reasoning/thinking tokens for models that support it.
Example: `{"effort": "high"}` or `{"max_tokens": 2000}`.
Reasoning content is only captured for non-streaming requests.
See [OpenRouter reasoning docs](https://openrouter.ai/docs/use-cases/reasoning-tokens).
- `response_format`: A JSON schema or a Pydantic model that enforces the structure of the model's response.
If provided, the output will always be validated against this
format (unless the model returns a tool call).
For details, see the [OpenAI Structured Outputs documentation](https://platform.openai.com/docs/guides/structured-outputs).
Notes:
- This parameter accepts Pydantic models and JSON schemas for latest models starting from GPT-4o.
- For structured outputs with streaming,
the `response_format` must be a JSON schema and not a Pydantic model.
:param tools:
A list of tools or a Toolset for which the model can prepare calls. This parameter can accept either a
list of `Tool` objects or a `Toolset` instance.
Expand Down Expand Up @@ -187,6 +274,12 @@ def _prepare_api_call(
# adapt ChatMessage(s) to the format expected by the OpenAI API
openai_formatted_messages = [message.to_openai_dict_format() for message in messages]

# OpenRouter expects reasoning_details to be sent back in multi-turn conversations, but
# to_openai_dict_format() strips reasoning, so we re-inject it into the formatted message dicts.
for i, chat_msg in enumerate(messages):
if chat_msg.reasoning and chat_msg.reasoning.extra.get("reasoning_details"):
openai_formatted_messages[i]["reasoning_details"] = chat_msg.reasoning.extra["reasoning_details"]

flattened_tools = flatten_tools_or_toolsets(tools or self.tools)
tools_strict = tools_strict if tools_strict is not None else self.tools_strict
_check_duplicate_tool_names(flattened_tools)
Expand Down Expand Up @@ -227,3 +320,156 @@ def _prepare_api_call(
if response_format:
final_args["response_format"] = response_format
return final_args

@component.output_types(replies=list[ChatMessage])
def run(
self,
messages: list[ChatMessage] | str,
streaming_callback: StreamingCallbackT | None = None,
generation_kwargs: dict[str, Any] | None = None,
*,
tools: ToolsType | None = None,
tools_strict: bool | None = None,
) -> dict[str, list[ChatMessage]]:
"""
Invokes chat completion on the OpenRouter API.

:param messages:
A list of ChatMessage instances representing the input messages.
If a string is provided, it is converted to a list containing a ChatMessage with user role.
:param streaming_callback:
A callback function that is called when a new token is received from the stream.
:param generation_kwargs:
Additional keyword arguments for text generation. These parameters will
override the parameters passed during component initialization.
For details on OpenRouter API parameters, see
[OpenRouter docs](https://openrouter.ai/docs/quickstart).
:param tools: A list of Tool and/or Toolset objects, or a single Toolset for which the model can prepare calls.
If set, it will override the `tools` parameter provided during initialization.
:param tools_strict:
Whether to enable strict schema adherence for tool calls.

:returns:
A dictionary with the following key:
- `replies`: A list containing the generated responses as ChatMessage instances.
"""
messages = _normalize_messages(messages)
if not self._is_warmed_up:
self.warm_up()

if len(messages) == 0:
return {"replies": []}

streaming_callback = select_streaming_callback(
init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=False
)

# Reasoning content is reconstructed from the full response message, which is not available while
# streaming, so we warn the user that it will not be captured in this mode.
if streaming_callback is not None:
merged_kwargs = {**self.generation_kwargs, **(generation_kwargs or {})}
if merged_kwargs.get("reasoning"):
logger.warning(
"Streaming with reasoning is active. Reasoning content will not be captured during "
"streaming. Use non-streaming mode to extract reasoning content."
)

api_args = self._prepare_api_call(
messages=messages,
streaming_callback=streaming_callback,
generation_kwargs=generation_kwargs,
tools=tools,
tools_strict=tools_strict,
)
openai_endpoint = api_args.pop("openai_endpoint")
chat_completion = getattr(self.client.chat.completions, openai_endpoint)(**api_args)

if streaming_callback is not None:
# streaming uses the inherited handler so reasoning extraction is intentionally skipped
completions = self._handle_stream_response(chat_completion, streaming_callback)
else:
assert isinstance(chat_completion, ChatCompletion), "Unexpected response type for non-streaming request."
completions = [
_convert_openrouter_completion_to_chat_message(chat_completion, choice)
for choice in chat_completion.choices
]

for message in completions:
_check_finish_reason(message.meta)

return {"replies": completions}

@component.output_types(replies=list[ChatMessage])
async def run_async(
self,
messages: list[ChatMessage] | str,
streaming_callback: StreamingCallbackT | None = None,
generation_kwargs: dict[str, Any] | None = None,
*,
tools: ToolsType | None = None,
tools_strict: bool | None = None,
) -> dict[str, list[ChatMessage]]:
"""
Asynchronously invokes chat completion on the OpenRouter API.

:param messages:
A list of ChatMessage instances representing the input messages.
If a string is provided, it is converted to a list containing a ChatMessage with user role.
:param streaming_callback:
A callback function that is called when a new token is received from the stream.
Must be a coroutine.
:param generation_kwargs:
Additional keyword arguments for text generation.
:param tools: A list of Tool and/or Toolset objects, or a single Toolset.
:param tools_strict:
Whether to enable strict schema adherence for tool calls.

:returns:
A dictionary with the following key:
- `replies`: A list containing the generated responses as ChatMessage instances.
"""
messages = _normalize_messages(messages)
if not self._is_warmed_up:
self.warm_up()

if len(messages) == 0:
return {"replies": []}

streaming_callback = select_streaming_callback(
init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=True
)

# Reasoning content is reconstructed from the full response message, which is not available while
# streaming, so we warn the user that it will not be captured in this mode.
if streaming_callback is not None:
merged_kwargs = {**self.generation_kwargs, **(generation_kwargs or {})}
if merged_kwargs.get("reasoning"):
logger.warning(
"Streaming with reasoning is active. Reasoning content will not be captured during "
"streaming. Use non-streaming mode to extract reasoning content."
)

api_args = self._prepare_api_call(
messages=messages,
streaming_callback=streaming_callback,
generation_kwargs=generation_kwargs,
tools=tools,
tools_strict=tools_strict,
)
openai_endpoint = api_args.pop("openai_endpoint")
chat_completion = await getattr(self.async_client.chat.completions, openai_endpoint)(**api_args)

if streaming_callback is not None:
# streaming uses the inherited handler so reasoning extraction is intentionally skipped
completions = await self._handle_async_stream_response(chat_completion, streaming_callback)
else:
assert isinstance(chat_completion, ChatCompletion), "Unexpected response type for non-streaming request."
completions = [
_convert_openrouter_completion_to_chat_message(chat_completion, choice)
for choice in chat_completion.choices
]

for message in completions:
_check_finish_reason(message.meta)

return {"replies": completions}
Loading
Loading