Skip to content

Commit 0c8d5cc

Browse files
apollo_infra: add tcp keepalive to remote client
Set SO_KEEPALIVE on outbound sockets via a configurable idle time (tcp_keepalive_idle_time_ms). This lets the OS detect dead peers that disappear without sending FIN/RST. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent b43fff4 commit 0c8d5cc

4 files changed

Lines changed: 74 additions & 1 deletion

File tree

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/apollo_infra/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ rstest.workspace = true
3030
serde = { workspace = true, features = ["derive"] }
3131
serde_json.workspace = true
3232
starknet_api.workspace = true
33+
static_assertions.workspace = true
3334
thiserror.workspace = true
3435
time = { workspace = true, features = ["macros"] }
3536
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
@@ -38,6 +39,7 @@ tracing.workspace = true
3839
tracing-subscriber = { workspace = true, features = ["env-filter", "json", "time"] }
3940
validator.workspace = true
4041

42+
4143
[dev-dependencies]
4244
apollo_infra_utils = { workspace = true, features = ["testing"] }
4345
apollo_metrics = { workspace = true, features = ["testing"] }
@@ -47,5 +49,6 @@ metrics.workspace = true
4749
metrics-exporter-prometheus.workspace = true
4850
once_cell.workspace = true
4951
pretty_assertions.workspace = true
52+
socket2.workspace = true
5053
starknet-types-core.workspace = true
5154
strum = { workspace = true, features = ["derive"] }

crates/apollo_infra/src/component_client/remote_component_client.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use hyper_util::client::legacy::Client;
1717
use hyper_util::rt::TokioExecutor;
1818
use serde::de::DeserializeOwned;
1919
use serde::{Deserialize, Serialize};
20+
use static_assertions::const_assert;
2021
use tokio::sync::Mutex;
2122
use tokio::time::Instant;
2223
use tracing::field::{display, Empty};
@@ -39,6 +40,11 @@ pub const DEFAULT_RETRIES: usize = 15;
3940
pub const REQUEST_TIMEOUT_ERROR_MESSAGE: &str = "request timed out";
4041

4142
const DEFAULT_IDLE_CONNECTIONS: usize = 10;
43+
const TCP_IDLE_TIMEOUT_FACTOR: f64 = 1.5;
44+
// Ensure tcp connection timeout is greater than http2 connection timeout by requiring a factor
45+
// greater than 1.
46+
const_assert!(TCP_IDLE_TIMEOUT_FACTOR > 1.0);
47+
4248
// 8 MiB — bounds memory materialized from a single response as defense in depth.
4349
const DEFAULT_MAX_RESPONSE_BODY_BYTES: usize = 8 * 1024 * 1024;
4450
const DEFAULT_IDLE_TIMEOUT_MS: u64 = 30000;
@@ -54,6 +60,8 @@ const DEFAULT_REQUEST_TIMEOUT_MS: u64 = 30_000;
5460
pub struct RemoteClientConfig {
5561
pub retries: usize,
5662
pub idle_connections: usize,
63+
// Determines client connection timeouts. Used plainly for HTTP/2 connections, and with a
64+
// `TCP_IDLE_TIMEOUT_FACTOR` for TCP connections.
5765
pub idle_timeout_ms: u64,
5866
pub attempts_per_log: usize,
5967
pub initial_retry_delay_ms: u64,
@@ -184,13 +192,20 @@ where
184192
metrics: &'static RemoteClientMetrics,
185193
) -> Self {
186194
let uri = format!("http://{url}:{port}/").parse().unwrap();
195+
196+
let idle_timeout = Duration::from_millis(config.idle_timeout_ms);
197+
198+
// Create the tcp connector.
187199
let mut connector = HttpConnector::new();
188200
connector.set_nodelay(config.set_tcp_nodelay);
189201
connector.set_connect_timeout(Some(Duration::from_millis(config.connection_timeout_ms)));
202+
connector.set_keepalive(Some(idle_timeout.mul_f64(TCP_IDLE_TIMEOUT_FACTOR)));
203+
204+
// Create the HTTP/2 client.
190205
let client = Client::builder(TokioExecutor::new())
191206
.http2_only(true)
192207
.pool_max_idle_per_host(config.idle_connections)
193-
.pool_idle_timeout(Duration::from_millis(config.idle_timeout_ms))
208+
.pool_idle_timeout(idle_timeout)
194209
.build(connector);
195210

196211
debug!("RemoteComponentClient created with URI: {uri:?}");

crates/apollo_infra/src/tests/remote_component_client_server_test.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::convert::Infallible;
22
use std::fmt::Debug;
33
use std::future::ready;
44
use std::net::{Ipv4Addr, SocketAddr};
5+
use std::os::unix::io::BorrowedFd;
56
use std::sync::Arc;
67
use std::time::Duration;
78

@@ -23,6 +24,7 @@ use metrics_exporter_prometheus::PrometheusBuilder;
2324
use rstest::rstest;
2425
use serde::de::DeserializeOwned;
2526
use serde::Serialize;
27+
use socket2::SockRef;
2628
use starknet_types_core::felt::Felt;
2729
use tokio::io::{AsyncReadExt, AsyncWriteExt};
2830
use tokio::net::{TcpListener, TcpStream};
@@ -734,3 +736,54 @@ async fn zombie_connection_is_evicted() {
734736
connection is still open"
735737
);
736738
}
739+
740+
/// Returns whether the outbound socket in this process that is connected to `server_addr`
741+
/// has `SO_KEEPALIVE` enabled. Returns `false` if no such socket is found.
742+
fn client_socket_has_keepalive(server_addr: SocketAddr) -> bool {
743+
for fd in 0_i32..4096 {
744+
// SAFETY: We only borrow the fd transiently to read socket options; `SockRef` does
745+
// not take ownership of or close the fd. Invalid fds produce errors from `peer_addr`
746+
// and `keepalive`, which we handle gracefully via `.ok()` / `.unwrap_or`.
747+
let borrowed = unsafe { BorrowedFd::borrow_raw(fd) };
748+
let sock = SockRef::from(&borrowed);
749+
if sock
750+
.peer_addr()
751+
.ok()
752+
.and_then(|a: socket2::SockAddr| a.as_socket())
753+
.is_some_and(|a| a == server_addr)
754+
{
755+
return sock.keepalive().unwrap_or(false);
756+
}
757+
}
758+
false
759+
}
760+
761+
/// Verifies that `SO_KEEPALIVE` on the client's outbound socket is set.
762+
#[rstest]
763+
#[tokio::test]
764+
async fn tcp_keepalive_socket_is_set() {
765+
const ARBITRARY_IDLE_TIMEOUT_MS: u64 = 100;
766+
767+
let mut ports = available_ports_factory(unique_u16!());
768+
let a_socket = ports.get_next_local_host_socket();
769+
let b_socket = ports.get_next_local_host_socket();
770+
771+
// Hyper connects lazily on first request, so after setup no socket is connected to
772+
// a_socket yet — our test client's connection will be the only one to check.
773+
setup_for_tests(VALID_VALUE_A, a_socket, b_socket, MAX_CONCURRENCY, None).await;
774+
775+
let client = ComponentAClient::new(
776+
RemoteClientConfig { idle_timeout_ms: ARBITRARY_IDLE_TIMEOUT_MS, ..Default::default() },
777+
&a_socket.ip().to_string(),
778+
a_socket.port(),
779+
&TEST_REMOTE_CLIENT_METRICS,
780+
);
781+
782+
// Establish the connection and leave it in Hyper's pool.
783+
client.a_get_value().await.expect("request should succeed");
784+
785+
assert!(
786+
client_socket_has_keepalive(a_socket),
787+
"SO_KEEPALIVE on the outbound socket should be set."
788+
);
789+
}

0 commit comments

Comments
 (0)