Skip to content

Commit 5b53072

Browse files
authored
feat: Request needed pieces from new peers on connection (#188)
2 parents 9cd0c26 + 5f62cad commit 5b53072

3 files changed

Lines changed: 269 additions & 160 deletions

File tree

crates/libtortillas/src/peer/actor.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,11 @@ impl Actor for PeerActor {
330330
stream.send(PeerMessages::Bitfield(bitfield)).await?;
331331
}
332332

333+
supervisor
334+
.tell(TorrentMessage::PeerReady(peer.id.unwrap()))
335+
.await
336+
.map_err(|e| PeerActorError::SupervisorCommunicationFailed(e.to_string()))?;
337+
333338
Ok(Self {
334339
peer,
335340
stream,

crates/libtortillas/src/torrent/actor.rs

Lines changed: 237 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use dashmap::DashMap;
1414
use kameo::{Actor, actor::ActorRef, mailbox};
1515
use librqbit_utp::UtpSocketUdp;
1616
use serde::{Deserialize, Serialize};
17-
use tokio::sync::oneshot;
17+
use tokio::{fs, sync::oneshot};
1818
use tracing::{debug, error, info, instrument, trace, warn};
1919

2020
use super::util;
@@ -253,6 +253,242 @@ impl TorrentActor {
253253
self.pending_start = false;
254254
}
255255

256+
/// Handles an incoming piece block from a peer. This is the main entry point
257+
/// that orchestrates receiving, validating, and storing piece blocks. If
258+
/// all blocks for a piece are received, it triggers piece completion
259+
/// logic.
260+
pub async fn incoming_piece(&mut self, index: usize, offset: usize, block: Bytes) {
261+
let info_dict = match &self.info {
262+
Some(info) => info,
263+
None => {
264+
warn!("Received piece block before info dict was available");
265+
return;
266+
}
267+
};
268+
269+
let piece_length = info_dict.piece_length as usize;
270+
let expected_blocks = piece_length.div_ceil(BLOCK_SIZE);
271+
272+
let block_index = offset / BLOCK_SIZE;
273+
if block_index >= expected_blocks {
274+
warn!("Received piece block with invalid offset");
275+
return;
276+
}
277+
278+
if self.is_duplicate_block(index, block_index) {
279+
trace!("Received duplicate piece block");
280+
return;
281+
}
282+
283+
self.initialize_and_mark_block(index, block_index);
284+
285+
self
286+
.broadcast_to_peers(PeerTell::CancelPiece(index, offset, block.len()))
287+
.await;
288+
289+
self.write_block_to_storage(index, offset, block).await;
290+
291+
if self.is_piece_complete(index) {
292+
self.piece_completed(index).await;
293+
} else {
294+
let (piece_idx, block_offset, block_length) = self.next_block_coordinates(index);
295+
self
296+
.broadcast_to_peers(PeerTell::NeedPiece(piece_idx, block_offset, block_length))
297+
.await;
298+
trace!(piece = piece_idx, "Requested next block");
299+
}
300+
}
301+
302+
/// Checks if a block has already been received and initializes the block map
303+
/// for a piece if it doesn't exist yet. Also marks the current block as
304+
/// received in the block map.
305+
fn initialize_and_mark_block(&mut self, index: usize, block_index: usize) {
306+
if !self.block_map.contains_key(&index) {
307+
let info_dict = self
308+
.info_dict()
309+
.expect("Can't receive piece without info dict");
310+
311+
let piece_length = info_dict.piece_length as usize;
312+
let total_blocks = piece_length.div_ceil(BLOCK_SIZE);
313+
let mut vec = BitVec::with_capacity(total_blocks);
314+
vec.resize(total_blocks, false);
315+
self.block_map.insert(index, vec);
316+
}
317+
318+
self
319+
.block_map
320+
.get_mut(&index)
321+
.unwrap()
322+
.set(block_index, true);
323+
}
324+
325+
/// Writes a block to the appropriate storage location based on the
326+
/// configured storage strategy. Currently supports disk-based storage
327+
/// with file-based storage unimplemented.
328+
async fn write_block_to_storage(&self, index: usize, offset: usize, block: Bytes) {
329+
match &self.piece_storage {
330+
PieceStorageStrategy::Disk(_) => {
331+
let path = self
332+
.get_piece_path(index)
333+
.expect("Failed to get piece path");
334+
util::write_block_to_file(path, offset, block)
335+
.await
336+
.expect("Failed to write block to file")
337+
}
338+
PieceStorageStrategy::InFile => {
339+
unimplemented!()
340+
}
341+
}
342+
}
343+
344+
/// Handles the completion of a full piece. This validates the piece hash,
345+
/// sends it to the piece manager, updates the bitfield, notifies peers,
346+
/// updates trackers, and either requests the next piece or transitions to
347+
/// seeding mode if done.
348+
async fn piece_completed(&mut self, index: usize) {
349+
let info_dict = self
350+
.info_dict()
351+
.expect("Can't receive piece without info dict");
352+
353+
let previous_blocks = self.block_map.remove(&index);
354+
let cur_piece = self.next_piece;
355+
let piece_count = info_dict.piece_count();
356+
let total_length = info_dict.total_length();
357+
358+
if !self.validate_and_send_piece(index, previous_blocks).await {
359+
return;
360+
}
361+
362+
self.next_piece += 1;
363+
self.bitfield.set_aliased(index, true);
364+
365+
debug!(
366+
piece_index = index,
367+
pieces_left = piece_count.saturating_sub(index + 1),
368+
"Piece is now complete"
369+
);
370+
371+
self.broadcast_to_peers(PeerTell::Have(cur_piece)).await;
372+
373+
if let Some(total_downloaded) = self.total_bytes_downloaded() {
374+
let total_bytes_left = total_length - total_downloaded;
375+
self
376+
.update_trackers(TrackerUpdate::Left(total_bytes_left))
377+
.await;
378+
}
379+
380+
if self.next_piece >= piece_count {
381+
self.state = TorrentState::Seeding;
382+
self
383+
.update_trackers(TrackerUpdate::Event(Event::Completed))
384+
.await;
385+
self.broadcast_to_trackers(TrackerMessage::Announce).await;
386+
info!("Torrenting process completed, switching to seeding mode");
387+
} else {
388+
let (piece_idx, block_offset, block_length) = self.next_block_coordinates(self.next_piece);
389+
self
390+
.broadcast_to_peers(PeerTell::NeedPiece(piece_idx, block_offset, block_length))
391+
.await;
392+
}
393+
}
394+
395+
/// Validates a completed piece by checking its hash and sends it to the
396+
/// piece manager. Returns false if validation fails or the piece manager
397+
/// rejects it, which triggers a re-request of the piece. Returns true if
398+
/// the piece is successfully validated and stored.
399+
async fn validate_and_send_piece(
400+
&mut self, index: usize, previous_blocks: Option<(usize, BitVec)>,
401+
) -> bool {
402+
let info_dict = self
403+
.info_dict()
404+
.expect("Can't receive piece without info dict");
405+
406+
match &self.piece_storage {
407+
PieceStorageStrategy::Disk(_) => {
408+
let path = self
409+
.get_piece_path(index)
410+
.expect("Failed to get piece path");
411+
412+
if util::validate_piece_file(path.clone(), info_dict.pieces[index])
413+
.await
414+
.is_err()
415+
{
416+
warn!(path = %path.display(), index, "Piece file is invalid, clearing it");
417+
let path_clone = path.clone();
418+
419+
tokio::spawn(async move {
420+
fs::remove_file(&path_clone).await.unwrap_or_else(|_| {
421+
error!(path = ?path_clone.display(), "Failed to delete file piece");
422+
});
423+
});
424+
return false;
425+
}
426+
427+
let data = fs::read(&path).await.unwrap().into();
428+
if let Err(err) = self.piece_manager.recv(index, data).await {
429+
warn!(?err, index, path = %path.display(), "Piece manager rejected piece; re-requesting");
430+
if let Some((_, mut blocks)) = previous_blocks {
431+
blocks.fill(false);
432+
self.block_map.insert(index, blocks);
433+
}
434+
let (piece_idx, block_offset, block_length) = self.next_block_coordinates(index);
435+
self
436+
.broadcast_to_peers(PeerTell::NeedPiece(piece_idx, block_offset, block_length))
437+
.await;
438+
return false;
439+
}
440+
}
441+
PieceStorageStrategy::InFile => {
442+
unimplemented!()
443+
}
444+
}
445+
true
446+
}
447+
448+
/// Calculates the coordinates of the next block to request for a given
449+
/// piece. Returns a tuple of (piece_index, offset, block_length) where
450+
/// the offset points to the next unreceived block and the length accounts
451+
/// for the final block potentially being smaller than the standard block
452+
/// size.
453+
pub fn next_block_coordinates(&self, piece_index: usize) -> (usize, usize, usize) {
454+
let info_dict = self
455+
.info_dict()
456+
.expect("Can't receive piece without info dict");
457+
458+
let piece_length = info_dict.piece_length as usize;
459+
460+
let next_block_index = self
461+
.block_map
462+
.get(&piece_index)
463+
.and_then(|blocks| blocks.iter().position(|b| !*b))
464+
.unwrap_or(0);
465+
466+
let offset = next_block_index * BLOCK_SIZE;
467+
let is_overflowing = offset + BLOCK_SIZE > piece_length;
468+
let block_length = if is_overflowing {
469+
piece_length - offset
470+
} else {
471+
BLOCK_SIZE
472+
};
473+
474+
(piece_index, offset, block_length)
475+
}
476+
477+
fn is_duplicate_block(&self, index: usize, block_index: usize) -> bool {
478+
self
479+
.block_map
480+
.get(&index)
481+
.and_then(|block_map| block_map.get(block_index).as_deref().copied())
482+
.unwrap_or(false)
483+
}
484+
485+
fn is_piece_complete(&self, index: usize) -> bool {
486+
self
487+
.block_map
488+
.get(&index)
489+
.map(|blocks| blocks.iter().all(|b| *b))
490+
.unwrap_or(false)
491+
}
256492
pub async fn start(&mut self) {
257493
if self.is_full() {
258494
self.state = TorrentState::Seeding;

0 commit comments

Comments
 (0)