This specification redesigns the DataJoint job handling system to provide better visibility, control, and scalability for distributed computing workflows. The new system replaces the schema-level ~jobs table with per-table job tables that offer richer status tracking, proper referential integrity, and dashboard-friendly monitoring.
The existing ~jobs table has significant limitations:
- Limited status tracking: Only supports
reserved,error, andignorestatuses - Functions as an error log: Cannot efficiently track pending or completed jobs
- Poor dashboard visibility: No way to monitor pipeline progress without querying multiple tables
- Key hashing obscures data: Primary keys are stored as hashes, making debugging difficult
- No referential integrity: Jobs table is independent of computed tables; orphaned jobs can accumulate
- Frequent manual modifications: Subset operations require modifying
key_sourceproperty - Local visibility only: Custom key sources are not accessible database-wide
- Performance bottleneck: Multiple workers querying
key_sourcesimultaneously creates contention - Codebase dependency: Requires full pipeline codebase to determine pending work
-
Stale job: A job (any status) whose key no longer exists in
key_source. The upstream records have been deleted. Stale jobs are cleaned up byrefresh()based on thestale_timeoutparameter. -
Orphaned job: A
reservedjob whose worker is no longer running. The process that reserved the job crashed, was terminated, or lost connection. The job remainsreservedindefinitely. Orphaned jobs can be cleaned up byrefresh(orphan_timeout=...)or manually deleted. -
Completed job: A job with status
success. Only exists whenkeep_completed=True. Represents historical record of successful computation.
- Per-table jobs: Each computed table gets its own hidden jobs table
- FK-only primary keys: Auto-populated tables must have primary keys composed entirely of foreign key references. Non-FK primary key attributes are prohibited in new tables (legacy tables are supported with degraded granularity)
- No FK constraints on jobs: Jobs tables omit foreign key constraints for performance; stale jobs are cleaned by
refresh() - Rich status tracking: Extended status values for full lifecycle visibility
- Automatic refresh:
populate()automatically refreshes the jobs queue (adding new jobs, removing stale ones) - Backward compatible: When
reserve_jobs=False(default), 1.0 behavior is preserved
Each dj.Imported or dj.Computed table MyTable will have an associated hidden jobs table ~~my_table with the following structure:
# Job queue for MyTable
subject_id : int
session_id : int
... # Only FK-derived primary key attributes (NO foreign key constraints)
---
status : enum('pending', 'reserved', 'success', 'error', 'ignore')
priority : uint8 # Lower = more urgent (0 = highest), set by refresh()
created_time=CURRENT_TIMESTAMP : timestamp # When job was added to queue
scheduled_time=CURRENT_TIMESTAMP : timestamp # Process on or after this time
reserved_time=null : timestamp # When job was reserved
completed_time=null : timestamp # When job completed
duration=null : float64 # Execution duration in seconds
error_message="" : varchar(2047) # Truncated error message
error_stack=null : <blob> # Full error traceback
user="" : varchar(255) # Database user who reserved/completed job
host="" : varchar(255) # Hostname of worker
pid=0 : uint32 # Process ID of worker
connection_id=0 : uint64 # MySQL connection ID
version="" : varchar(255) # Code version (git hash, package version, etc.)
Important: The jobs table primary key includes only those attributes that come through foreign keys in the target table's primary key. Additional primary key attributes (if any) are excluded. This means:
- If a target table has primary key
(-> Subject, -> Session, method), the jobs table has primary key(subject_id, session_id)only - Multiple target rows may map to a single job entry when additional PK attributes exist
- Jobs tables have no foreign key constraints for performance (stale jobs handled by
refresh())
Jobs are accessed as a property of the computed table:
# Current pattern (schema-level)
schema.jobs
# New pattern (per-table)
MyTable.jobs
# Examples
FilteredImage.jobs # Access jobs table
FilteredImage.jobs & 'status="error"' # Query errors
FilteredImage.jobs.refresh() # Refresh job queue| Status | Description |
|---|---|
pending |
Job is queued and ready to be processed |
reserved |
Job is currently being processed by a worker |
success |
Job completed successfully (optional, depends on settings) |
error |
Job failed with an error |
ignore |
Job should be skipped (manually set, not part of automatic transitions) |
stateDiagram-v2
state "(none)" as none1
state "(none)" as none2
none1 --> pending : refresh()
none1 --> ignore : ignore()
pending --> reserved : reserve()
reserved --> none2 : complete()
reserved --> success : complete()*
reserved --> error : error()
success --> pending : refresh()*
error --> none2 : delete()
success --> none2 : delete()
ignore --> none2 : delete()
complete()deletes the job entry (default whenjobs.keep_completed=False)complete()*keeps the job assuccess(whenjobs.keep_completed=True)refresh()*re-pends asuccessjob if its key is inkey_sourcebut not in target
Transition methods:
refresh()— Adds new jobs aspending; also re-pendssuccessjobs if key is inkey_sourcebut not in targetignore()— Marks a key asignore(can be called on keys not yet in jobs table)reserve()— Marks a pending job asreservedbefore callingmake()complete()— Marks reserved job assuccess, or deletes it (based onjobs.keep_completedsetting)error()— Marks reserved job aserrorwith message and stack tracedelete()— Inherited fromdelete_quick(); use(jobs & condition).delete()pattern
Manual status control:
ignoreis set manually viajobs.ignore(key)and is not part of automatic transitions- Jobs with
status='ignore'are skipped bypopulate()andrefresh() - To reset an ignored job, delete it and call
refresh():jobs.ignored.delete(); jobs.refresh()
class JobsTable(Table):
"""Hidden table managing job queue for a computed table."""
@property
def definition(self) -> str:
"""Dynamically generated based on parent table's primary key."""
...
def refresh(
self,
*restrictions,
delay: float = 0,
priority: int = None,
stale_timeout: float = None,
orphan_timeout: float = None
) -> dict:
"""
Refresh the jobs queue: add new jobs and clean up stale/orphaned jobs.
Operations performed:
1. Add new jobs: (key_source & restrictions) - target - jobs → insert as 'pending'
2. Re-pend success jobs: if keep_completed=True and key in key_source but not in target
3. Remove stale jobs: jobs older than stale_timeout whose keys no longer in key_source
4. Remove orphaned jobs: reserved jobs older than orphan_timeout (if specified)
Args:
restrictions: Conditions to filter key_source (for adding new jobs)
delay: Seconds from now until new jobs become available for processing.
Default: 0 (immediately available). Uses database server time.
priority: Priority for new jobs (lower = more urgent).
Default from config: jobs.default_priority (5)
stale_timeout: Seconds after which jobs are checked for staleness.
Jobs older than this are removed if key not in key_source.
Default from config: jobs.stale_timeout (3600s)
Set to 0 to skip stale cleanup.
orphan_timeout: Seconds after which reserved jobs are considered orphaned.
Reserved jobs older than this are deleted and re-added as pending.
Default: None (no orphan cleanup - must be explicit).
Typical value: 3600 (1 hour) or based on expected job duration.
Returns:
{
'added': int, # New pending jobs added
'removed': int, # Stale jobs removed
'orphaned': int, # Orphaned jobs reset to pending
're_pended': int # Success jobs re-pended (keep_completed mode)
}
"""
...
def reserve(self, key: dict) -> bool:
"""
Attempt to reserve a job for processing.
Updates status to 'reserved' if currently 'pending' and scheduled_time <= now.
No locking is used; rare conflicts are resolved by the make() transaction.
Returns:
True if reservation successful, False if job not found or not pending.
"""
...
def complete(self, key: dict, duration: float = None) -> None:
"""
Mark a job as successfully completed.
Updates status to 'success', records duration and completion time.
"""
...
def error(self, key: dict, error_message: str, error_stack: str = None) -> None:
"""
Mark a job as failed with error details.
Updates status to 'error', records error message and stack trace.
"""
...
def ignore(self, key: dict) -> None:
"""
Mark a job to be ignored (skipped during populate).
To reset an ignored job, delete it and call refresh().
"""
...
# delete() is inherited from delete_quick() - no confirmation required
# Usage: (jobs & condition).delete() or jobs.errors.delete()
@property
def pending(self) -> QueryExpression:
"""Return query for pending jobs."""
return self & 'status="pending"'
@property
def reserved(self) -> QueryExpression:
"""Return query for reserved jobs."""
return self & 'status="reserved"'
@property
def errors(self) -> QueryExpression:
"""Return query for error jobs."""
return self & 'status="error"'
@property
def ignored(self) -> QueryExpression:
"""Return query for ignored jobs."""
return self & 'status="ignore"'
@property
def completed(self) -> QueryExpression:
"""Return query for completed jobs."""
return self & 'status="success"'
def progress(self) -> dict:
"""
Return job status breakdown.
Returns:
{
'pending': int, # Jobs waiting to be processed
'reserved': int, # Jobs currently being processed
'success': int, # Completed jobs (if keep_completed=True)
'error': int, # Failed jobs
'ignore': int, # Ignored jobs
'total': int # Total jobs in table
}
"""
...The populate() method is updated to use the new jobs table:
def populate(
self,
*restrictions,
suppress_errors: bool = False,
return_exception_objects: bool = False,
reserve_jobs: bool = False,
max_calls: int = None,
display_progress: bool = False,
processes: int = 1,
make_kwargs: dict = None,
# New parameters
priority: int = None, # Only process jobs at this priority or more urgent (lower values)
refresh: bool = None, # Refresh jobs queue before processing (default from config)
) -> dict:
"""
Populate the table by calling make() for each missing entry.
Behavior depends on reserve_jobs parameter:
When reserve_jobs=False (default, 1.0 compatibility mode):
- Jobs table is NOT used
- Keys computed directly from: (key_source & restrictions) - target
- No job reservation, no status tracking
- Suitable for single-worker scenarios
When reserve_jobs=True (distributed mode):
1. If refresh=True (or config['jobs.auto_refresh'] when refresh=None):
Call self.jobs.refresh(*restrictions) to sync jobs queue
2. Fetch pending jobs ordered by (priority ASC, scheduled_time ASC)
Apply max_calls limit to fetched keys (total across all processes)
3. For each pending job where scheduled_time <= now:
a. Mark job as 'reserved'
b. Call make(key)
c. On success: mark job as 'success' or delete (based on keep_completed)
d. On error: mark job as 'error' with message/stack
4. Continue until all fetched jobs processed
Args:
restrictions: Conditions to filter key_source
suppress_errors: If True, collect errors instead of raising
return_exception_objects: Return exception objects vs strings
reserve_jobs: Enable job reservation for distributed processing
max_calls: Maximum number of make() calls (total across all processes)
display_progress: Show progress bar
processes: Number of worker processes
make_kwargs: Non-computation kwargs passed to make()
priority: Only process jobs at this priority or more urgent (lower values)
refresh: Refresh jobs queue before processing. Default from config['jobs.auto_refresh']
Deprecated parameters (removed in 2.0):
- 'order': Job ordering now controlled by priority. Use refresh(priority=N).
- 'limit': Use max_calls instead. The distinction was confusing (see #1203).
- 'keys': Use restrictions instead. Direct key specification bypassed job tracking.
"""
...# Current progress reporting
remaining, total = MyTable.progress()
# Enhanced progress with jobs table
MyTable.jobs.progress() # Returns detailed status breakdown
# Example output:
# {
# 'pending': 150,
# 'reserved': 3,
# 'success': 847,
# 'error': 12,
# 'ignore': 5,
# 'total': 1017
# }Priority and scheduling are handled via refresh() parameters. Lower priority values are more urgent (0 = highest priority). Scheduling uses relative time (seconds from now) based on database server time.
# Add urgent jobs (priority=0 is most urgent)
MyTable.jobs.refresh(priority=0)
# Add normal jobs (default priority=5)
MyTable.jobs.refresh()
# Add low-priority background jobs
MyTable.jobs.refresh(priority=10)
# Schedule jobs for future processing (2 hours from now)
MyTable.jobs.refresh(delay=2*60*60) # 7200 seconds
# Schedule jobs for tomorrow (24 hours from now)
MyTable.jobs.refresh(delay=24*60*60)
# Combine: urgent jobs with 1-hour delay
MyTable.jobs.refresh(priority=0, delay=3600)
# Add urgent jobs for specific subjects
MyTable.jobs.refresh(Subject & 'priority="urgent"', priority=0)Jobs tables use the ~~ prefix (double tilde):
- Table
FilteredImage(stored as__filtered_image) - Jobs table:
~~filtered_image(stored as~~filtered_image)
The ~~ prefix distinguishes jobs tables from other hidden tables (~jobs, ~lineage) while keeping names short.
New tables: Auto-populated tables (dj.Computed, dj.Imported) must have primary keys composed entirely of foreign key references. Non-FK primary key attributes are prohibited.
# ALLOWED - all PK attributes come from foreign keys
@schema
class FilteredImage(dj.Computed):
definition = """
-> Image
---
filtered_image : <djblob>
"""
# ALLOWED - multiple FKs in primary key
@schema
class Analysis(dj.Computed):
definition = """
-> Recording
-> AnalysisMethod # method comes from FK to lookup table
---
result : float64
"""
# NOT ALLOWED - raises error on table declaration
@schema
class Analysis(dj.Computed):
definition = """
-> Recording
method : varchar(32) # ERROR: non-FK primary key attribute
---
result : float64
"""Rationale: This constraint ensures 1:1 correspondence between jobs and target rows, simplifying job status tracking and eliminating ambiguity.
Legacy table support: Existing tables with non-FK primary key attributes continue to work. The jobs table uses only the FK-derived attributes, treating additional PK attributes as if they were secondary attributes. This means:
- One job entry may correspond to multiple target rows
- Job marked
successwhen ANY matching target row exists - Job marked
pendingonly when NO matching target rows exist
# Legacy table (created before 2.0)
# Jobs table primary key: (recording_id) only
# One job covers all 'method' values for a given recording
@schema
class LegacyAnalysis(dj.Computed):
definition = """
-> Recording
method : varchar(32) # Non-FK attribute (legacy, not recommended)
---
result : float64
"""The jobs table has no foreign key constraints for performance reasons.
Stale jobs are jobs (any status except ignore) whose keys no longer exist in key_source. Since there are no FK constraints on jobs tables, these jobs remain until cleaned up by refresh():
# refresh() handles stale jobs automatically
result = FilteredImage.jobs.refresh()
# Returns: {'added': 10, 'removed': 3, 'orphaned': 0, 're_pended': 0}
# Stale detection logic:
# 1. Find jobs where created_time < (now - stale_timeout)
# 2. Check if their keys still exist in key_source
# 3. Remove jobs (pending, reserved, success, error) whose keys no longer exist
# 4. Jobs with status='ignore' are never removed (permanent until manual delete)Why not use foreign key cascading deletes?
- FK constraints add overhead on every insert/update/delete operation
- Jobs tables are high-traffic (frequent reservations and status updates)
- Stale jobs are harmless until refresh—they simply won't match key_source
- The
refresh()approach is more efficient for batch cleanup
Orphaned jobs are reserved jobs whose worker is no longer running. Unlike stale jobs, orphaned jobs reference valid keys—only the worker has disappeared.
# Automatic orphan cleanup (use with caution)
result = FilteredImage.jobs.refresh(orphan_timeout=3600) # 1 hour
# Jobs reserved more than 1 hour ago are deleted and re-added as pending
# Returns: {'added': 0, 'removed': 0, 'orphaned': 5, 're_pended': 0}
# Manual orphan cleanup (more control)
(FilteredImage.jobs.reserved & 'reserved_time < NOW() - INTERVAL 2 HOUR').delete()
FilteredImage.jobs.refresh() # Re-adds as pending if key still in key_sourceWhen to use orphan_timeout:
- In automated pipelines where job duration is predictable
- When workers are known to have failed (cluster node died)
- Set timeout > expected max job duration to avoid killing active jobs
When NOT to use orphan_timeout:
- When job durations are highly variable
- When you need to coordinate with external orchestration
- Default is None (disabled) for safety
When an auto-populated table is dropped, its associated jobs table is automatically dropped:
# Dropping FilteredImage also drops ~~filtered_image
FilteredImage.drop()When an auto-populated table is altered (e.g., primary key changes), the jobs table is dropped and can be recreated via refresh():
# Alter that changes primary key structure
# Jobs table is dropped since its structure no longer matches
FilteredImage.alter()
# Recreate jobs table with new structure
FilteredImage.jobs.refresh()Jobs tables are created automatically on first use:
# First call to populate with reserve_jobs=True creates the jobs table
FilteredImage.populate(reserve_jobs=True)
# Creates ~~filtered_image if it doesn't exist, then populates
# Alternatively, explicitly create/refresh the jobs table
FilteredImage.jobs.refresh()The jobs table is created with a primary key derived from the target table's foreign key attributes.
Conflict resolution relies on the transaction surrounding each make() call:
- With
reserve_jobs=False: Workers querykey_sourcedirectly and may attempt the same key - With
reserve_jobs=True: Job reservation reduces conflicts but doesn't eliminate them entirely
When two workers attempt to populate the same key:
- Both workers attempt to reserve the same job (near-simultaneous)
- Both reservation attempts succeed (no locking used)
- Both call
make()for the same key - First worker's
make()transaction commits successfully - Second worker's
make()transaction fails with duplicate key error - Second worker silently moves to next job (no status update)
- First worker marks job
successor deletes it
Important: Only errors inside make() are logged with error status. Duplicate key errors from collisions are coordination artifacts handled silently—the first worker's completion takes precedence.
Edge case - first worker crashes after insert:
- Job stays
reserved(orphaned) - Row exists in table (insert succeeded)
- Resolution:
refresh(orphan_timeout=...)sees key exists in table, removes orphaned job
Why this is acceptable:
- The
make()transaction guarantees data integrity - Duplicate key error is a clean, expected signal (not a real error)
- With
reserve_jobs=True, conflicts are rare - Wasted computation is minimal compared to locking complexity
The job reservation mechanism (reserve_jobs=True) allows workers to dynamically claim jobs from a shared queue. However, some orchestration systems may prefer to pre-partition jobs before distributing them to workers:
# Pre-partitioning example: orchestrator divides work explicitly
all_pending = FilteredImage.jobs.pending.fetch("KEY")
# Split jobs among workers (e.g., by worker index)
n_workers = 4
for worker_id in range(n_workers):
worker_keys = all_pending[worker_id::n_workers] # Round-robin assignment
# Send worker_keys to worker via orchestration system (Slurm, K8s, etc.)
# Worker receives its assigned keys and processes them directly
# Pass keys as restrictions to filter key_source
for key in assigned_keys:
FilteredImage.populate(key) # key acts as restriction, reserve_jobs=False by defaultWhen to use each approach:
| Approach | Use Case |
|---|---|
Dynamic reservation (reserve_jobs=True) |
Simple setups, variable job durations, workers that start/stop dynamically |
| Pre-partitioning | Batch schedulers (Slurm, PBS), predictable job counts, avoiding reservation overhead |
Both approaches benefit from the same transaction-based conflict resolution as a safety net.
New configuration settings for job management:
# In datajoint config
dj.config['jobs.auto_refresh'] = True # Auto-refresh on populate (default: True)
dj.config['jobs.keep_completed'] = False # Keep success records (default: False)
dj.config['jobs.stale_timeout'] = 3600 # Seconds before pending job is considered stale (default: 3600)
dj.config['jobs.default_priority'] = 5 # Default priority for new jobs (lower = more urgent)
dj.config['jobs.version'] = None # Version string for jobs (default: None)
# Special values: 'git' = auto-detect git hashWhen both config and method parameters are available, explicit parameters override config values:
# Config sets defaults
dj.config['jobs.auto_refresh'] = True
dj.config['jobs.default_priority'] = 5
# Parameter overrides config
MyTable.populate(reserve_jobs=True, refresh=False) # refresh=False wins
MyTable.jobs.refresh(priority=0) # priority=0 winsParameters set to None use the config default. This allows per-call customization while maintaining global defaults.
# Worker 1
FilteredImage.populate(reserve_jobs=True)
# Worker 2 (can run simultaneously)
FilteredImage.populate(reserve_jobs=True)
# Monitor progress
print(FilteredImage.jobs.progress())# Add urgent jobs (priority=0 is most urgent)
urgent_subjects = Subject & 'priority="urgent"'
FilteredImage.jobs.refresh(urgent_subjects, priority=0)
# Workers will process lowest-priority-value jobs first
FilteredImage.populate(reserve_jobs=True)# Schedule jobs for overnight processing (8 hours from now)
FilteredImage.jobs.refresh('subject_id > 100', delay=8*60*60)
# Only jobs whose scheduled_time <= now will be processed
FilteredImage.populate(reserve_jobs=True)# View errors
errors = FilteredImage.jobs.errors.fetch(as_dict=True)
for err in errors:
print(f"Key: {err['subject_id']}, Error: {err['error_message']}")
# Delete specific error jobs after fixing the issue
(FilteredImage.jobs & 'subject_id=42').delete()
# Delete all error jobs
FilteredImage.jobs.errors.delete()
# Re-add deleted jobs as pending (if keys still in key_source)
FilteredImage.jobs.refresh()# Get pipeline-wide status using schema.jobs
def pipeline_status(schema):
return {
jt.table_name: jt.progress()
for jt in schema.jobs
}
# Example output:
# {
# 'FilteredImage': {'pending': 150, 'reserved': 3, 'success': 847, 'error': 12},
# 'Analysis': {'pending': 500, 'reserved': 0, 'success': 0, 'error': 0},
# }
# Refresh all jobs tables in the schema
for jobs_table in schema.jobs:
jobs_table.refresh()
# Get all errors across the pipeline
all_errors = []
for jt in schema.jobs:
errors = jt.errors.fetch(as_dict=True)
for err in errors:
err['_table'] = jt.table_name
all_errors.append(err)This is a major release. The legacy schema-level ~jobs table is replaced by per-table jobs tables:
- Legacy
~jobstable: No longer used; can be dropped manually if present - New jobs tables: Created automatically on first
populate(reserve_jobs=True)call - No parallel support: Teams should migrate cleanly to the new system
The schema.jobs property returns a list of all jobs table objects for auto-populated tables in the schema:
# Returns list of JobsTable objects
schema.jobs
# [FilteredImage.jobs, Analysis.jobs, ...]
# Iterate over all jobs tables
for jobs_table in schema.jobs:
print(f"{jobs_table.table_name}: {jobs_table.progress()}")
# Query all errors across the schema
all_errors = [job for jt in schema.jobs for job in jt.errors.fetch(as_dict=True)]
# Refresh all jobs tables
for jobs_table in schema.jobs:
jobs_table.refresh()This replaces the legacy single ~jobs table with direct access to per-table jobs.
This section identifies potential hazards and their mitigations.
| Hazard | Description | Mitigation |
|---|---|---|
| Simultaneous reservation | Two workers reserve the same pending job at nearly the same time | Acceptable: duplicate make() calls are resolved by transaction—second worker gets duplicate key error |
| Reserve during refresh | Worker reserves a job while another process is running refresh() |
No conflict: refresh() adds new jobs and removes stale ones; reservation updates existing rows |
| Concurrent refresh calls | Multiple processes call refresh() simultaneously |
Acceptable: may result in duplicate insert attempts, but primary key constraint prevents duplicates |
| Complete vs delete race | One process completes a job while another deletes it | Acceptable: one operation succeeds, other becomes no-op (row not found) |
| Hazard | Description | Mitigation |
|---|---|---|
| Invalid state transition | Code attempts illegal transition (e.g., pending → success) | Implementation enforces valid transitions; invalid attempts raise error |
| Stuck in reserved | Worker crashes while job is reserved (orphaned job) | Manual intervention required: jobs.reserved.delete() (see Orphaned Job Handling) |
| Success re-pended unexpectedly | refresh() re-pends a success job when user expected it to stay |
Only occurs if keep_completed=True AND key exists in key_source but not in target; document clearly |
| Ignore not respected | Ignored jobs get processed anyway | Implementation must skip status='ignore' in populate() job fetching |
| Hazard | Description | Mitigation |
|---|---|---|
| Stale job processed | Job references deleted upstream data | make() will fail or produce invalid results; refresh() cleans stale jobs before processing |
| Jobs table out of sync | Jobs table doesn't match key_source |
refresh() synchronizes; call periodically or rely on populate(refresh=True) |
| Partial make failure | make() partially succeeds then fails |
DataJoint transaction rollback ensures atomicity; job marked as error |
| Error message truncation | Error details exceed varchar(2047) |
Full stack stored in error_stack (mediumblob); error_message is summary only |
| Hazard | Description | Mitigation |
|---|---|---|
| Large jobs table | Jobs table grows very large with keep_completed=True |
Default is keep_completed=False; provide guidance on periodic cleanup |
| Slow refresh on large key_source | refresh() queries entire key_source |
Can restrict refresh to subsets: jobs.refresh(Subject & 'lab="smith"') |
| Many jobs tables per schema | Schema with many computed tables has many jobs tables | Jobs tables are lightweight; only created on first use |
| Hazard | Description | Mitigation |
|---|---|---|
| Accidental job deletion | User runs jobs.delete() without restriction |
delete() inherits from delete_quick() (no confirmation); users must apply restrictions carefully |
| Clearing active jobs | User clears reserved jobs while workers are still running | May cause duplicated work if job is refreshed and picked up again; coordinate with orchestrator |
| Priority confusion | User expects higher number = higher priority | Document clearly: lower values are more urgent (0 = highest priority) |
| Hazard | Description | Mitigation |
|---|---|---|
| Legacy ~jobs table conflict | Old ~jobs table exists alongside new per-table jobs |
Systems are independent; legacy table can be dropped manually |
| Mixed version workers | Some workers use old system, some use new | Major release; do not support mixed operation—require full migration |
| Lost error history | Migrating loses error records from legacy table | Document migration procedure; users can export legacy errors before migration |
- Web-based dashboard for job monitoring
- Webhook notifications for job completion/failure
- Job dependencies (job B waits for job A)
- Resource tagging (GPU required, high memory, etc.)
- Retry policies (max retries, exponential backoff)
- Job grouping/batching for efficiency
- Integration with external schedulers (Slurm, PBS, etc.)
The team considered integrating external tools like Airflow or Flyte but rejected this approach because:
- Deployment complexity: External orchestrators require significant infrastructure
- Maintenance burden: Additional systems to maintain and monitor
- Accessibility: Not all DataJoint users have access to orchestration platforms
- Tight integration: DataJoint's transaction model requires close coordination
The built-in jobs system provides 80% of the value with minimal additional complexity.
Per-table jobs tables provide:
- Better isolation: Jobs for one table don't affect others
- Simpler queries: No need to filter by table_name
- Native keys: Primary keys are readable, not hashed
- High performance: No FK constraints means minimal overhead on job operations
- Scalability: Each table's jobs can be indexed independently
The current system hashes primary keys to support arbitrary key types. The new system uses native keys because:
- Readability: Debugging is much easier with readable keys
- Query efficiency: Native keys can use table indexes
- Foreign keys: Hash-based keys cannot participate in foreign key relationships
- Simplicity: No need for hash computation and comparison
The jobs table primary key includes only attributes derived from foreign keys in the target table's primary key. This design:
- Aligns with key_source: The
key_sourcequery naturally produces keys matching the FK-derived attributes - Simplifies job identity: A job's identity is determined by its upstream dependencies
- Handles additional PK attributes: When targets have additional PK attributes (e.g.,
method), one job covers all values for that attribute