Skip to content

Commit 0e6c185

Browse files
committed
feat: enhance WebSocket connector build and test processes for server and client integration
1 parent 6f0ec89 commit 0e6c185

3 files changed

Lines changed: 118 additions & 10 deletions

File tree

Makefile

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,8 @@ build:
9393
cargo build --package aimdb-knx-connector --features "std,tokio-runtime"
9494
@printf "$(YELLOW) → Building WS protocol$(NC)\n"
9595
cargo build --package aimdb-ws-protocol
96-
@printf "$(YELLOW) → Building WebSocket connector$(NC)\n"
97-
cargo build --package aimdb-websocket-connector --features "tokio-runtime"
96+
@printf "$(YELLOW) → Building WebSocket connector (server + client)$(NC)\n"
97+
cargo build --package aimdb-websocket-connector --features "server,client"
9898
@printf "$(YELLOW) → Building WASM adapter$(NC)\n"
9999
cargo build --package aimdb-wasm-adapter --target wasm32-unknown-unknown --features "wasm-runtime"
100100

@@ -150,8 +150,10 @@ test:
150150
cargo test --package aimdb-knx-connector --features "std,tokio-runtime"
151151
@printf "$(YELLOW) → Testing WS protocol$(NC)\n"
152152
cargo test --package aimdb-ws-protocol
153-
@printf "$(YELLOW) → Testing WebSocket connector$(NC)\n"
154-
cargo test --package aimdb-websocket-connector --features "tokio-runtime"
153+
@printf "$(YELLOW) → Testing WebSocket connector (server + client: unit, real-socket e2e, AimDB round-trip)$(NC)\n"
154+
cargo test --package aimdb-websocket-connector --features "server,client"
155+
@printf "$(YELLOW) → Testing WebSocket connector client-only build$(NC)\n"
156+
cargo test --package aimdb-websocket-connector --no-default-features --features "client" --lib
155157

156158
fmt:
157159
@printf "$(GREEN)Formatting code (workspace members only)...$(NC)\n"
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
//! Minimal runnable WebSocket **server** demo (doc 039-validation Layer 5) — the
2+
//! first real consumer of the connector and a manual-smoke vehicle.
3+
//!
4+
//! Run:
5+
//! ```text
6+
//! cargo run -p aimdb-websocket-connector --example ws_server
7+
//! ```
8+
//! Then connect a client and subscribe to the ticking `counter` record:
9+
//! ```text
10+
//! wscat -c ws://127.0.0.1:8080/ws
11+
//! > {"type":"subscribe","topics":["counter"]}
12+
//! < {"type":"subscribed","topics":["counter"]}
13+
//! < {"type":"data","topic":"counter","payload":{"n":1},"ts":...}
14+
//! ```
15+
//! Or write to the inbound `echo` record:
16+
//! ```text
17+
//! > {"type":"write","topic":"echo","payload":{"msg":"hi"}}
18+
//! ```
19+
20+
use std::sync::Arc;
21+
use std::time::Duration;
22+
23+
use aimdb_core::buffer::BufferCfg;
24+
use aimdb_core::AimDbBuilder;
25+
use aimdb_tokio_adapter::{TokioAdapter, TokioRecordRegistrarExt};
26+
use aimdb_websocket_connector::WebSocketConnector;
27+
use serde::{Deserialize, Serialize};
28+
use serde_json::json;
29+
30+
#[derive(Debug, Clone, Serialize, Deserialize)]
31+
struct Counter {
32+
n: u64,
33+
}
34+
35+
#[derive(Debug, Clone, Serialize, Deserialize)]
36+
struct Echo {
37+
msg: String,
38+
}
39+
40+
#[tokio::main]
41+
async fn main() {
42+
let addr = "127.0.0.1:8080";
43+
44+
let mut sb = AimDbBuilder::new()
45+
.runtime(Arc::new(TokioAdapter))
46+
.with_connector(
47+
WebSocketConnector::new()
48+
.bind(addr)
49+
.path("/ws")
50+
.with_late_join(true),
51+
);
52+
53+
// Outbound: pushed to every subscribed client.
54+
sb.configure::<Counter>("counter", |reg| {
55+
reg.buffer(BufferCfg::SingleLatest)
56+
.with_remote_access()
57+
.link_to("ws://counter")
58+
.with_serializer_raw(|c: &Counter| Ok(serde_json::to_vec(c).unwrap()))
59+
.finish();
60+
});
61+
// Inbound: clients may `write` to it.
62+
sb.configure::<Echo>("echo", |reg| {
63+
reg.buffer(BufferCfg::SingleLatest)
64+
.with_remote_access()
65+
.link_from("ws://echo")
66+
.with_deserializer_raw(|d: &[u8]| {
67+
serde_json::from_slice::<Echo>(d).map_err(|e| e.to_string())
68+
})
69+
.finish();
70+
});
71+
72+
let (db, runner) = sb.build().await.expect("build db");
73+
let db = Arc::new(db);
74+
tokio::spawn(runner.run());
75+
76+
println!("WS server listening on ws://{addr}/ws");
77+
println!(" subscribe: wscat -c ws://{addr}/ws → {{\"type\":\"subscribe\",\"topics\":[\"counter\"]}}");
78+
79+
let mut n = 0u64;
80+
loop {
81+
n += 1;
82+
db.set_record_from_json("counter", json!({ "n": n }))
83+
.expect("set counter");
84+
if let Some(echo) = db.try_latest_as_json("echo") {
85+
println!("echo record = {echo}");
86+
}
87+
tokio::time::sleep(Duration::from_secs(1)).await;
88+
}
89+
}

aimdb-websocket-connector/src/e2e.rs

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,10 @@ async fn many_clients_fanout_and_resource_cleanup() {
446446
},
447447
)
448448
.await;
449-
assert!(matches!(ws_recv(&mut c).await, ServerMessage::Subscribed { .. }));
449+
assert!(matches!(
450+
ws_recv(&mut c).await,
451+
ServerMessage::Subscribed { .. }
452+
));
450453
clients.push(c);
451454
}
452455
wait_until(|| bus.subscription_count() == 20, "20 subscriptions").await;
@@ -491,7 +494,10 @@ async fn stalled_client_does_not_block_a_healthy_one() {
491494
},
492495
)
493496
.await;
494-
assert!(matches!(ws_recv(&mut healthy).await, ServerMessage::Subscribed { .. }));
497+
assert!(matches!(
498+
ws_recv(&mut healthy).await,
499+
ServerMessage::Subscribed { .. }
500+
));
495501
wait_until(|| bus.subscription_count() == 2, "2 subscriptions").await;
496502

497503
// Flood well past the bounded funnel (256). The stalled client's pump drops
@@ -551,8 +557,14 @@ async fn golden_wire_frames() {
551557
},
552558
)
553559
.await;
554-
assert_eq!(recv_value(&mut c).await, json!({"type": "subscribed", "topics": ["t"]}));
555-
assert_eq!(recv_value(&mut c).await, json!({"type": "snapshot", "topic": "t", "payload": 5}));
560+
assert_eq!(
561+
recv_value(&mut c).await,
562+
json!({"type": "subscribed", "topics": ["t"]})
563+
);
564+
assert_eq!(
565+
recv_value(&mut c).await,
566+
json!({"type": "snapshot", "topic": "t", "payload": 5})
567+
);
556568

557569
bus.broadcast("t", b"42").await;
558570
assert_eq!(
@@ -600,7 +612,12 @@ async fn async_authorize_subscribe_gates_despite_allow_all_permissions() {
600612
},
601613
)
602614
.await;
603-
assert!(matches!(ws_recv(&mut c).await, ServerMessage::Subscribed { .. }));
615+
assert!(matches!(
616+
ws_recv(&mut c).await,
617+
ServerMessage::Subscribed { .. }
618+
));
604619
bus.broadcast("public/x", b"1").await;
605-
assert!(matches!(ws_recv(&mut c).await, ServerMessage::Data { topic, .. } if topic == "public/x"));
620+
assert!(
621+
matches!(ws_recv(&mut c).await, ServerMessage::Data { topic, .. } if topic == "public/x")
622+
);
606623
}

0 commit comments

Comments
 (0)