The Batches API (POST /v1/messages/batches) processes Messages API requests asynchronously at 50% of standard prices.
- Up to 100,000 requests or 256 MB per batch
- Most batches complete within 1 hour; maximum 24 hours
- Results available for 29 days after creation
- 50% cost reduction on all token usage
- All Messages API features supported (vision, tools, caching, etc.)
import anthropic
from anthropic.types.message_create_params import MessageCreateParamsNonStreaming
from anthropic.types.messages.batch_create_params import Request
# Client constructed with no explicit arguments.
client = anthropic.Anthropic()

# Each custom_id mapped to the user prompt submitted under it
# (insertion order is the submission order).
batch_prompts = {
    "request-1": "Summarize climate change impacts",
    "request-2": "Explain quantum computing basics",
}

# Submit one batch containing a non-streaming Messages request per prompt.
message_batch = client.messages.batches.create(
    requests=[
        Request(
            custom_id=request_id,
            params=MessageCreateParamsNonStreaming(
                model="{{OPUS_ID}}",
                max_tokens=16000,
                messages=[{"role": "user", "content": prompt}],
            ),
        )
        for request_id, prompt in batch_prompts.items()
    ]
)

print(f"Batch ID: {message_batch.id}")
print(f"Status: {message_batch.processing_status}")

import time

# Poll the batch until its processing_status becomes "ended".
while True:
    batch = client.messages.batches.retrieve(message_batch.id)
    if batch.processing_status == "ended":
        break
    print(f"Status: {batch.processing_status}, processing: {batch.request_counts.processing}")
    time.sleep(60)  # wait one minute between status checks

# The last retrieved batch object carries the final per-status counts.
print("Batch complete!")
print(f"Succeeded: {batch.request_counts.succeeded}")
print(f"Errored: {batch.request_counts.errored}")

Note: Examples below use `match/case` syntax, requiring Python 3.10+. For earlier versions, use `if/elif` chains instead.
for result in client.messages.batches.results(message_batch.id):
match result.result.type:
case "succeeded":
msg = result.result.message
text = next((b.text for b in msg.content if b.type == "text"), "")
print(f"[{result.custom_id}] {text[:100]}")
case "errored":
if result.result.error.type == "invalid_request":
print(f"[{result.custom_id}] Validation error - fix request and retry")
else:
print(f"[{result.custom_id}] Server error - safe to retry")
case "canceled":
print(f"[{result.custom_id}] Canceled")
case "expired":
print(f"[{result.custom_id}] Expired - resubmit")cancelled = client.messages.batches.cancel(message_batch.id)
print(f"Status: {cancelled.processing_status}")  # "canceling"

# System prompt reused by every request in the batch. The large document
# block is tagged with an ephemeral cache_control entry.
# NOTE(review): large_document_text and questions are not defined in this
# excerpt - supply them before running.
shared_system = [
    {"type": "text", "text": "You are a literary analyst."},
    {
        "type": "text",
        "text": large_document_text,  # Shared across all requests
        "cache_control": {"type": "ephemeral"},
    },
]

# One request per question, all sharing the same system prompt.
message_batch = client.messages.batches.create(
    requests=[
        Request(
            custom_id=f"analysis-{i}",
            params=MessageCreateParamsNonStreaming(
                model="{{OPUS_ID}}",
                max_tokens=16000,
                system=shared_system,
                messages=[{"role": "user", "content": question}],
            ),
        )
        for i, question in enumerate(questions)
    ]
)

import anthropic
import time
from anthropic.types.message_create_params import MessageCreateParamsNonStreaming
from anthropic.types.messages.batch_create_params import Request
client = anthropic.Anthropic()

# 1. Prepare requests
items_to_classify = [
    "The product quality is excellent!",
    "Terrible customer service, never again.",
    "It's okay, nothing special.",
]


def _build_classification_request(index, text):
    """Wrap one text in a batch request whose custom_id is "classify-<index>"."""
    prompt = f"Classify as positive/negative/neutral (one word): {text}"
    return Request(
        custom_id=f"classify-{index}",
        params=MessageCreateParamsNonStreaming(
            model="{{HAIKU_ID}}",
            max_tokens=50,
            messages=[{"role": "user", "content": prompt}],
        ),
    )


# One request per item, numbered by its position in items_to_classify.
requests = [
    _build_classification_request(index, text)
    for index, text in enumerate(items_to_classify)
]
# 2. Create batch
batch = client.messages.batches.create(requests=requests)
print(f"Created batch: {batch.id}")

# 3. Wait for completion
# Re-fetch the batch every 10 seconds until its processing_status is "ended".
while True:
    batch = client.messages.batches.retrieve(batch.id)
    if batch.processing_status == "ended":
        break
    time.sleep(10)
# 4. Collect results
# Map custom_id -> first text block of each succeeded reply ("" if the reply
# has no text block). Results of any other type are not recorded.
results = {}
for result in client.messages.batches.results(batch.id):
    if result.result.type == "succeeded":
        msg = result.result.message
        results[result.custom_id] = next((b.text for b in msg.content if b.type == "text"), "")

# Print the classifications ordered by custom_id.
for custom_id, classification in sorted(results.items()):
    print(f"{custom_id}: {classification}")