diff --git a/packages/envd/internal/api/init.go b/packages/envd/internal/api/init.go
index e5ea9fec33..175ec049fb 100644
--- a/packages/envd/internal/api/init.go
+++ b/packages/envd/internal/api/init.go
@@ -21,9 +21,14 @@ import (
 
 	"github.com/e2b-dev/infra/packages/envd/internal/host"
 	"github.com/e2b-dev/infra/packages/envd/internal/logs"
+	"github.com/e2b-dev/infra/packages/envd/internal/logs/ratelimit"
 	"github.com/e2b-dev/infra/packages/shared/pkg/keys"
 )
 
+// /init is hammered by the orchestrator's infinite retry loop, so a
+// persistent pin failure would otherwise flood the log.
+var pinMMDSWarnLimit = ratelimit.New(10 * time.Second)
+
 var (
 	ErrAccessTokenMismatch           = errors.New("access token validation failed")
 	ErrAccessTokenResetNotAuthorized = errors.New("access token reset not authorized")
@@ -73,6 +78,21 @@ func (a *API) checkMMDSHash(ctx context.Context, requestToken *SecureToken) (boo
 	}
 
 	mmdsHash, err := a.mmdsClient.GetAccessTokenHash(ctx)
+	// Only self-heal while the request is still live: once ctx is done the
+	// iptables commands cannot start and the retry fails immediately, which
+	// would just log a misleading pin failure.
+	if err != nil && ctx.Err() == nil {
+		// Self-heal: a user-installed PREROUTING/OUTPUT redirect on
+		// 169.254.169.254:80 in the same netns can shadow our route.
+		// Re-pin our RETURN rule at position 1 of nat PREROUTING and
+		// OUTPUT, then retry once.
+		if pinErr := host.PinMMDSRoute(ctx); pinErr != nil {
+			if ok, suppressed := pinMMDSWarnLimit.Allow(); ok {
+				a.logger.Warn().Err(pinErr).Int64("suppressed", suppressed).Msg("failed to pin MMDS iptables route")
+			}
+		}
+		mmdsHash, err = a.mmdsClient.GetAccessTokenHash(ctx)
+	}
 	if err != nil {
 		return false, false
 	}
diff --git a/packages/envd/internal/host/mmds.go b/packages/envd/internal/host/mmds.go
index 4be3464ab0..27bfe00463 100644
--- a/packages/envd/internal/host/mmds.go
+++ b/packages/envd/internal/host/mmds.go
@@ -130,7 +130,13 @@ func GetAccessTokenHashFromMMDS(ctx context.Context) (string, error) {
 }
 
 func PollForMMDSOpts(ctx context.Context, mmdsChan chan<- *MMDSOpts, envVars *utils.EnvVars) {
-	httpClient := &http.Client{Transport: &http.Transport{DisableKeepAlives: true}}
+	// Match mmdsAccessTokenClient: bound any single tick (e.g. -j DROP on
+	// MMDS would otherwise hang on the TCP handshake) and avoid keepalive
+	// so a broken intermediate doesn't poison a kept-open connection.
+	httpClient := &http.Client{
+		Timeout:   mmdsAccessTokenRequestClientTimeout,
+		Transport: &http.Transport{DisableKeepAlives: true},
+	}
 	defer httpClient.CloseIdleConnections()
 
 	var lastErr error
diff --git a/packages/envd/internal/host/mmds_route_linux.go b/packages/envd/internal/host/mmds_route_linux.go
new file mode 100644
index 0000000000..170e877247
--- /dev/null
+++ b/packages/envd/internal/host/mmds_route_linux.go
@@ -0,0 +1,59 @@
+//go:build linux
+
+package host
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"os/exec"
+
+	"golang.org/x/sync/semaphore"
+)
+
+// pinMMDSSem serializes self-heal calls so concurrent /init retries don't
+// run iptables in parallel against the same nat table.
+var pinMMDSSem = semaphore.NewWeighted(1)
+
+// PinMMDSRoute pins a RETURN rule for MMDS traffic (169.254.169.254:80) at
+// position 1 of nat PREROUTING and OUTPUT. Idempotent: each run deletes any
+// existing copy of the rule first, then re-inserts at position 1, so user
+// rules added above ours get pushed down.
+//
+// Intended for the self-heal path: only called when a real MMDS lookup
+// fails. Concurrent callers are coalesced via a semaphore — only one runs
+// at a time, the rest return nil immediately. Returns the first -I failure
+// (if any); -D failures are expected (rule absent on first run) and
+// silently swallowed.
+func PinMMDSRoute(ctx context.Context) error {
+	if !pinMMDSSem.TryAcquire(1) {
+		return nil
+	}
+	defer pinMMDSSem.Release(1)
+
+	rule := []string{"-d", "169.254.169.254", "-p", "tcp", "--dport", "80", "-j", "RETURN"}
+	for _, chain := range []string{"PREROUTING", "OUTPUT"} {
+		// -D fails when the rule is absent (exit 1, expected on first run);
+		// nothing actionable to log.
+		_ = iptables(ctx, append([]string{"-D", chain}, rule...)...)
+		if err := iptables(ctx, append([]string{"-I", chain, "1"}, rule...)...); err != nil {
+			return fmt.Errorf("iptables -I nat %s: %w", chain, err)
+		}
+	}
+
+	return nil
+}
+
+// iptables runs `iptables -w 5 -t nat ...`. -w waits up to 5s for the
+// xtables lock (a user iptables process may race us). On failure the
+// returned error wraps the exec error and carries the command's combined
+// output with surrounding whitespace trimmed.
+func iptables(ctx context.Context, args ...string) error {
+	full := append([]string{"-w", "5", "-t", "nat"}, args...)
+	out, err := exec.CommandContext(ctx, "iptables", full...).CombinedOutput()
+	if err != nil {
+		return fmt.Errorf("%w: %s", err, bytes.TrimSpace(out))
+	}
+
+	return nil
+}
diff --git a/packages/envd/internal/host/mmds_route_other.go b/packages/envd/internal/host/mmds_route_other.go
new file mode 100644
index 0000000000..b009d37228
--- /dev/null
+++ b/packages/envd/internal/host/mmds_route_other.go
@@ -0,0 +1,8 @@
+//go:build !linux
+
+package host
+
+import "context"
+
+// PinMMDSRoute is a no-op on non-Linux platforms; it always returns nil.
+func PinMMDSRoute(_ context.Context) error { return nil }
diff --git a/packages/envd/internal/logs/exporter/rate_limited_logger.go b/packages/envd/internal/logs/exporter/rate_limited_logger.go
index 22500e2948..1845d10320 100644
--- a/packages/envd/internal/logs/exporter/rate_limited_logger.go
+++ b/packages/envd/internal/logs/exporter/rate_limited_logger.go
@@ -2,34 +2,24 @@ package exporter
 
 import (
 	"log"
-	"sync/atomic"
 	"time"
+
+	"github.com/e2b-dev/infra/packages/envd/internal/logs/ratelimit"
 )
 
 type rateLimitedLogger struct {
-	floor      time.Duration
-	format     string
-	lastLogged atomic.Pointer[time.Time]
-	suppressed atomic.Int64
+	limit  *ratelimit.Limiter
+	format string
 }
 
 func newRateLimitedLogger(floor time.Duration, format string) *rateLimitedLogger {
-	return &rateLimitedLogger{floor: floor, format: format}
+	return &rateLimitedLogger{limit: ratelimit.New(floor), format: format}
 }
 
 func (r *rateLimitedLogger) log(args ...any) {
-	last := r.lastLogged.Load()
-	if last != nil && time.Since(*last) <= r.floor {
-		r.suppressed.Add(1)
-
-		return
-	}
-	now := time.Now()
-	if !r.lastLogged.CompareAndSwap(last, &now) {
-		r.suppressed.Add(1)
-
+	ok, suppressed := r.limit.Allow()
+	if !ok {
 		return
 	}
-	suppressed := r.suppressed.Swap(0)
 	log.Printf(r.format+" (%d suppressed since last log)", append(args, suppressed)...)
 }
diff --git a/packages/envd/internal/logs/ratelimit/ratelimit.go b/packages/envd/internal/logs/ratelimit/ratelimit.go
new file mode 100644
index 0000000000..78e36b9b28
--- /dev/null
+++ b/packages/envd/internal/logs/ratelimit/ratelimit.go
@@ -0,0 +1,41 @@
+// Package ratelimit provides a gate that throttles a recurring log line.
+package ratelimit
+
+import (
+	"sync/atomic"
+	"time"
+)
+
+// Limiter gates a recurring log to at most one emit per `floor`, counting
+// suppressed attempts in between. The caller decides how to format/emit;
+// this type only owns the gating decision.
+type Limiter struct {
+	floor      time.Duration
+	lastLogged atomic.Pointer[time.Time]
+	suppressed atomic.Int64
+}
+
+// New returns a Limiter that allows at most one emit per floor.
+func New(floor time.Duration) *Limiter {
+	return &Limiter{floor: floor}
+}
+
+// Allow returns (true, suppressedSinceLast) when the caller should emit a
+// log line; false otherwise. On true the caller should include
+// `suppressedSinceLast` in the emitted message.
+func (r *Limiter) Allow() (bool, int64) {
+	last := r.lastLogged.Load()
+	if last != nil && time.Since(*last) <= r.floor {
+		r.suppressed.Add(1)
+
+		return false, 0
+	}
+	now := time.Now()
+	if !r.lastLogged.CompareAndSwap(last, &now) {
+		r.suppressed.Add(1)
+
+		return false, 0
+	}
+
+	return true, r.suppressed.Swap(0)
+}
diff --git a/packages/envd/pkg/version.go b/packages/envd/pkg/version.go
index 4cfbb08b29..48aeaf4b70 100644
--- a/packages/envd/pkg/version.go
+++ b/packages/envd/pkg/version.go
@@ -1,3 +1,3 @@
 package pkg
 
-const Version = "0.5.24"
+const Version = "0.5.25"