Skip to content

Commit a281bb0

Browse files
committed
WIP: implement maxSize alongside maxCapacity
WIP TBD Signed-off-by: Francesco Romani <fromani@redhat.com>
1 parent 6ca85ba commit a281bb0

2 files changed

Lines changed: 89 additions & 26 deletions

File tree

pkg/pfpstatus/record/recorder.go

Lines changed: 82 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package record
1919
import (
2020
"errors"
2121
"time"
22+
"unsafe"
2223

2324
"github.com/k8stopologyawareschedwg/podfingerprint"
2425
)
@@ -36,6 +37,28 @@ type RecordedStatus struct {
3637
podfingerprint.Status
3738
// RecordTime is a timestamp of when the RecordedStatus was added to the record
3839
RecordTime time.Time `json:"recordTime"`
40+
statusSize int `json:"-"`
41+
}
42+
43+
func cloneStatus(st podfingerprint.Status) (podfingerprint.Status, int) {
44+
size := len(st.NodeName) + len(st.FingerprintExpected) + len(st.FingerprintComputed)
45+
pods := make([]podfingerprint.NamespacedName, len(st.Pods))
46+
for idx := 0; idx < len(st.Pods); idx++ {
47+
pods[idx].Namespace = st.Pods[idx].Namespace
48+
pods[idx].Name = st.Pods[idx].Name
49+
size += len(pods[idx].Namespace) + len(pods[idx].Name)
50+
}
51+
ret := podfingerprint.Status{
52+
FingerprintExpected: st.FingerprintExpected,
53+
FingerprintComputed: st.FingerprintComputed,
54+
Pods: pods,
55+
NodeName: st.NodeName,
56+
}
57+
return ret, size
58+
}
59+
60+
func (rs RecordedStatus) Size() int {
61+
return rs.statusSize + int(unsafe.Sizeof(rs.RecordTime))
3962
}
4063

4164
func (rs RecordedStatus) Equal(x RecordedStatus) bool {
@@ -51,32 +74,72 @@ type NodeRecorder struct {
5174
timestamper func() time.Time
5275
nodeName string // shortcut
5376
capacity int
77+
maxSize int
78+
size int
5479
statuses []RecordedStatus
5580
}
5681

82+
type NodeOption func(*NodeRecorder)
83+
84+
func WithCapacity(capacity int) NodeOption {
85+
return func(nr *NodeRecorder) {
86+
nr.capacity = capacity
87+
}
88+
}
89+
90+
func WithMaxSize(maxSize int) NodeOption {
91+
return func(nr *NodeRecorder) {
92+
nr.maxSize = maxSize
93+
nr.capacity = 1
94+
}
95+
}
96+
5797
// NewNodeRecorder creates a new recorder for the given node with the given capacity.
5898
// The record is a ring buffer, so only the latest <capacity> Statuses are kept at any time.
5999
// The timestamper callback is used to mark times. Use `time.Now` if unsure.
60100
// Returns the newly created instance; if parameters are incorrect, returns an error, on which
61101
// case the returned instance should be ignored.
62-
func NewNodeRecorder(nodeName string, capacity int, timestamper Timestamper) (*NodeRecorder, error) {
102+
func NewNodeRecorder(nodeName string, timestamper Timestamper, opts ...NodeOption) (*NodeRecorder, error) {
63103
if nodeName == "" {
64104
return nil, ErrMissingNode
65105
}
66-
if capacity < 1 {
67-
return nil, ErrInvalidCapacity
68-
}
69106
nr := NodeRecorder{
70107
timestamper: timestamper,
71108
nodeName: nodeName,
72-
capacity: capacity,
73109
}
74-
if capacity == 1 { // handle common special case
110+
for _, opt := range opts {
111+
opt(&nr)
112+
}
113+
if nr.capacity < 1 {
114+
return nil, ErrInvalidCapacity
115+
}
116+
if nr.capacity == 1 { // handle common special case
75117
nr.statuses = make([]RecordedStatus, 1)
76118
}
77119
return &nr, nil
78120
}
79121

122+
func (nr *NodeRecorder) dropOldest() {
123+
if nr.Len() < 1 {
124+
return
125+
}
126+
nr.size -= nr.statuses[0].Size()
127+
nr.statuses = nr.statuses[1:]
128+
}
129+
130+
func (nr *NodeRecorder) makeRoom() {
131+
if nr.capacity > 1 && nr.Len() == nr.Cap() {
132+
nr.dropOldest()
133+
return
134+
}
135+
if nr.maxSize == 0 {
136+
return
137+
}
138+
for nr.size > nr.maxSize {
139+
nr.dropOldest()
140+
}
141+
}
142+
80143
// Push adds a new Status to the record, evicting the oldest Status if necessary.
81144
// The pushed status is a full independent copy of the provided Status.
82145
// If the Status added is inconsistent, returns an error detailing the reason.
@@ -88,21 +151,21 @@ func (nr *NodeRecorder) Push(st podfingerprint.Status) error {
88151
if st.NodeName != nr.nodeName {
89152
return ErrMismatchingNode
90153
}
154+
ts := nr.timestamper()
155+
cloned, size := cloneStatus(st)
156+
item := RecordedStatus{
157+
Status: cloned,
158+
RecordTime: ts,
159+
statusSize: size,
160+
}
91161
if nr.capacity == 1 { // handle common special case, avoid any resize
92-
nr.statuses[0] = RecordedStatus{
93-
Status: st.Clone(),
94-
RecordTime: nr.timestamper(),
95-
}
162+
nr.statuses[0] = item
163+
nr.size = item.Size()
96164
return nil
97165
}
98-
if nr.Len() == nr.Cap() {
99-
// drop the older sample
100-
nr.statuses = nr.statuses[1:]
101-
}
102-
nr.statuses = append(nr.statuses, RecordedStatus{
103-
Status: st.Clone(),
104-
RecordTime: nr.timestamper(),
105-
})
166+
nr.makeRoom()
167+
nr.statuses = append(nr.statuses, item)
168+
nr.size += item.Size()
106169
return nil
107170
}
108171

@@ -200,7 +263,7 @@ func (rr *Recorder) Push(st podfingerprint.Status) error {
200263
var err error
201264
nr, ok := rr.nodes[st.NodeName]
202265
if !ok {
203-
nr, err = NewNodeRecorder(st.NodeName, rr.nodeCapacity, rr.timestamper)
266+
nr, err = NewNodeRecorder(st.NodeName, rr.timestamper, WithCapacity(rr.nodeCapacity))
204267
if err != nil {
205268
return err
206269
}

pkg/pfpstatus/record/recorder_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
)
2727

2828
func TestNodeRecorderCreate(t *testing.T) {
29-
nr, err := NewNodeRecorder("test-node", 16, time.Now)
29+
nr, err := NewNodeRecorder("test-node", time.Now, WithCapacity(16))
3030
if err != nil {
3131
t.Fatalf("unexpected error: %v", err)
3232
}
@@ -40,7 +40,7 @@ func TestNodeRecorderCreate(t *testing.T) {
4040
}
4141

4242
func TestNodeRecorderCreateInvalid(t *testing.T) {
43-
_, err := NewNodeRecorder("test-node", 0, time.Now)
43+
_, err := NewNodeRecorder("test-node", time.Now)
4444
if err == nil {
4545
t.Fatalf("unexpected success")
4646
}
@@ -50,7 +50,7 @@ func TestNodeRecorderCreateInvalid(t *testing.T) {
5050
}
5151

5252
func TestNodeRecorderPushMissingNodeName(t *testing.T) {
53-
nr, err := NewNodeRecorder("test-node", 16, time.Now)
53+
nr, err := NewNodeRecorder("test-node", time.Now, WithCapacity(16))
5454
if err != nil {
5555
t.Fatalf("unexpected error: %v", err)
5656
}
@@ -66,7 +66,7 @@ func TestNodeRecorderPushMissingNodeName(t *testing.T) {
6666
}
6767

6868
func TestNodeRecorderPushMismatchingNodeName(t *testing.T) {
69-
nr, err := NewNodeRecorder("test-node", 16, time.Now)
69+
nr, err := NewNodeRecorder("test-node", time.Now, WithCapacity(16))
7070
if err != nil {
7171
t.Fatalf("unexpected error: %v", err)
7272
}
@@ -82,7 +82,7 @@ func TestNodeRecorderPushMismatchingNodeName(t *testing.T) {
8282
}
8383

8484
func TestNodeRecorderPushBasic(t *testing.T) {
85-
nr, err := NewNodeRecorder("test-node", 16, time.Now)
85+
nr, err := NewNodeRecorder("test-node", time.Now, WithCapacity(16))
8686
if err != nil {
8787
t.Fatalf("unexpected error: %v", err)
8888
}
@@ -98,7 +98,7 @@ func TestNodeRecorderPushBasic(t *testing.T) {
9898
}
9999

100100
func TestNodeRecorderSingleEntryPushEvict(t *testing.T) {
101-
nr, err := NewNodeRecorder("test-node", 1, time.Now)
101+
nr, err := NewNodeRecorder("test-node", time.Now, WithCapacity(1))
102102
if err != nil {
103103
t.Fatalf("unexpected error: %v", err)
104104
}
@@ -132,7 +132,7 @@ func TestNodeRecorderSingleEntryPushEvict(t *testing.T) {
132132
}
133133

134134
func TestNodeRecorderMultiEntryPushContentEvict(t *testing.T) {
135-
nr, err := NewNodeRecorder("test-node", 3, time.Now)
135+
nr, err := NewNodeRecorder("test-node", time.Now, WithCapacity(3))
136136
if err != nil {
137137
t.Fatalf("unexpected error: %v", err)
138138
}

0 commit comments

Comments
 (0)