Skip to content

Commit 1b75921

Browse files
authored
Associate loadBalancerIP to the network when specified in the Spec (#82)
1 parent 04bf536 commit 1b75921

File tree

3 files changed

+181
-13
lines changed

3 files changed

+181
-13
lines changed

cloudstack.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,23 @@ package cloudstack
2121

2222
import (
2323
"context"
24+
"encoding/json"
2425
"errors"
2526
"fmt"
2627
"io"
2728
"os"
2829
"strings"
30+
"time"
2931

3032
"github.com/apache/cloudstack-go/v2/cloudstack"
3133
"github.com/blang/semver/v4"
3234
"gopkg.in/gcfg.v1"
35+
36+
corev1 "k8s.io/api/core/v1"
37+
apierrors "k8s.io/apimachinery/pkg/api/errors"
3338
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3439
"k8s.io/apimachinery/pkg/types"
40+
"k8s.io/client-go/kubernetes"
3541
cloudprovider "k8s.io/cloud-provider"
3642
"k8s.io/klog/v2"
3743
)
@@ -329,6 +335,101 @@ func (cs *CSCloud) getNodeNameFromPod(ctx context.Context) (string, error) {
329335
return pod.Spec.NodeName, nil
330336
}
331337

338+
// setServiceAnnotation updates a service annotation using the Kubernetes client.
339+
// It uses a patch operation with retry logic to handle concurrent updates safely.
340+
func (cs *CSCloud) setServiceAnnotation(ctx context.Context, service *corev1.Service, key, value string) error {
341+
if cs.clientBuilder == nil {
342+
klog.V(4).Infof("Client builder not available, skipping annotation update for service %s/%s", service.Namespace, service.Name)
343+
return nil
344+
}
345+
346+
client, err := cs.clientBuilder.Client("cloud-controller-manager")
347+
if err != nil {
348+
return fmt.Errorf("failed to get Kubernetes client: %v", err)
349+
}
350+
351+
// First, check if the annotation already has the correct value to avoid unnecessary updates
352+
svc, err := client.CoreV1().Services(service.Namespace).Get(ctx, service.Name, metav1.GetOptions{})
353+
if err != nil {
354+
if apierrors.IsNotFound(err) {
355+
klog.V(4).Infof("Service %s/%s not found, skipping annotation update", service.Namespace, service.Name)
356+
return nil
357+
}
358+
return fmt.Errorf("failed to get service: %v", err)
359+
}
360+
361+
// Check if annotation already has the correct value
362+
if svc.Annotations != nil {
363+
if existingValue, exists := svc.Annotations[key]; exists && existingValue == value {
364+
klog.V(4).Infof("Annotation %s already set to %s for service %s/%s", key, value, service.Namespace, service.Name)
365+
return nil
366+
}
367+
}
368+
369+
// Use patch operation with retry logic to handle concurrent updates
370+
return cs.patchServiceAnnotation(ctx, client, service.Namespace, service.Name, key, value)
371+
}
372+
373+
// patchServiceAnnotation patches a service annotation using a JSON merge patch with retry logic.
374+
// This method handles concurrent updates safely by retrying on conflicts.
375+
func (cs *CSCloud) patchServiceAnnotation(ctx context.Context, client kubernetes.Interface, namespace, name, key, value string) error {
376+
const maxRetries = 3
377+
const retryDelay = 500 * time.Millisecond
378+
379+
// Prepare the patch payload - merge patch that updates only the specific annotation
380+
// JSON merge patch will preserve other annotations while updating/adding this one
381+
patchData := map[string]interface{}{
382+
"metadata": map[string]interface{}{
383+
"annotations": map[string]string{
384+
key: value,
385+
},
386+
},
387+
}
388+
389+
patchBytes, err := json.Marshal(patchData)
390+
if err != nil {
391+
return fmt.Errorf("failed to marshal patch data: %v", err)
392+
}
393+
394+
for attempt := 0; attempt < maxRetries; attempt++ {
395+
// Apply the patch using JSON merge patch type
396+
// This is atomic and avoids race conditions by merging with existing annotations
397+
_, err = client.CoreV1().Services(namespace).Patch(
398+
ctx,
399+
name,
400+
types.MergePatchType,
401+
patchBytes,
402+
metav1.PatchOptions{},
403+
)
404+
405+
if err == nil {
406+
klog.V(4).Infof("Successfully set annotation %s=%s on service %s/%s", key, value, namespace, name)
407+
return nil
408+
}
409+
410+
// Handle conflict errors with retry logic
411+
if apierrors.IsConflict(err) {
412+
if attempt < maxRetries-1 {
413+
klog.V(4).Infof("Conflict updating service %s/%s annotation, retrying (attempt %d/%d): %v", namespace, name, attempt+1, maxRetries, err)
414+
time.Sleep(retryDelay)
415+
continue
416+
}
417+
return fmt.Errorf("failed to update service annotation after %d retries due to conflicts: %v", maxRetries, err)
418+
}
419+
420+
// Handle not found errors
421+
if apierrors.IsNotFound(err) {
422+
klog.V(4).Infof("Service %s/%s not found during patch, skipping annotation update", namespace, name)
423+
return nil
424+
}
425+
426+
// For other errors, return immediately
427+
return fmt.Errorf("failed to patch service annotation: %v", err)
428+
}
429+
430+
return fmt.Errorf("failed to update service annotation after %d attempts", maxRetries)
431+
}
432+
332433
func (cs *CSCloud) getRegionFromZone(zone string) string {
333434
if cs.region != "" {
334435
return cs.region

cloudstack_loadbalancer.go

Lines changed: 79 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -51,19 +51,25 @@ const (
5151
// The CIDR list is a comma-separated list of CIDR ranges (e.g., "10.0.0.0/8,192.168.1.0/24").
5252
// If not specified, the default is to allow all sources ("0.0.0.0/0").
5353
ServiceAnnotationLoadBalancerSourceCidrs = "service.beta.kubernetes.io/cloudstack-load-balancer-source-cidrs"
54+
55+
// ServiceAnnotationLoadBalancerIPAssociatedByController indicates that the controller
56+
// associated the IP address. This annotation is set by the controller when it associates
57+
// an unallocated IP, and is used to determine if the IP should be disassociated on deletion.
58+
ServiceAnnotationLoadBalancerIPAssociatedByController = "service.beta.kubernetes.io/cloudstack-load-balancer-ip-associated-by-controller" //nolint:gosec
5459
)
5560

5661
type loadBalancer struct {
5762
*cloudstack.CloudStackClient
5863

59-
name string
60-
algorithm string
61-
hostIDs []string
62-
ipAddr string
63-
ipAddrID string
64-
networkID string
65-
projectID string
66-
rules map[string]*cloudstack.LoadBalancerRule
64+
name string
65+
algorithm string
66+
hostIDs []string
67+
ipAddr string
68+
ipAddrID string
69+
networkID string
70+
projectID string
71+
rules map[string]*cloudstack.LoadBalancerRule
72+
ipAssociatedByController bool
6773
}
6874

6975
// GetLoadBalancer returns whether the specified load balancer exists, and if so, what its status is.
@@ -134,6 +140,14 @@ func (cs *CSCloud) EnsureLoadBalancer(ctx context.Context, clusterName string, s
134140
}
135141
}(lb)
136142
}
143+
144+
// If the controller associated the IP and matches the service spec, set the annotation to persist this information.
145+
if lb.ipAssociatedByController && lb.ipAddr == service.Spec.LoadBalancerIP {
146+
if err := cs.setServiceAnnotation(ctx, service, ServiceAnnotationLoadBalancerIPAssociatedByController, "true"); err != nil {
147+
// Log the error but don't fail - the annotation is helpful but not critical
148+
klog.Warningf("Failed to set annotation on service %s/%s: %v", service.Namespace, service.Name, err)
149+
}
150+
}
137151
}
138152

139153
klog.V(4).Infof("Load balancer %v is associated with IP %v", lb.name, lb.ipAddr)
@@ -360,10 +374,52 @@ func (cs *CSCloud) EnsureLoadBalancerDeleted(ctx context.Context, clusterName st
360374
}
361375
}
362376

363-
if lb.ipAddr != "" && lb.ipAddr != service.Spec.LoadBalancerIP {
364-
klog.V(4).Infof("Releasing load balancer IP: %v", lb.ipAddr)
365-
if err := lb.releaseLoadBalancerIP(); err != nil {
366-
return err
377+
if lb.ipAddr != "" {
378+
// If the IP was allocated by the controller (not specified in service spec), release it.
379+
if lb.ipAddr != service.Spec.LoadBalancerIP {
380+
klog.V(4).Infof("Releasing load balancer IP: %v", lb.ipAddr)
381+
if err := lb.releaseLoadBalancerIP(); err != nil {
382+
return err
383+
}
384+
} else {
385+
// If the IP was specified in service spec, check if it was associated by the controller.
386+
// First, check if there's an annotation indicating the controller associated it.
387+
// If not, check if there are any other load balancer rules using this IP.
388+
shouldDisassociate := getBoolFromServiceAnnotation(service, ServiceAnnotationLoadBalancerIPAssociatedByController, false)
389+
390+
if shouldDisassociate {
391+
// Annotation is set, so check if there are any other load balancer rules using this IP.
392+
// Since we've already deleted all rules for this service, any remaining rules must belong
393+
// to other services. If no other rules exist, it's safe to disassociate the IP.
394+
ip, count, err := lb.Address.GetPublicIpAddressByID(lb.ipAddrID)
395+
if err != nil {
396+
klog.Errorf("Error retrieving IP address %v for disassociation check: %v", lb.ipAddr, err)
397+
shouldDisassociate = false
398+
} else if count > 0 && ip.Allocated != "" {
399+
p := lb.LoadBalancer.NewListLoadBalancerRulesParams()
400+
p.SetPublicipid(lb.ipAddrID)
401+
p.SetListall(true)
402+
if lb.projectID != "" {
403+
p.SetProjectid(lb.projectID)
404+
}
405+
otherRules, err := lb.LoadBalancer.ListLoadBalancerRules(p)
406+
if err != nil {
407+
klog.Errorf("Error checking for other load balancer rules using IP %v: %v", lb.ipAddr, err)
408+
shouldDisassociate = false
409+
} else if otherRules.Count > 0 {
410+
// Other load balancer rules are using this IP (other services are using it),
411+
// so don't disassociate.
412+
shouldDisassociate = false
413+
}
414+
}
415+
}
416+
417+
if shouldDisassociate {
418+
klog.V(4).Infof("Disassociating IP %v that was associated by the controller", lb.ipAddr)
419+
if err := lb.releaseLoadBalancerIP(); err != nil {
420+
return err
421+
}
422+
}
367423
}
368424
}
369425

@@ -498,6 +554,7 @@ func (lb *loadBalancer) getPublicIPAddress(loadBalancerIP string) error {
498554

499555
p := lb.Address.NewListPublicIpAddressesParams()
500556
p.SetIpaddress(loadBalancerIP)
557+
p.SetAllocatedonly(false)
501558
p.SetListall(true)
502559

503560
if lb.projectID != "" {
@@ -510,12 +567,16 @@ func (lb *loadBalancer) getPublicIPAddress(loadBalancerIP string) error {
510567
}
511568

512569
if l.Count != 1 {
513-
return fmt.Errorf("could not find IP address %v", loadBalancerIP)
570+
return fmt.Errorf("could not find IP address %v. Found %d addresses", loadBalancerIP, l.Count)
514571
}
515572

516573
lb.ipAddr = l.PublicIpAddresses[0].Ipaddress
517574
lb.ipAddrID = l.PublicIpAddresses[0].Id
518575

576+
// If the IP is not allocated, associate it.
577+
if l.PublicIpAddresses[0].Allocated == "" {
578+
return lb.associatePublicIPAddress()
579+
}
519580
return nil
520581
}
521582

@@ -544,6 +605,10 @@ func (lb *loadBalancer) associatePublicIPAddress() error {
544605
p.SetProjectid(lb.projectID)
545606
}
546607

608+
if lb.ipAddr != "" {
609+
p.SetIpaddress(lb.ipAddr)
610+
}
611+
547612
// Associate a new IP address
548613
r, err := lb.Address.AssociateIpAddress(p)
549614
if err != nil {
@@ -552,6 +617,7 @@ func (lb *loadBalancer) associatePublicIPAddress() error {
552617

553618
lb.ipAddr = r.Ipaddress
554619
lb.ipAddrID = r.Id
620+
lb.ipAssociatedByController = true
555621

556622
return nil
557623
}

deployment.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ rules:
6161
resources:
6262
- services
6363
verbs:
64+
- get
6465
- list
6566
- patch
6667
- update

0 commit comments

Comments
 (0)