Skip to content

Commit 587aa9e

Browse files
authored
Serve typed blocks from recent block cache to avoid repeated deserialization (#6400)
* graph, chain/ethereum: Move json_patch and json_block modules to graph crate Move EthereumJsonBlock and JSON patching utilities from chain/ethereum to graph/src/components/ethereum/ so they can be used by the store layer without circular dependencies. This prepares for typed block caching where CachedBlock::from_json() needs access to these utilities. * graph: Add CachedBlock enum and typed block cache * graph, chain/ethereum, store: Address review feedback - Return Arc<LightEthereumBlock> from into_light_block() to avoid deep clone - Move cache_block_to_block_ptr_ext into CacheBlock::to_extended_block_ptr()
1 parent 46e01a7 commit 587aa9e

14 files changed

Lines changed: 316 additions & 242 deletions

File tree

chain/ethereum/src/chain.rs

Lines changed: 15 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ use crate::codec::HeaderOnlyBlock;
4646
use crate::data_source::DataSourceTemplate;
4747
use crate::data_source::UnresolvedDataSourceTemplate;
4848
use crate::ingestor::PollingBlockIngestor;
49-
use crate::json_block::EthereumJsonBlock;
5049
use crate::network::EthereumNetworkAdapters;
5150
use crate::polling_block_stream::PollingBlockStream;
5251
use crate::runtime::runtime_adapter::eth_call_gas;
@@ -66,7 +65,6 @@ use graph::blockchain::block_stream::{
6665
BlockStream, BlockStreamBuilder, BlockStreamError, BlockStreamMapper, FirehoseCursor,
6766
TriggersAdapterWrapper,
6867
};
69-
7068
/// Celo Mainnet: 42220, Testnet Alfajores: 44787, Testnet Baklava: 62320
7169
const CELO_CHAIN_IDS: [u64; 3] = [42220, 44787, 62320];
7270

@@ -1076,55 +1074,26 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
10761074
.ancestor_block(ptr.clone(), offset, root.clone())
10771075
.await?;
10781076

1079-
// First check if we have the ancestor in cache and can deserialize it.
1080-
// The cached JSON can be in one of three formats:
1081-
// 1. Full RPC format: {"block": {...}, "transaction_receipts": [...]}
1082-
// 2. Shallow/header-only: {"timestamp": "...", "data": null} - only timestamp, no block data
1083-
// 3. Legacy direct: block fields at root level {hash, number, transactions, ...}
1084-
// We need full format with receipts for ancestor_block (used for trigger processing).
1077+
// Use full blocks (with receipts) directly from cache.
1078+
// Light blocks (no receipts) need to be fetched from Firehose/RPC.
10851079
let block_ptr = match cached {
1086-
Some((json, ptr)) => {
1087-
let json_block = EthereumJsonBlock::new(json);
1088-
if json_block.is_shallow() {
1089-
trace!(
1090-
self.logger,
1091-
"Cached block #{} {} is shallow (header-only). Falling back to Firehose/RPC.",
1092-
ptr.number,
1093-
ptr.hash_hex(),
1094-
);
1095-
ptr
1096-
} else if json_block.is_legacy_format() {
1080+
Some((cached_block, ptr)) => match cached_block.into_full_block() {
1081+
Some(block) => {
1082+
return Ok(Some(BlockFinality::NonFinal(EthereumBlockWithCalls {
1083+
ethereum_block: block,
1084+
calls: None,
1085+
})));
1086+
}
1087+
None => {
10971088
trace!(
10981089
self.logger,
1099-
"Cached block #{} {} is legacy light format. Falling back to Firehose/RPC.",
1090+
"Cached block #{} {} is light (no receipts). Falling back to Firehose/RPC.",
11001091
ptr.number,
11011092
ptr.hash_hex(),
11021093
);
11031094
ptr
1104-
} else {
1105-
match json_block.into_full_block() {
1106-
Ok(block) => {
1107-
return Ok(Some(BlockFinality::NonFinal(EthereumBlockWithCalls {
1108-
ethereum_block: block,
1109-
calls: None,
1110-
})));
1111-
}
1112-
Err(e) => {
1113-
warn!(
1114-
self.logger,
1115-
"Failed to deserialize cached ancestor block #{} {} (offset {} from #{}): {}. \
1116-
Falling back to Firehose/RPC.",
1117-
ptr.number,
1118-
ptr.hash_hex(),
1119-
offset,
1120-
ptr_for_log.number,
1121-
e
1122-
);
1123-
ptr
1124-
}
1125-
}
11261095
}
1127-
}
1096+
},
11281097
None => {
11291098
// Cache miss - fall back to walking the chain via parent_ptr() calls.
11301099
// This provides resilience when the block cache is empty (e.g., after truncation).
@@ -1179,35 +1148,10 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
11791148
let block = match self.chain_client.as_ref() {
11801149
ChainClient::Firehose(endpoints) => {
11811150
let chain_store = self.chain_store.cheap_clone();
1182-
// First try to get the block from the store
1183-
// See ancestor_block() for documentation of the 3 cached JSON formats.
1151+
// First try to get the block from the store (typed cache)
11841152
if let Ok(blocks) = chain_store.blocks(vec![block.hash.clone()]).await {
1185-
if let Some(cached_json) = blocks.into_iter().next() {
1186-
let json_block = EthereumJsonBlock::new(cached_json);
1187-
if json_block.is_shallow() {
1188-
trace!(
1189-
self.logger,
1190-
"Cached block #{} {} is shallow. Falling back to Firehose.",
1191-
block.number,
1192-
block.hash_hex(),
1193-
);
1194-
} else {
1195-
match json_block.into_light_block() {
1196-
Ok(light_block) => {
1197-
return Ok(light_block.parent_ptr());
1198-
}
1199-
Err(e) => {
1200-
warn!(
1201-
self.logger,
1202-
"Failed to deserialize cached block #{} {}: {}. \
1203-
Falling back to Firehose.",
1204-
block.number,
1205-
block.hash_hex(),
1206-
e
1207-
);
1208-
}
1209-
}
1210-
}
1153+
if let Some(cached_block) = blocks.into_iter().next() {
1154+
return Ok(cached_block.light_block().parent_ptr());
12111155
}
12121156
}
12131157

chain/ethereum/src/ethereum_adapter.rs

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ use crate::adapter::EthereumRpcError;
6666
use crate::adapter::ProviderStatus;
6767
use crate::call_helper::interpret_eth_call_error;
6868
use crate::chain::BlockFinality;
69-
use crate::json_block::EthereumJsonBlock;
7069
use crate::trigger::{LogPosition, LogRef};
7170
use crate::Chain;
7271
use crate::NodeCapabilities;
@@ -1641,23 +1640,7 @@ impl EthereumAdapterTrait for EthereumAdapter {
16411640
.map_err(|e| error!(&logger, "Error accessing block cache {}", e))
16421641
.unwrap_or_default()
16431642
.into_iter()
1644-
.filter_map(|value| {
1645-
let json_block = EthereumJsonBlock::new(value);
1646-
if json_block.is_shallow() {
1647-
return None;
1648-
}
1649-
json_block
1650-
.into_light_block()
1651-
.map_err(|e| {
1652-
warn!(
1653-
&logger,
1654-
"Failed to deserialize cached block: {}. Block will be re-fetched from RPC.",
1655-
e
1656-
);
1657-
})
1658-
.ok()
1659-
})
1660-
.map(Arc::new)
1643+
.map(CachedBlock::into_light_block)
16611644
.collect();
16621645

16631646
let missing_blocks = Vec::from_iter(

chain/ethereum/src/lib.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ mod data_source;
77
mod env;
88
mod ethereum_adapter;
99
mod ingestor;
10-
mod json_block;
11-
mod json_patch;
1210
mod polling_block_stream;
1311
pub mod runtime;
1412
mod transport;

chain/ethereum/src/transport.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
use crate::json_patch;
21
use alloy::transports::{TransportError, TransportErrorKind, TransportFut};
2+
use graph::components::ethereum::json_patch;
33
use graph::components::network_provider::ProviderName;
44
use graph::endpoint::{ConnectionType, EndpointMetrics, RequestLabels};
55
use graph::prelude::alloy::rpc::json_rpc::{RequestPacket, ResponsePacket};

graph/src/blockchain/mock.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::{
22
bail,
33
components::{
4+
ethereum::CachedBlock,
45
link_resolver::LinkResolver,
56
network_provider::ChainName,
67
store::{
@@ -527,15 +528,18 @@ impl ChainStore for MockChainStore {
527528
) -> Result<Option<B256>, Error> {
528529
unimplemented!()
529530
}
530-
async fn blocks(self: Arc<Self>, _hashes: Vec<BlockHash>) -> Result<Vec<Value>, Error> {
531+
async fn blocks(self: Arc<Self>, _hashes: Vec<BlockHash>) -> Result<Vec<CachedBlock>, Error> {
532+
unimplemented!()
533+
}
534+
async fn blocks_as_json(self: Arc<Self>, _hashes: Vec<BlockHash>) -> Result<Vec<Value>, Error> {
531535
unimplemented!()
532536
}
533537
async fn ancestor_block(
534538
self: Arc<Self>,
535539
_block_ptr: BlockPtr,
536540
_offset: BlockNumber,
537541
_root: Option<BlockHash>,
538-
) -> Result<Option<(Value, BlockPtr)>, Error> {
542+
) -> Result<Option<(CachedBlock, BlockPtr)>, Error> {
539543
unimplemented!()
540544
}
541545
async fn cleanup_cached_blocks(

chain/ethereum/src/json_block.rs renamed to graph/src/components/ethereum/json_block.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
use graph::prelude::serde_json::{self as json, Value};
2-
use graph::prelude::{EthereumBlock, LightEthereumBlock};
1+
use serde_json::{self as json, Value};
32

4-
use crate::json_patch;
3+
use super::json_patch;
4+
use super::types::{CachedBlock, EthereumBlock, LightEthereumBlock};
55

66
#[derive(Debug)]
77
pub struct EthereumJsonBlock(Value);
@@ -49,4 +49,18 @@ impl EthereumJsonBlock {
4949
json_patch::patch_block_transactions(&mut inner);
5050
json::from_value(inner)
5151
}
52+
53+
/// Tries to deserialize into a `CachedBlock`. Uses `transaction_receipts`
54+
/// presence to decide between full and light block, avoiding a JSON clone.
55+
pub fn try_into_cached_block(self) -> Option<CachedBlock> {
56+
let has_receipts = self
57+
.0
58+
.get("transaction_receipts")
59+
.is_some_and(|v| !v.is_null());
60+
if has_receipts {
61+
self.into_full_block().ok().map(CachedBlock::Full)
62+
} else {
63+
self.into_light_block().ok().map(CachedBlock::Light)
64+
}
65+
}
5266
}

chain/ethereum/src/json_patch.rs renamed to graph/src/components/ethereum/json_patch.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
//!
88
//! Also used by `PatchingHttp` for chains that don't support EIP-2718 typed transactions.
99
10-
use graph::prelude::serde_json::Value;
10+
use serde_json::Value;
1111

12-
pub(crate) fn patch_type_field(obj: &mut Value) -> bool {
12+
pub fn patch_type_field(obj: &mut Value) -> bool {
1313
if let Value::Object(map) = obj {
1414
if !map.contains_key("type") {
1515
map.insert("type".to_string(), Value::String("0x0".to_string()));
@@ -19,7 +19,7 @@ pub(crate) fn patch_type_field(obj: &mut Value) -> bool {
1919
false
2020
}
2121

22-
pub(crate) fn patch_block_transactions(block: &mut Value) -> bool {
22+
pub fn patch_block_transactions(block: &mut Value) -> bool {
2323
let Some(txs) = block.get_mut("transactions").and_then(|t| t.as_array_mut()) else {
2424
return false;
2525
};
@@ -30,7 +30,7 @@ pub(crate) fn patch_block_transactions(block: &mut Value) -> bool {
3030
patched
3131
}
3232

33-
pub(crate) fn patch_receipts(result: &mut Value) -> bool {
33+
pub fn patch_receipts(result: &mut Value) -> bool {
3434
match result {
3535
Value::Object(_) => patch_type_field(result),
3636
Value::Array(arr) => {
@@ -47,7 +47,7 @@ pub(crate) fn patch_receipts(result: &mut Value) -> bool {
4747
#[cfg(test)]
4848
mod tests {
4949
use super::*;
50-
use graph::prelude::serde_json::json;
50+
use serde_json::json;
5151

5252
#[test]
5353
fn patch_type_field_adds_missing_type() {

graph/src/components/ethereum/mod.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1+
pub mod json_block;
2+
pub mod json_patch;
13
mod network;
24
mod types;
35

6+
pub use self::json_block::EthereumJsonBlock;
47
pub use self::network::AnyNetworkBare;
58
pub use self::types::{
6-
AnyBlock, AnyTransaction, AnyTransactionReceiptBare, EthereumBlock, EthereumBlockWithCalls,
7-
EthereumCall, LightEthereumBlock, LightEthereumBlockExt,
9+
AnyBlock, AnyTransaction, AnyTransactionReceiptBare, CachedBlock, EthereumBlock,
10+
EthereumBlockWithCalls, EthereumCall, LightEthereumBlock, LightEthereumBlockExt,
811
};
912

1013
// Re-export Alloy network types for convenience

graph/src/components/ethereum/types.rs

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,16 @@ use crate::{
1717
prelude::BlockNumber,
1818
};
1919

20+
use super::json_block::EthereumJsonBlock;
21+
2022
pub type AnyTransaction = Transaction<AnyTxEnvelope>;
2123
pub type AnyBlock = Block<AnyTransaction, Header<AnyHeader>>;
2224
/// Like alloy's `AnyTransactionReceipt` but without the `WithOtherFields` wrapper,
2325
/// avoiding `#[serde(flatten)]` overhead during deserialization.
2426
pub type AnyTransactionReceiptBare = TransactionReceipt<AnyReceiptEnvelope<Log>>;
2527

2628
#[allow(dead_code)]
27-
#[derive(Debug, Deserialize, Serialize)]
29+
#[derive(Clone, Debug, Deserialize, Serialize)]
2830
pub struct LightEthereumBlock(AnyBlock);
2931

3032
impl Default for LightEthereumBlock {
@@ -259,3 +261,55 @@ impl<'a> From<&'a EthereumCall> for BlockPtr {
259261
BlockPtr::from((call.block_hash, call.block_number))
260262
}
261263
}
264+
265+
/// Typed cached block for Ethereum. Stores the deserialized block so that
266+
/// repeated reads from the in-memory cache avoid `serde_json::from_value()`.
267+
#[derive(Clone, Debug)]
268+
#[allow(clippy::large_enum_variant)]
269+
pub enum CachedBlock {
270+
Full(EthereumBlock),
271+
Light(LightEthereumBlock),
272+
}
273+
274+
impl CachedBlock {
275+
pub fn light_block(&self) -> &LightEthereumBlock {
276+
match self {
277+
CachedBlock::Full(block) => &block.block,
278+
CachedBlock::Light(block) => block,
279+
}
280+
}
281+
282+
pub fn into_light_block(self) -> Arc<LightEthereumBlock> {
283+
match self {
284+
CachedBlock::Full(block) => block.block,
285+
CachedBlock::Light(block) => Arc::new(block),
286+
}
287+
}
288+
289+
pub fn into_full_block(self) -> Option<EthereumBlock> {
290+
match self {
291+
CachedBlock::Full(block) => Some(block),
292+
CachedBlock::Light(_) => None,
293+
}
294+
}
295+
296+
pub fn from_json(value: serde_json::Value) -> Option<Self> {
297+
let json_block = EthereumJsonBlock::new(value);
298+
if json_block.is_shallow() {
299+
return None;
300+
}
301+
json_block.try_into_cached_block()
302+
}
303+
304+
pub fn timestamp(&self) -> Option<u64> {
305+
Some(self.light_block().timestamp_u64())
306+
}
307+
308+
pub fn parent_ptr(&self) -> Option<BlockPtr> {
309+
self.light_block().parent_ptr()
310+
}
311+
312+
pub fn ptr(&self) -> BlockPtr {
313+
self.light_block().block_ptr()
314+
}
315+
}

graph/src/components/store/traits.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use async_trait::async_trait;
77
use super::*;
88
use crate::blockchain::block_stream::{EntitySourceOperation, FirehoseCursor};
99
use crate::blockchain::{BlockTime, ChainIdentifier, ExtendedBlockPtr};
10+
use crate::components::ethereum::CachedBlock;
1011
use crate::components::metrics::stopwatch::StopwatchMetrics;
1112
use crate::components::network_provider::ChainName;
1213
use crate::components::server::index_node::VersionInfo;
@@ -553,8 +554,12 @@ pub trait ChainStore: ChainHeadStore {
553554
ancestor_count: BlockNumber,
554555
) -> Result<Option<B256>, Error>;
555556

556-
/// Returns the blocks present in the store.
557-
async fn blocks(
557+
/// Returns the blocks present in the store as typed cached blocks.
558+
async fn blocks(self: Arc<Self>, hashes: Vec<BlockHash>) -> Result<Vec<CachedBlock>, Error>;
559+
560+
/// Returns blocks as raw JSON. Used by callers that need the original
561+
/// JSON representation (e.g., GraphQL block queries, CLI tools).
562+
async fn blocks_as_json(
558563
self: Arc<Self>,
559564
hashes: Vec<BlockHash>,
560565
) -> Result<Vec<serde_json::Value>, Error>;
@@ -584,7 +589,7 @@ pub trait ChainStore: ChainHeadStore {
584589
block_ptr: BlockPtr,
585590
offset: BlockNumber,
586591
root: Option<BlockHash>,
587-
) -> Result<Option<(serde_json::Value, BlockPtr)>, Error>;
592+
) -> Result<Option<(CachedBlock, BlockPtr)>, Error>;
588593

589594
/// Remove old blocks from the cache we maintain in the database and
590595
/// return a pair containing the number of the oldest block retained

0 commit comments

Comments
 (0)