Skip to content

Commit c78190d

Browse files
fubhylutter
authored andcommitted
node, chain: Add extensible compression support for RPC requests
- Replace boolean compression_enabled with Compression enum (None, Gzip) - Support per-provider compression configuration via "compression" field - Add placeholders for future compression methods (Brotli, Deflate) - Update transport layer to handle compression enum with match statement - Add comprehensive unit tests for compression configuration parsing - Update example configuration and documentation Configuration examples: compression = "gzip" # Enable gzip compression compression = "none" # Disable compression (default) Addresses issue #5671 with future-extensible design.
1 parent 1cf7f84 commit c78190d

8 files changed

Lines changed: 118 additions & 7 deletions

File tree

Cargo.lock

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

chain/ethereum/src/network.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,7 @@ mod tests {
314314
use graph::components::network_provider::ProviderManager;
315315
use graph::components::network_provider::ProviderName;
316316
use graph::data::value::Word;
317+
use graph::endpoint::Compression;
317318
use graph::http::HeaderMap;
318319
use graph::{
319320
endpoint::EndpointMetrics,
@@ -395,6 +396,7 @@ mod tests {
395396
metrics.clone(),
396397
"",
397398
false,
399+
Compression::None,
398400
);
399401
let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone()));
400402

@@ -499,6 +501,7 @@ mod tests {
499501
metrics.clone(),
500502
"",
501503
false,
504+
Compression::None,
502505
);
503506
let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone()));
504507

@@ -571,6 +574,7 @@ mod tests {
571574
metrics.clone(),
572575
"",
573576
false,
577+
Compression::None,
574578
);
575579
let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone()));
576580

@@ -636,6 +640,7 @@ mod tests {
636640
metrics.clone(),
637641
"",
638642
false,
643+
Compression::None,
639644
);
640645
let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone()));
641646

@@ -924,6 +929,7 @@ mod tests {
924929
endpoint_metrics.clone(),
925930
"",
926931
false,
932+
Compression::None,
927933
);
928934

929935
Arc::new(

chain/ethereum/src/transport.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use alloy::transports::{TransportError, TransportErrorKind, TransportFut};
22
use graph::components::network_provider::ProviderName;
3-
use graph::endpoint::{ConnectionType, EndpointMetrics, RequestLabels};
3+
use graph::endpoint::{Compression, ConnectionType, EndpointMetrics, RequestLabels};
44
use graph::prelude::alloy::rpc::json_rpc::{RequestPacket, ResponsePacket};
55
use graph::prelude::alloy::transports::{ipc::IpcConnect, ws::WsConnect};
66
use graph::prelude::*;
@@ -49,11 +49,15 @@ impl Transport {
4949
metrics: Arc<EndpointMetrics>,
5050
provider: impl AsRef<str>,
5151
no_eip2718: bool,
52+
compression: Compression,
5253
) -> Self {
53-
let client = reqwest::Client::builder()
54-
.default_headers(headers)
55-
.build()
56-
.expect("Failed to build HTTP client");
54+
let mut client_builder = reqwest::Client::builder().default_headers(headers);
55+
56+
if matches!(compression, Compression::Gzip) {
57+
client_builder = client_builder.gzip(true);
58+
}
59+
60+
let client = client_builder.build().expect("Failed to build HTTP client");
5761

5862
let patching_transport = PatchingHttp::new(client, rpc, no_eip2718);
5963
let metrics_transport =

graph/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ diesel_derives = { workspace = true }
2626
chrono = "0.4.43"
2727
envconfig = { workspace = true }
2828
Inflector = "0.11.3"
29-
reqwest = { version = "0.12.23", features = ["json", "stream", "multipart"] }
29+
reqwest = { version = "0.12.23", features = ["json", "stream", "multipart", "gzip"] }
3030
ethabi = "17.2"
3131
hex = "0.4.3"
3232
http0 = { version = "0", package = "http" }

graph/src/endpoint.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::{
77
};
88

99
use prometheus::IntCounterVec;
10+
use serde::{Deserialize, Serialize};
1011
use slog::{warn, Logger};
1112

1213
use crate::components::network_provider::ProviderName;
@@ -17,6 +18,16 @@ use crate::{components::metrics::MetricsRegistry, data::value::Word};
1718
/// avoid locking since we don't need to modify the entire struture.
1819
type ProviderCount = Arc<HashMap<ProviderName, AtomicU64>>;
1920

21+
/// Compression methods for RPC transports
22+
#[derive(Copy, Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
23+
pub enum Compression {
24+
#[default]
25+
#[serde(rename = "none")]
26+
None,
27+
#[serde(rename = "gzip")]
28+
Gzip,
29+
}
30+
2031
/// This struct represents all the current labels except for the result
2132
/// which is added separately. If any new labels are necessary they should
2233
/// remain in the same order as added in [`EndpointMetrics::new`]

node/resources/tests/full_config.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ shard = "primary"
4848
provider = [
4949
{ label = "mainnet-0", url = "http://rpc.mainnet.io", features = ["archive", "traces"] },
5050
{ label = "mainnet-1", details = { type = "web3call", url = "http://rpc.mainnet.io", features = ["archive", "traces"] }},
51+
{ label = "mainnet-2", details = { type = "web3", url = "http://rpc.mainnet.io", features = ["archive"], compression = "gzip" }},
5152
{ label = "firehose", details = { type = "firehose", url = "http://localhost:9000", features = [] }},
5253
]
5354

node/src/chain.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,8 @@ pub async fn create_ethereum_networks_for_chain(
210210
logger,
211211
"Creating transport";
212212
"url" => &web3.url,
213-
"capabilities" => capabilities
213+
"capabilities" => capabilities,
214+
"compression" => ?web3.compression
214215
);
215216

216217
use crate::config::Transport::*;
@@ -223,6 +224,7 @@ pub async fn create_ethereum_networks_for_chain(
223224
endpoint_metrics.cheap_clone(),
224225
&provider.label,
225226
no_eip2718,
227+
web3.compression,
226228
),
227229
Ipc => Transport::new_ipc(&web3.url).await,
228230
Ws => Transport::new_ws(&web3.url).await,

node/src/config.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use graph::{
22
anyhow::Error,
33
blockchain::BlockchainKind,
44
components::network_provider::ChainName,
5+
endpoint::Compression,
56
env::ENV_VARS,
67
firehose::{SubgraphLimit, SUBGRAPHS_PER_CONN},
78
itertools::Itertools,
@@ -516,6 +517,7 @@ impl ChainSection {
516517
features,
517518
headers: Default::default(),
518519
rules: vec![],
520+
compression: Compression::None,
519521
}),
520522
};
521523
let entry = chains.entry(name.to_string()).or_insert_with(|| Chain {
@@ -694,6 +696,10 @@ pub struct Web3Provider {
694696

695697
#[serde(default, rename = "match")]
696698
rules: Vec<Web3Rule>,
699+
700+
/// Compression method for RPC requests and responses
701+
#[serde(default)]
702+
pub compression: Compression,
697703
}
698704

699705
impl Web3Provider {
@@ -891,6 +897,7 @@ impl<'de> Deserialize<'de> for Provider {
891897
.ok_or_else(|| serde::de::Error::missing_field("features"))?,
892898
headers: headers.unwrap_or_else(HeaderMap::new),
893899
rules: nodes,
900+
compression: Compression::None,
894901
}),
895902
};
896903

@@ -1203,6 +1210,7 @@ mod tests {
12031210
Chain, Config, FirehoseProvider, Provider, ProviderDetails, Shard, Transport, Web3Provider,
12041211
};
12051212
use graph::blockchain::BlockchainKind;
1213+
use graph::endpoint::Compression;
12061214
use graph::firehose::SubgraphLimit;
12071215
use graph::http::{HeaderMap, HeaderValue};
12081216
use graph::prelude::regex::Regex;
@@ -1291,6 +1299,7 @@ mod tests {
12911299
features: BTreeSet::new(),
12921300
headers: HeaderMap::new(),
12931301
rules: Vec::new(),
1302+
compression: Compression::None,
12941303
}),
12951304
},
12961305
actual
@@ -1317,6 +1326,7 @@ mod tests {
13171326
features: BTreeSet::new(),
13181327
headers: HeaderMap::new(),
13191328
rules: Vec::new(),
1329+
compression: Compression::None,
13201330
}),
13211331
},
13221332
actual
@@ -1378,6 +1388,7 @@ mod tests {
13781388
features,
13791389
headers,
13801390
rules: Vec::new(),
1391+
compression: Compression::None,
13811392
}),
13821393
},
13831394
actual
@@ -1403,6 +1414,7 @@ mod tests {
14031414
features: BTreeSet::new(),
14041415
headers: HeaderMap::new(),
14051416
rules: Vec::new(),
1417+
compression: Compression::None,
14061418
}),
14071419
},
14081420
actual
@@ -1613,6 +1625,7 @@ mod tests {
16131625
features: BTreeSet::new(),
16141626
headers: HeaderMap::new(),
16151627
rules: Vec::new(),
1628+
compression: Compression::None,
16161629
}),
16171630
},
16181631
actual
@@ -1625,6 +1638,66 @@ mod tests {
16251638
assert!(SubgraphLimit::Limit(10) > SubgraphLimit::Disabled);
16261639
}
16271640

1641+
#[test]
1642+
fn it_parses_web3_provider_with_compression() {
1643+
let actual = toml::from_str(
1644+
r#"
1645+
label = "compressed"
1646+
details = { type = "web3", url = "http://localhost:8545", features = ["archive"], compression = "gzip" }
1647+
"#,
1648+
)
1649+
.unwrap();
1650+
1651+
assert_eq!(
1652+
Provider {
1653+
label: "compressed".to_owned(),
1654+
details: ProviderDetails::Web3(Web3Provider {
1655+
transport: Transport::Rpc,
1656+
url: "http://localhost:8545".to_owned(),
1657+
features: {
1658+
let mut features = BTreeSet::new();
1659+
features.insert("archive".to_string());
1660+
features
1661+
},
1662+
headers: HeaderMap::new(),
1663+
rules: Vec::new(),
1664+
compression: Compression::Gzip,
1665+
}),
1666+
},
1667+
actual
1668+
);
1669+
}
1670+
1671+
#[test]
1672+
fn it_parses_web3_provider_with_no_compression() {
1673+
let actual = toml::from_str(
1674+
r#"
1675+
label = "uncompressed"
1676+
details = { type = "web3", url = "http://localhost:8545", features = ["archive"], compression = "none" }
1677+
"#,
1678+
)
1679+
.unwrap();
1680+
1681+
assert_eq!(
1682+
Provider {
1683+
label: "uncompressed".to_owned(),
1684+
details: ProviderDetails::Web3(Web3Provider {
1685+
transport: Transport::Rpc,
1686+
url: "http://localhost:8545".to_owned(),
1687+
features: {
1688+
let mut features = BTreeSet::new();
1689+
features.insert("archive".to_string());
1690+
features
1691+
},
1692+
headers: HeaderMap::new(),
1693+
rules: Vec::new(),
1694+
compression: Compression::None,
1695+
}),
1696+
},
1697+
actual
1698+
);
1699+
}
1700+
16281701
#[test]
16291702
fn duplicated_labels_are_not_allowed_within_chain() {
16301703
let mut actual = toml::from_str::<ChainSection>(

0 commit comments

Comments
 (0)