diff --git a/Cargo.toml b/Cargo.toml index 27a7359..7d8ef96 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,10 +1,13 @@ [package] -authors = ["ElasT0ny "] +authors = [ + "ElasT0ny ", + "Devon Bagley " +] description = "An asynchronous WAMP implementation" edition = "2018" license = "MIT OR Apache-2.0" name = "wamp_async" -version = "0.3.2-alpha.0" +version = "0.4.0-alpha.0" categories = ["network-programming", "web-programming", "asynchronous"] documentation = "https://docs.rs/wamp_async" diff --git a/README.md b/README.md index 22ed942..de9eebc 100644 --- a/README.md +++ b/README.md @@ -20,10 +20,10 @@ For usage examples, see : ```rust // Register for events - let (_sub_id, mut event_queue) = client.subscribe("peer.heartbeat").await?; + let (_sub_id, mut event_queue) = client.subscribe("peer.heartbeat", SubscribeOptions::empty()).await?; // Wait for the next event match event_queue.recv().await { - Some((_pub_id, args, kwargs)) => println!("Event(args: {:?}, kwargs: {:?})", args, kwargs), + Some((_pub_id, _details, args, kwargs)) => println!("Event(args: {:?}, kwargs: {:?})", args, kwargs), None => println!("Event queue closed"), }; ``` @@ -116,10 +116,11 @@ let rpc_id = client.register("peer.echo", rpc_echo).await?; #### Advanced profile: -| Feature | Desciption | Status | -| --------------------- | --------------------------------------------------------------- | ----------- | -| Client Authentication | Low-level support for Client Authentication (Ticket-based, CRA) | ✔ | -| Progressive Calls | Partial results reported from Callee to Caller | help wanted | +| Feature | Desciption | Status | +| --------------------------- | --------------------------------------------------------------- | ----------- | +| Client Authentication | Low-level support for Client Authentication (Ticket-based, CRA) | ✔ | +| Pattern Based Subscriptions | Wildcard, and Prefix matches for channel topics | ✔ | +| Progressive Calls | Partial results reported from Callee to Caller | help wanted | ## License diff --git a/examples/pattern_based_subscription.rs b/examples/pattern_based_subscription.rs new file mode 100644 index 0000000..114c145 --- /dev/null +++ b/examples/pattern_based_subscription.rs @@ -0,0 +1,86 @@ +use std::error::Error; +use wamp_async::{Client, ClientConfig, OptionBuilder, SubscribeOptions, Arg}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + env_logger::init(); + + // Connect to the server + let (mut client, (evt_loop, _rpc_evt_queue)) = Client::connect( + "wss://localhost:8080/ws", + Some(ClientConfig::default().set_ssl_verify(false)), + ) + .await?; + println!("Connected !!"); + + // Spawn the event loop + tokio::spawn(evt_loop); + + println!("Joining realm"); + client.join_realm("realm1").await?; + + let max_events = 10; + let mut cur_event_num: usize = 0; + + // If one of the args is "pub", start as a publisher + if let Some(_) = std::env::args().find(|a| a == "pub") { + loop { + match client.publish(format!("peer.heartbeat.{}", cur_event_num), None, None, true).await { + Ok(pub_id) => println!("\tSent event id {}", pub_id.unwrap()), + Err(e) => { + println!("publish error {}", e); + break; + } + }; + cur_event_num += 1; + // Exit before sleeping + if cur_event_num >= max_events { + break; + } + tokio::time::sleep(std::time::Duration::from_secs(1)).await + } + // Start as a subscriber + } else { + println!( + "Subscribing to peer.heartbeat events. Start another instance with a 'pub' argument" + ); + // Prefix Match + let (sub_id, mut heartbeat_queue) = client.subscribe("peer.heartbeat", SubscribeOptions::new().with_match("prefix")).await?; + // Wildcard match with empty uri part + let (last_sub_id, mut heartbeat_last) = client.subscribe("peer..9", SubscribeOptions::new().with_match("wildcard")).await?; + println!("Waiting for {} heartbeats...", max_events); + + while cur_event_num < max_events { + tokio::select! { + pre = heartbeat_queue.recv() => match pre { + Some((pub_id, details, args, kwargs)) => { + println!("\tGot {} (details: {:?} args: {:?}, kwargs: {:?})", pub_id, details, args, kwargs); + // The publisher gives us the current event number in the topic. + cur_event_num = match &details["topic"] { + Arg::Uri(topic) => topic.split(".").collect::>().last().unwrap().parse::().unwrap(), + _ => panic!("We got an event with no topic") + } + 1; + }, + None => println!("Subscription is done"), + } + + last = heartbeat_last.recv() => match last { + Some((pub_id, details, args, kwargs)) => { + // We know we are done here. + client.unsubscribe(last_sub_id).await?; + client.unsubscribe(sub_id).await?; + println!("\tLast Heartbeat: {} (details: {:?} args: {:?}, kwargs: {:?})", pub_id, details, args, kwargs) + }, + None => println!("Subscription is done"), + } + } + } + } + + println!("Leaving realm"); + client.leave_realm().await?; + + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + client.disconnect().await; + Ok(()) +} diff --git a/examples/pubsub.rs b/examples/pubsub.rs index e897e06..0bba7ec 100644 --- a/examples/pubsub.rs +++ b/examples/pubsub.rs @@ -1,5 +1,5 @@ use std::error::Error; -use wamp_async::{Client, ClientConfig}; +use wamp_async::{Client, ClientConfig, OptionBuilder, SubscribeOptions}; #[tokio::main] async fn main() -> Result<(), Box> { @@ -44,13 +44,13 @@ async fn main() -> Result<(), Box> { println!( "Subscribing to peer.heartbeat events. Start another instance with a 'pub' argument" ); - let (sub_id, mut heartbeat_queue) = client.subscribe("peer.heartbeat").await?; + let (sub_id, mut heartbeat_queue) = client.subscribe("peer.heartbeat", SubscribeOptions::new()).await?; println!("Waiting for {} heartbeats...", max_events); while cur_event_num < max_events { match heartbeat_queue.recv().await { - Some((pub_id, args, kwargs)) => { - println!("\tGot {} (args: {:?}, kwargs: {:?})", pub_id, args, kwargs) + Some((pub_id, details, args, kwargs)) => { + println!("\tGot {} (details: {:?}, args: {:?}, kwargs: {:?})", pub_id, details args, kwargs) } None => println!("Subscription is done"), }; diff --git a/src/client.rs b/src/client.rs index d67170c..fe74217 100644 --- a/src/client.rs +++ b/src/client.rs @@ -12,6 +12,7 @@ use url::*; pub use crate::common::*; use crate::core::*; use crate::error::*; +use crate::options::*; use crate::serializer::SerializerType; /// Options one can set when connecting to a WAMP server @@ -397,11 +398,16 @@ impl<'a> Client<'a> { pub async fn subscribe>( &self, topic: T, + options: SubscribeOptions ) -> Result<(WampId, SubscriptionQueue), WampError> { // Send the request let (res, result) = oneshot::channel(); if let Err(e) = self.ctl_channel.send(Request::Subscribe { uri: topic.as_ref().to_string(), + options: match options.get_dict() { + Some(dict) => dict, + None => WampDict::new(), + }, res, }) { return Err(From::from(format!( diff --git a/src/common.rs b/src/common.rs index 855096a..da8f695 100644 --- a/src/common.rs +++ b/src/common.rs @@ -65,7 +65,7 @@ pub type WampArgs = Vec; pub type WampKwArgs = serde_json::Map; /// Generic enum that can hold any concrete WAMP value -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] #[serde(untagged)] pub enum Arg { /// uri: a string URI as defined in URIs @@ -107,6 +107,28 @@ impl ClientRole { ClientRole::Subscriber => "subscriber", } } + + /// Creates a features dictionary to declare for the role + pub fn get_features(&self) -> WampDict { + match self { + ClientRole::Subscriber => { + let mut features = WampDict::new(); + for feature in vec!["pattern_based_subscription"] { + features.insert(feature.to_owned(), Arg::Bool(true)); + } + features.clone() + }, + _ => WampDict::new() + } + } + + /// Returns true if the role has features that need to be declared + pub fn has_features(&self) -> bool { + match self { + ClientRole::Subscriber => true, + _ => false + } + } } /// All the supported roles a server can have diff --git a/src/core/mod.rs b/src/core/mod.rs index 08660a9..faef679 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -15,6 +15,7 @@ mod send; use crate::client; use crate::message::*; +use crate::Arg; pub use send::Request; pub enum Status { @@ -34,6 +35,7 @@ pub type JoinResult = Sender< >; pub type SubscriptionQueue = UnboundedReceiver<( WampId, // Publish event ID + WampDict, // Publish event Details Option, // Publish args Option, )>; // publish kwargs @@ -82,7 +84,7 @@ pub struct Core<'a> { /// Pending subscription requests sent to the server pending_sub: HashMap, /// Current subscriptions - subscriptions: HashMap, Option)>>, + subscriptions: HashMap, Option)>>, /// Pending RPC registration requests sent to the server pending_register: HashMap, PendingRegisterResult)>, @@ -320,7 +322,7 @@ impl<'a> Core<'a> { .await } Request::Leave { res } => send::leave_realm(self, res).await, - Request::Subscribe { uri, res } => send::subscribe(self, uri, res).await, + Request::Subscribe { uri, options, res } => send::subscribe(self, uri, options, res).await, Request::Unsubscribe { sub_id, res } => send::unsubscribe(self, sub_id, res).await, Request::Publish { uri, diff --git a/src/core/recv.rs b/src/core/recv.rs index 6f7fca9..95423c3 100644 --- a/src/core/recv.rs +++ b/src/core/recv.rs @@ -62,7 +62,7 @@ pub async fn event( core: &mut Core<'_>, subscription: WampId, publication: WampId, - _details: WampDict, + details: WampDict, arguments: Option, arguments_kw: Option, ) -> Status { @@ -79,7 +79,7 @@ pub async fn event( // Forward the event to the client if evt_queue - .send((publication, arguments, arguments_kw)) + .send((publication, details, arguments, arguments_kw)) .is_err() { warn!( diff --git a/src/core/send.rs b/src/core/send.rs index 6689887..32f8531 100644 --- a/src/core/send.rs +++ b/src/core/send.rs @@ -24,6 +24,7 @@ pub enum Request<'a> { }, Subscribe { uri: WampString, + options: WampDict, res: PendingSubResult, }, Unsubscribe { @@ -74,7 +75,14 @@ pub async fn join_realm( let mut client_roles: WampDict = WampDict::new(); // Add all of our roles for role in &roles { - client_roles.insert(String::from(role.to_str()), Arg::Dict(WampDict::new())); + let mut roledict = WampDict::new(); + // Support for pattern_based_subscription MUST be announced by Subscribers. + // Crossbar doesn't enforce this, but other brokers might. + if role.has_features() { + roledict.insert("features".to_owned(), Arg::Dict(role.get_features())); + } + + client_roles.insert(String::from(role.to_str()), Arg::Dict(roledict.clone())); } details.insert("roles".to_owned(), Arg::Dict(client_roles)); @@ -187,14 +195,19 @@ pub async fn leave_realm(core: &mut Core<'_>, res: Sender> Status::Ok } -pub async fn subscribe(core: &mut Core<'_>, topic: WampString, res: PendingSubResult) -> Status { +pub async fn subscribe( + core: &mut Core<'_>, + topic: WampString, + options: WampDict, + res: PendingSubResult +) -> Status { let request = core.create_request(); if let Err(e) = core .send(&Msg::Subscribe { request, topic, - options: WampDict::new(), + options }) .await { diff --git a/src/lib.rs b/src/lib.rs index 77a31bf..95e4fb4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,8 +5,10 @@ mod error; mod message; mod serializer; mod transport; +mod options; pub use client::{Client, ClientConfig, ClientState}; pub use common::*; pub use error::*; pub use serializer::SerializerType; +pub use options::*; diff --git a/src/options/mod.rs b/src/options/mod.rs new file mode 100644 index 0000000..584689b --- /dev/null +++ b/src/options/mod.rs @@ -0,0 +1,5 @@ +mod subscription; +mod option; + +pub use option::OptionBuilder; +pub use subscription::SubscribeOptions; diff --git a/src/options/option.rs b/src/options/option.rs new file mode 100644 index 0000000..c7fc3e7 --- /dev/null +++ b/src/options/option.rs @@ -0,0 +1,76 @@ +use crate::{ + Arg, + WampDict, + WampString, +}; + +#[derive(Debug, Clone)] +/// Options specific to roles for key/value pairs +pub enum WampOption { + /// A publisher role feature option + PublishOption(K, V), + /// A Subscriber role feature option + SubscribeOption(K, V), + /// A Caller role feature option + CallOption(K, V), + /// A Callee role feature option + RegisterOption(K, V), + /// An empty option + None +} + +/// Provides generic functionality for role options dictionary generation +pub trait OptionBuilder { + + /// Clones or creates a WampDict and inserts the key/value pair from the supplied WampOption + /// + /// * `option` - The key/value pair to insert into the dictionary + fn with_option(&self, option: WampOption) -> Self where Self: OptionBuilder + Sized { + let mut next_options = match &self.get_dict() { + Some(opts) => opts.clone(), + None => WampDict::new() + }; + + let (key, value) = match Self::validate_option(option.clone()) { + Some(result) => result, + None => panic!("Can't create invalid option {:?}", option) + }; + + next_options.insert(key, value); + + Self::create(Some(next_options.clone())) + } + + // TODO: Actual validation per role here + /// WIP (currently not functional) + /// Validate that the option being passed in is relevant for the role, and that they type of the value is correct for the given key. + /// + /// * `option` - The key/value pair to validate + fn validate_option(option: WampOption) -> Option<(WampString, Arg)> { + match option { + WampOption::PublishOption(key, value) => Some((key, value)), + WampOption::SubscribeOption(key, value) => Some((key, value)), + WampOption::RegisterOption(key, value) => Some((key, value)), + WampOption::CallOption(key, value) => Some((key, value)), + WampOption::None => None, + } + } + + /// Create a new empty builder - provided for convention + fn new() -> Self where Self: OptionBuilder + Sized { + Self::empty() + } + + /// Create a new empty builder + fn empty() -> Self where Self: OptionBuilder + Sized { + Self::create(None) + } + + /// Create an OptionBuilder using the provided WampDict + /// Must implement + fn create(options: Option) -> Self where Self: OptionBuilder + Sized; + /// Return the current builder WampDict + /// Must implement + fn get_dict(&self) -> Option; + +} diff --git a/src/options/subscription.rs b/src/options/subscription.rs new file mode 100644 index 0000000..3a7d919 --- /dev/null +++ b/src/options/subscription.rs @@ -0,0 +1,43 @@ +use crate::{ + WampDict, + Arg +}; +use crate::options::option::{ + OptionBuilder, + WampOption, +}; + +/// Base struct for storing WampDict value +pub struct SubscriptionOptionItem(Option); + +/// Provides functions for adding defined options to the WampDict +impl SubscriptionOptionItem { + /// Add an option for pattern matching the topic of the subscription + pub fn with_match(&self, match_option: &str) -> Self { + self.with_option(WampOption::SubscribeOption("match".to_owned(), Arg::String(match_option.to_owned()))) + } +} + +/// Add base OptionBuilder functionality +impl OptionBuilder for SubscriptionOptionItem { + /// Build a new SubscriptionOptionItem from a provided Option + fn create(options: Option) -> Self where Self: OptionBuilder + Sized { + Self(options) + } + + /// Return the WampDict being operated on and stored by SubscriptionOptionItem + fn get_dict(&self) -> Option { + self.0.clone() + } +} + +/// Default +impl Default for SubscriptionOptionItem { + /// Create a new empty SubscriptionOptionItem + fn default() -> Self { + Self::empty() + } +} + +/// Alias for SubscriptionOptionItem +pub type SubscribeOptions = SubscriptionOptionItem;