Skip to content

Commit 953287c

Browse files
committed
fix(p2p): add dedicated delete channel and skip redundant downloads
- Add dedicated deleteTaskChan for delete tasks to ensure deletions are never blocked by downloads (even with 100 concurrent downloads) - Scout sends delete tasks to deleteChan instead of gopherChan - Gopher runs dedicated runDeleteWorker for immediate delete processing - Skip download if already seeding the model (prevents infinite loop when update events trigger DownloadOverride for already-downloaded models)
1 parent 79ddf68 commit 953287c

4 files changed

Lines changed: 273 additions & 71 deletions

File tree

cmd/model-agent/main.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ func initializeComponents(
238238
omeInformerFactory omev1beta1informers.SharedInformerFactory,
239239
metrics *modelagent.Metrics,
240240
gopherTaskChan chan *modelagent.GopherTask,
241+
deleteTaskChan chan *modelagent.GopherTask,
241242
logger *Logger,
242243
) (*modelagent.Scout, *modelagent.Gopher, error) {
243244
// Create node label reconciler for labeling the node based on model status
@@ -263,6 +264,7 @@ func initializeComponents(
263264
clusterBaseModelInformer,
264265
omeInformerFactory,
265266
gopherTaskChan,
267+
deleteTaskChan,
266268
kubeClient,
267269
logger)
268270
if err != nil {
@@ -308,6 +310,7 @@ func initializeComponents(
308310
cfg.downloadRetry,
309311
cfg.modelsRootDir,
310312
gopherTaskChan,
313+
deleteTaskChan,
311314
nodeLabelReconciler,
312315
metrics,
313316
logger,
@@ -364,6 +367,8 @@ func runCommand(cmd *cobra.Command, args []string) {
364367

365368
// Create a download task communication channel
366369
gopherTaskChan := make(chan *modelagent.GopherTask)
370+
// Create a dedicated delete task channel for immediate deletion processing
371+
deleteTaskChan := make(chan *modelagent.GopherTask)
367372

368373
// Initialize components
369374
scout, gopher, err := initializeComponents(
@@ -373,6 +378,7 @@ func runCommand(cmd *cobra.Command, args []string) {
373378
omeInformerFactory,
374379
metrics,
375380
gopherTaskChan,
381+
deleteTaskChan,
376382
logger,
377383
)
378384
if err != nil {

pkg/distributor/distributor.go

Lines changed: 104 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
"net"
1010
"net/http"
1111
"net/netip"
12+
"os"
13+
"path/filepath"
1214
"sync"
1315
"time"
1416

@@ -121,9 +123,15 @@ func (d *ModelDistributor) Close() {
121123
}
122124

123125
// TryP2PDownload attempts to download a model from peers.
126+
// destPath is the final destination where the model files should be placed.
124127
// Returns nil if successful, error if P2P download is not available.
125-
func (d *ModelDistributor) TryP2PDownload(ctx context.Context, modelHash string, timeout time.Duration) error {
126-
peers, err := d.discoverPeers()
128+
func (d *ModelDistributor) TryP2PDownload(ctx context.Context, modelHash, destPath string, timeout time.Duration) error {
129+
// Check context before starting - fail fast if already cancelled
130+
if ctx.Err() != nil {
131+
return ctx.Err()
132+
}
133+
134+
peers, err := d.discoverPeers(ctx)
127135
if err != nil || len(peers) == 0 {
128136
return fmt.Errorf("no peers available: %v", err)
129137
}
@@ -142,25 +150,49 @@ func (d *ModelDistributor) TryP2PDownload(ctx context.Context, modelHash string,
142150
return fmt.Errorf("failed to add torrent: %w", err)
143151
}
144152

145-
// Add discovered peers
146-
peerInfos := make([]torrent.PeerInfo, len(peers))
147-
for i, p := range peers {
148-
peerInfos[i] = p
149-
}
150-
t.AddPeers(peerInfos)
151-
152153
// Wait for download with timeout
153154
ctx, cancel := context.WithTimeout(ctx, timeout)
154155
defer cancel()
155156

156157
select {
157158
case <-t.GotInfo():
159+
// Add peers after torrent info is available (required for proper handshaking)
160+
peerInfos := make([]torrent.PeerInfo, len(peers))
161+
for i, p := range peers {
162+
peerInfos[i] = p
163+
}
164+
t.AddPeers(peerInfos)
165+
d.logger.Infof("Added %d peers for model %s, starting download", len(peers), modelHash)
166+
158167
t.DownloadAll()
159168
if !d.waitForComplete(ctx, t) {
160169
t.Drop()
161170
return fmt.Errorf("download incomplete within timeout")
162171
}
163172

173+
// Downloaded files are at {dataDir}/{modelHash}
174+
// Move them to destPath and create symlink for continued seeding
175+
downloadPath := filepath.Join(d.dataDir, modelHash)
176+
177+
// Ensure parent directory of destPath exists
178+
if err := os.MkdirAll(filepath.Dir(destPath), 0755); err != nil {
179+
t.Drop()
180+
return fmt.Errorf("failed to create parent directory: %w", err)
181+
}
182+
183+
// Move downloaded files to destination
184+
if err := os.Rename(downloadPath, destPath); err != nil {
185+
t.Drop()
186+
return fmt.Errorf("failed to move downloaded files to destination: %w", err)
187+
}
188+
d.logger.Infof("Moved downloaded model from %s to %s", downloadPath, destPath)
189+
190+
// Create symlink from hash path to destination for continued seeding
191+
if err := os.Symlink(destPath, downloadPath); err != nil {
192+
d.logger.Warnf("Failed to create symlink for seeding: %v", err)
193+
// Don't fail the download, seeding is optional
194+
}
195+
164196
// Store for seeding
165197
d.torrentsMu.Lock()
166198
d.activeTorrents[modelHash] = t
@@ -184,6 +216,20 @@ func (d *ModelDistributor) SeedModel(path, modelHash string) error {
184216
return nil
185217
}
186218

219+
// Create symlink from {dataDir}/{modelHash} to actual model path
220+
// This allows the torrent client to find files at the expected location
221+
symlinkPath := filepath.Join(d.dataDir, modelHash)
222+
if _, err := os.Lstat(symlinkPath); err == nil {
223+
// Symlink or file already exists, remove it
224+
if err := os.Remove(symlinkPath); err != nil {
225+
return fmt.Errorf("failed to remove existing symlink: %w", err)
226+
}
227+
}
228+
if err := os.Symlink(path, symlinkPath); err != nil {
229+
return fmt.Errorf("failed to create symlink for seeding: %w", err)
230+
}
231+
d.logger.Debugf("Created symlink %s -> %s for seeding", symlinkPath, path)
232+
187233
d.logger.Infof("Creating metainfo for model %s at path %s (this may take several minutes for large models)...", modelHash, path)
188234
startTime := time.Now()
189235

@@ -223,7 +269,12 @@ func (d *ModelDistributor) StopSeeding(modelHash string) {
223269

224270
// HasPeers checks if there are any peers available for the model.
225271
func (d *ModelDistributor) HasPeers(ctx context.Context, modelHash string) bool {
226-
peers, err := d.discoverPeers()
272+
// Check context before starting
273+
if ctx.Err() != nil {
274+
return false
275+
}
276+
277+
peers, err := d.discoverPeers(ctx)
227278
if err != nil || len(peers) == 0 {
228279
return false
229280
}
@@ -243,17 +294,10 @@ func (d *ModelDistributor) GetMetainfo(modelHash string) (*metainfo.MetaInfo, bo
243294
return nil, false
244295
}
245296

246-
info := t.Info()
247-
if info == nil {
248-
return nil, false
249-
}
250-
251-
infoBytes, err := bencode.Marshal(info)
252-
if err != nil {
253-
return nil, false
254-
}
255-
256-
return &metainfo.MetaInfo{InfoBytes: infoBytes}, true
297+
// Use the torrent's Metainfo() method to get the correct info bytes
298+
// Re-marshaling t.Info() would produce different bytes (different info hash)
299+
mi := t.Metainfo()
300+
return &mi, true
257301
}
258302

259303
// IsSeeding returns whether the distributor is seeding the given model.
@@ -292,19 +336,25 @@ type Stats struct {
292336
}
293337

294338
// discoverPeers uses DNS to find other pods in the headless service.
295-
func (d *ModelDistributor) discoverPeers() ([]torrent.PeerInfo, error) {
339+
func (d *ModelDistributor) discoverPeers(ctx context.Context) ([]torrent.PeerInfo, error) {
296340
if d.peersService == "" {
297341
return nil, fmt.Errorf("peers service not configured")
298342
}
299343

300-
ips, err := net.LookupIP(d.peersService)
344+
// Use context-aware DNS resolver to support cancellation
345+
resolver := net.Resolver{}
346+
ips, err := resolver.LookupIPAddr(ctx, d.peersService)
301347
if err != nil {
348+
// Check if cancelled
349+
if ctx.Err() != nil {
350+
return nil, ctx.Err()
351+
}
302352
return nil, fmt.Errorf("DNS lookup failed: %w", err)
303353
}
304354

305355
var peers []torrent.PeerInfo
306356
for _, ip := range ips {
307-
ipStr := ip.String()
357+
ipStr := ip.IP.String()
308358
if ipStr == d.podIP {
309359
continue // skip self
310360
}
@@ -324,30 +374,42 @@ func (d *ModelDistributor) discoverPeers() ([]torrent.PeerInfo, error) {
324374
// fetchMetainfoFromPeer tries each peer until one responds with metainfo.
325375
func (d *ModelDistributor) fetchMetainfoFromPeer(ctx context.Context, peers []torrent.PeerInfo, modelHash string) (*metainfo.MetaInfo, error) {
326376
for _, peer := range peers {
377+
// Check context before each peer attempt - fail fast if cancelled
378+
if ctx.Err() != nil {
379+
return nil, ctx.Err()
380+
}
381+
327382
// Extract IP from peer address (format: ip:port)
328383
addrPort, ok := peer.Addr.(netip.AddrPort)
329384
if !ok {
330385
continue
331386
}
332387
url := fmt.Sprintf("http://%s:%d/metainfo/%s", addrPort.Addr().String(), d.metainfoPort, modelHash)
333388

334-
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
389+
// Use per-request timeout context (10s per peer)
390+
reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
391+
req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, url, nil)
335392
if err != nil {
393+
cancel()
336394
continue
337395
}
338396

339397
resp, err := d.httpClient.Do(req)
340398
if err != nil {
399+
cancel()
341400
d.logger.Debugf("Failed to fetch metainfo from %s: %v", url, err)
342401
continue
343402
}
344-
defer resp.Body.Close()
345403

346404
if resp.StatusCode != http.StatusOK {
405+
resp.Body.Close()
406+
cancel()
347407
continue
348408
}
349409

350410
mi, err := metainfo.Load(resp.Body)
411+
resp.Body.Close() // Close immediately after reading, not deferred in loop
412+
cancel()
351413
if err != nil {
352414
continue
353415
}
@@ -359,20 +421,23 @@ func (d *ModelDistributor) fetchMetainfoFromPeer(ctx context.Context, peers []to
359421
return nil, fmt.Errorf("no peer has metainfo for %s", modelHash)
360422
}
361423

362-
// waitForComplete polls until torrent download is complete.
424+
// waitForComplete waits until torrent download is complete using event-based waiting.
363425
func (d *ModelDistributor) waitForComplete(ctx context.Context, t *torrent.Torrent) bool {
364-
ticker := time.NewTicker(time.Second)
365-
defer ticker.Stop()
366-
367-
for {
368-
select {
369-
case <-ctx.Done():
370-
return false
371-
case <-ticker.C:
372-
if t.Complete().Bool() {
373-
return true
374-
}
375-
}
426+
// Get completion status - returns a struct with a channel that closes when complete
427+
completion := t.Complete()
428+
429+
// If already complete, return immediately
430+
if completion.Bool() {
431+
return true
432+
}
433+
434+
// Wait for completion or context cancellation using event-based waiting
435+
// This is more efficient than polling every second
436+
select {
437+
case <-ctx.Done():
438+
return false
439+
case <-completion.On():
440+
return true
376441
}
377442
}
378443

0 commit comments

Comments
 (0)