Skip to content

Commit 3cbea8c

Browse files
authored
feat: add search functionality to PipelineResource (#92)
* feat: add imports for search models * feat: add search models for pipeline search functionality * feat: add imports for search functionality * feat: add search method with streaming support * feat: update imports to include search models * feat: add search method to PipelineResourceProtocol * feat: add stream field to TransportResponse for streaming support * feat: add streaming support to AsyncTransport * feat: add stream parameter to TransportProtocol * feat: add stream parameter to AsyncClientProtocol * feat: add imports for search models in tests * feat: add comprehensive unit tests for search functionality * feat: add integration test for search functionality * feat: add stream parameter to BaseFakeClient * feat: include stream parameter in request recording * feat: add stream parameter to AsyncDeepsetClient * feat: pass stream parameter through to transport * feat: add unit tests for streaming search functionality * feat: create quick test script to validate search functionality * chore: remove temporary test file * fix: update imports to use builtin types instead of typing module * fix: use builtin types and pipe union syntax in Answer model * fix: use builtin types and pipe union syntax in Document model * fix: use builtin types and pipe union syntax in SearchResult model * fix: use builtin types and pipe union syntax in SearchResponse model * fix: use pipe union syntax in FilterCondition model * fix: use builtin types in SearchFilters model * fix: use builtin types and pipe union syntax in StreamDelta model * fix: use pipe union syntax in StreamEvent model * fix: update imports to use builtin types instead of typing module * feat: add search and search stream
1 parent c02b113 commit 3cbea8c

10 files changed

Lines changed: 1178 additions & 13 deletions

File tree

src/deepset_mcp/api/client.py

Lines changed: 106 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import os
2+
from collections.abc import AsyncIterator
3+
from contextlib import AbstractAsyncContextManager, asynccontextmanager
24
from types import TracebackType
35
from typing import Any, Self, TypeVar, overload
46

@@ -8,7 +10,12 @@
810
from deepset_mcp.api.pipeline.resource import PipelineResource
911
from deepset_mcp.api.pipeline_template.resource import PipelineTemplateResource
1012
from deepset_mcp.api.protocols import AsyncClientProtocol
11-
from deepset_mcp.api.transport import AsyncTransport, TransportProtocol, TransportResponse
13+
from deepset_mcp.api.transport import (
14+
AsyncTransport,
15+
StreamingResponse,
16+
TransportProtocol,
17+
TransportResponse,
18+
)
1219
from deepset_mcp.api.user.resource import UserResource
1320

1421
T = TypeVar("T")
@@ -85,7 +92,29 @@ async def request(
8592
response_type: type[T] | None = None,
8693
**kwargs: Any,
8794
) -> TransportResponse[Any]:
88-
"""Make a request to the deepset API."""
95+
"""
96+
Make a regular (non-streaming) request to the deepset API.
97+
98+
Parameters
99+
----------
100+
endpoint : str
101+
API endpoint path
102+
method : str, default="GET"
103+
HTTP method
104+
data : dict, optional
105+
JSON data to send in request body
106+
headers : dict, optional
107+
Additional headers to include
108+
response_type : type[T], optional
109+
Expected response type for type checking
110+
**kwargs : Any
111+
Additional arguments to pass to transport
112+
113+
Returns
114+
-------
115+
TransportResponse[T]
116+
Response with parsed JSON if available
117+
"""
89118
if not endpoint.startswith("/"):
90119
endpoint = f"/{endpoint}"
91120
url = self.base_url + endpoint
@@ -111,6 +140,81 @@ async def request(
111140
**kwargs,
112141
)
113142

143+
def stream_request(
144+
self,
145+
endpoint: str,
146+
*,
147+
method: str = "POST",
148+
data: dict[str, Any] | None = None,
149+
headers: dict[str, str] | None = None,
150+
**kwargs: Any,
151+
) -> AbstractAsyncContextManager[StreamingResponse]:
152+
"""
153+
Make a streaming request to the deepset API.
154+
155+
Must be used as an async context manager to ensure proper cleanup.
156+
157+
Parameters
158+
----------
159+
endpoint : str
160+
API endpoint path
161+
method : str, default="POST"
162+
HTTP method (usually POST for streaming)
163+
data : dict, optional
164+
JSON data to send in request body
165+
headers : dict, optional
166+
Additional headers to include
167+
**kwargs : Any
168+
Additional arguments to pass to transport
169+
170+
Yields
171+
------
172+
StreamingResponse
173+
Response object with streaming capabilities
174+
175+
Examples
176+
--------
177+
async with client.stream_request("/pipelines/search-stream", data={"query": "AI"}) as response:
178+
if response.success:
179+
async for line in response.iter_lines():
180+
# Process each line of the stream
181+
data = json.loads(line)
182+
print(data)
183+
else:
184+
# Handle error
185+
error_body = await response.read_body()
186+
print(f"Error {response.status_code}: {error_body}")
187+
"""
188+
189+
@asynccontextmanager
190+
async def _stream() -> AsyncIterator[StreamingResponse]:
191+
if not endpoint.startswith("/"):
192+
full_endpoint = f"/{endpoint}"
193+
url = self.base_url + full_endpoint
194+
195+
# Default headers for streaming
196+
request_headers: dict[str, str] = {
197+
"Authorization": f"Bearer {self.api_key}",
198+
"Accept": "text/event-stream,application/json,text/plain,*/*",
199+
}
200+
if data is not None:
201+
request_headers["Content-Type"] = "application/json"
202+
# Merge custom headers
203+
if headers:
204+
headers.setdefault("Authorization", request_headers["Authorization"])
205+
request_headers.update(headers)
206+
207+
async with self._transport.stream(
208+
method,
209+
url,
210+
json=data,
211+
headers=request_headers,
212+
**kwargs,
213+
) as response:
214+
yield response
215+
216+
return _stream()
217+
114218
async def close(self) -> None:
115219
"""Close underlying transport resources."""
116220
await self._transport.close()

src/deepset_mcp/api/pipeline/models.py

Lines changed: 100 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
from datetime import datetime
22
from enum import StrEnum
33
from typing import Any
4+
from uuid import UUID
45

5-
from pydantic import BaseModel, Field
6+
from pydantic import BaseModel, Field, model_validator
67

78
from deepset_mcp.api.shared_models import DeepsetUser
89

@@ -96,3 +97,101 @@ class PipelineLogList(BaseModel):
9697
data: list[PipelineLog]
9798
has_more: bool
9899
total: int
100+
101+
102+
# Search-related models
103+
104+
105+
class OffsetRange(BaseModel):
    """A span described by a pair of integer offsets."""

    # NOTE(review): whether `end` is inclusive or exclusive is not visible here — confirm against the API.
    start: int
    end: int
110+
111+
112+
class DeepsetAnswer(BaseModel):
    """One answer entry of a search response.

    Only ``answer`` is mandatory; every other field defaults to ``None`` when
    it is missing from the payload.
    """

    # The answer text itself (the single required field).
    answer: str

    # Optional source information attached to the answer.
    context: str | None = None
    document_id: str | None = None
    document_ids: list[str] | None = None
    file: dict[str, Any] | None = None
    files: list[dict[str, Any]] | None = None
    meta: dict[str, Any] | None = None

    # Spans locating the answer; presumably offsets into `context` and the
    # source document respectively — confirm against the API.
    offsets_in_context: list[OffsetRange] | None = None
    offsets_in_document: list[OffsetRange] | None = None

    # Remaining optional metadata.
    prompt: str | None = None
    result_id: UUID | None = None
    score: float | None = None
    type: str | None = None
128+
129+
130+
class DeepsetDocument(BaseModel):
    """One document entry of a search response.

    ``content`` and ``meta`` must be present; the remaining fields default to
    ``None``.
    """

    # Required fields.
    content: str
    meta: dict[str, Any]  # free-form mapping; values may be of any type

    # Optional fields.
    embedding: list[float] | None = None
    file: dict[str, Any] | None = None
    id: str | None = None
    result_id: UUID | None = None
    score: float | None = None
140+
141+
142+
class DeepsetSearchResponse(BaseModel):
    """Search result for a single query.

    The same model is used for the payloads of both the search and the
    search-stream endpoints; ``normalize_response`` reconciles their shapes.
    """

    # Populated from the "_debug" key of the payload.
    debug: dict[str, Any] | None = Field(default=None, alias="_debug")
    answers: list[DeepsetAnswer] = Field(default_factory=list)
    documents: list[DeepsetDocument] = Field(default_factory=list)
    prompts: dict[str, str] | None = None
    query: str | None = None
    query_id: UUID | None = None

    @model_validator(mode="before")
    @classmethod
    def normalize_response(cls, data: Any) -> Any:
        """Normalize the response from the search and search-stream endpoints.

        The search endpoint returns a list of results, but we only ever use the first result.
        We are not sending batch queries, so there will never be more than one result.
        We use this validator to transform the data so that we can use the same response model for search and
        search-stream endpoints.

        :param data: Raw validation input. Only a dict that carries a ``results`` list is rewritten; anything
            else is handed to pydantic unchanged.

        :returns: The flattened first result, or ``data`` itself when no rewrite applies.

        :raises ValueError: If the first entry of ``results`` is not a mapping.
        """
        # A "before" validator sees the raw input, which is not guaranteed to be a dict.
        if not isinstance(data, dict):
            return data

        results = data.get("results")
        if not isinstance(results, list):
            # Stream format (or already normalized): nothing to flatten.
            return data

        if not results:
            # No result to flatten, but keep the query id if the API reported one.
            return {"query_id": data.get("query_id")}

        # We only ever care for the first result as we don't use batch queries.
        first_result = results[0]
        if not isinstance(first_result, dict):
            # Raising ValueError lets pydantic report this as a validation error.
            raise ValueError("Expected each entry of 'results' to be an object.")

        return {
            "query_id": data.get("query_id", first_result.get("query_id")),
            "query": first_result.get("query"),
            "answers": first_result.get("answers", []),
            "documents": first_result.get("documents", []),
            "prompts": first_result.get("prompts"),
            "_debug": first_result.get("_debug") or first_result.get("debug"),
        }
181+
182+
183+
class StreamDelta(BaseModel):
    """Incremental piece of a streamed response."""

    # Text carried by this delta (required).
    text: str
    # Optional free-form metadata accompanying the delta.
    meta: dict[str, Any] | None = None
188+
189+
190+
class DeepsetStreamEvent(BaseModel):
    """Single event received from the search-stream endpoint.

    ``type`` is the only required field; ``delta``, ``result`` and ``error``
    are all optional and default to ``None``.
    """

    query_id: str | UUID | None = None
    type: str  # "delta", "result", or "error"
    delta: StreamDelta | None = None
    result: DeepsetSearchResponse | None = None
    error: str | None = None

src/deepset_mcp/api/pipeline/resource.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
1+
import json
12
import logging
3+
from collections.abc import AsyncIterator
24
from typing import TYPE_CHECKING, Any
35

46
from deepset_mcp.api.exceptions import UnexpectedAPIError
57
from deepset_mcp.api.pipeline.log_level import LogLevel
68
from deepset_mcp.api.pipeline.models import (
79
DeepsetPipeline,
10+
DeepsetSearchResponse,
11+
DeepsetStreamEvent,
812
NoContentResponse,
913
PipelineLogList,
1014
PipelineValidationResult,
@@ -261,3 +265,103 @@ async def delete(self, pipeline_name: str) -> NoContentResponse:
261265
raise_for_status(resp)
262266

263267
return NoContentResponse(message="Pipeline deleted successfully.")
268+
269+
async def search(
    self,
    pipeline_name: str,
    query: str,
    debug: bool = False,
    view_prompts: bool = False,
    params: dict[str, Any] | None = None,
    filters: dict[str, Any] | None = None,
) -> DeepsetSearchResponse:
    """Run a single query against a pipeline and return the parsed result.

    :param pipeline_name: Name of the pipeline to use for search.
    :param query: Search query.
    :param debug: Whether to include debug information.
    :param view_prompts: Whether to include prompts in the response.
    :param params: Additional parameters for pipeline components; omitted from the request when empty.
    :param filters: Search filters to apply; omitted from the request when empty.

    :returns: The validated search response, or an empty response when the reply carries no JSON.
    """
    # The API expects a list of queries but we only ever send one.
    payload: dict[str, Any] = {"queries": [query], "debug": debug, "view_prompts": view_prompts}

    # Optional sections are only sent when they actually contain something.
    for key, value in (("params", params), ("filters", filters)):
        if value:
            payload[key] = value

    response = await self._client.request(
        endpoint=f"v1/workspaces/{self._workspace}/pipelines/{pipeline_name}/search",
        method="POST",
        data=payload,
        response_type=dict[str, Any],
    )
    raise_for_status(response)

    if response.json is None:
        # No JSON body: hand back an empty response object.
        return DeepsetSearchResponse()

    return DeepsetSearchResponse.model_validate(response.json)
316+
317+
async def search_stream(
    self,
    pipeline_name: str,
    query: str,
    debug: bool = False,
    view_prompts: bool = False,
    params: dict[str, Any] | None = None,
    filters: dict[str, Any] | None = None,
) -> AsyncIterator[DeepsetStreamEvent]:
    """Search using a pipeline with response streaming.

    :param pipeline_name: Name of the pipeline to use for search.
    :param query: Search query.
    :param debug: Whether to include debug information.
    :param view_prompts: Whether to include prompts in the response.
    :param params: Additional parameters for pipeline components.
    :param filters: Search filters to apply.

    :returns: AsyncIterator streaming the result.

    :raises UnexpectedAPIError: If the request itself is unsuccessful or the stream delivers an error event.
    """
    # For streaming, we ask the API to also send the final result (include_result).
    data: dict[str, Any] = {
        "query": query,
        "debug": debug,
        "view_prompts": view_prompts,
        "include_result": True,
    }

    if params:
        data["params"] = params

    if filters:
        data["filters"] = filters

    async with self._client.stream_request(
        endpoint=f"v1/workspaces/{self._workspace}/pipelines/{pipeline_name}/search-stream",
        method="POST",
        data=data,
    ) as resp:
        # A failed request carries an error body rather than events. Without this
        # check its lines would be dropped as malformed and the caller would just
        # see an empty stream.
        if not resp.success:
            error_body = await resp.read_body()
            raise UnexpectedAPIError(
                message=f"Search stream request failed with status {resp.status_code}: {error_body}"
            )

        async for line in resp.iter_lines():
            # Keep the try body limited to parsing so that neither the error raised
            # below nor exceptions thrown in at the yield are swallowed.
            try:
                event = DeepsetStreamEvent.model_validate(json.loads(line))
            except ValueError:
                # Skip malformed events (covers both JSON decoding and model
                # validation failures, which are ValueError subclasses).
                continue

            if event.error is not None:
                raise UnexpectedAPIError(message=event.error)

            yield event

0 commit comments

Comments
 (0)