Skip to content

Commit 4c83bd1

Browse files
Kangyan-Zhouclaude
andcommitted
feat: add K8s Lease-based leader election for shared storage downloads
On shared filesystems (NFS/GPFS/CephFS/Lustre), only one agent should download model files. Others wait with jitter and recheck for files. - Detect shared storage via syscall.Statfs filesystem magic numbers - Use K8s Lease "model-download-leader" for leader election - Non-leaders wait up to 5.5min with 15s jitter between rechecks - Handle expired leases, API errors, context cancellation - Fall back to downloading if leader times out Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 3b1140e commit 4c83bd1

2 files changed

Lines changed: 215 additions & 22 deletions

File tree

cmd/model-agent/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,8 @@ func initializeComponents(
268268
logger,
269269
baseModelInformer.Lister(),
270270
clusterBaseModelInformer.Lister(),
271+
cfg.nodeName,
272+
cfg.namespace,
271273
)
272274
if err != nil {
273275
return nil, nil, fmt.Errorf("failed to create gopher: %w", err)

pkg/modelagent/gopher.go

Lines changed: 213 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,21 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"math/rand"
78
"os"
89
"path/filepath"
910
"strings"
1011
"sync"
1112
"sync/atomic"
13+
"syscall"
1214
"time"
1315

1416
"k8s.io/apimachinery/pkg/labels"
1517

1618
"github.com/oracle/oci-go-sdk/v65/objectstorage"
1719
"go.uber.org/zap"
20+
coordinationv1 "k8s.io/api/coordination/v1"
21+
apierrors "k8s.io/apimachinery/pkg/api/errors"
1822
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1923
"k8s.io/client-go/kubernetes"
2024

@@ -64,6 +68,13 @@ type Gopher struct {
6468
// Track active downloads for cancellation
6569
activeDownloads map[string]context.CancelFunc // key: model UID
6670
activeDownloadsMutex sync.RWMutex
71+
72+
// Shared storage coordination: when modelRootDir is on a shared filesystem
73+
// (NFS, GPFS, CephFS, Lustre), only the download leader should download.
74+
// Other agents wait with jitter and recheck for files on disk.
75+
isSharedStorage bool
76+
nodeName string
77+
namespace string
6778
}
6879

6980
const (
@@ -84,12 +95,19 @@ func NewGopher(
8495
metrics *Metrics,
8596
logger *zap.SugaredLogger,
8697
baseModelLister omev1beta1lister.BaseModelLister,
87-
clusterBaseModelLister omev1beta1lister.ClusterBaseModelLister) (*Gopher, error) {
98+
clusterBaseModelLister omev1beta1lister.ClusterBaseModelLister,
99+
nodeName string,
100+
namespace string) (*Gopher, error) {
88101

89102
if xetConfig == nil {
90103
return nil, fmt.Errorf("xet hugging face config cannot be nil")
91104
}
92105

106+
shared := isSharedFilesystem(modelRootDir, logger)
107+
if shared {
108+
logger.Infof("Detected shared filesystem at %s — download leader election enabled", modelRootDir)
109+
}
110+
93111
return &Gopher{
94112
modelConfigParser: modelConfigParser,
95113
configMapReconciler: configMapReconciler,
@@ -106,6 +124,9 @@ func NewGopher(
106124
activeDownloads: make(map[string]context.CancelFunc),
107125
baseModelLister: baseModelLister,
108126
clusterBaseModelLister: clusterBaseModelLister,
127+
isSharedStorage: shared,
128+
nodeName: nodeName,
129+
namespace: namespace,
109130
}, nil
110131
}
111132

@@ -331,15 +352,8 @@ func (s *Gopher) processTask(task *GopherTask) error {
331352
// or failed retry that must re-evaluate the model files.
332353
if task.TaskType == Download && s.isModelAlreadyDownloaded(destPath) {
333354
s.logger.Infof("Model %s already exists at %s (shared storage), skipping OCI download", modelInfo, destPath)
334-
var baseModel *v1beta1.BaseModel
335-
var clusterBaseModel *v1beta1.ClusterBaseModel
336-
if task.BaseModel != nil {
337-
baseModel = task.BaseModel
338-
} else if task.ClusterBaseModel != nil {
339-
clusterBaseModel = task.ClusterBaseModel
340-
}
341-
if err := s.safeParseAndUpdateModelConfig(destPath, baseModel, clusterBaseModel, nil); err != nil {
342-
return fmt.Errorf("model files exist at %s but config update failed: %w", destPath, err)
355+
if err := s.skipDownloadAndUpdateConfig(destPath, task); err != nil {
356+
return err
343357
}
344358
break
345359
}
@@ -997,21 +1011,25 @@ func (s *Gopher) processHuggingFaceModel(ctx context.Context, task *GopherTask,
9971011
// from HuggingFace, causing rate-limiting and hours of unnecessary I/O.
9981012
// Only for fresh Download tasks — DownloadOverride indicates a spec change
9991013
// or failed retry that must re-evaluate the model files.
1000-
if task.TaskType == Download && s.isModelAlreadyDownloaded(destPath) {
1001-
s.logger.Infof("Model %s already exists at %s (shared storage), skipping HuggingFace download", modelInfo, destPath)
1002-
1003-
var baseModel *v1beta1.BaseModel
1004-
var clusterBaseModel *v1beta1.ClusterBaseModel
1005-
if task.BaseModel != nil {
1006-
baseModel = task.BaseModel
1007-
} else if task.ClusterBaseModel != nil {
1008-
clusterBaseModel = task.ClusterBaseModel
1014+
if task.TaskType == Download {
1015+
if s.isModelAlreadyDownloaded(destPath) {
1016+
s.logger.Infof("Model %s already exists at %s (shared storage), skipping HuggingFace download", modelInfo, destPath)
1017+
return s.skipDownloadAndUpdateConfig(destPath, task)
10091018
}
10101019

1011-
if err := s.safeParseAndUpdateModelConfig(destPath, baseModel, clusterBaseModel, nil); err != nil {
1012-
return fmt.Errorf("model files exist at %s but config update failed: %w", destPath, err)
1020+
// On shared storage, only the download leader should proceed. Non-leaders
1021+
// wait for the leader to finish and then recheck for files on disk.
1022+
if s.isSharedStorage && !s.isDownloadLeader(ctx) {
1023+
if s.waitForSharedStorageModel(ctx, destPath, modelInfo) {
1024+
s.logger.Infof("Model %s appeared on shared storage at %s after waiting for leader", modelInfo, destPath)
1025+
return s.skipDownloadAndUpdateConfig(destPath, task)
1026+
}
1027+
if ctx.Err() != nil {
1028+
return fmt.Errorf("download cancelled while waiting for shared storage leader: %w", ctx.Err())
1029+
}
1030+
// Timed out waiting — fall through to download as a fallback
1031+
s.logger.Warnf("Model %s not found after waiting for leader, proceeding with own download", modelInfo)
10131032
}
1014-
return nil
10151033
}
10161034

10171035
// fetch sha value based on model ID from Huggingface model API
@@ -1484,6 +1502,24 @@ func (s *Gopher) isRemoveParentArtifactDirectory(ctx context.Context, hasChildre
14841502
return !exists
14851503
}
14861504

1505+
// skipDownloadAndUpdateConfig handles the case where model files already exist
1506+
// at the destination path (e.g., downloaded by another node on shared storage,
1507+
// or left from a previous run). It parses the model config and updates the
1508+
// ConfigMap, bypassing the download step.
1509+
func (s *Gopher) skipDownloadAndUpdateConfig(destPath string, task *GopherTask) error {
1510+
var baseModel *v1beta1.BaseModel
1511+
var clusterBaseModel *v1beta1.ClusterBaseModel
1512+
if task.BaseModel != nil {
1513+
baseModel = task.BaseModel
1514+
} else if task.ClusterBaseModel != nil {
1515+
clusterBaseModel = task.ClusterBaseModel
1516+
}
1517+
if err := s.safeParseAndUpdateModelConfig(destPath, baseModel, clusterBaseModel, nil); err != nil {
1518+
return fmt.Errorf("model files exist at %s but config update failed: %w", destPath, err)
1519+
}
1520+
return nil
1521+
}
1522+
14871523
// isModelAlreadyDownloaded checks whether the model files are already present at
14881524
// destPath. This handles the shared-storage case: when multiple nodes mount the
14891525
// same filesystem (e.g., NFS at /storage/models), the first node that finishes an
@@ -1697,3 +1733,158 @@ func (s *Gopher) checkConfigAndWeights(destPath string) bool {
16971733
s.logger.Infof("checkConfigAndWeights(%s): config.json exists but no weight files found", destPath)
16981734
return false
16991735
}
1736+
1737+
// isSharedFilesystem detects whether the given path is on a shared/network
1738+
// filesystem by checking the filesystem type via syscall.Statfs.
1739+
// Known shared filesystem types: NFS, GPFS, CephFS, Lustre, GlusterFS, FUSE.
1740+
// Note: filesystem type detection via magic numbers only works on Linux.
1741+
// On macOS/Darwin, Statfs_t has a different layout and this will return false.
1742+
func isSharedFilesystem(path string, logger *zap.SugaredLogger) bool {
1743+
var stat syscall.Statfs_t
1744+
if err := syscall.Statfs(path, &stat); err != nil {
1745+
logger.Warnf("isSharedFilesystem(%s): syscall.Statfs failed: %v — shared storage detection disabled", path, err)
1746+
return false
1747+
}
1748+
// Filesystem magic numbers (from linux/magic.h and kernel sources)
1749+
switch stat.Type {
1750+
case 0x6969: // NFS_SUPER_MAGIC
1751+
return true
1752+
case 0x47504653: // GPFS (IBM Spectrum Scale)
1753+
return true
1754+
case 0x00C36400: // CEPH_SUPER_MAGIC
1755+
return true
1756+
case 0x0BD00BD0: // LUSTRE_SUPER_MAGIC
1757+
return true
1758+
case 0x65735546: // FUSE_SUPER_MAGIC (commonly used for network mounts)
1759+
return true
1760+
case 0x6A656A62: // GlusterFS
1761+
return true
1762+
default:
1763+
return false
1764+
}
1765+
}
1766+
1767+
const (
1768+
// downloadLeaderLeaseName is the K8s Lease used for leader election among
1769+
// model-agents on shared storage. Only the leader downloads; others wait.
1770+
downloadLeaderLeaseName = "model-download-leader"
1771+
1772+
// downloadLeaderLeaseDuration is how long a leader holds the lease.
1773+
downloadLeaderLeaseDuration = 5 * time.Minute
1774+
1775+
// sharedStorageRecheckInterval is how often non-leaders recheck for files.
1776+
sharedStorageRecheckInterval = 30 * time.Second
1777+
1778+
// sharedStorageMaxJitter is the max random jitter added before rechecks.
1779+
sharedStorageMaxJitter = 15 * time.Second
1780+
)
1781+
1782+
// isDownloadLeader checks if this node currently holds the model-download-leader
1783+
// Lease. If the lease doesn't exist, it tries to create it (becoming leader).
1784+
// If held by this node, it renews and returns true. If held by another node, it
1785+
// checks for expiry and attempts to take over an expired lease; otherwise returns false.
1786+
func (s *Gopher) isDownloadLeader(ctx context.Context) bool {
1787+
leasesClient := s.kubeClient.CoordinationV1().Leases(s.namespace)
1788+
now := metav1.NewMicroTime(time.Now())
1789+
1790+
lease, err := leasesClient.Get(ctx, downloadLeaderLeaseName, metav1.GetOptions{})
1791+
if err != nil {
1792+
if !apierrors.IsNotFound(err) {
1793+
// API server error — coordination unavailable, download independently
1794+
s.logger.Warnf("Failed to check download leader lease (API error): %v — this node will download independently", err)
1795+
return true
1796+
}
1797+
// Lease doesn't exist — try to create it and become leader
1798+
leaseDuration := int32(downloadLeaderLeaseDuration.Seconds())
1799+
newLease := &coordinationv1.Lease{
1800+
ObjectMeta: metav1.ObjectMeta{
1801+
Name: downloadLeaderLeaseName,
1802+
Namespace: s.namespace,
1803+
},
1804+
Spec: coordinationv1.LeaseSpec{
1805+
HolderIdentity: &s.nodeName,
1806+
LeaseDurationSeconds: &leaseDuration,
1807+
AcquireTime: &now,
1808+
RenewTime: &now,
1809+
},
1810+
}
1811+
_, createErr := leasesClient.Create(ctx, newLease, metav1.CreateOptions{})
1812+
if createErr != nil {
1813+
s.logger.Infof("Failed to acquire download leader lease (another node won): %v", createErr)
1814+
return false
1815+
}
1816+
s.logger.Infof("Acquired download leader lease — this node (%s) will download models", s.nodeName)
1817+
return true
1818+
}
1819+
1820+
// Lease exists — check if we hold it or if it's expired
1821+
if lease.Spec.HolderIdentity != nil && *lease.Spec.HolderIdentity == s.nodeName {
1822+
// We hold it — renew
1823+
lease.Spec.RenewTime = &now
1824+
_, err = leasesClient.Update(ctx, lease, metav1.UpdateOptions{})
1825+
if err != nil {
1826+
if apierrors.IsConflict(err) || apierrors.IsNotFound(err) {
1827+
s.logger.Warnf("Lost download leader lease during renewal: %v — yielding leadership", err)
1828+
return false
1829+
}
1830+
s.logger.Warnf("Failed to renew download leader lease (transient error): %v — proceeding as leader", err)
1831+
}
1832+
return true
1833+
}
1834+
1835+
// Another node holds it — check if expired
1836+
if lease.Spec.RenewTime != nil && lease.Spec.LeaseDurationSeconds != nil {
1837+
expiry := lease.Spec.RenewTime.Time.Add(time.Duration(*lease.Spec.LeaseDurationSeconds) * time.Second)
1838+
if time.Now().After(expiry) {
1839+
// Expired — take over
1840+
lease.Spec.HolderIdentity = &s.nodeName
1841+
lease.Spec.AcquireTime = &now
1842+
lease.Spec.RenewTime = &now
1843+
_, err = leasesClient.Update(ctx, lease, metav1.UpdateOptions{})
1844+
if err != nil {
1845+
s.logger.Infof("Failed to take over expired download leader lease: %v", err)
1846+
return false
1847+
}
1848+
s.logger.Infof("Took over expired download leader lease — this node (%s) will download models", s.nodeName)
1849+
return true
1850+
}
1851+
}
1852+
1853+
holderID := "<unknown>"
1854+
if lease.Spec.HolderIdentity != nil {
1855+
holderID = *lease.Spec.HolderIdentity
1856+
}
1857+
s.logger.Infof("Download leader lease held by %s — this node (%s) will wait for shared storage files",
1858+
holderID, s.nodeName)
1859+
return false
1860+
}
1861+
1862+
// waitForSharedStorageModel waits for a model to appear on shared storage,
1863+
// with jitter and periodic rechecks. Returns true if the model appeared (another
1864+
// node downloaded it), false if the context was cancelled or max wait exceeded.
1865+
// Maximum wait time is downloadLeaderLeaseDuration + 30s (currently 5m30s).
1866+
func (s *Gopher) waitForSharedStorageModel(ctx context.Context, destPath string, modelInfo string) bool {
1867+
maxWait := downloadLeaderLeaseDuration + 30*time.Second
1868+
deadline := time.Now().Add(maxWait)
1869+
1870+
for time.Now().Before(deadline) {
1871+
// Add random jitter to avoid thundering herd on recheck
1872+
jitter := time.Duration(rand.Int63n(int64(sharedStorageMaxJitter)))
1873+
s.logger.Infof("Shared storage: waiting %v before rechecking %s for model %s", sharedStorageRecheckInterval+jitter, destPath, modelInfo)
1874+
1875+
select {
1876+
case <-ctx.Done():
1877+
s.logger.Infof("Shared storage: wait cancelled for model %s at %s: %v", modelInfo, destPath, ctx.Err())
1878+
return false
1879+
case <-time.After(sharedStorageRecheckInterval + jitter):
1880+
}
1881+
1882+
if s.isModelAlreadyDownloaded(destPath) {
1883+
s.logger.Infof("Shared storage: model %s appeared at %s (downloaded by leader)", modelInfo, destPath)
1884+
return true
1885+
}
1886+
}
1887+
1888+
s.logger.Warnf("Shared storage: timed out waiting for model %s at %s — will attempt own download", modelInfo, destPath)
1889+
return false
1890+
}

0 commit comments

Comments
 (0)