Skip to content

Commit c49c574

Browse files
committed
workloadmeta/docker: recover panics in stream() event handlers (quickfix)
The docker workloadmeta collector's stream() goroutine runs forever and handles two event streams (container + image) by calling handleContainerEvent / handleImageEvent inline. Neither path had any panic recovery, so a single panic inside buildCollectorEvent — for example, one raised by moby/client's JSON decoder while processing a malformed ContainerInspect response — would bubble up, terminate the goroutine and crash the whole agent process. This happened in production during an SMP regression-detector run on the gpu_check_hopper_fake_nvml experiment (every replicate crashed the agent after a few hundred seconds, exhausting the 1h10m SMP timeout); see taskmds/05001 for the full captured stack and context. Wrap both event handlers in a runWithRecovery helper that: * logs the panic + debug.Stack() at ERROR with enough context to identify the offending event (container ID + action for containers, action for images), * drops the offending event, * lets the stream loop continue processing subsequent events. This is a quickfix, not a root-cause fix — the underlying panic is still inside moby/client@v0.4.0's JSON decoder and needs to be fixed there (or upstream in Go's encoding/json + reflect interaction with container.InspectResponse). taskmds/05001 tracks that work. The companion regression test for the agent-visible half of the bug is at pkg/util/docker/inspect_panic_test.go (commit bc6a6f4). Three unit tests cover the new helper: * TestRunWithRecovery_SwallowsPanic — a panicking function returns to its caller normally. * TestRunWithRecovery_NoPanicNoOp — the happy path is unchanged. * TestRunWithRecovery_SubsequentCallsAfterPanic — the reusability property the stream loop actually relies on: a panic in one call does not affect the next.
1 parent 14ff781 commit c49c574

2 files changed

Lines changed: 124 additions & 8 deletions

File tree

comp/core/workloadmeta/collectors/internal/docker/docker.go

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"context"
1313
"errors"
1414
"fmt"
15+
"runtime/debug"
1516
"slices"
1617
"strconv"
1718
"strings"
@@ -167,16 +168,10 @@ func (c *collector) stream(ctx context.Context) {
167168
case <-health.C:
168169

169170
case ev := <-c.containerEventsCh:
170-
err := c.handleContainerEvent(ctx, ev)
171-
if err != nil {
172-
log.Warnf("%s", err.Error())
173-
}
171+
c.handleContainerEventSafely(ctx, ev)
174172

175173
case ev := <-c.imageEventsCh:
176-
err := c.handleImageEvent(ctx, ev, nil)
177-
if err != nil {
178-
log.Warnf("%s", err.Error())
179-
}
174+
c.handleImageEventSafely(ctx, ev)
180175

181176
case <-ctx.Done():
182177
var err error
@@ -198,6 +193,49 @@ func (c *collector) stream(ctx context.Context) {
198193
}
199194
}
200195

196+
// handleContainerEventSafely wraps handleContainerEvent in a defer/recover so
197+
// that a panic inside buildCollectorEvent (for example, a panic raised by
198+
// moby/client's JSON decoder while processing a malformed ContainerInspect
199+
// response — see taskmds/05001 for the full story) does not tear down the
200+
// agent. The event is dropped and the stream loop continues.
201+
func (c *collector) handleContainerEventSafely(ctx context.Context, ev *docker.ContainerEvent) {
202+
runWithRecovery(
203+
fmt.Sprintf("handling docker container event (containerID=%q action=%q)", ev.ContainerID, ev.Action),
204+
func() {
205+
if err := c.handleContainerEvent(ctx, ev); err != nil {
206+
log.Warnf("%s", err.Error())
207+
}
208+
},
209+
)
210+
}
211+
212+
// handleImageEventSafely is the panic-recovery wrapper for image events.
213+
// Mirrors handleContainerEventSafely so a single bad image event cannot
214+
// take down the collector goroutine.
215+
func (c *collector) handleImageEventSafely(ctx context.Context, ev *docker.ImageEvent) {
216+
runWithRecovery(
217+
fmt.Sprintf("handling docker image event (action=%q)", ev.Action),
218+
func() {
219+
if err := c.handleImageEvent(ctx, ev, nil); err != nil {
220+
log.Warnf("%s", err.Error())
221+
}
222+
},
223+
)
224+
}
225+
226+
// runWithRecovery invokes fn with a deferred panic recovery. If fn panics,
227+
// the panic value and a stack trace are logged at ERROR and runWithRecovery
228+
// returns normally. Extracted so the behaviour can be exercised from a unit
229+
// test without having to mock the full DockerUtil surface.
230+
func runWithRecovery(what string, fn func()) {
231+
defer func() {
232+
if r := recover(); r != nil {
233+
log.Errorf("panic while %s, dropping event: %v\n%s", what, r, debug.Stack())
234+
}
235+
}()
236+
fn()
237+
}
238+
201239
func (c *collector) generateEventsFromContainerList(ctx context.Context, filter workloadfilter.FilterBundle) error {
202240
if c.store == nil {
203241
return errors.New("Start was not called")
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2025-present Datadog, Inc.
5+
6+
//go:build docker
7+
8+
package docker
9+
10+
// Tests for the panic-recovery wrapper used inside the collector's stream
11+
// goroutine. See taskmds/05001 for the underlying bug that motivated adding
12+
// this wrapper.
13+
14+
import (
15+
"testing"
16+
17+
"github.com/stretchr/testify/require"
18+
)
19+
20+
// TestRunWithRecovery_SwallowsPanic verifies that runWithRecovery swallows
21+
// a panic raised inside the passed function and returns normally. This is
22+
// the property the docker workloadmeta collector's stream goroutine relies
23+
// on — without it, a panic inside ContainerInspect (see taskmds/05001)
24+
// propagates up to the goroutine and kills the whole agent process.
25+
func TestRunWithRecovery_SwallowsPanic(t *testing.T) {
26+
didReturn := false
27+
28+
runWithRecovery("unit test", func() {
29+
panic("simulated moby/client json decode panic")
30+
})
31+
32+
didReturn = true
33+
require.Truef(t, didReturn,
34+
"runWithRecovery must return to its caller even when the wrapped "+
35+
"function panics; if this assertion fires it means the defer/recover "+
36+
"pair was removed and any panic inside handleContainerEvent will "+
37+
"tear down the collector goroutine (and in production, the agent "+
38+
"process itself).")
39+
}
40+
41+
// TestRunWithRecovery_NoPanicNoOp verifies the happy path: if the wrapped
42+
// function returns normally, runWithRecovery does not alter control flow.
43+
func TestRunWithRecovery_NoPanicNoOp(t *testing.T) {
44+
callCount := 0
45+
runWithRecovery("unit test", func() {
46+
callCount++
47+
})
48+
require.Equal(t, 1, callCount,
49+
"runWithRecovery must invoke the wrapped function exactly once "+
50+
"when it returns normally")
51+
}
52+
53+
// TestRunWithRecovery_SubsequentCallsAfterPanic is the behaviour the stream
54+
// loop cares about: after one call to runWithRecovery panics, subsequent
55+
// calls must still work. This is what makes a single bad container event
56+
// safe to drop without taking the collector with it.
57+
func TestRunWithRecovery_SubsequentCallsAfterPanic(t *testing.T) {
58+
runs := 0
59+
60+
runWithRecovery("first", func() {
61+
runs++
62+
panic("first call panics")
63+
})
64+
runWithRecovery("second", func() {
65+
runs++
66+
})
67+
runWithRecovery("third", func() {
68+
runs++
69+
panic("third call panics too")
70+
})
71+
runWithRecovery("fourth", func() {
72+
runs++
73+
})
74+
75+
require.Equal(t, 4, runs,
76+
"runWithRecovery must be reusable across events — each call is "+
77+
"independent; a panic in one call must not affect the next")
78+
}

0 commit comments

Comments
 (0)