diff --git a/api/connector/Connector.go b/api/connector/Connector.go index a1dd439b94..4e6de99ace 100644 --- a/api/connector/Connector.go +++ b/api/connector/Connector.go @@ -18,8 +18,16 @@ package connector import ( "bufio" + "context" "encoding/json" "fmt" + "io" + "net/http" + "strconv" + "strings" + "sync" + "time" + "github.com/devtron-labs/devtron/api/bean" "github.com/gogo/protobuf/proto" "github.com/grpc-ecosystem/grpc-gateway/runtime" @@ -27,20 +35,13 @@ import ( "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "io" - "net/http" - "regexp" - "strconv" - "strings" - "sync" - "time" ) var delimiter = []byte("\n\n") type Pump interface { StartStreamWithTransformer(w http.ResponseWriter, recv func() (proto.Message, error), err error, transformer func(interface{}) interface{}) - StartK8sStreamWithHeartBeat(w http.ResponseWriter, isReconnect bool, stream io.ReadCloser, err error) + StartK8sStreamWithHeartBeat(ctx context.Context, w http.ResponseWriter, isReconnect bool, stream io.ReadCloser, err error) } type PumpImpl struct { @@ -53,7 +54,7 @@ func NewPumpImpl(logger *zap.SugaredLogger) *PumpImpl { } } -func (impl PumpImpl) StartK8sStreamWithHeartBeat(w http.ResponseWriter, isReconnect bool, stream io.ReadCloser, err error) { +func (impl PumpImpl) StartK8sStreamWithHeartBeat(ctx context.Context, w http.ResponseWriter, isReconnect bool, stream io.ReadCloser, err error) { f, ok := w.(http.Flusher) if !ok { http.Error(w, "unexpected server doesnt support streaming", http.StatusInternalServerError) @@ -82,47 +83,69 @@ func (impl PumpImpl) StartK8sStreamWithHeartBeat(w http.ResponseWriter, isReconn } // heartbeat start ticker := time.NewTicker(30 * time.Second) - done := make(chan bool) + done := make(chan struct{}) // close(done) never blocks, so no buffer needed var mux sync.Mutex - go func() error { + + go func() { for { select { case <-done: - return nil + return + case <-ctx.Done(): + stream.Close() // unblocks the blocking bufReader.ReadString below + return case t := <-ticker.C: mux.Lock() err := impl.sendEvent(nil, []byte("PING"), []byte(t.String()), w) + if err == nil { + f.Flush() + } mux.Unlock() if err != nil { impl.logger.Errorw("error in writing PING over sse", "err", err) - return err + return } - f.Flush() } } }() defer func() { ticker.Stop() - done <- true + stream.Close() // idempotent: safe to call after goroutine already closed it + close(done) // signals goroutine to exit if still running }() bufReader := bufio.NewReader(stream) eof := false for !eof { + // fast-exit: if ctx expired between reads, return immediately + select { + case <-ctx.Done(): + return + default: + } + log, err := bufReader.ReadString('\n') if err == io.EOF { eof = true - // stop if we reached end of stream and the next line is empty if log == "" { return } - } else if err != nil && err != io.EOF { + } else if err != nil { + if ctx.Err() != nil { + // stream was closed because ctx expired — not an application error + return + } impl.logger.Errorw("error in reading buffer string, StartK8sStreamWithHeartBeat", "err", err) return } - log = strings.TrimSpace(log) // Remove trailing line ending - a := regexp.MustCompile(" ") - splitLog := a.Split(log, 2) + log = strings.TrimSpace(log) + if log == "" { + continue // blank line mid-stream: skip without aborting + } + splitLog := strings.SplitN(log, " ", 2) + if len(splitLog) < 2 { + continue // no space separator: not a valid log line, skip + } parsedTime, err := time.Parse(time.RFC3339, splitLog[0]) if err != nil { impl.logger.Errorw("error in writing data over sse", "err", err) @@ -133,12 +156,14 @@ func (impl PumpImpl) StartK8sStreamWithHeartBeat(w http.ResponseWriter, isReconn if len(splitLog) == 2 { err = impl.sendEvent([]byte(eventId), nil, []byte(splitLog[1]), w) } + if err == nil { + f.Flush() + } mux.Unlock() if err != nil { impl.logger.Errorw("error in writing data over sse", "err", err) return } - f.Flush() } // heartbeat end } diff --git a/api/connector/connector_test.go b/api/connector/connector_test.go new file mode 100644 index 0000000000..f066b06962 --- /dev/null +++ b/api/connector/connector_test.go @@ -0,0 +1,135 @@ +package connector + +import ( + "context" + "io" + "net/http/httptest" + "strings" + "testing" + "time" + + "go.uber.org/zap" +) + +func newTestPump() PumpImpl { + logger, _ := zap.NewDevelopment() + return PumpImpl{logger: logger.Sugar()} +} + +// blockingReader blocks on Read until Close is called, then returns io.EOF +type blockingReader struct { + ch chan struct{} +} + +func (b *blockingReader) Read(p []byte) (int, error) { + <-b.ch + return 0, io.EOF +} +func (b *blockingReader) Close() error { + select { + case <-b.ch: + // already closed + default: + close(b.ch) + } + return nil +} + +func TestStartK8sStreamWithHeartBeat_CtxCancelled_Returns(t *testing.T) { + t.Parallel() + + cases := []struct { + name string + waitForStart bool // synchronise so cancel fires after goroutine entry + }{ + {"cancel_before_goroutine_blocks", false}, + {"cancel_after_goroutine_entry", true}, + } + + for _, tc := range cases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + pump := newTestPump() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + br := &blockingReader{ch: make(chan struct{})} + w := httptest.NewRecorder() + + started := make(chan struct{}) + done := make(chan struct{}) + go func() { + defer close(done) + close(started) + pump.StartK8sStreamWithHeartBeat(ctx, w, false, br, nil) + }() + + if tc.waitForStart { + <-started + } + cancel() + + select { + case <-done: + // success: function returned after ctx cancel without deadlock + case <-time.After(3 * time.Second): + t.Fatalf("StartK8sStreamWithHeartBeat did not return after ctx cancel (%s)", tc.name) + } + }) + } +} + +// fakeStream returns a ReadCloser that emits fixed content then EOF. +type fakeStream struct { + r io.Reader + done chan struct{} +} + +func newFakeStream(content string) *fakeStream { + return &fakeStream{r: strings.NewReader(content), done: make(chan struct{})} +} +func (f *fakeStream) Read(p []byte) (int, error) { return f.r.Read(p) } +func (f *fakeStream) Close() error { + select { + case <-f.done: + default: + close(f.done) + } + return nil +} + +func TestStartK8sStreamWithHeartBeat_MalformedLines_DoNotAbortStream(t *testing.T) { + t.Parallel() + cases := []struct { + name string + payload string + }{ + {"blank_line_mid_stream", "2024-01-01T00:00:00Z hello world\n\n2024-01-01T00:00:01Z second line\n"}, + {"line_without_space", "2024-01-01T00:00:00Z hello\nnotimestamp\n2024-01-01T00:00:02Z after bad\n"}, + {"empty_stream", ""}, + } + for _, tc := range cases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + pump := newTestPump() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + stream := newFakeStream(tc.payload) + w := httptest.NewRecorder() + done := make(chan struct{}) + go func() { + defer close(done) + pump.StartK8sStreamWithHeartBeat(ctx, w, false, stream, nil) + }() + select { + case <-done: + // success: returned cleanly without deadlock + case <-time.After(3 * time.Second): + t.Fatalf("stream did not complete in time (%s)", tc.name) + } + }) + } +} diff --git a/api/k8s/application/k8sApplicationRestHandler.go b/api/k8s/application/k8sApplicationRestHandler.go index 60a8f39cfd..a3d64a7ff5 100644 --- a/api/k8s/application/k8sApplicationRestHandler.go +++ b/api/k8s/application/k8sApplicationRestHandler.go @@ -591,8 +591,7 @@ func (handler *K8sApplicationRestHandlerImpl) GetPodLogs(w http.ResponseWriter, }(ctx.Done(), cn.CloseNotify()) } defer cancel() - defer util.Close(stream, handler.logger) - handler.pump.StartK8sStreamWithHeartBeat(w, isReconnect, stream, err) + handler.pump.StartK8sStreamWithHeartBeat(ctx, w, isReconnect, stream, err) } func (handler *K8sApplicationRestHandlerImpl) DownloadPodLogs(w http.ResponseWriter, r *http.Request) {