Skip to content

Commit c6467b3

Browse files
committed
review: remove DroppedFrom/DroppedTo fields and trim redundant comments
1 parent 040f361 commit c6467b3

3 files changed

Lines changed: 8 additions & 17 deletions

File tree

server/lib/events/eventsstorage.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func (w *StorageWriter) Drain(ctx context.Context) error {
8080

8181
func (w *StorageWriter) processResult(ctx context.Context, res ReadResult) error {
8282
if res.Dropped > 0 {
83-
w.log.Warn("storage writer: dropped events", "count", res.Dropped, "from_seq", res.DroppedFrom, "to_seq", res.DroppedTo)
83+
w.log.Warn("storage writer: dropped events", "count", res.Dropped)
8484
return nil
8585
}
8686
if err := w.storage.Append(ctx, *res.Envelope); err != nil {

server/lib/events/ringbuffer.go

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,10 @@ func (rb *ringBuffer) newReader(afterSeq uint64) *Reader {
6868

6969
// ReadResult is returned by Reader.Read. Exactly one of Envelope or Dropped is
7070
// set: Envelope is non-nil for a normal read, Dropped is non-zero when the
71-
// reader fell behind and events were lost. When Dropped > 0, DroppedFrom and
72-
// DroppedTo are the inclusive seq range of the dropped events.
71+
// reader fell behind and events were lost.
7372
type ReadResult struct {
74-
Envelope *Envelope
75-
Dropped uint64
76-
DroppedFrom uint64 // first seq of the dropped range (only valid when Dropped > 0)
77-
DroppedTo uint64 // last seq of the dropped range (only valid when Dropped > 0)
73+
Envelope *Envelope
74+
Dropped uint64
7875
}
7976

8077
// Reader tracks an independent read position in a ringBuffer.
@@ -98,10 +95,9 @@ func (r *Reader) TryRead() (ReadResult, bool) {
9895
}
9996

10097
if r.nextSeq < oldest {
101-
from := r.nextSeq
10298
dropped := oldest - r.nextSeq
10399
r.nextSeq = oldest
104-
return ReadResult{Dropped: dropped, DroppedFrom: from, DroppedTo: oldest - 1}, true
100+
return ReadResult{Dropped: dropped}, true
105101
}
106102

107103
env := r.rb.buf[r.nextSeq%r.rb.cap]
@@ -131,11 +127,10 @@ func (r *Reader) Read(ctx context.Context) (ReadResult, error) {
131127
}
132128

133129
if r.nextSeq < oldest {
134-
from := r.nextSeq
135130
dropped := oldest - r.nextSeq
136131
r.nextSeq = oldest
137132
r.rb.mu.RUnlock()
138-
return ReadResult{Dropped: dropped, DroppedFrom: from, DroppedTo: oldest - 1}, nil
133+
return ReadResult{Dropped: dropped}, nil
139134
}
140135

141136
if r.nextSeq <= latest {

server/lib/events/s2storage.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"github.com/s2-streamstore/s2-sdk-go/s2"
1212
)
1313

14-
// S2Config holds batcher tuning parameters for the S2 backend.
1514
type S2Config struct {
1615
// BatcherLinger is how long the batcher waits before flushing (default: 100ms).
1716
BatcherLinger time.Duration
@@ -38,16 +37,14 @@ func (sp *s2Producer) close(ctx context.Context) error {
3837
return sp.p.Close()
3938
}
4039

41-
// S2Storage is a Storage backed by S2. All events are appended to a single
42-
// fixed stream whose name is provided at construction time.
40+
// S2Storage appends all events to a single fixed stream set at construction time.
4341
type S2Storage struct {
4442
producer s2Producer
4543
shutdownCh chan struct{} // closed when Close is called, bounds ack goroutine contexts
4644
log *slog.Logger
4745
}
4846

49-
// NewS2Storage creates an S2Storage that appends to the given stream within basin.
50-
// ctx is used for AppendSession creation and should be the process lifetime context.
47+
// ctx is used for AppendSession creation and must be the process lifetime context.
5148
func NewS2Storage(ctx context.Context, basin, accessToken, streamName string, cfg S2Config, log *slog.Logger) (*S2Storage, error) {
5249
if basin == "" || accessToken == "" || streamName == "" {
5350
return nil, fmt.Errorf("s2storage: basin, accessToken, and streamName are required")
@@ -74,7 +71,6 @@ func NewS2Storage(ctx context.Context, basin, accessToken, streamName string, cf
7471
}, nil
7572
}
7673

77-
// Append marshals env to JSON and submits it to the S2 producer.
7874
func (s *S2Storage) Append(_ context.Context, env Envelope) error {
7975
data, err := json.Marshal(env)
8076
if err != nil {

0 commit comments

Comments
 (0)