Skip to content

Commit 6e4bee9

Browse files
committed
update: enhance agent data handling with fingerprinting for change detection
1 parent 5b65e3d commit 6e4bee9

1 file changed

Lines changed: 33 additions & 10 deletions

File tree

internal/reconciler/reconciler.go

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ package reconciler
33

44
import (
55
"context"
6+
"encoding/json"
67
"errors"
78
"fmt"
9+
"hash/fnv"
810
"strings"
911
"sync"
1012
"time"
@@ -31,7 +33,8 @@ type Reconciler struct {
3133
removeDelay time.Duration // delay before cleaning up orphaned CF resources
3234
mu sync.RWMutex
3335
containers map[string]*types.ParsedContainer // containerID -> parsed container
34-
agentData map[string][]*types.ParsedContainer // agentID -> containers
36+
agentData map[string][]*types.ParsedContainer // agentID -> containers
37+
agentFingerprints map[string]uint64 // agentID -> data hash
3538

3639
// Channel to trigger reconciliation when agent data changes
3740
agentTrigger chan struct{}
@@ -75,8 +78,9 @@ func NewReconciler(cfg *Config) *Reconciler {
7578
orphanTTL: cfg.OrphanTTL,
7679
removeDelay: cfg.RemoveDelay,
7780
containers: make(map[string]*types.ParsedContainer),
78-
agentData: make(map[string][]*types.ParsedContainer),
79-
agentTrigger: make(chan struct{}, 1),
81+
agentData: make(map[string][]*types.ParsedContainer),
82+
agentFingerprints: make(map[string]uint64),
83+
agentTrigger: make(chan struct{}, 1),
8084
expectedAgents: cfg.ExpectedAgents,
8185
agentReady: make(chan struct{}),
8286
startedAt: time.Now(),
@@ -655,31 +659,50 @@ func (r *Reconciler) resolveAccessReferences(containers []*types.ParsedContainer
655659
}
656660

657661
// UpdateAgentData updates container data from an agent.
662+
// Only triggers reconciliation when the data has actually changed.
658663
func (r *Reconciler) UpdateAgentData(agentID string, containers []*types.ParsedContainer) {
664+
fp := agentDataFingerprint(containers)
665+
659666
r.mu.Lock()
667+
changed := r.agentFingerprints[agentID] != fp
660668
r.agentData[agentID] = containers
669+
r.agentFingerprints[agentID] = fp
661670
allReady := r.checkExpectedAgentsLocked()
662671
r.mu.Unlock()
663672

664-
log.Debug().
665-
Str("agent", agentID).
666-
Int("containers", len(containers)).
667-
Msg("Updated agent data")
668-
669673
if allReady {
670674
r.agentReadyOnce.Do(func() {
671675
close(r.agentReady)
672676
})
673677
}
674678

675-
// Non-blocking send to trigger immediate reconciliation
679+
if !changed {
680+
log.Debug().
681+
Str("agent", agentID).
682+
Int("containers", len(containers)).
683+
Msg("Agent data unchanged, skipping reconciliation trigger")
684+
return
685+
}
686+
687+
log.Debug().
688+
Str("agent", agentID).
689+
Int("containers", len(containers)).
690+
Msg("Agent data updated, triggering reconciliation")
691+
676692
select {
677693
case r.agentTrigger <- struct{}{}:
678694
default:
679-
// Reconciliation already pending, skip duplicate trigger
680695
}
681696
}
682697

698+
// agentDataFingerprint computes a fast hash of container data for change detection.
699+
func agentDataFingerprint(containers []*types.ParsedContainer) uint64 {
700+
data, _ := json.Marshal(containers)
701+
h := fnv.New64a()
702+
h.Write(data)
703+
return h.Sum64()
704+
}
705+
683706
// checkExpectedAgentsLocked returns true if all expected agents have reported.
684707
// Must be called with r.mu held (read or write).
685708
func (r *Reconciler) checkExpectedAgentsLocked() bool {

0 commit comments

Comments
 (0)