-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathfastacp.py
More file actions
125 lines (106 loc) · 5.06 KB
/
Copy pathfastacp.py
File metadata and controls
125 lines (106 loc) · 5.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
from __future__ import annotations
import os
import inspect
from typing import Any, Literal
from pathlib import Path
from typing_extensions import deprecated
from agentex.lib.types.fastacp import (
BaseACPConfig,
SyncACPConfig,
AsyncACPConfig,
AgenticACPConfig,
)
from agentex.lib.utils.logging import make_logger
from agentex.lib.sdk.fastacp.impl.sync_acp import SyncACP
from agentex.lib.sdk.fastacp.impl.temporal_acp import TemporalACP
from agentex.lib.sdk.fastacp.impl.async_base_acp import AsyncBaseACP
from agentex.lib.sdk.fastacp.base.base_acp_server import BaseACPServer
# Add new mappings between ACP types and configs here
# Add new mappings between ACP types and implementations here
AGENTIC_ACP_IMPLEMENTATIONS: dict[Literal["temporal", "base"], type[BaseACPServer]] = {
"temporal": TemporalACP,
"base": AsyncBaseACP,
}
logger = make_logger(__name__)
class FastACP:
"""Factory for creating FastACP instances
Supports three main ACP types:
- "sync": Simple synchronous ACP implementation
- "async": Advanced ACP with sub-types "base" or "temporal" (requires config)
- "agentic": (Deprecated, use "async") Identical to "async"
"""
@staticmethod
# Note: the config is optional and not used right now but is there to be extended in the future
def create_sync_acp(config: SyncACPConfig | None = None, **kwargs) -> SyncACP: # noqa: ARG004
"""Create a SyncACP instance"""
return SyncACP.create(**kwargs)
@staticmethod
def create_async_acp(config: AsyncACPConfig, **kwargs) -> BaseACPServer:
"""Create an async ACP instance (base or temporal)
Args:
config: AsyncACPConfig with type="base" or type="temporal"
**kwargs: Additional configuration parameters
"""
# Get implementation class
implementation_class = AGENTIC_ACP_IMPLEMENTATIONS[config.type]
# Handle temporal-specific configuration
if config.type == "temporal":
# Extract temporal_address, plugins, and interceptors from config if it's a TemporalACPConfig
temporal_config = kwargs.copy()
if hasattr(config, "temporal_address"):
temporal_config["temporal_address"] = config.temporal_address # type: ignore[attr-defined]
if hasattr(config, "plugins"):
temporal_config["plugins"] = config.plugins # type: ignore[attr-defined]
if hasattr(config, "interceptors"):
temporal_config["interceptors"] = config.interceptors # type: ignore[attr-defined]
if hasattr(config, "payload_codec"):
temporal_config["payload_codec"] = config.payload_codec # type: ignore[attr-defined]
if hasattr(config, "data_converter"):
temporal_config["data_converter"] = config.data_converter # type: ignore[attr-defined]
return implementation_class.create(**temporal_config)
else:
return implementation_class.create(**kwargs)
@staticmethod
@deprecated("Use create_async_acp instead")
def create_agentic_acp(config: AgenticACPConfig, **kwargs) -> BaseACPServer:
"""Create an async ACP instance (base or temporal)
Args:
config: AsyncACPConfig with type="base" or type="temporal"
**kwargs: Additional configuration parameters
"""
return FastACP.create_async_acp(config, **kwargs)
@staticmethod
def locate_build_info_path() -> None:
"""If a build-info.json file is present, set the BUILD_INFO_PATH environment variable"""
acp_root = Path(inspect.stack()[2].filename).resolve().parents[0]
build_info_path = acp_root / "build-info.json"
if build_info_path.exists():
os.environ["BUILD_INFO_PATH"] = str(build_info_path)
@staticmethod
def create(
acp_type: Literal["sync", "async", "agentic"],
config: BaseACPConfig | None = None,
agent_card: Any | None = None,
**kwargs,
) -> BaseACPServer | SyncACP | AsyncBaseACP | TemporalACP:
"""Main factory method to create any ACP type
Args:
acp_type: Type of ACP to create ("sync", "async", or "agentic")
config: Configuration object. Required for async/agentic type.
**kwargs: Additional configuration parameters
"""
FastACP.locate_build_info_path()
if acp_type == "sync":
sync_config = config if isinstance(config, SyncACPConfig) else None
instance = FastACP.create_sync_acp(sync_config, **kwargs)
elif acp_type == "async" or acp_type == "agentic":
if config is None:
config = AsyncACPConfig(type="base")
if not isinstance(config, AsyncACPConfig):
raise ValueError("AsyncACPConfig is required for async/agentic ACP type")
instance = FastACP.create_async_acp(config, **kwargs)
else:
raise ValueError(f"Unknown acp_type: {acp_type}")
if agent_card is not None:
instance._agent_card = agent_card # type: ignore[attr-defined]
return instance