Skip to content

Commit 85b23de

Browse files
committed
refactor: clean up code formatting and improve readability in WebSocket client and codec
1 parent 370bd37 commit 85b23de

4 files changed

Lines changed: 40 additions & 21 deletions

File tree

aimdb-websocket-connector/src/client/builder.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -171,11 +171,8 @@ where
171171
// Mirrors `AimxClientConnector`: `run_client` owns demux/reconnect/
172172
// keepalive over the WS `Dialer` + per-connection `WsCodec`;
173173
// `pump_client` wires `link_to`/`link_from` routes to the handle.
174-
let (handle, engine_fut) = run_client(
175-
WsDialer::new(self.url.clone()),
176-
WsCodec::new(),
177-
config,
178-
);
174+
let (handle, engine_fut) =
175+
run_client(WsDialer::new(self.url.clone()), WsCodec::new(), config);
179176
let mut futures = pump_client(db, "ws-client", &handle);
180177
futures.push(engine_fut);
181178
Ok(futures)

aimdb-websocket-connector/src/codec.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,8 @@ impl aimdb_core::EnvelopeCodec for WsCodec {
126126
// ---- server direction: read a ClientMessage, write a ServerMessage ------
127127

128128
fn decode(&self, frame: &[u8]) -> Result<Inbound, CodecError> {
129-
let msg: ClientMessage = serde_json::from_slice(frame).map_err(|_| CodecError::Malformed)?;
129+
let msg: ClientMessage =
130+
serde_json::from_slice(frame).map_err(|_| CodecError::Malformed)?;
130131
let mut st = self.state.lock().unwrap();
131132
match msg {
132133
// The transport splits multi-topic frames, so exactly one topic here.
@@ -190,7 +191,12 @@ impl aimdb_core::EnvelopeCodec for WsCodec {
190191
.unwrap()
191192
.topic_of(sub)
192193
.ok_or(CodecError::Malformed)?;
193-
write_server(out, &ServerMessage::Subscribed { topics: vec![topic] })
194+
write_server(
195+
out,
196+
&ServerMessage::Subscribed {
197+
topics: vec![topic],
198+
},
199+
)
194200
}
195201
Outbound::Pong => write_server(out, &ServerMessage::Pong),
196202
// `Reply::Ok` payloads are already a complete `ServerMessage` JSON
@@ -388,10 +394,20 @@ mod tests {
388394
let id = sub(&codec, "a/b");
389395
let mut out = Vec::new();
390396
codec
391-
.encode(Outbound::Subscribed { sub: &id.to_string() }, &mut out)
397+
.encode(
398+
Outbound::Subscribed {
399+
sub: &id.to_string(),
400+
},
401+
&mut out,
402+
)
392403
.unwrap();
393404
let v: ServerMessage = serde_json::from_slice(&out).unwrap();
394-
assert_eq!(v, ServerMessage::Subscribed { topics: vec!["a/b".into()] });
405+
assert_eq!(
406+
v,
407+
ServerMessage::Subscribed {
408+
topics: vec!["a/b".into()]
409+
}
410+
);
395411
}
396412

397413
#[test]

aimdb-websocket-connector/src/dispatch.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -119,11 +119,7 @@ impl Session for WsSession {
119119
.handle_query(&pattern, from, to, limit)
120120
.await
121121
{
122-
Ok((records, total)) => ServerMessage::QueryResult {
123-
id,
124-
records,
125-
total,
126-
},
122+
Ok((records, total)) => ServerMessage::QueryResult { id, records, total },
127123
Err(message) => ServerMessage::Error {
128124
code: ErrorCode::ServerError,
129125
topic: None,
@@ -192,8 +188,8 @@ mod tests {
192188
use std::sync::Mutex;
193189
use std::time::Duration;
194190

195-
use aimdb_core::session::{run_session, SessionConfig};
196191
use aimdb_core::router::RouterBuilder;
192+
use aimdb_core::session::{run_session, SessionConfig};
197193
use aimdb_core::{Connection, SessionLimits, TransportResult};
198194
use tokio::sync::mpsc;
199195

@@ -266,8 +262,10 @@ mod tests {
266262
// snapshot, then a bus broadcast fans out as a Data frame.
267263
#[tokio::test]
268264
async fn subscribe_ack_snapshot_and_fanout() {
269-
let dispatch =
270-
dispatch_with(Arc::new(OneSnap("sensors/temp".into(), b"\"last\"".to_vec())));
265+
let dispatch = dispatch_with(Arc::new(OneSnap(
266+
"sensors/temp".into(),
267+
b"\"last\"".to_vec(),
268+
)));
271269
let mgr = dispatch.client_mgr.clone();
272270

273271
let (tx, rx) = mpsc::unbounded_channel::<Vec<u8>>();
@@ -303,8 +301,12 @@ mod tests {
303301

304302
// Ack + snapshot should have been emitted, in order.
305303
let msgs = parse(&out);
306-
assert!(matches!(&msgs[0], ServerMessage::Subscribed { topics } if topics == &vec!["sensors/temp".to_string()]));
307-
assert!(matches!(&msgs[1], ServerMessage::Snapshot { topic, .. } if topic == "sensors/temp"));
304+
assert!(
305+
matches!(&msgs[0], ServerMessage::Subscribed { topics } if topics == &vec!["sensors/temp".to_string()])
306+
);
307+
assert!(
308+
matches!(&msgs[1], ServerMessage::Snapshot { topic, .. } if topic == "sensors/temp")
309+
);
308310

309311
// A bus broadcast now fans out to this subscription as a Data frame.
310312
mgr.broadcast("sensors/temp", b"\"22.5\"").await;
@@ -313,7 +315,9 @@ mod tests {
313315
let data = msgs
314316
.iter()
315317
.find_map(|m| match m {
316-
ServerMessage::Data { topic, payload, .. } => Some((topic.clone(), payload.clone())),
318+
ServerMessage::Data { topic, payload, .. } => {
319+
Some((topic.clone(), payload.clone()))
320+
}
317321
_ => None,
318322
})
319323
.expect("a Data frame");

aimdb-websocket-connector/src/transport.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,9 @@ impl WsDialer {
142142

143143
#[cfg(feature = "client")]
144144
impl aimdb_core::Dialer for WsDialer {
145-
fn connect(&self) -> aimdb_core::BoxFut<'_, aimdb_core::TransportResult<Box<dyn aimdb_core::Connection>>> {
145+
fn connect(
146+
&self,
147+
) -> aimdb_core::BoxFut<'_, aimdb_core::TransportResult<Box<dyn aimdb_core::Connection>>> {
146148
Box::pin(async move {
147149
let (ws, _resp) = tokio_tungstenite::connect_async(&self.url)
148150
.await

0 commit comments

Comments
 (0)