diff --git a/examples/deshred.rs b/examples/deshred.rs index 7ba1f475..a90ed1ef 100644 --- a/examples/deshred.rs +++ b/examples/deshred.rs @@ -1,10 +1,13 @@ use jito_protos::shredstream::{ shredstream_proxy_client::ShredstreamProxyClient, SubscribeEntriesRequest, }; +use std::collections::hash_map::DefaultHasher; +use std::hash::{Hash, Hasher}; +use solana_sdk::message::VersionedMessage; #[tokio::main] async fn main() -> Result<(), std::io::Error> { - let mut client = ShredstreamProxyClient::connect("http://127.0.0.1:9999") + let mut client = ShredstreamProxyClient::connect("http://X.X.X.X:Y") .await .unwrap(); let mut stream = client @@ -14,6 +17,12 @@ async fn main() -> Result<(), std::io::Error> { .into_inner(); while let Some(slot_entry) = stream.message().await.unwrap() { + // Create a hash identifier for this message batch + let mut hasher = DefaultHasher::new(); + slot_entry.entries.hash(&mut hasher); + let message_hash = hasher.finish(); + let message_id = format!("{:08x}", message_hash & 0xFFFFFFFF); // Use last 8 hex digits + let entries = match bincode::deserialize::>(&slot_entry.entries) { Ok(e) => e, @@ -23,11 +32,103 @@ async fn main() -> Result<(), std::io::Error> { } }; println!( - "slot {}, entries: {}, transactions: {}", + "[{}] slot {}, entries: {}, transactions: {}", + message_id, slot_entry.slot, entries.len(), entries.iter().map(|e| e.transactions.len()).sum::() ); + // Print transaction count for each individual entry + for (i, entry) in entries.iter().enumerate() { + println!(" [{}] Entry {}: {} transactions", message_id, i + 1, entry.transactions.len()); + + // Print details for each transaction in this entry + for (j, transaction) in entry.transactions.iter().enumerate() { + println!(" ╔══ Transaction {} ══", j + 1); + + // Show all signatures + println!(" ║ Signatures ({}): ", transaction.signatures.len()); + for (sig_idx, signature) in transaction.signatures.iter().enumerate() { + println!(" ║ {}: {}", sig_idx + 1, signature); + } + + // Show message details + println!(" ║ Message:"); + + // Handle VersionedMessage - get detailed info + match &transaction.message { + VersionedMessage::Legacy(legacy_msg) => { + println!(" ║ Type: Legacy"); + println!(" ║ Recent Blockhash: {}", legacy_msg.recent_blockhash); + println!(" ║ Header: req_sigs={}, readonly_signed={}, readonly_unsigned={}", + legacy_msg.header.num_required_signatures, + legacy_msg.header.num_readonly_signed_accounts, + legacy_msg.header.num_readonly_unsigned_accounts + ); + + // Show account keys + println!(" ║ Account Keys ({}):", legacy_msg.account_keys.len()); + for (acc_idx, account) in legacy_msg.account_keys.iter().enumerate() { + println!(" ║ {}: {}", acc_idx, account); + } + + // Show instructions + println!(" ║ Instructions ({}):", legacy_msg.instructions.len()); + for (inst_idx, instruction) in legacy_msg.instructions.iter().enumerate() { + println!(" ║ Instruction {}:", inst_idx + 1); + println!(" ║ Program ID Index: {}", instruction.program_id_index); + println!(" ║ Account Indices: {:?}", instruction.accounts); + println!(" ║ Data: {} bytes", instruction.data.len()); + if instruction.data.len() <= 32 { + println!(" ║ Data Hex: {}", hex::encode(&instruction.data)); + } else { + println!(" ║ Data Hex (first 32 bytes): {}", hex::encode(&instruction.data[..32])); + } + } + } + VersionedMessage::V0(v0_msg) => { + println!(" ║ Type: V0"); + println!(" ║ Recent Blockhash: {}", v0_msg.recent_blockhash); + println!(" ║ Header: req_sigs={}, readonly_signed={}, readonly_unsigned={}", + v0_msg.header.num_required_signatures, + v0_msg.header.num_readonly_signed_accounts, + v0_msg.header.num_readonly_unsigned_accounts + ); + + // Show account keys + println!(" ║ Account Keys ({}):", v0_msg.account_keys.len()); + for (acc_idx, account) in v0_msg.account_keys.iter().enumerate() { + println!(" ║ {}: {}", acc_idx, account); + } + + // Show address table lookups if any + if !v0_msg.address_table_lookups.is_empty() { + println!(" ║ Address Table Lookups ({}):", v0_msg.address_table_lookups.len()); + for (lookup_idx, lookup) in v0_msg.address_table_lookups.iter().enumerate() { + println!(" ║ {}: table={}, writable={:?}, readonly={:?}", + lookup_idx, lookup.account_key, lookup.writable_indexes, lookup.readonly_indexes); + } + } + + // Show instructions + println!(" ║ Instructions ({}):", v0_msg.instructions.len()); + for (inst_idx, instruction) in v0_msg.instructions.iter().enumerate() { + println!(" ║ Instruction {}:", inst_idx + 1); + println!(" ║ Program ID Index: {}", instruction.program_id_index); + println!(" ║ Account Indices: {:?}", instruction.accounts); + println!(" ║ Data: {} bytes", instruction.data.len()); + if instruction.data.len() <= 32 { + println!(" ║ Data Hex: {}", hex::encode(&instruction.data)); + } else { + println!(" ║ Data Hex (first 32 bytes): {}", hex::encode(&instruction.data[..32])); + } + } + } + } + + println!(" ╚══════════════════════════════════"); + } + } } Ok(()) } diff --git a/proxy/src/forwarder.rs b/proxy/src/forwarder.rs index 0fdb5f16..0f644f53 100644 --- a/proxy/src/forwarder.rs +++ b/proxy/src/forwarder.rs @@ -51,6 +51,7 @@ pub fn start_forwarder_threads( src_port: u16, maybe_multicast_socket: Option>, num_threads: Option, + multicast_ttl: Option, deduper: Arc>>, should_reconstruct_shreds: bool, entry_sender: Arc>, @@ -163,6 +164,11 @@ pub fn start_forwarder_threads( let send_socket = UdpSocket::bind(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)) .expect("to bind to udp port for forwarding"); + if let Some(ttl) = multicast_ttl { + send_socket + .set_multicast_ttl_v4(ttl) + .expect("to set multicast TTL"); + } let mut local_dest_sockets = unioned_dest_sockets.load(); let refresh_subscribers_tick = if use_discovery_service { @@ -725,4 +731,19 @@ mod tests { 6 ); } + + #[test] + fn test_multicast_ttl_is_set() { + let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + // Default multicast TTL should be 1 + assert_eq!(socket.multicast_ttl_v4().unwrap(), 1); + + // Set TTL to 64 and verify + socket.set_multicast_ttl_v4(64).unwrap(); + assert_eq!(socket.multicast_ttl_v4().unwrap(), 64); + + // Set TTL to 128 and verify + socket.set_multicast_ttl_v4(128).unwrap(); + assert_eq!(socket.multicast_ttl_v4().unwrap(), 128); + } } diff --git a/proxy/src/main.rs b/proxy/src/main.rs index 8bcae1c7..f82895d9 100644 --- a/proxy/src/main.rs +++ b/proxy/src/main.rs @@ -144,6 +144,11 @@ struct CommonArgs { /// Number of threads to use. Defaults to use up to 4. #[arg(long, env)] num_threads: Option, + + /// Multicast TTL (time-to-live / hop limit) for forwarded packets. + /// Only relevant when forwarding to multicast destinations. Default OS value is 1. + #[arg(long, env)] + multicast_ttl: Option, } #[derive(Debug, Error)] @@ -302,6 +307,7 @@ fn main() -> Result<(), ShredstreamProxyError> { args.src_bind_port, maybe_multicast_socket, args.num_threads, + args.multicast_ttl, deduper.clone(), args.grpc_service_port.is_some(), entry_sender.clone(),