This guide covers the breaking changes introduced in v2 of the MCP Python SDK and how to update your code.
Version 2 of the MCP Python SDK introduces several breaking changes to improve the API, align with the MCP specification, and provide better type safety.
The deprecated streamablehttp_client function has been removed. Use streamable_http_client instead.
Before (v1):
from mcp.client.streamable_http import streamablehttp_client
async with streamablehttp_client(
url="http://localhost:8000/mcp",
headers={"Authorization": "Bearer token"},
timeout=30,
sse_read_timeout=300,
auth=my_auth,
) as (read_stream, write_stream, get_session_id):
...After (v2):
import httpx
from mcp.client.streamable_http import streamable_http_client
# Configure headers, timeout, and auth on the httpx.AsyncClient
http_client = httpx.AsyncClient(
headers={"Authorization": "Bearer token"},
timeout=httpx.Timeout(30, read=300),
auth=my_auth,
)
async with http_client:
async with streamable_http_client(
url="http://localhost:8000/mcp",
http_client=http_client,
) as (read_stream, write_stream):
...The get_session_id callback (third element of the returned tuple) has been removed from streamable_http_client. The function now returns a 2-tuple (read_stream, write_stream) instead of a 3-tuple.
If you need to capture the session ID (e.g., for session resumption testing), you can use httpx event hooks to capture it from the response headers:
Before (v1):
from mcp.client.streamable_http import streamable_http_client
async with streamable_http_client(url) as (read_stream, write_stream, get_session_id):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()
session_id = get_session_id() # Get session ID via callbackAfter (v2):
import httpx
from mcp.client.streamable_http import streamable_http_client
# Option 1: Simply ignore if you don't need the session ID
async with streamable_http_client(url) as (read_stream, write_stream):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()
# Option 2: Capture session ID via httpx event hooks if needed
captured_session_ids: list[str] = []
async def capture_session_id(response: httpx.Response) -> None:
session_id = response.headers.get("mcp-session-id")
if session_id:
captured_session_ids.append(session_id)
http_client = httpx.AsyncClient(
event_hooks={"response": [capture_session_id]},
follow_redirects=True,
)
async with http_client:
async with streamable_http_client(url, http_client=http_client) as (read_stream, write_stream):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()
session_id = captured_session_ids[0] if captured_session_ids else NoneThe headers, timeout, sse_read_timeout, and auth parameters have been removed from StreamableHTTPTransport. Configure these on the httpx.AsyncClient instead (see example above).
The following deprecated type aliases and classes have been removed from mcp.types:
| Removed | Replacement |
|---|---|
Content |
ContentBlock |
ResourceReference |
ResourceTemplateReference |
Cursor |
Use str directly |
MethodT |
Internal TypeVar, not intended for public use |
RequestParamsT |
Internal TypeVar, not intended for public use |
NotificationParamsT |
Internal TypeVar, not intended for public use |
Before (v1):
from mcp.types import Content, ResourceReference, CursorAfter (v2):
from mcp.types import ContentBlock, ResourceTemplateReference
# Use `str` instead of `Cursor` for pagination cursorsThe deprecated args parameter has been removed from ClientSessionGroup.call_tool(). Use arguments instead.
Before (v1):
result = await session_group.call_tool("my_tool", args={"key": "value"})After (v2):
result = await session_group.call_tool("my_tool", arguments={"key": "value"})The deprecated cursor parameter has been removed from the following ClientSession methods:
list_resources()list_resource_templates()list_prompts()list_tools()
Use params=PaginatedRequestParams(cursor=...) instead.
Before (v1):
result = await session.list_resources(cursor="next_page_token")
result = await session.list_tools(cursor="next_page_token")After (v2):
from mcp.types import PaginatedRequestParams
result = await session.list_resources(params=PaginatedRequestParams(cursor="next_page_token"))
result = await session.list_tools(params=PaginatedRequestParams(cursor="next_page_token"))The McpError exception class has been renamed to MCPError for consistent naming with the MCP acronym style used throughout the SDK.
Before (v1):
from mcp.shared.exceptions import McpError
try:
result = await session.call_tool("my_tool")
except McpError as e:
print(f"Error: {e.error.message}")After (v2):
from mcp.shared.exceptions import MCPError
try:
result = await session.call_tool("my_tool")
except MCPError as e:
print(f"Error: {e.message}")MCPError is also exported from the top-level mcp package:
from mcp import MCPErrorThe FastMCP class has been renamed to MCPServer to better reflect its role as the main server class in the SDK. This is a simple rename with no functional changes to the class itself.
Before (v1):
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("Demo")After (v2):
from mcp.server.mcpserver import MCPServer
mcp = MCPServer("Demo")The mount_path parameter has been removed from MCPServer.__init__(), MCPServer.run(), MCPServer.run_sse_async(), and MCPServer.sse_app(). It was also removed from the Settings class.
This parameter was redundant because the SSE transport already handles sub-path mounting via ASGI's standard root_path mechanism. When using Starlette's Mount("/path", app=mcp.sse_app()), Starlette automatically sets root_path in the ASGI scope, and the SseServerTransport uses this to construct the correct message endpoint path.
Transport-specific parameters have been moved from the MCPServer constructor to the run(), sse_app(), and streamable_http_app() methods. This provides better separation of concerns - the constructor now only handles server identity and authentication, while transport configuration is passed when starting the server.
Parameters moved:
host,port- HTTP server bindingsse_path,message_path- SSE transport pathsstreamable_http_path- StreamableHTTP endpoint pathjson_response,stateless_http- StreamableHTTP behaviorevent_store,retry_interval- StreamableHTTP event handlingtransport_security- DNS rebinding protection
Before (v1):
from mcp.server.fastmcp import FastMCP
# Transport params in constructor
mcp = FastMCP("Demo", json_response=True, stateless_http=True)
mcp.run(transport="streamable-http")
# Or for SSE
mcp = FastMCP("Server", host="0.0.0.0", port=9000, sse_path="/events")
mcp.run(transport="sse")After (v2):
from mcp.server.mcpserver import MCPServer
# Transport params passed to run()
mcp = MCPServer("Demo")
mcp.run(transport="streamable-http", json_response=True, stateless_http=True)
# Or for SSE
mcp = MCPServer("Server")
mcp.run(transport="sse", host="0.0.0.0", port=9000, sse_path="/events")For mounted apps:
When mounting in a Starlette app, pass transport params to the app methods:
# Before (v1)
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("App", json_response=True)
app = Starlette(routes=[Mount("/", app=mcp.streamable_http_app())])
# After (v2)
from mcp.server.mcpserver import MCPServer
mcp = MCPServer("App")
app = Starlette(routes=[Mount("/", app=mcp.streamable_http_app(json_response=True))])Note: DNS rebinding protection is automatically enabled when host is 127.0.0.1, localhost, or ::1. This now happens in sse_app() and streamable_http_app() instead of the constructor.
The following union types are no longer RootModel subclasses:
ClientRequestServerRequestClientNotificationServerNotificationClientResultServerResultJSONRPCMessage
This means you can no longer access .root on these types or use model_validate() directly on them. Instead, use the provided TypeAdapter instances for validation.
Before (v1):
from mcp.types import ClientRequest, ServerNotification
# Using RootModel.model_validate()
request = ClientRequest.model_validate(data)
actual_request = request.root # Accessing the wrapped value
notification = ServerNotification.model_validate(data)
actual_notification = notification.rootAfter (v2):
from mcp.types import client_request_adapter, server_notification_adapter
# Using TypeAdapter.validate_python()
request = client_request_adapter.validate_python(data)
# No .root access needed - request is the actual type
notification = server_notification_adapter.validate_python(data)
# No .root access needed - notification is the actual typeAvailable adapters:
| Union Type | Adapter |
|---|---|
ClientRequest |
client_request_adapter |
ServerRequest |
server_request_adapter |
ClientNotification |
client_notification_adapter |
ServerNotification |
server_notification_adapter |
ClientResult |
client_result_adapter |
ServerResult |
server_result_adapter |
JSONRPCMessage |
jsonrpc_message_adapter |
All adapters are exported from mcp.types.
The nested RequestParams.Meta Pydantic model class has been replaced with a top-level RequestParamsMeta TypedDict. This affects the ctx.meta field in request handlers and any code that imports or references this type.
Key changes:
RequestParams.Meta(Pydantic model) →RequestParamsMeta(TypedDict)- Attribute access (
meta.progress_token) → Dictionary access (meta.get("progress_token")) progress_tokenfield changed fromProgressToken | None = NonetoNotRequired[ProgressToken]
In request context handlers:
# Before (v1)
@server.call_tool()
async def handle_tool(name: str, arguments: dict) -> list[TextContent]:
ctx = server.request_context
if ctx.meta and ctx.meta.progress_token:
await ctx.session.send_progress_notification(ctx.meta.progress_token, 0.5, 100)
# After (v2)
async def handle_call_tool(ctx: ServerRequestContext, params: CallToolRequestParams) -> CallToolResult:
if ctx.meta and "progress_token" in ctx.meta:
await ctx.session.send_progress_notification(ctx.meta["progress_token"], 0.5, 100)
...
server = Server("my-server", on_call_tool=handle_call_tool)The RequestContext class has been split to separate shared fields from server-specific fields. The shared RequestContext now only takes 1 type parameter (the session type) instead of 3.
RequestContext changes:
- Type parameters reduced from
RequestContext[SessionT, LifespanContextT, RequestT]toRequestContext[SessionT] - Server-specific fields (
lifespan_context,experimental,request,close_sse_stream,close_standalone_sse_stream) moved to newServerRequestContextclass inmcp.server.context
Before (v1):
from mcp.client.session import ClientSession
from mcp.shared.context import RequestContext, LifespanContextT, RequestT
# RequestContext with 3 type parameters
ctx: RequestContext[ClientSession, LifespanContextT, RequestT]After (v2):
from mcp.client.context import ClientRequestContext
from mcp.server.context import ServerRequestContext, LifespanContextT, RequestT
# For client-side context (sampling, elicitation, list_roots callbacks)
ctx: ClientRequestContext
# For server-specific context with lifespan and request types
server_ctx: ServerRequestContext[LifespanContextT, RequestT]The mcp.shared.progress module (ProgressContext, Progress, and the progress() context manager) has been removed. This module had no real-world adoption — all users send progress notifications via Context.report_progress() or session.send_progress_notification() directly.
Before:
from mcp.shared.progress import progress
with progress(ctx, total=100) as p:
await p.progress(25)After — use Context.report_progress() (recommended):
@server.tool()
async def my_tool(x: int, ctx: Context) -> str:
await ctx.report_progress(25, 100)
return "done"After — use session.send_progress_notification() (low-level):
await session.send_progress_notification(
progress_token=progress_token,
progress=25,
total=100,
)The uri field on resource-related types now uses str instead of Pydantic's AnyUrl. This aligns with the MCP specification schema which defines URIs as plain strings (uri: string) without strict URL validation. This change allows relative paths like users/me that were previously rejected.
Before (v1):
from pydantic import AnyUrl
from mcp.types import Resource
# Required wrapping in AnyUrl
resource = Resource(name="test", uri=AnyUrl("users/me")) # Would fail validationAfter (v2):
from mcp.types import Resource
# Plain strings accepted
resource = Resource(name="test", uri="users/me") # Works
resource = Resource(name="test", uri="custom://scheme") # Works
resource = Resource(name="test", uri="https://example.com") # WorksIf your code passes AnyUrl objects to URI fields, convert them to strings:
# If you have an AnyUrl from elsewhere
uri = str(my_any_url) # Convert to stringAffected types:
Resource.uriReadResourceRequestParams.uriResourceContents.uri(and subclassesTextResourceContents,BlobResourceContents)SubscribeRequestParams.uriUnsubscribeRequestParams.uriResourceUpdatedNotificationParams.uri
The Client and ClientSession methods read_resource(), subscribe_resource(), and unsubscribe_resource() now only accept str for the uri parameter. If you were passing AnyUrl objects, convert them to strings:
# Before (v1)
from pydantic import AnyUrl
await client.read_resource(AnyUrl("test://resource"))
# After (v2)
await client.read_resource("test://resource")
# Or if you have an AnyUrl from elsewhere:
await client.read_resource(str(my_any_url))All parameters after name are now keyword-only. If you were passing version or other parameters positionally, use keyword arguments instead:
# Before (v1)
server = Server("my-server", "1.0")
# After (v2)
server = Server("my-server", version="1.0")The Server class previously had two type parameters: Server[LifespanResultT, RequestT]. The RequestT parameter has been removed — handlers now receive typed params directly rather than a generic request type.
# Before (v1)
from typing import Any
from mcp.server.lowlevel.server import Server
server: Server[dict[str, Any], Any] = Server(...)
# After (v2)
from typing import Any
from mcp.server import Server
server: Server[dict[str, Any]] = Server(...)The public server.request_handlers and server.notification_handlers dictionaries have been removed. Handler registration is now done exclusively through constructor on_* keyword arguments. There is no public API to register handlers after construction.
# Before (v1) — direct dict access
from mcp.types import ListToolsRequest
if ListToolsRequest in server.request_handlers:
...
# After (v2) — no public access to handler dicts
# Use the on_* constructor params to register handlers
server = Server("my-server", on_list_tools=handle_list_tools)The lowlevel Server class no longer uses decorator methods for handler registration. Instead, handlers are passed as on_* keyword arguments to the constructor.
Before (v1):
from mcp.server.lowlevel.server import Server
server = Server("my-server")
@server.list_tools()
async def handle_list_tools():
return [types.Tool(name="my_tool", description="A tool", inputSchema={})]
@server.call_tool()
async def handle_call_tool(name: str, arguments: dict):
return [types.TextContent(type="text", text=f"Called {name}")]After (v2):
from mcp.server import Server, ServerRequestContext
from mcp.types import (
CallToolRequestParams,
CallToolResult,
ListToolsResult,
PaginatedRequestParams,
TextContent,
Tool,
)
async def handle_list_tools(ctx: ServerRequestContext, params: PaginatedRequestParams | None) -> ListToolsResult:
return ListToolsResult(tools=[Tool(name="my_tool", description="A tool", input_schema={})])
async def handle_call_tool(ctx: ServerRequestContext, params: CallToolRequestParams) -> CallToolResult:
return CallToolResult(
content=[TextContent(type="text", text=f"Called {params.name}")],
is_error=False,
)
server = Server("my-server", on_list_tools=handle_list_tools, on_call_tool=handle_call_tool)Key differences:
- Handlers receive
(ctx, params)instead of the full request object or unpacked arguments.ctxis aServerRequestContextwithsession,lifespan_context, andexperimentalfields (plusrequest_id,meta, etc. for request handlers).paramsis the typed request params object. - Handlers return the full result type (e.g.
ListToolsResult) rather than unwrapped values (e.g.list[Tool]). - The automatic
jsonschemainput/output validation that the oldcall_tool()decorator performed has been removed. There is no built-in replacement — if you relied on schema validation in the lowlevel server, you will need to validate inputs yourself in your handler.
Notification handlers:
from mcp.server import Server, ServerRequestContext
from mcp.types import ProgressNotificationParams
async def handle_progress(ctx: ServerRequestContext, params: ProgressNotificationParams) -> None:
print(f"Progress: {params.progress}/{params.total}")
server = Server("my-server", on_progress=handle_progress)The old decorator-based handlers performed significant automatic wrapping of return values. This magic has been removed — handlers now return fully constructed result types. If you want these conveniences, use MCPServer (previously FastMCP) instead of the lowlevel Server.
call_tool() — structured output wrapping removed:
The old decorator accepted several return types and auto-wrapped them into CallToolResult:
# Before (v1) — returning a dict auto-wrapped into structured_content + JSON TextContent
@server.call_tool()
async def handle(name: str, arguments: dict) -> dict:
return {"temperature": 22.5, "city": "London"}
# Before (v1) — returning a list auto-wrapped into CallToolResult.content
@server.call_tool()
async def handle(name: str, arguments: dict) -> list[TextContent]:
return [TextContent(type="text", text="Done")]# After (v2) — construct the full result yourself
import json
async def handle(ctx: ServerRequestContext, params: CallToolRequestParams) -> CallToolResult:
data = {"temperature": 22.5, "city": "London"}
return CallToolResult(
content=[TextContent(type="text", text=json.dumps(data, indent=2))],
structured_content=data,
)Note: params.arguments can be None (the old decorator defaulted it to {}). Use params.arguments or {} to preserve the old behavior.
read_resource() — content type wrapping removed:
The old decorator auto-wrapped str into TextResourceContents and bytes into BlobResourceContents (with base64 encoding), and applied a default mime type of text/plain:
# Before (v1) — str/bytes auto-wrapped with mime type defaulting
@server.read_resource()
async def handle(uri: str) -> str:
return "file contents"
@server.read_resource()
async def handle(uri: str) -> bytes:
return b"\x89PNG..."# After (v2) — construct TextResourceContents or BlobResourceContents yourself
import base64
async def handle_read(ctx: ServerRequestContext, params: ReadResourceRequestParams) -> ReadResourceResult:
# Text content
return ReadResourceResult(
contents=[TextResourceContents(uri=str(params.uri), text="file contents", mime_type="text/plain")]
)
async def handle_read(ctx: ServerRequestContext, params: ReadResourceRequestParams) -> ReadResourceResult:
# Binary content — you must base64-encode it yourself
return ReadResourceResult(
contents=[BlobResourceContents(
uri=str(params.uri),
blob=base64.b64encode(b"\x89PNG...").decode("utf-8"),
mime_type="image/png",
)]
)list_tools(), list_resources(), list_prompts() — list wrapping removed:
The old decorators accepted bare lists and wrapped them into the result type:
# Before (v1)
@server.list_tools()
async def handle() -> list[Tool]:
return [Tool(name="my_tool", ...)]
# After (v2)
async def handle(ctx: ServerRequestContext, params: PaginatedRequestParams | None) -> ListToolsResult:
return ListToolsResult(tools=[Tool(name="my_tool", ...)])Using MCPServer instead:
If you prefer the convenience of automatic wrapping, use MCPServer which still provides these features through its @mcp.tool(), @mcp.resource(), and @mcp.prompt() decorators. The lowlevel Server is intentionally minimal — it provides no magic and gives you full control over the MCP protocol types.
The server.request_context property has been removed. Request context is now passed directly to handlers as the first argument (ctx). The request_ctx module-level contextvar is now an internal implementation detail and should not be relied upon.
Before (v1):
from mcp.server.lowlevel.server import request_ctx
@server.call_tool()
async def handle_call_tool(name: str, arguments: dict):
ctx = server.request_context # or request_ctx.get()
await ctx.session.send_log_message(level="info", data="Processing...")
return [types.TextContent(type="text", text="Done")]After (v2):
from mcp.server import ServerRequestContext
from mcp.types import CallToolRequestParams, CallToolResult, TextContent
async def handle_call_tool(ctx: ServerRequestContext, params: CallToolRequestParams) -> CallToolResult:
await ctx.session.send_log_message(level="info", data="Processing...")
return CallToolResult(
content=[TextContent(type="text", text="Done")],
is_error=False,
)The RequestContext class now uses optional fields for request-specific data (request_id, meta, etc.) so it can be used for both request and notification handlers. In notification handlers, these fields are None.
from mcp.server import ServerRequestContext
# request_id, meta, etc. are available in request handlers
# but None in notification handlersThe experimental decorator methods on ExperimentalHandlers (@server.experimental.list_tasks(), @server.experimental.get_task(), etc.) have been removed.
Default task handlers are still registered automatically via server.experimental.enable_tasks(). Custom handlers can be passed as on_* kwargs to override specific defaults.
Before (v1):
server = Server("my-server")
server.experimental.enable_tasks()
@server.experimental.get_task()
async def custom_get_task(request: GetTaskRequest) -> GetTaskResult:
...After (v2):
from mcp.server import Server, ServerRequestContext
from mcp.types import GetTaskRequestParams, GetTaskResult
async def custom_get_task(ctx: ServerRequestContext, params: GetTaskRequestParams) -> GetTaskResult:
...
server = Server("my-server")
server.experimental.enable_tasks(on_get_task=custom_get_task)The session hierarchy has been refactored to support pluggable transport implementations. This introduces several breaking changes:
A new runtime-checkable Protocol, AbstractBaseSession, establishes a transport-agnostic contract for all MCP sessions. It ensures that client and server sessions share a consistent communication interface regardless of the transport used.
To maintain a clean architectural boundary, AbstractBaseSession is a pure interface—it defines what methods must exist but does not manage how they work.
-
No State Management: The protocol does not handle internal machinery like task groups, response streams, or buffers.
-
Implementation Ownership: The concrete class is fully responsible for managing its own state and lifecycle for how it sends and receives data.
-
No Inheritance Needed: As a structural protocol, you no longer need to call super().__init__() or inherit from a base class to satisfy the contract.
Previously, all custom sessions were required to inherit from BaseSession, which locked them into a specific architecture involving memory streams and JSON-RPC message routing, but with this change, users have the flexibility to implement their own transport logic with either of the following options:
-
BaseSession (Implementation): Remains available for transports that follow the standard pattern of reading and writing JSON-RPC messages over streams. You can continue to inherit from this if you want the SDK to handle message linking and routing for you.
-
AbstractBaseSession (Interface): A new stateless protocol for transports that do not use standard streams or JSON-RPC. It defines the "what" (method signatures) without enforcing the "how" (internal machinery).
Before:
from mcp.shared.session import BaseSession
# Locked into the BaseSession implementation details
class MyCustomSession(BaseSession[...]):
def __init__(self, read_stream, write_stream):
# Mandatory: must use streams and JSON-RPC machinery
super().__init__(read_stream, write_stream)After:
# OPTION A: Continue using BaseSession (for stream-based JSON-RPC)
class MyStreamSession(BaseSession):
...
# OPTION B: Use AbstractBaseSession (for non-stream/custom transports)
class MyDirectApiSession: # Satisfies AbstractBaseSession protocol
def __init__(self):
# No streams or super().__init__() required.
# Manage your own custom transport logic here.
self._client = CustomTransportClient()
async def send_request(self, ...):
return await self._client.execute(...)A new runtime-checkable Protocol, BaseClientSession, has been introduced to establish a common interface for all MCP client sessions. This protocol defines the essential methods—such as send_request, send_notification, and initialize, etc. — that a session must implement to be compatible with the SDK's client utilities.
The primary goal of this protocol is to ensure that the high-level session logic remains consistent irrespective of the underlying transport. Custom session implementations can now satisfy the SDK's requirements simply by implementing the defined methods. No explicit inheritance is required.
from mcp.client.base_client_session import BaseClientSession
class MyCustomTransportSession:
"""
A custom session implementation. It doesn't need to inherit from
BaseClientSession to be compatible.
"""
async def initialize(self) -> InitializeResult:
...
async def send_request(self, ...) -> Any:
...
# Implementing these methods makes this class a 'BaseClientSession'Because the protocol is @runtime_checkable, you can verify that any session object adheres to the required structure using standard Python checks:
def start_client(session: BaseClientSession):
# This works for any object implementing the protocol
if not isinstance(session, BaseClientSession):
raise TypeError("Session must implement the BaseClientSession protocol")ClientRequestContext is now RequestContext[BaseClientSession] instead of RequestContext[ClientSession]. This means callbacks receive the more general BaseClientSession type, which may not have all methods available on ClientSession.
Before:
from mcp.client.context import ClientRequestContext
from mcp.client.session import ClientSession
async def my_callback(context: ClientRequestContext) -> None:
# Could access ClientSession-specific methods
caps = context.session.get_server_capabilities()After:
from mcp.client.context import ClientRequestContext
from mcp.client.session import ClientSession
async def my_callback(context: ClientRequestContext) -> None:
# context.session is BaseClientSession - narrow the type if needed
if isinstance(context.session, ClientSession):
caps = context.session.get_server_capabilities()sampling_callback, elicitation_callback, and list_roots_callback protocols now require explicit type parameters.
Before:
from mcp.client.session import SamplingFnT
async def my_sampling(context, params) -> CreateMessageResult:
...
# Type inferred as SamplingFnT
session = ClientSession(..., sampling_callback=my_sampling)After:
from mcp.client.session import SamplingFnT, ClientSession
async def my_sampling(
context: RequestContext[ClientSession],
params: CreateMessageRequestParams
) -> CreateMessageResult:
...
# Explicit type annotation recommended
my_sampling_typed: SamplingFnT[ClientSession] = my_sampling
session = ClientSession(..., sampling_callback=my_sampling_typed)To support structural subtyping and ensure type safety across the new Protocol hierarchy, several internal TypeVariables have been updated with explicit variance (covariant or contravariant). These have been renamed to follow PEP 484 naming conventions.
| Original Name | Updated Name | Variance | Purpose |
|---|---|---|---|
| SessionT | SessionT_co | Covariant | Allows specialized sessions to be used where a base session is expected. |
| SendRequestT | SendRequestT_contra | Contravariant | Ensures request types can be safely handled by generic handlers. |
| SendNotificationT | SendNotificationT_contra | Contravariant | Ensures notification types are handled safely across the hierarchy. |
| ReceiveResultT | ReceiveResultT_co | Covariant | Handles successful response models safely. |
Before:
SessionT_co = TypeVar("SessionT_co", bound="AbstractBaseSession[Any, Any]", covariant=True)
SendRequestT = TypeVar("SendRequestT", ClientRequest, ServerRequest)
SendNotificationT = TypeVar("SendNotificationT", ClientNotification, ServerNotification)
ReceiveResultT = TypeVar("ReceiveResultT", bound=BaseModel)After:
SessionT_co = TypeVar("SessionT_co", bound="AbstractBaseSession[Any, Any]", covariant=True)
SendRequestT_contra = TypeVar("SendRequestT_contra", ClientRequest, ServerRequest, contravariant=True)
SendNotificationT_contra = TypeVar(
"SendNotificationT_contra", ClientNotification, ServerNotification, contravariant=True
)
ReceiveResultT_co = TypeVar("ReceiveResultT_co", bound=BaseModel, covariant=True)Previously, the lowlevel Server hardcoded subscribe=False in resource capabilities even when a subscribe_resource() handler was registered. The subscribe capability is now dynamically set to True when an on_subscribe_resource handler is provided. Clients that previously didn't see subscribe: true in capabilities will now see it when a handler is registered, which may change client behavior.
MCP protocol types no longer accept arbitrary extra fields at the top level. This matches the MCP specification which only allows extra fields within _meta objects, not on the types themselves.
# This will now raise a validation error
from mcp.types import CallToolRequestParams
params = CallToolRequestParams(
name="my_tool",
arguments={},
unknown_field="value", # ValidationError: extra fields not permitted
)
# Extra fields are still allowed in _meta
params = CallToolRequestParams(
name="my_tool",
arguments={},
_meta={"progressToken": "tok", "customField": "value"}, # OK
)The streamable_http_app() method is now available directly on the lowlevel Server class, not just MCPServer. This allows using the streamable HTTP transport without the MCPServer wrapper.
from mcp.server import Server, ServerRequestContext
from mcp.types import ListToolsResult, PaginatedRequestParams
async def handle_list_tools(ctx: ServerRequestContext, params: PaginatedRequestParams | None) -> ListToolsResult:
return ListToolsResult(tools=[...])
server = Server("my-server", on_list_tools=handle_list_tools)
app = server.streamable_http_app(
streamable_http_path="/mcp",
json_response=False,
stateless_http=False,
)The lowlevel Server also now exposes a session_manager property to access the StreamableHTTPSessionManager after calling streamable_http_app().
If you encounter issues during migration:
- Check the API Reference for updated method signatures
- Review the examples for updated usage patterns
- Open an issue on GitHub if you find a bug or need further assistance