Commit a526d93
committed
Implement resumable jobs
Here, implement "resumable" jobs, which are jobs that can checkpoint
their progress so that in case they have to stop early, they're picked
up from a point that lets them skip work that's already been done. This
is especially useful for long running jobs that are at risk of being
interrupted from something like a deploy.
Here's roughly the shape of the API, with the same normal `Work`
function that all jobs implement, and with a series of `ResumableStep`
calls within, each of which take a name for the step and function
representing it:
func (w *ResumableWorker) Work(ctx context.Context, job *river.Job[ResumableArgs]) error {
river.ResumableStep(ctx, "step1", func(ctx context.Context) error {
fmt.Println("Step 1")
return nil
})
river.ResumableStep(ctx, "step2", func(ctx context.Context) error {
fmt.Println("Step 2")
return nil
})
river.ResumableStep(ctx, "step3", func(ctx context.Context) error {
fmt.Println("Step 3")
return nil
})
return nil
}
We also provide a cursor API for more granularity. This lets a step set
an arbitrary cursor value periodically as it's doing something like
looping over records in a set:
river.ResumableStepCursor(ctx, "process_ids", func(ctx context.Context, cursor ResumableCursor) error {
for _, id := range job.Args.IDs {
if id <= cursor.LastProcessedID {
continue
}
fmt.Printf("Processed %d\n", id)
if err := river.ResumableSetCursor(ctx, ResumableCursor{LastProcessedID: id}); err != nil {
return err
}
}
return nil
})
The function is `ResumableStepCursor[TCursor any]` where `TCursor` can
be defined arbitrarily by the user. This could be a simple scalar value
representing an ID, or a more complex `struct` value containing multiple
IDs, enabling nested loops that set inner and outer IDs at the same time.
`ResumableStep` and `ResumableStepCursor` steps can be freely
intermingled, and multiple `ResumableStepCursor` steps with different
cursor types are supported. Cursors must be JSON marshable because
they're stored to a job's metadata.
Lastly, we provide `ResumableSetStepTx` and `ResumableSetStepCursorTx`
for cases where a transaction guarantee is necessary. Normally,
resumable step and cursor are set as a job's being completed, but
there's a chance this is never called in case of sudden failure.
`ResumableSetStepTx` (and its cursor version) is available to durably
persist a step at the cost of an extra database operation similar to how
`JobCompleteTx` does the same for job completion.
One neat aspect the implementation here is that I was able to make it
entirely middleware-only. So all the resumable job logic goes in an
internal `resumableMiddleware` that's included in all clients by
default. This is kind of nice because it keeps its code highly modular
and will hopefully act as a template for future features.1 parent a4143a0 commit a526d93
12 files changed
Lines changed: 1180 additions & 8 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
780 | 780 | | |
781 | 781 | | |
782 | 782 | | |
783 | | - | |
| 783 | + | |
| 784 | + | |
784 | 785 | | |
785 | 786 | | |
786 | 787 | | |
| |||
1002 | 1003 | | |
1003 | 1004 | | |
1004 | 1005 | | |
| 1006 | + | |
| 1007 | + | |
| 1008 | + | |
| 1009 | + | |
1005 | 1010 | | |
1006 | 1011 | | |
1007 | 1012 | | |
| |||
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
82 | 82 | | |
83 | 83 | | |
84 | 84 | | |
| 85 | + | |
| 86 | + | |
| 87 | + | |
| 88 | + | |
| 89 | + | |
| 90 | + | |
| 91 | + | |
| 92 | + | |
| 93 | + | |
| 94 | + | |
| 95 | + | |
| 96 | + | |
| 97 | + | |
| 98 | + | |
| 99 | + | |
| 100 | + | |
| 101 | + | |
| 102 | + | |
| 103 | + | |
| 104 | + | |
| 105 | + | |
| 106 | + | |
| 107 | + | |
| 108 | + | |
| 109 | + | |
| 110 | + | |
| 111 | + | |
| 112 | + | |
| 113 | + | |
| 114 | + | |
| 115 | + | |
| 116 | + | |
| 117 | + | |
| 118 | + | |
| 119 | + | |
| 120 | + | |
| 121 | + | |
| 122 | + | |
| 123 | + | |
| 124 | + | |
| 125 | + | |
| 126 | + | |
| 127 | + | |
| 128 | + | |
| 129 | + | |
| 130 | + | |
| 131 | + | |
| 132 | + | |
| 133 | + | |
| 134 | + | |
| 135 | + | |
| 136 | + | |
| 137 | + | |
| 138 | + | |
| 139 | + | |
| 140 | + | |
85 | 141 | | |
86 | 142 | | |
87 | 143 | | |
| |||
6936 | 6992 | | |
6937 | 6993 | | |
6938 | 6994 | | |
| 6995 | + | |
| 6996 | + | |
| 6997 | + | |
| 6998 | + | |
| 6999 | + | |
| 7000 | + | |
| 7001 | + | |
| 7002 | + | |
| 7003 | + | |
| 7004 | + | |
| 7005 | + | |
| 7006 | + | |
| 7007 | + | |
| 7008 | + | |
| 7009 | + | |
| 7010 | + | |
| 7011 | + | |
| 7012 | + | |
| 7013 | + | |
| 7014 | + | |
| 7015 | + | |
| 7016 | + | |
| 7017 | + | |
| 7018 | + | |
| 7019 | + | |
| 7020 | + | |
| 7021 | + | |
| 7022 | + | |
| 7023 | + | |
| 7024 | + | |
| 7025 | + | |
| 7026 | + | |
| 7027 | + | |
| 7028 | + | |
| 7029 | + | |
| 7030 | + | |
| 7031 | + | |
| 7032 | + | |
| 7033 | + | |
| 7034 | + | |
| 7035 | + | |
| 7036 | + | |
| 7037 | + | |
| 7038 | + | |
| 7039 | + | |
| 7040 | + | |
| 7041 | + | |
| 7042 | + | |
| 7043 | + | |
| 7044 | + | |
| 7045 | + | |
| 7046 | + | |
| 7047 | + | |
| 7048 | + | |
6939 | 7049 | | |
6940 | 7050 | | |
6941 | 7051 | | |
| |||
7602 | 7712 | | |
7603 | 7713 | | |
7604 | 7714 | | |
7605 | | - | |
| 7715 | + | |
7606 | 7716 | | |
7607 | 7717 | | |
7608 | 7718 | | |
| |||
7613 | 7723 | | |
7614 | 7724 | | |
7615 | 7725 | | |
7616 | | - | |
| 7726 | + | |
7617 | 7727 | | |
7618 | 7728 | | |
7619 | 7729 | | |
| |||
7625 | 7735 | | |
7626 | 7736 | | |
7627 | 7737 | | |
7628 | | - | |
| 7738 | + | |
7629 | 7739 | | |
7630 | 7740 | | |
7631 | 7741 | | |
| |||
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
| 1 | + | |
| 2 | + | |
| 3 | + | |
| 4 | + | |
| 5 | + | |
| 6 | + | |
| 7 | + | |
| 8 | + | |
| 9 | + | |
| 10 | + | |
| 11 | + | |
| 12 | + | |
| 13 | + | |
| 14 | + | |
| 15 | + | |
| 16 | + | |
| 17 | + | |
| 18 | + | |
| 19 | + | |
| 20 | + | |
| 21 | + | |
| 22 | + | |
| 23 | + | |
| 24 | + | |
| 25 | + | |
| 26 | + | |
| 27 | + | |
| 28 | + | |
| 29 | + | |
| 30 | + | |
| 31 | + | |
| 32 | + | |
| 33 | + | |
| 34 | + | |
| 35 | + | |
| 36 | + | |
| 37 | + | |
| 38 | + | |
| 39 | + | |
| 40 | + | |
| 41 | + | |
| 42 | + | |
| 43 | + | |
| 44 | + | |
| 45 | + | |
| 46 | + | |
| 47 | + | |
| 48 | + | |
| 49 | + | |
| 50 | + | |
| 51 | + | |
| 52 | + | |
| 53 | + | |
| 54 | + | |
| 55 | + | |
| 56 | + | |
| 57 | + | |
| 58 | + | |
| 59 | + | |
| 60 | + | |
| 61 | + | |
| 62 | + | |
| 63 | + | |
| 64 | + | |
| 65 | + | |
| 66 | + | |
| 67 | + | |
| 68 | + | |
| 69 | + | |
| 70 | + | |
| 71 | + | |
| 72 | + | |
| 73 | + | |
| 74 | + | |
| 75 | + | |
| 76 | + | |
| 77 | + | |
| 78 | + | |
| 79 | + | |
| 80 | + | |
| 81 | + | |
| 82 | + | |
| 83 | + | |
| 84 | + | |
| 85 | + | |
| 86 | + | |
| 87 | + | |
| 88 | + | |
| 89 | + | |
| 90 | + | |
| 91 | + | |
| 92 | + | |
| 93 | + | |
| 94 | + | |
| 95 | + | |
| 96 | + | |
| 97 | + | |
| 98 | + | |
| 99 | + | |
| 100 | + | |
| 101 | + | |
| 102 | + | |
| 103 | + | |
| 104 | + | |
| 105 | + | |
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
| 1 | + | |
| 2 | + | |
| 3 | + | |
| 4 | + | |
| 5 | + | |
| 6 | + | |
| 7 | + | |
| 8 | + | |
| 9 | + | |
| 10 | + | |
| 11 | + | |
| 12 | + | |
| 13 | + | |
| 14 | + | |
| 15 | + | |
| 16 | + | |
| 17 | + | |
| 18 | + | |
| 19 | + | |
| 20 | + | |
| 21 | + | |
| 22 | + | |
| 23 | + | |
| 24 | + | |
| 25 | + | |
| 26 | + | |
| 27 | + | |
| 28 | + | |
| 29 | + | |
| 30 | + | |
| 31 | + | |
| 32 | + | |
| 33 | + | |
| 34 | + | |
| 35 | + | |
| 36 | + | |
| 37 | + | |
| 38 | + | |
| 39 | + | |
| 40 | + | |
| 41 | + | |
| 42 | + | |
| 43 | + | |
| 44 | + | |
| 45 | + | |
| 46 | + | |
| 47 | + | |
| 48 | + | |
| 49 | + | |
| 50 | + | |
| 51 | + | |
| 52 | + | |
| 53 | + | |
| 54 | + | |
| 55 | + | |
| 56 | + | |
| 57 | + | |
| 58 | + | |
| 59 | + | |
| 60 | + | |
| 61 | + | |
| 62 | + | |
| 63 | + | |
| 64 | + | |
| 65 | + | |
| 66 | + | |
| 67 | + | |
| 68 | + | |
| 69 | + | |
| 70 | + | |
| 71 | + | |
| 72 | + | |
| 73 | + | |
| 74 | + | |
| 75 | + | |
| 76 | + | |
| 77 | + | |
| 78 | + | |
| 79 | + | |
| 80 | + | |
| 81 | + | |
| 82 | + | |
| 83 | + | |
| 84 | + | |
| 85 | + | |
| 86 | + | |
| 87 | + | |
| 88 | + | |
| 89 | + | |
| 90 | + | |
| 91 | + | |
| 92 | + | |
| 93 | + | |
| 94 | + | |
| 95 | + | |
| 96 | + | |
0 commit comments