-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathacp.py.j2
More file actions
26 lines (21 loc) · 997 Bytes
/
Copy pathacp.py.j2
File metadata and controls
26 lines (21 loc) · 997 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from typing import AsyncGenerator, Union
from agentex.lib.sdk.fastacp.fastacp import FastACP
from agentex.protocol.acp import SendMessageParams
from agentex.types.task_message_update import TaskMessageUpdate
from agentex.types.task_message_content import TaskMessageContent
from agentex.types.text_content import TextContent
from agentex.lib.utils.logging import make_logger
# Module-level logger, named after this module.
logger = make_logger(__name__)

# ACP server instance created with acp_type="sync"; message handlers are
# registered on it through its decorators (e.g. @acp.on_message_send).
acp = FastACP.create(acp_type="sync")
@acp.on_message_send
async def handle_message_send(
    params: SendMessageParams
) -> TaskMessageContent | list[TaskMessageContent] | AsyncGenerator[TaskMessageUpdate, None]:
    """Reply to an incoming message with a canned greeting that echoes it back.

    The return annotation also allows a list of contents or an async generator
    of updates, but this default implementation always returns a single
    ``TextContent`` and does not stream.

    Args:
        params: The incoming send-message request. Only ``params.content`` is
            read here.

    Returns:
        A ``TextContent`` authored by ``"agent"``. When the incoming content
        exposes a ``content`` attribute, the reply echoes it; otherwise the
        reply states that only text messages can be read.
    """
    # NOTE(review): params.content is typed as the general TaskMessageContent,
    # so it is presumably not always text and may lack a `.content` attribute
    # (confirm against the SDK's content types). Read it defensively so such a
    # message gets a reply instead of raising AttributeError.
    heard = getattr(params.content, "content", None)
    if heard is None:
        return TextContent(
            author="agent",
            content="Hello! I've received your message, but I can only read text messages right now.",
        )

    return TextContent(
        author="agent",
        content=f"Hello! I've received your message. Here's a generic response, but in future tutorials we'll see how you can get me to intelligently respond to your message. This is what I heard you say: {heard}",
    )