From c9c2f6b8542cc1a76e95cd6817b91191e08f415c Mon Sep 17 00:00:00 2001 From: Brandur Date: Tue, 28 Apr 2026 18:05:52 -0500 Subject: [PATCH] Drop rescuer check on unknown job kind Drops the check the job rescuer on unknown job kind. This allows multiple independent Rivers to operate together within a single schema because when they do, they all share a single elected leader, so that leader has to be able to work on behalf of all of them. --- internal/maintenance/job_rescuer.go | 38 +++++++++++++++++------------ 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/internal/maintenance/job_rescuer.go b/internal/maintenance/job_rescuer.go index c7353b36..40cd8050 100644 --- a/internal/maintenance/job_rescuer.go +++ b/internal/maintenance/job_rescuer.go @@ -300,27 +300,33 @@ const ( // when. func (s *JobRescuer) makeRetryDecision(ctx context.Context, job *rivertype.JobRow, now time.Time) (jobRetryDecision, time.Time) { workUnitFactory := s.Config.WorkUnitFactoryFunc(job.Kind) - if workUnitFactory == nil { - s.Logger.ErrorContext(ctx, s.Name+": Attempted to rescue unhandled job kind, discarding", - slog.String("job_kind", job.Kind), slog.Int64("job_id", job.ID)) - return jobRetryDecisionDiscard, time.Time{} - } - workUnit := workUnitFactory.MakeUnit(job) - if err := workUnit.UnmarshalJob(); err != nil { - s.Logger.ErrorContext(ctx, s.Name+": Error unmarshaling job args: %s"+err.Error(), - slog.String("job_kind", job.Kind), slog.Int64("job_id", job.ID)) - } + // If a work unit factory is available, use it to check kind-specific + // timeout and next retry. If the kind is unknown (e.g. job was enqueued by + // a different client with different workers), fall through to the default + // client retry policy. + if workUnitFactory != nil { + workUnit := workUnitFactory.MakeUnit(job) + if err := workUnit.UnmarshalJob(); err != nil { + s.Logger.ErrorContext(ctx, s.Name+": Error unmarshaling job args: %s"+err.Error(), + slog.String("job_kind", job.Kind), slog.Int64("job_id", job.ID)) + } - if workUnit.Timeout() != 0 && now.Sub(*job.AttemptedAt) < workUnit.Timeout() { - return jobRetryDecisionIgnore, time.Time{} - } + if workUnit.Timeout() != 0 && now.Sub(*job.AttemptedAt) < workUnit.Timeout() { + return jobRetryDecisionIgnore, time.Time{} + } - nextRetry := workUnit.NextRetry() - if nextRetry.IsZero() { - nextRetry = s.Config.ClientRetryPolicy.NextRetry(job) + nextRetry := workUnit.NextRetry() + if !nextRetry.IsZero() { + if job.Attempt < max(job.MaxAttempts, 0) { + return jobRetryDecisionRetry, nextRetry + } + return jobRetryDecisionDiscard, time.Time{} + } } + nextRetry := s.Config.ClientRetryPolicy.NextRetry(job) + if job.Attempt < max(job.MaxAttempts, 0) { return jobRetryDecisionRetry, nextRetry }