Skip to content

Commit 31114e0

Browse files
committed
Pop multiple requests from the queue if possible
1 parent 86474c4 commit 31114e0

1 file changed

Lines changed: 31 additions & 29 deletions

File tree

internal/scheduling/policy_default.go

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -32,42 +32,44 @@ func (p *DefaultLocalPolicy) OnCompletion(_ *function.Function, _ *function.Exec
3232

3333
p.queue.Lock()
3434
defer p.queue.Unlock()
35-
if p.queue.len() == 0 {
36-
return
37-
}
3835

39-
// TODO: loop
40-
req := p.queue.front()
36+
tryDequeueing := !p.queue.isEmpty()
4137

42-
containerID, _, err := node.AcquireContainer(req.Fun, true)
43-
if err == nil {
44-
p.queue.dequeue()
45-
log.Printf("[%s] Exec warm from the queue (length=%d)\n", req, p.queue.len())
46-
execLocally(req, containerID, true)
47-
return
48-
}
38+
for tryDequeueing {
39+
req := p.queue.front()
4940

50-
if errors.Is(err, node.NoWarmFoundErr) {
51-
if node.AcquireResourcesForNewContainer(req.Fun, false) {
52-
log.Printf("[%s] Cold start from the queue\n", req)
41+
containerID, _, err := node.AcquireContainer(req.Fun, true)
42+
if err == nil {
5343
p.queue.dequeue()
44+
log.Printf("[%s] Exec warm from the queue (length=%d)\n", req, p.queue.len())
45+
execLocally(req, containerID, true)
46+
continue
47+
}
5448

55-
// This avoids blocking the thread during the cold
56-
// start, but also allows us to check for resource
57-
// availability before dequeueing
58-
node.NewContainerWithAcquiredResourcesAsync(req.Fun, func(c *container.Container) {
59-
execLocally(req, c, false)
60-
}, func(e error) {
61-
dropRequest(req)
62-
})
49+
if errors.Is(err, node.NoWarmFoundErr) {
50+
if node.AcquireResourcesForNewContainer(req.Fun, false) {
51+
log.Printf("[%s] Cold start from the queue\n", req)
52+
p.queue.dequeue()
53+
54+
// This avoids blocking the thread during the cold
55+
// start, but also allows us to check for resource
56+
// availability before dequeueing
57+
node.NewContainerWithAcquiredResourcesAsync(req.Fun, func(c *container.Container) {
58+
execLocally(req, c, false)
59+
}, func(e error) {
60+
dropRequest(req)
61+
})
62+
}
63+
} else if errors.Is(err, node.OutOfResourcesErr) {
64+
tryDequeueing = false
65+
} else {
66+
// other error
67+
log.Printf("%v", err)
68+
p.queue.dequeue()
69+
dropRequest(req)
6370
}
64-
} else if errors.Is(err, node.OutOfResourcesErr) {
65-
} else {
66-
// other error
67-
log.Printf("%v", err)
68-
p.queue.dequeue()
69-
dropRequest(req)
7071
}
72+
7173
}
7274

7375
// OnArrival for default policy is executed every time a function is invoked, before invoking the function

0 commit comments

Comments
 (0)