-
Notifications
You must be signed in to change notification settings - Fork 897
Expand file tree
/
Copy pathclient.py
More file actions
560 lines (476 loc) · 23.1 KB
/
client.py
File metadata and controls
560 lines (476 loc) · 23.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
"""Claude SDK Client for interacting with Claude Code."""
import json
import os
from collections.abc import AsyncIterable, AsyncIterator
from dataclasses import asdict, replace
from typing import Any
from . import Transport
from ._errors import CLIConnectionError
from .types import (
ClaudeAgentOptions,
ContextUsageResponse,
HookEvent,
HookMatcher,
McpStatusResponse,
Message,
PermissionMode,
ResultMessage,
)
class ClaudeSDKClient:
    """
    Client for bidirectional, interactive conversations with Claude Code.

    This client provides full control over the conversation flow with support
    for streaming, interrupts, and dynamic message sending. For simple one-shot
    queries, consider using the query() function instead.

    Key features:
    - **Bidirectional**: Send and receive messages at any time
    - **Stateful**: Maintains conversation context across messages
    - **Interactive**: Send follow-ups based on responses
    - **Control flow**: Support for interrupts and session management

    When to use ClaudeSDKClient:
    - Building chat interfaces or conversational UIs
    - Interactive debugging or exploration sessions
    - Multi-turn conversations with context
    - When you need to react to Claude's responses
    - Real-time applications with user input
    - When you need interrupt capabilities

    When to use query() instead:
    - Simple one-off questions
    - Batch processing of prompts
    - Fire-and-forget automation scripts
    - When all inputs are known upfront
    - Stateless operations

    See examples/streaming_mode.py for full examples of ClaudeSDKClient in
    different scenarios.

    Caveat: As of v0.0.20, you cannot use a ClaudeSDKClient instance across
    different async runtime contexts (e.g., different trio nurseries or asyncio
    task groups). The client internally maintains a persistent anyio task group
    for reading messages that remains active from connect() until disconnect().
    This means you must complete all operations with the client within the same
    async context where it was connected. Ideally, this limitation should not
    exist.
    """

    def __init__(
        self,
        options: ClaudeAgentOptions | None = None,
        transport: Transport | None = None,
    ):
        """Initialize Claude SDK client.

        Args:
            options: Configuration for the session. A default
                ``ClaudeAgentOptions()`` is created when omitted.
            transport: Optional pre-built transport. When given, connect()
                uses it instead of creating a subprocess CLI transport.
        """
        if options is None:
            options = ClaudeAgentOptions()
        self.options = options
        self._custom_transport = transport
        # Both stay None until connect() succeeds far enough to create them.
        self._transport: Transport | None = None
        self._query: Any | None = None

    def _require_query(self) -> Any:
        """Return the active Query, or raise if connect() has not been called.

        Raises:
            CLIConnectionError: If the client is not connected.
        """
        if not self._query:
            raise CLIConnectionError("Not connected. Call connect() first.")
        return self._query

    @staticmethod
    def _encode_user_message(prompt: str, session_id: str) -> str:
        """Serialize a plain-text user prompt as one newline-terminated JSON line."""
        message = {
            "type": "user",
            "message": {"role": "user", "content": prompt},
            "parent_tool_use_id": None,
            "session_id": session_id,
        }
        return json.dumps(message) + "\n"

    @staticmethod
    def _initialize_timeout_seconds() -> float:
        """Compute the initialize timeout (seconds) from the environment.

        CLAUDE_CODE_STREAM_CLOSE_TIMEOUT is expressed in milliseconds. The
        result is never lower than 60 seconds. A value that is not a valid
        integer falls back to the 60000 ms default instead of aborting the
        connection with an opaque ValueError.
        """
        default_ms = 60000
        raw = os.environ.get("CLAUDE_CODE_STREAM_CLOSE_TIMEOUT", str(default_ms))
        try:
            timeout_ms = int(raw)
        except ValueError:
            timeout_ms = default_ms
        return max(timeout_ms / 1000.0, 60.0)

    def _convert_hooks_to_internal_format(
        self, hooks: dict[HookEvent, list[HookMatcher]]
    ) -> dict[str, list[dict[str, Any]]]:
        """Convert HookMatcher format to internal Query format.

        Each matcher becomes a dict with ``"matcher"`` and ``"hooks"`` keys
        (defaulting to ``None`` / ``[]`` when the attribute is missing) plus a
        ``"timeout"`` key only when the matcher defines a non-None timeout.
        """
        internal_hooks: dict[str, list[dict[str, Any]]] = {}
        for event, matchers in hooks.items():
            internal_hooks[event] = []
            for matcher in matchers:
                internal_matcher: dict[str, Any] = {
                    "matcher": getattr(matcher, "matcher", None),
                    "hooks": getattr(matcher, "hooks", []),
                }
                timeout = getattr(matcher, "timeout", None)
                if timeout is not None:
                    internal_matcher["timeout"] = timeout
                internal_hooks[event].append(internal_matcher)
        return internal_hooks

    async def connect(
        self, prompt: str | AsyncIterable[dict[str, Any]] | None = None
    ) -> None:
        """Connect to Claude with a prompt or message stream.

        Args:
            prompt: A string (sent as the first user message once the session
                is initialized), an async iterable of message dicts (streamed
                in a background task), or None to open an idle session.

        Raises:
            ValueError: If ``options.can_use_tool`` is set together with a
                string prompt or with ``permission_prompt_tool_name``.
        """
        from ._internal.query import Query
        from ._internal.transport.subprocess_cli import SubprocessCLITransport

        # Auto-connect with empty async iterable if no prompt is provided
        async def _empty_stream() -> AsyncIterator[dict[str, Any]]:
            # Never yields, but indicates that this function is an iterator and
            # keeps the connection open.
            # This yield is never reached but makes this an async generator
            return
            yield {}  # type: ignore[unreachable]

        # String prompts are sent via transport.write() below, so the transport
        # only needs an AsyncIterable (or an empty stream for None/str cases).
        actual_prompt = prompt if isinstance(prompt, AsyncIterable) else _empty_stream()

        # Validate and configure permission settings (matching TypeScript SDK logic)
        if self.options.can_use_tool:
            # canUseTool callback requires streaming mode (AsyncIterable prompt)
            if isinstance(prompt, str):
                raise ValueError(
                    "can_use_tool callback requires streaming mode. "
                    "Please provide prompt as an AsyncIterable instead of a string."
                )

            # canUseTool and permission_prompt_tool_name are mutually exclusive
            if self.options.permission_prompt_tool_name:
                raise ValueError(
                    "can_use_tool callback cannot be used with permission_prompt_tool_name. "
                    "Please use one or the other."
                )

            # Automatically set permission_prompt_tool_name to "stdio" for control protocol
            options = replace(self.options, permission_prompt_tool_name="stdio")
        else:
            options = self.options

        # Use provided custom transport or create subprocess transport.
        # A custom transport is used as-is; the adjusted ``options`` above are
        # only handed to the subprocess transport created here.
        if self._custom_transport:
            self._transport = self._custom_transport
        else:
            self._transport = SubprocessCLITransport(
                prompt=actual_prompt,
                options=options,
            )
        await self._transport.connect()

        # Extract SDK MCP servers from options
        sdk_mcp_servers: dict[str, Any] = {}
        mcp_servers = self.options.mcp_servers
        if mcp_servers and isinstance(mcp_servers, dict):
            for name, config in mcp_servers.items():
                if isinstance(config, dict) and config.get("type") == "sdk":
                    sdk_mcp_servers[name] = config["instance"]  # type: ignore[typeddict-item]

        # Initialize timeout derived from CLAUDE_CODE_STREAM_CLOSE_TIMEOUT
        # (milliseconds), converted to seconds with a 60 second floor.
        initialize_timeout = self._initialize_timeout_seconds()

        # Extract exclude_dynamic_sections from preset system prompt for the
        # initialize request (older CLIs ignore unknown initialize fields).
        exclude_dynamic_sections: bool | None = None
        sp = self.options.system_prompt
        if isinstance(sp, dict) and sp.get("type") == "preset":
            eds = sp.get("exclude_dynamic_sections")
            if isinstance(eds, bool):
                exclude_dynamic_sections = eds

        # Convert agents to dict format for initialize request
        agents_dict: dict[str, dict[str, Any]] | None = None
        if self.options.agents:
            agents_dict = {
                name: {k: v for k, v in asdict(agent_def).items() if v is not None}
                for name, agent_def in self.options.agents.items()
            }

        # Create Query to handle control protocol
        self._query = Query(
            transport=self._transport,
            is_streaming_mode=True,  # ClaudeSDKClient always uses streaming mode
            can_use_tool=self.options.can_use_tool,
            hooks=self._convert_hooks_to_internal_format(self.options.hooks)
            if self.options.hooks
            else None,
            sdk_mcp_servers=sdk_mcp_servers,
            initialize_timeout=initialize_timeout,
            agents=agents_dict,
            exclude_dynamic_sections=exclude_dynamic_sections,
            skills=self.options.skills,
        )

        # Start reading messages and initialize
        await self._query.start()
        await self._query.initialize()

        # If we have an initial prompt, send it
        if isinstance(prompt, str):
            await self._transport.write(self._encode_user_message(prompt, "default"))
        elif isinstance(prompt, AsyncIterable):
            # None is not an AsyncIterable, so no separate None check is needed.
            self._query.spawn_task(self._query.stream_input(prompt))

    async def receive_messages(self) -> AsyncIterator[Message]:
        """Receive all messages from Claude.

        Raw payloads for which ``parse_message`` returns None are skipped.

        Raises:
            CLIConnectionError: If the client is not connected.
        """
        query = self._require_query()

        from ._internal.message_parser import parse_message

        async for data in query.receive_messages():
            message = parse_message(data)
            if message is not None:
                yield message

    async def query(
        self, prompt: str | AsyncIterable[dict[str, Any]], session_id: str = "default"
    ) -> None:
        """
        Send a new request in streaming mode.

        Args:
            prompt: Either a string message or an async iterable of message dictionaries
            session_id: Session identifier for the conversation

        Raises:
            CLIConnectionError: If the client is not connected.
        """
        if not self._query or not self._transport:
            raise CLIConnectionError("Not connected. Call connect() first.")

        # Handle string prompts
        if isinstance(prompt, str):
            await self._transport.write(self._encode_user_message(prompt, session_id))
            return

        # Handle AsyncIterable prompts - stream them
        async for msg in prompt:
            # Ensure session_id is set on each message. Work on a shallow copy
            # so the caller's dict is never modified behind its back.
            outgoing = msg if "session_id" in msg else {**msg, "session_id": session_id}
            await self._transport.write(json.dumps(outgoing) + "\n")

    async def interrupt(self) -> None:
        """Send interrupt signal (only works with streaming mode).

        Raises:
            CLIConnectionError: If the client is not connected.
        """
        await self._require_query().interrupt()

    async def set_permission_mode(self, mode: PermissionMode) -> None:
        """Change permission mode during conversation (only works with streaming mode).

        Args:
            mode: The permission mode to set. Valid options:
                - 'default': CLI prompts for dangerous tools
                - 'acceptEdits': Auto-accept file edits
                - 'plan': Plan-only mode (no tool execution)
                - 'bypassPermissions': Allow all tools (use with caution)
                - 'dontAsk': Allow all tools without prompting
                - 'auto': Automatically determine permission mode

        Raises:
            CLIConnectionError: If the client is not connected.

        Example:
            ```python
            async with ClaudeSDKClient() as client:
                # Start with default permissions
                await client.query("Help me analyze this codebase")

                # Review mode done, switch to auto-accept edits
                await client.set_permission_mode('acceptEdits')
                await client.query("Now implement the fix we discussed")
            ```
        """
        await self._require_query().set_permission_mode(mode)

    async def set_model(self, model: str | None = None) -> None:
        """Change the AI model during conversation (only works with streaming mode).

        Args:
            model: The model to use, or None to use default. Examples:
                - 'claude-sonnet-4-5'
                - 'claude-opus-4-1-20250805'
                - 'claude-opus-4-20250514'

        Raises:
            CLIConnectionError: If the client is not connected.

        Example:
            ```python
            async with ClaudeSDKClient() as client:
                # Start with default model
                await client.query("Help me understand this problem")

                # Switch to a different model for implementation
                await client.set_model('claude-sonnet-4-5')
                await client.query("Now implement the solution")
            ```
        """
        await self._require_query().set_model(model)

    async def rewind_files(self, user_message_id: str) -> None:
        """Rewind tracked files to their state at a specific user message.

        Requires:
            - `enable_file_checkpointing=True` to track file changes
            - `extra_args={"replay-user-messages": None}` to receive UserMessage
              objects with `uuid` in the response stream

        Args:
            user_message_id: UUID of the user message to rewind to. This should be
                the `uuid` field from a `UserMessage` received during the conversation.

        Raises:
            CLIConnectionError: If the client is not connected.

        Example:
            ```python
            options = ClaudeAgentOptions(
                enable_file_checkpointing=True,
                extra_args={"replay-user-messages": None},
            )
            async with ClaudeSDKClient(options) as client:
                await client.query("Make some changes to my files")
                async for msg in client.receive_response():
                    if isinstance(msg, UserMessage) and msg.uuid:
                        checkpoint_id = msg.uuid  # Save this for later

                # Later, rewind to that point
                await client.rewind_files(checkpoint_id)
            ```
        """
        await self._require_query().rewind_files(user_message_id)

    async def reconnect_mcp_server(self, server_name: str) -> None:
        """Reconnect a disconnected or failed MCP server (only works with streaming mode).

        Use this to retry connecting to an MCP server that failed to connect
        or was disconnected. Raises an exception if the reconnection fails.

        Args:
            server_name: The name of the MCP server to reconnect

        Raises:
            CLIConnectionError: If the client is not connected.

        Example:
            ```python
            async with ClaudeSDKClient(options) as client:
                status = await client.get_mcp_status()
                for server in status.get("mcpServers", []):
                    if server["status"] == "failed":
                        await client.reconnect_mcp_server(server["name"])
            ```
        """
        await self._require_query().reconnect_mcp_server(server_name)

    async def toggle_mcp_server(self, server_name: str, enabled: bool) -> None:
        """Enable or disable an MCP server (only works with streaming mode).

        Disabling a server disconnects it and removes its tools from the
        available tool set. Enabling a server reconnects it and makes its
        tools available again. Raises an exception on failure.

        Args:
            server_name: The name of the MCP server to toggle
            enabled: True to enable the server, False to disable it

        Raises:
            CLIConnectionError: If the client is not connected.

        Example:
            ```python
            async with ClaudeSDKClient(options) as client:
                # Temporarily disable a server
                await client.toggle_mcp_server("my-server", enabled=False)
                await client.query("Do something without my-server tools")

                # Re-enable it later
                await client.toggle_mcp_server("my-server", enabled=True)
            ```
        """
        await self._require_query().toggle_mcp_server(server_name, enabled)

    async def stop_task(self, task_id: str) -> None:
        """Stop a running task (only works with streaming mode).

        After this resolves, a `task_notification` system message with
        status `'stopped'` will be emitted by the CLI in the message stream.

        Args:
            task_id: The task ID from `task_notification` events.

        Raises:
            CLIConnectionError: If the client is not connected.

        Example:
            ```python
            async with ClaudeSDKClient() as client:
                await client.query("Start a long-running task")
                # Listen for task_notification to get task_id, then:
                await client.stop_task("task-abc123")
                # A task_notification with status 'stopped' will follow
            ```
        """
        await self._require_query().stop_task(task_id)

    async def get_mcp_status(self) -> McpStatusResponse:
        """Get current MCP server connection status (only works with streaming mode).

        Queries the Claude Code CLI for the live connection status of all
        configured MCP servers.

        Returns:
            McpStatusResponse dictionary with an 'mcpServers' key containing
            a list of McpServerStatus entries. Each entry includes:
            - 'name': Server name (str)
            - 'status': Connection status ('connected', 'pending', 'failed',
              'needs-auth', 'disabled')
            - 'serverInfo': MCP server name/version (when connected)
            - 'error': Error message (when status is 'failed')
            - 'config': Server configuration (stdio/sse/http/sdk/claudeai-proxy)
            - 'scope': Configuration scope (e.g., project, user, local)
            - 'tools': List of tools provided by the server (when connected)

        Raises:
            CLIConnectionError: If the client is not connected.

        Example:
            ```python
            async with ClaudeSDKClient(options) as client:
                status = await client.get_mcp_status()
                for server in status["mcpServers"]:
                    print(f"{server['name']}: {server['status']}")
                    if server["status"] == "failed":
                        print(f"  Error: {server.get('error')}")
            ```
        """
        result: McpStatusResponse = await self._require_query().get_mcp_status()
        return result

    async def get_context_usage(self) -> ContextUsageResponse:
        """Get a breakdown of current context window usage by category.

        Returns the same data shown by the `/context` command in the CLI,
        including token counts per category, total usage, and detailed
        breakdowns of MCP tools, memory files, and agents.

        Returns:
            ContextUsageResponse dictionary with keys including:
            - 'categories': List of categories with name, tokens, color
            - 'totalTokens': Total tokens in context
            - 'maxTokens': Effective context limit
            - 'percentage': Percent of context used (0-100)
            - 'model': Model the usage is calculated for
            - 'mcpTools': Per-tool token breakdown for MCP servers
            - 'memoryFiles': Per-file token breakdown for CLAUDE.md files
            - 'agents': Per-agent token breakdown

        Raises:
            CLIConnectionError: If the client is not connected.

        Example:
            ```python
            async with ClaudeSDKClient() as client:
                await client.query("Read this file")
                async for _ in client.receive_response():
                    pass
                usage = await client.get_context_usage()
                print(f"Using {usage['percentage']:.1f}% of context")
                for cat in usage['categories']:
                    print(f"  {cat['name']}: {cat['tokens']} tokens")
            ```
        """
        result: ContextUsageResponse = await self._require_query().get_context_usage()
        return result

    async def get_server_info(self) -> dict[str, Any] | None:
        """Get server initialization info including available commands and output styles.

        Returns initialization information from the Claude Code server including:
        - Available commands (slash commands, system commands, etc.)
        - Current and available output styles
        - Server capabilities

        Returns:
            Dictionary with server info, or None if the query holds no
            initialization result (e.g. not in streaming mode)

        Raises:
            CLIConnectionError: If the client is not connected.

        Example:
            ```python
            async with ClaudeSDKClient() as client:
                info = await client.get_server_info()
                if info:
                    print(f"Commands available: {len(info.get('commands', []))}")
                    print(f"Output style: {info.get('output_style', 'default')}")
            ```
        """
        # Return the initialization result that was already obtained during connect
        return getattr(self._require_query(), "_initialization_result", None)

    async def receive_response(self) -> AsyncIterator[Message]:
        """
        Receive messages from Claude until and including a ResultMessage.

        This async iterator yields all messages in sequence and automatically terminates
        after yielding a ResultMessage (which indicates the response is complete).
        It's a convenience method over receive_messages() for single-response workflows.

        **Stopping Behavior:**
        - Yields each message as it's received
        - Terminates immediately after yielding a ResultMessage
        - The ResultMessage IS included in the yielded messages
        - If no ResultMessage is received, the iterator keeps going for as long
          as receive_messages() does, and ends when that stream ends

        Yields:
            Message: Each message received (UserMessage, AssistantMessage, SystemMessage, ResultMessage)

        Example:
            ```python
            async with ClaudeSDKClient() as client:
                await client.query("What's the capital of France?")

                async for msg in client.receive_response():
                    if isinstance(msg, AssistantMessage):
                        for block in msg.content:
                            if isinstance(block, TextBlock):
                                print(f"Claude: {block.text}")
                    elif isinstance(msg, ResultMessage):
                        print(f"Cost: ${msg.total_cost_usd:.4f}")
                        # Iterator will terminate after this message
            ```

        Note:
            To collect all messages: `messages = [msg async for msg in client.receive_response()]`
            The final message in the list is a ResultMessage unless the
            underlying stream ended before one arrived.
        """
        async for message in self.receive_messages():
            yield message
            if isinstance(message, ResultMessage):
                return

    async def disconnect(self) -> None:
        """Disconnect from Claude.

        A no-op when the client is not connected. Otherwise the query is
        closed and the client's connection state is reset even if close()
        raises, so a failed shutdown never leaves the client looking connected.
        """
        query = self._query
        if not query:
            return
        try:
            await query.close()
        finally:
            self._query = None
            self._transport = None

    async def __aenter__(self) -> "ClaudeSDKClient":
        """Enter async context - automatically connects with empty stream for interactive use."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool:
        """Exit async context - always disconnects.

        Returns False so an exception raised inside the ``async with`` body
        is never suppressed.
        """
        await self.disconnect()
        return False