88 "errors"
99 "hash"
1010 "hash/fnv"
11+ "log/slog"
1112 "math/big"
1213 "slices"
1314 "strings"
@@ -157,6 +158,16 @@ type DistMemory struct {
157158
158159 // stopped guards Stop() against double-invocation (idempotent shutdown).
159160 stopped atomic.Bool
161+
162+ // logger is the structured logger used by background loops
163+ // (heartbeat, hint replay, rebalance, gossip, merkle sync) and
164+ // error surfaces (transport bind failures, sync errors, dropped
165+ // hints). Defaults to a no-op handler writing to io.Discard so
166+ // library code does not write to stderr unless the caller opts
167+ // in via WithDistLogger. All log lines are pre-bound with
168+ // `node_id` so operators can grep/filter without the call sites
169+ // having to weave the ID through every record.
170+ logger * slog.Logger
160171}
161172
162173const (
@@ -433,6 +444,11 @@ func (dm *DistMemory) SyncWith(ctx context.Context, nodeID string) error {
433444
434445 remoteTree , err := transport .FetchMerkle (ctx , nodeID )
435446 if err != nil {
447+ dm .logger .Warn ("merkle sync fetch failed" ,
448+ slog .String ("peer_id" , nodeID ),
449+ slog .Any ("err" , err ),
450+ )
451+
436452 return err
437453 }
438454
@@ -625,6 +641,29 @@ func WithDistHTTPAuth(auth DistHTTPAuth) DistMemoryOption {
625641 return func (dm * DistMemory ) { dm .httpAuth = auth }
626642}
627643
644+ // WithDistLogger supplies a structured logger for the dist backend's
645+ // background loops (heartbeat, hint replay, rebalance, gossip, merkle
646+ // auto-sync) and operational error surfaces (HTTP listener failures,
647+ // transport errors, dropped hints). The supplied logger is wrapped with
648+ // `node_id` and `component=dist_memory` attributes before use, so call
649+ // sites do not need to weave the node ID through every record.
650+ //
651+ // Pass slog.Default() to inherit the application's logger, or supply a
652+ // custom *slog.Logger with the desired level / handler. Zero-value (no
653+ // option call) keeps the dist backend silent — the default uses an
654+ // io.Discard handler, which means library code never writes to stderr
655+ // unless the caller opts in.
656+ //
657+ // nil is treated as "no change" — useful when callers conditionally
658+ // build options.
659+ func WithDistLogger (logger * slog.Logger ) DistMemoryOption {
660+ return func (dm * DistMemory ) {
661+ if logger != nil {
662+ dm .logger = logger
663+ }
664+ }
665+ }
666+
628667// NewDistMemory creates a new DistMemory backend.
629668func NewDistMemory (ctx context.Context , opts ... DistMemoryOption ) (IBackend [DistMemory ], error ) {
630669 // Derive a server-lifetime context from the caller's ctx so that:
@@ -659,6 +698,20 @@ func NewDistMemory(ctx context.Context, opts ...DistMemoryOption) (IBackend[Dist
659698 return nil , authErr
660699 }
661700
701+ // Default the logger to a no-op handler so library code never writes
702+ // to stderr unless the caller opts in via WithDistLogger. Bind the
703+ // node identity once here so call sites can log without re-attaching
704+ // it on every record. Done before subsystem startup so background
705+ // loops capture the bound logger when they spawn.
706+ if dm .logger == nil {
707+ dm .logger = slog .New (slog .DiscardHandler )
708+ }
709+
710+ dm .logger = dm .logger .With (
711+ slog .String ("component" , "dist_memory" ),
712+ slog .String ("node_id" , dm .nodeID ),
713+ )
714+
662715 dm .ensureShardConfig ()
663716 dm .initMembershipIfNeeded ()
664717 // Pass the lifecycle ctx to subsystems that capture it (HTTP handlers,
@@ -1917,7 +1970,18 @@ func (dm *DistMemory) migrateIfNeeded(ctx context.Context, item *cache.Item) {
19171970 dm .metrics .rebalancedKeys .Add (1 )
19181971 dm .metrics .rebalancedPrimary .Add (1 )
19191972
1920- _ = transport .ForwardSet (ctx , string (owners [0 ]), item , true )
1973+ // Fire-and-forget forwarding: failures are dropped silently today
1974+ // (Phase B will introduce a retry queue). Logging is the minimum
1975+ // surface so operators can correlate vanished keys with transport
1976+ // failures during rolling deploys.
1977+ migrationErr := transport .ForwardSet (ctx , string (owners [0 ]), item , true )
1978+ if migrationErr != nil {
1979+ dm .logger .Warn ("rebalance migration forward failed" ,
1980+ slog .String ("key" , item .Key ),
1981+ slog .String ("new_primary" , string (owners [0 ])),
1982+ slog .Any ("err" , migrationErr ),
1983+ )
1984+ }
19211985
19221986 // Update originalPrimary so we don't recount repeatedly.
19231987 sh := dm .shardFor (item .Key )
@@ -2077,14 +2141,26 @@ func (dm *DistMemory) tryStartHTTP(ctx context.Context) {
20772141 server := newDistHTTPServer (dm .nodeAddr , limits , dm .httpAuth )
20782142
20792143 server .ctx = dm .lifeCtx // handler-side cancellation tied to Stop
2144+ server .logger = dm .logger
20802145
20812146 err := server .start (ctx , dm )
2082- if err != nil { // best-effort
2147+ if err != nil { // best-effort, but the operator must see this
2148+ dm .logger .Error ("dist HTTP listener bind failed" ,
2149+ slog .String ("addr" , dm .nodeAddr ),
2150+ slog .Any ("err" , err ),
2151+ )
2152+
20832153 return
20842154 }
20852155
20862156 dm .httpServer = server
20872157
2158+ dm .logger .Info ("dist HTTP listener started" ,
2159+ slog .String ("addr" , dm .nodeAddr ),
2160+ slog .Bool ("tls" , limits .TLSConfig != nil ),
2161+ slog .Bool ("auth" , dm .httpAuth .inboundConfigured ()),
2162+ )
2163+
20882164 resolver := dm .makePeerURLResolver (limits )
20892165
20902166 dm .storeTransport (NewDistHTTPTransportWithAuth (limits , dm .httpAuth , resolver ))
@@ -2609,6 +2685,12 @@ func (dm *DistMemory) processHint(ctx context.Context, nodeID string, entry hint
26092685
26102686 dm .metrics .hintedDropped .Add (1 )
26112687
2688+ dm .logger .Warn ("hint dropped after replay error" ,
2689+ slog .String ("peer_id" , nodeID ),
2690+ slog .String ("key" , entry .item .Key ),
2691+ slog .Any ("err" , err ),
2692+ )
2693+
26122694 return 1
26132695}
26142696
@@ -3153,6 +3235,12 @@ func (dm *DistMemory) evaluateLiveness(ctx context.Context, now time.Time, node
31533235 if dm .membership .Remove (node .ID ) {
31543236 dm .metrics .nodesRemoved .Add (1 )
31553237 dm .metrics .nodesDead .Add (1 )
3238+
3239+ dm .logger .Warn ("peer pruned (dead)" ,
3240+ slog .String ("peer_id" , string (node .ID )),
3241+ slog .String ("peer_addr" , node .Address ),
3242+ slog .Duration ("elapsed_since_seen" , elapsed ),
3243+ )
31563244 }
31573245
31583246 return
@@ -3161,6 +3249,11 @@ func (dm *DistMemory) evaluateLiveness(ctx context.Context, now time.Time, node
31613249 if dm .hbSuspectAfter > 0 && elapsed > dm .hbSuspectAfter && node .State == cluster .NodeAlive { // suspect
31623250 dm .membership .Mark (node .ID , cluster .NodeSuspect )
31633251 dm .metrics .nodesSuspect .Add (1 )
3252+
3253+ dm .logger .Info ("peer marked suspect (timeout)" ,
3254+ slog .String ("peer_id" , string (node .ID )),
3255+ slog .Duration ("elapsed_since_seen" , elapsed ),
3256+ )
31643257 }
31653258
31663259 transport := dm .loadTransport ()
@@ -3179,6 +3272,11 @@ func (dm *DistMemory) evaluateLiveness(ctx context.Context, now time.Time, node
31793272 if node .State == cluster .NodeAlive { // escalate
31803273 dm .membership .Mark (node .ID , cluster .NodeSuspect )
31813274 dm .metrics .nodesSuspect .Add (1 )
3275+
3276+ dm .logger .Info ("peer marked suspect (probe failed)" ,
3277+ slog .String ("peer_id" , string (node .ID )),
3278+ slog .Any ("err" , err ),
3279+ )
31823280 }
31833281
31843282 return
0 commit comments