Skip to content

Commit d993110

Browse files
authored
Merge pull request #126 from qiangzii/master
add externalTrafficPolicy feature support for service
2 parents b6a663f + 9015c38 commit d993110

11 files changed

Lines changed: 684 additions & 16 deletions

File tree

cmd/main.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727

2828
"github.com/spf13/pflag"
2929
"k8s.io/apimachinery/pkg/util/wait"
30-
"k8s.io/cloud-provider"
30+
cloudprovider "k8s.io/cloud-provider"
3131
"k8s.io/cloud-provider/app"
3232
"k8s.io/cloud-provider/app/config"
3333
"k8s.io/cloud-provider/options"
@@ -36,8 +36,11 @@ import (
3636
_ "k8s.io/component-base/metrics/prometheus/clientgo" // load all the prometheus client-go plugins
3737
_ "k8s.io/component-base/metrics/prometheus/version" // for version metric registration
3838
"k8s.io/klog/v2"
39+
3940
// For existing cloud providers, the option to import legacy providers is still available.
4041
// e.g. _"k8s.io/legacy-cloud-providers/<provider>"
42+
"github.com/yunify/qingcloud-cloud-controller-manager/pkg/controllers/clusternode"
43+
"github.com/yunify/qingcloud-cloud-controller-manager/pkg/controllers/endpoint"
4144
_ "github.com/yunify/qingcloud-cloud-controller-manager/pkg/qingcloud"
4245
)
4346

@@ -50,7 +53,7 @@ func main() {
5053
}
5154

5255
fss := cliflag.NamedFlagSets{}
53-
command := app.NewCloudControllerManagerCommand(ccmOptions, cloudInitializer, app.DefaultInitFuncConstructors, fss, wait.NeverStop)
56+
command := app.NewCloudControllerManagerCommand(ccmOptions, cloudInitializer, controllerInitializers(), fss, wait.NeverStop)
5457

5558
// TODO: once we switch everything over to Cobra commands, we can go back to calling
5659
// utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
@@ -87,3 +90,10 @@ func cloudInitializer(config *config.CompletedConfig) cloudprovider.Interface {
8790
}
8891
return cloud
8992
}
93+
94+
func controllerInitializers() map[string]app.InitFuncConstructor {
95+
controllerInitializers := app.DefaultInitFuncConstructors
96+
controllerInitializers["endpoint"] = endpoint.StartEndpointControllerWrapper
97+
controllerInitializers["clusternode"] = clusternode.StartClusterNodeControllerWrapper
98+
return controllerInitializers
99+
}

deploy/kube-cloud-controller-manager.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ rules:
4848
resources:
4949
- services
5050
verbs:
51+
- get
5152
- list
5253
- patch
5354
- update
@@ -80,7 +81,7 @@ rules:
8081
- apiGroups:
8182
- ""
8283
resources:
83-
- endpoints
84+
- pods
8485
verbs:
8586
- create
8687
- get

docs/configure.md

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,89 @@ spec:
148148
```
149149
监听器参数说明:https://docsv3.qingcloud.com/network/loadbalancer/api/listener/modify_listener_attribute/
150150

151+
## 五、配置LB监听器backend
152+
### 如何配置
153+
根据 `service` 的 `externalTrafficPolicy` 字段的取值,决定LB监听器backend的添加策略
154+
- `Local`: 只会添加提供服务pod所在的Worker节点为LB监听器的backend
155+
- `Cluster`: 如果`service`中不显式指定 `externalTrafficPolicy` 字段的值,则默认为`Cluster`;这种模式下,可以通过给服务添加相关注解来指定LB监听器backend的添加规则
156+
157+
`Cluster`模式下,目前支持的 `service` 注解有:
158+
- 使用指定Label的Worker节点作为后端服务器, `service.beta.kubernetes.io/qingcloud-lb-backend-label`,可以指定多个Label,多个Label以逗号分隔。例如:`key1=value1,key2=value2`,多个Label之间是And关系。同时,在需要成为后端服务器的Worker节点打上`key1=value1,key2=value2`的Label;只有服务指定的所有Label的key和value都和Worker节点匹配时,Worker节点会被选为服务的后端服务器;没有此注解则添加所有Worker节点为backend
159+
160+
### 参考示例
161+
#### Local模式
162+
将服务的`externalTrafficPolicy`指定为`Local`,只添加pod所在Worker节点为backend
163+
164+
```yaml
165+
kind: Service
166+
apiVersion: v1
167+
metadata:
168+
name: reuse-lb
169+
annotations:
170+
service.beta.kubernetes.io/qingcloud-load-balancer-eip-strategy: "reuse-lb"
171+
service.beta.kubernetes.io/qingcloud-load-balancer-id: "lb-oglqftju"
172+
spec:
173+
externalTrafficPolicy: Local
174+
selector:
175+
app: mylbapp
176+
type: LoadBalancer
177+
ports:
178+
- name: http
179+
port: 8090
180+
protocol: TCP
181+
targetPort: 80
182+
```
183+
184+
#### Cluster模式
185+
将服务的`externalTrafficPolicy`指定为`Cluster`,并在服务的注解`service.beta.kubernetes.io/qingcloud-lb-backend-label`中指定要添加为backend的worker节点的label:
186+
187+
```yaml
188+
kind: Service
189+
apiVersion: v1
190+
metadata:
191+
name: reuse-lb
192+
annotations:
193+
service.beta.kubernetes.io/qingcloud-load-balancer-eip-strategy: "reuse-lb"
194+
service.beta.kubernetes.io/qingcloud-load-balancer-id: "lb-oglqftju"
195+
service.beta.kubernetes.io/qingcloud-lb-backend-label: "test-label-key1=value1,test-label-key2=value2"
196+
spec:
197+
externalTrafficPolicy: Cluster
198+
selector:
199+
app: mylbapp
200+
type: LoadBalancer
201+
ports:
202+
- name: http
203+
port: 8090
204+
protocol: TCP
205+
targetPort: 80
206+
```
207+
208+
Worker节点如果要添加为上述服务的backend,则必须同时有下面两个label;注意只有其中一个label并不会添加为上述服务的backend:
209+
210+
```yaml
211+
apiVersion: v1
212+
kind: Node
213+
metadata:
214+
annotations:
215+
csi.volume.kubernetes.io/nodeid: '{"csi-qingcloud":"i-k5rl67a6"}'
216+
node.alpha.kubernetes.io/ttl: "0"
217+
node.beta.kubernetes.io/instance-id: i-k5rl67a6
218+
creationTimestamp: "2022-07-15T07:59:59Z"
219+
labels:
220+
test-label-key1: value1
221+
test-label-key2: value2
222+
name: worker-p001
223+
resourceVersion: "13094827"
224+
uid: b711ce74-9785-41bf-b0ac-777e3c9c0c34
225+
spec:
226+
podCIDR: 10.10.1.0/24
227+
podCIDRs:
228+
- 10.10.1.0/24
229+
status:
230+
...
231+
```
232+
233+
151234
## 配置内网负载均衡器
152235
### 已知问题
153236
k8s在ipvs模式下,kube-proxy会把内网负载均衡器的ip绑定在ipvs接口上,这样会导致从LB过来的包被drop(进来的是主网卡,但是出去的时候发现ipvs有这么一个ip,路由不一致)故目前无法在IPVS模式下使用内网负载均衡器。参考[issue](https://github.com/kubernetes/kubernetes/issues/79783)

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ require (
1212
k8s.io/client-go v0.21.1
1313
k8s.io/cloud-provider v0.21.1
1414
k8s.io/component-base v0.21.1
15+
k8s.io/controller-manager v0.21.1
1516
k8s.io/klog v1.0.0
1617
k8s.io/klog/v2 v2.9.0
1718
)

pkg/apis/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ type LoadBalancerListenerSpec struct {
8484
ListenerPort *int `json:"listener_port" name:"listener_port"`
8585
ListenerProtocol *string `json:"listener_protocol" name:"listener_protocol"`
8686
LoadBalancerListenerName *string `json:"loadbalancer_listener_name" name:"loadbalancer_listener_name"`
87+
LoadBalancerListenerID *string `json:"loadbalancer_listener_id" name:"loadbalancer_listener_id"`
8788
LoadBalancerID *string `json:"loadbalancer_id" name:"loadbalancer_id"`
8889
HealthyCheckMethod *string `json:"healthy_check_method" name:"healthy_check_method"`
8990
HealthyCheckOption *string `json:"healthy_check_option" name:"healthy_check_option"`
Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
package clusternode
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net/http"
7+
"time"
8+
9+
corev1 "k8s.io/api/core/v1"
10+
"k8s.io/apimachinery/pkg/labels"
11+
"k8s.io/apimachinery/pkg/util/runtime"
12+
"k8s.io/apimachinery/pkg/util/wait"
13+
coreinformers "k8s.io/client-go/informers/core/v1"
14+
clientset "k8s.io/client-go/kubernetes"
15+
corelisters "k8s.io/client-go/listers/core/v1"
16+
"k8s.io/client-go/tools/cache"
17+
"k8s.io/client-go/util/workqueue"
18+
cloudprovider "k8s.io/cloud-provider"
19+
cloudproviderapp "k8s.io/cloud-provider/app"
20+
cloudcontrollerconfig "k8s.io/cloud-provider/app/config"
21+
genericcontrollermanager "k8s.io/controller-manager/app"
22+
"k8s.io/klog"
23+
24+
"github.com/yunify/qingcloud-cloud-controller-manager/pkg/qingcloud"
25+
)
26+
27+
const (
28+
clusterNodeRunWorkerPeriod = 1 * time.Second
29+
clusterNodeWorkers = 1
30+
)
31+
32+
type ClusterNodeController struct {
33+
cloud cloudprovider.Interface
34+
35+
// svc
36+
serviceLister corelisters.ServiceLister
37+
serviceListerSynced cache.InformerSynced
38+
39+
// clusternode
40+
nodeLister corelisters.NodeLister
41+
nodeListerSynced cache.InformerSynced
42+
nodeQueue workqueue.RateLimitingInterface
43+
}
44+
45+
func StartClusterNodeControllerWrapper(completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface) cloudproviderapp.InitFunc {
46+
return func(ctx genericcontrollermanager.ControllerContext) (http.Handler, bool, error) {
47+
return startClusterNodeController(completedConfig, cloud, ctx.Stop)
48+
}
49+
}
50+
51+
func startClusterNodeController(ctx *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) {
52+
// Start the endpoint controller
53+
clusterNodeController, err := New(
54+
cloud,
55+
ctx.ClientBuilder.ClientOrDie("clusternode-controller"),
56+
ctx.SharedInformers.Core().V1().Services(),
57+
ctx.SharedInformers.Core().V1().Nodes(),
58+
)
59+
if err != nil {
60+
// This error shouldn't fail. It lives like this as a legacy.
61+
klog.Errorf("Failed to start endpoint controller: %v", err)
62+
return nil, false, nil
63+
}
64+
65+
go clusterNodeController.Run(stopCh, clusterNodeWorkers)
66+
67+
return nil, true, nil
68+
}
69+
70+
func New(
71+
cloud cloudprovider.Interface,
72+
kubeClient clientset.Interface,
73+
serviceInformer coreinformers.ServiceInformer,
74+
nodeInformer coreinformers.NodeInformer,
75+
) (*ClusterNodeController, error) {
76+
77+
cnc := &ClusterNodeController{
78+
cloud: cloud,
79+
serviceLister: serviceInformer.Lister(),
80+
serviceListerSynced: serviceInformer.Informer().HasSynced,
81+
nodeLister: nodeInformer.Lister(),
82+
nodeListerSynced: nodeInformer.Informer().HasSynced,
83+
nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cluster-node"),
84+
}
85+
86+
nodeInformer.Informer().AddEventHandler(
87+
cache.ResourceEventHandlerFuncs{
88+
UpdateFunc: func(old, cur interface{}) {
89+
oldNode, ok1 := old.(*corev1.Node)
90+
curNode, ok2 := cur.(*corev1.Node)
91+
if ok1 && ok2 && cnc.needsUpdate(oldNode, curNode) {
92+
cnc.enqueueNode(cur)
93+
}
94+
},
95+
},
96+
)
97+
98+
return cnc, nil
99+
}
100+
101+
//check if node label changed
102+
func (cnc *ClusterNodeController) needsUpdate(old, new *corev1.Node) bool {
103+
104+
if len(old.Labels) != len(new.Labels) {
105+
return true
106+
}
107+
108+
for newLabelKey, newLabelValue := range new.Labels {
109+
oldLabelValuk, ok := old.Labels[newLabelKey]
110+
if !ok || newLabelValue != oldLabelValuk {
111+
return true
112+
}
113+
}
114+
115+
return false
116+
}
117+
118+
func (cnc *ClusterNodeController) enqueueNode(obj interface{}) {
119+
key, err := cache.MetaNamespaceKeyFunc(obj)
120+
if err != nil {
121+
runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, err))
122+
return
123+
}
124+
cnc.nodeQueue.Add(key)
125+
}
126+
127+
func (cnc *ClusterNodeController) Run(stopCh <-chan struct{}, workers int) {
128+
defer runtime.HandleCrash()
129+
defer cnc.nodeQueue.ShutDown()
130+
131+
klog.Info("Starting cluster node controller")
132+
defer klog.Info("Shutting down cluster node controller")
133+
134+
if !cache.WaitForCacheSync(stopCh, cnc.serviceListerSynced, cnc.nodeListerSynced) {
135+
return
136+
}
137+
138+
for i := 0; i < workers; i++ {
139+
go wait.Until(cnc.worker, clusterNodeRunWorkerPeriod, stopCh)
140+
}
141+
142+
<-stopCh
143+
}
144+
145+
func (cnc *ClusterNodeController) worker() {
146+
for cnc.processNextWorkItem() {
147+
}
148+
}
149+
150+
func (cnc *ClusterNodeController) processNextWorkItem() bool {
151+
key, quit := cnc.nodeQueue.Get()
152+
if quit {
153+
return false
154+
}
155+
defer cnc.nodeQueue.Done(key)
156+
157+
err := cnc.handleNodesUpdate(key.(string))
158+
if err == nil {
159+
cnc.nodeQueue.Forget(key)
160+
return true
161+
}
162+
163+
runtime.HandleError(fmt.Errorf("error processing cluster node %v (will retry): %v", key, err))
164+
cnc.nodeQueue.AddRateLimited(key)
165+
166+
return true
167+
}
168+
169+
// handleNodesUpdate handle service backend according to node lables
170+
func (cnc *ClusterNodeController) handleNodesUpdate(key string) error {
171+
startTime := time.Now()
172+
defer func() {
173+
klog.V(4).Infof("Finished handleNodesUpdate %q (%v)", key, time.Since(startTime))
174+
}()
175+
176+
// 1. get node list
177+
var nodes []*corev1.Node
178+
nodeList, err := cnc.nodeLister.List(labels.NewSelector())
179+
if err != nil {
180+
return fmt.Errorf("get node list error: %v", err)
181+
}
182+
for i, _ := range nodeList {
183+
nodes = append(nodes, nodeList[i])
184+
}
185+
186+
// 2. list all service
187+
svcs, err := cnc.serviceLister.List(labels.NewSelector())
188+
if err != nil {
189+
return fmt.Errorf("list service error: %v", err)
190+
}
191+
192+
// 3. filter service which externalTrafficPolicy=cluster and has annotation service.beta.kubernetes.io/qingcloud-lb-backend-label
193+
for _, svc := range svcs {
194+
_, ok := svc.Annotations[qingcloud.ServiceAnnotationBackendLabel]
195+
if ok && svc.Spec.Type == corev1.ServiceTypeLoadBalancer &&
196+
svc.Spec.ExternalTrafficPolicy == corev1.ServiceExternalTrafficPolicyTypeCluster {
197+
klog.Infof("service %s serviceType = %s, externalTrafficPolicy = %s, also has backend label annotation , going to update loadbalancer", svc.Name, svc.Spec.Type, svc.Spec.ExternalTrafficPolicy)
198+
199+
// 4. update lb
200+
lbInterface, _ := cnc.cloud.LoadBalancer()
201+
err = lbInterface.UpdateLoadBalancer(context.TODO(), "", svc, nodes)
202+
if err != nil {
203+
return fmt.Errorf("update loadbalancer for service %s/%s error: %v", svc.Namespace, svc.Name, err)
204+
}
205+
klog.Infof("update loadbalancer for service %s/%s success", svc.Namespace, svc.Name)
206+
}
207+
}
208+
209+
return nil
210+
}

0 commit comments

Comments
 (0)