Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 55 additions & 14 deletions src/claude_agent_sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,13 @@ def create_sdk_mcp_server(
from mcp.server import Server
from mcp.types import (
AudioContent,
BlobResourceContents,
CallToolResult,
EmbeddedResource,
ImageContent,
ResourceLink,
TextContent,
TextResourceContents,
Tool,
)

Expand Down Expand Up @@ -349,31 +351,70 @@ async def call_tool(name: str, arguments: dict[str, Any]) -> Any:
)
)
elif item_type == "resource_link":
parts = []
link_name = item.get("name")
uri = item.get("uri")
desc = item.get("description")
if link_name:
parts.append(link_name)
if uri:
parts.append(str(uri))
if desc:
parts.append(desc)
if link_name is None or uri is None:
logger.warning(
"Resource link missing required name/uri, skipping"
)
continue
content.append(
TextContent(
type="text",
text="\n".join(parts) if parts else "Resource link",
ResourceLink(
type="resource_link",
name=link_name,
title=item.get("title"),
uri=uri,
description=item.get("description"),
mimeType=item.get("mimeType"),
size=item.get("size"),
icons=item.get("icons"),
annotations=item.get("annotations"),
_meta=item.get("_meta"),
)
)
elif item_type == "resource":
resource = item.get("resource") or {}
if "text" in resource:
if not isinstance(resource, dict):
logger.warning(
"Embedded resource payload must be a dict, skipping"
)
continue
uri = resource.get("uri")
if uri is None:
logger.warning(
"Embedded resource missing uri, skipping content block"
)
elif "text" in resource:
content.append(
EmbeddedResource(
type="resource",
resource=TextResourceContents(
uri=uri,
text=resource["text"],
mimeType=resource.get("mimeType"),
_meta=resource.get("_meta"),
),
annotations=item.get("annotations"),
_meta=item.get("_meta"),
)
)
elif "blob" in resource:
content.append(
TextContent(type="text", text=resource["text"])
EmbeddedResource(
type="resource",
resource=BlobResourceContents(
uri=uri,
blob=resource["blob"],
mimeType=resource.get("mimeType"),
_meta=resource.get("_meta"),
),
annotations=item.get("annotations"),
_meta=item.get("_meta"),
)
)
else:
logger.warning(
"Binary embedded resource cannot be converted to text, skipping"
"Embedded resource missing text/blob payload, skipping"
)
else:
logger.warning(
Expand Down
35 changes: 11 additions & 24 deletions src/claude_agent_sdk/_internal/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,32 +498,19 @@ async def _handle_sdk_mcp_request(
"mimeType": getattr(item, "mimeType", ""),
}
)
elif item_type == "resource_link":
parts = []
name = getattr(item, "name", None)
uri = getattr(item, "uri", None)
desc = getattr(item, "description", None)
if name:
parts.append(name)
if uri:
parts.append(str(uri))
if desc:
parts.append(desc)
content.append(
{
"type": "text",
"text": "\n".join(parts)
if parts
else "Resource link",
}
)
elif item_type == "resource":
resource = getattr(item, "resource", None)
if resource and hasattr(resource, "text"):
content.append({"type": "text", "text": resource.text})
elif item_type in {"audio", "resource", "resource_link"}:
if hasattr(item, "model_dump"):
content.append(
item.model_dump(
mode="json",
by_alias=True,
exclude_none=True,
)
)
else:
logger.warning(
"Binary embedded resource cannot be converted to text, skipping"
"Unsupported structured content %r without model_dump, skipping",
item_type,
)
else:
logger.warning(
Expand Down
170 changes: 148 additions & 22 deletions tests/test_sdk_mcp_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""

import base64
import json
import logging
from typing import Any

Expand All @@ -17,6 +18,7 @@
create_sdk_mcp_server,
tool,
)
from claude_agent_sdk._internal.query import Query


@pytest.mark.asyncio
Expand Down Expand Up @@ -303,6 +305,56 @@ async def generate_chart(args: dict[str, Any]) -> dict[str, Any]:
assert tool_executions[0]["args"]["title"] == "Sales Report"


@pytest.mark.asyncio
async def test_structured_resource_results_round_trip_through_sdk_bridge():
"""Test that structured resource blocks survive the SDK MCP round-trip."""

payload = {"status": "ok", "items": [1, 2, 3]}

@tool("get_status", "Return structured JSON status", {})
async def get_status(args: dict[str, Any]) -> dict[str, Any]:
return {
"content": [
{
"type": "resource",
"resource": {
"uri": "file:///status.json",
"mimeType": "application/json",
"text": json.dumps(payload),
},
}
]
}

server_config = create_sdk_mcp_server(
name="structured-json", version="1.0.0", tools=[get_status]
)

query_instance = Query.__new__(Query)
query_instance.sdk_mcp_servers = {"structured": server_config["instance"]}

response = await query_instance._handle_sdk_mcp_request(
"structured",
{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {"name": "get_status", "arguments": {}},
},
)

assert response["result"]["content"] == [
{
"type": "resource",
"resource": {
"uri": "file:///status.json",
"mimeType": "application/json",
"text": json.dumps(payload),
},
}
]


@pytest.mark.asyncio
async def test_tool_annotations():
"""Test that tool annotations are stored and flow through list_tools."""
Expand Down Expand Up @@ -419,8 +471,8 @@ async def plain_tool(args: dict[str, Any]) -> dict[str, Any]:


@pytest.mark.asyncio
async def test_resource_link_content_converted_to_text():
"""Test that resource_link content blocks are converted to text."""
async def test_resource_link_content_is_preserved():
"""Test that resource_link content blocks stay structured."""

@tool("get_resource", "Returns a resource link", {"url": str})
async def get_resource(args: dict[str, Any]) -> dict[str, Any]:
Expand Down Expand Up @@ -451,15 +503,15 @@ async def get_resource(args: dict[str, Any]) -> dict[str, Any]:
result = await call_handler(request)

assert len(result.root.content) == 1
assert result.root.content[0].type == "text"
assert "My Document" in result.root.content[0].text
assert "https://example.com/doc.pdf" in result.root.content[0].text
assert "A test document" in result.root.content[0].text
assert result.root.content[0].type == "resource_link"
assert result.root.content[0].name == "My Document"
assert str(result.root.content[0].uri) == "https://example.com/doc.pdf"
assert result.root.content[0].description == "A test document"


@pytest.mark.asyncio
async def test_embedded_resource_text_content_converted():
"""Test that embedded resource with text content is converted to text."""
async def test_embedded_resource_text_content_is_preserved():
"""Test that embedded resource text content stays structured."""

@tool("get_embedded", "Returns an embedded resource", {})
async def get_embedded(args: dict[str, Any]) -> dict[str, Any]:
Expand Down Expand Up @@ -489,15 +541,15 @@ async def get_embedded(args: dict[str, Any]) -> dict[str, Any]:
result = await call_handler(request)

assert len(result.root.content) == 1
assert result.root.content[0].type == "text"
assert result.root.content[0].text == "File contents here"
assert result.root.content[0].type == "resource"
assert str(result.root.content[0].resource.uri) == "file:///test.txt"
assert result.root.content[0].resource.text == "File contents here"
assert result.root.content[0].resource.mimeType == "text/plain"


@pytest.mark.asyncio
async def test_binary_embedded_resource_skipped_with_warning(
caplog: pytest.LogCaptureFixture,
):
"""Test that binary embedded resources are skipped with a warning."""
async def test_binary_embedded_resource_is_preserved():
"""Test that binary embedded resources stay structured."""

@tool("get_binary", "Returns a binary embedded resource", {})
async def get_binary(args: dict[str, Any]) -> dict[str, Any]:
Expand All @@ -524,11 +576,81 @@ async def get_binary(args: dict[str, Any]) -> dict[str, Any]:
method="tools/call",
params=CallToolRequestParams(name="get_binary", arguments={}),
)
result = await call_handler(request)

assert len(result.root.content) == 1
assert result.root.content[0].type == "resource"
assert str(result.root.content[0].resource.uri) == "file:///image.png"
assert result.root.content[0].resource.blob == "iVBORw0KGgo="
assert result.root.content[0].resource.mimeType == "image/png"


@pytest.mark.asyncio
async def test_malformed_embedded_resource_skipped_with_warning(
caplog: pytest.LogCaptureFixture,
):
"""Malformed embedded resources should be skipped with a warning."""

@tool("get_malformed_resource", "Returns malformed resource", {})
async def get_malformed_resource(args: dict[str, Any]) -> dict[str, Any]:
return {
"content": [
{
"type": "resource",
"resource": "not-a-dict",
},
]
}

server_config = create_sdk_mcp_server(
name="malformed-resource-test", tools=[get_malformed_resource]
)
server = server_config["instance"]
call_handler = server.request_handlers[CallToolRequest]

request = CallToolRequest(
method="tools/call",
params=CallToolRequestParams(name="get_malformed_resource", arguments={}),
)
with caplog.at_level(logging.WARNING):
result = await call_handler(request)

assert len(result.root.content) == 0
assert "Binary embedded resource" in caplog.text
assert "Embedded resource payload must be a dict" in caplog.text


@pytest.mark.asyncio
async def test_malformed_resource_link_skipped_with_warning(
caplog: pytest.LogCaptureFixture,
):
"""Malformed resource links should be skipped with a warning."""

@tool("get_bad_link", "Returns malformed resource link", {})
async def get_bad_link(args: dict[str, Any]) -> dict[str, Any]:
return {
"content": [
{
"type": "resource_link",
"name": "Incomplete link",
},
]
}

server_config = create_sdk_mcp_server(
name="malformed-link-test", tools=[get_bad_link]
)
server = server_config["instance"]
call_handler = server.request_handlers[CallToolRequest]

request = CallToolRequest(
method="tools/call",
params=CallToolRequestParams(name="get_bad_link", arguments={}),
)
with caplog.at_level(logging.WARNING):
result = await call_handler(request)

assert len(result.root.content) == 0
assert "Resource link missing required name/uri" in caplog.text


@pytest.mark.asyncio
Expand Down Expand Up @@ -563,7 +685,7 @@ async def get_unknown(args: dict[str, Any]) -> dict[str, Any]:

@pytest.mark.asyncio
async def test_mixed_content_types_with_resource_link():
"""Test that mixed content with text, image, and resource_link works."""
"""Test that mixed content preserves structured resource links."""

png_data = base64.b64encode(b"\x89PNG\r\n\x1a\n").decode("utf-8")

Expand Down Expand Up @@ -595,13 +717,13 @@ async def get_mixed(args: dict[str, Any]) -> dict[str, Any]:
assert result.root.content[0].type == "text"
assert result.root.content[0].text == "Here is the document:"
assert result.root.content[1].type == "image"
assert result.root.content[2].type == "text"
assert "Report" in result.root.content[2].text
assert result.root.content[2].type == "resource_link"
assert result.root.content[2].name == "Report"


@pytest.mark.asyncio
async def test_jsonrpc_bridge_resource_link():
"""Test that the JSONRPC bridge converts resource_link content to text."""
"""Test that the JSONRPC bridge preserves resource_link content."""
from claude_agent_sdk._internal.query import Query

@tool("link_tool", "Returns a link", {})
Expand Down Expand Up @@ -635,6 +757,10 @@ async def link_tool(args: dict[str, Any]) -> dict[str, Any]:
assert response is not None
result_content = response["result"]["content"]
assert len(result_content) == 1
assert result_content[0]["type"] == "text"
assert "API Docs" in result_content[0]["text"]
assert "https://api.example.com" in result_content[0]["text"]
assert result_content[0]["type"] == "resource_link"
assert result_content[0]["name"] == "API Docs"
assert result_content[0]["uri"] in (
"https://api.example.com",
"https://api.example.com/",
)
assert result_content[0]["description"] == "The API documentation"