Skip to content

Commit 110b37e

Browse files
authored
fix: enhance StartK8sStreamWithHeartBeat to prevent goroutine leaks and improve error handling (#6960)
1 parent dea52f6 commit 110b37e

6 files changed

Lines changed: 127 additions & 2484 deletions

File tree

api/connector/Connector.go

Lines changed: 41 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"sync"
2929
"time"
3030

31+
"github.com/devtron-labs/common-lib/async"
3132
"github.com/devtron-labs/devtron/api/bean"
3233
"github.com/gogo/protobuf/proto"
3334
"github.com/grpc-ecosystem/grpc-gateway/runtime"
@@ -45,19 +46,22 @@ type Pump interface {
4546
}
4647

4748
type PumpImpl struct {
48-
logger *zap.SugaredLogger
49+
logger *zap.SugaredLogger
50+
asyncRunnable *async.Runnable
4951
}
5052

51-
func NewPumpImpl(logger *zap.SugaredLogger) *PumpImpl {
53+
func NewPumpImpl(logger *zap.SugaredLogger, asyncRunnable *async.Runnable) *PumpImpl {
5254
return &PumpImpl{
53-
logger: logger,
55+
logger: logger,
56+
asyncRunnable: asyncRunnable,
5457
}
5558
}
5659

5760
func (impl PumpImpl) StartK8sStreamWithHeartBeat(ctx context.Context, w http.ResponseWriter, isReconnect bool, stream io.ReadCloser, err error) {
5861
f, ok := w.(http.Flusher)
5962
if !ok {
6063
http.Error(w, "unexpected server doesnt support streaming", http.StatusInternalServerError)
64+
return // was missing: f is nil past this point
6165
}
6266

6367
w.Header().Set("Transfer-Encoding", "chunked")
@@ -81,18 +85,33 @@ func (impl PumpImpl) StartK8sStreamWithHeartBeat(ctx context.Context, w http.Res
8185
return
8286
}
8387
}
84-
// heartbeat start
88+
// sync.Once ensures stream.Close is idempotent: the goroutine may call it
89+
// via the ctx.Done path, and the deferred cleanup calls it too.
90+
var closeOnce sync.Once
91+
closeStream := func() { closeOnce.Do(func() { stream.Close() }) }
92+
8593
ticker := time.NewTicker(30 * time.Second)
86-
done := make(chan struct{}) // close(done) never blocks, so no buffer needed
94+
done := make(chan struct{})
8795
var mux sync.Mutex
8896

89-
go func() {
97+
// WaitGroup restores the happens-before edge that the old `done <- true`
98+
// blocking send provided. wg.Wait() in the defer ensures the goroutine has
99+
// fully exited before this function returns, preventing f.Flush() from being
100+
// called after net/http.finishRequest() recycles the ResponseWriter.
101+
var wg sync.WaitGroup
102+
wg.Add(1)
103+
104+
// asyncRunnable.Execute wraps fn in a panic-recovering goroutine with metrics.
105+
// wg.Done() is deferred inside fn so it fires even when Execute's recover()
106+
// catches a panic (Go runs inner defers before outer recover).
107+
impl.asyncRunnable.Execute(func() {
108+
defer wg.Done()
90109
for {
91110
select {
92111
case <-done:
93112
return
94113
case <-ctx.Done():
95-
stream.Close() // unblocks the blocking bufReader.ReadString below
114+
closeStream() // unblocks bufReader.ReadString in the main loop
96115
return
97116
case t := <-ticker.C:
98117
mux.Lock()
@@ -107,17 +126,20 @@ func (impl PumpImpl) StartK8sStreamWithHeartBeat(ctx context.Context, w http.Res
107126
}
108127
}
109128
}
110-
}()
129+
})
130+
111131
defer func() {
132+
close(done) // unblock goroutine's select
112133
ticker.Stop()
113-
stream.Close() // idempotent: safe to call after goroutine already closed it
114-
close(done) // signals goroutine to exit if still running
134+
wg.Wait() // block until heartbeat goroutine has fully exited;
135+
// only after this does the handler return and net/http
136+
// reclaim the ResponseWriter
137+
closeStream() // safe: goroutine no longer touches stream after wg.Wait()
115138
}()
116139

117140
bufReader := bufio.NewReader(stream)
118141
eof := false
119142
for !eof {
120-
// fast-exit: if ctx expired between reads, return immediately
121143
select {
122144
case <-ctx.Done():
123145
return
@@ -132,40 +154,36 @@ func (impl PumpImpl) StartK8sStreamWithHeartBeat(ctx context.Context, w http.Res
132154
}
133155
} else if err != nil {
134156
if ctx.Err() != nil {
135-
// stream was closed because ctx expired — not an application error
136157
return
137158
}
138159
impl.logger.Errorw("error in reading buffer string, StartK8sStreamWithHeartBeat", "err", err)
139160
return
140161
}
141162
log = strings.TrimSpace(log)
142163
if log == "" {
143-
continue // blank line mid-stream: skip without aborting
164+
continue
144165
}
145166
splitLog := strings.SplitN(log, " ", 2)
146167
if len(splitLog) < 2 {
147-
continue // no space separator: not a valid log line, skip
168+
continue
148169
}
149170
parsedTime, err := time.Parse(time.RFC3339, splitLog[0])
150171
if err != nil {
151-
impl.logger.Errorw("error in writing data over sse", "err", err)
152-
return
172+
impl.logger.Errorw("error parsing log timestamp, skipping line", "err", err)
173+
continue
153174
}
154175
eventId := strconv.FormatInt(parsedTime.UnixNano(), 10)
155176
mux.Lock()
156-
if len(splitLog) == 2 {
157-
err = impl.sendEvent([]byte(eventId), nil, []byte(splitLog[1]), w)
158-
}
159-
if err == nil {
177+
sendErr := impl.sendEvent([]byte(eventId), nil, []byte(splitLog[1]), w)
178+
if sendErr == nil {
160179
f.Flush()
161180
}
162181
mux.Unlock()
163-
if err != nil {
164-
impl.logger.Errorw("error in writing data over sse", "err", err)
182+
if sendErr != nil {
183+
impl.logger.Errorw("error in writing data over sse", "err", sendErr)
165184
return
166185
}
167186
}
168-
// heartbeat end
169187
}
170188

171189
func (impl PumpImpl) StartStreamWithTransformer(w http.ResponseWriter, recv func() (proto.Message, error), err error, transformer func(interface{}) interface{}) {

api/connector/connector_test.go

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,21 @@ import (
44
"context"
55
"io"
66
"net/http/httptest"
7+
"runtime"
78
"strings"
9+
"sync"
810
"testing"
911
"time"
1012

13+
"github.com/devtron-labs/common-lib/async"
14+
"github.com/devtron-labs/common-lib/constants"
1115
"go.uber.org/zap"
1216
)
1317

1418
func newTestPump() PumpImpl {
1519
logger, _ := zap.NewDevelopment()
16-
return PumpImpl{logger: logger.Sugar()}
20+
runnable := async.NewAsyncRunnable(logger.Sugar(), constants.ServiceName("test"))
21+
return PumpImpl{logger: logger.Sugar(), asyncRunnable: runnable}
1722
}
1823

1924
// blockingReader blocks on Read until Close is called, then returns io.EOF
@@ -133,3 +138,79 @@ func TestStartK8sStreamWithHeartBeat_MalformedLines_DoNotAbortStream(t *testing.
133138
})
134139
}
135140
}
141+
142+
// sentinelFlusher reports a test error if Flush is called after markReturned().
143+
type sentinelFlusher struct {
144+
*httptest.ResponseRecorder
145+
mu sync.Mutex
146+
afterReturn bool
147+
t *testing.T
148+
}
149+
150+
func (sf *sentinelFlusher) markReturned() {
151+
sf.mu.Lock()
152+
sf.afterReturn = true
153+
sf.mu.Unlock()
154+
}
155+
156+
func (sf *sentinelFlusher) Flush() {
157+
sf.mu.Lock()
158+
violation := sf.afterReturn
159+
sf.mu.Unlock()
160+
if violation {
161+
sf.t.Error("Flush called after StartK8sStreamWithHeartBeat returned — heartbeat goroutine leak")
162+
}
163+
sf.ResponseRecorder.Flush()
164+
}
165+
166+
// TestHeartbeatGoroutineExitsBeforeFunctionReturns verifies that no Flush()
167+
// call reaches the ResponseWriter after StartK8sStreamWithHeartBeat returns.
168+
func TestHeartbeatGoroutineExitsBeforeFunctionReturns(t *testing.T) {
169+
t.Parallel()
170+
171+
pump := newTestPump()
172+
ctx, cancel := context.WithCancel(context.Background())
173+
defer cancel()
174+
175+
stream := newFakeStream("2024-01-01T00:00:00Z hello world\n")
176+
w := &sentinelFlusher{ResponseRecorder: httptest.NewRecorder(), t: t}
177+
178+
done := make(chan struct{})
179+
go func() {
180+
defer close(done)
181+
pump.StartK8sStreamWithHeartBeat(ctx, w, false, stream, nil)
182+
w.markReturned() // any Flush after this line is a violation
183+
}()
184+
185+
select {
186+
case <-done:
187+
// Give scheduler a moment: a leaked goroutine would call Flush here.
188+
time.Sleep(50 * time.Millisecond)
189+
case <-time.After(3 * time.Second):
190+
t.Fatal("StartK8sStreamWithHeartBeat did not return within 3s")
191+
}
192+
}
193+
194+
// TestNoGoroutineLeakAfterStreamEOF verifies no goroutine is left running
195+
// after the stream finishes normally.
196+
func TestNoGoroutineLeakAfterStreamEOF(t *testing.T) {
197+
// Not parallel: runtime.NumGoroutine() counts all goroutines in the process;
198+
// parallel sibling tests contaminate the before/after samples.
199+
pump := newTestPump()
200+
ctx, cancel := context.WithCancel(context.Background())
201+
defer cancel()
202+
203+
runtime.GC()
204+
before := runtime.NumGoroutine()
205+
206+
stream := newFakeStream("2024-01-01T00:00:00Z line one\n")
207+
pump.StartK8sStreamWithHeartBeat(ctx, httptest.NewRecorder(), false, stream, nil)
208+
209+
runtime.GC()
210+
time.Sleep(50 * time.Millisecond)
211+
212+
after := runtime.NumGoroutine()
213+
if after > before {
214+
t.Errorf("goroutine leak: %d before, %d after", before, after)
215+
}
216+
}

cmd/external-app/wire_gen.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)