Skip to content

Commit 040f361

Browse files
committed
review: update eventsstorage comments and test error handling
1 parent fc0c7db commit 040f361

3 files changed

Lines changed: 22 additions & 37 deletions

File tree

server/e2e/e2e_s2_storage_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func TestS2StorageWriter(t *testing.T) {
5656

5757
checkResp, err := streamClient.CheckTail(ctx)
5858
require.NoError(t, err, "check tail before test")
59-
startSeq := checkResp.SeqNum
59+
startSeq := checkResp.Tail.SeqNum
6060

6161
// Start a capture session.
6262
startResp, err := client.StartCaptureSessionWithResponse(ctx, instanceoapi.StartCaptureSessionJSONRequestBody{})
@@ -81,17 +81,17 @@ func TestS2StorageWriter(t *testing.T) {
8181

8282
// Read records written after the pre-test tail and verify at least one
8383
// envelope is present.
84-
readSession, err := streamClient.ReadSession(ctx, &s2.ReadOptions{
84+
readCtx, readCancel := context.WithTimeout(ctx, 10*time.Second)
85+
defer readCancel()
86+
87+
readSession, err := streamClient.ReadSession(readCtx, &s2.ReadOptions{
8588
SeqNum: s2.Uint64(startSeq),
8689
})
8790
require.NoError(t, err, "open S2 read session")
8891
defer readSession.Close()
8992

90-
readCtx, readCancel := context.WithTimeout(ctx, 10*time.Second)
91-
defer readCancel()
92-
9393
var count int
94-
for readSession.Next(readCtx) {
94+
for readSession.Next() {
9595
count++
9696
}
9797
// EOF is expected once we reach the tail — not an error.

server/lib/events/eventsstorage.go

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,23 +8,16 @@ import (
88
"sync/atomic"
99
)
1010

11-
// Storage is the durable storage backend for browser events.
12-
// Append is called serially from StorageWriter.Run and need not be thread-safe.
1311
type Storage interface {
1412
Append(ctx context.Context, env Envelope) error
1513
Close(ctx context.Context) error
1614
}
1715

18-
// StorageWriter drains the ring buffer and forwards each envelope to the
19-
// configured Storage backend. Single-use: call Run once; it blocks until ctx
20-
// is cancelled. After ctx is cancelled, call Drain to flush events that
21-
// arrived before all publishers stopped, then call Close to flush the backend.
22-
//
23-
// Delivery is at-least-once: on process restart the ring is empty so no
24-
// cross-restart duplicates occur, but consumers should dedupe by env.Seq in
25-
// case StorageWriter is ever restarted within a process lifetime.
26-
//
27-
// Starts from the oldest available event in the ring, not the current tail.
16+
// StorageWriter reads from the ring buffer and forwards each envelope to
17+
// Storage. Single-use and not thread-safe: call Run once, then after
18+
// it returns call Drain followed by Close. Reads start from the oldest
19+
// available event in the ring, not the current tail. Delivery is
20+
// at-least-once; consumers should dedupe by env.Seq.
2821
type StorageWriter struct {
2922
reader *Reader
3023
storage Storage

server/lib/events/eventsstorage_writer_test.go

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,8 @@ func TestStorageWriter_NormalAppend(t *testing.T) {
7070

7171
ctx, cancel := context.WithCancel(context.Background())
7272

73-
done := make(chan struct{})
74-
go func() {
75-
defer close(done)
76-
w.Run(ctx) //nolint:errcheck
77-
}()
73+
done := make(chan error, 1)
74+
go func() { done <- w.Run(ctx) }()
7875

7976
env1 := es.Publish(Envelope{Event: makeEvent("test.one")})
8077
env2 := es.Publish(Envelope{Event: makeEvent("test.two")})
@@ -84,7 +81,7 @@ func TestStorageWriter_NormalAppend(t *testing.T) {
8481
}, time.Second, 5*time.Millisecond)
8582

8683
cancel()
87-
<-done
84+
require.ErrorIs(t, <-done, context.Canceled)
8885

8986
got := backend.envelopes()
9087
assert.Equal(t, env1.Seq, got[0].Seq)
@@ -104,18 +101,15 @@ func TestStorageWriter_DroppedEvents(t *testing.T) {
104101
}
105102

106103
ctx, cancel := context.WithCancel(context.Background())
107-
done := make(chan struct{})
108-
go func() {
109-
defer close(done)
110-
w.Run(ctx) //nolint:errcheck
111-
}()
104+
done := make(chan error, 1)
105+
go func() { done <- w.Run(ctx) }()
112106

113107
require.Eventually(t, func() bool {
114108
return len(backend.envelopes()) > 0
115109
}, time.Second, 5*time.Millisecond)
116110

117111
cancel()
118-
<-done
112+
require.ErrorIs(t, <-done, context.Canceled)
119113

120114
// With ring capacity 4 and 8 publishes, the writer must have skipped at
121115
// least 4 events via a drop gap — so fewer than 8 envelopes landed.
@@ -132,11 +126,8 @@ func TestStorageWriter_AppendError(t *testing.T) {
132126
w := NewStorageWriter(es, backend, slog.Default())
133127

134128
ctx, cancel := context.WithCancel(context.Background())
135-
done := make(chan struct{})
136-
go func() {
137-
defer close(done)
138-
w.Run(ctx) //nolint:errcheck
139-
}()
129+
done := make(chan error, 1)
130+
go func() { done <- w.Run(ctx) }()
140131

141132
// Publish an event that will fail. Wait until the writer has attempted it
142133
// (errCount > 0), then clear the error and publish a second event. The
@@ -153,7 +144,7 @@ func TestStorageWriter_AppendError(t *testing.T) {
153144
}, time.Second, 5*time.Millisecond)
154145

155146
cancel()
156-
<-done
147+
require.ErrorIs(t, <-done, context.Canceled)
157148

158149
got := backend.envelopes()
159150
require.Len(t, got, 1)
@@ -198,10 +189,11 @@ func TestStorageWriter_DrainFlushesRingAfterRunExits(t *testing.T) {
198189
cancel()
199190
require.ErrorIs(t, <-done, context.Canceled)
200191

201-
// Drain must flush whatever is left in the ring.
192+
// Drain then Close mirrors the real shutdown sequence.
202193
drainCtx, drainCancel := context.WithTimeout(context.Background(), time.Second)
203194
defer drainCancel()
204195
require.NoError(t, w.Drain(drainCtx))
196+
require.NoError(t, w.Close(drainCtx))
205197

206198
got := backend.envelopes()
207199
// All 8 published events must have been appended across Run + Drain.

0 commit comments

Comments
 (0)