Date: 2025-07-20 | Status: Active | Version: 1.0 | Owner: Deep Docs Pipeline | Source: Generated from codebase analysis | Cross-links: See Related Documents section
The Task Queue Service is the central nervous system of OmoiOS's task execution pipeline. It manages the task lifecycle from creation through completion, assigns work by priority using dynamic scoring, enforces dependency graphs, and handles retries, timeouts, and cleanup of stale tasks. The service supports both the legacy agent-based execution mode and the newer sandbox-based execution mode.
```mermaid
graph TB
    subgraph "Task Queue Service"
        TQS[TaskQueueService]
        ET[enqueue_task]
        GNT[get_next_task]
        GRT[get_ready_tasks]
        AT[assign_task]
        UTS[update_task_status]
        CD[check_dependencies_complete]
        DCD[detect_circular_dependencies]
        IR[is_retryable_error]
        IT[increment_retry]
        CTT[check_task_timeout]
        CT[cancel_task]
    end
    subgraph "Supporting Components"
        TS[TaskScorer]
        GTT[generate_task_title]
    end
    subgraph "External Services"
        DB[(Database)]
        EB[Event Bus]
        Sentry[Sentry]
    end
    TQS --> ET
    TQS --> GNT
    TQS --> GRT
    TQS --> AT
    TQS --> UTS
    TQS --> CD
    TQS --> DCD
    TQS --> IR
    TQS --> IT
    TQS --> CTT
    TQS --> CT
    TQS --> TS
    TQS --> GTT
    TQS --> DB
    TQS --> EB
    TQS --> Sentry
```
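backend/omoi_os/services/task_queue.py:81-1446

```python
from typing import Optional

# DatabaseService, TaskScorer and EventBusService are OmoiOS classes;
# their import lines are omitted from this excerpt.


class TaskQueueService:
    """Manages task queue operations: enqueue, retrieve, assign, update."""

    def __init__(
        self,
        db: DatabaseService,
        event_bus: Optional["EventBusService"] = None,
    ):
        self.db = db
        self.scorer = TaskScorer(db)  # Computes dynamic priority scores
        self.event_bus = event_bus  # Optional bus for task lifecycle events
```

Core Methods: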
| Method | Lines | Purpose |
|---|---|---|
| `enqueue_task` | 135-223 | Add task to queue with scoring |
| `get_next_task` | 225-314 | Get highest-scored ready task with atomic claim |
| `get_ready_tasks` | 316-384 | Get multiple ready tasks for batch execution |
| `assign_task` | 386-415 | Assign task to agent/sandbox |
| `update_task_status` | 417-504 | Update status with event publishing |
| `check_dependencies_complete` | 631-645 | Verify dependency satisfaction |
| `detect_circular_dependencies` | 673-708 | DFS-based cycle detection |
Retry & Timeout Methods:

| Method | Lines | Purpose |
|---|---|---|
| `should_retry` | 710-730 | Check if failed task can retry |
| `increment_retry` | 732-770 | Increment retry and reset status |
| `is_retryable_error` | 801-856 | Classify error retryability |
| `check_task_timeout` | 860-887 | Check if task exceeded timeout |
| `cancel_task` | 889-917 | Cancel running task |
| `mark_task_timeout` | 949-989 | Mark task as failed due to timeout |
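The timeout rule used by `check_task_timeout` (lines 860-887) is small enough to express as a pure function, which also shows where the fields returned by `get_task_timeout_status` could come from. This is a minimal sketch under assumptions: `TaskTiming` and `timeout_status` are hypothetical names, and computing `elapsed_seconds` and `time_remaining` only for running tasks mirrors `check_task_timeout`, not the elided body of `get_task_timeout_status`.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class TaskTiming:
    """Hypothetical stand-in for the timing fields of a Task row."""

    status: str
    timeout_seconds: Optional[int]
    started_at: Optional[datetime]


def timeout_status(task: TaskTiming, now: datetime) -> dict:
    """Report timeout state using the same rules as check_task_timeout."""
    elapsed: Optional[float] = None
    remaining: Optional[float] = None
    timed_out = False
    # Only running tasks with a timeout and a start time are checked
    if task.status == "running" and task.timeout_seconds and task.started_at:
        elapsed = (now - task.started_at).total_seconds()
        remaining = max(0.0, task.timeout_seconds - elapsed)
        timed_out = elapsed > task.timeout_seconds
    return {
        "status": task.status,
        "timeout_seconds": task.timeout_seconds,
        "elapsed_seconds": elapsed,
        "time_remaining": remaining,
        "is_timed_out": timed_out,
        "can_cancel": task.status in ["assigned", "running"],
    }


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    task = TaskTiming("running", 300, now - timedelta(seconds=360))
    print(timeout_status(task, now)["is_timed_out"])  # True: 360 s > 300 s
```

Passing `now` in explicitly, rather than calling a clock inside the function, keeps the check deterministic in tests.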
Stale Task Cleanup:

| Method | Lines | Purpose |
|---|---|---|
| `cleanup_stale_claiming_tasks` | 1079-1133 | Reset tasks stuck in claiming |
| `cleanup_stale_assigned_tasks` | 1174-1233 | Mark orphaned tasks as failed |
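Both cleanup methods pick rows by status and age. A minimal sketch of just that selection step, over hypothetical in-memory `TaskRow` objects instead of database rows, using the default thresholds (60 seconds for `claiming`, 3 minutes for `assigned`) and measuring age from `created_at` as the excerpts of the two methods do:

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class TaskRow:
    """Hypothetical in-memory stand-in for a Task row."""

    id: str
    status: str
    created_at: datetime
    sandbox_id: Optional[str] = None


def find_stale_claiming(
    tasks: list[TaskRow], now: datetime, threshold_seconds: int = 60
) -> list[TaskRow]:
    """Tasks stuck in 'claiming' past the threshold (candidates to reset to pending)."""
    cutoff = now - timedelta(seconds=threshold_seconds)
    return [t for t in tasks if t.status == "claiming" and t.created_at < cutoff]


def find_stale_assigned(
    tasks: list[TaskRow], now: datetime, threshold_minutes: int = 3
) -> list[TaskRow]:
    """Sandbox tasks stuck in 'assigned'/'claiming' past the threshold (candidates to fail)."""
    cutoff = now - timedelta(minutes=threshold_minutes)
    return [
        t
        for t in tasks
        if t.status in ("assigned", "claiming")
        and t.sandbox_id is not None
        and t.created_at < cutoff
    ]
```

Keeping selection separate from mutation makes the threshold boundaries easy to test, which is what the "Stale cleanup" test scenario asks for.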
```mermaid
stateDiagram-v2
    [*] --> pending: enqueue_task
    pending --> claiming: get_next_task (atomic)
    claiming --> assigned: assign_task
    claiming --> pending: cleanup_stale_claiming_tasks
    assigned --> running: update_task_status
    assigned --> failed: cleanup_stale_assigned_tasks
    running --> completed: update_task_status
    running --> failed: update_task_status / timeout
    running --> failed: cancel_task
    failed --> pending: increment_retry
    completed --> [*]
    failed --> [*]: max retries exceeded
```
| Status | Description | Transitions |
|---|---|---|
| `pending` | Waiting for assignment | → claiming |
| `claiming` | Atomic claim in progress | → assigned, → pending (stale) |
| `assigned` | Assigned to agent/sandbox | → running, → failed (stale) |
| `running` | Actively executing | → completed, → failed |
| `completed` | Successfully finished | Terminal |
| `failed` | Execution failed | → pending (retry); terminal once max retries is reached |
| `under_review` | Pending validation | Legacy status |
| `validation_in_progress` | Being validated | Legacy status |
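The documented transitions can be captured as a lookup table, which is handy for asserting state changes in tests. This is a sketch, not code from the service: the source does not show whether `TaskQueueService` validates transitions, `ALLOWED_TRANSITIONS`, `can_transition` and `validate_transition` are hypothetical names, and the two legacy statuses are left out. The `failed -> pending` edge is only taken while retries remain, which `increment_retry` enforces separately.

```python
# Allowed transitions, transcribed from the state diagram and status table
ALLOWED_TRANSITIONS: dict[str, set[str]] = {
    "pending": {"claiming"},
    "claiming": {"assigned", "pending"},
    "assigned": {"running", "failed"},
    "running": {"completed", "failed"},
    "completed": set(),  # terminal
    "failed": {"pending"},  # retry path; terminal once retries run out
}


def can_transition(current: str, new: str) -> bool:
    """True if the diagram allows moving from `current` to `new`."""
    return new in ALLOWED_TRANSITIONS.get(current, set())


def validate_transition(current: str, new: str) -> None:
    """Raise ValueError for a transition the diagram does not allow."""
    if not can_transition(current, new):
        raise ValueError(f"illegal task transition: {current} -> {new}")
```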
backend/omoi_os/services/task_queue.py:135-223

```python
def enqueue_task(
    self,
    ticket_id: str,
    phase_id: str,
    task_type: str,
    description: str,
    priority: str,
    dependencies: dict | None = None,
    session: Optional["Session"] = None,
    title: Optional[str] = None,
    execution_config: dict | None = None,
    required_capabilities: list[str] | None = None,
) -> Task:
    """Add a task to the queue."""
    task = Task(
        ticket_id=ticket_id,
        phase_id=phase_id,
        task_type=task_type,
        title=title,
        description=description,
        priority=priority,
        status="pending",
        dependencies=dependencies,
        execution_config=execution_config,
        required_capabilities=required_capabilities,
    )
    # Compute initial score (REQ-TQM-PRI-002)
    task.score = self.scorer.compute_score(task)
    # Persisting the task (via `session` or a new session from self.db)
    # is elided in this excerpt
    # Publish event
    self._publish_event("TASK_CREATED", task)
    return task
```

backend/omoi_os/services/task_queue.py:225-314
```mermaid
sequenceDiagram
    participant OW as OrchestratorWorker
    participant TQS as TaskQueueService
    participant DB as Database
    OW->>TQS: get_next_task(phase_id, capabilities)
    TQS->>DB: Query pending tasks
    DB-->>TQS: List of pending tasks
    TQS->>TQS: Filter by dependencies & capabilities
    TQS->>TQS: Compute scores
    TQS->>TQS: Select highest scored
    TQS->>DB: Atomic UPDATE with RETURNING
    Note over TQS,DB: UPDATE tasks SET status='claiming' WHERE id=:id AND status='pending'
    alt Claim successful
        DB-->>TQS: Row returned
        TQS-->>OW: Return claimed task
    else Claim failed (race condition)
        DB-->>TQS: No row returned
        TQS-->>OW: Return None
    end
```
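The claim step in the diagram is a compare-and-set: the `UPDATE` only matches while the row is still `pending`, so of two racing workers at most one gets a row back. A runnable sketch of the same idea with the standard-library `sqlite3` module and a hypothetical three-column `tasks` table (`claim_task` and `make_demo_db` are illustrative names). It checks `cursor.rowcount` instead of using `RETURNING` so it does not depend on the SQLite version bundled with Python.

```python
import sqlite3


def claim_task(conn: sqlite3.Connection, task_id: str) -> bool:
    """Compare-and-set claim: flip pending -> claiming only if still pending.

    Returns True if this caller won the claim.
    """
    cur = conn.execute(
        "UPDATE tasks SET status = 'claiming' "
        "WHERE id = ? AND status = 'pending' AND sandbox_id IS NULL",
        (task_id,),
    )
    conn.commit()
    # rowcount is 1 when the WHERE clause still matched, 0 if someone else won
    return cur.rowcount == 1


def make_demo_db() -> sqlite3.Connection:
    """In-memory database holding a single pending task 't1'."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE tasks (id TEXT PRIMARY KEY, status TEXT, sandbox_id TEXT)")
    conn.execute("INSERT INTO tasks VALUES ('t1', 'pending', NULL)")
    conn.commit()
    return conn


if __name__ == "__main__":
    conn = make_demo_db()
    print(claim_task(conn, "t1"))  # True: first claimer wins
    print(claim_task(conn, "t1"))  # False: status is no longer 'pending'
```

The demo runs the two claims one after the other. Under real concurrency the row lock taken by `UPDATE` serializes the two writers; in PostgreSQL's default READ COMMITTED mode, for example, the second writer re-evaluates the `WHERE` clause after the first commits and then matches zero rows.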
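```python
def get_next_task(
    self,
    phase_id: Optional[str] = None,
    agent_capabilities: Optional[List[str]] = None,
) -> Task | None:
    """Get highest-scored pending task with atomic claim."""
    with self.db.get_session() as session:
        # Load pending candidates (phase_id filtering elided in this excerpt)
        tasks = session.query(Task).filter(Task.status == "pending").all()

        # Filter and score tasks
        available_tasks = []
        for task in tasks:
            if not self._check_dependencies_complete(session, task):
                continue
            if not self._check_capability_match(task, agent_capabilities):
                continue
            task.score = self.scorer.compute_score(task)
            available_tasks.append(task)

        if not available_tasks:
            return None  # Nothing is ready; max() would raise on an empty list

        # Select highest scored
        task = max(available_tasks, key=lambda t: t.score)

        # ATOMIC CLAIM: conditional UPDATE ... RETURNING (compare-and-set).
        # The WHERE clause only matches while the row is still 'pending',
        # so at most one concurrent caller gets a row back.
        result = session.execute(
            text("""
                UPDATE tasks
                SET status = 'claiming', score = :score
                WHERE id = :task_id
                  AND status = 'pending'
                  AND sandbox_id IS NULL
                RETURNING id
            """),
            {"task_id": str(task.id), "score": task.score},
        )
        claimed_row = result.fetchone()
        if not claimed_row:
            return None  # Another process claimed this task
        # (Committing the claim and detaching `task` are elided in this excerpt)
        return task
```

backend/omoi_os/services/task_queue.py:541-567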
```python
def _check_dependencies_complete(self, session, task: Task) -> bool:
    """Check if all dependencies for a task are completed."""
    if not task.dependencies:
        return True
    depends_on = task.dependencies.get("depends_on", [])
    if not depends_on:
        return True
    # Check if all dependency tasks are completed
    dependency_tasks = session.query(Task).filter(
        Task.id.in_(depends_on)
    ).all()
    if len(dependency_tasks) != len(depends_on):
        return False  # Missing dependencies
    return all(
        dep_task.status == "completed"
        for dep_task in dependency_tasks
    )
```

backend/omoi_os/services/task_queue.py:673-708
```mermaid
graph TB
    subgraph "Circular Dependency Example"
        A[Task A] --> B[Task B]
        B --> C[Task C]
        C --> A
    end
```
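The cycle in the example can be found with a depth-first search that carries the current path and reports a cycle as soon as a dependency already appears on it. A self-contained sketch over an in-memory adjacency map; `find_cycle` and the `graph` shape are hypothetical, not the service's API, and `proposed_deps` plays the role of the `depends_on` argument, since the dependencies being checked may not be stored yet.

```python
from typing import Optional


def find_cycle(
    graph: dict[str, list[str]], start: str, proposed_deps: list[str]
) -> Optional[list[str]]:
    """Return the first dependency cycle reachable from `start`, or None.

    `graph` maps a task id to the ids it depends on.
    """

    def dfs(node: str, deps: list[str], path: list[str]) -> Optional[list[str]]:
        path = path + [node]  # copy per branch, like visited.copy()
        for dep in deps:
            if dep in path:
                # Close the loop: slice from the first occurrence of dep
                return path[path.index(dep):] + [dep]
            cycle = dfs(dep, graph.get(dep, []), path)
            if cycle:
                return cycle
        return None

    return dfs(start, proposed_deps, [])


# A depends on B (proposed); B -> C -> A already stored
print(find_cycle({"B": ["C"], "C": ["A"]}, "A", ["B"]))  # ['A', 'B', 'C', 'A']
```

Copying the path per branch keeps the logic simple but can re-explore shared sub-graphs many times in a large DAG; the usual alternative is a three-color (unvisited, in progress, done) DFS that visits each node once.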
```python
def detect_circular_dependencies(
    self,
    task_id: str,
    depends_on: list[str],
    visited: list[str] | None = None,
) -> list[str] | None:
    """Detect circular dependencies using DFS."""
    if visited is None:
        visited = []
    if task_id in visited:
        # Found a cycle: return the path from its first occurrence
        cycle_start = visited.index(task_id)
        return visited[cycle_start:] + [task_id]
    visited.append(task_id)
    with self.db.get_session() as session:
        for dep_id in depends_on:
            if dep_id in visited:
                # The dependency closes a loop even when it has no stored
                # dependencies of its own (e.g. task_id is not saved yet)
                return visited[visited.index(dep_id):] + [dep_id]
            dep_task = session.query(Task).filter(Task.id == dep_id).first()
            if dep_task and dep_task.dependencies:
                cycle = self.detect_circular_dependencies(
                    dep_id,
                    dep_task.dependencies.get("depends_on", []),
                    visited.copy(),  # Each branch tracks its own path
                )
                if cycle:
                    return cycle
    return None
```

backend/omoi_os/services/task_queue.py:801-856
```python
def is_retryable_error(self, error_message: str | None) -> bool:
    """Check if an error message indicates a retryable error."""
    if not error_message:
        return True  # No error, assume retryable
    error_lower = error_message.lower()
    # Permanent errors - do NOT retry
    permanent_patterns = [
        "permission denied",
        "access denied",
        "authentication failed",
        "syntax error",
        "invalid argument",
        "not found",
        "already exists",
        "quota exceeded",
    ]
    for pattern in permanent_patterns:
        if pattern in error_lower:
            return False
    # Retryable errors
    retryable_patterns = [
        "connection",
        "timeout",
        "network",
        "temporary",
        "unavailable",
        "transient",
    ]
    for pattern in retryable_patterns:
        if pattern in error_lower:
            return True
    return True  # Default to retryable
```

backend/omoi_os/services/task_queue.py:732-770
```python
def increment_retry(self, task_id: str) -> bool:
    """Increment retry count and reset status to pending."""
    with self.db.get_session() as session:
        task = session.query(Task).filter(Task.id == task_id).first()
        if task is None:
            return False  # Unknown task
        if task.retry_count >= task.max_retries:
            return False  # Retry budget exhausted
        # Increment and reset
        task.retry_count += 1
        task.status = "pending"
        task.error_message = None
        # Clear assignment based on mode
        if task.sandbox_id:
            task.sandbox_id = None  # Sandbox mode
        else:
            task.assigned_agent_id = None  # Legacy mode
        session.commit()
        # Track metric
        track_task_retried(str(task.id), task.phase_id, task.retry_count)
        return True
```

backend/omoi_os/services/task_queue.py:860-887
```python
def check_task_timeout(self, task_id: str) -> bool:
    """Check if a task has exceeded its timeout."""
    with self.db.get_session() as session:
        task = session.query(Task).filter(Task.id == task_id).first()
        if task is None:
            return False  # Unknown task cannot be timed out
        # Only check running tasks with timeout
        if task.status != "running" or not task.timeout_seconds:
            return False
        if not task.started_at:
            return False
        elapsed = (utc_now() - task.started_at).total_seconds()
        return elapsed > task.timeout_seconds
```

backend/omoi_os/services/task_queue.py:1041-1077
```python
def get_task_timeout_status(self, task_id: str) -> dict:
    """Get comprehensive timeout status for a task."""
    # ... (task lookup and the elapsed_seconds, time_remaining and
    # is_timed_out computations are elided in this excerpt)
    return {
        "exists": True,
        "status": task.status,
        "timeout_seconds": task.timeout_seconds,
        "elapsed_seconds": elapsed_seconds,
        "time_remaining": time_remaining,
        "is_timed_out": is_timed_out,
        "can_cancel": task.status in ["assigned", "running"],
    }
```

backend/omoi_os/services/task_queue.py:1079-1133
Tasks stuck in claiming status (orchestrator crashed mid-claim):
```python
def cleanup_stale_claiming_tasks(
    self,
    stale_threshold_seconds: int = 60,
) -> list[dict]:
    """Clean up tasks stuck in 'claiming' status."""
    stale_cutoff = utc_now() - timedelta(seconds=stale_threshold_seconds)
    cleaned_tasks: list[dict] = []
    with self.db.get_session() as session:
        # Staleness is measured from created_at in this excerpt
        tasks = session.query(Task).filter(
            Task.status == "claiming",
            Task.created_at < stale_cutoff,
        ).all()
        for task in tasks:
            task.status = "pending"  # Reset so it can be claimed again
            cleaned_tasks.append({"task_id": str(task.id)})  # other fields elided
        session.commit()
    return cleaned_tasks
```

backend/omoi_os/services/task_queue.py:1174-1233
Tasks stuck in assigned status (sandbox crashed before starting) are marked as failed. The query also matches sandbox tasks that are still stuck in claiming:
```python
def cleanup_stale_assigned_tasks(
    self,
    stale_threshold_minutes: int = 3,
    dry_run: bool = False,
) -> list[dict]:
    """Clean up tasks stuck in 'assigned' status."""
    stale_cutoff = utc_now() - timedelta(minutes=stale_threshold_minutes)
    cleaned_tasks: list[dict] = []
    with self.db.get_session() as session:
        tasks = session.query(Task).filter(
            Task.status.in_(["assigned", "claiming"]),
            Task.sandbox_id.isnot(None),  # Sandbox mode only
            Task.created_at < stale_cutoff,
        ).all()
        for task in tasks:
            if not dry_run:
                task.status = "failed"
                task.error_message = "Task stuck in assigned status..."
                task.completed_at = utc_now()
            cleaned_tasks.append({"task_id": str(task.id)})  # other fields elided
        if not dry_run:
            session.commit()
    return cleaned_tasks
```

backend/omoi_os/services/task_queue.py:1273-1338
```python
async def get_ready_tasks_async(
    self,
    phase_id: Optional[str] = None,
    limit: int = 10,
    agent_capabilities: Optional[List[str]] = None,
) -> list[Task]:
    """Async version: Get multiple ready tasks."""
    async with self.db.get_async_session() as session:
        # (phase_id and agent_capabilities filtering are elided in this excerpt)
        stmt = select(Task).filter(Task.status == "pending")
        result = await session.execute(stmt)
        tasks = result.scalars().all()
        # Filter with async dependency check
        available_tasks = []
        for task in tasks:
            if not await self._check_dependencies_complete_async(session, task):
                continue
            task.score = self.scorer.compute_score(task)
            available_tasks.append(task)
        # Sort by score (highest first) and return up to `limit` tasks
        available_tasks.sort(key=lambda t: t.score, reverse=True)
        return available_tasks[:limit]
```

backend/omoi_os/services/task_queue.py:100-134
```python
def _publish_event(
    self,
    event_type: str,
    task: Task,
    extra_payload: Optional[dict] = None,
) -> None:
    """Publish task event to event bus."""
    if self.event_bus is None:
        return  # The event bus is optional
    payload = {
        "task_id": str(task.id),
        "ticket_id": task.ticket_id,
        "phase_id": task.phase_id,
        "task_type": task.task_type,
        "status": task.status,
        "priority": task.priority,
    }
    if extra_payload:
        payload.update(extra_payload)  # Event-specific fields
    event = SystemEvent(
        event_type=event_type,
        entity_type="task",
        entity_id=str(task.id),
        payload=payload,
    )
    self.event_bus.publish(event)
```

| Event | Trigger | Payload |
|---|---|---|
| `TASK_CREATED` | `enqueue_task` | Task details |
| `TASK_ASSIGNED` | `assign_task` | Agent/sandbox ID |
| `TASK_STARTED` | `update_task_status` (running) | Old status |
| `TASK_COMPLETED` | `update_task_status` (completed) | Result info |
| `TASK_FAILED` | `update_task_status` (failed) | Error message |
| `TASK_CANCELLED` | `cancel_task` | Cancellation reason |
| `TASK_STATUS_CHANGED` | Other status changes | Old/new status |
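The `update_task_status` rows of the table amount to a mapping from the new status to an event type, with `TASK_STATUS_CHANGED` as the fallback. A sketch of that selection; `event_for_status_update` is a hypothetical helper, and the `old_status`/`new_status` payload keys are assumptions (the table lists different payload contents per event, such as result info or the error message).

```python
# New status -> event type, per the event table; anything else is a generic change
STATUS_EVENTS: dict[str, str] = {
    "running": "TASK_STARTED",
    "completed": "TASK_COMPLETED",
    "failed": "TASK_FAILED",
}


def event_for_status_update(old_status: str, new_status: str) -> tuple[str, dict]:
    """Pick the event type and a (hypothetical) extra payload for a status change."""
    event_type = STATUS_EVENTS.get(new_status, "TASK_STATUS_CHANGED")
    extra_payload = {"old_status": old_status, "new_status": new_status}
    return event_type, extra_payload
```

A helper like this gives the "Event publishing" test scenario a single place to check that every status change maps to exactly one event.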
```mermaid
graph LR
    TQS[TaskQueueService] --> DB[(Database)]
    TQS --> TS[TaskScorer]
    TQS --> EB[EventBus]
    TQS --> Sentry[Sentry]
```
The OrchestratorWorker uses TaskQueueService to:
- Poll for tasks via `get_next_task()` or `get_next_task_async()`
- Claim tasks atomically to prevent race conditions
- Assign tasks to sandboxes via `assign_task()`
- Handle validation tasks via `get_next_validation_task()`
- Clean up stale tasks via the cleanup methods
| Variable | Default | Description |
|---|---|---|
| `STALE_TASK_THRESHOLD_MINUTES` | 3 | Minutes before an assigned task is considered stale |
| `STALE_CLAIMING_THRESHOLD_SECONDS` | 60 | Seconds before a claiming task is considered stale |
| `STALE_TASK_CHECK_INTERVAL_SECONDS` | 15 | Interval of the stale-task cleanup loop, in seconds |
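The table does not say how these values reach the service. A minimal sketch assuming they are read from environment variables with the same names (an assumption, as are the `StaleTaskConfig` and `load_stale_task_config` names), falling back to the documented defaults:

```python
import os
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class StaleTaskConfig:
    """Defaults taken from the configuration table (hypothetical class)."""

    stale_task_threshold_minutes: int = 3
    stale_claiming_threshold_seconds: int = 60
    stale_task_check_interval_seconds: int = 15


def load_stale_task_config(env: Optional[Mapping[str, str]] = None) -> StaleTaskConfig:
    """Build the config from environment variables, falling back to defaults."""
    source: Mapping[str, str] = os.environ if env is None else env
    defaults = StaleTaskConfig()

    def read_int(name: str, default: int) -> int:
        raw = source.get(name)
        # Unset or empty variables fall back to the documented default
        return default if raw is None or raw == "" else int(raw)

    return StaleTaskConfig(
        stale_task_threshold_minutes=read_int(
            "STALE_TASK_THRESHOLD_MINUTES", defaults.stale_task_threshold_minutes
        ),
        stale_claiming_threshold_seconds=read_int(
            "STALE_CLAIMING_THRESHOLD_SECONDS", defaults.stale_claiming_threshold_seconds
        ),
        stale_task_check_interval_seconds=read_int(
            "STALE_TASK_CHECK_INTERVAL_SECONDS", defaults.stale_task_check_interval_seconds
        ),
    )
```

The two thresholds line up with the `stale_threshold_seconds` and `stale_threshold_minutes` parameters of `cleanup_stale_claiming_tasks` and `cleanup_stale_assigned_tasks`, whose defaults match the table.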
Unit tests:
- Atomic claim: verify race-condition handling
- Dependency checking: test complete and incomplete scenarios
- Circular detection: test cycle identification
- Retry logic: test classification and increment
- Timeout checking: test elapsed-time calculation
- Stale cleanup: test threshold enforcement
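The retry-increment rules in `increment_retry` can be unit-tested without a database by applying them to a plain object. A sketch, assuming a hypothetical `FakeTask` with only the fields the method touches and an `apply_retry` function that copies its logic minus the session commit and the metric call:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeTask:
    """Hypothetical stand-in for the Task fields increment_retry touches."""

    retry_count: int
    max_retries: int
    status: str = "failed"
    error_message: Optional[str] = "connection reset"
    sandbox_id: Optional[str] = None
    assigned_agent_id: Optional[str] = None


def apply_retry(task: FakeTask) -> bool:
    """Same rules as increment_retry, without the session and metrics."""
    if task.retry_count >= task.max_retries:
        return False
    task.retry_count += 1
    task.status = "pending"
    task.error_message = None
    if task.sandbox_id:
        task.sandbox_id = None  # sandbox mode
    else:
        task.assigned_agent_id = None  # legacy mode
    return True


def test_retry_resets_sandbox_task() -> None:
    task = FakeTask(retry_count=0, max_retries=2, sandbox_id="sb-1")
    assert apply_retry(task) is True
    assert (task.retry_count, task.status, task.sandbox_id) == (1, "pending", None)


def test_retry_stops_at_max() -> None:
    task = FakeTask(retry_count=2, max_retries=2, assigned_agent_id="agent-7")
    assert apply_retry(task) is False
    # Nothing changes once the budget is exhausted
    assert task.status == "failed" and task.assigned_agent_id == "agent-7"
```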
Integration tests:
- End-to-end task flow: Create → Claim → Assign → Complete
- Dependency chains: verify DAG execution order
- Retry loops: test failure → retry → success
- Timeout enforcement: test automatic timeout marking
- Event publishing: verify all events are fired
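For the dependency-chain scenario, a test needs an expected order to compare against. A sketch of a simple layered topological sort that applies the `_check_dependencies_complete` rule (every listed dependency must exist and be completed) round by round; `execution_waves` is a hypothetical helper, and each wave is a set of tasks that could be ready at the same time.

```python
def execution_waves(depends_on: dict[str, list[str]]) -> list[list[str]]:
    """Group tasks into waves; a task is ready once all its dependencies completed.

    Raises ValueError if some tasks can never become ready
    (a dependency cycle or a dependency that does not exist).
    """
    completed: set[str] = set()
    remaining = set(depends_on)
    waves: list[list[str]] = []
    while remaining:
        ready = sorted(
            t for t in remaining if all(d in completed for d in depends_on[t])
        )
        if not ready:
            raise ValueError(f"tasks can never become ready: {sorted(remaining)}")
        waves.append(ready)
        completed.update(ready)
        remaining.difference_update(ready)
    return waves


# b and c both wait for a; d waits for both
print(execution_waves({"a": [], "b": ["a"], "c": ["a"], "d": ["b", "c"]}))
```

An integration test can then assert that no task in wave *n* starts before every task in earlier waves has completed.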
Related Documents:
- Orchestrator Worker: primary consumer of the task queue
- Spec Task Execution: creates tasks via this service
- Diagnostic Service: monitors task outcomes
- Result Submission: handles completed task results
- Architecture Overview: system-wide context