-
Notifications
You must be signed in to change notification settings - Fork 29
Expand file tree
/
Copy pathmain.py
More file actions
83 lines (69 loc) · 2.22 KB
/
main.py
File metadata and controls
83 lines (69 loc) · 2.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
from strands import Agent, tool
from bedrock_agentcore.runtime import BedrockAgentCoreApp
from model.load import load_model
from mcp_client.client import get_streamable_http_mcp_client
{{#if hasMemory}}
from memory.session import get_memory_session_manager
{{/if}}
# Runtime application object; it supplies the logger, the entrypoint
# decorator and the run() call used further down in this file.
app = BedrockAgentCoreApp()
log = app.logger
# Define a Streamable HTTP MCP Client
mcp_client = get_streamable_http_mcp_client()
# Define a collection of tools used by the model.
# Function tools are appended to this list below; the MCP client is not stored
# here but is added alongside it when an Agent is constructed.
tools = []
# Define a simple function tool
@tool
def add_numbers(a: int, b: int) -> int:
    """Return the sum of two numbers"""
    # NOTE(review): the docstring above is kept verbatim because the @tool
    # decorator may read it to build the tool description -- confirm before
    # rewording it.
    total = a + b
    return total


# Register the function tool in the shared tool collection.
tools.append(add_numbers)
{{#if hasMemory}}
def agent_factory():
    """Build the cached agent accessor.

    Returns:
        A function ``get_or_create_agent(session_id, user_id)`` that returns
        the Agent for that pair, constructing it on first use and reusing the
        same instance on every later call.
    """
    # One Agent per (session_id, user_id). The cache lives in this closure and
    # nothing is ever evicted, so it grows with the number of distinct pairs.
    cache = {}

    def get_or_create_agent(session_id, user_id):
        """Return the Agent for this session/user pair, creating it if needed."""
        # Key on the pair itself rather than a joined string: with a "/"
        # separator, ("a/b", "c") and ("a", "b/c") would both become "a/b/c"
        # and one caller would be handed the other's agent and memory session.
        key = (session_id, user_id)
        if key not in cache:
            # Create an agent for the given session_id and user_id
            cache[key] = Agent(
                model=load_model(),
                session_manager=get_memory_session_manager(session_id, user_id),
                system_prompt="\nYou are a helpful assistant. Use tools when appropriate.\n",
                tools=tools + [mcp_client],
            )
        return cache[key]

    return get_or_create_agent


get_or_create_agent = agent_factory()
{{else}}
# Module-level slot holding the single shared Agent; filled on first use.
_agent = None


def get_or_create_agent():
    """Return the shared Agent, constructing it on the first call only."""
    global _agent
    if _agent is not None:
        return _agent

    # First call: build the agent once and keep it for all later calls.
    model = load_model()
    toolset = tools + [mcp_client]
    _agent = Agent(
        model=model,
        system_prompt="\nYou are a helpful assistant. Use tools when appropriate.\n",
        tools=toolset,
    )
    return _agent
{{/if}}
@app.entrypoint
async def invoke(payload, context):
log.info("Invoking Agent.....")
{{#if hasMemory}}
session_id = getattr(context, 'session_id', 'default-session')
user_id = getattr(context, 'user_id', 'default-user')
agent = get_or_create_agent(session_id, user_id)
{{else}}
agent = get_or_create_agent()
{{/if}}
# Execute and format response
stream = agent.stream_async(payload.get("prompt"))
async for event in stream:
# Handle Text parts of the response
if "data" in event and isinstance(event["data"], str):
yield event["data"]
# Start the runtime app only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    app.run()