Skip to content

Commit ef589b0

Browse files
committed
refactor(orchestrator/buildlogger): implement zapcore.Core directly
LogEntryLogger previously implemented io.Writer and was plugged into a zap pipeline via zapcore.NewCore(jsonEncoder, logEntryLogger, DebugLevel). Each entry was serialized to JSON by the encoder and then parsed back out by Write to recover the message, level, timestamp, and fields. Two of those parse paths could fail and called logger.L().Error(context.TODO(), ...) because Write has no context. Implement zapcore.Core on LogEntryLogger instead: Check, Write(Entry, []Field), Sync, With, plus an embedded zapcore.LevelEnabler. The Entry struct already carries level, time, and message, and []Field is encoded straight into a map[string]string via zapcore.MapObjectEncoder, so the JSON encode/parse round-trip and its failure modes disappear, and with them the two context.TODO() call sites. With returns a childCore that shares the parent's capture slice and mu and only carries the accumulated fields, so logger.With(...) chains do not fork the capture buffer and Lines() still observes every entry. Wire-up in TemplateCreate is simplified: drop the JSON encoder and the zapcore.NewCore wrapper and tee LogEntryLogger directly with the build logger core. Test helper writeTestBuildLogs constructs zapcore.Entry values instead of marshalling synthetic JSON and calling Write([]byte).
1 parent 8c49001 commit ef589b0

3 files changed

Lines changed: 144 additions & 84 deletions

File tree

Lines changed: 114 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1,98 +1,107 @@
11
package buildlogger
22

33
import (
4-
"bytes"
5-
"context"
6-
"encoding/json"
4+
"fmt"
75
"sync"
8-
"time"
96

10-
"go.uber.org/zap"
7+
"go.uber.org/zap/zapcore"
118
"google.golang.org/protobuf/types/known/timestamppb"
129

1310
template_manager "github.com/e2b-dev/infra/packages/shared/pkg/grpc/template-manager"
14-
"github.com/e2b-dev/infra/packages/shared/pkg/logger"
15-
"github.com/e2b-dev/infra/packages/shared/pkg/logs"
1611
)
1712

18-
type ZapEntry struct {
19-
Ts float64 `json:"ts"`
20-
Msg string `json:"msg"`
21-
Level string `json:"level"`
22-
}
23-
13+
// LogEntryLogger is a zapcore.Core that captures every log entry into an
14+
// in-memory slice of template_manager.TemplateBuildLogEntry. It replaces the
15+
// older io.Writer-based implementation that round-tripped log lines through
16+
// JSON; consuming entries directly avoids that and removes the need for any
17+
// fallback error-logging context.
2418
type LogEntryLogger struct {
19+
zapcore.LevelEnabler
20+
2521
mu sync.Mutex
2622
lines []*template_manager.TemplateBuildLogEntry
2723
}
2824

25+
// Compile-time assertion: LogEntryLogger implements zapcore.Core.
26+
var _ zapcore.Core = (*LogEntryLogger)(nil)
27+
2928
func NewLogEntryLogger() *LogEntryLogger {
3029
return &LogEntryLogger{
31-
lines: make([]*template_manager.TemplateBuildLogEntry, 0),
30+
LevelEnabler: zapcore.DebugLevel,
31+
lines: make([]*template_manager.TemplateBuildLogEntry, 0),
3232
}
3333
}
3434

35-
func (b *LogEntryLogger) Write(p []byte) (n int, err error) {
36-
b.mu.Lock()
37-
defer b.mu.Unlock()
38-
39-
for line := range bytes.SplitSeq(p, []byte("\n")) {
40-
if len(line) > 0 {
41-
fields, err := logs.FlatJsonLogLineParser(string(line))
42-
if err != nil {
43-
logger.L().Error(context.TODO(), "error parsing log line", zap.Error(err), zap.ByteString("line", line))
44-
45-
continue
46-
}
47-
48-
var entry ZapEntry
49-
err = json.Unmarshal(line, &entry)
50-
if err != nil {
51-
logger.L().Error(context.TODO(), "failed to unmarshal log entry", zap.Error(err), zap.ByteString("line", line))
52-
53-
continue
54-
}
55-
56-
timestamp := epochToTime(entry.Ts)
57-
58-
delete(fields, "ts")
59-
delete(fields, "msg")
60-
delete(fields, "level")
35+
// With returns a Core that shares this LogEntryLogger's capture buffer but
36+
// carries additional accumulated fields. Returning a child type (rather than
37+
// cloning the LogEntryLogger) keeps a single source of truth for Lines().
38+
func (b *LogEntryLogger) With(fields []zapcore.Field) zapcore.Core {
39+
return &childCore{
40+
parent: b,
41+
with: append([]zapcore.Field(nil), fields...),
42+
}
43+
}
6144

62-
b.lines = append(b.lines, &template_manager.TemplateBuildLogEntry{
63-
Timestamp: timestamppb.New(timestamp),
64-
Message: entry.Msg,
65-
Level: stringToLogLevel(entry.Level),
66-
Fields: fields,
67-
})
68-
}
45+
func (b *LogEntryLogger) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
46+
if b.Enabled(ent.Level) {
47+
return ce.AddCore(ent, b)
6948
}
7049

71-
return len(p), nil
50+
return ce
51+
}
52+
53+
func (b *LogEntryLogger) Write(ent zapcore.Entry, fields []zapcore.Field) error {
54+
return b.write(ent, nil, fields)
7255
}
7356

7457
func (b *LogEntryLogger) Sync() error {
58+
// In-memory capture; nothing to flush. Acquire the mutex so Sync acts as
59+
// a quiescence barrier: once it returns, no Write is in flight, which
60+
// callers (e.g. on build done/success) rely on to observe a settled
61+
// capture buffer.
7562
b.mu.Lock()
7663
defer b.mu.Unlock()
7764

78-
// No-op for SafeBuffer, as it doesn't have an underlying file to sync
79-
// But wait for the mutex to ensure no writes are happening
8065
return nil
8166
}
8267

83-
func stringToLogLevel(level string) template_manager.LogLevel {
84-
switch level {
85-
case "debug":
86-
return template_manager.LogLevel_Debug
87-
case "info":
88-
return template_manager.LogLevel_Info
89-
case "warn":
90-
return template_manager.LogLevel_Warn
91-
case "error":
92-
return template_manager.LogLevel_Error
93-
default:
94-
return template_manager.LogLevel_Info
68+
func (b *LogEntryLogger) write(ent zapcore.Entry, accumulated, fields []zapcore.Field) error {
69+
// Hold the mutex for the entire write (encoding + append) so that Sync,
70+
// which acquires the same mutex, is a true quiescence barrier: once Sync
71+
// returns, no Write is in flight and the capture buffer is settled.
72+
b.mu.Lock()
73+
defer b.mu.Unlock()
74+
75+
enc := zapcore.NewMapObjectEncoder()
76+
for _, f := range accumulated {
77+
f.AddTo(enc)
9578
}
79+
for _, f := range fields {
80+
f.AddTo(enc)
81+
}
82+
83+
flat := make(map[string]string, len(enc.Fields))
84+
for k, v := range enc.Fields {
85+
switch t := v.(type) {
86+
case string:
87+
flat[k] = t
88+
case fmt.Stringer:
89+
flat[k] = t.String()
90+
case error:
91+
flat[k] = t.Error()
92+
default:
93+
flat[k] = fmt.Sprint(v)
94+
}
95+
}
96+
97+
b.lines = append(b.lines, &template_manager.TemplateBuildLogEntry{
98+
Timestamp: timestamppb.New(ent.Time.UTC()),
99+
Message: ent.Message,
100+
Level: zapLevelToLogLevel(ent.Level),
101+
Fields: flat,
102+
})
103+
104+
return nil
96105
}
97106

98107
func (b *LogEntryLogger) Lines() []*template_manager.TemplateBuildLogEntry {
@@ -106,11 +115,48 @@ func (b *LogEntryLogger) Lines() []*template_manager.TemplateBuildLogEntry {
106115
return copied
107116
}
108117

109-
func epochToTime(epoch float64) time.Time {
110-
// split into integer seconds and fractional part
111-
sec := int64(epoch)
112-
nsec := int64((epoch - float64(sec)) * 1e9) // convert fractional part to nanoseconds
118+
// childCore is the result of LogEntryLogger.With. It shares the parent's
119+
// capture state and only carries additional accumulated fields, so that
120+
// zap's per-logger With(...) chains don't fork the capture buffer.
121+
type childCore struct {
122+
parent *LogEntryLogger
123+
with []zapcore.Field
124+
}
125+
126+
func (c *childCore) Enabled(l zapcore.Level) bool { return c.parent.Enabled(l) }
127+
128+
func (c *childCore) With(fields []zapcore.Field) zapcore.Core {
129+
return &childCore{
130+
parent: c.parent,
131+
with: append(append([]zapcore.Field{}, c.with...), fields...),
132+
}
133+
}
134+
135+
func (c *childCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
136+
if c.Enabled(ent.Level) {
137+
return ce.AddCore(ent, c)
138+
}
113139

114-
// convert to time.Time
115-
return time.Unix(sec, nsec).UTC()
140+
return ce
141+
}
142+
143+
func (c *childCore) Write(ent zapcore.Entry, fields []zapcore.Field) error {
144+
return c.parent.write(ent, c.with, fields)
145+
}
146+
147+
func (c *childCore) Sync() error { return c.parent.Sync() }
148+
149+
func zapLevelToLogLevel(level zapcore.Level) template_manager.LogLevel {
150+
switch level {
151+
case zapcore.DebugLevel:
152+
return template_manager.LogLevel_Debug
153+
case zapcore.InfoLevel:
154+
return template_manager.LogLevel_Info
155+
case zapcore.WarnLevel:
156+
return template_manager.LogLevel_Warn
157+
case zapcore.ErrorLevel, zapcore.DPanicLevel, zapcore.PanicLevel, zapcore.FatalLevel:
158+
return template_manager.LogLevel_Error
159+
default:
160+
return template_manager.LogLevel_Info
161+
}
116162
}

packages/orchestrator/pkg/template/server/create_template.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,10 @@ func (s *ServerStore) TemplateCreate(ctx context.Context, templateRequest *templ
108108
return nil, fmt.Errorf("error while creating build cache: %w", err)
109109
}
110110

111-
// Add new core that will log all messages using logger (zap.Logger) to the logs buffer too
112-
encoder := zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig())
113-
bufferCore := zapcore.NewCore(encoder, logs, zapcore.DebugLevel)
114-
core := zapcore.NewTee(bufferCore, s.buildLogger.Detach(ctx).Core().
111+
// LogEntryLogger is itself a zapcore.Core that captures every entry into
112+
// an in-memory slice; tee it with the regular build logger so logs go to
113+
// both destinations.
114+
core := zapcore.NewTee(logs, s.buildLogger.Detach(ctx).Core().
115115
With([]zap.Field{
116116
{Type: zapcore.StringType, Key: "envID", String: cfg.GetTemplateID()},
117117
{Type: zapcore.StringType, Key: "buildID", String: metadata.BuildID},

packages/orchestrator/pkg/template/server/template_status_test.go

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,14 @@
33
package server
44

55
import (
6-
"encoding/json"
7-
"strings"
86
"testing"
97
"time"
108

119
"github.com/google/uuid"
1210
"github.com/stretchr/testify/assert"
1311
"github.com/stretchr/testify/require"
1412
"go.opentelemetry.io/otel/metric/noop"
13+
"go.uber.org/zap/zapcore"
1514
"google.golang.org/protobuf/types/known/timestamppb"
1615

1716
"github.com/e2b-dev/infra/packages/orchestrator/pkg/template/build/buildlogger"
@@ -102,21 +101,36 @@ func newTestServerStore(t *testing.T, logLines []testLogLine) (*ServerStore, str
102101
func writeTestBuildLogs(t *testing.T, buildLogs *buildlogger.LogEntryLogger, lines []testLogLine) {
103102
t.Helper()
104103

105-
var input strings.Builder
106104
for _, line := range lines {
107-
payload, err := json.Marshal(map[string]any{
108-
"ts": line.ts,
109-
"msg": line.message,
110-
"level": line.level,
111-
})
105+
err := buildLogs.Write(zapcore.Entry{
106+
Level: stringToZapLevel(line.level),
107+
Time: epochToTime(line.ts),
108+
Message: line.message,
109+
}, nil)
112110
require.NoError(t, err)
111+
}
112+
}
113113

114-
input.Write(payload)
115-
input.WriteByte('\n')
114+
func stringToZapLevel(level string) zapcore.Level {
115+
switch level {
116+
case "debug":
117+
return zapcore.DebugLevel
118+
case "info":
119+
return zapcore.InfoLevel
120+
case "warn":
121+
return zapcore.WarnLevel
122+
case "error":
123+
return zapcore.ErrorLevel
124+
default:
125+
return zapcore.InfoLevel
116126
}
127+
}
117128

118-
_, err := buildLogs.Write([]byte(input.String()))
119-
require.NoError(t, err)
129+
func epochToTime(epoch float64) time.Time {
130+
sec := int64(epoch)
131+
nsec := int64((epoch - float64(sec)) * 1e9)
132+
133+
return time.Unix(sec, nsec).UTC()
120134
}
121135

122136
func timeToEpoch(t time.Time) float64 {

0 commit comments

Comments
 (0)