From ef589b053b48c244211c8ad7d2f4fb1709fd82fb Mon Sep 17 00:00:00 2001
From: Tomas Virgl <739690+tvi@users.noreply.github.com>
Date: Sat, 9 May 2026 22:50:50 -0700
Subject: [PATCH] refactor(orchestrator/buildlogger): implement zapcore.Core directly

LogEntryLogger previously implemented io.Writer and was plugged into a
zap pipeline via zapcore.NewCore(jsonEncoder, logEntryLogger, DebugLevel).
Each entry was serialized to JSON by the encoder and then parsed back out
by Write to recover the message, level, timestamp, and fields. Two of
those parse paths could fail, and on failure they called
logger.L().Error(context.TODO(), ...) because Write has no context.

Implement zapcore.Core on LogEntryLogger instead: Check,
Write(Entry, []Field), Sync, With, plus an embedded zapcore.LevelEnabler.
The Entry struct already carries level, time, and message, and []Field is
encoded via zapcore.MapObjectEncoder and then flattened into a
map[string]string, so the JSON encode/parse round-trip and its failure
modes disappear, and with them the two context.TODO() call sites.

With returns a childCore that shares the parent's capture slice and mutex
(mu) and only carries the accumulated fields, so logger.With(...) chains
do not fork the capture buffer and Lines() still observes every entry.

Wire-up in TemplateCreate is simplified: drop the JSON encoder and the
zapcore.NewCore wrapper and tee LogEntryLogger directly with the build
logger core.

The test helper writeTestBuildLogs now constructs zapcore.Entry values
instead of marshalling synthetic JSON and calling Write([]byte).
--- .../build/buildlogger/log_entry_logger.go | 182 +++++++++++------- .../pkg/template/server/create_template.go | 8 +- .../template/server/template_status_test.go | 38 ++-- 3 files changed, 144 insertions(+), 84 deletions(-) diff --git a/packages/orchestrator/pkg/template/build/buildlogger/log_entry_logger.go b/packages/orchestrator/pkg/template/build/buildlogger/log_entry_logger.go index 73479cede9..8cba31f3bc 100644 --- a/packages/orchestrator/pkg/template/build/buildlogger/log_entry_logger.go +++ b/packages/orchestrator/pkg/template/build/buildlogger/log_entry_logger.go @@ -1,98 +1,107 @@ package buildlogger import ( - "bytes" - "context" - "encoding/json" + "fmt" "sync" - "time" - "go.uber.org/zap" + "go.uber.org/zap/zapcore" "google.golang.org/protobuf/types/known/timestamppb" template_manager "github.com/e2b-dev/infra/packages/shared/pkg/grpc/template-manager" - "github.com/e2b-dev/infra/packages/shared/pkg/logger" - "github.com/e2b-dev/infra/packages/shared/pkg/logs" ) -type ZapEntry struct { - Ts float64 `json:"ts"` - Msg string `json:"msg"` - Level string `json:"level"` -} - +// LogEntryLogger is a zapcore.Core that captures every log entry into an +// in-memory slice of template_manager.TemplateBuildLogEntry. It replaces the +// older io.Writer-based implementation that round-tripped log lines through +// JSON; consuming entries directly avoids that and removes the need for any +// fallback error-logging context. type LogEntryLogger struct { + zapcore.LevelEnabler + mu sync.Mutex lines []*template_manager.TemplateBuildLogEntry } +// Compile-time assertion: LogEntryLogger implements zapcore.Core. 
+var _ zapcore.Core = (*LogEntryLogger)(nil) + func NewLogEntryLogger() *LogEntryLogger { return &LogEntryLogger{ - lines: make([]*template_manager.TemplateBuildLogEntry, 0), + LevelEnabler: zapcore.DebugLevel, + lines: make([]*template_manager.TemplateBuildLogEntry, 0), } } -func (b *LogEntryLogger) Write(p []byte) (n int, err error) { - b.mu.Lock() - defer b.mu.Unlock() - - for line := range bytes.SplitSeq(p, []byte("\n")) { - if len(line) > 0 { - fields, err := logs.FlatJsonLogLineParser(string(line)) - if err != nil { - logger.L().Error(context.TODO(), "error parsing log line", zap.Error(err), zap.ByteString("line", line)) - - continue - } - - var entry ZapEntry - err = json.Unmarshal(line, &entry) - if err != nil { - logger.L().Error(context.TODO(), "failed to unmarshal log entry", zap.Error(err), zap.ByteString("line", line)) - - continue - } - - timestamp := epochToTime(entry.Ts) - - delete(fields, "ts") - delete(fields, "msg") - delete(fields, "level") +// With returns a Core that shares this LogEntryLogger's capture buffer but +// carries additional accumulated fields. Returning a child type (rather than +// cloning the LogEntryLogger) keeps a single source of truth for Lines(). +func (b *LogEntryLogger) With(fields []zapcore.Field) zapcore.Core { + return &childCore{ + parent: b, + with: append([]zapcore.Field(nil), fields...), + } +} - b.lines = append(b.lines, &template_manager.TemplateBuildLogEntry{ - Timestamp: timestamppb.New(timestamp), - Message: entry.Msg, - Level: stringToLogLevel(entry.Level), - Fields: fields, - }) - } +func (b *LogEntryLogger) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry { + if b.Enabled(ent.Level) { + return ce.AddCore(ent, b) } - return len(p), nil + return ce +} + +func (b *LogEntryLogger) Write(ent zapcore.Entry, fields []zapcore.Field) error { + return b.write(ent, nil, fields) } func (b *LogEntryLogger) Sync() error { + // In-memory capture; nothing to flush. 
Acquire the mutex so Sync acts as + // a quiescence barrier: once it returns, no Write is in flight, which + // callers (e.g. on build done/success) rely on to observe a settled + // capture buffer. b.mu.Lock() defer b.mu.Unlock() - // No-op for SafeBuffer, as it doesn't have an underlying file to sync - // But wait for the mutex to ensure no writes are happening return nil } -func stringToLogLevel(level string) template_manager.LogLevel { - switch level { - case "debug": - return template_manager.LogLevel_Debug - case "info": - return template_manager.LogLevel_Info - case "warn": - return template_manager.LogLevel_Warn - case "error": - return template_manager.LogLevel_Error - default: - return template_manager.LogLevel_Info +func (b *LogEntryLogger) write(ent zapcore.Entry, accumulated, fields []zapcore.Field) error { + // Hold the mutex for the entire write (encoding + append) so that Sync, + // which acquires the same mutex, is a true quiescence barrier: once Sync + // returns, no Write is in flight and the capture buffer is settled. 
+ b.mu.Lock() + defer b.mu.Unlock() + + enc := zapcore.NewMapObjectEncoder() + for _, f := range accumulated { + f.AddTo(enc) } + for _, f := range fields { + f.AddTo(enc) + } + + flat := make(map[string]string, len(enc.Fields)) + for k, v := range enc.Fields { + switch t := v.(type) { + case string: + flat[k] = t + case fmt.Stringer: + flat[k] = t.String() + case error: + flat[k] = t.Error() + default: + flat[k] = fmt.Sprint(v) + } + } + + b.lines = append(b.lines, &template_manager.TemplateBuildLogEntry{ + Timestamp: timestamppb.New(ent.Time.UTC()), + Message: ent.Message, + Level: zapLevelToLogLevel(ent.Level), + Fields: flat, + }) + + return nil } func (b *LogEntryLogger) Lines() []*template_manager.TemplateBuildLogEntry { @@ -106,11 +115,48 @@ func (b *LogEntryLogger) Lines() []*template_manager.TemplateBuildLogEntry { return copied } -func epochToTime(epoch float64) time.Time { - // split into integer seconds and fractional part - sec := int64(epoch) - nsec := int64((epoch - float64(sec)) * 1e9) // convert fractional part to nanoseconds +// childCore is the result of LogEntryLogger.With. It shares the parent's +// capture state and only carries additional accumulated fields, so that +// zap's per-logger With(...) chains don't fork the capture buffer. 
+type childCore struct { + parent *LogEntryLogger + with []zapcore.Field +} + +func (c *childCore) Enabled(l zapcore.Level) bool { return c.parent.Enabled(l) } + +func (c *childCore) With(fields []zapcore.Field) zapcore.Core { + return &childCore{ + parent: c.parent, + with: append(append([]zapcore.Field{}, c.with...), fields...), + } +} + +func (c *childCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry { + if c.Enabled(ent.Level) { + return ce.AddCore(ent, c) + } - // convert to time.Time - return time.Unix(sec, nsec).UTC() + return ce +} + +func (c *childCore) Write(ent zapcore.Entry, fields []zapcore.Field) error { + return c.parent.write(ent, c.with, fields) +} + +func (c *childCore) Sync() error { return c.parent.Sync() } + +func zapLevelToLogLevel(level zapcore.Level) template_manager.LogLevel { + switch level { + case zapcore.DebugLevel: + return template_manager.LogLevel_Debug + case zapcore.InfoLevel: + return template_manager.LogLevel_Info + case zapcore.WarnLevel: + return template_manager.LogLevel_Warn + case zapcore.ErrorLevel, zapcore.DPanicLevel, zapcore.PanicLevel, zapcore.FatalLevel: + return template_manager.LogLevel_Error + default: + return template_manager.LogLevel_Info + } } diff --git a/packages/orchestrator/pkg/template/server/create_template.go b/packages/orchestrator/pkg/template/server/create_template.go index 6ae974f65a..8cc22df7e8 100644 --- a/packages/orchestrator/pkg/template/server/create_template.go +++ b/packages/orchestrator/pkg/template/server/create_template.go @@ -108,10 +108,10 @@ func (s *ServerStore) TemplateCreate(ctx context.Context, templateRequest *templ return nil, fmt.Errorf("error while creating build cache: %w", err) } - // Add new core that will log all messages using logger (zap.Logger) to the logs buffer too - encoder := zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()) - bufferCore := zapcore.NewCore(encoder, logs, zapcore.DebugLevel) - core := zapcore.NewTee(bufferCore, 
s.buildLogger.Detach(ctx).Core(). + // LogEntryLogger is itself a zapcore.Core that captures every entry into + // an in-memory slice; tee it with the regular build logger so logs go to + // both destinations. + core := zapcore.NewTee(logs, s.buildLogger.Detach(ctx).Core(). With([]zap.Field{ {Type: zapcore.StringType, Key: "envID", String: cfg.GetTemplateID()}, {Type: zapcore.StringType, Key: "buildID", String: metadata.BuildID}, diff --git a/packages/orchestrator/pkg/template/server/template_status_test.go b/packages/orchestrator/pkg/template/server/template_status_test.go index c87aa2eace..c7005112ad 100644 --- a/packages/orchestrator/pkg/template/server/template_status_test.go +++ b/packages/orchestrator/pkg/template/server/template_status_test.go @@ -3,8 +3,6 @@ package server import ( - "encoding/json" - "strings" "testing" "time" @@ -12,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/metric/noop" + "go.uber.org/zap/zapcore" "google.golang.org/protobuf/types/known/timestamppb" "github.com/e2b-dev/infra/packages/orchestrator/pkg/template/build/buildlogger" @@ -102,21 +101,36 @@ func newTestServerStore(t *testing.T, logLines []testLogLine) (*ServerStore, str func writeTestBuildLogs(t *testing.T, buildLogs *buildlogger.LogEntryLogger, lines []testLogLine) { t.Helper() - var input strings.Builder for _, line := range lines { - payload, err := json.Marshal(map[string]any{ - "ts": line.ts, - "msg": line.message, - "level": line.level, - }) + err := buildLogs.Write(zapcore.Entry{ + Level: stringToZapLevel(line.level), + Time: epochToTime(line.ts), + Message: line.message, + }, nil) require.NoError(t, err) + } +} - input.Write(payload) - input.WriteByte('\n') +func stringToZapLevel(level string) zapcore.Level { + switch level { + case "debug": + return zapcore.DebugLevel + case "info": + return zapcore.InfoLevel + case "warn": + return zapcore.WarnLevel + case "error": + return zapcore.ErrorLevel 
+ default: + return zapcore.InfoLevel } +} - _, err := buildLogs.Write([]byte(input.String())) - require.NoError(t, err) +func epochToTime(epoch float64) time.Time { + sec := int64(epoch) + nsec := int64((epoch - float64(sec)) * 1e9) + + return time.Unix(sec, nsec).UTC() } func timeToEpoch(t time.Time) float64 {