Skip to content

Commit 0a16c5c

Browse files
committed
rusk: implement RuskHttp builder and shutdown
1 parent 0e2bb70 commit 0a16c5c

2 files changed

Lines changed: 87 additions & 24 deletions

File tree

rusk/src/lib/builder/http_only.rs

Lines changed: 68 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,31 +4,52 @@
44
//
55
// Copyright (c) DUSK NETWORK. All rights reserved.
66

7+
use std::time::Duration;
8+
79
use tokio::sync::broadcast;
810
use tracing::info;
911

10-
use crate::http::{DataSources, HttpServer, HttpServerConfig};
12+
use crate::http::{DataSources, HandleRequest, HttpServer, HttpServerConfig};
1113

12-
#[derive(Default)]
1314
pub struct RuskHttpBuilder {
1415
http: Option<HttpServerConfig>,
16+
data_sources: DataSources,
17+
shutdown_timeout: Duration,
1518
}
1619

1720
impl RuskHttpBuilder {
21+
pub fn new() -> Self {
22+
Self {
23+
http: None,
24+
data_sources: DataSources::default(),
25+
shutdown_timeout: Duration::from_secs(30),
26+
}
27+
}
28+
1829
pub fn with_http(mut self, http: HttpServerConfig) -> Self {
1930
self.http = Some(http);
2031
self
2132
}
2233

23-
pub async fn build_and_run(self) -> anyhow::Result<()> {
34+
pub fn with_data_source(mut self, source: Box<dyn HandleRequest>) -> Self {
35+
self.data_sources.sources.push(source);
36+
self
37+
}
38+
39+
pub fn with_shutdown_timeout(mut self, timeout: Duration) -> Self {
40+
self.shutdown_timeout = timeout;
41+
self
42+
}
43+
44+
pub async fn build(self) -> anyhow::Result<RuskHttp> {
2445
let (_rues_sender, rues_receiver) = broadcast::channel(1);
2546

26-
let mut _ws_server = None;
47+
let mut server = None;
2748
if let Some(http) = self.http {
2849
info!("Configuring HTTP");
2950

3051
#[allow(unused_mut)]
31-
let mut handler = DataSources::default();
52+
let mut handler = self.data_sources;
3253

3354
#[cfg(feature = "prover")]
3455
handler.sources.push(Box::new(rusk_prover::LocalProver));
@@ -38,7 +59,7 @@ impl RuskHttpBuilder {
3859
_ => None,
3960
};
4061

41-
let (server, _) = HttpServer::bind(
62+
let (http_server, _) = HttpServer::bind(
4263
handler,
4364
rues_receiver,
4465
http.ws_event_channel_cap,
@@ -48,13 +69,51 @@ impl RuskHttpBuilder {
4869
)
4970
.await?;
5071

51-
_ws_server = Some(server);
72+
server = Some(http_server);
5273
}
5374

54-
if let Some(s) = _ws_server {
55-
s.wait().await?;
75+
Ok(RuskHttp {
76+
server,
77+
shutdown_timeout: self.shutdown_timeout,
78+
})
79+
}
80+
81+
pub async fn build_and_run(self) -> anyhow::Result<()> {
82+
let mut http = self.build().await?;
83+
http.run().await
84+
}
85+
}
86+
87+
impl Default for RuskHttpBuilder {
88+
fn default() -> Self {
89+
Self::new()
90+
}
91+
}
92+
93+
pub struct RuskHttp {
94+
server: Option<HttpServer>,
95+
shutdown_timeout: Duration,
96+
}
97+
98+
impl RuskHttp {
99+
pub async fn run(&mut self) -> anyhow::Result<()> {
100+
if let Some(server) = &mut self.server {
101+
server.wait().await?;
56102
}
103+
Ok(())
104+
}
57105

106+
pub async fn shutdown(&mut self) -> anyhow::Result<()> {
107+
if let Some(server) = &mut self.server {
108+
tokio::time::timeout(self.shutdown_timeout, server.shutdown())
109+
.await
110+
.map_err(|_| {
111+
anyhow::anyhow!(
112+
"HTTP server failed to shut down within {} seconds",
113+
self.shutdown_timeout.as_secs()
114+
)
115+
})??;
116+
}
58117
Ok(())
59118
}
60119
}

rusk/src/lib/http.rs

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,9 @@ const RUSK_VERSION_HEADER: &str = "Rusk-Version";
8989
const RUSK_VERSION_STRICT_HEADER: &str = "Rusk-Version-Strict";
9090

9191
pub struct HttpServer {
92-
handle: task::JoinHandle<()>,
93-
_shutdown: broadcast::Sender<Infallible>,
92+
handle: Option<task::JoinHandle<()>>,
93+
#[cfg_attr(feature = "chain", allow(dead_code))]
94+
shutdown: Option<broadcast::Sender<Infallible>>,
9495
}
9596

9697
pub struct HttpServerConfig {
@@ -102,8 +103,17 @@ pub struct HttpServerConfig {
102103
}
103104

104105
impl HttpServer {
105-
pub async fn wait(self) -> Result<(), JoinError> {
106-
self.handle.await
106+
pub async fn wait(&mut self) -> Result<(), JoinError> {
107+
match self.handle.take() {
108+
Some(handle) => handle.await,
109+
None => Ok(()),
110+
}
111+
}
112+
113+
#[cfg(not(feature = "chain"))]
114+
pub(crate) async fn shutdown(&mut self) -> Result<(), JoinError> {
115+
self.shutdown.take(); // this closes the channel to signal shutdown
116+
self.wait().await
107117
}
108118

109119
pub async fn bind<A, H, P1, P2>(
@@ -141,8 +151,8 @@ impl HttpServer {
141151
));
142152

143153
let server = Self {
144-
handle,
145-
_shutdown: shutdown_sender,
154+
handle: Some(handle),
155+
shutdown: Some(shutdown_sender),
146156
};
147157
Ok((server, local_addr))
148158
}
@@ -256,16 +266,9 @@ async fn listening_loop<H>(
256266
ws_event_channel_cap,
257267
};
258268

259-
let runtime = tokio::runtime::Builder::new_multi_thread()
260-
.worker_threads(4)
261-
.thread_name("http")
262-
.enable_all()
263-
.build()
264-
.expect("http runtime to be created");
265269
loop {
266270
tokio::select! {
267271
_ = shutdown.recv() => {
268-
runtime.shutdown_background();
269272
break;
270273
}
271274
r = listener.accept() => {
@@ -279,8 +282,9 @@ async fn listening_loop<H>(
279282
let stream = TokioIo::new(stream);
280283
let service = service.clone();
281284

282-
runtime.spawn(async move {
283-
let conn = http.serve_connection_with_upgrades(stream, service);
285+
task::spawn(async move {
286+
let conn =
287+
http.serve_connection_with_upgrades(stream, service);
284288
conn.await
285289
});
286290
}

0 commit comments

Comments
 (0)