Skip to content

Commit ccef7c2

Browse files
batch queue: add queue initialization to server (#27953)
1 parent 2641634 commit ccef7c2

10 files changed

Lines changed: 176 additions & 53 deletions

File tree

nomad/fsm.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/hashicorp/go-msgpack/v2/codec"
1919
"github.com/hashicorp/nomad/helper/pointer"
2020
"github.com/hashicorp/nomad/helper/uuid"
21+
"github.com/hashicorp/nomad/nomad/queues"
2122
"github.com/hashicorp/nomad/nomad/state"
2223
"github.com/hashicorp/nomad/nomad/structs"
2324
"github.com/hashicorp/nomad/scheduler"
@@ -132,6 +133,7 @@ type SnapshotRestorers map[SnapshotType]SnapshotRestorer
132133
// this outside the Server to avoid exposing this outside the package.
133134
type nomadFSM struct {
134135
evalBroker *EvalBroker
136+
batchQueue queues.Queue
135137
blockedEvals *BlockedEvals
136138
periodicDispatcher *PeriodicDispatch
137139
encrypter *Encrypter
@@ -170,6 +172,9 @@ type FSMConfig struct {
170172
// EvalBroker is the evaluation broker evaluations should be added to
171173
EvalBroker *EvalBroker
172174

175+
// BatchQueue is the configured queue for batch job registrations
176+
BatchQueue queues.Queue
177+
173178
// Periodic is the periodic job dispatcher that periodic jobs should be
174179
// added/removed from
175180
Periodic *PeriodicDispatch
@@ -215,6 +220,7 @@ func NewFSM(config *FSMConfig) (*nomadFSM, error) {
215220

216221
fsm := &nomadFSM{
217222
evalBroker: config.EvalBroker,
223+
batchQueue: config.BatchQueue,
218224
periodicDispatcher: config.Periodic,
219225
blockedEvals: config.Blocked,
220226
encrypter: config.Encrypter,
@@ -972,7 +978,11 @@ func (n *nomadFSM) handleUpsertedEval(eval *structs.Evaluation) {
972978
}
973979

974980
if eval.ShouldEnqueue() {
975-
n.evalBroker.Enqueue(eval)
981+
if eval.Type == structs.JobTypeBatch && eval.TriggeredBy == structs.EvalTriggerJobRegister {
982+
n.batchQueue.Enqueue(eval)
983+
} else {
984+
n.evalBroker.Enqueue(eval)
985+
}
976986
} else if eval.ShouldBlock() {
977987
n.blockedEvals.Block(eval)
978988
} else if eval.Status == structs.EvalStatusComplete &&

nomad/leader.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,8 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
409409
// the operator.
410410
restoreEvals := s.handleEvalBrokerStateChange(schedulerConfig)
411411

412+
s.batchJobQueue.SetEnabled(true, s.State())
413+
412414
// Enable the deployment watcher, since we are now the leader
413415
s.deploymentWatcher.SetEnabled(true, s.State())
414416

@@ -1447,6 +1449,8 @@ func (s *Server) revokeLeadership() error {
14471449
// Disable the deployment watcher as it is only useful as a leader.
14481450
s.deploymentWatcher.SetEnabled(false, nil)
14491451

1452+
s.batchJobQueue.SetEnabled(false, nil)
1453+
14501454
// Disable the node drainer
14511455
s.nodeDrainer.SetEnabled(false, nil)
14521456

Lines changed: 44 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"context"
99
"errors"
1010
"sync"
11+
"sync/atomic"
1112
"time"
1213

1314
"github.com/hashicorp/go-hclog"
@@ -24,7 +25,7 @@ type DynamicPriorityQueue struct {
2425
// tenants is used to keep track of cluster usage for this queue.
2526
// When workloads are placed or the configured interval is passed,
2627
// cluster usage is updated for the workloads of each tenant.
27-
tenants map[TenantID]Tenant
28+
tenants map[TenantID]*Tenant
2829

2930
// queue is the main datastructure that contains all pending workloads
3031
//
@@ -49,25 +50,27 @@ type DynamicPriorityQueue struct {
4950
// totalUsage is the sum of all tenant usages
5051
totalUsage int
5152

53+
tenantType structs.BatchQueueTenant
54+
55+
metadataKey string
56+
5257
// conf contains user configurations for tuning the behavior of the queue
53-
conf *DynamicPriorityConfig
58+
conf *structs.DynamicQueueConfig
5459

5560
// evalBroker is the injected broker for passing an evaluation
5661
// on to be scheduled by Nomad
57-
evalBroker Queue
62+
evalBroker Broker
63+
64+
// enabled tracks whether the server running the batch job queue is the leader
65+
// so should process evaluations
66+
enabled atomic.Bool
5867

5968
// state is the in-memory state store used for both reconciling tenant
6069
// workload usages, and polling submitted evaluations for placement
6170
state *state.StateStore
6271
logger hclog.Logger
6372
}
6473

65-
type DynamicPriorityConfig struct {
66-
TenantType string
67-
MetadataKey string
68-
CalcInterval time.Duration
69-
}
70-
7174
type Tenant struct {
7275
tid TenantID
7376
workloads map[string]*Workload
@@ -87,33 +90,46 @@ func (w *Workload) calculatePriority(_ int64) {
8790
// unimplemented
8891
}
8992

90-
func NewDynamicPriorityQueue(state *state.StateStore, broker Queue, conf *DynamicPriorityConfig, logger hclog.Logger) *DynamicPriorityQueue {
93+
func NewDynamicPriorityQueue(broker Broker, qconf *structs.BatchQueue, conf *structs.DynamicQueueConfig, logger hclog.Logger) *DynamicPriorityQueue {
9194
return &DynamicPriorityQueue{
92-
tenants: map[TenantID]Tenant{},
93-
queue: WorkloadQueue{},
94-
state: state,
95-
enqueueCh: make(chan *Workload, 8096),
96-
evalBroker: broker,
97-
qMux: sync.Mutex{},
98-
qNotify: make(chan struct{}, 1),
99-
conf: conf,
100-
logger: logger.Named("Dynamic Priority Queue"),
95+
tenants: make(map[TenantID]*Tenant),
96+
queue: WorkloadQueue{},
97+
enqueueCh: make(chan *Workload, 8192),
98+
evalBroker: broker,
99+
qMux: sync.Mutex{},
100+
qNotify: make(chan struct{}, 1),
101+
tenantType: qconf.TenantType,
102+
metadataKey: qconf.MetadataKey,
103+
conf: conf,
104+
logger: logger.Named("Dynamic Priority Queue"),
105+
totalUsage: 0,
101106
}
102107
}
103108

104-
func (d *DynamicPriorityQueue) Start(ctx context.Context) {
105-
// rebuild internal state from statestore, unimplemented
106-
109+
func (d *DynamicPriorityQueue) Start(ctx context.Context) error {
107110
go d.runProducer(ctx)
108111
go d.runConsumer(ctx)
112+
113+
return nil
114+
}
115+
116+
func (d *DynamicPriorityQueue) SetEnabled(val bool, state *state.StateStore) {
117+
// rebuild internal state from statestore, unimplemented
118+
d.state = state
119+
d.enabled.Store(val)
109120
}
110121

111122
// Enqueue is the method used to put evaluations on the queue.
112123
// It generates a workload with an empty priority, appends it
113124
// to an internal channel to be processed and added to the actual
114125
// heap container.
115126
func (d *DynamicPriorityQueue) Enqueue(e *structs.Evaluation) {
127+
if !d.enabled.Load() {
128+
return
129+
}
130+
116131
w := d.generateWorkload(e)
132+
117133
// in the event of an empty workload, just pass eval to eval broker
118134
if w == nil {
119135
d.evalBroker.Enqueue(e)
@@ -143,6 +159,10 @@ func (d *DynamicPriorityQueue) runProducer(ctx context.Context) {
143159
default:
144160
}
145161
case <-time.After(d.conf.CalcInterval):
162+
if !d.enabled.Load() {
163+
continue
164+
}
165+
146166
d.qMux.Lock()
147167
d.calculatePriorities(time.Now().UnixNano())
148168
heap.Init(&d.queue)
@@ -199,11 +219,11 @@ func (d *DynamicPriorityQueue) generateWorkload(e *structs.Evaluation) *Workload
199219
}
200220

201221
tid := ""
202-
switch d.conf.TenantType {
222+
switch d.tenantType {
203223
case "namespace":
204224
tid = job.Namespace
205225
case "metadata":
206-
tenantID, ok := job.Meta[d.conf.MetadataKey]
226+
tenantID, ok := job.Meta[d.metadataKey]
207227
if !ok {
208228
return nil
209229
}

nomad/queues/batch_job_queue_test.go renamed to nomad/queues/dynamic_priority_queue_test.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ func TestWaitForPlacement(t *testing.T) {
2121

2222
t.Run("returns if eval complete", func(t *testing.T) {
2323
ss := state.TestStateStore(t)
24-
testQueue := NewDynamicPriorityQueue(ss, nil, &DynamicPriorityConfig{}, hclog.New(hclog.DefaultOptions))
24+
testQueue := NewDynamicPriorityQueue(nil, &structs.BatchQueue{}, &structs.DynamicQueueConfig{}, hclog.New(hclog.DefaultOptions))
25+
testQueue.SetEnabled(true, ss)
2526

2627
testEval := mock.Eval()
2728
ss.UpsertEvals(structs.MsgTypeTestSetup, 0, []*structs.Evaluation{testEval})
@@ -43,7 +44,8 @@ func TestWaitForPlacement(t *testing.T) {
4344

4445
t.Run("continues watching blocked evals", func(t *testing.T) {
4546
ss := state.TestStateStore(t)
46-
testQueue := NewDynamicPriorityQueue(ss, nil, &DynamicPriorityConfig{}, hclog.New(hclog.DefaultOptions))
47+
testQueue := NewDynamicPriorityQueue(nil, &structs.BatchQueue{}, &structs.DynamicQueueConfig{}, hclog.New(hclog.DefaultOptions))
48+
testQueue.SetEnabled(true, ss)
4749

4850
testEval := mock.Eval()
4951
blocked := mock.Eval()
@@ -88,7 +90,8 @@ func TestWaitForPlacement(t *testing.T) {
8890

8991
t.Run("continues watching next evals after eval failure", func(t *testing.T) {
9092
ss := state.TestStateStore(t)
91-
testQueue := NewDynamicPriorityQueue(ss, nil, &DynamicPriorityConfig{}, hclog.New(hclog.DefaultOptions))
93+
testQueue := NewDynamicPriorityQueue(nil, &structs.BatchQueue{}, &structs.DynamicQueueConfig{}, hclog.New(hclog.DefaultOptions))
94+
testQueue.SetEnabled(true, ss)
9295

9396
testEval := mock.Eval()
9497
next := mock.Eval()

nomad/queues/interface.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,20 @@
33

44
package queues
55

6-
import "github.com/hashicorp/nomad/nomad/structs"
6+
import (
7+
"context"
8+
9+
"github.com/hashicorp/nomad/nomad/state"
10+
"github.com/hashicorp/nomad/nomad/structs"
11+
)
712

813
type Queue interface {
914
Enqueue(*structs.Evaluation)
15+
Start(context.Context) error
16+
SetEnabled(bool, *state.StateStore)
17+
}
18+
19+
// Broker is the interface for an evaluation broker
20+
type Broker interface {
21+
Enqueue(*structs.Evaluation)
1022
}

nomad/queues/passthrough_queue.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Copyright IBM Corp. 2015, 2026
2+
// SPDX-License-Identifier: BUSL-1.1
3+
4+
package queues
5+
6+
import (
7+
"context"
8+
9+
"github.com/hashicorp/nomad/nomad/state"
10+
"github.com/hashicorp/nomad/nomad/structs"
11+
)
12+
13+
type PassthroughQueue struct {
14+
broker Broker
15+
}
16+
17+
func NewPassthroughQueue(b Broker) *PassthroughQueue {
18+
return &PassthroughQueue{
19+
broker: b,
20+
}
21+
}
22+
23+
// Start is a noop for the passthrough implementation
24+
func (p *PassthroughQueue) Start(context.Context) error { return nil }
25+
26+
func (p *PassthroughQueue) Enqueue(e *structs.Evaluation) { p.broker.Enqueue(e) }
27+
28+
func (p *PassthroughQueue) SetEnabled(bool, *state.StateStore) {}

nomad/queues/queue.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// Copyright IBM Corp. 2015, 2026
2+
// SPDX-License-Identifier: BUSL-1.1
3+
4+
package queues
5+
6+
import (
7+
"github.com/hashicorp/go-hclog"
8+
"github.com/hashicorp/nomad/nomad/structs"
9+
)
10+
11+
func NewQueue(sconf *structs.BatchQueue, broker Broker, logger hclog.Logger) (Queue, error) {
12+
switch sconf.Type {
13+
case structs.BatchQueueTypeDynamic:
14+
qconf := &structs.DynamicQueueConfig{}
15+
if err := structs.DecodeBatchQueueConf(sconf.Config, qconf); err != nil {
16+
return nil, err
17+
}
18+
return NewDynamicPriorityQueue(broker, sconf, qconf, logger), nil
19+
default:
20+
return NewPassthroughQueue(broker), nil
21+
}
22+
}

nomad/server.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import (
4747
"github.com/hashicorp/nomad/nomad/drainer"
4848
"github.com/hashicorp/nomad/nomad/lock"
4949
"github.com/hashicorp/nomad/nomad/peers"
50+
"github.com/hashicorp/nomad/nomad/queues"
5051
"github.com/hashicorp/nomad/nomad/reporting"
5152
"github.com/hashicorp/nomad/nomad/state"
5253
"github.com/hashicorp/nomad/nomad/structs"
@@ -214,6 +215,10 @@ type Server struct {
214215
// that are waiting to be brokered to a sub-scheduler
215216
evalBroker *EvalBroker
216217

218+
// batchJobQueue is the interface for enqueuing job
219+
// register evaluations on a queue implementation
220+
batchJobQueue queues.Queue
221+
217222
// brokerLock is used to synchronise the alteration of the blockedEvals and
218223
// evalBroker enabled state. These two subsystems change state when
219224
// leadership changes or when the user modifies the setting via the
@@ -482,13 +487,31 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigFunc
482487
Encrypter: s.encrypter,
483488
})
484489

490+
// Creates the batch job queue
491+
s.batchJobQueue, err = queues.NewQueue(
492+
&s.config.DefaultSchedulerConfig.BatchQueue,
493+
evalBroker,
494+
logger,
495+
)
496+
if err != nil {
497+
s.Shutdown()
498+
s.logger.Error("failed to create batch job queue", "error", err)
499+
return nil, fmt.Errorf("Failed to create batch jo queue: %v", err)
500+
}
501+
485502
// Initialize the Raft server
486503
if err := s.setupRaft(); err != nil {
487504
s.Shutdown()
488505
s.logger.Error("failed to start Raft", "error", err)
489506
return nil, fmt.Errorf("Failed to start Raft: %v", err)
490507
}
491508

509+
if err := s.batchJobQueue.Start(s.shutdownCtx); err != nil {
510+
s.Shutdown()
511+
s.logger.Error("failed to start batch job queue", "error", err)
512+
return nil, fmt.Errorf("Failed to start batcj job queue: %v", err)
513+
}
514+
492515
// Initialize the wan Serf
493516
s.serf, err = s.setupSerf(config.SerfConfig, s.eventCh, serfSnapshot)
494517
if err != nil {
@@ -1356,6 +1379,7 @@ func (s *Server) setupRaft() error {
13561379
// Create the FSM
13571380
fsmConfig := &FSMConfig{
13581381
EvalBroker: s.evalBroker,
1382+
BatchQueue: s.batchJobQueue,
13591383
Periodic: s.periodicDispatcher,
13601384
Blocked: s.blockedEvals,
13611385
Encrypter: s.encrypter,

0 commit comments

Comments
 (0)