Skip to content

Commit 90944d5

Browse files
authored
fix(envd): self-heal MMDS routing on /init lookup failure (#2701)
On MMDS lookup failure (no happy-path cost), install a private nat chain (E2B_MMDS) that \`RETURN\`s for \`169.254.169.254:80\` and pin its jump at position 1 in PREROUTING and OUTPUT, removing prior copies first. Retry the lookup once. Idempotent: \`-N\`/\`-D\` errors are expected and swallowed. ## Follow-ups (separate PRs) - Survey envd \`/init\` failure modes to add similar in-envd recovery for other preconditions / loops. - Long-term: dedicated envd netns so user iptables cannot affect us at all.
1 parent aa2d4c5 commit 90944d5

7 files changed

Lines changed: 134 additions & 19 deletions

File tree

packages/envd/internal/api/init.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,14 @@ import (
2121

2222
"github.com/e2b-dev/infra/packages/envd/internal/host"
2323
"github.com/e2b-dev/infra/packages/envd/internal/logs"
24+
"github.com/e2b-dev/infra/packages/envd/internal/logs/ratelimit"
2425
"github.com/e2b-dev/infra/packages/shared/pkg/keys"
2526
)
2627

28+
// /init is hammered by the orchestrator's infinite retry loop, so a
29+
// persistent pin failure would otherwise flood the log.
30+
var pinMMDSWarnLimit = ratelimit.New(10 * time.Second)
31+
2732
var (
2833
ErrAccessTokenMismatch = errors.New("access token validation failed")
2934
ErrAccessTokenResetNotAuthorized = errors.New("access token reset not authorized")
@@ -73,6 +78,18 @@ func (a *API) checkMMDSHash(ctx context.Context, requestToken *SecureToken) (boo
7378
}
7479

7580
mmdsHash, err := a.mmdsClient.GetAccessTokenHash(ctx)
81+
if err != nil {
82+
// Self-heal: a user-installed PREROUTING/OUTPUT redirect on
83+
// 169.254.169.254:80 in the same netns can shadow our route.
84+
// Re-pin our RETURN rule at position 1 of nat PREROUTING and
85+
// OUTPUT, then retry once.
86+
if pinErr := host.PinMMDSRoute(ctx); pinErr != nil {
87+
if ok, suppressed := pinMMDSWarnLimit.Allow(); ok {
88+
a.logger.Warn().Err(pinErr).Int64("suppressed", suppressed).Msg("failed to pin MMDS iptables route")
89+
}
90+
}
91+
mmdsHash, err = a.mmdsClient.GetAccessTokenHash(ctx)
92+
}
7693
if err != nil {
7794
return false, false
7895
}

packages/envd/internal/host/mmds.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,13 @@ func GetAccessTokenHashFromMMDS(ctx context.Context) (string, error) {
130130
}
131131

132132
func PollForMMDSOpts(ctx context.Context, mmdsChan chan<- *MMDSOpts, envVars *utils.EnvVars) {
133-
httpClient := &http.Client{Transport: &http.Transport{DisableKeepAlives: true}}
133+
// Match mmdsAccessTokenClient: bound any single tick (e.g. -j DROP on
134+
// MMDS would otherwise hang on the TCP handshake) and avoid keepalive
135+
// so a broken intermediate doesn't poison a kept-open connection.
136+
httpClient := &http.Client{
137+
Timeout: mmdsAccessTokenRequestClientTimeout,
138+
Transport: &http.Transport{DisableKeepAlives: true},
139+
}
134140
defer httpClient.CloseIdleConnections()
135141

136142
var lastErr error
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
//go:build linux
2+
3+
package host
4+
5+
import (
6+
"context"
7+
"fmt"
8+
"os/exec"
9+
10+
"golang.org/x/sync/semaphore"
11+
)
12+
13+
// pinMMDSSem serializes self-heal calls so concurrent /init retries don't
14+
// run iptables in parallel against the same nat table.
15+
var pinMMDSSem = semaphore.NewWeighted(1)
16+
17+
// PinMMDSRoute pins a RETURN rule for MMDS traffic (169.254.169.254:80) at
18+
// position 1 of nat PREROUTING and OUTPUT. Idempotent: each run deletes any
19+
// existing copy of the rule first, then re-inserts at position 1, so user
20+
// rules added above ours get pushed down.
21+
//
22+
// Intended for the self-heal path: only called when a real MMDS lookup
23+
// fails. Concurrent callers are coalesced via a semaphore — only one runs
24+
// at a time, the rest return nil immediately. Returns the first -I failure
25+
// (if any); -D failures are expected (rule absent on first run) and
26+
// silently swallowed.
27+
func PinMMDSRoute(ctx context.Context) error {
28+
if !pinMMDSSem.TryAcquire(1) {
29+
return nil
30+
}
31+
defer pinMMDSSem.Release(1)
32+
33+
rule := []string{"-d", "169.254.169.254", "-p", "tcp", "--dport", "80", "-j", "RETURN"}
34+
for _, chain := range []string{"PREROUTING", "OUTPUT"} {
35+
// -D fails when the rule is absent (exit 1, expected on first run);
36+
// nothing actionable to log.
37+
_ = iptables(ctx, append([]string{"-D", chain}, rule...)...)
38+
if err := iptables(ctx, append([]string{"-I", chain, "1"}, rule...)...); err != nil {
39+
return fmt.Errorf("iptables -I nat %s: %w", chain, err)
40+
}
41+
}
42+
43+
return nil
44+
}
45+
46+
// iptables runs `iptables -w 5 -t nat ...`. -w waits up to 5s for the
47+
// xtables lock (a user iptables process may race us).
48+
func iptables(ctx context.Context, args ...string) error {
49+
full := append([]string{"-w", "5", "-t", "nat"}, args...)
50+
out, err := exec.CommandContext(ctx, "iptables", full...).CombinedOutput()
51+
if err != nil {
52+
return fmt.Errorf("%w: %s", err, out)
53+
}
54+
55+
return nil
56+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
//go:build !linux
2+
3+
package host
4+
5+
import "context"
6+
7+
func PinMMDSRoute(_ context.Context) error { return nil }

packages/envd/internal/logs/exporter/rate_limited_logger.go

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,34 +2,24 @@ package exporter
22

33
import (
44
"log"
5-
"sync/atomic"
65
"time"
6+
7+
"github.com/e2b-dev/infra/packages/envd/internal/logs/ratelimit"
78
)
89

910
type rateLimitedLogger struct {
10-
floor time.Duration
11-
format string
12-
lastLogged atomic.Pointer[time.Time]
13-
suppressed atomic.Int64
11+
limit *ratelimit.Limiter
12+
format string
1413
}
1514

1615
func newRateLimitedLogger(floor time.Duration, format string) *rateLimitedLogger {
17-
return &rateLimitedLogger{floor: floor, format: format}
16+
return &rateLimitedLogger{limit: ratelimit.New(floor), format: format}
1817
}
1918

2019
func (r *rateLimitedLogger) log(args ...any) {
21-
last := r.lastLogged.Load()
22-
if last != nil && time.Since(*last) <= r.floor {
23-
r.suppressed.Add(1)
24-
25-
return
26-
}
27-
now := time.Now()
28-
if !r.lastLogged.CompareAndSwap(last, &now) {
29-
r.suppressed.Add(1)
30-
20+
ok, suppressed := r.limit.Allow()
21+
if !ok {
3122
return
3223
}
33-
suppressed := r.suppressed.Swap(0)
3424
log.Printf(r.format+" (%d suppressed since last log)", append(args, suppressed)...)
3525
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package ratelimit
2+
3+
import (
4+
"sync/atomic"
5+
"time"
6+
)
7+
8+
// Limiter gates a recurring log to at most one emit per `floor`, counting
9+
// suppressed attempts in between. The caller decides how to format/emit;
10+
// this type only owns the gating decision.
11+
type Limiter struct {
12+
floor time.Duration
13+
lastLogged atomic.Pointer[time.Time]
14+
suppressed atomic.Int64
15+
}
16+
17+
func New(floor time.Duration) *Limiter {
18+
return &Limiter{floor: floor}
19+
}
20+
21+
// Allow returns (true, suppressedSinceLast) when the caller should emit a
22+
// log line; false otherwise. On true the caller should include
23+
// `suppressedSinceLast` in the emitted message.
24+
func (r *Limiter) Allow() (bool, int64) {
25+
last := r.lastLogged.Load()
26+
if last != nil && time.Since(*last) <= r.floor {
27+
r.suppressed.Add(1)
28+
29+
return false, 0
30+
}
31+
now := time.Now()
32+
if !r.lastLogged.CompareAndSwap(last, &now) {
33+
r.suppressed.Add(1)
34+
35+
return false, 0
36+
}
37+
38+
return true, r.suppressed.Swap(0)
39+
}

packages/envd/pkg/version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
package pkg
22

3-
const Version = "0.5.24"
3+
const Version = "0.5.25"

0 commit comments

Comments
 (0)