-
Notifications
You must be signed in to change notification settings - Fork 709
Expand file tree
/
Copy pathruntime_interrupt_kernel.py
More file actions
228 lines (178 loc) · 7.8 KB
/
Copy pathruntime_interrupt_kernel.py
File metadata and controls
228 lines (178 loc) · 7.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
from typing import Any, Dict, List, Optional
import semantic_kernel as sk
from semantic_kernel.kernel_pydantic import KernelBaseModel
# Define message classes directly in this file since the imports are problematic
class GetHumanInputMessage(KernelBaseModel):
    """A request, addressed to a human, asking them to provide input."""

    # Text of the request.
    content: str
class MessageBody(KernelBaseModel):
    """Minimal message payload that carries nothing but a ``content`` string."""

    # The payload text.
    content: str
class GroupChatMessage(KernelBaseModel):
    """One message exchanged within a group chat."""

    # Payload; may be any object. Objects exposing ``.content`` are
    # rendered through that attribute by ``__str__``.
    body: Any
    source: str
    session_id: str
    target: str = ""

    def __str__(self):
        """Render the message as ``GroupChatMessage(source=..., content=...)``.

        The displayed content is ``body.content`` when the body has such an
        attribute; otherwise it is ``str(body)``.
        """
        if hasattr(self.body, "content"):
            content = self.body.content
        else:
            content = str(self.body)
        return f"GroupChatMessage(source={self.source}, content={content})"
class NeedsUserInputHandler:
    """Handler for capturing messages that need human input.

    Every recognised message is appended to ``messages`` as a dict of the
    form ``{"agent": {"type": ..., "key": ...}, "content": ...}``. Messages
    that represent a question for the human additionally update
    ``question_for_human``.
    """

    def __init__(self):
        # Most recent question addressed to the human, if any was seen.
        self.question_for_human: Optional[GetHumanInputMessage] = None
        # Buffer of captured messages; drained by ``get_messages``.
        self.messages: List[Dict[str, Any]] = []

    def _record(self, sender_type: str, sender_key: str, content: Any) -> None:
        """Append one captured message to the buffer in the common dict shape."""
        self.messages.append(
            {
                "agent": {"type": sender_type, "key": sender_key},
                "content": content,
            }
        )

    async def on_message(
        self,
        message: Any,
        sender_type: str = "unknown_type",
        sender_key: str = "unknown_key",
    ) -> Any:
        """Process an incoming message.

        This is equivalent to the on_publish method in the original version.

        Three message shapes are captured; anything else is passed through
        without being recorded:

        * ``GetHumanInputMessage`` - stored as the pending question and recorded.
        * ``GroupChatMessage`` - recorded only (``body.content`` if present,
          else ``str(body)``); the pending question is left unchanged.
        * ``dict`` containing ``"content"`` - wrapped in a
          ``GetHumanInputMessage``, stored as the pending question and recorded.

        Args:
            message: The message to process
            sender_type: The type of the sender (equivalent to sender.type in previous)
            sender_key: The key of the sender (equivalent to sender.key in previous)

        Returns:
            The original message (for pass-through functionality)
        """
        print(
            f"NeedsUserInputHandler received message: {message} from sender: {sender_type}/{sender_key}"
        )

        if isinstance(message, GetHumanInputMessage):
            self.question_for_human = message
            self._record(sender_type, sender_key, message.content)
            print("Captured question for human in NeedsUserInputHandler")
        elif isinstance(message, GroupChatMessage):
            # Ensure we extract content consistently with the original implementation
            content = (
                message.body.content
                if hasattr(message.body, "content")
                else str(message.body)
            )
            self._record(sender_type, sender_key, content)
            print(f"Captured group chat message in NeedsUserInputHandler - {message}")
        elif isinstance(message, dict) and "content" in message:
            # Handle messages directly from AzureAIAgent.
            # The model is built before recording so that a construction
            # failure leaves the buffer untouched.
            self.question_for_human = GetHumanInputMessage(content=message["content"])
            self._record(sender_type, sender_key, message["content"])
            print("Captured question from AzureAIAgent in NeedsUserInputHandler")

        return message

    @property
    def needs_human_input(self) -> bool:
        """Check if human input is needed (a question has been captured)."""
        return self.question_for_human is not None

    @property
    def question_content(self) -> Optional[str]:
        """Get the content of the question for human, or ``None`` if there is none."""
        # Use the same ``is not None`` test as ``needs_human_input`` so the two
        # properties can never disagree about whether a question exists.
        if self.question_for_human is not None:
            return self.question_for_human.content
        return None

    def get_messages(self) -> List[Dict[str, Any]]:
        """Get captured messages and clear buffer.

        Returns:
            A new list holding the messages captured so far. The internal
            buffer is emptied in place.
        """
        messages = self.messages.copy()
        self.messages.clear()
        print("Returning and clearing captured messages in NeedsUserInputHandler")
        return messages
class AssistantResponseHandler:
    """Handler for capturing assistant responses.

    Only the most recent matching response is kept; each new match
    overwrites ``assistant_response``.
    """

    # Sender types whose ``body``-carrying messages count as the assistant's reply.
    _RESPONDER_TYPES = ("writer", "editor")

    def __init__(self):
        # Last captured response, or None while nothing has been captured.
        self.assistant_response: Optional[str] = None

    async def on_message(self, message: Any, sender_type: Optional[str] = None) -> Any:
        """Process an incoming message from an assistant.

        This is equivalent to the on_publish method in the original version.

        A response is captured in two cases:

        * ``message`` has a ``body`` attribute and ``sender_type`` is
          ``"writer"`` or ``"editor"``: the response is ``body.content`` if
          present, else ``str(body)``.
        * ``message`` is a ``dict`` with a ``"value"`` key and ``sender_type``
          is truthy: the response is ``message["value"]``.

        Args:
            message: The message to process
            sender_type: The type of the sender (equivalent to sender.type in previous)

        Returns:
            The original message (for pass-through functionality)
        """
        print(
            f"on_message called in AssistantResponseHandler with message from sender: {sender_type} - {message}"
        )

        if hasattr(message, "body") and sender_type in self._RESPONDER_TYPES:
            # Ensure we're handling the content consistently with the original implementation
            self.assistant_response = (
                message.body.content
                if hasattr(message.body, "content")
                else str(message.body)
            )
            print("Assistant response set in AssistantResponseHandler")
        elif isinstance(message, dict) and "value" in message and sender_type:
            # Handle message from AzureAIAgent
            self.assistant_response = message["value"]
            print(
                "Assistant response from AzureAIAgent set in AssistantResponseHandler"
            )

        return message

    @property
    def has_response(self) -> bool:
        """Check if response is available."""
        has_response = self.assistant_response is not None
        print(f"has_response called, returning: {has_response}")
        return has_response

    def get_response(self) -> Optional[str]:
        """Get captured response (``None`` if nothing has been captured)."""
        response = self.assistant_response
        print(f"get_response called, returning: {response}")
        return response
# Helper function to register handlers with a Semantic Kernel instance
def register_handlers(kernel: sk.Kernel, session_id: str) -> tuple:
    """Create a fresh handler pair for a session and attach it to the kernel.

    Both handlers' ``on_message`` methods are added to the kernel as
    functions, and the handler objects themselves are stored on the kernel
    under session-specific variable names.

    Args:
        kernel: The Semantic Kernel instance
        session_id: The session identifier

    Returns:
        Tuple of (NeedsUserInputHandler, AssistantResponseHandler)
    """
    # NOTE(review): this relies on ``kernel.add_function`` accepting the
    # callable as its first positional argument and on ``kernel.set_variable``
    # existing - TODO confirm against the semantic_kernel version in use.
    input_capture = NeedsUserInputHandler()
    response_capture = AssistantResponseHandler()

    # Expose each handler's on_message as a kernel function.
    plugin_registrations = (
        (f"user_input_handler_{session_id}", input_capture),
        (f"assistant_handler_{session_id}", response_capture),
    )
    for plugin_name, handler in plugin_registrations:
        kernel.add_function(
            handler.on_message,
            plugin_name=plugin_name,
            function_name="on_message",
        )

    # Keep references to the handler objects on the kernel for later lookup.
    stored_variables = (
        (f"input_handler_{session_id}", input_capture),
        (f"response_handler_{session_id}", response_capture),
    )
    for variable_name, handler in stored_variables:
        kernel.set_variable(variable_name, handler)

    print(f"Registered handlers for session {session_id} with kernel")
    return input_capture, response_capture
# Helper function to get the registered handlers for a session
def get_handlers(kernel: sk.Kernel, session_id: str) -> tuple:
    """Look up the interrupt handlers stored on the kernel for a session.

    If either handler is missing (or falsy), a brand-new pair is registered
    via ``register_handlers`` and returned instead.

    Args:
        kernel: The Semantic Kernel instance
        session_id: The session identifier

    Returns:
        Tuple of (NeedsUserInputHandler, AssistantResponseHandler)
    """
    input_capture = kernel.get_variable(f"input_handler_{session_id}", None)
    response_capture = kernel.get_variable(f"response_handler_{session_id}", None)

    # Both present: hand back the stored pair as-is.
    if input_capture and response_capture:
        return input_capture, response_capture

    # Otherwise fall back to creating and registering a new pair.
    return register_handlers(kernel, session_id)