"""Streaming agent example demonstrating real-time response streaming.
This example shows how to use the streaming feature:
- Creates an OpenAIModel (with FakeModel fallback for testing)
- Calls model.complete(messages, stream=True) to get streaming deltas
- Iterates AsyncIterator[StreamDelta] and prints content chunks in real-time
- Shows final delta with usage stats (prompt_tokens, completion_tokens, total_tokens)
Usage:
1. Copy .env.example to .env and fill in your API credentials
2. Run: uv run python examples/streaming_agent.py
Environment variables:
LLM_API_KEY — API key for the LLM model (required, can be fake for testing)
LLM_BASE_URL — Base URL for the API (default: https://dashscope.aliyuncs.com/compatible-mode/v1)
LLM_MODEL — Model name (default: qwen3.5-plus)
Output:
The agent streams the response word-by-word in real-time, showing each chunk
as it arrives from the LLM. At the end, usage statistics are displayed.
"""
from __future__ import annotations

import asyncio
import os
import sys

from ecs_agent.logging import configure_logging
from ecs_agent.providers import FakeModel, Model
from ecs_agent.providers.config import ApiFormat
from ecs_agent.types import CompletionResult, Message, Usage
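
# Model and FakeModel expose the same complete() interface, so the streaming
# code below works unchanged with either backend.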

async def main() -> None:
    """Run a streaming agent that demonstrates real-time response output."""
    # --- Configure logging ---
    configure_logging(json_output=False)

    # --- Load config from environment ---
    api_key = os.environ.get("LLM_API_KEY", "")
    base_url = os.environ.get(
        "LLM_BASE_URL", "https://dashscope.aliyuncs.com/compatible-mode/v1"
    )
    model_name = os.environ.get("LLM_MODEL", "qwen3.5-plus")

    # --- Create LLM model ---
    if api_key:
        print(f"Using model: {model_name}")
        print(f"Base URL: {base_url}")
        model = Model(
            model_name,
            base_url=base_url,
            api_key=api_key,
            api_format=ApiFormat.OPENAI_CHAT_COMPLETIONS,
        )
    else:
        print("No LLM_API_KEY provided. Using FakeModel for demonstration.")
        print("To use a real API, set LLM_API_KEY, LLM_BASE_URL, and LLM_MODEL.")
        print()

        # Create a fake model that streams character-by-character
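        # `responses` is the list of canned CompletionResults the FakeModel
        # replays; one entry suffices for this single-prompt example.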
        model = FakeModel(
            responses=[
                CompletionResult(
                    message=Message(
                        role="assistant",
                        content=(
                            "This is a streamed response from the FakeModel. "
                            "Each character arrives as a separate chunk, "
                            "simulating real-time streaming from an LLM."
                        ),
                    ),
                    usage=Usage(
                        prompt_tokens=15, completion_tokens=35, total_tokens=50
                    ),
                )
            ]
        )

    # --- Prepare messages ---
    messages = [
        Message(
            role="user",
            content="Explain the concept of streaming in LLMs in a single paragraph.",
        )
    ]

    # --- Stream the response ---
    print("Streaming response:")
    print("-" * 60)

    # Call model with streaming enabled
    delta_iterator = await model.complete(messages, stream=True)
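    # delta_iterator is an AsyncIterator[StreamDelta]: each delta carries a
    # chunk of content, and the final one carries the usage stats.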

    # Track the final delta so usage stats can be reported afterwards
    final_delta = None

    # Iterate through deltas and print content in real time
    async for delta in delta_iterator:
        if delta.content:
            # Use sys.stdout.write to avoid newlines between chunks
            sys.stdout.write(delta.content)
            sys.stdout.flush()
        # Keep track of the final delta for usage stats
        if delta.finish_reason:
            final_delta = delta

    # --- Print final newline and stats ---
    print()
    print("-" * 60)
    if final_delta and final_delta.usage:
        usage = final_delta.usage
        print("Tokens used:")
        print(f"  Prompt tokens: {usage.prompt_tokens}")
        print(f"  Completion tokens: {usage.completion_tokens}")
        print(f"  Total tokens: {usage.total_tokens}")
    else:
        print("(No usage statistics available)")


if __name__ == "__main__":
    asyncio.run(main())