diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index d5f5aba2d5..8fecfb9a80 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -4675,6 +4675,20 @@ def extension_set_priority( ) workflow_app.add_typer(workflow_catalog_app, name="catalog") +workflow_step_app = typer.Typer( + name="step", + help="Manage workflow step types", + add_completion=False, +) +workflow_app.add_typer(workflow_step_app, name="step") + +workflow_step_catalog_app = typer.Typer( + name="catalog", + help="Manage step catalogs", + add_completion=False, +) +workflow_step_app.add_typer(workflow_step_catalog_app, name="catalog") + @workflow_app.command("run") def workflow_run( @@ -5321,6 +5335,414 @@ def workflow_catalog_remove( console.print(f"[green]✓[/green] Catalog source '{removed_name}' removed") +# ===== Workflow Step Commands ===== + +@workflow_step_app.command("list") +def workflow_step_list(): + """List installed step types (built-in and custom).""" + from .workflows import STEP_REGISTRY + from .workflows.catalog import StepRegistry + + project_root = Path.cwd() + specify_dir = project_root / ".specify" + + # Load custom steps if in a spec-kit project + custom_keys: set[str] = set() + if specify_dir.exists(): + from .workflows import load_custom_steps + loaded = load_custom_steps(project_root) + custom_keys.update(loaded) + # Also read registry for metadata + registry = StepRegistry(project_root) + installed = registry.list() + custom_keys.update(installed.keys()) + + console.print("\n[bold cyan]Installed Step Types:[/bold cyan]\n") + + built_in: list[str] = [] + custom: list[str] = [] + for key in sorted(STEP_REGISTRY.keys()): + if key in custom_keys: + custom.append(key) + else: + built_in.append(key) + + if built_in: + console.print(" [bold]Built-in:[/bold]") + for key in built_in: + console.print(f" • {key}") + console.print() + + if custom: + console.print(" [bold]Custom (installed):[/bold]") + if specify_dir.exists(): + registry = StepRegistry(project_root) + for key in custom: + meta = registry.get(key) or {} + name = meta.get("name", key) + version = meta.get("version", "?") + console.print(f" • [bold]{name}[/bold] ({key}) v{version}") + else: + for key in custom: + console.print(f" • {key}") + console.print() + + if not built_in and not custom: + console.print("[yellow]No step types found.[/yellow]") + + if specify_dir.exists(): + console.print( + " Install a new step type with: [cyan]specify workflow step add [/cyan]" + ) + + +@workflow_step_app.command("add") +def workflow_step_add( + step_id: str = typer.Argument(..., help="Step type ID from catalog"), +): + """Install a custom step type from the step catalog.""" + from .workflows.catalog import StepCatalog, StepCatalogError, StepRegistry + + project_root = Path.cwd() + specify_dir = project_root / ".specify" + if not specify_dir.exists(): + console.print("[red]Error:[/red] Not a spec-kit project (no .specify/ directory)") + raise typer.Exit(1) + + catalog = StepCatalog(project_root) + try: + info = catalog.get_step_info(step_id) + except StepCatalogError as exc: + console.print(f"[red]Error:[/red] {exc}") + raise typer.Exit(1) + + if not info: + console.print(f"[red]Error:[/red] Step type '{step_id}' not found in catalog") + raise typer.Exit(1) + + if not info.get("_install_allowed", True): + console.print( + f"[yellow]Warning:[/yellow] Step type '{step_id}' is from a discovery-only catalog" + ) + console.print("Direct installation is not enabled for this catalog source.") + raise typer.Exit(1) + + step_yml_url = info.get("step_yml_url") or info.get("url") + if not step_yml_url: + console.print(f"[red]Error:[/red] Catalog entry for '{step_id}' has no URL") + raise typer.Exit(1) + + # Derive __init__.py URL: replace trailing step.yml with __init__.py + # or use explicit init_url if provided. + init_url = info.get("init_url") + if not init_url: + if step_yml_url.endswith("step.yml"): + init_url = step_yml_url[: -len("step.yml")] + "__init__.py" + else: + console.print( + f"[red]Error:[/red] Cannot derive __init__.py URL from '{step_yml_url}'. " + "Catalog entry should provide 'init_url' or a 'url' ending in 'step.yml'." + ) + raise typer.Exit(1) + + from urllib.parse import urlparse + from urllib.request import urlopen + + def _safe_fetch(url: str) -> bytes: + parsed = urlparse(url) + is_localhost = parsed.hostname in ("localhost", "127.0.0.1", "::1") + if parsed.scheme != "https" and not (parsed.scheme == "http" and is_localhost): + raise ValueError(f"Refusing to fetch from non-HTTPS URL: {url}") + with urlopen(url, timeout=30) as resp: # noqa: S310 + final_url = resp.geturl() + final_parsed = urlparse(final_url) + final_is_localhost = final_parsed.hostname in ("localhost", "127.0.0.1", "::1") + if final_parsed.scheme != "https" and not ( + final_parsed.scheme == "http" and final_is_localhost + ): + raise ValueError(f"Redirect to non-HTTPS URL: {final_url}") + return resp.read() + + step_dir = project_root / ".specify" / "workflows" / "steps" / step_id + step_dir.mkdir(parents=True, exist_ok=True) + + try: + step_yml_content = _safe_fetch(step_yml_url) + init_py_content = _safe_fetch(init_url) + except Exception as exc: + import shutil + shutil.rmtree(step_dir, ignore_errors=True) + console.print(f"[red]Error:[/red] Failed to download step files: {exc}") + raise typer.Exit(1) + + # Validate step.yml + try: + import yaml as _yaml + + meta = _yaml.safe_load(step_yml_content.decode("utf-8")) or {} + except Exception as exc: + import shutil + shutil.rmtree(step_dir, ignore_errors=True) + console.print(f"[red]Error:[/red] Invalid step.yml: {exc}") + raise typer.Exit(1) + + step_meta = meta.get("step", {}) + type_key = step_meta.get("type_key", "") + if not type_key: + import shutil + shutil.rmtree(step_dir, ignore_errors=True) + console.print("[red]Error:[/red] step.yml missing 'step.type_key' field") + raise typer.Exit(1) + + if type_key != step_id: + import shutil + shutil.rmtree(step_dir, ignore_errors=True) + console.print( + f"[red]Error:[/red] step.yml type_key ({type_key!r}) does not match " + f"catalog ID ({step_id!r})" + ) + raise typer.Exit(1) + + # Write files + (step_dir / "step.yml").write_bytes(step_yml_content) + (step_dir / "__init__.py").write_bytes(init_py_content) + + # Register in step registry + registry = StepRegistry(project_root) + registry.add( + step_id, + { + "name": info.get("name", step_id), + "version": info.get("version", step_meta.get("version", "0.0.0")), + "description": info.get("description", step_meta.get("description", "")), + "author": info.get("author", step_meta.get("author", "")), + "source": "catalog", + "catalog_name": info.get("_catalog_name", ""), + "type_key": type_key, + }, + ) + + console.print( + f"[green]✓[/green] Step type '{info.get('name', step_id)}' ({step_id}) installed" + ) + console.print( + " Use [cyan]specify workflow step list[/cyan] to verify the installation." + ) + + +@workflow_step_app.command("remove") +def workflow_step_remove( + step_id: str = typer.Argument(..., help="Step type ID to uninstall"), +): + """Uninstall a custom step type.""" + from .workflows.catalog import StepRegistry + + project_root = Path.cwd() + specify_dir = project_root / ".specify" + if not specify_dir.exists(): + console.print("[red]Error:[/red] Not a spec-kit project (no .specify/ directory)") + raise typer.Exit(1) + + registry = StepRegistry(project_root) + if not registry.is_installed(step_id): + console.print(f"[red]Error:[/red] Step type '{step_id}' is not installed") + raise typer.Exit(1) + + step_dir = project_root / ".specify" / "workflows" / "steps" / step_id + if step_dir.exists(): + import shutil + shutil.rmtree(step_dir) + + registry.remove(step_id) + console.print(f"[green]✓[/green] Step type '{step_id}' uninstalled") + + +@workflow_step_app.command("search") +def workflow_step_search( + query: str | None = typer.Argument(None, help="Search query"), +): + """Search the step type catalog.""" + from .workflows.catalog import StepCatalog, StepCatalogError + + project_root = Path.cwd() + if not (project_root / ".specify").exists(): + console.print("[red]Error:[/red] Not a spec-kit project (no .specify/ directory)") + raise typer.Exit(1) + + catalog = StepCatalog(project_root) + + try: + results = catalog.search(query=query) + except StepCatalogError as exc: + console.print(f"[red]Error:[/red] {exc}") + raise typer.Exit(1) + + if not results: + if query: + console.print(f"[yellow]No step types found matching '{query}'.[/yellow]") + else: + console.print("[yellow]No step types found in catalog.[/yellow]") + return + + console.print(f"\n[bold cyan]Step Types ({len(results)}):[/bold cyan]\n") + for step in results: + install_note = ( + "" if step.get("_install_allowed", True) else " [dim](discovery only)[/dim]" + ) + console.print( + f" [bold]{step.get('name', step.get('id', '?'))}[/bold]" + f" ({step.get('id', '?')}) v{step.get('version', '?')}{install_note}" + ) + desc = step.get("description", "") + if desc: + console.print(f" {desc}") + console.print() + + +@workflow_step_app.command("info") +def workflow_step_info( + step_id: str = typer.Argument(..., help="Step type ID"), +): + """Show details for a step type.""" + from .workflows import STEP_REGISTRY + from .workflows.catalog import StepCatalog, StepCatalogError, StepRegistry + + project_root = Path.cwd() + if not (project_root / ".specify").exists(): + console.print("[red]Error:[/red] Not a spec-kit project (no .specify/ directory)") + raise typer.Exit(1) + + # Load custom steps so registry is up to date + from .workflows import load_custom_steps + load_custom_steps(project_root) + + registry = StepRegistry(project_root) + installed_meta = registry.get(step_id) + + # Check if it's a built-in + builtin_step = STEP_REGISTRY.get(step_id) + is_builtin = builtin_step is not None and not installed_meta + + if is_builtin: + console.print(f"\n[bold cyan]{step_id}[/bold cyan] [dim](built-in)[/dim]") + console.print(f" Type key: {step_id}") + console.print(" [green]Built-in step type[/green]") + return + + if installed_meta: + console.print( + f"\n[bold cyan]{installed_meta.get('name', step_id)}[/bold cyan] ({step_id})" + ) + console.print(f" Version: {installed_meta.get('version', '?')}") + if installed_meta.get("author"): + console.print(f" Author: {installed_meta['author']}") + if installed_meta.get("description"): + console.print(f" Description: {installed_meta['description']}") + console.print(" [green]Installed[/green]") + return + + # Try catalog + catalog = StepCatalog(project_root) + try: + info = catalog.get_step_info(step_id) + except StepCatalogError: + info = None + + if info: + console.print( + f"\n[bold cyan]{info.get('name', step_id)}[/bold cyan] ({step_id})" + ) + console.print(f" Version: {info.get('version', '?')}") + if info.get("author"): + console.print(f" Author: {info['author']}") + if info.get("description"): + console.print(f" Description: {info['description']}") + console.print(" [yellow]Not installed[/yellow]") + console.print( + f"\n Install with: [cyan]specify workflow step add {step_id}[/cyan]" + ) + else: + console.print(f"[red]Error:[/red] Step type '{step_id}' not found") + raise typer.Exit(1) + + +@workflow_step_catalog_app.command("list") +def workflow_step_catalog_list(): + """List configured step catalog sources.""" + from .workflows.catalog import StepCatalog, StepCatalogError + + project_root = Path.cwd() + catalog = StepCatalog(project_root) + + try: + configs = catalog.get_catalog_configs() + except StepCatalogError as exc: + console.print(f"[red]Error:[/red] {exc}") + raise typer.Exit(1) + + console.print("\n[bold cyan]Step Catalog Sources:[/bold cyan]\n") + for i, cfg in enumerate(configs): + install_status = ( + "[green]install allowed[/green]" + if cfg["install_allowed"] + else "[yellow]discovery only[/yellow]" + ) + console.print(f" [{i}] [bold]{cfg['name']}[/bold] — {install_status}") + console.print(f" {cfg['url']}") + if cfg.get("description"): + console.print(f" [dim]{cfg['description']}[/dim]") + console.print() + + +@workflow_step_catalog_app.command("add") +def workflow_step_catalog_add( + url: str = typer.Argument(..., help="Catalog URL to add"), + name: str = typer.Option(None, "--name", help="Catalog name"), +): + """Add a step catalog source.""" + from .workflows.catalog import StepCatalog, StepValidationError + + project_root = Path.cwd() + specify_dir = project_root / ".specify" + if not specify_dir.exists(): + console.print("[red]Error:[/red] Not a spec-kit project (no .specify/ directory)") + raise typer.Exit(1) + + catalog = StepCatalog(project_root) + try: + catalog.add_catalog(url, name) + except StepValidationError as exc: + console.print(f"[red]Error:[/red] {exc}") + raise typer.Exit(1) + + console.print(f"[green]✓[/green] Step catalog source added: {url}") + + +@workflow_step_catalog_app.command("remove") +def workflow_step_catalog_remove( + index: int = typer.Argument( + ..., help="Catalog index to remove (from 'step catalog list')" + ), +): + """Remove a step catalog source by index.""" + from .workflows.catalog import StepCatalog, StepValidationError + + project_root = Path.cwd() + specify_dir = project_root / ".specify" + if not specify_dir.exists(): + console.print("[red]Error:[/red] Not a spec-kit project (no .specify/ directory)") + raise typer.Exit(1) + + catalog = StepCatalog(project_root) + try: + removed_name = catalog.remove_catalog(index) + except StepValidationError as exc: + console.print(f"[red]Error:[/red] {exc}") + raise typer.Exit(1) + + console.print(f"[green]✓[/green] Step catalog source '{removed_name}' removed") + + def main(): app() diff --git a/src/specify_cli/workflows/__init__.py b/src/specify_cli/workflows/__init__.py index 13782f620b..77a73dd16e 100644 --- a/src/specify_cli/workflows/__init__.py +++ b/src/specify_cli/workflows/__init__.py @@ -7,10 +7,12 @@ - ``STEP_REGISTRY`` — maps ``type_key`` to ``StepBase`` subclass instances. - ``WorkflowEngine`` — orchestrator that loads, validates, and executes workflow YAML definitions. +- ``load_custom_steps`` — loads community-installed step types into STEP_REGISTRY. """ from __future__ import annotations +from pathlib import Path from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -66,3 +68,79 @@ def _register_builtin_steps() -> None: _register_builtin_steps() + + +def load_custom_steps(project_root: Path) -> list[str]: + """Load community-installed custom step types into STEP_REGISTRY. + + Scans ``.specify/workflows/steps/`` for installed step packages. + Each valid package must contain ``step.yml`` (with a ``step.type_key`` + field) and ``__init__.py`` (a ``StepBase`` subclass). + + Returns a list of type_keys that were successfully loaded. + Silently skips packages that fail to import or validate. + """ + import importlib.util as _importlib_util + + steps_dir = Path(project_root) / ".specify" / "workflows" / "steps" + if not steps_dir.is_dir(): + return [] + + loaded: list[str] = [] + for step_dir in steps_dir.iterdir(): + if not step_dir.is_dir(): + continue + step_yml = step_dir / "step.yml" + init_py = step_dir / "__init__.py" + if not step_yml.exists() or not init_py.exists(): + continue + + try: + import yaml as _yaml + + meta = _yaml.safe_load(step_yml.read_text(encoding="utf-8")) or {} + step_meta = meta.get("step", {}) + type_key = step_meta.get("type_key", "") + if not type_key: + continue + + # Skip if already registered (e.g. built-in or previously loaded) + if type_key in STEP_REGISTRY: + continue + + spec = _importlib_util.spec_from_file_location( + f"_speckit_custom_step_{type_key}", init_py + ) + if spec is None or spec.loader is None: + continue + module = _importlib_util.module_from_spec(spec) + spec.loader.exec_module(module) # type: ignore[union-attr] + + # Find the StepBase subclass in the module + from .base import StepBase as _StepBase + + step_class = None + for attr_name in dir(module): + attr = getattr(module, attr_name) + try: + if ( + isinstance(attr, type) + and issubclass(attr, _StepBase) + and attr is not _StepBase + and getattr(attr, "type_key", "") == type_key + ): + step_class = attr + break + except TypeError: + continue + + if step_class is None: + continue + + _register_step(step_class()) + loaded.append(type_key) + except Exception: # noqa: BLE001 + # Silently skip broken step packages at load time + continue + + return loaded diff --git a/src/specify_cli/workflows/catalog.py b/src/specify_cli/workflows/catalog.py index da5c60b5c8..77f0f25200 100644 --- a/src/specify_cli/workflows/catalog.py +++ b/src/specify_cli/workflows/catalog.py @@ -1,9 +1,10 @@ -"""Workflow catalog — discovery, install, and management of workflows. +"""Workflow catalog — discovery, install, and management of workflows and step types. Mirrors the existing extension/preset catalog pattern with: - Multi-catalog stack (env var → project → user → built-in) - SHA256-hashed per-URL caching with 1-hour TTL - Workflow registry for installed workflow tracking +- Step registry for installed custom step type tracking - Search across all configured catalog sources """ @@ -538,3 +539,503 @@ def remove_catalog(self, index: int) -> str: if isinstance(removed, dict): return removed.get("name", f"catalog-{index + 1}") return f"catalog-{index + 1}" + + +# --------------------------------------------------------------------------- +# Step catalog errors +# --------------------------------------------------------------------------- + + +class StepCatalogError(Exception): + """Base error for step catalog operations.""" + + +class StepValidationError(StepCatalogError): + """Validation error for step catalog config or step data.""" + + +# --------------------------------------------------------------------------- +# StepCatalogEntry +# --------------------------------------------------------------------------- + + +@dataclass +class StepCatalogEntry: + """Represents a single step catalog source in the catalog stack.""" + + url: str + name: str + priority: int + install_allowed: bool + description: str = "" + + +# --------------------------------------------------------------------------- +# StepRegistry +# --------------------------------------------------------------------------- + + +class StepRegistry: + """Manages the registry of installed custom step types. + + Tracks installed step types and their metadata in + ``.specify/workflows/steps/step-registry.json``. + """ + + REGISTRY_FILE = "step-registry.json" + SCHEMA_VERSION = "1.0" + + def __init__(self, project_root: Path) -> None: + self.project_root = project_root + self.steps_dir = project_root / ".specify" / "workflows" / "steps" + self.registry_path = self.steps_dir / self.REGISTRY_FILE + self.data = self._load() + + def _load(self) -> dict[str, Any]: + """Load registry from disk or create default.""" + if self.registry_path.exists(): + try: + with open(self.registry_path, encoding="utf-8") as f: + return json.load(f) + except (json.JSONDecodeError, ValueError): + return {"schema_version": self.SCHEMA_VERSION, "steps": {}} + return {"schema_version": self.SCHEMA_VERSION, "steps": {}} + + def save(self) -> None: + """Persist registry to disk.""" + self.steps_dir.mkdir(parents=True, exist_ok=True) + with open(self.registry_path, "w", encoding="utf-8") as f: + json.dump(self.data, f, indent=2) + + def add(self, step_id: str, metadata: dict[str, Any]) -> None: + """Add or update an installed step entry.""" + from datetime import datetime, timezone + + existing = self.data["steps"].get(step_id, {}) + metadata["installed_at"] = existing.get( + "installed_at", datetime.now(timezone.utc).isoformat() + ) + metadata["updated_at"] = datetime.now(timezone.utc).isoformat() + self.data["steps"][step_id] = metadata + self.save() + + def remove(self, step_id: str) -> bool: + """Remove an installed step entry. Returns True if found.""" + if step_id in self.data["steps"]: + del self.data["steps"][step_id] + self.save() + return True + return False + + def get(self, step_id: str) -> dict[str, Any] | None: + """Get metadata for an installed step.""" + return self.data["steps"].get(step_id) + + def list(self) -> dict[str, dict[str, Any]]: + """Return all installed steps.""" + return dict(self.data["steps"]) + + def is_installed(self, step_id: str) -> bool: + """Check if a step is installed.""" + return step_id in self.data["steps"] + + +# --------------------------------------------------------------------------- +# StepCatalog +# --------------------------------------------------------------------------- + + +class StepCatalog: + """Manages step catalog fetching, caching, and searching. + + Resolution order for catalog sources: + 1. ``SPECKIT_STEP_CATALOG_URL`` env var (overrides all) + 2. Project-level ``.specify/step-catalogs.yml`` + 3. User-level ``~/.specify/step-catalogs.yml`` + 4. Built-in defaults (official + community) + """ + + DEFAULT_CATALOG_URL = ( + "https://raw.githubusercontent.com/github/spec-kit/main/" + "workflows/step-catalog.json" + ) + COMMUNITY_CATALOG_URL = ( + "https://raw.githubusercontent.com/github/spec-kit/main/" + "workflows/step-catalog.community.json" + ) + CACHE_DURATION = 3600 # 1 hour + + def __init__(self, project_root: Path) -> None: + self.project_root = project_root + self.steps_dir = project_root / ".specify" / "workflows" / "steps" + self.cache_dir = self.steps_dir / ".cache" + + # -- Catalog resolution ----------------------------------------------- + + def _validate_catalog_url(self, url: str) -> None: + """Validate that a catalog URL uses HTTPS (localhost HTTP allowed).""" + from urllib.parse import urlparse + + parsed = urlparse(url) + is_localhost = parsed.hostname in ("localhost", "127.0.0.1", "::1") + if parsed.scheme != "https" and not ( + parsed.scheme == "http" and is_localhost + ): + raise StepValidationError( + f"Catalog URL must use HTTPS (got {parsed.scheme}://). " + "HTTP is only allowed for localhost." + ) + if not parsed.netloc: + raise StepValidationError( + "Catalog URL must be a valid URL with a host." + ) + + def _load_catalog_config( + self, config_path: Path + ) -> list[StepCatalogEntry] | None: + """Load catalog stack configuration from a YAML file.""" + if not config_path.exists(): + return None + try: + data = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {} + except (yaml.YAMLError, OSError, UnicodeError) as exc: + raise StepValidationError( + f"Failed to read catalog config {config_path}: {exc}" + ) + catalogs_data = data.get("catalogs", []) + if not catalogs_data: + return None + if not isinstance(catalogs_data, list): + raise StepValidationError( + f"Invalid catalog config: 'catalogs' must be a list, " + f"got {type(catalogs_data).__name__}" + ) + + entries: list[StepCatalogEntry] = [] + for idx, item in enumerate(catalogs_data): + if not isinstance(item, dict): + raise StepValidationError( + f"Invalid catalog entry at index {idx}: " + f"expected a mapping, got {type(item).__name__}" + ) + url = str(item.get("url", "")).strip() + if not url: + continue + self._validate_catalog_url(url) + try: + priority = int(item.get("priority", idx + 1)) + except (TypeError, ValueError): + raise StepValidationError( + f"Invalid priority for catalog " + f"'{item.get('name', idx + 1)}': " + f"expected integer, got {item.get('priority')!r}" + ) + raw_install = item.get("install_allowed", False) + if isinstance(raw_install, str): + install_allowed = raw_install.strip().lower() in ( + "true", + "yes", + "1", + ) + else: + install_allowed = bool(raw_install) + entries.append( + StepCatalogEntry( + url=url, + name=str(item.get("name", f"catalog-{idx + 1}")), + priority=priority, + install_allowed=install_allowed, + description=str(item.get("description", "")), + ) + ) + entries.sort(key=lambda e: e.priority) + if not entries: + raise StepValidationError( + f"Catalog config {config_path} contains {len(catalogs_data)} " + f"entries but none have valid URLs." + ) + return entries + + def get_active_catalogs(self) -> list[StepCatalogEntry]: + """Get the ordered list of active step catalogs.""" + # 1. Environment variable override + env_url = os.environ.get("SPECKIT_STEP_CATALOG_URL", "").strip() + if env_url: + self._validate_catalog_url(env_url) + return [ + StepCatalogEntry( + url=env_url, + name="env-override", + priority=1, + install_allowed=True, + description="From SPECKIT_STEP_CATALOG_URL", + ) + ] + + # 2. Project-level config + project_config = self.project_root / ".specify" / "step-catalogs.yml" + project_entries = self._load_catalog_config(project_config) + if project_entries is not None: + return project_entries + + # 3. User-level config + home = Path.home() + user_config = home / ".specify" / "step-catalogs.yml" + user_entries = self._load_catalog_config(user_config) + if user_entries is not None: + return user_entries + + # 4. Built-in defaults + return [ + StepCatalogEntry( + url=self.DEFAULT_CATALOG_URL, + name="default", + priority=1, + install_allowed=True, + description="Official step types", + ), + StepCatalogEntry( + url=self.COMMUNITY_CATALOG_URL, + name="community", + priority=2, + install_allowed=False, + description="Community-contributed step types (discovery only)", + ), + ] + + # -- Caching ---------------------------------------------------------- + + def _get_cache_paths(self, url: str) -> tuple[Path, Path]: + """Get cache file paths for a URL (hash-based).""" + url_hash = hashlib.sha256(url.encode()).hexdigest()[:16] + cache_file = self.cache_dir / f"step-catalog-{url_hash}.json" + meta_file = self.cache_dir / f"step-catalog-{url_hash}-meta.json" + return cache_file, meta_file + + def _is_url_cache_valid(self, url: str) -> bool: + """Check if cached data for a URL is still fresh.""" + _, meta_file = self._get_cache_paths(url) + if not meta_file.exists(): + return False + try: + with open(meta_file, encoding="utf-8") as f: + meta = json.load(f) + fetched_at = meta.get("fetched_at", 0) + return (time.time() - fetched_at) < self.CACHE_DURATION + except (json.JSONDecodeError, OSError): + return False + + def _fetch_single_catalog( + self, entry: StepCatalogEntry, force_refresh: bool = False + ) -> dict[str, Any]: + """Fetch a single catalog, using cache when possible.""" + cache_file, meta_file = self._get_cache_paths(entry.url) + + if not force_refresh and self._is_url_cache_valid(entry.url): + try: + with open(cache_file, encoding="utf-8") as f: + return json.load(f) + except (json.JSONDecodeError, OSError): + pass + + from urllib.parse import urlparse + from urllib.request import urlopen + + def _validate_url(url: str) -> None: + parsed = urlparse(url) + is_localhost = parsed.hostname in ("localhost", "127.0.0.1", "::1") + if parsed.scheme != "https" and not ( + parsed.scheme == "http" and is_localhost + ): + raise StepCatalogError( + f"Refusing to fetch catalog from non-HTTPS URL: {url}" + ) + + _validate_url(entry.url) + + try: + with urlopen(entry.url, timeout=30) as resp: # noqa: S310 + _validate_url(resp.geturl()) + data = json.loads(resp.read().decode("utf-8")) + except Exception as exc: + if cache_file.exists(): + try: + with open(cache_file, encoding="utf-8") as f: + return json.load(f) + except (json.JSONDecodeError, ValueError, OSError): + pass + raise StepCatalogError( + f"Failed to fetch catalog from {entry.url}: {exc}" + ) from exc + + if not isinstance(data, dict): + raise StepCatalogError( + f"Catalog from {entry.url} is not a valid JSON object." + ) + + self.cache_dir.mkdir(parents=True, exist_ok=True) + with open(cache_file, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2) + with open(meta_file, "w", encoding="utf-8") as f: + json.dump({"url": entry.url, "fetched_at": time.time()}, f) + + return data + + def _get_merged_steps( + self, force_refresh: bool = False + ) -> dict[str, dict[str, Any]]: + """Merge steps from all active catalogs (lower priority number wins).""" + catalogs = self.get_active_catalogs() + merged: dict[str, dict[str, Any]] = {} + fetch_errors = 0 + + for entry in reversed(catalogs): + try: + data = self._fetch_single_catalog(entry, force_refresh) + except StepCatalogError: + fetch_errors += 1 + continue + steps = data.get("steps", {}) + if isinstance(steps, dict): + for step_id, step_data in steps.items(): + if not isinstance(step_data, dict): + continue + step_data["_catalog_name"] = entry.name + step_data["_install_allowed"] = entry.install_allowed + merged[step_id] = step_data + elif isinstance(steps, list): + for step_data in steps: + if not isinstance(step_data, dict): + continue + step_id = step_data.get("id", "") + if step_id: + step_data["_catalog_name"] = entry.name + step_data["_install_allowed"] = entry.install_allowed + merged[step_id] = step_data + if fetch_errors == len(catalogs) and catalogs: + raise StepCatalogError("All configured step catalogs failed to fetch.") + return merged + + # -- Public API ------------------------------------------------------- + + def search( + self, + query: str | None = None, + ) -> list[dict[str, Any]]: + """Search step types across all configured catalogs.""" + merged = self._get_merged_steps() + results: list[dict[str, Any]] = [] + + for step_id, step_data in merged.items(): + step_data.setdefault("id", step_id) + if query: + q = query.lower() + searchable = " ".join( + [ + step_data.get("name", ""), + step_data.get("description", ""), + step_data.get("id", ""), + ] + ).lower() + if q not in searchable: + continue + results.append(step_data) + return results + + def get_step_info(self, step_id: str) -> dict[str, Any] | None: + """Get details for a specific step from the catalog.""" + merged = self._get_merged_steps() + step = merged.get(step_id) + if step: + step.setdefault("id", step_id) + return step + + def get_catalog_configs(self) -> list[dict[str, Any]]: + """Return current catalog configuration as a list of dicts.""" + entries = self.get_active_catalogs() + return [ + { + "name": e.name, + "url": e.url, + "priority": e.priority, + "install_allowed": e.install_allowed, + "description": e.description, + } + for e in entries + ] + + def add_catalog(self, url: str, name: str | None = None) -> None: + """Add a catalog source to the project-level config.""" + self._validate_catalog_url(url) + config_path = self.project_root / ".specify" / "step-catalogs.yml" + + data: dict[str, Any] = {"catalogs": []} + if config_path.exists(): + raw = yaml.safe_load(config_path.read_text(encoding="utf-8")) + if not isinstance(raw, dict): + raise StepValidationError( + "Catalog config file is corrupted (expected a mapping)." + ) + data = raw + + catalogs = data.get("catalogs", []) + if not isinstance(catalogs, list): + raise StepValidationError( + "Catalog config 'catalogs' must be a list." + ) + for cat in catalogs: + if isinstance(cat, dict) and cat.get("url") == url: + raise StepValidationError( + f"Catalog URL already configured: {url}" + ) + + max_priority = max( + (cat.get("priority", 0) for cat in catalogs if isinstance(cat, dict)), + default=0, + ) + catalogs.append( + { + "name": name or f"catalog-{len(catalogs) + 1}", + "url": url, + "priority": max_priority + 1, + "install_allowed": True, + "description": "", + } + ) + data["catalogs"] = catalogs + + config_path.parent.mkdir(parents=True, exist_ok=True) + with open(config_path, "w", encoding="utf-8") as f: + yaml.dump(data, f, default_flow_style=False, sort_keys=False, allow_unicode=True) + + def remove_catalog(self, index: int) -> str: + """Remove a catalog source by index (0-based). Returns the removed name.""" + config_path = self.project_root / ".specify" / "step-catalogs.yml" + if not config_path.exists(): + raise StepValidationError("No step catalog config file found.") + + data = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {} + if not isinstance(data, dict): + raise StepValidationError( + "Catalog config file is corrupted (expected a mapping)." + ) + catalogs = data.get("catalogs", []) + if not isinstance(catalogs, list): + raise StepValidationError( + "Catalog config 'catalogs' must be a list." + ) + + if index < 0 or index >= len(catalogs): + raise StepValidationError( + f"Catalog index {index} out of range (0-{len(catalogs) - 1})." + ) + + removed = catalogs.pop(index) + data["catalogs"] = catalogs + + with open(config_path, "w", encoding="utf-8") as f: + yaml.dump(data, f, default_flow_style=False, sort_keys=False, allow_unicode=True) + + if isinstance(removed, dict): + return removed.get("name", f"catalog-{index + 1}") + return f"catalog-{index + 1}" diff --git a/tests/test_workflows.py b/tests/test_workflows.py index 4c042fc7d5..4910521baf 100644 --- a/tests/test_workflows.py +++ b/tests/test_workflows.py @@ -1843,3 +1843,353 @@ def test_switch_workflow(self, project_dir): assert state.status == RunStatus.COMPLETED assert "do-plan" in state.step_results assert "do-specify" not in state.step_results + + +# ===== Step Registry Tests ===== + +class TestStepRegistryCustom: + """Test StepRegistry operations for custom step types.""" + + def test_add_and_get(self, project_dir): + from specify_cli.workflows.catalog import StepRegistry + + registry = StepRegistry(project_dir) + registry.add("deploy", {"name": "Deploy", "version": "1.0.0", "type_key": "deploy"}) + + entry = registry.get("deploy") + assert entry is not None + assert entry["name"] == "Deploy" + assert "installed_at" in entry + + def test_remove(self, project_dir): + from specify_cli.workflows.catalog import StepRegistry + + registry = StepRegistry(project_dir) + registry.add("deploy", {"name": "Deploy", "type_key": "deploy"}) + assert registry.is_installed("deploy") + + registry.remove("deploy") + assert not registry.is_installed("deploy") + + def test_remove_missing_returns_false(self, project_dir): + from specify_cli.workflows.catalog import StepRegistry + + registry = StepRegistry(project_dir) + removed = registry.remove("nonexistent") + assert removed is False + + def test_list(self, project_dir): + from specify_cli.workflows.catalog import StepRegistry + + registry = StepRegistry(project_dir) + registry.add("step-a", {"name": "A", "type_key": "step-a"}) + registry.add("step-b", {"name": "B", "type_key": "step-b"}) + + installed = registry.list() + assert "step-a" in installed + assert "step-b" in installed + + def test_is_installed(self, project_dir): + from specify_cli.workflows.catalog import StepRegistry + + registry = StepRegistry(project_dir) + assert not registry.is_installed("missing") + + registry.add("exists", {"name": "Exists", "type_key": "exists"}) + assert registry.is_installed("exists") + + def test_persistence(self, project_dir): + from specify_cli.workflows.catalog import StepRegistry + + registry1 = StepRegistry(project_dir) + registry1.add("deploy", {"name": "Deploy", "type_key": "deploy"}) + + registry2 = StepRegistry(project_dir) + assert registry2.is_installed("deploy") + + def test_corrupted_registry_resets(self, project_dir): + from specify_cli.workflows.catalog import StepRegistry + + registry = StepRegistry(project_dir) + registry.steps_dir.mkdir(parents=True, exist_ok=True) + registry.registry_path.write_text("not json", encoding="utf-8") + + # Loading again should reset + registry2 = StepRegistry(project_dir) + assert registry2.list() == {} + + +# ===== Step Catalog Tests ===== + +class TestStepCatalog: + """Test StepCatalog catalog resolution.""" + + def test_default_catalogs(self, project_dir): + from specify_cli.workflows.catalog import StepCatalog + + catalog = StepCatalog(project_dir) + entries = catalog.get_active_catalogs() + assert len(entries) == 2 + assert entries[0].name == "default" + assert entries[1].name == "community" + + def test_env_var_override(self, project_dir, monkeypatch): + from specify_cli.workflows.catalog import StepCatalog + + monkeypatch.setenv("SPECKIT_STEP_CATALOG_URL", "https://example.com/step-catalog.json") + catalog = StepCatalog(project_dir) + entries = catalog.get_active_catalogs() + assert len(entries) == 1 + assert entries[0].name == "env-override" + assert entries[0].url == "https://example.com/step-catalog.json" + + def test_project_level_config(self, project_dir): + from specify_cli.workflows.catalog import StepCatalog + + config_path = project_dir / ".specify" / "step-catalogs.yml" + config_path.write_text(yaml.dump({ + "catalogs": [{ + "name": "custom", + "url": "https://example.com/step-catalog.json", + "priority": 1, + "install_allowed": True, + }] + })) + + catalog = StepCatalog(project_dir) + entries = catalog.get_active_catalogs() + assert len(entries) == 1 + assert entries[0].name == "custom" + + def test_validate_url_http_rejected(self, project_dir): + from specify_cli.workflows.catalog import StepCatalog, StepValidationError + + catalog = StepCatalog(project_dir) + with pytest.raises(StepValidationError, match="HTTPS"): + catalog._validate_catalog_url("http://evil.com/step-catalog.json") + + def test_validate_url_localhost_http_allowed(self, project_dir): + from specify_cli.workflows.catalog import StepCatalog + + catalog = StepCatalog(project_dir) + # Should not raise + catalog._validate_catalog_url("http://localhost:8080/step-catalog.json") + + def test_add_catalog(self, project_dir): + from specify_cli.workflows.catalog import StepCatalog + + catalog = StepCatalog(project_dir) + catalog.add_catalog("https://example.com/new-steps.json", "my-steps") + + config_path = project_dir / ".specify" / "step-catalogs.yml" + assert config_path.exists() + data = yaml.safe_load(config_path.read_text()) + assert len(data["catalogs"]) == 1 + assert data["catalogs"][0]["url"] == "https://example.com/new-steps.json" + + def test_add_catalog_duplicate_rejected(self, project_dir): + from specify_cli.workflows.catalog import StepCatalog, StepValidationError + + catalog = StepCatalog(project_dir) + catalog.add_catalog("https://example.com/steps.json") + + with pytest.raises(StepValidationError, match="already configured"): + catalog.add_catalog("https://example.com/steps.json") + + def test_remove_catalog(self, project_dir): + from specify_cli.workflows.catalog import StepCatalog + + catalog = StepCatalog(project_dir) + catalog.add_catalog("https://example.com/s1.json", "first") + catalog.add_catalog("https://example.com/s2.json", "second") + + removed = catalog.remove_catalog(0) + assert removed == "first" + + config_path = project_dir / ".specify" / "step-catalogs.yml" + data = yaml.safe_load(config_path.read_text()) + assert len(data["catalogs"]) == 1 + + def test_remove_catalog_invalid_index(self, project_dir): + from specify_cli.workflows.catalog import StepCatalog, StepValidationError + + catalog = StepCatalog(project_dir) + catalog.add_catalog("https://example.com/s1.json") + + with pytest.raises(StepValidationError, match="out of range"): + catalog.remove_catalog(5) + + def test_remove_catalog_no_config(self, project_dir): + from specify_cli.workflows.catalog import StepCatalog, StepValidationError + + catalog = StepCatalog(project_dir) + with pytest.raises(StepValidationError, match="No step catalog config file found"): + catalog.remove_catalog(0) + + def test_get_catalog_configs(self, project_dir): + from specify_cli.workflows.catalog import StepCatalog + + catalog = StepCatalog(project_dir) + configs = catalog.get_catalog_configs() + assert len(configs) == 2 + assert configs[0]["name"] == "default" + assert isinstance(configs[0]["install_allowed"], bool) + + def test_search_with_mock_catalog(self, project_dir, monkeypatch): + from specify_cli.workflows.catalog import StepCatalog + + mock_data = { + "schema_version": "1.0", + "steps": { + "deploy": { + "id": "deploy", + "name": "Deploy Step", + "description": "Deploy to production", + "version": "1.0.0", + }, + "notify": { + "id": "notify", + "name": "Notify Step", + "description": "Send notifications", + "version": "1.0.0", + }, + }, + } + + catalog = StepCatalog(project_dir) + monkeypatch.setattr(catalog, "_get_merged_steps", lambda **kw: { + "deploy": dict(mock_data["steps"]["deploy"], _catalog_name="test", _install_allowed=True), + "notify": dict(mock_data["steps"]["notify"], _catalog_name="test", _install_allowed=True), + }) + + results = catalog.search() + assert len(results) == 2 + + results = catalog.search(query="deploy") + assert len(results) == 1 + assert results[0]["id"] == "deploy" + + def test_get_step_info_with_mock_catalog(self, project_dir, monkeypatch): + from specify_cli.workflows.catalog import StepCatalog + + catalog = StepCatalog(project_dir) + monkeypatch.setattr(catalog, "_get_merged_steps", lambda **kw: { + "deploy": { + "id": "deploy", + "name": "Deploy Step", + "version": "1.0.0", + "_catalog_name": "test", + "_install_allowed": True, + }, + }) + + info = catalog.get_step_info("deploy") + assert info is not None + assert info["name"] == "Deploy Step" + + missing = catalog.get_step_info("nonexistent") + assert missing is None + + +# ===== Load Custom Steps Tests ===== + +class TestLoadCustomSteps: + """Test dynamic loading of custom step types from the filesystem.""" + + def test_empty_steps_dir(self, project_dir): + from specify_cli.workflows import load_custom_steps + + loaded = load_custom_steps(project_dir) + assert loaded == [] + + def test_no_steps_dir(self, project_dir): + from specify_cli.workflows import load_custom_steps + + # .specify/workflows/steps does not exist + loaded = load_custom_steps(project_dir) + assert loaded == [] + + def test_load_valid_custom_step(self, project_dir): + from specify_cli.workflows import load_custom_steps, STEP_REGISTRY + + step_dir = project_dir / ".specify" / "workflows" / "steps" / "test-custom" + step_dir.mkdir(parents=True) + + step_yml = """ +schema_version: "1.0" +step: + type_key: "test-custom" + name: "Test Custom Step" + version: "1.0.0" + author: "test" + description: "A test custom step" +""" + (step_dir / "step.yml").write_text(step_yml, encoding="utf-8") + + init_py = """ +from specify_cli.workflows.base import StepBase, StepResult + +class TestCustomStep(StepBase): + type_key = "test-custom" + + def execute(self, config, context): + return StepResult() +""" + (step_dir / "__init__.py").write_text(init_py, encoding="utf-8") + + loaded = load_custom_steps(project_dir) + assert "test-custom" in loaded + assert "test-custom" in STEP_REGISTRY + + def test_skip_missing_step_yml(self, project_dir): + from specify_cli.workflows import load_custom_steps + + step_dir = project_dir / ".specify" / "workflows" / "steps" / "bad-step" + step_dir.mkdir(parents=True) + (step_dir / "__init__.py").write_text("# no step.yml", encoding="utf-8") + + loaded = load_custom_steps(project_dir) + assert "bad-step" not in loaded + + def test_skip_missing_init_py(self, project_dir): + from specify_cli.workflows import load_custom_steps + + step_dir = project_dir / ".specify" / "workflows" / "steps" / "bad-step2" + step_dir.mkdir(parents=True) + (step_dir / "step.yml").write_text( + "step:\n type_key: bad-step2\n", encoding="utf-8" + ) + + loaded = load_custom_steps(project_dir) + assert "bad-step2" not in loaded + + def test_skip_already_registered(self, project_dir): + from specify_cli.workflows import load_custom_steps + + # "command" is already registered as a built-in step + step_dir = project_dir / ".specify" / "workflows" / "steps" / "command" + step_dir.mkdir(parents=True) + (step_dir / "step.yml").write_text( + "step:\n type_key: command\n", encoding="utf-8" + ) + (step_dir / "__init__.py").write_text("", encoding="utf-8") + + # Should not raise KeyError; just skip + loaded = load_custom_steps(project_dir) + assert "command" not in loaded + + def test_skip_broken_init_py(self, project_dir): + from specify_cli.workflows import load_custom_steps + + step_dir = project_dir / ".specify" / "workflows" / "steps" / "broken-step" + step_dir.mkdir(parents=True) + (step_dir / "step.yml").write_text( + "step:\n type_key: broken-step\n", encoding="utf-8" + ) + (step_dir / "__init__.py").write_text( + "raise RuntimeError('broken')", encoding="utf-8" + ) + + # Should not propagate exception + loaded = load_custom_steps(project_dir) + assert "broken-step" not in loaded diff --git a/workflows/step-catalog.community.json b/workflows/step-catalog.community.json new file mode 100644 index 0000000000..50a45049f1 --- /dev/null +++ b/workflows/step-catalog.community.json @@ -0,0 +1,6 @@ +{ + "schema_version": "1.0", + "updated_at": "2026-04-28T00:00:00Z", + "catalog_url": "https://raw.githubusercontent.com/github/spec-kit/main/workflows/step-catalog.community.json", + "steps": {} +} diff --git a/workflows/step-catalog.json b/workflows/step-catalog.json new file mode 100644 index 0000000000..e109bf4edd --- /dev/null +++ b/workflows/step-catalog.json @@ -0,0 +1,6 @@ +{ + "schema_version": "1.0", + "updated_at": "2026-04-28T00:00:00Z", + "catalog_url": "https://raw.githubusercontent.com/github/spec-kit/main/workflows/step-catalog.json", + "steps": {} +}