with client.messages.stream(
model="{{OPUS_ID}}",
max_tokens=64000,
messages=[{"role": "user", "content": "Write a story"}]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)

async with async_client.messages.stream(
model="{{OPUS_ID}}",
max_tokens=64000,
messages=[{"role": "user", "content": "Write a story"}]
) as stream:
async for text in stream.text_stream:
print(text, end="", flush=True)

Claude may return text, thinking blocks, or tool use. Handle each appropriately:

Opus 4.6: Use `thinking: {type: "adaptive"}`. On older models, use `thinking: {type: "enabled", budget_tokens: N}` instead.
with client.messages.stream(
model="{{OPUS_ID}}",
max_tokens=64000,
thinking={"type": "adaptive"},
messages=[{"role": "user", "content": "Analyze this problem"}]
) as stream:
for event in stream:
if event.type == "content_block_start":
if event.content_block.type == "thinking":
print("\n[Thinking...]")
elif event.content_block.type == "text":
print("\n[Response:]")
elif event.type == "content_block_delta":
if event.delta.type == "thinking_delta":
print(event.delta.thinking, end="", flush=True)
elif event.delta.type == "text_delta":
print(event.delta.text, end="", flush=True)

The Python tool runner currently returns complete messages. Use streaming for individual API calls within a manual loop if you need per-token streaming with tools:
with client.messages.stream(
model="{{OPUS_ID}}",
max_tokens=64000,
tools=tools,
messages=messages
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
response = stream.get_final_message()
# Continue with tool execution if response.stop_reason == "tool_use"

with client.messages.stream(
model="{{OPUS_ID}}",
max_tokens=64000,
messages=[{"role": "user", "content": "Hello"}]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
# Get full message after streaming
final_message = stream.get_final_message()
print(f"\n\nTokens used: {final_message.usage.output_tokens}")

def stream_with_progress(client, **kwargs):
"""Stream a response with progress updates."""
total_tokens = 0
content_parts = []
with client.messages.stream(**kwargs) as stream:
for event in stream:
if event.type == "content_block_delta":
if event.delta.type == "text_delta":
text = event.delta.text
content_parts.append(text)
print(text, end="", flush=True)
elif event.type == "message_delta":
if event.usage and event.usage.output_tokens is not None:
total_tokens = event.usage.output_tokens
final_message = stream.get_final_message()
print(f"\n\n[Tokens used: {total_tokens}]")
    return "".join(content_parts)

try:
with client.messages.stream(
model="{{OPUS_ID}}",
max_tokens=64000,
messages=[{"role": "user", "content": "Write a story"}]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
except anthropic.APIConnectionError:
print("\nConnection lost. Please retry.")
except anthropic.RateLimitError:
print("\nRate limited. Please wait and retry.")
except anthropic.APIStatusError as e:
        print(f"\nAPI error: {e.status_code}")

| Event Type | Description | When it fires |
|---|---|---|
| `message_start` | Contains message metadata | Once at the beginning |
| `content_block_start` | New content block beginning | When a text/tool_use block starts |
| `content_block_delta` | Incremental content update | For each token/chunk |
| `content_block_stop` | Content block complete | When a block finishes |
| `message_delta` | Message-level updates | Contains stop_reason, usage |
| `message_stop` | Message complete | Once at the end |
- Always flush output — Use `flush=True` to show tokens immediately
- Handle partial responses — If the stream is interrupted, you may have incomplete content
- Track token usage — The `message_delta` event contains usage information
- Use timeouts — Set appropriate timeouts for your application
- Default to streaming — Use `.get_final_message()` to get the complete response even when streaming, giving you timeout protection without needing to handle individual events