Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 103 additions & 2 deletions examples/deshred.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use jito_protos::shredstream::{
shredstream_proxy_client::ShredstreamProxyClient, SubscribeEntriesRequest,
};
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use solana_sdk::message::VersionedMessage;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
let mut client = ShredstreamProxyClient::connect("http://127.0.0.1:9999")
let mut client = ShredstreamProxyClient::connect("http://X.X.X.X:Y")
.await
.unwrap();
let mut stream = client
Expand All @@ -14,6 +17,12 @@ async fn main() -> Result<(), std::io::Error> {
.into_inner();

while let Some(slot_entry) = stream.message().await.unwrap() {
// Create a hash identifier for this message batch
let mut hasher = DefaultHasher::new();
slot_entry.entries.hash(&mut hasher);
let message_hash = hasher.finish();
let message_id = format!("{:08x}", message_hash & 0xFFFFFFFF); // Use last 8 hex digits

let entries =
match bincode::deserialize::<Vec<solana_entry::entry::Entry>>(&slot_entry.entries) {
Ok(e) => e,
Expand All @@ -23,11 +32,103 @@ async fn main() -> Result<(), std::io::Error> {
}
};
println!(
"slot {}, entries: {}, transactions: {}",
"[{}] slot {}, entries: {}, transactions: {}",
message_id,
slot_entry.slot,
entries.len(),
entries.iter().map(|e| e.transactions.len()).sum::<usize>()
);
// Print transaction count for each individual entry
for (i, entry) in entries.iter().enumerate() {
println!(" [{}] Entry {}: {} transactions", message_id, i + 1, entry.transactions.len());

// Print details for each transaction in this entry
for (j, transaction) in entry.transactions.iter().enumerate() {
println!(" ╔══ Transaction {} ══", j + 1);

// Show all signatures
println!(" ║ Signatures ({}): ", transaction.signatures.len());
for (sig_idx, signature) in transaction.signatures.iter().enumerate() {
println!(" ║ {}: {}", sig_idx + 1, signature);
}

// Show message details
println!(" ║ Message:");

// Handle VersionedMessage - get detailed info
match &transaction.message {
VersionedMessage::Legacy(legacy_msg) => {
println!(" ║ Type: Legacy");
println!(" ║ Recent Blockhash: {}", legacy_msg.recent_blockhash);
println!(" ║ Header: req_sigs={}, readonly_signed={}, readonly_unsigned={}",
legacy_msg.header.num_required_signatures,
legacy_msg.header.num_readonly_signed_accounts,
legacy_msg.header.num_readonly_unsigned_accounts
);

// Show account keys
println!(" ║ Account Keys ({}):", legacy_msg.account_keys.len());
for (acc_idx, account) in legacy_msg.account_keys.iter().enumerate() {
println!(" ║ {}: {}", acc_idx, account);
}

// Show instructions
println!(" ║ Instructions ({}):", legacy_msg.instructions.len());
for (inst_idx, instruction) in legacy_msg.instructions.iter().enumerate() {
println!(" ║ Instruction {}:", inst_idx + 1);
println!(" ║ Program ID Index: {}", instruction.program_id_index);
println!(" ║ Account Indices: {:?}", instruction.accounts);
println!(" ║ Data: {} bytes", instruction.data.len());
if instruction.data.len() <= 32 {
println!(" ║ Data Hex: {}", hex::encode(&instruction.data));
} else {
println!(" ║ Data Hex (first 32 bytes): {}", hex::encode(&instruction.data[..32]));
}
}
}
VersionedMessage::V0(v0_msg) => {
println!(" ║ Type: V0");
println!(" ║ Recent Blockhash: {}", v0_msg.recent_blockhash);
println!(" ║ Header: req_sigs={}, readonly_signed={}, readonly_unsigned={}",
v0_msg.header.num_required_signatures,
v0_msg.header.num_readonly_signed_accounts,
v0_msg.header.num_readonly_unsigned_accounts
);

// Show account keys
println!(" ║ Account Keys ({}):", v0_msg.account_keys.len());
for (acc_idx, account) in v0_msg.account_keys.iter().enumerate() {
println!(" ║ {}: {}", acc_idx, account);
}

// Show address table lookups if any
if !v0_msg.address_table_lookups.is_empty() {
println!(" ║ Address Table Lookups ({}):", v0_msg.address_table_lookups.len());
for (lookup_idx, lookup) in v0_msg.address_table_lookups.iter().enumerate() {
println!(" ║ {}: table={}, writable={:?}, readonly={:?}",
lookup_idx, lookup.account_key, lookup.writable_indexes, lookup.readonly_indexes);
}
}

// Show instructions
println!(" ║ Instructions ({}):", v0_msg.instructions.len());
for (inst_idx, instruction) in v0_msg.instructions.iter().enumerate() {
println!(" ║ Instruction {}:", inst_idx + 1);
println!(" ║ Program ID Index: {}", instruction.program_id_index);
println!(" ║ Account Indices: {:?}", instruction.accounts);
println!(" ║ Data: {} bytes", instruction.data.len());
if instruction.data.len() <= 32 {
println!(" ║ Data Hex: {}", hex::encode(&instruction.data));
} else {
println!(" ║ Data Hex (first 32 bytes): {}", hex::encode(&instruction.data[..32]));
}
}
}
}

println!(" ╚══════════════════════════════════");
}
}
}
Ok(())
}
21 changes: 21 additions & 0 deletions proxy/src/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub fn start_forwarder_threads(
src_port: u16,
maybe_multicast_socket: Option<Vec<UdpSocket>>,
num_threads: Option<usize>,
multicast_ttl: Option<u32>,
deduper: Arc<RwLock<Deduper<2, [u8]>>>,
should_reconstruct_shreds: bool,
entry_sender: Arc<Sender<PbEntry>>,
Expand Down Expand Up @@ -163,6 +164,11 @@ pub fn start_forwarder_threads(
let send_socket =
UdpSocket::bind(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0))
.expect("to bind to udp port for forwarding");
if let Some(ttl) = multicast_ttl {
send_socket
.set_multicast_ttl_v4(ttl)
.expect("to set multicast TTL");
}
let mut local_dest_sockets = unioned_dest_sockets.load();

let refresh_subscribers_tick = if use_discovery_service {
Expand Down Expand Up @@ -725,4 +731,19 @@ mod tests {
6
);
}

#[test]
fn test_multicast_ttl_is_set() {
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
// Default multicast TTL should be 1
assert_eq!(socket.multicast_ttl_v4().unwrap(), 1);

// Set TTL to 64 and verify
socket.set_multicast_ttl_v4(64).unwrap();
assert_eq!(socket.multicast_ttl_v4().unwrap(), 64);

// Set TTL to 128 and verify
socket.set_multicast_ttl_v4(128).unwrap();
assert_eq!(socket.multicast_ttl_v4().unwrap(), 128);
}
}
6 changes: 6 additions & 0 deletions proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ struct CommonArgs {
/// Number of threads to use. Defaults to use up to 4.
#[arg(long, env)]
num_threads: Option<usize>,

/// Multicast TTL (time-to-live / hop limit) for forwarded packets.
/// Only relevant when forwarding to multicast destinations. Default OS value is 1.
#[arg(long, env)]
multicast_ttl: Option<u32>,
}

#[derive(Debug, Error)]
Expand Down Expand Up @@ -302,6 +307,7 @@ fn main() -> Result<(), ShredstreamProxyError> {
args.src_bind_port,
maybe_multicast_socket,
args.num_threads,
args.multicast_ttl,
deduper.clone(),
args.grpc_service_port.is_some(),
entry_sender.clone(),
Expand Down