You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Copy file name to clipboardExpand all lines: contributing/PIPELINES.md
+31Lines changed: 31 additions & 0 deletions
Display the source diff
Display the rich diff
Original file line number
Diff line number
Diff line change
@@ -22,8 +22,27 @@ Related notes:
22
22
23
23
* All write APIs must respect DB-level locks. The endpoints can either try to acquire the lock with a timeout and error or provide an async API by storing the request in the DB.
24
24
25
+
## Implementation checklist
26
+
27
+
Brief checklist for implementing a new pipeline:
28
+
29
+
1. Fetcher locks only rows that are ready for processing:
30
+
`status/time` filters, `lock_expires_at` is empty or expired, and `lock_owner` is empty or equal to the pipeline name. Keep the fetch order stable with `last_processed_at`.
31
+
2. Fetcher takes row locks with `skip_locked` and updates `lock_expires_at`, `lock_token`, `lock_owner` before enqueueing items.
32
+
3. Worker keeps heavy work outside DB sessions. DB sessions should be short and used only for refetch/locking and final apply.
33
+
4. Apply stage updates rows using update maps/update rows, not by relying on mutating detached ORM models.
34
+
5. Main apply update is guarded by `id + lock_token`. If the update affects `0` rows, the item is stale and processing results must not be applied.
35
+
6. Successful apply updates `last_processed_at` and unlocks resources that were locked by this item.
36
+
7. If related lock is unavailable, reset main lock for retry: keep `lock_owner`, clear `lock_token` and `lock_expires_at`, and set `last_processed_at` to now.
37
+
8. Register the pipeline in `PipelineManager` and hint fetch from services after commit via `pipeline_hinter.hint_fetch(Model.__name__)`.
38
+
9. Add minimum tests: fetch eligibility/order, successful unlock path, stale lock token path, and related lock contention retry path.
39
+
25
40
## Implementation patterns
26
41
42
+
**Guarded apply by lock token**
43
+
44
+
When writing processing results, update the main row with a filter by both `id` and `lock_token`. This guarantees that only the worker that still owns the lock can apply its results. If the update affects no rows, treat the item as stale and skip applying other changes (status changes, related updates, events). A stale item means another worker or replica already continued processing.
45
+
27
46
**Locking many related resources**
28
47
29
48
A pipeline may need to lock a potentially big set of related resource, e.g. fleet pipeline locking all fleet's instances. For this, do one SELECT FOR UPDATE of non-locked instances and one SELECT to see how many instances there are, and check if you managed to lock all of them. If fail to lock, release the main lock and try processing on another fetch iteration. You may keep `lock_owner` on the main resource or set `lock_owner` on locked related resource and make other pipelines respect that to guarantee the eventual locking of all related resources and avoid lock starvation.
@@ -32,6 +51,18 @@ A pipeline may need to lock a potentially big set of related resource, e.g. flee
32
51
33
52
Multiple main resources may need to lock the same related resource, e.g. multiple jobs may need to change the shared instance. In this case it's not sufficient to set `lock_owner` on the related resource to the pipeline name because workers processing different main resources can still race with each other. To avoid heartbeating the related resource, you may include main resource id in `lock_owner`, e.g. set `lock_owner = f"{Pipeline.__name__}:{item.id}"`.
34
53
54
+
**Reset-and-retry when related lock is unavailable**
55
+
56
+
If a worker cannot lock a required related resource, it should release only the main lock state needed for fast retry: unset `lock_token` and `lock_expires_at`, keep `lock_owner`, and set `last_processed_at` to now. This avoids long waiting and lets the same pipeline retry quickly on the next fetch iteration while other pipelines can still respect ownership intent.
57
+
58
+
**Dealing with side effects**
59
+
60
+
If processing has side effects and the apply phase fails due to a lock mismatch, there are several options: a) revert side effects b) make processing idempotent, i.e. next processing iteration detects side effects does not perform duplicating actions c) log side effects as errors and warn user about possible issues such as orphaned instances – as a temporary solution.
61
+
62
+
**Bulk apply with one consistent current time**
63
+
64
+
When apply needs to update multiple rows (main + related resources), build update maps/update rows first and resolve current-time placeholders once in the apply transaction using `NOW_PLACEHOLDER` + `resolve_now_placeholders()`. This keeps timestamps consistent across all rows and avoids subtle ordering bugs when the same processing pass writes several `*_at` fields.
65
+
35
66
## Performance analysis
36
67
37
68
* Pipeline throughput = workers_num / worker_processing_time. So quick tasks easily give high-throughput pipelines, e.g. 1s task with 20 workers is 1200 tasks/min.
0 commit comments