| layout | default |
|---|---|
| title | LiteLLM Tutorial - Chapter 4: Streaming & Async |
| nav_order | 4 |
| has_children | false |
| parent | LiteLLM Tutorial |
Welcome to Chapter 4: Streaming & Async. In this part of LiteLLM Tutorial: Unified LLM Gateway and Routing Layer, you will build an intuitive mental model first, then move into concrete implementation details and practical production tradeoffs.
Implement real-time streaming responses and asynchronous processing for better user experience and performance.
Streaming allows you to receive responses in real-time as they're generated, while async processing enables concurrent requests. This chapter covers both patterns and how to implement them with LiteLLM.
Receive responses token by token:
import litellm
response = litellm.completion(
model="gpt-4",
messages=[{"role": "user", "content": "Write a short story about AI"}],
stream=True # Enable streaming
)
for chunk in response:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="", flush=True)
print() # New line at endStream responses contain partial content:
response = litellm.completion(
model="gpt-4",
messages=[{"role": "user", "content": "Count to 10 slowly"}],
stream=True
)
for i, chunk in enumerate(response):
print(f"Chunk {i}:")
print(f" Content: {chunk.choices[0].delta.content}")
print(f" Finish reason: {chunk.choices[0].finish_reason}")
print(f" Usage: {chunk.usage}")
print()Create an interactive chat with real-time responses:
import litellm
import time
class StreamingChat:
def __init__(self, model="gpt-4"):
self.model = model
self.conversation = []
def send_message(self, user_message):
"""Send a message and stream the response."""
self.conversation.append({"role": "user", "content": user_message})
response = litellm.completion(
model=self.model,
messages=self.conversation,
stream=True,
max_tokens=500
)
full_response = ""
print("Assistant: ", end="", flush=True)
for chunk in response:
if chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
print(content, end="", flush=True)
full_response += content
time.sleep(0.01) # Small delay for readability
print() # New line
self.conversation.append({"role": "assistant", "content": full_response})
return full_response
# Usage
chat = StreamingChat()
while True:
user_input = input("You: ")
if user_input.lower() in ['quit', 'exit']:
break
chat.send_message(user_input)Handle multiple streaming requests concurrently:
import asyncio
import litellm
async def stream_completion_async(model, messages):
"""Async streaming completion."""
response = await litellm.acompletion(
model=model,
messages=messages,
stream=True,
max_tokens=300
)
full_content = ""
async for chunk in response:
if chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
print(content, end="", flush=True)
full_content += content
return full_content
async def main():
messages = [{"role": "user", "content": "Explain async programming"}]
print("Streaming response:")
content = await stream_completion_async("gpt-4", messages)
print(f"\n\nFull content length: {len(content)} characters")
# Run async function
asyncio.run(main())Process multiple requests simultaneously:
async def process_multiple_requests():
"""Process multiple LLM requests concurrently."""
tasks = [
litellm.acompletion(
model="gpt-4",
messages=[{"role": "user", "content": f"Tell me a fact about topic {i}"}],
max_tokens=100
)
for i in range(5)
]
# Wait for all to complete
responses = await asyncio.gather(*tasks)
for i, response in enumerate(responses):
content = response.choices[0].message.content
print(f"Topic {i}: {content[:50]}...")
# Run concurrent requests
asyncio.run(process_multiple_requests())Robust streaming with fallbacks:
async def streaming_with_fallback(messages, primary_model="gpt-4", fallback_model="gpt-3.5-turbo"):
"""Stream with automatic fallback."""
try:
response = await litellm.acompletion(
model=primary_model,
messages=messages,
stream=True,
timeout=30
)
full_content = ""
async for chunk in response:
if chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
print(content, end="", flush=True)
full_content += content
return full_content, primary_model
except Exception as e:
print(f"\nPrimary model failed ({primary_model}), trying fallback ({fallback_model})...")
print("Fallback: ", end="", flush=True)
# Fallback to non-streaming for simplicity
response = await litellm.acompletion(
model=fallback_model,
messages=messages,
max_tokens=300
)
content = response.choices[0].message.content
print(content)
return content, fallback_model
async def main():
messages = [{"role": "user", "content": "Write a haiku about programming"}]
content, model_used = await streaming_with_fallback(messages)
print(f"\nUsed model: {model_used}")
asyncio.run(main())Implement WebSocket-based streaming for web applications:
# server.py (using FastAPI)
from fastapi import FastAPI, WebSocket
import litellm
import json
app = FastAPI()
@app.websocket("/chat")
async def chat_websocket(websocket: WebSocket):
await websocket.accept()
# Initialize conversation
conversation = [{"role": "system", "content": "You are a helpful assistant."}]
try:
while True:
# Receive user message
user_message = await websocket.receive_text()
conversation.append({"role": "user", "content": user_message})
# Stream response
response = await litellm.acompletion(
model="gpt-4",
messages=conversation,
stream=True
)
full_response = ""
async for chunk in response:
if chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
full_response += content
# Send chunk to client
await websocket.send_text(json.dumps({
"type": "chunk",
"content": content
}))
# Send completion signal
await websocket.send_text(json.dumps({
"type": "done",
"full_content": full_response
}))
# Add to conversation history
conversation.append({"role": "assistant", "content": full_response})
except Exception as e:
await websocket.send_text(json.dumps({
"type": "error",
"message": str(e)
}))
# client.js (frontend)
const ws = new WebSocket('ws://localhost:8000/chat');
ws.onmessage = function(event) {
const data = JSON.parse(event.data);
if (data.type === 'chunk') {
// Append chunk to response area
responseDiv.textContent += data.content;
} else if (data.type === 'done') {
// Handle completion
console.log('Response complete');
} else if (data.type === 'error') {
// Handle error
console.error('Error:', data.message);
}
};
// Send message
function sendMessage(message) {
ws.send(message);
}Show progress during long generations:
import threading
import time
def show_progress_indicator():
"""Show a simple progress indicator."""
symbols = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]
i = 0
while not hasattr(show_progress_indicator, 'stop'):
print(f"\r{symbols[i % len(symbols)]} Generating response...", end="", flush=True)
time.sleep(0.1)
i += 1
print("\r✓ Response complete! ")
def stream_with_progress(model, messages):
"""Stream with progress indicator."""
# Start progress indicator in background
progress_thread = threading.Thread(target=show_progress_indicator)
progress_thread.start()
try:
response = litellm.completion(
model=model,
messages=messages,
stream=True,
max_tokens=1000
)
full_content = ""
print("\r", end="", flush=True) # Clear progress indicator
for chunk in response:
if chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
print(content, end="", flush=True)
full_content += content
print() # New line
return full_content
finally:
# Stop progress indicator
show_progress_indicator.stop = True
progress_thread.join()
# Usage
messages = [{"role": "user", "content": "Write a 500-word essay about artificial intelligence"}]
content = stream_with_progress("gpt-4", messages)Implement rate limiting for concurrent requests:
import asyncio
from collections import deque
import time
class RateLimiter:
def __init__(self, requests_per_minute=60):
self.requests_per_minute = requests_per_minute
self.requests = deque()
self.lock = asyncio.Lock()
async def acquire(self):
"""Acquire permission to make a request."""
async with self.lock:
now = time.time()
# Remove old requests outside the time window
while self.requests and now - self.requests[0] > 60:
self.requests.popleft()
# Check if we can make another request
if len(self.requests) >= self.requests_per_minute:
# Wait until oldest request expires
wait_time = 60 - (now - self.requests[0])
if wait_time > 0:
await asyncio.sleep(wait_time)
self.requests.append(now)
rate_limiter = RateLimiter(requests_per_minute=50)
async def rate_limited_completion(model, messages):
"""Completion with rate limiting."""
await rate_limiter.acquire()
return await litellm.acompletion(
model=model,
messages=messages
)
async def batch_with_rate_limit():
"""Process batch with rate limiting."""
tasks = []
for i in range(10):
messages = [{"role": "user", "content": f"Generate idea {i}"}]
tasks.append(rate_limited_completion("gpt-3.5-turbo", messages))
results = await asyncio.gather(*tasks)
for i, response in enumerate(results):
print(f"Idea {i}: {response.choices[0].message.content[:50]}...")
asyncio.run(batch_with_rate_limit())Process large files with streaming:
async def stream_file_processing(file_path, model="gpt-4"):
"""Process a large file with streaming."""
# Read file in chunks
with open(file_path, 'r') as f:
content = f.read()
# Split into manageable chunks
chunk_size = 4000 # characters
chunks = [content[i:i + chunk_size] for i in range(0, len(content), chunk_size)]
full_analysis = ""
for i, chunk in enumerate(chunks):
print(f"Processing chunk {i + 1}/{len(chunks)}...")
messages = [
{"role": "system", "content": "You are analyzing a large document. Provide insights for this section."},
{"role": "user", "content": f"Analyze this section of the document:\n\n{chunk}"}
]
response = await litellm.acompletion(
model=model,
messages=messages,
stream=True,
max_tokens=500
)
chunk_analysis = ""
async for chunk_response in response:
if chunk_response.choices[0].delta.content:
content_piece = chunk_response.choices[0].delta.content
print(content_piece, end="", flush=True)
chunk_analysis += content_piece
full_analysis += f"\n\n--- Section {i + 1} ---\n{chunk_analysis}"
print("\n") # Section separator
return full_analysis
# Usage
analysis = await stream_file_processing("large_document.txt")Tips for streaming performance:
- Chunk Processing: Process responses in chunks rather than individual tokens for UI updates
- Connection Pooling: Reuse connections for multiple requests
- Concurrent Limits: Limit concurrent streaming requests to avoid overwhelming the API
- Error Recovery: Implement automatic retry logic for failed chunks
- Buffering: Buffer chunks before displaying to reduce UI flicker
- User Feedback: Always show users that streaming is in progress
- Error Handling: Handle streaming errors gracefully (connection drops, timeouts)
- Rate Limiting: Respect API rate limits, especially with concurrent requests
- Resource Management: Clean up resources properly in async code
- UI Responsiveness: Update UI efficiently to avoid blocking the main thread
- Fallback Strategies: Have non-streaming fallbacks for when streaming fails
- Progress Indication: Show meaningful progress indicators for long operations
Streaming and async processing enable responsive, scalable AI applications. These patterns work across all LiteLLM-supported providers, giving you consistent real-time capabilities regardless of the underlying model.
This chapter is expanded to v1-style depth for production-grade learning and implementation quality.
- tutorial: LiteLLM Tutorial: Unified LLM Gateway and Routing Layer
- tutorial slug: litellm-tutorial
- chapter focus: Chapter 4: Streaming & Async
- system context: Litellm Tutorial
- objective: move from surface-level usage to repeatable engineering operation
- Define the runtime boundary for
Chapter 4: Streaming & Async. - Separate control-plane decisions from data-plane execution.
- Capture input contracts, transformation points, and output contracts.
- Trace state transitions across request lifecycle stages.
- Identify extension hooks and policy interception points.
- Map ownership boundaries for team and automation workflows.
- Specify rollback and recovery paths for unsafe changes.
- Track observability signals for correctness, latency, and cost.
| Decision Area | Low-Risk Path | High-Control Path | Tradeoff |
|---|---|---|---|
| Runtime mode | managed defaults | explicit policy config | speed vs control |
| State handling | local ephemeral | durable persisted state | simplicity vs auditability |
| Tool integration | direct API use | mediated adapter layer | velocity vs governance |
| Rollout method | manual change | staged + canary rollout | effort vs safety |
| Incident response | best effort logs | runbooks + SLO alerts | cost vs reliability |
| Failure Mode | Early Signal | Root Cause Pattern | Countermeasure |
|---|---|---|---|
| stale context | inconsistent outputs | missing refresh window | enforce context TTL and refresh hooks |
| policy drift | unexpected execution | ad hoc overrides | centralize policy profiles |
| auth mismatch | 401/403 bursts | credential sprawl | rotation schedule + scope minimization |
| schema breakage | parser/validation errors | unmanaged upstream changes | contract tests per release |
| retry storms | queue congestion | no backoff controls | jittered backoff + circuit breakers |
| silent regressions | quality drop without alerts | weak baseline metrics | eval harness with thresholds |
- Establish a reproducible baseline environment.
- Capture chapter-specific success criteria before changes.
- Implement minimal viable path with explicit interfaces.
- Add observability before expanding feature scope.
- Run deterministic tests for happy-path behavior.
- Inject failure scenarios for negative-path validation.
- Compare output quality against baseline snapshots.
- Promote through staged environments with rollback gates.
- Record operational lessons in release notes.
- chapter-level assumptions are explicit and testable
- API/tool boundaries are documented with input/output examples
- failure handling includes retry, timeout, and fallback policy
- security controls include auth scopes and secret rotation plans
- observability includes logs, metrics, traces, and alert thresholds
- deployment guidance includes canary and rollback paths
- docs include links to upstream sources and related tracks
- post-release verification confirms expected behavior under load
- Langfuse Tutorial
- Vercel AI SDK Tutorial
- OpenAI Python SDK Tutorial
- Aider Tutorial
- Chapter 1: Getting Started
- Build a minimal end-to-end implementation for
Chapter 4: Streaming & Async. - Add instrumentation and measure baseline latency and error rate.
- Introduce one controlled failure and confirm graceful recovery.
- Add policy constraints and verify they are enforced consistently.
- Run a staged rollout and document rollback decision criteria.
- Which execution boundary matters most for this chapter and why?
- What signal detects regressions earliest in your environment?
- What tradeoff did you make between delivery speed and governance?
- How would you recover from the highest-impact failure mode?
- What must be automated before scaling to team-wide adoption?
Most teams struggle here because the hard part is not writing more code, but deciding clear boundaries for content, messages, chunk so behavior stays predictable as complexity grows.
In practical terms, this chapter helps you avoid three common failures:
- coupling core logic too tightly to one implementation path
- missing the handoff boundaries between setup, execution, and validation
- shipping changes without clear rollback or observability strategy
After working through this chapter, you should be able to reason about Chapter 4: Streaming & Async as an operating subsystem inside LiteLLM Tutorial: Unified LLM Gateway and Routing Layer, with explicit contracts for inputs, state transitions, and outputs.
Use the implementation notes around print, response, model as your checklist when adapting these patterns to your own repository.
Under the hood, Chapter 4: Streaming & Async usually follows a repeatable control path:
- Context bootstrap: initialize runtime config and prerequisites for
content. - Input normalization: shape incoming data so
messagesreceives stable contracts. - Core execution: run the main logic branch and propagate intermediate state through
chunk. - Policy and safety checks: enforce limits, auth scopes, and failure boundaries.
- Output composition: return canonical result payloads for downstream consumers.
- Operational telemetry: emit logs/metrics needed for debugging and performance tuning.
When debugging, walk this sequence in order and confirm each stage has explicit success/failure conditions.
Use the following upstream sources to verify implementation details while reading this chapter:
- LiteLLM Repository
Why it matters: authoritative reference on
LiteLLM Repository(github.com). - LiteLLM Releases
Why it matters: authoritative reference on
LiteLLM Releases(github.com). - LiteLLM Docs
Why it matters: authoritative reference on
LiteLLM Docs(docs.litellm.ai).
Suggested trace strategy:
- search upstream code for
contentandmessagesto map concrete implementation paths - compare docs claims against actual runtime/config code before reusing patterns in production