Skip to content

Commit b425a39

Browse files
committed
Refactor the queue to be more idiomatic
1 parent 74446b0 commit b425a39

2 files changed

Lines changed: 58 additions & 65 deletions

File tree

storage/internal/queue.go

Lines changed: 53 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package storage
1818
import (
1919
"context"
2020
"errors"
21-
"sync"
2221
"time"
2322

2423
"github.com/transparency-dev/tessera"
@@ -33,14 +32,7 @@ import (
3332
// queue reaches a defined threshold, the queue will call a provided FlushFunc with
3433
// a slice containing all queued entries in the same order as they were added.
3534
type Queue struct {
36-
maxSize uint
37-
maxAge time.Duration
38-
39-
timer *time.Timer
40-
work chan []queueItem
41-
42-
mu sync.Mutex
43-
items []queueItem
35+
inputs chan queueItem
4436
}
4537

4638
// FlushFunc is the signature of a function which will receive the slice of queued entries.
@@ -57,82 +49,78 @@ type FlushFunc func(ctx context.Context, entries []*tessera.Entry) error
5749
// for maxAge, or the size of the queue reaches maxSize.
5850
func NewQueue(ctx context.Context, maxAge time.Duration, maxSize uint, f FlushFunc) *Queue {
5951
q := &Queue{
60-
maxSize: maxSize,
61-
maxAge: maxAge,
62-
work: make(chan []queueItem, 1),
63-
items: make([]queueItem, 0, maxSize),
52+
inputs: make(chan queueItem, maxSize),
6453
}
54+
batches := make(chan []queueItem, 1)
55+
56+
// Spin off a goroutine which accumulates added items into batches and sends them to the flush worker
57+
// via the batches channel.
58+
go func() {
59+
var items []queueItem
60+
timer := time.NewTimer(0)
61+
flush := func() {
62+
if len(items) == 0 {
63+
return
64+
}
65+
if timer != nil {
66+
timer.Stop()
67+
}
68+
// Send to batches channel. This might block if the worker is busy,
69+
// but that's fine as it applies backpressure to the inputs channel.
70+
select {
71+
case batches <- items:
72+
case <-ctx.Done():
73+
return
74+
}
75+
items = nil
76+
}
6577

66-
// Spin off a worker thread to write the queue flushes to storage.
78+
for {
79+
select {
80+
case <-ctx.Done():
81+
return
82+
case item := <-q.inputs:
83+
items = append(items, item)
84+
if len(items) == 1 {
85+
timer.Reset(maxAge)
86+
}
87+
if len(items) >= int(maxSize) {
88+
flush()
89+
}
90+
case <-timer.C:
91+
flush()
92+
}
93+
}
94+
}()
95+
96+
// Spin off a worker thread to process the flushed batches.
6797
go func(ctx context.Context) {
6898
for {
6999
select {
70100
case <-ctx.Done():
71101
return
72-
case entries := <-q.work:
73-
q.doFlush(ctx, f, entries)
102+
case b := <-batches:
103+
q.doFlush(ctx, f, b)
74104
}
75105
}
76106
}(ctx)
107+
77108
return q
78109
}
79110

80111
// Add places e into the queue, and returns a func which should be called to retrieve the assigned index.
81112
func (q *Queue) Add(ctx context.Context, e *tessera.Entry) tessera.IndexFuture {
82113
qi := newEntry(e)
83-
84-
q.mu.Lock()
85-
86-
q.items = append(q.items, qi)
87-
88-
// If this is the first item, start the timer.
89-
if len(q.items) == 1 {
90-
q.timer = time.AfterFunc(q.maxAge, q.flush)
91-
}
92-
93-
// If we've reached max size, flush.
94-
var itemsToFlush []queueItem
95-
if len(q.items) >= int(q.maxSize) {
96-
itemsToFlush = q.flushLocked()
97-
}
98-
q.mu.Unlock()
99-
100-
if itemsToFlush != nil {
101-
q.work <- itemsToFlush
114+
select {
115+
case q.inputs <- qi:
116+
case <-ctx.Done():
117+
return func() (tessera.Index, error) {
118+
return tessera.Index{}, ctx.Err()
119+
}
102120
}
103-
104121
return qi.f
105122
}
106123

107-
// flush is called by the timer to flush the buffer.
108-
func (q *Queue) flush() {
109-
q.mu.Lock()
110-
itemsToFlush := q.flushLocked()
111-
q.mu.Unlock()
112-
113-
if itemsToFlush != nil {
114-
q.work <- itemsToFlush
115-
}
116-
}
117-
118-
// flushLocked must be called with q.mu held.
119-
// It prepares items for flushing and returns them.
120-
func (q *Queue) flushLocked() []queueItem {
121-
if len(q.items) == 0 {
122-
return nil
123-
}
124-
125-
if q.timer != nil {
126-
q.timer.Stop()
127-
q.timer = nil
128-
}
129-
130-
itemsToFlush := q.items
131-
q.items = make([]queueItem, 0, q.maxSize)
132-
133-
return itemsToFlush
134-
}
135-
136124
// doFlush handles the queue flush, and sending notifications of assigned log indices.
137125
func (q *Queue) doFlush(ctx context.Context, f FlushFunc, entries []queueItem) {
138126
err := otel.TraceErr(ctx, "tessera.storage.queue.doFlush", tracer, func(ctx context.Context, span trace.Span) error {

storage/internal/queue_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,17 @@ func TestQueue(t *testing.T) {
8383
adds[i] = q.Add(ctx, wantEntries[i])
8484
}
8585

86+
var lastIndex int64 = -1
8687
for i, r := range adds {
8788
N, err := r()
8889
if err != nil {
8990
t.Errorf("Add: %v", err)
9091
return
9192
}
93+
if int64(N.Index) <= lastIndex {
94+
t.Errorf("Out of order index for item %d: got %d, last %d", i, N.Index, lastIndex)
95+
}
96+
lastIndex = int64(N.Index)
9297
if got, want := assignedItems[N.Index].Data(), wantEntries[i].Data(); !reflect.DeepEqual(got, want) {
9398
t.Errorf("Got item@%d %v, want %v", N.Index, got, want)
9499
}

0 commit comments

Comments
 (0)