@@ -3,8 +3,11 @@ package reconciler
33
44import (
55 "context"
6+ "encoding/json"
67 "errors"
78 "fmt"
9+ "hash/fnv"
10+ "sort"
811 "strings"
912 "sync"
1013 "time"
@@ -31,7 +34,8 @@ type Reconciler struct {
3134 removeDelay time.Duration // delay before cleaning up orphaned CF resources
3235 mu sync.RWMutex
3336 containers map [string ]* types.ParsedContainer // containerID -> parsed container
34- agentData map [string ][]* types.ParsedContainer // agentID -> containers
37+ agentData map [string ][]* types.ParsedContainer // agentID -> containers
38+ agentFingerprints map [string ]uint64 // agentID -> data hash
3539
3640 // Channel to trigger reconciliation when agent data changes
3741 agentTrigger chan struct {}
@@ -75,8 +79,9 @@ func NewReconciler(cfg *Config) *Reconciler {
7579 orphanTTL : cfg .OrphanTTL ,
7680 removeDelay : cfg .RemoveDelay ,
7781 containers : make (map [string ]* types.ParsedContainer ),
78- agentData : make (map [string ][]* types.ParsedContainer ),
79- agentTrigger : make (chan struct {}, 1 ),
82+ agentData : make (map [string ][]* types.ParsedContainer ),
83+ agentFingerprints : make (map [string ]uint64 ),
84+ agentTrigger : make (chan struct {}, 1 ),
8085 expectedAgents : cfg .ExpectedAgents ,
8186 agentReady : make (chan struct {}),
8287 startedAt : time .Now (),
@@ -655,31 +660,57 @@ func (r *Reconciler) resolveAccessReferences(containers []*types.ParsedContainer
655660}
656661
657662// UpdateAgentData updates container data from an agent.
663+ // Only triggers reconciliation when the data has actually changed.
658664func (r * Reconciler ) UpdateAgentData (agentID string , containers []* types.ParsedContainer ) {
665+ fp := agentDataFingerprint (containers )
666+
659667 r .mu .Lock ()
668+ changed := r .agentFingerprints [agentID ] != fp
660669 r .agentData [agentID ] = containers
670+ r .agentFingerprints [agentID ] = fp
661671 allReady := r .checkExpectedAgentsLocked ()
662672 r .mu .Unlock ()
663673
664- log .Debug ().
665- Str ("agent" , agentID ).
666- Int ("containers" , len (containers )).
667- Msg ("Updated agent data" )
668-
669674 if allReady {
670675 r .agentReadyOnce .Do (func () {
671676 close (r .agentReady )
672677 })
673678 }
674679
675- // Non-blocking send to trigger immediate reconciliation
680+ if ! changed {
681+ log .Debug ().
682+ Str ("agent" , agentID ).
683+ Int ("containers" , len (containers )).
684+ Msg ("Agent data unchanged, skipping reconciliation trigger" )
685+ return
686+ }
687+
688+ log .Debug ().
689+ Str ("agent" , agentID ).
690+ Int ("containers" , len (containers )).
691+ Msg ("Agent data updated, triggering reconciliation" )
692+
676693 select {
677694 case r .agentTrigger <- struct {}{}:
678695 default :
679- // Reconciliation already pending, skip duplicate trigger
680696 }
681697}
682698
699+ // agentDataFingerprint computes a fast hash of container data for change detection.
700+ // Containers are sorted by ID before hashing to ensure order-independent comparison,
701+ // since Docker API does not guarantee a stable listing order.
702+ func agentDataFingerprint (containers []* types.ParsedContainer ) uint64 {
703+ sorted := make ([]* types.ParsedContainer , len (containers ))
704+ copy (sorted , containers )
705+ sort .Slice (sorted , func (i , j int ) bool {
706+ return sorted [i ].Info .ID < sorted [j ].Info .ID
707+ })
708+ data , _ := json .Marshal (sorted )
709+ h := fnv .New64a ()
710+ h .Write (data )
711+ return h .Sum64 ()
712+ }
713+
683714// checkExpectedAgentsLocked returns true if all expected agents have reported.
684715// Must be called with r.mu held (read or write).
685716func (r * Reconciler ) checkExpectedAgentsLocked () bool {
0 commit comments