Skip to content
41 changes: 35 additions & 6 deletions packages/envd/internal/host/mmds.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,32 +135,61 @@
return opts.AccessTokenHash, nil
}

// MMDS poll backoff: start fast for the common happy path (MMDS up at boot),
// then double on each failure up to mmdsMaxBackoff so a persistently broken
// endpoint doesn't keep firing HTTP requests every 50ms.
const (
mmdsInitialBackoff = 50 * time.Millisecond
mmdsMaxBackoff = 1 * time.Second
)

func PollForMMDSOpts(ctx context.Context, mmdsChan chan<- *MMDSOpts, envVars *utils.Map[string, string]) {
httpClient := &http.Client{}
defer httpClient.CloseIdleConnections()

ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()
// Log only the first failure of each kind. Tagged as syslog WARNING (<4>)
// since MMDS hiccups are expected during early boot and recoverable.
var loggedTokenErr, loggedOptsErr bool

backoff := mmdsInitialBackoff
timer := time.NewTimer(backoff)
defer timer.Stop()

advanceBackoff := func() {
backoff *= 2
if backoff > mmdsMaxBackoff {
backoff = mmdsMaxBackoff
}
timer.Reset(backoff)
}

for {
select {
case <-ctx.Done():
fmt.Fprintf(os.Stderr, "context cancelled while waiting for mmds opts")
fmt.Fprintf(os.Stderr, "<4>context cancelled while waiting for mmds opts\n")

return
case <-ticker.C:
case <-timer.C:
token, err := getMMDSToken(ctx, httpClient)
if err != nil {
fmt.Fprintf(os.Stderr, "error getting mmds token: %v\n", err)
if !loggedTokenErr {
fmt.Fprintf(os.Stderr, "<4>error getting mmds token (suppressing further failures): %v\n", err)
loggedTokenErr = true
}

advanceBackoff()
continue

Check failure on line 181 in packages/envd/internal/host/mmds.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint (/home/runner/work/infra/infra/packages/envd)

continue with no blank line before (nlreturn)
}

mmdsOpts, err := getMMDSOpts(ctx, httpClient, token)
if err != nil {
fmt.Fprintf(os.Stderr, "error getting mmds opts: %v\n", err)
if !loggedOptsErr {
fmt.Fprintf(os.Stderr, "<4>error getting mmds opts (suppressing further failures): %v\n", err)
loggedOptsErr = true
}

advanceBackoff()
continue

Check failure on line 192 in packages/envd/internal/host/mmds.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint (/home/runner/work/infra/infra/packages/envd)

continue with no blank line before (nlreturn)
}

envVars.Store("E2B_SANDBOX_ID", mmdsOpts.SandboxID)
Expand Down
96 changes: 82 additions & 14 deletions packages/envd/internal/logs/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"fmt"
"log"
"net/http"
"os"
"sync"
Expand All @@ -13,13 +12,32 @@ import (
"github.com/e2b-dev/infra/packages/envd/internal/host"
)

const ExporterTimeout = 10 * time.Second
const (
ExporterTimeout = 10 * time.Second

// maxBufferedBytes caps the total in-memory size of the log queue, so the
// queue stays bounded even when the collector is unreachable for a long
// time (e.g. before MMDS opts arrive on boot). Oldest logs are dropped on
// over-cap so the most recent — and most useful for whatever just
// happened — are the ones the orchestrator gets once it can be reached.
maxBufferedBytes = 4 << 20 // 4 MiB

// Exporter send-failure backoff: each failure waits sendInitialBackoff,
// doubling up to sendMaxBackoff, before any further send is attempted.
// During the cooldown window logs are dropped (we already report the
// outage to journald and the orchestrator was supposed to receive them
// anyway). Resets to the initial value on the next successful send.
sendInitialBackoff = 1 * time.Second
sendMaxBackoff = 5 * time.Minute
)

type HTTPExporter struct {
client http.Client
logs [][]byte
isNotFC bool
mmdsOpts *host.MMDSOpts
client http.Client
logs [][]byte
bufferedBytes int
isNotFC bool
verbose bool
mmdsOpts *host.MMDSOpts

// Concurrency coordination
triggers chan struct{}
Expand All @@ -28,13 +46,14 @@ type HTTPExporter struct {
startOnce sync.Once
}

func NewHTTPLogsExporter(ctx context.Context, isNotFC bool, mmdsChan <-chan *host.MMDSOpts) *HTTPExporter {
func NewHTTPLogsExporter(ctx context.Context, isNotFC bool, verbose bool, mmdsChan <-chan *host.MMDSOpts) *HTTPExporter {
exporter := &HTTPExporter{
client: http.Client{
Timeout: ExporterTimeout,
},
triggers: make(chan struct{}, 1),
isNotFC: isNotFC,
verbose: verbose,
startOnce: sync.Once{},
mmdsOpts: &host.MMDSOpts{
SandboxID: "unknown",
Expand Down Expand Up @@ -69,7 +88,12 @@ func (w *HTTPExporter) sendInstanceLogs(ctx context.Context, logs []byte, addres
return nil
}

func printLog(logs []byte) {
func (w *HTTPExporter) printLog(logs []byte) {
// Only emit on -verbose. Inside FC stdout flows into journald, which is
// what this PR is trying to keep clean during send outages.
if !w.verbose {
return
}
fmt.Fprintf(os.Stdout, "%v", string(logs))
}

Expand All @@ -95,6 +119,16 @@ func (w *HTTPExporter) listenForMMDSOptsAndStart(ctx context.Context, mmdsChan <
}

func (w *HTTPExporter) start(ctx context.Context) {
// Suppress repeated failures so a wedged collector doesn't 1:1-amplify
// envd logs into journald.
var loggedJSONErr, loggedSendErr bool

// Exponential backoff between send attempts after a failure, so a
// persistently broken collector doesn't keep wasting ExporterTimeout
// (10s) per log line.
var cooldownUntil time.Time
cooldown := sendInitialBackoff

for range w.triggers {
logs := w.getAllLogs()

Expand All @@ -103,8 +137,10 @@ func (w *HTTPExporter) start(ctx context.Context) {
}

if w.isNotFC {
for _, log := range logs {
fmt.Fprintf(os.Stdout, "%v", string(log))
if w.verbose {
for _, log := range logs {
fmt.Fprintf(os.Stdout, "%v", string(log))
}
}

continue
Expand All @@ -115,21 +151,42 @@ func (w *HTTPExporter) start(ctx context.Context) {
logLineWithOpts, err := w.mmdsOpts.AddOptsToJSON(logLine)
w.mmdsLock.RUnlock()
if err != nil {
log.Printf("error adding instance logging options (%+v) to JSON (%+v) with logs : %v\n", w.mmdsOpts, logLine, err)
if !loggedJSONErr {
fmt.Fprintf(os.Stderr, "<4>error adding instance logging options (%+v) to JSON (%+v) with logs (suppressing further failures): %v\n", w.mmdsOpts, logLine, err)
loggedJSONErr = true
}

printLog(logLine)
w.printLog(logLine)

continue
}

if time.Now().Before(cooldownUntil) {
// Skip send during cooldown; the log is dropped.
continue
}

err = w.sendInstanceLogs(ctx, logLineWithOpts, w.mmdsOpts.LogsCollectorAddress)
if err != nil {
log.Printf("error sending instance logs: %+v", err)
if !loggedSendErr {
fmt.Fprintf(os.Stderr, "<4>error sending instance logs (suppressing further failures): %v\n", err)
loggedSendErr = true
}

cooldownUntil = time.Now().Add(cooldown)
cooldown *= 2
if cooldown > sendMaxBackoff {
cooldown = sendMaxBackoff
}

printLog(logLine)
w.printLog(logLine)

continue
}

cooldown = sendInitialBackoff
cooldownUntil = time.Time{}
loggedSendErr = false
}
}
}
Expand Down Expand Up @@ -158,6 +215,7 @@ func (w *HTTPExporter) getAllLogs() [][]byte {

logs := w.logs
w.logs = nil
w.bufferedBytes = 0

return logs
}
Expand All @@ -166,6 +224,16 @@ func (w *HTTPExporter) addLogs(logs []byte) {
w.logLock.Lock()
defer w.logLock.Unlock()

// Drop oldest so total buffered size stays at or below maxBufferedBytes.
// Most recent logs are kept (those are usually the ones we want to see
// after a long buffering window, e.g. early boot before MMDS is up).
for w.bufferedBytes+len(logs) > maxBufferedBytes && len(w.logs) > 0 {
w.bufferedBytes -= len(w.logs[0])
w.logs[0] = nil // let GC reclaim the dropped log content
w.logs = w.logs[1:]
}

w.bufferedBytes += len(logs)
w.logs = append(w.logs, logs)
Comment on lines +236 to 237

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Reject or truncate oversized log entries before buffering

addLogs only evicts existing entries while len(w.logs) > 0, so if a single incoming record is larger than maxBufferedBytes and the queue is empty, it is still appended and bufferedBytes jumps past the configured cap. That breaks the new bounded-memory guarantee and allows one unusually large log event to exceed the intended 4 MiB limit (with repeated large events causing repeated over-cap allocations).

Useful? React with 👍 / 👎.


w.resumeProcessing()
Expand Down
12 changes: 8 additions & 4 deletions packages/envd/internal/logs/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,20 @@ import (
"github.com/e2b-dev/infra/packages/envd/internal/logs/exporter"
)

func NewLogger(ctx context.Context, isNotFC bool, mmdsChan <-chan *host.MMDSOpts) *zerolog.Logger {
func NewLogger(ctx context.Context, isNotFC bool, verbose bool, mmdsChan <-chan *host.MMDSOpts) *zerolog.Logger {
zerolog.TimestampFieldName = "timestamp"
zerolog.TimeFieldFormat = time.RFC3339Nano

exporters := []io.Writer{}

if isNotFC {
if !isNotFC {
exporters = append(exporters, exporter.NewHTTPLogsExporter(ctx, isNotFC, verbose, mmdsChan))
}
Comment on lines +21 to +23

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Keep a default log sink in non-Firecracker mode

When envd runs with -isnotfc (for example via packages/envd/Makefile start-docker), this change leaves exporters empty unless -verbose is also set, because the HTTP exporter is now FC-only and stdout is opt-in. With no writer, zerolog drops all envd logs, so routine failures in local/non-FC runs become invisible and debugging regresses from previous behavior where stdout logging was always enabled in non-FC mode.

Useful? React with 👍 / 👎.

// Stdout is opt-in via -verbose. Inside FC stdout flows into journald and
// dirties guest pages on every snapshot, so we keep it off by default and
// rely on the HTTP exporter to ship debug logs to the orchestrator.
if verbose {
Comment thread
cursor[bot] marked this conversation as resolved.
exporters = append(exporters, os.Stdout)
} else {
exporters = append(exporters, exporter.NewHTTPLogsExporter(ctx, isNotFC, mmdsChan), os.Stdout)
}

l := zerolog.
Expand Down
6 changes: 3 additions & 3 deletions packages/envd/internal/services/process/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func New(
}

if readErr != nil {
fmt.Fprintf(os.Stderr, "error reading from pty: %s\n", readErr)
logger.Error().Err(readErr).Msg("error reading from pty")

break
}
Expand Down Expand Up @@ -269,7 +269,7 @@ func New(
}

if readErr != nil {
fmt.Fprintf(os.Stderr, "error reading from stdout: %s\n", readErr)
logger.Error().Err(readErr).Msg("error reading from stdout")

break
}
Expand Down Expand Up @@ -313,7 +313,7 @@ func New(
}

if readErr != nil {
fmt.Fprintf(os.Stderr, "error reading from stderr: %s\n", readErr)
logger.Error().Err(readErr).Msg("error reading from stderr")

break
}
Expand Down
10 changes: 9 additions & 1 deletion packages/envd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ var (
versionFlag bool
commitFlag bool
cgroupRoot string
verbose bool
)

func parseFlags() {
Expand Down Expand Up @@ -94,6 +95,13 @@ func parseFlags() {
"cgroup root directory",
)

flag.BoolVar(
&verbose,
"verbose",
false,
"write envd logs to stdout (inside FC this would dirty journald pages; HTTP exporter still ships full debug regardless)",
)

flag.Parse()
}

Expand Down Expand Up @@ -159,7 +167,7 @@ func main() {
go host.PollForMMDSOpts(ctx, mmdsChan, defaults.EnvVars)
}

l := logs.NewLogger(ctx, isNotFC, mmdsChan)
l := logs.NewLogger(ctx, isNotFC, verbose, mmdsChan)

m := chi.NewRouter()

Expand Down
2 changes: 1 addition & 1 deletion packages/envd/pkg/version.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package pkg

const Version = "0.5.20"
const Version = "0.5.24"
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{{- /*gotype:github.com/e2b-dev/infra/packages/orchestrator/pkg/template/build/core/rootfs.templateModel*/ -}}
{{ .WriteFile "etc/systemd/journald.conf.d/e2b.conf" 0o644 }}

[Journal]
Storage=persistent
SystemMaxUse=8M
SystemMaxFileSize=2M
MaxLevelStore=warning

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The MaxLevelStore=warning setting causes journald to discard all logs with a priority higher than 4, including the default info (6) priority assigned by systemd to service stdout and stderr. Consequently, all envd logs—including the Warn and Error levels intended for preservation—will be dropped. Consider setting this to info and relying on envd's internal filtering to manage volume, or ensuring envd outputs syslog-compatible priority prefixes.

MaxLevelStore=info

Comment thread
cursor[bot] marked this conversation as resolved.
MaxLevelConsole=warning
MaxLevelKMsg=warning
MaxLevelWall=emerg
ForwardToSyslog=no
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestAdditionalOCILayers(t *testing.T) {

keysIter := maps.Keys(actualFiles)
keys := slices.Collect(keysIter)
assert.Len(t, keys, 13)
assert.Len(t, keys, 14)
assert.Equal(t, "e2b.local", actualFiles["etc/hostname"])
assert.Equal(t, "nameserver 8.8.8.8", actualFiles["etc/resolv.conf"])

Expand Down
Loading