diff --git a/src/praisonai-ts/src/cli/features/external-agents.ts b/src/praisonai-ts/src/cli/features/external-agents.ts index 627bb8f4c..ff0ab1de4 100644 --- a/src/praisonai-ts/src/cli/features/external-agents.ts +++ b/src/praisonai-ts/src/cli/features/external-agents.ts @@ -19,6 +19,11 @@ export interface ExternalAgentResult { duration: number; } +export type StreamEvent = + | { type: 'text'; content: string } + | { type: 'json'; data: unknown } + | { type: 'error'; error: string }; + /** * Base class for external agent integrations */ @@ -42,6 +47,11 @@ export abstract class BaseExternalAgent { */ abstract execute(prompt: string): Promise; + /** + * Stream output from the external agent + */ + abstract stream(prompt: string): AsyncGenerator; + /** * Get the agent name */ @@ -97,6 +107,65 @@ export abstract class BaseExternalAgent { }); } + /** + * Stream command output line by line + */ + protected async *streamCommand(args: string[]): AsyncGenerator { + const { spawn } = await import('child_process'); + + const proc = spawn(this.config.command, args, { + cwd: this.config.cwd || process.cwd(), + env: { ...process.env, ...this.config.env }, + timeout: this.config.timeout, + stdio: ['pipe', 'pipe', 'pipe'] + }); + + if (!proc.stdout) { + throw new Error('Failed to create stdout stream'); + } + + let stderr = ''; + proc.stderr?.on('data', (chunk) => { + stderr += chunk.toString(); + }); + + const exit = new Promise((resolve, reject) => { + proc.once('error', reject); + proc.once('close', resolve); + }); + + const readline = await import('readline'); + const rl = readline.createInterface({ + input: proc.stdout, + crlfDelay: Infinity + }); + + try { + for await (const line of rl) { + if (line.trim()) { + // Try to parse as JSON first + try { + const event = JSON.parse(line); + yield { type: 'json', data: event }; + } catch { + // If not JSON, treat as text + yield { type: 'text', content: line }; + } + } + } + + const exitCode = await exit; + if (exitCode !== 0) { + throw new Error(stderr || `${this.config.command} exited with code ${exitCode}`); + } + } finally { + rl.close(); + if (!proc.killed) { + proc.kill(); + } + } + } + /** * Check if a command exists */ @@ -131,6 +200,10 @@ export class ClaudeCodeAgent extends BaseExternalAgent { return this.runCommand(['--print', prompt]); } + async *stream(prompt: string): AsyncGenerator { + yield* this.streamCommand(['--print', '--output-format', 'stream-json', prompt]); + } + async executeWithSession(prompt: string, sessionId?: string): Promise { const args = ['--print']; if (sessionId) { @@ -163,6 +236,10 @@ export class GeminiCliAgent extends BaseExternalAgent { async execute(prompt: string): Promise { return this.runCommand(['-m', this.model, prompt]); } + + async *stream(prompt: string): AsyncGenerator { + yield* this.streamCommand(['-m', this.model, '--json', prompt]); + } } /** @@ -184,6 +261,10 @@ export class CodexCliAgent extends BaseExternalAgent { async execute(prompt: string): Promise { return this.runCommand(['exec', '--full-auto', prompt]); } + + async *stream(prompt: string): AsyncGenerator { + yield* this.streamCommand(['exec', '--full-auto', '--json', prompt]); + } } /** @@ -205,6 +286,19 @@ export class AiderAgent extends BaseExternalAgent { async execute(prompt: string): Promise { return this.runCommand(['--message', prompt, '--yes']); } + + async *stream(prompt: string): AsyncGenerator { + // Aider doesn't support JSON streaming, so just yield text events + const result = await this.execute(prompt); + if (!result.success) { + throw new Error(result.error || 'Aider execution failed'); + } + for (const line of result.output.split('\n')) { + if (line.trim()) { + yield { type: 'text', content: line }; + } + } + } } /** @@ -231,6 +325,16 @@ export class GenericExternalAgent extends BaseExternalAgent { } return this.runCommand(args); } + + async *stream(prompt: string): AsyncGenerator { + const args = [...(this.config.args || [])]; + if (this.promptArg) { + args.push(this.promptArg, prompt); + } else { + args.push(prompt); + } + yield* this.streamCommand(args); + } } /** diff --git a/src/praisonai/praisonai/integrations/__init__.py b/src/praisonai/praisonai/integrations/__init__.py index fdd8ce7f6..ac2c391b2 100644 --- a/src/praisonai/praisonai/integrations/__init__.py +++ b/src/praisonai/praisonai/integrations/__init__.py @@ -41,6 +41,10 @@ 'ManagedAgentIntegration', # backward compat alias 'ManagedBackendConfig', # backward compat alias 'get_available_integrations', + 'ExternalAgentRegistry', + 'get_registry', + 'register_integration', + 'create_integration', ] @@ -79,4 +83,16 @@ def __getattr__(name): elif name == 'get_available_integrations': from .base import get_available_integrations return get_available_integrations + elif name == 'ExternalAgentRegistry': + from .registry import ExternalAgentRegistry + return ExternalAgentRegistry + elif name == 'get_registry': + from .registry import get_registry + return get_registry + elif name == 'register_integration': + from .registry import register_integration + return register_integration + elif name == 'create_integration': + from .registry import create_integration + return create_integration raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/src/praisonai/praisonai/integrations/base.py b/src/praisonai/praisonai/integrations/base.py index 9f6c69e77..5688fee8e 100644 --- a/src/praisonai/praisonai/integrations/base.py +++ b/src/praisonai/praisonai/integrations/base.py @@ -285,19 +285,47 @@ def get_available_integrations() -> Dict[str, bool]: """ Get a dictionary of all integrations and their availability status. + Backward compatibility wrapper. Use ExternalAgentRegistry for new code. + Returns: dict: Mapping of integration name to availability (True/False) """ - from .claude_code import ClaudeCodeIntegration - from .gemini_cli import GeminiCLIIntegration - from .codex_cli import CodexCLIIntegration - from .cursor_cli import CursorCLIIntegration - - integrations = { - 'claude': ClaudeCodeIntegration(), - 'gemini': GeminiCLIIntegration(), - 'codex': CodexCLIIntegration(), - 'cursor': CursorCLIIntegration(), - } + # Import here to avoid circular imports + try: + from .registry import get_registry + import asyncio + + registry = get_registry() + + # Handle async call in sync context + try: + # Try to get existing event loop + loop = asyncio.get_event_loop() + if loop.is_running(): + # We're in an async context, use create_task + import concurrent.futures + with concurrent.futures.ThreadPoolExecutor() as executor: + future = executor.submit(asyncio.run, registry.get_available()) + return future.result() + else: + # No running loop, safe to use asyncio.run + return asyncio.run(registry.get_available()) + except RuntimeError: + # No event loop, safe to use asyncio.run + return asyncio.run(registry.get_available()) - return {name: integration.is_available for name, integration in integrations.items()} + except ImportError: + # Fallback to original implementation + from .claude_code import ClaudeCodeIntegration + from .gemini_cli import GeminiCLIIntegration + from .codex_cli import CodexCLIIntegration + from .cursor_cli import CursorCLIIntegration + + integrations = { + 'claude': ClaudeCodeIntegration(), + 'gemini': GeminiCLIIntegration(), + 'codex': CodexCLIIntegration(), + 'cursor': CursorCLIIntegration(), + } + + return {name: integration.is_available for name, integration in integrations.items()} diff --git a/src/praisonai/praisonai/integrations/codex_cli.py b/src/praisonai/praisonai/integrations/codex_cli.py index 27de96b79..8eea5697d 100644 --- a/src/praisonai/praisonai/integrations/codex_cli.py +++ b/src/praisonai/praisonai/integrations/codex_cli.py @@ -45,15 +45,20 @@ class CodexCLIIntegration(BaseCLIIntegration): output_schema: Path to JSON schema for structured output """ + VALID_APPROVAL_MODES = {"suggest", "auto-edit", "full-auto"} + def __init__( self, workspace: str = ".", timeout: int = 300, - full_auto: bool = False, + approval_mode: str = "suggest", # suggest, auto-edit, full-auto sandbox: str = "default", json_output: bool = False, output_schema: Optional[str] = None, output_file: Optional[str] = None, + provider: Optional[str] = None, # OpenAI, OpenRouter, Azure, Gemini, etc. + # Backward compatibility + full_auto: Optional[bool] = None, ): """ Initialize Codex CLI integration. @@ -61,19 +66,34 @@ def __init__( Args: workspace: Working directory for CLI execution timeout: Timeout in seconds for CLI execution - full_auto: Whether to allow file modifications (--full-auto) + approval_mode: Approval mode ("suggest", "auto-edit", "full-auto") sandbox: Sandbox mode ("default", "danger-full-access") json_output: Whether to use JSON streaming output (--json) output_schema: Path to JSON schema for structured output output_file: Path to save the final output (-o) + provider: Model provider ("openai", "openrouter", "azure", "gemini", "ollama", etc.) """ super().__init__(workspace=workspace, timeout=timeout) - self.full_auto = full_auto + # Handle backward compatibility + if full_auto is not None: + approval_mode = "full-auto" if full_auto else "suggest" + + if approval_mode not in self.VALID_APPROVAL_MODES: + raise ValueError( + f"Invalid approval_mode: '{approval_mode}'. " + f"Must be one of: {', '.join(sorted(self.VALID_APPROVAL_MODES))}" + ) + + self.approval_mode = approval_mode self.sandbox = sandbox self.json_output = json_output self.output_schema = output_schema self.output_file = output_file + self.provider = provider + + # Backward compatibility + self.full_auto = approval_mode == "full-auto" @property def cli_command(self) -> str: @@ -99,9 +119,12 @@ def _build_command(self, task: str, **options) -> List[str]: # Add task cmd.append(task) - # Add full auto flag if enabled - if self.full_auto: + # Add approval mode + if self.approval_mode == "full-auto": cmd.append("--full-auto") + elif self.approval_mode == "auto-edit": + cmd.append("--auto-edit") + # suggest is the default, no flag needed # Add sandbox mode if not default if self.sandbox and self.sandbox != "default": @@ -119,6 +142,10 @@ def _build_command(self, task: str, **options) -> List[str]: if self.output_file: cmd.extend(["-o", self.output_file]) + # Add provider if specified + if self.provider: + cmd.extend(["--provider", self.provider]) + return cmd async def execute(self, prompt: str, **options) -> str: diff --git a/src/praisonai/praisonai/integrations/registry.py b/src/praisonai/praisonai/integrations/registry.py new file mode 100644 index 000000000..fb5f52426 --- /dev/null +++ b/src/praisonai/praisonai/integrations/registry.py @@ -0,0 +1,256 @@ +""" +External Agent Registry for PraisonAI. + +Provides a registry pattern for managing external CLI integrations, +allowing dynamic registration and discovery of external agents. + +Features: +- Dynamic registration of custom integrations +- Availability checking for all registered integrations +- Factory pattern for creating integrations +- Backward compatibility with existing get_available_integrations() + +Usage: + from praisonai.integrations.registry import ExternalAgentRegistry + + # Get singleton registry + registry = ExternalAgentRegistry.get_instance() + + # Register custom integration + registry.register('my-agent', MyCustomIntegration) + + # Create integration + agent = registry.create('claude', workspace="/path/to/project") + + # List available integrations + available = await registry.get_available() +""" + +from typing import Dict, Type, Optional, Any, List + +from .base import BaseCLIIntegration + + +class ExternalAgentRegistry: + """ + Registry for external CLI integrations. + + Provides centralized management of external agent integrations + with support for dynamic registration and availability checking. + + Uses singleton pattern to ensure consistent state across the application. + """ + + _instance: Optional['ExternalAgentRegistry'] = None + + def __init__(self): + """Initialize the registry with built-in integrations.""" + self._integrations: Dict[str, Type[BaseCLIIntegration]] = {} + self._register_builtin_integrations() + + @classmethod + def get_instance(cls) -> 'ExternalAgentRegistry': + """ + Get the singleton registry instance. + + Returns: + ExternalAgentRegistry: The singleton registry + """ + if cls._instance is None: + cls._instance = cls() + return cls._instance + + def _register_builtin_integrations(self): + """Register built-in integrations.""" + # Lazy imports to avoid circular dependencies and performance impact + try: + from .claude_code import ClaudeCodeIntegration + self._integrations['claude'] = ClaudeCodeIntegration + except ImportError: + pass + + try: + from .gemini_cli import GeminiCLIIntegration + self._integrations['gemini'] = GeminiCLIIntegration + except ImportError: + pass + + try: + from .codex_cli import CodexCLIIntegration + self._integrations['codex'] = CodexCLIIntegration + except ImportError: + pass + + try: + from .cursor_cli import CursorCLIIntegration + self._integrations['cursor'] = CursorCLIIntegration + except ImportError: + pass + + def register(self, name: str, integration_class: Type[BaseCLIIntegration]) -> None: + """ + Register a new external agent integration. + + Args: + name: Unique name for the integration + integration_class: The integration class (must inherit from BaseCLIIntegration) + + Raises: + ValueError: If integration_class does not inherit from BaseCLIIntegration + """ + if not issubclass(integration_class, BaseCLIIntegration): + raise ValueError( + f"Integration class {integration_class.__name__} must inherit from BaseCLIIntegration" + ) + + self._integrations[name] = integration_class + + def unregister(self, name: str) -> bool: + """ + Unregister an external agent integration. + + Args: + name: Name of the integration to unregister + + Returns: + bool: True if the integration was found and removed, False otherwise + """ + if name in self._integrations: + del self._integrations[name] + return True + return False + + def create(self, name: str, **kwargs: Any) -> Optional[BaseCLIIntegration]: + """ + Create an instance of the specified integration. + + Args: + name: Name of the integration + **kwargs: Arguments to pass to the integration constructor + + Returns: + BaseCLIIntegration: Instance of the integration, or None if not found + """ + integration_class = self._integrations.get(name) + if integration_class is None: + return None + + return integration_class(**kwargs) + + def list_registered(self) -> List[str]: + """ + List all registered integration names. + + Returns: + List[str]: List of registered integration names + """ + return list(self._integrations.keys()) + + async def get_available(self) -> Dict[str, bool]: + """ + Get availability status of all registered integrations. + + Returns: + Dict[str, bool]: Mapping of integration name to availability status + """ + import inspect + availability = {} + + for name, integration_class in self._integrations.items(): + try: + # Check if constructor requires parameters beyond self + sig = inspect.signature(integration_class.__init__) + params = [p for p_name, p in sig.parameters.items() if p_name != 'self' and p.default is inspect.Parameter.empty] + + if params: + # Constructor requires arguments, can't instantiate for availability check + # Skip this integration rather than marking it unavailable + continue + + # Create a temporary instance to check availability + instance = integration_class() + availability[name] = instance.is_available + except Exception as e: + # Log real exceptions rather than hiding them + import logging + logging.warning(f"Failed to check availability for {name}: {e}") + availability[name] = False + + return availability + + async def get_available_names(self) -> List[str]: + """ + Get names of all available integrations. + + Returns: + List[str]: List of available integration names + """ + availability = await self.get_available() + return [name for name, available in availability.items() if available] + + +# Factory functions for convenient access +def get_registry() -> ExternalAgentRegistry: + """ + Get the singleton external agent registry. + + Returns: + ExternalAgentRegistry: The singleton registry instance + """ + return ExternalAgentRegistry.get_instance() + + +def register_integration(name: str, integration_class: Type[BaseCLIIntegration]) -> None: + """ + Register a new external agent integration. + + Args: + name: Unique name for the integration + integration_class: The integration class (must inherit from BaseCLIIntegration) + """ + registry = get_registry() + registry.register(name, integration_class) + + +def create_integration(name: str, **kwargs: Any) -> Optional[BaseCLIIntegration]: + """ + Create an instance of the specified integration. + + Args: + name: Name of the integration + **kwargs: Arguments to pass to the integration constructor + + Returns: + BaseCLIIntegration: Instance of the integration, or None if not found + """ + registry = get_registry() + return registry.create(name, **kwargs) + + +def get_available_integrations() -> Dict[str, bool]: + """ + Get availability status of all registered integrations. + + Backward compatibility wrapper for the original synchronous function. + + Returns: + Dict[str, bool]: Mapping of integration name to availability status + """ + import asyncio + registry = get_registry() + + try: + # Try to get existing event loop + loop = asyncio.get_event_loop() + if loop.is_running(): + # We're in an async context, use create_task via thread pool + import concurrent.futures + with concurrent.futures.ThreadPoolExecutor() as executor: + future = executor.submit(asyncio.run, registry.get_available()) + return future.result() + else: + # No running loop, safe to use asyncio.run + return asyncio.run(registry.get_available()) + except RuntimeError: + # No event loop, safe to use asyncio.run + return asyncio.run(registry.get_available()) \ No newline at end of file diff --git a/src/praisonai/tests/unit/integrations/test_codex_cli.py b/src/praisonai/tests/unit/integrations/test_codex_cli.py index 67a5fbc93..405e24d3c 100644 --- a/src/praisonai/tests/unit/integrations/test_codex_cli.py +++ b/src/praisonai/tests/unit/integrations/test_codex_cli.py @@ -9,6 +9,7 @@ from unittest.mock import patch, AsyncMock import sys import os +import inspect sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', '..')) @@ -49,6 +50,22 @@ def test_full_auto_option(self): integration = CodexCLIIntegration(full_auto=True) assert integration.full_auto is True + assert integration.approval_mode == "full-auto" + + def test_approval_mode_option(self): + """Test approval_mode can be set explicitly.""" + from praisonai.integrations.codex_cli import CodexCLIIntegration + + integration = CodexCLIIntegration(approval_mode="auto-edit") + assert integration.approval_mode == "auto-edit" + assert integration.full_auto is False + + def test_invalid_approval_mode_raises(self): + """Test invalid approval_mode raises ValueError.""" + from praisonai.integrations.codex_cli import CodexCLIIntegration + + with pytest.raises(ValueError, match="Invalid approval_mode"): + CodexCLIIntegration(approval_mode="invalid") def test_sandbox_option(self): """Test sandbox option can be set.""" @@ -85,6 +102,26 @@ def test_build_command_with_json(self): cmd = integration._build_command("Fix the bug") assert "--json" in cmd + + def test_build_command_with_auto_edit(self): + """Test building command with auto-edit approval mode.""" + from praisonai.integrations.codex_cli import CodexCLIIntegration + + integration = CodexCLIIntegration(approval_mode="auto-edit") + cmd = integration._build_command("Fix the bug") + + assert "--auto-edit" in cmd + assert "--full-auto" not in cmd + + def test_build_command_with_provider(self): + """Test building command with provider option.""" + from praisonai.integrations.codex_cli import CodexCLIIntegration + + integration = CodexCLIIntegration(provider="openrouter") + cmd = integration._build_command("Fix the bug") + + assert "--provider" in cmd + assert "openrouter" in cmd def test_build_command_with_sandbox(self): """Test building command with sandbox.""" @@ -152,3 +189,15 @@ def test_build_command_with_output_schema(self): assert "--output-schema" in cmd assert "/path/to/schema.json" in cmd + + +class TestIntegrationExports: + """Tests for integration module exports.""" + + def test_get_available_integrations_export_is_sync(self): + """Test exported get_available_integrations remains synchronous.""" + from praisonai.integrations import get_available_integrations + + result = get_available_integrations() + assert inspect.iscoroutine(result) is False + assert isinstance(result, dict)