Skip to content

Commit e457f25

Browse files
committed
by network
1 parent 4727874 commit e457f25

5 files changed

Lines changed: 160 additions & 57 deletions

File tree

metis/pkg/daemon/daemon.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func (d *Daemon) Run(ctx context.Context) error {
6969

7070
nodeName := os.Getenv("NODE_NAME")
7171
if nodeName == "" {
72-
klog.Warning("NODE_NAME environment variable not set")
72+
logger.Info("NODE_NAME environment variable not set")
7373
}
7474

7575
dbPath := d.Config.DBPath
@@ -88,7 +88,7 @@ func (d *Daemon) Run(ctx context.Context) error {
8888
var nncClient nncclientset.Interface
8989
var kubeClient kubernetes.Interface
9090
if err != nil {
91-
klog.Warning("Failed to get in-cluster config, clients will not be initialized")
91+
logger.Info("Failed to get in-cluster config, clients will not be initialized")
9292
} else {
9393
nncClient, err = nncclientset.NewForConfig(config)
9494
if err != nil {

metis/pkg/daemon/monitor.go

Lines changed: 41 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@ type MonitorConfig struct {
9999
// NewMonitor creates a new Monitor.
100100
func NewMonitor(cfg MonitorConfig) *Monitor {
101101
rl := workqueue.DefaultTypedControllerRateLimiter[string]()
102+
// We use a rate-limiting queue to:
103+
// 1. Deduplicate requests from the daemon server and the periodic monitor loop.
104+
// 2. Decouple the daemon server from processing inline, allowing it to just enqueue items.
105+
// 3. Benefit from automatic exponential backoff for retries on failure.
102106
queue := workqueue.NewTypedRateLimitingQueueWithConfig(rl, workqueue.TypedRateLimitingQueueConfig[string]{
103107
Name: "metis-daemon-monitor",
104108
})
@@ -121,18 +125,17 @@ func NewMonitor(cfg MonitorConfig) *Monitor {
121125
}
122126

123127
return &Monitor{
124-
queue: queue,
125-
nncClient: cfg.NNCClient,
126-
nodeName: cfg.NodeName,
127-
store: cfg.Store,
128-
logger: cfg.Logger,
129-
lowUtilizationTimers: make(map[string]time.Time),
130-
GetPendingRequestsCount: cfg.GetPendingRequestsCount,
131-
cooldownPushbackInterval: cooldownPushbackInterval,
132-
drainingExpiration: drainingExpiration,
133-
monitorInterval: monitorInterval,
134-
lowUtilizationThreshold: DefaultLowUtilizationThreshold,
135-
128+
queue: queue,
129+
nncClient: cfg.NNCClient,
130+
nodeName: cfg.NodeName,
131+
store: cfg.Store,
132+
logger: cfg.Logger,
133+
lowUtilizationTimers: make(map[string]time.Time),
134+
GetPendingRequestsCount: cfg.GetPendingRequestsCount,
135+
cooldownPushbackInterval: cooldownPushbackInterval,
136+
drainingExpiration: drainingExpiration,
137+
monitorInterval: monitorInterval,
138+
lowUtilizationThreshold: DefaultLowUtilizationThreshold,
136139
targetUtilizationAfterScaleUp: DefaultTargetUtilizationAfterScaleUp,
137140
cooldownPushbackThreshold: DefaultCooldownPushbackThreshold,
138141
sustainedLowUtilizationDuration: sustainedLowUtilizationDuration,
@@ -143,8 +146,8 @@ func NewMonitor(cfg MonitorConfig) *Monitor {
143146
func (m *Monitor) Run(ctx context.Context, workers int) {
144147
defer m.queue.ShutDown()
145148

146-
m.logger.Info("Starting IPAM monitor", "workers", workers)
147-
defer m.logger.Info("Stopping IPAM monitor")
149+
m.logger.Info("Starting Metis Daemon Monitor", "node", m.nodeName, "workers", workers, "interval", m.monitorInterval)
150+
defer m.logger.Info("Stopping Metis Daemon Monitor")
148151

149152
// Periodic enqueuer
150153
go wait.UntilWithContext(ctx, func(ctx context.Context) {
@@ -178,13 +181,16 @@ func (m *Monitor) processExpiredDrainingBlocks(ctx context.Context) {
178181
m.logger.Error(err, "failed to get networks for expired draining blocks check")
179182
return
180183
}
184+
185+
nnc, err := m.getNodeNetworkConfig(ctx)
186+
if err != nil {
187+
m.logger.Error(err, "failed to get NodeNetworkConfig")
188+
return
189+
}
190+
nncCopy := nnc.DeepCopy()
191+
anyUpdated := false
192+
181193
for _, network := range networks {
182-
nnc, err := m.getNodeNetworkConfig(ctx)
183-
if err != nil {
184-
m.logger.Error(err, "failed to get NodeNetworkConfig", "network", network)
185-
continue
186-
}
187-
nncCopy := nnc.DeepCopy()
188194
var currentAllocation *nncv1.Allocation
189195
for i := range nncCopy.Spec.Allocations {
190196
if nncCopy.Spec.Allocations[i].Network == network {
@@ -193,15 +199,19 @@ func (m *Monitor) processExpiredDrainingBlocks(ctx context.Context) {
193199
}
194200
}
195201

196-
updated, err := m.handleExpiredDrainingBlocks(ctx, network, nncCopy, currentAllocation)
202+
updated, err := m.handleExpiredDrainingBlocksPerNetwork(ctx, network, nncCopy, currentAllocation)
197203
if err != nil {
198204
m.logger.Error(err, "failed to handle expired draining blocks", "network", network)
199205
continue
200206
}
201207
if updated {
202-
if err := m.patchNNC(ctx, nncCopy); err != nil {
203-
m.logger.Error(err, "failed to patch NNC for expired draining blocks", "network", network)
204-
}
208+
anyUpdated = true
209+
}
210+
}
211+
212+
if anyUpdated {
213+
if err := m.patchNNC(ctx, nncCopy); err != nil {
214+
m.logger.Error(err, "failed to patch NNC for expired draining blocks")
205215
}
206216
}
207217
}
@@ -493,7 +503,12 @@ func (m *Monitor) maybeDrainExcessive(ctx context.Context, network string, info
493503
return false
494504
}
495505

496-
func (m *Monitor) handleExpiredDrainingBlocks(ctx context.Context, network string, nncCopy *nncv1.NodeNetworkConfig, currentAllocation *nncv1.Allocation) (bool, error) {
506+
func (m *Monitor) handleExpiredDrainingBlocksPerNetwork(ctx context.Context, network string, nncCopy *nncv1.NodeNetworkConfig, currentAllocation *nncv1.Allocation) (bool, error) {
507+
deletingBlocks, err := m.store.GetDeletingCIDRBlocks(ctx, network)
508+
if err != nil {
509+
return false, fmt.Errorf("failed to query deleting cidr blocks: %w", err)
510+
}
511+
497512
expiredBlocks, err := m.store.FindAndMarkExpiredDrainingCIDRBlocks(ctx, network, m.drainingExpiration)
498513
if err != nil {
499514
return false, fmt.Errorf("failed to query and mark draining cidr blocks: %w", err)
@@ -502,11 +517,6 @@ func (m *Monitor) handleExpiredDrainingBlocks(ctx context.Context, network strin
502517
var reducePods int
503518
updated := false
504519

505-
deletingBlocks, err := m.store.GetDeletingCIDRBlocks(ctx)
506-
if err != nil {
507-
return false, fmt.Errorf("failed to query deleting cidr blocks: %w", err)
508-
}
509-
510520
statusMap := make(map[string]nncv1.PodCIDR)
511521
for _, podCIDR := range nncCopy.Status.PodCIDRs {
512522
if podCIDR.Network == network {
@@ -539,6 +549,7 @@ func (m *Monitor) handleExpiredDrainingBlocks(ctx context.Context, network strin
539549

540550
// Reconcile blocks that are in Deleting state in the local DB but failed to be added
541551
// to the CRD's Spec.ReleasableCIDRs in a previous iteration.
552+
// Removing entry from status and releasableset is atomic, this is atomically done by other controllers.
542553
for _, block := range deletingBlocks {
543554
podCIDR, inStatus := statusMap[block.CIDR]
544555
if inStatus && !releasableSet[block.CIDR] {

metis/pkg/daemon/watcher.go

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ type Watcher struct {
5353
func NewWatcher(logger logr.Logger, nncClient nncclientset.Interface, nncInformer nncinformers.NodeNetworkConfigInformer, store *store.Store, nodeName string, onCIDRAdded func(network string, availableIPs int)) *Watcher {
5454
rl := workqueue.DefaultTypedControllerRateLimiter[string]()
5555
queue := workqueue.NewTypedRateLimitingQueueWithConfig(rl, workqueue.TypedRateLimitingQueueConfig[string]{
56-
Name: "crd-watcher",
56+
Name: "metis-nnc-watcher",
5757
})
5858

5959
var nncLister nnclisters.NodeNetworkConfigLister
@@ -113,11 +113,11 @@ func NewWatcher(logger logr.Logger, nncClient nncclientset.Interface, nncInforme
113113
func (w *Watcher) Run(ctx context.Context, workers int) {
114114
defer w.queue.ShutDown()
115115

116-
w.logger.Info("Starting CRD watcher", "workers", workers)
117-
defer w.logger.Info("Stopping CRD watcher")
116+
w.logger.Info("Starting Metis Daemon NodeNetworkConfig CRD watcher", "workers", workers)
117+
defer w.logger.Info("Stopping Metis Daemon NodeNetworkConfig CRD watcher")
118118

119119
if w.nncSynced != nil {
120-
if !cache.WaitForNamedCacheSync("CRDWatcher", ctx.Done(), w.nncSynced) {
120+
if !cache.WaitForNamedCacheSync("MetisNNCWatcher", ctx.Done(), w.nncSynced) {
121121
return
122122
}
123123
}
@@ -170,7 +170,7 @@ func (w *Watcher) syncCIDR(ctx context.Context, network string) error {
170170
return err
171171
}
172172

173-
return w.maybeDeleteCIDRs(ctx, nnc)
173+
return w.maybeDeleteCIDRs(ctx, nnc, network)
174174
}
175175

176176
// getNodeNetworkConfig fetches the NodeNetworkConfig CR.
@@ -245,33 +245,33 @@ func (w *Watcher) addCIDR(ctx context.Context, nnc *nncv1.NodeNetworkConfig, net
245245
return nil
246246
}
247247

248-
func (w *Watcher) maybeDeleteCIDRs(ctx context.Context, nnc *nncv1.NodeNetworkConfig) error {
249-
toBeDeletedBlocks, err := w.store.GetDeletingCIDRBlocks(ctx)
248+
func (w *Watcher) maybeDeleteCIDRs(ctx context.Context, nnc *nncv1.NodeNetworkConfig, network string) error {
249+
toBeDeletedBlocks, err := w.store.GetDeletingCIDRBlocks(ctx, network)
250250
if err != nil {
251251
return fmt.Errorf("failed to query deleting cidr blocks: %w", err)
252252
}
253253

254-
var toBeDeletedBlockIDs []int64
255-
for _, block := range toBeDeletedBlocks {
256-
inStatus := false
257-
for _, podCIDR := range nnc.Status.PodCIDRs {
258-
if podCIDR.CIDR == block.CIDR {
259-
inStatus = true
260-
break
261-
}
254+
// Create a map for quick lookup of CIDRs in NNC status for the current network
255+
statusCIDRs := make(map[string]bool)
256+
for _, podCIDR := range nnc.Status.PodCIDRs {
257+
if podCIDR.Network == network {
258+
statusCIDRs[podCIDR.CIDR] = true
262259
}
260+
}
263261

264-
if !inStatus {
265-
toBeDeletedBlockIDs = append(toBeDeletedBlockIDs, block.ID)
262+
var blocksToDelete []store.DeletingCIDRBlock
263+
for _, block := range toBeDeletedBlocks {
264+
if !statusCIDRs[block.CIDR] {
265+
blocksToDelete = append(blocksToDelete, block)
266266
}
267267
}
268268

269-
for _, id := range toBeDeletedBlockIDs {
270-
err = w.store.DeleteCIDRBlock(ctx, id)
269+
for _, block := range blocksToDelete {
270+
err = w.store.DeleteCIDRBlock(ctx, block.ID)
271271
if err != nil {
272-
return fmt.Errorf("failed to delete cidr block %d from store: %w", id, err)
272+
return fmt.Errorf("failed to delete cidr block %d from store: %w", block.ID, err)
273273
}
274-
w.logger.Info("Deleted CIDR block from local DB as it was released by GCE", "cidrBlockID", id)
274+
w.logger.Info("Deleted CIDR block from local DB as it was released by GCE", "cidrBlockID", block.ID, "cidr", block.CIDR, "network", network)
275275
}
276276

277277
return nil

metis/pkg/store/store.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -557,11 +557,12 @@ type DeletingCIDRBlock struct {
557557
ID int64
558558
TotalIPs int
559559
CIDR string
560+
Network string
560561
}
561562

562-
// GetDeletingCIDRBlocks fetches all CIDR blocks in Deleting state.
563-
func (s *Store) GetDeletingCIDRBlocks(ctx context.Context) ([]DeletingCIDRBlock, error) {
564-
rows, err := s.db.QueryContext(ctx, "SELECT id, total_ips, cidr FROM cidr_blocks WHERE state = ?", StateDeleting)
563+
// GetDeletingCIDRBlocks fetches all CIDR blocks in Deleting state for a specific network.
564+
func (s *Store) GetDeletingCIDRBlocks(ctx context.Context, network string) ([]DeletingCIDRBlock, error) {
565+
rows, err := s.db.QueryContext(ctx, "SELECT id, total_ips, cidr, network FROM cidr_blocks WHERE state = ? AND network = ?", StateDeleting, network)
565566
if err != nil {
566567
return nil, err
567568
}
@@ -570,7 +571,7 @@ func (s *Store) GetDeletingCIDRBlocks(ctx context.Context) ([]DeletingCIDRBlock,
570571
var result []DeletingCIDRBlock
571572
for rows.Next() {
572573
var r DeletingCIDRBlock
573-
if err := rows.Scan(&r.ID, &r.TotalIPs, &r.CIDR); err != nil {
574+
if err := rows.Scan(&r.ID, &r.TotalIPs, &r.CIDR, &r.Network); err != nil {
574575
return nil, err
575576
}
576577
result = append(result, r)

metis/pkg/store/store_test.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -999,3 +999,94 @@ func TestStore_FindAndMarkExpiredDrainingCIDRBlocks(t *testing.T) {
999999
t.Errorf("Expected state Deleting, got %s", state)
10001000
}
10011001
}
1002+
1003+
func TestStore_GetDeletingCIDRBlocks(t *testing.T) {
1004+
logger := logr.Discard()
1005+
tempDir := t.TempDir()
1006+
dbPath := filepath.Join(tempDir, "deleting_test.sqlite")
1007+
s, err := NewStore(context.Background(), logger, dbPath)
1008+
if err != nil {
1009+
t.Fatalf("NewStore failed: %v", err)
1010+
}
1011+
defer s.Close()
1012+
1013+
network1 := "network-1"
1014+
network2 := "network-2"
1015+
1016+
// Add blocks for network 1
1017+
err = s.AddCIDR(context.Background(), network1, "10.0.1.0/29")
1018+
if err != nil {
1019+
t.Fatalf("AddCIDR failed: %v", err)
1020+
}
1021+
err = s.AddCIDR(context.Background(), network1, "10.0.2.0/29")
1022+
if err != nil {
1023+
t.Fatalf("AddCIDR failed: %v", err)
1024+
}
1025+
1026+
// Add block for network 2
1027+
err = s.AddCIDR(context.Background(), network2, "10.0.3.0/29")
1028+
if err != nil {
1029+
t.Fatalf("AddCIDR failed: %v", err)
1030+
}
1031+
1032+
// Mark one block of network 1 as Deleting
1033+
blocks1, err := s.GetReadyCIDRBlocksSorted(context.Background(), network1)
1034+
if err != nil || len(blocks1) < 2 {
1035+
t.Fatalf("Failed to get ready blocks for network 1: %v", err)
1036+
}
1037+
err = s.MarkCIDRBlockAsDeleting(context.Background(), blocks1[0].ID)
1038+
if err != nil {
1039+
t.Fatalf("MarkCIDRBlockAsDeleting failed: %v", err)
1040+
}
1041+
1042+
// Mark block of network 2 as Deleting
1043+
blocks2, err := s.GetReadyCIDRBlocksSorted(context.Background(), network2)
1044+
if err != nil || len(blocks2) == 0 {
1045+
t.Fatalf("Failed to get ready blocks for network 2: %v", err)
1046+
}
1047+
err = s.MarkCIDRBlockAsDeleting(context.Background(), blocks2[0].ID)
1048+
if err != nil {
1049+
t.Fatalf("MarkCIDRBlockAsDeleting failed: %v", err)
1050+
}
1051+
1052+
// Call GetDeletingCIDRBlocks for Network 1
1053+
deleting1, err := s.GetDeletingCIDRBlocks(context.Background(), network1)
1054+
if err != nil {
1055+
t.Fatalf("GetDeletingCIDRBlocks failed for network 1: %v", err)
1056+
}
1057+
if len(deleting1) != 1 {
1058+
t.Errorf("Expected 1 deleting block for Network 1, got %d", len(deleting1))
1059+
} else {
1060+
if deleting1[0].CIDR != "10.0.2.0/29" {
1061+
t.Errorf("Expected deleting block 10.0.2.0/29, got %s", deleting1[0].CIDR)
1062+
}
1063+
if deleting1[0].Network != network1 {
1064+
t.Errorf("Expected network %s, got %s", network1, deleting1[0].Network)
1065+
}
1066+
}
1067+
1068+
// Call GetDeletingCIDRBlocks for Network 2
1069+
deleting2, err := s.GetDeletingCIDRBlocks(context.Background(), network2)
1070+
if err != nil {
1071+
t.Fatalf("GetDeletingCIDRBlocks failed for network 2: %v", err)
1072+
}
1073+
if len(deleting2) != 1 {
1074+
t.Errorf("Expected 1 deleting block for Network 2, got %d", len(deleting2))
1075+
} else {
1076+
if deleting2[0].CIDR != "10.0.3.0/29" {
1077+
t.Errorf("Expected deleting block 10.0.3.0/29, got %s", deleting2[0].CIDR)
1078+
}
1079+
if deleting2[0].Network != network2 {
1080+
t.Errorf("Expected network %s, got %s", network2, deleting2[0].Network)
1081+
}
1082+
}
1083+
1084+
// Call GetDeletingCIDRBlocks for a non-existent network
1085+
deleting3, err := s.GetDeletingCIDRBlocks(context.Background(), "non-existent")
1086+
if err != nil {
1087+
t.Fatalf("GetDeletingCIDRBlocks failed for non-existent network: %v", err)
1088+
}
1089+
if len(deleting3) != 0 {
1090+
t.Errorf("Expected 0 deleting blocks for non-existent network, got %d", len(deleting3))
1091+
}
1092+
}

0 commit comments

Comments
 (0)