Skip to content

Commit bba0e9a

Browse files
ffromanishajmakh
authored andcommitted
pfp: recorder: implement maxSize option alongside maxCapacity
Allow to configure the NodeRecorder with maximum capacity to avoid extreme growth in recorded PFP statuses size. This is yet another sealing besides maxCapacity that the node recorder should not cross. The maxSize takes precesdence over maxCapacity, meaning as long as the current size of the nodeRecorder does not reach the maximum allowed, `statuses` slice length is maintained and is allowed to reach the maximum capacity. Once the new item pushed to the nodeRecorder is expected to make the recorder reach the max allowed size, `statuses` will witness reductions in older items to allow the new status to fit, under the constraint that node recorder will at least have one status no matter its size. So the boundary is important but the recorder goal should still be on top. Signed-off-by: Francesco Romani <fromani@redhat.com>
1 parent bb289b7 commit bba0e9a

4 files changed

Lines changed: 154 additions & 15 deletions

File tree

pkg/pfpstatus/pfpstatus.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,16 @@ const (
4141
const (
4242
defaultMaxNodes = 5000
4343
defaultMaxSamplesPerNode = 10
44+
defaultMaxSizePerNode = 0 // no constraints
4445
defaultDumpPeriod = 10 * time.Second
4546
)
4647

4748
type StorageParams struct {
48-
Enabled bool
49-
Directory string
50-
Period time.Duration
51-
CoalesceLast bool
49+
Enabled bool
50+
Directory string
51+
Period time.Duration
52+
CoalesceLast bool
53+
MaxSizePerNode int
5254
}
5355

5456
type Params struct {
@@ -64,10 +66,11 @@ type environ struct {
6466
func DefaultParams() Params {
6567
return Params{
6668
Storage: StorageParams{
67-
Enabled: false,
68-
Directory: DefaultDumpDirectory,
69-
Period: 10 * time.Second,
70-
CoalesceLast: false,
69+
Enabled: false,
70+
Directory: DefaultDumpDirectory,
71+
Period: 10 * time.Second,
72+
CoalesceLast: false,
73+
MaxSizePerNode: defaultMaxSizePerNode,
7174
},
7275
}
7376
}
@@ -100,6 +103,7 @@ func Setup(logh logr.Logger, params Params) {
100103
rec, err := record.NewRecorder(
101104
record.WithMaxNodes(defaultMaxNodes),
102105
record.WithNodeCapacity(defaultMaxSamplesPerNode),
106+
record.WithMaxSizePerNode(defaultMaxSizePerNode),
103107
record.WithPFPCoalescing(params.Storage.CoalesceLast),
104108
)
105109
if err != nil {

pkg/pfpstatus/record/options.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,12 @@ func WithPFPCoalescing(val bool) Option {
4444
}
4545
}
4646

47+
func WithMaxSizePerNode(maxSize int) Option {
48+
return func(rr *Recorder) {
49+
rr.maxSize = maxSize
50+
}
51+
}
52+
4753
type NodeOption func(*NodeRecorder)
4854

4955
func WithCapacity(capacity int) NodeOption {
@@ -63,3 +69,9 @@ func WithCoalescing(val bool) NodeOption {
6369
nr.coalesceLast = val
6470
}
6571
}
72+
73+
func WithMaxSize(maxSize int) NodeOption {
74+
return func(nr *NodeRecorder) {
75+
nr.maxSize = maxSize
76+
}
77+
}

pkg/pfpstatus/record/recorder.go

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ package record
1818

1919
import (
2020
"errors"
21+
"fmt"
22+
"io"
2123
"time"
2224

2325
"github.com/k8stopologyawareschedwg/podfingerprint"
@@ -40,6 +42,13 @@ type RecordedStatus struct {
4042
podfingerprint.Status
4143
// RecordTime is a timestamp of when the RecordedStatus was added to the record
4244
RecordTime time.Time `json:"recordTime"`
45+
// statusSize is approximate size of the string representation of the status in bytes
46+
statusSize int `json:"-"`
47+
}
48+
49+
func (rs RecordedStatus) Size() int {
50+
byteCount, _ := fmt.Fprint(io.Discard, fmt.Sprintf("%+v\n%+v", rs.Pods, rs.RecordTime))
51+
return byteCount
4352
}
4453

4554
func (rs RecordedStatus) Equal(x RecordedStatus) bool {
@@ -55,6 +64,8 @@ type NodeRecorder struct {
5564
timestamper func() time.Time
5665
nodeName string // shortcut
5766
capacity int
67+
maxSize int
68+
size int
5869
statuses []RecordedStatus
5970
coalesceLast bool
6071
}
@@ -88,14 +99,21 @@ func (nr *NodeRecorder) dropOldest() {
8899
if nr.Len() < 1 {
89100
return
90101
}
102+
nr.size -= nr.statuses[0].Size()
91103
nr.statuses = nr.statuses[1:]
92104
}
93105

94-
func (nr *NodeRecorder) makeRoom() {
95-
if nr.Len() < nr.Cap() {
106+
func (nr *NodeRecorder) makeRoom(st RecordedStatus) {
107+
if nr.capacity > 1 && nr.Len() == nr.Cap() {
108+
nr.dropOldest()
96109
return
97110
}
98-
nr.dropOldest()
111+
if nr.maxSize == 0 {
112+
return
113+
}
114+
for (nr.size+st.statusSize) > nr.maxSize && nr.Len() > 0 {
115+
nr.dropOldest()
116+
}
99117
}
100118

101119
// Push adds a new Status to the record, evicting the oldest Status if necessary.
@@ -119,15 +137,19 @@ func (nr *NodeRecorder) Push(st podfingerprint.Status) error {
119137
}
120138

121139
item := RecordedStatus{
122-
Status: st.Clone(),
140+
Status: st,
123141
RecordTime: ts,
124142
}
143+
item.statusSize = item.Size()
125144
if nr.capacity == 1 { // handle common special case, avoid any resize
126145
nr.statuses[0] = item
146+
nr.size = item.Size()
147+
// no maxSize constraint required
127148
return nil
128149
}
129-
nr.makeRoom()
150+
nr.makeRoom(item)
130151
nr.statuses = append(nr.statuses, item)
152+
nr.size += item.Size()
131153
return nil
132154
}
133155

@@ -151,12 +173,18 @@ func (nr *NodeRecorder) IsCoalescing() bool {
151173
return nr.coalesceLast
152174
}
153175

176+
// MaxSize returns the maximum size allowed for nr
177+
func (nr *NodeRecorder) MaxSize() int {
178+
return nr.maxSize
179+
}
180+
154181
// Recorder stores all the recorded statuses, dividing them by node name.
155182
// There is a hard cap of how many nodes are managed, and how many Statuses are recorded per node.
156183
type Recorder struct {
157184
nodes map[string]*NodeRecorder
158185
nodeCapacity int
159186
maxNodes int
187+
maxSize int
160188
timestamper Timestamper
161189
coalesceLast bool
162190
}
@@ -186,7 +214,7 @@ func NewRecorder(opts ...Option) (*Recorder, error) {
186214
return &rec, nil
187215
}
188216

189-
// Cap returns the maximum nodes allowed in this Recorder
217+
// MaxNodes returns the maximum nodes allowed in this Recorder
190218
func (rr *Recorder) MaxNodes() int {
191219
return rr.maxNodes
192220
}
@@ -224,6 +252,11 @@ func (rr *Recorder) IsCoalescing() bool {
224252
return rr.coalesceLast
225253
}
226254

255+
// MaxSize returns the maximum size allowed per NodeRecorder in rr
256+
func (rr *Recorder) MaxSize() int {
257+
return rr.maxSize
258+
}
259+
227260
// Push adds a new Status to the record for its node, evicting the oldest Status
228261
// belonging to the same node if necessary.
229262
// Per-node records are created lazily as needed, up to the configured maximum.
@@ -243,7 +276,7 @@ func (rr *Recorder) Push(st podfingerprint.Status) error {
243276
}
244277

245278
if !ok {
246-
nr, err = NewNodeRecorder(st.NodeName, WithTimestamper(rr.timestamper), WithCapacity(rr.nodeCapacity), WithCoalescing(rr.coalesceLast))
279+
nr, err = NewNodeRecorder(st.NodeName, WithTimestamper(rr.timestamper), WithCapacity(rr.nodeCapacity), WithMaxSize(rr.maxSize), WithCoalescing(rr.coalesceLast))
247280
if err != nil {
248281
return err
249282
}

pkg/pfpstatus/record/recorder_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ package record
1919
import (
2020
"errors"
2121
"fmt"
22+
"io"
2223
"testing"
24+
"time"
2325

2426
"github.com/k8stopologyawareschedwg/podfingerprint"
2527
)
@@ -441,3 +443,91 @@ func TestRecorderWithCoalescing(t *testing.T) {
441443
t.Fatalf("unexpected status count")
442444
}
443445
}
446+
447+
func TestRecorderWithMaxSize(t *testing.T) {
448+
rr, err := NewRecorder(WithMaxNodes(8), WithNodeCapacity(5), WithMaxSizePerNode(1500))
449+
if err != nil {
450+
t.Fatalf("unexpected error: %v", err)
451+
}
452+
453+
if rr.MaxSize() != 1500 {
454+
t.Fatalf("unexpected max size: %d", rr.MaxSize())
455+
}
456+
}
457+
458+
func TestNodeRecorderWithMaxSize(t *testing.T) {
459+
rt := time.Now() // to manage accurate size checks
460+
rtf := func() time.Time {
461+
return rt
462+
}
463+
464+
st := podfingerprint.Status{
465+
FingerprintExpected: "pfp-exp-st1",
466+
FingerprintComputed: "pfp-comp-st1",
467+
NodeName: "node-0",
468+
}
469+
rs := RecordedStatus{
470+
Status: st,
471+
RecordTime: rt,
472+
}
473+
stSize := rs.Size()
474+
maxSize := 2*stSize + 1000
475+
476+
nr, err := NewNodeRecorder("node-0", WithCapacity(5), WithTimestamper(rtf), WithMaxSize(maxSize))
477+
if err != nil {
478+
t.Fatalf("unexpected error: %v", err)
479+
}
480+
if nr.MaxSize() != maxSize {
481+
t.Fatalf("unexpected max size: %d vs %d", nr.MaxSize(), maxSize)
482+
}
483+
484+
for i := 0; i < 2; i++ {
485+
if err := nr.Push(st); err != nil {
486+
t.Fatalf("unexpected error: %v", err)
487+
}
488+
}
489+
490+
stSize += rs.Size()
491+
if nr.Len() != 2 {
492+
t.Fatalf("unexpected length: %d", nr.Len())
493+
}
494+
if nr.size != stSize {
495+
t.Fatalf("unexpected size: %d vs %d", nr.size, stSize)
496+
}
497+
498+
var pods []podfingerprint.NamespacedName
499+
for i := 0; getSize(pods) < maxSize; i++ {
500+
pods = append(pods, podfingerprint.NamespacedName{
501+
Namespace: fmt.Sprintf("pod-name %d", i),
502+
Name: fmt.Sprintf("namespace-name %d", i),
503+
})
504+
}
505+
506+
rs2 := RecordedStatus{
507+
Status: podfingerprint.Status{
508+
FingerprintExpected: "pfp-exp-st2",
509+
FingerprintComputed: "pfp-comp-st2",
510+
Pods: pods,
511+
NodeName: st.NodeName,
512+
},
513+
RecordTime: rt,
514+
}
515+
516+
if err := nr.Push(rs2.Status); err != nil {
517+
t.Fatalf("unexpected error: %v", err)
518+
}
519+
520+
if nr.Len() != 1 {
521+
t.Fatalf("unexpected length: expected 1 found %d", nr.Len())
522+
}
523+
524+
if nr.size != rs2.Size() {
525+
t.Fatalf("unexpected size: expected %d found %d", rs2.Size(), nr.size)
526+
}
527+
}
528+
529+
func getSize(pods []podfingerprint.NamespacedName) int {
530+
byteCount, _ := fmt.Fprint(io.Discard, fmt.Sprintf("%+v\n", pods))
531+
return byteCount
532+
533+
}

0 commit comments

Comments
 (0)