Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 46 additions & 21 deletions api/connector/Connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,30 @@

import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"sync"
"time"

"github.com/devtron-labs/devtron/api/bean"
"github.com/gogo/protobuf/proto"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/juju/errors"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"io"
"net/http"
"regexp"
"strconv"
"strings"
"sync"
"time"
)

var delimiter = []byte("\n\n")

type Pump interface {
StartStreamWithTransformer(w http.ResponseWriter, recv func() (proto.Message, error), err error, transformer func(interface{}) interface{})
StartK8sStreamWithHeartBeat(w http.ResponseWriter, isReconnect bool, stream io.ReadCloser, err error)
StartK8sStreamWithHeartBeat(ctx context.Context, w http.ResponseWriter, isReconnect bool, stream io.ReadCloser, err error)
}

type PumpImpl struct {
Expand All @@ -53,7 +54,7 @@
}
}

func (impl PumpImpl) StartK8sStreamWithHeartBeat(w http.ResponseWriter, isReconnect bool, stream io.ReadCloser, err error) {
func (impl PumpImpl) StartK8sStreamWithHeartBeat(ctx context.Context, w http.ResponseWriter, isReconnect bool, stream io.ReadCloser, err error) {

Check failure on line 57 in api/connector/Connector.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 37 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=devtron-labs_devtron&issues=AZ3jAUzJOL9bm9SrHs_C&open=AZ3jAUzJOL9bm9SrHs_C&pullRequest=6958
f, ok := w.(http.Flusher)
if !ok {
http.Error(w, "unexpected server doesnt support streaming", http.StatusInternalServerError)
Expand Down Expand Up @@ -82,47 +83,69 @@
}
// heartbeat start
ticker := time.NewTicker(30 * time.Second)
done := make(chan bool)
done := make(chan struct{}) // close(done) never blocks, so no buffer needed
var mux sync.Mutex
go func() error {

go func() {
for {
select {
case <-done:
return nil
return
case <-ctx.Done():
stream.Close() // unblocks the blocking bufReader.ReadString below
return
case t := <-ticker.C:
mux.Lock()
err := impl.sendEvent(nil, []byte("PING"), []byte(t.String()), w)
if err == nil {
f.Flush()
}
mux.Unlock()
if err != nil {
impl.logger.Errorw("error in writing PING over sse", "err", err)
return err
return
}
f.Flush()
}
}
}()
defer func() {
ticker.Stop()
done <- true
stream.Close() // idempotent: safe to call after goroutine already closed it
close(done) // signals goroutine to exit if still running
}()

bufReader := bufio.NewReader(stream)
eof := false
for !eof {
// fast-exit: if ctx expired between reads, return immediately
select {
case <-ctx.Done():
return
default:
}

log, err := bufReader.ReadString('\n')
if err == io.EOF {
eof = true
// stop if we reached end of stream and the next line is empty
if log == "" {
return
}
} else if err != nil && err != io.EOF {
} else if err != nil {
if ctx.Err() != nil {
// stream was closed because ctx expired — not an application error
return
}
impl.logger.Errorw("error in reading buffer string, StartK8sStreamWithHeartBeat", "err", err)
return
}
log = strings.TrimSpace(log) // Remove trailing line ending
a := regexp.MustCompile(" ")
splitLog := a.Split(log, 2)
log = strings.TrimSpace(log)
if log == "" {
continue // blank line mid-stream: skip without aborting
}
splitLog := strings.SplitN(log, " ", 2)
if len(splitLog) < 2 {
continue // no space separator: not a valid log line, skip
}
parsedTime, err := time.Parse(time.RFC3339, splitLog[0])
if err != nil {
impl.logger.Errorw("error in writing data over sse", "err", err)
Expand All @@ -133,12 +156,14 @@
if len(splitLog) == 2 {
err = impl.sendEvent([]byte(eventId), nil, []byte(splitLog[1]), w)
}
if err == nil {
f.Flush()
}
mux.Unlock()
if err != nil {
impl.logger.Errorw("error in writing data over sse", "err", err)
return
}
f.Flush()
}
// heartbeat end
}
Expand Down
135 changes: 135 additions & 0 deletions api/connector/connector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package connector

import (
"context"
"io"
"net/http/httptest"
"strings"
"testing"
"time"

"go.uber.org/zap"
)

func newTestPump() PumpImpl {
logger, _ := zap.NewDevelopment()
return PumpImpl{logger: logger.Sugar()}
}

// blockingReader blocks on Read until Close is called, then returns io.EOF
type blockingReader struct {
ch chan struct{}
}

func (b *blockingReader) Read(p []byte) (int, error) {
<-b.ch
return 0, io.EOF
}
func (b *blockingReader) Close() error {
select {
case <-b.ch:
// already closed
default:
close(b.ch)
}
return nil
}

func TestStartK8sStreamWithHeartBeat_CtxCancelled_Returns(t *testing.T) {
t.Parallel()

cases := []struct {
name string
waitForStart bool // synchronise so cancel fires after goroutine entry
}{
{"cancel_before_goroutine_blocks", false},
{"cancel_after_goroutine_entry", true},
}

for _, tc := range cases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

pump := newTestPump()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

br := &blockingReader{ch: make(chan struct{})}
w := httptest.NewRecorder()

started := make(chan struct{})
done := make(chan struct{})
go func() {
defer close(done)
close(started)
pump.StartK8sStreamWithHeartBeat(ctx, w, false, br, nil)
}()

if tc.waitForStart {
<-started
}
cancel()

select {
case <-done:
// success: function returned after ctx cancel without deadlock
case <-time.After(3 * time.Second):
t.Fatalf("StartK8sStreamWithHeartBeat did not return after ctx cancel (%s)", tc.name)
}
})
}
}

// fakeStream returns a ReadCloser that emits fixed content then EOF.
type fakeStream struct {
r io.Reader
done chan struct{}
}

func newFakeStream(content string) *fakeStream {
return &fakeStream{r: strings.NewReader(content), done: make(chan struct{})}
}
func (f *fakeStream) Read(p []byte) (int, error) { return f.r.Read(p) }
func (f *fakeStream) Close() error {
select {
case <-f.done:
default:
close(f.done)
}
return nil
}

func TestStartK8sStreamWithHeartBeat_MalformedLines_DoNotAbortStream(t *testing.T) {
t.Parallel()
cases := []struct {
name string
payload string
}{
{"blank_line_mid_stream", "2024-01-01T00:00:00Z hello world\n\n2024-01-01T00:00:01Z second line\n"},
{"line_without_space", "2024-01-01T00:00:00Z hello\nnotimestamp\n2024-01-01T00:00:02Z after bad\n"},
{"empty_stream", ""},
}
for _, tc := range cases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
pump := newTestPump()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream := newFakeStream(tc.payload)
w := httptest.NewRecorder()
done := make(chan struct{})
go func() {
defer close(done)
pump.StartK8sStreamWithHeartBeat(ctx, w, false, stream, nil)
}()
select {
case <-done:
// success: returned cleanly without deadlock
case <-time.After(3 * time.Second):
t.Fatalf("stream did not complete in time (%s)", tc.name)
}
})
}
}
3 changes: 1 addition & 2 deletions api/k8s/application/k8sApplicationRestHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,8 +591,7 @@ func (handler *K8sApplicationRestHandlerImpl) GetPodLogs(w http.ResponseWriter,
}(ctx.Done(), cn.CloseNotify())
}
defer cancel()
defer util.Close(stream, handler.logger)
handler.pump.StartK8sStreamWithHeartBeat(w, isReconnect, stream, err)
handler.pump.StartK8sStreamWithHeartBeat(ctx, w, isReconnect, stream, err)
}

func (handler *K8sApplicationRestHandlerImpl) DownloadPodLogs(w http.ResponseWriter, r *http.Request) {
Expand Down
Loading