Skip to content

Commit cb1fd8a

Browse files
Kangyan-Zhouclaude
andcommitted
feat: add K8s Lease-based leader election for shared storage downloads
On shared filesystems (NFS/GPFS/CephFS/Lustre), only one agent should download model files per model. Others wait with jitter and recheck. - Detect shared storage via syscall.Statfs filesystem magic numbers - Per-model K8s Leases (model-download-<name>) for parallel downloads of different models while preventing duplicate downloads of the same model - Non-leaders wait up to 5.5min with 15s jitter between rechecks - Handle expired leases, API errors (IsNotFound vs transient), context cancellation - Guard against nil HolderIdentity, lease renewal conflicts - Use time.NewTimer with explicit Stop() to avoid timer leaks - Fall back to downloading if leader times out Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 3b1140e commit cb1fd8a

2 files changed

Lines changed: 233 additions & 22 deletions

File tree

cmd/model-agent/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,8 @@ func initializeComponents(
268268
logger,
269269
baseModelInformer.Lister(),
270270
clusterBaseModelInformer.Lister(),
271+
cfg.nodeName,
272+
cfg.namespace,
271273
)
272274
if err != nil {
273275
return nil, nil, fmt.Errorf("failed to create gopher: %w", err)

pkg/modelagent/gopher.go

Lines changed: 231 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,21 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"math/rand"
78
"os"
89
"path/filepath"
910
"strings"
1011
"sync"
1112
"sync/atomic"
13+
"syscall"
1214
"time"
1315

1416
"k8s.io/apimachinery/pkg/labels"
1517

1618
"github.com/oracle/oci-go-sdk/v65/objectstorage"
1719
"go.uber.org/zap"
20+
coordinationv1 "k8s.io/api/coordination/v1"
21+
apierrors "k8s.io/apimachinery/pkg/api/errors"
1822
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1923
"k8s.io/client-go/kubernetes"
2024

@@ -64,6 +68,13 @@ type Gopher struct {
6468
// Track active downloads for cancellation
6569
activeDownloads map[string]context.CancelFunc // key: model UID
6670
activeDownloadsMutex sync.RWMutex
71+
72+
// Shared storage coordination: when modelRootDir is on a shared filesystem
73+
// (NFS, GPFS, CephFS, Lustre), only the download leader should download.
74+
// Other agents wait with jitter and recheck for files on disk.
75+
isSharedStorage bool
76+
nodeName string
77+
namespace string
6778
}
6879

6980
const (
@@ -84,12 +95,19 @@ func NewGopher(
8495
metrics *Metrics,
8596
logger *zap.SugaredLogger,
8697
baseModelLister omev1beta1lister.BaseModelLister,
87-
clusterBaseModelLister omev1beta1lister.ClusterBaseModelLister) (*Gopher, error) {
98+
clusterBaseModelLister omev1beta1lister.ClusterBaseModelLister,
99+
nodeName string,
100+
namespace string) (*Gopher, error) {
88101

89102
if xetConfig == nil {
90103
return nil, fmt.Errorf("xet hugging face config cannot be nil")
91104
}
92105

106+
shared := isSharedFilesystem(modelRootDir, logger)
107+
if shared {
108+
logger.Infof("Detected shared filesystem at %s — download leader election enabled", modelRootDir)
109+
}
110+
93111
return &Gopher{
94112
modelConfigParser: modelConfigParser,
95113
configMapReconciler: configMapReconciler,
@@ -106,6 +124,9 @@ func NewGopher(
106124
activeDownloads: make(map[string]context.CancelFunc),
107125
baseModelLister: baseModelLister,
108126
clusterBaseModelLister: clusterBaseModelLister,
127+
isSharedStorage: shared,
128+
nodeName: nodeName,
129+
namespace: namespace,
109130
}, nil
110131
}
111132

@@ -331,15 +352,8 @@ func (s *Gopher) processTask(task *GopherTask) error {
331352
// or failed retry that must re-evaluate the model files.
332353
if task.TaskType == Download && s.isModelAlreadyDownloaded(destPath) {
333354
s.logger.Infof("Model %s already exists at %s (shared storage), skipping OCI download", modelInfo, destPath)
334-
var baseModel *v1beta1.BaseModel
335-
var clusterBaseModel *v1beta1.ClusterBaseModel
336-
if task.BaseModel != nil {
337-
baseModel = task.BaseModel
338-
} else if task.ClusterBaseModel != nil {
339-
clusterBaseModel = task.ClusterBaseModel
340-
}
341-
if err := s.safeParseAndUpdateModelConfig(destPath, baseModel, clusterBaseModel, nil); err != nil {
342-
return fmt.Errorf("model files exist at %s but config update failed: %w", destPath, err)
355+
if err := s.skipDownloadAndUpdateConfig(destPath, task); err != nil {
356+
return err
343357
}
344358
break
345359
}
@@ -997,21 +1011,25 @@ func (s *Gopher) processHuggingFaceModel(ctx context.Context, task *GopherTask,
9971011
// from HuggingFace, causing rate-limiting and hours of unnecessary I/O.
9981012
// Only for fresh Download tasks — DownloadOverride indicates a spec change
9991013
// or failed retry that must re-evaluate the model files.
1000-
if task.TaskType == Download && s.isModelAlreadyDownloaded(destPath) {
1001-
s.logger.Infof("Model %s already exists at %s (shared storage), skipping HuggingFace download", modelInfo, destPath)
1002-
1003-
var baseModel *v1beta1.BaseModel
1004-
var clusterBaseModel *v1beta1.ClusterBaseModel
1005-
if task.BaseModel != nil {
1006-
baseModel = task.BaseModel
1007-
} else if task.ClusterBaseModel != nil {
1008-
clusterBaseModel = task.ClusterBaseModel
1014+
if task.TaskType == Download {
1015+
if s.isModelAlreadyDownloaded(destPath) {
1016+
s.logger.Infof("Model %s already exists at %s (shared storage), skipping HuggingFace download", modelInfo, destPath)
1017+
return s.skipDownloadAndUpdateConfig(destPath, task)
10091018
}
10101019

1011-
if err := s.safeParseAndUpdateModelConfig(destPath, baseModel, clusterBaseModel, nil); err != nil {
1012-
return fmt.Errorf("model files exist at %s but config update failed: %w", destPath, err)
1020+
// On shared storage, only the download leader should proceed. Non-leaders
1021+
// wait for the leader to finish and then recheck for files on disk.
1022+
if s.isSharedStorage && !s.isDownloadLeader(ctx, modelInfo) {
1023+
if s.waitForSharedStorageModel(ctx, destPath, modelInfo) {
1024+
s.logger.Infof("Model %s appeared on shared storage at %s after waiting for leader", modelInfo, destPath)
1025+
return s.skipDownloadAndUpdateConfig(destPath, task)
1026+
}
1027+
if ctx.Err() != nil {
1028+
return fmt.Errorf("download cancelled while waiting for shared storage leader: %w", ctx.Err())
1029+
}
1030+
// Timed out waiting — fall through to download as a fallback
1031+
s.logger.Warnf("Model %s not found after waiting for leader, proceeding with own download", modelInfo)
10131032
}
1014-
return nil
10151033
}
10161034

10171035
// fetch sha value based on model ID from Huggingface model API
@@ -1484,6 +1502,24 @@ func (s *Gopher) isRemoveParentArtifactDirectory(ctx context.Context, hasChildre
14841502
return !exists
14851503
}
14861504

1505+
// skipDownloadAndUpdateConfig handles the case where model files already exist
1506+
// at the destination path (e.g., downloaded by another node on shared storage,
1507+
// or left from a previous run). It parses the model config and updates the
1508+
// ConfigMap, bypassing the download step.
1509+
func (s *Gopher) skipDownloadAndUpdateConfig(destPath string, task *GopherTask) error {
1510+
var baseModel *v1beta1.BaseModel
1511+
var clusterBaseModel *v1beta1.ClusterBaseModel
1512+
if task.BaseModel != nil {
1513+
baseModel = task.BaseModel
1514+
} else if task.ClusterBaseModel != nil {
1515+
clusterBaseModel = task.ClusterBaseModel
1516+
}
1517+
if err := s.safeParseAndUpdateModelConfig(destPath, baseModel, clusterBaseModel, nil); err != nil {
1518+
return fmt.Errorf("model files exist at %s but config update failed: %w", destPath, err)
1519+
}
1520+
return nil
1521+
}
1522+
14871523
// isModelAlreadyDownloaded checks whether the model files are already present at
14881524
// destPath. This handles the shared-storage case: when multiple nodes mount the
14891525
// same filesystem (e.g., NFS at /storage/models), the first node that finishes an
@@ -1697,3 +1733,176 @@ func (s *Gopher) checkConfigAndWeights(destPath string) bool {
16971733
s.logger.Infof("checkConfigAndWeights(%s): config.json exists but no weight files found", destPath)
16981734
return false
16991735
}
1736+
1737+
// isSharedFilesystem detects whether the given path is on a shared/network
1738+
// filesystem by checking the filesystem type via syscall.Statfs.
1739+
// Known shared filesystem types: NFS, GPFS, CephFS, Lustre, GlusterFS, FUSE.
1740+
// Note: filesystem type detection via magic numbers only works on Linux.
1741+
// On macOS/Darwin, Statfs_t has a different layout and this will return false.
1742+
func isSharedFilesystem(path string, logger *zap.SugaredLogger) bool {
1743+
var stat syscall.Statfs_t
1744+
if err := syscall.Statfs(path, &stat); err != nil {
1745+
logger.Warnf("isSharedFilesystem(%s): syscall.Statfs failed: %v — shared storage detection disabled", path, err)
1746+
return false
1747+
}
1748+
// Filesystem magic numbers (from linux/magic.h and kernel sources)
1749+
switch stat.Type {
1750+
case 0x6969: // NFS_SUPER_MAGIC
1751+
return true
1752+
case 0x47504653: // GPFS (IBM Spectrum Scale)
1753+
return true
1754+
case 0x00C36400: // CEPH_SUPER_MAGIC
1755+
return true
1756+
case 0x0BD00BD0: // LUSTRE_SUPER_MAGIC
1757+
return true
1758+
case 0x65735546: // FUSE_SUPER_MAGIC (commonly used for network mounts)
1759+
return true
1760+
case 0x6A656A62: // GlusterFS
1761+
return true
1762+
default:
1763+
return false
1764+
}
1765+
}
1766+
const (
	// downloadLeaderLeasePrefix is the prefix for per-model K8s Leases used for
	// leader election on shared storage. Each model gets its own lease so different
	// models can be downloaded in parallel by different nodes.
	downloadLeaderLeasePrefix = "model-download-"

	// downloadLeaderLeaseDuration is how long a leader holds a per-model lease.
	downloadLeaderLeaseDuration = 5 * time.Minute

	// sharedStorageRecheckInterval is how often non-leaders recheck for files.
	sharedStorageRecheckInterval = 30 * time.Second

	// sharedStorageMaxJitter is the max random jitter added before rechecks.
	sharedStorageMaxJitter = 15 * time.Second
)

// sanitizeLeaseName converts a model identifier (e.g., "google/gemma-4-31B-it")
// into a K8s Lease name of the form downloadLeaderLeasePrefix + <sanitized id>.
//
// K8s object names must be DNS subdomains: at most 253 characters, consisting
// of '.'-separated labels that contain only [a-z0-9-] and start and end with
// an alphanumeric character. To guarantee that for arbitrary input:
//   - the identifier is lowercased and every character outside [a-z0-9.-]
//     (including '/', '_', ':', spaces and non-ASCII) becomes '-';
//   - the sanitized identifier is capped at 200 bytes, leaving room for the prefix;
//   - each '.'-separated label of the full name is stripped of leading and
//     trailing '-', and empty labels are dropped.
//
// An empty or fully-invalid identifier yields "model-download".
//
// The mapping is lossy: distinct identifiers such as "a/b" and "a_b" share a
// lease name. That only causes unrelated models to serialize their downloads;
// it never allows two nodes to hold the same lease.
func sanitizeLeaseName(modelInfo string) string {
	// Maximum number of bytes of the sanitized identifier kept after the prefix.
	const maxModelPartLen = 200

	name := strings.Map(func(r rune) rune {
		switch {
		case r >= 'a' && r <= 'z', r >= '0' && r <= '9', r == '-', r == '.':
			return r
		default:
			return '-'
		}
	}, strings.ToLower(modelInfo))

	// After the mapping above every rune is a single ASCII byte, so slicing by
	// byte offset cannot split a character.
	if len(name) > maxModelPartLen {
		name = name[:maxModelPartLen]
	}

	// Clean label edges on the full name so that the prefix's trailing '-' is
	// also removed when nothing usable follows it.
	labels := strings.Split(downloadLeaderLeasePrefix+name, ".")
	kept := labels[:0]
	for _, label := range labels {
		if label = strings.Trim(label, "-"); label != "" {
			kept = append(kept, label)
		}
	}
	return strings.Join(kept, ".")
}
1795+
1796+
// isDownloadLeader checks if this node currently holds the per-model download
1797+
// Lease. Each model gets its own lease so different models can be downloaded in
1798+
// parallel by different nodes. If the lease doesn't exist, it tries to create it
1799+
// (becoming leader). If held by this node, it renews and returns true. If held by
1800+
// another node, it checks for expiry and attempts to take over; otherwise returns false.
1801+
func (s *Gopher) isDownloadLeader(ctx context.Context, modelInfo string) bool {
1802+
leaseName := sanitizeLeaseName(modelInfo)
1803+
leasesClient := s.kubeClient.CoordinationV1().Leases(s.namespace)
1804+
now := metav1.NewMicroTime(time.Now())
1805+
1806+
lease, err := leasesClient.Get(ctx, leaseName, metav1.GetOptions{})
1807+
if err != nil {
1808+
if !apierrors.IsNotFound(err) {
1809+
// API server error — coordination unavailable, download independently
1810+
s.logger.Warnf("Failed to check download leader lease (API error): %v — this node will download independently", err)
1811+
return true
1812+
}
1813+
// Lease doesn't exist — try to create it and become leader
1814+
leaseDuration := int32(downloadLeaderLeaseDuration.Seconds())
1815+
newLease := &coordinationv1.Lease{
1816+
ObjectMeta: metav1.ObjectMeta{
1817+
Name: leaseName,
1818+
Namespace: s.namespace,
1819+
},
1820+
Spec: coordinationv1.LeaseSpec{
1821+
HolderIdentity: &s.nodeName,
1822+
LeaseDurationSeconds: &leaseDuration,
1823+
AcquireTime: &now,
1824+
RenewTime: &now,
1825+
},
1826+
}
1827+
_, createErr := leasesClient.Create(ctx, newLease, metav1.CreateOptions{})
1828+
if createErr != nil {
1829+
s.logger.Infof("Failed to acquire download leader lease for %s (another node won): %v", modelInfo, createErr)
1830+
return false
1831+
}
1832+
s.logger.Infof("Acquired download leader lease for %s — this node (%s) will download", modelInfo, s.nodeName)
1833+
return true
1834+
}
1835+
1836+
// Lease exists — check if we hold it or if it's expired
1837+
if lease.Spec.HolderIdentity != nil && *lease.Spec.HolderIdentity == s.nodeName {
1838+
// We hold it — renew
1839+
lease.Spec.RenewTime = &now
1840+
_, err = leasesClient.Update(ctx, lease, metav1.UpdateOptions{})
1841+
if err != nil {
1842+
if apierrors.IsConflict(err) || apierrors.IsNotFound(err) {
1843+
s.logger.Warnf("Lost download leader lease during renewal: %v — yielding leadership", err)
1844+
return false
1845+
}
1846+
s.logger.Warnf("Failed to renew download leader lease (transient error): %v — proceeding as leader", err)
1847+
}
1848+
return true
1849+
}
1850+
1851+
// Another node holds it — check if expired
1852+
if lease.Spec.RenewTime != nil && lease.Spec.LeaseDurationSeconds != nil {
1853+
expiry := lease.Spec.RenewTime.Time.Add(time.Duration(*lease.Spec.LeaseDurationSeconds) * time.Second)
1854+
if time.Now().After(expiry) {
1855+
// Expired — take over
1856+
lease.Spec.HolderIdentity = &s.nodeName
1857+
lease.Spec.AcquireTime = &now
1858+
lease.Spec.RenewTime = &now
1859+
_, err = leasesClient.Update(ctx, lease, metav1.UpdateOptions{})
1860+
if err != nil {
1861+
s.logger.Infof("Failed to take over expired download leader lease: %v", err)
1862+
return false
1863+
}
1864+
s.logger.Infof("Took over expired download leader lease for %s — this node (%s) will download", modelInfo, s.nodeName)
1865+
return true
1866+
}
1867+
}
1868+
1869+
holderID := "<unknown>"
1870+
if lease.Spec.HolderIdentity != nil {
1871+
holderID = *lease.Spec.HolderIdentity
1872+
}
1873+
s.logger.Infof("Download leader lease for %s held by %s — this node (%s) will wait for shared storage files",
1874+
modelInfo, holderID, s.nodeName)
1875+
return false
1876+
}
1877+
1878+
// waitForSharedStorageModel waits for a model to appear on shared storage,
1879+
// with jitter and periodic rechecks. Returns true if the model appeared (another
1880+
// node downloaded it), false if the context was cancelled or max wait exceeded.
1881+
// Maximum wait time is downloadLeaderLeaseDuration + 30s (currently 5m30s).
1882+
func (s *Gopher) waitForSharedStorageModel(ctx context.Context, destPath string, modelInfo string) bool {
1883+
maxWait := downloadLeaderLeaseDuration + 30*time.Second
1884+
deadline := time.Now().Add(maxWait)
1885+
1886+
for time.Now().Before(deadline) {
1887+
// Add random jitter to avoid thundering herd on recheck
1888+
jitter := time.Duration(rand.Int63n(int64(sharedStorageMaxJitter)))
1889+
s.logger.Infof("Shared storage: waiting %v before rechecking %s for model %s", sharedStorageRecheckInterval+jitter, destPath, modelInfo)
1890+
1891+
timer := time.NewTimer(sharedStorageRecheckInterval + jitter)
1892+
select {
1893+
case <-ctx.Done():
1894+
timer.Stop()
1895+
s.logger.Infof("Shared storage: wait cancelled for model %s at %s: %v", modelInfo, destPath, ctx.Err())
1896+
return false
1897+
case <-timer.C:
1898+
}
1899+
1900+
if s.isModelAlreadyDownloaded(destPath) {
1901+
s.logger.Infof("Shared storage: model %s appeared at %s (downloaded by leader)", modelInfo, destPath)
1902+
return true
1903+
}
1904+
}
1905+
1906+
s.logger.Warnf("Shared storage: timed out waiting for model %s at %s — will attempt own download", modelInfo, destPath)
1907+
return false
1908+
}

0 commit comments

Comments
 (0)