Skip to content

Commit 7567211

Browse files
committed
refactor(orchestrator/buildlogger): implement zapcore.Core directly
LogEntryLogger previously implemented io.Writer and was plugged into a zap pipeline via zapcore.NewCore(jsonEncoder, logEntryLogger, DebugLevel). Each entry was serialized to JSON by the encoder and then parsed back out by Write to recover the message, level, timestamp, and fields. Two of those parse paths could fail and called logger.L().Error(context.TODO(), ...) because Write has no context. Implement zapcore.Core on LogEntryLogger instead: Check, Write(Entry, []Field), Sync, With, plus an embedded zapcore.LevelEnabler. The Entry struct already carries level, time, and message, and []Field is encoded straight into a map[string]string via zapcore.MapObjectEncoder, so the JSON encode/parse round-trip and its failure modes disappear, and with them the two context.TODO() call sites. With returns a childCore that shares the parent's capture slice and mu and only carries the accumulated fields, so logger.With(...) chains do not fork the capture buffer and Lines() still observes every entry. Wire-up in TemplateCreate is simplified: drop the JSON encoder and the zapcore.NewCore wrapper and tee LogEntryLogger directly with the build logger core. Test helper writeTestBuildLogs constructs zapcore.Entry values instead of marshalling synthetic JSON and calling Write([]byte).
1 parent 91d00d4 commit 7567211

3 files changed

Lines changed: 139 additions & 84 deletions

File tree

Lines changed: 109 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1,100 +1,104 @@
11
package buildlogger
22

33
import (
4-
"bytes"
5-
"context"
6-
"encoding/json"
4+
"fmt"
75
"sync"
8-
"time"
96

10-
"go.uber.org/zap"
7+
"go.uber.org/zap/zapcore"
118
"google.golang.org/protobuf/types/known/timestamppb"
129

1310
template_manager "github.com/e2b-dev/infra/packages/shared/pkg/grpc/template-manager"
14-
"github.com/e2b-dev/infra/packages/shared/pkg/logger"
15-
"github.com/e2b-dev/infra/packages/shared/pkg/logs"
1611
)
1712

18-
type ZapEntry struct {
19-
Ts float64 `json:"ts"`
20-
Msg string `json:"msg"`
21-
Level string `json:"level"`
22-
}
23-
13+
// LogEntryLogger is a zapcore.Core that captures every log entry into an
14+
// in-memory slice of template_manager.TemplateBuildLogEntry. It replaces the
15+
// older io.Writer-based implementation that round-tripped log lines through
16+
// JSON; consuming entries directly avoids that and removes the need for any
17+
// fallback error-logging context.
2418
type LogEntryLogger struct {
19+
zapcore.LevelEnabler
20+
2521
mu sync.Mutex
2622
lines []*template_manager.TemplateBuildLogEntry
2723
}
2824

25+
// Compile-time assertion: LogEntryLogger implements zapcore.Core.
26+
var _ zapcore.Core = (*LogEntryLogger)(nil)
27+
2928
func NewLogEntryLogger() *LogEntryLogger {
3029
return &LogEntryLogger{
31-
lines: make([]*template_manager.TemplateBuildLogEntry, 0),
30+
LevelEnabler: zapcore.DebugLevel,
31+
lines: make([]*template_manager.TemplateBuildLogEntry, 0),
3232
}
3333
}
3434

35-
func (b *LogEntryLogger) Write(p []byte) (n int, err error) {
36-
b.mu.Lock()
37-
defer b.mu.Unlock()
38-
39-
for line := range bytes.SplitSeq(p, []byte("\n")) {
40-
if len(line) > 0 {
41-
fields, err := logs.FlatJsonLogLineParser(string(line))
42-
if err != nil {
43-
logger.L().Error(context.TODO(), "error parsing log line", zap.Error(err), zap.ByteString("line", line))
35+
// With returns a Core that shares this LogEntryLogger's capture buffer but
36+
// carries additional accumulated fields. Returning a child type (rather than
37+
// cloning the LogEntryLogger) keeps a single source of truth for Lines().
38+
func (b *LogEntryLogger) With(fields []zapcore.Field) zapcore.Core {
39+
return &childCore{
40+
parent: b,
41+
with: append([]zapcore.Field(nil), fields...),
42+
}
43+
}
4444

45-
continue
46-
}
45+
func (b *LogEntryLogger) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
46+
if b.Enabled(ent.Level) {
47+
return ce.AddCore(ent, b)
48+
}
4749

48-
var entry ZapEntry
49-
err = json.Unmarshal(line, &entry)
50-
if err != nil {
51-
logger.L().Error(context.TODO(), "failed to unmarshal log entry", zap.Error(err), zap.ByteString("line", line))
50+
return ce
51+
}
5252

53-
continue
54-
}
53+
func (b *LogEntryLogger) Write(ent zapcore.Entry, fields []zapcore.Field) error {
54+
return b.write(ent, nil, fields)
55+
}
5556

56-
timestamp := epochToTime(entry.Ts)
57+
func (b *LogEntryLogger) Sync() error {
58+
// In-memory capture; nothing to flush. Acquire the mutex so Sync acts as
59+
// a quiescence barrier: once it returns, no Write is in flight, which
60+
// callers (e.g. on build done/success) rely on to observe a settled
61+
// capture buffer.
62+
b.mu.Lock()
63+
defer b.mu.Unlock()
64+
return nil
65+
}
5766

58-
delete(fields, "ts")
59-
delete(fields, "msg")
60-
delete(fields, "level")
67+
func (b *LogEntryLogger) write(ent zapcore.Entry, accumulated, fields []zapcore.Field) error {
68+
enc := zapcore.NewMapObjectEncoder()
69+
for _, f := range accumulated {
70+
f.AddTo(enc)
71+
}
72+
for _, f := range fields {
73+
f.AddTo(enc)
74+
}
6175

62-
b.lines = append(b.lines, &template_manager.TemplateBuildLogEntry{
63-
Timestamp: timestamppb.New(timestamp),
64-
Message: entry.Msg,
65-
Level: stringToLogLevel(entry.Level),
66-
Fields: fields,
67-
})
76+
flat := make(map[string]string, len(enc.Fields))
77+
for k, v := range enc.Fields {
78+
switch t := v.(type) {
79+
case string:
80+
flat[k] = t
81+
case fmt.Stringer:
82+
flat[k] = t.String()
83+
case error:
84+
flat[k] = t.Error()
85+
default:
86+
flat[k] = fmt.Sprint(v)
6887
}
6988
}
7089

71-
return len(p), nil
72-
}
73-
74-
func (b *LogEntryLogger) Sync() error {
7590
b.mu.Lock()
7691
defer b.mu.Unlock()
92+
b.lines = append(b.lines, &template_manager.TemplateBuildLogEntry{
93+
Timestamp: timestamppb.New(ent.Time.UTC()),
94+
Message: ent.Message,
95+
Level: zapLevelToLogLevel(ent.Level),
96+
Fields: flat,
97+
})
7798

78-
// No-op for SafeBuffer, as it doesn't have an underlying file to sync
79-
// But wait for the mutex to ensure no writes are happening
8099
return nil
81100
}
82101

83-
func stringToLogLevel(level string) template_manager.LogLevel {
84-
switch level {
85-
case "debug":
86-
return template_manager.LogLevel_Debug
87-
case "info":
88-
return template_manager.LogLevel_Info
89-
case "warn":
90-
return template_manager.LogLevel_Warn
91-
case "error":
92-
return template_manager.LogLevel_Error
93-
default:
94-
return template_manager.LogLevel_Info
95-
}
96-
}
97-
98102
func (b *LogEntryLogger) Lines() []*template_manager.TemplateBuildLogEntry {
99103
b.mu.Lock()
100104
defer b.mu.Unlock()
@@ -106,11 +110,48 @@ func (b *LogEntryLogger) Lines() []*template_manager.TemplateBuildLogEntry {
106110
return copied
107111
}
108112

109-
func epochToTime(epoch float64) time.Time {
110-
// split into integer seconds and fractional part
111-
sec := int64(epoch)
112-
nsec := int64((epoch - float64(sec)) * 1e9) // convert fractional part to nanoseconds
113+
// childCore is the result of LogEntryLogger.With. It shares the parent's
114+
// capture state and only carries additional accumulated fields, so that
115+
// zap's per-logger With(...) chains don't fork the capture buffer.
116+
type childCore struct {
117+
parent *LogEntryLogger
118+
with []zapcore.Field
119+
}
120+
121+
func (c *childCore) Enabled(l zapcore.Level) bool { return c.parent.Enabled(l) }
122+
123+
func (c *childCore) With(fields []zapcore.Field) zapcore.Core {
124+
return &childCore{
125+
parent: c.parent,
126+
with: append(append([]zapcore.Field{}, c.with...), fields...),
127+
}
128+
}
129+
130+
func (c *childCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
131+
if c.Enabled(ent.Level) {
132+
return ce.AddCore(ent, c)
133+
}
134+
135+
return ce
136+
}
137+
138+
func (c *childCore) Write(ent zapcore.Entry, fields []zapcore.Field) error {
139+
return c.parent.write(ent, c.with, fields)
140+
}
113141

114-
// convert to time.Time
115-
return time.Unix(sec, nsec).UTC()
142+
func (c *childCore) Sync() error { return c.parent.Sync() }
143+
144+
func zapLevelToLogLevel(level zapcore.Level) template_manager.LogLevel {
145+
switch level {
146+
case zapcore.DebugLevel:
147+
return template_manager.LogLevel_Debug
148+
case zapcore.InfoLevel:
149+
return template_manager.LogLevel_Info
150+
case zapcore.WarnLevel:
151+
return template_manager.LogLevel_Warn
152+
case zapcore.ErrorLevel, zapcore.DPanicLevel, zapcore.PanicLevel, zapcore.FatalLevel:
153+
return template_manager.LogLevel_Error
154+
default:
155+
return template_manager.LogLevel_Info
156+
}
116157
}

packages/orchestrator/pkg/template/server/create_template.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,10 @@ func (s *ServerStore) TemplateCreate(ctx context.Context, templateRequest *templ
108108
return nil, fmt.Errorf("error while creating build cache: %w", err)
109109
}
110110

111-
// Add new core that will log all messages using logger (zap.Logger) to the logs buffer too
112-
encoder := zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig())
113-
bufferCore := zapcore.NewCore(encoder, logs, zapcore.DebugLevel)
114-
core := zapcore.NewTee(bufferCore, s.buildLogger.Detach(ctx).Core().
111+
// LogEntryLogger is itself a zapcore.Core that captures every entry into
112+
// an in-memory slice; tee it with the regular build logger so logs go to
113+
// both destinations.
114+
core := zapcore.NewTee(logs, s.buildLogger.Detach(ctx).Core().
115115
With([]zap.Field{
116116
{Type: zapcore.StringType, Key: "envID", String: cfg.GetTemplateID()},
117117
{Type: zapcore.StringType, Key: "buildID", String: metadata.BuildID},

packages/orchestrator/pkg/template/server/template_status_test.go

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,14 @@
33
package server
44

55
import (
6-
"encoding/json"
7-
"strings"
86
"testing"
97
"time"
108

119
"github.com/google/uuid"
1210
"github.com/stretchr/testify/assert"
1311
"github.com/stretchr/testify/require"
1412
"go.opentelemetry.io/otel/metric/noop"
13+
"go.uber.org/zap/zapcore"
1514
"google.golang.org/protobuf/types/known/timestamppb"
1615

1716
"github.com/e2b-dev/infra/packages/orchestrator/pkg/template/build/buildlogger"
@@ -102,21 +101,36 @@ func newTestServerStore(t *testing.T, logLines []testLogLine) (*ServerStore, str
102101
func writeTestBuildLogs(t *testing.T, buildLogs *buildlogger.LogEntryLogger, lines []testLogLine) {
103102
t.Helper()
104103

105-
var input strings.Builder
106104
for _, line := range lines {
107-
payload, err := json.Marshal(map[string]any{
108-
"ts": line.ts,
109-
"msg": line.message,
110-
"level": line.level,
111-
})
105+
err := buildLogs.Write(zapcore.Entry{
106+
Level: stringToZapLevel(line.level),
107+
Time: epochToTime(line.ts),
108+
Message: line.message,
109+
}, nil)
112110
require.NoError(t, err)
111+
}
112+
}
113113

114-
input.Write(payload)
115-
input.WriteByte('\n')
114+
func stringToZapLevel(level string) zapcore.Level {
115+
switch level {
116+
case "debug":
117+
return zapcore.DebugLevel
118+
case "info":
119+
return zapcore.InfoLevel
120+
case "warn":
121+
return zapcore.WarnLevel
122+
case "error":
123+
return zapcore.ErrorLevel
124+
default:
125+
return zapcore.InfoLevel
116126
}
127+
}
117128

118-
_, err := buildLogs.Write([]byte(input.String()))
119-
require.NoError(t, err)
129+
func epochToTime(epoch float64) time.Time {
130+
sec := int64(epoch)
131+
nsec := int64((epoch - float64(sec)) * 1e9)
132+
133+
return time.Unix(sec, nsec).UTC()
120134
}
121135

122136
func timeToEpoch(t time.Time) float64 {

0 commit comments

Comments
 (0)