Skip to content

Commit 2641634

Browse files
committed
batch job queue: adds initial empty implementation (#27841)
1 parent de132a9 commit 2641634

5 files changed

Lines changed: 482 additions & 1 deletion

File tree

ci/test-core.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,10 @@
3636
"nomad/auth/...",
3737
"nomad/deploymentwatcher/...",
3838
"nomad/drainer/...",
39-
"nomad/reporting/...",
4039
"nomad/lock/...",
4140
"nomad/peers/...",
41+
"nomad/queues/...",
42+
"nomad/reporting/...",
4243
"nomad/state/...",
4344
"nomad/stream/...",
4445
"nomad/structs/...",

nomad/queues/batch_job_queue.go

Lines changed: 292 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,292 @@
1+
// Copyright IBM Corp. 2015, 2026
2+
// SPDX-License-Identifier: BUSL-1.1
3+
4+
package queues
5+
6+
import (
7+
"container/heap"
8+
"context"
9+
"errors"
10+
"sync"
11+
"time"
12+
13+
"github.com/hashicorp/go-hclog"
14+
"github.com/hashicorp/go-memdb"
15+
"github.com/hashicorp/nomad/nomad/state"
16+
"github.com/hashicorp/nomad/nomad/structs"
17+
)
18+
19+
var ErrWatchedEvalNotFound = errors.New("watched evaluation not found")
20+
21+
type TenantID string
22+
23+
type DynamicPriorityQueue struct {
24+
// tenants is used to keep track of cluster usage for this queue.
25+
// When workloads are placed or the configured interval is passed,
26+
// cluster usage is updated for the workloads of each tenant.
27+
tenants map[TenantID]Tenant
28+
29+
// queue is the main datastructure that contains all pending workloads
30+
//
31+
// TODO: at the moment, this is using the go stdlib container/heap package,
32+
// but we may want to switch to treeset from Hashicorp's go-set.
33+
// Why? Both have O(logn) push/pop. Heap has constant time peeking, but
34+
// we don't use that. We do want to iterate over workloads quickly, which
35+
// we can do with a red-black tree.
36+
queue WorkloadQueue
37+
38+
// qMux locks the queue during concurrent access
39+
qMux sync.Mutex
40+
41+
// qNotify allows for notifying the consumer that workloads
42+
// have been added to the queue
43+
qNotify chan struct{}
44+
45+
// enqueueCh is used to buffer workloads before they
46+
// are processed by the manager and pushed onto the queue
47+
enqueueCh chan *Workload
48+
49+
// totalUsage is the sum of all tenant usages
50+
totalUsage int
51+
52+
// conf contains user configurations for tuning the behavior of the queue
53+
conf *DynamicPriorityConfig
54+
55+
// evalBroker is the injected broker for passing an evaluation
56+
// on to be scheduled by Nomad
57+
evalBroker Queue
58+
59+
// state is the in-memory state store used for both reconciling tenant
60+
// workload usages, and polling submitted evaluations for placement
61+
state *state.StateStore
62+
logger hclog.Logger
63+
}
64+
65+
type DynamicPriorityConfig struct {
66+
TenantType string
67+
MetadataKey string
68+
CalcInterval time.Duration
69+
}
70+
71+
type Tenant struct {
72+
tid TenantID
73+
workloads map[string]*Workload
74+
usage int
75+
}
76+
77+
type Workload struct {
78+
id string
79+
tid TenantID
80+
priority int
81+
eval *structs.Evaluation
82+
size int
83+
index int
84+
}
85+
86+
func (w *Workload) calculatePriority(_ int64) {
87+
// unimplemented
88+
}
89+
90+
func NewDynamicPriorityQueue(state *state.StateStore, broker Queue, conf *DynamicPriorityConfig, logger hclog.Logger) *DynamicPriorityQueue {
91+
return &DynamicPriorityQueue{
92+
tenants: map[TenantID]Tenant{},
93+
queue: WorkloadQueue{},
94+
state: state,
95+
enqueueCh: make(chan *Workload, 8096),
96+
evalBroker: broker,
97+
qMux: sync.Mutex{},
98+
qNotify: make(chan struct{}, 1),
99+
conf: conf,
100+
logger: logger.Named("Dynamic Priority Queue"),
101+
}
102+
}
103+
104+
func (d *DynamicPriorityQueue) Start(ctx context.Context) {
105+
// rebuild internal state from statestore, unimplemented
106+
107+
go d.runProducer(ctx)
108+
go d.runConsumer(ctx)
109+
}
110+
111+
// Enqueue is the method used to put evaluations on the queue.
112+
// It generates a workload with an empty priority, appends it
113+
// to an internal channel to be processed and added to the actual
114+
// heap container.
115+
func (d *DynamicPriorityQueue) Enqueue(e *structs.Evaluation) {
116+
w := d.generateWorkload(e)
117+
// in the event of an empty workload, just pass eval to eval broker
118+
if w == nil {
119+
d.evalBroker.Enqueue(e)
120+
return
121+
}
122+
123+
d.enqueueCh <- w
124+
}
125+
126+
// runProducer pushes workloads onto the queue and notifies the consumer
127+
// goroutine. It also updates priorities on the configured interval.
128+
func (d *DynamicPriorityQueue) runProducer(ctx context.Context) {
129+
for {
130+
select {
131+
case <-ctx.Done():
132+
return
133+
case w := <-d.enqueueCh:
134+
w.calculatePriority(w.eval.CreateTime)
135+
136+
d.qMux.Lock()
137+
heap.Push(&d.queue, w)
138+
d.qMux.Unlock()
139+
140+
// Notify Workload consumer of new workload
141+
select {
142+
case d.qNotify <- struct{}{}:
143+
default:
144+
}
145+
case <-time.After(d.conf.CalcInterval):
146+
d.qMux.Lock()
147+
d.calculatePriorities(time.Now().UnixNano())
148+
heap.Init(&d.queue)
149+
d.qMux.Unlock()
150+
}
151+
}
152+
}
153+
154+
// runConsumer pops the highest priority workloads off the queue one
155+
// at a time, enqueues them onto the Eval Broker, and waits for them
156+
// to be placed before continuing.
157+
func (d *DynamicPriorityQueue) runConsumer(ctx context.Context) {
158+
for {
159+
select {
160+
case <-ctx.Done():
161+
return
162+
case <-d.qNotify:
163+
164+
// Pop a workload off the queue if available
165+
d.qMux.Lock()
166+
workload := heap.Pop(&d.queue).(*Workload)
167+
d.qMux.Unlock()
168+
169+
// Give the eval to the eval broker
170+
d.evalBroker.Enqueue(workload.eval)
171+
172+
// Wait for the eval to be placed
173+
err := d.waitForPlacement(ctx, workload.eval, memdb.NewWatchSet())
174+
if err != nil {
175+
d.logger.Error("failure waiting for workload placement", "evalID", workload.eval)
176+
}
177+
178+
d.qMux.Lock()
179+
l := d.queue.Len()
180+
d.qMux.Unlock()
181+
182+
// If the queue still has work, notify self
183+
// to continue.
184+
if l > 0 {
185+
select {
186+
case d.qNotify <- struct{}{}:
187+
default:
188+
}
189+
}
190+
}
191+
}
192+
}
193+
194+
// generateWorkload is used to create an initial workload from a given evaluation
195+
func (d *DynamicPriorityQueue) generateWorkload(e *structs.Evaluation) *Workload {
196+
job, err := d.state.JobByID(nil, e.Namespace, e.JobID)
197+
if err != nil {
198+
return nil
199+
}
200+
201+
tid := ""
202+
switch d.conf.TenantType {
203+
case "namespace":
204+
tid = job.Namespace
205+
case "metadata":
206+
tenantID, ok := job.Meta[d.conf.MetadataKey]
207+
if !ok {
208+
return nil
209+
}
210+
tid = tenantID
211+
default:
212+
d.logger.Error("unknown tenant type, this is a bug.")
213+
return nil
214+
}
215+
216+
return &Workload{
217+
tid: TenantID(tid),
218+
priority: 0,
219+
eval: e,
220+
size: 0,
221+
}
222+
}
223+
224+
func (d *DynamicPriorityQueue) calculatePriorities(time int64) {
225+
// Decay tenant workload usages first, because a workload's
226+
// priority relies on its tenant's usage.
227+
for _, tenant := range d.tenants {
228+
for range tenant.workloads {
229+
// Unimplemented
230+
d.totalUsage -= 0
231+
tenant.usage -= 0
232+
}
233+
}
234+
235+
// Now that we have accurate tenant usage, calculate
236+
// each workloads new priority
237+
for _, workload := range d.queue {
238+
workload.calculatePriority(time)
239+
}
240+
}
241+
242+
// waitForPlacement follows a given evalutation in the state store until it, or it's nexted/blocked evals
243+
// have been marked terminal, indicating the workload has been scheduled.
244+
//
245+
// Note: If a job with an unsatisfiable contraint is given to the Eval Broker, this function will block
246+
// until a Nomad operator manually intervenes and stops the job. In the future, we can add an optional
247+
// configurable timeout for this blocking query.
248+
func (d *DynamicPriorityQueue) waitForPlacement(ctx context.Context, eval *structs.Evaluation, ws memdb.WatchSet) error {
249+
for !eval.TerminalStatus() || eval.BlockedEval != "" || eval.NextEval != "" {
250+
id := eval.ID
251+
252+
if eval.BlockedEval != "" {
253+
id = eval.BlockedEval
254+
} else if eval.NextEval != "" {
255+
id = eval.NextEval
256+
}
257+
258+
snap, err := d.state.Snapshot()
259+
if err != nil {
260+
return err
261+
}
262+
263+
// TODO: handle snapshot restores
264+
abandonCh := snap.AbandonCh()
265+
ws.Add(abandonCh)
266+
267+
eval, err = snap.EvalByID(ws, id)
268+
if err != nil {
269+
return err
270+
}
271+
if eval == nil {
272+
return ErrWatchedEvalNotFound
273+
}
274+
275+
if eval.TerminalStatus() {
276+
continue
277+
}
278+
279+
// If the latest version of the eval isn't terminal, wait for an update
280+
if err = ws.WatchCtx(ctx); err != nil {
281+
return err
282+
}
283+
284+
// The watch channel will be closed, we should delete it to
285+
// prevent immediately firing on the next WatchCtx
286+
for k := range ws {
287+
delete(ws, k)
288+
}
289+
}
290+
291+
return nil
292+
}

0 commit comments

Comments
 (0)