diff --git a/.github/workflows/build-branch.yml b/.github/workflows/build-branch.yml
index b3ceacd..ef640c2 100644
--- a/.github/workflows/build-branch.yml
+++ b/.github/workflows/build-branch.yml
@@ -94,6 +94,16 @@ jobs:
- id: checkout_files
name: Checkout Files
uses: actions/checkout@v4
+ - name: Install Modern Docker CLI
+ run: |
+ sudo apt-get update
+ sudo apt-get install -y ca-certificates curl gnupg
+ sudo install -m 0755 -d /etc/apt/keyrings
+ curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /etc/apt/keyrings/docker.gpg
+ sudo chmod a+r /etc/apt/keyrings/docker.gpg
+          echo "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.gpg] https://download.docker.com/linux/ubuntu $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
+ sudo apt-get update
+ sudo apt-get install -y docker-ce-cli
- name: Plane MCP Server Build and Push
uses: makeplane/actions/build-push@v1.0.0
with:
@@ -102,7 +112,7 @@ jobs:
release-version: ${{ needs.release_build_setup.outputs.release_version }}
dockerhub-username: ${{ secrets.DOCKERHUB_USERNAME }}
dockerhub-token: ${{ secrets.DOCKERHUB_TOKEN }}
- docker-image-owner: makeplane
+ docker-image-owner: ${{ secrets.DOCKERHUB_USERNAME }}
docker-image-name: ${{ needs.release_build_setup.outputs.dh_img_name }}
build-context: .
dockerfile-path: ./Dockerfile
@@ -115,11 +125,7 @@ jobs:
if: ${{ needs.release_build_setup.outputs.build_type == 'Release' }}
name: Build Release
runs-on: ubuntu-22.04
- needs:
- [
- release_build_setup,
- build_and_push,
- ]
+ needs: [release_build_setup, build_and_push]
env:
REL_VERSION: ${{ needs.release_build_setup.outputs.release_version }}
steps:
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
new file mode 100644
index 0000000..fb55192
--- /dev/null
+++ b/.github/workflows/ci.yml
@@ -0,0 +1,79 @@
+name: CI / CD
+
+on:
+ push:
+ branches:
+ - main
+ - master
+ - ai-journeys-rearchitecture
+ - plane-agent
+ pull_request:
+ branches:
+ - main
+ - master
+ workflow_dispatch:
+
+permissions:
+ contents: read
+
+jobs:
+ test:
+ name: Run Tests
+ runs-on: ubuntu-latest
+ container:
+ image: python:3.12
+ steps:
+ - name: Setup and Checkout
+ uses: actions/checkout@v4
+
+ - name: Run tests
+ run: |
+ set +e
+ pip install '.[dev]' pytest pytest-asyncio pytest-mock pytest-timeout respx requests-mock anyio
+ pytest -v > test-output.log 2>&1
+ TEST_EXIT=$?
+ cat test-output.log
+ exit $TEST_EXIT
+
+ build-and-push:
+ name: Build & Push Docker
+ needs: test
+ runs-on: ubuntu-latest
+ if: github.event_name == 'push' || github.event_name == 'workflow_dispatch'
+ permissions:
+ packages: write
+ contents: read
+ steps:
+ - name: Checkout code
+ uses: actions/checkout@v4
+
+ - name: Set up QEMU
+ uses: docker/setup-qemu-action@v3
+
+ - name: Set up Docker Buildx
+ uses: docker/setup-buildx-action@v3
+
+ - name: Log in to Docker Hub
+ uses: docker/login-action@v3
+ with:
+ username: ${{ secrets.DOCKERHUB_USERNAME }}
+ password: ${{ secrets.DOCKERHUB_TOKEN }}
+
+ - name: Set image tags
+ id: tags
+ run: |
+ BRANCH_TAG=$(echo "${{ github.ref_name }}" | sed 's|/|-|g')
+ IMAGE="${{ secrets.DOCKERHUB_USERNAME }}/plane-mcp-server"
+ TAGS="${IMAGE}:${BRANCH_TAG}"
+ if [ "${{ github.ref_name }}" = "main" ] || [ "${{ github.ref_name }}" = "master" ]; then
+ TAGS="${TAGS},${IMAGE}:latest"
+ fi
+ echo "tags=${TAGS}" >> "$GITHUB_OUTPUT"
+
+ - name: Build and push
+ uses: docker/build-push-action@v5
+ with:
+ context: .
+ push: true
+ platforms: linux/amd64,linux/arm64
+ tags: ${{ steps.tags.outputs.tags }}
diff --git a/.gitignore b/.gitignore
index 88f9cb1..fcda810 100644
--- a/.gitignore
+++ b/.gitignore
@@ -55,4 +55,4 @@ dmypy.json
.env.test.local
# Ignore cursor AI rules
-.cursor/rules/codacy.mdc
+.cursor/rules/codacy.mdc
\ No newline at end of file
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..0cda472
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,31 @@
+# This docker-compose configuration is intended for stand-alone MCP server testing.
+services:
+ plane-mcp:
+ build: .
+ container_name: plane-mcp
+ restart: unless-stopped
+ ports:
+ - "${FASTMCP_PORT:-8211}:${FASTMCP_PORT:-8211}"
+ environment:
+ - PLANE_API_KEY=${PLANE_API_KEY}
+ - PLANE_WORKSPACE_SLUG=${PLANE_WORKSPACE_SLUG}
+ - PLANE_BASE_URL=${PLANE_BASE_URL}
+ - FASTMCP_PORT=${FASTMCP_PORT:-8211}
+ - REDIS_HOST=redis
+ - REDIS_PORT=6379
+ entrypoint: ["python", "-m", "plane_mcp"]
+ command: ["http"]
+ depends_on:
+ - redis
+
+ redis:
+ image: redis:alpine
+ container_name: plane-redis
+ restart: unless-stopped
+ ports:
+ - "6379:6379"
+ volumes:
+ - redis_data:/data
+
+volumes:
+ redis_data:
diff --git a/plane_mcp/__init__.py b/plane_mcp/__init__.py
index a7ff0b0..aef2bcd 100644
--- a/plane_mcp/__init__.py
+++ b/plane_mcp/__init__.py
@@ -1 +1,31 @@
"""Plane MCP Server - A Model Context Protocol server for Plane integration."""
+
+import os
+from contextlib import asynccontextmanager
+from fastmcp import FastMCP
+
+_original_fastmcp_init = FastMCP.__init__
+
+def _patched_fastmcp_init(self, *args, **kwargs):
+ if "tasks" not in kwargs:
+ kwargs["tasks"] = os.getenv("PLANE_ALLOW_MEMORY_TASKS", "false").lower() == "true"
+ _original_fastmcp_init(self, *args, **kwargs)
+ self.__mcp_patched_tasks_enabled = kwargs.get("tasks", False)
+
+FastMCP.__init__ = _patched_fastmcp_init
+
+_original_docket_lifespan = FastMCP._docket_lifespan
+
+@asynccontextmanager
+async def _patched_docket_lifespan(self):
+ tasks_enabled = getattr(self, "__mcp_patched_tasks_enabled", True)
+ if not tasks_enabled:
+ try:
+ yield
+ finally:
+ pass
+ return
+ async with _original_docket_lifespan(self):
+ yield
+
+FastMCP._docket_lifespan = _patched_docket_lifespan
diff --git a/plane_mcp/__main__.py b/plane_mcp/__main__.py
index d943d90..8af9170 100644
--- a/plane_mcp/__main__.py
+++ b/plane_mcp/__main__.py
@@ -4,7 +4,7 @@
import logging
import os
import sys
-from contextlib import asynccontextmanager
+from contextlib import AsyncExitStack, asynccontextmanager
from datetime import datetime, timezone
from enum import Enum
@@ -61,13 +61,12 @@ class ServerMode(Enum):
@asynccontextmanager
-async def combined_lifespan(oauth_app, header_app, sse_app):
- """Combine lifespans from both OAuth and Header MCP apps."""
- # Start both lifespans
- async with oauth_app.lifespan(oauth_app):
- async with header_app.lifespan(header_app):
- async with sse_app.lifespan(sse_app):
- yield
+async def combined_lifespan(apps):
+ """Combine lifespans from multiple MCP apps."""
+ async with AsyncExitStack() as stack:
+ for app in apps:
+ await stack.enter_async_context(app.lifespan(app))
+ yield
def main() -> None:
@@ -83,7 +82,10 @@ def main() -> None:
if not os.getenv("PLANE_WORKSPACE_SLUG"):
raise ValueError("PLANE_WORKSPACE_SLUG is not set")
- get_stdio_mcp().run()
+ from plane_mcp.journey.tools import register_tools as register_journey_tools
+ stdio_mcp = get_stdio_mcp()
+ register_journey_tools(stdio_mcp)
+ stdio_mcp.run()
return
if server_mode == ServerMode.HTTP:
@@ -97,17 +99,35 @@ def main() -> None:
oauth_well_known = oauth_mcp.auth.get_well_known_routes(mcp_path="/mcp")
sse_well_known = sse_mcp.auth.get_well_known_routes(mcp_path="/sse")
+ # --- AGENT JOURNEY API ---
+ from plane_mcp.journey.server import (
+ get_header_mcp as journey_get_header_mcp,
+ get_oauth_mcp as journey_get_oauth_mcp
+ )
+ journey_oauth_mcp = journey_get_oauth_mcp("/agent")
+ journey_oauth_app = journey_oauth_mcp.http_app(stateless_http=True)
+
+ journey_header_mcp = journey_get_header_mcp()
+ journey_header_app = journey_header_mcp.http_app(stateless_http=True)
+
+ journey_oauth_well_known = []
+ if hasattr(journey_oauth_mcp, 'auth') and journey_oauth_mcp.auth:
+ journey_oauth_well_known = journey_oauth_mcp.auth.get_well_known_routes(mcp_path="/agent/mcp")
+
app = Starlette(
routes=[
# Well-known routes for OAuth and Header HTTP
*oauth_well_known,
*sse_well_known,
+ *journey_oauth_well_known,
# Mount both MCP servers
Mount("/http/api-key", app=header_app),
Mount("/http", app=oauth_app),
+ Mount("/agent/api-key", app=journey_header_app),
+ Mount("/agent", app=journey_oauth_app),
Mount("/", app=sse_app),
],
- lifespan=lambda app: combined_lifespan(oauth_app, header_app, sse_app),
+ lifespan=lambda app: combined_lifespan([oauth_app, header_app, sse_app, journey_oauth_app, journey_header_app]),
)
app.add_middleware(
diff --git a/plane_mcp/journey/__init__.py b/plane_mcp/journey/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/plane_mcp/journey/__main__.py b/plane_mcp/journey/__main__.py
new file mode 100644
index 0000000..24a9664
--- /dev/null
+++ b/plane_mcp/journey/__main__.py
@@ -0,0 +1,88 @@
+"""Main entry point for the Plane MCP Server."""
+
+import os
+import sys
+from contextlib import asynccontextmanager
+from enum import Enum
+
+import uvicorn
+from fastmcp.utilities.logging import get_logger
+from starlette.applications import Starlette
+from starlette.middleware.cors import CORSMiddleware
+from starlette.routing import Mount
+
+from plane_mcp.journey.server import get_header_mcp, get_stdio_mcp
+
+logger = get_logger(__name__)
+
+
+class ServerMode(Enum):
+ STDIO = "stdio"
+ SSE = "sse"
+ HTTP = "http"
+
+
+@asynccontextmanager
+async def combined_lifespan(oauth_app, header_app, sse_app):
+ """Combine lifespans from both OAuth and Header MCP apps."""
+ # Start both lifespans
+ async with oauth_app.lifespan(oauth_app):
+ async with header_app.lifespan(header_app):
+ async with sse_app.lifespan(sse_app):
+ yield
+
+
+def main() -> None:
+ """Run the MCP server."""
+ server_mode = ServerMode.STDIO
+ if len(sys.argv) > 1:
+ try:
+ server_mode = ServerMode(sys.argv[1])
+ except ValueError:
+ valid_modes = ", ".join(m.value for m in ServerMode)
+ raise ValueError(f"Invalid server mode '{sys.argv[1]}'. Valid modes: {valid_modes}") from None
+
+ if server_mode == ServerMode.STDIO:
+ # Validate API_KEY and PLANE_WORKSPACE_SLUG are set
+ if not os.getenv("PLANE_API_KEY"):
+ raise ValueError("PLANE_API_KEY is not set")
+ if not os.getenv("PLANE_WORKSPACE_SLUG"):
+ raise ValueError("PLANE_WORKSPACE_SLUG is not set")
+
+ get_stdio_mcp().run()
+ return
+
+ if server_mode == ServerMode.SSE:
+ raise NotImplementedError(
+ "SSE mode is not implemented in the agent server. "
+ "Use 'stdio' or 'http'. SSE transport is defined in the MCP spec "
+ "but not supported by this endpoint."
+ )
+
+ if server_mode == ServerMode.HTTP:
+ http_mcp = get_header_mcp()
+ http_app = http_mcp.http_app(transport="streamable-http")
+
+ app = Starlette(
+ routes=[
+ Mount("/", app=http_app),
+ ],
+ lifespan=lambda app: http_app.lifespan(http_app),
+ )
+
+ app.add_middleware(
+ CORSMiddleware,
+ allow_origins=["*"],
+ allow_credentials=False,
+ allow_methods=["*"],
+ allow_headers=["*"],
+ )
+
+ port = int(os.getenv("FASTMCP_PORT", "8211"))
+ logger.info(f"Starting HTTP server for Streamable HTTP at / on port {port}")
+ uvicorn.run(app, host="0.0.0.0", port=port, log_level="info")
+ return
+
+
+if __name__ == "__main__":
+ main()
diff --git a/plane_mcp/journey/base.py b/plane_mcp/journey/base.py
new file mode 100644
index 0000000..3e399ef
--- /dev/null
+++ b/plane_mcp/journey/base.py
@@ -0,0 +1,138 @@
+"""Base classes and utilities for Journey-based AI tools."""
+
+from collections.abc import Callable
+from functools import wraps
+from typing import Any, TypeVar, cast
+
+from plane_mcp.journey.lod import LODProfile, apply_lod
+from plane_mcp.resolver import EntityResolver
+
+T = TypeVar('T', bound=Callable[..., Any])
+
+
+class JourneyBase:
+ """
+ Base class for Journey tools. Provides EntityResolver wiring
+ and utilities to aggressively filter Level of Detail.
+ """
+
+ def __init__(self, resolver: EntityResolver):
+ self.resolver = resolver
+
+ def apply_lod(
+ self, data: Any, profile: LODProfile = LODProfile.SUMMARY, project_identifier: str | None = None
+ ) -> Any:
+ """
+ Applies LOD profile, returning clean minimized data for AI context.
+ """
+ return apply_lod(data, profile=profile, project_identifier=project_identifier)
+
+ def parse_ticket_id(self, ticket_id: str) -> tuple[str, int]:
+ """
+ Parses a typical sequence ID (ENG-123) into project_identifier and sequence_id.
+ """
+ if not isinstance(ticket_id, str):
+ raise ValueError(f"Invalid ticket_id: expected string, got {type(ticket_id).__name__}")
+
+ parts = ticket_id.split("-")
+ if len(parts) != 2:
+ raise ValueError(f"Invalid ticket ID format: '{ticket_id}'. Expected format like 'ENG-123'.")
+
+ project_identifier = parts[0]
+ try:
+ issue_sequence = int(parts[1])
+ except ValueError:
+ raise ValueError(f"Invalid ticket sequence in '{ticket_id}'. Must be an integer (e.g., 123).") from None
+
+ return project_identifier, issue_sequence
+
+def with_lod(profile: LODProfile = LODProfile.SUMMARY) -> Callable[[T], T]:
+ """
+ Decorator for Journey tools to automatically apply an LOD profile to the output.
+ Can be applied to methods of JourneyBase subclasses or any function returning dict/list.
+ """
+ def decorator(func: T) -> T:
+ @wraps(func)
+ def wrapper(*args: Any, **kwargs: Any) -> Any:
+ result = func(*args, **kwargs)
+ # Try to get project_identifier from kwargs if available for sequence injection
+ project_identifier = kwargs.get("project_identifier")
+
+ # If not in kwargs but it's a typical ticket_id arg
+ if not project_identifier and "ticket_id" in kwargs:
+ try:
+ project_identifier = kwargs["ticket_id"].split("-")[0]
+ except (AttributeError, IndexError):
+ pass # ticket_id format doesn't match expected pattern
+
+ # Extract self if it's a method
+ if args and hasattr(args[0], "apply_lod"):
+ # Let the base class handle it
+ return args[0].apply_lod(result, profile=profile, project_identifier=project_identifier)
+ else:
+ return apply_lod(result, profile=profile, project_identifier=project_identifier)
+ return cast(T, wrapper)
+ return decorator
+
+
+def mcp_error_boundary(func: T) -> T:
+ """
+ Decorator to wrap MCP tool executions and catch unhandled exceptions,
+ returning them as a formatted string or dict to prevent 500 errors.
+ """
+ @wraps(func)
+ def wrapper(*args: Any, **kwargs: Any) -> Any:
+ try:
+ return func(*args, **kwargs)
+ except Exception as e:
+ try:
+ import inspect
+ import logging
+ import traceback
+
+ error_details = traceback.format_exc()
+ logging.error(f"Error in {func.__name__}:\n{error_details}")
+
+ # Determine if this is a "handled" error that shouldn't pollute LLM context with stack traces
+ is_handled = isinstance(e, ValueError)
+
+ try:
+ from pydantic import ValidationError as PydanticValidationError
+ if isinstance(e, PydanticValidationError):
+ is_handled = True
+ except ImportError:
+ pass
+
+ try:
+ from pydantic_core import ValidationError as PydanticCoreValidationError
+ if isinstance(e, PydanticCoreValidationError):
+ is_handled = True
+ except ImportError:
+ pass
+
+ try:
+ from plane.errors.errors import HttpError
+ if isinstance(e, HttpError):
+ is_handled = True
+ except ImportError:
+ pass
+
+ error_msg = f"Error executing tool '{func.__name__}': {str(e)}"
+ if not is_handled:
+ error_msg += f"\n\nDetails: {error_details}"
+
+ # Try to determine return type safely
+ try:
+ sig = inspect.signature(func)
+ if sig.return_annotation is list or str(sig.return_annotation).startswith("list"):
+ return [{"error": error_msg}]
+ except Exception:
+ pass
+
+ return {"error": error_msg}
+ except Exception as inner_e:
+ # Absolute fallback if error handling itself fails
+ return {
+ "error": f"CRITICAL: Tool {func.__name__} failed, and error handler also crashed: {str(inner_e)}"
+ }
+ return cast(T, wrapper)
diff --git a/plane_mcp/journey/cache.py b/plane_mcp/journey/cache.py
new file mode 100644
index 0000000..f717442
--- /dev/null
+++ b/plane_mcp/journey/cache.py
@@ -0,0 +1,123 @@
+import json
+import logging
+import os
+import tempfile
+import time
+
+
+def get_cached_workspace_context(cache_ttl_seconds: int = 300) -> dict:
+ from plane_mcp.client import get_plane_client_context
+    client = workspace_slug = None  # initialised before use; set inside try so the fetch below can check them safely
+    try:
+        client, workspace_slug = get_plane_client_context()
+        ws = workspace_slug or "default"
+    except Exception:
+        ws = "default"
+    # Sanitize slug for use as a filename component
+    safe_ws = "".join(c if c.isalnum() or c in "-_" else "_" for c in ws)
+    cache_dir = os.path.join(tempfile.gettempdir(), "plane_mcp")
+    os.makedirs(cache_dir, exist_ok=True)
+    cache_file = os.path.join(cache_dir, f"workspace_{safe_ws}_context_cache.json")
+
+    context = {}
+    if os.path.exists(cache_file):
+        if time.time() - os.path.getmtime(cache_file) < cache_ttl_seconds:
+            try:
+                with open(cache_file) as f:
+                    context = json.load(f)
+            except Exception as e:
+                logging.getLogger(__name__).debug(f"Cache read failed for {cache_file}, will refresh: {e}")
+
+    if not context:
+        try:
+            if client is not None and workspace_slug:
+                response = client.projects.list(workspace_slug=workspace_slug)
+
+                projects = []
+
+                for p in response.results:
+                    proj_dict = {
+                        "project_slug": p.identifier,
+                        "name": p.name,
+                        "description": getattr(p, "description", "") or ""
+                    }
+                    try:
+                        s_res = client.states.list(workspace_slug=workspace_slug, project_id=p.id)
+                        proj_dict["states"] = [s.name for s in s_res.results if s.name]
+                    except Exception:
+                        proj_dict["states"] = []
+
+                    try:
+                        label_res = client.labels.list(workspace_slug=workspace_slug, project_id=p.id)
+                        proj_dict["labels"] = [label.name for label in label_res.results if label.name]
+                    except Exception:
+                        proj_dict["labels"] = []
+
+                    projects.append(proj_dict)
+
+                all_states = sorted({s for p in projects for s in p.get("states", [])})
+                all_labels = sorted({label for p in projects for label in p.get("labels", [])})
+
+                try:
+                    stickies_res = client.stickies.list(workspace_slug=workspace_slug)
+ stickies = [
+ {
+ "name": s.name or "Untitled",
+ "description": s.description_stripped or ""
+ }
+ for s in stickies_res.results
+ ]
+ except Exception:
+ stickies = []
+
+ context = {
+ "projects": projects,
+ "priorities": ["urgent", "high", "medium", "low", "none"],
+ "all_states": all_states,
+ "all_labels": all_labels,
+ "stickies": stickies
+ }
+
+ with open(cache_file, "w") as f:
+ json.dump(context, f)
+ except Exception as e:
+ logging.getLogger(__name__).warning(f"Failed to fetch workspace context: {e}")
+ return {"error": f"Could not connect to Plane API: {e}. Check PLANE_API_KEY and PLANE_BASE_URL."}
+
+ return context
+
+def get_cached_project_slugs_docstring(cache_ttl_seconds: int = 300, full_descriptions: bool = False) -> str:
+ context = get_cached_workspace_context(cache_ttl_seconds)
+ projects = context.get("projects", [])
+
+ if not projects:
+ return "(e.g., 'PLANE' or 'TEST'). If you are unsure of a project_slug, make your best logical guess."
+
+ if full_descriptions:
+ lines = ["valid slugs:"]
+ for p in projects:
+ desc = f" - {p['description']}" if p.get("description") else f" - {p.get('name', '')}"
+            lines.append(f"  * {p['project_slug']}: {desc}")
+ return "\n".join(lines)
+ else:
+        slugs = [p["project_slug"] for p in projects]
+ return f"valid slugs: {', '.join(slugs)}"
+
+def get_cached_states_string() -> str:
+ context = get_cached_workspace_context()
+ states = context.get("all_states", context.get("states", []))
+ if not states:
+ return "'In Progress', 'Backlog', 'Done'"
+ return ", ".join([f"'{s}'" for s in states])
+
+def get_cached_labels_string() -> str:
+ context = get_cached_workspace_context()
+ labels = context.get("all_labels", context.get("labels", []))
+ if not labels:
+ return "'bug', 'feature'"
+ return ", ".join([f"'{label}'" for label in labels])
+
+def get_cached_priorities_string() -> str:
+ context = get_cached_workspace_context()
+ priorities = context.get("priorities", ["urgent", "high", "medium", "low", "none"])
+ return ", ".join([f"'{p}'" for p in priorities])
diff --git a/plane_mcp/journey/lod.py b/plane_mcp/journey/lod.py
new file mode 100644
index 0000000..84fc7e6
--- /dev/null
+++ b/plane_mcp/journey/lod.py
@@ -0,0 +1,184 @@
+"""Level of Detail (LOD) filtering system to strip verbose REST metadata."""
+import logging
+import uuid as uuid_module
+from enum import Enum
+from typing import Any
+
+from markdownify import markdownify
+
+from plane_mcp.client import get_plane_client_context
+from plane_mcp.resolver import EntityResolver
+
+logger = logging.getLogger(__name__)
+
+class LODProfile(Enum):
+ SUMMARY = "summary"
+ STANDARD = "standard"
+ FULL = "full"
+
+# Summary: Minimum viable fields for AI context
+SUMMARY_FIELDS = {
+ "ticket_id", "name", "state", "priority", "assignees"
+}
+
+# Standard: Default ticket read fields (Key, Name, Details/description, priority, labels, state)
+STANDARD_FIELDS = {
+ "ticket_id", "name", "description_html", "priority", "labels", "state"
+}
+
+def inject_sequence_id(data: dict[str, Any], project_identifier: str | None = None) -> None:
+ """
+ Consistently inject sequence IDs (e.g., 'ENG-123') into the data dictionary
+ to enable zero-lookup chaining for subsequent AI actions.
+ """
+ if "ticket_id" in data:
+ return
+
+ proj_id = project_identifier or data.get("project_identifier")
+ if not proj_id and "project_detail" in data and isinstance(data["project_detail"], dict):
+ proj_id = data["project_detail"].get("identifier")
+
+ if proj_id and "sequence_id" in data:
+ data["ticket_id"] = f"{proj_id}-{data['sequence_id']}"
+
+def _hydrate_state(data: dict[str, Any], project_identifier: str | None = None) -> None:
+ """If state is a raw UUID, attempt to hydrate its name."""
+ state_val = data.get("state")
+ if state_val:
+ state_str = str(state_val)
+ try:
+ uuid_module.UUID(state_str)
+ except ValueError:
+ return # Not a UUID, nothing to hydrate
+ try:
+ client, workspace_slug = get_plane_client_context()
+ resolver = EntityResolver(client, workspace_slug)
+
+ # Figure out project UUID
+ proj_id = None
+ if data.get("project"):
+ proj_id = str(data.get("project"))
+ elif data.get("project_id"):
+ proj_id = str(data.get("project_id"))
+ elif project_identifier:
+ proj_id = str(resolver.resolve_project(project_identifier))
+
+ if proj_id:
+ state_obj = client.states.retrieve(
+ workspace_slug=workspace_slug, project_id=proj_id, state_id=state_str
+ )
+ data["state"] = {"name": state_obj.name, "id": state_str}
+ except Exception as e:
+ logger.debug("Could not hydrate state %s: %s", state_str, e)
+
+def _clean_html(html_str: str) -> str:
+ if not html_str:
+ return ""
+ return markdownify(html_str, heading_style="ATX", bullet_list_marker="-").strip()
+
+def _apply_lod_to_dict(
+ data: dict[str, Any], profile: LODProfile, project_identifier: str | None = None
+) -> dict[str, Any]:
+ inject_sequence_id(data, project_identifier)
+ _hydrate_state(data, project_identifier)
+
+ result = {}
+ if profile == LODProfile.SUMMARY:
+ for key, value in data.items():
+ if key in SUMMARY_FIELDS:
+ out_key = "issue_key" if key == "ticket_id" else key
+ if key == "state" and isinstance(value, dict) and "name" in value:
+ result[out_key] = value["name"]
+ else:
+ result[out_key] = value
+
+ # Always prefer human-readable state name from state_detail over raw UUID
+ if "state_detail" in data and isinstance(data["state_detail"], dict) and "name" in data["state_detail"]:
+ result["state"] = data["state_detail"]["name"]
+
+ if "ticket_id" in data:
+ result["issue_key"] = data["ticket_id"]
+
+ elif profile == LODProfile.STANDARD:
+ # Standard: Default ticket read fields (issue_key, Name, Details, priority, labels, state)
+
+ if "ticket_id" in data:
+ result["issue_key"] = data["ticket_id"]
+ if "name" in data:
+ result["name"] = data["name"]
+
+ # Description (convert HTML to markdown)
+ if "description_html" in data and isinstance(data["description_html"], str):
+ result["description"] = _clean_html(data["description_html"])
+ elif "description" in data:
+ result["description"] = data["description"]
+
+ if "priority" in data:
+ result["priority"] = data["priority"]
+
+ if "labels" in data and isinstance(data["labels"], list):
+ result["labels"] = [
+ label.get("name") if isinstance(label, dict) and "name" in label else label
+ for label in data["labels"]
+ ]
+
+ # State mapping
+ if "state" in data:
+ if isinstance(data["state"], dict) and "name" in data["state"]:
+ result["state"] = data["state"]["name"]
+ else:
+ result["state"] = data["state"]
+
+ # Backup state check
+ if "state" not in result and "state_detail" in data and isinstance(data["state_detail"], dict):
+ result["state"] = data["state_detail"].get("name")
+
+ elif profile == LODProfile.FULL:
+ result = data.copy()
+ if "description_html" in result and isinstance(result["description_html"], str):
+ result["description"] = _clean_html(result["description_html"])
+ del result["description_html"]
+ # Alias the internal ticket_id intermediate to the canonical issue_key
+ if "ticket_id" in result:
+ result["issue_key"] = result.pop("ticket_id")
+
+ return result
+
+def apply_lod(
+ data: dict | list | Any,
+ profile: LODProfile = LODProfile.SUMMARY,
+ project_identifier: str | None = None
+) -> dict | list:
+ """
+ Applies the LOD filter to a dictionary, list of dictionaries, or Pydantic model
+ and returns a clean JSON-serializable structure.
+ """
+ # Convert Pydantic models or objects to dict
+ if hasattr(data, "model_dump"):
+ try:
+ data = data.model_dump(mode='json')
+ except TypeError:
+ data = data.model_dump()
+ elif hasattr(data, "dict"):
+ data = data.dict()
+ elif hasattr(data, "__dict__"):
+ data = data.__dict__
+
+ filtered_data = None
+ if isinstance(data, list):
+ filtered_data = [
+ _apply_lod_to_dict(
+ item.model_dump(mode='json') if hasattr(item, "model_dump") else (
+ item.dict() if hasattr(item, "dict") else item
+ ),
+ profile,
+ project_identifier
+ ) if hasattr(item, "model_dump") or hasattr(item, "dict") or isinstance(item, dict) else item
+ for item in data
+ ]
+ elif isinstance(data, dict):
+ filtered_data = _apply_lod_to_dict(data, profile, project_identifier)
+ else:
+ filtered_data = data
+
+ return filtered_data
diff --git a/plane_mcp/journey/server.py b/plane_mcp/journey/server.py
new file mode 100644
index 0000000..c6db204
--- /dev/null
+++ b/plane_mcp/journey/server.py
@@ -0,0 +1,95 @@
+"""Plane MCP Server implementation."""
+
+import os
+
+from fastmcp import FastMCP
+from key_value.aio.stores.memory import MemoryStore
+from key_value.aio.stores.redis import RedisStore
+from mcp.types import Icon
+
+from plane_mcp.auth import PlaneHeaderAuthProvider, PlaneOAuthProvider
+from plane_mcp.journey.tools import register_tools
+
+
+def get_oauth_mcp(base_path: str = "/"):
+ import logging
+
+ logger = logging.getLogger(__name__)
+
+ redis_host = os.getenv("REDIS_HOST")
+ redis_port = os.getenv("REDIS_PORT")
+
+ if redis_host and redis_port:
+ try:
+ redis_port_int = int(redis_port)
+ except ValueError:
+ raise ValueError(
+ f"REDIS_PORT must be a valid integer, got '{redis_port}'. "
+ "Please set REDIS_PORT to a valid port number (e.g., 6379)."
+ ) from None
+ logger.info("Using Redis for token storage")
+ client_storage = RedisStore(host=redis_host, port=redis_port_int)
+ else:
+ logger.warning(
+ "Using in-memory storage - tokens will be lost on restart! "
+ "Set REDIS_HOST and REDIS_PORT for production."
+ )
+ client_storage = MemoryStore()
+
+ client_id = os.getenv("PLANE_OAUTH_PROVIDER_CLIENT_ID", "dummy_client_id")
+ client_secret = os.getenv("PLANE_OAUTH_PROVIDER_CLIENT_SECRET", "dummy_client_secret")
+ if client_id == "dummy_client_id" or client_secret == "dummy_client_secret":
+ logger.warning(
+ "OAuth provider is using placeholder credentials. "
+ "Set PLANE_OAUTH_PROVIDER_CLIENT_ID and PLANE_OAUTH_PROVIDER_CLIENT_SECRET for production."
+ )
+
+ # Initialize the MCP server
+ oauth_mcp = FastMCP(
+ "Plane Journey MCP Server",
+ icons=[Icon(src="https://plane.so/favicon.ico", alt="Plane Journey MCP Server")],
+ website_url="https://plane.so",
+ auth=PlaneOAuthProvider(
+ client_id=client_id,
+ client_secret=client_secret,
+ base_url=f"{os.getenv('PLANE_OAUTH_PROVIDER_BASE_URL', 'http://localhost:8211')}{base_path}",
+ plane_base_url=os.getenv("PLANE_BASE_URL", ""),
+ plane_internal_base_url=os.getenv("PLANE_INTERNAL_BASE_URL", ""),
+ client_storage=client_storage,
+ required_scopes=["read", "write"],
+ allowed_client_redirect_uris=[
+ # Localhost only for http (dynamic ports from MCP clients)
+ "http://localhost:*",
+ "http://localhost:*/*",
+ "http://127.0.0.1:*",
+ "http://127.0.0.1:*/*",
+ # Known MCP client custom protocol schemes
+ "cursor://*",
+ "vscode://*",
+ "vscode-insiders://*",
+ "windsurf://*",
+ "claude://*",
+ ],
+ ),
+ )
+ register_tools(oauth_mcp)
+ return oauth_mcp
+
+
+def get_header_mcp():
+ header_mcp = FastMCP(
+        "Plane Journey MCP Server (header-http)",
+ auth=PlaneHeaderAuthProvider(
+ required_scopes=["read", "write"],
+ ),
+ )
+ register_tools(header_mcp)
+ return header_mcp
+
+
+def get_stdio_mcp():
+ stdio_mcp = FastMCP(
+ "Plane Journey MCP Server (stdio)",
+ )
+ register_tools(stdio_mcp)
+ return stdio_mcp
diff --git a/plane_mcp/journey/tools/__init__.py b/plane_mcp/journey/tools/__init__.py
new file mode 100644
index 0000000..cecce2a
--- /dev/null
+++ b/plane_mcp/journey/tools/__init__.py
@@ -0,0 +1,14 @@
+"""Journey tools initialization."""
+
+from fastmcp import FastMCP
+
+from .create_update import register_create_update_tools
+from .read import register_read_tools
+from .workflow import register_workflow_tools
+
+
+def register_tools(mcp: FastMCP) -> None:
+ """Register all journey tools with the MCP server."""
+ register_read_tools(mcp)
+ register_create_update_tools(mcp)
+ register_workflow_tools(mcp)
diff --git a/plane_mcp/journey/tools/create_update.py b/plane_mcp/journey/tools/create_update.py
new file mode 100644
index 0000000..15c18f5
--- /dev/null
+++ b/plane_mcp/journey/tools/create_update.py
@@ -0,0 +1,337 @@
+"""Create and Update tools for Journey Endpoint."""
+
+import logging
+
+from fastmcp import FastMCP
+from plane.errors.errors import HttpError
+from plane.models.cycles import CreateCycle
+from plane.models.labels import CreateLabel
+from plane.models.work_items import CreateWorkItem, UpdateWorkItem
+
+from plane_mcp.client import get_plane_client_context
+from plane_mcp.journey.base import JourneyBase, mcp_error_boundary
+from plane_mcp.journey.yaml_formatter import with_yaml
+from plane_mcp.resolver import EntityResolver
+from plane_mcp.sanitize import sanitize_html
+
+logger = logging.getLogger(__name__)
+
+MAX_NEW_LABELS_PER_REQUEST = 3
+
+
+class CreateUpdateJourney(JourneyBase):
+ def _resolve_or_create_labels(self, project_id: str, label_names: list[str]) -> list[str]:
+ client, workspace_slug = get_plane_client_context()
+ existing = client.labels.list(workspace_slug=workspace_slug, project_id=project_id).results
+ name_to_id = {label.name.lower(): label.id for label in existing if label.name}
+
+ new_labels_needed = [n for n in label_names if n.lower() not in name_to_id]
+ if len(new_labels_needed) > MAX_NEW_LABELS_PER_REQUEST:
+ raise ValueError(
+ f"Too many new labels requested ({len(new_labels_needed)}). "
+ f"Maximum {MAX_NEW_LABELS_PER_REQUEST} new labels can be auto-created per request. "
+ f"Unknown labels: {new_labels_needed}"
+ )
+
+ label_ids = []
+ for name in label_names:
+ key = name.lower()
+ if key in name_to_id:
+ label_ids.append(name_to_id[key])
+ else:
+ new_label = client.labels.create(
+ workspace_slug=workspace_slug,
+ project_id=project_id,
+ data=CreateLabel(name=name, color="#000000")
+ )
+ label_ids.append(new_label.id)
+ name_to_id[key] = new_label.id
+ return label_ids
+
+ def _resolve_or_create_cycle(self, project_id: str, cycle_name: str) -> str | None:
+ client, workspace_slug = get_plane_client_context()
+ try:
+ existing = client.cycles.list(workspace_slug=workspace_slug, project_id=project_id).results
+ for c in existing:
+ if c.name and c.name.lower() == cycle_name.lower():
+ return c.id
+
+ # Create missing cycle
+ me = client.users.get_me()
+ user_id = me.id if hasattr(me, "id") else None
+
+ import datetime
+ start_dt = datetime.date.today().isoformat()
+ end_dt = (datetime.date.today() + datetime.timedelta(days=14)).isoformat()
+ new_cycle = client.cycles.create(
+ workspace_slug=workspace_slug,
+ project_id=project_id,
+ data=CreateCycle(
+ name=cycle_name, project_id=project_id, owned_by=user_id, start_date=start_dt, end_date=end_dt
+ )
+ )
+ return new_cycle.id
+ except HttpError as e:
+ if getattr(e, "status_code", getattr(getattr(e, "response", None), "status_code", None)) == 400:
+ logger.warning(
+ f"Cycles appear to be disabled for project {project_id}. Skipping cycle lookup/creation."
+ )
+ return None
+ raise
+
+ def create_ticket(
+ self,
+ title: str,
+ project_slug: str,
+ description: str | None = None,
+ state_name: str | None = None,
+ labels: list[str] | None = None,
+ cycle_name: str | None = None
+ ) -> dict:
+
+ if project_slug.lower() == 'help':
+ from plane_mcp.journey.cache import get_cached_workspace_context
+ ctx = get_cached_workspace_context(0)
+
+ raw_stickies = ctx.get("stickies", [])
+ processed_stickies = []
+ for s in raw_stickies:
+ desc = s.get("description_stripped") or s.get("description") or ""
+ if desc:
+ processed_stickies.append(desc)
+
+ llm_content = {
+ "projects": ctx.get("projects", []),
+ "priorities": ctx.get("priorities", []),
+ "stickies": processed_stickies
+ }
+
+ return llm_content
+ project_id = self.resolver.resolve_project(project_slug)
+ client, workspace_slug = get_plane_client_context()
+
+ state_id = self.resolver.resolve_state(project_slug, state_name) if state_name else None
+ label_ids = self._resolve_or_create_labels(project_id, labels) if labels else None
+
+ cycle_id = None
+ if cycle_name:
+ cycle_id = self._resolve_or_create_cycle(project_id, cycle_name)
+
+ data = CreateWorkItem(
+ name=title,
+ description_html=sanitize_html(description),
+ state=state_id,
+ labels=label_ids
+ )
+
+ new_ticket = client.work_items.create(
+ workspace_slug=workspace_slug,
+ project_id=project_id,
+ data=data
+ )
+
+ key = f"{project_slug}-{new_ticket.sequence_id}"
+
+ if cycle_id and new_ticket.id:
+ try:
+ client.cycles.add_work_items(
+ workspace_slug=workspace_slug,
+ project_id=project_id,
+ cycle_id=cycle_id,
+ issue_ids=[new_ticket.id]
+ )
+ except Exception as e:
+ logger.warning("Created %s but failed to add it to cycle %s: %s", key, cycle_id, e)
+ return {
+ "issue_key": key,
+ "status": "warning",
+ "message": "Ticket created successfully, but adding it to the cycle failed. The ticket exists.",
+ }
+
+ return {"issue_key": key}
+
+ def update_ticket(
+ self,
+ ticket_id: str,
+ new_title: str | None = None,
+ append_text: str | None = None,
+ append_after_snippet: str | None = None,
+ replace_text: str | None = None,
+ replace_target_snippet: str | None = None,
+ comment: str | None = None
+ ) -> dict:
+ work_item_id = self.resolver.resolve_ticket(ticket_id)
+ project_identifier, _ = self.parse_ticket_id(ticket_id)
+ project_id = self.resolver.resolve_project(project_identifier)
+
+ client, workspace_slug = get_plane_client_context()
+
+ # Retrieve using internal _get to bypass Pydantic ValidationError when
+ # the API returns label UUIDs as strings instead of Label objects
+ current = client.work_items._get(
+ f"{workspace_slug}/projects/{project_id}/work-items/{work_item_id}"
+ )
+
+ title_changed = False
+ final_title = current.get("name", "")
+ if new_title and new_title != current.get("name"):
+ final_title = new_title
+ title_changed = True
+
+ desc_changed = False
+ final_desc = current.get("description_html") or ""
+
+ if replace_text is not None:
+ if not replace_target_snippet:
+ return {
+ "status": "error",
+ "message": (
+ "You provided 'replace_text' but did not provide 'replace_target_snippet' "
+ "to specify exactly which text to replace."
+ )
+ }
+
+ occurrences = final_desc.count(replace_target_snippet)
+ if occurrences == 0:
+ return {
+ "status": "error",
+ "message": (
+ f"The snippet '{replace_target_snippet}' was not found in the description. "
+ "Ensure you matched the exact text, spaces, and casing."
+ )
+ }
+ if occurrences > 1:
+ return {
+ "status": "error",
+ "message": (
+ f"The snippet '{replace_target_snippet}' matched multiple times. "
+ "Please provide a longer, more specific snippet to uniquely identify the text to replace."
+ )
+ }
+
+ final_desc = final_desc.replace(replace_target_snippet, replace_text)
+ desc_changed = True
+
+ if append_text is not None:
+ if append_after_snippet:
+ occurrences = final_desc.count(append_after_snippet)
+ if occurrences == 0:
+ return {
+ "status": "error",
+ "message": f"The snippet '{append_after_snippet}' was not found in the description. "
+ "Ensure you matched the exact text."
+ }
+ if occurrences > 1:
+ return {
+ "status": "error",
+ "message": f"The snippet '{append_after_snippet}' matched multiple times. "
+ "Please provide a longer snippet."
+ }
+
+ final_desc = final_desc.replace(append_after_snippet,
+ f"{append_after_snippet}<br/>{append_text}")
+ else:
+ if final_desc:
+ final_desc = f"{final_desc}<br/>{append_text}"
+ else:
+ final_desc = append_text
+ desc_changed = True
+
+ if title_changed or desc_changed:
+ data = UpdateWorkItem(
+ name=final_title,
+ description_html=sanitize_html(final_desc)
+ )
+
+ client.work_items.update(
+ workspace_slug=workspace_slug,
+ project_id=project_id,
+ work_item_id=work_item_id,
+ data=data
+ )
+
+ if comment:
+ from plane.models.work_items import CreateWorkItemComment
+ safe_comment = sanitize_html(f"<p>{comment}</p>")
+ client.work_items.comments.create(
+ workspace_slug=workspace_slug,
+ project_id=project_id,
+ work_item_id=work_item_id,
+ data=CreateWorkItemComment(comment_html=safe_comment)
+ )
+
+ if not title_changed and not desc_changed and not comment:
+ return {"status": "warning", "message": "No changes were provided to update_ticket."}
+
+ return {"issue_key": ticket_id, "status": "success", "message": "Ticket updated successfully."}
+
+def register_create_update_tools(mcp: FastMCP) -> None:
+ def create_ticket(
+ title: str,
+ project_slug: str,
+ description: str | None = None,
+ state_name: str | None = None,
+ labels: list[str] | None = None,
+ cycle_name: str | None = None,
+ ) -> dict:
+ client, workspace_slug = get_plane_client_context()
+ resolver = EntityResolver(client, workspace_slug)
+ journey = CreateUpdateJourney(resolver)
+ raw_data = journey.create_ticket(title=title, project_slug=project_slug, description=description,
+ state_name=state_name, labels=labels, cycle_name=cycle_name)
+
+ if project_slug.lower() == 'help':
+ return raw_data
+
+
+ return raw_data
+ create_ticket.__doc__ = """
+ Create a new ticket with automatic resolution of labels and cycles.
+ Missing labels or cycles will be automatically created.
+ If state_name is not provided, the ticket is placed in the default Backlog state.
+
+ Args:
+ title: Title of the ticket.
+ project_slug: The Plane project identifier (e.g., 'PLANE'). To discover valid project slugs,
+ call this tool with project_slug='help'.
+ description: Optional detailed markdown description of the ticket.
+ state_name: Optional initial state name (e.g., 'Todo', 'In Progress'). Defaults to Backlog.
+ labels: List of label names.
+ cycle_name: Name of the cycle to add this ticket to.
+ """ + create_ticket = mcp.tool()(with_yaml(mcp_error_boundary(create_ticket))) + + @mcp.tool() + @with_yaml + @mcp_error_boundary + def update_ticket( + ticket_id: str, + new_title: str | None = None, + append_text: str | None = None, + append_after_snippet: str | None = None, + replace_text: str | None = None, + replace_target_snippet: str | None = None, + comment: str | None = None + ) -> dict: + """ + Update a ticket's title, description, or add a comment. Features smart targeting to avoid JSON escaping errors. + + Args: + ticket_id: The globally unique, human-readable identifier (e.g., ENG-123). + new_title: Completely replaces the ticket title. + append_text: Text to append to the description. + append_after_snippet: If populated, 'append_text' will be inserted immediately after this exact snippet. + If blank, 'append_text' is added to the very end of the description. + replace_text: The new text that will replace a snippet. + replace_target_snippet: The exact existing text to replace. Required if 'replace_text' is used. + comment: Adds a new comment to the ticket thread. 
+ """ + client, workspace_slug = get_plane_client_context() + resolver = EntityResolver(client, workspace_slug) + journey = CreateUpdateJourney(resolver) + raw_data = journey.update_ticket( + ticket_id, new_title, append_text, append_after_snippet, replace_text, replace_target_snippet, comment + ) + + + return raw_data diff --git a/plane_mcp/journey/tools/read.py b/plane_mcp/journey/tools/read.py new file mode 100644 index 0000000..56c1aef --- /dev/null +++ b/plane_mcp/journey/tools/read.py @@ -0,0 +1,297 @@ +"""Read tools for Journey Endpoint.""" + +from typing import Literal + +from fastmcp import FastMCP +from plane.models.query_params import RetrieveQueryParams + +from plane_mcp.client import get_plane_client_context +from plane_mcp.journey.base import JourneyBase, mcp_error_boundary +from plane_mcp.journey.lod import LODProfile +from plane_mcp.journey.yaml_formatter import with_yaml +from plane_mcp.resolver import EntityResolver + + +class ReadJourney(JourneyBase): + def search_tickets( + self, + project_slug: str, + query: str | None = None, + labels: list[str] | None = None, + priority: list[str] | None = None, + states: list[str] | None = None, + assignees: list[str] | None = None, + limit: int = 50, + cursor: str | None = None, + lod: Literal["summary", "standard", "full"] = "standard" + ) -> dict: + import json + + + if project_slug.lower() == 'help': + from plane_mcp.journey.cache import get_cached_workspace_context + opts = get_cached_workspace_context(0).copy() + + raw_stickies = opts.get("stickies", []) + processed_stickies = [] + for s in raw_stickies: + desc = s.get("description_stripped") or s.get("description") or "" + if desc: + processed_stickies.append(desc) + + llm_content = { + "projects": opts.get("projects", []), + "priorities": opts.get("priorities", []), + "stickies": processed_stickies + } + + + return llm_content + + project_id = self.resolver.resolve_project(project_slug) + client, workspace_slug = get_plane_client_context() + + if 
limit <= 0: + return {"results": [], "next_cursor": None, "prev_cursor": None} + limit = min(limit, 100) + per_page = limit + query_params = { + "per_page": per_page, + "expand": "assignees,labels,state", + } + + if cursor: + query_params["cursor"] = cursor + if priority: + query_params["priority"] = ",".join([p.lower() for p in priority]) + + if states: + state_ids = [self.resolver.resolve_state(project_slug, s) for s in states] + query_params["state"] = ",".join(state_ids) + + unresolved_labels: list[str] = [] + if labels: + try: + existing_labels = client.labels.list(workspace_slug=workspace_slug, project_id=project_id).results + name_to_id = {label.name.lower(): label.id for label in existing_labels if label.name} + label_ids = [] + for name in labels: + if name.lower() in name_to_id: + label_ids.append(name_to_id[name.lower()]) + else: + unresolved_labels.append(name) + if label_ids: + query_params["labels"] = ",".join(label_ids) + except Exception as e: + import logging + logging.getLogger(__name__).warning(f"Label resolution failed, returning empty results: {e}") + return { + "results": [], + "next_cursor": None, + "prev_cursor": None, + "warnings": [ + f"Label filter could not be applied due to an error: {e}. " + "Please retry or check your label names." 
+ ], + } + + if assignees: + assignee_ids = [] + for a in assignees: + if a.lower() == "me": + try: + me = client.users.get_me() + if hasattr(me, "id"): + assignee_ids.append(me.id) + except Exception: + pass + if assignee_ids: + query_params["assignees"] = ",".join(assignee_ids) + + from plane.models.work_items import PaginatedWorkItemResponse + + matched_results = [] + current_cursor = cursor + next_cursor_to_return = None + + query_lower = query.lower() if query else None + + max_pages_to_fetch = 5 + pages_fetched = 0 + + + # Loop to pull pages and deep-search until we hit the requested limit or run out of pages + while len(matched_results) < limit and pages_fetched < max_pages_to_fetch: + if current_cursor: + query_params["cursor"] = current_cursor + + response = client.work_items._get( + f"{workspace_slug}/projects/{project_id}/work-items", + params=query_params + ) + pages_fetched += 1 + + paginated = PaginatedWorkItemResponse.model_validate(response) + + for item in paginated.results: + if query_lower: + # Deep JSON string match + item_json = json.dumps(item.model_dump(), default=str).lower() + if query_lower in item_json: + matched_results.append(item) + else: + matched_results.append(item) + + # Advance cursor only after a full page is consumed — keeps page boundaries aligned + next_cursor_to_return = getattr(paginated, "next_cursor", None) + if not next_cursor_to_return: + break + current_cursor = next_cursor_to_return + + try: + profile = LODProfile(lod) + except ValueError: + profile = LODProfile.STANDARD + + transformed_results = self.apply_lod(matched_results[:limit], profile=profile, project_identifier=project_slug) + + result = { + "results": transformed_results, + "next_cursor": next_cursor_to_return, + "prev_cursor": paginated.prev_cursor if hasattr(paginated, "prev_cursor") else None + } + if unresolved_labels: + result["warnings"] = [f"Label not found and filter was skipped: {', '.join(unresolved_labels)}"] + return result + + def read_ticket( + 
self, ticket_id: str, lod: Literal["summary", "standard", "full"] = "standard", comments: bool = False + ) -> dict: + work_item_id = self.resolver.resolve_ticket(ticket_id) + project_identifier, _issue_sequence = self.parse_ticket_id(ticket_id) + project_id = self.resolver.resolve_project(project_identifier) + + client, workspace_slug = get_plane_client_context() + + params = RetrieveQueryParams(expand="assignees,labels,state") + + result = client.work_items.retrieve( + workspace_slug=workspace_slug, + project_id=project_id, + work_item_id=work_item_id, + params=params + ) + + try: + profile = LODProfile(lod) + except ValueError: + profile = LODProfile.STANDARD + + transformed = self.apply_lod(result, profile=profile, project_identifier=project_identifier) + + if comments: + try: + comments_resp = client.work_items.comments.list( + workspace_slug=workspace_slug, + project_id=project_id, + work_item_id=work_item_id + ) + formatted_comments = [] + for c in comments_resp.results: + date_str = str(c.created_at)[:10] if hasattr(c, 'created_at') and c.created_at else "YYYY-MM-DD" + + username = "user" + # Try to extract the best available identifier + if hasattr(c, 'actor_detail') and c.actor_detail: + username = getattr( + c.actor_detail, 'display_name', getattr(c.actor_detail, 'username', username) + ) + elif hasattr(c, 'actor') and c.actor: + username = str(c.actor) + + text = getattr(c, 'comment_stripped', '') or getattr(c, 'comment_html', '') + formatted_comments.append(f"\n{date_str}-@{username}:\n{text}") + + if formatted_comments: + transformed["comments"] = "".join(formatted_comments) + except Exception as e: + import logging + logging.getLogger(__name__).warning(f"Failed to fetch comments for {ticket_id}: {e}") + + return transformed + + +def register_read_tools(mcp: FastMCP) -> None: + + def search_tickets( + project_slug: str, + query: str | None = None, + labels: list[str] | None = None, + priority: list[str] | None = None, + states: list[str] | None = None, 
+ assignees: list[str] | None = None,
+ limit: int = 50,
+ cursor: str | None = None,
+ lod: Literal["summary", "standard", "full"] = "standard"
+ ) -> dict:
+ client, workspace_slug = get_plane_client_context()
+ resolver = EntityResolver(client, workspace_slug)
+ journey = ReadJourney(resolver)
+ raw_data = journey.search_tickets(project_slug, query, labels, priority, states, assignees, limit, cursor, lod)
+
+ if project_slug.lower() == 'help':
+ return raw_data
+
+ # Strip HTML tags from description fields in-place for clean human+machine output
+ import re
+ _html_re = re.compile(r'<[^>]+>')
+ for item in raw_data.get("results", []):
+ if item.get("description"):
+ item["description"] = " ".join(_html_re.sub('', item["description"]).split()).strip()
+
+ return raw_data
+
+ search_tickets.__doc__ = """
+ Search for issues. You can use standard filters or a text query.
+ If the desired result is not in the current page, call again with the provided next_cursor.
+
+ Args:
+ project_slug: The Plane project identifier (e.g., 'PLANE' or 'TEST'). To discover valid project slugs,
+ states, and labels, call this tool with project_slug='help'.
+ query: Free-form text search query.
+ labels: List of label names to filter by (e.g., ['bug', 'feature']).
+ priority: List of priorities to filter by (e.g., ['urgent', 'high', 'medium', 'low', 'none']).
+ states: List of state names to filter by (e.g., ['In Progress', 'Backlog', 'Done']).
+ assignees: List of usernames or 'me' to filter by.
+ limit: Maximum number of results to return (max 100). Default is 50.
+ cursor: Pagination cursor for getting the next set of results.
+ lod: Level of Detail profile ("summary", "standard", or "full"). Default is "standard".
+ """ + search_tickets = mcp.tool()(with_yaml(mcp_error_boundary(search_tickets))) + + @mcp.tool() + @with_yaml + @mcp_error_boundary + def read_ticket( + ticket_id: str, lod: Literal["summary", "standard", "full"] = "standard", comments: bool = False + ) -> dict: + """ + Read the details of a single ticket. + + Args: + ticket_id: The globally unique, human-readable identifier (e.g., ENG-123). The system automatically + resolves the project and issue routing from this prefix. + lod: Level of Detail profile ("summary", "standard", "full"). Default is "standard". + comments: If true, fetches and appends the ticket's comments to the result. + """ + client, workspace_slug = get_plane_client_context() + resolver = EntityResolver(client, workspace_slug) + journey = ReadJourney(resolver) + raw_data = journey.read_ticket(ticket_id, lod, comments) + + # Strip HTML tags from description, preserving newlines and Markdown structure + import re + if raw_data.get("description"): + raw_data["description"] = re.sub(r'<[^>]+>', '', raw_data["description"]).strip() + + return raw_data diff --git a/plane_mcp/journey/tools/workflow.py b/plane_mcp/journey/tools/workflow.py new file mode 100644 index 0000000..b106e0d --- /dev/null +++ b/plane_mcp/journey/tools/workflow.py @@ -0,0 +1,233 @@ +"""Workflow tools for Journey Endpoint.""" + +import logging +from collections import defaultdict +from datetime import datetime, timedelta + +from fastmcp import FastMCP +from plane.errors.errors import HttpError +from plane.models.cycles import CreateCycle +from plane.models.work_items import CreateWorkItemComment, UpdateWorkItem + +from plane_mcp.client import get_plane_client_context +from plane_mcp.journey.base import JourneyBase, mcp_error_boundary +from plane_mcp.journey.lod import LODProfile +from plane_mcp.journey.yaml_formatter import with_yaml +from plane_mcp.resolver import EntityResolutionError, EntityResolver +from plane_mcp.sanitize import sanitize_html + +logger = 
logging.getLogger(__name__) + +class WorkflowJourney(JourneyBase): + def _resolve_or_create_cycle(self, project_id: str, cycle_name: str) -> str | None: + client, workspace_slug = get_plane_client_context() + try: + existing = client.cycles.list(workspace_slug=workspace_slug, project_id=project_id).results + for c in existing: + if c.name and c.name.lower() == cycle_name.lower(): + return c.id + + me = client.users.get_me() + user_id = me.id if hasattr(me, "id") else None + + start_date = datetime.now() + end_date = start_date + timedelta(days=14) + + new_cycle = client.cycles.create( + workspace_slug=workspace_slug, + project_id=project_id, + data=CreateCycle( + name=cycle_name, + project_id=project_id, + owned_by=user_id, + start_date=start_date.strftime("%Y-%m-%d"), + end_date=end_date.strftime("%Y-%m-%d") + ) + ) + return new_cycle.id + except HttpError as e: + # If the project has the Cycles module disabled, the API returns 400 Bad Request. + # We catch this so the rest of the workflow (like status transitions) can still proceed. + if getattr(e, "status_code", getattr(getattr(e, "response", None), "status_code", None)) == 400: + logger.warning(f"Cycles appear to be disabled for project {project_id}. 
Skipping cycle creation.") + return None + raise + + def transition_ticket(self, ticket_id: str, state_name: str) -> dict: + work_item_id = self.resolver.resolve_ticket(ticket_id) + project_identifier, _ = self.parse_ticket_id(ticket_id) + project_id = self.resolver.resolve_project(project_identifier) + state_id = self.resolver.resolve_state(project_identifier, state_name) + + client, workspace_slug = get_plane_client_context() + + updated = client.work_items.update( + workspace_slug=workspace_slug, + project_id=project_id, + work_item_id=work_item_id, + data=UpdateWorkItem(state=state_id) + ) + + return self.apply_lod(updated, profile=LODProfile.SUMMARY, project_identifier=project_identifier) + + def begin_work(self, ticket_ids: list[str], cycle_name: str) -> dict: + """Batch operation to add multiple tickets to a cycle and potentially transition them.""" + client, workspace_slug = get_plane_client_context() + + # Group tickets by project + project_to_tickets = defaultdict(list) + for t_id in ticket_ids: + proj_id_str, _ = self.parse_ticket_id(t_id) + w_id = self.resolver.resolve_ticket(t_id) + project_to_tickets[proj_id_str].append(w_id) + + results = {} + for proj_identifier, w_ids in project_to_tickets.items(): + proj_id = self.resolver.resolve_project(proj_identifier) + cycle_id = self._resolve_or_create_cycle(proj_id, cycle_name) + + msg = f"Processed {len(w_ids)} tickets." + + if cycle_id: + # Add all tickets in this project to the cycle; non-fatal on failure + try: + client.cycles.add_work_items( + workspace_slug=workspace_slug, + project_id=proj_id, + cycle_id=cycle_id, + issue_ids=w_ids + ) + msg += f" Added to cycle '{cycle_name}'." + except Exception as e: + logger.warning( + "Failed to add tickets to cycle '%s' for project %s: %s", cycle_name, proj_identifier, e + ) + msg += f" Warning: tickets processed but could not be added to cycle '{cycle_name}'." + else: + msg += " Note: Cycles are disabled for this project, skipped cycle assignment." 
+ + # Optionally transition them to In Progress if such state exists + try: + state_id = self.resolver.resolve_state(proj_identifier, "In Progress") + for w_id in w_ids: + client.work_items.update( + workspace_slug=workspace_slug, + project_id=proj_id, + work_item_id=w_id, + data=UpdateWorkItem(state=state_id) + ) + except EntityResolutionError: + # "In Progress" state not found in this project — skip silently, it's optional + pass + except Exception as e: + # Unexpected API error — log but don't fail the whole batch + logger.warning("State transition to 'In Progress' failed for project %s: %s", proj_identifier, e) + + results[proj_identifier] = msg + + return {"status": "success", "details": results} + + def complete_work(self, ticket_id: str, comment: str) -> dict: + work_item_id = self.resolver.resolve_ticket(ticket_id) + project_identifier, _ = self.parse_ticket_id(ticket_id) + project_id = self.resolver.resolve_project(project_identifier) + + client, workspace_slug = get_plane_client_context() + + # Add comment + safe_comment = sanitize_html(f"{comment}
") + client.work_items.comments.create( + workspace_slug=workspace_slug, + project_id=project_id, + work_item_id=work_item_id, + data=CreateWorkItemComment(comment_html=safe_comment) + ) + + # Try to transition to Done or Completed + state_id = None + for state_name in ["Done", "Completed"]: + try: + state_id = self.resolver.resolve_state(project_identifier, state_name) + break + except ValueError: + continue + + if state_id: + updated = client.work_items.update( + workspace_slug=workspace_slug, + project_id=project_id, + work_item_id=work_item_id, + data=UpdateWorkItem(state=state_id) + ) + + return self.apply_lod(updated, profile=LODProfile.SUMMARY, project_identifier=project_identifier) + + return { + "status": "partial", + "message": ( + "No workflow states found indicating a 'Done' or 'Completed' state found. " + "Call transition_ticket explicitly to close this ticket." + ) + } + + +def register_workflow_tools(mcp: FastMCP) -> None: + def transition_ticket(ticket_id: str, state_name: str) -> dict: + client, workspace_slug = get_plane_client_context() + resolver = EntityResolver(client, workspace_slug) + journey = WorkflowJourney(resolver) + raw_data = journey.transition_ticket(ticket_id, state_name) + + return raw_data + + transition_ticket.__doc__ = """ + Transition a ticket to a new state. + Use this primitive for granular edge-case routing, such as moving a ticket to Canceled, + Duplicate, or custom review states. + + Args: + ticket_id: The globally unique, human-readable identifier (e.g., ENG-123). + state_name: The name of the state to transition to (e.g. 'In Progress'). + """ + transition_ticket = mcp.tool()(with_yaml(mcp_error_boundary(transition_ticket))) + + @mcp.tool() + @with_yaml + @mcp_error_boundary + def begin_work(ticket_ids: list[str], cycle_name: str) -> dict: + """ + Add multiple tickets to a cycle (creating it if missing) and attempt to transition them to 'In Progress'. 
+ Supports batch operations across multiple tickets and potentially multiple projects. + Use this macro as your primary method for standard workflow progression (starting work). + + Args: + ticket_ids: List of globally unique, human-readable identifiers (e.g. ['ENG-123', 'ENG-124']). + The system automatically resolves the project and issue routing from these prefixes. + cycle_name: The name of the cycle to add tickets to. If you are unsure, make your best logical guess. + """ + client, workspace_slug = get_plane_client_context() + resolver = EntityResolver(client, workspace_slug) + journey = WorkflowJourney(resolver) + raw_data = journey.begin_work(ticket_ids, cycle_name) + + return raw_data + @mcp.tool() + @with_yaml + @mcp_error_boundary + def complete_work(ticket_id: str, comment: str) -> dict: + """ + Add a completion comment to a ticket and attempt to transition it to a 'Done' or 'Completed' state. + Use this macro as your primary method for standard workflow progression (finishing work). + + Args: + ticket_id: The globally unique, human-readable identifier (e.g., ENG-123). The system + automatically resolves the project and issue routing from this prefix; no separate + project context is needed. + comment: The text to add as a comment. + """ + client, workspace_slug = get_plane_client_context() + resolver = EntityResolver(client, workspace_slug) + journey = WorkflowJourney(resolver) + raw_data = journey.complete_work(ticket_id, comment) + + return raw_data diff --git a/plane_mcp/journey/yaml_formatter.py b/plane_mcp/journey/yaml_formatter.py new file mode 100644 index 0000000..82fd721 --- /dev/null +++ b/plane_mcp/journey/yaml_formatter.py @@ -0,0 +1,139 @@ +"""YAML output formatter for Journey tools. + +Converts dict/list tool responses into a compact, LLM-friendly YAML format. 
+ +Design decisions: +- Lists use flow-style (inline) to save tokens: [foo, bar, baz] +- Multiline strings use literal block scalar (|) for readability +- None, empty string, and empty list/dict values are stripped +- 2-space indentation (YAML default) +- No line-wrapping (width=1000) to prevent arbitrary breaks +- Values containing colons are forced-quoted for parser safety +- Values containing commas or spaces are auto-quoted by PyYAML + +Missing-field semantics: + Fields with None, "", [], or {} values are omitted from output. + Consumers should treat a missing field as empty/unset. + Falsy-but-meaningful values (False, 0) are preserved. + +Behavioral contracts: +- Input: any dict, list, or primitive returned by a Journey tool +- Output: YAML string for dict/list inputs; passthrough for primitives +- Failure mode: on any exception, returns original data unchanged (no data loss) +""" + +import logging +from collections.abc import Callable +from functools import wraps +from typing import Any, TypeVar, cast + +import yaml + +logger = logging.getLogger(__name__) + +T = TypeVar("T", bound=Callable[..., Any]) + + +class FlowList(list): + """A list subclass that YAML serializes in flow style: [a, b, c]. + + PyYAML's SafeDumper will render normal lists as block-style (- item). + By registering a custom representer for this subclass, we get inline + arrays that are more token-efficient for LLM consumption. + """ + + pass + + +def _flow_list_representer(dumper: yaml.SafeDumper, data: FlowList) -> yaml.Node: + return dumper.represent_sequence("tag:yaml.org,2002:seq", data, flow_style=True) + + +def _str_presenter(dumper: yaml.SafeDumper, data: str) -> yaml.Node: + """Use literal block scalar for multiline strings. + + Forces single-quoting for values containing colons to prevent + YAML parser ambiguity (e.g. cursor values like '3:-1:1'). 
+ """ + if len(data.splitlines()) > 1: + return dumper.represent_scalar("tag:yaml.org,2002:str", data, style="|") + if ":" in data: + return dumper.represent_scalar("tag:yaml.org,2002:str", data, style="'") + return dumper.represent_scalar("tag:yaml.org,2002:str", data) + + +yaml.SafeDumper.add_representer(FlowList, _flow_list_representer) +yaml.SafeDumper.add_representer(str, _str_presenter) + + +def _clean_and_convert(obj: Any) -> Any: + """Recursively strip empty values and convert lists to FlowList. + + Removes: None, "", [], {} + Preserves: False, 0, and other falsy-but-meaningful values. + """ + if isinstance(obj, dict): + cleaned = {} + for k, v in obj.items(): + val = _clean_and_convert(v) + if val not in (None, "", [], {}): + cleaned[k] = val + return cleaned + elif isinstance(obj, list): + cleaned_list = [_clean_and_convert(v) for v in obj] + is_primitive = all(not isinstance(item, (dict, list)) for item in cleaned_list) + if is_primitive: + return FlowList(cleaned_list) + return cleaned_list + return obj + + +def format_as_yaml(raw_data: Any) -> Any: + """Convert a dict or list to compact YAML string. + + Args: + raw_data: Tool output (typically dict or list). + + Returns: + YAML string if input is dict/list, original value otherwise. + On formatting failure, returns original data unchanged. + """ + if not isinstance(raw_data, (dict, list)): + return raw_data + + try: + cleaned_data = _clean_and_convert(raw_data) + yaml_str = yaml.safe_dump( + cleaned_data, + allow_unicode=True, + sort_keys=False, + default_flow_style=False, + width=1000, + ) + return yaml_str.strip() + except Exception as e: + logger.debug("YAML formatting failed: %s", e) + return raw_data + + +def with_yaml(func: T) -> T: + """Decorator that converts a tool's dict/list return value to YAML. + + Must be applied OUTSIDE (above) mcp_error_boundary so that error + dicts also get YAML-formatted. Stack order: + + @mcp.tool() + @with_yaml + @mcp_error_boundary + def my_tool(...) -> dict: ... 
+ """ + + @wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> Any: + result = func(*args, **kwargs) + return format_as_yaml(result) + + if 'return' in wrapper.__annotations__: + wrapper.__annotations__['return'] = Any + + return cast(T, wrapper) diff --git a/plane_mcp/resolver.py b/plane_mcp/resolver.py new file mode 100644 index 0000000..7ff706f --- /dev/null +++ b/plane_mcp/resolver.py @@ -0,0 +1,168 @@ +"""Stateless EntityResolver for mapping human-readable identifiers to Plane UUIDs.""" + +import time + +from plane import PlaneClient +from plane.errors.errors import HttpError +from plane.models.query_params import PaginatedQueryParams + + +class EntityResolutionError(ValueError): + """Exception raised when an entity cannot be resolved, containing actionable options.""" + def __init__(self, message: str, available_options: list[str] | None = None): + super().__init__(message) + self.available_options = available_options or [] + + +# Global caches keyed by workspace_slug to prevent cross-workspace UUID pollution +# Structure: {workspace_slug: {identifier: uuid}} +_GLOBAL_PROJECT_CACHE: dict[str, dict[str, str]] = {} +_GLOBAL_STATE_CACHE: dict[str, dict[str, dict[str, str]]] = {} +_GLOBAL_WORK_ITEM_CACHE: dict[str, dict[str, str]] = {} +# Per-workspace timestamps: {workspace_slug: {"projects": float, "states": float, "work_items": float}} +_CACHE_LAST_UPDATED: dict[str, dict[str, float]] = {} +_CACHE_TTL_SECONDS = 300 # 5 minutes + +class EntityResolver: + """ + Stateless resolver for mapping human-readable identifiers to Plane UUIDs. + Implements global caching to prevent N+1 API calls and returns Actionable Errors + when an entity cannot be found. 
+ """ + + def __init__(self, client: PlaneClient, workspace_slug: str): + self.client = client + self.workspace_slug = workspace_slug + + # Ensure per-workspace cache buckets exist + _GLOBAL_PROJECT_CACHE.setdefault(workspace_slug, {}) + _GLOBAL_STATE_CACHE.setdefault(workspace_slug, {}) + _GLOBAL_WORK_ITEM_CACHE.setdefault(workspace_slug, {}) + _CACHE_LAST_UPDATED.setdefault(workspace_slug, {"projects": 0.0, "states": 0.0, "work_items": 0.0}) + + self._project_cache = _GLOBAL_PROJECT_CACHE[workspace_slug] + self._state_cache = _GLOBAL_STATE_CACHE[workspace_slug] + self._work_item_cache = _GLOBAL_WORK_ITEM_CACHE[workspace_slug] + self._ts = _CACHE_LAST_UPDATED[workspace_slug] + + def resolve_project(self, identifier_or_slug: str) -> str: + """ + Resolve a project identifier (e.g. 'ENG') or slug to its UUID. + """ + key = identifier_or_slug.upper() + if key in self._project_cache and (time.time() - self._ts["projects"] < _CACHE_TTL_SECONDS): + return self._project_cache[key] + + # Fetch all projects to cache and find + try: + projects_resp = self.client.projects.list( + workspace_slug=self.workspace_slug, + params=PaginatedQueryParams(per_page=100) + ) + available = [] + for p in projects_resp.results: + self._project_cache[p.identifier.upper()] = p.id + self._project_cache[p.name.upper()] = p.id + available.append(p.identifier) + if hasattr(p, 'slug') and p.slug: + self._project_cache[p.slug.upper()] = p.id + + self._ts["projects"] = time.time() + + if key in self._project_cache: + return self._project_cache[key] + + raise EntityResolutionError( + f"Project '{identifier_or_slug}' not found. Available projects: {', '.join(available)}", + available_options=available + ) + except Exception as e: + if isinstance(e, EntityResolutionError): + raise + raise RuntimeError(f"Failed to fetch projects: {e!s}") from e + + + def resolve_state(self, project_identifier: str, state_name: str) -> str: + """ + Resolve a state name (e.g. 
+        'In Progress') to its UUID for a specific project.
+        """
+        project_id = self.resolve_project(project_identifier)
+        key = state_name.lower()
+
+        if project_id in self._state_cache and key in self._state_cache[project_id] and (time.time() - self._ts["states"] < _CACHE_TTL_SECONDS):
+            return self._state_cache[project_id][key]
+
+        try:
+            # We don't have the states in cache for this project, fetch them
+            states_resp = self.client.states.list(
+                workspace_slug=self.workspace_slug,
+                project_id=project_id
+            )
+
+            if project_id not in self._state_cache:
+                self._state_cache[project_id] = {}
+
+            available = []
+            for s in states_resp.results:
+                self._state_cache[project_id][s.name.lower()] = s.id
+                available.append(s.name)
+
+            self._ts["states"] = time.time()
+
+            if key in self._state_cache[project_id]:
+                return self._state_cache[project_id][key]
+
+            raise EntityResolutionError(
+                f"State '{state_name}' not found for project '{project_identifier}'. Available states: {', '.join(available)}",
+                available_options=available
+            )
+        except Exception as e:
+            if isinstance(e, EntityResolutionError):
+                raise
+            raise RuntimeError(f"Failed to fetch states for project {project_identifier}: {e!s}") from e
+
+    def resolve_ticket(self, ticket_id: str) -> str:
+        """
+        Resolve a ticket ID (e.g. 'ENG-123') to its work_item UUID.
+        """
+        key = ticket_id.upper()
+        if key in self._work_item_cache and (time.time() - self._ts["work_items"] < _CACHE_TTL_SECONDS):
+            return self._work_item_cache[key]
+
+        parts = key.split('-')
+        if len(parts) != 2:
+            raise ValueError(f"Invalid ticket ID format: '{ticket_id}'. Expected format like 'ENG-123'.")
+
+        project_identifier = parts[0]
+        try:
+            issue_sequence = int(parts[1])
+        except ValueError:
+            raise ValueError(f"Invalid ticket sequence in '{ticket_id}'. Must be an integer (e.g., 123).") from None
+
+        # Resolve project to ensure the project exists and is valid
+        self.resolve_project(project_identifier)
+
+        try:
+            # Bypass Pydantic validation by using internal _get, as the WorkItemDetail
+            # model currently has an issue with assignees parsing (list of str vs UserLite).
+            response = self.client.work_items._get(
+                f"{self.workspace_slug}/work-items/{project_identifier}-{issue_sequence}"
+            )
+            work_item_id = response.get('id')
+            if not work_item_id:
+                raise EntityResolutionError(f"Ticket '{ticket_id}' not found.")
+
+            self._work_item_cache[key] = work_item_id
+            self._ts["work_items"] = time.time()
+            return work_item_id
+        except HttpError as e:
+            if getattr(e, "status_code", None) == 404:
+                raise EntityResolutionError(
+                    f"Ticket '{ticket_id}' not found.",
+                    available_options=[]
+                ) from e
+            raise RuntimeError(f"Failed to retrieve ticket '{ticket_id}': {e!s}") from e
+        except Exception as e:
+            if isinstance(e, EntityResolutionError):
+                raise
+            raise RuntimeError(f"Failed to retrieve ticket '{ticket_id}': {e!s}") from e
diff --git a/plane_mcp/sanitize.py b/plane_mcp/sanitize.py
new file mode 100644
index 0000000..05526ce
--- /dev/null
+++ b/plane_mcp/sanitize.py
@@ -0,0 +1,44 @@
+"""HTML sanitization utilities for Plane MCP Server.
+
+Provides safe HTML cleaning to prevent stored XSS attacks when accepting
+user-provided HTML content (descriptions, comments) before sending to the
+Plane API.
+""" + +import nh3 + +ALLOWED_TAGS = { + "p", "br", "strong", "b", "em", "i", "u", "s", "del", + "h1", "h2", "h3", "h4", "h5", "h6", + "ul", "ol", "li", + "blockquote", "pre", "code", + "a", "img", + "table", "thead", "tbody", "tr", "th", "td", + "hr", "span", "div", "sub", "sup", +} + +ALLOWED_ATTRIBUTES = { + "a": {"href", "title", "target"}, + "img": {"src", "alt", "title", "width", "height"}, + "td": {"colspan", "rowspan"}, + "th": {"colspan", "rowspan"}, + "span": {"class"}, + "div": {"class"}, + "code": {"class"}, + "pre": {"class"}, +} + + +def sanitize_html(html: str | None) -> str | None: + if html is None: + return None + if not html: + return html + + return nh3.clean( + html, + tags=ALLOWED_TAGS, + attributes=ALLOWED_ATTRIBUTES, + link_rel="noopener noreferrer", + url_schemes={"http", "https", "mailto"}, + ) diff --git a/plane_mcp/tools/generated_core.py b/plane_mcp/tools/generated_core.py new file mode 100644 index 0000000..11e9e73 --- /dev/null +++ b/plane_mcp/tools/generated_core.py @@ -0,0 +1,276 @@ +"""Generated tools for Plane MCP Server.""" + +import uuid +from typing import Any + +import requests +from fastmcp import FastMCP + +from plane_mcp.client import get_plane_client_context + + +def register_core_generated_tools(mcp: FastMCP) -> None: + """Register generated core tools.""" + + @mcp.tool() + def get_workspace() -> dict[str, Any]: + """ + Get workspace details returning raw JSON. 
+ + Args: + workspace_slug: Workspace slug (injected by context) + + Returns: + Raw JSON response from Plane API + """ + client, workspace_slug = get_plane_client_context() + + url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/".format(workspace_slug=workspace_slug) + + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + } + if client.config.access_token: + headers["Authorization"] = f"Bearer {client.config.access_token}" + elif client.config.api_key: + headers["x-api-key"] = client.config.api_key + + response = requests.request("GET", url, headers=headers, timeout=client.config.timeout) + response.raise_for_status() + return response.json() + + @mcp.tool() + def update_workspace(data: dict[str, Any]) -> dict[str, Any]: + """ + Update workspace details returning raw JSON. + + Args: + workspace_slug: Workspace slug (injected by context) + data: JSON payload + + Returns: + Raw JSON response from Plane API + """ + client, workspace_slug = get_plane_client_context() + + url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/".format(workspace_slug=workspace_slug) + + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + } + if client.config.access_token: + headers["Authorization"] = f"Bearer {client.config.access_token}" + elif client.config.api_key: + headers["x-api-key"] = client.config.api_key + + response = requests.request("PATCH", url, headers=headers, json=data, timeout=client.config.timeout) + response.raise_for_status() + return response.json() + + @mcp.tool() + def list_projects() -> dict[str, Any]: + """ + List all projects in a workspace returning raw JSON. 
+ + Args: + workspace_slug: Workspace slug (injected by context) + + Returns: + Raw JSON response from Plane API + """ + client, workspace_slug = get_plane_client_context() + + url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/".format(workspace_slug=workspace_slug) + + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + } + if client.config.access_token: + headers["Authorization"] = f"Bearer {client.config.access_token}" + elif client.config.api_key: + headers["x-api-key"] = client.config.api_key + + response = requests.request("GET", url, headers=headers, timeout=client.config.timeout) + response.raise_for_status() + return response.json() + + @mcp.tool() + def create_project(data: dict[str, Any]) -> dict[str, Any]: + """ + Create a project returning raw JSON. + + Args: + workspace_slug: Workspace slug (injected by context) + data: JSON payload + + Returns: + Raw JSON response from Plane API + """ + client, workspace_slug = get_plane_client_context() + + url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/".format(workspace_slug=workspace_slug) + + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + } + if client.config.access_token: + headers["Authorization"] = f"Bearer {client.config.access_token}" + elif client.config.api_key: + headers["x-api-key"] = client.config.api_key + + response = requests.request("POST", url, headers=headers, json=data, timeout=client.config.timeout) + response.raise_for_status() + return response.json() + + @mcp.tool() + def get_project(project_id: uuid.UUID) -> dict[str, Any]: + """ + Get a specific project returning raw JSON. 
+ + Args: + workspace_slug: Workspace slug (injected by context) + project_id: UUID of the project + + Returns: + Raw JSON response from Plane API + """ + client, workspace_slug = get_plane_client_context() + + url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/{project_id}/".format(workspace_slug=workspace_slug, project_id=str(project_id)) + + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + } + if client.config.access_token: + headers["Authorization"] = f"Bearer {client.config.access_token}" + elif client.config.api_key: + headers["x-api-key"] = client.config.api_key + + response = requests.request("GET", url, headers=headers, timeout=client.config.timeout) + response.raise_for_status() + return response.json() + + @mcp.tool() + def list_issues(project_id: uuid.UUID) -> dict[str, Any]: + """ + List all issues in a project returning raw JSON. + + Args: + workspace_slug: Workspace slug (injected by context) + project_id: UUID of the project + + Returns: + Raw JSON response from Plane API + """ + client, workspace_slug = get_plane_client_context() + + url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/{project_id}/issues/".format(workspace_slug=workspace_slug, project_id=str(project_id)) + + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + } + if client.config.access_token: + headers["Authorization"] = f"Bearer {client.config.access_token}" + elif client.config.api_key: + headers["x-api-key"] = client.config.api_key + + response = requests.request("GET", url, headers=headers, timeout=client.config.timeout) + response.raise_for_status() + return response.json() + + @mcp.tool() + def create_issue_raw(project_id: uuid.UUID, data: dict[str, Any]) -> dict[str, Any]: + """ + Create an issue returning raw JSON. 
+ + Args: + workspace_slug: Workspace slug (injected by context) + project_id: UUID of the project + data: JSON payload + + Returns: + Raw JSON response from Plane API + """ + client, workspace_slug = get_plane_client_context() + + url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/{project_id}/issues/".format(workspace_slug=workspace_slug, project_id=str(project_id)) + + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + } + if client.config.access_token: + headers["Authorization"] = f"Bearer {client.config.access_token}" + elif client.config.api_key: + headers["x-api-key"] = client.config.api_key + + response = requests.request("POST", url, headers=headers, json=data, timeout=client.config.timeout) + response.raise_for_status() + return response.json() + + @mcp.tool() + def update_issue_raw(project_id: uuid.UUID, issue_id: uuid.UUID, data: dict[str, Any]) -> dict[str, Any]: + """ + Update an issue returning raw JSON. 
+ + Args: + workspace_slug: Workspace slug (injected by context) + project_id: UUID of the project + issue_id: UUID of the issue + data: JSON payload + + Returns: + Raw JSON response from Plane API + """ + client, workspace_slug = get_plane_client_context() + + url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/{project_id}/issues/{issue_id}/".format(workspace_slug=workspace_slug, project_id=str(project_id), issue_id=str(issue_id)) + + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + } + if client.config.access_token: + headers["Authorization"] = f"Bearer {client.config.access_token}" + elif client.config.api_key: + headers["x-api-key"] = client.config.api_key + + response = requests.request("PATCH", url, headers=headers, json=data, timeout=client.config.timeout) + response.raise_for_status() + return response.json() + + @mcp.tool() + def delete_issue(project_id: uuid.UUID, issue_id: uuid.UUID) -> dict[str, Any]: + """ + Delete an issue returning raw JSON. 
+ + Args: + workspace_slug: Workspace slug (injected by context) + project_id: UUID of the project + issue_id: UUID of the issue + + Returns: + Raw JSON response from Plane API + """ + client, workspace_slug = get_plane_client_context() + + url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/{project_id}/issues/{issue_id}/".format(workspace_slug=workspace_slug, project_id=str(project_id), issue_id=str(issue_id)) + + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + } + if client.config.access_token: + headers["Authorization"] = f"Bearer {client.config.access_token}" + elif client.config.api_key: + headers["x-api-key"] = client.config.api_key + + response = requests.request("DELETE", url, headers=headers, timeout=client.config.timeout) + response.raise_for_status() + return {"status": "deleted"} if response.status_code == 204 else response.json() diff --git a/plane_mcp/tools/generated_metadata.py b/plane_mcp/tools/generated_metadata.py new file mode 100644 index 0000000..6c2346b --- /dev/null +++ b/plane_mcp/tools/generated_metadata.py @@ -0,0 +1,341 @@ +"""Generated tools for Plane MCP Server.""" + +import uuid +from typing import Any + +import requests +from fastmcp import FastMCP + +from plane_mcp.client import get_plane_client_context + + +def register_metadata_generated_tools(mcp: FastMCP) -> None: + """Register generated metadata tools.""" + + @mcp.tool() + def list_states(project_id: uuid.UUID) -> dict[str, Any]: + """ + List all states in a project returning raw JSON. 
+ + Args: + workspace_slug: Workspace slug (injected by context) + project_id: UUID of the project + + Returns: + Raw JSON response from Plane API + """ + client, workspace_slug = get_plane_client_context() + + url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/{project_id}/states/".format(workspace_slug=workspace_slug, project_id=str(project_id)) + + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + } + if client.config.access_token: + headers["Authorization"] = f"Bearer {client.config.access_token}" + elif client.config.api_key: + headers["x-api-key"] = client.config.api_key + + response = requests.request("GET", url, headers=headers, timeout=client.config.timeout) + response.raise_for_status() + return response.json() + + @mcp.tool() + def create_state(project_id: uuid.UUID, data: dict[str, Any]) -> dict[str, Any]: + """ + Create a state returning raw JSON. + + Args: + workspace_slug: Workspace slug (injected by context) + project_id: UUID of the project + data: JSON payload + + Returns: + Raw JSON response from Plane API + """ + client, workspace_slug = get_plane_client_context() + + url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/{project_id}/states/".format(workspace_slug=workspace_slug, project_id=str(project_id)) + + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + } + if client.config.access_token: + headers["Authorization"] = f"Bearer {client.config.access_token}" + elif client.config.api_key: + headers["x-api-key"] = client.config.api_key + + response = requests.request("POST", url, headers=headers, json=data, timeout=client.config.timeout) + response.raise_for_status() + return response.json() + + @mcp.tool() + def update_state(project_id: uuid.UUID, state_id: uuid.UUID, data: dict[str, Any]) -> dict[str, Any]: 
+ """ + Update a state returning raw JSON. + + Args: + workspace_slug: Workspace slug (injected by context) + project_id: UUID of the project + state_id: UUID of the state + data: JSON payload + + Returns: + Raw JSON response from Plane API + """ + client, workspace_slug = get_plane_client_context() + + url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/{project_id}/states/{state_id}/".format(workspace_slug=workspace_slug, project_id=str(project_id), state_id=str(state_id)) + + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + } + if client.config.access_token: + headers["Authorization"] = f"Bearer {client.config.access_token}" + elif client.config.api_key: + headers["x-api-key"] = client.config.api_key + + response = requests.request("PATCH", url, headers=headers, json=data, timeout=client.config.timeout) + response.raise_for_status() + return response.json() + + @mcp.tool() + def list_labels(project_id: uuid.UUID) -> dict[str, Any]: + """ + List all labels in a project returning raw JSON. 
+ + Args: + workspace_slug: Workspace slug (injected by context) + project_id: UUID of the project + + Returns: + Raw JSON response from Plane API + """ + client, workspace_slug = get_plane_client_context() + + url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/{project_id}/labels/".format(workspace_slug=workspace_slug, project_id=str(project_id)) + + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + } + if client.config.access_token: + headers["Authorization"] = f"Bearer {client.config.access_token}" + elif client.config.api_key: + headers["x-api-key"] = client.config.api_key + + response = requests.request("GET", url, headers=headers, timeout=client.config.timeout) + response.raise_for_status() + return response.json() + + @mcp.tool() + def create_label(project_id: uuid.UUID, data: dict[str, Any]) -> dict[str, Any]: + """ + Create a label returning raw JSON. + + Args: + workspace_slug: Workspace slug (injected by context) + project_id: UUID of the project + data: JSON payload + + Returns: + Raw JSON response from Plane API + """ + client, workspace_slug = get_plane_client_context() + + url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/{project_id}/labels/".format(workspace_slug=workspace_slug, project_id=str(project_id)) + + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + } + if client.config.access_token: + headers["Authorization"] = f"Bearer {client.config.access_token}" + elif client.config.api_key: + headers["x-api-key"] = client.config.api_key + + response = requests.request("POST", url, headers=headers, json=data, timeout=client.config.timeout) + response.raise_for_status() + return response.json() + + @mcp.tool() + def delete_label(project_id: uuid.UUID, label_id: uuid.UUID) -> dict[str, Any]: + """ + Delete a label 
returning raw JSON. + + Args: + workspace_slug: Workspace slug (injected by context) + project_id: UUID of the project + label_id: UUID of the label + + Returns: + Raw JSON response from Plane API + """ + client, workspace_slug = get_plane_client_context() + + url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/{project_id}/labels/{label_id}/".format(workspace_slug=workspace_slug, project_id=str(project_id), label_id=str(label_id)) + + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + } + if client.config.access_token: + headers["Authorization"] = f"Bearer {client.config.access_token}" + elif client.config.api_key: + headers["x-api-key"] = client.config.api_key + + response = requests.request("DELETE", url, headers=headers, timeout=client.config.timeout) + response.raise_for_status() + return {"status": "deleted"} if response.status_code == 204 else response.json() + + @mcp.tool() + def list_cycles(project_id: uuid.UUID) -> dict[str, Any]: + """ + List all cycles in a project returning raw JSON. 
+ + Args: + workspace_slug: Workspace slug (injected by context) + project_id: UUID of the project + + Returns: + Raw JSON response from Plane API + """ + client, workspace_slug = get_plane_client_context() + + url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/{project_id}/cycles/".format(workspace_slug=workspace_slug, project_id=str(project_id)) + + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + } + if client.config.access_token: + headers["Authorization"] = f"Bearer {client.config.access_token}" + elif client.config.api_key: + headers["x-api-key"] = client.config.api_key + + response = requests.request("GET", url, headers=headers, timeout=client.config.timeout) + response.raise_for_status() + return response.json() + + @mcp.tool() + def create_cycle(project_id: uuid.UUID, data: dict[str, Any]) -> dict[str, Any]: + """ + Create a cycle returning raw JSON. + + Args: + workspace_slug: Workspace slug (injected by context) + project_id: UUID of the project + data: JSON payload + + Returns: + Raw JSON response from Plane API + """ + client, workspace_slug = get_plane_client_context() + + url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/{project_id}/cycles/".format(workspace_slug=workspace_slug, project_id=str(project_id)) + + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + } + if client.config.access_token: + headers["Authorization"] = f"Bearer {client.config.access_token}" + elif client.config.api_key: + headers["x-api-key"] = client.config.api_key + + response = requests.request("POST", url, headers=headers, json=data, timeout=client.config.timeout) + response.raise_for_status() + return response.json() + + @mcp.tool() + def update_cycle(project_id: uuid.UUID, cycle_id: uuid.UUID, data: dict[str, Any]) -> dict[str, Any]: 
+ """ + Update a cycle returning raw JSON. + + Args: + workspace_slug: Workspace slug (injected by context) + project_id: UUID of the project + cycle_id: UUID of the cycle + data: JSON payload + + Returns: + Raw JSON response from Plane API + """ + client, workspace_slug = get_plane_client_context() + + url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/{project_id}/cycles/{cycle_id}/".format(workspace_slug=workspace_slug, project_id=str(project_id), cycle_id=str(cycle_id)) + + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + } + if client.config.access_token: + headers["Authorization"] = f"Bearer {client.config.access_token}" + elif client.config.api_key: + headers["x-api-key"] = client.config.api_key + + response = requests.request("PATCH", url, headers=headers, json=data, timeout=client.config.timeout) + response.raise_for_status() + return response.json() + + @mcp.tool() + def list_modules(project_id: uuid.UUID) -> dict[str, Any]: + """ + List all modules in a project returning raw JSON. 
+ + Args: + workspace_slug: Workspace slug (injected by context) + project_id: UUID of the project + + Returns: + Raw JSON response from Plane API + """ + client, workspace_slug = get_plane_client_context() + + url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/{project_id}/modules/".format(workspace_slug=workspace_slug, project_id=str(project_id)) + + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + } + if client.config.access_token: + headers["Authorization"] = f"Bearer {client.config.access_token}" + elif client.config.api_key: + headers["x-api-key"] = client.config.api_key + + response = requests.request("GET", url, headers=headers, timeout=client.config.timeout) + response.raise_for_status() + return response.json() + + @mcp.tool() + def create_module(project_id: uuid.UUID, data: dict[str, Any]) -> dict[str, Any]: + """ + Create a module returning raw JSON. + + Args: + workspace_slug: Workspace slug (injected by context) + project_id: UUID of the project + data: JSON payload + + Returns: + Raw JSON response from Plane API + """ + client, workspace_slug = get_plane_client_context() + + url = f"{client.config.base_path}{client.projects.base_path}".replace("/workspaces", "") + f"/api/v1/workspaces/{workspace_slug}/projects/{project_id}/modules/".format(workspace_slug=workspace_slug, project_id=str(project_id)) + + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + } + if client.config.access_token: + headers["Authorization"] = f"Bearer {client.config.access_token}" + elif client.config.api_key: + headers["x-api-key"] = client.config.api_key + + response = requests.request("POST", url, headers=headers, json=data, timeout=client.config.timeout) + response.raise_for_status() + return response.json() diff --git a/pyproject.toml b/pyproject.toml index 6831299..fba9c20 100644 --- a/pyproject.toml +++ b/pyproject.toml 
@@ -18,6 +18,8 @@ dependencies = [ "mcp==1.26.0", "PyJWT>=2.12.0", "authlib>=1.6.9", + "markdownify>=0.14.1", + "nh3>=0.2.17", # pydocket 0.16.6 (pinned by fastmcp 2.14.4) imports `FakeConnection` from # `fakeredis.aioredis`, which was removed in fakeredis 2.35.0. "fakeredis[lua]>=2.32.1,<2.35.0", diff --git a/tests/conftest.py b/tests/conftest.py index 58439a1..e28d262 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1 +1,14 @@ """Pytest configuration for Plane MCP Server tests.""" + +import pytest + +import plane_mcp.resolver + + +@pytest.fixture(autouse=True) +def clear_resolver_caches(): + """Clear global caches before each test to prevent test state leakage.""" + plane_mcp.resolver._GLOBAL_PROJECT_CACHE.clear() + plane_mcp.resolver._GLOBAL_STATE_CACHE.clear() + plane_mcp.resolver._GLOBAL_WORK_ITEM_CACHE.clear() + plane_mcp.resolver._CACHE_LAST_UPDATED.clear() diff --git a/tests/e2e_pagination_test.py b/tests/e2e_pagination_test.py new file mode 100644 index 0000000..3e24820 --- /dev/null +++ b/tests/e2e_pagination_test.py @@ -0,0 +1,154 @@ +""" +E2E test for pagination cursor fix in search_tickets. + +Verifies that: +1. Basic search against TEST project works with the live API +2. Pagination cursor is correct — requesting page 2 doesn't repeat page 1 results +3. per_page=limit is respected (not hard-coded 100) +4. 
Label filter exception returns LLM-friendly response rather than widening search +""" + +import os + +import pytest + +from plane_mcp.client import get_plane_client_context +from plane_mcp.journey.tools.read import ReadJourney +from plane_mcp.resolver import EntityResolver + +pytestmark = pytest.mark.e2e + + +@pytest.fixture +def e2e_journey(): + if not (os.getenv("PLANE_API_KEY") and os.getenv("PLANE_WORKSPACE_SLUG")): + pytest.skip("E2E test requires PLANE_API_KEY and PLANE_WORKSPACE_SLUG env vars") + client, workspace_slug = get_plane_client_context() + resolver = EntityResolver(client, workspace_slug) + return ReadJourney(resolver) + + +def test_basic_search_returns_results(e2e_journey): + """Basic sanity: search on TEST project returns results.""" + result = e2e_journey.search_tickets(project_slug="TEST", limit=5, lod="summary") + assert "results" in result, f"Unexpected response shape: {result}" + print(f"\n[E2E] Basic search: {len(result['results'])} results returned") + assert isinstance(result["results"], list) + + +def test_per_page_equals_limit(e2e_journey): + """Confirm API is called with per_page=limit, not per_page=100.""" + # search_tickets calls get_plane_client_context() internally for a fresh client. + # We must patch that function so the recording wrapper is on the client it actually uses. 
+ from unittest.mock import patch + + from plane_mcp.client import get_plane_client_context + + real_client, real_ws = get_plane_client_context() + calls = [] + real_get = real_client.work_items._get + + def recording_get(path, **kwargs): + calls.append(kwargs.get("params", {})) + return real_get(path, **kwargs) + + real_client.work_items._get = recording_get + with patch("plane_mcp.journey.tools.read.get_plane_client_context", return_value=(real_client, real_ws)): + try: + e2e_journey.search_tickets(project_slug="TEST", limit=3, lod="summary") + finally: + real_client.work_items._get = real_get + + assert calls, "API was never called after patching" + assert calls[0]["per_page"] == 3, ( + f"Expected per_page=3 (min(limit,100)), got {calls[0]['per_page']}" + ) + print(f"\n[E2E] per_page assertion passed: per_page={calls[0]['per_page']}") + + +def test_pagination_cursor_no_overlap(e2e_journey): + """ + Verify that page 2 results don't overlap with page 1. + This catches the old bug where per_page=100 but limit=5 caused the + returned cursor to skip items 6-100 on page 1. + """ + page1 = e2e_journey.search_tickets(project_slug="TEST", limit=3, lod="summary") + cursor = page1.get("next_cursor") + + if not cursor: + pytest.skip("Not enough tickets in TEST project for pagination test (need >3)") + + page2 = e2e_journey.search_tickets(project_slug="TEST", limit=3, cursor=cursor, lod="summary") + + p1_keys = {r.get("key") or r.get("ticket_id") for r in page1["results"]} + p2_keys = {r.get("key") or r.get("ticket_id") for r in page2["results"]} + + overlap = p1_keys & p2_keys + print(f"\n[E2E] Page 1 keys: {p1_keys}") + print(f"[E2E] Page 2 keys: {p2_keys}") + print(f"[E2E] Overlap: {overlap}") + + assert not overlap, ( + f"Pagination overlap detected! 
These tickets appeared on both pages: {overlap}" + ) + + +def test_label_exception_returns_llm_friendly(e2e_journey): + """ + Verify two distinct label failure modes: + A) Label not found by name -> warning + results still returned (search broadened but warned) + B) Label lookup throws exception -> empty results + warning (safe failure, no widening) + """ + _ = e2e_journey.resolver.client + + # --- Mode A: label name not found (not an exception, just missing) --- + result_a = e2e_journey.search_tickets( + project_slug="TEST", + labels=["nonexistent-label-xyz-123"], + limit=3, + lod="summary" + ) + print(f"\n[E2E] Mode A (label not found): {result_a.get('warnings')}") + assert "warnings" in result_a, "Expected warnings for unknown label" + assert any("nonexistent-label-xyz-123" in w for w in result_a["warnings"]), ( + f"Expected label name in warning, got: {result_a['warnings']}" + ) + # results MAY or MAY NOT be empty depending on whether the filter was applied + + # --- Mode B: label lookup raises an exception -> empty results + warning --- + # Must patch at the module level for the interception to work + from unittest.mock import patch + + def failing_labels_list(*args, **kwargs): + raise ConnectionError("Simulated network failure") + + from plane_mcp.client import get_plane_client_context + real_client_b, real_ws_b = get_plane_client_context() + real_labels_list_b = real_client_b.labels.list + real_client_b.labels.list = failing_labels_list + with patch("plane_mcp.journey.tools.read.get_plane_client_context", return_value=(real_client_b, real_ws_b)): + try: + result_b = e2e_journey.search_tickets( + project_slug="TEST", + labels=["some-label"], + limit=5, + lod="summary" + ) + finally: + real_client_b.labels.list = real_labels_list_b + + print(f"[E2E] Mode B (exception): {result_b}") + assert "results" in result_b + assert result_b["results"] == [], ( + f"Expected empty results on label exception, got: {result_b['results']}" + ) + assert "warnings" in result_b, 
"Expected a 'warnings' key in the exception response" + assert any("Label filter" in w for w in result_b["warnings"]), ( + f"Expected label-filter warning, got: {result_b['warnings']}" + ) + + +if __name__ == "__main__": + # Can run directly: PLANE_API_KEY=... PLANE_WORKSPACE_SLUG=... python tests/e2e_pagination_test.py + import sys + sys.exit(pytest.main([__file__, "-v", "-s"])) diff --git a/tests/test_generated_tools.py b/tests/test_generated_tools.py new file mode 100644 index 0000000..2c7075d --- /dev/null +++ b/tests/test_generated_tools.py @@ -0,0 +1,21 @@ +import ast +import os + + +def check_file_for_uuid(filepath): + with open(filepath) as f: + tree = ast.parse(f.read()) + + for node in ast.walk(tree): + if isinstance(node, ast.FunctionDef) and node.name in ('get_project', 'list_issues', 'create_issue_raw', 'update_issue_raw', 'delete_issue', 'list_states', 'create_state'): + for arg in node.args.args: + if arg.arg.endswith('_id'): + assert ast.unparse(arg.annotation) == "uuid.UUID", f"{arg.arg} in {node.name} is not UUID!" + +def test_generated_core_type_hints(): + """Verify that the auto-generated core tools require UUIDs and proper types.""" + check_file_for_uuid(os.path.join(os.path.dirname(__file__), "../plane_mcp/tools/generated_core.py")) + +def test_generated_metadata_type_hints(): + """Verify that the auto-generated metadata tools require UUIDs.""" + check_file_for_uuid(os.path.join(os.path.dirname(__file__), "../plane_mcp/tools/generated_metadata.py")) diff --git a/tests/test_input_validation.py b/tests/test_input_validation.py new file mode 100644 index 0000000..1d749b7 --- /dev/null +++ b/tests/test_input_validation.py @@ -0,0 +1,242 @@ +"""Tests for input validation and data safety sprint. 
+ +Covers: +- PLANE-27: HTML sanitization strips XSS vectors while preserving safe formatting +- PLANE-29: Label auto-creation is bounded (max 3 new labels per request) +- PLANE-38: update_ticket retrieves without expand to avoid label ValidationError +""" + +from unittest.mock import MagicMock, patch + +import pytest + +from plane_mcp.sanitize import sanitize_html + + +class TestHTMLSanitization: + """PLANE-27: Sanitize HTML inputs to prevent stored XSS.""" + + def test_strips_script_tags(self): + result = sanitize_html("Hello
") + assert "">') + assert "data:" not in result + + def test_strips_style_attribute(self): + result = sanitize_html('Normal text
+