Skip to content

Commit 76c2581

Browse files
authored
fix: return an error instead of a response when a request idle timeout occurs for requests sent to the user worker (#679)
1 parent ff23c1a commit 76c2581

12 files changed

Lines changed: 61 additions & 2 deletions

File tree

crates/base/src/server.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::future::Future;
44
use std::net::SocketAddr;
55
use std::pin::Pin;
66
use std::str;
7+
use std::sync::atomic::AtomicBool;
78
use std::sync::Arc;
89
use std::task::Poll;
910
use std::time::Duration;
@@ -196,6 +197,7 @@ impl Service<Request<Body>> for WorkerService {
196197
req,
197198
res_tx,
198199
conn_token: Some(cancel.clone()),
200+
idle_timed_out: Arc::new(AtomicBool::new(false)),
199201
};
200202

201203
worker_req_tx.send(msg)?;

crates/base/src/utils/test_utils.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
use std::marker::PhantomPinned;
44
use std::path::Path;
55
use std::path::PathBuf;
6+
use std::sync::atomic::AtomicBool;
67
use std::sync::Arc;
78
use std::task::ready;
89
use std::task::Poll;
@@ -297,6 +298,7 @@ impl TestBed {
297298
req,
298299
res_tx,
299300
conn_token: Some(conn_token.clone()),
301+
idle_timed_out: Arc::new(AtomicBool::new(false)),
300302
});
301303

302304
let Ok(res) = res_rx.await else {

crates/base/src/worker/utils.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
use std::collections::HashMap;
2+
use std::sync::atomic::AtomicBool;
3+
use std::sync::atomic::Ordering;
4+
use std::sync::Arc;
25

36
use anyhow::anyhow;
47
use anyhow::bail;
@@ -63,10 +66,12 @@ pub async fn send_user_worker_request(
6366
) -> Result<Response<Body>, anyhow::Error> {
6467
let (res_tx, res_rx) =
6568
oneshot::channel::<Result<Response<Body>, hyper_v014::Error>>();
69+
let idle_timed_out = Arc::new(AtomicBool::new(false));
6670
let msg = WorkerRequestMsg {
6771
req,
6872
res_tx,
6973
conn_token,
74+
idle_timed_out: idle_timed_out.clone(),
7075
};
7176

7277
// send the message to worker
@@ -88,7 +93,9 @@ pub async fn send_user_worker_request(
8893

8994
match res {
9095
Ok(v) => {
91-
// send the response back to the caller
96+
if idle_timed_out.load(Ordering::Relaxed) {
97+
bail!(WorkerError::RequestIdleTimeout)
98+
}
9299
Ok(v)
93100
}
94101

crates/base/src/worker/worker_surface_creation.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use super::WorkerSurface;
3535
mod request {
3636
use std::future::pending;
3737
use std::io::ErrorKind;
38+
use std::sync::atomic::Ordering;
3839
use std::sync::Arc;
3940
use std::time::Duration;
4041

@@ -90,6 +91,7 @@ mod request {
9091
mut req,
9192
res_tx,
9293
conn_token,
94+
idle_timed_out,
9395
} = msg;
9496

9597
let _ = duplex_stream_tx.send((theirs, conn_token.clone()));
@@ -155,6 +157,7 @@ mod request {
155157
let res = tokio::select! {
156158
resp = request_sender.send_request(req) => resp,
157159
_ = maybe_cancel_fut => {
160+
idle_timed_out.store(true, Ordering::Relaxed);
158161
Ok(emit_status_code(
159162
http_v02::StatusCode::GATEWAY_TIMEOUT,
160163
None,

crates/base/test_cases/main/index.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,14 @@ Deno.serve(async (req: Request) => {
111111
// return await callWorker();
112112
// }
113113

114+
if (e instanceof Deno.errors.WorkerRequestIdleTimeout) {
115+
const error = { msg: e.toString() };
116+
return new Response(
117+
JSON.stringify(error),
118+
{ status: 504, headers: { "Content-Type": "application/json" } },
119+
);
120+
}
121+
114122
const error = { msg: e.toString() };
115123
return new Response(
116124
JSON.stringify(error),

crates/base/tests/integration_tests.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use std::net::Ipv4Addr;
1212
use std::net::SocketAddr;
1313
use std::path::PathBuf;
1414
use std::str::FromStr;
15+
use std::sync::atomic::AtomicBool;
1516
use std::sync::Arc;
1617
use std::time::Duration;
1718

@@ -265,6 +266,7 @@ async fn test_not_trigger_pku_sigsegv_due_to_jit_compilation_non_cli() {
265266
req,
266267
res_tx,
267268
conn_token: Some(conn_token.clone()),
269+
idle_timed_out: Arc::new(AtomicBool::new(false)),
268270
};
269271

270272
let _ = surface.msg_tx.send(msg);
@@ -549,6 +551,7 @@ async fn test_main_worker_user_worker_mod_evaluate_exception() {
549551
req,
550552
res_tx,
551553
conn_token: Some(conn_token.clone()),
554+
idle_timed_out: Arc::new(AtomicBool::new(false)),
552555
};
553556

554557
let _ = surface.msg_tx.send(msg);
@@ -1893,7 +1896,15 @@ async fn test_request_idle_timeout_no_streamed_response(
18931896
request_builder,
18941897
maybe_tls,
18951898
(|resp| async {
1896-
assert_eq!(resp.unwrap().status().as_u16(), StatusCode::GATEWAY_TIMEOUT);
1899+
let resp = resp.unwrap();
1900+
assert_eq!(resp.status().as_u16(), StatusCode::GATEWAY_TIMEOUT);
1901+
let body = resp.bytes().await.unwrap();
1902+
assert!(
1903+
std::str::from_utf8(&body)
1904+
.unwrap()
1905+
.contains("WorkerRequestIdleTimeout"),
1906+
"expected WorkerRequestIdleTimeout in body"
1907+
);
18971908
}),
18981909
TerminationToken::new()
18991910
);

examples/main/index.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,15 @@ Deno.serve(async (req: Request) => {
240240
if (e instanceof Deno.errors.WorkerAlreadyRetired) {
241241
return await callWorker();
242242
}
243+
if (e instanceof Deno.errors.WorkerRequestIdleTimeout) {
244+
return new Response(
245+
JSON.stringify({ msg: e.toString() }),
246+
{
247+
status: STATUS_CODE.GatewayTimeout,
248+
headers,
249+
},
250+
);
251+
}
243252
if (e instanceof Deno.errors.WorkerRequestCancelled) {
244253
headers.append("Connection", "close");
245254

ext/runtime/js/errors.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ const InvalidWorkerResponse = buildErrorClass("InvalidWorkerResponse");
2929
const InvalidWorkerCreation = buildErrorClass("InvalidWorkerCreation");
3030
const WorkerRequestCancelled = buildErrorClass("WorkerRequestCancelled");
3131
const WorkerAlreadyRetired = buildErrorClass("WorkerAlreadyRetired");
32+
const WorkerRequestIdleTimeout = buildErrorClass("WorkerRequestIdleTimeout");
3233
const NotFound = buildErrorClass("NotFound");
3334
const PermissionDenied = buildErrorClass("PermissionDenied");
3435
const ConnectionRefused = buildErrorClass("ConnectionRefused");
@@ -78,6 +79,7 @@ function registerErrors() {
7879
core.registerErrorClass("InvalidWorkerCreation", InvalidWorkerCreation);
7980
core.registerErrorClass("WorkerRequestCancelled", WorkerRequestCancelled);
8081
core.registerErrorClass("WorkerAlreadyRetired", WorkerAlreadyRetired);
82+
core.registerErrorClass("WorkerRequestIdleTimeout", WorkerRequestIdleTimeout);
8183
core.registerErrorClass("NotFound", NotFound);
8284
core.registerErrorClass("PermissionDenied", PermissionDenied);
8385
core.registerErrorClass("ConnectionRefused", ConnectionRefused);

ext/workers/context.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::collections::HashMap;
22
use std::path::PathBuf;
3+
use std::sync::atomic::AtomicBool;
34
use std::sync::atomic::AtomicUsize;
45
use std::sync::Arc;
56
use std::sync::RwLock;
@@ -310,4 +311,5 @@ pub struct WorkerRequestMsg {
310311
pub req: Request<Body>,
311312
pub res_tx: oneshot::Sender<Result<Response<Body>, hyper_v014::Error>>,
312313
pub conn_token: Option<CancellationToken>,
314+
pub idle_timed_out: Arc<AtomicBool>,
313315
}

ext/workers/errors.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,6 @@ pub enum WorkerError {
66
RequestCancelledBySupervisor,
77
#[error("request cannot be handled because the worker has already retired")]
88
WorkerAlreadyRetired,
9+
#[error("request timed out")]
10+
RequestIdleTimeout,
911
}

0 commit comments

Comments
 (0)