Skip to content

Commit 2d5c8ed

Browse files
committed
Refactor the queue to be more idiomatic
1 parent 74446b0 commit 2d5c8ed

2 files changed

Lines changed: 64 additions & 65 deletions

File tree

storage/internal/queue.go

Lines changed: 59 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package storage
1818
import (
1919
"context"
2020
"errors"
21-
"sync"
2221
"time"
2322

2423
"github.com/transparency-dev/tessera"
@@ -33,14 +32,7 @@ import (
3332
// queue reaches a defined threshold, the queue will call a provided FlushFunc with
3433
// a slice containing all queued entries in the same order as they were added.
3534
type Queue struct {
36-
maxSize uint
37-
maxAge time.Duration
38-
39-
timer *time.Timer
40-
work chan []queueItem
41-
42-
mu sync.Mutex
43-
items []queueItem
35+
inputs chan queueItem
4436
}
4537

4638
// FlushFunc is the signature of a function which will receive the slice of queued entries.
@@ -57,82 +49,84 @@ type FlushFunc func(ctx context.Context, entries []*tessera.Entry) error
5749
// for maxAge, or the size of the queue reaches maxSize.
5850
func NewQueue(ctx context.Context, maxAge time.Duration, maxSize uint, f FlushFunc) *Queue {
5951
q := &Queue{
60-
maxSize: maxSize,
61-
maxAge: maxAge,
62-
work: make(chan []queueItem, 1),
63-
items: make([]queueItem, 0, maxSize),
52+
inputs: make(chan queueItem, maxSize),
6453
}
54+
batches := make(chan []queueItem, 1)
55+
56+
// Spin off a goroutine which accumulates added items into batches and sends them to the flush worker
57+
// via the batches channel.
58+
go func() {
59+
var items []queueItem
60+
timer := time.NewTimer(0)
61+
flush := func() {
62+
if len(items) == 0 {
63+
return
64+
}
65+
if timer != nil {
66+
if !timer.Stop() {
67+
// If the timer has already fired, drain it to avoid a stutter.
68+
select {
69+
case <-timer.C:
70+
default:
71+
}
72+
}
73+
}
74+
// Send to batches channel. This might block if the worker is busy,
75+
// but that's fine as it applies backpressure to the inputs channel.
76+
select {
77+
case batches <- items:
78+
case <-ctx.Done():
79+
return
80+
}
81+
items = nil
82+
}
6583

66-
// Spin off a worker thread to write the queue flushes to storage.
84+
for {
85+
select {
86+
case <-ctx.Done():
87+
return
88+
case item := <-q.inputs:
89+
items = append(items, item)
90+
if len(items) == 1 {
91+
timer.Reset(maxAge)
92+
}
93+
if len(items) >= int(maxSize) {
94+
flush()
95+
}
96+
case <-timer.C:
97+
flush()
98+
}
99+
}
100+
}()
101+
102+
// Spin off a worker thread to process the flushed batches.
67103
go func(ctx context.Context) {
68104
for {
69105
select {
70106
case <-ctx.Done():
71107
return
72-
case entries := <-q.work:
73-
q.doFlush(ctx, f, entries)
108+
case b := <-batches:
109+
q.doFlush(ctx, f, b)
74110
}
75111
}
76112
}(ctx)
113+
77114
return q
78115
}
79116

80117
// Add places e into the queue, and returns a func which should be called to retrieve the assigned index.
81118
func (q *Queue) Add(ctx context.Context, e *tessera.Entry) tessera.IndexFuture {
82119
qi := newEntry(e)
83-
84-
q.mu.Lock()
85-
86-
q.items = append(q.items, qi)
87-
88-
// If this is the first item, start the timer.
89-
if len(q.items) == 1 {
90-
q.timer = time.AfterFunc(q.maxAge, q.flush)
91-
}
92-
93-
// If we've reached max size, flush.
94-
var itemsToFlush []queueItem
95-
if len(q.items) >= int(q.maxSize) {
96-
itemsToFlush = q.flushLocked()
97-
}
98-
q.mu.Unlock()
99-
100-
if itemsToFlush != nil {
101-
q.work <- itemsToFlush
120+
select {
121+
case q.inputs <- qi:
122+
case <-ctx.Done():
123+
return func() (tessera.Index, error) {
124+
return tessera.Index{}, ctx.Err()
125+
}
102126
}
103-
104127
return qi.f
105128
}
106129

107-
// flush is called by the timer to flush the buffer.
108-
func (q *Queue) flush() {
109-
q.mu.Lock()
110-
itemsToFlush := q.flushLocked()
111-
q.mu.Unlock()
112-
113-
if itemsToFlush != nil {
114-
q.work <- itemsToFlush
115-
}
116-
}
117-
118-
// flushLocked must be called with q.mu held.
119-
// It prepares items for flushing and returns them.
120-
func (q *Queue) flushLocked() []queueItem {
121-
if len(q.items) == 0 {
122-
return nil
123-
}
124-
125-
if q.timer != nil {
126-
q.timer.Stop()
127-
q.timer = nil
128-
}
129-
130-
itemsToFlush := q.items
131-
q.items = make([]queueItem, 0, q.maxSize)
132-
133-
return itemsToFlush
134-
}
135-
136130
// doFlush handles the queue flush, and sending notifications of assigned log indices.
137131
func (q *Queue) doFlush(ctx context.Context, f FlushFunc, entries []queueItem) {
138132
err := otel.TraceErr(ctx, "tessera.storage.queue.doFlush", tracer, func(ctx context.Context, span trace.Span) error {

storage/internal/queue_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,17 @@ func TestQueue(t *testing.T) {
8383
adds[i] = q.Add(ctx, wantEntries[i])
8484
}
8585

86+
var lastIndex int64 = -1
8687
for i, r := range adds {
8788
N, err := r()
8889
if err != nil {
8990
t.Errorf("Add: %v", err)
9091
return
9192
}
93+
if int64(N.Index) <= lastIndex {
94+
t.Errorf("Out of order index for item %d: got %d, last %d", i, N.Index, lastIndex)
95+
}
96+
lastIndex = int64(N.Index)
9297
if got, want := assignedItems[N.Index].Data(), wantEntries[i].Data(); !reflect.DeepEqual(got, want) {
9398
t.Errorf("Got item@%d %v, want %v", N.Index, got, want)
9499
}

0 commit comments

Comments
 (0)