|
| 1 | +package service |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "fmt" |
| 6 | + "net/http" |
| 7 | + "time" |
| 8 | + |
| 9 | + corev1 "k8s.io/api/core/v1" |
| 10 | + "k8s.io/apimachinery/pkg/util/runtime" |
| 11 | + "k8s.io/apimachinery/pkg/util/wait" |
| 12 | + coreinformers "k8s.io/client-go/informers/core/v1" |
| 13 | + clientset "k8s.io/client-go/kubernetes" |
| 14 | + corelisters "k8s.io/client-go/listers/core/v1" |
| 15 | + "k8s.io/client-go/tools/cache" |
| 16 | + "k8s.io/client-go/util/workqueue" |
| 17 | + cloudprovider "k8s.io/cloud-provider" |
| 18 | + cloudproviderapp "k8s.io/cloud-provider/app" |
| 19 | + cloudcontrollerconfig "k8s.io/cloud-provider/app/config" |
| 20 | + genericcontrollermanager "k8s.io/controller-manager/app" |
| 21 | + "k8s.io/klog" |
| 22 | + |
| 23 | + "github.com/yunify/qingcloud-cloud-controller-manager/pkg/qingcloud" |
| 24 | +) |
| 25 | + |
| 26 | +const ( |
| 27 | + serviceRunWorkerPeriod = 1 * time.Second |
| 28 | + serviceWorkers = 1 |
| 29 | +) |
| 30 | + |
| 31 | +type ServiceController struct { |
| 32 | + cloud cloudprovider.Interface |
| 33 | + |
| 34 | + serviceLister corelisters.ServiceLister |
| 35 | + serviceListerSynced cache.InformerSynced |
| 36 | + serviceQueue workqueue.RateLimitingInterface |
| 37 | +} |
| 38 | + |
| 39 | +func StartServiceControllerWarpper(completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface) cloudproviderapp.InitFunc { |
| 40 | + return func(ctx genericcontrollermanager.ControllerContext) (http.Handler, bool, error) { |
| 41 | + return startServiceController(completedConfig, cloud, ctx.Stop) |
| 42 | + } |
| 43 | +} |
| 44 | + |
| 45 | +func startServiceController(ctx *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) { |
| 46 | + // Start the endpoint controller |
| 47 | + serviceController, err := New( |
| 48 | + cloud, |
| 49 | + ctx.ClientBuilder.ClientOrDie("cloud-service-controller"), |
| 50 | + ctx.SharedInformers.Core().V1().Services(), |
| 51 | + ) |
| 52 | + if err != nil { |
| 53 | + // This error shouldn't fail. It lives like this as a legacy. |
| 54 | + klog.Errorf("Failed to start cloud-service controller: %v", err) |
| 55 | + return nil, false, nil |
| 56 | + } |
| 57 | + |
| 58 | + go serviceController.Run(stopCh, serviceWorkers) |
| 59 | + |
| 60 | + return nil, true, nil |
| 61 | +} |
| 62 | + |
| 63 | +func New( |
| 64 | + cloud cloudprovider.Interface, |
| 65 | + kubeClient clientset.Interface, |
| 66 | + serviceInformer coreinformers.ServiceInformer, |
| 67 | +) (*ServiceController, error) { |
| 68 | + |
| 69 | + sc := &ServiceController{ |
| 70 | + cloud: cloud, |
| 71 | + serviceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cloud-service"), |
| 72 | + serviceLister: serviceInformer.Lister(), |
| 73 | + serviceListerSynced: serviceInformer.Informer().HasSynced, |
| 74 | + } |
| 75 | + |
| 76 | + serviceInformer.Informer().AddEventHandler( |
| 77 | + cache.ResourceEventHandlerFuncs{ |
| 78 | + UpdateFunc: func(old, cur interface{}) { |
| 79 | + oldSvc, ok1 := old.(*corev1.Service) |
| 80 | + newSvc, ok2 := cur.(*corev1.Service) |
| 81 | + if ok1 && ok2 && sc.needClean(oldSvc, newSvc) { |
| 82 | + sc.enqueueService(old) |
| 83 | + // sc.cleanLBAndListener(oldSvc, newSvc) |
| 84 | + |
| 85 | + } |
| 86 | + }, |
| 87 | + }, |
| 88 | + ) |
| 89 | + |
| 90 | + return sc, nil |
| 91 | +} |
| 92 | + |
| 93 | +// for service which reuse lb, need to clean listener in those situations |
| 94 | +// change lb: lb1 -> lb2, need to clean lb1's listener ==> TODO |
| 95 | +// change service type: loadbalancer -> nodeport/clusterip, if service changed type and delete annotation, need clean lb(auto created lb) or listener(reuse-lb) |
| 96 | +func (sc *ServiceController) needClean(old, new *corev1.Service) bool { |
| 97 | + if old.Spec.Type == corev1.ServiceTypeLoadBalancer { |
| 98 | + if len(old.Annotations) == 0 { |
| 99 | + klog.V(4).Infof("service %s/%s last config has no annotation, cloud-service-controller do nothing!", old.Namespace, old.Name) |
| 100 | + return false |
| 101 | + } |
| 102 | + if new.Annotations == nil { |
| 103 | + new.Annotations = make(map[string]string) |
| 104 | + } |
| 105 | + |
| 106 | + if new.Spec.Type == corev1.ServiceTypeLoadBalancer { |
| 107 | + //TODO: change lb, clean listener for old listener if change lb |
| 108 | + klog.V(4).Infof("service %s/%s type not change, cloud-service-controller do nothing!", old.Namespace, old.Name) |
| 109 | + return false |
| 110 | + } |
| 111 | + |
| 112 | + // change service type |
| 113 | + // reuse-lb, clean listener if new service delete annotation |
| 114 | + oldStrategy := old.Annotations[qingcloud.ServiceAnnotationLoadBalancerPolicy] |
| 115 | + oldLBID := old.Annotations[qingcloud.ServiceAnnotationLoadBalancerID] |
| 116 | + newStrategy := new.Annotations[qingcloud.ServiceAnnotationLoadBalancerPolicy] |
| 117 | + newLBID := new.Annotations[qingcloud.ServiceAnnotationLoadBalancerID] |
| 118 | + if oldStrategy == qingcloud.ReuseExistingLB && oldLBID != "" { |
| 119 | + if newStrategy == oldStrategy && newLBID == oldLBID { |
| 120 | + // new service keep reuse lb annotation, ignore! |
| 121 | + klog.V(4).Infof("service %s/%s keep reuse lb id annotation, cloud-service-controller do nothing!", old.Namespace, old.Name) |
| 122 | + return false |
| 123 | + } |
| 124 | + klog.V(4).Infof("service %s/%s deleted reuse lb id annotation, cloud-service-controller will try to delete lb later!", old.Namespace, old.Name) |
| 125 | + return true |
| 126 | + } |
| 127 | + |
| 128 | + } |
| 129 | + return false |
| 130 | +} |
| 131 | + |
| 132 | +func (sc *ServiceController) enqueueService(obj interface{}) { |
| 133 | + _, ok := obj.(*corev1.Service) |
| 134 | + if !ok { |
| 135 | + return |
| 136 | + } |
| 137 | + sc.serviceQueue.Add(obj) |
| 138 | +} |
| 139 | + |
| 140 | +func (sc *ServiceController) Run(stopCh <-chan struct{}, workers int) { |
| 141 | + defer runtime.HandleCrash() |
| 142 | + defer sc.serviceQueue.ShutDown() |
| 143 | + |
| 144 | + klog.Info("Starting cloud service controller") |
| 145 | + defer klog.Info("Shutting down cloud service controller") |
| 146 | + |
| 147 | + if !cache.WaitForCacheSync(stopCh, sc.serviceListerSynced) { |
| 148 | + return |
| 149 | + } |
| 150 | + |
| 151 | + for i := 0; i < workers; i++ { |
| 152 | + go wait.Until(sc.worker, serviceRunWorkerPeriod, stopCh) |
| 153 | + } |
| 154 | + |
| 155 | + <-stopCh |
| 156 | +} |
| 157 | + |
| 158 | +func (sc *ServiceController) worker() { |
| 159 | + for sc.processNextWorkItem() { |
| 160 | + } |
| 161 | +} |
| 162 | + |
| 163 | +func (sc *ServiceController) processNextWorkItem() bool { |
| 164 | + obj, quit := sc.serviceQueue.Get() |
| 165 | + if quit { |
| 166 | + return false |
| 167 | + } |
| 168 | + defer sc.serviceQueue.Done(obj) |
| 169 | + |
| 170 | + svc, ok := obj.(*corev1.Service) |
| 171 | + if !ok { |
| 172 | + runtime.HandleError(fmt.Errorf("error assert service %v (will retry)", svc)) |
| 173 | + return true |
| 174 | + } |
| 175 | + err := sc.handleServiceUpdate(svc) |
| 176 | + if err == nil { |
| 177 | + sc.serviceQueue.Forget(obj) |
| 178 | + return true |
| 179 | + } |
| 180 | + |
| 181 | + runtime.HandleError(fmt.Errorf("error processing service %s/%s (will retry): %v", svc.Namespace, svc.Name, err)) |
| 182 | + sc.serviceQueue.AddRateLimited(obj) |
| 183 | + |
| 184 | + return true |
| 185 | +} |
| 186 | + |
| 187 | +func (sc *ServiceController) handleServiceUpdate(svc *corev1.Service) error { |
| 188 | + startTime := time.Now() |
| 189 | + defer func() { |
| 190 | + klog.V(4).Infof("Finished handleEndpointsUpdate %s/%s (%v)", svc.Namespace, svc.Name, time.Since(startTime)) |
| 191 | + }() |
| 192 | + |
| 193 | + cloudLbIntf, _ := sc.cloud.LoadBalancer() |
| 194 | + |
| 195 | + listenerName := fmt.Sprintf("listener_%s_%s_", svc.Namespace, svc.Name) |
| 196 | + lbID := svc.Annotations[qingcloud.ServiceAnnotationLoadBalancerID] |
| 197 | + |
| 198 | + klog.Infof("service %s/%s type changed, and loadbalancer annotation has been deleted, try to deleting listener %s of loadbalancer %s", |
| 199 | + svc.Namespace, svc.Name, listenerName, lbID) |
| 200 | + err := cloudLbIntf.EnsureLoadBalancerDeleted(context.TODO(), "", svc) |
| 201 | + if err != nil { |
| 202 | + return fmt.Errorf("delete listener %s of loadbalancer %s for service %s/%s error: %v", listenerName, lbID, svc.Namespace, svc.Name, err) |
| 203 | + } |
| 204 | + |
| 205 | + klog.Infof("delete listener %s of loadbalancer %s for service %s/%s successful", listenerName, lbID, svc.Namespace, svc.Name) |
| 206 | + return nil |
| 207 | +} |
0 commit comments