Skip to content

Commit cc1d744

Browse files
committed
fix: drain ring after HTTP shutdown to prevent data loss on exit
Two bugs combined to cause silent event loss on shutdown:

1. StorageWriter.Run exited immediately on ctx cancellation without draining the ring.
2. The HTTP servers were shut down only after waiting on storageDone, so new events could still arrive in the ring after Run had already exited.

Fix:

- Add Reader.TryRead for non-blocking ring reads.
- Add StorageWriter.Drain(ctx), which flushes the remaining ring contents after all publishers have stopped, bounded by a 5s deadline.
- Reorder the main.go shutdown sequence: the HTTP servers (and all publishers) stop first, then Run exits, then Drain flushes the remaining tail, then Close flushes the S2 batcher to the network.
- Document the at-least-once delivery and dedupe-by-seq contract on StorageWriter.
1 parent 9df3e6c commit cc1d744

4 files changed

Lines changed: 123 additions & 18 deletions

File tree

server/cmd/api/main.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -271,15 +271,9 @@ func main() {
271271
<-ctx.Done()
272272
slogger.Info("shutdown signal received")
273273

274-
// Wait for the storage writer to drain from the ring, then flush S2 before
275-
// shutting down the HTTP servers and closing the capture session.
276-
<-storageDone
277-
if storageWriter != nil {
278-
if err := storageWriter.Close(); err != nil {
279-
slogger.Error("storage writer close failed", "err", err)
280-
}
281-
}
282-
274+
// Step 1: shut down all HTTP servers and stop all event publishers (cdpmonitor,
275+
// captureSession) before draining the ring. This bounds the set of events that
276+
// can arrive after Run exits so Drain sees a stable tail.
283277
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
284278
defer shutdownCancel()
285279
g, _ := errgroup.WithContext(shutdownCtx)
@@ -301,6 +295,20 @@ func main() {
301295
if err := g.Wait(); err != nil {
302296
slogger.Error("server failed to shutdown", "err", err)
303297
}
298+
299+
// Step 2: wait for Run to return (it exits on ctx cancellation), then drain any
300+
// events that arrived between the last Read and HTTP shutdown, then flush S2.
301+
<-storageDone
302+
if storageWriter != nil {
303+
drainCtx, drainCancel := context.WithTimeout(context.Background(), 5*time.Second)
304+
defer drainCancel()
305+
if err := storageWriter.Drain(drainCtx); err != nil {
306+
slogger.Warn("storage writer drain incomplete", "err", err)
307+
}
308+
if err := storageWriter.Close(); err != nil {
309+
slogger.Error("storage writer close failed", "err", err)
310+
}
311+
}
304312
}
305313

306314
func mustFFmpeg() {

server/lib/events/eventsstorage.go

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,13 @@ type Storage interface {
1616

1717
// StorageWriter drains the ring buffer and forwards each envelope to the
1818
// configured Storage backend. Single-use: call Run once; it blocks until ctx
19-
// is cancelled. Call Close after Run returns to flush in-flight writes.
19+
// is cancelled. After ctx is cancelled, call Drain to flush events that
20+
// arrived before all publishers stopped, then call Close to flush the backend.
21+
//
22+
// Delivery is at-least-once: on process restart the ring is empty so no
23+
// cross-restart duplicates occur, but consumers should dedupe by env.Seq in
24+
// case StorageWriter is ever restarted within a process lifetime.
25+
//
2026
// Starts from the oldest available event in the ring, not the current tail.
2127
type StorageWriter struct {
2228
reader *Reader
@@ -36,8 +42,8 @@ func NewStorageWriter(es *EventStream, storage Storage, log *slog.Logger) *Stora
3642
}
3743

3844
// Run reads from the ring buffer and appends each envelope to storage until
39-
// ctx is cancelled. Returns ctx.Err() on clean shutdown. Must be called at
40-
// most once; panics on a second call.
45+
// ctx is cancelled. Returns the context error on clean shutdown. Must be
46+
// called at most once; panics on a second call.
4147
func (w *StorageWriter) Run(ctx context.Context) error {
4248
firstCall := false
4349
w.once.Do(func() { firstCall = true })
@@ -50,15 +56,44 @@ func (w *StorageWriter) Run(ctx context.Context) error {
5056
if err != nil {
5157
return err
5258
}
53-
if res.Dropped > 0 {
54-
w.log.Warn("storage writer: dropped events", "count", res.Dropped, "from_seq", res.DroppedFrom, "to_seq", res.DroppedTo)
55-
continue
59+
if err := w.processResult(ctx, res); err != nil {
60+
return err
5661
}
57-
if err := w.storage.Append(ctx, *res.Envelope); err != nil {
58-
total := w.appendErrors.Add(1)
59-
w.log.Error("storage writer: append failed", "seq", res.Envelope.Seq, "err", err, "total_append_errors", total)
62+
}
63+
}
64+
65+
// Drain reads any events still in the ring non-blockingly until caught up or
66+
// ctx expires. Call after all publishers have stopped and Run has returned to
67+
// ensure no events are silently skipped on shutdown.
68+
func (w *StorageWriter) Drain(ctx context.Context) error {
69+
for {
70+
select {
71+
case <-ctx.Done():
72+
w.log.Warn("storage writer: drain deadline exceeded, ring may have unread events")
73+
return ctx.Err()
74+
default:
75+
}
76+
77+
res, ok := w.reader.TryRead()
78+
if !ok {
79+
return nil
6080
}
81+
if err := w.processResult(ctx, res); err != nil {
82+
return err
83+
}
84+
}
85+
}
86+
87+
func (w *StorageWriter) processResult(ctx context.Context, res ReadResult) error {
88+
if res.Dropped > 0 {
89+
w.log.Warn("storage writer: dropped events", "count", res.Dropped, "from_seq", res.DroppedFrom, "to_seq", res.DroppedTo)
90+
return nil
91+
}
92+
if err := w.storage.Append(ctx, *res.Envelope); err != nil {
93+
total := w.appendErrors.Add(1)
94+
w.log.Error("storage writer: append failed", "seq", res.Envelope.Seq, "err", err, "total_append_errors", total)
6195
}
96+
return nil
6297
}
6398

6499
// AppendErrors returns the total number of Append failures since Run started.

server/lib/events/eventsstorage_writer_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,3 +171,39 @@ func TestStorageWriter_ContextCancelled(t *testing.T) {
171171
err := w.Run(ctx)
172172
assert.ErrorIs(t, err, context.Canceled)
173173
}
174+
175+
// TestStorageWriter_DrainFlushesRingAfterRunExits verifies that events
176+
// published before Drain is called are not lost even after Run has returned.
177+
func TestStorageWriter_DrainFlushesRingAfterRunExits(t *testing.T) {
178+
es := newTestStream(t, 64)
179+
backend := &mockBackend{}
180+
w := NewStorageWriter(es, backend, slog.Default())
181+
182+
// Publish events before Run starts so the ring is non-empty.
183+
for range 5 {
184+
es.Publish(Envelope{Event: Event{Type: "pre"}})
185+
}
186+
187+
ctx, cancel := context.WithCancel(context.Background())
188+
done := make(chan error, 1)
189+
go func() { done <- w.Run(ctx) }()
190+
191+
// Let Run consume some events, then cancel.
192+
time.Sleep(20 * time.Millisecond)
193+
194+
// Publish more events that may arrive while Run is winding down.
195+
for range 3 {
196+
es.Publish(Envelope{Event: Event{Type: "post"}})
197+
}
198+
cancel()
199+
require.ErrorIs(t, <-done, context.Canceled)
200+
201+
// Drain must flush whatever is left in the ring.
202+
drainCtx, drainCancel := context.WithTimeout(context.Background(), time.Second)
203+
defer drainCancel()
204+
require.NoError(t, w.Drain(drainCtx))
205+
206+
got := backend.envelopes()
207+
// All 8 published events must have been appended across Run + Drain.
208+
assert.Len(t, got, 8)
209+
}

server/lib/events/ringbuffer.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,32 @@ type Reader struct {
8383
nextSeq uint64
8484
}
8585

86+
// TryRead returns the next available result without blocking. Returns
87+
// (result, true) if data is available, (ReadResult{}, false) if the reader
88+
// has caught up to the latest published seq.
89+
func (r *Reader) TryRead() (ReadResult, bool) {
90+
r.rb.mu.RLock()
91+
defer r.rb.mu.RUnlock()
92+
93+
latest := r.rb.latestSeq
94+
oldest := r.rb.oldestSeq()
95+
96+
if latest == 0 || r.nextSeq > latest {
97+
return ReadResult{}, false
98+
}
99+
100+
if r.nextSeq < oldest {
101+
from := r.nextSeq
102+
dropped := oldest - r.nextSeq
103+
r.nextSeq = oldest
104+
return ReadResult{Dropped: dropped, DroppedFrom: from, DroppedTo: oldest - 1}, true
105+
}
106+
107+
env := r.rb.buf[r.nextSeq%r.rb.cap]
108+
r.nextSeq++
109+
return ReadResult{Envelope: &env}, true
110+
}
111+
86112
// Read blocks until the next envelope is available or ctx is cancelled.
87113
func (r *Reader) Read(ctx context.Context) (ReadResult, error) {
88114
for {

0 commit comments

Comments
 (0)