Skip to content

Commit c9e555a

Browse files
committed
review: bound p.Close() with ctx, give Close its own deadline, collapse ack goroutines to one, count ack errors
1 parent 3f9550f commit c9e555a

2 files changed

Lines changed: 44 additions & 25 deletions

File tree

server/cmd/api/main.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,10 @@ func main() {
303303
if err := storageWriter.Drain(drainCtx); err != nil {
304304
slogger.Warn("storage writer drain incomplete", "err", err)
305305
}
306-
if err := storageWriter.Close(drainCtx); err != nil {
306+
307+
closeCtx, closeCancel := context.WithTimeout(context.Background(), 5*time.Second)
308+
defer closeCancel()
309+
if err := storageWriter.Close(closeCtx); err != nil {
307310
slogger.Error("storage writer close failed", "err", err)
308311
}
309312
}

server/lib/events/s2storage.go

Lines changed: 40 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"log/slog"
99
"sync"
10+
"sync/atomic"
1011
"time"
1112

1213
"github.com/s2-streamstore/s2-sdk-go/s2"
@@ -36,17 +37,28 @@ func (sp *s2Producer) close(ctx context.Context) error {
3637
case <-ctx.Done():
3738
drainErr = ctx.Err()
3839
}
39-
return errors.Join(drainErr, sp.p.Close())
40+
closeDone := make(chan error, 1)
41+
go func() { closeDone <- sp.p.Close() }()
42+
select {
43+
case err := <-closeDone:
44+
return errors.Join(drainErr, err)
45+
case <-ctx.Done():
46+
return errors.Join(drainErr, ctx.Err())
47+
}
4048
}
4149

4250
// S2Storage appends all events to a single fixed stream set at construction time.
4351
type S2Storage struct {
44-
producer s2Producer
45-
shutdownCh chan struct{} // closed when Close is called, bounds ack goroutine contexts
46-
log *slog.Logger
52+
producer s2Producer
53+
shutdownCtx context.Context
54+
shutdownCancel context.CancelFunc
55+
closeOnce sync.Once
56+
ackErrors atomic.Uint64
57+
log *slog.Logger
4758
}
4859

49-
// ctx is used for AppendSession creation and must be the process lifetime context.
60+
// ctx controls AppendSession creation; pass context.Background() so the pipeline
61+
// outlives signal cancellation and can be explicitly flushed via Close.
5062
func NewS2Storage(ctx context.Context, basin, accessToken, streamName string, cfg S2Config, log *slog.Logger) (*S2Storage, error) {
5163
if basin == "" || accessToken == "" || streamName == "" {
5264
return nil, fmt.Errorf("s2storage: basin, accessToken, and streamName are required")
@@ -72,14 +84,20 @@ func NewS2Storage(ctx context.Context, basin, accessToken, streamName string, cf
7284
})
7385
producer := s2.NewProducer(ctx, batcher, session)
7486

87+
shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
7588
return &S2Storage{
76-
producer: s2Producer{p: producer},
77-
shutdownCh: make(chan struct{}),
78-
log: log,
89+
producer: s2Producer{p: producer},
90+
shutdownCtx: shutdownCtx,
91+
shutdownCancel: shutdownCancel,
92+
log: log,
7993
}, nil
8094
}
8195

82-
func (s *S2Storage) Append(_ context.Context, env Envelope) error {
96+
func (s *S2Storage) Append(ctx context.Context, env Envelope) error {
97+
if err := ctx.Err(); err != nil {
98+
return err
99+
}
100+
83101
data, err := json.Marshal(env)
84102
if err != nil {
85103
return fmt.Errorf("s2storage: marshal envelope seq=%d: %w", env.Seq, err)
@@ -93,35 +111,33 @@ func (s *S2Storage) Append(_ context.Context, env Envelope) error {
93111
s.producer.wg.Add(1)
94112
go func() {
95113
defer s.producer.wg.Done()
96-
ackCtx, cancel := context.WithCancel(context.Background())
97-
defer cancel()
98-
go func() {
99-
select {
100-
case <-s.shutdownCh:
101-
cancel()
102-
case <-ackCtx.Done():
103-
}
104-
}()
105-
106-
ticket, err := future.Wait(ackCtx)
114+
115+
ticket, err := future.Wait(s.shutdownCtx)
107116
if err != nil {
108-
s.log.Error("s2storage: wait for submit failed", "seq", env.Seq, "err", err)
117+
total := s.ackErrors.Add(1)
118+
s.log.Error("s2storage: wait for submit failed", "seq", env.Seq, "err", err, "total_ack_errors", total)
109119
return
110120
}
111121
if ticket == nil {
112122
return
113123
}
114-
if _, err := ticket.Ack(ackCtx); err != nil {
115-
s.log.Error("s2storage: ack failed", "seq", env.Seq, "err", err)
124+
if _, err := ticket.Ack(s.shutdownCtx); err != nil {
125+
total := s.ackErrors.Add(1)
126+
s.log.Error("s2storage: ack failed", "seq", env.Seq, "err", err, "total_ack_errors", total)
116127
}
117128
}()
118129

119130
return nil
120131
}
121132

133+
// AckErrors returns the total number of async ack failures since construction.
134+
func (s *S2Storage) AckErrors() uint64 {
135+
return s.ackErrors.Load()
136+
}
137+
122138
// Close cancels in-flight ack goroutines, waits for them to drain, then closes
123139
// the producer (which flushes the S2 batcher to the network).
124140
func (s *S2Storage) Close(ctx context.Context) error {
125-
close(s.shutdownCh)
141+
s.closeOnce.Do(s.shutdownCancel)
126142
return s.producer.close(ctx)
127143
}

0 commit comments

Comments
 (0)