From 73771330ce5719c09c3bffe7da52803aebdda19f Mon Sep 17 00:00:00 2001 From: Shereen Haj Date: Mon, 6 Oct 2025 12:51:50 +0300 Subject: [PATCH 1/3] pfp: recorder: support coalescing option Support new option that if set to true it would configure the recorders to push PFP statuses only if they are unique in PFP values compared to the last existing status, otherwise it just skips pushing the status. Note that this option works only for the last recorded status and it does not affect other statuses in the stack. Signed-off-by: Shereen Haj --- pkg/pfpstatus/pfpstatus.go | 20 +++++++---- pkg/pfpstatus/record/recorder.go | 42 ++++++++++++++++++++--- pkg/pfpstatus/record/recorder_test.go | 49 +++++++++++++++++++++++++++ 3 files changed, 99 insertions(+), 12 deletions(-) diff --git a/pkg/pfpstatus/pfpstatus.go b/pkg/pfpstatus/pfpstatus.go index db6723ea..262bfbfd 100644 --- a/pkg/pfpstatus/pfpstatus.go +++ b/pkg/pfpstatus/pfpstatus.go @@ -45,9 +45,10 @@ const ( ) type StorageParams struct { - Enabled bool - Directory string - Period time.Duration + Enabled bool + Directory string + Period time.Duration + CoalesceLast bool } type Params struct { @@ -63,9 +64,10 @@ type environ struct { func DefaultParams() Params { return Params{ Storage: StorageParams{ - Enabled: false, - Directory: DefaultDumpDirectory, - Period: 10 * time.Second, + Enabled: false, + Directory: DefaultDumpDirectory, + Period: 10 * time.Second, + CoalesceLast: false, }, } } @@ -95,7 +97,11 @@ func Setup(logh logr.Logger, params Params) { logh.Info("Setup in progress", "params", fmt.Sprintf("%+#v", params)) - rec, err := record.NewRecorder(record.WithMaxNodes(defaultMaxNodes), record.WithNodeCapacity(defaultMaxSamplesPerNode)) + rec, err := record.NewRecorder( + record.WithMaxNodes(defaultMaxNodes), + record.WithNodeCapacity(defaultMaxSamplesPerNode), + record.WithPFPCoalescing(params.Storage.CoalesceLast), + ) if err != nil { logh.Error(err, "cannot create a status recorder") return diff --git a/pkg/pfpstatus/record/recorder.go b/pkg/pfpstatus/record/recorder.go index afd7ebfc..21979704 100644 --- a/pkg/pfpstatus/record/recorder.go +++ b/pkg/pfpstatus/record/recorder.go @@ -52,10 +52,11 @@ type Timestamper func() time.Time // NodeRecorder stores all the recorded statuses for a given node name. // Statuses belonging to different nodes won't be accepted. type NodeRecorder struct { - timestamper func() time.Time - nodeName string // shortcut - capacity int - statuses []RecordedStatus + timestamper func() time.Time + nodeName string // shortcut + capacity int + statuses []RecordedStatus + coalesceLast bool } type NodeOption func(*NodeRecorder) @@ -72,6 +73,12 @@ func WithTimestamper(tsr func() time.Time) NodeOption { } } +func WithCoalescing(val bool) NodeOption { + return func(nr *NodeRecorder) { + nr.coalesceLast = val + } +} + // NewNodeRecorder creates a new recorder for the given node with the given capacity. // The record is a ring buffer, so only the latest Statuses are kept at any time. // The timestamper callback is used to mark times. Use `time.Now` if unsure. @@ -123,6 +130,14 @@ func (nr *NodeRecorder) Push(st podfingerprint.Status) error { return ErrMismatchingNode } ts := nr.timestamper() + + if nr.IsCoalescing() && nr.Len() != 0 { + lastItem := nr.statuses[nr.Len()-1] + if lastItem.FingerprintComputed == st.FingerprintComputed && lastItem.FingerprintExpected == st.FingerprintExpected { // pod list should not change + return nil + } + } + item := RecordedStatus{ Status: st.Clone(), RecordTime: ts, @@ -151,6 +166,11 @@ func (nr *NodeRecorder) Content() []RecordedStatus { return nr.statuses } +// IsCoalescing returns true if the node recorder is configured to push statuses only if they are unique in PFPs +func (nr *NodeRecorder) IsCoalescing() bool { + return nr.coalesceLast +} + // Recorder stores all the recorded statuses, dividing them by node name. // There is a hard cap of how many nodes are managed, and how many Statuses are recorded per node. type Recorder struct { @@ -158,6 +178,7 @@ type Recorder struct { nodeCapacity int maxNodes int timestamper Timestamper + coalesceLast bool } type Option func(*Recorder) @@ -180,6 +201,12 @@ func WithMaxNodes(maxNodes int) Option { } } +func WithPFPCoalescing(val bool) Option { + return func(rr *Recorder) { + rr.coalesceLast = val + } +} + // NewRecorder creates a new recorder up to the given node count, each with the given capacity. // Each per-node recorder is a ring buffer, so only the latest Statuses are kept // at any time for each node. The per-node records are created lazily as needed. @@ -238,6 +265,11 @@ func (rr *Recorder) Len() int { return tot } +// IsCoalescing returns true if the recorder is configured to push statuses only if they are unique in PFPs +func (rr *Recorder) IsCoalescing() bool { + return rr.coalesceLast +} + // Push adds a new Status to the record for its node, evicting the oldest Status // belonging to the same node if necessary. // Per-node records are created lazily as needed, up to the configured maximum. @@ -257,7 +289,7 @@ func (rr *Recorder) Push(st podfingerprint.Status) error { } if !ok { - nr, err = NewNodeRecorder(st.NodeName, WithTimestamper(rr.timestamper), WithCapacity(rr.nodeCapacity)) + nr, err = NewNodeRecorder(st.NodeName, WithTimestamper(rr.timestamper), WithCapacity(rr.nodeCapacity), WithCoalescing(rr.coalesceLast)) if err != nil { return err } diff --git a/pkg/pfpstatus/record/recorder_test.go b/pkg/pfpstatus/record/recorder_test.go index 9c60b02d..4a06d8b9 100644 --- a/pkg/pfpstatus/record/recorder_test.go +++ b/pkg/pfpstatus/record/recorder_test.go @@ -392,3 +392,52 @@ func TestRecorderMultiPushMultipleNodes(t *testing.T) { t.Fatalf("unexpected status count for %q", "node-0") } } + +func TestRecorderWithCoalescing(t *testing.T) { + rr, err := NewRecorder(WithMaxNodes(1), WithNodeCapacity(10), WithPFPCoalescing(true)) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if rr.IsCoalescing() != true { + t.Fatalf("IsCoalescing should be true") + } + + st := podfingerprint.Status{ + FingerprintExpected: "pfp-exp-st1", + FingerprintComputed: "pfp-comp-st1", + NodeName: "node-1", + } + + rr.Push(st) + rs1 := rr.Content()["node-1"][0] + rr.Push(st) + rs2 := rr.Content()["node-1"][0] + if !rs2.RecordTime.Equal(rs1.RecordTime) { + t.Fatalf("RecordTime should not change, %v vs %v", rs2.RecordTime, rs1.RecordTime) + } + rr.Push(st) + rs3 := rr.Content()["node-1"][0] + if !rs3.RecordTime.Equal(rs1.RecordTime) { + t.Fatalf("RecordTime should not change, %v vs %v", rs3.RecordTime, rs1.RecordTime) + } + + if rr.CountRecords("node-1") != 1 { + t.Fatalf("unexpected status count with WithCoalescing option set to true") + } + + if rr.nodes["node-1"].IsCoalescing() != true { + t.Fatalf("unexpected status with WithCoalescing option set to true") + } + + st.FingerprintExpected = "pfp-exp-st2" + rr.Push(st) + if rr.CountRecords("node-1") != 2 { + t.Fatalf("unexpected status count") + } + + st.FingerprintComputed = "pfp-comp-st2" + rr.Push(st) + if rr.CountRecords("node-1") != 3 { + t.Fatalf("unexpected status count") + } +} From 3c4650545426590c43b0887a1db2984bb3d63dca Mon Sep 17 00:00:00 2001 From: Shereen Haj Date: Mon, 6 Oct 2025 13:01:35 +0300 Subject: [PATCH 2/3] pfp: separate recorders options Place Recorder and NodeRecorder options in one file for easier navigation. Signed-off-by: Shereen Haj --- pkg/pfpstatus/record/options.go | 65 ++++++++++++++++++++++++++++++++ pkg/pfpstatus/record/recorder.go | 50 +----------------------- 2 files changed, 67 insertions(+), 48 deletions(-) create mode 100644 pkg/pfpstatus/record/options.go diff --git a/pkg/pfpstatus/record/options.go b/pkg/pfpstatus/record/options.go new file mode 100644 index 00000000..aee603ba --- /dev/null +++ b/pkg/pfpstatus/record/options.go @@ -0,0 +1,65 @@ +/* + * Copyright 2025 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package record + +import "time" + +type Option func(*Recorder) + +func WithNodeCapacity(nodeCapacity int) Option { + return func(rec *Recorder) { + rec.nodeCapacity = nodeCapacity + } +} + +func WithNodeTimestamper(tsr func() time.Time) Option { + return func(rec *Recorder) { + rec.timestamper = tsr + } +} + +func WithMaxNodes(maxNodes int) Option { + return func(rec *Recorder) { + rec.maxNodes = maxNodes + } +} + +func WithPFPCoalescing(val bool) Option { + return func(rr *Recorder) { + rr.coalesceLast = val + } +} + +type NodeOption func(*NodeRecorder) + +func WithCapacity(capacity int) NodeOption { + return func(nr *NodeRecorder) { + nr.capacity = capacity + } +} + +func WithTimestamper(tsr func() time.Time) NodeOption { + return func(nr *NodeRecorder) { + nr.timestamper = tsr + } +} + +func WithCoalescing(val bool) NodeOption { + return func(nr *NodeRecorder) { + nr.coalesceLast = val + } +} diff --git a/pkg/pfpstatus/record/recorder.go b/pkg/pfpstatus/record/recorder.go index 21979704..94cd44e5 100644 --- a/pkg/pfpstatus/record/recorder.go +++ b/pkg/pfpstatus/record/recorder.go @@ -59,26 +59,6 @@ type NodeRecorder struct { coalesceLast bool } -type NodeOption func(*NodeRecorder) - -func WithCapacity(capacity int) NodeOption { - return func(nr *NodeRecorder) { - nr.capacity = capacity - } -} - -func WithTimestamper(tsr func() time.Time) NodeOption { - return func(nr *NodeRecorder) { - nr.timestamper = tsr - } -} - -func WithCoalescing(val bool) NodeOption { - return func(nr *NodeRecorder) { - nr.coalesceLast = val - } -} - // NewNodeRecorder creates a new recorder for the given node with the given capacity. // The record is a ring buffer, so only the latest Statuses are kept at any time. // The timestamper callback is used to mark times. Use `time.Now` if unsure. @@ -161,7 +141,7 @@ func (nr *NodeRecorder) Cap() int { return nr.capacity } -// Content() returns a shallow copy of all the recorded statuses. +// Content returns a shallow copy of all the recorded statuses. func (nr *NodeRecorder) Content() []RecordedStatus { return nr.statuses } @@ -181,32 +161,6 @@ type Recorder struct { coalesceLast bool } -type Option func(*Recorder) - -func WithNodeCapacity(nodeCapacity int) Option { - return func(rec *Recorder) { - rec.nodeCapacity = nodeCapacity - } -} - -func WithNodeTimestamper(tsr func() time.Time) Option { - return func(rec *Recorder) { - rec.timestamper = tsr - } -} - -func WithMaxNodes(maxNodes int) Option { - return func(rec *Recorder) { - rec.maxNodes = maxNodes - } -} - -func WithPFPCoalescing(val bool) Option { - return func(rr *Recorder) { - rr.coalesceLast = val - } -} - // NewRecorder creates a new recorder up to the given node count, each with the given capacity. // Each per-node recorder is a ring buffer, so only the latest Statuses are kept // at any time for each node. The per-node records are created lazily as needed. @@ -298,7 +252,7 @@ func (rr *Recorder) Push(st podfingerprint.Status) error { return nr.Push(st) } -// Content() returns a shallow copy of all the recorded statuses, by node name. +// Content returns a shallow copy of all the recorded statuses, by node name. func (rr *Recorder) Content() map[string][]RecordedStatus { ret := make(map[string][]RecordedStatus, len(rr.nodes)) for nodeName, nr := range rr.nodes { From 41f345fc9dea4886bee1d3e7ed4db9ffaffb6ecc Mon Sep 17 00:00:00 2001 From: Francesco Romani Date: Wed, 24 Sep 2025 15:40:06 +0200 Subject: [PATCH 3/3] pfp: recorder: implement maxSize option alongside maxCapacity Allow to configure the NodeRecorder with maximum capacity to avoid extreme growth in recorded PFP statuses size. This is yet another sealing besides maxCapacity that the node recorder should not cross. The maxSize takes precesdence over maxCapacity, meaning as long as the current size of the nodeRecorder does not reach the maximum allowed, `statuses` slice length is maintained and is allowed to reach the maximum capacity. Once the new item pushed to the nodeRecorder is expected to make the recorder reach the max allowed size, `statuses` will witness reductions in older items to allow the new status to fit, under the constraint that node recorder will at least have one status no matter its size. So the boundary is important but the recorder goal should still be on top. Signed-off-by: Francesco Romani --- pkg/pfpstatus/pfpstatus.go | 20 +++--- pkg/pfpstatus/record/options.go | 12 ++++ pkg/pfpstatus/record/recorder.go | 53 +++++++++++++--- pkg/pfpstatus/record/recorder_test.go | 90 +++++++++++++++++++++++++++ 4 files changed, 160 insertions(+), 15 deletions(-) diff --git a/pkg/pfpstatus/pfpstatus.go b/pkg/pfpstatus/pfpstatus.go index 262bfbfd..8c3c6ca1 100644 --- a/pkg/pfpstatus/pfpstatus.go +++ b/pkg/pfpstatus/pfpstatus.go @@ -41,14 +41,16 @@ const ( const ( defaultMaxNodes = 5000 defaultMaxSamplesPerNode = 10 + defaultMaxSizePerNode = 0 // no constraints defaultDumpPeriod = 10 * time.Second ) type StorageParams struct { - Enabled bool - Directory string - Period time.Duration - CoalesceLast bool + Enabled bool + Directory string + Period time.Duration + CoalesceLast bool + MaxSizePerNode int } type Params struct { @@ -64,10 +66,11 @@ type environ struct { func DefaultParams() Params { return Params{ Storage: StorageParams{ - Enabled: false, - Directory: DefaultDumpDirectory, - Period: 10 * time.Second, - CoalesceLast: false, + Enabled: false, + Directory: DefaultDumpDirectory, + Period: 10 * time.Second, + CoalesceLast: false, + MaxSizePerNode: defaultMaxSizePerNode, }, } } @@ -100,6 +103,7 @@ func Setup(logh logr.Logger, params Params) { rec, err := record.NewRecorder( record.WithMaxNodes(defaultMaxNodes), record.WithNodeCapacity(defaultMaxSamplesPerNode), + record.WithMaxSizePerNode(defaultMaxSizePerNode), record.WithPFPCoalescing(params.Storage.CoalesceLast), ) if err != nil { diff --git a/pkg/pfpstatus/record/options.go b/pkg/pfpstatus/record/options.go index aee603ba..2a76713d 100644 --- a/pkg/pfpstatus/record/options.go +++ b/pkg/pfpstatus/record/options.go @@ -44,6 +44,12 @@ func WithPFPCoalescing(val bool) Option { } } +func WithMaxSizePerNode(maxSize int) Option { + return func(rr *Recorder) { + rr.maxSize = maxSize + } +} + type NodeOption func(*NodeRecorder) func WithCapacity(capacity int) NodeOption { @@ -63,3 +69,9 @@ func WithCoalescing(val bool) NodeOption { nr.coalesceLast = val } } + +func WithMaxSize(maxSize int) NodeOption { + return func(nr *NodeRecorder) { + nr.maxSize = maxSize + } +} diff --git a/pkg/pfpstatus/record/recorder.go b/pkg/pfpstatus/record/recorder.go index 94cd44e5..e2ec71ce 100644 --- a/pkg/pfpstatus/record/recorder.go +++ b/pkg/pfpstatus/record/recorder.go @@ -18,6 +18,8 @@ package record import ( "errors" + "fmt" + "io" "time" "github.com/k8stopologyawareschedwg/podfingerprint" @@ -40,6 +42,13 @@ type RecordedStatus struct { podfingerprint.Status // RecordTime is a timestamp of when the RecordedStatus was added to the record RecordTime time.Time `json:"recordTime"` + // statusSize is approximate size of the string representation of the status in bytes + statusSize int `json:"-"` +} + +func (rs RecordedStatus) Size() int { + byteCount, _ := fmt.Fprint(io.Discard, fmt.Sprintf("%+v\n%+v", rs.Pods, rs.RecordTime)) + return byteCount } func (rs RecordedStatus) Equal(x RecordedStatus) bool { @@ -55,6 +64,8 @@ type NodeRecorder struct { timestamper func() time.Time nodeName string // shortcut capacity int + maxSize int + size int statuses []RecordedStatus coalesceLast bool } @@ -88,14 +99,27 @@ func (nr *NodeRecorder) dropOldest() { if nr.Len() < 1 { return } + nr.size -= nr.statuses[0].Size() nr.statuses = nr.statuses[1:] } -func (nr *NodeRecorder) makeRoom() { - if nr.Len() < nr.Cap() { +func (nr *NodeRecorder) makeRoom(size int) { + if nr.capacity > 1 && nr.Len() == nr.Cap() { + nr.dropOldest() + } + if nr.maxSize == 0 { + return + } + + if size >= nr.maxSize { + nr.size = 0 + nr.statuses = []RecordedStatus{} return } - nr.dropOldest() + + for (nr.size + size) > nr.maxSize { + nr.dropOldest() + } } // Push adds a new Status to the record, evicting the oldest Status if necessary. @@ -119,15 +143,19 @@ func (nr *NodeRecorder) Push(st podfingerprint.Status) error { } item := RecordedStatus{ - Status: st.Clone(), + Status: st, RecordTime: ts, } + item.statusSize = item.Size() if nr.capacity == 1 { // handle common special case, avoid any resize nr.statuses[0] = item + nr.size = item.Size() + // no maxSize constraint required return nil } - nr.makeRoom() + nr.makeRoom(item.statusSize) nr.statuses = append(nr.statuses, item) + nr.size += item.Size() return nil } @@ -151,12 +179,18 @@ func (nr *NodeRecorder) IsCoalescing() bool { return nr.coalesceLast } +// MaxSize returns the maximum size allowed for nr +func (nr *NodeRecorder) MaxSize() int { + return nr.maxSize +} + // Recorder stores all the recorded statuses, dividing them by node name. // There is a hard cap of how many nodes are managed, and how many Statuses are recorded per node. type Recorder struct { nodes map[string]*NodeRecorder nodeCapacity int maxNodes int + maxSize int timestamper Timestamper coalesceLast bool } @@ -186,7 +220,7 @@ func NewRecorder(opts ...Option) (*Recorder, error) { return &rec, nil } -// Cap returns the maximum nodes allowed in this Recorder +// MaxNodes returns the maximum nodes allowed in this Recorder func (rr *Recorder) MaxNodes() int { return rr.maxNodes } @@ -224,6 +258,11 @@ func (rr *Recorder) IsCoalescing() bool { return rr.coalesceLast } +// MaxSize returns the maximum size allowed per NodeRecorder in rr +func (rr *Recorder) MaxSize() int { + return rr.maxSize +} + // Push adds a new Status to the record for its node, evicting the oldest Status // belonging to the same node if necessary. // Per-node records are created lazily as needed, up to the configured maximum. @@ -243,7 +282,7 @@ func (rr *Recorder) Push(st podfingerprint.Status) error { } if !ok { - nr, err = NewNodeRecorder(st.NodeName, WithTimestamper(rr.timestamper), WithCapacity(rr.nodeCapacity), WithCoalescing(rr.coalesceLast)) + nr, err = NewNodeRecorder(st.NodeName, WithTimestamper(rr.timestamper), WithCapacity(rr.nodeCapacity), WithMaxSize(rr.maxSize), WithCoalescing(rr.coalesceLast)) if err != nil { return err } diff --git a/pkg/pfpstatus/record/recorder_test.go b/pkg/pfpstatus/record/recorder_test.go index 4a06d8b9..cedf809c 100644 --- a/pkg/pfpstatus/record/recorder_test.go +++ b/pkg/pfpstatus/record/recorder_test.go @@ -19,7 +19,9 @@ package record import ( "errors" "fmt" + "io" "testing" + "time" "github.com/k8stopologyawareschedwg/podfingerprint" ) @@ -441,3 +443,91 @@ func TestRecorderWithCoalescing(t *testing.T) { t.Fatalf("unexpected status count") } } + +func TestRecorderWithMaxSize(t *testing.T) { + rr, err := NewRecorder(WithMaxNodes(8), WithNodeCapacity(5), WithMaxSizePerNode(1500)) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if rr.MaxSize() != 1500 { + t.Fatalf("unexpected max size: %d", rr.MaxSize()) + } +} + +func TestNodeRecorderWithMaxSize(t *testing.T) { + rt := time.Now() // to manage accurate size checks + rtf := func() time.Time { + return rt + } + + st := podfingerprint.Status{ + FingerprintExpected: "pfp-exp-st1", + FingerprintComputed: "pfp-comp-st1", + NodeName: "node-0", + } + rs := RecordedStatus{ + Status: st, + RecordTime: rt, + } + stSize := rs.Size() + maxSize := 2*stSize + 1000 + + nr, err := NewNodeRecorder("node-0", WithCapacity(5), WithTimestamper(rtf), WithMaxSize(maxSize)) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if nr.MaxSize() != maxSize { + t.Fatalf("unexpected max size: %d vs %d", nr.MaxSize(), maxSize) + } + + for i := 0; i < 2; i++ { + if err := nr.Push(st); err != nil { + t.Fatalf("unexpected error: %v", err) + } + } + + stSize += rs.Size() + if nr.Len() != 2 { + t.Fatalf("unexpected length: %d", nr.Len()) + } + if nr.size != stSize { + t.Fatalf("unexpected size: %d vs %d", nr.size, stSize) + } + + var pods []podfingerprint.NamespacedName + for i := 0; getSize(pods) < maxSize; i++ { + pods = append(pods, podfingerprint.NamespacedName{ + Namespace: fmt.Sprintf("pod-name %d", i), + Name: fmt.Sprintf("namespace-name %d", i), + }) + } + + rs2 := RecordedStatus{ + Status: podfingerprint.Status{ + FingerprintExpected: "pfp-exp-st2", + FingerprintComputed: "pfp-comp-st2", + Pods: pods, + NodeName: st.NodeName, + }, + RecordTime: rt, + } + + if err := nr.Push(rs2.Status); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if nr.Len() != 1 { + t.Fatalf("unexpected length: expected 1 found %d", nr.Len()) + } + + if nr.size != rs2.Size() { + t.Fatalf("unexpected size: expected %d found %d", rs2.Size(), nr.size) + } +} + +func getSize(pods []podfingerprint.NamespacedName) int { + byteCount, _ := fmt.Fprint(io.Discard, fmt.Sprintf("%+v\n", pods)) + return byteCount + +}