-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathstreaming.py
More file actions
101 lines (71 loc) · 2.72 KB
/
streaming.py
File metadata and controls
101 lines (71 loc) · 2.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
"""Streaming example.
This example demonstrates real-time token streaming:
- Async streaming with chunk-by-chunk output
- Handling streaming events (text deltas, finish reason, usage)
- Building responsive UIs with streaming
"""
import asyncio
from miiflow_agent import LLMClient, Message
async def basic_streaming():
    """Stream one reply and echo every text fragment as soon as it arrives."""
    llm = LLMClient.create("openai", model="gpt-4o-mini")

    # Keep the cursor on the same line so the fragments append to the label.
    print("Streaming response: ", end="", flush=True)

    prompt = [Message.user("Write a haiku about programming.")]
    async for piece in llm.astream_chat(prompt):
        text = piece.delta
        if not text:
            # Chunks with no text delta have nothing to print.
            continue
        print(text, end="", flush=True)

    print("\n")
async def streaming_with_metadata():
    """Stream a response while collecting its full text and usage data.

    Each text delta is echoed as it arrives and also buffered so the
    length of the complete response can be reported once the stream ends.
    The ``usage`` attribute of the chunk that carries a ``finish_reason``
    is kept and, if truthy, its ``total_tokens`` is printed.
    """
    client = LLMClient.create("openai", model="gpt-4o-mini")

    # Buffer the deltas and join once at the end instead of growing a
    # string with ``+=`` on every chunk.
    parts = []
    final_usage = None

    async for chunk in client.astream_chat([
        Message.user("Explain recursion in 3 sentences.")
    ]):
        if chunk.delta:
            parts.append(chunk.delta)
            print(chunk.delta, end="", flush=True)

        # Usage is read from the chunk that reports a finish reason.
        # NOTE(review): assumes usage is attached to that same chunk - confirm
        # against the provider's streaming behaviour.
        if chunk.finish_reason:
            final_usage = chunk.usage

    full_content = "".join(parts)
    print(f"\n\nFull response length: {len(full_content)} chars")
    if final_usage:
        print(f"Tokens used: {final_usage.total_tokens}")
async def streaming_multiple_messages():
    """Ask several independent questions, streaming each answer in turn."""
    llm = LLMClient.create("openai", model="gpt-4o-mini")

    prompts = [
        "What is 2+2?",
        "What is the capital of France?",
        "Name a primary color.",
    ]

    for prompt in prompts:
        # Print the question, then leave the cursor after "A: " so the
        # streamed fragments continue on that line.
        print(f"\nQ: {prompt}")
        print("A: ", end="", flush=True)

        # Each question is sent as its own single-message conversation.
        conversation = [Message.user(prompt)]
        async for piece in llm.astream_chat(conversation):
            text = piece.delta
            if not text:
                continue
            print(text, end="", flush=True)

        # Terminate the answer line.
        print()
async def streaming_with_system_prompt():
    """Stream a reply for a conversation that starts with a system message."""
    llm = LLMClient.create("openai", model="gpt-4o-mini")

    # System message first, followed by the user's request.
    system_msg = Message.system("You are a pirate. Respond in pirate speak.")
    user_msg = Message.user("Tell me about the weather today.")
    conversation = [system_msg, user_msg]

    print("Pirate response: ", end="", flush=True)
    async for piece in llm.astream_chat(conversation):
        text = piece.delta
        if not text:
            continue
        print(text, end="", flush=True)

    print("\n")
if __name__ == "__main__":
    # Run every demo in order. Each one gets its own asyncio.run() call,
    # preceded by a banner naming the demo.
    _demos = (
        ("=== Basic Streaming ===", basic_streaming),
        ("=== Streaming with Metadata ===", streaming_with_metadata),
        ("=== Multiple Streaming Messages ===", streaming_multiple_messages),
        ("=== Streaming with System Prompt ===", streaming_with_system_prompt),
    )
    for _banner, _demo in _demos:
        print(_banner)
        asyncio.run(_demo())