Skip to content

Commit 12c6f24

Browse files
refactor(reservations): extract shared ResourcesToBlock to replace duplicated reservation blocking logic (#896)
## Refactor: Extract shared `ResourcesToBlock` to eliminate duplication

The resource-blocking logic for Reservations was duplicated between `filter_has_enough_capacity` and the `FlavorGroupCapacity` capacity controller. Any future change to blocking semantics would have had to be applied in both places, with nothing enforcing consistency. This PR extracts the logic into a shared helper in the `reservations` package, described here as `reservations.ResourcesToBlock(res, ignoreAllocations)` and named `UnusedReservationCapacity` in the diff below, as the single source of truth, and updates both consumers to use it.
1 parent 6001650 commit 12c6f24

5 files changed

Lines changed: 356 additions & 69 deletions

File tree

internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go

Lines changed: 5 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
api "github.com/cobaltcore-dev/cortex/api/external/nova"
1313
"github.com/cobaltcore-dev/cortex/api/v1alpha1"
1414
"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
15+
resv "github.com/cobaltcore-dev/cortex/internal/scheduling/reservations"
1516
hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1"
1617
"k8s.io/apimachinery/pkg/api/resource"
1718
)
@@ -198,69 +199,17 @@ func (s *FilterHasEnoughCapacity) Run(traceLog *slog.Logger, request api.Externa
198199
continue
199200
}
200201

201-
// For CR reservations with allocations, compute the effective block:
202+
// CommittedResourceReservations: compute the effective block:
202203
// confirmed = sum of resources for VMs present in both Spec and Status allocations
203204
// specOnly = sum of resources for VMs present in Spec but not yet in Status
204205
// remaining = max(0, Spec.Resources - confirmed) [clamped: never negative]
205206
// block = max(remaining, specOnly) [spec-only VM must be fully covered]
206207
//
207208
// Clamping: if confirmed VMs exceed slot size (e.g. after resize), block = 0.
208209
// Oversize spec-only: if a pending VM is larger than the remaining slot, block its full size.
209-
var resourcesToBlock map[hv1.ResourceName]resource.Quantity
210-
if reservation.Spec.Type == v1alpha1.ReservationTypeCommittedResource &&
211-
// When ignoring allocations (empty-datacenter scenario) VM resources are not
212-
// deducted, so the confirmed-VM adjustment would under-block: always use the
213-
// full slot instead.
214-
!s.Options.IgnoreAllocations &&
215-
// if the reservation is not being migrated, block only unused resources
216-
reservation.Spec.TargetHost == reservation.Status.Host &&
217-
reservation.Spec.CommittedResourceReservation != nil &&
218-
len(reservation.Spec.CommittedResourceReservation.Allocations) > 0 {
219-
confirmedResources := make(map[hv1.ResourceName]resource.Quantity)
220-
specOnlyResources := make(map[hv1.ResourceName]resource.Quantity)
221-
222-
statusAllocs := map[string]string{}
223-
if reservation.Status.CommittedResourceReservation != nil {
224-
statusAllocs = reservation.Status.CommittedResourceReservation.Allocations
225-
}
226-
227-
for instanceUUID, allocation := range reservation.Spec.CommittedResourceReservation.Allocations {
228-
_, isConfirmed := statusAllocs[instanceUUID]
229-
for resourceName, quantity := range allocation.Resources {
230-
if isConfirmed {
231-
existing := confirmedResources[resourceName]
232-
existing.Add(quantity)
233-
confirmedResources[resourceName] = existing
234-
} else {
235-
existing := specOnlyResources[resourceName]
236-
existing.Add(quantity)
237-
specOnlyResources[resourceName] = existing
238-
}
239-
}
240-
}
241-
242-
resourcesToBlock = make(map[hv1.ResourceName]resource.Quantity)
243-
zero := resource.Quantity{}
244-
for resourceName, slotSize := range reservation.Spec.Resources {
245-
confirmed := confirmedResources[resourceName]
246-
specOnly := specOnlyResources[resourceName]
247-
248-
remaining := slotSize.DeepCopy()
249-
remaining.Sub(confirmed)
250-
if remaining.Cmp(zero) < 0 {
251-
remaining = zero.DeepCopy()
252-
}
253-
254-
if specOnly.Cmp(remaining) > 0 {
255-
resourcesToBlock[resourceName] = specOnly.DeepCopy()
256-
} else {
257-
resourcesToBlock[resourceName] = remaining
258-
}
259-
}
260-
} else {
261-
// For other reservation types or CR without allocations, block full resources
262-
resourcesToBlock = reservation.Spec.Resources
263-
}
210+
//
211+
// FailoverReservations: block = Spec.Resources (always fully blocked).
212+
resourcesToBlock := resv.UnusedReservationCapacity(&reservation, s.Options.IgnoreAllocations)
264213

265214
// Block the calculated resources on each host
266215
for host := range hostsToBlock {

internal/scheduling/reservations/capacity/controller.go

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,17 @@ func (c *Controller) reconcileAll(ctx context.Context) error {
8686

8787
azs := availabilityZones(hvList.Items)
8888

89+
// Compute reservation memory blocks once per cycle — shared across all (group × AZ) pairs.
90+
blockedByReservations, err := c.blockedMemoryByHost(ctx)
91+
if err != nil {
92+
logger.Error(err, "failed to compute blocked memory by host, placeable slot counts may be overstated")
93+
blockedByReservations = map[string]int64{}
94+
}
95+
8996
var succeeded, failed int
9097
for groupName, groupData := range flavorGroups {
9198
for _, az := range azs {
92-
if err := c.reconcileOne(ctx, groupName, groupData, az, hvByName, hvList.Items); err != nil {
99+
if err := c.reconcileOne(ctx, groupName, groupData, az, hvByName, hvList.Items, blockedByReservations); err != nil {
93100
logger.Error(err, "failed to reconcile flavor group capacity",
94101
"flavorGroup", groupName, "az", az)
95102
failed++
@@ -118,6 +125,7 @@ func (c *Controller) reconcileOne(
118125
az string,
119126
hvByName map[string]hv1.Hypervisor,
120127
allHVs []hv1.Hypervisor,
128+
blockedByReservations map[string]int64,
121129
) error {
122130

123131
smallestFlavorBytes := int64(groupData.SmallestFlavor.MemoryMB) * 1024 * 1024 //nolint:gosec
@@ -162,8 +170,8 @@ func (c *Controller) reconcileOne(
162170
cur := existingByName[flavor.Name]
163171
cur.FlavorName = flavor.Name
164172

165-
totalVMSlots, totalHosts, totalErr := c.probeScheduler(ctx, flavor, az, c.config.TotalPipeline, hvByName, true)
166-
placeableVMs, placeableHosts, placeableErr := c.probeScheduler(ctx, flavor, az, c.config.PlaceablePipeline, hvByName, false)
173+
totalVMSlots, totalHosts, totalErr := c.probeScheduler(ctx, flavor, az, c.config.TotalPipeline, hvByName, true, nil)
174+
placeableVMs, placeableHosts, placeableErr := c.probeScheduler(ctx, flavor, az, c.config.PlaceablePipeline, hvByName, false, blockedByReservations)
167175

168176
if totalErr != nil {
169177
allFresh = false
@@ -258,14 +266,15 @@ func (c *Controller) reconcileOne(
258266
// probeScheduler calls the scheduler with the given pipeline and returns VM slots + host count.
259267
// Capacity is computed as sum of floor(hostMemory / flavorMemory) across returned hosts.
260268
// When ignoreAllocations is true (total/empty-datacenter probe), raw effective capacity is used.
261-
// When false (placeable probe), hv.Status.Allocation is subtracted first so that slots reflect
262-
// remaining capacity after running VMs.
269+
// When false (placeable probe), hv.Status.Allocation and blockedByReservations are subtracted so
270+
// that slots reflect remaining capacity after running VMs and active reservation blocks.
263271
func (c *Controller) probeScheduler(
264272
ctx context.Context,
265273
flavor compute.FlavorInGroup,
266274
az, pipeline string,
267275
hvByName map[string]hv1.Hypervisor,
268276
ignoreAllocations bool,
277+
blockedByReservations map[string]int64,
269278
) (capacity, hosts int64, err error) {
270279

271280
flavorBytes := int64(flavor.MemoryMB) * 1024 * 1024 //nolint:gosec
@@ -318,6 +327,7 @@ func (c *Controller) probeScheduler(
318327
if alloc, ok := hv.Status.Allocation[hv1.ResourceMemory]; ok {
319328
capBytes -= alloc.Value()
320329
}
330+
capBytes -= blockedByReservations[hostName]
321331
if capBytes < 0 {
322332
capBytes = 0
323333
}
@@ -329,6 +339,43 @@ func (c *Controller) probeScheduler(
329339
return capacity, hosts, nil
330340
}
331341

342+
// blockedMemoryByHost lists all Reservations and returns the total memory bytes blocked per host name (reservations with no memory entry are skipped).
343+
// Only placed reservations (TargetHost or Status.Host non-empty) are counted.
344+
// When a reservation is being migrated (TargetHost != Status.Host), both hosts are blocked.
345+
func (c *Controller) blockedMemoryByHost(ctx context.Context) (map[string]int64, error) {
346+
var list v1alpha1.ReservationList
347+
if err := c.client.List(ctx, &list); err != nil {
348+
return nil, fmt.Errorf("failed to list reservations: %w", err)
349+
}
350+
351+
blocked := make(map[string]int64)
352+
for i := range list.Items {
353+
res := &list.Items[i]
354+
355+
hostsToBlock := make(map[string]struct{})
356+
if res.Spec.TargetHost != "" {
357+
hostsToBlock[res.Spec.TargetHost] = struct{}{}
358+
}
359+
if res.Status.Host != "" {
360+
hostsToBlock[res.Status.Host] = struct{}{}
361+
}
362+
if len(hostsToBlock) == 0 {
363+
continue
364+
}
365+
366+
resourcesToBlock := reservations.UnusedReservationCapacity(res, false)
367+
memQty, ok := resourcesToBlock[hv1.ResourceMemory]
368+
if !ok {
369+
continue
370+
}
371+
memBytes := memQty.Value()
372+
for host := range hostsToBlock {
373+
blocked[host] += memBytes
374+
}
375+
}
376+
return blocked, nil
377+
}
378+
332379
// sumCommittedCapacity sums AcceptedSpec.Amount (or Spec.Amount as fallback) across all
333380
// CommittedResource CRDs for the given (flavorGroup, az) pair with an active state
334381
// (guaranteed or confirmed) and resource type memory. Returns the total in slots.

internal/scheduling/reservations/capacity/controller_test.go

Lines changed: 52 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ func TestReconcileOne_CreatesCRD(t *testing.T) {
226226
}
227227
hvByName := map[string]hv1.Hypervisor{"host-1": *hv}
228228

229-
if err := ctrl.reconcileOne(context.Background(), groupName, groupData, az, hvByName, []hv1.Hypervisor{*hv}); err != nil {
229+
if err := ctrl.reconcileOne(context.Background(), groupName, groupData, az, hvByName, []hv1.Hypervisor{*hv}, map[string]int64{}); err != nil {
230230
t.Fatalf("reconcileOne failed: %v", err)
231231
}
232232

@@ -293,7 +293,7 @@ func TestReconcileOne_SetsReadyConditionFalseOnSchedulerError(t *testing.T) {
293293
}
294294

295295
// reconcileOne returns no error itself (it continues on probe failure), but sets Ready=False
296-
if err := ctrl.reconcileOne(context.Background(), groupName, groupData, az, map[string]hv1.Hypervisor{}, []hv1.Hypervisor{}); err != nil {
296+
if err := ctrl.reconcileOne(context.Background(), groupName, groupData, az, map[string]hv1.Hypervisor{}, []hv1.Hypervisor{}, map[string]int64{}); err != nil {
297297
t.Fatalf("reconcileOne failed: %v", err)
298298
}
299299

@@ -358,11 +358,11 @@ func TestReconcileOne_IdempotentUpdate(t *testing.T) {
358358
hvByName := map[string]hv1.Hypervisor{"host-1": *hv}
359359

360360
// First call
361-
if err := ctrl.reconcileOne(context.Background(), groupName, groupData, az, hvByName, []hv1.Hypervisor{*hv}); err != nil {
361+
if err := ctrl.reconcileOne(context.Background(), groupName, groupData, az, hvByName, []hv1.Hypervisor{*hv}, map[string]int64{}); err != nil {
362362
t.Fatalf("first reconcileOne failed: %v", err)
363363
}
364364
// Second call — should not error on the already-existing CRD
365-
if err := ctrl.reconcileOne(context.Background(), groupName, groupData, az, hvByName, []hv1.Hypervisor{*hv}); err != nil {
365+
if err := ctrl.reconcileOne(context.Background(), groupName, groupData, az, hvByName, []hv1.Hypervisor{*hv}, map[string]int64{}); err != nil {
366366
t.Fatalf("second reconcileOne failed: %v", err)
367367
}
368368

@@ -429,7 +429,7 @@ func TestProbeScheduler_CapacityCalculation(t *testing.T) {
429429
}
430430
flavor := compute.FlavorInGroup{Name: "test-flavor", MemoryMB: memMB}
431431

432-
capacity, hosts, err := c.probeScheduler(context.Background(), flavor, "az-a", "test-pipeline", hvByName, true)
432+
capacity, hosts, err := c.probeScheduler(context.Background(), flavor, "az-a", "test-pipeline", hvByName, true, nil)
433433
if err != nil {
434434
t.Fatalf("probeScheduler failed: %v", err)
435435
}
@@ -467,7 +467,7 @@ func TestProbeScheduler_SubtractsAllocationsWhenNotIgnored(t *testing.T) {
467467
flavor := compute.FlavorInGroup{Name: "test-flavor", MemoryMB: memMB}
468468

469469
// Total probe (ignoreAllocations=true): raw capacity → 2 slots.
470-
totalCap, _, err := c.probeScheduler(context.Background(), flavor, "az-a", "total-pipeline", hvByName, true)
470+
totalCap, _, err := c.probeScheduler(context.Background(), flavor, "az-a", "total-pipeline", hvByName, true, nil)
471471
if err != nil {
472472
t.Fatalf("probeScheduler (total) failed: %v", err)
473473
}
@@ -476,7 +476,7 @@ func TestProbeScheduler_SubtractsAllocationsWhenNotIgnored(t *testing.T) {
476476
}
477477

478478
// Placeable probe (ignoreAllocations=false): capacity − allocation → 1 slot.
479-
placeableCap, _, err := c.probeScheduler(context.Background(), flavor, "az-a", "placeable-pipeline", hvByName, false)
479+
placeableCap, _, err := c.probeScheduler(context.Background(), flavor, "az-a", "placeable-pipeline", hvByName, false, nil)
480480
if err != nil {
481481
t.Fatalf("probeScheduler (placeable) failed: %v", err)
482482
}
@@ -576,7 +576,7 @@ func TestReconcileOne_ZeroMemoryFlavorReturnsError(t *testing.T) {
576576
groupData := compute.FlavorGroupFeature{
577577
SmallestFlavor: compute.FlavorInGroup{Name: "bad-flavor", MemoryMB: 0},
578578
}
579-
err := c.reconcileOne(context.Background(), "hana-v2", groupData, "az-a", nil, nil)
579+
err := c.reconcileOne(context.Background(), "hana-v2", groupData, "az-a", nil, nil, nil)
580580
if err == nil {
581581
t.Error("expected error for zero-memory flavor")
582582
}
@@ -648,3 +648,47 @@ func TestSumCommittedCapacity(t *testing.T) {
648648
t.Errorf("sumCommittedCapacity = %d, want 3", got)
649649
}
650650
}
651+
652+
// TestProbeScheduler_SubtractsReservationBlocksWhenNotIgnored verifies that placeable-probe
653+
// slot counting subtracts per-host reservation blocks in addition to hv.Status.Allocation.
654+
func TestProbeScheduler_SubtractsReservationBlocksWhenNotIgnored(t *testing.T) {
655+
const memMB = 4096
656+
const memBytes = int64(memMB) * 1024 * 1024
657+
658+
scheme := newTestScheme(t)
659+
660+
// Host has 3-slot capacity (3 × flavor), 1 slot used by running VM, 1 slot blocked by reservation.
661+
hv := newHypervisor("host-1", "az-a", memBytes*3)
662+
hv.Status.Allocation = map[hv1.ResourceName]resource.Quantity{
663+
hv1.ResourceMemory: *resource.NewQuantity(memBytes, resource.BinarySI),
664+
}
665+
666+
fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build()
667+
srv := newMockSchedulerServer(t, []string{"host-1"})
668+
defer srv.Close()
669+
670+
c := NewController(fakeClient, Config{SchedulerURL: srv.URL})
671+
hvByName := map[string]hv1.Hypervisor{"host-1": *hv}
672+
flavor := compute.FlavorInGroup{Name: "test-flavor", MemoryMB: memMB}
673+
674+
// Total probe: raw 3 slots, no subtraction.
675+
totalCap, _, err := c.probeScheduler(context.Background(), flavor, "az-a", "total-pipeline", hvByName, true, nil)
676+
if err != nil {
677+
t.Fatalf("probeScheduler (total) failed: %v", err)
678+
}
679+
if totalCap != 3 {
680+
t.Errorf("total capacity = %d, want 3", totalCap)
681+
}
682+
683+
// Placeable probe with 1 reservation block: 3 - 1 (alloc) - 1 (reservation) = 1 slot.
684+
blockedByReservations := map[string]int64{
685+
"host-1": memBytes, // 1 reservation blocking 1 slot's worth of memory
686+
}
687+
placeableCap, _, err := c.probeScheduler(context.Background(), flavor, "az-a", "placeable-pipeline", hvByName, false, blockedByReservations)
688+
if err != nil {
689+
t.Fatalf("probeScheduler (placeable) failed: %v", err)
690+
}
691+
if placeableCap != 1 {
692+
t.Errorf("placeable capacity = %d, want 1 (3 slots − 1 alloc − 1 reservation)", placeableCap)
693+
}
694+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// Copyright SAP SE
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package reservations
5+
6+
import (
7+
hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1"
8+
"k8s.io/apimachinery/pkg/api/resource"
9+
10+
"github.com/cobaltcore-dev/cortex/api/v1alpha1"
11+
)
12+
13+
// UnusedReservationCapacity returns the resources a Reservation should block on its host(s).
14+
// This is the single source of truth used by both the capacity controller and
15+
// filter_has_enough_capacity to ensure consistent accounting.
16+
//
17+
// CommittedResourceReservations: confirmed VMs already appear in hv.Status.Allocation,
18+
// so blocking the full slot would double-count them. The effective block is:
19+
// max(max(0, slot − confirmedVMs), specOnlyVMs). Skipped (full slot used) when
20+
// ignoreAllocations is true, when mid-migration (TargetHost != Status.Host), or when Spec has no allocations.
21+
//
22+
// FailoverReservations: always block the full Spec.Resources.
23+
func UnusedReservationCapacity(res *v1alpha1.Reservation, ignoreAllocations bool) map[hv1.ResourceName]resource.Quantity {
24+
if res.Spec.Type == v1alpha1.ReservationTypeCommittedResource &&
25+
!ignoreAllocations &&
26+
res.Spec.TargetHost == res.Status.Host &&
27+
res.Spec.CommittedResourceReservation != nil &&
28+
len(res.Spec.CommittedResourceReservation.Allocations) > 0 {
29+
confirmedResources := make(map[hv1.ResourceName]resource.Quantity)
30+
specOnlyResources := make(map[hv1.ResourceName]resource.Quantity)
31+
32+
statusAllocs := map[string]string{}
33+
if res.Status.CommittedResourceReservation != nil {
34+
statusAllocs = res.Status.CommittedResourceReservation.Allocations
35+
}
36+
37+
for instanceUUID, allocation := range res.Spec.CommittedResourceReservation.Allocations {
38+
_, isConfirmed := statusAllocs[instanceUUID]
39+
for resourceName, quantity := range allocation.Resources {
40+
if isConfirmed {
41+
existing := confirmedResources[resourceName]
42+
existing.Add(quantity)
43+
confirmedResources[resourceName] = existing
44+
} else {
45+
existing := specOnlyResources[resourceName]
46+
existing.Add(quantity)
47+
specOnlyResources[resourceName] = existing
48+
}
49+
}
50+
}
51+
52+
result := make(map[hv1.ResourceName]resource.Quantity)
53+
zero := resource.Quantity{}
54+
for resourceName, slotSize := range res.Spec.Resources {
55+
confirmed := confirmedResources[resourceName]
56+
specOnly := specOnlyResources[resourceName]
57+
58+
remaining := slotSize.DeepCopy()
59+
remaining.Sub(confirmed)
60+
if remaining.Cmp(zero) < 0 {
61+
remaining = zero.DeepCopy()
62+
}
63+
64+
if specOnly.Cmp(remaining) > 0 {
65+
result[resourceName] = specOnly.DeepCopy()
66+
} else {
67+
result[resourceName] = remaining
68+
}
69+
}
70+
return result
71+
} else {
72+
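// All other cases (other reservation types such as FailoverReservations, and CommittedResourceReservations that ignore allocations, are mid-migration, or have no allocations): block the full Spec.Resources.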
73+
return res.Spec.Resources
74+
}
75+
}

0 commit comments

Comments
 (0)