Skip to content
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ Here are some examples:
cforge tools list [--mcp-server-id ID] [--json]
cforge tools get <tool-id>
cforge tools create [file.json]
cforge tools execute <tool-id> # Interactive schema prompt
cforge tools execute <tool-id> --data args.json # Use JSON args file
cforge tools toggle <tool-id>

# Resources
Expand Down
61 changes: 30 additions & 31 deletions cforge/commands/resources/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
"""

# Standard
Comment thread
MatthewGrigsby marked this conversation as resolved.
from enum import Enum
from typing import Any, Dict, Optional

# Third-Party
import typer

# First-Party
from cforge.common import (
CaseInsensitiveEnum,
AuthenticationError,
CLIError,
get_console,
Expand All @@ -34,49 +34,47 @@
)


class _CaseInsensitiveEnum(str, Enum):
"""Enum that supports case-insensitive parsing for CLI options."""

@classmethod
def _missing_(cls, value: object) -> Optional["_CaseInsensitiveEnum"]:
"""Resolve unknown values by matching enum values case-insensitively.

Typer converts CLI strings into Enum members. Implementing `_missing_`
allows `--mode EnFoRcE` to resolve to `PluginMode.ENFORCE`, while still
rejecting unknown values.
"""
if not isinstance(value, str):
return None
value_folded = value.casefold()
for member in cls:
if member.value.casefold() == value_folded:
return member
return None


class PluginMode(_CaseInsensitiveEnum):
class PluginMode(CaseInsensitiveEnum):
"""Valid plugin mode filters supported by the gateway admin API."""

ENFORCE = "enforce"
PERMISSIVE = "permissive"
DISABLED = "disabled"


def _handle_plugins_exception(exception: Exception) -> None:
def _parse_plugin_mode(mode: Optional[str]) -> Optional[PluginMode]:
"""Parse plugin mode with case-insensitive enum matching."""
if mode is None:
return None
try:
return PluginMode(mode)
except ValueError as exc:
choices = ", ".join(member.value for member in PluginMode)
raise CLIError(f"Invalid value for '--mode': {mode!r}. Must be one of: {choices}.") from exc


def _handle_plugins_exception(exception: Exception, operation: str, plugin_name: Optional[str] = None) -> None:
"""Provide plugin-specific hints and raise a CLI error."""
console = get_console()

if isinstance(exception, AuthenticationError):
console.print("[yellow]Access denied. Requires admin.plugins permission.[/yellow]")
elif isinstance(exception, CLIError) and "(404)" in str(exception):
console.print("[yellow]Admin plugin API unavailable. Ensure MCPGATEWAY_ADMIN_API_ENABLED=true and gateway version supports /admin/plugins.[/yellow]")
elif isinstance(exception, CLIError):
error_str = str(exception)
if "(404)" in error_str:
error_str_folded = error_str.casefold()
if operation == "get" and "plugin" in error_str_folded and "not found" in error_str_folded:
plugin_label = plugin_name or "requested plugin"
console.print(f"[yellow]Plugin not found: {plugin_label}[/yellow]")
else:
console.print("[yellow]Admin plugin API unavailable. Ensure MCPGATEWAY_ADMIN_API_ENABLED=true and gateway version supports /admin/plugins.[/yellow]")

handle_exception(exception)


def plugins_list(
search: Optional[str] = typer.Option(None, "--search", help="Search by plugin name, description, or author"),
mode: Optional[PluginMode] = typer.Option(None, "--mode", help="Filter by mode"),
mode: Optional[str] = typer.Option(None, "--mode", help="Filter by mode"),
Comment thread
gabe-l-hart marked this conversation as resolved.
hook: Optional[str] = typer.Option(None, "--hook", help="Filter by hook type"),
tag: Optional[str] = typer.Option(None, "--tag", help="Filter by plugin tag"),
json_output: bool = typer.Option(False, "--json", help="Output as JSON"),
Expand All @@ -88,8 +86,9 @@ def plugins_list(
params: Dict[str, Any] = {}
if search:
params["search"] = search
if mode:
params["mode"] = mode.value
parsed_mode = _parse_plugin_mode(mode)
if parsed_mode:
params["mode"] = parsed_mode.value
if hook:
params["hook"] = hook
if tag:
Expand All @@ -108,7 +107,7 @@ def plugins_list(
console.print("[yellow]No plugins found[/yellow]")

except Exception as e:
_handle_plugins_exception(e)
_handle_plugins_exception(e, operation="list")


def plugins_get(
Expand All @@ -120,7 +119,7 @@ def plugins_get(
print_json(result, f"Plugin {name}")

except Exception as e:
_handle_plugins_exception(e)
_handle_plugins_exception(e, operation="get", plugin_name=name)


def plugins_stats() -> None:
Expand All @@ -130,4 +129,4 @@ def plugins_stats() -> None:
print_json(result, "Plugin Statistics")

except Exception as e:
_handle_plugins_exception(e)
_handle_plugins_exception(e, operation="stats")
70 changes: 70 additions & 0 deletions cforge/commands/resources/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

# First-Party
from cforge.common import (
CLIError,
get_console,
handle_exception,
make_authenticated_request,
print_json,
print_table,
prompt_for_json_schema,
prompt_for_schema,
)
from mcpgateway.schemas import ToolCreate, ToolUpdate
Expand Down Expand Up @@ -179,3 +181,71 @@ def tools_toggle(

except Exception as e:
handle_exception(e)


def tools_execute(
tool_id: str = typer.Argument(..., help="Tool ID"),
data_file: Optional[Path] = typer.Option(None, "--data", help="JSON file containing tool arguments"),
) -> None:
"""Execute a tool by ID using optional dynamic schema prompting."""
console = get_console()

try:
tool_result = make_authenticated_request("GET", f"/tools/{tool_id}")
assert isinstance(tool_result, dict)

tool_name = tool_result.get("name")
if not isinstance(tool_name, str) or not tool_name:
raise CLIError(f"Tool '{tool_id}' does not have a valid name")

raw_schema = tool_result.get("inputSchema")
if raw_schema is None:
raw_schema = tool_result.get("input_schema")
if raw_schema is None:
raw_schema = {"type": "object", "properties": {}}
Comment thread
MatthewGrigsby marked this conversation as resolved.
Outdated

if isinstance(raw_schema, str):
Comment thread
MatthewGrigsby marked this conversation as resolved.
Outdated
input_schema = json.loads(raw_schema)
elif isinstance(raw_schema, dict):
input_schema = raw_schema
else:
raise CLIError("Tool input schema must be a JSON object")

if not isinstance(input_schema, dict):
raise CLIError("Tool input schema must be a JSON object")
if not input_schema:
input_schema = {"type": "object", "properties": {}}

data: Dict[str, Any] = {}
if data_file:
Comment thread
MatthewGrigsby marked this conversation as resolved.
Outdated
if not data_file.exists():
console.print(f"[red]File not found: {data_file}[/red]")
raise typer.Exit(1)
file_data = json.loads(data_file.read_text())
if not isinstance(file_data, dict):
raise CLIError("Data file must contain a JSON object")
data = prompt_for_json_schema(input_schema, prefilled=file_data, prompt_optional=False)
Comment thread
MatthewGrigsby marked this conversation as resolved.
Outdated
else:
data = prompt_for_json_schema(input_schema)

rpc_payload: Dict[str, Any] = {"jsonrpc": "2.0", "id": f"cforge-tools-{tool_id}", "method": "tools/call", "params": {"name": tool_name, "arguments": data}}
rpc_result = make_authenticated_request("POST", "/rpc", json_data=rpc_payload)

if isinstance(rpc_result, dict) and "error" in rpc_result:
error = rpc_result["error"]
if isinstance(error, dict):
err_message = error.get("message", "Unknown error")
err_code = error.get("code")
if err_code is not None:
raise CLIError(f"Tool execution failed ({err_code}): {err_message}")
raise CLIError(f"Tool execution failed: {err_message}")
raise CLIError(f"Tool execution failed: {error}")

console.print("[green]✓ Tool executed successfully![/green]")
if isinstance(rpc_result, dict) and "result" in rpc_result:
print_json(rpc_result["result"], "Tool Result")
else:
print_json(rpc_result, "Tool Result")

except Exception as e:
handle_exception(e)
Loading