diff --git a/.secrets.baseline b/.secrets.baseline index 2298d53a50..9bf68977e5 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -3,7 +3,7 @@ "files": "(?x)( package-lock\\.json$ |Cargo\\.lock$ |uv\\.lock$ |go\\.sum$ |mcpgateway/sri_hashes\\.json$ )|^.secrets.baseline$", "lines": null }, - "generated_at": "2026-06-26T14:48:00Z", + "generated_at": "2026-06-26T15:26:45Z", "plugins_used": [ { "name": "AWSKeyDetector" @@ -274,7 +274,7 @@ "hashed_secret": "b4673e578b9b30fe8bba1b555b7b59883444c697", "is_secret": false, "is_verified": false, - "line_number": 1590, + "line_number": 1594, "type": "Secret Keyword", "verified_result": null }, @@ -282,7 +282,7 @@ "hashed_secret": "4a0a2df96d4c9a13a282268cab33ac4b8cbb2c72", "is_secret": false, "is_verified": false, - "line_number": 1678, + "line_number": 1682, "type": "Secret Keyword", "verified_result": null }, @@ -290,7 +290,7 @@ "hashed_secret": "5baa61e4c9b93f3f0682250b6cf8331b7ee68fd8", "is_secret": false, "is_verified": false, - "line_number": 2028, + "line_number": 2032, "type": "Basic Auth Credentials", "verified_result": null }, @@ -298,7 +298,7 @@ "hashed_secret": "fa9beb99e4029ad5a6615399e7bbae21356086b3", "is_secret": false, "is_verified": false, - "line_number": 3394, + "line_number": 3398, "type": "Basic Auth Credentials", "verified_result": null }, @@ -306,7 +306,7 @@ "hashed_secret": "fa9beb99e4029ad5a6615399e7bbae21356086b3", "is_secret": false, "is_verified": false, - "line_number": 3485, + "line_number": 3489, "type": "Secret Keyword", "verified_result": null }, @@ -314,7 +314,7 @@ "hashed_secret": "ac371b6dcce28a86c90d12bc57d946a800eebf17", "is_secret": false, "is_verified": false, - "line_number": 3528, + "line_number": 3532, "type": "Secret Keyword", "verified_result": null }, @@ -322,7 +322,7 @@ "hashed_secret": "0b6ec68df700dec4dcd64babd0eda1edccddace1", "is_secret": false, "is_verified": false, - "line_number": 3533, + "line_number": 3537, "type": "Secret Keyword", "verified_result": null }, @@ -330,7 +330,7 @@ "hashed_secret": "4ad6f0082ee224001beb3ca5c3e81c8ceea5ed86", "is_secret": false, "is_verified": false, - "line_number": 3538, + "line_number": 3542, "type": "Secret Keyword", "verified_result": null } @@ -4215,24 +4215,6 @@ "verified_result": null } ], - "mcpgateway/alembic/versions/e28cd485ad3c_remove_global_unique_constraints_for_.py": [ - { - "hashed_secret": "64589d1d5776faad3fc219e30c877e7ab8abb551", - "is_secret": false, - "is_verified": false, - "line_number": 33, - "type": "Hex High Entropy String", - "verified_result": null - }, - { - "hashed_secret": "5dbedebc3423bc4336c49a36bd75fe7d370c5176", - "is_secret": false, - "is_verified": false, - "line_number": 34, - "type": "Hex High Entropy String", - "verified_result": null - } - ], "mcpgateway/common/validators.py": [ { "hashed_secret": "c377074d6473f35a91001981355da793dc808ffd", @@ -4274,7 +4256,7 @@ "hashed_secret": "d3ecb0d890368d7659ee54010045b835dacb8efe", "is_secret": false, "is_verified": false, - "line_number": 738, + "line_number": 739, "type": "Secret Keyword", "verified_result": null } diff --git a/CHANGELOG.md b/CHANGELOG.md index b65d9263fc..842ba2803a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ ## [Unreleased] +### Added + +- New REST API endpoint `POST /v1/tools/generate-schemas-from-openapi` for generating MCP tool schemas from OpenAPI specifications without admin UI dependencies (#5142) + ### Deprecation Notice - Rust MCP runtime sidecar, Rust A2A runtime sidecar, and ValidationMiddleware are deprecated as of 2026-06-11 and will sunset on 2026-07-07. Use the Python MCP transport path, the Python A2A invocation path, and endpoint-level Pydantic or protocol-specific validation instead. See [Deprecations](docs/docs/deprecations.md). diff --git a/mcpgateway/alembic/versions/e28cd485ad3c_remove_global_unique_constraints_for_.py b/mcpgateway/alembic/versions/e28cd485ad3c_remove_global_unique_constraints_for_.py index 0cd84073db..1243e00755 100644 --- a/mcpgateway/alembic/versions/e28cd485ad3c_remove_global_unique_constraints_for_.py +++ b/mcpgateway/alembic/versions/e28cd485ad3c_remove_global_unique_constraints_for_.py @@ -30,8 +30,8 @@ import sqlalchemy as sa # revision identifiers, used by Alembic. -revision: str = "e28cd485ad3c" -down_revision: Union[str, Sequence[str], None] = "0a089912b5f0" +revision: str = "e28cd485ad3c" # pragma: allowlist secret +down_revision: Union[str, Sequence[str], None] = "0a089912b5f0" # pragma: allowlist secret branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None diff --git a/mcpgateway/main.py b/mcpgateway/main.py index a0250bd6d4..11b19a4e61 100644 --- a/mcpgateway/main.py +++ b/mcpgateway/main.py @@ -125,6 +125,7 @@ stop_plugin_invalidation_listener, ) from mcpgateway.plugins.violation_codes import PLUGIN_VIOLATION_CODE_MAPPING, PluginViolationCode, VALID_HTTP_STATUS_CODES +from mcpgateway.routers.openapi_schema_router import router as openapi_schema_router from mcpgateway.routers.server_well_known import router as server_well_known_router from mcpgateway.routers.well_known import router as well_known_router from mcpgateway.schemas import ( @@ -12101,6 +12102,7 @@ async def cleanup_import_statuses(max_age_hours: int = 24, user=Depends(get_curr app.include_router(metrics_router) app.include_router(tag_router) app.include_router(export_import_router) +app.include_router(openapi_schema_router) # Compliance report router (admin API) if settings.mcpgateway_admin_api_enabled: diff --git a/mcpgateway/routers/oauth_router.py b/mcpgateway/routers/oauth_router.py index ab36b5c65b..2b38ac30e5 100644 --- a/mcpgateway/routers/oauth_router.py +++ b/mcpgateway/routers/oauth_router.py @@ -379,6 +379,7 @@ async def initiate_oauth_flow(gateway_id: str, request: Request, current_user: E request: The FastAPI request object. current_user: The authenticated user initiating the OAuth flow. db: The database session dependency. + popup: Indicates if the OAuth flow is initiated in a popup window. Returns: A redirect response to the OAuth provider's authorization URL. diff --git a/mcpgateway/routers/openapi_schema_router.py b/mcpgateway/routers/openapi_schema_router.py new file mode 100644 index 0000000000..6702f34df0 --- /dev/null +++ b/mcpgateway/routers/openapi_schema_router.py @@ -0,0 +1,124 @@ +# -*- coding: utf-8 -*- +"""Location: ./mcpgateway/routers/openapi_schema_router.py +Copyright 2026 +SPDX-License-Identifier: Apache-2.0 + +OpenAPI Schema Generation Router. + +This module provides a versioned REST API endpoint for generating MCP tool +input/output schemas from OpenAPI specifications. It mirrors the functionality +of the admin endpoint but without CSRF protection, making it suitable for +API consumers and integrations. +""" + +# Standard +import logging +import urllib.parse + +# Third-Party +from fastapi import APIRouter, Body, Depends +from fastapi.responses import JSONResponse +import httpx +from pydantic import BaseModel, Field + +# First-Party +from mcpgateway.common.validators import SecurityValidator +from mcpgateway.middleware.rbac import get_current_user_with_permissions, require_permission +from mcpgateway.services.openapi_service import fetch_and_extract_schemas +from mcpgateway.utils.orjson_response import ORJSONResponse + +# Initialize router +router = APIRouter(prefix="/v1/tools", tags=["Tools"]) +logger = logging.getLogger(__name__) + + +class GenerateSchemaRequest(BaseModel): + """Request body for generating schemas from OpenAPI specification.""" + + url: str = Field(..., description="The tool URL (e.g., http://localhost:8100/calculate)") + request_type: str = Field("GET", description="HTTP method (GET, POST, etc.)") + openapi_url: str = Field("", description="Direct OpenAPI spec URL (optional, auto-discovered if empty)") + + +@router.post("/generate-schemas-from-openapi") +@require_permission("tools.create", allow_admin_bypass=False) +async def generate_schemas_from_openapi( + body: GenerateSchemaRequest = Body(...), + _user=Depends(get_current_user_with_permissions), +) -> JSONResponse: + """ + Generate input_schema and output_schema from OpenAPI specification. + + This endpoint is part of the versioned REST API and does not require + admin UI access. It delegates to the same service logic as the admin + endpoint but without CSRF protection. + + Args: + body: Request body with url, request_type, and openapi_url + _user: Authenticated user from RBAC dependency + + Returns: + JSONResponse with generated schemas or error message. + """ + # Security validation + try: + SecurityValidator.validate_url(body.url, "Tool URL") + except ValueError as e: + return ORJSONResponse( + content={"message": str(e), "success": False}, + status_code=400, + ) + + # Parse URL to extract base and path + parsed = urllib.parse.urlparse(body.url) + base_url = f"{parsed.scheme}://{parsed.netloc}" + tool_path = parsed.path + + # Fetch and extract schemas + try: + input_schema, output_schema, spec_url = await fetch_and_extract_schemas( + base_url=base_url, + path=tool_path, + method=body.request_type, + openapi_url=body.openapi_url or "", + timeout=10.0, + ) + except ValueError as e: + return ORJSONResponse( + content={"message": f"Security validation failed: {str(e)}", "success": False}, + status_code=400, + ) + except KeyError as e: + return ORJSONResponse( + content={"message": str(e), "success": False}, + status_code=404, + ) + except httpx.HTTPStatusError as e: + logger.warning("OpenAPI spec server returned HTTP %s", e.response.status_code, exc_info=True) + return ORJSONResponse( + content={"message": f"OpenAPI spec server returned HTTP {e.response.status_code}", "success": False}, + status_code=502, + ) + except httpx.HTTPError: + logger.warning("Failed to fetch OpenAPI spec", exc_info=True) + return ORJSONResponse( + content={"message": "Failed to fetch OpenAPI spec from the provided URL", "success": False}, + status_code=502, + ) + except Exception: + logger.error("Error fetching OpenAPI spec", exc_info=True) + return ORJSONResponse( + content={"message": "An unexpected error occurred while processing the OpenAPI spec", "success": False}, + status_code=500, + ) + + return ORJSONResponse( + content={ + "message": "Schemas generated successfully from OpenAPI spec", + "success": True, + "input_schema": input_schema, + "output_schema": output_schema, + "spec_url": spec_url, + }, + status_code=200, + ) diff --git a/tests/unit/mcpgateway/routers/test_openapi_schema_router.py b/tests/unit/mcpgateway/routers/test_openapi_schema_router.py new file mode 100644 index 0000000000..696b7ef291 --- /dev/null +++ b/tests/unit/mcpgateway/routers/test_openapi_schema_router.py @@ -0,0 +1,448 @@ +# -*- coding: utf-8 -*- +"""Location: ./tests/unit/mcpgateway/routers/test_openapi_schema_router.py +Copyright 2026 +SPDX-License-Identifier: Apache-2.0 + +Unit tests for the OpenAPI schema generation router. + +Tests cover: + - POST /v1/tools/generate-schemas-from-openapi: success, validation, error mapping + - RBAC enforcement (with real permission check) + - Default value handling +""" + +# Standard +from unittest.mock import AsyncMock, MagicMock, patch + +# Third-Party +import httpx +import pytest + +# Local +from tests.utils.rbac_mocks import patch_rbac_decorators, restore_rbac_decorators + +_originals = patch_rbac_decorators() +# First-Party +from mcpgateway.routers import openapi_schema_router as router_mod # noqa: E402 # pylint: disable=wrong-import-position + +restore_rbac_decorators(_originals) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _mock_user(): + """Return mock user context dict.""" + return {"email": "test@example.com", "is_admin": False} + + +def _create_body(url, request_type="GET", openapi_url=""): + """Create GenerateSchemaRequest body object.""" + from mcpgateway.routers.openapi_schema_router import GenerateSchemaRequest + + return GenerateSchemaRequest(url=url, request_type=request_type, openapi_url=openapi_url) + + +# --------------------------------------------------------------------------- +# Happy Path Tests +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_generate_schema_success_all_fields(): + """Valid request with all fields returns schemas successfully.""" + body = _create_body( + url="http://api.example.com/calculate", + request_type="POST", + openapi_url="http://api.example.com/openapi.json", + ) + + mock_schemas = ( + {"type": "object", "properties": {"a": {"type": "number"}}}, + {"type": "object", "properties": {"result": {"type": "number"}}}, + "http://api.example.com/openapi.json", + ) + + with patch("mcpgateway.routers.openapi_schema_router.fetch_and_extract_schemas", new_callable=AsyncMock) as mock_fetch: + mock_fetch.return_value = mock_schemas + + response = await router_mod.generate_schemas_from_openapi(body, _user=_mock_user()) + + assert response.status_code == 200 + content = response.body + assert b'"success":true' in content + assert b'"message":"Schemas generated successfully from OpenAPI spec"' in content + assert b'"spec_url":"http://api.example.com/openapi.json"' in content + + +@pytest.mark.asyncio +async def test_generate_schema_success_minimal_fields(): + """Valid request with minimal fields applies defaults.""" + body = _create_body(url="http://api.example.com/calculate") + + mock_schemas = ( + {"type": "object"}, + {"type": "object"}, + "http://api.example.com/openapi.json", + ) + + with patch("mcpgateway.routers.openapi_schema_router.fetch_and_extract_schemas", new_callable=AsyncMock) as mock_fetch: + mock_fetch.return_value = mock_schemas + + response = await router_mod.generate_schemas_from_openapi(body, _user=_mock_user()) + + assert response.status_code == 200 + # Verify default request_type="GET" was used + mock_fetch.assert_called_once() + call_kwargs = mock_fetch.call_args[1] + assert call_kwargs["method"] == "GET" + + +@pytest.mark.asyncio +async def test_generate_schema_auto_discovery(): + """Empty openapi_url triggers auto-discovery.""" + body = _create_body(url="http://api.example.com/calculate", openapi_url="") + + mock_schemas = ( + {"type": "object"}, + {"type": "object"}, + "http://api.example.com/openapi.json", + ) + + with patch("mcpgateway.routers.openapi_schema_router.fetch_and_extract_schemas", new_callable=AsyncMock) as mock_fetch: + mock_fetch.return_value = mock_schemas + + response = await router_mod.generate_schemas_from_openapi(body, _user=_mock_user()) + + assert response.status_code == 200 + # Verify empty openapi_url was passed (triggers auto-discovery in service) + call_kwargs = mock_fetch.call_args[1] + assert call_kwargs["openapi_url"] == "" + + +# --------------------------------------------------------------------------- +# Input Validation Tests +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_generate_schema_invalid_json(): + """Invalid JSON body returns 422 (Pydantic validation error).""" + # Pydantic validation happens before the function is called + # This test verifies that invalid input is rejected by FastAPI + # In practice, FastAPI returns 422 for validation errors + # We test this by attempting to create an invalid body object + with pytest.raises(Exception): # Pydantic will raise validation error + from mcpgateway.routers.openapi_schema_router import GenerateSchemaRequest + + GenerateSchemaRequest.model_validate_json('{invalid json}') + + +@pytest.mark.asyncio +async def test_generate_schema_missing_url(): + """Missing url field returns validation error.""" + # Pydantic validation happens before the function is called + with pytest.raises(Exception): # Pydantic will raise validation error + from mcpgateway.routers.openapi_schema_router import GenerateSchemaRequest + + GenerateSchemaRequest(request_type="POST") # Missing required 'url' + + +@pytest.mark.asyncio +async def test_generate_schema_empty_url(): + """Empty url field is accepted by Pydantic but rejected by security validation.""" + body = _create_body(url=" ") + + # Empty URL passes Pydantic but fails security validation + with patch("mcpgateway.routers.openapi_schema_router.SecurityValidator.validate_url") as mock_validate: + mock_validate.side_effect = ValueError("URL cannot be empty") + + response = await router_mod.generate_schemas_from_openapi(body, _user=_mock_user()) + + assert response.status_code == 400 + content = response.body + assert b'"success":false' in content + + +@pytest.mark.asyncio +async def test_generate_schema_invalid_url_format(): + """Invalid URL format returns 400 from security validation.""" + body = _create_body(url="not-a-valid-url") + + with patch("mcpgateway.routers.openapi_schema_router.SecurityValidator.validate_url") as mock_validate: + mock_validate.side_effect = ValueError("Invalid URL format") + + response = await router_mod.generate_schemas_from_openapi(body, _user=_mock_user()) + + assert response.status_code == 400 + content = response.body + assert b'"success":false' in content + assert b"Invalid URL format" in content + + +# --------------------------------------------------------------------------- +# Service Error Mapping Tests +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_generate_schema_security_validation_error(): + """ValueError from security validation returns 400.""" + body = _create_body(url="http://api.example.com/calculate") + + with patch("mcpgateway.routers.openapi_schema_router.fetch_and_extract_schemas", new_callable=AsyncMock) as mock_fetch: + mock_fetch.side_effect = ValueError("Security validation failed: blocked domain") + + response = await router_mod.generate_schemas_from_openapi(body, _user=_mock_user()) + + assert response.status_code == 400 + content = response.body + assert b'"success":false' in content + assert b"Security validation failed" in content + + +@pytest.mark.asyncio +async def test_generate_schema_path_not_found(): + """KeyError (path/method not found) returns 404.""" + body = _create_body(url="http://api.example.com/nonexistent") + + with patch("mcpgateway.routers.openapi_schema_router.fetch_and_extract_schemas", new_callable=AsyncMock) as mock_fetch: + mock_fetch.side_effect = KeyError("Path /nonexistent not found in OpenAPI spec") + + response = await router_mod.generate_schemas_from_openapi(body, _user=_mock_user()) + + assert response.status_code == 404 + content = response.body + assert b'"success":false' in content + + +@pytest.mark.asyncio +async def test_generate_schema_http_status_error(): + """httpx.HTTPStatusError returns 502.""" + body = _create_body(url="http://api.example.com/calculate") + + mock_response = MagicMock() + mock_response.status_code = 404 + + with patch("mcpgateway.routers.openapi_schema_router.fetch_and_extract_schemas", new_callable=AsyncMock) as mock_fetch: + mock_fetch.side_effect = httpx.HTTPStatusError("Not Found", request=MagicMock(), response=mock_response) + + response = await router_mod.generate_schemas_from_openapi(body, _user=_mock_user()) + + assert response.status_code == 502 + content = response.body + assert b'"success":false' in content + assert b"OpenAPI spec server returned HTTP 404" in content + + +@pytest.mark.asyncio +async def test_generate_schema_http_error(): + """httpx.HTTPError returns 502.""" + body = _create_body(url="http://api.example.com/calculate") + + with patch("mcpgateway.routers.openapi_schema_router.fetch_and_extract_schemas", new_callable=AsyncMock) as mock_fetch: + mock_fetch.side_effect = httpx.HTTPError("Connection failed") + + response = await router_mod.generate_schemas_from_openapi(body, _user=_mock_user()) + + assert response.status_code == 502 + content = response.body + assert b'"success":false' in content + assert b"Failed to fetch OpenAPI spec" in content + + +@pytest.mark.asyncio +async def test_generate_schema_generic_exception(): + """Generic Exception returns 500.""" + body = _create_body(url="http://api.example.com/calculate") + + with patch("mcpgateway.routers.openapi_schema_router.fetch_and_extract_schemas", new_callable=AsyncMock) as mock_fetch: + mock_fetch.side_effect = Exception("Unexpected error") + + response = await router_mod.generate_schemas_from_openapi(body, _user=_mock_user()) + + assert response.status_code == 500 + content = response.body + assert b'"success":false' in content + assert b"An unexpected error occurred" in content + + +# --------------------------------------------------------------------------- +# Default Value Tests +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_generate_schema_request_type_defaults_to_get(): + """request_type defaults to GET when omitted.""" + body = _create_body(url="http://api.example.com/calculate") + + mock_schemas = ({"type": "object"}, {"type": "object"}, "http://api.example.com/openapi.json") + + with patch("mcpgateway.routers.openapi_schema_router.fetch_and_extract_schemas", new_callable=AsyncMock) as mock_fetch: + mock_fetch.return_value = mock_schemas + + await router_mod.generate_schemas_from_openapi(body, _user=_mock_user()) + + call_kwargs = mock_fetch.call_args[1] + assert call_kwargs["method"] == "GET" + + +@pytest.mark.asyncio +async def test_generate_schema_openapi_url_can_be_empty(): + """openapi_url can be empty (triggers auto-discovery).""" + body = _create_body(url="http://api.example.com/calculate", openapi_url="") + + mock_schemas = ({"type": "object"}, {"type": "object"}, "http://api.example.com/openapi.json") + + with patch("mcpgateway.routers.openapi_schema_router.fetch_and_extract_schemas", new_callable=AsyncMock) as mock_fetch: + mock_fetch.return_value = mock_schemas + + await router_mod.generate_schemas_from_openapi(body, _user=_mock_user()) + + call_kwargs = mock_fetch.call_args[1] + assert call_kwargs["openapi_url"] == "" + + +# --------------------------------------------------------------------------- +# URL Parsing Tests +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_generate_schema_url_parsing(): + """URL is correctly parsed into base_url and path.""" + body = _create_body(url="https://api.example.com:8080/v1/calculate") + + mock_schemas = ({"type": "object"}, {"type": "object"}, "https://api.example.com:8080/openapi.json") + + with patch("mcpgateway.routers.openapi_schema_router.fetch_and_extract_schemas", new_callable=AsyncMock) as mock_fetch: + mock_fetch.return_value = mock_schemas + + await router_mod.generate_schemas_from_openapi(body, _user=_mock_user()) + + call_kwargs = mock_fetch.call_args[1] + assert call_kwargs["base_url"] == "https://api.example.com:8080" + assert call_kwargs["path"] == "/v1/calculate" + + +# --------------------------------------------------------------------------- +# RBAC Tests (Real Permission Check) +# --------------------------------------------------------------------------- + + +def test_generate_schemas_403_when_permission_denied(monkeypatch: pytest.MonkeyPatch): + """Endpoint returns 403 via ASGI when user lacks tools.create permission. + + Drives the request through TestClient (ASGI routing) rather than calling + the function directly, so it verifies that @require_permission is wired + correctly on the registered route — not just on the bare function. + """ + import importlib + + from fastapi import FastAPI + from fastapi.testclient import TestClient + from mcpgateway.middleware.rbac import get_current_user_with_permissions + from tests.utils.rbac_mocks import patch_rbac_decorators, restore_rbac_decorators + + # Restore real decorators so the route is decorated with the real @require_permission + restore_rbac_decorators(_originals) + + # Reload router to pick up the real decorator + importlib.reload(router_mod) + + # Monkeypatch PermissionService to deny all permissions + class DenyAll: + def __init__(self, _db): + pass + + async def check_permission(self, **_kwargs): + return False + + monkeypatch.setattr("mcpgateway.middleware.rbac.PermissionService", DenyAll) + + # Mount the freshly-reloaded router on a throwaway app + test_app = FastAPI() + test_app.include_router(router_mod.router) + + # Override auth dependency to return an unprivileged user (no DB lookup) + async def unprivileged_user(): + return {"email": "user@example.com", "is_admin": False, "ip_address": "127.0.0.1", "user_agent": "tests"} + + test_app.dependency_overrides[get_current_user_with_permissions] = unprivileged_user + + client = TestClient(test_app, raise_server_exceptions=False) + + response = client.post( + "/v1/tools/generate-schemas-from-openapi", + json={"url": "http://api.example.com/calculate"}, + ) + + assert response.status_code == 403, f"Expected 403, got {response.status_code}: {response.text}" + assert "access denied" in response.json().get("detail", "").lower() + + # Re-patch decorators and reload for remaining tests + patch_rbac_decorators() + importlib.reload(router_mod) + + +# --------------------------------------------------------------------------- +# TestClient Integration Tests +# --------------------------------------------------------------------------- + + +def test_endpoint_via_testclient_validates_route_registration(): + """Verify route is registered and accessible via ASGI app without CSRF token. + + This test confirms: + 1. Route registration works when the router is mounted on a FastAPI app + 2. Pydantic validation returns 422 for invalid input (not 400) + 3. No CSRF token is required (unlike admin endpoint) + 4. Valid request with mocked service returns 200 + """ + from fastapi import FastAPI + from fastapi.testclient import TestClient + from mcpgateway.middleware.rbac import get_current_user_with_permissions + from mcpgateway.routers.openapi_schema_router import router as schema_router + + test_app = FastAPI() + test_app.include_router(schema_router) + + async def mock_user(): + return {"email": "test@example.com", "is_admin": False} + + test_app.dependency_overrides[get_current_user_with_permissions] = mock_user + + client = TestClient(test_app, raise_server_exceptions=False) + + # Test 1: Invalid request body (missing required 'url') should return 422 (Pydantic validation) + response = client.post( + "/v1/tools/generate-schemas-from-openapi", + json={"invalid": "data"}, + ) + assert response.status_code == 422, f"Expected 422, got {response.status_code}" + assert "detail" in response.json() + + # Test 2: Valid request returns 200 without CSRF token + with patch("mcpgateway.routers.openapi_schema_router.fetch_and_extract_schemas", new_callable=AsyncMock) as mock_fetch: + mock_fetch.return_value = ( + {"type": "object", "properties": {"a": {"type": "number"}}}, + {"type": "object", "properties": {"result": {"type": "number"}}}, + "http://api.example.com/openapi.json", + ) + with patch("mcpgateway.routers.openapi_schema_router.SecurityValidator.validate_url"): + response = client.post( + "/v1/tools/generate-schemas-from-openapi", + json={ + "url": "http://api.example.com/calculate", + "request_type": "GET", + "openapi_url": "http://api.example.com/openapi.json", + }, + ) + assert response.status_code == 200, f"Expected 200, got {response.status_code}: {response.text}" + data = response.json() + assert data["success"] is True + assert data["message"] == "Schemas generated successfully from OpenAPI spec" + # Confirm no CSRF token was needed (request succeeded without X-CSRF-Token header) diff --git a/tests/utils/rbac_mocks.py b/tests/utils/rbac_mocks.py index d121d457f7..b596c550fd 100644 --- a/tests/utils/rbac_mocks.py +++ b/tests/utils/rbac_mocks.py @@ -265,7 +265,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): self.app.dependency_overrides.update(self.original_overrides) -def mock_require_permission_decorator(permission: str, resource_type: Optional[str] = None): +def mock_require_permission_decorator(permission: str, resource_type: Optional[str] = None, allow_admin_bypass: bool = True): """Mock version of the require_permission decorator that always allows access. This decorator bypasses all permission checks and simply executes the @@ -274,6 +274,7 @@ def mock_require_permission_decorator(permission: str, resource_type: Optional[s Args: permission: Required permission (ignored in mock) resource_type: Optional resource type (ignored in mock) + allow_admin_bypass: Whether to allow admin bypass (ignored in mock) Returns: Callable: A decorator that doesn't perform any permission checks @@ -301,12 +302,13 @@ def decorator(func): return decorator -def mock_require_any_permission(permissions, resource_type: Optional[str] = None): +def mock_require_any_permission(permissions, resource_type: Optional[str] = None, allow_admin_bypass: bool = True): """Mock version of require_any_permission that always allows access. Args: permissions: List of permissions (ignored in mock) resource_type: Optional resource type (ignored in mock) + allow_admin_bypass: Whether to allow admin bypass (ignored in mock) Returns: Callable: A decorator that doesn't perform any permission checks