Skip to content

Commit bc25872

Browse files
committed
refactor(orchestrator/buildlogger): implement zapcore.Core directly
LogEntryLogger previously implemented io.Writer and was plugged into a zap pipeline via zapcore.NewCore(jsonEncoder, logEntryLogger, DebugLevel). Each entry was serialized to JSON by the encoder and then parsed back out by Write to recover the message, level, timestamp, and fields. Two of those parse paths could fail and called logger.L().Error(context.TODO(), ...) because Write has no context. Implement zapcore.Core on LogEntryLogger instead: Check, Write(Entry, []Field), Sync, With, plus an embedded zapcore.LevelEnabler. The Entry struct already carries level, time, and message, and []Field is encoded straight into a map[string]string via zapcore.MapObjectEncoder, so the JSON encode/parse round-trip and its failure modes disappear, and with them the two context.TODO() call sites. With returns a childCore that shares the parent's capture slice and mu and only carries the accumulated fields, so logger.With(...) chains do not fork the capture buffer and Lines() still observes every entry. Wire-up in TemplateCreate is simplified: drop the JSON encoder and the zapcore.NewCore wrapper and tee LogEntryLogger directly with the build logger core. Test helper writeTestBuildLogs constructs zapcore.Entry values instead of marshalling synthetic JSON and calling Write([]byte).
1 parent 8c49001 commit bc25872

3 files changed

Lines changed: 139 additions & 83 deletions

File tree

Lines changed: 109 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,100 +1,105 @@
11
package buildlogger
22

33
import (
4-
"bytes"
5-
"context"
6-
"encoding/json"
4+
"fmt"
75
"sync"
8-
"time"
96

10-
"go.uber.org/zap"
7+
"go.uber.org/zap/zapcore"
118
"google.golang.org/protobuf/types/known/timestamppb"
129

1310
template_manager "github.com/e2b-dev/infra/packages/shared/pkg/grpc/template-manager"
14-
"github.com/e2b-dev/infra/packages/shared/pkg/logger"
15-
"github.com/e2b-dev/infra/packages/shared/pkg/logs"
1611
)
1712

18-
type ZapEntry struct {
19-
Ts float64 `json:"ts"`
20-
Msg string `json:"msg"`
21-
Level string `json:"level"`
22-
}
23-
13+
// LogEntryLogger is a zapcore.Core that captures every log entry into an
14+
// in-memory slice of template_manager.TemplateBuildLogEntry. It replaces the
15+
// older io.Writer-based implementation that round-tripped log lines through
16+
// JSON; consuming entries directly avoids that and removes the need for any
17+
// fallback error-logging context.
2418
type LogEntryLogger struct {
19+
zapcore.LevelEnabler
20+
2521
mu sync.Mutex
2622
lines []*template_manager.TemplateBuildLogEntry
2723
}
2824

25+
// Compile-time assertion: LogEntryLogger implements zapcore.Core.
26+
var _ zapcore.Core = (*LogEntryLogger)(nil)
27+
2928
func NewLogEntryLogger() *LogEntryLogger {
3029
return &LogEntryLogger{
31-
lines: make([]*template_manager.TemplateBuildLogEntry, 0),
30+
LevelEnabler: zapcore.DebugLevel,
31+
lines: make([]*template_manager.TemplateBuildLogEntry, 0),
3232
}
3333
}
3434

35-
func (b *LogEntryLogger) Write(p []byte) (n int, err error) {
36-
b.mu.Lock()
37-
defer b.mu.Unlock()
35+
// With returns a Core that shares this LogEntryLogger's capture buffer but
36+
// carries additional accumulated fields. Returning a child type (rather than
37+
// cloning the LogEntryLogger) keeps a single source of truth for Lines().
38+
func (b *LogEntryLogger) With(fields []zapcore.Field) zapcore.Core {
39+
return &childCore{
40+
parent: b,
41+
with: append([]zapcore.Field(nil), fields...),
42+
}
43+
}
3844

39-
for line := range bytes.SplitSeq(p, []byte("\n")) {
40-
if len(line) > 0 {
41-
fields, err := logs.FlatJsonLogLineParser(string(line))
42-
if err != nil {
43-
logger.L().Error(context.TODO(), "error parsing log line", zap.Error(err), zap.ByteString("line", line))
45+
func (b *LogEntryLogger) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
46+
if b.Enabled(ent.Level) {
47+
return ce.AddCore(ent, b)
48+
}
4449

45-
continue
46-
}
50+
return ce
51+
}
4752

48-
var entry ZapEntry
49-
err = json.Unmarshal(line, &entry)
50-
if err != nil {
51-
logger.L().Error(context.TODO(), "failed to unmarshal log entry", zap.Error(err), zap.ByteString("line", line))
53+
func (b *LogEntryLogger) Write(ent zapcore.Entry, fields []zapcore.Field) error {
54+
return b.write(ent, nil, fields)
55+
}
5256

53-
continue
54-
}
57+
func (b *LogEntryLogger) Sync() error {
58+
// In-memory capture; nothing to flush. Acquire the mutex so Sync acts as
59+
// a quiescence barrier: once it returns, no Write is in flight, which
60+
// callers (e.g. on build done/success) rely on to observe a settled
61+
// capture buffer.
62+
b.mu.Lock()
63+
defer b.mu.Unlock()
5564

56-
timestamp := epochToTime(entry.Ts)
65+
return nil
66+
}
5767

58-
delete(fields, "ts")
59-
delete(fields, "msg")
60-
delete(fields, "level")
68+
func (b *LogEntryLogger) write(ent zapcore.Entry, accumulated, fields []zapcore.Field) error {
69+
enc := zapcore.NewMapObjectEncoder()
70+
for _, f := range accumulated {
71+
f.AddTo(enc)
72+
}
73+
for _, f := range fields {
74+
f.AddTo(enc)
75+
}
6176

62-
b.lines = append(b.lines, &template_manager.TemplateBuildLogEntry{
63-
Timestamp: timestamppb.New(timestamp),
64-
Message: entry.Msg,
65-
Level: stringToLogLevel(entry.Level),
66-
Fields: fields,
67-
})
77+
flat := make(map[string]string, len(enc.Fields))
78+
for k, v := range enc.Fields {
79+
switch t := v.(type) {
80+
case string:
81+
flat[k] = t
82+
case fmt.Stringer:
83+
flat[k] = t.String()
84+
case error:
85+
flat[k] = t.Error()
86+
default:
87+
flat[k] = fmt.Sprint(v)
6888
}
6989
}
7090

71-
return len(p), nil
72-
}
73-
74-
func (b *LogEntryLogger) Sync() error {
7591
b.mu.Lock()
7692
defer b.mu.Unlock()
93+
b.lines = append(b.lines, &template_manager.TemplateBuildLogEntry{
94+
Timestamp: timestamppb.New(ent.Time.UTC()),
95+
Message: ent.Message,
96+
Level: zapLevelToLogLevel(ent.Level),
97+
Fields: flat,
98+
})
7799

78-
// No-op for SafeBuffer, as it doesn't have an underlying file to sync
79-
// But wait for the mutex to ensure no writes are happening
80100
return nil
81101
}
82102

83-
func stringToLogLevel(level string) template_manager.LogLevel {
84-
switch level {
85-
case "debug":
86-
return template_manager.LogLevel_Debug
87-
case "info":
88-
return template_manager.LogLevel_Info
89-
case "warn":
90-
return template_manager.LogLevel_Warn
91-
case "error":
92-
return template_manager.LogLevel_Error
93-
default:
94-
return template_manager.LogLevel_Info
95-
}
96-
}
97-
98103
func (b *LogEntryLogger) Lines() []*template_manager.TemplateBuildLogEntry {
99104
b.mu.Lock()
100105
defer b.mu.Unlock()
@@ -106,11 +111,48 @@ func (b *LogEntryLogger) Lines() []*template_manager.TemplateBuildLogEntry {
106111
return copied
107112
}
108113

109-
func epochToTime(epoch float64) time.Time {
110-
// split into integer seconds and fractional part
111-
sec := int64(epoch)
112-
nsec := int64((epoch - float64(sec)) * 1e9) // convert fractional part to nanoseconds
114+
// childCore is the result of LogEntryLogger.With. It shares the parent's
115+
// capture state and only carries additional accumulated fields, so that
116+
// zap's per-logger With(...) chains don't fork the capture buffer.
117+
type childCore struct {
118+
parent *LogEntryLogger
119+
with []zapcore.Field
120+
}
121+
122+
func (c *childCore) Enabled(l zapcore.Level) bool { return c.parent.Enabled(l) }
113123

114-
// convert to time.Time
115-
return time.Unix(sec, nsec).UTC()
124+
func (c *childCore) With(fields []zapcore.Field) zapcore.Core {
125+
return &childCore{
126+
parent: c.parent,
127+
with: append(append([]zapcore.Field{}, c.with...), fields...),
128+
}
129+
}
130+
131+
func (c *childCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
132+
if c.Enabled(ent.Level) {
133+
return ce.AddCore(ent, c)
134+
}
135+
136+
return ce
137+
}
138+
139+
func (c *childCore) Write(ent zapcore.Entry, fields []zapcore.Field) error {
140+
return c.parent.write(ent, c.with, fields)
141+
}
142+
143+
func (c *childCore) Sync() error { return c.parent.Sync() }
144+
145+
func zapLevelToLogLevel(level zapcore.Level) template_manager.LogLevel {
146+
switch level {
147+
case zapcore.DebugLevel:
148+
return template_manager.LogLevel_Debug
149+
case zapcore.InfoLevel:
150+
return template_manager.LogLevel_Info
151+
case zapcore.WarnLevel:
152+
return template_manager.LogLevel_Warn
153+
case zapcore.ErrorLevel, zapcore.DPanicLevel, zapcore.PanicLevel, zapcore.FatalLevel:
154+
return template_manager.LogLevel_Error
155+
default:
156+
return template_manager.LogLevel_Info
157+
}
116158
}

packages/orchestrator/pkg/template/server/create_template.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,10 @@ func (s *ServerStore) TemplateCreate(ctx context.Context, templateRequest *templ
108108
return nil, fmt.Errorf("error while creating build cache: %w", err)
109109
}
110110

111-
// Add new core that will log all messages using logger (zap.Logger) to the logs buffer too
112-
encoder := zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig())
113-
bufferCore := zapcore.NewCore(encoder, logs, zapcore.DebugLevel)
114-
core := zapcore.NewTee(bufferCore, s.buildLogger.Detach(ctx).Core().
111+
// LogEntryLogger is itself a zapcore.Core that captures every entry into
112+
// an in-memory slice; tee it with the regular build logger so logs go to
113+
// both destinations.
114+
core := zapcore.NewTee(logs, s.buildLogger.Detach(ctx).Core().
115115
With([]zap.Field{
116116
{Type: zapcore.StringType, Key: "envID", String: cfg.GetTemplateID()},
117117
{Type: zapcore.StringType, Key: "buildID", String: metadata.BuildID},

packages/orchestrator/pkg/template/server/template_status_test.go

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,14 @@
33
package server
44

55
import (
6-
"encoding/json"
7-
"strings"
86
"testing"
97
"time"
108

119
"github.com/google/uuid"
1210
"github.com/stretchr/testify/assert"
1311
"github.com/stretchr/testify/require"
1412
"go.opentelemetry.io/otel/metric/noop"
13+
"go.uber.org/zap/zapcore"
1514
"google.golang.org/protobuf/types/known/timestamppb"
1615

1716
"github.com/e2b-dev/infra/packages/orchestrator/pkg/template/build/buildlogger"
@@ -102,21 +101,36 @@ func newTestServerStore(t *testing.T, logLines []testLogLine) (*ServerStore, str
102101
func writeTestBuildLogs(t *testing.T, buildLogs *buildlogger.LogEntryLogger, lines []testLogLine) {
103102
t.Helper()
104103

105-
var input strings.Builder
106104
for _, line := range lines {
107-
payload, err := json.Marshal(map[string]any{
108-
"ts": line.ts,
109-
"msg": line.message,
110-
"level": line.level,
111-
})
105+
err := buildLogs.Write(zapcore.Entry{
106+
Level: stringToZapLevel(line.level),
107+
Time: epochToTime(line.ts),
108+
Message: line.message,
109+
}, nil)
112110
require.NoError(t, err)
111+
}
112+
}
113113

114-
input.Write(payload)
115-
input.WriteByte('\n')
114+
func stringToZapLevel(level string) zapcore.Level {
115+
switch level {
116+
case "debug":
117+
return zapcore.DebugLevel
118+
case "info":
119+
return zapcore.InfoLevel
120+
case "warn":
121+
return zapcore.WarnLevel
122+
case "error":
123+
return zapcore.ErrorLevel
124+
default:
125+
return zapcore.InfoLevel
116126
}
127+
}
117128

118-
_, err := buildLogs.Write([]byte(input.String()))
119-
require.NoError(t, err)
129+
func epochToTime(epoch float64) time.Time {
130+
sec := int64(epoch)
131+
nsec := int64((epoch - float64(sec)) * 1e9)
132+
133+
return time.Unix(sec, nsec).UTC()
120134
}
121135

122136
func timeToEpoch(t time.Time) float64 {

0 commit comments

Comments
 (0)