Skip to content

Commit e40edcb

Browse files
committed
Implement WebSocket transport layer with core types, protocol handling, and client features
- Added core types: RequestId, Topic, MessageKind, and WebSocketConfig. - Defined ProtocolHandler trait with default implementations and documentation. - Created PendingRequest and SubscriptionStore using lock-free scc::HashMap. - Implemented ConnectionActor for managing WebSocket connections and state transitions. - Developed WebSocketClient for handling requests, subscriptions, and message sending. - Added JSON-based protocol handlers for generic and JSON-RPC protocols. - Created mock server and comprehensive tests for stores, integration, and stress scenarios. - Documented modules, examples, and updated README for clarity and usage guidance.
1 parent 5152190 commit e40edcb

6 files changed

Lines changed: 242 additions & 161 deletions

File tree

Cargo.toml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ members = ["crates/*"]
33
resolver = "3"
44

55
[workspace.package]
6-
version = "1.0.0"
6+
version = "1.1.0"
77
edition = "2024"
88
authors = ["Akagi201 <akagi201@gmail.com>"]
99
license = "Apache-2.0"
@@ -182,8 +182,8 @@ all = "warn"
182182

183183
[workspace.dependencies]
184184
# local crates
185-
fastwebsockets = { path = "crates/fastwebsockets", package = "hpx-fastwebsockets", version = "1.0.0" }
186-
hpx = { path = "crates/hpx", version = "1.0.0" }
185+
fastwebsockets = { path = "crates/fastwebsockets", package = "hpx-fastwebsockets", version = "1.1.0" }
186+
hpx = { path = "crates/hpx", version = "1.1.0" }
187187

188188
# external crates
189189
ahash = "0.8.12"
@@ -192,7 +192,7 @@ async-trait = "0.1.80"
192192
axum = "0.8.8"
193193
axum-core = "0.5.6"
194194
base64 = "0.22.1"
195-
boring = { git = "https://github.com/cloudflare/boring.git", rev = "5f4cf54cc5a7f488a924f48ea06d334ac121e5ac", version = "5.0.0-alpha.3" }
195+
boring = { git = "https://github.com/cloudflare/boring.git", rev = "06ca1fd7461d3626746c2cd36d1ada07c85c952d", version = "5.0.0-alpha.3" }
196196
brotli = "8.0.2"
197197
bytes = "1.11.1"
198198
cookie = "0.18.1"
@@ -229,8 +229,8 @@ rand = "0.9.2"
229229
rustls = "0.23.36"
230230
rustls-pemfile = "2.2.0"
231231
rustls-pki-types = "1.14.0"
232+
scc = "3.5.5"
232233
schnellru = "0.2.4"
233-
scc = "2"
234234
serde = "1.0.228"
235235
serde_json = "1.0.149"
236236
serde_urlencoded = "0.7.1"
@@ -247,7 +247,7 @@ system-configuration = "0.7.0"
247247
tempfile = "3.17.1"
248248
thiserror = "2.0.18"
249249
tokio = "1.49.0"
250-
tokio-boring = { git = "https://github.com/cloudflare/boring.git", rev = "5f4cf54cc5a7f488a924f48ea06d334ac121e5ac", version = "5.0.0-alpha.3" }
250+
tokio-boring = { git = "https://github.com/cloudflare/boring.git", rev = "06ca1fd7461d3626746c2cd36d1ada07c85c952d", version = "5.0.0-alpha.3" }
251251
tokio-rustls = "0.26"
252252
tokio-socks = "0.5.2"
253253
tokio-test = "0.4.5"

crates/hpx-transport/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ opentelemetry-otlp = { workspace = true, features = ["grpc-tonic", "metrics"] }
2929
opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] }
3030
parking_lot.workspace = true
3131
rand.workspace = true
32+
scc.workspace = true
3233
serde = { workspace = true, features = ["derive"] }
3334
serde_json = { workspace = true }
34-
scc.workspace = true
3535
serde_urlencoded.workspace = true
3636
sha2.workspace = true
3737
thiserror.workspace = true

crates/hpx-transport/src/websocket/pending.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ impl PendingRequestStore {
6565
};
6666

6767
// Insert returns Err if key already exists
68-
if self.requests.insert(id, pending).is_err() {
68+
if self.requests.insert_sync(id, pending).is_err() {
6969
return None;
7070
}
7171

@@ -76,7 +76,7 @@ impl PendingRequestStore {
7676
///
7777
/// Returns `true` if the request was found and resolved, `false` otherwise.
7878
pub fn resolve(&self, id: &RequestId, response: TransportResult<String>) -> bool {
79-
if let Some((_, pending)) = self.requests.remove(id) {
79+
if let Some((_, pending)) = self.requests.remove_sync(id) {
8080
// Send the response; ignore error (receiver may have dropped)
8181
let _ = pending.response_tx.send(response);
8282
return true;
@@ -90,7 +90,7 @@ impl PendingRequestStore {
9090
pub fn cleanup_stale(&self) {
9191
let now = Instant::now();
9292
self.requests
93-
.retain(|_, pending| now.duration_since(pending.created_at) < pending.timeout);
93+
.retain_sync(|_, pending| now.duration_since(pending.created_at) < pending.timeout);
9494
}
9595

9696
/// Clean up stale requests and notify them of timeout.
@@ -101,15 +101,16 @@ impl PendingRequestStore {
101101
let mut expired = Vec::new();
102102

103103
// First, collect expired IDs
104-
self.requests.scan(|id, pending| {
104+
self.requests.retain_sync(|id, pending| {
105105
if now.duration_since(pending.created_at) >= pending.timeout {
106106
expired.push((id.clone(), pending.timeout));
107107
}
108+
true
108109
});
109110

110111
// Then remove and notify each one
111112
for (id, timeout) in expired {
112-
if let Some((_, pending)) = self.requests.remove(&id) {
113+
if let Some((_, pending)) = self.requests.remove_sync(&id) {
113114
let _ = pending
114115
.response_tx
115116
.send(Err(TransportError::request_timeout(
@@ -140,12 +141,13 @@ impl PendingRequestStore {
140141
/// This should be called on connection close to notify all waiters.
141142
pub fn clear_with_error(&self, error_message: &str) {
142143
let mut ids = Vec::new();
143-
self.requests.scan(|id, _| {
144+
self.requests.retain_sync(|id, _| {
144145
ids.push(id.clone());
146+
true
145147
});
146148

147149
for id in ids {
148-
if let Some((_, pending)) = self.requests.remove(&id) {
150+
if let Some((_, pending)) = self.requests.remove_sync(&id) {
149151
let _ = pending
150152
.response_tx
151153
.send(Err(TransportError::connection_closed(Some(
@@ -157,7 +159,7 @@ impl PendingRequestStore {
157159

158160
/// Clear all pending requests without notification.
159161
pub fn clear(&self) {
160-
self.requests.clear();
162+
self.requests.clear_sync();
161163
}
162164
}
163165

crates/hpx-transport/src/websocket/subscription.rs

Lines changed: 26 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,11 @@ impl SubscriptionStore {
4141
/// Returns `(receiver, is_new)` where `is_new` is true if this is
4242
/// a new subscription that needs to be sent to the server.
4343
pub fn subscribe(&self, topic: Topic) -> (broadcast::Receiver<WsMessage>, bool) {
44-
// Try to get existing subscription first
45-
if let Some(entry) = self.subscriptions.get(&topic) {
46-
let receiver = entry.get().sender.subscribe();
47-
drop(entry); // Release read lock before modifying
48-
49-
// Increment ref count
50-
let _ = self.subscriptions.update(&topic, |_, entry| {
51-
entry.ref_count += 1;
52-
});
53-
44+
// Try to get existing subscription and increment ref count atomically
45+
if let Some(receiver) = self.subscriptions.update_sync(&topic, |_, entry| {
46+
entry.ref_count += 1;
47+
entry.sender.subscribe()
48+
}) {
5449
return (receiver, false);
5550
}
5651

@@ -62,18 +57,14 @@ impl SubscriptionStore {
6257
};
6358

6459
// Insert; if another thread beat us, subscribe to their channel
65-
if let Err((_, _entry)) = self.subscriptions.insert(topic.clone(), entry) {
60+
if let Err((_, _entry)) = self.subscriptions.insert_sync(topic.clone(), entry) {
6661
// Race: another thread inserted first
6762
// The entry we tried to insert is returned on error
6863

69-
if let Some(existing) = self.subscriptions.get(&topic) {
70-
let receiver = existing.get().sender.subscribe();
71-
drop(existing);
72-
73-
let _ = self.subscriptions.update(&topic, |_, entry| {
74-
entry.ref_count += 1;
75-
});
76-
64+
if let Some(receiver) = self.subscriptions.update_sync(&topic, |_, entry| {
65+
entry.ref_count += 1;
66+
entry.sender.subscribe()
67+
}) {
7768
return (receiver, false);
7869
}
7970
}
@@ -85,17 +76,10 @@ impl SubscriptionStore {
8576
///
8677
/// Returns `Some(receiver)` if the topic exists, `None` otherwise.
8778
pub fn add_subscriber(&self, topic: &Topic) -> Option<broadcast::Receiver<WsMessage>> {
88-
if let Some(entry) = self.subscriptions.get(topic) {
89-
let receiver = entry.get().sender.subscribe();
90-
drop(entry);
91-
92-
let _ = self.subscriptions.update(topic, |_, entry| {
93-
entry.ref_count += 1;
94-
});
95-
96-
return Some(receiver);
97-
}
98-
None
79+
self.subscriptions.update_sync(topic, |_, entry| {
80+
entry.ref_count += 1;
81+
entry.sender.subscribe()
82+
})
9983
}
10084

10185
/// Unsubscribe from a topic.
@@ -105,15 +89,15 @@ impl SubscriptionStore {
10589
pub fn unsubscribe(&self, topic: &Topic) -> bool {
10690
let mut should_remove = false;
10791

108-
let _ = self.subscriptions.update(topic, |_, entry| {
92+
let _ = self.subscriptions.update_sync(topic, |_, entry| {
10993
entry.ref_count = entry.ref_count.saturating_sub(1);
11094
if entry.ref_count == 0 {
11195
should_remove = true;
11296
}
11397
});
11498

11599
if should_remove {
116-
self.subscriptions.remove(topic);
100+
self.subscriptions.remove_sync(topic);
117101
return true;
118102
}
119103

@@ -125,28 +109,28 @@ impl SubscriptionStore {
125109
/// Returns `true` if the topic exists (even if no active receivers),
126110
/// `false` if the topic doesn't exist.
127111
pub fn publish(&self, topic: &Topic, message: WsMessage) -> bool {
128-
if let Some(entry) = self.subscriptions.get(topic) {
129-
// Ignore send errors (no active receivers is fine)
130-
let _ = entry.get().sender.send(message);
131-
return true;
132-
}
133-
false
112+
self.subscriptions
113+
.update_sync(topic, |_, entry| {
114+
// Ignore send errors (no active receivers is fine)
115+
let _ = entry.sender.send(message.clone());
116+
})
117+
.is_some()
134118
}
135119

136120
/// Get all currently subscribed topics.
137121
pub fn get_all_topics(&self) -> Vec<Topic> {
138122
let mut topics = Vec::new();
139-
self.subscriptions.scan(|topic, _| {
123+
self.subscriptions.retain_sync(|topic, _| {
140124
topics.push(topic.clone());
125+
true
141126
});
142127
topics
143128
}
144129

145130
/// Get the subscriber count for a topic.
146131
pub fn subscriber_count(&self, topic: &Topic) -> usize {
147132
self.subscriptions
148-
.get(topic)
149-
.map(|entry| entry.get().ref_count)
133+
.update_sync(topic, |_, entry| entry.ref_count)
150134
.unwrap_or(0)
151135
}
152136

@@ -162,7 +146,7 @@ impl SubscriptionStore {
162146

163147
/// Clear all subscriptions.
164148
pub fn clear(&self) {
165-
self.subscriptions.clear();
149+
self.subscriptions.clear_sync();
166150
}
167151
}
168152

0 commit comments

Comments
 (0)