Skip to content

Commit e4857f6

Browse files
nabinchhaclaude
andauthored
feat: add Streamable HTTP transport support for remote MCP providers (#358)
* feat: add Streamable HTTP transport support for remote MCP providers (#357) Add `streamable_http` as a supported transport type for `MCPProvider`, enabling connections to MCP servers that use the Streamable HTTP protocol (e.g. Tavily remote endpoints). Previously only SSE transport was supported, causing silent 5-minute timeouts when connecting to incompatible endpoints. - Expand `MCPProvider.provider_type` to `Literal["sse", "streamable_http"]` (default remains `"sse"` for backwards compatibility) - Route `streamable_http` providers through `streamablehttp_client` from the MCP SDK in `MCPIOService._get_or_create_session()` - Handle variable-length context manager results from MCP transport clients - Add `DataDesigner.list_mcp_tool_names()` for discovering available tools - Update CLI form builder and controller to support the new transport option - Add tests for streamable_http config, session creation, and form builder Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * updates * simplify import * address greptile comments --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent f251446 commit e4857f6

12 files changed

Lines changed: 215 additions & 44 deletions

File tree

docs/code_reference/mcp.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ The `mcp` module defines configuration and execution classes for tool use via MC
44

55
## Configuration Classes
66

7-
[MCPProvider](#data_designer.config.mcp.MCPProvider) configures remote MCP servers via SSE transport. [LocalStdioMCPProvider](#data_designer.config.mcp.LocalStdioMCPProvider) configures local MCP servers as subprocesses via stdio transport. [ToolConfig](#data_designer.config.mcp.ToolConfig) defines which tools are available for LLM columns and how they are constrained.
7+
[MCPProvider](#data_designer.config.mcp.MCPProvider) configures remote MCP servers via SSE or Streamable HTTP transport. [LocalStdioMCPProvider](#data_designer.config.mcp.LocalStdioMCPProvider) configures local MCP servers as subprocesses via stdio transport. [ToolConfig](#data_designer.config.mcp.ToolConfig) defines which tools are available for LLM columns and how they are constrained.
88

99
For user-facing guides, see:
1010

docs/concepts/mcp/configure-mcp-cli.md

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,14 +49,15 @@ data-designer config mcp
4949
The wizard first asks you to choose a provider type:
5050

5151
1. **Remote SSE**: Connect to a pre-existing MCP server via HTTP Server-Sent Events
52-
2. **Local stdio subprocess**: Launch an MCP server as a subprocess
52+
2. **Remote Streamable HTTP**: Connect to a pre-existing MCP server via Streamable HTTP
53+
3. **Local stdio subprocess**: Launch an MCP server as a subprocess
5354

54-
### Remote SSE Configuration
55+
### Remote SSE / Streamable HTTP Configuration
5556

56-
When configuring a Remote SSE provider, you'll be prompted for:
57+
When configuring a remote provider (SSE or Streamable HTTP), you'll be prompted for:
5758

5859
- **Name**: Unique identifier (e.g., `"doc-search"`)
59-
- **Endpoint**: SSE endpoint URL (e.g., `"http://localhost:8080/sse"`)
60+
- **Endpoint**: Server endpoint URL (e.g., `"http://localhost:8080/sse"` or `"https://mcp.example.com/mcp"`)
6061
- **API Key**: Optional API key or environment variable name
6162

6263
### Local Stdio Configuration

docs/concepts/mcp/mcp-providers.md

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,36 +8,45 @@ An MCP provider defines how Data Designer connects to a tool server. Data Design
88

99
| Provider Class | Connection Method | Use Case |
1010
|---------------|-------------------|----------|
11-
| `MCPProvider` | HTTP Server-Sent Events | Connect to a pre-existing MCP server |
11+
| `MCPProvider` | SSE or Streamable HTTP | Connect to a pre-existing MCP server |
1212
| `LocalStdioMCPProvider` | Subprocess via stdin/stdout | Launch an MCP server as a subprocess |
1313

1414
When you create a `ToolConfig`, you reference providers by name, and Data Designer uses those provider settings to communicate with the appropriate MCP servers.
1515

16-
## MCPProvider (Remote SSE)
16+
## MCPProvider (Remote)
1717

18-
Use `MCPProvider` to connect to a pre-existing MCP server via Server-Sent Events:
18+
Use `MCPProvider` to connect to a pre-existing MCP server. Both SSE (Server-Sent Events) and Streamable HTTP transports are supported:
1919

2020
```python
2121
import data_designer.config as dd
2222
from data_designer.interface import DataDesigner
2323

24-
mcp_provider = dd.MCPProvider(
24+
# SSE transport (default)
25+
sse_provider = dd.MCPProvider(
2526
name="remote-mcp",
2627
endpoint="http://localhost:8080/sse",
2728
api_key="MCP_API_KEY", # Environment variable name
2829
)
2930

30-
data_designer = DataDesigner(mcp_providers=[mcp_provider])
31+
# Streamable HTTP transport
32+
http_provider = dd.MCPProvider(
33+
name="remote-tools",
34+
endpoint="https://mcp.example.com/mcp",
35+
api_key="MCP_API_KEY",
36+
provider_type="streamable_http",
37+
)
38+
39+
data_designer = DataDesigner(mcp_providers=[sse_provider, http_provider])
3140
```
3241

3342
### MCPProvider Fields
3443

3544
| Field | Type | Required | Description |
3645
|-------|------|----------|-------------|
3746
| `name` | `str` | Yes | Unique identifier for the provider |
38-
| `endpoint` | `str` | Yes | SSE endpoint URL (e.g., `"http://localhost:8080/sse"`) |
47+
| `endpoint` | `str` | Yes | Endpoint URL for the remote MCP server |
3948
| `api_key` | `str` | No | API key or environment variable name |
40-
| `provider_type` | `str` | No | Always `"sse"` (set automatically) |
49+
| `provider_type` | `str` | No | Transport type: `"sse"` (default) or `"streamable_http"` |
4150

4251
## LocalStdioMCPProvider (Subprocess)
4352

@@ -103,6 +112,12 @@ providers:
103112
endpoint: http://localhost:8080/sse
104113
api_key: ${MCP_API_KEY}
105114

115+
# Remote Streamable HTTP provider
116+
- name: remote-tools
117+
provider_type: streamable_http
118+
endpoint: https://mcp.example.com/mcp
119+
api_key: ${MCP_API_KEY}
120+
106121
# Local stdio provider
107122
- name: local-tools
108123
provider_type: stdio

packages/data-designer-config/src/data_designer/config/mcp.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,25 +15,36 @@ class MCPProvider(ConfigBase):
1515
"""Configuration for a remote MCP server connection.
1616
1717
MCPProvider is used to connect to pre-existing MCP servers via SSE (Server-Sent Events)
18-
transport. For local subprocess-based MCP servers, use LocalStdioMCPProvider instead.
18+
or Streamable HTTP transport. For local subprocess-based MCP servers, use
19+
LocalStdioMCPProvider instead.
1920
2021
Attributes:
2122
name (str): Unique name used to reference this MCP provider.
22-
endpoint (str): SSE endpoint URL for connecting to the remote MCP server.
23+
endpoint (str): Endpoint URL for connecting to the remote MCP server.
2324
api_key (str | None): Optional API key for authentication. Defaults to None.
24-
provider_type (Literal["sse"]): Transport type discriminator, always "sse".
25+
provider_type (Literal["sse", "streamable_http"]): Transport type discriminator.
26+
Defaults to ``"sse"``.
2527
2628
Examples:
27-
Remote SSE transport:
29+
Remote SSE transport (default):
2830
2931
>>> MCPProvider(
3032
... name="remote-mcp",
3133
... endpoint="http://localhost:8080/sse",
3234
... api_key="your-api-key",
3335
... )
36+
37+
Remote Streamable HTTP transport:
38+
39+
>>> MCPProvider(
40+
... name="remote-mcp",
41+
... endpoint="https://api.example.com/mcp",
42+
... api_key="your-api-key",
43+
... provider_type="streamable_http",
44+
... )
3445
"""
3546

36-
provider_type: Literal["sse"] = "sse"
47+
provider_type: Literal["sse", "streamable_http"] = "sse"
3748
name: str
3849
endpoint: str
3950
api_key: str | None = None

packages/data-designer-config/tests/config/test_mcp.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,32 @@ def test_mcp_provider_requires_endpoint() -> None:
1414
provider = MCPProvider(name="sse", endpoint="http://localhost:8080")
1515
assert provider.endpoint == "http://localhost:8080"
1616
assert provider.api_key is None
17+
assert provider.provider_type == "sse"
1718

1819
provider_with_key = MCPProvider(name="sse-auth", endpoint="http://localhost:8080", api_key="secret")
1920
assert provider_with_key.api_key == "secret"
2021

2122

23+
def test_mcp_provider_streamable_http() -> None:
24+
provider = MCPProvider(
25+
name="streamable",
26+
endpoint="https://api.example.com/mcp",
27+
provider_type="streamable_http",
28+
)
29+
assert provider.provider_type == "streamable_http"
30+
assert provider.endpoint == "https://api.example.com/mcp"
31+
assert provider.api_key is None
32+
33+
provider_with_key = MCPProvider(
34+
name="streamable-auth",
35+
endpoint="https://api.example.com/mcp",
36+
provider_type="streamable_http",
37+
api_key="secret",
38+
)
39+
assert provider_with_key.api_key == "secret"
40+
assert provider_with_key.provider_type == "streamable_http"
41+
42+
2243
def test_local_stdio_mcp_provider_requires_command() -> None:
2344
with pytest.raises(ValidationError):
2445
LocalStdioMCPProvider(name="missing-command")

packages/data-designer-engine/src/data_designer/engine/mcp/io.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@
3939
from mcp import ClientSession, StdioServerParameters
4040
from mcp.client.sse import sse_client
4141
from mcp.client.stdio import stdio_client
42+
from mcp.client.streamable_http import streamablehttp_client
4243

43-
from data_designer.config.mcp import LocalStdioMCPProvider, MCPProviderT
44+
from data_designer.config.mcp import LocalStdioMCPProvider, MCPProvider, MCPProviderT
4445
from data_designer.engine.mcp.errors import MCPToolError
4546
from data_designer.engine.mcp.registry import MCPToolDefinition, MCPToolResult
4647

@@ -211,11 +212,15 @@ async def create_session() -> ClientSession:
211212
env=provider.env,
212213
)
213214
ctx = stdio_client(params)
215+
elif isinstance(provider, MCPProvider) and provider.provider_type == "streamable_http":
216+
headers = _build_auth_headers(provider.api_key)
217+
ctx = streamablehttp_client(provider.endpoint, headers=headers)
214218
else:
215219
headers = _build_auth_headers(provider.api_key)
216220
ctx = sse_client(provider.endpoint, headers=headers)
217221

218-
read, write = await ctx.__aenter__()
222+
ctx_result = await ctx.__aenter__()
223+
read, write = ctx_result[0], ctx_result[1]
219224
new_session = ClientSession(read, write)
220225
await new_session.__aenter__()
221226
await new_session.initialize()
@@ -399,6 +404,11 @@ def list_tools(provider: MCPProviderT, timeout_sec: float | None = None) -> tupl
399404
return _MCP_IO_SERVICE.list_tools(provider, timeout_sec=timeout_sec)
400405

401406

407+
def list_tool_names(provider: MCPProviderT, timeout_sec: float) -> list[str]:
408+
"""Return the names of all tools available on an MCP provider."""
409+
return [t.name for t in _MCP_IO_SERVICE.list_tools(provider, timeout_sec=timeout_sec)]
410+
411+
402412
def call_tools(
403413
calls: list[tuple[MCPProviderT, str, dict[str, Any]]],
404414
*,
@@ -434,7 +444,7 @@ def get_session_pool_info() -> dict[str, Any]:
434444

435445

436446
def _build_auth_headers(api_key: str | None) -> dict[str, Any] | None:
437-
"""Build authentication headers for SSE client."""
447+
"""Build authentication headers for remote MCP clients."""
438448
if not api_key:
439449
return None
440450
return {"Authorization": f"Bearer {api_key}"}

packages/data-designer-engine/src/data_designer/engine/testing/fixtures.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,17 @@ def stub_sse_provider() -> MCPProvider:
8181
)
8282

8383

84+
@pytest.fixture
85+
def stub_streamable_http_provider() -> MCPProvider:
86+
"""Create a stub Streamable HTTP MCP provider for testing."""
87+
return MCPProvider(
88+
name="test-streamable-http",
89+
endpoint="https://api.example.com/mcp",
90+
api_key="test-key",
91+
provider_type="streamable_http",
92+
)
93+
94+
8495
# =============================================================================
8596
# Tool config fixtures
8697
# =============================================================================

packages/data-designer-engine/tests/engine/mcp/test_mcp_io.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -693,6 +693,59 @@ def mock_stdio_client(params: Any) -> MockContextManager:
693693
mcp_io.clear_session_pool()
694694

695695

696+
# =============================================================================
697+
# Streamable HTTP session creation tests
698+
# =============================================================================
699+
700+
701+
@pytest.mark.asyncio
702+
async def test_get_or_create_session_for_streamable_http_provider(
703+
monkeypatch: pytest.MonkeyPatch, stub_streamable_http_provider: MCPProvider
704+
) -> None:
705+
"""Test that _get_or_create_session uses streamablehttp_client for streamable_http providers."""
706+
mcp_io.clear_session_pool()
707+
708+
class MockContextManager:
709+
async def __aenter__(self) -> tuple[Any, Any, Any]:
710+
return ("mock_read", "mock_write", "mock_get_session_id")
711+
712+
async def __aexit__(self, *args: Any) -> None:
713+
pass
714+
715+
class MockSession:
716+
async def __aenter__(self) -> "MockSession":
717+
return self
718+
719+
async def __aexit__(self, *args: Any) -> None:
720+
pass
721+
722+
async def initialize(self) -> None:
723+
pass
724+
725+
streamable_http_client_called = False
726+
received_endpoint: str | None = None
727+
received_headers: dict[str, Any] | None = None
728+
729+
def mock_streamablehttp_client(endpoint: str, headers: dict[str, Any] | None = None) -> MockContextManager:
730+
nonlocal streamable_http_client_called, received_endpoint, received_headers
731+
streamable_http_client_called = True
732+
received_endpoint = endpoint
733+
received_headers = headers
734+
return MockContextManager()
735+
736+
monkeypatch.setattr(mcp_io, "streamablehttp_client", mock_streamablehttp_client)
737+
monkeypatch.setattr(mcp_io, "ClientSession", lambda r, w: MockSession())
738+
739+
session = await mcp_io._MCP_IO_SERVICE._get_or_create_session(stub_streamable_http_provider)
740+
741+
assert streamable_http_client_called
742+
assert received_endpoint == stub_streamable_http_provider.endpoint
743+
assert received_headers == {"Authorization": "Bearer test-key"}
744+
assert session is not None
745+
746+
mcp_io.clear_session_pool()
747+
748+
696749
# =============================================================================
697750
# Session cleanup exception handling tests
698751
# =============================================================================

packages/data-designer/src/data_designer/cli/controllers/mcp_provider_controller.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,8 @@ def _select_provider(self, providers: list[MCPProviderT], prompt: str, default:
218218
options = {}
219219
for p in providers:
220220
if isinstance(p, MCPProvider):
221-
options[p.name] = f"{p.name} (SSE: {p.endpoint})"
221+
transport_label = "Streamable HTTP" if p.provider_type == "streamable_http" else "SSE"
222+
options[p.name] = f"{p.name} ({transport_label}: {p.endpoint})"
222223
elif isinstance(p, LocalStdioMCPProvider):
223224
options[p.name] = f"{p.name} (stdio: {p.command})"
224225
else:

packages/data-designer/src/data_designer/cli/forms/mcp_provider_builder.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
from __future__ import annotations
55

6-
from typing import Any
6+
from typing import Any, Literal
77

88
from data_designer.cli.forms.field import TextField
99
from data_designer.cli.forms.form import Form
@@ -42,8 +42,8 @@ def run(self, initial_data: dict[str, Any] | None = None) -> MCPProviderT | None
4242
return None
4343

4444
# Run appropriate form based on provider type
45-
if provider_type == "sse":
46-
result = self._run_sse_form(initial_data)
45+
if provider_type in ("sse", "streamable_http"):
46+
result = self._run_remote_form(provider_type, initial_data)
4747
else: # stdio
4848
result = self._run_stdio_form(initial_data)
4949

@@ -59,6 +59,7 @@ def _select_provider_type(self) -> str | None:
5959
"""Prompt user to select provider type."""
6060
options = {
6161
"sse": "Remote SSE server (connect to existing server)",
62+
"streamable_http": "Remote Streamable HTTP server (connect to existing server)",
6263
"stdio": "Local stdio subprocess (launch server as subprocess)",
6364
}
6465

@@ -70,8 +71,11 @@ def _select_provider_type(self) -> str | None:
7071
allow_back=True,
7172
)
7273

73-
def _run_sse_form(self, initial_data: dict[str, Any] | None = None) -> MCPProvider | None:
74-
"""Run form for remote SSE provider."""
74+
def _run_remote_form(
75+
self, provider_type: Literal["sse", "streamable_http"], initial_data: dict[str, Any] | None = None
76+
) -> MCPProvider | None:
77+
"""Run form for a remote MCP provider (SSE or Streamable HTTP)."""
78+
transport_label = "SSE" if provider_type == "sse" else "Streamable HTTP"
7579
fields = [
7680
TextField(
7781
"name",
@@ -82,7 +86,7 @@ def _run_sse_form(self, initial_data: dict[str, Any] | None = None) -> MCPProvid
8286
),
8387
TextField(
8488
"endpoint",
85-
"SSE endpoint URL",
89+
f"{transport_label} endpoint URL",
8690
default=initial_data.get("endpoint") if initial_data else None,
8791
required=True,
8892
validator=self._validate_endpoint,
@@ -95,7 +99,7 @@ def _run_sse_form(self, initial_data: dict[str, Any] | None = None) -> MCPProvid
9599
),
96100
]
97101

98-
form = Form("Remote SSE Provider", fields)
102+
form = Form(f"Remote {transport_label} Provider", fields)
99103
if initial_data:
100104
form.set_values(initial_data)
101105

@@ -108,6 +112,7 @@ def _run_sse_form(self, initial_data: dict[str, Any] | None = None) -> MCPProvid
108112
name=result["name"],
109113
endpoint=result["endpoint"],
110114
api_key=result.get("api_key") or None,
115+
provider_type=provider_type,
111116
)
112117
except Exception as e:
113118
print_error(f"Configuration error: {e}")

0 commit comments

Comments
 (0)