Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions keyviz/flusher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package keyviz

import (
"context"
"time"
)

// RunFlusher drives Sampler.Flush at the supplied interval until ctx
// is cancelled. Returns when ctx fires; the final tick is not
// executed (a graceful shutdown should call Sampler.Flush once more
// after RunFlusher returns if it wants to harvest the in-progress
// step).
//
// step <= 0 falls back to DefaultStep.
//
// This is a tiny wrapper so call sites in main.go don't need to spell
// out the ticker boilerplate; testing the boilerplate is the unit test
// for this package, not for callers.
func RunFlusher(ctx context.Context, s *MemSampler, step time.Duration) {
if s == nil {
<-ctx.Done()
return
}
if step <= 0 {
step = DefaultStep
}
t := time.NewTicker(step)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
s.Flush()
}
}
}
57 changes: 57 additions & 0 deletions keyviz/flusher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package keyviz

import (
"context"
"testing"
"time"
)

func TestRunFlusherTicksUntilCancel(t *testing.T) {
t.Parallel()
s := NewMemSampler(MemSamplerOptions{Step: 5 * time.Millisecond, HistoryColumns: 16})
if !s.RegisterRoute(1, []byte("a"), []byte("b")) {
t.Fatal("Register failed")
}
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
defer close(done)
RunFlusher(ctx, s, 5*time.Millisecond)
}()

// Drive Observe across ticker firings.
for i := 0; i < 10; i++ {
s.Observe(1, OpRead, 0, 0)
time.Sleep(2 * time.Millisecond)
}
cancel()
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("RunFlusher did not return after cancel")
}

cols := s.Snapshot(time.Time{}, time.Time{})
if len(cols) == 0 {
t.Fatal("expected at least one column from background flushes")
}
}

// TestRunFlusherNilSamplerWaitsCtx asserts the nil-sampler contract
// (RunFlusher just blocks on ctx.Done so callers can hard-wire it
// regardless of whether keyviz is enabled).
func TestRunFlusherNilSamplerWaitsCtx(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
defer close(done)
RunFlusher(ctx, nil, time.Millisecond)
}()
cancel()
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("RunFlusher(nil) did not return on cancel")
}
}
99 changes: 99 additions & 0 deletions keyviz/ring_buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package keyviz

import (
"sort"
"time"
)

// ringBuffer is a fixed-capacity circular buffer of MatrixColumn,
// oldest-first. Push drops the oldest entry once cap is reached.
// Range returns a half-open [from, to) slice in chronological order.
//
// The buffer is not goroutine-safe by itself; callers (MemSampler)
// guard it with historyMu.
type ringBuffer struct {
cap int
buf []MatrixColumn // logical-index ordered; head is the oldest
wrapped bool
pos int // next write index when wrapped == false
}

func newRingBuffer(cap int) *ringBuffer {
if cap < 1 {
cap = 1
}
return &ringBuffer{cap: cap, buf: make([]MatrixColumn, 0, cap)}
}

// Push appends a column. Once length reaches cap the oldest entry is
// dropped (overwritten in place).
func (r *ringBuffer) Push(col MatrixColumn) {
if !r.wrapped {
r.buf = append(r.buf, col)
if len(r.buf) >= r.cap {
r.wrapped = true
r.pos = 0
}
return
}
r.buf[r.pos] = col
r.pos = (r.pos + 1) % r.cap
}

// Range returns the columns whose At falls in [from, to), oldest
// first. Either bound may be the zero Time, meaning unbounded on that
// side. The returned slice is a deep copy: each column has its own
// Rows slice and each row owns its Start/End byte slices, so callers
// may mutate any field without corrupting stored history or racing
// with concurrent flushes.
func (r *ringBuffer) Range(from, to time.Time) []MatrixColumn {
all := r.snapshotOrdered()
// snapshotOrdered is already chronologically ordered.
lo := 0
if !from.IsZero() {
lo = sort.Search(len(all), func(i int) bool {
return !all[i].At.Before(from)
})
}
hi := len(all)
if !to.IsZero() {
hi = sort.Search(len(all), func(i int) bool {
return !all[i].At.Before(to)
})
}
if lo > hi {
return nil
}
out := make([]MatrixColumn, hi-lo)
for i, src := range all[lo:hi] {
out[i] = cloneColumn(src)
}
return out
}

// cloneColumn returns a deep copy of col: a fresh Rows slice with
// each row's Start/End and MemberRoutes independently allocated.
func cloneColumn(col MatrixColumn) MatrixColumn {
rows := make([]MatrixRow, len(col.Rows))
for i, row := range col.Rows {
rows[i] = row
rows[i].Start = cloneBytes(row.Start)
rows[i].End = cloneBytes(row.End)
if len(row.MemberRoutes) > 0 {
rows[i].MemberRoutes = append([]uint64(nil), row.MemberRoutes...)
}
}
return MatrixColumn{At: col.At, Rows: rows}
}

// snapshotOrdered returns a chronologically ordered (oldest first)
// view of the buffer contents in a fresh slice.
func (r *ringBuffer) snapshotOrdered() []MatrixColumn {
if !r.wrapped {
return append([]MatrixColumn(nil), r.buf...)
}
out := make([]MatrixColumn, 0, r.cap)
out = append(out, r.buf[r.pos:]...)
out = append(out, r.buf[:r.pos]...)
return out
}
Loading
Loading