Skip to content

Commit 31dfda9

Browse files
committed
batch GPU_LOST and unmonitored health events to reduce ResourceSlice updates
Signed-off-by: Swati Gupta <swatig@nvidia.com>
1 parent b515129 commit 31dfda9

2 files changed

Lines changed: 59 additions & 25 deletions

File tree

cmd/gpu-kubelet-plugin/device_health.go

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,11 @@ const (
5858
// DeviceHealthEvent carries a typed health notification from the NVML health
5959
// monitor to the driver's event handler, enabling the driver to set the
6060
// appropriate DRA device taint per the Option A schema (KEP-5055).
61+
// Devices is a batch: for GPU_LOST and unmonitored events, all affected devices
62+
// are aggregated into a single event so the consumer applies one ResourceSlice
63+
// update instead of N.
6164
type DeviceHealthEvent struct {
62-
Device *AllocatableDevice
65+
Devices []*AllocatableDevice
6366
EventType DeviceHealthEventType
6467
// inspired by NVML Event type and only meaningful for xid errors.
6568
// may have to create a custom type based on future device-api
@@ -267,7 +270,7 @@ func (m *nvmlDeviceHealthMonitor) run(ctx context.Context) {
267270

268271
klog.V(4).Infof("Sending XID=%d health event for device %s", xid, affectedDevice.UUID())
269272
m.unhealthy <- &DeviceHealthEvent{
270-
Device: affectedDevice,
273+
Devices: []*AllocatableDevice{affectedDevice},
271274
EventType: HealthEventXID,
272275
EventData: xid,
273276
}
@@ -279,30 +282,51 @@ func (m *nvmlDeviceHealthMonitor) Unhealthy() <-chan *DeviceHealthEvent {
279282
return m.unhealthy
280283
}
281284

285+
// sendHealthEventForAllDevices aggregates every device across all GPUs into a
286+
// single batched DeviceHealthEvent so the consumer makes one ResourceSlice
287+
// update.
282288
func (m *nvmlDeviceHealthMonitor) sendHealthEventForAllDevices(eventType DeviceHealthEventType) {
289+
var devices []*AllocatableDevice
283290
for _, giMap := range m.deviceByPlacement {
284-
m.sendHealthEventForDevices(giMap, eventType)
291+
devices = append(devices, collectDevices(giMap)...)
285292
}
293+
m.sendBatchedHealthEvent(devices, eventType)
286294
}
287295

288-
// sendHealthEventForDevices sends a DeviceHealthEvent for every device under
289-
// the given parent-level map. Uses non-blocking sends to protect the monitor
290-
// goroutine from deadlocks when the channel is full.
296+
// sendHealthEventForDevices aggregates all devices under a single parent GPU
297+
// into one batched DeviceHealthEvent.
291298
func (m *nvmlDeviceHealthMonitor) sendHealthEventForDevices(giMap map[uint32]map[uint32]*AllocatableDevice, eventType DeviceHealthEventType) {
299+
m.sendBatchedHealthEvent(collectDevices(giMap), eventType)
300+
}
301+
302+
// collectDevices flattens a GI→CI device map into a slice.
303+
func collectDevices(giMap map[uint32]map[uint32]*AllocatableDevice) []*AllocatableDevice {
304+
var devices []*AllocatableDevice
292305
for _, ciMap := range giMap {
293306
for _, dev := range ciMap {
294-
event := &DeviceHealthEvent{
295-
Device: dev,
296-
EventType: eventType,
297-
}
298-
select {
299-
case m.unhealthy <- event:
300-
klog.V(6).Infof("Sent %s health event for device %s", eventType, dev.UUID())
301-
default:
302-
klog.Errorf("Health event channel full; dropping %s event for device %s", eventType, dev.UUID())
303-
}
307+
devices = append(devices, dev)
304308
}
305309
}
310+
return devices
311+
}
312+
313+
// sendBatchedHealthEvent sends a single DeviceHealthEvent containing all
314+
// affected devices. Uses a non-blocking send to protect the monitor goroutine
315+
// from deadlocks when the channel is full.
316+
func (m *nvmlDeviceHealthMonitor) sendBatchedHealthEvent(devices []*AllocatableDevice, eventType DeviceHealthEventType) {
317+
if len(devices) == 0 {
318+
return
319+
}
320+
event := &DeviceHealthEvent{
321+
Devices: devices,
322+
EventType: eventType,
323+
}
324+
select {
325+
case m.unhealthy <- event:
326+
klog.V(6).Infof("Sent batched %s health event for %d device(s)", eventType, len(devices))
327+
default:
328+
klog.Errorf("Health event channel full; dropping batched %s event for %d device(s)", eventType, len(devices))
329+
}
306330
}
307331

308332
// The purpose of this function is to allow for a O(1) lookup of

cmd/gpu-kubelet-plugin/driver.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -470,13 +470,19 @@ func (d *driver) deviceHealthEvents(ctx context.Context, nodeName string) {
470470
klog.V(6).Info("Health monitor channel closed")
471471
return
472472
}
473-
uuid := event.Device.UUID()
474-
klog.Warningf("Received %s health event for device %s", event.EventType, uuid)
475473

476474
taint := healthEventToTaint(event, d.deviceHealthMonitor)
477-
if !d.state.AddDeviceTaint(event.Device, taint) {
475+
modified := false
476+
for _, dev := range event.Devices {
477+
klog.Warningf("Received %s health event for device %s", event.EventType, dev.UUID())
478+
if d.state.AddDeviceTaint(dev, taint) {
479+
modified = true
480+
}
481+
}
482+
if !modified {
478483
continue
479484
}
485+
480486
var resourceSlice resourceslice.Slice
481487
for _, devices := range d.state.perGPUAllocatable.allocatablesMap {
482488
for _, dev := range devices {
@@ -501,21 +507,25 @@ func (d *driver) deviceHealthEvents(ctx context.Context, nodeName string) {
501507
// This is a temporary compromise while device taints/tolerations (KEP-5055)
502508
// are available as a Beta feature. An interim improvement could be adding
503509
// a retry/backoff or switch to patch updates instead of full republish.
504-
klog.V(4).Infof("Republishing ResourceSlice: device %s tainted with %s=%q (effect=%s)",
505-
uuid, taint.Key, taint.Value, taint.Effect)
510+
klog.V(4).Infof("Republishing ResourceSlice: %d device(s) tainted with %s=%q (effect=%s)",
511+
len(event.Devices), taint.Key, taint.Value, taint.Effect)
506512

507513
resources := resourceslice.DriverResources{
508514
Pools: map[string]resourceslice.Pool{
509515
nodeName: {Slices: []resourceslice.Slice{resourceSlice}},
510516
},
511517
}
512518

513-
// TODO: Instead of acting on per event basis, add event aggregation on the receiving side before `publishResources`.
514-
// Evaluate two batching strategies:
515-
// 1. Channel drain: Non-blocking pull of all pending events (Pro: zero latency; Con: susceptible to NVML lag).
519+
// NOTE: GPU_LOST and unmonitored events are already batched at the
520+
// sender (all affected devices arrive in a single DeviceHealthEvent).
521+
// XID events are still per-device and may cause repeated publishes.
522+
// TODO: Add receiver-side event aggregation before PublishResources.
523+
// Evaluate two strategies:
524+
// 1. Channel drain: non-blocking pull of all pending events (Pro: zero latency; Con: susceptible to NVML lag).
516525
// 2. Timer debounce: e.g., 50ms window (Pro: standard K8s API protection; Con: slight delay).
526+
// This also needs to be handled properly in the recovery path.
517527
if err := d.pluginhelper.PublishResources(ctx, resources); err != nil {
518-
klog.Errorf("Failed to publish resources after taint update for device %s: %v", uuid, err)
528+
klog.Errorf("Failed to publish resources after taint update: %v", err)
519529
}
520530
}
521531
}

0 commit comments

Comments
 (0)