diff --git a/packages/orchestrator/pkg/template/build/buildlogger/log_entry_logger.go b/packages/orchestrator/pkg/template/build/buildlogger/log_entry_logger.go index 73479cede9..8cba31f3bc 100644 --- a/packages/orchestrator/pkg/template/build/buildlogger/log_entry_logger.go +++ b/packages/orchestrator/pkg/template/build/buildlogger/log_entry_logger.go @@ -1,98 +1,107 @@ package buildlogger import ( - "bytes" - "context" - "encoding/json" + "fmt" "sync" - "time" - "go.uber.org/zap" + "go.uber.org/zap/zapcore" "google.golang.org/protobuf/types/known/timestamppb" template_manager "github.com/e2b-dev/infra/packages/shared/pkg/grpc/template-manager" - "github.com/e2b-dev/infra/packages/shared/pkg/logger" - "github.com/e2b-dev/infra/packages/shared/pkg/logs" ) -type ZapEntry struct { - Ts float64 `json:"ts"` - Msg string `json:"msg"` - Level string `json:"level"` -} - +// LogEntryLogger is a zapcore.Core that captures every log entry into an +// in-memory slice of template_manager.TemplateBuildLogEntry. It replaces the +// older io.Writer-based implementation that round-tripped log lines through +// JSON; consuming entries directly avoids that and removes the need for any +// fallback error-logging context. type LogEntryLogger struct { + zapcore.LevelEnabler + mu sync.Mutex lines []*template_manager.TemplateBuildLogEntry } +// Compile-time assertion: LogEntryLogger implements zapcore.Core. +var _ zapcore.Core = (*LogEntryLogger)(nil) + func NewLogEntryLogger() *LogEntryLogger { return &LogEntryLogger{ - lines: make([]*template_manager.TemplateBuildLogEntry, 0), + LevelEnabler: zapcore.DebugLevel, + lines: make([]*template_manager.TemplateBuildLogEntry, 0), } } -func (b *LogEntryLogger) Write(p []byte) (n int, err error) { - b.mu.Lock() - defer b.mu.Unlock() - - for line := range bytes.SplitSeq(p, []byte("\n")) { - if len(line) > 0 { - fields, err := logs.FlatJsonLogLineParser(string(line)) - if err != nil { - logger.L().Error(context.TODO(), "error parsing log line", zap.Error(err), zap.ByteString("line", line)) - - continue - } - - var entry ZapEntry - err = json.Unmarshal(line, &entry) - if err != nil { - logger.L().Error(context.TODO(), "failed to unmarshal log entry", zap.Error(err), zap.ByteString("line", line)) - - continue - } - - timestamp := epochToTime(entry.Ts) - - delete(fields, "ts") - delete(fields, "msg") - delete(fields, "level") +// With returns a Core that shares this LogEntryLogger's capture buffer but +// carries additional accumulated fields. Returning a child type (rather than +// cloning the LogEntryLogger) keeps a single source of truth for Lines(). +func (b *LogEntryLogger) With(fields []zapcore.Field) zapcore.Core { + return &childCore{ + parent: b, + with: append([]zapcore.Field(nil), fields...), + } +} - b.lines = append(b.lines, &template_manager.TemplateBuildLogEntry{ - Timestamp: timestamppb.New(timestamp), - Message: entry.Msg, - Level: stringToLogLevel(entry.Level), - Fields: fields, - }) - } +func (b *LogEntryLogger) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry { + if b.Enabled(ent.Level) { + return ce.AddCore(ent, b) } - return len(p), nil + return ce +} + +func (b *LogEntryLogger) Write(ent zapcore.Entry, fields []zapcore.Field) error { + return b.write(ent, nil, fields) } func (b *LogEntryLogger) Sync() error { + // In-memory capture; nothing to flush. Acquire the mutex so Sync acts as + // a quiescence barrier: once it returns, no Write is in flight, which + // callers (e.g. on build done/success) rely on to observe a settled + // capture buffer. b.mu.Lock() defer b.mu.Unlock() - // No-op for SafeBuffer, as it doesn't have an underlying file to sync - // But wait for the mutex to ensure no writes are happening return nil } -func stringToLogLevel(level string) template_manager.LogLevel { - switch level { - case "debug": - return template_manager.LogLevel_Debug - case "info": - return template_manager.LogLevel_Info - case "warn": - return template_manager.LogLevel_Warn - case "error": - return template_manager.LogLevel_Error - default: - return template_manager.LogLevel_Info +func (b *LogEntryLogger) write(ent zapcore.Entry, accumulated, fields []zapcore.Field) error { + // Hold the mutex for the entire write (encoding + append) so that Sync, + // which acquires the same mutex, is a true quiescence barrier: once Sync + // returns, no Write is in flight and the capture buffer is settled. + b.mu.Lock() + defer b.mu.Unlock() + + enc := zapcore.NewMapObjectEncoder() + for _, f := range accumulated { + f.AddTo(enc) } + for _, f := range fields { + f.AddTo(enc) + } + + flat := make(map[string]string, len(enc.Fields)) + for k, v := range enc.Fields { + switch t := v.(type) { + case string: + flat[k] = t + case fmt.Stringer: + flat[k] = t.String() + case error: + flat[k] = t.Error() + default: + flat[k] = fmt.Sprint(v) + } + } + + b.lines = append(b.lines, &template_manager.TemplateBuildLogEntry{ + Timestamp: timestamppb.New(ent.Time.UTC()), + Message: ent.Message, + Level: zapLevelToLogLevel(ent.Level), + Fields: flat, + }) + + return nil } func (b *LogEntryLogger) Lines() []*template_manager.TemplateBuildLogEntry { @@ -106,11 +115,48 @@ func (b *LogEntryLogger) Lines() []*template_manager.TemplateBuildLogEntry { return copied } -func epochToTime(epoch float64) time.Time { - // split into integer seconds and fractional part - sec := int64(epoch) - nsec := int64((epoch - float64(sec)) * 1e9) // convert fractional part to nanoseconds +// childCore is the result of LogEntryLogger.With. It shares the parent's +// capture state and only carries additional accumulated fields, so that +// zap's per-logger With(...) chains don't fork the capture buffer. +type childCore struct { + parent *LogEntryLogger + with []zapcore.Field +} + +func (c *childCore) Enabled(l zapcore.Level) bool { return c.parent.Enabled(l) } + +func (c *childCore) With(fields []zapcore.Field) zapcore.Core { + return &childCore{ + parent: c.parent, + with: append(append([]zapcore.Field{}, c.with...), fields...), + } +} + +func (c *childCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry { + if c.Enabled(ent.Level) { + return ce.AddCore(ent, c) + } - // convert to time.Time - return time.Unix(sec, nsec).UTC() + return ce +} + +func (c *childCore) Write(ent zapcore.Entry, fields []zapcore.Field) error { + return c.parent.write(ent, c.with, fields) +} + +func (c *childCore) Sync() error { return c.parent.Sync() } + +func zapLevelToLogLevel(level zapcore.Level) template_manager.LogLevel { + switch level { + case zapcore.DebugLevel: + return template_manager.LogLevel_Debug + case zapcore.InfoLevel: + return template_manager.LogLevel_Info + case zapcore.WarnLevel: + return template_manager.LogLevel_Warn + case zapcore.ErrorLevel, zapcore.DPanicLevel, zapcore.PanicLevel, zapcore.FatalLevel: + return template_manager.LogLevel_Error + default: + return template_manager.LogLevel_Info + } } diff --git a/packages/orchestrator/pkg/template/server/create_template.go b/packages/orchestrator/pkg/template/server/create_template.go index 6ae974f65a..8cc22df7e8 100644 --- a/packages/orchestrator/pkg/template/server/create_template.go +++ b/packages/orchestrator/pkg/template/server/create_template.go @@ -108,10 +108,10 @@ func (s *ServerStore) TemplateCreate(ctx context.Context, templateRequest *templ return nil, fmt.Errorf("error while creating build cache: %w", err) } - // Add new core that will log all messages using logger (zap.Logger) to the logs buffer too - encoder := zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()) - bufferCore := zapcore.NewCore(encoder, logs, zapcore.DebugLevel) - core := zapcore.NewTee(bufferCore, s.buildLogger.Detach(ctx).Core(). + // LogEntryLogger is itself a zapcore.Core that captures every entry into + // an in-memory slice; tee it with the regular build logger so logs go to + // both destinations. + core := zapcore.NewTee(logs, s.buildLogger.Detach(ctx).Core(). With([]zap.Field{ {Type: zapcore.StringType, Key: "envID", String: cfg.GetTemplateID()}, {Type: zapcore.StringType, Key: "buildID", String: metadata.BuildID}, diff --git a/packages/orchestrator/pkg/template/server/template_status_test.go b/packages/orchestrator/pkg/template/server/template_status_test.go index c87aa2eace..c7005112ad 100644 --- a/packages/orchestrator/pkg/template/server/template_status_test.go +++ b/packages/orchestrator/pkg/template/server/template_status_test.go @@ -3,8 +3,6 @@ package server import ( - "encoding/json" - "strings" "testing" "time" @@ -12,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/metric/noop" + "go.uber.org/zap/zapcore" "google.golang.org/protobuf/types/known/timestamppb" "github.com/e2b-dev/infra/packages/orchestrator/pkg/template/build/buildlogger" @@ -102,21 +101,36 @@ func newTestServerStore(t *testing.T, logLines []testLogLine) (*ServerStore, str func writeTestBuildLogs(t *testing.T, buildLogs *buildlogger.LogEntryLogger, lines []testLogLine) { t.Helper() - var input strings.Builder for _, line := range lines { - payload, err := json.Marshal(map[string]any{ - "ts": line.ts, - "msg": line.message, - "level": line.level, - }) + err := buildLogs.Write(zapcore.Entry{ + Level: stringToZapLevel(line.level), + Time: epochToTime(line.ts), + Message: line.message, + }, nil) require.NoError(t, err) + } +} - input.Write(payload) - input.WriteByte('\n') +func stringToZapLevel(level string) zapcore.Level { + switch level { + case "debug": + return zapcore.DebugLevel + case "info": + return zapcore.InfoLevel + case "warn": + return zapcore.WarnLevel + case "error": + return zapcore.ErrorLevel + default: + return zapcore.InfoLevel } +} - _, err := buildLogs.Write([]byte(input.String())) - require.NoError(t, err) +func epochToTime(epoch float64) time.Time { + sec := int64(epoch) + nsec := int64((epoch - float64(sec)) * 1e9) + + return time.Unix(sec, nsec).UTC() } func timeToEpoch(t time.Time) float64 {