Skip to content

Commit 634e23c

Browse files
committed
feat: log warning when SSE flush to client is slow (#66)
Adds slow client detection to the EventStream SSE sender. After each flush, the elapsed time is measured using an injected quartz.Clock. If it exceeds 500ms, a Warn log is emitted with the flush duration, making slow clients immediately visible in logs without requiring distributed tracing infrastructure. quartz.Clock is injected into EventStream so tests can simulate slow flushes by advancing a mock clock — no real sleeping required.
1 parent ee2c4b4 commit 634e23c

5 files changed

Lines changed: 150 additions & 5 deletions

File tree

intercept/chatcompletions/streaming.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/coder/aibridge/tracing"
2929

3030
"cdr.dev/slog/v3"
31+
"github.com/coder/quartz"
3132
)
3233

3334
type StreamingInterception struct {
@@ -105,7 +106,7 @@ func (i *StreamingInterception) ProcessRequest(w http.ResponseWriter, r *http.Re
105106
defer streamCancel(xerrors.New("deferred"))
106107

107108
// events will either terminate when shutdown after interaction with upstream completes, or when streamCtx is done.
108-
events := eventstream.NewEventStream(streamCtx, logger.Named("sse-sender"), nil)
109+
events := eventstream.NewEventStream(streamCtx, logger.Named("sse-sender"), nil, quartz.NewReal())
109110
go events.Start(w, r)
110111
defer func() {
111112
_ = events.Shutdown(streamCtx) // Catch-all in case it doesn't get shutdown after stream completes.

intercept/eventstream/eventstream.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,22 @@ import (
1515
"golang.org/x/xerrors"
1616

1717
"cdr.dev/slog/v3"
18+
"github.com/coder/quartz"
1819
)
1920

2021
var ErrEventStreamClosed = xerrors.New("event stream closed")
2122

22-
const pingInterval = time.Second * 10
23+
const (
24+
pingInterval = time.Second * 10
25+
slowFlushThreshold = time.Millisecond * 500
26+
)
2327

2428
type event []byte
2529

2630
type EventStream struct {
2731
ctx context.Context
2832
logger slog.Logger
33+
clk quartz.Clock
2934

3035
pingPayload []byte
3136

@@ -43,7 +48,7 @@ type EventStream struct {
4348
}
4449

4550
// NewEventStream creates a new SSE stream, with an optional payload which is used to send pings every [pingInterval].
46-
func NewEventStream(ctx context.Context, logger slog.Logger, pingPayload []byte) *EventStream {
51+
func NewEventStream(ctx context.Context, logger slog.Logger, pingPayload []byte, clk quartz.Clock) *EventStream {
4752
// Send periodic pings to keep connections alive.
4853
// The upstream provider may also send their own pings, but we can't rely on this.
4954
tick := time.NewTicker(time.Nanosecond)
@@ -52,6 +57,7 @@ func NewEventStream(ctx context.Context, logger slog.Logger, pingPayload []byte)
5257
return &EventStream{
5358
ctx: ctx,
5459
logger: logger,
60+
clk: clk,
5561

5662
pingPayload: pingPayload,
5763

@@ -131,10 +137,14 @@ func (s *EventStream) Start(w http.ResponseWriter, r *http.Request) {
131137
}
132138
return
133139
}
140+
flushStart := s.clk.Now()
134141
if err := flush(w); err != nil {
135142
s.logger.Warn(ctx, "failed to flush event stream", slog.Error(err))
136143
return
137144
}
145+
if d := s.clk.Since(flushStart); d > slowFlushThreshold {
146+
s.logger.Warn(ctx, "slow client detected", slog.F("flush_duration", d))
147+
}
138148

139149
// Reset the timer once we've flushed some data to the stream, since it's already fresh.
140150
// No need to ping in that case.
intercept/eventstream/ (new test file, package eventstream_test; file name not shown on this page)

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package eventstream_test
2+
3+
import (
4+
"bufio"
5+
"context"
6+
"net"
7+
"net/http"
8+
"net/http/httptest"
9+
"sync"
10+
"testing"
11+
"time"
12+
13+
"cdr.dev/slog/v3"
14+
"cdr.dev/slog/v3/sloggers/slogtest"
15+
"github.com/stretchr/testify/require"
16+
17+
"github.com/coder/aibridge/intercept/eventstream"
18+
"github.com/coder/quartz"
19+
)
20+
21+
// captureSink collects log entries for assertions in tests.
22+
type captureSink struct {
23+
mu sync.Mutex
24+
entries []slog.SinkEntry
25+
}
26+
27+
func (s *captureSink) LogEntry(_ context.Context, e slog.SinkEntry) {
28+
s.mu.Lock()
29+
defer s.mu.Unlock()
30+
s.entries = append(s.entries, e)
31+
}
32+
33+
func (*captureSink) Sync() {}
34+
35+
func (s *captureSink) warns() []slog.SinkEntry {
36+
s.mu.Lock()
37+
defer s.mu.Unlock()
38+
var out []slog.SinkEntry
39+
for _, e := range s.entries {
40+
if e.Level == slog.LevelWarn {
41+
out = append(out, e)
42+
}
43+
}
44+
return out
45+
}
46+
47+
// clockAdvancingFlusher wraps httptest.ResponseRecorder and advances the mock
48+
// clock on each Flush call, simulating a slow client without real sleeping.
49+
type clockAdvancingFlusher struct {
50+
*httptest.ResponseRecorder
51+
clk *quartz.Mock
52+
advance time.Duration
53+
}
54+
55+
func (f *clockAdvancingFlusher) Flush() {
56+
f.clk.Advance(f.advance)
57+
f.ResponseRecorder.Flush()
58+
}
59+
60+
// Hijack satisfies the FullResponseWriter lint rule.
61+
func (f *clockAdvancingFlusher) Hijack() (net.Conn, *bufio.ReadWriter, error) {
62+
return nil, nil, nil
63+
}
64+
65+
func TestEventStream_LogsWarning_WhenFlushIsSlow(t *testing.T) {
66+
t.Parallel()
67+
68+
sink := &captureSink{}
69+
logger := slogtest.Make(t, nil).AppendSinks(sink).Leveled(slog.LevelWarn)
70+
ctx := context.Background()
71+
clk := quartz.NewMock(t)
72+
73+
stream := eventstream.NewEventStream(ctx, logger, nil, clk)
74+
75+
w := &clockAdvancingFlusher{
76+
ResponseRecorder: httptest.NewRecorder(),
77+
clk: clk,
78+
advance: 600 * time.Millisecond, // exceeds slowFlushThreshold (500ms)
79+
}
80+
81+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, "/", nil)
82+
require.NoError(t, err)
83+
84+
done := make(chan struct{})
85+
go func() {
86+
defer close(done)
87+
stream.Start(w, req)
88+
}()
89+
90+
stream.InitiateStream(w)
91+
require.NoError(t, stream.SendRaw(ctx, []byte("data: hello\n\n")))
92+
require.NoError(t, stream.Shutdown(ctx))
93+
<-done
94+
95+
warns := sink.warns()
96+
require.Len(t, warns, 1)
97+
require.Equal(t, "slow client detected", warns[0].Message)
98+
}
99+
100+
func TestEventStream_NoWarning_WhenFlushIsFast(t *testing.T) {
101+
t.Parallel()
102+
103+
sink := &captureSink{}
104+
logger := slogtest.Make(t, nil).AppendSinks(sink).Leveled(slog.LevelWarn)
105+
ctx := context.Background()
106+
clk := quartz.NewMock(t)
107+
108+
stream := eventstream.NewEventStream(ctx, logger, nil, clk)
109+
110+
// No clock advance — flush duration stays at 0, below threshold.
111+
w := &clockAdvancingFlusher{
112+
ResponseRecorder: httptest.NewRecorder(),
113+
clk: clk,
114+
advance: 0,
115+
}
116+
117+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, "/", nil)
118+
require.NoError(t, err)
119+
120+
done := make(chan struct{})
121+
go func() {
122+
defer close(done)
123+
stream.Start(w, req)
124+
}()
125+
126+
stream.InitiateStream(w)
127+
require.NoError(t, stream.SendRaw(ctx, []byte("data: hello\n\n")))
128+
require.NoError(t, stream.Shutdown(ctx))
129+
<-done
130+
131+
require.Empty(t, sink.warns())
132+
}

intercept/messages/streaming.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/coder/aibridge/tracing"
3030

3131
"cdr.dev/slog/v3"
32+
"github.com/coder/quartz"
3233
)
3334

3435
type StreamingInterception struct {
@@ -140,7 +141,7 @@ func (i *StreamingInterception) ProcessRequest(w http.ResponseWriter, r *http.Re
140141
}
141142

142143
// events will either terminate when shutdown after interaction with upstream completes, or when streamCtx is done.
143-
events := eventstream.NewEventStream(streamCtx, logger.Named("sse-sender"), i.pingPayload())
144+
events := eventstream.NewEventStream(streamCtx, logger.Named("sse-sender"), i.pingPayload(), quartz.NewReal())
144145
go events.Start(w, r)
145146
defer func() {
146147
_ = events.Shutdown(streamCtx) // Catch-all in case it doesn't get shutdown after stream completes.

intercept/responses/streaming.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"cdr.dev/slog/v3"
1919
"github.com/coder/aibridge/config"
2020
aibcontext "github.com/coder/aibridge/context"
21+
"github.com/coder/quartz"
2122
"github.com/coder/aibridge/intercept"
2223
"github.com/coder/aibridge/intercept/eventstream"
2324
"github.com/coder/aibridge/mcp"
@@ -83,7 +84,7 @@ func (i *StreamingResponsesInterceptor) ProcessRequest(w http.ResponseWriter, r
8384

8485
i.injectTools()
8586

86-
events := eventstream.NewEventStream(ctx, i.logger.Named("sse-sender"), nil)
87+
events := eventstream.NewEventStream(ctx, i.logger.Named("sse-sender"), nil, quartz.NewReal())
8788
go events.Start(w, r)
8889
defer func() {
8990
shutdownCtx, shutdownCancel := context.WithTimeout(ctx, streamShutdownTimeout)

0 commit comments

Comments
 (0)