Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions cmd/model-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

omev1beta1client "github.com/sgl-project/ome/pkg/client/clientset/versioned"
omev1beta1informers "github.com/sgl-project/ome/pkg/client/informers/externalversions"
"github.com/sgl-project/ome/pkg/distributor"
"github.com/sgl-project/ome/pkg/logging"
"github.com/sgl-project/ome/pkg/modelagent"
"github.com/sgl-project/ome/pkg/version"
Expand All @@ -43,6 +44,16 @@ type config struct {
numDownloadWorker int
namespace string
logLevel string
// P2P configuration
p2pEnabled bool
p2pPeersService string
p2pTorrentPort int
p2pMetainfoPort int
p2pMaxDownloadRate int64
p2pMaxUploadRate int64
p2pEnableEncryption bool
p2pRequireEncryption bool
p2pDownloadTimeout time.Duration
}

// Logger type alias for zap.SugaredLogger
Expand Down Expand Up @@ -89,6 +100,40 @@ func initConfig(_ *cobra.Command, _ []string) {
panic("NODE_NAME environment variable is empty")
}
cfg.nodeName = nodeName

// P2P configuration from environment variables
cfg.p2pEnabled = os.Getenv("P2P_ENABLED") == "true"
cfg.p2pPeersService = os.Getenv("PEERS_SERVICE")
if cfg.p2pPeersService == "" {
cfg.p2pPeersService = "ome-peers.ome.svc.cluster.local"
}
cfg.p2pTorrentPort = getEnvInt("P2P_TORRENT_PORT", 6881)
cfg.p2pMetainfoPort = getEnvInt("P2P_METAINFO_PORT", 8081)
cfg.p2pMaxDownloadRate = getEnvInt64("P2P_MAX_DOWNLOAD_RATE", 524288000) // 500 MB/s
cfg.p2pMaxUploadRate = getEnvInt64("P2P_MAX_UPLOAD_RATE", 524288000) // 500 MB/s
cfg.p2pEnableEncryption = os.Getenv("P2P_ENCRYPTION_ENABLED") == "true"
cfg.p2pRequireEncryption = os.Getenv("P2P_ENCRYPTION_REQUIRED") == "true"
cfg.p2pDownloadTimeout = time.Duration(getEnvInt("P2P_DOWNLOAD_TIMEOUT", 3600)) * time.Second // 1 hour default
}

// getEnvInt reads an integer from environment variable with a default value
func getEnvInt(key string, defaultVal int) int {
if val := os.Getenv(key); val != "" {
if i, err := fmt.Sscanf(val, "%d", &defaultVal); err == nil && i == 1 {
return defaultVal
}
}
return defaultVal
}
Comment on lines +120 to +127
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The use of fmt.Sscanf here can be brittle. For example, if the environment variable is "123foo", Sscanf will successfully parse 123 and return no error, which might not be the desired behavior. Using strconv.Atoi is more idiomatic and provides stricter parsing. It would also be clearer to parse into a new variable rather than overwriting defaultVal. This feedback also applies to getEnvInt64.

Suggested change
func getEnvInt(key string, defaultVal int) int {
if val := os.Getenv(key); val != "" {
if i, err := fmt.Sscanf(val, "%d", &defaultVal); err == nil && i == 1 {
return defaultVal
}
}
return defaultVal
}
func getEnvInt(key string, defaultVal int) int {
if valStr := os.Getenv(key); valStr != "" {
if i, err := strconv.Atoi(valStr); err == nil {
return i
}
}
return defaultVal
}


// getEnvInt64 reads an int64 from environment variable with a default value
func getEnvInt64(key string, defaultVal int64) int64 {
if val := os.Getenv(key); val != "" {
if i, err := fmt.Sscanf(val, "%d", &defaultVal); err == nil && i == 1 {
return defaultVal
}
}
return defaultVal
}
Comment on lines +130 to 137
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Similar to getEnvInt, using strconv.ParseInt would be more robust and idiomatic for parsing the int64 value from the environment variable compared to fmt.Sscanf.

Suggested change
func getEnvInt64(key string, defaultVal int64) int64 {
if val := os.Getenv(key); val != "" {
if i, err := fmt.Sscanf(val, "%d", &defaultVal); err == nil && i == 1 {
return defaultVal
}
}
return defaultVal
}
func getEnvInt64(key string, defaultVal int64) int64 {
if valStr := os.Getenv(key); valStr != "" {
if i, err := strconv.ParseInt(valStr, 10, 64); err == nil {
return i
}
}
return defaultVal
}


// initializeLogger creates and configures a zap logger with the specified settings
Expand Down Expand Up @@ -193,6 +238,7 @@ func initializeComponents(
omeInformerFactory omev1beta1informers.SharedInformerFactory,
metrics *modelagent.Metrics,
gopherTaskChan chan *modelagent.GopherTask,
deleteTaskChan chan *modelagent.GopherTask,
logger *Logger,
) (*modelagent.Scout, *modelagent.Gopher, error) {
// Create node label reconciler for labeling the node based on model status
Expand All @@ -218,6 +264,7 @@ func initializeComponents(
clusterBaseModelInformer,
omeInformerFactory,
gopherTaskChan,
deleteTaskChan,
kubeClient,
logger)
if err != nil {
Expand Down Expand Up @@ -263,6 +310,7 @@ func initializeComponents(
cfg.downloadRetry,
cfg.modelsRootDir,
gopherTaskChan,
deleteTaskChan,
nodeLabelReconciler,
metrics,
logger,
Expand Down Expand Up @@ -319,6 +367,8 @@ func runCommand(cmd *cobra.Command, args []string) {

// Create a download task communication channel
gopherTaskChan := make(chan *modelagent.GopherTask)
// Create a dedicated delete task channel for immediate deletion processing
deleteTaskChan := make(chan *modelagent.GopherTask)

// Initialize components
scout, gopher, err := initializeComponents(
Expand All @@ -328,12 +378,82 @@ func runCommand(cmd *cobra.Command, args []string) {
omeInformerFactory,
metrics,
gopherTaskChan,
deleteTaskChan,
logger,
)
if err != nil {
logger.Fatalf("Failed to initialize components: %v", err)
}

// Initialize P2P distribution if enabled
var p2pDistributor *distributor.ModelDistributor
var metainfoServer *distributor.MetainfoServer
if cfg.p2pEnabled {
logger.Info("P2P model distribution is enabled, initializing...")

// Get POD_NAME and POD_IP for P2P peer identification
podName := os.Getenv("POD_NAME")
if podName == "" {
logger.Warn("POD_NAME not set, P2P coordination may not work correctly")
}
podIP := os.Getenv("POD_IP")
if podIP == "" {
logger.Warn("POD_IP not set, P2P peer discovery may not work correctly")
}

// Create distributor configuration
distCfg := distributor.Config{
DataDir: cfg.modelsRootDir,
PodName: podName,
PodIP: podIP,
PeersService: cfg.p2pPeersService,
TorrentPort: cfg.p2pTorrentPort,
MetainfoPort: cfg.p2pMetainfoPort,
MaxDownloadRate: cfg.p2pMaxDownloadRate,
MaxUploadRate: cfg.p2pMaxUploadRate,
EnableEncryption: cfg.p2pEnableEncryption,
RequireEncryption: cfg.p2pRequireEncryption,
Namespace: cfg.namespace,
LeaseDurationSeconds: 120, // 2 minutes
LeaseRenewIntervalSeconds: 30, // renew every 30 seconds
P2PTimeoutSeconds: int(cfg.p2pDownloadTimeout.Seconds()),
EnableP2P: true,
}

// Create the P2P distributor
p2pDistributor, err = distributor.New(distCfg, logger)
if err != nil {
logger.Errorf("Failed to create P2P distributor: %v", err)
logger.Warn("Continuing without P2P support")
} else {
// Create lease manager for P2P coordination
leaseManager := modelagent.NewP2PLeaseManager(kubeClient, cfg.namespace, cfg.nodeName, logger)

// Enable P2P on the gopher
gopher.EnableP2P(p2pDistributor, leaseManager)
gopher.SetP2PTimeout(cfg.p2pDownloadTimeout)

// Create and start metainfo server
metainfoServer = distributor.NewMetainfoServer(
cfg.modelsRootDir,
cfg.p2pMetainfoPort,
p2pDistributor,
logger,
)

go func() {
logger.Infof("Starting P2P metainfo server on port %d", cfg.p2pMetainfoPort)
if err := metainfoServer.ServeWithContext(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) {
logger.Errorf("Metainfo server error: %v", err)
}
}()

logger.Info("P2P model distribution initialized successfully")
}
} else {
logger.Info("P2P model distribution is disabled")
}
Comment on lines +389 to +455
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This block of code for P2P initialization is quite long and makes the runCommand function harder to read. Consider extracting this logic into a separate helper function, for example initializeP2PComponents, to improve modularity and readability.


// Set up a health check server
server := setupServer(cfg.port, cfg.modelsRootDir, logger)
go func() {
Expand All @@ -350,6 +470,13 @@ func runCommand(cmd *cobra.Command, args []string) {
if err := scout.Run(stopCh); err != nil {
logger.Fatalf("Error running scout: %v", err)
}

// Cleanup P2P resources on shutdown
if p2pDistributor != nil {
logger.Info("Shutting down P2P distributor...")
p2pDistributor.Close()
}
_ = metainfoServer // Suppress unused warning - shutdown handled via context
}

// createKubeClient creates a Kubernetes client from the provided config
Expand Down
143 changes: 143 additions & 0 deletions config/p2p/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
# P2P Model Distribution

This directory contains Kubernetes resources for P2P-enabled model distribution in OME.

## Overview

P2P model distribution uses BitTorrent protocol to efficiently distribute large model files across cluster nodes. Instead of each node downloading from HuggingFace simultaneously (which causes rate limiting), the first node downloads and seeds to other nodes.

## Architecture

```
┌─────────────────────────────────────────────────────────────┐
│ Cluster │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Node 1 │◄──►│ Node 2 │◄──►│ Node 3 │◄──►│ Node N │ │
│ │ (seed) │ │ (leech) │ │ (leech) │ │ (leech) │ │
│ └────┬────┘ └─────────┘ └─────────┘ └─────────┘ │
│ │ ▲ ▲ ▲ │
│ │ └──────────────┴──────────────┘ │
│ │ P2P via BitTorrent │
│ ▼ │
│ ┌─────────┐ │
│ │ HF │ (only first node, coordinated via Lease) │
│ └─────────┘ │
│ │
│ Peer Discovery: Headless Service DNS │
│ Coordination: Kubernetes Lease │
│ Storage: hostPath (enables resume) │
└─────────────────────────────────────────────────────────────┘
```

## Components

### Headless Service (`headless-service.yaml`)

Enables DNS-based peer discovery. Pods can lookup other pods' IPs to establish BitTorrent connections.

### Model Agent DaemonSet (`model-agent-daemonset.yaml`)

Deploys the model-agent with P2P capabilities on each node. Includes:
- BitTorrent client (port 6881)
- Metainfo HTTP server (port 8081)
- Health checks and Prometheus metrics
- RBAC for leases, nodes, and model resources

## Installation

```bash
# Deploy the headless service
kubectl apply -f headless-service.yaml

# Deploy the DaemonSet with RBAC
kubectl apply -f model-agent-daemonset.yaml
```

## Configuration

Environment variables for P2P configuration:

| Variable | Default | Description |
|----------|---------|-------------|
| `P2P_ENABLED` | `true` | Enable/disable P2P distribution |
| `PEERS_SERVICE` | `ome-peers.ome.svc.cluster.local` | Headless service DNS for peer discovery |
| `P2P_TORRENT_PORT` | `6881` | BitTorrent peer port |
| `P2P_METAINFO_PORT` | `8081` | HTTP port for metainfo sharing |
| `P2P_MAX_DOWNLOAD_RATE` | `524288000` | Max download rate (bytes/s) |
| `P2P_MAX_UPLOAD_RATE` | `524288000` | Max upload rate (bytes/s) |
| `P2P_ENCRYPTION_ENABLED` | `false` | Enable BitTorrent encryption |
| `P2P_ENCRYPTION_REQUIRED` | `false` | Require encryption for all peers |

## How It Works

1. **Pod starts**: Model-agent initializes P2P distributor
2. **Model request**: Scout detects new BaseModel/ClusterBaseModel
3. **Check local**: If model exists on hostPath, seed it
4. **Try P2P**: Query peers for model via metainfo HTTP
5. **Lease coordination**: If no peers have it, try to acquire lease
6. **HF download**: Lease holder downloads from HuggingFace
7. **Seed**: Downloaded model is seeded to other nodes
8. **Complete**: All nodes have the model

## Performance

| Nodes | HF Direct (parallel) | BitTorrent |
|-------|---------------------|------------|
| 1 | 20-40 min | 20-40 min (same) |
| 10 | Throttled/fails | ~5-8 min |
| 100 | Throttled/fails | ~5-10 min |

## Monitoring

Prometheus metrics are exposed on port 8080:

- `ome_p2p_download_total` - Total downloads by source (p2p, hf, local)
- `ome_p2p_download_duration_seconds` - Download duration histogram
- `ome_p2p_peers_discovered` - Number of peers found via DNS
- `ome_p2p_seeding_torrents` - Number of models being seeded
- `ome_p2p_bytes_uploaded_total` - Total bytes uploaded to peers
- `ome_p2p_leases_acquired_total` - Number of leases acquired

## Troubleshooting

### P2P not working

1. Check headless service exists:
```bash
kubectl get svc ome-peers -n ome
```

2. Verify DNS resolution:
```bash
kubectl exec -it <pod> -- nslookup ome-peers.ome.svc.cluster.local
```

3. Check P2P ports are accessible:
```bash
kubectl exec -it <pod> -- nc -zv <peer-ip> 6881
```

### Lease stuck

1. Check lease status:
```bash
kubectl get leases -n ome -l ome.io/type=model-download
```

2. Delete stuck lease:
```bash
kubectl delete lease ome-model-<hash> -n ome
```

### Rate limiting still occurring

1. Verify P2P is enabled:
```bash
kubectl logs <pod> | grep "P2P"
```

2. Check only one node is downloading:
```bash
kubectl get leases -n ome -o yaml
```
24 changes: 24 additions & 0 deletions config/p2p/headless-service.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# P2P Headless Service for Peer Discovery
# This headless service enables DNS-based peer discovery for P2P model distribution.
# Pods can look up other pods' IPs via DNS to establish BitTorrent connections.
apiVersion: v1
kind: Service
metadata:
name: ome-peers
namespace: ome
labels:
app.kubernetes.io/name: ome-model-agent
app.kubernetes.io/component: p2p-discovery
spec:
clusterIP: None # Headless service - enables DNS-based peer discovery
selector:
app.kubernetes.io/name: ome-model-agent
ports:
- port: 6881
targetPort: 6881
name: torrent
protocol: TCP
- port: 8081
targetPort: 8081
name: metainfo
protocol: TCP
Loading
Loading