Skip to content

Commit ec2b9db

Browse files
mockersfkfc35
andauthored
can have observers through BRP (#23800)
# Objective - Observe events through BRP ## Solution - Add a observe + watch method to observe events - remote observer systems are limited to only the event as a parameter, processing it will have to happen on the client side - depends on #23797 ## Testing - CI with new test --------- Co-authored-by: Kevin Chen <chen.kevin.f@gmail.com>
1 parent 60a860d commit ec2b9db

2 files changed

Lines changed: 198 additions & 2 deletions

File tree

crates/bevy_remote/src/builtin_methods.rs

Lines changed: 192 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
//! Built-in verbs for the Bevy Remote Protocol.
22
3+
use alloc::sync::Arc;
34
use core::any::TypeId;
5+
use std::sync::Mutex;
46

57
use anyhow::{anyhow, Result as AnyhowResult};
68
use bevy_dev_tools::schedule_data::serde::ScheduleData;
@@ -12,16 +14,17 @@ use bevy_ecs::{
1214
message::MessageCursor,
1315
query::QueryBuilder,
1416
reflect::{AppTypeRegistry, ReflectComponent, ReflectEvent, ReflectMessage, ReflectResource},
17+
resource::Resource,
1518
schedule::Schedules,
1619
system::{In, Local},
17-
world::{EntityRef, EntityWorldMut, FilteredEntityRef, Mut, World},
20+
world::{DeferredWorld, EntityRef, EntityWorldMut, FilteredEntityRef, Mut, World},
1821
};
1922
use bevy_log::warn_once;
2023
use bevy_platform::collections::HashMap;
2124
use bevy_reflect::{
2225
serde::{ReflectSerializer, TypedReflectDeserializer},
2326
structs::DynamicStruct,
24-
GetPath, PartialReflect, TypeRegistration, TypeRegistry,
27+
GetPath, PartialReflect, Reflect, TypeRegistration, TypeRegistry,
2528
};
2629
use serde::{de::DeserializeSeed as _, de::IntoDeserializer, Deserialize, Serialize};
2730
use serde_json::{Map, Value};
@@ -98,6 +101,9 @@ pub const BRP_REGISTRY_SCHEMA_METHOD: &str = "registry.schema";
98101
/// The method path for a `schedule.list` request.
99102
pub const BRP_SCHEDULE_LIST: &str = "schedule.list";
100103

104+
/// The method path for a `world.observe+watch` request.
105+
pub const BRP_OBSERVE_METHOD: &str = "world.observe+watch";
106+
101107
/// The method path for a `schedule.graph` request.
102108
pub const BRP_SCHEDULE_GRAPH: &str = "schedule.graph";
103109

@@ -340,6 +346,26 @@ pub struct BrpWriteMessageParams {
340346
pub value: Option<Value>,
341347
}
342348

349+
/// `world.observe+watch`: Registers an observer for the given event type and
350+
/// streams event data back to the client each time the event is triggered.
351+
///
352+
/// If `entity` is provided, observes only events targeting that entity.
353+
/// Otherwise, observes all global triggers of the event.
354+
///
355+
/// The server responds with serialized event data when events are observed.
356+
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
357+
pub struct BrpObserveParams {
358+
/// The [full path] of the event type to observe.
359+
///
360+
/// [full path]: bevy_reflect::TypePath::type_path
361+
pub event: String,
362+
363+
/// An optional entity to scope the observer to.
364+
/// When set, only events targeting this entity will be observed.
365+
#[serde(default)]
366+
pub entity: Option<Entity>,
367+
}
368+
343369
/// `schedule.graph`:
344370
///
345371
/// The server responds with [`BrpScheduleGraphResponse`] if the schedule is found,
@@ -1527,6 +1553,117 @@ pub fn process_remote_write_message_request(
15271553
})
15281554
}
15291555

1556+
/// Stores observer state for `world.observe+watch` requests.
1557+
#[derive(Resource, Default)]
1558+
pub struct BrpEventObservers {
1559+
/// Map from observer key to buffered serialized events.
1560+
/// The key encodes both event type and optional entity scope.
1561+
observers: HashMap<String, Arc<Mutex<Vec<Value>>>>,
1562+
}
1563+
1564+
/// Handles a `world.observe+watch` request coming from a client.
1565+
///
1566+
/// On the first call for a given event/entity combination, this registers an observer that captures triggered
1567+
/// events. On each subsequent poll, it returns any events that have been captured since the last poll.
1568+
///
1569+
/// When `entity` is provided, the observer is scoped to that entity. Otherwise a global observer is registered.
1570+
pub fn process_remote_observe_watching_request(
1571+
In(params): In<Option<Value>>,
1572+
world: &mut World,
1573+
) -> BrpResult<Option<Value>> {
1574+
let BrpObserveParams { event, entity } = parse_some(params)?;
1575+
1576+
let key = match entity {
1577+
Some(e) => format!("{event}@{e}"),
1578+
None => event.clone(),
1579+
};
1580+
1581+
if !world.contains_resource::<BrpEventObservers>() {
1582+
world.init_resource::<BrpEventObservers>();
1583+
}
1584+
1585+
let already_registered = world
1586+
.resource::<BrpEventObservers>()
1587+
.observers
1588+
.contains_key(&key);
1589+
1590+
if !already_registered {
1591+
let app_type_registry = world.resource::<AppTypeRegistry>().clone();
1592+
let reflect_event = {
1593+
let type_registry = app_type_registry.read();
1594+
let Some(registration) = type_registry.get_with_type_path(&event) else {
1595+
return Err(BrpError::resource_error(format!(
1596+
"Unknown event type: `{event}`"
1597+
)));
1598+
};
1599+
let Some(reflect_event) = registration.data::<ReflectEvent>() else {
1600+
return Err(BrpError::resource_error(format!(
1601+
"Event `{event}` is not reflectable"
1602+
)));
1603+
};
1604+
reflect_event.clone()
1605+
};
1606+
1607+
let buffer: Arc<Mutex<Vec<Value>>> = Arc::new(Mutex::new(Vec::new()));
1608+
let buffer_clone = buffer.clone();
1609+
let registry_clone = app_type_registry.clone();
1610+
let event_clone = event.clone();
1611+
1612+
let callback = Box::new(move |event_data: &dyn Reflect, _world: DeferredWorld| {
1613+
let reg = registry_clone.read();
1614+
let serializer = ReflectSerializer::new(event_data, &reg);
1615+
match serde_json::to_value(&serializer) {
1616+
Ok(value) => {
1617+
buffer_clone.lock().unwrap().push(value);
1618+
}
1619+
Err(err) => {
1620+
warn_once!("Failed to serialize observed event `{event_clone}`: {err}");
1621+
// Push a placeholder so the client still gets notified.
1622+
buffer_clone
1623+
.lock()
1624+
.unwrap()
1625+
.push(serde_json::json!({ "event": event_clone }));
1626+
}
1627+
}
1628+
});
1629+
1630+
let observer = reflect_event.create_observer(callback);
1631+
if let Some(target) = entity {
1632+
world.spawn(observer.with_entity(target));
1633+
} else {
1634+
world.spawn(observer);
1635+
}
1636+
1637+
world
1638+
.resource_mut::<BrpEventObservers>()
1639+
.observers
1640+
.insert(key.clone(), buffer);
1641+
}
1642+
1643+
let observers = world.resource::<BrpEventObservers>();
1644+
let Some(buffer) = observers.observers.get(&key) else {
1645+
return Err(BrpError::internal(anyhow!("Observer state missing")));
1646+
};
1647+
1648+
let mut events = buffer.lock().unwrap();
1649+
if events.is_empty() {
1650+
return Ok(None);
1651+
}
1652+
1653+
let captured: Vec<Value> = events
1654+
.drain(..)
1655+
.map(|json_event| {
1656+
json_event
1657+
.get(&event)
1658+
.expect("event keyed by its type path")
1659+
.to_owned()
1660+
})
1661+
.collect();
1662+
serde_json::to_value(captured)
1663+
.map(Some)
1664+
.map_err(BrpError::internal)
1665+
}
1666+
15301667
/// Handles a `registry.schema` request (list all registry types in form of schema) coming from a client.
15311668
pub fn export_registry_types(In(params): In<Option<Value>>, world: &World) -> BrpResult {
15321669
let filter: BrpJsonSchemaQueryFilter = match params {
@@ -1982,6 +2119,59 @@ mod tests {
19822119
assert!(world.resource::<TestResult>().0);
19832120
}
19842121

2122+
#[test]
2123+
fn observe_watching_captures_triggered_events() {
2124+
#[derive(Event, Reflect)]
2125+
#[reflect(Event)]
2126+
struct Ping {
2127+
value: u32,
2128+
}
2129+
2130+
let atr = AppTypeRegistry::default();
2131+
{
2132+
let mut register = atr.write();
2133+
register.register::<Ping>();
2134+
}
2135+
let mut world = World::new();
2136+
world.insert_resource(atr);
2137+
2138+
let observe_params = serde_json::to_value(&BrpObserveParams {
2139+
event: "bevy_remote::builtin_methods::tests::Ping".to_owned(),
2140+
entity: None,
2141+
})
2142+
.expect("FAIL");
2143+
2144+
assert_eq!(
2145+
process_remote_observe_watching_request(In(Some(observe_params.clone())), &mut world,),
2146+
Ok(None)
2147+
);
2148+
assert!(world.contains_resource::<BrpEventObservers>());
2149+
2150+
let trigger_params = serde_json::to_value(&BrpTriggerEventParams {
2151+
event: "bevy_remote::builtin_methods::tests::Ping".to_owned(),
2152+
value: Some(serde_json::json!({ "value": 42 })),
2153+
})
2154+
.expect("FAIL");
2155+
assert_eq!(
2156+
process_remote_trigger_event_request(In(Some(trigger_params)), &mut world),
2157+
Ok(Null)
2158+
);
2159+
2160+
let captured =
2161+
process_remote_observe_watching_request(In(Some(observe_params.clone())), &mut world)
2162+
.expect("poll should succeed")
2163+
.expect("events should be returned");
2164+
let events: Vec<Value> =
2165+
serde_json::from_value(captured).expect("captured events are a JSON array");
2166+
assert_eq!(events.len(), 1);
2167+
assert_eq!(events[0].get("value"), Some(&serde_json::json!(42)));
2168+
2169+
assert_eq!(
2170+
process_remote_observe_watching_request(In(Some(observe_params)), &mut world),
2171+
Ok(None)
2172+
);
2173+
}
2174+
19852175
#[test]
19862176
fn write_reflect_only_message() {
19872177
#[derive(Message, Reflect)]

crates/bevy_remote/src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -765,6 +765,11 @@ impl RemotePlugin {
765765
builtin_methods::process_remote_write_message_request,
766766
to_main,
767767
)
768+
.with_watching_method(
769+
builtin_methods::BRP_OBSERVE_METHOD,
770+
builtin_methods::process_remote_observe_watching_request,
771+
to_main,
772+
)
768773
.with_method(
769774
builtin_methods::BRP_REGISTRY_SCHEMA_METHOD,
770775
builtin_methods::export_registry_types,
@@ -832,6 +837,7 @@ impl Plugin for RemotePlugin {
832837
app.insert_resource(remote_methods)
833838
.init_resource::<schemas::SchemaTypesMetadata>()
834839
.init_resource::<RemoteWatchingRequests>()
840+
.init_resource::<builtin_methods::BrpEventObservers>()
835841
.add_systems(PreStartup, setup_mailbox_channel)
836842
.configure_sets(
837843
RemoteLast,

0 commit comments

Comments
 (0)