Skip to content

Commit 25e66d2

Browse files
authored
allow configuring socket options for raw protosocket servers (#12)
1 parent f2d9c81 commit 25e66d2

6 files changed

Lines changed: 150 additions & 35 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

example-telnet/src/main.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,22 @@ use std::{
77
use protosocket::{
88
ConnectionBindings, DeserializeError, Deserializer, MessageReactor, ReactorStatus, Serializer,
99
};
10-
use protosocket_server::{ProtosocketServer, ServerConnector};
10+
use protosocket_server::{ProtosocketServerConfig, ServerConnector};
1111

1212
#[allow(clippy::expect_used)]
1313
#[tokio::main]
1414
async fn main() -> Result<(), Box<dyn std::error::Error>> {
1515
env_logger::init();
1616

1717
let server_context = ServerContext::default();
18-
let server = ProtosocketServer::new(
19-
"127.0.0.1:9000".parse()?,
20-
tokio::runtime::Handle::current(),
21-
server_context,
22-
)
23-
.await?;
18+
let config = ProtosocketServerConfig::default();
19+
let server = config
20+
.bind_tcp(
21+
"127.0.0.1:9000".parse()?,
22+
server_context,
23+
tokio::runtime::Handle::current(),
24+
)
25+
.await?;
2426

2527
tokio::spawn(server).await??;
2628
Ok(())

protosocket-server/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,6 @@ protosocket = { workspace = true }
1616
bytes = { workspace = true }
1717
futures = { workspace = true }
1818
log = { workspace = true }
19+
socket2 = { workspace = true, features = ["all"] }
1920
thiserror = { workspace = true }
2021
tokio = { workspace = true }

protosocket-server/src/connection_server.rs

Lines changed: 131 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1+
use protosocket::Connection;
2+
use protosocket::ConnectionBindings;
3+
use protosocket::Serializer;
4+
use socket2::TcpKeepalive;
5+
use std::ffi::c_int;
16
use std::future::Future;
27
use std::io::Error;
38
use std::net::SocketAddr;
49
use std::pin::Pin;
5-
use std::sync::Arc;
610
use std::task::Context;
711
use std::task::Poll;
8-
9-
use protosocket::Connection;
10-
use protosocket::ConnectionBindings;
11-
use protosocket::Serializer;
1212
use tokio::sync::mpsc;
1313

1414
pub trait ServerConnector: Unpin {
@@ -47,6 +47,8 @@ pub trait ServerConnector: Unpin {
4747
/// connection - you decide what those mean for you!
4848
///
4949
/// A ProtosocketServer is a future: You spawn it and it runs forever.
50+
///
51+
/// Construct a new ProtosocketServer by creating a ProtosocketServerConfig and calling the {{bind_tcp}} method.
5052
pub struct ProtosocketServer<Connector: ServerConnector> {
5153
connector: Connector,
5254
listener: tokio::net::TcpListener,
@@ -56,42 +58,143 @@ pub struct ProtosocketServer<Connector: ServerConnector> {
5658
runtime: tokio::runtime::Handle,
5759
}
5860

61+
/// Socket configuration options for a ProtosocketServer.
62+
pub struct ProtosocketSocketConfig {
63+
nodelay: bool,
64+
reuse: bool,
65+
keepalive_duration: Option<std::time::Duration>,
66+
listen_backlog: u32,
67+
}
68+
69+
impl ProtosocketSocketConfig {
70+
/// Whether nodelay should be set on the socket.
71+
pub fn nodelay(mut self, nodelay: bool) -> Self {
72+
self.nodelay = nodelay;
73+
self
74+
}
75+
/// Whether reuseaddr and reuseport should be set on the socket.
76+
pub fn reuse(mut self, reuse: bool) -> Self {
77+
self.reuse = reuse;
78+
self
79+
}
80+
/// The keepalive window to be set on the socket.
81+
pub fn keepalive_duration(mut self, keepalive_duration: std::time::Duration) -> Self {
82+
self.keepalive_duration = Some(keepalive_duration);
83+
self
84+
}
85+
/// The backlog to be set on the socket when invoking `listen`.
86+
pub fn listen_backlog(mut self, backlog: u32) -> Self {
87+
self.listen_backlog = backlog;
88+
self
89+
}
90+
}
91+
92+
impl Default for ProtosocketSocketConfig {
93+
fn default() -> Self {
94+
Self {
95+
nodelay: true,
96+
reuse: true,
97+
keepalive_duration: None,
98+
listen_backlog: 65536,
99+
}
100+
}
101+
}
102+
103+
pub struct ProtosocketServerConfig {
104+
max_buffer_length: usize,
105+
max_queued_outbound_messages: usize,
106+
buffer_allocation_increment: usize,
107+
socket_config: ProtosocketSocketConfig,
108+
}
109+
110+
impl ProtosocketServerConfig {
111+
/// The maximum buffer length per connection on this server.
112+
pub fn max_buffer_length(mut self, max_buffer_length: usize) -> Self {
113+
self.max_buffer_length = max_buffer_length;
114+
self
115+
}
116+
/// The maximum number of queued outbound messages per connection on this server.
117+
pub fn max_queued_outbound_messages(mut self, max_queued_outbound_messages: usize) -> Self {
118+
self.max_queued_outbound_messages = max_queued_outbound_messages;
119+
self
120+
}
121+
/// The step size for allocating additional memory for connection buffers on this server.
122+
pub fn buffer_allocation_increment(mut self, buffer_allocation_increment: usize) -> Self {
123+
self.buffer_allocation_increment = buffer_allocation_increment;
124+
self
125+
}
126+
/// The tcp socket configuration options for this server.
127+
pub fn socket_config(mut self, config: ProtosocketSocketConfig) -> Self {
128+
self.socket_config = config;
129+
self
130+
}
131+
132+
/// Binds a tcp listener to the given address and returns a ProtosocketServer with this configuration.
133+
/// After binding, you must await the returned server future to process requests.
134+
pub async fn bind_tcp<Connector: ServerConnector>(
135+
self,
136+
address: SocketAddr,
137+
connector: Connector,
138+
runtime: tokio::runtime::Handle,
139+
) -> crate::Result<ProtosocketServer<Connector>> {
140+
ProtosocketServer::new(address, runtime, connector, self).await
141+
}
142+
}
143+
144+
impl Default for ProtosocketServerConfig {
145+
fn default() -> Self {
146+
Self {
147+
max_buffer_length: 16 * (2 << 20),
148+
max_queued_outbound_messages: 128,
149+
buffer_allocation_increment: 1 << 20,
150+
socket_config: Default::default(),
151+
}
152+
}
153+
}
154+
59155
impl<Connector: ServerConnector> ProtosocketServer<Connector> {
60156
/// Construct a new `ProtosocketServer` listening on the provided address.
61157
/// The address will be bound and listened upon with `SO_REUSEADDR` set.
62158
/// The server will use the provided runtime to spawn new tcp connections as `protosocket::Connection`s.
63-
pub async fn new(
64-
address: std::net::SocketAddr,
159+
async fn new(
160+
address: SocketAddr,
65161
runtime: tokio::runtime::Handle,
66162
connector: Connector,
163+
config: ProtosocketServerConfig,
67164
) -> crate::Result<Self> {
68-
let listener = tokio::net::TcpListener::bind(address)
69-
.await
70-
.map_err(Arc::new)?;
165+
let socket = socket2::Socket::new(
166+
match address {
167+
SocketAddr::V4(_) => socket2::Domain::IPV4,
168+
SocketAddr::V6(_) => socket2::Domain::IPV6,
169+
},
170+
socket2::Type::STREAM,
171+
None,
172+
)?;
173+
174+
let mut tcp_keepalive = TcpKeepalive::new();
175+
if let Some(duration) = config.socket_config.keepalive_duration {
176+
tcp_keepalive = tcp_keepalive.with_time(duration);
177+
}
178+
179+
socket.set_nonblocking(true)?;
180+
socket.set_tcp_nodelay(config.socket_config.nodelay)?;
181+
socket.set_tcp_keepalive(&tcp_keepalive)?;
182+
socket.set_reuse_port(config.socket_config.reuse)?;
183+
socket.set_reuse_address(config.socket_config.reuse)?;
184+
185+
socket.bind(&address.into())?;
186+
socket.listen(config.socket_config.listen_backlog as c_int)?;
187+
188+
let listener = tokio::net::TcpListener::from_std(socket.into())?;
71189
Ok(Self {
72190
connector,
73191
listener,
74-
max_buffer_length: 16 * (2 << 20),
75-
max_queued_outbound_messages: 128,
76-
buffer_allocation_increment: 1 << 20,
192+
max_buffer_length: config.max_buffer_length,
193+
max_queued_outbound_messages: config.max_queued_outbound_messages,
194+
buffer_allocation_increment: config.buffer_allocation_increment,
77195
runtime,
78196
})
79197
}
80-
81-
/// Set the maximum buffer length for connections created by this server after the setting is applied.
82-
pub fn set_max_buffer_length(&mut self, max_buffer_length: usize) {
83-
self.max_buffer_length = max_buffer_length;
84-
}
85-
86-
/// Set the maximum queued outbound messages for connections created by this server after the setting is applied.
87-
pub fn set_max_queued_outbound_messages(&mut self, max_queued_outbound_messages: usize) {
88-
self.max_queued_outbound_messages = max_queued_outbound_messages;
89-
}
90-
91-
/// Set the step size for allocating additional memory for connection buffers created by this server after the setting is applied.
92-
pub fn set_buffer_allocation_increment(&mut self, buffer_allocation_increment: usize) {
93-
self.buffer_allocation_increment = buffer_allocation_increment;
94-
}
95198
}
96199

97200
impl<Connector: ServerConnector> Future for ProtosocketServer<Connector> {

protosocket-server/src/error.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,9 @@ pub enum Error {
1313
#[error("Requested resource was dead: ({0})")]
1414
Dead(&'static str),
1515
}
16+
17+
impl From<std::io::Error> for Error {
18+
fn from(e: std::io::Error) -> Self {
19+
Self::IoFailure(Arc::new(e))
20+
}
21+
}

protosocket-server/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ pub(crate) mod connection_server;
77
pub(crate) mod error;
88

99
pub use connection_server::ProtosocketServer;
10+
pub use connection_server::ProtosocketServerConfig;
11+
pub use connection_server::ProtosocketSocketConfig;
1012
pub use connection_server::ServerConnector;
1113
pub use error::Error;
1214
pub use error::Result;

0 commit comments

Comments
 (0)