Skip to content

Commit 1a8a109

Browse files
authored
Merge pull request #125 from decipherhub/fix/eth-subscribe
fix: broadcast blocks to eth_subscribe subscribers
2 parents 062eee5 + cde6936 commit 1a8a109

4 files changed

Lines changed: 136 additions & 3 deletions

File tree

crates/node/src/node.rs

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -756,6 +756,16 @@ impl Node {
756756
None
757757
};
758758

759+
// Create subscription manager for WebSocket subscriptions (eth_subscribe)
760+
// This needs to be shared between the RPC server and the event loop
761+
// so that new blocks can be broadcast to subscribers
762+
let subscription_manager = if rpc_storage.is_some() {
763+
use cipherbft_rpc::SubscriptionManager;
764+
Some(Arc::new(SubscriptionManager::default()))
765+
} else {
766+
None
767+
};
768+
759769
// Start RPC server if enabled
760770
if let Some(ref storage) = rpc_storage {
761771
use cipherbft_rpc::{
@@ -772,8 +782,16 @@ impl Node {
772782
let executor = Arc::new(StubExecutionApi::new());
773783
let network = Arc::new(StubNetworkApi::new());
774784

775-
let rpc_server =
776-
RpcServer::new(rpc_config, storage.clone(), mempool, executor, network);
785+
// Use with_subscription_manager to share the subscription manager
786+
// between the RPC server and the event loop for broadcasting blocks
787+
let rpc_server = RpcServer::with_subscription_manager(
788+
rpc_config,
789+
storage.clone(),
790+
mempool,
791+
executor,
792+
network,
793+
subscription_manager.clone().unwrap(),
794+
);
777795

778796
let http_port = self.config.rpc_http_port;
779797
let ws_port = self.config.rpc_ws_port;
@@ -819,6 +837,7 @@ impl Node {
819837
&mut decided_rx,
820838
execution_bridge,
821839
rpc_storage,
840+
subscription_manager,
822841
)
823842
.await;
824843

@@ -840,6 +859,7 @@ impl Node {
840859
}
841860

842861
/// Internal event loop that handles messages and can be cancelled.
862+
#[allow(clippy::too_many_arguments)]
843863
async fn run_event_loop(
844864
cancel_token: CancellationToken,
845865
primary_incoming_rx: &mut mpsc::Receiver<(ValidatorId, DclMessage)>,
@@ -848,6 +868,7 @@ impl Node {
848868
decided_rx: &mut mpsc::Receiver<(ConsensusHeight, Cut)>,
849869
execution_bridge: Option<Arc<ExecutionBridge>>,
850870
rpc_storage: Option<Arc<cipherbft_rpc::MdbxRpcStorage<InMemoryProvider>>>,
871+
subscription_manager: Option<Arc<cipherbft_rpc::SubscriptionManager>>,
851872
) -> Result<()> {
852873
loop {
853874
tokio::select! {
@@ -940,6 +961,14 @@ impl Node {
940961
error!("Failed to store block {} to MDBX: {}", height.0, e);
941962
} else {
942963
debug!("Stored block {} to MDBX with hash {}", height.0, block_result.block_hash);
964+
965+
// Broadcast to WebSocket subscribers (eth_subscribe("newHeads"))
966+
if let Some(ref sub_mgr) = subscription_manager {
967+
use cipherbft_rpc::storage_block_to_rpc_block;
968+
let rpc_block = storage_block_to_rpc_block(block.clone(), false);
969+
sub_mgr.broadcast_block(rpc_block);
970+
debug!("Broadcast block {} to WebSocket subscribers", height.0);
971+
}
943972
}
944973

945974
// Store receipts for eth_getBlockReceipts queries

crates/rpc/src/adapters.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1811,6 +1811,76 @@ impl NetworkApi for StubNetworkApi {
18111811
}
18121812
}
18131813

1814+
/// Convert a storage block to an RPC block format.
1815+
///
1816+
/// This is a standalone function for use by the node when broadcasting
1817+
/// new blocks to WebSocket subscribers via `eth_subscribe("newHeads")`.
1818+
pub fn storage_block_to_rpc_block(
1819+
storage_block: cipherbft_storage::blocks::Block,
1820+
full_txs: bool,
1821+
) -> Block {
1822+
use alloy_primitives::{Bloom, B64};
1823+
1824+
// Convert transaction hashes to B256
1825+
let tx_hashes: Vec<B256> = storage_block
1826+
.transaction_hashes
1827+
.iter()
1828+
.map(|h| B256::from(*h))
1829+
.collect();
1830+
1831+
// Build transactions field based on full_txs flag
1832+
let transactions = if full_txs {
1833+
// TODO: In the future, this should return full Transaction objects
1834+
BlockTransactions::Hashes(tx_hashes)
1835+
} else {
1836+
BlockTransactions::Hashes(tx_hashes)
1837+
};
1838+
1839+
// Build the consensus header
1840+
let consensus_header = alloy_consensus::Header {
1841+
parent_hash: B256::from(storage_block.parent_hash),
1842+
ommers_hash: B256::from(storage_block.ommers_hash),
1843+
beneficiary: Address::from(storage_block.beneficiary),
1844+
state_root: B256::from(storage_block.state_root),
1845+
transactions_root: B256::from(storage_block.transactions_root),
1846+
receipts_root: B256::from(storage_block.receipts_root),
1847+
logs_bloom: Bloom::from_slice(&storage_block.logs_bloom),
1848+
difficulty: U256::from_be_bytes(storage_block.difficulty),
1849+
number: storage_block.number,
1850+
gas_limit: storage_block.gas_limit,
1851+
gas_used: storage_block.gas_used,
1852+
timestamp: storage_block.timestamp,
1853+
extra_data: Bytes::from(storage_block.extra_data.clone()),
1854+
mix_hash: B256::from(storage_block.mix_hash),
1855+
nonce: B64::from(storage_block.nonce),
1856+
base_fee_per_gas: storage_block.base_fee_per_gas,
1857+
withdrawals_root: None,
1858+
blob_gas_used: None,
1859+
excess_blob_gas: None,
1860+
parent_beacon_block_root: None,
1861+
requests_hash: None,
1862+
};
1863+
1864+
// Build the RPC header with hash and total difficulty
1865+
let block_hash = B256::from(storage_block.hash);
1866+
let total_difficulty = U256::from_be_bytes(storage_block.total_difficulty);
1867+
1868+
let rpc_header = Header {
1869+
hash: block_hash,
1870+
inner: consensus_header,
1871+
total_difficulty: Some(total_difficulty),
1872+
size: None,
1873+
};
1874+
1875+
// Build the final RPC block
1876+
Block {
1877+
header: rpc_header,
1878+
uncles: Vec::new(),
1879+
transactions,
1880+
withdrawals: None,
1881+
}
1882+
}
1883+
18141884
#[cfg(test)]
18151885
mod tests {
18161886
use super::*;

crates/rpc/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,9 @@ pub use adapters::{StubExecutionApi, StubMempoolApi, StubNetworkApi, StubRpcStor
167167
// Real implementations backed by storage
168168
pub use adapters::{EvmExecutionApi, MdbxRpcStorage, PoolMempoolApi, ProviderBasedRpcStorage};
169169

170+
// Block conversion utilities for subscription broadcasting
171+
pub use adapters::storage_block_to_rpc_block;
172+
170173
// RPC server traits (for method registration)
171174
pub use eth::EthRpcServer;
172175
pub use net::NetRpcServer;

crates/rpc/src/server.rs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,14 +72,37 @@ where
7272
mempool: Arc<M>,
7373
executor: Arc<E>,
7474
network: Arc<N>,
75+
) -> Self {
76+
Self::with_subscription_manager(
77+
config,
78+
storage,
79+
mempool,
80+
executor,
81+
network,
82+
Arc::new(SubscriptionManager::default()),
83+
)
84+
}
85+
86+
/// Create a new RPC server with an external subscription manager.
87+
///
88+
/// This allows sharing the subscription manager with other components
89+
/// (e.g., the node's event loop) to enable broadcasting events to
90+
/// WebSocket subscribers.
91+
pub fn with_subscription_manager(
92+
config: RpcConfig,
93+
storage: Arc<S>,
94+
mempool: Arc<M>,
95+
executor: Arc<E>,
96+
network: Arc<N>,
97+
subscription_manager: Arc<SubscriptionManager>,
7598
) -> Self {
7699
Self {
77100
config: Arc::new(config),
78101
storage,
79102
mempool,
80103
executor,
81104
network,
82-
subscription_manager: Arc::new(SubscriptionManager::default()),
105+
subscription_manager,
83106
state: Arc::new(RwLock::new(ServerState::Stopped)),
84107
http_handle: Arc::new(RwLock::new(None)),
85108
ws_handle: Arc::new(RwLock::new(None)),
@@ -96,6 +119,14 @@ where
96119
&self.subscription_manager
97120
}
98121

122+
/// Get the subscription manager as an Arc.
123+
///
124+
/// This is useful for sharing the subscription manager with other components
125+
/// that need to broadcast events to WebSocket subscribers.
126+
pub fn subscription_manager_arc(&self) -> Arc<SubscriptionManager> {
127+
Arc::clone(&self.subscription_manager)
128+
}
129+
99130
/// Get the current server state.
100131
pub async fn state(&self) -> ServerState {
101132
*self.state.read().await

0 commit comments

Comments
 (0)