Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,98 +1,107 @@
package buildlogger

import (
"bytes"
"context"
"encoding/json"
"fmt"
"sync"
"time"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/protobuf/types/known/timestamppb"

template_manager "github.com/e2b-dev/infra/packages/shared/pkg/grpc/template-manager"
"github.com/e2b-dev/infra/packages/shared/pkg/logger"
"github.com/e2b-dev/infra/packages/shared/pkg/logs"
)

type ZapEntry struct {
Ts float64 `json:"ts"`
Msg string `json:"msg"`
Level string `json:"level"`
}

// LogEntryLogger is a zapcore.Core that captures every log entry into an
// in-memory slice of template_manager.TemplateBuildLogEntry. It replaces the
// older io.Writer-based implementation that round-tripped log lines through
// JSON; consuming entries directly avoids that and removes the need for any
// fallback error-logging context.
type LogEntryLogger struct {
zapcore.LevelEnabler

mu sync.Mutex
lines []*template_manager.TemplateBuildLogEntry
}

// Compile-time assertion: LogEntryLogger implements zapcore.Core.
var _ zapcore.Core = (*LogEntryLogger)(nil)

func NewLogEntryLogger() *LogEntryLogger {
return &LogEntryLogger{
lines: make([]*template_manager.TemplateBuildLogEntry, 0),
LevelEnabler: zapcore.DebugLevel,
lines: make([]*template_manager.TemplateBuildLogEntry, 0),
}
}

func (b *LogEntryLogger) Write(p []byte) (n int, err error) {
b.mu.Lock()
defer b.mu.Unlock()

for line := range bytes.SplitSeq(p, []byte("\n")) {
if len(line) > 0 {
fields, err := logs.FlatJsonLogLineParser(string(line))
if err != nil {
logger.L().Error(context.TODO(), "error parsing log line", zap.Error(err), zap.ByteString("line", line))

continue
}

var entry ZapEntry
err = json.Unmarshal(line, &entry)
if err != nil {
logger.L().Error(context.TODO(), "failed to unmarshal log entry", zap.Error(err), zap.ByteString("line", line))

continue
}

timestamp := epochToTime(entry.Ts)

delete(fields, "ts")
delete(fields, "msg")
delete(fields, "level")
// With returns a Core that shares this LogEntryLogger's capture buffer but
// carries additional accumulated fields. Returning a child type (rather than
// cloning the LogEntryLogger) keeps a single source of truth for Lines().
func (b *LogEntryLogger) With(fields []zapcore.Field) zapcore.Core {
return &childCore{
parent: b,
with: append([]zapcore.Field(nil), fields...),
}
}

b.lines = append(b.lines, &template_manager.TemplateBuildLogEntry{
Timestamp: timestamppb.New(timestamp),
Message: entry.Msg,
Level: stringToLogLevel(entry.Level),
Fields: fields,
})
}
func (b *LogEntryLogger) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
if b.Enabled(ent.Level) {
return ce.AddCore(ent, b)
}

return len(p), nil
return ce
}

func (b *LogEntryLogger) Write(ent zapcore.Entry, fields []zapcore.Field) error {
return b.write(ent, nil, fields)
}

func (b *LogEntryLogger) Sync() error {
// In-memory capture; nothing to flush. Acquire the mutex so Sync acts as
// a quiescence barrier: once it returns, no Write is in flight, which
// callers (e.g. on build done/success) rely on to observe a settled
// capture buffer.
b.mu.Lock()
defer b.mu.Unlock()

// No-op for SafeBuffer, as it doesn't have an underlying file to sync
// But wait for the mutex to ensure no writes are happening
return nil
}

func stringToLogLevel(level string) template_manager.LogLevel {
switch level {
case "debug":
return template_manager.LogLevel_Debug
case "info":
return template_manager.LogLevel_Info
case "warn":
return template_manager.LogLevel_Warn
case "error":
return template_manager.LogLevel_Error
default:
return template_manager.LogLevel_Info
func (b *LogEntryLogger) write(ent zapcore.Entry, accumulated, fields []zapcore.Field) error {
// Hold the mutex for the entire write (encoding + append) so that Sync,
// which acquires the same mutex, is a true quiescence barrier: once Sync
// returns, no Write is in flight and the capture buffer is settled.
b.mu.Lock()
defer b.mu.Unlock()

enc := zapcore.NewMapObjectEncoder()
for _, f := range accumulated {
f.AddTo(enc)
}
for _, f := range fields {
f.AddTo(enc)
}
Comment thread
tvi marked this conversation as resolved.

flat := make(map[string]string, len(enc.Fields))
for k, v := range enc.Fields {
switch t := v.(type) {
case string:
flat[k] = t
case fmt.Stringer:
flat[k] = t.String()
case error:
flat[k] = t.Error()
default:
flat[k] = fmt.Sprint(v)
}
}

b.lines = append(b.lines, &template_manager.TemplateBuildLogEntry{
Timestamp: timestamppb.New(ent.Time.UTC()),
Message: ent.Message,
Level: zapLevelToLogLevel(ent.Level),
Fields: flat,
})
Comment thread
cursor[bot] marked this conversation as resolved.

return nil
}

func (b *LogEntryLogger) Lines() []*template_manager.TemplateBuildLogEntry {
Expand All @@ -106,11 +115,48 @@ func (b *LogEntryLogger) Lines() []*template_manager.TemplateBuildLogEntry {
return copied
}

func epochToTime(epoch float64) time.Time {
// split into integer seconds and fractional part
sec := int64(epoch)
nsec := int64((epoch - float64(sec)) * 1e9) // convert fractional part to nanoseconds
// childCore is the result of LogEntryLogger.With. It shares the parent's
// capture state and only carries additional accumulated fields, so that
// zap's per-logger With(...) chains don't fork the capture buffer.
type childCore struct {
parent *LogEntryLogger
with []zapcore.Field
}

func (c *childCore) Enabled(l zapcore.Level) bool { return c.parent.Enabled(l) }

func (c *childCore) With(fields []zapcore.Field) zapcore.Core {
return &childCore{
parent: c.parent,
with: append(append([]zapcore.Field{}, c.with...), fields...),
}
}

func (c *childCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
if c.Enabled(ent.Level) {
return ce.AddCore(ent, c)
}

// convert to time.Time
return time.Unix(sec, nsec).UTC()
return ce
}

func (c *childCore) Write(ent zapcore.Entry, fields []zapcore.Field) error {
return c.parent.write(ent, c.with, fields)
}

func (c *childCore) Sync() error { return c.parent.Sync() }

func zapLevelToLogLevel(level zapcore.Level) template_manager.LogLevel {
switch level {
case zapcore.DebugLevel:
return template_manager.LogLevel_Debug
case zapcore.InfoLevel:
return template_manager.LogLevel_Info
case zapcore.WarnLevel:
return template_manager.LogLevel_Warn
case zapcore.ErrorLevel, zapcore.DPanicLevel, zapcore.PanicLevel, zapcore.FatalLevel:
return template_manager.LogLevel_Error
default:
return template_manager.LogLevel_Info
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,10 @@ func (s *ServerStore) TemplateCreate(ctx context.Context, templateRequest *templ
return nil, fmt.Errorf("error while creating build cache: %w", err)
}

// Add new core that will log all messages using logger (zap.Logger) to the logs buffer too
encoder := zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig())
bufferCore := zapcore.NewCore(encoder, logs, zapcore.DebugLevel)
core := zapcore.NewTee(bufferCore, s.buildLogger.Detach(ctx).Core().
// LogEntryLogger is itself a zapcore.Core that captures every entry into
// an in-memory slice; tee it with the regular build logger so logs go to
// both destinations.
core := zapcore.NewTee(logs, s.buildLogger.Detach(ctx).Core().
With([]zap.Field{
{Type: zapcore.StringType, Key: "envID", String: cfg.GetTemplateID()},
{Type: zapcore.StringType, Key: "buildID", String: metadata.BuildID},
Expand Down
38 changes: 26 additions & 12 deletions packages/orchestrator/pkg/template/server/template_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@
package server

import (
"encoding/json"
"strings"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/metric/noop"
"go.uber.org/zap/zapcore"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/e2b-dev/infra/packages/orchestrator/pkg/template/build/buildlogger"
Expand Down Expand Up @@ -102,21 +101,36 @@ func newTestServerStore(t *testing.T, logLines []testLogLine) (*ServerStore, str
func writeTestBuildLogs(t *testing.T, buildLogs *buildlogger.LogEntryLogger, lines []testLogLine) {
t.Helper()

var input strings.Builder
for _, line := range lines {
payload, err := json.Marshal(map[string]any{
"ts": line.ts,
"msg": line.message,
"level": line.level,
})
err := buildLogs.Write(zapcore.Entry{
Level: stringToZapLevel(line.level),
Time: epochToTime(line.ts),
Message: line.message,
}, nil)
require.NoError(t, err)
}
}

input.Write(payload)
input.WriteByte('\n')
func stringToZapLevel(level string) zapcore.Level {
switch level {
case "debug":
return zapcore.DebugLevel
case "info":
return zapcore.InfoLevel
case "warn":
return zapcore.WarnLevel
case "error":
return zapcore.ErrorLevel
default:
return zapcore.InfoLevel
}
}

_, err := buildLogs.Write([]byte(input.String()))
require.NoError(t, err)
func epochToTime(epoch float64) time.Time {
sec := int64(epoch)
nsec := int64((epoch - float64(sec)) * 1e9)

return time.Unix(sec, nsec).UTC()
}

func timeToEpoch(t time.Time) float64 {
Expand Down
Loading