-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathasync_base_acp.py
More file actions
75 lines (60 loc) · 2.61 KB
/
Copy pathasync_base_acp.py
File metadata and controls
75 lines (60 loc) · 2.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
from typing import Any
from typing_extensions import override
from agentex.protocol.acp import (
SendEventParams,
CancelTaskParams,
CreateTaskParams,
)
from agentex.lib.utils.logging import make_logger
from agentex.lib.adk.utils._modules.client import create_async_agentex_client
from agentex.lib.sdk.fastacp.base.base_acp_server import BaseACPServer
logger = make_logger(__name__)
class AsyncBaseACP(BaseACPServer):
"""
AsyncBaseACP implementation - a synchronous ACP that provides basic functionality
without any special async orchestration like Temporal.
This implementation provides simple synchronous processing of tasks
and is suitable for basic agent implementations.
"""
def __init__(self):
super().__init__()
self._setup_handlers()
self._agentex_client = create_async_agentex_client()
@classmethod
@override
def create(cls, **kwargs: Any) -> "AsyncBaseACP":
"""Create and initialize SyncACP instance
Args:
**kwargs: Configuration parameters (unused in sync implementation)
Returns:
Initialized SyncACP instance
"""
logger.info("Initializing AsyncBaseACP instance")
instance = cls()
logger.info("AsyncBaseACP instance initialized with default handlers")
return instance
@override
def _setup_handlers(self):
"""Set up default handlers for sync operations"""
@self.on_task_create
async def handle_create_task(params: CreateTaskParams) -> None: # type: ignore[unused-function]
"""Default create task handler - logs the task"""
logger.info(f"AsyncBaseACP creating task {params.task.id}")
@self.on_task_event_send
async def handle_event_send(params: SendEventParams) -> None: # type: ignore[unused-function]
"""Default event handler - logs the event"""
logger.info(
f"AsyncBaseACP received event for task {params.task.id}: {params.event.id},"
f"content: {params.event.content}"
)
# TODO: Implement event handling logic here
# Implement cursor commit logic here
await self._agentex_client.tracker.update(
tracker_id=params.task.id,
last_processed_event_id=params.event.id,
)
@self.on_task_cancel
async def handle_cancel(params: CancelTaskParams) -> None: # type: ignore[unused-function]
"""Default cancel handler - logs the cancellation"""
logger.info(f"AsyncBaseACP canceling task {params.task.id}")
AgenticBaseACP = AsyncBaseACP