Skip to content

Commit c47d831

Browse files
authored
fix(base/server): return HTTP error responses instead of dropping connection in WorkerService (#696)
* fix(base/server): return HTTP error responses instead of dropping connection in WorkerService * stamp: meow * stamp: update tests
1 parent 083f748 commit c47d831

2 files changed

Lines changed: 37 additions & 64 deletions

File tree

crates/base/src/server.rs

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ mod signal {
7171
pub use tokio::signal::unix;
7272
}
7373

74+
const SERVED_BY: &str = concat!(env!("CARGO_PKG_NAME"), "/server");
75+
7476
pub enum ServerEvent {
7577
ConnectionError(hyper_v014::Error),
7678
#[cfg(debug_assertions)]
@@ -221,20 +223,29 @@ impl Service<Request<Body>> for WorkerService {
221223

222224
let res = match res_rx.await {
223225
Ok(res) => res,
224-
Err(err) => {
226+
Err(_) => {
225227
metric_src.incl_handled_requests();
226-
return Err(err.into());
228+
return Ok(
229+
Response::builder()
230+
.status(http_v02::StatusCode::INTERNAL_SERVER_ERROR)
231+
.header("x-served-by", SERVED_BY)
232+
.body(Body::empty())
233+
.unwrap(),
234+
);
227235
}
228236
};
229237

230-
// If the token has already been canceled, drop the socket connection
231-
// without sending a response.
238+
// If the token has already been canceled, return 503 instead of
239+
// dropping the socket connection without a response.
232240
if cancel.is_cancelled() {
233241
error!("connection aborted (uri: {:?})", req_uri.to_string());
234-
return Err(anyhow!(std::io::Error::new(
235-
std::io::ErrorKind::ConnectionAborted,
236-
"connection aborted"
237-
)));
242+
return Ok(
243+
Response::builder()
244+
.status(http_v02::StatusCode::SERVICE_UNAVAILABLE)
245+
.header("x-served-by", SERVED_BY)
246+
.body(Body::empty())
247+
.unwrap(),
248+
);
238249
}
239250

240251
let res = match res {

crates/base/tests/integration_tests.rs

Lines changed: 18 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
use std::borrow::Cow;
55
use std::collections::HashMap;
6-
use std::error::Error;
76
use std::io;
87
use std::io::BufRead;
98
use std::io::Cursor;
@@ -1267,25 +1266,16 @@ async fn req_failure_case_op_cancel_from_server_due_to_cpu_resource_limit() {
12671266
120 * MB,
12681267
None,
12691268
|resp| async {
1270-
if let Err(err) = resp {
1271-
assert_connection_aborted(err);
1272-
} else {
1273-
let res = resp.unwrap();
1274-
1275-
assert_eq!(res.status().as_u16(), 500);
1276-
1277-
let res = res.json::<ErrorResponsePayload>().await;
1278-
1279-
assert!(res.is_ok());
1280-
1281-
let msg = res.unwrap().msg;
1269+
let res = resp.unwrap();
12821270

1283-
assert!(
1284-
msg
1285-
== "WorkerRequestCancelled: request has been cancelled by supervisor"
1286-
|| msg == "broken pipe"
1287-
);
1288-
}
1271+
assert_eq!(res.status().as_u16(), 503);
1272+
assert_eq!(
1273+
res
1274+
.headers()
1275+
.get("x-served-by")
1276+
.map(|v| v.to_str().unwrap()),
1277+
Some(concat!(env!("CARGO_PKG_NAME"), "/server"))
1278+
);
12891279
},
12901280
)
12911281
.await;
@@ -1299,28 +1289,16 @@ async fn req_failure_case_op_cancel_from_server_due_to_cpu_resource_limit_2() {
12991289
10 * MB,
13001290
Some("image/png"),
13011291
|resp| async {
1302-
if let Err(err) = resp {
1303-
assert_connection_aborted(err);
1304-
} else {
1305-
let res = resp.unwrap();
1306-
1307-
assert_eq!(res.status().as_u16(), 500);
1308-
1309-
let res = res.json::<ErrorResponsePayload>().await;
1310-
1311-
assert!(res.is_ok());
1312-
1313-
let msg = res.unwrap().msg;
1292+
let res = resp.unwrap();
13141293

1315-
assert!(
1316-
!msg.starts_with("TypeError: request body receiver not connected")
1317-
);
1318-
assert!(
1319-
msg
1320-
== "WorkerRequestCancelled: request has been cancelled by supervisor"
1321-
|| msg == "broken pipe"
1322-
);
1323-
}
1294+
assert_eq!(res.status().as_u16(), 503);
1295+
assert_eq!(
1296+
res
1297+
.headers()
1298+
.get("x-served-by")
1299+
.map(|v| v.to_str().unwrap()),
1300+
Some(concat!(env!("CARGO_PKG_NAME"), "/server"))
1301+
);
13241302
},
13251303
)
13261304
.await;
@@ -4642,19 +4620,3 @@ fn new_localhost_tls(secure: bool) -> Option<Tls> {
46424620
Tls::new(SECURE_PORT, TLS_LOCALHOST_KEY, TLS_LOCALHOST_CERT).unwrap()
46434621
})
46444622
}
4645-
4646-
fn assert_connection_aborted(err: reqwest::Error) {
4647-
let source = err.source();
4648-
let hyper_err = source
4649-
.and_then(|err| err.downcast_ref::<hyper::Error>())
4650-
.unwrap();
4651-
4652-
if hyper_err.is_incomplete_message() {
4653-
return;
4654-
}
4655-
4656-
let cause = hyper_err.source().unwrap();
4657-
let cause = cause.downcast_ref::<std::io::Error>().unwrap();
4658-
4659-
assert_eq!(cause.kind(), std::io::ErrorKind::ConnectionReset);
4660-
}

0 commit comments

Comments
 (0)