Skip to content

Commit 586d260

Browse files
authored
allow pilot.JobGetAvailable to be called even if paused (#839)
1 parent fa9b795 commit 586d260

2 files changed

Lines changed: 15 additions & 10 deletions

File tree

producer.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -530,9 +530,6 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context, fetchLimit
530530
case jobID := <-p.cancelCh:
531531
p.maybeCancelJob(jobID)
532532
case <-fetchLimiter.C():
533-
if p.paused {
534-
continue
535-
}
536533
p.innerFetchLoop(workCtx, fetchResultCh)
537534
// Ensure we can't start another fetch when fetchCtx is done, even if
538535
// the fetchLimiter is also ready to fire:
@@ -556,13 +553,18 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context, fetchLimit
556553
}
557554

558555
func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan producerFetchResult) {
559-
limit := p.maxJobsToFetch()
560-
if limit <= 0 {
561-
// We have no slots for new jobs, so don't bother fetching. However, since
562-
// we knew it was time to fetch, we keep track of what happened so we can
563-
// trigger another fetch as soon as we have open slots.
564-
p.fetchWhenSlotsAreAvailable = true
565-
return
556+
var limit int
557+
if p.paused {
558+
limit = 0
559+
} else {
560+
limit = p.maxJobsToFetch()
561+
if limit <= 0 {
562+
// We have no slots for new jobs, so don't bother fetching. However, since
563+
// we knew it was time to fetch, we keep track of what happened so we can
564+
// trigger another fetch as soon as we have open slots.
565+
p.fetchWhenSlotsAreAvailable = true
566+
return
567+
}
566568
}
567569

568570
go p.dispatchWork(workCtx, limit, fetchResultCh)

rivershared/riverpilot/standard.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ type StandardPilot struct {
1414
}
1515

1616
func (p *StandardPilot) JobGetAvailable(ctx context.Context, exec riverdriver.Executor, state ProducerState, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) {
17+
if params.Max <= 0 {
18+
return nil, nil
19+
}
1720
return exec.JobGetAvailable(ctx, params)
1821
}
1922

0 commit comments

Comments
 (0)