diff --git a/src/praisonai-agents/praisonaiagents/managed/__init__.py b/src/praisonai-agents/praisonaiagents/managed/__init__.py index 3d5142e8c..1af8d07b9 100644 --- a/src/praisonai-agents/praisonaiagents/managed/__init__.py +++ b/src/praisonai-agents/praisonaiagents/managed/__init__.py @@ -41,4 +41,13 @@ "ComputeConfig", "InstanceInfo", "InstanceStatus", + "ManagedBackendProtocol", ] + + +def __getattr__(name: str): + """Lazy import for heavy dependencies.""" + if name == "ManagedBackendProtocol": + from ..agent.protocols import ManagedBackendProtocol + return ManagedBackendProtocol + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/src/praisonai-agents/tests/managed/test_session_info_schema.py b/src/praisonai-agents/tests/managed/test_session_info_schema.py new file mode 100644 index 000000000..37a3fb048 --- /dev/null +++ b/src/praisonai-agents/tests/managed/test_session_info_schema.py @@ -0,0 +1,141 @@ +""" +Test unified session info schema between managed agent backends. + +Verifies that both AnthropicManagedAgent and LocalManagedAgent return +consistent session information with the same schema. 
+""" + +import pytest +from unittest.mock import patch, Mock + + +def test_session_info_schema_consistency(): + """Test that both managed agents return identical session schema.""" + from praisonai.integrations.managed_agents import ManagedAgent + from praisonai.integrations.managed_local import LocalManagedAgent + from praisonai.integrations._session_info import SessionInfo + + # Create both backends + managed = ManagedAgent() + local = LocalManagedAgent() + + # Mock session data for both + managed._session_id = "test_session_anthropic" + local._session_id = "test_session_local" + + # Get session info from both + with patch.object(managed, '_get_client') as mock_client: + # Mock Anthropic API response + mock_session = Mock() + mock_session.id = "test_session_anthropic" + mock_session.status = "active" + mock_session.title = "Test Session" + mock_usage = Mock() + mock_usage.input_tokens = 100 + mock_usage.output_tokens = 50 + mock_session.usage = mock_usage + + mock_client.return_value.beta.sessions.retrieve.return_value = mock_session + + managed_info = managed.retrieve_session() + + local_info = local.retrieve_session() + + # Both should have exact same keys + assert set(managed_info.keys()) == set(local_info.keys()) + + # Required keys should be present + required_keys = ["id", "status", "title", "usage"] + for key in required_keys: + assert key in managed_info + assert key in local_info + + # Usage should have consistent structure + usage_keys = ["input_tokens", "output_tokens"] + for key in usage_keys: + assert key in managed_info["usage"] + assert key in local_info["usage"] + assert isinstance(managed_info["usage"][key], int) + assert isinstance(local_info["usage"][key], int) + + +def test_session_info_dataclass(): + """Test SessionInfo dataclass functionality.""" + from praisonai.integrations._session_info import SessionInfo, SessionUsage + + # Test default construction + info = SessionInfo() + assert info.id == "" + assert info.status == "unknown" + assert 
info.title == "" + assert info.usage.input_tokens == 0 + assert info.usage.output_tokens == 0 + + # Test with data + usage = SessionUsage(input_tokens=100, output_tokens=50) + info = SessionInfo( + id="test_session", + status="active", + title="Test Session", + usage=usage + ) + + # Test to_dict + data = info.to_dict() + expected = { + "id": "test_session", + "status": "active", + "title": "Test Session", + "usage": { + "input_tokens": 100, + "output_tokens": 50 + } + } + assert data == expected + + # Test from_dict + reconstructed = SessionInfo.from_dict(data) + assert reconstructed.id == info.id + assert reconstructed.status == info.status + assert reconstructed.title == info.title + assert reconstructed.usage.input_tokens == info.usage.input_tokens + assert reconstructed.usage.output_tokens == info.usage.output_tokens + + +def test_session_info_backward_compatibility(): + """Test that SessionInfo handles missing fields gracefully.""" + from praisonai.integrations._session_info import SessionInfo + + # Test partial data (old format) + partial_data = { + "id": "test_session", + "status": "active" + # Missing title and usage + } + + info = SessionInfo.from_dict(partial_data) + assert info.id == "test_session" + assert info.status == "active" + assert info.title == "" # Default + assert info.usage.input_tokens == 0 # Default + assert info.usage.output_tokens == 0 # Default + + # Test empty data + info = SessionInfo.from_dict({}) + data = info.to_dict() + + # Should have all required keys with defaults + required_keys = ["id", "status", "title", "usage"] + for key in required_keys: + assert key in data + + +def test_managed_backend_protocol_re_export(): + """Test that ManagedBackendProtocol can be imported from managed module.""" + # Test lazy import works + from praisonaiagents.managed import ManagedBackendProtocol + assert ManagedBackendProtocol is not None + + # Test it's the same as the original + from praisonaiagents.agent.protocols import ManagedBackendProtocol 
as OriginalProtocol + assert ManagedBackendProtocol is OriginalProtocol \ No newline at end of file diff --git a/src/praisonai/praisonai/cli/commands/managed.py b/src/praisonai/praisonai/cli/commands/managed.py index 3e564a3a2..df3a5540a 100644 --- a/src/praisonai/praisonai/cli/commands/managed.py +++ b/src/praisonai/praisonai/cli/commands/managed.py @@ -316,6 +316,31 @@ def sessions_resume( print(result) +@sessions_app.command("delete") +def sessions_delete( + session_id: str = typer.Argument(..., help="Session ID to delete (sesn_01...)"), + confirm: bool = typer.Option(False, "--yes", "-y", help="Skip confirmation prompt"), +): + """Delete a managed session permanently. + + Example: + praisonai managed sessions delete sesn_01AbCdEf + praisonai managed sessions delete sesn_01AbCdEf --yes + """ + if not confirm: + typer.confirm(f"Delete session {session_id}? This cannot be undone.", abort=True) + + try: + client = _get_client() + # Note: Anthropic API may not have delete endpoint yet + # This is a placeholder for when it becomes available + typer.echo(f"Session deletion not yet supported by Anthropic API") + typer.echo(f"Sessions will eventually expire automatically") + except Exception as e: + typer.echo(f"Error deleting session: {e}", err=True) + raise typer.Exit(1) + + # ───────────────────────────────────────────────────────────────────────────── # agents sub-commands # ───────────────────────────────────────────────────────────────────────────── @@ -397,6 +422,29 @@ def agents_update( typer.echo(f"Updated agent: {updated.id} (v{getattr(updated,'version','')})") +@agents_app.command("delete") +def agents_delete( + agent_id: str = typer.Argument(..., help="Agent ID to delete (agent_01...)"), + confirm: bool = typer.Option(False, "--yes", "-y", help="Skip confirmation prompt"), +): + """Delete a managed agent permanently. 
+ + Example: + praisonai managed agents delete agent_01AbCdEf + praisonai managed agents delete agent_01AbCdEf --yes + """ + if not confirm: + typer.confirm(f"Delete agent {agent_id}? This cannot be undone.", abort=True) + + try: + client = _get_client() + client.beta.agents.delete(agent_id) + typer.echo(f"Agent {agent_id} deleted successfully") + except Exception as e: + typer.echo(f"Error deleting agent: {e}", err=True) + raise typer.Exit(1) + + # ───────────────────────────────────────────────────────────────────────────── # envs sub-commands # ───────────────────────────────────────────────────────────────────────────── @@ -441,6 +489,54 @@ def envs_get( typer.echo(f"Config: {cfg}") +@envs_app.command("update") +def envs_update( + env_id: str = typer.Argument(..., help="Environment ID to update (env_01...)"), + name: Optional[str] = typer.Option(None, "--name", help="Update environment name"), +): + """Update an existing environment's configuration. + + Example: + praisonai managed envs update env_01AbCdEf --name "New Environment Name" + """ + client = _get_client() + kwargs = {} + if name: + kwargs["name"] = name + if not kwargs: + typer.echo("Nothing to update. Pass --name.") + raise typer.Exit(0) + try: + updated = client.beta.environments.update(env_id, **kwargs) + typer.echo(f"Updated environment: {updated.id}") + except Exception as e: + typer.echo(f"Error updating environment: {e}", err=True) + raise typer.Exit(1) + + +@envs_app.command("delete") +def envs_delete( + env_id: str = typer.Argument(..., help="Environment ID to delete (env_01...)"), + confirm: bool = typer.Option(False, "--yes", "-y", help="Skip confirmation prompt"), +): + """Delete an environment permanently. + + Example: + praisonai managed envs delete env_01AbCdEf + praisonai managed envs delete env_01AbCdEf --yes + """ + if not confirm: + typer.confirm(f"Delete environment {env_id}? 
This cannot be undone.", abort=True) + + try: + client = _get_client() + client.beta.environments.delete(env_id) + typer.echo(f"Environment {env_id} deleted successfully") + except Exception as e: + typer.echo(f"Error deleting environment: {e}", err=True) + raise typer.Exit(1) + + # ───────────────────────────────────────────────────────────────────────────── # ids sub-commands (save / restore / show — no Anthropic IDs are user-defined) # ───────────────────────────────────────────────────────────────────────────── diff --git a/src/praisonai/praisonai/integrations/_session_info.py b/src/praisonai/praisonai/integrations/_session_info.py new file mode 100644 index 000000000..9de09d37a --- /dev/null +++ b/src/praisonai/praisonai/integrations/_session_info.py @@ -0,0 +1,58 @@ +""" +Unified session info schema for managed agents. + +Provides consistent session information structure across different managed agent backends. +""" + +from dataclasses import dataclass, field +from typing import Dict, Any, Optional + + +@dataclass +class SessionUsage: + """Token usage information.""" + input_tokens: int = 0 + output_tokens: int = 0 + + +@dataclass +class SessionInfo: + """Unified session information across managed agent backends. + + Provides consistent schema for session metadata returned by + retrieve_session() methods in both AnthropicManagedAgent and LocalManagedAgent. + + All fields are always present with sensible defaults for backward compatibility. 
+ """ + id: str = "" + status: str = "unknown" + title: str = "" + usage: SessionUsage = field(default_factory=SessionUsage) + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary format for backward compatibility.""" + return { + "id": self.id, + "status": self.status, + "title": self.title, + "usage": { + "input_tokens": self.usage.input_tokens, + "output_tokens": self.usage.output_tokens, + } + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "SessionInfo": + """Create from dictionary with defaults for missing fields.""" + usage_data = data.get("usage", {}) + usage = SessionUsage( + input_tokens=usage_data.get("input_tokens", 0), + output_tokens=usage_data.get("output_tokens", 0) + ) + + return cls( + id=data.get("id", ""), + status=data.get("status", "unknown"), + title=data.get("title", ""), + usage=usage + ) \ No newline at end of file diff --git a/src/praisonai/praisonai/integrations/_tool_mapping.py b/src/praisonai/praisonai/integrations/_tool_mapping.py new file mode 100644 index 000000000..d7f93307e --- /dev/null +++ b/src/praisonai/praisonai/integrations/_tool_mapping.py @@ -0,0 +1,52 @@ +""" +Unified tool alias mapping for managed agents. + +Consolidates the tool mapping logic that was previously split between +managed_agents.TOOL_MAPPING and managed_local.TOOL_ALIAS_MAP. +""" + +from typing import List + + +# Canonical tool alias mapping for all managed agent backends +UNIFIED_TOOL_MAPPING = { + "bash": "execute_command", + "read": "read_file", + "write": "write_file", + "edit": "apply_diff", # Use apply_diff for structured edits + "glob": "list_files", + "grep": "search_file", # Use search_file for content search + "web_fetch": "web_fetch", # Keep as web_fetch for consistency + "web_crawl": "web_fetch", # Alias for web_crawl + "search": "search_web", + "web_search": "search_web", +} + + +def map_managed_tools(managed_tools: List[str]) -> List[str]: + """Map managed agent tool names to PraisonAI tool names. 
+ + Uses unified mapping that resolves conflicts between Anthropic and Local backends: + - edit → apply_diff (structured diff edits preferred) + - grep → search_file (file content search preferred over shell command) + - web_fetch → web_fetch (canonical name) + + Args: + managed_tools: List of tool names from managed agent configuration + + Returns: + List of PraisonAI tool names + """ + return [UNIFIED_TOOL_MAPPING.get(tool, tool) for tool in managed_tools] + + +def get_tool_alias(tool_name: str) -> str: + """Get the canonical PraisonAI tool name for a managed tool. + + Args: + tool_name: Original tool name (e.g. 'bash', 'grep') + + Returns: + Canonical PraisonAI tool name (e.g. 'execute_command', 'search_file') + """ + return UNIFIED_TOOL_MAPPING.get(tool_name, tool_name) \ No newline at end of file diff --git a/src/praisonai/praisonai/integrations/managed_agents.py b/src/praisonai/praisonai/integrations/managed_agents.py index 185221ac5..188c131fe 100644 --- a/src/praisonai/praisonai/integrations/managed_agents.py +++ b/src/praisonai/praisonai/integrations/managed_agents.py @@ -36,6 +36,19 @@ logger = logging.getLogger(__name__) +# --------------------------------------------------------------------------- +# Exceptions +# --------------------------------------------------------------------------- + +class ManagedSandboxRequired(Exception): + """Raised when LocalManagedAgent requires sandbox execution but compute provider is missing. + + This safety exception prevents accidental host-side package installation or + execution when the user expects sandboxed behavior. 
+ """ + pass + + # --------------------------------------------------------------------------- # ManagedConfig — Anthropic-specific configuration dataclass # Lives in the Wrapper (not Core SDK) because its fields map directly to @@ -259,7 +272,7 @@ def _ensure_session(self) -> str: # ------------------------------------------------------------------ # Event processing helpers # ------------------------------------------------------------------ - def _process_events(self, client, session_id, stream, *, collect: bool = True, stream_live: bool = False): + def _process_events(self, client, session_id, stream, *, collect: bool = True, stream_live: bool = False, emitter=None, agent_name="Agent"): """Walk the SSE stream and return (text_parts, tool_log). Handles: @@ -277,6 +290,7 @@ def _process_events(self, client, session_id, stream, *, collect: bool = True, s text_parts: List[str] = [] tool_log: List[str] = [] + last_tool_start = None # Track last tool start for synthetic end event for event in stream: etype = getattr(event, "type", None) @@ -292,12 +306,32 @@ def _process_events(self, client, session_id, stream, *, collect: bool = True, s elif etype == "agent.tool_use": name = getattr(event, "name", "unknown") + tool_input = getattr(event, "input", {}) + tool_use_id = getattr(event, "id", "") tool_log.append(name) logger.debug("[managed] tool_use: %s", name) if stream_live: _sys.stdout.write(f"\n[Using tool: {name}]\n") _sys.stdout.flush() + # Emit tool_call_start event + if emitter: + # Emit synthetic tool_call_end for previous tool if any + if last_tool_start: + emitter.tool_call_end( + agent_name=agent_name, + tool_name=last_tool_start["name"], + metadata={"tool_use_id": last_tool_start["id"], "synthetic": True} + ) + + emitter.tool_call_start( + agent_name=agent_name, + tool_name=name, + input_data=tool_input, + metadata={"tool_use_id": tool_use_id} + ) + last_tool_start = {"name": name, "id": tool_use_id} + # Handle tool confirmation (always_ask policy) if 
getattr(event, "needs_confirmation", False): approved = True @@ -358,6 +392,14 @@ def _process_events(self, client, session_id, stream, *, collect: bool = True, s self.total_input_tokens += getattr(usage, "input_tokens", 0) self.total_output_tokens += getattr(usage, "output_tokens", 0) + # Emit final tool_call_end if there was a tool in progress + if emitter and last_tool_start: + emitter.tool_call_end( + agent_name=agent_name, + tool_name=last_tool_start["name"], + metadata={"tool_use_id": last_tool_start["id"], "synthetic": True} + ) + if tool_log: logger.info("[managed] tools used: %s", tool_log) @@ -382,28 +424,57 @@ def _execute_sync(self, prompt: str, stream_live: bool = False) -> str: (token-by-token streaming). The full text is still returned. """ import sys + from praisonaiagents.trace.context_events import get_context_emitter + + emitter = get_context_emitter() + agent_name = self._cfg.get("name", "Agent") + + # Emit agent_start event + emitter.agent_start( + agent_name=agent_name, + metadata={ + "input": prompt, + "provider": "anthropic", + "model": self._cfg.get("model", "claude-sonnet-4-6"), + "session_id": getattr(self, "session_id", "") + } + ) - client = self._get_client() - session_id = self._ensure_session() - - with client.beta.sessions.events.stream(session_id) as stream: - client.beta.sessions.events.send( - session_id, - events=[{ - "type": "user.message", - "content": [{"type": "text", "text": prompt}], - }], - ) - text_parts, _ = self._process_events( - client, session_id, stream, collect=True, - stream_live=stream_live, - ) + end_metadata = {"status": "completed"} + try: + client = self._get_client() + session_id = self._ensure_session() - if stream_live: - sys.stdout.write("\n") - sys.stdout.flush() + with client.beta.sessions.events.stream(session_id) as stream: + client.beta.sessions.events.send( + session_id, + events=[{ + "type": "user.message", + "content": [{"type": "text", "text": prompt}], + }], + ) + text_parts, tool_log = 
self._process_events( + client, session_id, stream, collect=True, + stream_live=stream_live, emitter=emitter, agent_name=agent_name, + ) + + if stream_live: + sys.stdout.write("\n") + sys.stdout.flush() + + result = "".join(text_parts) + + # Emit llm_response event + if result: + emitter.llm_response(agent_name=agent_name, response_content=result) - return "".join(text_parts) + return result + + except Exception as e: + end_metadata = {"status": "error", "error": str(e)} + raise + finally: + emitter.agent_end(agent_name=agent_name, metadata=end_metadata) # ------------------------------------------------------------------ # stream() — ManagedBackendProtocol @@ -509,22 +580,44 @@ def interrupt(self) -> None: # retrieve_session — ManagedBackendProtocol # ------------------------------------------------------------------ def retrieve_session(self) -> Dict[str, Any]: - """Retrieve current session metadata and usage from the API.""" + """Retrieve current session metadata and usage from the API. + + Returns unified SessionInfo schema with all fields always present. 
+ """ + from ._session_info import SessionInfo, SessionUsage + if not self._session_id: - return {} - client = self._get_client() - sess = client.beta.sessions.retrieve(self._session_id) - result: Dict[str, Any] = { - "id": getattr(sess, "id", self._session_id), - "status": getattr(sess, "status", None), - } - usage = getattr(sess, "usage", None) - if usage: - result["usage"] = { - "input_tokens": getattr(usage, "input_tokens", 0), - "output_tokens": getattr(usage, "output_tokens", 0), - } - return result + return SessionInfo().to_dict() + + try: + client = self._get_client() + sess = client.beta.sessions.retrieve(self._session_id) + + # Extract usage if available + usage = getattr(sess, "usage", None) + session_usage = SessionUsage() + if usage: + session_usage = SessionUsage( + input_tokens=getattr(usage, "input_tokens", 0), + output_tokens=getattr(usage, "output_tokens", 0) + ) + + # Create unified session info + session_info = SessionInfo( + id=getattr(sess, "id", self._session_id), + status=getattr(sess, "status", "active"), + title=getattr(sess, "title", ""), + usage=session_usage + ) + + return session_info.to_dict() + + except Exception: + # Fallback on API errors + return SessionInfo( + id=self._session_id, + status="unknown" + ).to_dict() # ------------------------------------------------------------------ # list_sessions — ManagedBackendProtocol @@ -639,21 +732,12 @@ def managed_session_id(self) -> Optional[str]: # --------------------------------------------------------------------------- # Tool mapping helpers # --------------------------------------------------------------------------- -TOOL_MAPPING = { - "bash": "execute_command", - "read": "read_file", - "write": "write_file", - "edit": "apply_diff", - "glob": "list_files", - "grep": "search_file", - "web_fetch": "web_fetch", - "search": "search_web", -} +# Import unified mapping to consolidate with managed_local +from ._tool_mapping import map_managed_tools, UNIFIED_TOOL_MAPPING -def 
map_managed_tools(managed_tools: List[str]) -> List[str]: - """Map managed agent tool names to PraisonAI tool names.""" - return [TOOL_MAPPING.get(tool, tool) for tool in managed_tools] +# Backward compatibility alias +TOOL_MAPPING = UNIFIED_TOOL_MAPPING # --------------------------------------------------------------------------- @@ -709,4 +793,4 @@ def ManagedAgent( # ── Backward-compatible aliases ── ManagedAgentIntegration = ManagedAgent -ManagedBackendConfig = ManagedConfig \ No newline at end of file +ManagedBackendConfig = ManagedConfig diff --git a/src/praisonai/praisonai/integrations/managed_local.py b/src/praisonai/praisonai/integrations/managed_local.py index 80b641374..38341115d 100644 --- a/src/praisonai/praisonai/integrations/managed_local.py +++ b/src/praisonai/praisonai/integrations/managed_local.py @@ -50,17 +50,11 @@ "search_web", ] -TOOL_ALIAS_MAP = { - "bash": "execute_command", - "read": "read_file", - "write": "write_file", - "edit": "write_file", - "glob": "list_files", - "grep": "execute_command", - "web_fetch": "web_crawl", - "search": "search_web", - "web_search": "search_web", -} +# Import unified mapping to consolidate with managed_agents +from ._tool_mapping import get_tool_alias, UNIFIED_TOOL_MAPPING + +# Backward compatibility alias +TOOL_ALIAS_MAP = UNIFIED_TOOL_MAPPING @dataclass @@ -88,11 +82,12 @@ class LocalManagedConfig: metadata: Dict[str, Any] = field(default_factory=dict) # ── Environment fields ── - sandbox_type: str = "subprocess" + sandbox_type: str = "subprocess" # DEPRECATED: Use compute= parameter instead working_dir: str = "" env: Dict[str, str] = field(default_factory=dict) packages: Optional[Dict[str, List[str]]] = None networking: Dict[str, Any] = field(default_factory=lambda: {"type": "unrestricted"}) + host_packages_ok: bool = False # Allow pip install on host Python (unsafe) # ── Session fields ── session_title: str = "PraisonAI local session" @@ -132,7 +127,7 @@ def _translate_anthropic_tools(tools_config: List) 
-> List[str]: default_enabled = entry.get("default_config", {}).get("enabled", True) for cfg in entry.get("configs", []): name = cfg.get("name", "") - alias = TOOL_ALIAS_MAP.get(name, name) + alias = get_tool_alias(name) if cfg.get("enabled", default_enabled): enabled.add(alias) else: @@ -305,7 +300,7 @@ def _resolve_tools(self) -> List: # Normalize aliases resolved_names = [] for name in tool_names: - resolved_names.append(TOOL_ALIAS_MAP.get(name, name)) + resolved_names.append(get_tool_alias(name)) # Import tools lazily tools = [] @@ -460,29 +455,83 @@ def _restore_state(self) -> None: if saved_cfg.get("provider"): self.provider = saved_cfg["provider"] - def _install_packages(self) -> None: + async def _install_packages(self) -> None: """Install packages specified in config before agent starts.""" packages = self._cfg.get("packages") if not packages: return pip_pkgs = packages.get("pip", []) if isinstance(packages, dict) else [] - if pip_pkgs: - cmd = [sys.executable, "-m", "pip", "install", "-q"] + pip_pkgs - logger.info("[local_managed] installing pip packages: %s", pip_pkgs) + if not pip_pkgs: + return + + # If compute provider is attached, install in sandbox + if self._compute and self._compute_instance_id: + logger.info("[local_managed] installing pip packages in sandbox: %s", pip_pkgs) + import shlex + cmd = f"{shlex.quote(sys.executable)} -m pip install -q " + " ".join( + shlex.quote(pkg) for pkg in pip_pkgs + ) try: - subprocess.run(cmd, check=True, capture_output=True, timeout=120) - except subprocess.CalledProcessError as e: - logger.warning("[local_managed] pip install failed: %s", e.stderr) - except subprocess.TimeoutExpired: - logger.warning("[local_managed] pip install timed out") + await self._compute.execute(self._compute_instance_id, cmd, timeout=120) + except Exception as e: + logger.warning("[local_managed] sandbox pip install failed: %s", e) + return + + # No compute provider - check if host installation is allowed + if not 
self._cfg.get("host_packages_ok", False): + from praisonai.integrations.managed_agents import ManagedSandboxRequired + raise ManagedSandboxRequired( + "LocalManagedAgent: packages= requires compute= for safety. " + "Either:\n" + "1. Add compute='docker' (recommended), or\n" + "2. Set LocalManagedConfig(host_packages_ok=True) to allow host pip install (unsafe)" + ) + + # Host installation (unsafe but explicitly allowed) + cmd = [sys.executable, "-m", "pip", "install", "-q"] + pip_pkgs + logger.warning("[local_managed] installing pip packages on HOST (unsafe): %s", pip_pkgs) + try: + subprocess.run(cmd, check=True, capture_output=True, timeout=120) + except subprocess.CalledProcessError as e: + logger.warning("[local_managed] pip install failed: %s", e.stderr) + except subprocess.TimeoutExpired: + logger.warning("[local_managed] pip install timed out") + + async def _ensure_compute(self) -> None: + """Provision compute instance if compute provider is attached.""" + if not self._compute or self._compute_instance_id: + return + + logger.info("[local_managed] provisioning compute instance") + try: + from praisonaiagents.managed.protocols import ComputeConfig + + # Create compute config with our environment settings + config = ComputeConfig( + packages=self._cfg.get("packages", {}), + env=self._cfg.get("env", {}), + working_dir=self._cfg.get("working_dir", "/workspace"), + ) + + instance_info = await self._compute.provision(config) + self._compute_instance_id = instance_info.instance_id + logger.info("[local_managed] compute instance provisioned: %s", self._compute_instance_id) + + except Exception as e: + logger.error("[local_managed] failed to provision compute: %s", e) + raise - def _ensure_agent(self) -> Any: + async def _ensure_agent(self) -> Any: """Create or return the inner PraisonAI Agent.""" if self._inner_agent is not None: return self._inner_agent - self._install_packages() + # Provision compute instance if needed + await self._ensure_compute() + + # Install 
packages (in sandbox or host as configured) + await self._install_packages() from praisonaiagents import Agent @@ -530,8 +579,43 @@ def _ensure_session(self) -> str: # ------------------------------------------------------------------ async def execute(self, prompt: str, **kwargs) -> str: """Execute prompt locally and return full response.""" - loop = asyncio.get_running_loop() - return await loop.run_in_executor(None, self._execute_sync, prompt) + from praisonaiagents.trace.context_events import get_context_emitter + + emitter = get_context_emitter() + agent_name = self._cfg.get("name", "Agent") + + # Emit agent_start event for managed level + emitter.agent_start( + agent_name=agent_name, + metadata={ + "input": prompt, + "provider": "local", + "model": self._cfg.get("model", self._resolve_model()), + "compute": "sandbox" if self._compute else "host", + "session_id": getattr(self, "_session_id", "") + } + ) + + end_metadata = {"status": "completed"} + try: + agent = await self._ensure_agent() + self._ensure_session() + self._persist_message("user", prompt) + + # Execute via inner agent (which will emit its own context events) + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, agent.chat, prompt) + + self._persist_message("assistant", result) + self._sync_usage() + + return result + + except Exception as e: + end_metadata = {"status": "error", "error": str(e)} + raise + finally: + emitter.agent_end(agent_name=agent_name, metadata=end_metadata) def _sync_usage(self) -> None: """Sync token usage from inner agent to managed backend counters.""" @@ -547,7 +631,21 @@ def _persist_message(self, role: str, content: str) -> None: def _execute_sync(self, prompt: str, stream_live: bool = False) -> str: """Synchronous execution using PraisonAI Agent.chat().""" - agent = self._ensure_agent() + # Note: This method is kept for backwards compatibility but + # cannot provision compute instances. Use execute() instead. 
+ if self._inner_agent is None: + # Try sync fallback for packages without compute + try: + import asyncio + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + agent = loop.run_until_complete(self._ensure_agent()) + loop.close() + except Exception as e: + raise RuntimeError(f"Cannot initialize agent synchronously: {e}. Use async execute() instead.") + else: + agent = self._inner_agent + self._ensure_session() self._persist_message("user", prompt) @@ -671,16 +769,25 @@ def interrupt(self) -> None: # retrieve_session / list_sessions — ManagedBackendProtocol # ------------------------------------------------------------------ def retrieve_session(self) -> Dict[str, Any]: - """Retrieve current session metadata.""" + """Retrieve current session metadata. + + Returns unified SessionInfo schema with all fields always present. + """ + from ._session_info import SessionInfo, SessionUsage + self._sync_usage() - return { - "id": self._session_id, - "status": "idle" if self._session_id else "none", - "usage": { - "input_tokens": self.total_input_tokens, - "output_tokens": self.total_output_tokens, - }, - } + + session_info = SessionInfo( + id=self._session_id or "", + status="idle" if self._session_id else "none", + title=self._cfg.get("session_title", ""), + usage=SessionUsage( + input_tokens=self.total_input_tokens, + output_tokens=self.total_output_tokens + ) + ) + + return session_info.to_dict() def list_sessions(self, **kwargs) -> List[Dict[str, Any]]: """List all sessions created in this backend instance.""" diff --git a/src/praisonai/tests/integration/README.md b/src/praisonai/tests/integration/README.md index 720c0a0bb..6e49c7247 100644 --- a/src/praisonai/tests/integration/README.md +++ b/src/praisonai/tests/integration/README.md @@ -279,4 +279,53 @@ Generate detailed coverage: python tests/test_runner.py --pattern frameworks --coverage ``` -This will show which integration test code paths are covered and highlight areas needing additional testing. 
\ No newline at end of file +This will show which integration test code paths are covered and highlight areas needing additional testing. + +## Real Agentic Tests + +**Location:** `test_managed_real.py` + +These are special tests that make actual LLM API calls to verify end-to-end managed agent functionality. They are gated behind environment variables for safety and cost control. + +### Running Real Agentic Tests + +**Prerequisites:** +- Set `RUN_REAL_AGENTIC=1` to enable the tests +- Set appropriate API keys: + - `ANTHROPIC_API_KEY` for Anthropic Managed Agents + - `OPENAI_API_KEY` for Local Managed Agents + +**Run all real agentic tests:** +```bash +RUN_REAL_AGENTIC=1 ANTHROPIC_API_KEY=your_key OPENAI_API_KEY=your_key pytest src/praisonai/tests/integration/test_managed_real.py -v +``` + +**Run specific real test:** +```bash +RUN_REAL_AGENTIC=1 OPENAI_API_KEY=your_key pytest src/praisonai/tests/integration/test_managed_real.py::test_local_managed_real_openai -v +``` + +### Real Agentic Test Coverage + +- ✅ **Anthropic Managed Agents:** End-to-end execution with Claude models +- ✅ **Local Managed Agents:** End-to-end execution with OpenAI models +- ✅ **Multi-turn sessions:** Context preservation across conversation turns +- ✅ **Trace event emission:** Integration with observability system +- ✅ **Package safety:** Safety checks for package installation + +### Important Notes + +**Cost Awareness:** +- These tests make real API calls and incur costs +- Use fast/cheap models (claude-haiku-4-5, gpt-4o-mini) +- Keep prompts brief and specific + +**Reliability:** +- Tests print actual LLM outputs for human verification +- May occasionally fail due to LLM variability +- Should pass consistently with the given prompts + +**Safety:** +- Tests are skipped by default (`RUN_REAL_AGENTIC=1` required) +- No package installation tests run on CI (gated by API keys) +- Safety checks are tested without actual installation \ No newline at end of file diff --git 
"""
Real agentic integration tests for managed agents.

These tests use actual LLM providers and are gated behind environment variables.
Run with: RUN_REAL_AGENTIC=1 pytest src/praisonai/tests/integration/test_managed_real.py

Requirements:
- ANTHROPIC_API_KEY for Anthropic Managed Agents
- OPENAI_API_KEY for Local Managed Agents
"""

import os

import pytest

# Skip all tests unless explicitly enabled.
# NOTE: keep this module importable without any praisonai/praisonaiagents
# dependencies installed — pytestmark only takes effect AFTER the module
# imports, so heavy project imports belong inside the individual tests.
pytestmark = pytest.mark.skipif(
    not os.environ.get("RUN_REAL_AGENTIC"),
    reason="Set RUN_REAL_AGENTIC=1 to run real agentic tests"
)


@pytest.mark.integration
@pytest.mark.asyncio
async def test_anthropic_managed_real():
    """Real agentic test for Anthropic Managed Agents."""
    # Skip cleanly if the SDK is missing; the module object itself is unused.
    pytest.importorskip("anthropic")

    api_key = os.environ.get("ANTHROPIC_API_KEY")
    if not api_key:
        pytest.skip("ANTHROPIC_API_KEY not set")

    from praisonai.integrations.managed_agents import ManagedAgent, ManagedConfig
    from praisonaiagents import Agent

    # Create managed backend with haiku for speed/cost.
    config = ManagedConfig(
        model="claude-haiku-4-5",
        system="You are a helpful assistant. Keep responses very brief.",
        name="TestAgent"
    )
    managed = ManagedAgent(config=config, api_key=api_key)

    # Create Agent with backend
    agent = Agent(name="test", backend=managed)

    # Execute real prompt
    result = await agent.execute("Say hello in exactly one sentence.")

    print(f"\nAnthropicManagedAgent result:\n{result}")

    # Assertions
    assert isinstance(result, str)
    assert len(result.strip()) > 0
    assert managed.total_input_tokens > 0
    assert managed.session_id is not None and len(managed.session_id) > 0

    # Test session persistence: a second turn must reuse the same session.
    first_session = managed.session_id
    result2 = await agent.execute("What did you just say?")
    assert managed.session_id == first_session  # Same session

    print(f"Second response: {result2}")


@pytest.mark.integration
@pytest.mark.asyncio
async def test_local_managed_real_openai():
    """Real agentic test for Local Managed Agents with OpenAI."""
    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        pytest.skip("OPENAI_API_KEY not set")

    from praisonai.integrations.managed_local import LocalManagedAgent, LocalManagedConfig
    from praisonaiagents import Agent

    # Create local managed backend
    config = LocalManagedConfig(
        model="gpt-4o-mini",
        system="You are a helpful assistant. Keep responses very brief.",
        name="LocalTestAgent",
        host_packages_ok=True  # Allow host execution for this test
    )
    managed = LocalManagedAgent(config=config, api_key=api_key)

    # Create Agent with backend
    agent = Agent(name="test", backend=managed)

    # Execute real prompt
    result = await agent.execute("Say hello in exactly one sentence.")

    print(f"\nLocalManagedAgent result:\n{result}")

    # Assertions
    assert isinstance(result, str)
    assert len(result.strip()) > 0
    assert managed.total_input_tokens > 0
    assert managed._session_id is not None


@pytest.mark.integration
@pytest.mark.asyncio
async def test_multi_turn_preserves_session():
    """Test that multi-turn conversations preserve session context."""
    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        pytest.skip("OPENAI_API_KEY not set")

    from praisonai.integrations.managed_local import LocalManagedAgent, LocalManagedConfig
    from praisonaiagents import Agent

    config = LocalManagedConfig(
        model="gpt-4o-mini",
        system="You are a helpful assistant. Remember what users tell you.",
        host_packages_ok=True
    )
    managed = LocalManagedAgent(config=config, api_key=api_key)
    agent = Agent(name="test", backend=managed)

    # First turn: tell agent something to remember
    result1 = await agent.execute("My favorite color is blue. Please remember this.")
    print(f"\nFirst turn: {result1}")

    first_session = managed._session_id

    # Second turn: ask agent to recall
    result2 = await agent.execute("What is my favorite color?")
    print(f"Second turn: {result2}")

    # Verify session preservation and that context carried over
    assert managed._session_id == first_session
    assert "blue" in result2.lower()


@pytest.mark.integration
@pytest.mark.asyncio
async def test_managed_agents_trace_events():
    """Test that managed agents emit proper context trace events."""
    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        pytest.skip("OPENAI_API_KEY not set")

    # Imported here (not at module level) so collection of this file never
    # fails when praisonaiagents is not installed.
    from praisonaiagents.trace.context_events import ContextListSink, trace_context
    from praisonai.integrations.managed_local import LocalManagedAgent, LocalManagedConfig
    from praisonaiagents import Agent

    # Set up trace collection
    sink = ContextListSink()

    with trace_context(sink=sink, session_id="test_session"):
        config = LocalManagedConfig(
            model="gpt-4o-mini",
            system="You are a helpful assistant.",
            host_packages_ok=True
        )
        managed = LocalManagedAgent(config=config, api_key=api_key)
        agent = Agent(name="test", backend=managed)

        result = await agent.execute("Say hi")
        print(f"\nTrace test result: {result}")

    # Verify trace events were emitted
    events = sink.events
    print(f"\nEmitted {len(events)} trace events")

    # Should have at least agent_start and agent_end from managed level
    event_types = [event.event_type.value for event in events]
    print(f"Event types: {event_types}")

    assert len(events) >= 2
    assert "agent_start" in event_types
    assert "agent_end" in event_types


@pytest.mark.integration
@pytest.mark.asyncio
async def test_managed_agent_packages_safety():
    """Test that package installation safety works in real scenarios."""
    from praisonai.integrations.managed_local import LocalManagedAgent, LocalManagedConfig
    from praisonai.integrations.managed_agents import ManagedSandboxRequired

    # Test that packages without compute raises exception
    config = LocalManagedConfig(
        packages={"pip": ["requests"]},
        host_packages_ok=False,  # Safety enabled
        model="gpt-4o-mini"
    )
    managed = LocalManagedAgent(config=config)

    with pytest.raises(ManagedSandboxRequired, match="packages= requires compute="):
        await managed._install_packages()

    print("✓ Package safety check works correctly")
"managed-anthropic" - assert not managed.is_available # Should be False without aiohttp +def test_managed_config_defaults(): + """Test ManagedConfig default values.""" + from praisonai.integrations.managed_agents import ManagedConfig + + config = ManagedConfig() + assert config.model == "claude-sonnet-4-6" + assert config.system == "You are a skilled AI assistant" + assert config.max_turns == 25 + assert isinstance(config.tools, list) + + +def test_managed_agent_creation(): + """Test creating a ManagedAgent instance.""" + from praisonai.integrations.managed_agents import ManagedAgent, ManagedConfig + + config = ManagedConfig( + model="claude-haiku-4-5", + system="Test assistant", + name="TestAgent" + ) + + agent = ManagedAgent(config=config) + assert agent._cfg == config.to_dict() def test_tool_mapping(): @@ -43,155 +50,160 @@ def test_tool_mapping(): assert mapped_tools == expected -def test_agent_backend_parameter(): - """Test that Agent class supports the backend parameter.""" - # Mock aiohttp to avoid import issues - with patch('praisonai.integrations.managed_agents.aiohttp', None): - from praisonai.integrations.managed_agents import ManagedAgentIntegration - from praisonaiagents import Agent - - # Create a managed backend instance - managed = ManagedAgentIntegration(provider="anthropic", api_key="test_key") - - # Create agent with backend parameter - agent = Agent( - name="test_agent", - instructions="You are a test agent.", - backend=managed - ) - - # Verify backend is stored - assert agent.backend == managed - - -def test_agent_backend_delegation(): - """Test that Agent properly delegates execution to backend.""" - import asyncio - from typing import Dict, Any, AsyncIterator - - class MockManagedBackend: - """Mock backend to test delegation.""" - - def __init__(self): - self.executed_prompts = [] - self.execution_kwargs = [] - - async def execute(self, prompt: str, **kwargs) -> str: - self.executed_prompts.append(prompt) - 
self.execution_kwargs.append(kwargs) - return f"Backend response: {prompt}" - - async def stream(self, prompt: str, **kwargs) -> AsyncIterator[Dict[str, Any]]: - self.executed_prompts.append(prompt) - self.execution_kwargs.append(kwargs) - yield { - 'type': 'agent.message', - 'content': [{'type': 'text', 'text': f"Backend streamed: {prompt}"}] - } - - # Create mock backend - mock_backend = MockManagedBackend() - - # Create agent with backend - agent = Agent( - name="test-agent", - instructions="Test agent", - backend=mock_backend +def test_local_managed_config_defaults(): + """Test LocalManagedConfig default values.""" + from praisonai.integrations.managed_local import LocalManagedConfig + + config = LocalManagedConfig() + assert config.model == "gpt-4o-mini" + assert config.system == "You are a skilled AI assistant" + assert config.host_packages_ok is False + assert config.sandbox_type == "subprocess" + + +def test_local_managed_agent_creation(): + """Test creating a LocalManagedAgent instance.""" + from praisonai.integrations.managed_local import LocalManagedAgent, LocalManagedConfig + + config = LocalManagedConfig( + model="gpt-4o-mini", + name="LocalTestAgent", + host_packages_ok=True + ) + + agent = LocalManagedAgent(config=config) + assert agent._cfg["name"] == "LocalTestAgent" + assert agent._cfg["host_packages_ok"] is True + + +@pytest.mark.asyncio +async def test_managed_backend_protocol_compliance(): + """Test that both backends implement ManagedBackendProtocol.""" + from praisonaiagents.agent.protocols import ManagedBackendProtocol + from praisonai.integrations.managed_agents import ManagedAgent + from praisonai.integrations.managed_local import LocalManagedAgent + + # Test structural typing compliance + managed_agent = ManagedAgent() + local_agent = LocalManagedAgent() + + assert isinstance(managed_agent, ManagedBackendProtocol) + assert isinstance(local_agent, ManagedBackendProtocol) + + # Test required methods exist + required_methods = ["execute", 
"stream", "reset_session", "reset_all"] + for method in required_methods: + assert hasattr(managed_agent, method) + assert hasattr(local_agent, method) + + +@pytest.mark.asyncio +async def test_managed_agent_factory_anthropic(): + """Test factory function for creating Anthropic managed agents.""" + from praisonai.integrations.managed_agents import ManagedAgent, create_managed_agent + + # Test explicit creation + agent = create_managed_agent("anthropic", api_key="test_key") + assert isinstance(agent, ManagedAgent) + + # Test env-based creation + with patch.dict('os.environ', {'ANTHROPIC_API_KEY': 'env_key'}): + agent = create_managed_agent("anthropic") + assert isinstance(agent, ManagedAgent) + + +@pytest.mark.asyncio +async def test_managed_agent_factory_local(): + """Test factory function for creating local managed agents.""" + from praisonai.integrations.managed_local import LocalManagedAgent + from praisonai.integrations.managed_agents import create_managed_agent + + agent = create_managed_agent("local") + assert isinstance(agent, LocalManagedAgent) + + +def test_managed_sandbox_required_exception(): + """Test that ManagedSandboxRequired exception exists.""" + from praisonai.integrations.managed_agents import ManagedSandboxRequired + + # Test exception can be raised + with pytest.raises(ManagedSandboxRequired): + raise ManagedSandboxRequired("Test message") + + +@pytest.mark.asyncio +async def test_local_managed_packages_safety(): + """Test safety check for packages without compute.""" + from praisonai.integrations.managed_local import LocalManagedAgent, LocalManagedConfig + from praisonai.integrations.managed_agents import ManagedSandboxRequired + + config = LocalManagedConfig( + packages={"pip": ["requests"]}, + host_packages_ok=False # Safety enabled ) - # Test run() delegation - result = agent.run("Test run prompt") - assert result == "Backend response: Test run prompt" - assert len(mock_backend.executed_prompts) == 1 - assert 
mock_backend.executed_prompts[0] == "Test run prompt" - - # Test start() delegation - result = agent.start("Test start prompt") - assert result == "Backend response: Test start prompt" - assert len(mock_backend.executed_prompts) == 2 - assert mock_backend.executed_prompts[1] == "Test start prompt" - - # Test chat() delegation - result = agent.chat("Test chat prompt") - assert result == "Backend response: Test chat prompt" - assert len(mock_backend.executed_prompts) == 3 - assert mock_backend.executed_prompts[2] == "Test chat prompt" - - # Test that Agent without backend doesn't delegate - local_agent = Agent(name="local", instructions="Local agent") - assert not hasattr(local_agent, 'backend') or local_agent.backend is None - - -def test_managed_backend_protocol(): - """Test the ManagedBackendProtocol interface.""" - from praisonai.integrations.managed_agents import ManagedBackendProtocol - - # Test that the protocol has the expected abstract methods - expected_methods = [ - 'create_agent', - 'create_environment', - 'create_session', - 'send_message', - 'stream_events', - 'collect_response' - ] - - for method_name in expected_methods: - assert hasattr(ManagedBackendProtocol, method_name) - - -@patch('praisonai.integrations.managed_agents.aiohttp') -def test_anthropic_provider_creation(mock_aiohttp): - """Test creating an Anthropic provider.""" - from praisonai.integrations.managed_agents import ManagedAgentIntegration - - # Mock aiohttp to be available - mock_aiohttp.__bool__ = lambda: True - - managed = ManagedAgentIntegration( - provider="anthropic", - api_key="test_key" + agent = LocalManagedAgent(config=config) + + # Should raise exception when trying to install packages without compute + with pytest.raises(ManagedSandboxRequired, match="packages= requires compute="): + await agent._install_packages() + + +@pytest.mark.asyncio +async def test_local_managed_packages_host_allowed(): + """Test that host packages work when explicitly allowed.""" + from 
praisonai.integrations.managed_local import LocalManagedAgent, LocalManagedConfig + + config = LocalManagedConfig( + packages={"pip": ["requests"]}, + host_packages_ok=True # Explicitly allow unsafe operation ) - assert managed.provider == "anthropic" - assert managed.api_key == "test_key" - assert managed.backend is not None - assert managed.is_available - - -def test_unsupported_provider(): - """Test creating integration with unsupported provider.""" - with patch('praisonai.integrations.managed_agents.aiohttp'): - from praisonai.integrations.managed_agents import ManagedAgentIntegration - - with pytest.raises(ValueError, match="Unsupported provider: unknown"): - ManagedAgentIntegration(provider="unknown", api_key="test_key") - - -def test_session_caching(): - """Test that session IDs are cached correctly (regression test for #357 bug).""" - with patch('praisonai.integrations.managed_agents.aiohttp'): - from praisonai.integrations.managed_agents import ManagedAgentIntegration - - managed = ManagedAgentIntegration(provider="anthropic", api_key="test_key") - - # Simulate adding session to cache - managed._session_cache["test_session"] = "session_id_123" - - # Verify the correct session ID is cached (not the key) - assert managed._session_cache["test_session"] == "session_id_123" - assert managed._session_cache["test_session"] != "test_session" - - -def test_api_key_persistence(): - """Test that API keys from environment are persisted (regression test).""" - with patch('praisonai.integrations.managed_agents.aiohttp'), \ - patch('os.getenv', return_value="env_api_key"): - - from praisonai.integrations.managed_agents import ManagedAgentIntegration - - # Create without explicit API key to trigger env lookup - managed = ManagedAgentIntegration(provider="anthropic", api_key=None) - - # Should have stored the env key back to api_key - assert managed.api_key == "env_api_key" \ No newline at end of file + agent = LocalManagedAgent(config=config) + + # Should not raise 
exception - mock subprocess to avoid actual installs + with patch('subprocess.run') as mock_run: + await agent._install_packages() + # Should have attempted pip install + assert mock_run.called + + +@pytest.mark.asyncio +async def test_local_managed_agent_backend_delegation(): + """Test that LocalManagedAgent properly implements the backend protocol.""" + from praisonai.integrations.managed_local import LocalManagedAgent, LocalManagedConfig + from praisonaiagents import Agent + + # Create managed backend + config = LocalManagedConfig(model="gpt-4o-mini", name="TestAgent") + managed = LocalManagedAgent(config=config) + + # Create Agent with backend + agent = Agent(name="test", backend=managed) + + # Verify backend is stored + assert hasattr(agent, '_backend') + assert agent._backend == managed + + +def test_retrieve_session_schemas(): + """Test that retrieve_session returns consistent schema.""" + from praisonai.integrations.managed_agents import ManagedAgent + from praisonai.integrations.managed_local import LocalManagedAgent + + # Both should return similar dict structure + managed = ManagedAgent() + local = LocalManagedAgent() + + # Mock session data + managed.session_id = "test_session" + local._session_id = "test_session" + + managed_info = managed.retrieve_session() + local_info = local.retrieve_session() + + # Both should have consistent keys + required_keys = ["id", "status"] + for key in required_keys: + assert key in managed_info + assert key in local_info \ No newline at end of file