2 changes: 1 addition & 1 deletion integrations/ollama/pyproject.toml
@@ -88,7 +88,7 @@ known-first-party = ["haystack_integrations"]


 [tool.ruff]
-target-version = "py38"
+target-version = "py39"
 line-length = 120

 [tool.ruff.lint]
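The pyproject.toml bump above is what the rest of this PR builds on: with Python 3.8 support dropped, the code can use PEP 585 builtin generics (`list[...]`, `dict[...]`) instead of `typing.List` and `typing.Dict`. A minimal before/after sketch of the style (illustrative names, not code from this PR):

```python
# Illustrative before/after of the annotation style this PR migrates to.
# PEP 585 (Python 3.9) lets builtin containers be subscripted directly,
# so typing.Dict and typing.List become unnecessary imports.
from typing import Any, Optional

# Python 3.8 style, as removed by this PR:
#     from typing import Dict, List
#     def embed_batch(texts: List[str]) -> Dict[str, Any]: ...

# Python 3.9+ style, as added by this PR:
def embed_batch(texts: list[str], params: Optional[dict[str, Any]] = None) -> dict[str, Any]:
    """Stand-in function; only the annotations matter here."""
    return {"count": len(texts), "params": params or {}}
```

Note that `Optional`, `Union`, and `Callable` still come from `typing` in the hunks below, since the `X | Y` union syntax requires Python 3.10 (PEP 604); only the container generics change, plus `Iterator` and `AsyncIterator` moving to `collections.abc` in the chat generator.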
@@ -1,5 +1,5 @@
 import asyncio
-from typing import Any, Dict, List, Optional, Union
+from typing import Any, Optional, Union

 from haystack import Document, component
 from tqdm import tqdm
@@ -30,13 +30,13 @@ def __init__(
         self,
         model: str = "nomic-embed-text",
         url: str = "http://localhost:11434",
-        generation_kwargs: Optional[Dict[str, Any]] = None,
+        generation_kwargs: Optional[dict[str, Any]] = None,
         timeout: int = 120,
         keep_alive: Optional[Union[float, str]] = None,
         prefix: str = "",
         suffix: str = "",
         progress_bar: bool = True,
-        meta_fields_to_embed: Optional[List[str]] = None,
+        meta_fields_to_embed: Optional[list[str]] = None,
         embedding_separator: str = "\n",
         batch_size: int = 32,
     ):
@@ -87,7 +87,7 @@ def __init__(
         self._client = Client(host=self.url, timeout=self.timeout)
         self._async_client = AsyncClient(host=self.url, timeout=self.timeout)

-    def _prepare_input(self, documents: List[Document]) -> List[Document]:
+    def _prepare_input(self, documents: list[Document]) -> list[Document]:
         """
         Prepares the list of documents to embed by appropriate validation.
         """
@@ -100,7 +100,7 @@ def _prepare_input(self, documents: List[Document]) -> List[Document]:

         return documents

-    def _prepare_texts_to_embed(self, documents: List[Document]) -> List[str]:
+    def _prepare_texts_to_embed(self, documents: list[Document]) -> list[str]:
         """
         Prepares the texts to embed by concatenating the Document text with the metadata fields to embed.
         """
@@ -123,8 +123,8 @@ def _prepare_texts_to_embed(self, documents: List[Document]) -> List[str]:
         return texts_to_embed

     def _embed_batch(
-        self, texts_to_embed: List[str], batch_size: int, generation_kwargs: Optional[Dict[str, Any]] = None
-    ) -> List[List[float]]:
+        self, texts_to_embed: list[str], batch_size: int, generation_kwargs: Optional[dict[str, Any]] = None
+    ) -> list[list[float]]:
         """
         Internal method to embed a batch of texts.
         """
@@ -146,8 +146,8 @@ def _embed_batch(
         return all_embeddings

     async def _embed_batch_async(
-        self, texts_to_embed: List[str], batch_size: int, generation_kwargs: Optional[Dict[str, Any]] = None
-    ) -> List[List[float]]:
+        self, texts_to_embed: list[str], batch_size: int, generation_kwargs: Optional[dict[str, Any]] = None
+    ) -> list[list[float]]:
         """
         Internal method to embed a batch of texts asynchronously.
         """
@@ -175,10 +175,10 @@ async def _embed_batch_async(

         return all_embeddings

-    @component.output_types(documents=List[Document], meta=Dict[str, Any])
+    @component.output_types(documents=list[Document], meta=dict[str, Any])
     def run(
-        self, documents: List[Document], generation_kwargs: Optional[Dict[str, Any]] = None
-    ) -> Dict[str, Union[List[Document], Dict[str, Any]]]:
+        self, documents: list[Document], generation_kwargs: Optional[dict[str, Any]] = None
+    ) -> dict[str, Union[list[Document], dict[str, Any]]]:
         """
         Runs an Ollama Model to compute embeddings of the provided documents.

@@ -210,10 +210,10 @@ def run(

         return {"documents": documents, "meta": {"model": self.model}}

-    @component.output_types(documents=List[Document], meta=Dict[str, Any])
+    @component.output_types(documents=list[Document], meta=dict[str, Any])
     async def run_async(
-        self, documents: List[Document], generation_kwargs: Optional[Dict[str, Any]] = None
-    ) -> Dict[str, Union[List[Document], Dict[str, Any]]]:
+        self, documents: list[Document], generation_kwargs: Optional[dict[str, Any]] = None
+    ) -> dict[str, Union[list[Document], dict[str, Any]]]:
         """
         Asynchronously run an Ollama Model to compute embeddings of the provided documents.

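For orientation, here is a hypothetical usage sketch of the document embedder diffed above. The diff itself shows neither the class name nor the import path; this assumes the class is `OllamaDocumentEmbedder`, exported from `haystack_integrations.components.embedders.ollama`, and a local Ollama server with `nomic-embed-text` pulled.

```python
# Usage sketch for the document embedder diffed above (assumptions noted in the
# lead-in; defaults match the __init__ signature shown in the diff).
from haystack import Document
from haystack_integrations.components.embedders.ollama import OllamaDocumentEmbedder

embedder = OllamaDocumentEmbedder()  # defaults: nomic-embed-text at http://localhost:11434

docs = [
    Document(content="Ollama serves models locally."),
    Document(content="Haystack composes LLM pipelines."),
]
result = embedder.run(documents=docs)

# run() writes the vectors onto the documents and reports the model in meta.
print(len(result["documents"][0].embedding), result["meta"])
```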
@@ -1,4 +1,4 @@
-from typing import Any, Dict, List, Optional, Union
+from typing import Any, Optional, Union

 from haystack import component

@@ -25,7 +25,7 @@ def __init__(
         self,
         model: str = "nomic-embed-text",
         url: str = "http://localhost:11434",
-        generation_kwargs: Optional[Dict[str, Any]] = None,
+        generation_kwargs: Optional[dict[str, Any]] = None,
         timeout: int = 120,
         keep_alive: Optional[Union[float, str]] = None,
     ):
@@ -58,10 +58,10 @@ def __init__(
         self._client = Client(host=self.url, timeout=self.timeout)
         self._async_client = AsyncClient(host=self.url, timeout=self.timeout)

-    @component.output_types(embedding=List[float], meta=Dict[str, Any])
+    @component.output_types(embedding=list[float], meta=dict[str, Any])
     def run(
-        self, text: str, generation_kwargs: Optional[Dict[str, Any]] = None
-    ) -> Dict[str, Union[List[float], Dict[str, Any]]]:
+        self, text: str, generation_kwargs: Optional[dict[str, Any]] = None
+    ) -> dict[str, Union[list[float], dict[str, Any]]]:
         """
         Runs an Ollama Model to compute embeddings of the provided text.

@@ -85,10 +85,10 @@ def run(

         return result

-    @component.output_types(embedding=List[float], meta=Dict[str, Any])
+    @component.output_types(embedding=list[float], meta=dict[str, Any])
     async def run_async(
-        self, text: str, generation_kwargs: Optional[Dict[str, Any]] = None
-    ) -> Dict[str, Union[List[float], Dict[str, Any]]]:
+        self, text: str, generation_kwargs: Optional[dict[str, Any]] = None
+    ) -> dict[str, Union[list[float], dict[str, Any]]]:
         """
         Asynchronously run an Ollama Model to compute embeddings of the provided text.

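A matching sketch for the text embedder, under the same assumptions (class name `OllamaTextEmbedder`, local Ollama server). Unlike the document embedder, `run()` here returns the embedding directly rather than writing it onto Document objects.

```python
# Usage sketch for the text embedder; class name and import path are assumed,
# not shown in the diff.
from haystack_integrations.components.embedders.ollama import OllamaTextEmbedder

text_embedder = OllamaTextEmbedder(model="nomic-embed-text")
out = text_embedder.run(text="What do llamas eat?")
print(out["embedding"][:5])  # list[float]
print(out["meta"])           # includes the model name
```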
@@ -1,5 +1,6 @@
 import json
-from typing import Any, AsyncIterator, Callable, Dict, Iterator, List, Literal, Optional, Union
+from collections.abc import AsyncIterator, Iterator
+from typing import Any, Callable, Literal, Optional, Union

 from haystack import component, default_from_dict, default_to_dict
 from haystack.dataclasses import (
@@ -23,14 +24,14 @@

 from ollama import AsyncClient, ChatResponse, Client

-FINISH_REASON_MAPPING: Dict[str, FinishReason] = {
+FINISH_REASON_MAPPING: dict[str, FinishReason] = {
     "stop": "stop",
     "tool_calls": "tool_calls",
     # we skip load and unload reasons
 }


-def _convert_chatmessage_to_ollama_format(message: ChatMessage) -> Dict[str, Any]:
+def _convert_chatmessage_to_ollama_format(message: ChatMessage) -> dict[str, Any]:
     """
     Convert a ChatMessage to the format expected by the Ollama Chat API.
     """
@@ -48,7 +49,7 @@ def _convert_chatmessage_to_ollama_format(message: ChatMessage) -> Dict[str, Any]:
         msg = "For Ollama compatibility, a `ChatMessage` can contain at most one `TextContent` or `ToolCallResult`."
         raise ValueError(msg)

-    ollama_msg: Dict[str, Any] = {"role": message.role.value}
+    ollama_msg: dict[str, Any] = {"role": message.role.value}

     if tool_call_results:
         # Ollama does not provide a way to communicate errors in tool invocations, so we ignore the error field
@@ -70,7 +71,7 @@ def _convert_chatmessage_to_ollama_format(message: ChatMessage) -> Dict[str, Any]:
     return ollama_msg


-def _convert_ollama_meta_to_openai_format(input_response_dict: Dict) -> Dict[str, Any]:
+def _convert_ollama_meta_to_openai_format(input_response_dict: dict) -> dict[str, Any]:
     """
     Map Ollama metadata keys onto the OpenAI-compatible names Haystack expects.
     All fields that are not part of the OpenAI metadata are left unchanged in the returned dict.
@@ -129,7 +130,7 @@ def _convert_ollama_response_to_chatmessage(ollama_response: ChatResponse) -> ChatMessage:
     response_dict = ollama_response.model_dump()
     ollama_message = response_dict["message"]
     text = ollama_message["content"]
-    tool_calls: List[ToolCall] = []
+    tool_calls: list[ToolCall] = []

     if ollama_tool_calls := ollama_message.get("tool_calls"):
         for ollama_tc in ollama_tool_calls:
@@ -211,7 +212,7 @@ def __init__(
         self,
         model: str = "qwen3:0.6b",
         url: str = "http://localhost:11434",
-        generation_kwargs: Optional[Dict[str, Any]] = None,
+        generation_kwargs: Optional[dict[str, Any]] = None,
         timeout: int = 120,
         keep_alive: Optional[Union[float, str]] = None,
         streaming_callback: Optional[Callable[[StreamingChunk], None]] = None,
@@ -274,7 +275,7 @@ def __init__(
         self._client = Client(host=self.url, timeout=self.timeout)
         self._async_client = AsyncClient(host=self.url, timeout=self.timeout)

-    def to_dict(self) -> Dict[str, Any]:
+    def to_dict(self) -> dict[str, Any]:
         """
         Serializes the component to a dictionary.

@@ -296,7 +297,7 @@ def to_dict(self) -> Dict[str, Any]:
         )

     @classmethod
-    def from_dict(cls, data: Dict[str, Any]) -> "OllamaChatGenerator":
+    def from_dict(cls, data: dict[str, Any]) -> "OllamaChatGenerator":
         """
         Deserializes the component from a dictionary.

@@ -315,20 +316,20 @@ def _handle_streaming_response(
         self,
         response_iter: Iterator[ChatResponse],
         callback: Optional[SyncStreamingCallbackT],
-    ) -> Dict[str, List[ChatMessage]]:
+    ) -> dict[str, list[ChatMessage]]:
         """
         Merge an Ollama streaming response into a single ChatMessage, preserving
         tool calls. Works even when arguments arrive piecemeal as str fragments
         or as full JSON dicts.
         """

         component_info = ComponentInfo.from_component(self)
-        chunks: List[StreamingChunk] = []
+        chunks: list[StreamingChunk] = []

         # Accumulators
-        arg_by_id: Dict[str, str] = {}
-        name_by_id: Dict[str, str] = {}
-        id_order: List[str] = []
+        arg_by_id: dict[str, str] = {}
+        name_by_id: dict[str, str] = {}
+        id_order: list[str] = []
         tool_call_index: int = 0

         # Stream
@@ -399,18 +400,18 @@ async def _handle_streaming_response_async(
         self,
         response_iter: AsyncIterator[ChatResponse],
         callback: Optional[AsyncStreamingCallbackT],
-    ) -> Dict[str, List[ChatMessage]]:
+    ) -> dict[str, list[ChatMessage]]:
         """
         Merge an Ollama async streaming response into a single ChatMessage, preserving
         tool calls. Works even when arguments arrive piecemeal as str fragments
         or as full JSON dicts."""
         component_info = ComponentInfo.from_component(self)
-        chunks: List[StreamingChunk] = []
+        chunks: list[StreamingChunk] = []

         # Accumulators
-        arg_by_id: Dict[str, str] = {}
-        name_by_id: Dict[str, str] = {}
-        id_order: List[str] = []
+        arg_by_id: dict[str, str] = {}
+        name_by_id: dict[str, str] = {}
+        id_order: list[str] = []
         tool_call_index: int = 0

         # Stream
@@ -466,15 +467,15 @@ async def _handle_streaming_response_async(

         return {"replies": [reply]}

-    @component.output_types(replies=List[ChatMessage])
+    @component.output_types(replies=list[ChatMessage])
     def run(
         self,
-        messages: List[ChatMessage],
-        generation_kwargs: Optional[Dict[str, Any]] = None,
+        messages: list[ChatMessage],
+        generation_kwargs: Optional[dict[str, Any]] = None,
         tools: Optional[ToolsType] = None,
         *,
         streaming_callback: Optional[StreamingCallbackT] = None,
-    ) -> Dict[str, List[ChatMessage]]:
+    ) -> dict[str, list[ChatMessage]]:
         """
         Runs an Ollama Model on a given chat history.

@@ -532,15 +533,15 @@ def run(
         # non-stream path
         return {"replies": [_convert_ollama_response_to_chatmessage(ollama_response=response)]}

-    @component.output_types(replies=List[ChatMessage])
+    @component.output_types(replies=list[ChatMessage])
     async def run_async(
         self,
-        messages: List[ChatMessage],
-        generation_kwargs: Optional[Dict[str, Any]] = None,
+        messages: list[ChatMessage],
+        generation_kwargs: Optional[dict[str, Any]] = None,
         tools: Optional[ToolsType] = None,
         *,
         streaming_callback: Optional[StreamingCallbackT] = None,
-    ) -> Dict[str, List[ChatMessage]]:
+    ) -> dict[str, list[ChatMessage]]:
         """
         Async version of run. Runs an Ollama Model on a given chat history.

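The `_handle_streaming_response` hunks above show only the accumulator setup; the merging loop itself is collapsed. Below is a standalone sketch of that pattern with hypothetical names and simplified inputs, not the component's actual code: argument fragments may arrive per chunk as str pieces or as complete dicts, so they are concatenated per tool-call id and decoded once at the end.

```python
# Standalone sketch of the per-id accumulation pattern (hypothetical helper).
import json
from typing import Union

def merge_tool_call_args(fragments: list[tuple[str, Union[str, dict]]]) -> dict[str, dict]:
    arg_by_id: dict[str, str] = {}  # accumulated JSON text per tool-call id
    id_order: list[str] = []        # first-seen order of tool-call ids

    for tc_id, piece in fragments:
        if tc_id not in arg_by_id:
            arg_by_id[tc_id] = ""
            id_order.append(tc_id)
        # A dict means the arguments arrived whole; a str is a partial fragment.
        arg_by_id[tc_id] += json.dumps(piece) if isinstance(piece, dict) else piece

    # Decode each accumulated argument string exactly once, preserving order.
    return {tc_id: json.loads(arg_by_id[tc_id] or "{}") for tc_id in id_order}

# Two str fragments for call "a", one whole dict for call "b".
print(merge_tool_call_args([("a", '{"city": '), ("a", '"Paris"}'), ("b", {"x": 1})]))
# -> {'a': {'city': 'Paris'}, 'b': {'x': 1}}
```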
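Finally, a hypothetical end-to-end usage sketch of the chat generator, assuming the class is `OllamaChatGenerator`, exported from `haystack_integrations.components.generators.ollama`, with `qwen3:0.6b` (the default model named in the diff) pulled into a local Ollama server.

```python
# Usage sketch for the chat generator; class name and import path are assumed.
from haystack.dataclasses import ChatMessage
from haystack_integrations.components.generators.ollama import OllamaChatGenerator

generator = OllamaChatGenerator(model="qwen3:0.6b", url="http://localhost:11434")

result = generator.run(messages=[ChatMessage.from_user("Why is the sky blue?")])
print(result["replies"][0].text)  # run() returns {"replies": list[ChatMessage]}
```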