Skip to content

Commit 4d9ed6a

Browse files
committed
improve job logger
1 parent a7e09d8 commit 4d9ed6a

2 files changed

Lines changed: 133 additions & 7 deletions

File tree

protocol/logger/job_logger.go

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ import (
66
"errors"
77
"fmt"
88
"io"
9+
"log"
910
"net/http"
1011
"net/url"
1112
"regexp"
13+
"runtime/debug"
1214
"strings"
1315
"sync"
1416
"sync/atomic"
@@ -193,7 +195,7 @@ func (logger *WebsocketLiveloggerWithFallback) SendLog(wrapper *protocol.Timelin
193195
if currentLogger == nil {
194196
currentLogger = logger.initialize()
195197
if currentLogger == nil {
196-
return errors.New("SendLog failure")
198+
return errors.New("initialize failure")
197199
}
198200
}
199201
err := currentLogger.SendLog(wrapper)
@@ -237,18 +239,47 @@ func (logger *WebsocketLiveloggerWithFallback) SendLog(wrapper *protocol.Timelin
237239

238240
type internalBufferedLiveLoggerData struct {
239241
logchan chan *protocol.TimelineRecordFeedLinesWrapper
242+
logdrain chan struct{}
240243
logfinished chan struct{}
241244
}
242245

246+
// IsZero returns true if the struct is closed
247+
// * internalBufferedLiveLoggerData is replaced by a zero struct instead of nil to not auto restart
248+
func (i *internalBufferedLiveLoggerData) IsZero() bool {
249+
return i == nil || i.logchan == nil || i.logdrain == nil || i.logfinished == nil
250+
}
251+
243252
type BufferedLiveLogger struct {
244253
LiveLogger
245254
data atomic.Pointer[internalBufferedLiveLoggerData]
246255
}
247256

248-
func (logger *BufferedLiveLogger) sendLogs(logchan chan *protocol.TimelineRecordFeedLinesWrapper, logfinished chan struct{}) {
257+
func (logger *BufferedLiveLogger) sendLogs(logchan chan *protocol.TimelineRecordFeedLinesWrapper, logdrain chan struct{}, logfinished chan struct{}) {
258+
defer func() {
259+
if r := recover(); r != nil {
260+
log.Printf("panic recovered: %v\n%s", r, debug.Stack())
261+
}
262+
}()
249263
defer close(logfinished)
264+
var shouldDrain bool
250265
for {
251-
lines, ok := <-logchan
266+
var lines *protocol.TimelineRecordFeedLinesWrapper
267+
var ok bool
268+
if !shouldDrain {
269+
select {
270+
case lines, ok = <-logchan:
271+
case <-logdrain:
272+
shouldDrain = true
273+
}
274+
}
275+
if shouldDrain {
276+
// Now try read
277+
select {
278+
case lines, ok = <-logchan:
279+
default:
280+
ok = false
281+
}
282+
}
252283
if !ok {
253284
return
254285
}
@@ -291,26 +322,36 @@ func (logger *BufferedLiveLogger) sendLogs(logchan chan *protocol.TimelineRecord
291322
}
292323

293324
func (logger *BufferedLiveLogger) Close() error {
294-
if data := logger.data.Swap(nil); data != nil {
295-
close(data.logchan)
325+
if data := logger.data.Swap(&internalBufferedLiveLoggerData{}); !data.IsZero() {
326+
// Keep logchan open to avoid races and just let it be freed
327+
close(data.logdrain)
296328
<-data.logfinished
297329
}
298330
return nil
299331
}
300332

301333
func (logger *BufferedLiveLogger) SendLog(wrapper *protocol.TimelineRecordFeedLinesWrapper) error {
302334
if data := logger.data.Load(); data != nil {
303-
data.logchan <- wrapper
335+
if data.IsZero() {
336+
return errors.New("buffered live logger is closed")
337+
}
338+
select {
339+
case <-data.logdrain:
340+
return errors.New("buffered live logger closing")
341+
default:
342+
data.logchan <- wrapper
343+
}
304344
} else {
305345
logchan := make(chan *protocol.TimelineRecordFeedLinesWrapper, websocketPingSize)
306346
logfinished := make(chan struct{})
307347
ndata := internalBufferedLiveLoggerData{
308348
logchan: logchan,
349+
logdrain: make(chan struct{}),
309350
logfinished: logfinished,
310351
}
311352
if logger.data.CompareAndSwap(data, &ndata) {
312353
ndata.logchan <- wrapper
313-
go logger.sendLogs(logchan, logfinished)
354+
go logger.sendLogs(logchan, ndata.logdrain, logfinished)
314355
} else {
315356
close(ndata.logchan)
316357
close(ndata.logfinished)

protocol/logger/job_logger_test.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
package logger
22

33
import (
4+
"fmt"
45
"testing"
6+
"time"
57

68
"github.com/ChristopherHX/github-act-runner/protocol"
9+
"github.com/stretchr/testify/require"
710
)
811

912
const (
@@ -35,3 +38,85 @@ func TestJobLogger(t *testing.T) {
3538
t.FailNow()
3639
}
3740
}
41+
42+
type TestLiveLogger struct {
43+
*testing.T
44+
}
45+
46+
// Close implements [LiveLogger].
47+
func (t *TestLiveLogger) Close() error {
48+
return nil
49+
}
50+
51+
// SendLog implements [LiveLogger].
52+
func (t *TestLiveLogger) SendLog(lines *protocol.TimelineRecordFeedLinesWrapper) error {
53+
return nil
54+
}
55+
56+
func getTestLiveLogger(t *testing.T) LiveLogger {
57+
return &TestLiveLogger{
58+
T: t,
59+
}
60+
}
61+
62+
func TestBufferedLiveLoggerDrain(t *testing.T) {
63+
bufferedLogger := &BufferedLiveLogger{
64+
LiveLogger: getTestLiveLogger(t),
65+
}
66+
67+
var logchan chan *protocol.TimelineRecordFeedLinesWrapper = make(chan *protocol.TimelineRecordFeedLinesWrapper)
68+
var logdrain chan struct{} = make(chan struct{})
69+
var logfinished chan struct{} = make(chan struct{})
70+
71+
t.Run("forwardLogs", func(t *testing.T) {
72+
t.Parallel()
73+
bufferedLogger.sendLogs(logchan, logdrain, logfinished)
74+
})
75+
76+
t.Run("sendLogs", func(t *testing.T) {
77+
t.Parallel()
78+
logchan <- &protocol.TimelineRecordFeedLinesWrapper{
79+
Count: 1,
80+
Value: []string{"line1"},
81+
}
82+
logchan <- &protocol.TimelineRecordFeedLinesWrapper{
83+
Count: 1,
84+
Value: []string{"line2"},
85+
}
86+
close(logdrain)
87+
<-logfinished
88+
})
89+
}
90+
91+
func TestBufferedLiveLogger(t *testing.T) {
92+
bufferedLogger := &BufferedLiveLogger{
93+
LiveLogger: getTestLiveLogger(t),
94+
}
95+
96+
require.NoError(t, bufferedLogger.SendLog(&protocol.TimelineRecordFeedLinesWrapper{
97+
Count: 1,
98+
Value: []string{"line1"},
99+
}))
100+
101+
t.Run("close", func(t *testing.T) {
102+
t.Parallel()
103+
time.Sleep(1122 * time.Microsecond)
104+
require.NoError(t, bufferedLogger.Close())
105+
})
106+
107+
t.Run("sendLogs", func(t *testing.T) {
108+
t.Parallel()
109+
var err error
110+
for i := range 100 {
111+
time.Sleep(1 * time.Millisecond)
112+
err = bufferedLogger.SendLog(&protocol.TimelineRecordFeedLinesWrapper{
113+
Count: 1,
114+
Value: []string{fmt.Sprintf("line %v", (i + 1))},
115+
})
116+
if err != nil {
117+
break
118+
}
119+
}
120+
require.Error(t, err)
121+
})
122+
}

0 commit comments

Comments
 (0)