Skip to content

Commit a8caaef

Browse files
committed
fix: Per message setting of response time limit
Add a new associated const to the Msg trait that defines how long to wait for this message when receiving it as a response. To achieve this, make the timeout setting for stream writes and reads a function argument and pass the message specific time limit (default: 2s). For GetQuotaInfoResp, set this to 40m. This should fix running into timeouts while fetching quota if the storage server needs a long time to process the request.
1 parent a497c8d commit a8caaef

7 files changed

Lines changed: 52 additions & 30 deletions

File tree

shared/src/bee_msg.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use anyhow::{Context, Result, anyhow};
66
use bee_serde_derive::BeeSerde;
77
use std::any::Any;
88
use std::collections::{HashMap, HashSet};
9+
use std::time::Duration;
910

1011
pub mod buddy_group;
1112
pub mod misc;
@@ -26,6 +27,8 @@ pub trait BaseMsg: Any + std::fmt::Debug + Send + Sync + 'static {}
2627
pub trait Msg: BaseMsg + Default + Clone {
2728
/// Message type as defined in NetMessageTypes.h
2829
const ID: MsgId;
30+
/// How long to wait to receive this message as a response
31+
const RESPONSE_TIME_LIMIT: Duration = Duration::from_secs(2);
2932
}
3033

3134
impl<M> BaseMsg for M where M: Msg {}

shared/src/bee_msg/quota.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ pub struct GetQuotaInfoResp {
119119

120120
impl Msg for GetQuotaInfoResp {
121121
const ID: MsgId = 2098;
122+
const RESPONSE_TIME_LIMIT: Duration = Duration::from_mins(40);
122123
}
123124

124125
/// Sets exceeded quota information on server nodes.

shared/src/conn.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
//! Connection to other BeeGFS nodes
22
3+
use std::time::Duration;
4+
35
mod async_queue;
46
pub mod incoming;
57
pub mod msg_dispatch;
@@ -16,3 +18,6 @@ const TCP_BUF_LEN: usize = 4 * 1024 * 1024;
1618
/// Must match the `DGRAMMR_(RECV|SEND)BUF_SIZE` value in `DatagramListener.*` in the C/C++
1719
/// codebase. Must be smaller than TCP_BUF_LEN;
1820
const UDP_BUF_LEN: usize = 65536;
21+
22+
/// Reasonable time limit for most write and read operations
23+
const DEFAULT_TIME_LIMIT: Duration = Duration::from_secs(2);

shared/src/conn/incoming.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,9 @@ async fn read_stream(
139139
stream_authentication_required: bool,
140140
) -> Result<()> {
141141
// Read header
142-
stream.read_exact(&mut buf[0..Header::LEN]).await?;
142+
stream
143+
.read_exact(&mut buf[0..Header::LEN], DEFAULT_TIME_LIMIT)
144+
.await?;
143145

144146
let header = deserialize_header(&buf[0..Header::LEN])?;
145147

@@ -156,7 +158,7 @@ async fn read_stream(
156158

157159
// Read body
158160
stream
159-
.read_exact(&mut buf[Header::LEN..header.msg_len()])
161+
.read_exact(&mut buf[Header::LEN..header.msg_len()], DEFAULT_TIME_LIMIT)
160162
.await?;
161163

162164
// Forward to the dispatcher. The dispatcher is responsible for deserializing, dispatching to

shared/src/conn/msg_dispatch.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
use super::stream::Stream;
44
use crate::bee_msg::{Header, Msg, MsgId, deserialize_body, serialize};
55
use crate::bee_serde::{Deserializable, Serializable};
6+
use crate::conn::DEFAULT_TIME_LIMIT;
67
use anyhow::Result;
78
use std::fmt::Debug;
89
use std::future::Future;
@@ -40,7 +41,9 @@ pub struct StreamRequest<'a> {
4041
impl Request for StreamRequest<'_> {
4142
async fn respond<M: Msg + Serializable>(self, msg: &M) -> Result<()> {
4243
let msg_len = serialize(msg, self.buf)?;
43-
self.stream.write_all(&self.buf[0..msg_len]).await
44+
self.stream
45+
.write_all(&self.buf[0..msg_len], DEFAULT_TIME_LIMIT)
46+
.await
4447
}
4548

4649
fn authenticate_connection(&mut self) {

shared/src/conn/outgoing.rs

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ use super::store::Store;
33
use crate::bee_msg::misc::AuthenticateChannel;
44
use crate::bee_msg::{Header, Msg, deserialize_body, deserialize_header, serialize};
55
use crate::bee_serde::{Deserializable, Serializable};
6-
use crate::conn::TCP_BUF_LEN;
76
use crate::conn::store::StoredStream;
87
use crate::conn::stream::Stream;
8+
use crate::conn::{DEFAULT_TIME_LIMIT, TCP_BUF_LEN};
99
use crate::types::{AuthSecret, Uid};
1010
use anyhow::{Context, Result, bail};
1111
use std::fmt::Debug;
@@ -58,7 +58,9 @@ impl Pool {
5858
let mut buf = self.store.pop_buf_or_create();
5959

6060
let msg_len = serialize(msg, &mut buf)?;
61-
let resp_header = self.comm_stream(node_uid, &mut buf, msg_len, true).await?;
61+
let resp_header = self
62+
.comm_stream(node_uid, &mut buf, msg_len, Some(M::RESPONSE_TIME_LIMIT))
63+
.await?;
6264
let resp_msg = deserialize_body(&resp_header, &buf[Header::LEN..])?;
6365

6466
self.store.push_buf(buf);
@@ -75,7 +77,7 @@ impl Pool {
7577
let mut buf = self.store.pop_buf_or_create();
7678

7779
let msg_len = serialize(msg, &mut buf)?;
78-
self.comm_stream(node_uid, &mut buf, msg_len, false).await?;
80+
self.comm_stream(node_uid, &mut buf, msg_len, None).await?;
7981

8082
self.store.push_buf(buf);
8183

@@ -94,19 +96,21 @@ impl Pool {
9496
/// 2. Get a permit that allows opening a new stream. Try to open a new stream using the
9597
/// available addresses.
9698
/// 3. Pop an open stream from the store, waiting until one gets available.
99+
///
100+
/// If `response_time_limit` is `Some(t)`, a response is expected and each read of it times out
/// after `t`. If it is `None`, no response is read.
97101
async fn comm_stream(
98102
&self,
99103
node_uid: Uid,
100104
buf: &mut [u8],
101105
send_len: usize,
102-
expect_response: bool,
106+
response_time_limit: Option<Duration>,
103107
) -> Result<Header> {
104108
debug_assert_eq!(buf.len(), TCP_BUF_LEN);
105109

106110
// 1. Pop open streams until communication succeeds or none are left
107111
while let Some(stream) = self.store.try_pop_stream(node_uid) {
108112
match self
109-
.write_and_read_stream(buf, stream, send_len, expect_response)
113+
.write_and_read_stream(buf, stream, send_len, response_time_limit)
110114
.await
111115
{
112116
Ok(header) => return Ok(header),
@@ -132,7 +136,7 @@ impl Pool {
132136
continue;
133137
}
134138

135-
match Stream::connect_tcp(addr).await {
139+
match Stream::connect_tcp(addr, DEFAULT_TIME_LIMIT).await {
136140
Ok(stream) => {
137141
let mut stream = StoredStream::from_stream(stream, permit);
138142

@@ -152,7 +156,7 @@ impl Pool {
152156

153157
stream
154158
.as_mut()
155-
.write_all(&auth_buf[0..msg_len])
159+
.write_all(&auth_buf[0..msg_len], DEFAULT_TIME_LIMIT)
156160
.await
157161
.with_context(err_context)?;
158162

@@ -162,7 +166,7 @@ impl Pool {
162166
// Communication using the newly opened stream should usually not fail. If
163167
// it does, abort. It might be better to just try the next address though.
164168
let resp_header = self
165-
.write_and_read_stream(buf, stream, send_len, expect_response)
169+
.write_and_read_stream(buf, stream, send_len, response_time_limit)
166170
.await
167171
.with_context(err_context)?;
168172

@@ -189,7 +193,7 @@ impl Pool {
189193
})?;
190194

191195
let resp_header = self
192-
.write_and_read_stream(buf, stream, send_len, expect_response)
196+
.write_and_read_stream(buf, stream, send_len, response_time_limit)
193197
.await
194198
.with_context(|| {
195199
format!("Communication using existing stream to node with uid {node_uid} failed")
@@ -205,20 +209,27 @@ impl Pool {
205209
buf: &mut [u8],
206210
mut stream: StoredStream<Uid>,
207211
send_len: usize,
208-
expect_response: bool,
212+
response_time_limit: Option<Duration>,
209213
) -> Result<Header> {
210-
stream.as_mut().write_all(&buf[0..send_len]).await?;
214+
stream
215+
.as_mut()
216+
.write_all(&buf[0..send_len], DEFAULT_TIME_LIMIT)
217+
.await?;
211218

212-
let header = if expect_response {
219+
let header = if let Some(tl) = response_time_limit {
213220
// Read header
214-
stream.as_mut().read_exact(&mut buf[0..Header::LEN]).await?;
221+
stream
222+
.as_mut()
223+
.read_exact(&mut buf[0..Header::LEN], tl)
224+
.await?;
215225
let header = deserialize_header(&buf[0..Header::LEN])?;
216226

217227
// Read body
218228
stream
219229
.as_mut()
220-
.read_exact(&mut buf[Header::LEN..header.msg_len()])
230+
.read_exact(&mut buf[Header::LEN..header.msg_len()], tl)
221231
.await?;
232+
222233
header
223234
} else {
224235
Header::default()

shared/src/conn/stream.rs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
99
use tokio::net::TcpStream;
1010
use tokio::time::timeout;
1111

12-
const TIMEOUT: Duration = Duration::from_secs(2);
13-
1412
/// A connected generic stream.
1513
///
1614
/// Provides functionality to communicate with the connected peer. Can support multiple
@@ -39,9 +37,9 @@ impl From<TcpStream> for Stream {
3937
impl Stream {
4038
/// Connect to peer using TCP and obtain a [Stream] object.
4139
///
42-
/// Times out after [TIMEOUT].
43-
pub async fn connect_tcp(addr: &SocketAddr) -> Result<Self> {
44-
let stream = match timeout(TIMEOUT, TcpStream::connect(addr)).await {
40+
/// Times out after `time_limit`.
41+
pub async fn connect_tcp(addr: &SocketAddr, time_limit: Duration) -> Result<Self> {
42+
let stream = match timeout(time_limit, TcpStream::connect(addr)).await {
4543
Ok(res) => res?,
4644
Err(_) => bail!("Connecting a TCP stream to {addr} timed out"),
4745
};
@@ -74,13 +72,12 @@ impl Stream {
7472
/// Reads from the stream into the provided buffer.
7573
///
7674
/// The buffer will be filled completely before the future completes. Times out after
77-
/// [TIMEOUT].
75+
/// `time_limit`.
7876
///
7977
/// **Important**: Not cancel safe. If a timeout occurs, the stream may not be reused.
8078
// NOTE(review): the `#[allow(clippy::needless_pass_by_ref_mut)]` this comment described was removed; drop this comment too.
81-
#[allow(clippy::needless_pass_by_ref_mut)]
82-
pub async fn read_exact(&mut self, buf: &mut [u8]) -> Result<()> {
83-
match timeout(TIMEOUT, async {
79+
pub async fn read_exact(&mut self, buf: &mut [u8], time_limit: Duration) -> Result<()> {
80+
match timeout(time_limit, async {
8481
match &mut self.stream {
8582
InnerStream::Tcp(s) => {
8683
s.read_exact(buf).await?;
@@ -98,11 +95,11 @@ impl Stream {
9895
/// Writes to the stream from the provided buffer.
9996
///
10097
/// The buffer will be written completely before the future completes. Times out after
101-
/// [TIMEOUT].
98+
/// `time_limit`.
10299
///
103100
/// **Important**: Not cancel safe. If a timeout occurs, the stream may not be reused.
104-
pub async fn write_all(&mut self, buf: &[u8]) -> Result<()> {
105-
match timeout(TIMEOUT, async {
101+
pub async fn write_all(&mut self, buf: &[u8], time_limit: Duration) -> Result<()> {
102+
match timeout(time_limit, async {
106103
match &mut self.stream {
107104
InnerStream::Tcp(s) => {
108105
s.write_all(buf).await?;
@@ -113,7 +110,7 @@ impl Stream {
113110
.await
114111
{
115112
Ok(res) => res,
116-
Err(_) => Err(anyhow!("Writing to a stream to {} timed out", self.addr())),
113+
Err(_) => Err(anyhow!("Writing to stream to {} timed out", self.addr())),
117114
}
118115
}
119116

0 commit comments

Comments
 (0)