Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 46 additions & 3 deletions pkg/pfpstatus/record/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package record
import (
"errors"
"time"
"unsafe"

"github.com/k8stopologyawareschedwg/podfingerprint"
)
Expand All @@ -36,6 +37,28 @@ type RecordedStatus struct {
podfingerprint.Status
// RecordTime is a timestamp of when the RecordedStatus was added to the record
RecordTime time.Time `json:"recordTime"`
statusSize int `json:"-"`
}

func cloneStatus(st podfingerprint.Status) (podfingerprint.Status, int) {
size := len(st.NodeName) + len(st.FingerprintExpected) + len(st.FingerprintComputed)
pods := make([]podfingerprint.NamespacedName, len(st.Pods))
for idx := 0; idx < len(st.Pods); idx++ {
pods[idx].Namespace = st.Pods[idx].Namespace
pods[idx].Name = st.Pods[idx].Name
size += len(pods[idx].Namespace) + len(pods[idx].Name)
}
ret := podfingerprint.Status{
FingerprintExpected: st.FingerprintExpected,
FingerprintComputed: st.FingerprintComputed,
Pods: pods,
NodeName: st.NodeName,
}
return ret, size
}

func (rs RecordedStatus) Size() int {
return rs.statusSize + int(unsafe.Sizeof(rs.RecordTime))
}

func (rs RecordedStatus) Equal(x RecordedStatus) bool {
Expand All @@ -51,6 +74,8 @@ type NodeRecorder struct {
timestamper func() time.Time
nodeName string // shortcut
capacity int
maxSize int
size int
statuses []RecordedStatus
}

Expand All @@ -62,6 +87,13 @@ func WithCapacity(capacity int) NodeOption {
}
}

func WithMaxSize(maxSize int) NodeOption {
return func(nr *NodeRecorder) {
nr.maxSize = maxSize
nr.capacity = 1
}
}

// NewNodeRecorder creates a new recorder for the given node with the given capacity.
// The record is a ring buffer, so only the latest <capacity> Statuses are kept at any time.
// The timestamper callback is used to mark times. Use `time.Now` if unsure.
Expand Down Expand Up @@ -91,14 +123,21 @@ func (nr *NodeRecorder) dropOldest() {
if nr.Len() < 1 {
return
}
nr.size -= nr.statuses[0].Size()
nr.statuses = nr.statuses[1:]
}

func (nr *NodeRecorder) makeRoom() {
if nr.Len() < nr.Cap() {
if nr.capacity > 1 && nr.Len() == nr.Cap() {
nr.dropOldest()
return
}
nr.dropOldest()
if nr.maxSize == 0 {
return
}
for nr.size > nr.maxSize {
nr.dropOldest()
}
}

// Push adds a new Status to the record, evicting the oldest Status if necessary.
Expand All @@ -113,16 +152,20 @@ func (nr *NodeRecorder) Push(st podfingerprint.Status) error {
return ErrMismatchingNode
}
ts := nr.timestamper()
cloned, size := cloneStatus(st)
item := RecordedStatus{
Status: st.Clone(),
Status: cloned,
RecordTime: ts,
statusSize: size,
}
if nr.capacity == 1 { // handle common special case, avoid any resize
nr.statuses[0] = item
nr.size = item.Size()
return nil
}
nr.makeRoom()
nr.statuses = append(nr.statuses, item)
nr.size += item.Size()
return nil
}

Expand Down