Skip to content

Commit 79ddf68

Browse files
committed
feat(p2p): wire P2P distribution into model-agent main
Integrate P2P model distribution into the model-agent entry point: - Add P2P configuration fields (ports, rates, encryption, timeout) - Read P2P settings from environment variables - Create ModelDistributor when P2P_ENABLED=true - Create P2PLeaseManager for download coordination - Call gopher.EnableP2P() to activate P2P download flow - Start MetainfoServer for peer discovery - Add graceful shutdown for P2P resources Environment variables: - P2P_ENABLED: Enable/disable P2P (default: false) - PEERS_SERVICE: Headless service DNS for peer discovery - P2P_TORRENT_PORT: BitTorrent port (default: 6881) - P2P_METAINFO_PORT: Metainfo HTTP port (default: 8081) - P2P_MAX_DOWNLOAD_RATE: Max download rate in bytes/s - P2P_MAX_UPLOAD_RATE: Max upload rate in bytes/s - P2P_ENCRYPTION_ENABLED: Enable BitTorrent encryption - P2P_DOWNLOAD_TIMEOUT: P2P download timeout in seconds
1 parent 565ed32 commit 79ddf68

5 files changed

Lines changed: 207 additions & 26 deletions

File tree

cmd/model-agent/main.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
omev1beta1client "github.com/sgl-project/ome/pkg/client/clientset/versioned"
2525
omev1beta1informers "github.com/sgl-project/ome/pkg/client/informers/externalversions"
26+
"github.com/sgl-project/ome/pkg/distributor"
2627
"github.com/sgl-project/ome/pkg/logging"
2728
"github.com/sgl-project/ome/pkg/modelagent"
2829
"github.com/sgl-project/ome/pkg/version"
@@ -43,6 +44,16 @@ type config struct {
4344
numDownloadWorker int
4445
namespace string
4546
logLevel string
47+
// P2P configuration
48+
p2pEnabled bool
49+
p2pPeersService string
50+
p2pTorrentPort int
51+
p2pMetainfoPort int
52+
p2pMaxDownloadRate int64
53+
p2pMaxUploadRate int64
54+
p2pEnableEncryption bool
55+
p2pRequireEncryption bool
56+
p2pDownloadTimeout time.Duration
4657
}
4758

4859
// Logger type alias for zap.SugaredLogger
@@ -89,6 +100,40 @@ func initConfig(_ *cobra.Command, _ []string) {
89100
panic("NODE_NAME environment variable is empty")
90101
}
91102
cfg.nodeName = nodeName
103+
104+
// P2P configuration from environment variables
105+
cfg.p2pEnabled = os.Getenv("P2P_ENABLED") == "true"
106+
cfg.p2pPeersService = os.Getenv("PEERS_SERVICE")
107+
if cfg.p2pPeersService == "" {
108+
cfg.p2pPeersService = "ome-peers.ome.svc.cluster.local"
109+
}
110+
cfg.p2pTorrentPort = getEnvInt("P2P_TORRENT_PORT", 6881)
111+
cfg.p2pMetainfoPort = getEnvInt("P2P_METAINFO_PORT", 8081)
112+
cfg.p2pMaxDownloadRate = getEnvInt64("P2P_MAX_DOWNLOAD_RATE", 524288000) // 500 MB/s
113+
cfg.p2pMaxUploadRate = getEnvInt64("P2P_MAX_UPLOAD_RATE", 524288000) // 500 MB/s
114+
cfg.p2pEnableEncryption = os.Getenv("P2P_ENCRYPTION_ENABLED") == "true"
115+
cfg.p2pRequireEncryption = os.Getenv("P2P_ENCRYPTION_REQUIRED") == "true"
116+
cfg.p2pDownloadTimeout = time.Duration(getEnvInt("P2P_DOWNLOAD_TIMEOUT", 3600)) * time.Second // 1 hour default
117+
}
118+
119+
// getEnvInt reads an integer from environment variable with a default value
120+
func getEnvInt(key string, defaultVal int) int {
121+
if val := os.Getenv(key); val != "" {
122+
if i, err := fmt.Sscanf(val, "%d", &defaultVal); err == nil && i == 1 {
123+
return defaultVal
124+
}
125+
}
126+
return defaultVal
127+
}
128+
129+
// getEnvInt64 reads an int64 from environment variable with a default value
130+
func getEnvInt64(key string, defaultVal int64) int64 {
131+
if val := os.Getenv(key); val != "" {
132+
if i, err := fmt.Sscanf(val, "%d", &defaultVal); err == nil && i == 1 {
133+
return defaultVal
134+
}
135+
}
136+
return defaultVal
92137
}
93138

94139
// initializeLogger creates and configures a zap logger with the specified settings
@@ -334,6 +379,75 @@ func runCommand(cmd *cobra.Command, args []string) {
334379
logger.Fatalf("Failed to initialize components: %v", err)
335380
}
336381

382+
// Initialize P2P distribution if enabled
383+
var p2pDistributor *distributor.ModelDistributor
384+
var metainfoServer *distributor.MetainfoServer
385+
if cfg.p2pEnabled {
386+
logger.Info("P2P model distribution is enabled, initializing...")
387+
388+
// Get POD_NAME and POD_IP for P2P peer identification
389+
podName := os.Getenv("POD_NAME")
390+
if podName == "" {
391+
logger.Warn("POD_NAME not set, P2P coordination may not work correctly")
392+
}
393+
podIP := os.Getenv("POD_IP")
394+
if podIP == "" {
395+
logger.Warn("POD_IP not set, P2P peer discovery may not work correctly")
396+
}
397+
398+
// Create distributor configuration
399+
distCfg := distributor.Config{
400+
DataDir: cfg.modelsRootDir,
401+
PodName: podName,
402+
PodIP: podIP,
403+
PeersService: cfg.p2pPeersService,
404+
TorrentPort: cfg.p2pTorrentPort,
405+
MetainfoPort: cfg.p2pMetainfoPort,
406+
MaxDownloadRate: cfg.p2pMaxDownloadRate,
407+
MaxUploadRate: cfg.p2pMaxUploadRate,
408+
EnableEncryption: cfg.p2pEnableEncryption,
409+
RequireEncryption: cfg.p2pRequireEncryption,
410+
Namespace: cfg.namespace,
411+
LeaseDurationSeconds: 120, // 2 minutes
412+
LeaseRenewIntervalSeconds: 30, // renew every 30 seconds
413+
P2PTimeoutSeconds: int(cfg.p2pDownloadTimeout.Seconds()),
414+
EnableP2P: true,
415+
}
416+
417+
// Create the P2P distributor
418+
p2pDistributor, err = distributor.New(distCfg, logger)
419+
if err != nil {
420+
logger.Errorf("Failed to create P2P distributor: %v", err)
421+
logger.Warn("Continuing without P2P support")
422+
} else {
423+
// Create lease manager for P2P coordination
424+
leaseManager := modelagent.NewP2PLeaseManager(kubeClient, cfg.namespace, cfg.nodeName, logger)
425+
426+
// Enable P2P on the gopher
427+
gopher.EnableP2P(p2pDistributor, leaseManager)
428+
gopher.SetP2PTimeout(cfg.p2pDownloadTimeout)
429+
430+
// Create and start metainfo server
431+
metainfoServer = distributor.NewMetainfoServer(
432+
cfg.modelsRootDir,
433+
cfg.p2pMetainfoPort,
434+
p2pDistributor,
435+
logger,
436+
)
437+
438+
go func() {
439+
logger.Infof("Starting P2P metainfo server on port %d", cfg.p2pMetainfoPort)
440+
if err := metainfoServer.ServeWithContext(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) {
441+
logger.Errorf("Metainfo server error: %v", err)
442+
}
443+
}()
444+
445+
logger.Info("P2P model distribution initialized successfully")
446+
}
447+
} else {
448+
logger.Info("P2P model distribution is disabled")
449+
}
450+
337451
// Set up a health check server
338452
server := setupServer(cfg.port, cfg.modelsRootDir, logger)
339453
go func() {
@@ -350,6 +464,13 @@ func runCommand(cmd *cobra.Command, args []string) {
350464
if err := scout.Run(stopCh); err != nil {
351465
logger.Fatalf("Error running scout: %v", err)
352466
}
467+
468+
// Cleanup P2P resources on shutdown
469+
if p2pDistributor != nil {
470+
logger.Info("Shutting down P2P distributor...")
471+
p2pDistributor.Close()
472+
}
473+
_ = metainfoServer // Suppress unused warning - shutdown handled via context
353474
}
354475

355476
// createKubeClient creates a Kubernetes client from the provided config

pkg/constants/constants.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -383,10 +383,11 @@ const (
383383
P2PDefaultPieceLength = 4 * 1024 * 1024 // 4MB pieces
384384

385385
// P2P wait configuration for nodes waiting for P2P availability
386-
P2PDefaultWaitMaxAttempts = 60 // Maximum attempts before giving up on P2P
387-
P2PDefaultWaitBaseDelayMs = 2000 // Base delay between attempts (2 seconds)
388-
P2PDefaultWaitMaxDelayMs = 30000 // Maximum delay between attempts (30 seconds)
389-
P2PDefaultWaitBackoffDivisor = 10 // Backoff increases every N attempts
386+
P2PDefaultWaitMaxAttempts = 60 // Deprecated: kept for compatibility
387+
P2PDefaultWaitBaseDelayMs = 2000 // Interval between lease/peer checks (2 seconds)
388+
P2PDefaultWaitMaxDelayMs = 30000 // Deprecated: kept for compatibility
389+
P2PDefaultWaitBackoffDivisor = 10 // Deprecated: kept for compatibility
390+
P2PMaxWaitTimeMinutes = 240 // Maximum absolute wait time (4 hours) for very large models
390391
)
391392

392393
// P2P environment variable keys

pkg/distributor/distributor.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,11 +184,17 @@ func (d *ModelDistributor) SeedModel(path, modelHash string) error {
184184
return nil
185185
}
186186

187+
d.logger.Infof("Creating metainfo for model %s at path %s (this may take several minutes for large models)...", modelHash, path)
188+
startTime := time.Now()
189+
187190
mi, err := d.createMetainfo(path, modelHash)
188191
if err != nil {
189192
return fmt.Errorf("failed to create metainfo: %w", err)
190193
}
191194

195+
metainfoTime := time.Since(startTime)
196+
d.logger.Infof("Metainfo created for model %s in %v", modelHash, metainfoTime.Round(time.Second))
197+
192198
t, err := d.torrentClient.AddTorrent(mi)
193199
if err != nil {
194200
return fmt.Errorf("failed to add torrent: %w", err)
@@ -198,7 +204,8 @@ func (d *ModelDistributor) SeedModel(path, modelHash string) error {
198204
d.activeTorrents[modelHash] = t
199205
d.metrics.RecordSeeding(modelHash)
200206

201-
d.logger.Infof("Started seeding model %s", modelHash)
207+
totalTime := time.Since(startTime)
208+
d.logger.Infof("Started seeding model %s (total setup time: %v)", modelHash, totalTime.Round(time.Second))
202209
return nil
203210
}
204211

pkg/distributor/metainfo_server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func (s *MetainfoServer) Start() error {
4646
Addr: fmt.Sprintf(":%d", s.port),
4747
Handler: mux,
4848
ReadTimeout: 30 * time.Second,
49-
WriteTimeout: 30 * time.Second,
49+
WriteTimeout: 10 * time.Minute, // Large metainfo files (30+ MB for 1TB models) need more time
5050
IdleTimeout: 60 * time.Second,
5151
}
5252

pkg/modelagent/gopher.go

Lines changed: 72 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1138,55 +1138,85 @@ func (s *Gopher) downloadFromHuggingFace(ctx context.Context, task *GopherTask,
11381138

11391139
// waitForP2PAvailability waits for the model to become available via P2P.
11401140
// This is used when another node holds the download lease.
1141-
// It uses exponential backoff to avoid overwhelming the cluster with DNS lookups.
1142-
func (s *Gopher) waitForP2PAvailability(ctx context.Context, modelHash, modelInfo string) error {
1141+
// It checks the lease status to determine whether to keep waiting:
1142+
// - If lease is complete: P2P should be available, try download
1143+
// - If lease exists and not expired: keep waiting (lease holder still downloading)
1144+
// - If lease expired or not found: give up (lease holder crashed)
1145+
// - If model is being deleted: abort early
1146+
func (s *Gopher) waitForP2PAvailability(ctx context.Context, task *GopherTask, modelHash, modelInfo, leaseName string) error {
11431147
if s.p2pDistributor == nil {
11441148
return fmt.Errorf("P2P distributor not configured")
11451149
}
11461150

11471151
// Use constants for configurable wait behavior
1148-
maxAttempts := constants.P2PDefaultWaitMaxAttempts
1149-
baseDelay := time.Duration(constants.P2PDefaultWaitBaseDelayMs) * time.Millisecond
1150-
maxDelay := time.Duration(constants.P2PDefaultWaitMaxDelayMs) * time.Millisecond
1151-
backoffDivisor := constants.P2PDefaultWaitBackoffDivisor
1152+
checkInterval := time.Duration(constants.P2PDefaultWaitBaseDelayMs) * time.Millisecond
1153+
maxWaitTime := time.Duration(constants.P2PMaxWaitTimeMinutes) * time.Minute
1154+
startTime := time.Now()
1155+
1156+
for {
1157+
elapsed := time.Since(startTime)
11521158

1153-
for attempt := 0; attempt < maxAttempts; attempt++ {
11541159
// Check context cancellation first
11551160
select {
11561161
case <-ctx.Done():
11571162
return ctx.Err()
11581163
default:
11591164
}
11601165

1166+
// Check absolute maximum wait time
1167+
if elapsed > maxWaitTime {
1168+
return fmt.Errorf("absolute timeout waiting for P2P availability for model %s after %v", modelInfo, elapsed)
1169+
}
1170+
1171+
// Check if the model is being deleted - abort early to allow cleanup
1172+
if s.isModelBeingDeleted(task) {
1173+
s.logger.Infof("Model %s is being deleted, aborting P2P wait", modelInfo)
1174+
return fmt.Errorf("model %s is being deleted, aborting P2P wait", modelInfo)
1175+
}
1176+
1177+
// Check lease status to decide whether to keep waiting
1178+
lease, err := s.p2pLeaseManager.Get(ctx, leaseName)
1179+
if err != nil {
1180+
s.logger.Debugf("Failed to get lease %s: %v, will retry", leaseName, err)
1181+
} else if lease != nil {
1182+
// Check if lease is complete (download finished, seeding started)
1183+
if s.p2pLeaseManager.IsComplete(lease) {
1184+
s.logger.Infof("Lease %s is complete, P2P should be available for model %s", leaseName, modelInfo)
1185+
// Give a short delay for seeding to fully start
1186+
time.Sleep(2 * time.Second)
1187+
} else if s.p2pLeaseManager.IsExpired(lease) {
1188+
// Lease expired - holder might have crashed
1189+
s.logger.Warnf("Lease %s expired for model %s, giving up on P2P wait", leaseName, modelInfo)
1190+
return fmt.Errorf("lease expired while waiting for P2P availability for model %s", modelInfo)
1191+
} else {
1192+
// Lease is active but not complete - holder still downloading
1193+
s.logger.Debugf("Lease %s still active (holder: %s) for model %s, waiting... (elapsed: %v)",
1194+
leaseName, *lease.Spec.HolderIdentity, modelInfo, elapsed.Round(time.Second))
1195+
}
1196+
}
1197+
11611198
// Check if model is available via P2P
11621199
if s.p2pDistributor.HasPeers(ctx, modelHash) {
11631200
s.logger.Infof("P2P peers now available for model %s, attempting download", modelInfo)
11641201
if err := s.p2pDistributor.TryP2PDownload(ctx, modelHash, s.p2pTimeout); err == nil {
1165-
s.logger.Infof("Successfully downloaded model %s via P2P after waiting", modelInfo)
1202+
s.logger.Infof("Successfully downloaded model %s via P2P after waiting %v", modelInfo, elapsed.Round(time.Second))
11661203
return nil
11671204
} else {
11681205
s.logger.Warnf("P2P download attempt failed for model %s: %v", modelInfo, err)
11691206
}
11701207
}
11711208

1172-
// Calculate delay with exponential backoff
1173-
// Backoff increases every backoffDivisor attempts to avoid rapid polling
1174-
delay := baseDelay * time.Duration(1<<uint(attempt/backoffDivisor))
1175-
if delay > maxDelay {
1176-
delay = maxDelay
1209+
// Log progress periodically (every 30 seconds)
1210+
if int(elapsed.Seconds())%30 == 0 && elapsed.Seconds() > 0 {
1211+
s.logger.Infof("Still waiting for P2P availability for model %s (elapsed: %v)", modelInfo, elapsed.Round(time.Second))
11771212
}
11781213

1179-
s.logger.Debugf("Waiting %v before next P2P check for model %s (attempt %d/%d)",
1180-
delay, modelInfo, attempt+1, maxAttempts)
1181-
11821214
select {
11831215
case <-ctx.Done():
11841216
return ctx.Err()
1185-
case <-time.After(delay):
1217+
case <-time.After(checkInterval):
11861218
}
11871219
}
1188-
1189-
return fmt.Errorf("timeout waiting for P2P availability for model %s after %d attempts", modelInfo, maxAttempts)
11901220
}
11911221

11921222
// downloadWithP2P orchestrates the model download with P2P support.
@@ -1240,7 +1270,7 @@ func (s *Gopher) downloadWithP2P(ctx context.Context, task *GopherTask, baseMode
12401270

12411271
// Lease held by another node - wait for P2P availability
12421272
s.logger.Infof("Lease held by another node for model %s, waiting for P2P availability", modelInfo)
1243-
if err := s.waitForP2PAvailability(ctx, modelHash, modelInfo); err == nil {
1273+
if err := s.waitForP2PAvailability(ctx, task, modelHash, modelInfo, leaseName); err == nil {
12441274
s.logger.Infof("Model %s now available via P2P", modelInfo)
12451275
return nil
12461276
} else {
@@ -1286,6 +1316,28 @@ func (s *Gopher) downloadWithLeaseHeld(ctx context.Context, task *GopherTask, ba
12861316
return nil
12871317
}
12881318

1319+
// isModelBeingDeleted checks if the model resource is being deleted (has deletionTimestamp).
1320+
// This is used to abort long-running operations early when the resource is deleted.
1321+
func (s *Gopher) isModelBeingDeleted(task *GopherTask) bool {
1322+
if task.BaseModel != nil {
1323+
bm, err := s.baseModelLister.BaseModels(task.BaseModel.Namespace).Get(task.BaseModel.Name)
1324+
if err != nil {
1325+
// If we can't get the resource, it might be deleted
1326+
return true
1327+
}
1328+
return !bm.ObjectMeta.DeletionTimestamp.IsZero()
1329+
}
1330+
if task.ClusterBaseModel != nil {
1331+
cbm, err := s.clusterBaseModelLister.Get(task.ClusterBaseModel.Name)
1332+
if err != nil {
1333+
// If we can't get the resource, it might be deleted
1334+
return true
1335+
}
1336+
return !cbm.ObjectMeta.DeletionTimestamp.IsZero()
1337+
}
1338+
return false
1339+
}
1340+
12891341
// startSeeding begins seeding the model to peers. Errors are logged but not returned
12901342
// since seeding failure shouldn't fail the overall download operation.
12911343
func (s *Gopher) startSeeding(destPath, modelHash, modelInfo string) {

0 commit comments

Comments
 (0)