-
Notifications
You must be signed in to change notification settings - Fork 79
fix(p2p): add dedicated delete channel and skip redundant downloads #482
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
a628203
d8b76ad
1ffecdf
ed82069
7f9b762
1469a8d
edd3111
565ed32
79ddf68
1bcb721
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -23,6 +23,7 @@ import ( | |||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| omev1beta1client "github.com/sgl-project/ome/pkg/client/clientset/versioned" | ||||||||||||||||||||||||||||||||||
| omev1beta1informers "github.com/sgl-project/ome/pkg/client/informers/externalversions" | ||||||||||||||||||||||||||||||||||
| "github.com/sgl-project/ome/pkg/distributor" | ||||||||||||||||||||||||||||||||||
| "github.com/sgl-project/ome/pkg/logging" | ||||||||||||||||||||||||||||||||||
| "github.com/sgl-project/ome/pkg/modelagent" | ||||||||||||||||||||||||||||||||||
| "github.com/sgl-project/ome/pkg/version" | ||||||||||||||||||||||||||||||||||
|
|
@@ -43,6 +44,16 @@ type config struct { | |||||||||||||||||||||||||||||||||
| numDownloadWorker int | ||||||||||||||||||||||||||||||||||
| namespace string | ||||||||||||||||||||||||||||||||||
| logLevel string | ||||||||||||||||||||||||||||||||||
| // P2P configuration | ||||||||||||||||||||||||||||||||||
| p2pEnabled bool | ||||||||||||||||||||||||||||||||||
| p2pPeersService string | ||||||||||||||||||||||||||||||||||
| p2pTorrentPort int | ||||||||||||||||||||||||||||||||||
| p2pMetainfoPort int | ||||||||||||||||||||||||||||||||||
| p2pMaxDownloadRate int64 | ||||||||||||||||||||||||||||||||||
| p2pMaxUploadRate int64 | ||||||||||||||||||||||||||||||||||
| p2pEnableEncryption bool | ||||||||||||||||||||||||||||||||||
| p2pRequireEncryption bool | ||||||||||||||||||||||||||||||||||
| p2pDownloadTimeout time.Duration | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // Logger type alias for zap.SugaredLogger | ||||||||||||||||||||||||||||||||||
|
|
@@ -89,6 +100,40 @@ func initConfig(_ *cobra.Command, _ []string) { | |||||||||||||||||||||||||||||||||
| panic("NODE_NAME environment variable is empty") | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| cfg.nodeName = nodeName | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // P2P configuration from environment variables | ||||||||||||||||||||||||||||||||||
| cfg.p2pEnabled = os.Getenv("P2P_ENABLED") == "true" | ||||||||||||||||||||||||||||||||||
| cfg.p2pPeersService = os.Getenv("PEERS_SERVICE") | ||||||||||||||||||||||||||||||||||
| if cfg.p2pPeersService == "" { | ||||||||||||||||||||||||||||||||||
| cfg.p2pPeersService = "ome-peers.ome.svc.cluster.local" | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| cfg.p2pTorrentPort = getEnvInt("P2P_TORRENT_PORT", 6881) | ||||||||||||||||||||||||||||||||||
| cfg.p2pMetainfoPort = getEnvInt("P2P_METAINFO_PORT", 8081) | ||||||||||||||||||||||||||||||||||
| cfg.p2pMaxDownloadRate = getEnvInt64("P2P_MAX_DOWNLOAD_RATE", 524288000) // 500 MB/s | ||||||||||||||||||||||||||||||||||
| cfg.p2pMaxUploadRate = getEnvInt64("P2P_MAX_UPLOAD_RATE", 524288000) // 500 MB/s | ||||||||||||||||||||||||||||||||||
| cfg.p2pEnableEncryption = os.Getenv("P2P_ENCRYPTION_ENABLED") == "true" | ||||||||||||||||||||||||||||||||||
| cfg.p2pRequireEncryption = os.Getenv("P2P_ENCRYPTION_REQUIRED") == "true" | ||||||||||||||||||||||||||||||||||
| cfg.p2pDownloadTimeout = time.Duration(getEnvInt("P2P_DOWNLOAD_TIMEOUT", 3600)) * time.Second // 1 hour default | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // getEnvInt reads an integer from environment variable with a default value | ||||||||||||||||||||||||||||||||||
| func getEnvInt(key string, defaultVal int) int { | ||||||||||||||||||||||||||||||||||
| if val := os.Getenv(key); val != "" { | ||||||||||||||||||||||||||||||||||
| if i, err := fmt.Sscanf(val, "%d", &defaultVal); err == nil && i == 1 { | ||||||||||||||||||||||||||||||||||
| return defaultVal | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| return defaultVal | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // getEnvInt64 reads an int64 from environment variable with a default value | ||||||||||||||||||||||||||||||||||
| func getEnvInt64(key string, defaultVal int64) int64 { | ||||||||||||||||||||||||||||||||||
| if val := os.Getenv(key); val != "" { | ||||||||||||||||||||||||||||||||||
| if i, err := fmt.Sscanf(val, "%d", &defaultVal); err == nil && i == 1 { | ||||||||||||||||||||||||||||||||||
| return defaultVal | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| return defaultVal | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
Comment on lines
+130
to
137
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to
Suggested change
|
||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // initializeLogger creates and configures a zap logger with the specified settings | ||||||||||||||||||||||||||||||||||
|
|
@@ -193,6 +238,7 @@ func initializeComponents( | |||||||||||||||||||||||||||||||||
| omeInformerFactory omev1beta1informers.SharedInformerFactory, | ||||||||||||||||||||||||||||||||||
| metrics *modelagent.Metrics, | ||||||||||||||||||||||||||||||||||
| gopherTaskChan chan *modelagent.GopherTask, | ||||||||||||||||||||||||||||||||||
| deleteTaskChan chan *modelagent.GopherTask, | ||||||||||||||||||||||||||||||||||
| logger *Logger, | ||||||||||||||||||||||||||||||||||
| ) (*modelagent.Scout, *modelagent.Gopher, error) { | ||||||||||||||||||||||||||||||||||
| // Create node label reconciler for labeling the node based on model status | ||||||||||||||||||||||||||||||||||
|
|
@@ -218,6 +264,7 @@ func initializeComponents( | |||||||||||||||||||||||||||||||||
| clusterBaseModelInformer, | ||||||||||||||||||||||||||||||||||
| omeInformerFactory, | ||||||||||||||||||||||||||||||||||
| gopherTaskChan, | ||||||||||||||||||||||||||||||||||
| deleteTaskChan, | ||||||||||||||||||||||||||||||||||
| kubeClient, | ||||||||||||||||||||||||||||||||||
| logger) | ||||||||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||||||||
|
|
@@ -263,6 +310,7 @@ func initializeComponents( | |||||||||||||||||||||||||||||||||
| cfg.downloadRetry, | ||||||||||||||||||||||||||||||||||
| cfg.modelsRootDir, | ||||||||||||||||||||||||||||||||||
| gopherTaskChan, | ||||||||||||||||||||||||||||||||||
| deleteTaskChan, | ||||||||||||||||||||||||||||||||||
| nodeLabelReconciler, | ||||||||||||||||||||||||||||||||||
| metrics, | ||||||||||||||||||||||||||||||||||
| logger, | ||||||||||||||||||||||||||||||||||
|
|
@@ -319,6 +367,8 @@ func runCommand(cmd *cobra.Command, args []string) { | |||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // Create a download task communication channel | ||||||||||||||||||||||||||||||||||
| gopherTaskChan := make(chan *modelagent.GopherTask) | ||||||||||||||||||||||||||||||||||
| // Create a dedicated delete task channel for immediate deletion processing | ||||||||||||||||||||||||||||||||||
| deleteTaskChan := make(chan *modelagent.GopherTask) | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // Initialize components | ||||||||||||||||||||||||||||||||||
| scout, gopher, err := initializeComponents( | ||||||||||||||||||||||||||||||||||
|
|
@@ -328,12 +378,82 @@ func runCommand(cmd *cobra.Command, args []string) { | |||||||||||||||||||||||||||||||||
| omeInformerFactory, | ||||||||||||||||||||||||||||||||||
| metrics, | ||||||||||||||||||||||||||||||||||
| gopherTaskChan, | ||||||||||||||||||||||||||||||||||
| deleteTaskChan, | ||||||||||||||||||||||||||||||||||
| logger, | ||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||||||||
| logger.Fatalf("Failed to initialize components: %v", err) | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // Initialize P2P distribution if enabled | ||||||||||||||||||||||||||||||||||
| var p2pDistributor *distributor.ModelDistributor | ||||||||||||||||||||||||||||||||||
| var metainfoServer *distributor.MetainfoServer | ||||||||||||||||||||||||||||||||||
| if cfg.p2pEnabled { | ||||||||||||||||||||||||||||||||||
| logger.Info("P2P model distribution is enabled, initializing...") | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // Get POD_NAME and POD_IP for P2P peer identification | ||||||||||||||||||||||||||||||||||
| podName := os.Getenv("POD_NAME") | ||||||||||||||||||||||||||||||||||
| if podName == "" { | ||||||||||||||||||||||||||||||||||
| logger.Warn("POD_NAME not set, P2P coordination may not work correctly") | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| podIP := os.Getenv("POD_IP") | ||||||||||||||||||||||||||||||||||
| if podIP == "" { | ||||||||||||||||||||||||||||||||||
| logger.Warn("POD_IP not set, P2P peer discovery may not work correctly") | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // Create distributor configuration | ||||||||||||||||||||||||||||||||||
| distCfg := distributor.Config{ | ||||||||||||||||||||||||||||||||||
| DataDir: cfg.modelsRootDir, | ||||||||||||||||||||||||||||||||||
| PodName: podName, | ||||||||||||||||||||||||||||||||||
| PodIP: podIP, | ||||||||||||||||||||||||||||||||||
| PeersService: cfg.p2pPeersService, | ||||||||||||||||||||||||||||||||||
| TorrentPort: cfg.p2pTorrentPort, | ||||||||||||||||||||||||||||||||||
| MetainfoPort: cfg.p2pMetainfoPort, | ||||||||||||||||||||||||||||||||||
| MaxDownloadRate: cfg.p2pMaxDownloadRate, | ||||||||||||||||||||||||||||||||||
| MaxUploadRate: cfg.p2pMaxUploadRate, | ||||||||||||||||||||||||||||||||||
| EnableEncryption: cfg.p2pEnableEncryption, | ||||||||||||||||||||||||||||||||||
| RequireEncryption: cfg.p2pRequireEncryption, | ||||||||||||||||||||||||||||||||||
| Namespace: cfg.namespace, | ||||||||||||||||||||||||||||||||||
| LeaseDurationSeconds: 120, // 2 minutes | ||||||||||||||||||||||||||||||||||
| LeaseRenewIntervalSeconds: 30, // renew every 30 seconds | ||||||||||||||||||||||||||||||||||
| P2PTimeoutSeconds: int(cfg.p2pDownloadTimeout.Seconds()), | ||||||||||||||||||||||||||||||||||
| EnableP2P: true, | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // Create the P2P distributor | ||||||||||||||||||||||||||||||||||
| p2pDistributor, err = distributor.New(distCfg, logger) | ||||||||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||||||||
| logger.Errorf("Failed to create P2P distributor: %v", err) | ||||||||||||||||||||||||||||||||||
| logger.Warn("Continuing without P2P support") | ||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||
| // Create lease manager for P2P coordination | ||||||||||||||||||||||||||||||||||
| leaseManager := modelagent.NewP2PLeaseManager(kubeClient, cfg.namespace, cfg.nodeName, logger) | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // Enable P2P on the gopher | ||||||||||||||||||||||||||||||||||
| gopher.EnableP2P(p2pDistributor, leaseManager) | ||||||||||||||||||||||||||||||||||
| gopher.SetP2PTimeout(cfg.p2pDownloadTimeout) | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // Create and start metainfo server | ||||||||||||||||||||||||||||||||||
| metainfoServer = distributor.NewMetainfoServer( | ||||||||||||||||||||||||||||||||||
| cfg.modelsRootDir, | ||||||||||||||||||||||||||||||||||
| cfg.p2pMetainfoPort, | ||||||||||||||||||||||||||||||||||
| p2pDistributor, | ||||||||||||||||||||||||||||||||||
| logger, | ||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| go func() { | ||||||||||||||||||||||||||||||||||
| logger.Infof("Starting P2P metainfo server on port %d", cfg.p2pMetainfoPort) | ||||||||||||||||||||||||||||||||||
| if err := metainfoServer.ServeWithContext(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) { | ||||||||||||||||||||||||||||||||||
| logger.Errorf("Metainfo server error: %v", err) | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| }() | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| logger.Info("P2P model distribution initialized successfully") | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||
| logger.Info("P2P model distribution is disabled") | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
Comment on lines
+389
to
+455
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // Set up a health check server | ||||||||||||||||||||||||||||||||||
| server := setupServer(cfg.port, cfg.modelsRootDir, logger) | ||||||||||||||||||||||||||||||||||
| go func() { | ||||||||||||||||||||||||||||||||||
|
|
@@ -350,6 +470,13 @@ func runCommand(cmd *cobra.Command, args []string) { | |||||||||||||||||||||||||||||||||
| if err := scout.Run(stopCh); err != nil { | ||||||||||||||||||||||||||||||||||
| logger.Fatalf("Error running scout: %v", err) | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // Cleanup P2P resources on shutdown | ||||||||||||||||||||||||||||||||||
| if p2pDistributor != nil { | ||||||||||||||||||||||||||||||||||
| logger.Info("Shutting down P2P distributor...") | ||||||||||||||||||||||||||||||||||
| p2pDistributor.Close() | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| _ = metainfoServer // Suppress unused warning - shutdown handled via context | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // createKubeClient creates a Kubernetes client from the provided config | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,143 @@ | ||
| # P2P Model Distribution | ||
|
|
||
| This directory contains Kubernetes resources for P2P-enabled model distribution in OME. | ||
|
|
||
| ## Overview | ||
|
|
||
| P2P model distribution uses BitTorrent protocol to efficiently distribute large model files across cluster nodes. Instead of each node downloading from HuggingFace simultaneously (which causes rate limiting), the first node downloads and seeds to other nodes. | ||
|
|
||
| ## Architecture | ||
|
|
||
| ``` | ||
| ┌─────────────────────────────────────────────────────────────┐ | ||
| │ Cluster │ | ||
| │ │ | ||
| │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ | ||
| │ │ Node 1 │◄──►│ Node 2 │◄──►│ Node 3 │◄──►│ Node N │ │ | ||
| │ │ (seed) │ │ (leech) │ │ (leech) │ │ (leech) │ │ | ||
| │ └────┬────┘ └─────────┘ └─────────┘ └─────────┘ │ | ||
| │ │ ▲ ▲ ▲ │ | ||
| │ │ └──────────────┴──────────────┘ │ | ||
| │ │ P2P via BitTorrent │ | ||
| │ ▼ │ | ||
| │ ┌─────────┐ │ | ||
| │ │ HF │ (only first node, coordinated via Lease) │ | ||
| │ └─────────┘ │ | ||
| │ │ | ||
| │ Peer Discovery: Headless Service DNS │ | ||
| │ Coordination: Kubernetes Lease │ | ||
| │ Storage: hostPath (enables resume) │ | ||
| └─────────────────────────────────────────────────────────────┘ | ||
| ``` | ||
|
|
||
| ## Components | ||
|
|
||
| ### Headless Service (`headless-service.yaml`) | ||
|
|
||
| Enables DNS-based peer discovery. Pods can lookup other pods' IPs to establish BitTorrent connections. | ||
|
|
||
| ### Model Agent DaemonSet (`model-agent-daemonset.yaml`) | ||
|
|
||
| Deploys the model-agent with P2P capabilities on each node. Includes: | ||
| - BitTorrent client (port 6881) | ||
| - Metainfo HTTP server (port 8081) | ||
| - Health checks and Prometheus metrics | ||
| - RBAC for leases, nodes, and model resources | ||
|
|
||
| ## Installation | ||
|
|
||
| ```bash | ||
| # Deploy the headless service | ||
| kubectl apply -f headless-service.yaml | ||
|
|
||
| # Deploy the DaemonSet with RBAC | ||
| kubectl apply -f model-agent-daemonset.yaml | ||
| ``` | ||
|
|
||
| ## Configuration | ||
|
|
||
| Environment variables for P2P configuration: | ||
|
|
||
| | Variable | Default | Description | | ||
| |----------|---------|-------------| | ||
| | `P2P_ENABLED` | `true` | Enable/disable P2P distribution | | ||
| | `PEERS_SERVICE` | `ome-peers.ome.svc.cluster.local` | Headless service DNS for peer discovery | | ||
| | `P2P_TORRENT_PORT` | `6881` | BitTorrent peer port | | ||
| | `P2P_METAINFO_PORT` | `8081` | HTTP port for metainfo sharing | | ||
| | `P2P_MAX_DOWNLOAD_RATE` | `524288000` | Max download rate (bytes/s) | | ||
| | `P2P_MAX_UPLOAD_RATE` | `524288000` | Max upload rate (bytes/s) | | ||
| | `P2P_ENCRYPTION_ENABLED` | `false` | Enable BitTorrent encryption | | ||
| | `P2P_ENCRYPTION_REQUIRED` | `false` | Require encryption for all peers | | ||
|
|
||
| ## How It Works | ||
|
|
||
| 1. **Pod starts**: Model-agent initializes P2P distributor | ||
| 2. **Model request**: Scout detects new BaseModel/ClusterBaseModel | ||
| 3. **Check local**: If model exists on hostPath, seed it | ||
| 4. **Try P2P**: Query peers for model via metainfo HTTP | ||
| 5. **Lease coordination**: If no peers have it, try to acquire lease | ||
| 6. **HF download**: Lease holder downloads from HuggingFace | ||
| 7. **Seed**: Downloaded model is seeded to other nodes | ||
| 8. **Complete**: All nodes have the model | ||
|
|
||
| ## Performance | ||
|
|
||
| | Nodes | HF Direct (parallel) | BitTorrent | | ||
| |-------|---------------------|------------| | ||
| | 1 | 20-40 min | 20-40 min (same) | | ||
| | 10 | Throttled/fails | ~5-8 min | | ||
| | 100 | Throttled/fails | ~5-10 min | | ||
|
|
||
| ## Monitoring | ||
|
|
||
| Prometheus metrics are exposed on port 8080: | ||
|
|
||
| - `ome_p2p_download_total` - Total downloads by source (p2p, hf, local) | ||
| - `ome_p2p_download_duration_seconds` - Download duration histogram | ||
| - `ome_p2p_peers_discovered` - Number of peers found via DNS | ||
| - `ome_p2p_seeding_torrents` - Number of models being seeded | ||
| - `ome_p2p_bytes_uploaded_total` - Total bytes uploaded to peers | ||
| - `ome_p2p_leases_acquired_total` - Number of leases acquired | ||
|
|
||
| ## Troubleshooting | ||
|
|
||
| ### P2P not working | ||
|
|
||
| 1. Check headless service exists: | ||
| ```bash | ||
| kubectl get svc ome-peers -n ome | ||
| ``` | ||
|
|
||
| 2. Verify DNS resolution: | ||
| ```bash | ||
| kubectl exec -it <pod> -- nslookup ome-peers.ome.svc.cluster.local | ||
| ``` | ||
|
|
||
| 3. Check P2P ports are accessible: | ||
| ```bash | ||
| kubectl exec -it <pod> -- nc -zv <peer-ip> 6881 | ||
| ``` | ||
|
|
||
| ### Lease stuck | ||
|
|
||
| 1. Check lease status: | ||
| ```bash | ||
| kubectl get leases -n ome -l ome.io/type=model-download | ||
| ``` | ||
|
|
||
| 2. Delete stuck lease: | ||
| ```bash | ||
| kubectl delete lease ome-model-<hash> -n ome | ||
| ``` | ||
|
|
||
| ### Rate limiting still occurring | ||
|
|
||
| 1. Verify P2P is enabled: | ||
| ```bash | ||
| kubectl logs <pod> | grep "P2P" | ||
| ``` | ||
|
|
||
| 2. Check only one node is downloading: | ||
| ```bash | ||
| kubectl get leases -n ome -o yaml | ||
| ``` |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| # P2P Headless Service for Peer Discovery | ||
| # This headless service enables DNS-based peer discovery for P2P model distribution. | ||
| # Pods can look up other pods' IPs via DNS to establish BitTorrent connections. | ||
| apiVersion: v1 | ||
| kind: Service | ||
| metadata: | ||
| name: ome-peers | ||
| namespace: ome | ||
| labels: | ||
| app.kubernetes.io/name: ome-model-agent | ||
| app.kubernetes.io/component: p2p-discovery | ||
| spec: | ||
| clusterIP: None # Headless service - enables DNS-based peer discovery | ||
| selector: | ||
| app.kubernetes.io/name: ome-model-agent | ||
| ports: | ||
| - port: 6881 | ||
| targetPort: 6881 | ||
| name: torrent | ||
| protocol: TCP | ||
| - port: 8081 | ||
| targetPort: 8081 | ||
| name: metainfo | ||
| protocol: TCP |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The use of
fmt.Sscanfhere can be brittle. For example, if the environment variable is "123foo",Sscanfwill successfully parse123and return no error, which might not be the desired behavior. Usingstrconv.Atoiis more idiomatic and provides stricter parsing. It would also be clearer to parse into a new variable rather than overwritingdefaultVal. This feedback also applies togetEnvInt64.