Skip to content

Commit b0fb421

Browse files
authored
fix(core): release tasks.py DB connections on exception + retain fire-and-forget auto-close task (#650) (#688)
* fix(core): release tasks.py DB connections on exception + retain fire-and-forget auto-close task (#650) - Wrap unguarded DB paths in tasks.py (get, list_tasks, count_by_status, update_depends_on, update_requirement_ids, delete, delete_all) in try/finally so the SQLite connection is released even when an error is raised before the normal conn.close(). - Retain a strong reference to the GitHub auto-close task scheduled on a running loop (_close_issue_background) via a module-level set + a done-callback that discards it, mirroring WebhookNotificationService.send_event_background. asyncio only keeps a weak reference, so an un-retained task could be GC'd mid-flight. - Remove the unused, buggy send_blocker_notification_background webhook wrapper (bare asyncio.create_task, no reference, no no-loop fallback) and its test. Production blocker notifications use the safe send_event_background. Closes #650 * test(core): strengthen #650 connection-safety coverage from review Address claude-review feedback on PR #688: - Inject faults at execute() so each function body (incl. list_tasks status-filter branch) is reached, not just connection setup. - Add delete/delete_all to the commit-error parametrization (both commit, so a commit failure was also a leak vector). - Snapshot/restore _background_tasks with try/finally so the async test cannot leave the module set dirty for subsequent tests.
1 parent a8edbaa commit b0fb421

4 files changed

Lines changed: 312 additions & 148 deletions

File tree

codeframe/core/tasks.py

Lines changed: 126 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -169,18 +169,20 @@ def get(workspace: Workspace, task_id: str) -> Optional[Task]:
169169
Task if found, None otherwise
170170
"""
171171
conn = get_db_connection(workspace)
172-
cursor = conn.cursor()
173-
174-
cursor.execute(
175-
"""
176-
SELECT id, workspace_id, prd_id, title, description, status, priority, depends_on, estimated_hours, complexity_score, uncertainty_level, created_at, updated_at, github_issue_number, parent_id, lineage, is_leaf, hierarchical_id, requirement_ids, external_url, auto_close_github_issue
177-
FROM tasks
178-
WHERE workspace_id = ? AND id = ?
179-
""",
180-
(workspace.id, task_id),
181-
)
182-
row = cursor.fetchone()
183-
conn.close()
172+
try:
173+
cursor = conn.cursor()
174+
175+
cursor.execute(
176+
"""
177+
SELECT id, workspace_id, prd_id, title, description, status, priority, depends_on, estimated_hours, complexity_score, uncertainty_level, created_at, updated_at, github_issue_number, parent_id, lineage, is_leaf, hierarchical_id, requirement_ids, external_url, auto_close_github_issue
178+
FROM tasks
179+
WHERE workspace_id = ? AND id = ?
180+
""",
181+
(workspace.id, task_id),
182+
)
183+
row = cursor.fetchone()
184+
finally:
185+
conn.close()
184186

185187
if not row:
186188
return None
@@ -288,33 +290,35 @@ def list_tasks(
288290
List of Tasks
289291
"""
290292
conn = get_db_connection(workspace)
291-
cursor = conn.cursor()
293+
try:
294+
cursor = conn.cursor()
292295

293-
if status:
294-
cursor.execute(
295-
"""
296-
SELECT id, workspace_id, prd_id, title, description, status, priority, depends_on, estimated_hours, complexity_score, uncertainty_level, created_at, updated_at, github_issue_number, parent_id, lineage, is_leaf, hierarchical_id, requirement_ids, external_url, auto_close_github_issue
297-
FROM tasks
298-
WHERE workspace_id = ? AND status = ?
299-
ORDER BY priority ASC, created_at ASC
300-
LIMIT ?
301-
""",
302-
(workspace.id, status.value, limit),
303-
)
304-
else:
305-
cursor.execute(
306-
"""
307-
SELECT id, workspace_id, prd_id, title, description, status, priority, depends_on, estimated_hours, complexity_score, uncertainty_level, created_at, updated_at, github_issue_number, parent_id, lineage, is_leaf, hierarchical_id, requirement_ids, external_url, auto_close_github_issue
308-
FROM tasks
309-
WHERE workspace_id = ?
310-
ORDER BY priority ASC, created_at ASC
311-
LIMIT ?
312-
""",
313-
(workspace.id, limit),
314-
)
296+
if status:
297+
cursor.execute(
298+
"""
299+
SELECT id, workspace_id, prd_id, title, description, status, priority, depends_on, estimated_hours, complexity_score, uncertainty_level, created_at, updated_at, github_issue_number, parent_id, lineage, is_leaf, hierarchical_id, requirement_ids, external_url, auto_close_github_issue
300+
FROM tasks
301+
WHERE workspace_id = ? AND status = ?
302+
ORDER BY priority ASC, created_at ASC
303+
LIMIT ?
304+
""",
305+
(workspace.id, status.value, limit),
306+
)
307+
else:
308+
cursor.execute(
309+
"""
310+
SELECT id, workspace_id, prd_id, title, description, status, priority, depends_on, estimated_hours, complexity_score, uncertainty_level, created_at, updated_at, github_issue_number, parent_id, lineage, is_leaf, hierarchical_id, requirement_ids, external_url, auto_close_github_issue
311+
FROM tasks
312+
WHERE workspace_id = ?
313+
ORDER BY priority ASC, created_at ASC
314+
LIMIT ?
315+
""",
316+
(workspace.id, limit),
317+
)
315318

316-
rows = cursor.fetchall()
317-
conn.close()
319+
rows = cursor.fetchall()
320+
finally:
321+
conn.close()
318322

319323
return [_row_to_task(row) for row in rows]
320324

@@ -468,6 +472,14 @@ def _dispatch_github_autoclose(workspace: Workspace, task: Task) -> None:
468472
# a short-lived CLI process for long at exit.
469473
_AUTOCLOSE_TIMEOUT = 10.0
470474

475+
# Strong references to in-flight auto-close tasks scheduled on a running event
476+
# loop. asyncio only keeps a *weak* reference to a task created with
477+
# ``loop.create_task``, so without this set a task could be garbage-collected
478+
# mid-flight (leaving the issue open and emitting a "Task was destroyed but it
479+
# is pending" warning). Tasks remove themselves via the done-callback once they
480+
# complete. Mirrors ``WebhookNotificationService.send_event_background``.
481+
_background_tasks: set[asyncio.Task] = set()
482+
471483

472484
async def _safe_close_issue(pat: str, repo: str, issue_number: int) -> None:
473485
"""Close the issue, swallowing every error (best-effort, off the hot path)."""
@@ -512,7 +524,13 @@ def _close_issue_background(pat: str, repo: str, issue_number: int) -> None:
512524
name=f"gh-autoclose-{issue_number}",
513525
).start()
514526
else:
515-
loop.create_task(_safe_close_issue(pat, repo, issue_number))
527+
# Retain a strong reference until the task finishes so asyncio cannot
528+
# garbage-collect it mid-flight; the done-callback consumes any
529+
# exception (``_safe_close_issue`` already swallows them) and drops the
530+
# reference.
531+
task = loop.create_task(_safe_close_issue(pat, repo, issue_number))
532+
_background_tasks.add(task)
533+
task.add_done_callback(_background_tasks.discard)
516534

517535

518536
def update(
@@ -626,18 +644,20 @@ def update_depends_on(
626644
now = _utc_now().isoformat()
627645

628646
conn = get_db_connection(workspace)
629-
cursor = conn.cursor()
630-
631-
cursor.execute(
632-
"""
633-
UPDATE tasks
634-
SET depends_on = ?, updated_at = ?
635-
WHERE workspace_id = ? AND id = ?
636-
""",
637-
(json.dumps(depends_on), now, workspace.id, task_id),
638-
)
639-
conn.commit()
640-
conn.close()
647+
try:
648+
cursor = conn.cursor()
649+
650+
cursor.execute(
651+
"""
652+
UPDATE tasks
653+
SET depends_on = ?, updated_at = ?
654+
WHERE workspace_id = ? AND id = ?
655+
""",
656+
(json.dumps(depends_on), now, workspace.id, task_id),
657+
)
658+
conn.commit()
659+
finally:
660+
conn.close()
641661

642662
task.depends_on = depends_on
643663
task.updated_at = datetime.fromisoformat(now)
@@ -670,17 +690,19 @@ def update_requirement_ids(
670690
now = _utc_now().isoformat()
671691

672692
conn = get_db_connection(workspace)
673-
cursor = conn.cursor()
674-
cursor.execute(
675-
"""
676-
UPDATE tasks
677-
SET requirement_ids = ?, updated_at = ?
678-
WHERE workspace_id = ? AND id = ?
679-
""",
680-
(json.dumps(requirement_ids), now, workspace.id, task_id),
681-
)
682-
conn.commit()
683-
conn.close()
693+
try:
694+
cursor = conn.cursor()
695+
cursor.execute(
696+
"""
697+
UPDATE tasks
698+
SET requirement_ids = ?, updated_at = ?
699+
WHERE workspace_id = ? AND id = ?
700+
""",
701+
(json.dumps(requirement_ids), now, workspace.id, task_id),
702+
)
703+
conn.commit()
704+
finally:
705+
conn.close()
684706

685707
task.requirement_ids = requirement_ids
686708
task.updated_at = datetime.fromisoformat(now)
@@ -717,18 +739,20 @@ def delete(workspace: Workspace, task_id: str) -> bool:
717739
Use delete_cascade() if you need to clean up dependencies.
718740
"""
719741
conn = get_db_connection(workspace)
720-
cursor = conn.cursor()
721-
722-
cursor.execute(
723-
"""
724-
DELETE FROM tasks
725-
WHERE workspace_id = ? AND id = ?
726-
""",
727-
(workspace.id, task_id),
728-
)
729-
deleted = cursor.rowcount > 0
730-
conn.commit()
731-
conn.close()
742+
try:
743+
cursor = conn.cursor()
744+
745+
cursor.execute(
746+
"""
747+
DELETE FROM tasks
748+
WHERE workspace_id = ? AND id = ?
749+
""",
750+
(workspace.id, task_id),
751+
)
752+
deleted = cursor.rowcount > 0
753+
conn.commit()
754+
finally:
755+
conn.close()
732756

733757
return deleted
734758

@@ -743,18 +767,20 @@ def delete_all(workspace: Workspace) -> int:
743767
Number of tasks deleted
744768
"""
745769
conn = get_db_connection(workspace)
746-
cursor = conn.cursor()
747-
748-
cursor.execute(
749-
"""
750-
DELETE FROM tasks
751-
WHERE workspace_id = ?
752-
""",
753-
(workspace.id,),
754-
)
755-
deleted_count = cursor.rowcount
756-
conn.commit()
757-
conn.close()
770+
try:
771+
cursor = conn.cursor()
772+
773+
cursor.execute(
774+
"""
775+
DELETE FROM tasks
776+
WHERE workspace_id = ?
777+
""",
778+
(workspace.id,),
779+
)
780+
deleted_count = cursor.rowcount
781+
conn.commit()
782+
finally:
783+
conn.close()
758784

759785
return deleted_count
760786

@@ -769,19 +795,21 @@ def count_by_status(workspace: Workspace) -> dict[str, int]:
769795
Dict mapping status string to count
770796
"""
771797
conn = get_db_connection(workspace)
772-
cursor = conn.cursor()
773-
774-
cursor.execute(
775-
"""
776-
SELECT status, COUNT(*) as count
777-
FROM tasks
778-
WHERE workspace_id = ?
779-
GROUP BY status
780-
""",
781-
(workspace.id,),
782-
)
783-
rows = cursor.fetchall()
784-
conn.close()
798+
try:
799+
cursor = conn.cursor()
800+
801+
cursor.execute(
802+
"""
803+
SELECT status, COUNT(*) as count
804+
FROM tasks
805+
WHERE workspace_id = ?
806+
GROUP BY status
807+
""",
808+
(workspace.id,),
809+
)
810+
rows = cursor.fetchall()
811+
finally:
812+
conn.close()
785813

786814
return {row[0]: row[1] for row in rows}
787815

codeframe/notifications/webhook.py

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -345,36 +345,3 @@ def _run_send_event_sync(self, payload: dict, url: Optional[str]) -> None:
345345
exc_info=True,
346346
)
347347

348-
def send_blocker_notification_background(
349-
self,
350-
blocker_id: int,
351-
question: str,
352-
agent_id: str,
353-
task_id: int,
354-
blocker_type: BlockerType,
355-
created_at: datetime,
356-
) -> None:
357-
"""Fire-and-forget wrapper for send_blocker_notification.
358-
359-
Launches notification task in background without awaiting result.
360-
Use this method to avoid blocking blocker creation.
361-
362-
Args:
363-
blocker_id: Blocker database ID
364-
question: Blocker question text
365-
agent_id: Agent that created the blocker
366-
task_id: Associated task ID
367-
blocker_type: SYNC or ASYNC
368-
created_at: Blocker creation timestamp
369-
"""
370-
# Create background task
371-
asyncio.create_task(
372-
self.send_blocker_notification(
373-
blocker_id=blocker_id,
374-
question=question,
375-
agent_id=agent_id,
376-
task_id=task_id,
377-
blocker_type=blocker_type,
378-
created_at=created_at,
379-
)
380-
)

0 commit comments

Comments
 (0)