Skip to content

Commit 65e0514

Browse files
committed
update: enhance agent data handling with fingerprinting for change detection
1 parent 5b65e3d commit 65e0514

1 file changed

Lines changed: 46 additions & 10 deletions

File tree

internal/reconciler/reconciler.go

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@ package reconciler
33

44
import (
55
"context"
6+
"encoding/json"
67
"errors"
78
"fmt"
9+
"hash/fnv"
10+
"sort"
811
"strings"
912
"sync"
1013
"time"
@@ -31,7 +34,8 @@ type Reconciler struct {
3134
removeDelay time.Duration // delay before cleaning up orphaned CF resources
3235
mu sync.RWMutex
3336
containers map[string]*types.ParsedContainer // containerID -> parsed container
34-
agentData map[string][]*types.ParsedContainer // agentID -> containers
37+
agentData map[string][]*types.ParsedContainer // agentID -> containers
38+
agentFingerprints map[string]uint64 // agentID -> data hash
3539

3640
// Channel to trigger reconciliation when agent data changes
3741
agentTrigger chan struct{}
@@ -75,8 +79,9 @@ func NewReconciler(cfg *Config) *Reconciler {
7579
orphanTTL: cfg.OrphanTTL,
7680
removeDelay: cfg.RemoveDelay,
7781
containers: make(map[string]*types.ParsedContainer),
78-
agentData: make(map[string][]*types.ParsedContainer),
79-
agentTrigger: make(chan struct{}, 1),
82+
agentData: make(map[string][]*types.ParsedContainer),
83+
agentFingerprints: make(map[string]uint64),
84+
agentTrigger: make(chan struct{}, 1),
8085
expectedAgents: cfg.ExpectedAgents,
8186
agentReady: make(chan struct{}),
8287
startedAt: time.Now(),
@@ -655,31 +660,62 @@ func (r *Reconciler) resolveAccessReferences(containers []*types.ParsedContainer
655660
}
656661

657662
// UpdateAgentData updates container data from an agent.
663+
// Only triggers reconciliation when the data has actually changed.
658664
func (r *Reconciler) UpdateAgentData(agentID string, containers []*types.ParsedContainer) {
665+
fp := agentDataFingerprint(containers)
666+
659667
r.mu.Lock()
668+
changed := r.agentFingerprints[agentID] != fp
660669
r.agentData[agentID] = containers
670+
r.agentFingerprints[agentID] = fp
661671
allReady := r.checkExpectedAgentsLocked()
662672
r.mu.Unlock()
663673

664-
log.Debug().
665-
Str("agent", agentID).
666-
Int("containers", len(containers)).
667-
Msg("Updated agent data")
668-
669674
if allReady {
670675
r.agentReadyOnce.Do(func() {
671676
close(r.agentReady)
672677
})
673678
}
674679

675-
// Non-blocking send to trigger immediate reconciliation
680+
if !changed {
681+
log.Debug().
682+
Str("agent", agentID).
683+
Int("containers", len(containers)).
684+
Msg("Agent data unchanged, skipping reconciliation trigger")
685+
return
686+
}
687+
688+
log.Debug().
689+
Str("agent", agentID).
690+
Int("containers", len(containers)).
691+
Msg("Agent data updated, triggering reconciliation")
692+
676693
select {
677694
case r.agentTrigger <- struct{}{}:
678695
default:
679-
// Reconciliation already pending, skip duplicate trigger
680696
}
681697
}
682698

699+
// agentDataFingerprint computes a fast hash for change detection.
700+
// Only hashes fields that affect reconciliation (ID, Name, Labels),
701+
// skipping volatile fields like State, Created, Started, Networks.
702+
func agentDataFingerprint(containers []*types.ParsedContainer) uint64 {
703+
sorted := make([]*types.ParsedContainer, len(containers))
704+
copy(sorted, containers)
705+
sort.Slice(sorted, func(i, j int) bool {
706+
return sorted[i].Info.ID < sorted[j].Info.ID
707+
})
708+
709+
h := fnv.New64a()
710+
for _, c := range sorted {
711+
fmt.Fprintf(h, "%s\x00%s\x00", c.Info.ID, c.Info.Name)
712+
labels, _ := json.Marshal(c.Info.Labels)
713+
h.Write(labels)
714+
h.Write([]byte{0})
715+
}
716+
return h.Sum64()
717+
}
718+
683719
// checkExpectedAgentsLocked returns true if all expected agents have reported.
684720
// Must be called with r.mu held (read or write).
685721
func (r *Reconciler) checkExpectedAgentsLocked() bool {

0 commit comments

Comments
 (0)