!!! warning "Experimental"
    Tasks are an experimental feature. The API may change without notice.
This guide covers calling task-augmented tools from clients, handling the `input_required` status, and advanced patterns such as receiving task requests from servers.
Call a tool as a task and poll for the result:
from mcp.client.session import ClientSession
from mcp.types import CallToolResult
async with ClientSession(read, write) as session:
await session.initialize()
# Call tool as task
result = await session.experimental.call_tool_as_task(
"process_data",
{"input": "hello"},
ttl=60000,
)
task_id = result.task.taskId
# Poll until complete
async for status in session.experimental.poll_task(task_id):
print(f"Status: {status.status} - {status.statusMessage or ''}")
# Get result
final = await session.experimental.get_task_result(task_id, CallToolResult)
print(f"Result: {final.content[0].text}")

Use `call_tool_as_task()` to invoke a tool with task augmentation:
result = await session.experimental.call_tool_as_task(
"my_tool", # Tool name
{"arg": "value"}, # Arguments
ttl=60000, # Time-to-live in milliseconds
meta={"key": "val"}, # Optional metadata
)
task_id = result.task.taskId
print(f"Task: {task_id}, Status: {result.task.status}")

The response is a `CreateTaskResult` containing:
- `task.taskId` - Unique identifier for polling
- `task.status` - Initial status (usually `"working"`)
- `task.pollInterval` - Suggested polling interval (milliseconds)
- `task.ttl` - Time-to-live for results
- `task.createdAt` - Creation timestamp
The poll_task() async iterator polls until the task reaches a terminal state:
async for status in session.experimental.poll_task(task_id):
print(f"Status: {status.status}")
if status.statusMessage:
print(f"Progress: {status.statusMessage}")

It automatically:
- Respects the server's suggested `pollInterval`
- Stops when status is `completed`, `failed`, or `cancelled`
- Yields each status for progress display
When a task needs user input (elicitation), it transitions to `input_required`. You must call `get_task_result()` to receive and respond to the elicitation:
async for status in session.experimental.poll_task(task_id):
print(f"Status: {status.status}")
if status.status == "input_required":
# This delivers the elicitation and waits for completion
final = await session.experimental.get_task_result(task_id, CallToolResult)
break

The elicitation callback (set during session creation) handles the actual user interaction.
To handle elicitation requests from the server, provide a callback when creating the session:
from mcp.types import ElicitRequestParams, ElicitResult
async def handle_elicitation(context, params: ElicitRequestParams) -> ElicitResult:
    """Prompt the user on the terminal and answer the server's elicitation.

    Prints ``params.message``, reads a reply from stdin, and returns an
    accepted ``ElicitResult`` whose ``confirm`` field is ``True`` only when
    the reply is "y" (case-insensitive).
    """
    # Show the user what the server is asking.
    print(f"Server asks: {params.message}")

    # Simplified example: any reply other than "y"/"Y" counts as "no".
    answer = input("Your response (y/n): ")
    payload = {"confirm": answer.lower() == "y"}

    return ElicitResult(action="accept", content=payload)
async with ClientSession(
read,
write,
elicitation_callback=handle_elicitation,
) as session:
await session.initialize()
# ... call tasks that may require elicitation

Similarly, handle sampling requests with a callback:
from mcp.types import CreateMessageRequestParams, CreateMessageResult, TextContent
async def handle_sampling(context, params: CreateMessageRequestParams) -> CreateMessageResult:
    """Answer a sampling request from the server with a mock completion.

    The text of the most recent message is used as the prompt. If there are
    no messages, or the last message's content has no ``text`` attribute,
    the prompt is the empty string.

    Returns:
        A ``CreateMessageResult`` with role ``"assistant"``, model
        ``"my-model"`` and a single text block echoing the prompt.
    """
    # In a real implementation, call your LLM here.
    # Message content is not guaranteed to carry text, so fall back to ""
    # instead of raising AttributeError on non-text content.
    prompt = ""
    if params.messages:
        prompt = getattr(params.messages[-1].content, "text", "")

    # Return a mock response.
    return CreateMessageResult(
        role="assistant",
        content=TextContent(type="text", text=f"Response to: {prompt}"),
        model="my-model",
    )
async with ClientSession(
read,
write,
sampling_callback=handle_sampling,
) as session:
# ...

Once a task completes, retrieve the result:
if status.status == "completed":
result = await session.experimental.get_task_result(task_id, CallToolResult)
for content in result.content:
if hasattr(content, "text"):
print(content.text)
elif status.status == "failed":
print(f"Task failed: {status.statusMessage}")
elif status.status == "cancelled":
print("Task was cancelled")

The result type matches the original request:
- `tools/call` → `CallToolResult`
- `sampling/createMessage` → `CreateMessageResult`
- `elicitation/create` → `ElicitResult`
Cancel a running task:
cancel_result = await session.experimental.cancel_task(task_id)
print(f"Cancelled, status: {cancel_result.status}")

Note: Cancellation is cooperative—the server must check for and handle cancellation.
View all tasks on the server:
result = await session.experimental.list_tasks()
for task in result.tasks:
print(f"{task.taskId}: {task.status}")
# Handle pagination
while result.nextCursor:
result = await session.experimental.list_tasks(cursor=result.nextCursor)
for task in result.tasks:
print(f"{task.taskId}: {task.status}")

Servers can send task-augmented requests to clients. This is useful when the server needs the client to perform async work (like complex sampling or user interaction).
Register task handlers to declare what task-augmented requests your client accepts:
from mcp.client.experimental.task_handlers import ExperimentalTaskHandlers
from mcp.types import (
CreateTaskResult, GetTaskResult, GetTaskPayloadResult,
TaskMetadata, ElicitRequestParams,
)
from mcp.shared.experimental.tasks import InMemoryTaskStore
# Client-side task store
client_store = InMemoryTaskStore()
async def handle_augmented_elicitation(context, params: ElicitRequestParams, task_metadata: TaskMetadata):
    """Handle task-augmented elicitation from server.

    Creates a task in the client-side store, schedules the elicitation work
    in the background, and immediately returns a ``CreateTaskResult`` that
    references the new task.
    """
    # Track this elicitation as a task in the client-side store.
    pending = await client_store.create_task(task_metadata)

    async def finish_elicitation():
        # ... do async work (e.g., show UI, wait for user) ...
        answer = ElicitResult(action="accept", content={"confirm": True})
        # Store the result first, then mark the task as completed.
        await client_store.store_result(pending.taskId, answer)
        await client_store.update_task(pending.taskId, status="completed")

    # NOTE(review): `_task_group` is a private session attribute — confirm
    # whether a public way to schedule background work is available.
    context.session._task_group.start_soon(finish_elicitation)

    # Return the task reference immediately, without awaiting the work above.
    return CreateTaskResult(task=pending)
async def handle_get_task(context, params):
    """Handle tasks/get from server.

    Looks up ``params.taskId`` in the client-side store and reports the
    task's current state with a fixed ``pollInterval`` of 100.
    """
    stored = await client_store.get_task(params.taskId)
    # NOTE(review): presumably the store can return None for an unknown id,
    # in which case the attribute access below fails — confirm and handle.
    fields = {
        "taskId": stored.taskId,
        "status": stored.status,
        "statusMessage": stored.statusMessage,
        "createdAt": stored.createdAt,
        "lastUpdatedAt": stored.lastUpdatedAt,
        "ttl": stored.ttl,
        "pollInterval": 100,
    }
    return GetTaskResult(**fields)
async def handle_get_task_result(context, params):
    """Handle tasks/result from server.

    Fetches the stored result for ``params.taskId`` and re-validates its
    dumped form as a ``GetTaskPayloadResult``.
    """
    stored_result = await client_store.get_result(params.taskId)
    payload = stored_result.model_dump()
    return GetTaskPayloadResult.model_validate(payload)
task_handlers = ExperimentalTaskHandlers(
augmented_elicitation=handle_augmented_elicitation,
get_task=handle_get_task,
get_task_result=handle_get_task_result,
)
async with ClientSession(
read,
write,
experimental_task_handlers=task_handlers,
) as session:
# Client now accepts task-augmented elicitation from server
await session.initialize()

This enables flows where:
- Client calls a task-augmented tool
- Server's tool work calls `task.elicit_as_task()`
- Client receives task-augmented elicitation
- Client creates its own task, does async work
- Server polls client's task
- Eventually both tasks complete
A client that handles all task scenarios:
import anyio
from mcp.client.session import ClientSession
from mcp.client.stdio import stdio_client
from mcp.types import CallToolResult, ElicitRequestParams, ElicitResult
async def elicitation_callback(context, params: ElicitRequestParams) -> ElicitResult:
    """Ask the user to confirm on the terminal and accept the elicitation.

    The returned content's ``confirm`` flag is ``True`` only when the reply
    is "y" (case-insensitive).
    """
    print(f"\n[Elicitation] {params.message}")
    reply = input("Confirm? (y/n): ")
    confirmed = reply.lower() == "y"
    return ElicitResult(action="accept", content={"confirm": confirmed})
async def main():
    """Run the complete client example against a stdio server.

    Launches ``python server.py`` over stdio, lists the server's tools,
    calls ``confirm_action`` as a task, and polls it. When the task reports
    ``input_required`` the result is fetched at once (which delivers the
    elicitation to ``elicitation_callback``); otherwise the result is
    fetched once polling ends with ``completed``.
    """
    # Imported locally so this example is self-contained.
    from mcp.client.stdio import StdioServerParameters

    # stdio_client() expects a StdioServerParameters object rather than
    # command/args keyword arguments.
    server = StdioServerParameters(command="python", args=["server.py"])

    async with stdio_client(server) as (read, write):
        async with ClientSession(
            read,
            write,
            elicitation_callback=elicitation_callback,
        ) as session:
            await session.initialize()

            # List available tools
            tools = await session.list_tools()
            print("Tools:", [t.name for t in tools.tools])

            # Call a task-augmented tool
            print("\nCalling task tool...")
            result = await session.experimental.call_tool_as_task(
                "confirm_action",
                {"action": "delete files"},
            )
            task_id = result.task.taskId
            print(f"Task created: {task_id}")

            # Poll and handle input_required
            async for status in session.experimental.poll_task(task_id):
                print(f"Status: {status.status}")

                if status.status == "input_required":
                    # Fetching the result delivers the elicitation and waits
                    # for the task to finish, so polling can stop here.
                    final = await session.experimental.get_task_result(task_id, CallToolResult)
                    print(f"Result: {final.content[0].text}")
                    break

            # Reached only when polling ended without the early break above.
            if status.status == "completed":
                final = await session.experimental.get_task_result(task_id, CallToolResult)
                print(f"Result: {final.content[0].text}")
if __name__ == "__main__":
    anyio.run(main)

Handle task errors gracefully:
from mcp.shared.exceptions import MCPError
try:
result = await session.experimental.call_tool_as_task("my_tool", args)
task_id = result.task.taskId
async for status in session.experimental.poll_task(task_id):
if status.status == "failed":
raise RuntimeError(f"Task failed: {status.statusMessage}")
final = await session.experimental.get_task_result(task_id, CallToolResult)
except MCPError as e:
print(f"MCP error: {e.message}")
except Exception as e:
print(f"Error: {e}")

- Server Implementation - Build task-supporting servers
- Tasks Overview - Review lifecycle and concepts