Skip to content

Commit acc9924

Browse files
committed
fix: Add mobile mDNS service metadata handling
1 parent 4f61626 commit acc9924

5 files changed

Lines changed: 374 additions & 31 deletions

File tree

crates/buttplug_transport_websocket_tungstenite/src/websocket_server.rs

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use buttplug_core::{
1616
message::serializer::ButtplugSerializedMessage,
1717
};
1818
use futures::{FutureExt, SinkExt, StreamExt, future::BoxFuture};
19-
use std::{sync::Arc, time::Duration};
19+
use std::{fmt, sync::Arc, time::Duration};
2020
use tokio::{
2121
net::{TcpListener, TcpStream},
2222
select,
@@ -27,19 +27,42 @@ use tokio::{
2727
time::sleep,
2828
};
2929

30+
#[derive(Clone)]
31+
struct ListenerBoundCallback(Arc<dyn Fn(u16) + Send + Sync>);
32+
33+
impl ListenerBoundCallback {
34+
fn new(callback: impl Fn(u16) + Send + Sync + 'static) -> Self {
35+
Self(Arc::new(callback))
36+
}
37+
38+
fn call(&self, port: u16) {
39+
(self.0)(port);
40+
}
41+
}
42+
43+
impl fmt::Debug for ListenerBoundCallback {
44+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
45+
f.debug_struct("ListenerBoundCallback")
46+
.finish_non_exhaustive()
47+
}
48+
}
49+
3050
#[derive(Clone, Debug)]
3151
pub struct ButtplugWebsocketServerTransportBuilder {
3252
/// If true, listens all on available interfaces. Otherwise, only listens on 127.0.0.1.
3353
listen_on_all_interfaces: bool,
3454
/// Insecure port for listening for websocket connections.
3555
port: u16,
56+
/// Optional callback fired after the listener is bound and the actual local port is known.
57+
listener_bound_callback: Option<ListenerBoundCallback>,
3658
}
3759

3860
impl Default for ButtplugWebsocketServerTransportBuilder {
3961
fn default() -> Self {
4062
Self {
4163
listen_on_all_interfaces: false,
4264
port: 12345,
65+
listener_bound_callback: None,
4366
}
4467
}
4568
}
@@ -55,10 +78,16 @@ impl ButtplugWebsocketServerTransportBuilder {
5578
self
5679
}
5780

81+
pub fn on_listener_bound(&mut self, callback: impl Fn(u16) + Send + Sync + 'static) -> &mut Self {
82+
self.listener_bound_callback = Some(ListenerBoundCallback::new(callback));
83+
self
84+
}
85+
5886
pub fn finish(&self) -> ButtplugWebsocketServerTransport {
5987
ButtplugWebsocketServerTransport {
6088
port: self.port,
6189
listen_on_all_interfaces: self.listen_on_all_interfaces,
90+
listener_bound_callback: self.listener_bound_callback.clone(),
6291
disconnect_notifier: Arc::new(Notify::new()),
6392
}
6493
}
@@ -193,6 +222,7 @@ async fn run_connection_loop(
193222
pub struct ButtplugWebsocketServerTransport {
194223
port: u16,
195224
listen_on_all_interfaces: bool,
225+
listener_bound_callback: Option<ListenerBoundCallback>,
196226
disconnect_notifier: Arc<Notify>,
197227
}
198228

@@ -203,6 +233,7 @@ impl ButtplugConnectorTransport for ButtplugWebsocketServerTransport {
203233
incoming_sender: Sender<ButtplugTransportIncomingMessage>,
204234
) -> BoxFuture<'static, Result<(), ButtplugConnectorError>> {
205235
let disconnect_notifier = self.disconnect_notifier.clone();
236+
let listener_bound_callback = self.listener_bound_callback.clone();
206237

207238
let base_addr = if self.listen_on_all_interfaces {
208239
"0.0.0.0"
@@ -231,6 +262,19 @@ impl ButtplugConnectorTransport for ButtplugWebsocketServerTransport {
231262
)
232263
})?;
233264
debug!("Websocket: Listening on: {}", addr);
265+
if let Some(callback) = &listener_bound_callback {
266+
let local_port = listener
267+
.local_addr()
268+
.map_err(|e| {
269+
ButtplugConnectorError::TransportSpecificError(
270+
ButtplugConnectorTransportSpecificError::GenericNetworkError(format!(
271+
"Could not determine websocket listener local address: {e}"
272+
)),
273+
)
274+
})?
275+
.port();
276+
callback.call(local_port);
277+
}
234278
if let Ok((stream, _)) = listener.accept().await {
235279
info!("Websocket: Got connection");
236280
let ws_stream = tokio_tungstenite::accept_async(stream)
@@ -288,6 +332,7 @@ mod test {
288332
message::serializer::ButtplugSerializedMessage,
289333
};
290334
use std::io::ErrorKind;
335+
use std::sync::{Arc, Mutex};
291336
use tokio::{net::TcpListener, sync::mpsc};
292337

293338
#[tokio::test]
@@ -322,4 +367,37 @@ mod test {
322367
other => panic!("Unexpected error: {other:?}"),
323368
}
324369
}
370+
371+
#[tokio::test]
372+
async fn listener_bound_callback_receives_actual_port() {
373+
let bound_port = Arc::new(Mutex::new(None));
374+
let callback_port = bound_port.clone();
375+
let transport = ButtplugWebsocketServerTransportBuilder::default()
376+
.on_listener_bound(move |port| {
377+
*callback_port.lock().unwrap() = Some(port);
378+
})
379+
.finish();
380+
let (_outgoing_sender, outgoing_receiver) = mpsc::channel::<ButtplugSerializedMessage>(1);
381+
let (incoming_sender, _incoming_receiver) =
382+
mpsc::channel::<ButtplugTransportIncomingMessage>(1);
383+
let connect_task = tokio::spawn(async move {
384+
let _ = transport.connect(outgoing_receiver, incoming_sender).await;
385+
});
386+
387+
tokio::time::timeout(std::time::Duration::from_secs(1), async {
388+
loop {
389+
if let Some(port) = *bound_port.lock().unwrap() {
390+
return port;
391+
}
392+
tokio::task::yield_now().await;
393+
}
394+
})
395+
.await
396+
.expect("listener bound callback was not called");
397+
398+
let port = bound_port.lock().unwrap().unwrap();
399+
assert!(port > 0);
400+
401+
connect_task.abort();
402+
}
325403
}

crates/intiface_engine/src/buttplug_server.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -164,18 +164,23 @@ pub async fn setup_buttplug_server(
164164
pub async fn run_server(
165165
server: &ButtplugRemoteServer,
166166
options: &EngineOptions,
167+
on_listener_bound: Option<Arc<dyn Fn(u16) + Send + Sync>>,
167168
) -> Result<(), ButtplugServerConnectorError> {
168169
if let Some(port) = options.websocket_port() {
170+
let mut transport_builder = ButtplugWebsocketServerTransportBuilder::default();
171+
transport_builder
172+
.port(port)
173+
.listen_on_all_interfaces(options.websocket_use_all_interfaces());
174+
if let Some(on_listener_bound) = on_listener_bound {
175+
transport_builder.on_listener_bound(move |bound_port| {
176+
on_listener_bound(bound_port);
177+
});
178+
}
169179
server
170180
.start(ButtplugRemoteServerConnector::<
171181
_,
172182
ButtplugServerJSONSerializer,
173-
>::new(
174-
ButtplugWebsocketServerTransportBuilder::default()
175-
.port(port)
176-
.listen_on_all_interfaces(options.websocket_use_all_interfaces())
177-
.finish(),
178-
))
183+
>::new(transport_builder.finish()))
179184
.await
180185
} else if let Some(addr) = options.websocket_client_address() {
181186
server

crates/intiface_engine/src/engine.rs

Lines changed: 82 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use crate::{
1414
Frontend, frontend_external_event_loop, frontend_server_event_loop,
1515
process_messages::{EngineErrorDetail, EngineMessage},
1616
},
17-
mdns::IntifaceMdns,
17+
mdns::{IntifaceMdns, IntifaceMdnsServiceMetadata},
1818
options::EngineOptions,
1919
remote_server::{ButtplugRemoteServerEvent, ButtplugServerConnectorError},
2020
rest_server::IntifaceRestServer,
@@ -26,7 +26,15 @@ use buttplug_core::connector::{
2626
use buttplug_server_device_config::{DeviceConfigurationManager, save_user_config};
2727
use futures::{StreamExt, pin_mut};
2828
use once_cell::sync::OnceCell;
29-
use std::{io::ErrorKind, path::Path, sync::Arc, time::Duration};
29+
use std::{
30+
io::ErrorKind,
31+
path::Path,
32+
sync::{
33+
Arc, Mutex,
34+
atomic::{AtomicBool, Ordering},
35+
},
36+
time::Duration,
37+
};
3038
use tokio::{fs, select};
3139
use tokio_util::sync::CancellationToken;
3240

@@ -90,6 +98,39 @@ fn websocket_port_in_use_error(err: &ButtplugServerConnectorError) -> Option<(St
9098
}
9199
}
92100

101+
fn engine_server_created_message(
102+
metadata: Option<&IntifaceMdnsServiceMetadata>,
103+
port: Option<u16>,
104+
) -> EngineMessage {
105+
if let Some(metadata) = metadata {
106+
EngineMessage::EngineServerCreated {
107+
service_type: Some(metadata.service_type.clone()),
108+
instance_name: Some(metadata.instance_name.clone()),
109+
port,
110+
txt_records: Some(metadata.txt_records.clone()),
111+
}
112+
} else {
113+
EngineMessage::EngineServerCreated {
114+
service_type: None,
115+
instance_name: None,
116+
port: None,
117+
txt_records: None,
118+
}
119+
}
120+
}
121+
122+
async fn send_engine_server_created(
123+
frontend: Option<&Arc<dyn Frontend>>,
124+
metadata: Option<&IntifaceMdnsServiceMetadata>,
125+
port: Option<u16>,
126+
) {
127+
if let Some(frontend) = frontend {
128+
frontend
129+
.send(engine_server_created_message(metadata, port))
130+
.await;
131+
}
132+
}
133+
93134
impl IntifaceEngine {
94135
pub fn backdoor_server(&self) -> Option<Arc<BackdoorServer>> {
95136
Some(self.backdoor_server.get()?.clone())
@@ -112,15 +153,41 @@ impl IntifaceEngine {
112153
frontend.send(EngineMessage::EngineStarted {}).await;
113154
}
114155

115-
// Set up mDNS
116-
let _mdns_server = if options.broadcast_server_mdns() {
117-
// TODO Unregister whenever we have a live connection
118-
119-
// TODO Support different services for engine versus repeater
120-
IntifaceMdns::new()
121-
} else {
122-
None
123-
};
156+
let mdns_service_metadata =
157+
if options.broadcast_server_mdns() && options.websocket_port().is_some() {
158+
Some(Arc::new(IntifaceMdnsServiceMetadata::new(
159+
options.mdns_suffix().as_deref(),
160+
)))
161+
} else {
162+
None
163+
};
164+
let mdns_publisher = Arc::new(Mutex::new(None::<IntifaceMdns>));
165+
let engine_server_created_sent = Arc::new(AtomicBool::new(false));
166+
let on_listener_bound = mdns_service_metadata.as_ref().map(|metadata| {
167+
let metadata = Arc::clone(metadata);
168+
let frontend = frontend.clone();
169+
let mdns_publisher = Arc::clone(&mdns_publisher);
170+
let engine_server_created_sent = Arc::clone(&engine_server_created_sent);
171+
Arc::new(move |port: u16| {
172+
if !engine_server_created_sent.swap(true, Ordering::SeqCst) {
173+
let metadata = Arc::clone(&metadata);
174+
let frontend = frontend.clone();
175+
tokio::spawn(async move {
176+
send_engine_server_created(frontend.as_ref(), Some(metadata.as_ref()), Some(port))
177+
.await;
178+
});
179+
}
180+
#[cfg(target_os = "ios")]
181+
info!("Skipping Rust mDNS publisher on iOS; host app is expected to publish Bonjour");
182+
#[cfg(not(target_os = "ios"))]
183+
{
184+
let mut publisher = mdns_publisher.lock().unwrap();
185+
if publisher.is_none() {
186+
*publisher = IntifaceMdns::new(metadata.as_ref(), port);
187+
}
188+
}
189+
}) as Arc<dyn Fn(u16) + Send + Sync>
190+
});
124191

125192
// Set up Repeater (if in repeater mode)
126193
if options.repeater_mode() {
@@ -233,7 +300,9 @@ impl IntifaceEngine {
233300
}
234301
}
235302
if let Some(frontend) = &frontend {
236-
frontend.send(EngineMessage::EngineServerCreated {}).await;
303+
if on_listener_bound.is_none() {
304+
send_engine_server_created(Some(frontend), None, None).await;
305+
}
237306
let event_receiver = server.event_stream();
238307
let frontend_clone = frontend.clone();
239308
let stop_child_token = self.stop_token.child_token();
@@ -257,7 +326,7 @@ impl IntifaceEngine {
257326
info!("Owner requested process exit, exiting.");
258327
exit_requested = true;
259328
}
260-
result = run_server(&server, options) => {
329+
result = run_server(&server, options, on_listener_bound.clone()) => {
261330
match result {
262331
Ok(_) => info!("Connection dropped, restarting stay open loop."),
263332
Err(e) => {

crates/intiface_engine/src/frontend/process_messages.rs

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,16 @@ pub enum EngineMessage {
2727
#[serde(skip_serializing_if = "Option::is_none")]
2828
detail: Option<EngineErrorDetail>,
2929
},
30-
EngineServerCreated {},
30+
EngineServerCreated {
31+
#[serde(default, skip_serializing_if = "Option::is_none")]
32+
service_type: Option<String>,
33+
#[serde(default, skip_serializing_if = "Option::is_none")]
34+
instance_name: Option<String>,
35+
#[serde(default, skip_serializing_if = "Option::is_none")]
36+
port: Option<u16>,
37+
#[serde(default, skip_serializing_if = "Option::is_none")]
38+
txt_records: Option<Vec<String>>,
39+
},
3140
EngineStopped {},
3241
ClientConnected {
3342
client_name: String,
@@ -103,4 +112,42 @@ mod test {
103112
}),
104113
);
105114
}
115+
116+
#[test]
117+
fn engine_server_created_serializes_optional_service_metadata() {
118+
let message = EngineMessage::EngineServerCreated {
119+
service_type: Some("_intiface_engine._tcp".to_owned()),
120+
instance_name: Some("Intiface ABC123".to_owned()),
121+
port: Some(12345),
122+
txt_records: Some(vec!["path=/".to_owned()]),
123+
};
124+
125+
assert_eq!(
126+
serde_json::to_value(message).unwrap(),
127+
json!({
128+
"EngineServerCreated": {
129+
"service_type": "_intiface_engine._tcp",
130+
"instance_name": "Intiface ABC123",
131+
"port": 12345,
132+
"txt_records": ["path=/"]
133+
}
134+
}),
135+
);
136+
}
137+
138+
#[test]
139+
fn engine_server_created_deserializes_legacy_empty_payload() {
140+
let message: EngineMessage =
141+
serde_json::from_value(json!({"EngineServerCreated": {}})).unwrap();
142+
143+
assert!(matches!(
144+
message,
145+
EngineMessage::EngineServerCreated {
146+
service_type: None,
147+
instance_name: None,
148+
port: None,
149+
txt_records: None,
150+
}
151+
));
152+
}
106153
}

0 commit comments

Comments
 (0)