Skip to content

Commit 49035a4

Browse files
committed
Add flex CIDR allocator controller
1 parent bbe46ad commit 49035a4

File tree

11 files changed

+1273
-9
lines changed

11 files changed

+1273
-9
lines changed

pkg/cloudprovider/providers/oci/ccm.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,14 @@ func (cp *CloudProvider) Initialize(clientBuilder cloudprovider.ControllerClient
191191
cp.logger,
192192
cp.instanceCache,
193193
cp.client)
194+
flexCIDRController := NewFlexCIDRController(
195+
factory.Core().V1().Nodes(),
196+
factory.Core().V1().Services(),
197+
cp.kubeclient,
198+
cp,
199+
cp.logger,
200+
cp.instanceCache,
201+
cp.client)
194202

195203
nodeInformer := factory.Core().V1().Nodes()
196204
go nodeInformer.Informer().Run(wait.NeverStop)
@@ -202,6 +210,7 @@ func (cp *CloudProvider) Initialize(clientBuilder cloudprovider.ControllerClient
202210
go serviceAccountInformer.Informer().Run(wait.NeverStop)
203211

204212
go nodeInfoController.Run(wait.NeverStop)
213+
go flexCIDRController.Run(wait.NeverStop)
205214

206215
// If the cluster is type OpenShift then the Tagging Controller
207216
// should be enabled.
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
package oci
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"github.com/oracle/oci-cloud-controller-manager/pkg/flexcidr"
9+
"github.com/oracle/oci-cloud-controller-manager/pkg/oci/client"
10+
"github.com/oracle/oci-go-sdk/v65/core"
11+
"github.com/pkg/errors"
12+
"go.uber.org/zap"
13+
v1 "k8s.io/api/core/v1"
14+
"k8s.io/apimachinery/pkg/types"
15+
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
16+
"k8s.io/apimachinery/pkg/util/wait"
17+
coreinformers "k8s.io/client-go/informers/core/v1"
18+
clientset "k8s.io/client-go/kubernetes"
19+
"k8s.io/client-go/tools/cache"
20+
"k8s.io/client-go/util/workqueue"
21+
)
22+
23+
const flexCIDRRetryDelay = time.Minute
24+
25+
type FlexCIDRController struct {
26+
nodeInformer coreinformers.NodeInformer
27+
serviceInformer coreinformers.ServiceInformer
28+
kubeClient clientset.Interface
29+
cloud *CloudProvider
30+
queue workqueue.RateLimitingInterface
31+
logger *zap.SugaredLogger
32+
instanceCache cache.Store
33+
ociClient client.Interface
34+
}
35+
36+
func NewFlexCIDRController(
37+
nodeInformer coreinformers.NodeInformer,
38+
serviceInformer coreinformers.ServiceInformer,
39+
kubeClient clientset.Interface,
40+
cloud *CloudProvider,
41+
logger *zap.SugaredLogger,
42+
instanceCache cache.Store,
43+
ociClient client.Interface) *FlexCIDRController {
44+
45+
controller := &FlexCIDRController{
46+
nodeInformer: nodeInformer,
47+
serviceInformer: serviceInformer,
48+
kubeClient: kubeClient,
49+
cloud: cloud,
50+
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
51+
logger: logger,
52+
instanceCache: instanceCache,
53+
ociClient: ociClient,
54+
}
55+
56+
controller.nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
57+
AddFunc: func(obj interface{}) {
58+
node := obj.(*v1.Node)
59+
controller.queue.Add(node.Name)
60+
},
61+
UpdateFunc: func(_, newObj interface{}) {
62+
node := newObj.(*v1.Node)
63+
controller.queue.Add(node.Name)
64+
},
65+
})
66+
67+
return controller
68+
}
69+
70+
func (fcc *FlexCIDRController) Run(stopCh <-chan struct{}) {
71+
defer utilruntime.HandleCrash()
72+
defer fcc.queue.ShutDown()
73+
74+
fcc.logger.Info("Starting flex CIDR controller")
75+
76+
if !cache.WaitForCacheSync(stopCh, fcc.nodeInformer.Informer().HasSynced, fcc.serviceInformer.Informer().HasSynced) {
77+
utilruntime.HandleError(fmt.Errorf("timed out waiting for flex CIDR controller caches to sync"))
78+
return
79+
}
80+
81+
wait.Until(fcc.runWorker, time.Second, stopCh)
82+
}
83+
84+
func (fcc *FlexCIDRController) runWorker() {
85+
for fcc.processNextItem() {
86+
}
87+
}
88+
89+
func (fcc *FlexCIDRController) processNextItem() bool {
90+
key, quit := fcc.queue.Get()
91+
if quit {
92+
return false
93+
}
94+
defer fcc.queue.Done(key)
95+
96+
if err := fcc.processItem(key.(string)); err != nil {
97+
fcc.logger.Errorf("Error processing flex CIDR for node %s (will retry): %v", key, err)
98+
fcc.queue.AddRateLimited(key)
99+
} else {
100+
fcc.queue.Forget(key)
101+
}
102+
103+
return true
104+
}
105+
106+
func (fcc *FlexCIDRController) processItem(key string) error {
107+
logger := fcc.logger.With("node", key)
108+
109+
node, err := fcc.nodeInformer.Lister().Get(key)
110+
if err != nil {
111+
return err
112+
}
113+
114+
if len(node.Spec.PodCIDRs) > 0 && len(node.Spec.ProviderID) == 0 {
115+
logger.Debug("node already has podCIDRs but providerID is empty, skipping")
116+
return nil
117+
}
118+
119+
instance, instanceID, err := fcc.getInstanceByNode(node, logger)
120+
if err != nil {
121+
return err
122+
}
123+
if instance == nil {
124+
return nil
125+
}
126+
127+
if err := fcc.instanceCache.Add(instance); err != nil {
128+
logger.With(zap.Error(err)).Debug("failed to add instance to cache")
129+
}
130+
131+
if instance.LifecycleState != core.InstanceLifecycleStateRunning {
132+
logger.Infof("instance %s not running yet, requeueing", instanceID)
133+
fcc.queue.AddAfter(key, flexCIDRRetryDelay)
134+
return nil
135+
}
136+
137+
config, hasConfig := flexcidr.ParsePrimaryVnicConfig(instance)
138+
if !hasConfig {
139+
logger.Debug("instance metadata does not include flex CIDR configuration, skipping")
140+
return nil
141+
}
142+
143+
clusterIPFamily, err := flexcidr.GetClusterIpFamily(context.Background(), fcc.serviceInformer.Lister())
144+
if err != nil {
145+
logger.With(zap.Error(err)).Info("cluster IP family not ready yet, requeueing")
146+
fcc.queue.AddAfter(key, flexCIDRRetryDelay)
147+
return nil
148+
}
149+
150+
primaryVNIC, err := fcc.ociClient.Compute().GetPrimaryVNICForInstance(context.Background(), *instance.CompartmentId, instanceID)
151+
if err != nil {
152+
return errors.Wrap(err, "GetPrimaryVNICForInstance")
153+
}
154+
155+
flexCIDRManager := &flexcidr.FlexCIDR{
156+
Logger: logger,
157+
PrimaryVnicConfig: config,
158+
ClusterIpFamily: clusterIPFamily,
159+
OciCoreClient: fcc.ociClient.Networking(nil),
160+
}
161+
162+
flexCIDRs, err := flexCIDRManager.GetOrCreateFlexCidrList(*primaryVNIC.Id)
163+
if err != nil {
164+
return err
165+
}
166+
if !flexCIDRManager.ValidateFlexCidrList(flexCIDRs) {
167+
return fmt.Errorf("computed flex CIDRs %v are invalid", flexCIDRs)
168+
}
169+
if flexcidr.StringSlicesEqualIgnoreOrder(node.Spec.PodCIDRs, flexCIDRs) {
170+
logger.Debugf("node already has expected podCIDRs %v", flexCIDRs)
171+
return nil
172+
}
173+
174+
return flexcidr.PatchNodePodCIDRs(context.Background(), fcc.kubeClient, node.Name, flexCIDRs, logger)
175+
}
176+
177+
func (fcc *FlexCIDRController) getInstanceByNode(node *v1.Node, logger *zap.SugaredLogger) (*core.Instance, string, error) {
178+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
179+
defer cancel()
180+
181+
providerID := node.Spec.ProviderID
182+
var err error
183+
if providerID == "" {
184+
providerID, err = fcc.cloud.InstanceID(ctx, types.NodeName(node.Name))
185+
if err != nil {
186+
return nil, "", err
187+
}
188+
}
189+
190+
instanceID, err := MapProviderIDToResourceID(providerID)
191+
if err != nil {
192+
logger.With(zap.Error(err)).Error("failed to map providerID to instanceID")
193+
return nil, "", err
194+
}
195+
196+
instance, err := fcc.ociClient.Compute().GetInstance(ctx, instanceID)
197+
if err != nil {
198+
logger.With(zap.Error(err)).Error("failed to fetch instance")
199+
return nil, "", err
200+
}
201+
202+
return instance, instanceID, nil
203+
}

pkg/cloudprovider/providers/oci/instances_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1041,6 +1041,10 @@ func (c *MockVirtualNetworkClient) CreatePrivateIp(ctx context.Context, vnicID s
10411041
return nil, nil
10421042
}
10431043

1044+
func (c *MockVirtualNetworkClient) CreatePrivateIpWithRequest(ctx context.Context, request core.CreatePrivateIpRequest) (core.PrivateIp, error) {
1045+
return core.PrivateIp{}, nil
1046+
}
1047+
10441048
func (c *MockVirtualNetworkClient) GetIpv6(ctx context.Context, id string) (*core.Ipv6, error) {
10451049
return &core.Ipv6{}, nil
10461050
}
@@ -1061,6 +1065,10 @@ func (c *MockVirtualNetworkClient) CreateIpv6(ctx context.Context, vnicID string
10611065
return &core.Ipv6{}, nil
10621066
}
10631067

1068+
func (c *MockVirtualNetworkClient) CreateIpv6WithRequest(ctx context.Context, request core.CreateIpv6Request) (core.Ipv6, error) {
1069+
return core.Ipv6{}, nil
1070+
}
1071+
10641072
func (c *MockVirtualNetworkClient) GetSubnet(ctx context.Context, id string) (*core.Subnet, error) {
10651073
if subnet, ok := subnets[id]; ok {
10661074
return subnet, nil

pkg/csi/driver/bv_controller_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,10 +487,26 @@ func (c *MockVirtualNetworkClient) CreatePrivateIp(ctx context.Context, vnicId s
487487
return &core.PrivateIp{}, nil
488488
}
489489

490+
func (c *MockVirtualNetworkClient) CreatePrivateIpWithRequest(ctx context.Context, request core.CreatePrivateIpRequest) (core.PrivateIp, error) {
491+
return core.PrivateIp{}, nil
492+
}
493+
490494
func (c *MockVirtualNetworkClient) ListPrivateIps(ctx context.Context, id string) ([]core.PrivateIp, error) {
491495
return []core.PrivateIp{}, nil
492496
}
493497

498+
func (c *MockVirtualNetworkClient) ListIpv6s(ctx context.Context, vnicId string) ([]core.Ipv6, error) {
499+
return []core.Ipv6{}, nil
500+
}
501+
502+
func (c *MockVirtualNetworkClient) CreateIpv6(ctx context.Context, vnicID string) (*core.Ipv6, error) {
503+
return &core.Ipv6{}, nil
504+
}
505+
506+
func (c *MockVirtualNetworkClient) CreateIpv6WithRequest(ctx context.Context, request core.CreateIpv6Request) (core.Ipv6, error) {
507+
return core.Ipv6{}, nil
508+
}
509+
494510
func (c *MockVirtualNetworkClient) GetSubnet(ctx context.Context, id string) (*core.Subnet, error) {
495511
if strings.EqualFold(id, "ocid1.invalid-subnet") {
496512
return nil, errors.New("Internal Error.")

0 commit comments

Comments
 (0)