Skip to content

Commit fc9ff3b

Browse files
committed
Refactored ApplyAggregates
This is a preparation to keep track of the aggregate ids. The old functions are unused so let's remove them.
1 parent 6fb73a9 commit fc9ff3b

4 files changed

Lines changed: 116 additions & 144 deletions

File tree

internal/controller/aggregates_controller_test.go

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -63,16 +63,6 @@ var _ = Describe("AggregatesController", func() {
6363
}
6464
`
6565

66-
AggregatesPostBody = `
67-
{
68-
"aggregate": {
69-
"name": "test-aggregate1",
70-
"availability_zone": "",
71-
"deleted": false,
72-
"id": 42
73-
}
74-
}`
75-
7666
AggregateRemoveHostBody = `
7767
{
7868
"aggregate": {
@@ -153,27 +143,30 @@ var _ = Describe("AggregatesController", func() {
153143
Expect(result).To(Equal(ctrl.Result{}))
154144
})
155145

156-
Context("Adding new Aggregate", func() {
146+
Context("Adding to existing Aggregate", func() {
157147
BeforeEach(func(ctx SpecContext) {
158-
By("Setting a missing aggregate")
148+
By("Setting a desired aggregate")
159149
hypervisor := &kvmv1.Hypervisor{}
160150
Expect(k8sClient.Get(ctx, hypervisorName, hypervisor)).To(Succeed())
161151
hypervisor.Spec.Aggregates = []string{"test-aggregate1"}
162152
Expect(k8sClient.Update(ctx, hypervisor)).To(Succeed())
163153

164-
By("Mocking GetAggregates to return empty list")
154+
By("Mocking GetAggregates to return aggregate without host")
155+
aggregateList := `{
156+
"aggregates": [
157+
{
158+
"name": "test-aggregate1",
159+
"availability_zone": "",
160+
"deleted": false,
161+
"id": 42,
162+
"hosts": []
163+
}
164+
]
165+
}`
165166
fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) {
166167
w.Header().Add("Content-Type", "application/json")
167168
w.WriteHeader(http.StatusOK)
168-
_, err := fmt.Fprint(w, AggregateListBodyEmpty)
169-
Expect(err).NotTo(HaveOccurred())
170-
})
171-
172-
By("Mocking CreateAggregate")
173-
fakeServer.Mux.HandleFunc("POST /os-aggregates", func(w http.ResponseWriter, r *http.Request) {
174-
w.Header().Add("Content-Type", "application/json")
175-
w.WriteHeader(http.StatusOK)
176-
_, err := fmt.Fprint(w, AggregatesPostBody)
169+
_, err := fmt.Fprint(w, aggregateList)
177170
Expect(err).NotTo(HaveOccurred())
178171
})
179172

@@ -322,7 +315,7 @@ var _ = Describe("AggregatesController", func() {
322315
It("should set error condition", func(ctx SpecContext) {
323316
_, err := aggregatesController.Reconcile(ctx, reconcileRequest)
324317
Expect(err).To(HaveOccurred())
325-
sharedErrorConditionChecks(ctx, "failed to get aggregates")
318+
sharedErrorConditionChecks(ctx, "failed to list aggregates")
326319
})
327320
})
328321

internal/controller/decomission_controller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@ var _ = Describe("Decommission Controller", func() {
410410
It("should set decommissioning condition with error", func(ctx SpecContext) {
411411
_, err := decommissionReconciler.Reconcile(ctx, reconcileReq)
412412
Expect(err).NotTo(HaveOccurred())
413-
sharedDecommissioningErrorCheck(ctx, "cannot list aggregates")
413+
sharedDecommissioningErrorCheck(ctx, "failed to list aggregates")
414414
})
415415
})
416416

internal/openstack/aggregates.go

Lines changed: 56 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -26,145 +26,86 @@ import (
2626
"github.com/gophercloud/gophercloud/v2/openstack/compute/v2/aggregates"
2727
)
2828

29-
// GetAggregatesByName retrieves all aggregates from nova and returns them as a map keyed by name.
30-
func GetAggregatesByName(ctx context.Context, serviceClient *gophercloud.ServiceClient) (map[string]*aggregates.Aggregate, error) {
31-
pages, err := aggregates.List(serviceClient).AllPages(ctx)
32-
if err != nil {
33-
return nil, fmt.Errorf("cannot list aggregates due to %w", err)
34-
}
35-
36-
aggs, err := aggregates.ExtractAggregates(pages)
37-
if err != nil {
38-
return nil, fmt.Errorf("cannot list aggregates due to %w", err)
39-
}
40-
41-
aggregateMap := make(map[string]*aggregates.Aggregate, len(aggs))
42-
for _, aggregate := range aggs {
43-
aggregateMap[aggregate.Name] = &aggregate
44-
}
45-
return aggregateMap, nil
46-
}
47-
48-
// AddToAggregate adds the given host to the named aggregate, creating the aggregate if it does not yet exist.
49-
func AddToAggregate(ctx context.Context, serviceClient *gophercloud.ServiceClient, aggs map[string]*aggregates.Aggregate, host, name, zone string) (err error) {
50-
aggregate, found := aggs[name]
51-
log := logger.FromContext(ctx)
52-
if !found {
53-
aggregate, err = aggregates.Create(ctx, serviceClient,
54-
aggregates.CreateOpts{
55-
Name: name,
56-
AvailabilityZone: zone,
57-
}).Extract()
58-
if err != nil {
59-
return fmt.Errorf("failed to create aggregate %v due to %w", name, err)
60-
}
61-
aggs[name] = aggregate
62-
}
63-
64-
if slices.Contains(aggregate.Hosts, host) {
65-
log.Info("Found host in aggregate", "host", host, "name", name)
66-
return nil
67-
}
68-
69-
result, err := aggregates.AddHost(ctx, serviceClient, aggregate.ID, aggregates.AddHostOpts{Host: host}).Extract()
70-
if err != nil {
71-
return fmt.Errorf("failed to add host %v to aggregate %v due to %w", host, name, err)
72-
}
73-
log.Info("Added host to aggregate", "host", host, "name", name)
74-
aggs[name] = result
75-
76-
return nil
77-
}
78-
79-
// RemoveFromAggregate removes the given host from the named aggregate.
80-
func RemoveFromAggregate(ctx context.Context, serviceClient *gophercloud.ServiceClient, aggs map[string]*aggregates.Aggregate, host, name string) error {
81-
aggregate, found := aggs[name]
82-
log := logger.FromContext(ctx)
83-
if !found {
84-
log.Info("cannot find aggregate", "name", name)
85-
return nil
86-
}
87-
88-
found = false
89-
for _, aggHost := range aggregate.Hosts {
90-
if aggHost == host {
91-
found = true
92-
}
93-
}
94-
95-
if !found {
96-
log.Info("cannot find host in aggregate", "host", host, "name", name)
97-
return nil
98-
}
99-
100-
result, err := aggregates.RemoveHost(ctx, serviceClient, aggregate.ID, aggregates.RemoveHostOpts{Host: host}).Extract()
101-
if err != nil {
102-
return fmt.Errorf("failed to add host %v to aggregate %v due to %w", host, name, err)
103-
}
104-
aggs[name] = result
105-
log.Info("removed host from aggregate", "host", host, "name", name)
106-
107-
return nil
108-
}
109-
11029
// ApplyAggregates ensures a host is in exactly the specified aggregates.
111-
// It adds the host to missing aggregates and removes it from extra ones.
30+
//
31+
// The function performs a two-phase operation to maintain security:
32+
// 1. First, adds the host to all desired aggregates (if not already present)
33+
// 2. Then, removes the host from any aggregates it shouldn't be in
34+
//
35+
// This ordering prevents leaving the host unprotected between operations when
36+
// aggregates have filter criteria. However, conflicts may still occur with
37+
// aggregates in different availability zones, in which case errors are collected
38+
// and returned together for eventual convergence.
39+
//
40+
// All specified aggregates must already exist in OpenStack. If any desired
41+
// aggregate is not found, an error is returned listing the missing aggregates.
42+
//
11243
// Pass an empty list to remove the host from all aggregates.
113-
// Aggregates must already exist - this function will not create them.
11444
func ApplyAggregates(ctx context.Context, serviceClient *gophercloud.ServiceClient, host string, desiredAggregates []string) error {
11545
log := logger.FromContext(ctx)
11646

117-
aggs, err := GetAggregatesByName(ctx, serviceClient)
47+
// Fetch all aggregates
48+
pages, err := aggregates.List(serviceClient).AllPages(ctx)
49+
if err != nil {
50+
return fmt.Errorf("failed to list aggregates: %w", err)
51+
}
52+
53+
allAggregates, err := aggregates.ExtractAggregates(pages)
11854
if err != nil {
119-
return fmt.Errorf("failed to get aggregates: %w", err)
55+
return fmt.Errorf("failed to extract aggregates: %w", err)
12056
}
12157

122-
// Build desired set for O(1) lookups
58+
// Build desired set for lookups
12359
desiredSet := make(map[string]bool, len(desiredAggregates))
12460
for _, name := range desiredAggregates {
12561
desiredSet[name] = true
12662
}
12763

128-
// We need to add the host to aggregates first, because if we first drop
129-
// an aggregate with a filter criterion and then add a new one, we leave the host
130-
// open for a period of time. Still, this may fail due to a conflict of aggregates
131-
// with different availability zones, so we collect all the errors and return them
132-
// so it hopefully will converge eventually.
13364
var errs []error
134-
var toRemove []string
135-
136-
// First, add to any desired aggregates (including creating them if needed)
137-
for _, name := range desiredAggregates {
138-
aggregate, exists := aggs[name]
139-
if !exists || !slices.Contains(aggregate.Hosts, host) {
140-
// Aggregate doesn't exist or host not in it - add immediately
141-
log.Info("Adding to aggregate", "aggregate", name, "host", host)
142-
if err := AddToAggregate(ctx, serviceClient, aggs, host, name, ""); err != nil {
143-
errs = append(errs, err)
65+
var toRemove []aggregates.Aggregate
66+
67+
// Single pass: handle adds immediately, collect removes for later
68+
for _, agg := range allAggregates {
69+
hostInAggregate := slices.Contains(agg.Hosts, host)
70+
aggregateDesired := desiredSet[agg.Name]
71+
72+
if aggregateDesired {
73+
// Mark as found
74+
delete(desiredSet, agg.Name)
75+
76+
if !hostInAggregate {
77+
// Add host to this aggregate
78+
log.Info("Adding to aggregate", "aggregate", agg.Name, "host", host)
79+
_, err := aggregates.AddHost(ctx, serviceClient, agg.ID, aggregates.AddHostOpts{Host: host}).Extract()
80+
if err != nil {
81+
errs = append(errs, fmt.Errorf("failed to add host %v to aggregate %v: %w", host, agg.Name, err))
82+
}
14483
}
84+
} else if hostInAggregate {
85+
// Collect for removal (after all adds complete)
86+
toRemove = append(toRemove, agg)
14587
}
14688
}
14789

148-
// Second, collect aggregates to remove from (host is in but shouldn't be)
149-
for name, aggregate := range aggs {
150-
if slices.Contains(aggregate.Hosts, host) && !desiredSet[name] {
151-
toRemove = append(toRemove, name)
90+
// Error if any desired aggregates don't exist
91+
if len(desiredSet) > 0 {
92+
var missing []string
93+
for name := range desiredSet {
94+
missing = append(missing, name)
15295
}
96+
errs = append(errs, fmt.Errorf("aggregates not found: %v", missing))
15397
}
15498

155-
// Remove after all additions are complete
99+
// Remove host from unwanted aggregates (after all adds complete)
156100
if len(toRemove) > 0 {
157-
log.Info("Removing from aggregates", "aggregates", toRemove, "host", host)
158-
for _, name := range toRemove {
159-
if err := RemoveFromAggregate(ctx, serviceClient, aggs, host, name); err != nil {
160-
errs = append(errs, err)
101+
for _, agg := range toRemove {
102+
log.Info("Removing from aggregate", "aggregate", agg.Name, "host", host)
103+
_, err := aggregates.RemoveHost(ctx, serviceClient, agg.ID, aggregates.RemoveHostOpts{Host: host}).Extract()
104+
if err != nil {
105+
errs = append(errs, fmt.Errorf("failed to remove host %v from aggregate %v: %w", host, agg.Name, err))
161106
}
162107
}
163108
}
164109

165-
if len(errs) > 0 {
166-
return fmt.Errorf("encountered errors during aggregate update: %w", errors.Join(errs...))
167-
}
168-
169-
return nil
110+
return errors.Join(errs...)
170111
}

internal/openstack/aggregates_test.go

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ var _ = Describe("ApplyAggregates", func() {
226226
serviceClient := client.ServiceClient(fakeServer)
227227
err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg1"})
228228
Expect(err).To(HaveOccurred())
229-
Expect(err.Error()).To(ContainSubstring("failed to get aggregates"))
229+
Expect(err.Error()).To(ContainSubstring("failed to list aggregates"))
230230
})
231231
})
232232

@@ -248,7 +248,7 @@ var _ = Describe("ApplyAggregates", func() {
248248
serviceClient := client.ServiceClient(fakeServer)
249249
err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg1", "agg2", "agg3"})
250250
Expect(err).To(HaveOccurred())
251-
Expect(err.Error()).To(ContainSubstring("encountered errors during aggregate update"))
251+
Expect(err.Error()).To(ContainSubstring("failed to add host"))
252252
})
253253
})
254254

@@ -270,7 +270,7 @@ var _ = Describe("ApplyAggregates", func() {
270270
serviceClient := client.ServiceClient(fakeServer)
271271
err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg2"})
272272
Expect(err).To(HaveOccurred())
273-
Expect(err.Error()).To(ContainSubstring("encountered errors during aggregate update"))
273+
Expect(err.Error()).To(ContainSubstring("failed to remove host"))
274274
})
275275
})
276276

@@ -321,9 +321,47 @@ var _ = Describe("ApplyAggregates", func() {
321321
serviceClient := client.ServiceClient(fakeServer)
322322
err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg2", "agg3"})
323323
Expect(err).To(HaveOccurred())
324-
Expect(err.Error()).To(ContainSubstring("encountered errors during aggregate update"))
325324
// Verify it's a joined error with multiple failures
326-
Expect(err.Error()).To(Or(ContainSubstring("Cannot add"), ContainSubstring("Cannot remove")))
325+
Expect(err.Error()).To(And(ContainSubstring("failed to add host"), ContainSubstring("failed to remove host")))
326+
})
327+
})
328+
329+
Context("when desired aggregate does not exist", func() {
330+
BeforeEach(func() {
331+
fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) {
332+
w.Header().Add("Content-Type", "application/json")
333+
w.WriteHeader(http.StatusOK)
334+
fmt.Fprint(w, aggregateListWithHost)
335+
})
336+
})
337+
338+
It("should return an error about missing aggregate", func() {
339+
serviceClient := client.ServiceClient(fakeServer)
340+
err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg1", "agg2", "agg4"})
341+
Expect(err).To(HaveOccurred())
342+
Expect(err.Error()).To(ContainSubstring("aggregates not found"))
343+
Expect(err.Error()).To(ContainSubstring("agg4"))
344+
})
345+
})
346+
347+
Context("when multiple desired aggregates do not exist", func() {
348+
BeforeEach(func() {
349+
fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) {
350+
w.Header().Add("Content-Type", "application/json")
351+
w.WriteHeader(http.StatusOK)
352+
fmt.Fprint(w, aggregateListWithHost)
353+
})
354+
})
355+
356+
It("should return an error listing all missing aggregates", func() {
357+
serviceClient := client.ServiceClient(fakeServer)
358+
err := ApplyAggregates(ctx, serviceClient, "test-host", []string{"agg4", "agg5", "agg6"})
359+
Expect(err).To(HaveOccurred())
360+
Expect(err.Error()).To(ContainSubstring("aggregates not found"))
361+
// Check that all missing aggregates are mentioned in the error
362+
Expect(err.Error()).To(ContainSubstring("agg4"))
363+
Expect(err.Error()).To(ContainSubstring("agg5"))
364+
Expect(err.Error()).To(ContainSubstring("agg6"))
327365
})
328366
})
329367
})

0 commit comments

Comments
 (0)