-
Notifications
You must be signed in to change notification settings - Fork 713
Expand file tree
/
Copy pathagent_base.py
More file actions
307 lines (265 loc) · 11.8 KB
/
Copy pathagent_base.py
File metadata and controls
307 lines (265 loc) · 11.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
import json
import logging
import os
from abc import ABC, abstractmethod
from typing import (Any, Awaitable, Callable, Dict, List, Mapping, Optional,
Union)
import semantic_kernel as sk
# Import the new AppConfig instance
from app_config import config
from context.cosmos_memory_kernel import CosmosMemoryContext
from event_utils import track_event_if_configured
from models.messages_kernel import (ActionRequest, ActionResponse,
AgentMessage, Step, StepStatus)
from semantic_kernel.agents import AzureAIAgentThread # pylint:disable=E0611
from semantic_kernel.agents.azure_ai.azure_ai_agent import AzureAIAgent
from semantic_kernel.functions import KernelFunction
from semantic_kernel.functions.kernel_arguments import KernelArguments
from semantic_kernel.functions.kernel_function_decorator import kernel_function
# Default formatting instructions used across agents
DEFAULT_FORMATTING_INSTRUCTIONS = "Instructions: returning the output of this function call verbatim to the user in markdown. Then write AGENT SUMMARY: and then include a summary of what you did."
class BaseAgent(AzureAIAgent):
"""BaseAgent implemented using Semantic Kernel with Azure AI Agent support."""
def __init__(
self,
agent_name: str,
session_id: str,
user_id: str,
memory_store: CosmosMemoryContext,
tools: Optional[List[KernelFunction]] = None,
system_message: Optional[str] = None,
client=None,
definition=None,
thread: AzureAIAgentThread=None,
):
"""Initialize the base agent.
Args:
agent_name: The name of the agent
session_id: The session ID
user_id: The user ID
memory_store: The memory context for storing agent state
tools: Optional list of tools for the agent
system_message: Optional system message for the agent
agent_type: Optional agent type string for automatic tool loading
client: The client required by AzureAIAgent
definition: The definition required by AzureAIAgent
"""
tools = tools or []
system_message = system_message or self.default_system_message(agent_name)
# Call AzureAIAgent constructor with required client and definition
super().__init__(
deployment_name=None, # Set as needed
plugins=tools, # Use the loaded plugins,
endpoint=None, # Set as needed
api_version=None, # Set as needed
token=None, # Set as needed
model=config.AZURE_OPENAI_DEPLOYMENT_NAME,
agent_name=agent_name,
system_prompt=system_message,
client=client,
definition=definition,
)
# Store instance variables
self._agent_name = agent_name
self._session_id = session_id
self._user_id = user_id
self._memory_store = memory_store
self._tools = tools
self._system_message = system_message
self._chat_history = [{"role": "system", "content": self._system_message}]
self._message = None # Will be initialized in handle_action_request
self._thread = thread
# Required properties for AgentGroupChat compatibility
self.name = agent_name # This is crucial for AgentGroupChat to identify agents
@staticmethod
def default_system_message(agent_name=None) -> str:
name = agent_name
return f"You are an AI assistant named {name}. Help the user by providing accurate and helpful information."
async def handle_action_request(self, action_request: ActionRequest) -> str:
"""Handle an action request from another agent or the system.
Args:
action_request_json: The action request as a JSON string
Returns:
A JSON string containing the action response
"""
# Get the step from memory
step: Step = await self._memory_store.get_step(
action_request.step_id, action_request.session_id
)
if not step:
# Create error response if step not found
response = ActionResponse(
step_id=action_request.step_id,
status=StepStatus.failed,
message="Step not found in memory.",
)
return response.model_dump_json()
self._message = [
action_request.action
]
try:
# Use the agent to process the action request
async_generator = self.invoke(
messages=self._message,
thread=self._thread,
)
response_content = ""
# Collect the response from the async generator
async for chunk in async_generator:
if chunk is not None:
response_content += str(chunk)
# Log the messages in the thread
# async for msg in self._thread.get_messages():
# logging.info(f"thread messages - role:{msg.role} - content:{msg.content}")
logging.info(f"Response content length: {len(response_content)}")
logging.info(f"Response content: {response_content}")
# Store agent message in cosmos memory
await self._memory_store.add_item(
AgentMessage(
session_id=action_request.session_id,
user_id=self._user_id,
plan_id=action_request.plan_id,
content=f"{response_content}",
source=self._agent_name,
step_id=action_request.step_id,
)
)
# Track telemetry
track_event_if_configured(
"Base agent - Added into the cosmos",
{
"session_id": action_request.session_id,
"user_id": self._user_id,
"plan_id": action_request.plan_id,
"content": f"{response_content}",
"source": self._agent_name,
"step_id": action_request.step_id,
},
)
except Exception as e:
logging.exception(f"Error during agent execution: {e}")
# Track error in telemetry
track_event_if_configured(
"Base agent - Error during agent execution, captured into the cosmos",
{
"session_id": action_request.session_id,
"user_id": self._user_id,
"plan_id": action_request.plan_id,
"content": f"{e}",
"source": self._agent_name,
"step_id": action_request.step_id,
},
)
# Return an error response
response = ActionResponse(
step_id=action_request.step_id,
plan_id=action_request.plan_id,
session_id=action_request.session_id,
result=f"Error: {str(e)}",
status=StepStatus.failed,
)
return response.json()
# Update step status
step.status = StepStatus.completed
step.agent_reply = response_content
await self._memory_store.update_step(step)
# Track step completion in telemetry
track_event_if_configured(
"Base agent - Updated step and updated into the cosmos",
{
"status": StepStatus.completed,
"session_id": action_request.session_id,
"agent_reply": f"{response_content}",
"user_id": self._user_id,
"plan_id": action_request.plan_id,
"content": f"{response_content}",
"source": self._agent_name,
"step_id": action_request.step_id,
},
)
# Create and return action response
response = ActionResponse(
step_id=step.id,
plan_id=step.plan_id,
session_id=action_request.session_id,
result=response_content,
status=StepStatus.completed,
)
return response.json()
def save_state(self) -> Mapping[str, Any]:
"""Save the state of this agent."""
return {"memory": self._memory_store.save_state()}
def load_state(self, state: Mapping[str, Any]) -> None:
"""Load the state of this agent."""
self._memory_store.load_state(state["memory"])
@classmethod
@abstractmethod
async def create(cls, **kwargs) -> "BaseAgent":
"""Create an instance of the agent."""
pass
@staticmethod
async def _create_azure_ai_agent_definition(
agent_name: str,
instructions: str,
tools: Optional[List[KernelFunction]] = None,
client=None,
response_format=None,
temperature: float = 0.0,
):
"""
Creates a new Azure AI Agent with the specified name and instructions using AIProjectClient.
If an agent with the given name (assistant_id) already exists, it tries to retrieve it first.
Args:
kernel: The Semantic Kernel instance
agent_name: The name of the agent (will be used as assistant_id)
instructions: The system message / instructions for the agent
agent_type: The type of agent (defaults to "assistant")
tools: Optional tool definitions for the agent
tool_resources: Optional tool resources required by the tools
response_format: Optional response format to control structured output
temperature: The temperature setting for the agent (defaults to 0.0)
Returns:
A new AzureAIAgent definition or an existing one if found
"""
try:
# Get the AIProjectClient
if client is None:
client = config.get_ai_project_client()
# # First try to get an existing agent with this name as assistant_id
try:
agent_id = None
agent_list = await client.agents.list_agents()
for agent in agent_list.data:
if agent.name == agent_name:
agent_id = agent.id
break
# If the agent already exists, we can use it directly
# Get the existing agent definition
if agent_id is not None:
logging.info(f"Agent with ID {agent_id} exists.")
existing_definition = await client.agents.get_agent(agent_id)
return existing_definition
except Exception as e:
# The Azure AI Projects SDK throws an exception when the agent doesn't exist
# (not returning None), so we catch it and proceed to create a new agent
if "ResourceNotFound" in str(e) or "404" in str(e):
logging.info(
f"Agent with ID {agent_name} not found. Will create a new one."
)
else:
# Log unexpected errors but still try to create a new agent
logging.warning(
f"Unexpected error while retrieving agent {agent_name}: {str(e)}. Attempting to create new agent."
)
# Create the agent using the project client with the agent_name as both name and assistantId
agent_definition = await client.agents.create_agent(
model=config.AZURE_OPENAI_DEPLOYMENT_NAME,
name=agent_name,
instructions=instructions,
temperature=temperature,
response_format=response_format,
)
return agent_definition
except Exception as exc:
logging.error("Failed to create Azure AI Agent: %s", exc)
raise