diff --git a/p2p/host/resource-manager/log_reporter.go b/p2p/host/resource-manager/log_reporter.go new file mode 100644 index 0000000000..bbe1e5fb31 --- /dev/null +++ b/p2p/host/resource-manager/log_reporter.go @@ -0,0 +1,85 @@ +package rcmgr + +import ( + "github.com/libp2p/go-libp2p/core/network" +) + +// LogReporter is a trace reporter that logs blocked resource events. +// It consumes trace events and produces debug logs, ensuring consistency +// between logs and metrics by using the same trace event source. +type LogReporter struct{} + +var _ TraceReporter = (*LogReporter)(nil) + +// NewLogReporter creates a new LogReporter instance. +func NewLogReporter() LogReporter { + return LogReporter{} +} + +// ConsumeEvent implements TraceReporter by logging blocked resource events. +func (r LogReporter) ConsumeEvent(evt TraceEvt) { + switch evt.Type { + case TraceBlockReserveMemoryEvt: + r.logBlockedMemory(evt) + case TraceBlockAddStreamEvt: + r.logBlockedStream(evt) + case TraceBlockAddConnEvt: + r.logBlockedConn(evt) + } +} + +func (r LogReporter) logBlockedMemory(evt TraceEvt) { + logValues := make([]interface{}, 0, 2*6) + logValues = append(logValues, + "scope", evt.Name, + "priority", evt.Priority, + "current", evt.Memory, + "attempted", evt.Delta, + ) + + log.Debug("blocked memory reservation", logValues...) +} + +func (r LogReporter) logBlockedStream(evt TraceEvt) { + logValues := make([]interface{}, 0, 2*6) + + var dir network.Direction + if evt.DeltaIn != 0 { + dir = network.DirInbound + } else { + dir = network.DirOutbound + } + + logValues = append(logValues, + "scope", evt.Name, + "direction", dir, + "inbound", evt.StreamsIn, + "outbound", evt.StreamsOut, + ) + + log.Debug("blocked stream", logValues...) +} + +func (r LogReporter) logBlockedConn(evt TraceEvt) { + logValues := make([]interface{}, 0, 2*7) + + var dir network.Direction + if evt.DeltaIn != 0 { + dir = network.DirInbound + } else { + dir = network.DirOutbound + } + + usefd := evt.Delta != 0 + + logValues = append(logValues, + "scope", evt.Name, + "direction", dir, + "usefd", usefd, + "inbound", evt.ConnsIn, + "outbound", evt.ConnsOut, + "fd", evt.FD, + ) + + log.Debug("blocked connection", logValues...) +} diff --git a/p2p/host/resource-manager/log_reporter_test.go b/p2p/host/resource-manager/log_reporter_test.go new file mode 100644 index 0000000000..b891fc76e2 --- /dev/null +++ b/p2p/host/resource-manager/log_reporter_test.go @@ -0,0 +1,125 @@ +package rcmgr + +import ( + "testing" + + "github.com/libp2p/go-libp2p/core/network" +) + +func TestLogReporter(t *testing.T) { + reporter := NewLogReporter() + + // Test blocked memory event + memEvt := TraceEvt{ + Type: TraceBlockReserveMemoryEvt, + Name: "test-scope", + Priority: 128, + Delta: 1024, + Memory: 2048, + } + // Should not panic + reporter.ConsumeEvent(memEvt) + + // Test blocked stream event + streamEvt := TraceEvt{ + Type: TraceBlockAddStreamEvt, + Name: "test-scope", + DeltaIn: 1, + DeltaOut: 0, + StreamsIn: 5, + StreamsOut: 3, + } + reporter.ConsumeEvent(streamEvt) + + // Test blocked connection event + connEvt := TraceEvt{ + Type: TraceBlockAddConnEvt, + Name: "test-scope", + DeltaIn: 0, + DeltaOut: 1, + Delta: 1, // fd + ConnsIn: 2, + ConnsOut: 4, + FD: 10, + } + reporter.ConsumeEvent(connEvt) + + // Test non-blocked events (should be ignored) + normalEvt := TraceEvt{ + Type: TraceAddStreamEvt, + Name: "test-scope", + } + reporter.ConsumeEvent(normalEvt) +} + +func TestLogReporterDirection(t *testing.T) { + reporter := NewLogReporter() + + // Test inbound stream + inboundStream := TraceEvt{ + Type: TraceBlockAddStreamEvt, + Name: "test-scope", + DeltaIn: 1, + DeltaOut: 0, + StreamsIn: 5, + StreamsOut: 3, + } + reporter.ConsumeEvent(inboundStream) + + // Test outbound stream + outboundStream := TraceEvt{ + Type: TraceBlockAddStreamEvt, + Name: "test-scope", + DeltaIn: 0, + DeltaOut: 1, + StreamsIn: 5, + StreamsOut: 3, + } + reporter.ConsumeEvent(outboundStream) + + // Test inbound connection + inboundConn := TraceEvt{ + Type: TraceBlockAddConnEvt, + Name: "test-scope", + DeltaIn: 1, + DeltaOut: 0, + Delta: 0, + ConnsIn: 2, + ConnsOut: 4, + FD: 10, + } + reporter.ConsumeEvent(inboundConn) + + // Test outbound connection + outboundConn := TraceEvt{ + Type: TraceBlockAddConnEvt, + Name: "test-scope", + DeltaIn: 0, + DeltaOut: 1, + Delta: 1, + ConnsIn: 2, + ConnsOut: 4, + FD: 10, + } + reporter.ConsumeEvent(outboundConn) +} + +func TestLogReporterIntegration(t *testing.T) { + // Create a resource manager with our log reporter + limits := DefaultLimits.AutoScale() + limiter := NewFixedLimiter(limits) + + mgr, err := NewResourceManager(limiter) + if err != nil { + t.Fatal(err) + } + defer mgr.Close() + + // The log reporter should be installed by default + // Try to exceed a limit and verify the trace event is generated + _, err = mgr.OpenConnection(network.DirInbound, false, nil) + if err != nil { + // Expected to fail when limits are exceeded, but that's okay + // The important part is that our reporter processes the event + } +} diff --git a/p2p/host/resource-manager/rcmgr.go b/p2p/host/resource-manager/rcmgr.go index efacb4547d..fdc7128f00 100644 --- a/p2p/host/resource-manager/rcmgr.go +++ b/p2p/host/resource-manager/rcmgr.go @@ -177,15 +177,19 @@ func NewResourceManager(limits Limiter, opts ...Option) (network.ResourceManager } r.verifySourceAddressRateLimiter = newVerifySourceAddressRateLimiter(r.connLimiter) + // Add LogReporter to ensure blocked resource logs come from trace events + logReporter := NewLogReporter() + if r.trace == nil { + r.trace = &trace{} + } + r.trace.reporters = append(r.trace.reporters, logReporter) + if !r.disableMetrics { var sr TraceReporter sr, err := NewStatsTraceReporter() if err != nil { log.Error("failed to initialise StatsTraceReporter", "err", err) } else { - if r.trace == nil { - r.trace = &trace{} - } found := false for _, rep := range r.trace.reporters { if rep == sr { diff --git a/p2p/host/resource-manager/scope.go b/p2p/host/resource-manager/scope.go index 3233f4cbb2..9889effe3a 100644 --- a/p2p/host/resource-manager/scope.go +++ b/p2p/host/resource-manager/scope.go @@ -345,7 +345,6 @@ func (s *resourceScope) ReserveMemory(size int, prio uint8) error { } if err := s.rc.reserveMemory(int64(size), prio); err != nil { - log.Debug("blocked memory reservation", logValuesMemoryLimit(s.name, "", s.rc.stat(), err)...) s.trace.BlockReserveMemory(s.name, prio, int64(size), s.rc.memory) s.metrics.BlockMemory(size) return s.wrapError(err) @@ -370,10 +369,8 @@ func (s *resourceScope) reserveMemoryForEdges(size int, prio uint8) error { var reserved int var err error for _, e := range s.edges { - var stat network.ScopeStat - stat, err = e.ReserveMemoryForChild(int64(size), prio) + _, err = e.ReserveMemoryForChild(int64(size), prio) if err != nil { - log.Debug("blocked memory reservation from constraining edge", logValuesMemoryLimit(s.name, e.name, stat, err)...) break } @@ -452,7 +449,6 @@ func (s *resourceScope) AddStream(dir network.Direction) error { } if err := s.rc.addStream(dir); err != nil { - log.Debug("blocked stream", logValuesStreamLimit(s.name, "", dir, s.rc.stat(), err)...) s.trace.BlockAddStream(s.name, dir, s.rc.nstreamsIn, s.rc.nstreamsOut) return s.wrapError(err) } @@ -474,10 +470,8 @@ func (s *resourceScope) addStreamForEdges(dir network.Direction) error { var err error var reserved int for _, e := range s.edges { - var stat network.ScopeStat - stat, err = e.AddStreamForChild(dir) + _, err = e.AddStreamForChild(dir) if err != nil { - log.Debug("blocked stream from constraining edge", logValuesStreamLimit(s.name, e.name, dir, stat, err)...) break } reserved++ @@ -554,7 +548,6 @@ func (s *resourceScope) AddConn(dir network.Direction, usefd bool) error { } if err := s.rc.addConn(dir, usefd); err != nil { - log.Debug("blocked connection", logValuesConnLimit(s.name, "", dir, usefd, s.rc.stat(), err)...) s.trace.BlockAddConn(s.name, dir, usefd, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd) return s.wrapError(err) } @@ -576,10 +569,8 @@ func (s *resourceScope) addConnForEdges(dir network.Direction, usefd bool) error var err error var reserved int for _, e := range s.edges { - var stat network.ScopeStat - stat, err = e.AddConnForChild(dir, usefd) + _, err = e.AddConnForChild(dir, usefd) if err != nil { - log.Debug("blocked connection from constraining edge", logValuesConnLimit(s.name, e.name, dir, usefd, stat, err)...) break } reserved++