Skip to content

Commit cc6d875

Browse files
committed
[P2P] Convert env vars to CLI flags and fix pod restart recovery
- Add --p2p-* CLI flags to model-agent (replaces P2P_* env vars) - Fix pod restart: check if model exists on disk before P2P/lease logic - Update daemonset to use CLI args instead of env vars - Increase default rate limits from 500 MB/s to 2 GB/s - Remove unused ConfigFromEnv() and P2P env var constants - Update README documentation
1 parent 1bcb721 commit cc6d875

12 files changed

Lines changed: 163 additions & 206 deletions

File tree

cmd/model-agent/main.go

Lines changed: 12 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,25 @@ func init() {
8585
rootCmd.PersistentFlags().StringVar(&cfg.namespace, "namespace", "ome", "Kubernetes namespace to use")
8686
rootCmd.PersistentFlags().StringVar(&cfg.logLevel, "log-level", "info", "Log level (debug, info, warn, error)")
8787

88+
// P2P distribution flags
89+
rootCmd.PersistentFlags().BoolVar(&cfg.p2pEnabled, "p2p-enabled", false, "Enable P2P model distribution")
90+
rootCmd.PersistentFlags().StringVar(&cfg.p2pPeersService, "p2p-peers-service", "ome-peers.ome.svc.cluster.local", "Headless service DNS for P2P peer discovery")
91+
rootCmd.PersistentFlags().IntVar(&cfg.p2pTorrentPort, "p2p-torrent-port", 6881, "BitTorrent peer port")
92+
rootCmd.PersistentFlags().IntVar(&cfg.p2pMetainfoPort, "p2p-metainfo-port", 8081, "HTTP port for metainfo sharing")
93+
rootCmd.PersistentFlags().Int64Var(&cfg.p2pMaxDownloadRate, "p2p-max-download-rate", 2147483648, "Max P2P download rate in bytes/s (default 2 GB/s)")
94+
rootCmd.PersistentFlags().Int64Var(&cfg.p2pMaxUploadRate, "p2p-max-upload-rate", 2147483648, "Max P2P upload rate in bytes/s (default 2 GB/s)")
95+
rootCmd.PersistentFlags().BoolVar(&cfg.p2pEnableEncryption, "p2p-enable-encryption", false, "Enable BitTorrent header obfuscation")
96+
rootCmd.PersistentFlags().BoolVar(&cfg.p2pRequireEncryption, "p2p-require-encryption", false, "Require encryption for all P2P peers")
97+
rootCmd.PersistentFlags().DurationVar(&cfg.p2pDownloadTimeout, "p2p-download-timeout", time.Hour, "Timeout for P2P downloads")
98+
8899
_ = v.BindPFlags(rootCmd.PersistentFlags())
89100
v.SetEnvKeyReplacer(strings.NewReplacer("-", "_"))
90101
v.AutomaticEnv()
91102
}
92103

93104
// initConfig validates required environment variables
94105
func initConfig(_ *cobra.Command, _ []string) {
106+
// NODE_NAME comes from Kubernetes downward API, must be set as env var
95107
nodeName, ok := os.LookupEnv("NODE_NAME")
96108
if !ok {
97109
panic("NODE_NAME environment variable is not set for model-agent")
@@ -100,40 +112,6 @@ func initConfig(_ *cobra.Command, _ []string) {
100112
panic("NODE_NAME environment variable is empty")
101113
}
102114
cfg.nodeName = nodeName
103-
104-
// P2P configuration from environment variables
105-
cfg.p2pEnabled = os.Getenv("P2P_ENABLED") == "true"
106-
cfg.p2pPeersService = os.Getenv("PEERS_SERVICE")
107-
if cfg.p2pPeersService == "" {
108-
cfg.p2pPeersService = "ome-peers.ome.svc.cluster.local"
109-
}
110-
cfg.p2pTorrentPort = getEnvInt("P2P_TORRENT_PORT", 6881)
111-
cfg.p2pMetainfoPort = getEnvInt("P2P_METAINFO_PORT", 8081)
112-
cfg.p2pMaxDownloadRate = getEnvInt64("P2P_MAX_DOWNLOAD_RATE", 524288000) // 500 MB/s
113-
cfg.p2pMaxUploadRate = getEnvInt64("P2P_MAX_UPLOAD_RATE", 524288000) // 500 MB/s
114-
cfg.p2pEnableEncryption = os.Getenv("P2P_ENCRYPTION_ENABLED") == "true"
115-
cfg.p2pRequireEncryption = os.Getenv("P2P_ENCRYPTION_REQUIRED") == "true"
116-
cfg.p2pDownloadTimeout = time.Duration(getEnvInt("P2P_DOWNLOAD_TIMEOUT", 3600)) * time.Second // 1 hour default
117-
}
118-
119-
// getEnvInt reads an integer from environment variable with a default value
120-
func getEnvInt(key string, defaultVal int) int {
121-
if val := os.Getenv(key); val != "" {
122-
if i, err := fmt.Sscanf(val, "%d", &defaultVal); err == nil && i == 1 {
123-
return defaultVal
124-
}
125-
}
126-
return defaultVal
127-
}
128-
129-
// getEnvInt64 reads an int64 from environment variable with a default value
130-
func getEnvInt64(key string, defaultVal int64) int64 {
131-
if val := os.Getenv(key); val != "" {
132-
if i, err := fmt.Sscanf(val, "%d", &defaultVal); err == nil && i == 1 {
133-
return defaultVal
134-
}
135-
}
136-
return defaultVal
137115
}
138116

139117
// initializeLogger creates and configures a zap logger with the specified settings

config/p2p/README.md

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -56,18 +56,27 @@ kubectl apply -f model-agent-daemonset.yaml
5656

5757
## Configuration
5858

59-
Environment variables for P2P configuration:
60-
61-
| Variable | Default | Description |
62-
|----------|---------|-------------|
63-
| `P2P_ENABLED` | `true` | Enable/disable P2P distribution |
64-
| `PEERS_SERVICE` | `ome-peers.ome.svc.cluster.local` | Headless service DNS for peer discovery |
65-
| `P2P_TORRENT_PORT` | `6881` | BitTorrent peer port |
66-
| `P2P_METAINFO_PORT` | `8081` | HTTP port for metainfo sharing |
67-
| `P2P_MAX_DOWNLOAD_RATE` | `524288000` | Max download rate (bytes/s) |
68-
| `P2P_MAX_UPLOAD_RATE` | `524288000` | Max upload rate (bytes/s) |
69-
| `P2P_ENCRYPTION_ENABLED` | `false` | Enable BitTorrent encryption |
70-
| `P2P_ENCRYPTION_REQUIRED` | `false` | Require encryption for all peers |
59+
CLI flags for P2P configuration (passed via `args` in the DaemonSet):
60+
61+
| Flag | Default | Description |
62+
|------|---------|-------------|
63+
| `--p2p-enabled` | `false` | Enable/disable P2P distribution |
64+
| `--p2p-peers-service` | `ome-peers.ome.svc.cluster.local` | Headless service DNS for peer discovery |
65+
| `--p2p-torrent-port` | `6881` | BitTorrent peer port |
66+
| `--p2p-metainfo-port` | `8081` | HTTP port for metainfo sharing |
67+
| `--p2p-max-download-rate` | `2147483648` | Max download rate (bytes/s, default 2 GB/s) |
68+
| `--p2p-max-upload-rate` | `2147483648` | Max upload rate (bytes/s, default 2 GB/s) |
69+
| `--p2p-enable-encryption` | `false` | Enable BitTorrent header obfuscation |
70+
| `--p2p-require-encryption` | `false` | Require encryption for all peers |
71+
| `--p2p-download-timeout` | `1h` | Timeout for P2P downloads |
72+
73+
Environment variables (from Kubernetes downward API, required):
74+
75+
| Variable | Description |
76+
|----------|-------------|
77+
| `NODE_NAME` | Node name from `spec.nodeName` |
78+
| `POD_NAME` | Pod name from `metadata.name` |
79+
| `POD_IP` | Pod IP from `status.podIP` |
7180

7281
## How It Works
7382

config/p2p/model-agent-daemonset.yaml

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,18 @@ spec:
3333
- name: model-agent
3434
image: ome-model-agent:latest
3535
imagePullPolicy: IfNotPresent
36+
args:
37+
# P2P Configuration (via CLI flags)
38+
- --p2p-enabled=true
39+
- --p2p-peers-service=ome-peers.ome.svc.cluster.local
40+
- --p2p-torrent-port=6881
41+
- --p2p-metainfo-port=8081
42+
- --p2p-max-download-rate=2147483648 # 2 GB/s
43+
- --p2p-max-upload-rate=2147483648 # 2 GB/s
44+
- --p2p-download-timeout=1h
45+
# Model storage
46+
- --models-root-dir=/mnt/models
47+
- --namespace=ome
3648
ports:
3749
- containerPort: 8080
3850
name: http
@@ -44,7 +56,7 @@ spec:
4456
name: metainfo
4557
protocol: TCP
4658
env:
47-
# Node identification
59+
# Node identification (from Kubernetes downward API)
4860
- name: NODE_NAME
4961
valueFrom:
5062
fieldRef:
@@ -61,22 +73,6 @@ spec:
6173
valueFrom:
6274
fieldRef:
6375
fieldPath: status.podIP
64-
# P2P Configuration
65-
- name: P2P_ENABLED
66-
value: "true"
67-
- name: PEERS_SERVICE
68-
value: "ome-peers.ome.svc.cluster.local"
69-
- name: P2P_TORRENT_PORT
70-
value: "6881"
71-
- name: P2P_METAINFO_PORT
72-
value: "8081"
73-
- name: P2P_MAX_DOWNLOAD_RATE
74-
value: "524288000" # 500 MB/s
75-
- name: P2P_MAX_UPLOAD_RATE
76-
value: "524288000" # 500 MB/s
77-
# Model storage
78-
- name: MODEL_DIR
79-
value: "/mnt/models"
8076
volumeMounts:
8177
- name: models
8278
mountPath: /mnt/models

pkg/constants/constants.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -390,18 +390,6 @@ const (
390390
P2PMaxWaitTimeMinutes = 240 // Maximum absolute wait time (4 hours) for very large models
391391
)
392392

393-
// P2P environment variable keys
394-
var (
395-
P2PEnabledEnvVar = "P2P_ENABLED"
396-
P2PPeersServiceEnvVar = "PEERS_SERVICE"
397-
P2PTorrentPortEnvVar = "P2P_TORRENT_PORT"
398-
P2PMetainfoPortEnvVar = "P2P_METAINFO_PORT"
399-
P2PMaxDownloadRateEnvVar = "P2P_MAX_DOWNLOAD_RATE"
400-
P2PMaxUploadRateEnvVar = "P2P_MAX_UPLOAD_RATE"
401-
P2PEncryptionEnabledEnvVar = "P2P_ENCRYPTION_ENABLED"
402-
P2PEncryptionRequiredEnvVar = "P2P_ENCRYPTION_REQUIRED"
403-
)
404-
405393
// Serving Container Block Lists
406394
const (
407395
BlocklistConfigMapVolumeName = "configmap-blocklist-volume"

pkg/distributor/config.go

Lines changed: 2 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package distributor
22

33
import (
44
"fmt"
5-
"os"
65
)
76

87
// Config holds the configuration for the P2P model distributor.
@@ -65,8 +64,8 @@ func DefaultConfig() Config {
6564
Namespace: "ome",
6665
TorrentPort: 6881,
6766
MetainfoPort: 8081,
68-
MaxDownloadRate: 500 * 1024 * 1024, // 500 MB/s
69-
MaxUploadRate: 500 * 1024 * 1024, // 500 MB/s
67+
MaxDownloadRate: 2 * 1024 * 1024 * 1024, // 2 GB/s
68+
MaxUploadRate: 2 * 1024 * 1024 * 1024, // 2 GB/s
7069
EnableEncryption: false,
7170
RequireEncryption: false,
7271
LeaseDurationSeconds: 120, // 2 minutes
@@ -76,38 +75,6 @@ func DefaultConfig() Config {
7675
}
7776
}
7877

79-
// ConfigFromEnv creates a Config from environment variables with defaults.
80-
func ConfigFromEnv() Config {
81-
cfg := DefaultConfig()
82-
83-
if v := os.Getenv("MODEL_DIR"); v != "" {
84-
cfg.DataDir = v
85-
}
86-
if v := os.Getenv("POD_NAMESPACE"); v != "" {
87-
cfg.Namespace = v
88-
}
89-
if v := os.Getenv("POD_NAME"); v != "" {
90-
cfg.PodName = v
91-
}
92-
if v := os.Getenv("POD_IP"); v != "" {
93-
cfg.PodIP = v
94-
}
95-
if v := os.Getenv("PEERS_SERVICE"); v != "" {
96-
cfg.PeersService = v
97-
}
98-
if v := os.Getenv("P2P_ENABLED"); v == "false" {
99-
cfg.EnableP2P = false
100-
}
101-
if v := os.Getenv("P2P_ENCRYPTION_ENABLED"); v == "true" {
102-
cfg.EnableEncryption = true
103-
}
104-
if v := os.Getenv("P2P_ENCRYPTION_REQUIRED"); v == "true" {
105-
cfg.RequireEncryption = true
106-
}
107-
108-
return cfg
109-
}
110-
11178
// Validate checks if the configuration is valid.
11279
func (c *Config) Validate() error {
11380
if c.DataDir == "" {

pkg/distributor/distributor.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ func (d *ModelDistributor) TryP2PDownload(ctx context.Context, modelHash, destPa
136136
return fmt.Errorf("no peers available: %v", err)
137137
}
138138

139-
d.logger.Infof("Discovered %d peers for model %s", len(peers), modelHash)
139+
d.logger.Debugf("Discovered %d peers for model %s", len(peers), modelHash)
140140
d.metrics.RecordPeersDiscovered(modelHash, len(peers))
141141

142142
// Try to get metainfo from a peer
@@ -162,7 +162,7 @@ func (d *ModelDistributor) TryP2PDownload(ctx context.Context, modelHash, destPa
162162
peerInfos[i] = p
163163
}
164164
t.AddPeers(peerInfos)
165-
d.logger.Infof("Added %d peers for model %s, starting download", len(peers), modelHash)
165+
d.logger.Debugf("Added %d peers for model %s, starting download", len(peers), modelHash)
166166

167167
t.DownloadAll()
168168
if !d.waitForComplete(ctx, t) {
@@ -185,7 +185,7 @@ func (d *ModelDistributor) TryP2PDownload(ctx context.Context, modelHash, destPa
185185
t.Drop()
186186
return fmt.Errorf("failed to move downloaded files to destination: %w", err)
187187
}
188-
d.logger.Infof("Moved downloaded model from %s to %s", downloadPath, destPath)
188+
d.logger.Debugf("Moved downloaded model from %s to %s", downloadPath, destPath)
189189

190190
// Create symlink from hash path to destination for continued seeding
191191
if err := os.Symlink(destPath, downloadPath); err != nil {
@@ -263,7 +263,7 @@ func (d *ModelDistributor) StopSeeding(modelHash string) {
263263
if t, exists := d.activeTorrents[modelHash]; exists {
264264
t.Drop()
265265
delete(d.activeTorrents, modelHash)
266-
d.logger.Infof("Stopped seeding model %s", modelHash)
266+
d.logger.Debugf("Stopped seeding model %s", modelHash)
267267
}
268268
}
269269

@@ -414,7 +414,7 @@ func (d *ModelDistributor) fetchMetainfoFromPeer(ctx context.Context, peers []to
414414
continue
415415
}
416416

417-
d.logger.Infof("Fetched metainfo for %s from peer %s", modelHash, peer.Addr)
417+
d.logger.Debugf("Fetched metainfo for %s from peer %s", modelHash, peer.Addr)
418418
return mi, nil
419419
}
420420

0 commit comments

Comments
 (0)