Skip to content
Merged
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 107 additions & 46 deletions protocol/logger/job_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"regexp"
"strings"
"sync"
"sync/atomic"
"time"

"nhooyr.io/websocket"
Expand Down Expand Up @@ -50,24 +51,23 @@
type WebsocketLivelogger struct {
JobRequest *protocol.AgentJobRequestMessage
Connection *protocol.VssConnection
ws *websocket.Conn
ws atomic.Pointer[websocket.Conn]
FeedStreamURL string
}

func (logger *WebsocketLivelogger) Close() error {
if logger.ws != nil {
err := logger.ws.Close(websocket.StatusGoingAway, "Bye!")
logger.ws = nil
return logger.replace(nil)
}

func (logger *WebsocketLivelogger) replace(n *websocket.Conn) error {
if ws := logger.ws.Swap(n); ws != nil {
err := ws.Close(websocket.StatusGoingAway, "Bye!")
return err
}
return nil
}

func (logger *WebsocketLivelogger) Connect() error {
err := logger.Close()
if err != nil && logger.Connection.Trace {
fmt.Printf("Failed to close old websocket connection %s\n", err.Error())
}
if logger.Connection.Trace {
fmt.Printf("Try to connect to websocket %s\n", logger.FeedStreamURL)
}
Expand All @@ -87,40 +87,48 @@
},
})
// While reconnecting never assign this to null
if ws != nil {
logger.ws = ws
if ws != nil && err == nil {
if err = logger.replace(ws); err != nil && logger.Connection.Trace {
fmt.Printf("Failed to close old websocket connection %s\n", err.Error())
}
err = nil
}
return err
}

func (logger *WebsocketLivelogger) SendLog(lines *protocol.TimelineRecordFeedLinesWrapper) error {
// Do not try to send if something is wrong
if logger.ws == nil {
ws := logger.ws.Load()
if ws == nil {
return fmt.Errorf("missing websocket connection")
}
ctx, cancel := context.WithTimeout(context.Background(), websocketMessageTimeout)
defer cancel()
return wsjson.Write(ctx, logger.ws, lines)
return wsjson.Write(ctx, ws, lines)
}

type WebsocketLiveloggerWithFallback struct {
JobRequest *protocol.AgentJobRequestMessage
Connection *protocol.VssConnection
currentLogger LiveLogger
currentLogger atomic.Pointer[LiveLogger]
FeedStreamURL string
ForceWebsock bool
}

func (logger *WebsocketLiveloggerWithFallback) InitializeVssLogger() {
_ = logger.Close() // Ignore error for cleanup
logger.currentLogger = &VssLiveLogger{
func (logger *WebsocketLiveloggerWithFallback) initializeVssLogger() LiveLogger {
l := &VssLiveLogger{
JobRequest: logger.JobRequest,
Connection: logger.Connection,
}
_ = logger.replace(l) // Ignore error for cleanup
return l
}

func (logger *WebsocketLiveloggerWithFallback) Initialize() {
_ = logger.Close() // Ignore error for cleanup
func (logger *WebsocketLiveloggerWithFallback) InitializeVssLogger() {
logger.initializeVssLogger()
}

func (logger *WebsocketLiveloggerWithFallback) initialize() LiveLogger {
if logger.FeedStreamURL != "" {
wslogger := &WebsocketLivelogger{
JobRequest: logger.JobRequest,
Expand All @@ -129,54 +137,94 @@
}
err := wslogger.Connect()
if err == nil {
logger.currentLogger = wslogger
return
_ = logger.replace(wslogger) // Ignore error for cleanup
return wslogger
} else if logger.Connection.Trace {
fmt.Printf("Failed to connect to websocket %s, fallback to vsslogger\n", err.Error())
}
}
if !logger.ForceWebsock {
logger.InitializeVssLogger()
return logger.initializeVssLogger()
}
return nil
}

func (logger *WebsocketLiveloggerWithFallback) Close() error {
if logger.currentLogger != nil {
err := logger.currentLogger.Close()
logger.currentLogger = nil
return err
func (logger *WebsocketLiveloggerWithFallback) Initialize() {
logger.initialize()
}

type errorLogger struct{}

// Close implements [LiveLogger].
func (e *errorLogger) Close() error {
return nil
}

// SendLog implements [LiveLogger].
func (e *errorLogger) SendLog(lines *protocol.TimelineRecordFeedLinesWrapper) error {
return errors.New("missing Logger Connection")
}
Comment thread
ChristopherHX marked this conversation as resolved.

func makePointer[T any](p T) *T {
return &p
}

Check failure on line 170 in protocol/logger/job_logger.go

View workflow job for this annotation

GitHub Actions / Lint

File is not properly formatted (gofumpt)
func getPointer[T any](p *T) T {
if p == nil {
var zero T
return zero
}
return *p
}

Comment thread
ChristopherHX marked this conversation as resolved.
func (logger *WebsocketLiveloggerWithFallback) replace(n LiveLogger) error {
if currentLogger := logger.currentLogger.Swap(makePointer(n)); getPointer(currentLogger) != nil {
return (*currentLogger).Close()
}
return nil
}

func (logger *WebsocketLiveloggerWithFallback) Close() error {
return logger.replace(&errorLogger{})
}

func (logger *WebsocketLiveloggerWithFallback) SendLog(wrapper *protocol.TimelineRecordFeedLinesWrapper) error {
if logger.currentLogger == nil {
logger.Initialize()
currentLogger := getPointer(logger.currentLogger.Load())
if currentLogger == nil {
currentLogger = logger.initialize()
if currentLogger == nil {
return errors.New("SendLog failure")
}
Comment thread
ChristopherHX marked this conversation as resolved.
}
Comment thread
ChristopherHX marked this conversation as resolved.
err := logger.currentLogger.SendLog(wrapper)
err := currentLogger.SendLog(wrapper)
if err != nil {
if logger.Connection.Trace {
fmt.Printf("Failed to send webconsole log %s\n", err.Error())
}
if wslogger, err := logger.currentLogger.(*WebsocketLivelogger); err {
if wslogger, err := currentLogger.(*WebsocketLivelogger); err {
Comment thread
ChristopherHX marked this conversation as resolved.
Outdated
if err := wslogger.Connect(); err != nil {
if !logger.ForceWebsock {
if logger.Connection.Trace {
fmt.Printf("Failed to reconnect to websocket %s, fallback to vsslogger\n", err.Error())
}
logger.InitializeVssLogger()
return logger.currentLogger.SendLog(wrapper)
currentLogger = logger.initializeVssLogger()
if currentLogger == nil {
return errors.New("SendLog failure")
}
return currentLogger.SendLog(wrapper)
}
return err
}
err := logger.currentLogger.SendLog(wrapper)
err := currentLogger.SendLog(wrapper)
if err != nil {
if !logger.ForceWebsock {
if logger.Connection.Trace {
fmt.Printf("Failed to send webconsole log %s, fallback to vsslogger\n", err.Error())
}
logger.InitializeVssLogger()
return logger.currentLogger.SendLog(wrapper)
currentLogger = logger.initializeVssLogger()
if currentLogger == nil {
return errors.New("SendLog failure")
}
return currentLogger.SendLog(wrapper)
}
return err
}
Expand All @@ -186,12 +234,16 @@
return err
}

type BufferedLiveLogger struct {
LiveLogger
type internalBufferedLiveLoggerData struct {
logchan chan *protocol.TimelineRecordFeedLinesWrapper
logfinished chan struct{}
}

type BufferedLiveLogger struct {
LiveLogger
data atomic.Pointer[internalBufferedLiveLoggerData]
}

func (logger *BufferedLiveLogger) sendLogs(logchan chan *protocol.TimelineRecordFeedLinesWrapper, logfinished chan struct{}) {
defer close(logfinished)
for {
Expand Down Expand Up @@ -238,23 +290,32 @@
}

func (logger *BufferedLiveLogger) Close() error {
if logger.logchan != nil {
close(logger.logchan)
logger.logchan = nil
<-logger.logfinished
if data := logger.data.Swap(nil); data != nil {
close(data.logchan)
data.logchan = nil
<-data.logfinished
}
Comment thread
ChristopherHX marked this conversation as resolved.
Outdated
return nil
}

func (logger *BufferedLiveLogger) SendLog(wrapper *protocol.TimelineRecordFeedLinesWrapper) error {
if logger.logchan == nil {
if data := logger.data.Load(); data != nil {
data.logchan <- wrapper
} else {
logchan := make(chan *protocol.TimelineRecordFeedLinesWrapper, websocketPingSize)
logger.logchan = logchan
logfinished := make(chan struct{})
logger.logfinished = logfinished
go logger.sendLogs(logchan, logfinished)
ndata := internalBufferedLiveLoggerData{
logchan: logchan,
logfinished: logfinished,
}
if logger.data.CompareAndSwap(data, &ndata) {
go logger.sendLogs(logchan, logfinished)
} else {
close(ndata.logchan)
Comment thread
ChristopherHX marked this conversation as resolved.
Outdated
close(ndata.logfinished)
return logger.SendLog(wrapper)
}
}
logger.logchan <- wrapper
return nil
Comment thread
ChristopherHX marked this conversation as resolved.
Outdated
}

Expand Down
Loading