Skip to content

Commit 5fa9556

Browse files
authored
Merge pull request #128 from qiangzii/master
support change loadbalancer eip;
2 parents d4ffc45 + cf71afa commit 5fa9556

7 files changed

Lines changed: 236 additions & 46 deletions

File tree

docs/configure.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,9 @@ spec:
155155
- `Cluster`: 如果`service`中不显式指定 `externalTrafficPolicy` 字段的值,则默认为`Cluster`;这种模式下,可以通过给服务添加相关注解来指定LB监听器backend的添加规则
156156

157157
`Cluster`模式下,目前支持的 `service` 注解有:
158-
- 使用指定Label的Worker节点作为后端服务器, `service.beta.kubernetes.io/qingcloud-lb-backend-label`,可以指定多个Label,多个Label以逗号分隔。例如:`key1=value1,key2=value2`,多个Label之间是And关系。同时,在需要成为后端服务器的Worker节点打上`key1=value1,key2=value2`的Label;只有服务指定的所有Label的key和value都和Worker节点匹配时,Worker节点会被选为服务的后端服务器;没有此注解则添加所有Worker节点为backend
158+
- 使用指定Label的Worker节点作为后端服务器, `service.beta.kubernetes.io/qingcloud-lb-backend-label`,可以指定多个Label,多个Label以逗号分隔。例如:`key1=value1,key2=value2`,多个Label之间是And关系。同时,在需要成为后端服务器的Worker节点打上`key1=value1,key2=value2`的Label;只有服务指定的所有Label的key和value都和Worker节点匹配时,Worker节点会被选为服务的后端服务器;没有此注解则添加所有Worker节点为backend;通过注解过滤节点后,如果没有满足条件的节点,为了避免服务中断,会添加所有Worker节点为后端服务器;
159+
160+
> 本章节所说的"所有Worker节点"特指所有 `Ready` 状态的Worker节点;
159161

160162
### 参考示例
161163
#### Local模式

pkg/controllers/endpoint/endpoint_controller.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,9 +177,10 @@ func (epc *EndpointController) handleEndpointsUpdate(key string) error {
177177
}
178178
// ignore service which service type != loadbalancer or externalTrafficPolicy != Local
179179
if svc.Spec.Type != corev1.ServiceTypeLoadBalancer || svc.Spec.ExternalTrafficPolicy != corev1.ServiceExternalTrafficPolicyTypeLocal {
180-
klog.Infof("service %s serviceType = %s, externalTrafficPolicy = %s, skip handle endpoint update", svc.Name, svc.Spec.Type, svc.Spec.ExternalTrafficPolicy)
180+
klog.V(4).Infof("service %s serviceType = %s, externalTrafficPolicy = %s, skip handle endpoint update", svc.Name, svc.Spec.Type, svc.Spec.ExternalTrafficPolicy)
181181
return nil
182182
}
183+
klog.Infof("service %s serviceType = %s, externalTrafficPolicy = %s, going to handle endpoint update", svc.Name, svc.Spec.Type, svc.Spec.ExternalTrafficPolicy)
183184

184185
// 2. get node list
185186
var nodes []*corev1.Node

pkg/executor/client.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ type QingCloudClientInterface interface {
4949
CreateLB(input *apis.LoadBalancer) (*apis.LoadBalancer, error)
5050
DeleteLB(id *string) error
5151
UpdateLB(id *string) error
52+
AssociateEIPsToLB(id *string, eips []*string) error
53+
DissociateEIPsFromLB(id *string, eips []*string) error
5254

5355
//sg
5456
GetSecurityGroupByName(name string) (*apis.SecurityGroup, error)

pkg/executor/lb.go

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,12 @@ func convertLoadBalancerStatus(lb *qcservice.LoadBalancer) apis.LoadBalancerStat
6565
}
6666

6767
func convertLoadBalancer(lb *qcservice.LoadBalancer) *apis.LoadBalancer {
68+
var eipsID []*string
69+
for _, eip := range lb.Cluster {
70+
if *eip.EIPID != "" {
71+
eipsID = append(eipsID, eip.EIPID)
72+
}
73+
}
6874
return &apis.LoadBalancer{
6975
Spec: apis.LoadBalancerSpec{
7076
LoadBalancerName: lb.LoadBalancerName,
@@ -73,7 +79,7 @@ func convertLoadBalancer(lb *qcservice.LoadBalancer) *apis.LoadBalancer {
7379
VxNetID: lb.VxNetID,
7480
PrivateIPs: lb.PrivateIPs,
7581
SecurityGroups: lb.SecurityGroupID,
76-
//TODO fill eip
82+
EIPs: eipsID,
7783
},
7884
Status: convertLoadBalancerStatus(lb),
7985
}
@@ -258,7 +264,11 @@ func (q *QingCloudClient) DeleteLB(id *string) error {
258264
return nil
259265
}
260266

261-
if err != nil {
267+
if output != nil && err == nil {
268+
if *output.RetCode != 0 {
269+
return fmt.Errorf("failed to delete lb %s, code=%d, message=%s", *id, *output.RetCode, *output.Message)
270+
}
271+
262272
err = qcclient.WaitJob(q.jobService, *output.JobID, operationWaitTimeout, waitInterval)
263273
if err != nil {
264274
return fmt.Errorf("lb %s delete job not completed", *id)
@@ -267,3 +277,62 @@ func (q *QingCloudClient) DeleteLB(id *string) error {
267277

268278
return nil
269279
}
280+
281+
func (q *QingCloudClient) AssociateEIPsToLB(id *string, eips []*string) error {
282+
var err error
283+
var output *qcservice.AssociateEIPsToLoadBalancerOutput
284+
285+
if len(eips) == 0 {
286+
return nil
287+
}
288+
289+
output, err = q.LBService.AssociateEIPsToLoadBalancer(&qcservice.AssociateEIPsToLoadBalancerInput{
290+
EIPs: eips,
291+
LoadBalancer: id,
292+
})
293+
if err != nil {
294+
return fmt.Errorf("associate eip %s to lb %s error: %v", spew.Sdump(eips), *id, err)
295+
}
296+
297+
if output != nil {
298+
if *output.RetCode != 0 {
299+
return fmt.Errorf("associate eip %s to lb %s failed, code=%d, message=%s", spew.Sdump(eips), *id, *output.RetCode, *output.Message)
300+
}
301+
302+
err = qcclient.WaitJob(q.jobService, *output.JobID, operationWaitTimeout, waitInterval)
303+
if err != nil {
304+
return fmt.Errorf("associate eip %s to lb %s job not completed, err %v", spew.Sdump(eips), *id, err)
305+
}
306+
}
307+
308+
return nil
309+
}
310+
func (q *QingCloudClient) DissociateEIPsFromLB(id *string, eips []*string) error {
311+
var err error
312+
var output *qcservice.DissociateEIPsFromLoadBalancerOutput
313+
314+
if len(eips) == 0 {
315+
return nil
316+
}
317+
318+
output, err = q.LBService.DissociateEIPsFromLoadBalancer(&qcservice.DissociateEIPsFromLoadBalancerInput{
319+
EIPs: eips,
320+
LoadBalancer: id,
321+
})
322+
if err != nil {
323+
return fmt.Errorf("dissociate eips %s from lb %s error: %v", spew.Sdump(eips), *id, err)
324+
}
325+
326+
if output != nil {
327+
if *output.RetCode != 0 {
328+
return fmt.Errorf("dissociate eip %s from lb %s failed, code=%d, message=%s", spew.Sdump(eips), *id, *output.RetCode, *output.Message)
329+
}
330+
331+
err = qcclient.WaitJob(q.jobService, *output.JobID, operationWaitTimeout, waitInterval)
332+
if err != nil {
333+
return fmt.Errorf("dissociate eip %s from lb %s job not completed, err %v", spew.Sdump(eips), *id, err)
334+
}
335+
}
336+
337+
return nil
338+
}

pkg/qingcloud/annotations.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,14 +173,14 @@ func (qc *QingCloud) ParseServiceLBConfig(cluster string, service *v1.Service) (
173173
}
174174

175175
networkType := annotation[ServiceAnnotationLoadBalancerNetworkType]
176+
if config.VxNetID == nil && qc.Config.DefaultVxNetForLB != "" {
177+
config.VxNetID = qcservice.String(qc.Config.DefaultVxNetForLB)
178+
}
176179
switch networkType {
177180
case NetworkModePublic:
178181
config.NetworkType = networkType
179182
case NetworkModeInternal:
180183
config.NetworkType = networkType
181-
if config.VxNetID == nil && qc.Config.DefaultVxNetForLB != "" {
182-
config.VxNetID = qcservice.String(qc.Config.DefaultVxNetForLB)
183-
}
184184
default:
185185
config.NetworkType = NetworkModePublic
186186
}

pkg/qingcloud/qingcloud.go

Lines changed: 10 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -220,11 +220,18 @@ func (qc *QingCloud) EnsureLoadBalancer(ctx context.Context, _ string, service *
220220
modify = true
221221
}
222222

223+
// update eips
224+
err = qc.updateLBEip(conf, lb)
225+
if err != nil {
226+
klog.Errorf("update eip for lb %s error %v", *lb.Status.LoadBalancerID, err)
227+
return nil, err
228+
}
229+
223230
//update listener
224231
listenerIDs := filterListeners(lb.Status.LoadBalancerListeners, conf.listenerName)
225232
klog.Infof("The loadbalancer %s has the following listeners %s", *lb.Status.LoadBalancerID, spew.Sdump(listenerIDs))
226233
if len(listenerIDs) <= 0 {
227-
klog.Infof("create listeners for loadbalancers %s, service ports %s", *lb.Status.LoadBalancerID, spew.Sdump(service.Spec.Ports))
234+
klog.Infof("creating listeners for loadbalancers %s, service ports %s", *lb.Status.LoadBalancerID, spew.Sdump(service.Spec.Ports))
228235
if err = qc.createListenersAndBackends(conf, lb, service.Spec.Ports, nodes); err != nil {
229236
klog.Errorf("createListenersAndBackends for loadbalancer %s error: %v", *lb.Status.LoadBalancerID, err)
230237
return nil, err
@@ -300,46 +307,10 @@ func (qc *QingCloud) EnsureLoadBalancer(ctx context.Context, _ string, service *
300307
//1. create lb
301308
//1.1 prepare eip
302309
if len(conf.EipIDs) <= 0 && conf.EipSource != nil {
303-
var (
304-
eip *apis.EIP
305-
)
306-
307-
switch *conf.EipSource {
308-
case AllocateOnly:
309-
eip, err = qc.Client.AllocateEIP(nil)
310-
case UseAvailableOnly:
311-
eips, err := qc.Client.GetAvaliableEIPs()
312-
if err != nil {
313-
return nil, err
314-
}
315-
316-
if len(eips) <= 0 {
317-
return nil, fmt.Errorf("no avaliable eips")
318-
}
319-
320-
eip = eips[0]
321-
case UseAvailableOrAllocateOne:
322-
eips, err := qc.Client.GetAvaliableEIPs()
323-
if err != nil {
324-
return nil, err
325-
}
326-
327-
if len(eips) <= 0 {
328-
eip, err = qc.Client.AllocateEIP(nil)
329-
if err != nil {
330-
return nil, err
331-
}
332-
} else {
333-
eip = eips[0]
334-
}
335-
}
336-
310+
eip, err := qc.prepareEip(conf.EipSource)
337311
if err != nil {
338312
return nil, err
339-
} else if eip == nil {
340-
return nil, fmt.Errorf("has no eip")
341313
}
342-
343314
conf.EipIDs = []*string{eip.Status.EIPID}
344315
}
345316
//1.2 prepare sg
@@ -368,7 +339,7 @@ func (qc *QingCloud) EnsureLoadBalancer(ctx context.Context, _ string, service *
368339
}
369340

370341
if len(lb.Status.VIP) <= 0 {
371-
return nil, fmt.Errorf("loadbalance has not vip, please spec it")
342+
return nil, fmt.Errorf("loadbalance has no vip, please spec it")
372343
}
373344

374345
err = qc.Client.UpdateLB(lb.Status.LoadBalancerID)

pkg/qingcloud/qingcloud_utils.go

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
package qingcloud
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/davecgh/go-spew/spew"
7+
"k8s.io/klog/v2"
8+
9+
"github.com/yunify/qingcloud-cloud-controller-manager/pkg/apis"
10+
)
11+
12+
func (qc *QingCloud) prepareEip(eipSource *string) (eip *apis.EIP, err error) {
13+
14+
switch *eipSource {
15+
case AllocateOnly:
16+
eip, err = qc.Client.AllocateEIP(nil)
17+
case UseAvailableOnly:
18+
eips, err := qc.Client.GetAvaliableEIPs()
19+
if err != nil {
20+
return nil, err
21+
}
22+
23+
if len(eips) <= 0 {
24+
return nil, fmt.Errorf("no avaliable eips")
25+
}
26+
27+
eip = eips[0]
28+
case UseAvailableOrAllocateOne:
29+
eips, err := qc.Client.GetAvaliableEIPs()
30+
if err != nil {
31+
return nil, err
32+
}
33+
34+
if len(eips) <= 0 {
35+
eip, err = qc.Client.AllocateEIP(nil)
36+
if err != nil {
37+
return nil, err
38+
}
39+
} else {
40+
eip = eips[0]
41+
}
42+
}
43+
44+
if err != nil {
45+
return nil, err
46+
} else if eip == nil {
47+
return nil, fmt.Errorf("has no eip")
48+
}
49+
return eip, nil
50+
}
51+
52+
func (qc *QingCloud) diffLBEip(config *LoadBalancerConfig, lb *apis.LoadBalancer) (eipsToAdd, eipsToDel []*string, err error) {
53+
54+
// lb eip
55+
lbEipMap := make(map[string]bool)
56+
if lb.Spec.EIPs != nil {
57+
for _, lbEipID := range lb.Spec.EIPs {
58+
lbEipMap[*lbEipID] = true
59+
}
60+
}
61+
62+
// eip/internal --> eip;
63+
if config.EipIDs != nil {
64+
// config eip
65+
configEipMap := make(map[string]bool)
66+
for _, configEipID := range config.EipIDs {
67+
configEipMap[*configEipID] = true
68+
if !lbEipMap[*configEipID] {
69+
eipsToAdd = append(eipsToAdd, configEipID)
70+
}
71+
}
72+
73+
for lbEipID := range lbEipMap {
74+
if !configEipMap[lbEipID] {
75+
eipsToDel = append(eipsToDel, &lbEipID)
76+
}
77+
}
78+
} else if config.EipSource != nil {
79+
switch *config.EipSource {
80+
case AllocateOnly, UseAvailableOnly, UseAvailableOrAllocateOne:
81+
if len(lb.Spec.EIPs) > 0 {
82+
// lb already has eip, do nothing
83+
klog.Infof("lb %s already has eip %s, do nothing", *lb.Status.LoadBalancerID, spew.Sdump(lb.Spec.EIPs))
84+
} else {
85+
// get or create an available eip from qingcloud and associate this eip to lb
86+
eip, err := qc.prepareEip(config.EipSource)
87+
if err != nil {
88+
return nil, nil, fmt.Errorf("prepare eip for lb %s error: %v", *lb.Status.LoadBalancerID, err)
89+
}
90+
eipsToAdd = append(eipsToAdd, eip.Status.EIPID)
91+
}
92+
default: // annotation value not correct, do nothing
93+
return nil, nil, fmt.Errorf("the value of annotation '%s' is mistake", ServiceAnnotationLoadBalancerEipSource)
94+
}
95+
} else if config.NetworkType == NetworkModeInternal { // eip/internal --> intertal
96+
// delete all eip from this lb
97+
if lb.Spec.EIPs != nil {
98+
eipsToDel = append(eipsToDel, lb.Spec.EIPs...)
99+
}
100+
}
101+
return
102+
}
103+
104+
func (qc *QingCloud) updateLBEip(config *LoadBalancerConfig, lb *apis.LoadBalancer) (err error) {
105+
var updated bool
106+
var eipsToAdd, eipsToDel []*string
107+
// if reuse lb, do nothing
108+
if config.ReuseLBID != "" {
109+
return nil
110+
}
111+
112+
eipsToAdd, eipsToDel, err = qc.diffLBEip(config, lb)
113+
if err != nil {
114+
return err
115+
}
116+
117+
// update lb eip
118+
if len(eipsToAdd) > 0 {
119+
klog.Infof("associating eips %s to lb %s", spew.Sdump(eipsToAdd), *lb.Status.LoadBalancerID)
120+
err = qc.Client.AssociateEIPsToLB(lb.Status.LoadBalancerID, eipsToAdd)
121+
if err != nil {
122+
return err
123+
}
124+
updated = true
125+
}
126+
if len(eipsToDel) > 0 {
127+
klog.Infof("dissociating eips %s from lb %s", spew.Sdump(eipsToDel), *lb.Status.LoadBalancerID)
128+
err = qc.Client.DissociateEIPsFromLB(lb.Status.LoadBalancerID, eipsToDel)
129+
if err != nil {
130+
return err
131+
}
132+
updated = true
133+
}
134+
135+
// update lb status
136+
if updated {
137+
lbNew, err := qc.Client.GetLoadBalancerByName(config.LoadBalancerName)
138+
if err != nil {
139+
return fmt.Errorf("get loadbalancer by name error: %v", err)
140+
}
141+
lb.Status = lbNew.Status
142+
}
143+
144+
return nil
145+
}

0 commit comments

Comments
 (0)