forked from microsoft/agent-framework
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
74 lines (56 loc) · 2.42 KB
/
Copy pathmain.py
File metadata and controls
74 lines (56 loc) · 2.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# Copyright (c) Microsoft. All rights reserved.
import os
from collections.abc import AsyncGenerator
from agent_framework import Agent, AgentSession
from agent_framework.foundry import FoundryChatClient
from azure.ai.agentserver.invocations import InvocationAgentServerHost
from azure.identity import DefaultAzureCredential
from dotenv import load_dotenv
from starlette.requests import Request
from starlette.responses import JSONResponse, Response, StreamingResponse
# Pull settings from a .env file into the process environment so that the
# os.environ lookups further down can find them.
load_dotenv()

# Per-process cache of conversation sessions, indexed by session ID.
# WARNING: Entries exist only in memory and are lost on restart. Use durable
# storage in production.
_sessions: dict[str, AgentSession] = {}

# Chat client for the Foundry project. Both environment variables are
# required: a missing one raises KeyError while this module is being loaded.
client = FoundryChatClient(
    project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
    model=os.environ["AZURE_AI_MODEL_DEPLOYMENT_NAME"],
    credential=DefaultAzureCredential(),
)

# The single agent instance that answers every request.
agent = Agent(
    client=client,
    instructions="You are a friendly assistant. Keep your answers brief.",
    # Conversation history is managed by the hosting infrastructure, so the
    # service does not need to store it. Learn more at:
    # https://developers.openai.com/api/reference/resources/responses/methods/create
    default_options={"store": False},
)

# Host object; the invoke handler defined below is registered on it.
app = InvocationAgentServerHost()
@app.invoke_handler
async def handle_invoke(request: Request):
    """Handle one chat turn for a session, streamed or as a single JSON reply.

    The JSON request body is read for two keys:

    * ``message`` (required): the user's message for this turn.
    * ``stream`` (optional, defaults to ``False``): when truthy, the reply is
      streamed as it is produced; otherwise it is returned in one JSON object.

    The session ID is taken from ``request.state.session_id`` and used to look
    up (or create) the in-memory ``AgentSession`` for the conversation.

    Args:
        request: The incoming Starlette request.

    Returns:
        A 400 ``Response`` when the body is not a JSON object or lacks
        ``message``; a ``StreamingResponse`` with media type
        ``text/event-stream`` when streaming was requested; otherwise a
        ``JSONResponse`` of the form ``{"response": <text>}``.
    """
    try:
        data = await request.json()
    except ValueError:
        # json.JSONDecodeError is a ValueError: report a malformed body as a
        # client error instead of letting the exception escape the handler.
        return Response(content="Invalid JSON in request body", status_code=400)
    if not isinstance(data, dict):
        # The .get() lookups below require a JSON object.
        return Response(content="Request body must be a JSON object", status_code=400)

    session_id = request.state.session_id
    stream = data.get("stream", False)
    user_message = data.get("message")
    if user_message is None:
        # A plain Response is used even when streaming was requested: passing
        # a bare str to StreamingResponse would be iterated one character at
        # a time.
        return Response(content="Missing 'message' in request", status_code=400)

    # Reuse the stored session if there is one; only build a new AgentSession
    # for an unseen ID (setdefault would construct one on every request).
    session = _sessions.get(session_id)
    if session is None:
        session = AgentSession(session_id=session_id)
        _sessions[session_id] = session

    if stream:

        async def stream_response() -> AsyncGenerator[str, None]:
            # NOTE(review): chunks are emitted as raw text without SSE
            # "data:" framing although the media type is text/event-stream;
            # confirm that clients expect this format before changing it.
            async for update in agent.run(user_message, session=session, stream=True):
                # Skip updates that carry no text so no empty chunks are sent.
                if update.text:
                    yield update.text

        return StreamingResponse(
            stream_response(),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
        )

    # Non-streaming path: pass an explicit False rather than whatever falsy
    # value (None, 0, "") the client happened to send.
    response = await agent.run([user_message], session=session, stream=False)
    return JSONResponse({"response": response.text})
# Run the host application only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    app.run()