Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions packages/envd/internal/api/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,14 @@ import (

"github.com/e2b-dev/infra/packages/envd/internal/host"
"github.com/e2b-dev/infra/packages/envd/internal/logs"
"github.com/e2b-dev/infra/packages/envd/internal/logs/ratelimit"
"github.com/e2b-dev/infra/packages/shared/pkg/keys"
)

// pinMMDSWarnLimit throttles the "failed to pin MMDS iptables route" warning
// to at most one emit per 10 seconds. /init is hammered by the orchestrator's
// infinite retry loop, so a persistent pin failure would otherwise flood the
// log.
var pinMMDSWarnLimit = ratelimit.New(10 * time.Second)

var (
ErrAccessTokenMismatch = errors.New("access token validation failed")
ErrAccessTokenResetNotAuthorized = errors.New("access token reset not authorized")
Expand Down Expand Up @@ -73,6 +78,18 @@ func (a *API) checkMMDSHash(ctx context.Context, requestToken *SecureToken) (boo
}

mmdsHash, err := a.mmdsClient.GetAccessTokenHash(ctx)
if err != nil {
// Self-heal: a user-installed PREROUTING/OUTPUT redirect on
// 169.254.169.254:80 in the same netns can shadow our route.
// Re-pin our RETURN rule at position 1 of nat PREROUTING and
// OUTPUT, then retry once.
if pinErr := host.PinMMDSRoute(ctx); pinErr != nil {
if ok, suppressed := pinMMDSWarnLimit.Allow(); ok {
a.logger.Warn().Err(pinErr).Int64("suppressed", suppressed).Msg("failed to pin MMDS iptables route")
}
}
Comment thread
ValentaTomas marked this conversation as resolved.
mmdsHash, err = a.mmdsClient.GetAccessTokenHash(ctx)
}
Comment thread
ValentaTomas marked this conversation as resolved.
if err != nil {
return false, false
}
Expand Down
8 changes: 7 additions & 1 deletion packages/envd/internal/host/mmds.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,13 @@ func GetAccessTokenHashFromMMDS(ctx context.Context) (string, error) {
}

func PollForMMDSOpts(ctx context.Context, mmdsChan chan<- *MMDSOpts, envVars *utils.EnvVars) {
httpClient := &http.Client{Transport: &http.Transport{DisableKeepAlives: true}}
// Match mmdsAccessTokenClient: bound any single tick (e.g. -j DROP on
// MMDS would otherwise hang on the TCP handshake) and avoid keepalive
// so a broken intermediate doesn't poison a kept-open connection.
httpClient := &http.Client{
Timeout: mmdsAccessTokenRequestClientTimeout,
Transport: &http.Transport{DisableKeepAlives: true},
}
defer httpClient.CloseIdleConnections()

var lastErr error
Expand Down
56 changes: 56 additions & 0 deletions packages/envd/internal/host/mmds_route_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
//go:build linux

package host

import (
"context"
"fmt"
"os/exec"

"golang.org/x/sync/semaphore"
)

// pinMMDSSem guards the self-heal path so concurrent /init retries don't
// run iptables in parallel against the same nat table. PinMMDSRoute takes it
// with TryAcquire, so a caller that finds it held skips the pin and returns
// immediately instead of queueing behind the current holder.
var pinMMDSSem = semaphore.NewWeighted(1)

// PinMMDSRoute makes sure a RETURN rule for MMDS traffic
// (169.254.169.254, tcp port 80) sits at position 1 of the nat PREROUTING
// and OUTPUT chains.
//
// For each chain it first deletes one existing copy of the rule and then
// inserts the rule at position 1, so repeated runs do not pile up duplicates
// and any rules that were added above ours are pushed down.
//
// It is meant for the self-heal path, i.e. to be invoked after an MMDS
// lookup has failed. Only one invocation does work at a time: a caller that
// cannot take the semaphore returns nil right away without touching
// iptables.
//
// The returned error is the first insert (-I) failure, wrapped with the
// chain name. Delete (-D) failures are deliberately ignored because the rule
// is expected to be absent on the first run.
func PinMMDSRoute(ctx context.Context) error {
	if acquired := pinMMDSSem.TryAcquire(1); !acquired {
		// Someone else is already re-pinning; nothing for us to do.
		return nil
	}
	defer pinMMDSSem.Release(1)

	returnRule := []string{"-d", "169.254.169.254", "-p", "tcp", "--dport", "80", "-j", "RETURN"}
	chains := [...]string{"PREROUTING", "OUTPUT"}

	for i := range chains {
		chain := chains[i]
		deleteArgs := append([]string{"-D", chain}, returnRule...)
		insertArgs := append([]string{"-I", chain, "1"}, returnRule...)

		// Best effort: -D exits non-zero when the rule is not present yet,
		// which is the normal state on the first run.
		_ = iptables(ctx, deleteArgs...)

		err := iptables(ctx, insertArgs...)
		if err != nil {
			return fmt.Errorf("iptables -I nat %s: %w", chain, err)
		}
	}

	return nil
}

// iptables invokes `iptables -w 5 -t nat <args...>` and waits for it to
// finish. The -w 5 flag makes iptables wait up to 5s for the xtables lock,
// since a user-started iptables process may be holding it.
//
// On failure the returned error wraps the exec error and appends the
// command's combined stdout/stderr; on success it returns nil.
func iptables(ctx context.Context, args ...string) error {
	argv := make([]string, 0, 4+len(args))
	argv = append(argv, "-w", "5", "-t", "nat")
	argv = append(argv, args...)

	cmd := exec.CommandContext(ctx, "iptables", argv...)
	if output, runErr := cmd.CombinedOutput(); runErr != nil {
		return fmt.Errorf("%w: %s", runErr, output)
	}

	return nil
}
7 changes: 7 additions & 0 deletions packages/envd/internal/host/mmds_route_other.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
//go:build !linux

package host

import "context"

// PinMMDSRoute is the non-Linux stand-in for the iptables-based MMDS route
// pin. It ignores its context, does nothing, and always returns nil.
func PinMMDSRoute(_ context.Context) error {
	return nil
}
24 changes: 7 additions & 17 deletions packages/envd/internal/logs/exporter/rate_limited_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,24 @@ package exporter

import (
"log"
"sync/atomic"
"time"

"github.com/e2b-dev/infra/packages/envd/internal/logs/ratelimit"
)

type rateLimitedLogger struct {
floor time.Duration
format string
lastLogged atomic.Pointer[time.Time]
suppressed atomic.Int64
limit *ratelimit.Limiter
format string
}

func newRateLimitedLogger(floor time.Duration, format string) *rateLimitedLogger {
return &rateLimitedLogger{floor: floor, format: format}
return &rateLimitedLogger{limit: ratelimit.New(floor), format: format}
}

func (r *rateLimitedLogger) log(args ...any) {
last := r.lastLogged.Load()
if last != nil && time.Since(*last) <= r.floor {
r.suppressed.Add(1)

return
}
now := time.Now()
if !r.lastLogged.CompareAndSwap(last, &now) {
r.suppressed.Add(1)

ok, suppressed := r.limit.Allow()
if !ok {
return
}
suppressed := r.suppressed.Swap(0)
log.Printf(r.format+" (%d suppressed since last log)", append(args, suppressed)...)
}
39 changes: 39 additions & 0 deletions packages/envd/internal/logs/ratelimit/ratelimit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package ratelimit

import (
"sync/atomic"
"time"
)

// Limiter throttles a recurring log line to at most one emit per floor
// interval and keeps a count of the attempts it turned away in between.
// It only makes the emit/skip decision; formatting and writing the log line
// is left to the caller.
type Limiter struct {
	// floor is the minimum time that must pass after an emit before the
	// next one is allowed.
	floor time.Duration
	// lastLogged holds the time of the most recent allowed emit; nil until
	// the first one.
	lastLogged atomic.Pointer[time.Time]
	// suppressed counts attempts rejected since the last allowed emit.
	suppressed atomic.Int64
}

// New returns a Limiter that allows one emit per floor interval.
func New(floor time.Duration) *Limiter {
	limiter := new(Limiter)
	limiter.floor = floor

	return limiter
}

// Allow reports whether the caller should emit its log line now.
//
// It returns (true, n) when the emit is allowed, where n is the number of
// attempts suppressed since the previous allowed emit; the caller should
// include n in the message. It returns (false, 0) when the attempt is
// suppressed, either because the floor has not elapsed yet or because a
// concurrent caller won the race to claim this emit.
func (l *Limiter) Allow() (bool, int64) {
	prev := l.lastLogged.Load()

	withinFloor := prev != nil && time.Since(*prev) <= l.floor
	if !withinFloor {
		// Claim the slot; the CAS only succeeds for one of the callers that
		// observed the same previous timestamp.
		stamp := time.Now()
		if l.lastLogged.CompareAndSwap(prev, &stamp) {
			return true, l.suppressed.Swap(0)
		}
	}

	l.suppressed.Add(1)

	return false, 0
}
2 changes: 1 addition & 1 deletion packages/envd/pkg/version.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package pkg

const Version = "0.5.24"
const Version = "0.5.25"
Loading