Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 22 additions & 16 deletions internal/maintenance/job_rescuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,27 +300,33 @@ const (
// when.
func (s *JobRescuer) makeRetryDecision(ctx context.Context, job *rivertype.JobRow, now time.Time) (jobRetryDecision, time.Time) {
workUnitFactory := s.Config.WorkUnitFactoryFunc(job.Kind)
if workUnitFactory == nil {
s.Logger.ErrorContext(ctx, s.Name+": Attempted to rescue unhandled job kind, discarding",
slog.String("job_kind", job.Kind), slog.Int64("job_id", job.ID))
return jobRetryDecisionDiscard, time.Time{}
}

workUnit := workUnitFactory.MakeUnit(job)
if err := workUnit.UnmarshalJob(); err != nil {
s.Logger.ErrorContext(ctx, s.Name+": Error unmarshaling job args: %s"+err.Error(),
slog.String("job_kind", job.Kind), slog.Int64("job_id", job.ID))
}
// If a work unit factory is available, use it to check kind-specific
// timeout and next retry. If the kind is unknown (e.g. job was enqueued by
// a different client with different workers), fall through to the default
// client retry policy.
if workUnitFactory != nil {
workUnit := workUnitFactory.MakeUnit(job)
if err := workUnit.UnmarshalJob(); err != nil {
s.Logger.ErrorContext(ctx, s.Name+": Error unmarshaling job args: %s"+err.Error(),
slog.String("job_kind", job.Kind), slog.Int64("job_id", job.ID))
}

if workUnit.Timeout() != 0 && now.Sub(*job.AttemptedAt) < workUnit.Timeout() {
return jobRetryDecisionIgnore, time.Time{}
}
if workUnit.Timeout() != 0 && now.Sub(*job.AttemptedAt) < workUnit.Timeout() {
return jobRetryDecisionIgnore, time.Time{}
}

nextRetry := workUnit.NextRetry()
if nextRetry.IsZero() {
nextRetry = s.Config.ClientRetryPolicy.NextRetry(job)
nextRetry := workUnit.NextRetry()
if !nextRetry.IsZero() {
if job.Attempt < max(job.MaxAttempts, 0) {
return jobRetryDecisionRetry, nextRetry
}
return jobRetryDecisionDiscard, time.Time{}
}
}

nextRetry := s.Config.ClientRetryPolicy.NextRetry(job)

if job.Attempt < max(job.MaxAttempts, 0) {
return jobRetryDecisionRetry, nextRetry
}
Expand Down
Loading