-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtask_queue.py
More file actions
252 lines (203 loc) · 7.78 KB
/
task_queue.py
File metadata and controls
252 lines (203 loc) · 7.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
"""
TaskQueue: A priority-based task scheduler with retry logic.
Supports:
- Adding tasks with priority levels (1=highest, 5=lowest)
- Automatic retry with exponential backoff on failure
- Task deduplication by name
- Execution history tracking
- Statistics reporting
"""
from __future__ import annotations
import time
import hashlib
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Any, Callable
from collections import defaultdict
class Priority(IntEnum):
    """Scheduling urgency for a Task: lower value = higher priority.

    IntEnum so priorities compare and sort directly as ints, which is
    what TaskQueue's priority sort relies on.
    """
    CRITICAL = 1
    HIGH = 2
    NORMAL = 3
    LOW = 4
    BACKGROUND = 5
@dataclass
class Task:
    """A unit of work: a callable plus priority, retry budget, and bound args.

    Fields prefixed with ``_`` are runtime bookkeeping mutated by the queue
    during execution, not caller-supplied configuration.
    """

    name: str                                 # unique-per-queue task name (dedup key)
    fn: Callable[..., Any]                    # callable executed as fn(*args, **kwargs)
    priority: Priority = Priority.NORMAL
    max_retries: int = 3                      # total attempts allowed before giving up
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)
    _attempts: int = 0                        # executions so far (incremented by the queue)
    _last_error: str | None = None            # message from the most recent failure
    _created_at: float = field(default_factory=time.time)  # creation wall-clock time

    @property
    def task_id(self) -> str:
        """Stable hex ID derived from the task name.

        MD5 is used only as a cheap fingerprint, not for security; tasks
        with equal names intentionally share an ID.
        """
        return hashlib.md5(self.name.encode()).hexdigest()

    @property
    def can_retry(self) -> bool:
        """True while the attempt count is below the retry budget."""
        return self._attempts < self.max_retries

    @property
    def backoff_seconds(self) -> float:
        """Exponential backoff delay: 2**attempts seconds.

        BUGFIX: wrapped in float() — the property is annotated ``-> float``
        but ``2 ** n`` on ints returns an int.
        """
        return float(2 ** self._attempts)
class TaskResult:
    """Outcome of one task execution: success flag, value or error, timing."""

    def __init__(self, task: Task, success: bool, value: Any = None, error: str | None = None):
        """Capture the outcome.

        Args:
            task: the Task that was executed.
            success: True if the callable returned without raising.
            value: return value on success (None on failure).
            error: stringified exception on failure (None on success).

        ``duration`` starts at 0.0 and is filled in by the executor after
        it finishes timing the run.
        """
        self.task = task
        self.success = success
        self.value = value
        self.error = error
        self.timestamp = time.time()  # wall-clock time the result was created
        self.duration = 0.0           # seconds; assigned by the caller post-hoc

    def __repr__(self) -> str:
        # Added for debuggability of history listings; no caller relied on
        # the default object repr.
        status = "ok" if self.success else f"error={self.error!r}"
        name = getattr(self.task, "name", None)
        return f"TaskResult(task={name!r}, {status}, value={self.value!r})"
class TaskQueue:
    """Priority queue of Tasks with retry, deduplication, history, and stats.

    Not thread-safe; intended for single-threaded use.
    """

    def __init__(self, max_queue_size: int = 100, sleep_fn: Callable[[float], None] = time.sleep):
        """Create an empty queue.

        Args:
            max_queue_size: add_task() refuses new tasks beyond this size.
            sleep_fn: injectable sleep used for retry backoff (tests can
                pass a no-op to avoid real delays).
        """
        self._queue: list[Task] = []
        self._history: list[TaskResult] = []
        self._max_size = max_queue_size
        self._running = False
        # Names of tasks that finished successfully; consulted for dedup.
        self._completed_names: set[str] = set()
        self._stats = defaultdict(int)
        self._sleep_fn = sleep_fn

    def add_task(self, task: Task) -> bool:
        """Add a task to the queue. Returns False if queue is full or task is duplicate."""
        if self._is_duplicate(task):
            return False
        if len(self._queue) >= self._max_size:
            return False
        self._queue.append(task)
        self._sort_queue()
        self._stats["tasks_added"] += 1
        return True

    def add_batch(self, tasks: list[Task]) -> dict[str, bool]:
        """Add multiple tasks. Returns dict of name -> success.

        Only the first task with a given name in the batch is attempted;
        later same-name entries in the batch are ignored.
        """
        results: dict[str, bool] = {}
        for task in tasks:
            if task.name not in results:
                results[task.name] = self.add_task(task)
        return results

    def execute_next(self) -> TaskResult | None:
        """Execute the highest-priority task; None if the queue is empty."""
        if not self._queue:
            return None
        task = self._queue.pop(0)
        return self._execute_task(task)

    def execute_all(self) -> list[TaskResult]:
        """Execute all queued tasks in priority order and return their results."""
        results: list[TaskResult] = []
        self._running = True
        while self._queue and self._running:
            result = self.execute_next()
            if result:
                results.append(result)
        self._running = False
        return results

    def cancel_task(self, task_name: str) -> bool:
        """Remove the first queued task with this name. Returns True if found."""
        for i, task in enumerate(self._queue):
            if task.name == task_name:
                del self._queue[i]
                self._stats["tasks_cancelled"] += 1
                return True
        return False

    def get_stats(self) -> dict[str, Any]:
        """Return execution statistics."""
        total = self._stats["tasks_succeeded"] + self._stats["tasks_failed"]
        # BUGFIX: keep success_rate a float in both branches (was int 0
        # when nothing had executed yet).
        success_rate = self._stats["tasks_succeeded"] / total if total else 0.0
        avg_duration = 0.0
        if self._history:
            avg_duration = sum(r.duration for r in self._history) / len(self._history)
        return {
            "queue_size": len(self._queue),
            "total_executed": total,
            "succeeded": self._stats["tasks_succeeded"],
            "failed": self._stats["tasks_failed"],
            "success_rate": success_rate,
            "avg_duration": avg_duration,
            "retries": self._stats["total_retries"],
            "tasks_added": self._stats["tasks_added"],
            "tasks_cancelled": self._stats["tasks_cancelled"],
        }

    def get_history(self, last_n: int = 10) -> list[TaskResult]:
        """Return the last N results (most recent last)."""
        return self._history[-last_n:]

    def clear(self) -> int:
        """Clear the queue. Returns number of tasks removed.

        History, stats, and the completed-name dedup set are preserved.
        """
        count = len(self._queue)
        self._queue.clear()
        return count

    def peek(self) -> Task | None:
        """Look at the next task without removing it."""
        return self._queue[0] if self._queue else None

    def get_tasks_by_priority(self, priority: Priority) -> list[Task]:
        """Get all queued tasks with a specific priority."""
        return [t for t in self._queue if t.priority == priority]

    def _execute_task(self, task: Task) -> TaskResult:
        """Run one task, retrying with backoff until success or budget exhausted.

        Note: the recorded duration includes time spent sleeping between
        retries, not just time inside the task callable.
        """
        start_time = time.time()
        while True:
            task._attempts += 1
            try:
                value = task.fn(*task.args, **task.kwargs)
            except Exception as e:
                task._last_error = str(e)
                if not task.can_retry:
                    result = TaskResult(task, success=False, error=str(e))
                    result.duration = time.time() - start_time
                    self._history.append(result)
                    self._stats["tasks_failed"] += 1
                    return result
                # BUGFIX: count only failures that actually lead to a retry.
                # Previously the final, non-retried failure was also counted,
                # overstating "retries" by one per permanently-failed task.
                self._stats["total_retries"] += 1
                self._sleep_fn(task.backoff_seconds)
            else:
                result = TaskResult(task, success=True, value=value)
                result.duration = time.time() - start_time
                self._history.append(result)
                self._completed_names.add(task.name)
                self._stats["tasks_succeeded"] += 1
                return result

    def _is_duplicate(self, task: Task) -> bool:
        """True if a task with the same name is queued or already succeeded."""
        if any(queued.name == task.name for queued in self._queue):
            return True
        return task.name in self._completed_names

    def _sort_queue(self) -> None:
        """Stable sort by priority (lowest number first); FIFO within a level."""
        self._queue.sort(key=lambda t: t.priority)
def run_pipeline(steps: list[Callable], data: Any = None) -> tuple[bool, Any, list[str]]:
    """
    Run *steps* as a pipeline, feeding each step the previous step's output.

    Stops at the first step that raises. Returns (success, final_result,
    errors), where on failure final_result is the last successfully
    produced value and errors holds a single "Step i: message" entry.
    """
    failures: list[str] = []
    value = data
    for index, transform in enumerate(steps):
        try:
            value = transform(value)
        except Exception as exc:
            failures.append(f"Step {index}: {exc}")
            return (False, value, failures)
    return (True, value, failures)
def merge_task_stats(*stat_dicts: dict[str, Any]) -> dict[str, Any]:
    """Merge statistics from multiple TaskQueue instances.

    Plain counters are summed. Derived metrics are recomputed from the
    merged counters instead of summed:

    - BUGFIX: "success_rate" previously hit the numeric isinstance branch
      (the dedicated elif was unreachable) and was summed before being
      overwritten — now it is skipped explicitly and recomputed.
    - BUGFIX: "avg_duration" was summed across queues, which is
      meaningless for an average — now recomputed as a weighted average
      using each queue's "total_executed" count.
    """
    merged: defaultdict = defaultdict(int)
    weighted_duration = 0.0
    for stats in stat_dicts:
        for key, value in stats.items():
            if key in ("success_rate", "avg_duration"):
                continue  # derived metrics: recomputed below
            if isinstance(value, (int, float)):
                merged[key] += value
        # Weight each queue's average by how many tasks it actually ran.
        weighted_duration += stats.get("avg_duration", 0.0) * stats.get("total_executed", 0)
    total = merged["succeeded"] + merged["failed"]
    merged["success_rate"] = merged["succeeded"] / total if total else 0.0
    merged["avg_duration"] = weighted_duration / total if total else 0.0
    return dict(merged)