Skip to content

Commit bd2994d

Browse files
authored
add request/reply support in http-server (#68)
resolves #67 Signed-off-by: markfisher <mark@modulewise.com>
1 parent e05d665 commit bd2994d

4 files changed

Lines changed: 107 additions & 18 deletions

File tree

crates/http-server/src/config.rs

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,11 @@ pub enum RouteTarget {
2626
body: Option<String>,
2727
},
2828
/// Publish the request body to a channel.
29-
Channel { channel: String },
29+
Channel {
30+
channel: String,
31+
/// If set, use request-reply, else fire-and-forget.
32+
reply_timeout_ms: Option<u64>,
33+
},
3034
}
3135

3236
/// Parsed HTTP server definition.
@@ -74,7 +78,15 @@ impl ConfigHandler for HttpServerConfigHandler {
7478
fn claimed_properties(&self) -> HashMap<&str, &[&str]> {
7579
HashMap::from([(
7680
"server",
77-
["type", "port", "route", "otlp-endpoint", "otlp-protocol"].as_slice(),
81+
[
82+
"type",
83+
"port",
84+
"route",
85+
"otlp-endpoint",
86+
"otlp-protocol",
87+
"reply-timeout-ms",
88+
]
89+
.as_slice(),
7890
)])
7991
}
8092

@@ -203,6 +215,19 @@ fn parse_routes(server_name: &str, properties: &mut PropertyMap) -> Result<Vec<R
203215
let function = get_optional_string(&route_props, "function", &ctx)?;
204216
let channel = get_optional_string(&route_props, "channel", &ctx)?;
205217
let body = get_optional_string(&route_props, "body", &ctx)?;
218+
let reply_timeout_ms = match route_props.get("reply-timeout-ms") {
219+
Some(serde_json::Value::Number(n)) => Some(
220+
n.as_u64()
221+
.ok_or_else(|| ctx("'reply-timeout-ms'", "must be a non-negative integer"))?,
222+
),
223+
Some(got) => {
224+
return Err(ctx(
225+
"'reply-timeout-ms'",
226+
&format!("must be a number, got {got}"),
227+
));
228+
}
229+
None => None,
230+
};
206231

207232
let has_component = component.is_some() || function.is_some();
208233
let has_channel = channel.is_some();
@@ -218,7 +243,10 @@ fn parse_routes(server_name: &str, properties: &mut PropertyMap) -> Result<Vec<R
218243
if body.is_some() {
219244
return Err(ctx("", "'body' is not valid on channel routes"));
220245
}
221-
RouteTarget::Channel { channel }
246+
RouteTarget::Channel {
247+
channel,
248+
reply_timeout_ms,
249+
}
222250
} else {
223251
let component = component
224252
.ok_or_else(|| ctx("", "missing 'component' (or 'channel' for channel routes)"))?;
@@ -534,8 +562,12 @@ mod tests {
534562
let servers = config.lock().unwrap();
535563
assert_eq!(servers[0].routes[0].method, "POST");
536564
match &servers[0].routes[0].target {
537-
RouteTarget::Channel { channel } => {
565+
RouteTarget::Channel {
566+
channel,
567+
reply_timeout_ms,
568+
} => {
538569
assert_eq!(channel, "incoming-events");
570+
assert!(reply_timeout_ms.is_none());
539571
}
540572
other => panic!("expected Channel target, got {other:?}"),
541573
}

crates/http-server/src/server.rs

Lines changed: 69 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ enum Target {
3737
},
3838
Channel {
3939
channel: String,
40+
reply_timeout_ms: Option<u64>,
4041
},
4142
}
4243

@@ -89,8 +90,12 @@ impl Route {
8990
body: body.clone(),
9091
}
9192
}
92-
RouteTarget::Channel { channel } => Target::Channel {
93+
RouteTarget::Channel {
94+
channel,
95+
reply_timeout_ms,
96+
} => Target::Channel {
9397
channel: channel.clone(),
98+
reply_timeout_ms: *reply_timeout_ms,
9499
},
95100
};
96101

@@ -167,7 +172,13 @@ impl Router {
167172
self.invoke_component(component, function, body.as_deref(), params, req)
168173
.await
169174
}
170-
Target::Channel { channel } => self.publish_to_channel(channel, req).await,
175+
Target::Channel {
176+
channel,
177+
reply_timeout_ms,
178+
} => {
179+
self.publish_to_channel(channel, req, *reply_timeout_ms)
180+
.await
181+
}
171182
}
172183
}
173184

@@ -346,6 +357,7 @@ impl Router {
346357
&self,
347358
channel: &str,
348359
req: Request<Incoming>,
360+
reply_timeout_ms: Option<u64>,
349361
) -> Response<Full<Bytes>> {
350362
let publisher = match &self.publisher {
351363
Some(p) => p,
@@ -379,17 +391,61 @@ impl Router {
379391
let mut headers = HashMap::new();
380392
headers.insert("content-type".to_string(), content_type);
381393

382-
match publisher.publish(channel, body_bytes, headers).await {
383-
Ok(()) => Response::builder()
384-
.status(StatusCode::ACCEPTED)
385-
.body(Full::new(Bytes::new()))
386-
.unwrap(),
387-
Err(e) => Response::builder()
388-
.status(StatusCode::INTERNAL_SERVER_ERROR)
389-
.body(Full::new(Bytes::from(format!(
390-
"failed to publish to channel '{channel}': {e}"
391-
))))
392-
.unwrap(),
394+
match reply_timeout_ms {
395+
None => match publisher.publish(channel, body_bytes, headers).await {
396+
Ok(()) => Response::builder()
397+
.status(StatusCode::ACCEPTED)
398+
.body(Full::new(Bytes::new()))
399+
.unwrap(),
400+
Err(e) => Response::builder()
401+
.status(StatusCode::INTERNAL_SERVER_ERROR)
402+
.body(Full::new(Bytes::from(format!(
403+
"failed to publish to channel '{channel}': {e}"
404+
))))
405+
.unwrap(),
406+
},
407+
Some(timeout_ms) => {
408+
let handle = match publisher
409+
.publish_request(channel, body_bytes, headers)
410+
.await
411+
{
412+
Ok(h) => h,
413+
Err(e) => {
414+
return Response::builder()
415+
.status(StatusCode::INTERNAL_SERVER_ERROR)
416+
.body(Full::new(Bytes::from(format!(
417+
"failed to publish to channel '{channel}': {e}"
418+
))))
419+
.unwrap();
420+
}
421+
};
422+
423+
let timeout = std::time::Duration::from_millis(timeout_ms);
424+
match tokio::time::timeout(timeout, handle.take()).await {
425+
Ok(Ok(reply)) => {
426+
let reply_content_type = reply
427+
.headers()
428+
.content_type()
429+
.unwrap_or("application/octet-stream")
430+
.to_string();
431+
Response::builder()
432+
.status(StatusCode::OK)
433+
.header("content-type", reply_content_type)
434+
.body(Full::new(Bytes::copy_from_slice(reply.body())))
435+
.unwrap()
436+
}
437+
Ok(Err(e)) => Response::builder()
438+
.status(StatusCode::INTERNAL_SERVER_ERROR)
439+
.body(Full::new(Bytes::from(format!("reply error: {e}"))))
440+
.unwrap(),
441+
Err(_) => Response::builder()
442+
.status(StatusCode::GATEWAY_TIMEOUT)
443+
.body(Full::new(Bytes::from(format!(
444+
"no reply received within {timeout_ms}ms"
445+
))))
446+
.unwrap(),
447+
}
448+
}
393449
}
394450
}
395451
}

examples/http-server/config.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ function = "greet"
1010
[server.api.route.bonjour]
1111
path = "/bonjour"
1212
channel = "names"
13+
reply-timeout-ms = 1000
1314

1415
[component.hello]
1516
uri = "../hello-world/greeter/target/wasm32-unknown-unknown/release/greeter.wasm"

examples/http-server/greet.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@ echo "--- GET /hello/world (component route) ---"
66
curl -s localhost:8888/hello/World; echo
77

88
echo "--- POST /bonjour (channel route -> subscription -> bonjour component) ---"
9-
curl -s -X POST -H "Content-Type: text/plain" -d "le Monde" localhost:8888/bonjour
9+
curl -s -X POST -H "Content-Type: text/plain" -d "le Monde" localhost:8888/bonjour; echo

0 commit comments

Comments
 (0)