Skip to content

Commit f2b620d

Browse files
committed
[P2P] Convert env vars to CLI flags and fix pod restart recovery
- Add --p2p-* CLI flags to model-agent (replaces P2P_* env vars)
- Fix pod restart: check if model exists on disk before P2P/lease logic
- Update daemonset to use CLI args instead of env vars
- Increase default rate limits from 500 MB/s to 2 GB/s
- Remove unused ConfigFromEnv() and P2P env var constants
- Update README documentation
1 parent 1bcb721 commit f2b620d

19 files changed

Lines changed: 1257 additions & 514 deletions

cmd/model-agent/main.go

Lines changed: 12 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,25 @@ func init() {
8585
rootCmd.PersistentFlags().StringVar(&cfg.namespace, "namespace", "ome", "Kubernetes namespace to use")
8686
rootCmd.PersistentFlags().StringVar(&cfg.logLevel, "log-level", "info", "Log level (debug, info, warn, error)")
8787

88+
// P2P distribution flags
89+
rootCmd.PersistentFlags().BoolVar(&cfg.p2pEnabled, "p2p-enabled", false, "Enable P2P model distribution")
90+
rootCmd.PersistentFlags().StringVar(&cfg.p2pPeersService, "p2p-peers-service", "ome-peers.ome.svc.cluster.local", "Headless service DNS for P2P peer discovery")
91+
rootCmd.PersistentFlags().IntVar(&cfg.p2pTorrentPort, "p2p-torrent-port", 6881, "BitTorrent peer port")
92+
rootCmd.PersistentFlags().IntVar(&cfg.p2pMetainfoPort, "p2p-metainfo-port", 8081, "HTTP port for metainfo sharing")
93+
rootCmd.PersistentFlags().Int64Var(&cfg.p2pMaxDownloadRate, "p2p-max-download-rate", 2147483648, "Max P2P download rate in bytes/s (default 2 GB/s)")
94+
rootCmd.PersistentFlags().Int64Var(&cfg.p2pMaxUploadRate, "p2p-max-upload-rate", 2147483648, "Max P2P upload rate in bytes/s (default 2 GB/s)")
95+
rootCmd.PersistentFlags().BoolVar(&cfg.p2pEnableEncryption, "p2p-enable-encryption", false, "Enable BitTorrent header obfuscation")
96+
rootCmd.PersistentFlags().BoolVar(&cfg.p2pRequireEncryption, "p2p-require-encryption", false, "Require encryption for all P2P peers")
97+
rootCmd.PersistentFlags().DurationVar(&cfg.p2pDownloadTimeout, "p2p-download-timeout", time.Hour, "Timeout for P2P downloads")
98+
8899
_ = v.BindPFlags(rootCmd.PersistentFlags())
89100
v.SetEnvKeyReplacer(strings.NewReplacer("-", "_"))
90101
v.AutomaticEnv()
91102
}
92103

93104
// initConfig validates required environment variables
94105
func initConfig(_ *cobra.Command, _ []string) {
106+
// NODE_NAME is injected via the Kubernetes downward API, so it must be set as an env var rather than a CLI flag
95107
nodeName, ok := os.LookupEnv("NODE_NAME")
96108
if !ok {
97109
panic("NODE_NAME environment variable is not set for model-agent")
@@ -100,40 +112,6 @@ func initConfig(_ *cobra.Command, _ []string) {
100112
panic("NODE_NAME environment variable is empty")
101113
}
102114
cfg.nodeName = nodeName
103-
104-
// P2P configuration from environment variables
105-
cfg.p2pEnabled = os.Getenv("P2P_ENABLED") == "true"
106-
cfg.p2pPeersService = os.Getenv("PEERS_SERVICE")
107-
if cfg.p2pPeersService == "" {
108-
cfg.p2pPeersService = "ome-peers.ome.svc.cluster.local"
109-
}
110-
cfg.p2pTorrentPort = getEnvInt("P2P_TORRENT_PORT", 6881)
111-
cfg.p2pMetainfoPort = getEnvInt("P2P_METAINFO_PORT", 8081)
112-
cfg.p2pMaxDownloadRate = getEnvInt64("P2P_MAX_DOWNLOAD_RATE", 524288000) // 500 MB/s
113-
cfg.p2pMaxUploadRate = getEnvInt64("P2P_MAX_UPLOAD_RATE", 524288000) // 500 MB/s
114-
cfg.p2pEnableEncryption = os.Getenv("P2P_ENCRYPTION_ENABLED") == "true"
115-
cfg.p2pRequireEncryption = os.Getenv("P2P_ENCRYPTION_REQUIRED") == "true"
116-
cfg.p2pDownloadTimeout = time.Duration(getEnvInt("P2P_DOWNLOAD_TIMEOUT", 3600)) * time.Second // 1 hour default
117-
}
118-
119-
// getEnvInt reads an integer from environment variable with a default value
120-
func getEnvInt(key string, defaultVal int) int {
121-
if val := os.Getenv(key); val != "" {
122-
if i, err := fmt.Sscanf(val, "%d", &defaultVal); err == nil && i == 1 {
123-
return defaultVal
124-
}
125-
}
126-
return defaultVal
127-
}
128-
129-
// getEnvInt64 reads an int64 from environment variable with a default value
130-
func getEnvInt64(key string, defaultVal int64) int64 {
131-
if val := os.Getenv(key); val != "" {
132-
if i, err := fmt.Sscanf(val, "%d", &defaultVal); err == nil && i == 1 {
133-
return defaultVal
134-
}
135-
}
136-
return defaultVal
137115
}
138116

139117
// initializeLogger creates and configures a zap logger with the specified settings
Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -56,18 +56,27 @@ kubectl apply -f model-agent-daemonset.yaml
5656

5757
## Configuration
5858

59-
Environment variables for P2P configuration:
60-
61-
| Variable | Default | Description |
62-
|----------|---------|-------------|
63-
| `P2P_ENABLED` | `true` | Enable/disable P2P distribution |
64-
| `PEERS_SERVICE` | `ome-peers.ome.svc.cluster.local` | Headless service DNS for peer discovery |
65-
| `P2P_TORRENT_PORT` | `6881` | BitTorrent peer port |
66-
| `P2P_METAINFO_PORT` | `8081` | HTTP port for metainfo sharing |
67-
| `P2P_MAX_DOWNLOAD_RATE` | `524288000` | Max download rate (bytes/s) |
68-
| `P2P_MAX_UPLOAD_RATE` | `524288000` | Max upload rate (bytes/s) |
69-
| `P2P_ENCRYPTION_ENABLED` | `false` | Enable BitTorrent encryption |
70-
| `P2P_ENCRYPTION_REQUIRED` | `false` | Require encryption for all peers |
59+
CLI flags for P2P configuration (passed via `args` in the DaemonSet):
60+
61+
| Flag | Default | Description |
62+
|------|---------|-------------|
63+
| `--p2p-enabled` | `false` | Enable/disable P2P distribution |
64+
| `--p2p-peers-service` | `ome-peers.ome.svc.cluster.local` | Headless service DNS for peer discovery |
65+
| `--p2p-torrent-port` | `6881` | BitTorrent peer port |
66+
| `--p2p-metainfo-port` | `8081` | HTTP port for metainfo sharing |
67+
| `--p2p-max-download-rate` | `2147483648` | Max download rate in bytes/s (2 GiB/s) |
68+
| `--p2p-max-upload-rate` | `2147483648` | Max upload rate in bytes/s (2 GiB/s) |
69+
| `--p2p-enable-encryption` | `false` | Enable BitTorrent header obfuscation |
70+
| `--p2p-require-encryption` | `false` | Require encryption for all peers |
71+
| `--p2p-download-timeout` | `1h` | Timeout for P2P downloads |
72+
73+
Environment variables (required; populated from the Kubernetes downward API):
74+
75+
| Variable | Description |
76+
|----------|-------------|
77+
| `NODE_NAME` | Node name from `spec.nodeName` |
78+
| `POD_NAME` | Pod name from `metadata.name` |
79+
| `POD_IP` | Pod IP from `status.podIP` |
7180

7281
## How It Works
7382

config/model-agent/clusterrole.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,7 @@ rules:
2020
- apiGroups: [ "ome.io" ]
2121
resources: [ "clusterbasemodels" ]
2222
verbs: [ "get", "list", "watch", "patch", "update" ]
23+
# Leases for P2P download coordination
24+
- apiGroups: [ "coordination.k8s.io" ]
25+
resources: [ "leases" ]
26+
verbs: [ "get", "list", "create", "update", "patch", "delete" ]

config/model-agent/daemonset.yaml

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,24 +38,47 @@ spec:
3838
type: DirectoryOrCreate
3939
containers:
4040
- name: model-agent
41-
image: ghcr.io/moirai-internal/model-agent:v0.1.2
41+
image: fra.ocir.io/idqj093njucb/model-agent:49bdf63
4242
imagePullPolicy: Always
4343
ports:
4444
- name: metrics
4545
containerPort: 8080
4646
protocol: TCP
47+
- name: torrent
48+
containerPort: 6881
49+
protocol: TCP
50+
- name: metainfo
51+
containerPort: 8081
52+
protocol: TCP
4753
args:
48-
- --models-root-dir
49-
- /raid/models
50-
- --num-download-worker
51-
- '2'
52-
- --concurrency
53-
- '2'
54+
- --models-root-dir=/raid/models
55+
- --num-download-worker=2
56+
- --concurrency=2
57+
# P2P distribution flags
58+
- --p2p-enabled=true
59+
- --p2p-peers-service=ome-peers.ome.svc.cluster.local
60+
- --p2p-torrent-port=6881
61+
- --p2p-metainfo-port=8081
62+
- --p2p-max-download-rate=2147483648
63+
- --p2p-max-upload-rate=2147483648
64+
- --p2p-download-timeout=1h
5465
env:
5566
- name: NODE_NAME
5667
valueFrom:
5768
fieldRef:
5869
fieldPath: spec.nodeName
70+
- name: POD_NAME
71+
valueFrom:
72+
fieldRef:
73+
fieldPath: metadata.name
74+
- name: POD_NAMESPACE
75+
valueFrom:
76+
fieldRef:
77+
fieldPath: metadata.namespace
78+
- name: POD_IP
79+
valueFrom:
80+
fieldRef:
81+
fieldPath: status.podIP
5982
- name: INSTANCE_TYPE_MAP
6083
valueFrom:
6184
configMapKeyRef:
Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,17 @@
1-
# P2P Headless Service for Peer Discovery
2-
# This headless service enables DNS-based peer discovery for P2P model distribution.
1+
# Headless Service for P2P Peer Discovery
2+
# Enables DNS-based peer discovery for P2P model distribution.
33
# Pods can look up other pods' IPs via DNS to establish BitTorrent connections.
44
apiVersion: v1
55
kind: Service
66
metadata:
77
name: ome-peers
88
namespace: ome
99
labels:
10-
app.kubernetes.io/name: ome-model-agent
11-
app.kubernetes.io/component: p2p-discovery
10+
app.kubernetes.io/component: "ome-model-agent-daemonset"
1211
spec:
1312
clusterIP: None # Headless service - enables DNS-based peer discovery
1413
selector:
15-
app.kubernetes.io/name: ome-model-agent
14+
app.kubernetes.io/component: "ome-model-agent-daemonset"
1615
ports:
1716
- port: 6881
1817
targetPort: 6881

config/model-agent/kustomization.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ resources:
77
- daemonset.yaml
88
- serviceaccount.yaml
99
- configmap.yaml
10+
- headless-service.yaml

config/p2p/model-agent-daemonset.yaml

Lines changed: 0 additions & 167 deletions
This file was deleted.

0 commit comments

Comments
 (0)