Skip to content

Commit 1e7bfc3

Browse files
committed
feat: add custom component tools
1 parent 63361ad commit 1e7bfc3

1 file changed

Lines changed: 324 additions & 2 deletions

File tree

main.py

Lines changed: 324 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ def get_pipeline(pipeline_id: str) -> Dict[str, Any]:
8888
def get_component_schemas() -> Dict[str, Any]:
8989
"""Retrieves the schemas for all available Haystack components from the deepset API. These schemas define the expected input and output parameters for each component type, which is useful for constructing or validating componets in a pipeline YAML."""
9090
try:
91-
return deepset_api_request("/haystack/components/input-output")
91+
return deepset_api_request("/haystack/components")
9292
except Exception as e:
9393
return {"error": str(e)}
9494

@@ -217,7 +217,329 @@ def get_pipeline_yaml_resource(pipeline_name: str) -> str:
217217
"""Return the YAML definition of a specific pipeline as a resource"""
218218
return get_pipeline_yaml(pipeline_name)
219219

220+
221+
@mcp.tool()
222+
def list_pipeline_templates() -> str:
223+
"""
224+
Retrieves a list of pipeline templates to build AI applications like RAG or Agents.
225+
"""
226+
workspace = get_workspace()
227+
try:
228+
# Build the query parameters with fixed values
229+
endpoint = f"/workspaces/{workspace}/pipeline_templates?limit=100&page_number=1&field=created_at&order=DESC"
230+
231+
# Make the API request
232+
response = deepset_api_request(endpoint)
233+
234+
# Check for errors in the response
235+
if isinstance(response, dict) and "error" in response:
236+
return f"Error retrieving pipeline templates: {response['error']}"
237+
238+
# Extract the template data
239+
templates = response.get("data", [])
240+
241+
if not templates:
242+
return "No pipeline templates found."
243+
244+
# Format the response as requested
245+
formatted_output = []
246+
for template in templates:
247+
name = template.get("pipeline_name", "Unnamed Template")
248+
description = template.get("description", "No description available")
249+
250+
formatted_output.append(f"<template>\n{name}\n\n{description}\n</template>\n")
251+
252+
# Join all template entries and return
253+
return "\n".join(formatted_output)
254+
255+
except Exception as e:
256+
return f"Error: {str(e)}"
257+
258+
259+
@mcp.tool()
260+
def get_pipeline_template(template_name: str) -> str:
261+
"""Retrieves a specific pipeline template by name and returns its YAML configurations.
262+
263+
Parameters
264+
----------
265+
template_name : str
266+
The name of the pipeline template to retrieve
267+
268+
Returns
269+
-------
270+
str
271+
A formatted string containing the template's query and indexing YAML (if available)
272+
"""
273+
workspace = get_workspace()
274+
try:
275+
# Build the endpoint URL
276+
endpoint = f"/workspaces/{workspace}/pipeline_templates/{template_name}"
277+
278+
# Make the API request
279+
response = deepset_api_request(endpoint)
280+
281+
# Check for errors in the response
282+
if isinstance(response, dict) and "error" in response:
283+
return f"Error retrieving pipeline template: {response['error']}"
284+
285+
# Extract the YAML data
286+
query_yaml = response.get("query_yaml")
287+
indexing_yaml = response.get("indexing_yaml")
288+
template_name = response.get("name", "Unnamed Template")
289+
290+
# Format the response
291+
formatted_output = [f"# Pipeline Template: {template_name}\n"]
292+
293+
# Add query YAML if available
294+
if query_yaml:
295+
formatted_output.append("## QUERY PIPELINE YAML\n```yaml\n" + query_yaml + "\n```\n")
296+
else:
297+
formatted_output.append("## QUERY PIPELINE YAML\nNo query pipeline YAML available for this template.\n")
298+
299+
# Add indexing YAML if available
300+
if indexing_yaml:
301+
formatted_output.append("## INDEXING PIPELINE YAML\n```yaml\n" + indexing_yaml + "\n```\n")
302+
else:
303+
formatted_output.append(
304+
"## INDEXING PIPELINE YAML\nNo indexing pipeline YAML available for this template.\n")
305+
306+
# Join all sections and return
307+
return "\n".join(formatted_output)
308+
309+
except Exception as e:
310+
return f"Error retrieving pipeline template: {str(e)}"
311+
312+
313+
@mcp.tool()
314+
def get_custom_components() -> str:
315+
"""
316+
Use this to get a list of all installed custom components.
317+
"""
318+
try:
319+
# Retrieve all component schemas
320+
response = get_component_schemas()
321+
322+
# Check for errors in the response
323+
if isinstance(response, dict) and "error" in response:
324+
return f"Error retrieving component schemas: {response['error']}"
325+
326+
# Navigate to the components definition section
327+
# Typically structured as {definitions: {Components: {<component_name>: <schema>}}}
328+
schemas = response.get("component_schema", {})
329+
schemas = schemas.get("definitions", {}).get("Components", {})
330+
331+
if not schemas:
332+
return "No component schemas found or unexpected schema format."
333+
334+
# Filter for custom components (those with package_version key)
335+
custom_components = {}
336+
for component_name, schema in schemas.items():
337+
if "package_version" in schema:
338+
custom_components[component_name] = schema
339+
340+
if not custom_components:
341+
return "No custom components found."
342+
343+
# Format the response
344+
formatted_output = [f"# Custom Components ({len(custom_components)} found)\n"]
345+
346+
for component_name, schema in custom_components.items():
347+
# Extract key information
348+
package_version = schema.get("package_version", "Unknown")
349+
dynamic_params = schema.get("dynamic_params", False)
350+
351+
# Get component type info
352+
type_info = schema.get("properties", {}).get("type", {})
353+
const_value = type_info.get("const", "Unknown")
354+
family = type_info.get("family", "Unknown")
355+
family_description = type_info.get("family_description", "No description")
356+
357+
# Format component details
358+
component_details = [
359+
f"## {component_name}",
360+
f"- **Type**: `{const_value}`",
361+
f"- **Package Version**: {package_version}",
362+
f"- **Family**: {family} - {family_description}",
363+
f"- **Dynamic Parameters**: {'Yes' if dynamic_params else 'No'}"
364+
]
365+
366+
# Add init parameters if available
367+
init_params = schema.get("properties", {}).get("init_parameters", {}).get("properties", {})
368+
if init_params:
369+
component_details.append("\n### Init Parameters:")
370+
for param_name, param_info in init_params.items():
371+
param_type = param_info.get("type", "Unknown")
372+
required = param_name in schema.get("properties", {}).get("init_parameters", {}).get("required", [])
373+
param_description = param_info.get("description", "No description")
374+
375+
component_details.append(f"- **{param_name}** ({param_type}{', required' if required else ''}):")
376+
component_details.append(f" {param_description}")
377+
378+
formatted_output.append("\n".join(component_details) + "\n")
379+
380+
# Join all sections and return
381+
return "\n".join(formatted_output)
382+
383+
except Exception as e:
384+
return f"Error retrieving custom components: {str(e)}"
385+
386+
387+
@mcp.tool()
388+
def get_latest_custom_component_installation_logs() -> str:
389+
"""
390+
Use this to get the logs from the latest custom component installation.
391+
392+
This will give you the full installation log output.
393+
It is useful for debugging custom component installations.
394+
"""
395+
try:
396+
# Note: This endpoint uses v2 of the API instead of v1
397+
endpoint = "/custom_components/logs"
398+
399+
# Set up headers - using accept: text/plain since log output is plain text
400+
headers = {
401+
"Authorization": f"Bearer {get_api_key()}",
402+
"Accept": "text/plain"
403+
}
404+
405+
url = f"{DEEPSET_API_BASE_URL.replace('/api/v1', '/api/v2')}{endpoint}"
406+
407+
try:
408+
# Make direct request since our helper function expects JSON response
409+
response = requests.get(url, headers=headers)
410+
411+
if response.status_code >= 400:
412+
error_message = f"API Error: {response.status_code}"
413+
try:
414+
# Try to get any structured error info
415+
error_details = response.json()
416+
except:
417+
# Fall back to text if not JSON
418+
error_details = response.text if response.text else "No error details provided by API"
419+
return f"Error retrieving installation logs: {error_message}\nDetails: {error_details}"
420+
421+
# Return raw log text
422+
return response.text
423+
424+
except requests.exceptions.RequestException as e:
425+
return f"Request failed: {str(e)}"
426+
427+
except Exception as e:
428+
return f"Error retrieving custom component logs: {str(e)}"
429+
430+
431+
@mcp.tool()
432+
def list_custom_component_installations() -> str:
433+
"""
434+
Retrieves a list of the most recent custom component installations.
435+
436+
This will return version number, high-level log, status and information about who uploaded the package.
437+
"""
438+
try:
439+
# Note: This endpoint uses v2 of the API instead of v1
440+
endpoint = "/custom_components?limit=20&page_number=1&field=created_at&order=DESC"
441+
442+
# Set up headers
443+
headers = {
444+
"Authorization": f"Bearer {get_api_key()}",
445+
"Accept": "application/json"
446+
}
447+
448+
url = f"{DEEPSET_API_BASE_URL.replace('/api/v1', '/api/v2')}{endpoint}"
449+
450+
try:
451+
# Make direct request
452+
response = requests.get(url, headers=headers)
453+
454+
if response.status_code >= 400:
455+
error_message = f"API Error: {response.status_code}"
456+
try:
457+
# Try to get any structured error info
458+
error_details = response.json()
459+
except:
460+
# Fall back to text if not JSON
461+
error_details = response.text if response.text else "No error details provided by API"
462+
return f"Error retrieving installation history: {error_message}\nDetails: {error_details}"
463+
464+
# Parse JSON response
465+
data = response.json()
466+
installations = data.get("data", [])
467+
total = data.get("total", 0)
468+
has_more = data.get("has_more", False)
469+
470+
if not installations:
471+
return "No custom component installations found."
472+
473+
# Format the response
474+
formatted_output = [f"# Custom Component Installations (showing {len(installations)} of {total})\n"]
475+
476+
for install in installations:
477+
# Extract key information
478+
component_id = install.get("custom_component_id", "Unknown")
479+
status = install.get("status", "Unknown")
480+
version = install.get("version", "Unknown")
481+
user_id = install.get("created_by_user_id", "Unknown")
482+
483+
# Try to fetch user information
484+
user_info = "Unknown"
485+
if user_id != "Unknown":
486+
try:
487+
# Make request to the user API endpoint
488+
user_url = f"{DEEPSET_API_BASE_URL}/users/{user_id}"
489+
user_response = requests.get(
490+
user_url,
491+
headers={"Authorization": f"Bearer {get_api_key()}", "Accept": "application/json"}
492+
)
493+
494+
if user_response.status_code == 200:
495+
user_data = user_response.json()
496+
given_name = user_data.get("given_name", "")
497+
family_name = user_data.get("family_name", "")
498+
email = user_data.get("email", "")
499+
500+
if given_name and family_name:
501+
user_info = f"{given_name} {family_name} ({email})"
502+
elif email:
503+
user_info = email
504+
except Exception as e:
505+
user_info = f"Error fetching user: {str(e)}"
506+
507+
# Format installation details
508+
install_details = [
509+
f"## Installation {component_id[:8]}...",
510+
f"- **Status**: {status}",
511+
f"- **Version**: {version}",
512+
f"- **Installed by**: {user_info}"
513+
]
514+
515+
# Add logs if available
516+
logs = install.get("logs", [])
517+
if logs:
518+
install_details.append("\n### Recent Logs:")
519+
for log in logs[:5]: # Show only the first 5 logs
520+
level = log.get("level", "INFO")
521+
msg = log.get("msg", "No message")
522+
install_details.append(f"- [{level}] {msg}")
523+
524+
if len(logs) > 5:
525+
install_details.append(f"- ... and {len(logs) - 5} more log entries")
526+
527+
formatted_output.append("\n".join(install_details) + "\n")
528+
529+
if has_more:
530+
formatted_output.append(
531+
"*Note: There are more installations available. This listing shows only the 10 most recent.*")
532+
533+
# Join all sections and return
534+
return "\n".join(formatted_output)
535+
536+
except requests.exceptions.RequestException as e:
537+
return f"Request failed: {str(e)}"
538+
539+
except Exception as e:
540+
return f"Error retrieving custom component installations: {str(e)}"
541+
220542
if __name__ == "__main__":
221543
# Using the built-in server runner
222544
# Ensure it binds to 0.0.0.0 to be accessible in Docker
223-
mcp.run(host="0.0.0.0", port=8000)
545+
mcp.run()

0 commit comments

Comments
 (0)