diff --git a/README.md b/README.md index 07efae07..198024e6 100644 --- a/README.md +++ b/README.md @@ -126,6 +126,23 @@ The `sgrsh` command: For more examples and detailed usage instructions, see the [examples/](examples/) directory. +### Agent Client Protocol (`sgracp`) + +Editors and tools that speak the [Agent Client Protocol](https://agentclientprotocol.com/) can run SGR agents over **stdio** (newline-delimited JSON-RPC), using the same YAML configuration as the HTTP server. + +```bash +sgracp --config examples/sgr_deep_research/config.yaml +``` + +Optional `acp` block in `config.yaml` selects which `agents:` entry to expose (if omitted, the first agent definition is used): + +```yaml +acp: + agent: sgr_agent +``` + +Protocol details follow the official ACP specification, the [Python SDK](https://agentclientprotocol.github.io/python-sdk/), and the `agent-client-protocol` package used by this binary. + ## Benchmarking ![SimpleQA Benchmark Comparison](https://github.com/vamplabAI/sgr-agent-core/blob/main/docs/assets/images/simpleqa_benchmark_comparison.png) diff --git a/config.yaml.example b/config.yaml.example index 3fd67fe4..d255300e 100644 --- a/config.yaml.example +++ b/config.yaml.example @@ -18,6 +18,10 @@ execution: logs_dir: "logs" # Directory for saving agent execution logs reports_dir: "reports" # Directory for saving agent reports +# Optional: Agent Client Protocol (stdio) via `sgracp --config config.yaml` +# acp: +# agent: "sgr_agent" # Must match a name under agents: + # Prompts Configuration # prompts: # # Option 1: Use file paths (absolute or relative to project root) diff --git a/docs/en/framework/configuration.md b/docs/en/framework/configuration.md index 3b67c26c..76311b7a 100644 --- a/docs/en/framework/configuration.md +++ b/docs/en/framework/configuration.md @@ -67,6 +67,23 @@ config = GlobalConfig.from_yaml("config.yaml") An example can be found in [`config.yaml.example`](https://github.com/vamplabAI/sgr-agent-core/blob/main/config.yaml.example). +### Agent Client Protocol (`sgracp`) + +The optional top-level `acp` section configures the [`sgracp`](https://github.com/vamplabAI/sgr-agent-core) stdio entrypoint (Agent Client Protocol over stdin/stdout). It uses the same `agents:` definitions as the rest of the stack. + +| Field | Description | +| ----- | ----------- | +| `agent` | Name of an entry under `agents:` to run when a client connects. If omitted, the first agent in `agents:` is used. | + +Example: + +```yaml +acp: + agent: sgr_agent +``` + +You can also set `SGR__ACP__AGENT` in the environment (see `pydantic-settings` nested env rules for your version). + ### Parameter Override **Key Feature:** `AgentDefinition` inherits all parameters from `GlobalConfig` and overrides only those explicitly specified. This allows creating minimal configurations by specifying only necessary changes. diff --git a/docs/ru/framework/configuration.md b/docs/ru/framework/configuration.md index 06362a72..320d3958 100644 --- a/docs/ru/framework/configuration.md +++ b/docs/ru/framework/configuration.md @@ -67,7 +67,22 @@ config = GlobalConfig.from_yaml("config.yaml") Пример можно найти в [`config.yaml.example`](https://github.com/vamplabAI/sgr-agent-core/blob/main/config.yaml.example). +### Agent Client Protocol (`sgracp`) +Необязательная секция верхнего уровня `acp` задаёт параметры для stdio-входа [`sgracp`](https://github.com/vamplabAI/sgr-agent-core) (Agent Client Protocol поверх stdin/stdout). Используются те же определения `agents:`, что и для HTTP API. + +| Поле | Описание | +| ---- | -------- | +| `agent` | Имя записи в `agents:` для запуска при подключении клиента. Если не задано, берётся первый агент из `agents:`. | + +Пример: + +```yaml +acp: + agent: sgr_agent +``` + +Переменную окружения можно задать как `SGR__ACP__AGENT` (см. правила вложенных переменных в `pydantic-settings` для вашей версии). ### Переопределение параметров diff --git a/examples/sgr_deep_research/agents.py b/examples/sgr_deep_research/agents.py index 99e06609..efc8e60d 100644 --- a/examples/sgr_deep_research/agents.py +++ b/examples/sgr_deep_research/agents.py @@ -14,7 +14,6 @@ from sgr_agent_core.agent_definition import AgentConfig from sgr_agent_core.agents.dialog_agent import DialogAgent from sgr_agent_core.agents.iron_agent import IronAgent -from sgr_agent_core.agents.sgr_agent import SGRAgent from sgr_agent_core.agents.sgr_tool_calling_agent import SGRToolCallingAgent from sgr_agent_core.agents.tool_calling_agent import ToolCallingAgent from sgr_agent_core.tools import ( @@ -24,54 +23,11 @@ ExtractPageContentTool, FinalAnswerTool, NextStepToolsBuilder, - NextStepToolStub, ReasoningTool, WebSearchTool, ) -class ResearchSGRAgent(SGRAgent): - """Agent for deep research tasks.""" - - def __init__( - self, - task_messages: list[ChatCompletionMessageParam], - openai_client: AsyncOpenAI, - agent_config: AgentConfig, - toolkit: list[Type[BaseTool]], - def_name: str | None = None, - **kwargs: dict, - ): - research_toolkit = [WebSearchTool, ExtractPageContentTool, CreateReportTool, FinalAnswerTool] - super().__init__( - task_messages=task_messages, - openai_client=openai_client, - agent_config=agent_config, - toolkit=research_toolkit + [t for t in toolkit if t not in research_toolkit], - def_name=def_name, - **kwargs, - ) - - async def _prepare_tools(self) -> Type[NextStepToolStub]: - """Prepare available tools for the current agent state and progress.""" - tools = set(self.toolkit) - if self._context.iteration >= self.config.execution.max_iterations: - tools = { - CreateReportTool, - FinalAnswerTool, - } - if self._context.clarifications_used >= self.config.execution.max_clarifications: - tools -= { - ClarificationTool, - } - search_config = self.get_tool_config(WebSearchTool) - if self._context.searches_used >= search_config.max_searches: - tools -= { - WebSearchTool, - } - return NextStepToolsBuilder.build_NextStepTools(list(tools)) - - class ResearchToolCallingAgent(ToolCallingAgent): """Tool calling research agent for deep research tasks.""" diff --git a/examples/sgr_deep_research/config.yaml.example b/examples/sgr_deep_research/config.yaml.example index f004ca4c..6a23b530 100644 --- a/examples/sgr_deep_research/config.yaml.example +++ b/examples/sgr_deep_research/config.yaml.example @@ -19,6 +19,10 @@ execution: logs_dir: "logs" # Directory for saving agent execution logs reports_dir: "reports" # Directory for saving research reports +# Agent Client Protocol (ACP) +acp: + agent: tool_calling_agent + # MCP (Model Context Protocol) Configuration mcp: mcpServers: @@ -54,20 +58,6 @@ tools: # Agent Definitions agents: - # SGR Agent for research - sgr_agent: - base_class: "agents.ResearchSGRAgent" - llm: - model: "gpt-4.1-mini" - temperature: 0.4 - tools: - - "web_search_tool" - - "extract_page_content_tool" - - "create_report_tool" - - "final_answer_tool" - - "clarification_tool" - - "generate_plan_tool" - - "adapt_plan_tool" # Tool Calling Agent for research tool_calling_agent: diff --git a/examples/sgr_file_agent/config.yaml.example b/examples/sgr_file_agent/config.yaml.example index f88b13d3..b7bfc386 100644 --- a/examples/sgr_file_agent/config.yaml.example +++ b/examples/sgr_file_agent/config.yaml.example @@ -1,55 +1,26 @@ -# SGR File Agent - Configuration Template -# Copy this file to config.yaml and fill in your data +# Smoke test for sgracp + sgr_file_agent (see scripts/smoke_acp_file_agent.py) +# LLM matches OpenAI-compatible endpoint; paths are relative to this file. -# LLM Configuration llm: - api_key: "your-openai-api-key-here" # Your OpenAI API key - base_url: "https://api.openai.com/v1" # API base URL - model: "gpt-4o-mini" # Model name - max_tokens: 8000 # Max output tokens - temperature: 0.4 # Temperature (0.0-1.0) - # proxy: "socks5://127.0.0.1:1081" # Optional proxy (socks5:// or http://) + api_key: "https://t.me/evilfreelancer" + base_url: "https://api.rpa.icu/v1" + model: "gpt-oss:120b" + max_tokens: 4000 + temperature: 0.3 -# Execution Settings execution: - max_clarifications: 3 # Max clarification requests - max_iterations: 10 # Max agent iterations - mcp_context_limit: 15000 # Max context length from MCP server response - logs_dir: "logs" # Directory for saving agent execution logs - reports_dir: "reports" # Directory for saving agent reports + max_clarifications: 1 + max_iterations: 3 + mcp_context_limit: 15000 + logs_dir: "logs" + reports_dir: "reports" -# Prompts Configuration -# prompts: -# # Option 1: Use file paths (absolute or relative to project root) -# system_prompt_file: "path/to/your/system_prompt.txt" -# initial_user_request_file: "path/to/your/initial_user_request.txt" -# clarification_response_file: "path/to/your/clarification_response.txt" -# -# # Option 2: Provide prompts directly as strings -# system_prompt_str: "Your custom system prompt here..." -# initial_user_request_str: "Your custom initial request template..." -# clarification_response_str: "Your custom clarification template..." - - # Note: If both file and string are provided, string takes precedence - -# MCP (Model Context Protocol) Configuration mcp: mcpServers: {} - # Add MCP servers here if needed: - # deepwiki: - # url: "https://mcp.deepwiki.com/mcp" -# Tool Definitions tools: - # Core tools (base_class defaults to sgr_agent_core.tools.*) - reasoning_tool: - # base_class defaults to sgr_agent_core.tools.ReasoningTool - clarification_tool: - # base_class defaults to sgr_agent_core.tools.ClarificationTool - final_answer_tool: - # base_class defaults to sgr_agent_core.tools.FinalAnswerTool - - # File system tools - using relative imports (relative to config.yaml location) + clarification_tool: {} + final_answer_tool: {} get_system_paths_tool: base_class: "tools.GetSystemPathsTool" list_directory_tool: @@ -61,30 +32,17 @@ tools: find_files_fast_tool: base_class: "tools.FindFilesFastTool" -# Agent Definitions agents: sgr_file_agent: - base_class: "sgr_file_agent.SGRFileAgent" # Relative to config.yaml location - # Optional: Override LLM settings for this agent - llm: - model: "gpt-4o-mini" - temperature: 0.4 - max_tokens: 8000 - - # Execution configuration + base_class: "sgr_file_agent.SGRFileAgent" execution: - max_iterations: 15 - max_clarifications: 3 - max_searches: 0 # File agent doesn't use web search + max_iterations: 3 + max_clarifications: 1 + max_searches: 0 logs_dir: "logs/file_agent" reports_dir: "reports/file_agent" - - # Agent-specific parameters (for SGRFileAgent) - working_directory: "." # Working directory for file operations (default: current directory) - - # Tools this agent can use (names from tools section above) + working_directory: "." tools: - - "reasoning_tool" - "clarification_tool" - "final_answer_tool" - "get_system_paths_tool" @@ -92,3 +50,6 @@ agents: - "read_file_tool" - "search_in_files_tool" - "find_files_fast_tool" + +acp: + agent: sgr_file_agent diff --git a/examples/sgr_file_agent/sgr_file_agent.py b/examples/sgr_file_agent/sgr_file_agent.py index 6cce5c8d..e9d2f948 100644 --- a/examples/sgr_file_agent/sgr_file_agent.py +++ b/examples/sgr_file_agent/sgr_file_agent.py @@ -1,10 +1,10 @@ from typing import Type -from openai import AsyncOpenAI +from openai import AsyncOpenAI, pydantic_function_tool +from openai.types.chat import ChatCompletionFunctionToolParam from sgr_agent_core.agent_definition import AgentConfig -from sgr_agent_core.agents.sgr_agent import SGRAgent -from sgr_agent_core.next_step_tool import NextStepToolsBuilder, NextStepToolStub +from sgr_agent_core.agents.tool_calling_agent import ToolCallingAgent from sgr_agent_core.tools import ( BaseTool, ClarificationTool, @@ -21,9 +21,10 @@ ) -class SGRFileAgent(SGRAgent): - """File-first agent that uses OpenAI native function calling to work with filesystem. - Two-phase agent: reasoning phase + action phase. +class SGRFileAgent(ToolCallingAgent): + """File-first agent using native function calling only (no separate + reasoning round-trip). Suited for OpenAI-compatible endpoints that handle + tool_choice reliably but not structured reasoning schemas. Focus: File search and analysis (read-only operations) @@ -92,12 +93,9 @@ def __init__( working_directory = getattr(agent_config, "working_directory", ".") self.working_directory = working_directory - async def _prepare_tools(self) -> Type[NextStepToolStub]: - """Prepare available tools for current agent state and progress. - - Returns NextStepToolStub class for response_format, filtering - tools based on agent state. - """ + async def _prepare_tools(self) -> list[ChatCompletionFunctionToolParam]: + """Build function-calling tool list for each step (filter by iteration + limits).""" tools = set(self.toolkit) if self._context.iteration >= self.config.execution.max_iterations: tools = { @@ -108,4 +106,4 @@ async def _prepare_tools(self) -> Type[NextStepToolStub]: tools -= { ClarificationTool, } - return NextStepToolsBuilder.build_NextStepTools(list(tools)) + return [pydantic_function_tool(tool, name=tool.tool_name) for tool in tools] diff --git a/pyproject.toml b/pyproject.toml index 28108bd0..f119b50f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,6 +48,7 @@ dependencies = [ "jambo>=0.1.3.post2", # Tools filtering "rank-bm25>=0.2.2", + "agent-client-protocol>=0.9.0", ] [project.urls] @@ -66,6 +67,7 @@ Documentation = "https://vamplabai.github.io/sgr-agent-core/" [project.scripts] sgr = "sgr_agent_core.server.__main__:main" sgrsh = "sgr_agent_core.cli.__main__:main" +sgracp = "sgr_agent_core.acp.__main__:main" [project.optional-dependencies] dev = [ diff --git a/sgr_agent_core/acp/__init__.py b/sgr_agent_core/acp/__init__.py new file mode 100644 index 00000000..5a497897 --- /dev/null +++ b/sgr_agent_core/acp/__init__.py @@ -0,0 +1,5 @@ +"""Agent Client Protocol (ACP) stdio integration for SGR Agent Core.""" + +from sgr_agent_core.acp.bridge import SGRACPBridge, extract_prompt_text + +__all__ = ["SGRACPBridge", "extract_prompt_text"] diff --git a/sgr_agent_core/acp/__main__.py b/sgr_agent_core/acp/__main__.py new file mode 100644 index 00000000..3eec255b --- /dev/null +++ b/sgr_agent_core/acp/__main__.py @@ -0,0 +1,70 @@ +"""CLI entrypoint: ``sgracp --config /path/to/config.yaml`` (ACP over stdio).""" + +from __future__ import annotations + +import argparse +import asyncio +import importlib +import logging +import sys + +from acp import run_agent + +from sgr_agent_core.acp.bridge import SGRACPBridge +from sgr_agent_core.agent_config import GlobalConfig +from sgr_agent_core.server.__main__ import load_config + +logger = logging.getLogger(__name__) + + +def _preload_agent_modules_from_config() -> None: + """Import agent modules so ImportString classes register in + AgentRegistry.""" + cfg = GlobalConfig() + if cfg.config_dir: + root = cfg.config_dir.resolve() + # Insert parent first so `import sgr_file_agent` resolves to the package + # (example dir contains both package `sgr_file_agent/` and `sgr_file_agent.py`). + parent = str(root.parent) + if parent not in sys.path: + sys.path.insert(0, parent) + root_s = str(root) + if root_s not in sys.path: + sys.path.insert(0, root_s) + for ad in cfg.agents.values(): + bc = ad.base_class + if isinstance(bc, str) and "." in bc: + mod_name = bc.rsplit(".", 1)[0] + try: + importlib.import_module(mod_name) + except ModuleNotFoundError: + logger.warning("Could not import agent module %s for %s", mod_name, ad.name) + + +def main() -> None: + """Load YAML config and run an ACP agent on stdio.""" + parser = argparse.ArgumentParser(prog="sgracp", description="SGR Agent Core (Agent Client Protocol, stdio)") + parser.add_argument( + "--config", + required=True, + help="Path to config.yaml (same format as the sgr HTTP server)", + ) + parser.add_argument( + "--agents", + default=None, + help="Optional path to agents-only YAML (merged into definitions)", + ) + args = parser.parse_args() + + logging.basicConfig(level=logging.INFO, stream=sys.stderr, format="%(levelname)s %(name)s: %(message)s") + + load_config(args.config, args.agents) + _preload_agent_modules_from_config() + cfg = GlobalConfig() + default_name = cfg.acp.agent if cfg.acp else None + bridge = SGRACPBridge(default_agent_name=default_name) + asyncio.run(run_agent(bridge)) + + +if __name__ == "__main__": + main() diff --git a/sgr_agent_core/acp/bridge.py b/sgr_agent_core/acp/bridge.py new file mode 100644 index 00000000..97fce254 --- /dev/null +++ b/sgr_agent_core/acp/bridge.py @@ -0,0 +1,268 @@ +"""ACP Agent protocol implementation backed by SGR AgentFactory.""" + +from __future__ import annotations + +import asyncio +import uuid +from dataclasses import dataclass +from typing import Any + +from acp.interfaces import Client +from acp.schema import ( + AgentCapabilities, + AuthenticateResponse, + CloseSessionResponse, + HttpMcpServer, + Implementation, + InitializeResponse, + ListSessionsResponse, + LoadSessionResponse, + McpCapabilities, + McpServerStdio, + NewSessionResponse, + PromptCapabilities, + PromptResponse, + SessionInfo, + SetSessionConfigOptionResponse, + SetSessionModelResponse, + SetSessionModeResponse, + SseMcpServer, + TextContentBlock, +) + +from sgr_agent_core import __version__ +from sgr_agent_core.acp.streaming import create_acp_streaming_generator_class +from sgr_agent_core.agent_config import GlobalConfig +from sgr_agent_core.agent_definition import AgentDefinition +from sgr_agent_core.agent_factory import AgentFactory +from sgr_agent_core.base_agent import BaseAgent +from sgr_agent_core.models import AgentStatesEnum + + +def extract_prompt_text( + prompt: list[TextContentBlock | Any], +) -> str: + """Concatenate user-visible text from ACP prompt content blocks.""" + parts: list[str] = [] + for block in prompt: + if isinstance(block, TextContentBlock): + parts.append(block.text) + elif getattr(block, "type", None) == "text" and getattr(block, "text", None): + parts.append(str(block.text)) + return "".join(parts).strip() + + +@dataclass +class _ACPSession: + """Per-session state for the ACP bridge.""" + + session_id: str + cwd: str + agent: BaseAgent | None = None + execute_task: asyncio.Task | None = None + + +class SGRACPBridge: + """Routes ACP JSON-RPC methods to SGR agents loaded from GlobalConfig.""" + + _SUPPORTED_PROTOCOL = 1 + + def __init__(self, default_agent_name: str | None = None) -> None: + self._default_agent_name = default_agent_name + self._sessions: dict[str, _ACPSession] = {} + self._client: Client | None = None + + def on_connect(self, conn: Client) -> None: + """Store the client connection for outbound session updates.""" + self._client = conn + + def _resolve_agent_definition(self) -> AgentDefinition: + gc = GlobalConfig() + name = self._default_agent_name + if not name and gc.acp and gc.acp.agent: + name = gc.acp.agent + if not name and gc.agents: + name = next(iter(gc.agents.keys())) + if not name or name not in gc.agents: + raise ValueError( + "No agent definition selected for ACP. Set acp.agent in config or pass default_agent_name." + ) + return gc.agents[name] + + async def initialize( + self, + protocol_version: int, + client_capabilities: Any = None, + client_info: Any = None, + **kwargs: Any, + ) -> InitializeResponse: + """Negotiate protocol version and advertise agent capabilities.""" + negotiated = min(protocol_version, self._SUPPORTED_PROTOCOL) + if negotiated < 1: + negotiated = 1 + return InitializeResponse( + protocol_version=negotiated, + agent_capabilities=AgentCapabilities( + load_session=False, + prompt_capabilities=PromptCapabilities( + image=False, + audio=False, + embedded_context=True, + ), + mcp_capabilities=McpCapabilities(http=True, sse=False), + ), + agent_info=Implementation( + name="sgr-agent-core", + title="SGR Agent Core", + version=__version__, + ), + auth_methods=[], + ) + + async def new_session( + self, + cwd: str, + mcp_servers: list[HttpMcpServer | SseMcpServer | McpServerStdio] | None = None, + **kwargs: Any, + ) -> NewSessionResponse: + """Create a new session id (working directory is stored for future + use).""" + session_id = f"sgr_{uuid.uuid4().hex}" + self._sessions[session_id] = _ACPSession(session_id=session_id, cwd=cwd) + return NewSessionResponse(session_id=session_id) + + async def load_session( + self, + cwd: str, + session_id: str, + mcp_servers: list[HttpMcpServer | SseMcpServer | McpServerStdio] | None = None, + **kwargs: Any, + ) -> LoadSessionResponse | None: + """Session restore is not supported.""" + return None + + async def list_sessions( + self, + cursor: str | None = None, + cwd: str | None = None, + **kwargs: Any, + ) -> ListSessionsResponse: + """Return basic metadata for active sessions.""" + sessions = [SessionInfo(session_id=s.session_id, cwd=s.cwd) for s in self._sessions.values()] + return ListSessionsResponse(sessions=sessions) + + async def set_session_mode( + self, + mode_id: str, + session_id: str, + **kwargs: Any, + ) -> SetSessionModeResponse | None: + """Session modes are not used.""" + return None + + async def set_session_model( + self, + model_id: str, + session_id: str, + **kwargs: Any, + ) -> SetSessionModelResponse | None: + """Per-session model switching is not implemented.""" + return None + + async def set_config_option( + self, + config_id: str, + session_id: str, + value: str | bool, + **kwargs: Any, + ) -> SetSessionConfigOptionResponse | None: + """Dynamic session config options are not implemented.""" + return None + + async def authenticate(self, method_id: str, **kwargs: Any) -> AuthenticateResponse | None: + """Authentication is not required for this agent.""" + return None + + async def prompt( + self, + prompt: list[Any], + session_id: str, + message_id: str | None = None, + **kwargs: Any, + ) -> PromptResponse: + """Run one user turn (or continue after clarification) for the SGR + agent.""" + if self._client is None: + raise RuntimeError("ACP client connection is not initialized") + + sess = self._sessions.get(session_id) + if sess is None: + raise ValueError(f"Unknown session id: {session_id}") + + text = extract_prompt_text(prompt) + if not text: + return PromptResponse(stop_reason="end_turn") + + if sess.agent and sess.agent._context.state == AgentStatesEnum.WAITING_FOR_CLARIFICATION: + await sess.agent.provide_clarification( + [{"role": "user", "content": text}], + replace_conversation=False, + ) + if sess.execute_task: + return await self._wait_turn(sess.agent, sess.execute_task) + return PromptResponse(stop_reason="refusal") + + if sess.agent is not None and sess.agent._context.state in AgentStatesEnum.FINISH_STATES.value: + sess.agent = None + sess.execute_task = None + + agent_def = self._resolve_agent_definition() + gen_cls = create_acp_streaming_generator_class(session_id, self._client) + agent = await AgentFactory.create( + agent_def, + [{"role": "user", "content": text}], + streaming_generator=gen_cls, + ) + sess.agent = agent + sess.execute_task = asyncio.create_task(agent.execute()) + assert sess.execute_task is not None + return await self._wait_turn(agent, sess.execute_task) + + async def _wait_turn(self, agent: BaseAgent, task: asyncio.Task) -> PromptResponse: + """Wait until the agent finishes, asks for clarification, or errors.""" + while not task.done(): + if agent._context.state == AgentStatesEnum.WAITING_FOR_CLARIFICATION: + return PromptResponse(stop_reason="end_turn") + await asyncio.sleep(0.05) + + exc = task.exception() + if exc is not None: + if isinstance(exc, asyncio.CancelledError): + return PromptResponse(stop_reason="cancelled") + return PromptResponse(stop_reason="refusal") + + return PromptResponse(stop_reason=self._map_stop_reason(agent._context.state)) + + def _map_stop_reason(self, state: AgentStatesEnum) -> str: + if state == AgentStatesEnum.CANCELLED: + return "cancelled" + if state in (AgentStatesEnum.FAILED, AgentStatesEnum.ERROR): + return "refusal" + return "end_turn" + + async def close_session(self, session_id: str, **kwargs: Any) -> CloseSessionResponse | None: + """Explicit close is not implemented.""" + return None + + async def cancel(self, session_id: str, **kwargs: Any) -> None: + """Cancel an in-flight prompt turn.""" + sess = self._sessions.get(session_id) + if sess and sess.agent: + await sess.agent.cancel() + + async def ext_method(self, method: str, params: dict[str, Any]) -> dict[str, Any]: + """Extension methods are not supported.""" + return {} + + async def ext_notification(self, method: str, params: dict[str, Any]) -> None: + """Extension notifications are ignored.""" diff --git a/sgr_agent_core/acp/streaming.py b/sgr_agent_core/acp/streaming.py new file mode 100644 index 00000000..f8b85ee8 --- /dev/null +++ b/sgr_agent_core/acp/streaming.py @@ -0,0 +1,90 @@ +"""Streaming generator that maps SGR stream events to ACP session updates.""" + +from __future__ import annotations + +import asyncio +from typing import Any + +from acp.contrib.tool_calls import ToolCallTracker +from acp.helpers import text_block, tool_content, update_agent_message_text +from acp.interfaces import Client + +from sgr_agent_core.stream import BaseStreamingGenerator + + +def create_acp_streaming_generator_class( + session_id: str, + client: Client, +) -> type[BaseStreamingGenerator]: + """Build a streaming generator class bound to an ACP session and client.""" + + class ACPStreamingGenerator(BaseStreamingGenerator): + """Forwards SGR streaming events to ACP ``session/update`` + notifications.""" + + name = "acp" + + def __init__(self, agent_id: str) -> None: + super().__init__() + self._session_id = session_id + self._client = client + self._tracker = ToolCallTracker() + + def _emit(self, update: Any) -> None: + async def _send() -> None: + await self._client.session_update(self._session_id, update) + + try: + loop = asyncio.get_running_loop() + except RuntimeError: + return + loop.create_task(_send()) + + def add_content_delta(self, content: str, phase_id: str) -> None: + self._emit(update_agent_message_text(content)) + + def add_chunk(self, chunk: Any, phase_id: str) -> None: + delta = "" + choices = getattr(chunk, "choices", None) + if choices: + ch0 = choices[0] + d = getattr(ch0, "delta", None) + if d is not None and getattr(d, "content", None): + delta = d.content or "" + if delta: + self._emit(update_agent_message_text(delta)) + + def add_tool_call(self, phase_id: str, tool: Any) -> None: + if "-action" in phase_id: + iter_part = phase_id.split("-")[0] + reasoning_id = f"{iter_part}-reasoning" + try: + prog = self._tracker.progress( + reasoning_id, + status="completed", + content=[tool_content(text_block(""))], + ) + self._emit(prog) + except Exception: + pass + + title = getattr(tool, "tool_name", None) or tool.__class__.__name__ + kind = "think" if "reasoning" in phase_id else "other" + start = self._tracker.start(phase_id, title=str(title), kind=kind, status="in_progress") + self._emit(start) + + def add_tool_result(self, phase_id: str, content: str, tool_name: str | None = None) -> None: + text = content if len(content) <= 8000 else content[:8000] + "\n…" + prog = self._tracker.progress( + phase_id, + status="completed", + content=[tool_content(text_block(text))], + ) + self._emit(prog) + + def finish(self, phase_id: str, content: str | None = None, finish_reason: str = "stop") -> None: + if content: + self._emit(update_agent_message_text(str(content))) + super().finish() + + return ACPStreamingGenerator diff --git a/sgr_agent_core/agent_config.py b/sgr_agent_core/agent_config.py index 0569b5e2..e81e6004 100644 --- a/sgr_agent_core/agent_config.py +++ b/sgr_agent_core/agent_config.py @@ -4,10 +4,21 @@ from typing import ClassVar, Self import yaml +from pydantic import BaseModel, Field from pydantic_settings import BaseSettings, SettingsConfigDict from sgr_agent_core.agent_definition import AgentConfig, Definitions + +class AcpSettings(BaseModel): + """Settings for the ``sgracp`` Agent Client Protocol stdio server.""" + + agent: str | None = Field( + default=None, + description="Agent definition name from the agents section to run when using sgracp", + ) + + logger = logging.getLogger(__name__) @@ -17,6 +28,7 @@ class GlobalConfig(BaseSettings, AgentConfig, Definitions): # Directory where main config.yaml lives (if loaded via from_yaml) config_dir: Path | None = None + acp: AcpSettings | None = None def __new__(cls, *args, **kwargs): if cls._instance is None: diff --git a/sgr_agent_core/agent_factory.py b/sgr_agent_core/agent_factory.py index 54deb214..25e34134 100644 --- a/sgr_agent_core/agent_factory.py +++ b/sgr_agent_core/agent_factory.py @@ -1,5 +1,6 @@ """Agent Factory for dynamic agent creation from definitions.""" +import importlib import logging from typing import Any, Type, TypeVar @@ -12,7 +13,7 @@ from sgr_agent_core.base_agent import BaseAgent from sgr_agent_core.base_tool import BaseTool from sgr_agent_core.services import AgentRegistry, MCP2ToolConverter, StreamingGeneratorRegistry -from sgr_agent_core.stream import OpenAIStreamingGenerator +from sgr_agent_core.stream import BaseStreamingGenerator, OpenAIStreamingGenerator logger = logging.getLogger(__name__) @@ -86,12 +87,19 @@ def _resolve_tools_with_configs( return toolkit, tool_configs @classmethod - async def create(cls, agent_def: AgentDefinition, task_messages: list[ChatCompletionMessageParam]) -> Agent: + async def create( + cls, + agent_def: AgentDefinition, + task_messages: list[ChatCompletionMessageParam], + *, + streaming_generator: type[BaseStreamingGenerator] | None = None, + ) -> Agent: """Create an agent instance from a definition. Args: agent_def: Agent definition with configuration (classes already resolved) task_messages: Task messages in OpenAI ChatCompletionMessageParam format + streaming_generator: Optional streaming generator class (overrides execution config) Returns: Created agent instance @@ -110,8 +118,19 @@ async def create(cls, agent_def: AgentDefinition, task_messages: list[ChatComple # Already a class (either passed directly or resolved from ImportString by Pydantic) BaseClass = agent_def.base_class elif isinstance(agent_def.base_class, str): - # String - look up in registry - BaseClass = AgentRegistry.get(agent_def.base_class) + bc = agent_def.base_class + BaseClass = None + if "." in bc: + mod_name, _, cls_name = bc.rpartition(".") + try: + mod = importlib.import_module(mod_name) + cand = getattr(mod, cls_name, None) + if isinstance(cand, type) and issubclass(cand, BaseAgent): + BaseClass = cand + except Exception: + BaseClass = None + if BaseClass is None: + BaseClass = AgentRegistry.get(bc) if BaseClass is None: error_msg = ( @@ -137,6 +156,7 @@ async def create(cls, agent_def: AgentDefinition, task_messages: list[ChatComple for key, value in agent_def.model_dump().items(): agent_kwargs[key] = value + gen_cls = streaming_generator or cls._resolve_streaming_generator(agent_def.execution.streaming_generator) agent = BaseClass( task_messages=task_messages, def_name=agent_def.name, @@ -144,7 +164,7 @@ async def create(cls, agent_def: AgentDefinition, task_messages: list[ChatComple tool_configs=tool_configs, openai_client=cls._create_client(agent_def.llm), agent_config=agent_def, - streaming_generator=cls._resolve_streaming_generator(agent_def.execution.streaming_generator), + streaming_generator=gen_cls, **agent_kwargs, ) logger.info( diff --git a/sgr_agent_core/agents/sgr_tool_calling_agent.py b/sgr_agent_core/agents/sgr_tool_calling_agent.py index 96950315..47d5eb70 100644 --- a/sgr_agent_core/agents/sgr_tool_calling_agent.py +++ b/sgr_agent_core/agents/sgr_tool_calling_agent.py @@ -1,6 +1,7 @@ from typing import Literal, Type from openai import AsyncOpenAI, pydantic_function_tool +from pydantic import ValidationError from sgr_agent_core.agent_config import AgentConfig from sgr_agent_core.base_agent import BaseAgent @@ -79,29 +80,55 @@ async def _reasoning_phase(self) -> ReasoningTool: async def _select_action_phase(self, reasoning: ReasoningTool) -> BaseTool: phase_id = f"{self._context.iteration}-action" - async with self.openai_client.chat.completions.stream( - messages=await self._prepare_context(), - tools=await self._prepare_tools(), - tool_choice=self.tool_choice, - **self.config.llm.to_openai_client_kwargs(), - ) as stream: - async for event in stream: - if event.type == "chunk": - self.streaming_generator.add_chunk(event.chunk, phase_id) - completion = await stream.get_final_completion() + _fallback_content: str = "Task completed successfully" + completion = None try: - tool = completion.choices[0].message.tool_calls[0].function.parsed_arguments - except (IndexError, AttributeError, TypeError): - final_content = completion.choices[0].message.content or "Task completed successfully" - tool = FinalAnswerTool( - reasoning="Agent decided to complete the task", - completed_steps=[], - answer=final_content, - status=AgentStatesEnum.COMPLETED, - ) - if not isinstance(tool, BaseTool): - raise ValueError("Selected tool is not a valid BaseTool instance") - + async with self.openai_client.chat.completions.stream( + messages=await self._prepare_context(), + tools=await self._prepare_tools(), + tool_choice=self.tool_choice, + **self.config.llm.to_openai_client_kwargs(), + ) as stream: + async for event in stream: + if event.type == "chunk": + self.streaming_generator.add_chunk(event.chunk, phase_id) + completion = await stream.get_final_completion() + except ValidationError as exc: + self.logger.warning("Streaming validation error (%s), falling back to FinalAnswerTool", exc) + if completion is not None: + try: + _fallback_content = completion.choices[0].message.content or _fallback_content + tool = completion.choices[0].message.tool_calls[0].function.parsed_arguments + if not isinstance(tool, BaseTool): + raise TypeError(f"parsed_arguments returned {type(tool).__name__}, expected BaseTool") + self.conversation.append( + { + "role": "assistant", + "content": reasoning.remaining_steps[0] if reasoning.remaining_steps else "Completing", + "tool_calls": [ + { + "type": "function", + "id": phase_id, + "function": { + "name": tool.tool_name, + "arguments": tool.model_dump_json(), + }, + } + ], + } + ) + self.streaming_generator.add_tool_call(phase_id, tool) + return tool + except (IndexError, AttributeError, TypeError, ValidationError) as exc: + self.logger.warning( + "Tool call parsing failed (%s: %s), falling back to FinalAnswerTool", type(exc).__name__, exc + ) + tool = FinalAnswerTool( + reasoning="Agent decided to complete the task", + completed_steps=["Response synthesized without a tool call"], + answer=_fallback_content, + status=AgentStatesEnum.COMPLETED, + ) self.conversation.append( { "role": "assistant", diff --git a/sgr_agent_core/agents/tool_calling_agent.py b/sgr_agent_core/agents/tool_calling_agent.py index 580b47d3..bc82b209 100644 --- a/sgr_agent_core/agents/tool_calling_agent.py +++ b/sgr_agent_core/agents/tool_calling_agent.py @@ -1,11 +1,14 @@ from typing import Literal, Type from openai import AsyncOpenAI +from pydantic import ValidationError from sgr_agent_core.agent_config import AgentConfig from sgr_agent_core.base_agent import BaseAgent +from sgr_agent_core.models import AgentStatesEnum from sgr_agent_core.tools import ( BaseTool, + FinalAnswerTool, ) @@ -40,20 +43,46 @@ async def _reasoning_phase(self) -> None: async def _select_action_phase(self, reasoning=None) -> BaseTool: phase_id = f"{self._context.iteration}-action" - async with self.openai_client.chat.completions.stream( - messages=await self._prepare_context(), - tools=await self._prepare_tools(), - tool_choice=self.tool_choice, - **self.config.llm.to_openai_client_kwargs(), - ) as stream: - async for event in stream: - if event.type == "chunk": - self.streaming_generator.add_chunk(event.chunk, phase_id) - completion = await stream.get_final_completion() - tool = completion.choices[0].message.tool_calls[0].function.parsed_arguments + _fallback_content: str = "Task completed successfully" + completion = None + # ValidationError can be thrown inside the stream loop when OpenAI SDK + # tries to parse tool call arguments against the Pydantic schema. + # Other exceptions (e.g. TypeError from invalid kwargs) must propagate. + try: + async with self.openai_client.chat.completions.stream( + messages=await self._prepare_context(), + tools=await self._prepare_tools(), + tool_choice=self.tool_choice, + **self.config.llm.to_openai_client_kwargs(), + ) as stream: + async for event in stream: + if event.type == "chunk": + self.streaming_generator.add_chunk(event.chunk, phase_id) + completion = await stream.get_final_completion() + except ValidationError as exc: + self.logger.warning("Streaming validation error (%s), falling back to FinalAnswerTool", exc) + if completion is not None: + try: + _fallback_content = completion.choices[0].message.content or _fallback_content + tool = completion.choices[0].message.tool_calls[0].function.parsed_arguments + if not isinstance(tool, BaseTool): + raise TypeError(f"parsed_arguments returned {type(tool).__name__}, expected BaseTool") + return self._append_tool_call(phase_id, tool) + except (IndexError, AttributeError, TypeError, ValidationError) as exc: + self.logger.warning( + "Tool call parsing failed (%s: %s), falling back to FinalAnswerTool", type(exc).__name__, exc + ) + tool = FinalAnswerTool( + reasoning="Agent decided to complete the task", + completed_steps=["Response synthesized without a tool call"], + answer=_fallback_content, + status=AgentStatesEnum.COMPLETED, + ) + return self._append_tool_call(phase_id, tool) - if not isinstance(tool, BaseTool): - raise ValueError("Selected tool is not a valid BaseTool instance") + def _append_tool_call(self, phase_id: str, tool: BaseTool) -> BaseTool: + """Append the selected tool call to conversation history and notify + streaming.""" self.conversation.append( { "role": "assistant", diff --git a/tests/test_acp_bridge.py b/tests/test_acp_bridge.py new file mode 100644 index 00000000..24c7986b --- /dev/null +++ b/tests/test_acp_bridge.py @@ -0,0 +1,88 @@ +"""Tests for Agent Client Protocol (ACP) stdio bridge.""" + +from __future__ import annotations + +from unittest.mock import patch + +import pytest +from acp.schema import TextContentBlock + +from sgr_agent_core.agent_definition import AgentDefinition, ExecutionConfig, LLMConfig, PromptsConfig +from sgr_agent_core.agent_factory import AgentFactory +from sgr_agent_core.agents import SGRAgent +from sgr_agent_core.stream import BaseStreamingGenerator + + +@pytest.mark.asyncio +async def test_acp_extract_prompt_text_joins_text_blocks(): + """Prompt extraction should concatenate text parts from ACP content + blocks.""" + from sgr_agent_core.acp.bridge import extract_prompt_text + + blocks = [ + TextContentBlock(type="text", text="Hello "), + TextContentBlock(type="text", text="world"), + ] + assert extract_prompt_text(blocks) == "Hello world" + + +@pytest.mark.asyncio +async def test_acp_bridge_initialize_returns_agent_info(): + """Initialize should negotiate protocol version and return agent + capabilities.""" + from sgr_agent_core.acp.bridge import SGRACPBridge + + bridge = SGRACPBridge(default_agent_name="sgr_agent") + resp = await bridge.initialize(protocol_version=1, client_capabilities=None, client_info=None) + assert resp.protocol_version == 1 + assert resp.agent_info.name == "sgr-agent-core" + assert resp.agent_capabilities.load_session is False + + +@pytest.mark.asyncio +async def test_acp_bridge_new_session_returns_session_id(): + """new_session should create a session with a stable id prefix pattern.""" + from sgr_agent_core.acp.bridge import SGRACPBridge + + bridge = SGRACPBridge(default_agent_name="sgr_agent") + out = await bridge.new_session(cwd="/tmp", mcp_servers=None) + assert out.session_id.startswith("sgr_") + + +@pytest.mark.asyncio +async def test_agent_factory_create_accepts_streaming_generator_override(): + """AgentFactory.create should allow overriding streaming generator + class.""" + from tests.test_agent_factory import mock_global_config + + class DummyGen(BaseStreamingGenerator): + name = "dummy_acp_test" + + def __init__(self, agent_id: str) -> None: + super().__init__() + self.agent_id = agent_id + + with ( + patch("sgr_agent_core.agent_factory.MCP2ToolConverter.build_tools_from_mcp", return_value=[]), + mock_global_config(), + ): + agent_def = AgentDefinition( + name="sgr_agent", + base_class=SGRAgent, + tools=["reasoningtool"], + llm=LLMConfig(api_key="k", base_url="https://api.openai.com/v1"), + prompts=PromptsConfig( + system_prompt_str="s", + initial_user_request_str="i", + clarification_response_str="c", + ), + execution=ExecutionConfig(streaming_generator="openai"), + ) + + agent = await AgentFactory.create( + agent_def, + [{"role": "user", "content": "x"}], + streaming_generator=DummyGen, + ) + + assert isinstance(agent.streaming_generator, DummyGen)