# DataJoint Jobs System

This document describes the behavior and mechanism of DataJoint's job reservation and execution system.

## Jobs Table Structure

The jobs table (`~jobs`) is a system table that tracks the state and execution of jobs in a DataJoint pipeline. It has the following key fields:

- `table_name`: The full name of the table being populated
- `key_hash`: A hash of the job's primary key
- `status`: Current job status, one of:
  - `scheduled`: Job is queued for execution
  - `reserved`: Job is currently being processed
  - `error`: Job failed with an error
  - `ignore`: Job is marked to be ignored
  - `success`: Job completed successfully
- `key`: JSON structure containing the job's primary key
- `error_message`: Error message if the job failed
- `error_stack`: Stack trace if the job failed
- `user`: Database user who created the job
- `host`: System hostname where the job was created
- `pid`: Process ID of the job
- `connection_id`: Database connection ID
- `timestamp`: When the job status was last changed
- `run_duration`: How long the job took to execute (in seconds)
- `run_version`: Version information of the code/environment

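A primary key can be reduced to a fixed-width `key_hash` by hashing its attribute values in a deterministic order. The sketch below is a minimal illustration of that idea, not necessarily DataJoint's exact hashing scheme:

```python
import hashlib

def key_hash(key: dict) -> str:
    """Illustrative: derive a deterministic hash from a primary key.

    Values are hashed in sorted-attribute order so that the same key
    always yields the same hash, regardless of dict insertion order.
    """
    hashed = hashlib.md5()
    for attr in sorted(key):
        hashed.update(str(key[attr]).encode())
    return hashed.hexdigest()

h1 = key_hash({"subject_id": 7, "session": 2})
h2 = key_hash({"session": 2, "subject_id": 7})  # same key, different order
assert h1 == h2
```

Because the hash is fixed-width, it can serve as part of the jobs table's primary key even when the job's own primary key has variable structure.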
## Job Scheduling Process

The `schedule_jobs` method implements an optimization strategy to prevent excessive scheduling:

1. **Rate Limiting**:
   - Uses `min_scheduling_interval` (configurable via `dj.config["min_scheduling_interval"]`)
   - The default interval is 5 seconds
   - Can be overridden per call

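The rate limit amounts to a check against the time of the most recent scheduling event. A minimal sketch of that check (the names `should_schedule` and `last_scheduled` are illustrative, not DataJoint's API):

```python
import time

def should_schedule(last_scheduled, min_interval=5.0):
    """Illustrative: skip scheduling if a scheduling event occurred
    within the last `min_interval` seconds.

    last_scheduled: epoch time of the last scheduling event, or None
    if no scheduling event has been recorded yet.
    """
    if last_scheduled is None:
        return True  # never scheduled before
    return (time.time() - last_scheduled) >= min_interval

assert should_schedule(None)                     # first call always proceeds
assert not should_schedule(time.time())          # just scheduled: skip
assert should_schedule(time.time() - 10.0)       # interval elapsed: proceed
```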
2. **Scheduling Logic**:
   - Checks for recent scheduling events within the interval
   - Skips scheduling if recent events exist
   - Otherwise, finds keys that need computation by:
     1. Querying the `key_source` to get all possible keys
     2. Excluding keys that already exist in the target table
     3. Excluding keys already in the jobs table with an incompatible status
        (i.e., `scheduled`, `reserved`, or `success`)
   - Schedules each valid key as a new job
   - Records scheduling events for rate limiting

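The key-selection step above is essentially a set difference. A minimal model with in-memory collections (in practice this runs as queries against the database):

```python
def keys_to_schedule(key_source, target_keys, job_status):
    """Illustrative: select keys that still need computation.

    key_source : iterable of hashable keys from the key source
    target_keys: set of keys already present in the target table
    job_status : mapping of key -> current status in the jobs table
    """
    incompatible = {"scheduled", "reserved", "success"}
    return [
        k for k in key_source
        if k not in target_keys                  # not yet computed
        and job_status.get(k) not in incompatible  # no conflicting job record
    ]

pending = keys_to_schedule(
    key_source=[1, 2, 3, 4, 5],
    target_keys={1},                         # 1 is already computed
    job_status={2: "reserved", 3: "error"},  # 2 is taken; 3 failed earlier
)
assert pending == [3, 4, 5]
```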
3. **Job States**:
   - New jobs start as `scheduled`
   - Jobs in the `error` or `ignore` state can be rescheduled (with `force=True`)
   - Rescheduling is prevented if the job is `scheduled`, `reserved`, or `success`

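The rescheduling rules above can be expressed as a small predicate; a sketch (the function name is illustrative, not part of DataJoint's API):

```python
def can_reschedule(status, force=False):
    """Illustrative: decide whether a key may be (re)scheduled.

    status is the key's current entry in the jobs table, or None
    if the key has no job record yet.
    """
    if status is None:
        return True                      # new jobs start as `scheduled`
    if status in ("scheduled", "reserved", "success"):
        return False                     # never reschedule these
    # `error` and `ignore` may be rescheduled only when forced
    return force

assert can_reschedule(None)
assert not can_reschedule("reserved")
assert not can_reschedule("error")
assert can_reschedule("error", force=True)
```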
## Populate Process Flow

The `populate()` method orchestrates the job execution process:

1. **Initialization**:
   - Optionally schedules new jobs (controlled by the `schedule_jobs` parameter)

2. **Job Selection**:
   - If `reserve_jobs=True`:
     - Fetches `scheduled` jobs from the jobs table
     - Applies any restrictions to the job set
     - Attempts to reserve each job before processing
     - Skips jobs that cannot be reserved (already taken by another process)
   - If `reserve_jobs=False`:
     - Uses the traditional direct-computation approach

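Reservation is what lets multiple workers populate the same table without duplicating work: each worker atomically flips a job from `scheduled` to `reserved`, and skips jobs another worker claimed first. A minimal in-memory model of that claim (all names are illustrative; real reservations happen as atomic updates in the database):

```python
import threading

class JobRegistry:
    """Illustrative: an in-memory stand-in for the jobs table."""

    def __init__(self, keys):
        self._status = {k: "scheduled" for k in keys}
        self._lock = threading.Lock()  # models the database's atomicity

    def try_reserve(self, key):
        """Claim a job; return False if another worker already has it."""
        with self._lock:
            if self._status.get(key) == "scheduled":
                self._status[key] = "reserved"
                return True
            return False

jobs = JobRegistry(keys=["k1", "k2"])
assert jobs.try_reserve("k1")       # first worker wins the claim
assert not jobs.try_reserve("k1")   # second worker skips this job
assert jobs.try_reserve("k2")
```

The test-and-set inside the lock is the essential property: two workers can never both see `scheduled` for the same key.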
3. **Execution**:
   - Processes jobs in the specified order (`original`, `reverse`, or `random`)
   - Supports single- or multi-process execution
   - For reserved jobs:
     - Updates the job status to `reserved` during processing
     - Updates the status to `success` or `error` on completion
     - Records execution metrics (duration, version) and, on failure, the error message and stack trace

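Putting the status transitions and metrics together, one worker's handling of a reserved job can be sketched as follows (`make` stands in for the table's computation; all names are illustrative):

```python
import time
import traceback

def run_job(jobs, key, make):
    """Illustrative: execute one reserved job and record the outcome.

    jobs maps key -> job record (a dict of status/metrics fields);
    make is the computation to run for this key.
    """
    job = jobs[key]
    job["status"] = "reserved"
    start = time.time()
    try:
        make(key)
        job["status"] = "success"
    except Exception as e:
        job["status"] = "error"
        job["error_message"] = str(e)
        job["error_stack"] = traceback.format_exc()
    finally:
        job["run_duration"] = time.time() - start  # recorded either way

jobs = {"good": {}, "bad": {}}
run_job(jobs, "good", make=lambda key: None)
run_job(jobs, "bad", make=lambda key: 1 / 0)
assert jobs["good"]["status"] == "success"
assert jobs["bad"]["status"] == "error"
```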
4. **Cleanup**:
   - Optionally purges invalid jobs

## Job Cleanup Process

The `purge_invalid_jobs` method maintains database consistency by removing invalid jobs:

1. **Invalid Success Jobs**:
   - Identifies jobs marked `success` whose results are no longer present in the target table
   - These typically occur when target-table entries are deleted

2. **Invalid Incomplete Jobs**:
   - Identifies jobs in the `scheduled`/`error`/`ignore` state whose keys are no longer in the `key_source`
   - These typically occur when upstream table entries are deleted

3. **Cleanup Characteristics**:
   - Potentially time-consuming
   - Should not need to run frequently
   - Helps maintain database consistency

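Both invalidation rules are again set comparisons; a minimal in-memory model (in practice these are database queries joining the jobs table against the target table and `key_source`):

```python
def find_invalid_jobs(job_status, target_keys, key_source):
    """Illustrative: keys whose job records should be purged.

    job_status : mapping of key -> status in the jobs table
    target_keys: set of keys currently present in the target table
    key_source : set of keys currently produced by the key_source
    """
    invalid = set()
    for key, status in job_status.items():
        if status == "success" and key not in target_keys:
            invalid.add(key)  # result was deleted from the target table
        elif status in ("scheduled", "error", "ignore") and key not in key_source:
            invalid.add(key)  # upstream rows were deleted
    return invalid

invalid = find_invalid_jobs(
    job_status={1: "success", 2: "success", 3: "scheduled", 4: "error"},
    target_keys={1},       # job 2's result was deleted
    key_source={1, 3},     # job 4's upstream rows were deleted
)
assert invalid == {2, 4}
```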
## Jobs Table Maintenance

The freshness and consistency of the jobs table depend on regular maintenance through two key operations:

1. **Scheduling Updates** (`schedule_jobs`):
   - Adds new jobs to the table
   - Should run frequently enough to keep up with new data
   - Rate-limited by `min_scheduling_interval` to prevent overload
   - Example: run every few minutes in a cron job for active pipelines
   - Event-driven alternative: inserts into upstream tables automatically trigger this step

2. **Cleanup** (`purge_invalid_jobs`):
   - Removes invalid or outdated jobs
   - Should run periodically to maintain consistency
   - More resource-intensive than scheduling
   - Example: run daily during low-activity periods
   - Event-driven alternative: deletes in upstream or target tables automatically trigger this step

The balance between these operations affects:
- How quickly new jobs are discovered and scheduled
- How long invalid jobs remain in the table
- Database size and query performance
- Overall system responsiveness

Recommended maintenance schedule:
```python
import datajoint as dj

# Run scheduling frequently, but rate-limit it to once per 5 minutes
dj.config["min_scheduling_interval"] = 300  # seconds

# Run cleanup daily (e.g., as a cron job or scheduled task);
# `your_pipeline_tables` is a placeholder for your computed tables
def daily_cleanup():
    for table in your_pipeline_tables:
        table.purge_invalid_jobs()
```