forked from agentclientprotocol/python-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcore.py
More file actions
91 lines (77 loc) · 2.91 KB
/
core.py
File metadata and controls
91 lines (77 loc) · 2.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
"""Compatibility re-exports for historical imports.
The project now keeps implementation in dedicated modules mirroring the
agent-client-protocol Rust structure, but external callers may still import
from ``acp.core``. Keep the surface API stable by forwarding to the new homes.
"""
from __future__ import annotations
from typing import Any
from .agent.connection import AgentSideConnection
from .client.connection import ClientSideConnection
from .connection import Connection, JsonValue, MethodHandler
from .exceptions import RequestError
from .interfaces import Agent, Client
# Public surface of this compatibility module: the names imported above from
# their dedicated modules plus the two helper functions defined in this file.
# Kept explicit so that ``from acp.core import *`` exposes exactly these names.
__all__ = [
    "Agent",
    "AgentSideConnection",
    "Client",
    "ClientSideConnection",
    "Connection",
    "JsonValue",
    "MethodHandler",
    "RequestError",
    "connect_to_agent",
    "run_agent",
]
async def run_agent(
    agent: Agent,
    input_stream: Any = None,
    output_stream: Any = None,
    *,
    use_unstable_protocol: bool = False,
    **connection_kwargs: Any,
) -> None:
    """Serve an ACP agent over a pair of streams until listening ends.

    Builds an :class:`AgentSideConnection` around ``agent`` with
    ``listening=False`` and then awaits its ``listen()`` coroutine, so the
    call only returns once ``listen()`` completes.

    Args:
        agent: The agent implementation to expose over the connection.
        input_stream: The (client) input stream to write to. When both
            streams are ``None`` it is taken from ``stdio_streams()``.
        output_stream: The (client) output stream to read from. When both
            streams are ``None`` it is taken from ``stdio_streams()``.
        use_unstable_protocol: Whether to enable unstable protocol features.
        **connection_kwargs: Extra keyword arguments forwarded unchanged to
            the :class:`AgentSideConnection` constructor.

    Note:
        The stdio fallback is applied only when *both* streams are ``None``.
        If exactly one is supplied, the other is forwarded as ``None``.
    """
    # Imported at call time, exactly as the original does (unconditionally).
    from .stdio import stdio_streams

    streams_missing = input_stream is None and output_stream is None
    if streams_missing:
        stdio_pair = await stdio_streams()
        # Order matters: the first element becomes ``output_stream`` and the
        # second becomes ``input_stream``.
        output_stream, input_stream = stdio_pair

    connection = AgentSideConnection(
        agent,
        input_stream,
        output_stream,
        listening=False,
        use_unstable_protocol=use_unstable_protocol,
        **connection_kwargs,
    )
    # Constructed with listening=False above; listening is started here.
    await connection.listen()
def connect_to_agent(
    client: Client,
    input_stream: Any,
    output_stream: Any,
    *,
    use_unstable_protocol: bool = False,
    **connection_kwargs: Any,
) -> ClientSideConnection:
    """Build a :class:`ClientSideConnection` to an ACP agent.

    This is a thin convenience wrapper: every argument is forwarded to the
    :class:`ClientSideConnection` constructor and the new instance is
    returned as-is.

    Args:
        client: The client implementation to use.
        input_stream: The (agent) input stream to write to. Required; this
            function supplies no default.
        output_stream: The (agent) output stream to read from. Required;
            this function supplies no default.
        use_unstable_protocol: Whether to enable unstable protocol features.
        **connection_kwargs: Extra keyword arguments forwarded unchanged to
            the :class:`ClientSideConnection` constructor.

    Returns:
        The newly constructed :class:`ClientSideConnection`.
    """
    connection = ClientSideConnection(
        client,
        input_stream,
        output_stream,
        use_unstable_protocol=use_unstable_protocol,
        **connection_kwargs,
    )
    return connection