Skip to content

Commit e2b2f85

Browse files
committed
Refactor the queue to be more idiomatic
1 parent cb55912 commit e2b2f85

2 files changed

Lines changed: 73 additions & 67 deletions

File tree

storage/internal/queue.go

Lines changed: 68 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package storage
1818
import (
1919
"context"
2020
"errors"
21-
"sync"
2221
"time"
2322

2423
"github.com/transparency-dev/tessera"
@@ -33,14 +32,7 @@ import (
3332
// queue reaches a defined threshold, the queue will call a provided FlushFunc with
3433
// a slice containing all queued entries in the same order as they were added.
3534
type Queue struct {
36-
maxSize uint
37-
maxAge time.Duration
38-
39-
timer *time.Timer
40-
work chan []queueItem
41-
42-
mu sync.Mutex
43-
items []queueItem
35+
inputs chan queueItem
4436
}
4537

4638
// FlushFunc is the signature of a function which will receive the slice of queued entries.
@@ -57,82 +49,91 @@ type FlushFunc func(ctx context.Context, entries []*tessera.Entry) error
5749
// for maxAge, or the size of the queue reaches maxSize.
5850
func NewQueue(ctx context.Context, maxAge time.Duration, maxSize uint, f FlushFunc) *Queue {
5951
q := &Queue{
60-
maxSize: maxSize,
61-
maxAge: maxAge,
62-
work: make(chan []queueItem, 1),
63-
items: make([]queueItem, 0, maxSize),
52+
inputs: make(chan queueItem, maxSize),
6453
}
54+
batches := make(chan []queueItem, 1)
55+
56+
// Spin off a goroutine which accumulates added items into batches and sends them to the flush worker
57+
// via the batches channel.
58+
go func() {
59+
defer close(batches)
6560

66-
// Spin off a worker thread to write the queue flushes to storage.
67-
go func(ctx context.Context) {
61+
var items []queueItem
62+
timer := time.NewTimer(time.Hour)
63+
timer.Stop()
64+
65+
flush := func() {
66+
if len(items) == 0 {
67+
return
68+
}
69+
if !timer.Stop() {
70+
// If the timer has already fired, drain it to avoid a stutter.
71+
select {
72+
case <-timer.C:
73+
default:
74+
}
75+
}
76+
// Send to batches channel. This might block if the worker is busy,
77+
// but that's fine as it applies backpressure to the inputs channel.
78+
select {
79+
case batches <- items:
80+
case <-ctx.Done():
81+
return
82+
}
83+
items = nil
84+
}
85+
86+
// Process the incoming items into batches, flushing the batch when either
87+
// the batch is full, or the max age is reached.
88+
for {
89+
select {
90+
case <-ctx.Done():
91+
return
92+
case item := <-q.inputs:
93+
items = append(items, item)
94+
if len(items) == 1 {
95+
timer.Reset(maxAge)
96+
}
97+
if len(items) >= int(maxSize) {
98+
flush()
99+
}
100+
case <-timer.C:
101+
flush()
102+
}
103+
}
104+
}()
105+
106+
// Spin off a worker thread to process the flushed batches.
107+
go func() {
68108
for {
69109
select {
70110
case <-ctx.Done():
71111
return
72-
case entries := <-q.work:
73-
q.doFlush(ctx, f, entries)
112+
case b, ok := <-batches:
113+
if !ok {
114+
return
115+
}
116+
q.doFlush(ctx, f, b)
74117
}
75118
}
76-
}(ctx)
119+
}()
120+
77121
return q
78122
}
79123

80124
// Add places e into the queue, and returns a func which should be called to retrieve the assigned index.
81125
func (q *Queue) Add(ctx context.Context, e *tessera.Entry) tessera.IndexFuture {
82126
qi := newEntry(e)
83-
84-
q.mu.Lock()
85-
86-
q.items = append(q.items, qi)
87-
88-
// If this is the first item, start the timer.
89-
if len(q.items) == 1 {
90-
q.timer = time.AfterFunc(q.maxAge, q.flush)
91-
}
92-
93-
// If we've reached max size, flush.
94-
var itemsToFlush []queueItem
95-
if len(q.items) >= int(q.maxSize) {
96-
itemsToFlush = q.flushLocked()
97-
}
98-
q.mu.Unlock()
99-
100-
if itemsToFlush != nil {
101-
q.work <- itemsToFlush
127+
select {
128+
case q.inputs <- qi:
129+
case <-ctx.Done():
130+
return func() (tessera.Index, error) {
131+
return tessera.Index{}, ctx.Err()
132+
}
102133
}
103-
104134
return qi.f
105135
}
106136

107-
// flush is called by the timer to flush the buffer.
108-
func (q *Queue) flush() {
109-
q.mu.Lock()
110-
itemsToFlush := q.flushLocked()
111-
q.mu.Unlock()
112-
113-
if itemsToFlush != nil {
114-
q.work <- itemsToFlush
115-
}
116-
}
117-
118-
// flushLocked must be called with q.mu held.
119-
// It prepares items for flushing and returns them.
120-
func (q *Queue) flushLocked() []queueItem {
121-
if len(q.items) == 0 {
122-
return nil
123-
}
124-
125-
if q.timer != nil {
126-
q.timer.Stop()
127-
q.timer = nil
128-
}
129-
130-
itemsToFlush := q.items
131-
q.items = make([]queueItem, 0, q.maxSize)
132-
133-
return itemsToFlush
134-
}
135-
136137
// doFlush handles the queue flush, and sending notifications of assigned log indices.
137138
func (q *Queue) doFlush(ctx context.Context, f FlushFunc, entries []queueItem) {
138139
err := otel.TraceErr(ctx, "tessera.storage.queue.doFlush", tracer, func(ctx context.Context, span trace.Span) error {

storage/internal/queue_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,17 @@ func TestQueue(t *testing.T) {
8383
adds[i] = q.Add(ctx, wantEntries[i])
8484
}
8585

86+
var lastIndex int64 = -1
8687
for i, r := range adds {
8788
N, err := r()
8889
if err != nil {
8990
t.Errorf("Add: %v", err)
9091
return
9192
}
93+
if int64(N.Index) <= lastIndex {
94+
t.Errorf("Out of order index for item %d: got %d, last %d", i, N.Index, lastIndex)
95+
}
96+
lastIndex = int64(N.Index)
9297
if got, want := assignedItems[N.Index].Data(), wantEntries[i].Data(); !reflect.DeepEqual(got, want) {
9398
t.Errorf("Got item@%d %v, want %v", N.Index, got, want)
9499
}

0 commit comments

Comments
 (0)