Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 126 additions & 98 deletions codeframe/core/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,18 +169,20 @@ def get(workspace: Workspace, task_id: str) -> Optional[Task]:
Task if found, None otherwise
"""
conn = get_db_connection(workspace)
cursor = conn.cursor()

cursor.execute(
"""
SELECT id, workspace_id, prd_id, title, description, status, priority, depends_on, estimated_hours, complexity_score, uncertainty_level, created_at, updated_at, github_issue_number, parent_id, lineage, is_leaf, hierarchical_id, requirement_ids, external_url, auto_close_github_issue
FROM tasks
WHERE workspace_id = ? AND id = ?
""",
(workspace.id, task_id),
)
row = cursor.fetchone()
conn.close()
try:
cursor = conn.cursor()

cursor.execute(
"""
SELECT id, workspace_id, prd_id, title, description, status, priority, depends_on, estimated_hours, complexity_score, uncertainty_level, created_at, updated_at, github_issue_number, parent_id, lineage, is_leaf, hierarchical_id, requirement_ids, external_url, auto_close_github_issue
FROM tasks
WHERE workspace_id = ? AND id = ?
""",
(workspace.id, task_id),
)
row = cursor.fetchone()
finally:
conn.close()

if not row:
return None
Expand Down Expand Up @@ -288,33 +290,35 @@ def list_tasks(
List of Tasks
"""
conn = get_db_connection(workspace)
cursor = conn.cursor()
try:
cursor = conn.cursor()

if status:
cursor.execute(
"""
SELECT id, workspace_id, prd_id, title, description, status, priority, depends_on, estimated_hours, complexity_score, uncertainty_level, created_at, updated_at, github_issue_number, parent_id, lineage, is_leaf, hierarchical_id, requirement_ids, external_url, auto_close_github_issue
FROM tasks
WHERE workspace_id = ? AND status = ?
ORDER BY priority ASC, created_at ASC
LIMIT ?
""",
(workspace.id, status.value, limit),
)
else:
cursor.execute(
"""
SELECT id, workspace_id, prd_id, title, description, status, priority, depends_on, estimated_hours, complexity_score, uncertainty_level, created_at, updated_at, github_issue_number, parent_id, lineage, is_leaf, hierarchical_id, requirement_ids, external_url, auto_close_github_issue
FROM tasks
WHERE workspace_id = ?
ORDER BY priority ASC, created_at ASC
LIMIT ?
""",
(workspace.id, limit),
)
if status:
cursor.execute(
"""
SELECT id, workspace_id, prd_id, title, description, status, priority, depends_on, estimated_hours, complexity_score, uncertainty_level, created_at, updated_at, github_issue_number, parent_id, lineage, is_leaf, hierarchical_id, requirement_ids, external_url, auto_close_github_issue
FROM tasks
WHERE workspace_id = ? AND status = ?
ORDER BY priority ASC, created_at ASC
LIMIT ?
""",
(workspace.id, status.value, limit),
)
else:
cursor.execute(
"""
SELECT id, workspace_id, prd_id, title, description, status, priority, depends_on, estimated_hours, complexity_score, uncertainty_level, created_at, updated_at, github_issue_number, parent_id, lineage, is_leaf, hierarchical_id, requirement_ids, external_url, auto_close_github_issue
FROM tasks
WHERE workspace_id = ?
ORDER BY priority ASC, created_at ASC
LIMIT ?
""",
(workspace.id, limit),
)

rows = cursor.fetchall()
conn.close()
rows = cursor.fetchall()
finally:
conn.close()

return [_row_to_task(row) for row in rows]

Expand Down Expand Up @@ -468,6 +472,14 @@ def _dispatch_github_autoclose(workspace: Workspace, task: Task) -> None:
# a short-lived CLI process for long at exit.
_AUTOCLOSE_TIMEOUT = 10.0

# Strong references to in-flight auto-close tasks scheduled on a running event
# loop. asyncio only keeps a *weak* reference to a task created with
# ``loop.create_task``, so without this set a task could be garbage-collected
# mid-flight (leaving the issue open and emitting a "Task was destroyed but it
# is pending" warning). Tasks remove themselves via the done-callback once they
# complete. Mirrors ``WebhookNotificationService.send_event_background``.
_background_tasks: set[asyncio.Task] = set()


async def _safe_close_issue(pat: str, repo: str, issue_number: int) -> None:
"""Close the issue, swallowing every error (best-effort, off the hot path)."""
Expand Down Expand Up @@ -512,7 +524,13 @@ def _close_issue_background(pat: str, repo: str, issue_number: int) -> None:
name=f"gh-autoclose-{issue_number}",
).start()
else:
loop.create_task(_safe_close_issue(pat, repo, issue_number))
# Retain a strong reference until the task finishes so asyncio cannot
# garbage-collect it mid-flight; the done-callback consumes any
# exception (``_safe_close_issue`` already swallows them) and drops the
# reference.
task = loop.create_task(_safe_close_issue(pat, repo, issue_number))
_background_tasks.add(task)
task.add_done_callback(_background_tasks.discard)


def update(
Expand Down Expand Up @@ -626,18 +644,20 @@ def update_depends_on(
now = _utc_now().isoformat()

conn = get_db_connection(workspace)
cursor = conn.cursor()

cursor.execute(
"""
UPDATE tasks
SET depends_on = ?, updated_at = ?
WHERE workspace_id = ? AND id = ?
""",
(json.dumps(depends_on), now, workspace.id, task_id),
)
conn.commit()
conn.close()
try:
cursor = conn.cursor()

cursor.execute(
"""
UPDATE tasks
SET depends_on = ?, updated_at = ?
WHERE workspace_id = ? AND id = ?
""",
(json.dumps(depends_on), now, workspace.id, task_id),
)
conn.commit()
finally:
conn.close()

task.depends_on = depends_on
task.updated_at = datetime.fromisoformat(now)
Expand Down Expand Up @@ -670,17 +690,19 @@ def update_requirement_ids(
now = _utc_now().isoformat()

conn = get_db_connection(workspace)
cursor = conn.cursor()
cursor.execute(
"""
UPDATE tasks
SET requirement_ids = ?, updated_at = ?
WHERE workspace_id = ? AND id = ?
""",
(json.dumps(requirement_ids), now, workspace.id, task_id),
)
conn.commit()
conn.close()
try:
cursor = conn.cursor()
cursor.execute(
"""
UPDATE tasks
SET requirement_ids = ?, updated_at = ?
WHERE workspace_id = ? AND id = ?
""",
(json.dumps(requirement_ids), now, workspace.id, task_id),
)
conn.commit()
finally:
conn.close()

task.requirement_ids = requirement_ids
task.updated_at = datetime.fromisoformat(now)
Expand Down Expand Up @@ -717,18 +739,20 @@ def delete(workspace: Workspace, task_id: str) -> bool:
Use delete_cascade() if you need to clean up dependencies.
"""
conn = get_db_connection(workspace)
cursor = conn.cursor()

cursor.execute(
"""
DELETE FROM tasks
WHERE workspace_id = ? AND id = ?
""",
(workspace.id, task_id),
)
deleted = cursor.rowcount > 0
conn.commit()
conn.close()
try:
cursor = conn.cursor()

cursor.execute(
"""
DELETE FROM tasks
WHERE workspace_id = ? AND id = ?
""",
(workspace.id, task_id),
)
deleted = cursor.rowcount > 0
conn.commit()
finally:
conn.close()

return deleted

Expand All @@ -743,18 +767,20 @@ def delete_all(workspace: Workspace) -> int:
Number of tasks deleted
"""
conn = get_db_connection(workspace)
cursor = conn.cursor()

cursor.execute(
"""
DELETE FROM tasks
WHERE workspace_id = ?
""",
(workspace.id,),
)
deleted_count = cursor.rowcount
conn.commit()
conn.close()
try:
cursor = conn.cursor()

cursor.execute(
"""
DELETE FROM tasks
WHERE workspace_id = ?
""",
(workspace.id,),
)
deleted_count = cursor.rowcount
conn.commit()
finally:
conn.close()

return deleted_count

Expand All @@ -769,19 +795,21 @@ def count_by_status(workspace: Workspace) -> dict[str, int]:
Dict mapping status string to count
"""
conn = get_db_connection(workspace)
cursor = conn.cursor()

cursor.execute(
"""
SELECT status, COUNT(*) as count
FROM tasks
WHERE workspace_id = ?
GROUP BY status
""",
(workspace.id,),
)
rows = cursor.fetchall()
conn.close()
try:
cursor = conn.cursor()

cursor.execute(
"""
SELECT status, COUNT(*) as count
FROM tasks
WHERE workspace_id = ?
GROUP BY status
""",
(workspace.id,),
)
rows = cursor.fetchall()
finally:
conn.close()

return {row[0]: row[1] for row in rows}

Expand Down
33 changes: 0 additions & 33 deletions codeframe/notifications/webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,36 +345,3 @@ def _run_send_event_sync(self, payload: dict, url: Optional[str]) -> None:
exc_info=True,
)

def send_blocker_notification_background(
self,
blocker_id: int,
question: str,
agent_id: str,
task_id: int,
blocker_type: BlockerType,
created_at: datetime,
) -> None:
"""Fire-and-forget wrapper for send_blocker_notification.

Launches notification task in background without awaiting result.
Use this method to avoid blocking blocker creation.

Args:
blocker_id: Blocker database ID
question: Blocker question text
agent_id: Agent that created the blocker
task_id: Associated task ID
blocker_type: SYNC or ASYNC
created_at: Blocker creation timestamp
"""
# Create background task
asyncio.create_task(
self.send_blocker_notification(
blocker_id=blocker_id,
question=question,
agent_id=agent_id,
task_id=task_id,
blocker_type=blocker_type,
created_at=created_at,
)
)
Loading
Loading