Skip to content

Commit 5b1e3e8

Browse files
committed
Refine jobs spec: priority, delete, populate logic
- Priority: lower = more urgent (0 = highest), default = 5 - Acyclic state diagram with dual (none) states - delete() inherited from delete_quick(), use (jobs & cond).delete() - Added 'ignored' property for consistency - populate() logic: fetch pending first, only refresh if no pending found - Updated all examples to reflect new priority semantics
1 parent 586effa commit 5b1e3e8

1 file changed

Lines changed: 45 additions & 45 deletions

File tree

docs/src/design/autopopulate-2.0-spec.md

Lines changed: 45 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ session_id : int
8989
... # Same primary key attributes as MyTable (NO foreign key constraints)
9090
---
9191
status : enum('pending', 'reserved', 'success', 'error', 'ignore')
92-
priority : int # Higher priority = processed first (default: 0)
92+
priority : int # Lower = more urgent (0 = highest priority, default: 5)
9393
created_time : datetime # When job was added to queue
9494
scheduled_time : datetime # Process on or after this time (default: now)
9595
reserved_time : datetime # When job was reserved (null if not reserved)
@@ -140,17 +140,18 @@ FilteredImage.jobs.refresh() # Refresh job queue
140140

141141
```mermaid
142142
stateDiagram-v2
143-
state "(none)" as none
144-
none --> pending : refresh()
145-
none --> ignore : ignore()
143+
state "(none)" as none1
144+
state "(none)" as none2
145+
none1 --> pending : refresh()
146+
none1 --> ignore : ignore()
146147
pending --> reserved : reserve()
147-
reserved --> none : complete()
148+
reserved --> none2 : complete()
148149
reserved --> success : complete()*
149150
reserved --> error : error()
150151
success --> pending : refresh()*
151-
error --> none : delete()
152-
success --> none : delete()
153-
ignore --> none : delete()
152+
error --> none2 : delete()
153+
success --> none2 : delete()
154+
ignore --> none2 : delete()
154155
```
155156

156157
- `complete()` deletes the job entry (default when `jobs.keep_completed=False`)
@@ -163,12 +164,12 @@ stateDiagram-v2
163164
- `reserve()` — Marks a pending job as `reserved` before calling `make()`
164165
- `complete()` — Marks reserved job as `success`, or deletes it (based on `jobs.keep_completed` setting)
165166
- `error()` — Marks reserved job as `error` with message and stack trace
166-
- `delete()`Removes job entries without confirmation (low-cost operation)
167+
- `delete()`Inherited from `delete_quick()`; use `(jobs & condition).delete()` pattern
167168

168169
**Manual status control:**
169170
- `ignore` is set manually via `jobs.ignore(key)` and is not part of automatic transitions
170171
- Jobs with `status='ignore'` are skipped by `populate()` and `refresh()`
171-
- To reset an ignored job, delete it and call `refresh()`
172+
- To reset an ignored job, delete it and call `refresh()`: `jobs.ignored.delete(); jobs.refresh()`
172173

173174
## API Design
174175

@@ -187,7 +188,7 @@ class JobsTable(Table):
187188
self,
188189
*restrictions,
189190
scheduled_time: datetime = None,
190-
priority: int = None,
191+
priority: int = 5,
191192
stale_timeout: float = None
192193
) -> dict:
193194
"""
@@ -203,7 +204,7 @@ class JobsTable(Table):
203204
scheduled_time: When new jobs should become available for processing.
204205
Default: now (jobs are immediately available).
205206
Use future times to schedule jobs for later processing.
206-
priority: Priority for new jobs (higher = processed first). Default: 0
207+
priority: Priority for new jobs (lower = more urgent). Default: 5
207208
stale_timeout: Seconds after which pending jobs are checked for staleness.
208209
Jobs older than this are removed if their key is no longer
209210
in key_source. Default from config: jobs.stale_timeout (3600s)
@@ -249,22 +250,8 @@ class JobsTable(Table):
249250
"""
250251
...
251252

252-
def delete(self, *restrictions) -> int:
253-
"""
254-
Delete jobs matching restrictions. No confirmation required.
255-
256-
Deleted jobs return to (none) state. Call refresh() to re-add
257-
them as pending if their keys are still in key_source.
258-
259-
Examples:
260-
jobs.errors.delete() # Delete all error jobs
261-
(jobs & 'status="success"').delete() # Delete completed jobs
262-
(jobs & 'subject_id=42').delete() # Delete jobs for specific key
263-
264-
Returns:
265-
Number of jobs deleted.
266-
"""
267-
...
253+
# delete() is inherited from delete_quick() - no confirmation required
254+
# Usage: (jobs & condition).delete() or jobs.errors.delete()
268255

269256
@property
270257
def pending(self) -> QueryExpression:
@@ -281,6 +268,11 @@ class JobsTable(Table):
281268
"""Return query for error jobs."""
282269
return self & 'status="error"'
283270

271+
@property
272+
def ignored(self) -> QueryExpression:
273+
"""Return query for ignored jobs."""
274+
return self & 'status="ignore"'
275+
284276
@property
285277
def completed(self) -> QueryExpression:
286278
"""Return query for completed jobs."""
@@ -305,20 +297,22 @@ def populate(
305297
processes: int = 1,
306298
make_kwargs: dict = None,
307299
# New parameters
308-
priority: int = None, # Only process jobs with this priority or higher
309-
refresh: bool = True, # Refresh jobs queue before populating
300+
priority: int = None, # Only process jobs at this priority or more urgent (lower values)
301+
refresh: bool = True, # Refresh jobs queue if no pending jobs available
310302
) -> dict:
311303
"""
312304
Populate the table by calling make() for each missing entry.
313305
314306
New behavior with reserve_jobs=True:
315-
1. If refresh=True, calls self.jobs.refresh(*restrictions)
316-
2. For each pending job (ordered by priority, scheduled_time):
307+
1. Fetch all non-stale pending jobs (ordered by priority ASC, scheduled_time ASC)
308+
2. For each pending job:
317309
a. Mark job as 'reserved' (per-key, before make)
318310
b. Call make(key)
319-
c. On success: mark job as 'success'
311+
c. On success: mark job as 'success' or delete (based on keep_completed)
320312
d. On error: mark job as 'error' with message/stack
321-
3. Continue until no more pending jobs or max_calls reached
313+
3. If refresh=True and no pending jobs were found, call self.jobs.refresh()
314+
and repeat from step 1
315+
4. Continue until no more pending jobs or max_calls reached
322316
"""
323317
...
324318
```
@@ -345,24 +339,30 @@ MyTable.jobs.progress() # Returns detailed status breakdown
345339

346340
### Priority and Scheduling
347341

348-
Priority and scheduling are handled via `refresh()` parameters:
342+
Priority and scheduling are handled via `refresh()` parameters. Lower priority values are more urgent (0 = highest priority).
349343

350344
```python
351345
from datetime import datetime, timedelta
352346

353-
# Add jobs with high priority (higher = processed first)
347+
# Add urgent jobs (priority=0 is most urgent)
348+
MyTable.jobs.refresh(priority=0)
349+
350+
# Add normal jobs (default priority=5)
351+
MyTable.jobs.refresh()
352+
353+
# Add low-priority background jobs
354354
MyTable.jobs.refresh(priority=10)
355355

356356
# Schedule jobs for future processing (2 hours from now)
357357
future_time = datetime.now() + timedelta(hours=2)
358358
MyTable.jobs.refresh(scheduled_time=future_time)
359359

360-
# Combine: high-priority jobs scheduled for tonight
360+
# Combine: urgent jobs scheduled for tonight
361361
tonight = datetime.now().replace(hour=22, minute=0, second=0)
362-
MyTable.jobs.refresh(priority=100, scheduled_time=tonight)
362+
MyTable.jobs.refresh(priority=0, scheduled_time=tonight)
363363

364-
# Add jobs for specific subjects with priority
365-
MyTable.jobs.refresh(Subject & 'priority="urgent"', priority=50)
364+
# Add urgent jobs for specific subjects
365+
MyTable.jobs.refresh(Subject & 'priority="urgent"', priority=0)
366366
```
367367

368368
## Implementation Details
@@ -487,8 +487,8 @@ New configuration settings for job management:
487487
# In datajoint config
488488
dj.config['jobs.auto_refresh'] = True # Auto-refresh on populate (default: True)
489489
dj.config['jobs.keep_completed'] = False # Keep success records (default: False)
490-
dj.config['jobs.stale_timeout'] = 3600 # Seconds before reserved job is stale (default: 3600)
491-
dj.config['jobs.default_priority'] = 0 # Default priority for new jobs (default: 0)
490+
dj.config['jobs.stale_timeout'] = 3600 # Seconds before pending job is considered stale (default: 3600)
491+
dj.config['jobs.default_priority'] = 5 # Default priority for new jobs (lower = more urgent)
492492
```
493493

494494
## Usage Examples
@@ -509,11 +509,11 @@ print(FilteredImage.jobs.progress())
509509
### Priority-Based Processing
510510

511511
```python
512-
# Add urgent jobs with high priority
512+
# Add urgent jobs (priority=0 is most urgent)
513513
urgent_subjects = Subject & 'priority="urgent"'
514-
FilteredImage.jobs.refresh(urgent_subjects, priority=100)
514+
FilteredImage.jobs.refresh(urgent_subjects, priority=0)
515515

516-
# Workers will process high-priority jobs first
516+
# Workers will process lowest-priority-value jobs first
517517
FilteredImage.populate(reserve_jobs=True)
518518
```
519519

0 commit comments

Comments
 (0)