diff --git a/codeframe/cli/app.py b/codeframe/cli/app.py index 7f0509e0..39671150 100644 --- a/codeframe/cli/app.py +++ b/codeframe/cli/app.py @@ -1547,6 +1547,18 @@ def tasks_generate( "--overwrite", help="Delete existing tasks before generating new ones", ), + recursive: bool = typer.Option( + False, + "--recursive", "-r", + help="Use recursive decomposition", + ), + max_depth: int = typer.Option( + 3, + "--max-depth", + help="Maximum recursion depth (1-5)", + min=1, + max=5, + ), ) -> None: """Generate tasks from the PRD. @@ -1555,6 +1567,7 @@ def tasks_generate( Use --overwrite to clear existing tasks first (useful for regeneration). Without --overwrite, new tasks are appended (useful for multi-PRD projects). + Use --recursive for recursive decomposition into a task tree. """ from codeframe.core.workspace import get_workspace from codeframe.core import prd, tasks @@ -1585,13 +1598,27 @@ def tasks_generate( from codeframe.cli.validators import require_anthropic_api_key require_anthropic_api_key() - if no_llm: + if recursive: + console.print(f"[dim]Using recursive decomposition (max depth: {max_depth})...[/dim]") + + from codeframe.adapters.llm import get_provider + from codeframe.core.task_tree import generate_task_tree, flatten_task_tree + + provider = get_provider() + tree = generate_task_tree( + provider, + prd_record.content, + lineage=[], + depth=0, + max_depth=max_depth, + ) + created = flatten_task_tree(tree, workspace, prd_id=prd_record.id) + elif no_llm: console.print("[dim]Using simple extraction (--no-llm)[/dim]") + created = tasks.generate_from_prd(workspace, prd_record, use_llm=False) else: console.print("[dim]Using LLM for task generation...[/dim]") - - # Generate tasks - created = tasks.generate_from_prd(workspace, prd_record, use_llm=not no_llm) + created = tasks.generate_from_prd(workspace, prd_record, use_llm=True) # Emit event emit_for_workspace( @@ -1624,6 +1651,35 @@ def tasks_generate( raise typer.Exit(1) +@tasks_app.command("tree") +def tasks_tree( + repo_path: Optional[Path] = typer.Option( + None, + "--workspace", "-w", + help="Workspace path (defaults to current directory)", + ), +) -> None: + """Display task hierarchy as tree.""" + from codeframe.core.task_tree import display_task_tree + from codeframe.core.workspace import get_workspace + + workspace_path = repo_path or Path.cwd() + + try: + workspace = get_workspace(workspace_path) + output = display_task_tree(workspace) + if output: + typer.echo(output) + else: + typer.echo("No tasks found.") + except FileNotFoundError as e: + console.print(f"[red]Error:[/red] {e}") + raise typer.Exit(1) + except Exception as e: + console.print(f"[red]Error:[/red] {e}") + raise typer.Exit(1) + + @tasks_app.command("list") def tasks_list( status: Optional[str] = typer.Option(None, "--status", "-s", help="Filter by status"), diff --git a/codeframe/core/context_packager.py b/codeframe/core/context_packager.py index 6322c1de..1bdbb392 100644 --- a/codeframe/core/context_packager.py +++ b/codeframe/core/context_packager.py @@ -53,6 +53,18 @@ def build( prompt_parts = [context.to_prompt_context()] + # Add lineage context if available + if ( + hasattr(context, "task") + and context.task + and hasattr(context.task, "lineage") + and context.task.lineage + ): + lineage_str = " \u2192 ".join(context.task.lineage) + prompt_parts.append( + f"\n## Task Lineage\nThis task is part of: {lineage_str}\n" + ) + if attempt > 0 and previous_errors: prompt_parts.append(self._build_retry_section(attempt, previous_errors)) diff --git a/codeframe/core/task_tree.py b/codeframe/core/task_tree.py new file mode 100644 index 00000000..459ffe07 --- /dev/null +++ b/codeframe/core/task_tree.py @@ -0,0 +1,400 @@ +"""Recursive task decomposition and tree operations. + +Provides functions to classify, decompose, and recursively build task trees +using LLM-powered analysis. Also handles tree display and status propagation. + +This module is headless - no FastAPI or HTTP dependencies. +""" + +import json +import re +from typing import Optional + +from codeframe.adapters.llm.base import Purpose +from codeframe.core import tasks as task_module +from codeframe.core.state_machine import TaskStatus +from codeframe.core.workspace import Workspace + + +CLASSIFY_SYSTEM_PROMPT = ( + "You are a task decomposition expert. Classify whether a task is 'atomic' " + "(can be done in 1-2 hours by one developer) or 'composite' (should be broken " + "into subtasks). When in doubt, choose 'atomic'. Return ONLY the word 'atomic' " + "or 'composite'." +) + +DECOMPOSE_SYSTEM_PROMPT = ( + "Break this task into 2-7 concrete subtasks. Each should be actionable and " + "testable. Return a JSON array of objects with 'title' and 'description' fields." +) + +# Status display icons +_STATUS_ICONS = { + TaskStatus.DONE: "\u2713", + TaskStatus.IN_PROGRESS: "\u25cf", + TaskStatus.FAILED: "\u2717", + TaskStatus.BLOCKED: "\u2298", + TaskStatus.BACKLOG: "\u25cb", + TaskStatus.READY: "\u25cb", + TaskStatus.MERGED: "\u2713", +} + + +def classify_task( + provider, description: str, lineage: list[str] +) -> str: + """Classify a task as 'atomic' or 'composite' using LLM. + + Args: + provider: LLM provider instance + description: Task description to classify + lineage: List of ancestor task descriptions for context + + Returns: + 'atomic' or 'composite' + """ + lineage_context = "" + if lineage: + lineage_context = "\n\nParent context:\n" + "\n".join( + f"- {desc}" for desc in lineage + ) + + user_message = f"Task: {description}{lineage_context}" + + response = provider.complete( + messages=[{"role": "user", "content": user_message}], + purpose=Purpose.PLANNING, + system=CLASSIFY_SYSTEM_PROMPT, + max_tokens=50, + temperature=0.0, + ) + + result = response.content.strip().lower() + if result in ("atomic", "composite"): + return result + return "atomic" + + +def decompose_task( + provider, description: str, lineage: list[str] +) -> list[dict]: + """Decompose a task into 2-7 subtasks using LLM. + + Args: + provider: LLM provider instance + description: Task description to decompose + lineage: List of ancestor task descriptions for context + + Returns: + List of dicts with 'title' and 'description' keys (2-7 items) + """ + lineage_context = "" + if lineage: + lineage_context = "\n\nParent context:\n" + "\n".join( + f"- {desc}" for desc in lineage + ) + + user_message = f"Task to decompose: {description}{lineage_context}" + + response = provider.complete( + messages=[{"role": "user", "content": user_message}], + purpose=Purpose.PLANNING, + system=DECOMPOSE_SYSTEM_PROMPT, + max_tokens=2048, + temperature=0.0, + ) + + subtasks = _parse_subtasks(response.content) + + # Clamp to 2-7 items + if len(subtasks) > 7: + subtasks = subtasks[:7] + while len(subtasks) < 2: + subtasks.append({ + "title": f"Part {len(subtasks) + 1} of: {description[:60]}", + "description": f"Additional subtask for: {description}", + }) + + return subtasks + + +def _parse_subtasks(content: str) -> list[dict]: + """Parse LLM response into subtask list. + + Handles JSON arrays directly or wrapped in markdown code blocks. + + Args: + content: Raw LLM response + + Returns: + List of dicts with 'title' and 'description' keys + """ + # Try markdown-wrapped JSON first + json_match = re.search(r"```(?:json)?\s*([\s\S]*?)\s*```", content) + if json_match: + json_str = json_match.group(1) + else: + # Try raw JSON array + json_match = re.search(r"\[[\s\S]*\]", content) + if json_match: + json_str = json_match.group(0) + else: + return [] + + try: + raw = json.loads(json_str) + except json.JSONDecodeError: + return [] + + if not isinstance(raw, list): + return [] + + result = [] + for item in raw: + if isinstance(item, dict) and "title" in item: + result.append({ + "title": str(item["title"])[:200], + "description": str(item.get("description", ""))[:2000], + }) + + return result + + +def generate_task_tree( + provider, + description: str, + lineage: Optional[list[str]] = None, + depth: int = 0, + max_depth: int = 3, +) -> dict: + """Recursively generate a task tree using LLM classification and decomposition. + + Args: + provider: LLM provider instance + description: Task description + lineage: Ancestor task descriptions (accumulates through recursion) + depth: Current recursion depth + max_depth: Maximum recursion depth before forcing leaf nodes + + Returns: + Tree dict with keys: title, description, is_leaf, children, lineage + """ + lineage = lineage or [] + + # Force leaf at max depth + if depth >= max_depth: + return { + "title": description[:80], + "description": description, + "is_leaf": True, + "children": [], + "lineage": lineage, + } + + kind = classify_task(provider, description, lineage) + + if kind == "atomic": + return { + "title": description[:80], + "description": description, + "is_leaf": True, + "children": [], + "lineage": lineage, + } + + # Composite: decompose and recurse + subtasks = decompose_task(provider, description, lineage) + children = [] + child_lineage = lineage + [description] + + for sub in subtasks: + child_tree = generate_task_tree( + provider, + sub["description"] or sub["title"], + lineage=child_lineage, + depth=depth + 1, + max_depth=max_depth, + ) + # Use the subtask title if it's better than the truncated description + child_tree["title"] = sub["title"][:80] + children.append(child_tree) + + return { + "title": description[:80], + "description": description, + "is_leaf": False, + "children": children, + "lineage": lineage, + } + + +def flatten_task_tree( + tree: dict, + workspace: Workspace, + prd_id: Optional[str] = None, + parent_id: Optional[str] = None, + position: int = 1, + prefix: str = "", +) -> list: + """Walk the tree and create task records in the workspace. + + Args: + tree: Tree dict from generate_task_tree() + workspace: Target workspace + prd_id: Optional PRD ID to associate tasks with + parent_id: Parent task ID (for children) + position: Position among siblings (1-based) + prefix: Hierarchical ID prefix (e.g., "1.2") + + Returns: + Flat list of all created Task objects + """ + h_id = f"{prefix}{position}" if prefix else str(position) + + task = task_module.create( + workspace=workspace, + title=tree["title"], + description=tree.get("description", ""), + prd_id=prd_id, + parent_id=parent_id, + lineage=tree.get("lineage", []), + is_leaf=tree["is_leaf"], + hierarchical_id=h_id, + ) + + result = [task] + + for i, child in enumerate(tree.get("children", []), start=1): + child_tasks = flatten_task_tree( + child, + workspace, + prd_id=prd_id, + parent_id=task.id, + position=i, + prefix=f"{h_id}.", + ) + result.extend(child_tasks) + + return result + + +def display_task_tree(workspace: Workspace) -> str: + """Build an ASCII tree display of all tasks in a workspace. + + Args: + workspace: Workspace to display tasks from + + Returns: + Formatted ASCII tree string + """ + all_tasks = task_module.list_tasks(workspace) + + if not all_tasks: + return "No tasks found." + + # Build children map and find roots + children_map: dict[str, list] = {} + + for t in all_tasks: + pid = t.parent_id + if pid not in children_map: + children_map[pid] = [] + children_map[pid].append(t) + + # Sort children by hierarchical_id if available, else by title + for pid in children_map: + children_map[pid].sort( + key=lambda t: t.hierarchical_id or t.title + ) + + roots = children_map.get(None, []) + + if not roots: + # No tree structure — display flat list + lines = [] + for t in all_tasks: + icon = _STATUS_ICONS.get(t.status, "\u25cb") + label = t.hierarchical_id or t.id[:8] + kind = "composite" if not t.is_leaf else "atomic" + lines.append(f"{label}. {t.title} [{kind}] {icon}") + return "\n".join(lines) + + lines = [] + for root in roots: + _render_node(root, children_map, lines, indent="", is_last=True) + + return "\n".join(lines) + + +def _render_node( + task, + children_map: dict, + lines: list[str], + indent: str, + is_last: bool, +) -> None: + """Recursively render a task node in ASCII tree format.""" + icon = _STATUS_ICONS.get(task.status, "\u25cb") + label = task.hierarchical_id or task.id[:8] + kind = "composite" if not task.is_leaf else "atomic" + + connector = "\u2514\u2500\u2500 " if is_last else "\u251c\u2500\u2500 " + if not indent: + # Root node — no connector + lines.append(f"{label}. {task.title} [{kind}] {icon}") + else: + lines.append(f"{indent}{connector}{label}. {task.title} [{kind}] {icon}") + + children = children_map.get(task.id, []) + child_indent = indent + (" " if is_last else "\u2502 ") + + for i, child in enumerate(children): + _render_node( + child, children_map, lines, child_indent, is_last=(i == len(children) - 1) + ) + + +def propagate_status(workspace: Workspace, task_id: str) -> None: + """Propagate status changes from a child task up to its parent(s). + + When a child task changes status, the parent's status may need to update: + - All children DONE -> parent DONE + - Any child FAILED -> parent FAILED + - Any child IN_PROGRESS -> parent IN_PROGRESS + - Otherwise -> no change + + Args: + workspace: Workspace containing the tasks + task_id: ID of the task whose status just changed + """ + task = task_module.get(workspace, task_id) + if not task or not task.parent_id: + return + + parent = task_module.get(workspace, task.parent_id) + if not parent or parent.is_leaf: + return + + # Load all children of the parent + all_tasks = task_module.list_tasks(workspace) + children = [t for t in all_tasks if t.parent_id == parent.id] + + if not children: + return + + child_statuses = [c.status for c in children] + + # Determine new parent status + new_status = None + if all(s == TaskStatus.DONE for s in child_statuses): + new_status = TaskStatus.DONE + elif any(s == TaskStatus.FAILED for s in child_statuses): + new_status = TaskStatus.FAILED + elif any(s == TaskStatus.IN_PROGRESS for s in child_statuses): + new_status = TaskStatus.IN_PROGRESS + + if new_status and new_status != parent.status: + task_module.update_status(workspace, parent.id, new_status) + + # Recursively propagate upward + propagate_status(workspace, parent.id) diff --git a/codeframe/core/tasks.py b/codeframe/core/tasks.py index e81befb6..cc0b389c 100644 --- a/codeframe/core/tasks.py +++ b/codeframe/core/tasks.py @@ -56,6 +56,10 @@ class Task: complexity_score: Optional[int] = None uncertainty_level: Optional[str] = None github_issue_number: Optional[int] = None + parent_id: Optional[str] = None + lineage: list[str] = field(default_factory=list) + is_leaf: bool = True + hierarchical_id: Optional[str] = None def create( @@ -69,6 +73,10 @@ def create( estimated_hours: Optional[float] = None, complexity_score: Optional[int] = None, uncertainty_level: Optional[str] = None, + parent_id: Optional[str] = None, + lineage: Optional[list[str]] = None, + is_leaf: bool = True, + hierarchical_id: Optional[str] = None, ) -> Task: """Create a new task. @@ -83,6 +91,10 @@ def create( estimated_hours: Optional time estimate in hours complexity_score: Optional complexity rating 1-5 uncertainty_level: Optional uncertainty level ('low', 'medium', 'high') + parent_id: Optional parent task ID for tree structure + lineage: Optional list of ancestor descriptions + is_leaf: Whether this is a leaf/executable task (default True) + hierarchical_id: Optional display ID like "1.2.3" Returns: Created Task @@ -90,16 +102,17 @@ def create( task_id = str(uuid.uuid4()) now = _utc_now().isoformat() depends_on_list = depends_on or [] + lineage_list = lineage or [] conn = get_db_connection(workspace) try: cursor = conn.cursor() cursor.execute( """ - INSERT INTO tasks (id, workspace_id, prd_id, title, description, status, priority, depends_on, estimated_hours, complexity_score, uncertainty_level, created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + INSERT INTO tasks (id, workspace_id, prd_id, title, description, status, priority, depends_on, estimated_hours, complexity_score, uncertainty_level, parent_id, lineage, is_leaf, hierarchical_id, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, - (task_id, workspace.id, prd_id, title, description, status.value, priority, json.dumps(depends_on_list), estimated_hours, complexity_score, uncertainty_level, now, now), + (task_id, workspace.id, prd_id, title, description, status.value, priority, json.dumps(depends_on_list), estimated_hours, complexity_score, uncertainty_level, parent_id, json.dumps(lineage_list), 1 if is_leaf else 0, hierarchical_id, now, now), ) conn.commit() finally: @@ -117,6 +130,10 @@ def create( estimated_hours=estimated_hours, complexity_score=complexity_score, uncertainty_level=uncertainty_level, + parent_id=parent_id, + lineage=lineage_list, + is_leaf=is_leaf, + hierarchical_id=hierarchical_id, created_at=datetime.fromisoformat(now), updated_at=datetime.fromisoformat(now), ) @@ -137,7 +154,7 @@ def get(workspace: Workspace, task_id: str) -> Optional[Task]: cursor.execute( """ - SELECT id, workspace_id, prd_id, title, description, status, priority, depends_on, estimated_hours, complexity_score, uncertainty_level, created_at, updated_at + SELECT id, workspace_id, prd_id, title, description, status, priority, depends_on, estimated_hours, complexity_score, uncertainty_level, created_at, updated_at, github_issue_number, parent_id, lineage, is_leaf, hierarchical_id FROM tasks WHERE workspace_id = ? AND id = ? """, @@ -173,7 +190,7 @@ def list_tasks( if status: cursor.execute( """ - SELECT id, workspace_id, prd_id, title, description, status, priority, depends_on, estimated_hours, complexity_score, uncertainty_level, created_at, updated_at + SELECT id, workspace_id, prd_id, title, description, status, priority, depends_on, estimated_hours, complexity_score, uncertainty_level, created_at, updated_at, github_issue_number, parent_id, lineage, is_leaf, hierarchical_id FROM tasks WHERE workspace_id = ? AND status = ? ORDER BY priority ASC, created_at ASC @@ -184,7 +201,7 @@ def list_tasks( else: cursor.execute( """ - SELECT id, workspace_id, prd_id, title, description, status, priority, depends_on, estimated_hours, complexity_score, uncertainty_level, created_at, updated_at + SELECT id, workspace_id, prd_id, title, description, status, priority, depends_on, estimated_hours, complexity_score, uncertainty_level, created_at, updated_at, github_issue_number, parent_id, lineage, is_leaf, hierarchical_id FROM tasks WHERE workspace_id = ? ORDER BY priority ASC, created_at ASC @@ -539,9 +556,20 @@ def generate_from_prd( status=TaskStatus.BACKLOG, priority=i, # Priority based on order prd_id=prd.id, + complexity_score=task_data.get("complexity"), + estimated_hours=task_data.get("estimated_hours"), + uncertainty_level=task_data.get("uncertainty"), ) created_tasks.append(task) + # Resolve title-based dependencies to task IDs + title_to_id = {t.title: t.id for t in created_tasks} + for task_data, task in zip(tasks_data, created_tasks): + dep_titles = task_data.get("depends_on_titles", []) + dep_ids = [title_to_id[t] for t in dep_titles if t in title_to_id] + if dep_ids: + update_depends_on(workspace, task.id, dep_ids) + return created_tasks @@ -552,28 +580,30 @@ def _generate_tasks_with_llm(prd_content: str) -> list[dict]: prd_content: PRD text Returns: - List of task dicts with 'title' and 'description' keys + List of task dicts with rich metadata fields """ # Use the LLM adapter for provider-agnostic access from codeframe.adapters.llm import get_provider, Purpose provider = get_provider() - prompt = f"""Analyze the following Product Requirements Document (PRD) and generate a list of actionable development tasks. + prompt = f"""Analyze the following PRD and generate a list of actionable development tasks. -Each task should: -1. Be specific and actionable -2. Have a clear title (under 80 characters) -3. Have a brief description explaining what needs to be done -4. Be ordered by logical dependency/priority +For each task, provide: +1. "title": Clear, specific title (under 80 characters) +2. "description": What needs to be done +3. "depends_on_titles": List of other task titles this depends on (empty list if none) +4. "complexity": Complexity score 1-5 (1=trivial, 5=very complex) +5. "estimated_hours": Estimated hours to complete (float) +6. "uncertainty": "low", "medium", or "high" +7. "files_to_modify": List of file paths likely to be modified (best guess) -Return the tasks as a JSON array with objects containing "title" and "description" fields. -Return ONLY the JSON array, no other text. +Order tasks by logical dependency/priority. -PRD Content: -{prd_content} +Return ONLY a JSON array of objects with these fields. -JSON Tasks:""" +PRD: +{prd_content}""" response = provider.complete( messages=[{"role": "user", "content": prompt}], @@ -587,18 +617,42 @@ def _generate_tasks_with_llm(prd_content: str) -> list[dict]: # Try to find JSON array in response json_match = re.search(r"\[[\s\S]*\]", response_text) if json_match: - tasks = json.loads(json_match.group()) + tasks_raw = json.loads(json_match.group()) else: - tasks = json.loads(response_text) + tasks_raw = json.loads(response_text) - # Validate structure + # Validate and extract rich fields validated = [] - for task in tasks: - if isinstance(task, dict) and "title" in task: - validated.append({ - "title": str(task["title"])[:200], - "description": str(task.get("description", ""))[:2000], - }) + for task in tasks_raw: + if not isinstance(task, dict) or "title" not in task: + continue + + # Extract rich fields with defaults and validation + complexity = task.get("complexity") + if complexity is not None: + complexity = max(1, min(5, int(complexity))) + + estimated_hours = task.get("estimated_hours") + if estimated_hours is not None: + estimated_hours = max(0.1, float(estimated_hours)) + + uncertainty = task.get("uncertainty") + if uncertainty not in ("low", "medium", "high"): + uncertainty = None + + files = task.get("files_to_modify", []) + desc = str(task.get("description", ""))[:2000] + if files: + desc += "\n\nFiles to modify: " + ", ".join(str(f) for f in files) + + validated.append({ + "title": str(task["title"])[:200], + "description": desc, + "depends_on_titles": task.get("depends_on_titles", []), + "complexity": complexity, + "estimated_hours": estimated_hours, + "uncertainty": uncertainty, + }) return validated @@ -650,12 +704,21 @@ def _row_to_task(row: tuple) -> Task: Row columns: id, workspace_id, prd_id, title, description, status, priority, depends_on, estimated_hours, complexity_score, uncertainty_level, - created_at, updated_at + created_at, updated_at, github_issue_number, parent_id, lineage, + is_leaf, hierarchical_id """ # Parse depends_on from JSON string (default to empty list if null) depends_on_raw = row[7] depends_on = json.loads(depends_on_raw) if depends_on_raw else [] + # Parse lineage from JSON string (default to empty list if null) + lineage_raw = row[15] if len(row) > 15 else None + lineage = json.loads(lineage_raw) if lineage_raw else [] + + # Parse is_leaf from integer (default to True if null) + is_leaf_raw = row[16] if len(row) > 16 else 1 + is_leaf = bool(is_leaf_raw) if is_leaf_raw is not None else True + return Task( id=row[0], workspace_id=row[1], @@ -671,4 +734,8 @@ def _row_to_task(row: tuple) -> Task: created_at=datetime.fromisoformat(row[11]), updated_at=datetime.fromisoformat(row[12]), github_issue_number=row[13] if len(row) > 13 else None, + parent_id=row[14] if len(row) > 14 else None, + lineage=lineage, + is_leaf=is_leaf, + hierarchical_id=row[17] if len(row) > 17 else None, ) diff --git a/codeframe/core/workspace.py b/codeframe/core/workspace.py index a517609b..95e2b82f 100644 --- a/codeframe/core/workspace.py +++ b/codeframe/core/workspace.py @@ -113,6 +113,10 @@ def _init_database(db_path: Path) -> None: estimated_hours REAL, complexity_score INTEGER CHECK(complexity_score IS NULL OR (complexity_score BETWEEN 1 AND 5)), uncertainty_level TEXT CHECK(uncertainty_level IS NULL OR uncertainty_level IN ('low', 'medium', 'high')), + parent_id TEXT, + lineage TEXT DEFAULT '[]', + is_leaf INTEGER DEFAULT 1, + hierarchical_id TEXT, created_at TEXT NOT NULL, updated_at TEXT NOT NULL, FOREIGN KEY (workspace_id) REFERENCES workspace(id), @@ -135,6 +139,14 @@ def _init_database(db_path: Path) -> None: cursor.execute("ALTER TABLE tasks ADD COLUMN uncertainty_level TEXT") if "github_issue_number" not in columns: cursor.execute("ALTER TABLE tasks ADD COLUMN github_issue_number INTEGER") + if "parent_id" not in columns: + cursor.execute("ALTER TABLE tasks ADD COLUMN parent_id TEXT") + if "lineage" not in columns: + cursor.execute("ALTER TABLE tasks ADD COLUMN lineage TEXT DEFAULT '[]'") + if "is_leaf" not in columns: + cursor.execute("ALTER TABLE tasks ADD COLUMN is_leaf INTEGER DEFAULT 1") + if "hierarchical_id" not in columns: + cursor.execute("ALTER TABLE tasks ADD COLUMN hierarchical_id TEXT") # Append-only event log cursor.execute(""" @@ -409,6 +421,18 @@ def _ensure_schema_upgrades(db_path: Path) -> None: if "uncertainty_level" not in task_columns: cursor.execute("ALTER TABLE tasks ADD COLUMN uncertainty_level TEXT") conn.commit() + if "parent_id" not in task_columns: + cursor.execute("ALTER TABLE tasks ADD COLUMN parent_id TEXT") + conn.commit() + if "lineage" not in task_columns: + cursor.execute("ALTER TABLE tasks ADD COLUMN lineage TEXT DEFAULT '[]'") + conn.commit() + if "is_leaf" not in task_columns: + cursor.execute("ALTER TABLE tasks ADD COLUMN is_leaf INTEGER DEFAULT 1") + conn.commit() + if "hierarchical_id" not in task_columns: + cursor.execute("ALTER TABLE tasks ADD COLUMN hierarchical_id TEXT") + conn.commit() # Ensure runs table exists before creating dependent tables (run_logs, diagnostic_reports) cursor.execute( diff --git a/tests/cli/test_tasks_tree_cli.py b/tests/cli/test_tasks_tree_cli.py new file mode 100644 index 00000000..b467412d --- /dev/null +++ b/tests/cli/test_tasks_tree_cli.py @@ -0,0 +1,150 @@ +"""Tests for CLI tasks tree and recursive generate commands. + +TDD approach: Write tests first, then implement. +""" + +import pytest +from unittest.mock import patch, MagicMock + +from typer.testing import CliRunner + +from codeframe.cli.app import app + +pytestmark = pytest.mark.v2 + +runner = CliRunner() + + +class TestTasksGenerateRecursiveFlag: + """Tests for --recursive flag on 'cf tasks generate'.""" + + def test_tasks_generate_recursive_flag_exists(self, tmp_path): + """Verify --recursive flag is accepted without error.""" + # The flag should be parsed even if the command fails for other reasons + # (e.g., no workspace). We just need to confirm the flag doesn't cause + # a "no such option" error. + result = runner.invoke(app, ["tasks", "generate", "--recursive", "-w", str(tmp_path)]) + # Should NOT fail with "No such option: --recursive" + assert "No such option" not in result.output + + def test_tasks_generate_max_depth_flag_exists(self, tmp_path): + """Verify --max-depth flag is accepted.""" + result = runner.invoke(app, ["tasks", "generate", "--max-depth", "2", "-w", str(tmp_path)]) + assert "No such option" not in result.output + + def test_tasks_generate_recursive_calls_task_tree(self, tmp_path): + """When --recursive is passed, should call generate_task_tree.""" + # Set up workspace + state_dir = tmp_path / ".codeframe" + state_dir.mkdir() + + mock_workspace = MagicMock() + mock_workspace.repo_path = tmp_path + mock_workspace.state_dir = state_dir + + mock_prd = MagicMock() + mock_prd.id = "prd-1" + mock_prd.title = "Test PRD" + mock_prd.content = "Build a calculator" + + mock_tree = { + "title": "Build a calculator", + "description": "Build a calculator", + "is_leaf": False, + "children": [], + "lineage": [], + } + + mock_task = MagicMock() + mock_task.title = "Build a calculator" + mock_task.description = "Build a calculator" + + with ( + patch("codeframe.core.workspace.get_workspace", return_value=mock_workspace), + patch("codeframe.core.prd.get_latest", return_value=mock_prd), + patch("codeframe.cli.validators.require_anthropic_api_key"), + patch("codeframe.adapters.llm.get_provider") as mock_get_provider, + patch("codeframe.core.task_tree.generate_task_tree", return_value=mock_tree) as mock_gen_tree, + patch("codeframe.core.task_tree.flatten_task_tree", return_value=[mock_task]) as mock_flatten, + patch("codeframe.core.events.emit_for_workspace"), + ): + result = runner.invoke( + app, + ["tasks", "generate", "--recursive", "-w", str(tmp_path)], + ) + + mock_gen_tree.assert_called_once() + mock_flatten.assert_called_once() + + def test_tasks_generate_without_recursive_uses_existing_behavior(self, tmp_path): + """Without --recursive, should use the existing generate_from_prd path.""" + state_dir = tmp_path / ".codeframe" + state_dir.mkdir() + + mock_workspace = MagicMock() + mock_workspace.repo_path = tmp_path + mock_workspace.state_dir = state_dir + + mock_prd = MagicMock() + mock_prd.id = "prd-1" + mock_prd.title = "Test PRD" + mock_prd.content = "Build something" + + mock_task = MagicMock() + mock_task.title = "Task 1" + mock_task.description = "Do thing" + + with ( + patch("codeframe.core.workspace.get_workspace", return_value=mock_workspace), + patch("codeframe.core.prd.get_latest", return_value=mock_prd), + patch("codeframe.cli.validators.require_anthropic_api_key"), + patch("codeframe.core.tasks.generate_from_prd", return_value=[mock_task]) as mock_gen, + patch("codeframe.core.events.emit_for_workspace"), + ): + result = runner.invoke( + app, + ["tasks", "generate", "-w", str(tmp_path)], + ) + + mock_gen.assert_called_once() + + +class TestTasksTreeCommand: + """Tests for 'cf tasks tree' command.""" + + def test_tasks_tree_no_workspace(self, tmp_path): + """Shows error when no workspace exists.""" + result = runner.invoke(app, ["tasks", "tree", "-w", str(tmp_path)]) + assert result.exit_code != 0 + + def test_tasks_tree_empty(self, tmp_path): + """Shows 'No tasks found' when workspace has no tasks.""" + mock_workspace = MagicMock() + mock_workspace.repo_path = tmp_path + + with ( + patch("codeframe.core.workspace.get_workspace", return_value=mock_workspace), + patch("codeframe.core.task_tree.display_task_tree", return_value=""), + ): + result = runner.invoke(app, ["tasks", "tree", "-w", str(tmp_path)]) + assert "No tasks found" in result.output + + def test_tasks_tree_with_data(self, tmp_path): + """Shows tree output when tasks exist.""" + mock_workspace = MagicMock() + mock_workspace.repo_path = tmp_path + + tree_output = ( + "1. Build calculator [composite] \u25cb\n" + " \u2514\u2500\u2500 1.1. Add numbers [atomic] \u25cb\n" + " \u2514\u2500\u2500 1.2. Subtract numbers [atomic] \u25cb" + ) + + with ( + patch("codeframe.core.workspace.get_workspace", return_value=mock_workspace), + patch("codeframe.core.task_tree.display_task_tree", return_value=tree_output), + ): + result = runner.invoke(app, ["tasks", "tree", "-w", str(tmp_path)]) + assert "Build calculator" in result.output + assert "Add numbers" in result.output + assert "Subtract numbers" in result.output diff --git a/tests/core/test_context_packager.py b/tests/core/test_context_packager.py index dece2359..51024f22 100644 --- a/tests/core/test_context_packager.py +++ b/tests/core/test_context_packager.py @@ -460,3 +460,83 @@ def test_content_matches_prompt(self, mock_workspace, mock_task_context, tmp_pat packager.to_task_file(packaged, task_file) assert task_file.read_text(encoding="utf-8") == packaged.prompt + + +class TestLineageContext: + """Tests for lineage inclusion in build() prompt.""" + + def test_context_packager_includes_lineage(self, mock_workspace): + """Task with lineage should have 'Task Lineage' section in prompt.""" + ctx = MagicMock(spec=TaskContext) + ctx.task = MagicMock() + ctx.task.lineage = ["Build app", "Authentication module"] + ctx.to_prompt_context.return_value = ( + "## Task\n**Title:** Implement JWT\n**Description:** Add tokens\n" + ) + ctx.relevant_files = [] + + with patch("codeframe.core.context_packager.ContextLoader") as MockLoader: + MockLoader.return_value.load.return_value = ctx + + packager = TaskContextPackager(mock_workspace) + result = packager.build("task-1") + + assert "Task Lineage" in result.prompt + assert "Build app" in result.prompt + assert "Authentication module" in result.prompt + + def test_context_packager_no_lineage(self, mock_workspace): + """Task without lineage should not have 'Task Lineage' section.""" + ctx = MagicMock(spec=TaskContext) + ctx.task = MagicMock() + ctx.task.lineage = [] + ctx.to_prompt_context.return_value = ( + "## Task\n**Title:** Simple task\n**Description:** Do it\n" + ) + ctx.relevant_files = [] + + with patch("codeframe.core.context_packager.ContextLoader") as MockLoader: + MockLoader.return_value.load.return_value = ctx + + packager = TaskContextPackager(mock_workspace) + result = packager.build("task-1") + + assert "Task Lineage" not in result.prompt + + def test_context_packager_lineage_missing_attribute(self, mock_workspace): + """Task without lineage attribute should not have 'Task Lineage' section.""" + ctx = MagicMock(spec=TaskContext) + ctx.task = MagicMock(spec=["title", "description", "id"]) + # No lineage attribute on task + ctx.to_prompt_context.return_value = ( + "## Task\n**Title:** Old task\n**Description:** Legacy\n" + ) + ctx.relevant_files = [] + + with patch("codeframe.core.context_packager.ContextLoader") as MockLoader: + MockLoader.return_value.load.return_value = ctx + + packager = TaskContextPackager(mock_workspace) + result = packager.build("task-1") + + assert "Task Lineage" not in result.prompt + + def test_lineage_appears_before_gates(self, mock_workspace): + """Lineage section should appear before Verification Gates.""" + ctx = MagicMock(spec=TaskContext) + ctx.task = MagicMock() + ctx.task.lineage = ["Parent task"] + ctx.to_prompt_context.return_value = ( + "## Task\n**Title:** Child task\n" + ) + ctx.relevant_files = [] + + with patch("codeframe.core.context_packager.ContextLoader") as MockLoader: + MockLoader.return_value.load.return_value = ctx + + packager = TaskContextPackager(mock_workspace) + result = packager.build("task-1") + + lineage_pos = result.prompt.index("Task Lineage") + gates_pos = result.prompt.index("Verification Gates") + assert lineage_pos < gates_pos diff --git a/tests/core/test_rich_task_generation.py b/tests/core/test_rich_task_generation.py new file mode 100644 index 00000000..2b65d33b --- /dev/null +++ b/tests/core/test_rich_task_generation.py @@ -0,0 +1,229 @@ +"""Tests for enhanced LLM task generation with rich metadata. + +Part of issue #420 - Richer Task Generation, Step 2: Enhanced LLM Prompt. +""" + +import json + +import pytest + +from codeframe.adapters.llm.mock import MockProvider +from codeframe.core import tasks +from codeframe.core.workspace import create_or_load_workspace + +pytestmark = pytest.mark.v2 + + +@pytest.fixture +def workspace(tmp_path): + """Create a test workspace.""" + return create_or_load_workspace(tmp_path) + + +@pytest.fixture +def sample_prd(workspace): + """Create a sample PRD for task generation.""" + from codeframe.core.prd import store + + return store(workspace, content="# Sample PRD\n\nBuild a todo app with auth.") + + +def _make_mock_provider(response_json: list[dict]) -> MockProvider: + """Create a MockProvider that returns the given JSON array.""" + mock = MockProvider() + mock.add_text_response(json.dumps(response_json)) + return mock + + +class TestGenerateWithRichMetadata: + """Test that LLM-generated tasks populate rich fields.""" + + def test_generate_with_rich_metadata(self, workspace, sample_prd, monkeypatch): + """Mock LLM returns rich JSON, verify complexity/hours/uncertainty populated.""" + rich_tasks = [ + { + "title": "Set up project structure", + "description": "Initialize the project with proper directory layout", + "depends_on_titles": [], + "complexity": 2, + "estimated_hours": 1.5, + "uncertainty": "low", + "files_to_modify": ["setup.py", "pyproject.toml"], + }, + { + "title": "Implement authentication", + "description": "Add user login and registration", + "depends_on_titles": ["Set up project structure"], + "complexity": 4, + "estimated_hours": 8.0, + "uncertainty": "medium", + "files_to_modify": ["auth/views.py", "auth/models.py"], + }, + ] + mock = _make_mock_provider(rich_tasks) + monkeypatch.setattr("codeframe.adapters.llm.get_provider", lambda: mock) + + created = tasks.generate_from_prd(workspace, sample_prd, use_llm=True) + + assert len(created) == 2 + assert created[0].complexity_score == 2 + assert created[0].estimated_hours == 1.5 + assert created[0].uncertainty_level == "low" + assert created[1].complexity_score == 4 + assert created[1].estimated_hours == 8.0 + assert created[1].uncertainty_level == "medium" + + def test_generate_resolves_dependencies(self, workspace, sample_prd, monkeypatch): + """Mock LLM with depends_on_titles, verify depends_on IDs set.""" + rich_tasks = [ + { + "title": "Task A", + "description": "First task", + "depends_on_titles": [], + "complexity": 1, + "estimated_hours": 1.0, + "uncertainty": "low", + }, + { + "title": "Task B", + "description": "Depends on A", + "depends_on_titles": ["Task A"], + "complexity": 2, + "estimated_hours": 2.0, + "uncertainty": "low", + }, + { + "title": "Task C", + "description": "Depends on A and B", + "depends_on_titles": ["Task A", "Task B"], + "complexity": 3, + "estimated_hours": 3.0, + "uncertainty": "medium", + }, + ] + mock = _make_mock_provider(rich_tasks) + monkeypatch.setattr("codeframe.adapters.llm.get_provider", lambda: mock) + + created = tasks.generate_from_prd(workspace, sample_prd, use_llm=True) + + assert len(created) == 3 + + # Re-fetch to get updated depends_on + task_a = tasks.get(workspace, created[0].id) + task_b = tasks.get(workspace, created[1].id) + task_c = tasks.get(workspace, created[2].id) + + assert task_a.depends_on == [] + assert task_b.depends_on == [task_a.id] + assert set(task_c.depends_on) == {task_a.id, task_b.id} + + def test_generate_clamps_complexity(self, workspace, sample_prd, monkeypatch): + """Complexity > 5 clamped to 5, < 1 clamped to 1.""" + rich_tasks = [ + { + "title": "Overcomplicated task", + "description": "Too complex", + "complexity": 10, + "estimated_hours": 1.0, + }, + { + "title": "Simple task", + "description": "Too simple", + "complexity": 0, + "estimated_hours": 0.5, + }, + ] + mock = _make_mock_provider(rich_tasks) + monkeypatch.setattr("codeframe.adapters.llm.get_provider", lambda: mock) + + created = tasks.generate_from_prd(workspace, sample_prd, use_llm=True) + + assert created[0].complexity_score == 5 + assert created[1].complexity_score == 1 + + def test_generate_fallback_missing_fields(self, workspace, sample_prd, monkeypatch): + """LLM returns only title/desc, verify None defaults work.""" + minimal_tasks = [ + {"title": "Minimal task", "description": "Just title and desc"}, + ] + mock = _make_mock_provider(minimal_tasks) + monkeypatch.setattr("codeframe.adapters.llm.get_provider", lambda: mock) + + created = tasks.generate_from_prd(workspace, sample_prd, use_llm=True) + + assert len(created) == 1 + assert created[0].complexity_score is None + assert created[0].estimated_hours is None + assert created[0].uncertainty_level is None + assert created[0].depends_on == [] + + def test_generate_backward_compat(self, workspace, sample_prd, monkeypatch): + """Existing generate behavior unchanged - simple title/desc tasks work.""" + simple_tasks = [ + {"title": "Task one", "description": "Do thing one"}, + {"title": "Task two", "description": "Do thing two"}, + ] + mock = _make_mock_provider(simple_tasks) + monkeypatch.setattr("codeframe.adapters.llm.get_provider", lambda: mock) + + created = tasks.generate_from_prd(workspace, sample_prd, use_llm=True) + + assert len(created) == 2 + assert created[0].title == "Task one" + assert created[0].description == "Do thing one" + assert created[1].title == "Task two" + + def test_generate_files_in_description(self, workspace, sample_prd, monkeypatch): + """files_to_modify appended to description.""" + rich_tasks = [ + { + "title": "Update models", + "description": "Modify the data models", + "files_to_modify": ["models.py", "schemas.py"], + "complexity": 2, + "estimated_hours": 3.0, + }, + ] + mock = _make_mock_provider(rich_tasks) + monkeypatch.setattr("codeframe.adapters.llm.get_provider", lambda: mock) + + created = tasks.generate_from_prd(workspace, sample_prd, use_llm=True) + + assert "models.py" in created[0].description + assert "schemas.py" in created[0].description + assert "Files to modify:" in created[0].description + + def test_generate_invalid_uncertainty_ignored(self, workspace, sample_prd, monkeypatch): + """Invalid uncertainty values are set to None.""" + rich_tasks = [ + { + "title": "Task with bad uncertainty", + "description": "Bad value", + "uncertainty": "very_high", + "complexity": 3, + "estimated_hours": 2.0, + }, + ] + mock = _make_mock_provider(rich_tasks) + monkeypatch.setattr("codeframe.adapters.llm.get_provider", lambda: mock) + + created = tasks.generate_from_prd(workspace, sample_prd, use_llm=True) + + assert created[0].uncertainty_level is None + + def test_generate_estimated_hours_min_clamp(self, workspace, sample_prd, monkeypatch): + """Estimated hours below 0.1 get clamped to 0.1.""" + rich_tasks = [ + { + "title": "Tiny task", + "description": "Very small", + "estimated_hours": 0.01, + "complexity": 1, + }, + ] + mock = _make_mock_provider(rich_tasks) + monkeypatch.setattr("codeframe.adapters.llm.get_provider", lambda: mock) + + created = tasks.generate_from_prd(workspace, sample_prd, use_llm=True) + + assert created[0].estimated_hours == 0.1 diff --git a/tests/core/test_task_tree.py b/tests/core/test_task_tree.py new file mode 100644 index 00000000..3d5f1fef --- /dev/null +++ b/tests/core/test_task_tree.py @@ -0,0 +1,502 @@ +"""Tests for recursive task decomposition and tree operations.""" + +import json + +import pytest + +from codeframe.adapters.llm.mock import MockProvider +from codeframe.core import tasks +from codeframe.core.state_machine import TaskStatus +from codeframe.core.task_tree import ( + classify_task, + decompose_task, + display_task_tree, + flatten_task_tree, + generate_task_tree, + propagate_status, +) +from codeframe.core.workspace import create_or_load_workspace + +pytestmark = pytest.mark.v2 + + +@pytest.fixture +def workspace(tmp_path): + """Create a test workspace.""" + return create_or_load_workspace(tmp_path) + + +@pytest.fixture +def provider(): + """Create a mock LLM provider.""" + return MockProvider() + + +class TestClassifyTask: + """Test task classification (atomic vs composite).""" + + def test_classify_atomic(self, provider): + """LLM returns 'atomic', verify result.""" + provider.add_text_response("atomic") + result = classify_task(provider, "Add a login button", []) + assert result == "atomic" + + def test_classify_composite(self, provider): + """LLM returns 'composite', verify result.""" + provider.add_text_response("composite") + result = classify_task(provider, "Build entire auth system", []) + assert result == "composite" + + def test_classify_defaults_to_atomic(self, provider): + """LLM returns garbage, verify 'atomic' default.""" + provider.add_text_response("I think this task is somewhere in between") + result = classify_task(provider, "Some task", []) + assert result == "atomic" + + def test_classify_uses_planning_purpose(self, provider): + """Should use Purpose.PLANNING for classification.""" + from codeframe.adapters.llm.base import Purpose + + provider.add_text_response("atomic") + classify_task(provider, "Test task", ["parent context"]) + + assert provider.call_count == 1 + assert provider.last_call["purpose"] == Purpose.PLANNING + + def test_classify_includes_lineage_in_prompt(self, provider): + """Should include lineage context in the user message.""" + provider.add_text_response("atomic") + classify_task(provider, "Create schema", ["Build backend", "Set up database"]) + + user_msg = provider.last_call["messages"][0]["content"] + assert "Build backend" in user_msg + assert "Set up database" in user_msg + + +class TestDecomposeTask: + """Test task decomposition into subtasks.""" + + def test_decompose_returns_subtasks(self, provider): + """LLM returns JSON array, verify parsed result.""" + subtasks = [ + {"title": "Create user table", "description": "Define user schema"}, + {"title": "Add auth endpoints", "description": "Login and register"}, + {"title": "Write tests", "description": "Unit tests for auth"}, + ] + provider.add_text_response(json.dumps(subtasks)) + + result = decompose_task(provider, "Build auth system", []) + + assert len(result) == 3 + assert result[0]["title"] == "Create user table" + assert result[1]["description"] == "Login and register" + + def test_decompose_clamps_to_7(self, provider): + """LLM returns 10 items, verify truncated to 7.""" + subtasks = [{"title": f"Task {i}", "description": f"Desc {i}"} for i in range(10)] + provider.add_text_response(json.dumps(subtasks)) + + result = decompose_task(provider, "Huge task", []) + + assert len(result) == 7 + + def test_decompose_handles_fewer_than_2(self, provider): + """LLM returns 1 item, verify padded to at least 2.""" + subtasks = [{"title": "Only one", "description": "Single task"}] + provider.add_text_response(json.dumps(subtasks)) + + result = decompose_task(provider, "Small task", []) + + assert len(result) >= 2 + + def test_decompose_handles_markdown_wrapped_json(self, provider): + """LLM wraps JSON in markdown code block.""" + subtasks = [ + {"title": "Sub A", "description": "A"}, + {"title": "Sub B", "description": "B"}, + ] + response = f"```json\n{json.dumps(subtasks)}\n```" + provider.add_text_response(response) + + result = decompose_task(provider, "Task with markdown", []) + + assert len(result) == 2 + assert result[0]["title"] == "Sub A" + + def test_decompose_returns_empty_on_invalid_json(self, provider): + """LLM returns garbage, should return fallback subtasks.""" + provider.add_text_response("This is not JSON at all") + + result = decompose_task(provider, "Bad response task", []) + + # Should return at least 2 fallback items + assert len(result) >= 2 + + +class TestGenerateTaskTree: + """Test recursive task tree generation.""" + + def test_generate_task_tree_leaf(self, provider): + """Atomic task produces leaf node.""" + provider.add_text_response("atomic") + + result = generate_task_tree(provider, "Simple task") + + assert result["is_leaf"] is True + assert result["children"] == [] + assert result["description"] == "Simple task" + + def test_generate_task_tree_composite(self, provider): + """Composite task produces children.""" + # First call: classify as composite + provider.add_text_response("composite") + # Second call: decompose into subtasks + subtasks = [ + {"title": "Child A", "description": "Do A"}, + {"title": "Child B", "description": "Do B"}, + ] + provider.add_text_response(json.dumps(subtasks)) + # Third + fourth calls: classify children as atomic + provider.add_text_response("atomic") + provider.add_text_response("atomic") + + result = generate_task_tree(provider, "Parent task") + + assert result["is_leaf"] is False + assert len(result["children"]) == 2 + assert result["children"][0]["description"] == "Do A" + assert result["children"][0]["is_leaf"] is True + assert result["children"][1]["description"] == "Do B" + + def test_generate_task_tree_max_depth(self, provider): + """Respects max_depth — forces leaf at depth limit.""" + # Even though we don't set up classify responses, + # max_depth=0 should immediately return leaf + result = generate_task_tree(provider, "Deep task", depth=0, max_depth=0) + + assert result["is_leaf"] is True + assert result["children"] == [] + # No LLM calls should have been made + assert provider.call_count == 0 + + def test_generate_task_tree_lineage_propagates(self, provider): + """Lineage accumulates through recursion.""" + provider.add_text_response("composite") + subtasks = [{"title": "Child", "description": "Child task"}] + # Will be padded to 2 + provider.add_text_response(json.dumps(subtasks)) + provider.add_text_response("atomic") + provider.add_text_response("atomic") + + result = generate_task_tree(provider, "Root", lineage=["Grandparent"]) + + assert result["lineage"] == ["Grandparent"] + for child in result["children"]: + assert "Root" in child["lineage"] + assert "Grandparent" in child["lineage"] + + +class TestFlattenTaskTree: + """Test flattening tree into workspace tasks.""" + + def test_flatten_creates_tasks(self, workspace): + """Flatten tree into workspace, verify tasks exist.""" + tree = { + "title": "Root task", + "description": "Root description", + "is_leaf": False, + "children": [ + { + "title": "Child A", + "description": "Child A desc", + "is_leaf": True, + "children": [], + "lineage": ["Root description"], + }, + { + "title": "Child B", + "description": "Child B desc", + "is_leaf": True, + "children": [], + "lineage": ["Root description"], + }, + ], + "lineage": [], + } + + result = flatten_task_tree(tree, workspace) + + assert len(result) == 3 # root + 2 children + all_tasks = tasks.list_tasks(workspace) + assert len(all_tasks) == 3 + + def test_flatten_sets_hierarchical_ids(self, workspace): + """Verify hierarchical IDs like '1', '1.1', '1.2'.""" + tree = { + "title": "Root", + "description": "Root desc", + "is_leaf": False, + "children": [ + { + "title": "Child 1", + "description": "C1", + "is_leaf": True, + "children": [], + "lineage": ["Root desc"], + }, + { + "title": "Child 2", + "description": "C2", + "is_leaf": True, + "children": [], + "lineage": ["Root desc"], + }, + ], + "lineage": [], + } + + result = flatten_task_tree(tree, workspace) + + h_ids = [t.hierarchical_id for t in result] + assert "1" in h_ids + assert "1.1" in h_ids + assert "1.2" in h_ids + + def test_flatten_sets_parent_ids(self, workspace): + """Children should reference parent task ID.""" + tree = { + "title": "Parent", + "description": "Parent desc", + "is_leaf": False, + "children": [ + { + "title": "Child", + "description": "Child desc", + "is_leaf": True, + "children": [], + "lineage": ["Parent desc"], + }, + ], + "lineage": [], + } + + result = flatten_task_tree(tree, workspace) + + parent = [t for t in result if t.title == "Parent"][0] + child = [t for t in result if t.title == "Child"][0] + assert child.parent_id == parent.id + assert parent.parent_id is None + + def test_flatten_nested_hierarchical_ids(self, workspace): + """Verify deeply nested IDs like '1.1.1'.""" + tree = { + "title": "Root", + "description": "Root", + "is_leaf": False, + "children": [ + { + "title": "Mid", + "description": "Mid", + "is_leaf": False, + "children": [ + { + "title": "Leaf", + "description": "Leaf", + "is_leaf": True, + "children": [], + "lineage": ["Root", "Mid"], + }, + ], + "lineage": ["Root"], + }, + ], + "lineage": [], + } + + result = flatten_task_tree(tree, workspace) + + h_ids = [t.hierarchical_id for t in result] + assert "1" in h_ids + assert "1.1" in h_ids + assert "1.1.1" in h_ids + + +class TestDisplayTaskTree: + """Test ASCII tree display.""" + + def test_display_task_tree_format(self, workspace): + """Verify ASCII output contains expected elements.""" + # Create a parent and two children manually + parent = tasks.create( + workspace, + title="Set up backend", + description="Backend setup", + status=TaskStatus.IN_PROGRESS, + ) + # Manually update to add tree fields + _update_tree_fields(workspace, parent.id, hierarchical_id="1", is_leaf=False) + + child1 = tasks.create( + workspace, + title="Create schema", + description="DB schema", + status=TaskStatus.DONE, + ) + _update_tree_fields( + workspace, child1.id, parent_id=parent.id, hierarchical_id="1.1", is_leaf=True + ) + + child2 = tasks.create( + workspace, + title="REST endpoints", + description="API endpoints", + status=TaskStatus.BACKLOG, + ) + _update_tree_fields( + workspace, child2.id, parent_id=parent.id, hierarchical_id="1.2", is_leaf=True + ) + + output = display_task_tree(workspace) + + assert "Set up backend" in output + assert "Create schema" in output + assert "REST endpoints" in output + # Status icons + assert "\u2713" in output # DONE checkmark + assert "\u25cf" in output # IN_PROGRESS bullet + assert "\u25cb" in output # BACKLOG circle + + def test_display_empty_workspace(self, workspace): + """Empty workspace should return informative message.""" + output = display_task_tree(workspace) + assert "No tasks" in output or output.strip() == "" + + +class TestPropagateStatus: + """Test status propagation from children to parents.""" + + def test_propagate_status_all_done(self, workspace): + """All children done -> parent done.""" + parent = tasks.create( + workspace, title="Parent", status=TaskStatus.IN_PROGRESS + ) + _update_tree_fields(workspace, parent.id, is_leaf=False) + + child1 = tasks.create(workspace, title="C1", status=TaskStatus.DONE) + _update_tree_fields(workspace, child1.id, parent_id=parent.id, is_leaf=True) + + child2 = tasks.create(workspace, title="C2", status=TaskStatus.DONE) + _update_tree_fields(workspace, child2.id, parent_id=parent.id, is_leaf=True) + + propagate_status(workspace, child1.id) + + updated_parent = tasks.get(workspace, parent.id) + assert updated_parent.status == TaskStatus.DONE + + def test_propagate_status_any_failed(self, workspace): + """Child failed -> parent failed.""" + parent = tasks.create( + workspace, title="Parent", status=TaskStatus.IN_PROGRESS + ) + _update_tree_fields(workspace, parent.id, is_leaf=False) + + child1 = tasks.create(workspace, title="C1", status=TaskStatus.DONE) + _update_tree_fields(workspace, child1.id, parent_id=parent.id, is_leaf=True) + + child2 = tasks.create(workspace, title="C2", status=TaskStatus.IN_PROGRESS) + _update_tree_fields(workspace, child2.id, parent_id=parent.id, is_leaf=True) + # Transition to FAILED + tasks.update_status(workspace, child2.id, TaskStatus.FAILED) + + propagate_status(workspace, child2.id) + + updated_parent = tasks.get(workspace, parent.id) + assert updated_parent.status == TaskStatus.FAILED + + def test_propagate_status_recursive(self, workspace): + """Propagates up multiple levels.""" + grandparent = tasks.create( + workspace, title="GP", status=TaskStatus.IN_PROGRESS + ) + _update_tree_fields(workspace, grandparent.id, is_leaf=False) + + parent = tasks.create( + workspace, title="P", status=TaskStatus.IN_PROGRESS + ) + _update_tree_fields( + workspace, parent.id, parent_id=grandparent.id, is_leaf=False + ) + + child = tasks.create(workspace, title="C", status=TaskStatus.DONE) + _update_tree_fields(workspace, child.id, parent_id=parent.id, is_leaf=True) + + propagate_status(workspace, child.id) + + updated_parent = tasks.get(workspace, parent.id) + assert updated_parent.status == TaskStatus.DONE + + updated_gp = tasks.get(workspace, grandparent.id) + assert updated_gp.status == TaskStatus.DONE + + def test_propagate_no_parent(self, workspace): + """Root task with no parent should be a no-op.""" + root = tasks.create(workspace, title="Root", status=TaskStatus.DONE) + # Should not raise + propagate_status(workspace, root.id) + + def test_propagate_in_progress_child(self, workspace): + """Any child IN_PROGRESS -> parent IN_PROGRESS.""" + parent = tasks.create( + workspace, title="Parent", status=TaskStatus.IN_PROGRESS + ) + _update_tree_fields(workspace, parent.id, is_leaf=False) + + child1 = tasks.create(workspace, title="C1", status=TaskStatus.DONE) + _update_tree_fields(workspace, child1.id, parent_id=parent.id, is_leaf=True) + + child2 = tasks.create(workspace, title="C2", status=TaskStatus.IN_PROGRESS) + _update_tree_fields(workspace, child2.id, parent_id=parent.id, is_leaf=True) + + propagate_status(workspace, child2.id) + + updated_parent = tasks.get(workspace, parent.id) + assert updated_parent.status == TaskStatus.IN_PROGRESS + + +def _update_tree_fields( + workspace, + task_id: str, + parent_id: str = None, + hierarchical_id: str = None, + is_leaf: bool = None, + lineage: list = None, +): + """Helper to update tree-specific fields directly in DB.""" + from codeframe.core.workspace import get_db_connection + + conn = get_db_connection(workspace) + try: + cursor = conn.cursor() + updates = [] + params = [] + if parent_id is not None: + updates.append("parent_id = ?") + params.append(parent_id) + if hierarchical_id is not None: + updates.append("hierarchical_id = ?") + params.append(hierarchical_id) + if is_leaf is not None: + updates.append("is_leaf = ?") + params.append(1 if is_leaf else 0) + if lineage is not None: + updates.append("lineage = ?") + params.append(json.dumps(lineage)) + if updates: + params.extend([workspace.id, task_id]) + cursor.execute( + f"UPDATE tasks SET {', '.join(updates)} WHERE workspace_id = ? AND id = ?", + params, + ) + conn.commit() + finally: + conn.close() diff --git a/tests/core/test_task_tree_schema.py b/tests/core/test_task_tree_schema.py new file mode 100644 index 00000000..04b454b2 --- /dev/null +++ b/tests/core/test_task_tree_schema.py @@ -0,0 +1,121 @@ +"""Tests for task tree schema fields (parent_id, lineage, is_leaf, hierarchical_id). + +Part of issue #420 - Richer Task Generation, Step 1: Schema + Model Extension. +""" + +import pytest + +from codeframe.core import tasks +from codeframe.core.workspace import create_or_load_workspace + +pytestmark = pytest.mark.v2 + + +@pytest.fixture +def workspace(tmp_path): + """Create a test workspace.""" + return create_or_load_workspace(tmp_path) + + +class TestTaskTreeSchemaFields: + """Test the new tree-structure fields on Task model.""" + + def test_create_task_with_parent_id(self, workspace): + """Create a task with parent_id, verify retrieval.""" + parent = tasks.create(workspace, title="Parent task") + child = tasks.create( + workspace, + title="Child task", + parent_id=parent.id, + ) + + assert child.parent_id == parent.id + + # Verify persistence + retrieved = tasks.get(workspace, child.id) + assert retrieved.parent_id == parent.id + + def test_create_leaf_task(self, workspace): + """Default is_leaf should be True.""" + task = tasks.create(workspace, title="Leaf task") + assert task.is_leaf is True + + retrieved = tasks.get(workspace, task.id) + assert retrieved.is_leaf is True + + def test_create_composite_task(self, workspace): + """Composite tasks have is_leaf=False.""" + task = tasks.create( + workspace, + title="Composite task", + is_leaf=False, + ) + assert task.is_leaf is False + + retrieved = tasks.get(workspace, task.id) + assert retrieved.is_leaf is False + + def test_create_task_with_lineage(self, workspace): + """Lineage stored and retrieved as list.""" + lineage = ["Epic: User Auth", "Story: Login Form"] + task = tasks.create( + workspace, + title="Implement email field", + lineage=lineage, + ) + assert task.lineage == lineage + + retrieved = tasks.get(workspace, task.id) + assert retrieved.lineage == lineage + + def test_create_task_with_hierarchical_id(self, workspace): + """Verify '1.2.3' style hierarchical ID.""" + task = tasks.create( + workspace, + title="Sub-sub-task", + hierarchical_id="1.2.3", + ) + assert task.hierarchical_id == "1.2.3" + + retrieved = tasks.get(workspace, task.id) + assert retrieved.hierarchical_id == "1.2.3" + + def test_backward_compat_no_new_fields(self, workspace): + """Creating a task without new fields still works.""" + task = tasks.create(workspace, title="Simple task") + + assert task.parent_id is None + assert task.lineage == [] + assert task.is_leaf is True + assert task.hierarchical_id is None + + retrieved = tasks.get(workspace, task.id) + assert retrieved.parent_id is None + assert retrieved.lineage == [] + assert retrieved.is_leaf is True + assert retrieved.hierarchical_id is None + + def test_list_tasks_includes_tree_fields(self, workspace): + """list_tasks should return tree fields.""" + parent = tasks.create( + workspace, + title="Parent", + is_leaf=False, + hierarchical_id="1", + ) + child = tasks.create( + workspace, + title="Child", + parent_id=parent.id, + lineage=["Parent"], + hierarchical_id="1.1", + ) + + all_tasks = tasks.list_tasks(workspace) + task_map = {t.title: t for t in all_tasks} + + assert task_map["Parent"].is_leaf is False + assert task_map["Parent"].hierarchical_id == "1" + assert task_map["Child"].parent_id == parent.id + assert task_map["Child"].lineage == ["Parent"] + assert task_map["Child"].hierarchical_id == "1.1"