"""Enumerations and data models defined in messages_kernel.py."""
import uuid
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Literal, Optional
from semantic_kernel.kernel_pydantic import Field, KernelBaseModel
class DataType(str, Enum):
    """Enumeration of possible data types for documents in the database.

    Members subclass ``str``, so each member compares equal to its raw string
    value. The model classes in this file pin their ``data_type`` field to one
    of these members.
    """

    session = "session"
    plan = "plan"
    step = "step"
    agent_message = "agent_message"
    team_config = "team_config"
    user_current_team = "user_current_team"
    m_plan = "m_plan"
    m_plan_message = "m_plan_message"
class AgentType(str, Enum):
    """Enumeration of agent types.

    Members subclass ``str``; each value is the agent's string identifier
    (for example ``"Planner_Agent"``).
    """

    HUMAN = "Human_Agent"
    HR = "Hr_Agent"
    MARKETING = "Marketing_Agent"
    PROCUREMENT = "Procurement_Agent"
    PRODUCT = "Product_Agent"
    GENERIC = "Generic_Agent"
    TECH_SUPPORT = "Tech_Support_Agent"
    GROUP_CHAT_MANAGER = "Group_Chat_Manager"
    PLANNER = "Planner_Agent"
    # Add other agents as needed
class StepStatus(str, Enum):
    """Enumeration of possible statuses for a step.

    Members subclass ``str``, so each member compares equal to its raw string
    value.
    """

    planned = "planned"
    awaiting_feedback = "awaiting_feedback"
    approved = "approved"
    rejected = "rejected"
    action_requested = "action_requested"
    completed = "completed"
    failed = "failed"
class PlanStatus(str, Enum):
    """Enumeration of possible statuses for a plan.

    Members subclass ``str``, so each member compares equal to its raw string
    value.
    """

    in_progress = "in_progress"
    completed = "completed"
    failed = "failed"
    canceled = "canceled"
    approved = "approved"
    created = "created"
class HumanFeedbackStatus(str, Enum):
    """Enumeration of human feedback statuses.

    Members subclass ``str``, so each member compares equal to its raw string
    value.
    """

    requested = "requested"
    accepted = "accepted"
    rejected = "rejected"
class MessageRole(str, Enum):
    """Message roles compatible with Semantic Kernel.

    Members subclass ``str``, so each member compares equal to its raw string
    value.
    """

    system = "system"
    user = "user"
    assistant = "assistant"
    function = "function"
class BaseDataModel(KernelBaseModel):
    """Base data model with common fields.

    All three fields have default factories, so none of them has to be
    supplied when this base model is instantiated.
    """

    # Document identifier; defaults to a freshly generated UUID4 string.
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    # Session identifier; defaults to its own freshly generated UUID4 string.
    session_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    # Defaults to the current time as a timezone-aware UTC datetime.
    timestamp: Optional[datetime] = Field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
class AgentMessage(BaseDataModel):
    """Base class for messages sent between agents."""

    # Fixed to DataType.agent_message.
    # NOTE(review): ``Literal=True`` looks like an extra, non-standard ``Field``
    # keyword — confirm it is intentional.
    data_type: Literal[DataType.agent_message] = Field(
        DataType.agent_message, Literal=True
    )
    plan_id: str
    content: str
    source: str
    # Optional; left as None when no step id is supplied.
    step_id: Optional[str] = None
class Session(BaseDataModel):
    """Represents a user session."""

    # Fixed to DataType.session.
    # NOTE(review): ``Literal=True`` looks like an extra, non-standard ``Field``
    # keyword — confirm it is intentional.
    data_type: Literal[DataType.session] = Field(DataType.session, Literal=True)
    user_id: str
    current_status: str
    # Optional; defaults to None.
    message_to_user: Optional[str] = None
class UserCurrentTeam(BaseDataModel):
    """Represents the current team of a user."""

    # Fixed to DataType.user_current_team.
    # NOTE(review): ``Literal=True`` looks like an extra, non-standard ``Field``
    # keyword — confirm it is intentional.
    data_type: Literal[DataType.user_current_team] = Field(
        DataType.user_current_team, Literal=True
    )
    user_id: str
    team_id: str
class Plan(BaseDataModel):
    """Represents a plan containing multiple steps."""

    # Fixed to DataType.plan.
    # NOTE(review): ``Literal=True`` looks like an extra, non-standard ``Field``
    # keyword — confirm it is intentional.
    data_type: Literal[DataType.plan] = Field(DataType.plan, Literal=True)
    plan_id: str
    user_id: str
    initial_goal: str
    # A new plan starts as in_progress and not approved.
    overall_status: PlanStatus = PlanStatus.in_progress
    approved: bool = False
    # Defaults to the planner agent's string value ("Planner_Agent").
    source: str = AgentType.PLANNER.value
    # The remaining fields are optional and default to None.
    m_plan: Optional[Dict[str, Any]] = None
    summary: Optional[str] = None
    team_id: Optional[str] = None
    streaming_message: Optional[str] = None
    human_clarification_request: Optional[str] = None
    human_clarification_response: Optional[str] = None
class Step(BaseDataModel):
    """Represents an individual step (task) within a plan."""

    # Fixed to DataType.step.
    # NOTE(review): ``Literal=True`` looks like an extra, non-standard ``Field``
    # keyword — confirm it is intentional.
    data_type: Literal[DataType.step] = Field(DataType.step, Literal=True)
    plan_id: str
    user_id: str
    action: str
    agent: AgentType
    # A new step starts as planned, with human approval marked as requested.
    status: StepStatus = StepStatus.planned
    agent_reply: Optional[str] = None
    human_feedback: Optional[str] = None
    human_approval_status: Optional[HumanFeedbackStatus] = HumanFeedbackStatus.requested
    updated_action: Optional[str] = None
class TeamSelectionRequest(BaseDataModel):
    """Request model for team selection.

    Adds a required ``team_id`` to the common ``BaseDataModel`` fields.
    """

    team_id: str
class TeamAgent(KernelBaseModel):
    """Represents an agent within a team.

    ``input_key``, ``type``, ``name``, ``deployment_name`` and ``icon`` are
    required; the text fields default to an empty string and every feature
    flag defaults to ``False``.
    """

    input_key: str
    type: str
    name: str
    deployment_name: str
    system_message: str = ""
    description: str = ""
    icon: str
    index_name: str = ""
    # Feature flags, all off by default.
    use_rag: bool = False
    use_mcp: bool = False
    use_bing: bool = False
    use_reasoning: bool = False
    coding_tools: bool = False
class StartingTask(KernelBaseModel):
    """Represents a starting task for a team.

    All fields are required strings.
    """

    id: str
    name: str
    prompt: str
    created: str
    creator: str
    logo: str
class TeamConfiguration(BaseDataModel):
    """Represents a team configuration stored in the database.

    ``session_id`` is redeclared here without a default, so unlike in
    ``BaseDataModel`` it must be supplied explicitly.
    """

    team_id: str
    # Fixed to DataType.team_config.
    # NOTE(review): ``Literal=True`` looks like an extra, non-standard ``Field``
    # keyword — confirm it is intentional.
    data_type: Literal[DataType.team_config] = Field(
        DataType.team_config, Literal=True
    )
    session_id: str  # Partition key
    name: str
    status: str
    created: str
    created_by: str
    agents: List[TeamAgent] = Field(default_factory=list)
    description: str = ""
    logo: str = ""
    plan: str = ""
    starting_tasks: List[StartingTask] = Field(default_factory=list)
    user_id: str  # Who uploaded this configuration
class PlanWithSteps(Plan):
    """Plan model that includes the associated steps.

    Besides the steps themselves, the model carries one integer counter per
    ``StepStatus`` plus ``total_steps``. The counters are only refreshed when
    ``update_step_counts`` is called.
    """

    steps: List[Step] = Field(default_factory=list)
    total_steps: int = 0
    planned: int = 0
    awaiting_feedback: int = 0
    # NOTE(review): this redeclares ``approved``, which ``Plan`` defines as a
    # ``bool``; here it is the number of steps in the ``approved`` status.
    approved: int = 0
    rejected: int = 0
    action_requested: int = 0
    completed: int = 0
    failed: int = 0

    def update_step_counts(self) -> None:
        """Update the counts of steps by their status.

        Recomputes ``total_steps`` and every per-status counter from
        ``self.steps``. If there is at least one step and every step is either
        completed or failed, ``overall_status`` is set to
        ``PlanStatus.completed``; otherwise it is left unchanged.
        """
        # Start every known status at zero so absent statuses report 0.
        tally = {status: 0 for status in StepStatus}
        for step in self.steps:
            tally[step.status] += 1

        self.total_steps = len(self.steps)
        self.planned = tally[StepStatus.planned]
        self.awaiting_feedback = tally[StepStatus.awaiting_feedback]
        self.approved = tally[StepStatus.approved]
        self.rejected = tally[StepStatus.rejected]
        self.action_requested = tally[StepStatus.action_requested]
        self.completed = tally[StepStatus.completed]
        self.failed = tally[StepStatus.failed]

        # Mark the plan as complete if the sum of completed and failed steps
        # equals the total number of steps
        if self.total_steps > 0 and (self.completed + self.failed) == self.total_steps:
            self.overall_status = PlanStatus.completed
# Message classes for communication between agents
class InputTask(KernelBaseModel):
    """Message representing the initial input task from the user."""

    session_id: str
    description: str  # Initial goal
    # team_id: str
class UserLanguage(KernelBaseModel):
    """Model holding a single required ``language`` string."""

    # NOTE(review): presumably the user's language; the expected format
    # (language code vs. display name) is not specified here — confirm.
    language: str
class AgentMessageType(str, Enum):
    """Enumeration of agent message types: human agent or AI agent.

    Members subclass ``str``, so each member compares equal to its raw string
    value.
    """

    # Values are plain strings. A trailing comma after a value would turn the
    # assigned object into a one-element tuple, so none is used here.
    HUMAN_AGENT = "Human_Agent"
    AI_AGENT = "AI_Agent"
class AgentMessageData(BaseDataModel):
    """Agent message record tagged with ``DataType.m_plan_message``.

    ``plan_id``, ``user_id``, ``agent``, ``content`` and ``raw_data`` are
    required; the remaining fields have defaults.
    """

    # Fixed to DataType.m_plan_message.
    # NOTE(review): ``Literal=True`` looks like an extra, non-standard ``Field``
    # keyword — confirm it is intentional.
    data_type: Literal[DataType.m_plan_message] = Field(
        DataType.m_plan_message, Literal=True
    )
    plan_id: str
    user_id: str
    agent: str
    m_plan_id: Optional[str] = None
    # Defaults to the AI agent message type.
    agent_type: AgentMessageType = AgentMessageType.AI_AGENT
    content: str
    raw_data: str
    # Element types are unconstrained (``Any``); both lists default to empty.
    steps: List[Any] = Field(default_factory=list)
    next_steps: List[Any] = Field(default_factory=list)