From efdc8e0a057a06fffd7b4b4bf6e0cec83470db38 Mon Sep 17 00:00:00 2001 From: Devon Bagley Date: Fri, 20 May 2022 01:03:53 -0700 Subject: [PATCH 1/9] Options on subscriptions are a prerequisite for more advanced usage --- examples/pubsub.rs | 2 +- src/client.rs | 5 +++++ src/core/mod.rs | 2 +- src/core/send.rs | 10 ++++++++-- 4 files changed, 15 insertions(+), 4 deletions(-) diff --git a/examples/pubsub.rs b/examples/pubsub.rs index e897e06..ac6b4fa 100644 --- a/examples/pubsub.rs +++ b/examples/pubsub.rs @@ -44,7 +44,7 @@ async fn main() -> Result<(), Box> { println!( "Subscribing to peer.heartbeat events. Start another instance with a 'pub' argument" ); - let (sub_id, mut heartbeat_queue) = client.subscribe("peer.heartbeat").await?; + let (sub_id, mut heartbeat_queue) = client.subscribe("peer.heartbeat", None).await?; println!("Waiting for {} heartbeats...", max_events); while cur_event_num < max_events { diff --git a/src/client.rs b/src/client.rs index d67170c..0d07688 100644 --- a/src/client.rs +++ b/src/client.rs @@ -397,11 +397,16 @@ impl<'a> Client<'a> { pub async fn subscribe>( &self, topic: T, + options: Option ) -> Result<(WampId, SubscriptionQueue), WampError> { // Send the request let (res, result) = oneshot::channel(); if let Err(e) = self.ctl_channel.send(Request::Subscribe { uri: topic.as_ref().to_string(), + options: match options { + Some(dict) => dict, + None => WampDict::new(), + }, res, }) { return Err(From::from(format!( diff --git a/src/core/mod.rs b/src/core/mod.rs index 08660a9..d9c3fab 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -320,7 +320,7 @@ impl<'a> Core<'a> { .await } Request::Leave { res } => send::leave_realm(self, res).await, - Request::Subscribe { uri, res } => send::subscribe(self, uri, res).await, + Request::Subscribe { uri, options, res } => send::subscribe(self, uri, options, res).await, Request::Unsubscribe { sub_id, res } => send::unsubscribe(self, sub_id, res).await, Request::Publish { uri, diff --git a/src/core/send.rs b/src/core/send.rs index 6689887..dc70442 100644 --- a/src/core/send.rs +++ b/src/core/send.rs @@ -24,6 +24,7 @@ pub enum Request<'a> { }, Subscribe { uri: WampString, + options: WampDict, res: PendingSubResult, }, Unsubscribe { @@ -187,14 +188,19 @@ pub async fn leave_realm(core: &mut Core<'_>, res: Sender> Status::Ok } -pub async fn subscribe(core: &mut Core<'_>, topic: WampString, res: PendingSubResult) -> Status { +pub async fn subscribe( + core: &mut Core<'_>, + topic: WampString, + options: WampDict, + res: PendingSubResult +) -> Status { let request = core.create_request(); if let Err(e) = core .send(&Msg::Subscribe { request, topic, - options: WampDict::new(), + options }) .await { From 03c698910f08ee33115cfe52b77d3ca09e4445da Mon Sep 17 00:00:00 2001 From: Devon Bagley Date: Fri, 20 May 2022 10:14:14 -0700 Subject: [PATCH 2/9] Subscription options builder --- src/client.rs | 4 ++-- src/common.rs | 35 ++++++++++++++++++++++++++++++++++- src/core/mod.rs | 1 + 3 files changed, 37 insertions(+), 3 deletions(-) diff --git a/src/client.rs b/src/client.rs index 0d07688..ed183c1 100644 --- a/src/client.rs +++ b/src/client.rs @@ -397,13 +397,13 @@ impl<'a> Client<'a> { pub async fn subscribe>( &self, topic: T, - options: Option + options: SubOptions ) -> Result<(WampId, SubscriptionQueue), WampError> { // Send the request let (res, result) = oneshot::channel(); if let Err(e) = self.ctl_channel.send(Request::Subscribe { uri: topic.as_ref().to_string(), - options: match options { + options: match options.get_dict() { Some(dict) => dict, None => WampDict::new(), }, diff --git a/src/common.rs b/src/common.rs index 855096a..e27fbca 100644 --- a/src/common.rs +++ b/src/common.rs @@ -64,8 +64,41 @@ pub type WampArgs = Vec; /// Named WAMP argument map pub type WampKwArgs = serde_json::Map; +pub struct SubOptions(Option); + +impl SubOptions { + pub fn with_match(&self, matchOpt: String) -> Self { + let mut options = match &self.0 { + Some(opts) => opts.clone(), + None => WampDict::new(), + }; + + options.insert("match".to_string(), Arg::String(matchOpt)); + + SubOptions(Some(options)) + } + + pub fn new() -> Self { + Self::empty() + } + + pub fn empty() -> Self { + SubOptions(None) + } + + pub fn get_dict(&self) -> Option { + self.0.clone() + } +} + +impl Default for SubOptions { + fn default() -> Self { + Self::empty() + } +} + /// Generic enum that can hold any concrete WAMP value -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] #[serde(untagged)] pub enum Arg { /// uri: a string URI as defined in URIs diff --git a/src/core/mod.rs b/src/core/mod.rs index d9c3fab..4d03c22 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -15,6 +15,7 @@ mod send; use crate::client; use crate::message::*; +use crate::Arg; pub use send::Request; pub enum Status { From da2ee3a4c4940bcc071e3f85dc48e5f2a6802d20 Mon Sep 17 00:00:00 2001 From: Devon Bagley Date: Fri, 20 May 2022 10:58:53 -0700 Subject: [PATCH 3/9] Add client feature announcement --- src/common.rs | 20 ++++++++++++++++++++ src/core/send.rs | 9 ++++++++- 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/src/common.rs b/src/common.rs index e27fbca..52673f6 100644 --- a/src/common.rs +++ b/src/common.rs @@ -140,6 +140,26 @@ impl ClientRole { ClientRole::Subscriber => "subscriber", } } + + pub fn get_features(&self) -> WampDict { + match self { + ClientRole::Subscriber => { + let mut features = WampDict::new(); + for feature in vec!["pattern_based_subscription"] { + features.insert(feature.to_owned(), Arg::Bool(true)); + } + features.clone() + }, + _ => WampDict::new() + } + } + + pub fn has_features(&self) -> WampBool { + match self { + ClientRole::Subscriber => true, + _ => false + } + } } /// All the supported roles a server can have diff --git a/src/core/send.rs b/src/core/send.rs index dc70442..32f8531 100644 --- a/src/core/send.rs +++ b/src/core/send.rs @@ -75,7 +75,14 @@ pub async fn join_realm( let mut client_roles: WampDict = WampDict::new(); // Add all of our roles for role in &roles { - client_roles.insert(String::from(role.to_str()), Arg::Dict(WampDict::new())); + let mut roledict = WampDict::new(); + // Support for pattern_based_subscription MUST be announced by Subscribers. + // Crossbar doesn't enforce this, but other brokers might. + if role.has_features() { + roledict.insert("features".to_owned(), Arg::Dict(role.get_features())); + } + + client_roles.insert(String::from(role.to_str()), Arg::Dict(roledict.clone())); } details.insert("roles".to_owned(), Arg::Dict(client_roles)); From 02e48a09ad8fc6a8bd88405e041cc77e9761739f Mon Sep 17 00:00:00 2001 From: Devon Bagley Date: Fri, 20 May 2022 12:03:20 -0700 Subject: [PATCH 4/9] Providing event details is a prerequisite for advanced functionality/extensibility. --- src/core/mod.rs | 3 ++- src/core/recv.rs | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/core/mod.rs b/src/core/mod.rs index 4d03c22..faef679 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -35,6 +35,7 @@ pub type JoinResult = Sender< >; pub type SubscriptionQueue = UnboundedReceiver<( WampId, // Publish event ID + WampDict, // Publish event Details Option, // Publish args Option, )>; // publish kwargs @@ -83,7 +84,7 @@ pub struct Core<'a> { /// Pending subscription requests sent to the server pending_sub: HashMap, /// Current subscriptions - subscriptions: HashMap, Option)>>, + subscriptions: HashMap, Option)>>, /// Pending RPC registration requests sent to the server pending_register: HashMap, PendingRegisterResult)>, diff --git a/src/core/recv.rs b/src/core/recv.rs index 6f7fca9..95423c3 100644 --- a/src/core/recv.rs +++ b/src/core/recv.rs @@ -62,7 +62,7 @@ pub async fn event( core: &mut Core<'_>, subscription: WampId, publication: WampId, - _details: WampDict, + details: WampDict, arguments: Option, arguments_kw: Option, ) -> Status { @@ -79,7 +79,7 @@ pub async fn event( // Forward the event to the client if evt_queue - .send((publication, arguments, arguments_kw)) + .send((publication, details, arguments, arguments_kw)) .is_err() { warn!( From b385ba79db829b43e5242db7c1de08cdd9ada242 Mon Sep 17 00:00:00 2001 From: Devon Bagley Date: Fri, 20 May 2022 15:20:45 -0700 Subject: [PATCH 5/9] Provide some examples, and update README --- README.md | 13 ++-- examples/pattern_based_subscription.rs | 84 ++++++++++++++++++++++++++ examples/pubsub.rs | 8 +-- src/common.rs | 4 +- 4 files changed, 97 insertions(+), 12 deletions(-) create mode 100644 examples/pattern_based_subscription.rs diff --git a/README.md b/README.md index 22ed942..ce39e4b 100644 --- a/README.md +++ b/README.md @@ -20,10 +20,10 @@ For usage examples, see : ```rust // Register for events - let (_sub_id, mut event_queue) = client.subscribe("peer.heartbeat").await?; + let (_sub_id, mut event_queue) = client.subscribe("peer.heartbeat", SubOptions::empty()).await?; // Wait for the next event match event_queue.recv().await { - Some((_pub_id, args, kwargs)) => println!("Event(args: {:?}, kwargs: {:?})", args, kwargs), + Some((_pub_id, _details, args, kwargs)) => println!("Event(args: {:?}, kwargs: {:?})", args, kwargs), None => println!("Event queue closed"), }; ``` @@ -116,10 +116,11 @@ let rpc_id = client.register("peer.echo", rpc_echo).await?; #### Advanced profile: -| Feature | Desciption | Status | -| --------------------- | --------------------------------------------------------------- | ----------- | -| Client Authentication | Low-level support for Client Authentication (Ticket-based, CRA) | ✔ | -| Progressive Calls | Partial results reported from Callee to Caller | help wanted | +| Feature | Desciption | Status | +| --------------------------- | --------------------------------------------------------------- | ----------- | +| Client Authentication | Low-level support for Client Authentication (Ticket-based, CRA) | ✔ | +| Pattern Based Subscriptions | Wildcard, and Prefix matches for channel topics | ✔ | +| Progressive Calls | Partial results reported from Callee to Caller | help wanted | ## License diff --git a/examples/pattern_based_subscription.rs b/examples/pattern_based_subscription.rs new file mode 100644 index 0000000..e878507 --- /dev/null +++ b/examples/pattern_based_subscription.rs @@ -0,0 +1,84 @@ +use std::error::Error; +use wamp_async::{Client, ClientConfig, SubOptions, Arg}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + env_logger::init(); + + // Connect to the server + let (mut client, (evt_loop, _rpc_evt_queue)) = Client::connect( + "wss://localhost:8080/ws", + Some(ClientConfig::default().set_ssl_verify(false)), + ) + .await?; + println!("Connected !!"); + + // Spawn the event loop + tokio::spawn(evt_loop); + + println!("Joining realm"); + client.join_realm("realm1").await?; + + let max_events = 10; + let mut cur_event_num: usize = 0; + + // If one of the args is "pub", start as a publisher + if let Some(_) = std::env::args().find(|a| a == "pub") { + loop { + match client.publish(format!("peer.heartbeat.{}", cur_event_num), None, None, true).await { + Ok(pub_id) => println!("\tSent event id {}", pub_id.unwrap()), + Err(e) => { + println!("publish error {}", e); + break; + } + }; + cur_event_num += 1; + // Exit before sleeping + if cur_event_num >= max_events { + break; + } + tokio::time::sleep(std::time::Duration::from_secs(1)).await + } + // Start as a subscriber + } else { + println!( + "Subscribing to peer.heartbeat events. Start another instance with a 'pub' argument" + ); + // Prefix Match + let (sub_id, mut heartbeat_queue) = client.subscribe("peer.heartbeat", SubOptions::new().with_match("prefix")).await?; + // Wildcard match with empty uri part + let (sub_id, mut heartbeat_last) = client.subscribe("peer..9", SubOptions::new().with_match("wildcard")).await?; + println!("Waiting for {} heartbeats...", max_events); + + while cur_event_num < max_events { + tokio::select! { + pre = heartbeat_queue.recv() => match pre { + Some((pub_id, details, args, kwargs)) => { + println!("\tGot {} (details: {:?} args: {:?}, kwargs: {:?})", pub_id, details, args, kwargs); + cur_event_num = match &details["topic"] { + Arg::Uri(topic) => topic.split(".").collect::>().last().unwrap().parse::().unwrap(), + _ => 0 + }; + }, + None => println!("Subscription is done"), + } + + last = heartbeat_last.recv() => match last { + Some((pub_id, details, args, kwargs)) => { + println!("\tLast Heartbeat: {} (details: {:?} args: {:?}, kwargs: {:?})", pub_id, details, args, kwargs) + }, + None => println!("Subscription is done"), + } + } + } + + client.unsubscribe(sub_id).await?; + } + + println!("Leaving realm"); + client.leave_realm().await?; + + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + client.disconnect().await; + Ok(()) +} diff --git a/examples/pubsub.rs b/examples/pubsub.rs index ac6b4fa..31b3326 100644 --- a/examples/pubsub.rs +++ b/examples/pubsub.rs @@ -1,5 +1,5 @@ use std::error::Error; -use wamp_async::{Client, ClientConfig}; +use wamp_async::{Client, ClientConfig, SubOptions}; #[tokio::main] async fn main() -> Result<(), Box> { @@ -44,13 +44,13 @@ async fn main() -> Result<(), Box> { println!( "Subscribing to peer.heartbeat events. Start another instance with a 'pub' argument" ); - let (sub_id, mut heartbeat_queue) = client.subscribe("peer.heartbeat", None).await?; + let (sub_id, mut heartbeat_queue) = client.subscribe("peer.heartbeat", SubOptions::new()).await?; println!("Waiting for {} heartbeats...", max_events); while cur_event_num < max_events { match heartbeat_queue.recv().await { - Some((pub_id, args, kwargs)) => { - println!("\tGot {} (args: {:?}, kwargs: {:?})", pub_id, args, kwargs) + Some((pub_id, details, args, kwargs)) => { + println!("\tGot {} (details: {:?}, args: {:?}, kwargs: {:?})", pub_id, details args, kwargs) } None => println!("Subscription is done"), }; diff --git a/src/common.rs b/src/common.rs index 52673f6..db40d27 100644 --- a/src/common.rs +++ b/src/common.rs @@ -67,13 +67,13 @@ pub type WampKwArgs = serde_json::Map; pub struct SubOptions(Option); impl SubOptions { - pub fn with_match(&self, matchOpt: String) -> Self { + pub fn with_match(&self, match_option: &str) -> Self { let mut options = match &self.0 { Some(opts) => opts.clone(), None => WampDict::new(), }; - options.insert("match".to_string(), Arg::String(matchOpt)); + options.insert("match".to_string(), Arg::String(match_option.to_owned())); SubOptions(Some(options)) } From 6abf6e74dcbedbdd3b77016c1eb0c0e6a49a5c5e Mon Sep 17 00:00:00 2001 From: Devon Bagley Date: Fri, 20 May 2022 18:58:46 -0700 Subject: [PATCH 6/9] chore(cargo-release): prepare next iterations version --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 27a7359..bbf8629 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ description = "An asynchronous WAMP implementation" edition = "2018" license = "MIT OR Apache-2.0" name = "wamp_async" -version = "0.3.2-alpha.0" +version = "0.4.0-alpha.0" categories = ["network-programming", "web-programming", "asynchronous"] documentation = "https://docs.rs/wamp_async" From 1e6cfa97a3ee3cbd1133c729bba20424f4fcc343 Mon Sep 17 00:00:00 2001 From: Devon Bagley Date: Sat, 21 May 2022 02:32:53 -0700 Subject: [PATCH 7/9] Refactoring Options Builder to be more generic --- src/client.rs | 3 +- src/common.rs | 33 ---------------------- src/lib.rs | 2 ++ src/options/mod.rs | 5 ++++ src/options/option.rs | 56 +++++++++++++++++++++++++++++++++++++ src/options/subscription.rs | 34 ++++++++++++++++++++++ 6 files changed, 99 insertions(+), 34 deletions(-) create mode 100644 src/options/mod.rs create mode 100644 src/options/option.rs create mode 100644 src/options/subscription.rs diff --git a/src/client.rs b/src/client.rs index ed183c1..fe74217 100644 --- a/src/client.rs +++ b/src/client.rs @@ -12,6 +12,7 @@ use url::*; pub use crate::common::*; use crate::core::*; use crate::error::*; +use crate::options::*; use crate::serializer::SerializerType; /// Options one can set when connecting to a WAMP server @@ -397,7 +398,7 @@ impl<'a> Client<'a> { pub async fn subscribe>( &self, topic: T, - options: SubOptions + options: SubscribeOptions ) -> Result<(WampId, SubscriptionQueue), WampError> { // Send the request let (res, result) = oneshot::channel(); diff --git a/src/common.rs b/src/common.rs index db40d27..4235474 100644 --- a/src/common.rs +++ b/src/common.rs @@ -64,39 +64,6 @@ pub type WampArgs = Vec; /// Named WAMP argument map pub type WampKwArgs = serde_json::Map; -pub struct SubOptions(Option); - -impl SubOptions { - pub fn with_match(&self, match_option: &str) -> Self { - let mut options = match &self.0 { - Some(opts) => opts.clone(), - None => WampDict::new(), - }; - - options.insert("match".to_string(), Arg::String(match_option.to_owned())); - - SubOptions(Some(options)) - } - - pub fn new() -> Self { - Self::empty() - } - - pub fn empty() -> Self { - SubOptions(None) - } - - pub fn get_dict(&self) -> Option { - self.0.clone() - } -} - -impl Default for SubOptions { - fn default() -> Self { - Self::empty() - } -} - /// Generic enum that can hold any concrete WAMP value #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(untagged)] diff --git a/src/lib.rs b/src/lib.rs index 77a31bf..95e4fb4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,8 +5,10 @@ mod error; mod message; mod serializer; mod transport; +mod options; pub use client::{Client, ClientConfig, ClientState}; pub use common::*; pub use error::*; pub use serializer::SerializerType; +pub use options::*; diff --git a/src/options/mod.rs b/src/options/mod.rs new file mode 100644 index 0000000..584689b --- /dev/null +++ b/src/options/mod.rs @@ -0,0 +1,5 @@ +mod subscription; +mod option; + +pub use option::OptionBuilder; +pub use subscription::SubscribeOptions; diff --git a/src/options/option.rs b/src/options/option.rs new file mode 100644 index 0000000..1e3d476 --- /dev/null +++ b/src/options/option.rs @@ -0,0 +1,56 @@ +use crate::{ + Arg, + WampDict, + WampString, +}; + +#[derive(Debug, Clone)] +pub enum WampOption { + PublishOption(K, V), + SubscribeOption(K, V), + CallOption(K, V), + RegisterOption(K, V), + None +} + +pub trait OptionBuilder { + + fn with_option(&self, option: WampOption) -> Self where Self: OptionBuilder + Sized { + let mut next_options = match &self.get_dict() { + Some(opts) => opts.clone(), + None => WampDict::new() + }; + + let (key, value) = match Self::validate_option(option.clone()) { + Some(result) => result, + None => panic!("Can't create invalid option {:?}", option) + }; + + next_options.insert(key, value); + + Self::create(Some(next_options.clone())) + } + + // TODO: Actual validation per role here + fn validate_option(option: WampOption) -> Option<(WampString, Arg)> { + match option { + WampOption::PublishOption(key, value) => Some((key, value)), + WampOption::SubscribeOption(key, value) => Some((key, value)), + WampOption::RegisterOption(key, value) => Some((key, value)), + WampOption::CallOption(key, value) => Some((key, value)), + WampOption::None => None, + } + } + + fn new() -> Self where Self: OptionBuilder + Sized { + Self::empty() + } + + fn empty() -> Self where Self: OptionBuilder + Sized { + Self::create(None) + } + + fn create(options: Option) -> Self where Self: OptionBuilder + Sized; + fn get_dict(&self) -> Option; + +} diff --git a/src/options/subscription.rs b/src/options/subscription.rs new file mode 100644 index 0000000..a55041b --- /dev/null +++ b/src/options/subscription.rs @@ -0,0 +1,34 @@ +use crate::{ + WampDict, + Arg +}; +use crate::options::option::{ + OptionBuilder, + WampOption, +}; + +pub struct SubscriptionOptionItem(Option); + +impl SubscriptionOptionItem { + pub fn with_match(&self, match_option: &str) -> Self { + self.with_option(WampOption::SubscribeOption("match".to_owned(), Arg::String(match_option.to_owned()))) + } +} + +impl OptionBuilder for SubscriptionOptionItem { + fn create(options: Option) -> Self where Self: OptionBuilder + Sized { + Self(options) + } + + fn get_dict(&self) -> Option { + self.0.clone() + } +} + +impl Default for SubscriptionOptionItem { + fn default() -> Self { + Self::empty() + } +} + +pub type SubscribeOptions = SubscriptionOptionItem; From c19875dd5d84b1d71354b140d150e13ae02b9616 Mon Sep 17 00:00:00 2001 From: Devon Bagley Date: Sat, 21 May 2022 03:41:55 -0700 Subject: [PATCH 8/9] Update docs to reflect refactor --- Cargo.toml | 5 ++++- examples/pattern_based_subscription.rs | 16 +++++++++------- examples/pubsub.rs | 4 ++-- src/common.rs | 4 +++- src/options/option.rs | 22 +++++++++++++++++++++- src/options/subscription.rs | 9 +++++++++ 6 files changed, 48 insertions(+), 12 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index bbf8629..7d8ef96 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,8 @@ [package] -authors = ["ElasT0ny "] +authors = [ + "ElasT0ny ", + "Devon Bagley " +] description = "An asynchronous WAMP implementation" edition = "2018" license = "MIT OR Apache-2.0" diff --git a/examples/pattern_based_subscription.rs b/examples/pattern_based_subscription.rs index e878507..114c145 100644 --- a/examples/pattern_based_subscription.rs +++ b/examples/pattern_based_subscription.rs @@ -1,5 +1,5 @@ use std::error::Error; -use wamp_async::{Client, ClientConfig, SubOptions, Arg}; +use wamp_async::{Client, ClientConfig, OptionBuilder, SubscribeOptions, Arg}; #[tokio::main] async fn main() -> Result<(), Box> { @@ -45,9 +45,9 @@ async fn main() -> Result<(), Box> { "Subscribing to peer.heartbeat events. Start another instance with a 'pub' argument" ); // Prefix Match - let (sub_id, mut heartbeat_queue) = client.subscribe("peer.heartbeat", SubOptions::new().with_match("prefix")).await?; + let (sub_id, mut heartbeat_queue) = client.subscribe("peer.heartbeat", SubscribeOptions::new().with_match("prefix")).await?; // Wildcard match with empty uri part - let (sub_id, mut heartbeat_last) = client.subscribe("peer..9", SubOptions::new().with_match("wildcard")).await?; + let (last_sub_id, mut heartbeat_last) = client.subscribe("peer..9", SubscribeOptions::new().with_match("wildcard")).await?; println!("Waiting for {} heartbeats...", max_events); while cur_event_num < max_events { @@ -55,24 +55,26 @@ async fn main() -> Result<(), Box> { pre = heartbeat_queue.recv() => match pre { Some((pub_id, details, args, kwargs)) => { println!("\tGot {} (details: {:?} args: {:?}, kwargs: {:?})", pub_id, details, args, kwargs); + // The publisher gives us the current event number in the topic. cur_event_num = match &details["topic"] { Arg::Uri(topic) => topic.split(".").collect::>().last().unwrap().parse::().unwrap(), - _ => 0 - }; + _ => panic!("We got an event with no topic") + } + 1; }, None => println!("Subscription is done"), } last = heartbeat_last.recv() => match last { Some((pub_id, details, args, kwargs)) => { + // We know we are done here. + client.unsubscribe(last_sub_id).await?; + client.unsubscribe(sub_id).await?; println!("\tLast Heartbeat: {} (details: {:?} args: {:?}, kwargs: {:?})", pub_id, details, args, kwargs) }, None => println!("Subscription is done"), } } } - - client.unsubscribe(sub_id).await?; } println!("Leaving realm"); diff --git a/examples/pubsub.rs b/examples/pubsub.rs index 31b3326..0bba7ec 100644 --- a/examples/pubsub.rs +++ b/examples/pubsub.rs @@ -1,5 +1,5 @@ use std::error::Error; -use wamp_async::{Client, ClientConfig, SubOptions}; +use wamp_async::{Client, ClientConfig, OptionBuilder, SubscribeOptions}; #[tokio::main] async fn main() -> Result<(), Box> { @@ -44,7 +44,7 @@ async fn main() -> Result<(), Box> { println!( "Subscribing to peer.heartbeat events. Start another instance with a 'pub' argument" ); - let (sub_id, mut heartbeat_queue) = client.subscribe("peer.heartbeat", SubOptions::new()).await?; + let (sub_id, mut heartbeat_queue) = client.subscribe("peer.heartbeat", SubscribeOptions::new()).await?; println!("Waiting for {} heartbeats...", max_events); while cur_event_num < max_events { diff --git a/src/common.rs b/src/common.rs index 4235474..da8f695 100644 --- a/src/common.rs +++ b/src/common.rs @@ -108,6 +108,7 @@ impl ClientRole { } } + /// Creates a features dictionary to declare for the role pub fn get_features(&self) -> WampDict { match self { ClientRole::Subscriber => { @@ -121,7 +122,8 @@ impl ClientRole { } } - pub fn has_features(&self) -> WampBool { + /// Returns true if the role has features that need to be declared + pub fn has_features(&self) -> bool { match self { ClientRole::Subscriber => true, _ => false diff --git a/src/options/option.rs b/src/options/option.rs index 1e3d476..c7fc3e7 100644 --- a/src/options/option.rs +++ b/src/options/option.rs @@ -5,16 +5,26 @@ use crate::{ }; #[derive(Debug, Clone)] +/// Options specific to roles for key/value pairs pub enum WampOption { + /// A publisher role feature option PublishOption(K, V), + /// A Subscriber role feature option SubscribeOption(K, V), + /// A Caller role feature option CallOption(K, V), + /// A Callee role feature option RegisterOption(K, V), + /// An empty option None } +/// Provides generic functionality for role options dictionary generation pub trait OptionBuilder { + /// Clones or creates a WampDict and inserts the key/value pair from the supplied WampOption + /// + /// * `option` - The key/value pair to insert into the dictionary fn with_option(&self, option: WampOption) -> Self where Self: OptionBuilder + Sized { let mut next_options = match &self.get_dict() { Some(opts) => opts.clone(), @@ -32,6 +42,10 @@ pub trait OptionBuilder { } // TODO: Actual validation per role here + /// WIP (currently not functional) + /// Validate that the option being passed in is relevant for the role, and that they type of the value is correct for the given key. + /// + /// * `option` - The key/value pair to validate fn validate_option(option: WampOption) -> Option<(WampString, Arg)> { match option { WampOption::PublishOption(key, value) => Some((key, value)), @@ -42,15 +56,21 @@ pub trait OptionBuilder { } } + /// Create a new empty builder - provided for convention fn new() -> Self where Self: OptionBuilder + Sized { Self::empty() } + /// Create a new empty builder fn empty() -> Self where Self: OptionBuilder + Sized { Self::create(None) } + /// Create an OptionBuilder using the provided WampDict + /// Must implement fn create(options: Option) -> Self where Self: OptionBuilder + Sized; + /// Return the current builder WampDict + /// Must implement fn get_dict(&self) -> Option; - + } diff --git a/src/options/subscription.rs b/src/options/subscription.rs index a55041b..3a7d919 100644 --- a/src/options/subscription.rs +++ b/src/options/subscription.rs @@ -7,28 +7,37 @@ use crate::options::option::{ WampOption, }; +/// Base struct for storing WampDict value pub struct SubscriptionOptionItem(Option); +/// Provides functions for adding defined options to the WampDict impl SubscriptionOptionItem { + /// Add an option for pattern matching the topic of the subscription pub fn with_match(&self, match_option: &str) -> Self { self.with_option(WampOption::SubscribeOption("match".to_owned(), Arg::String(match_option.to_owned()))) } } +/// Add base OptionBuilder functionality impl OptionBuilder for SubscriptionOptionItem { + /// Build a new SubscriptionOptionItem from a provided Option fn create(options: Option) -> Self where Self: OptionBuilder + Sized { Self(options) } + /// Return the WampDict being operated on and stored by SubscriptionOptionItem fn get_dict(&self) -> Option { self.0.clone() } } +/// Default impl Default for SubscriptionOptionItem { + /// Create a new empty SubscriptionOptionItem fn default() -> Self { Self::empty() } } +/// Alias for SubscriptionOptionItem pub type SubscribeOptions = SubscriptionOptionItem; From 07109adb7e5f49e54ae4f2ad8942e5e31b80167f Mon Sep 17 00:00:00 2001 From: Devon Bagley Date: Sat, 21 May 2022 03:41:55 -0700 Subject: [PATCH 9/9] Update docs to reflect refactor --- Cargo.toml | 5 ++++- README.md | 2 +- examples/pattern_based_subscription.rs | 16 +++++++++------- examples/pubsub.rs | 4 ++-- src/common.rs | 4 +++- src/options/option.rs | 22 +++++++++++++++++++++- src/options/subscription.rs | 9 +++++++++ 7 files changed, 49 insertions(+), 13 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index bbf8629..7d8ef96 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,8 @@ [package] -authors = ["ElasT0ny "] +authors = [ + "ElasT0ny ", + "Devon Bagley " +] description = "An asynchronous WAMP implementation" edition = "2018" license = "MIT OR Apache-2.0" diff --git a/README.md b/README.md index ce39e4b..de9eebc 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ For usage examples, see : ```rust // Register for events - let (_sub_id, mut event_queue) = client.subscribe("peer.heartbeat", SubOptions::empty()).await?; + let (_sub_id, mut event_queue) = client.subscribe("peer.heartbeat", SubscribeOptions::empty()).await?; // Wait for the next event match event_queue.recv().await { Some((_pub_id, _details, args, kwargs)) => println!("Event(args: {:?}, kwargs: {:?})", args, kwargs), diff --git a/examples/pattern_based_subscription.rs b/examples/pattern_based_subscription.rs index e878507..114c145 100644 --- a/examples/pattern_based_subscription.rs +++ b/examples/pattern_based_subscription.rs @@ -1,5 +1,5 @@ use std::error::Error; -use wamp_async::{Client, ClientConfig, SubOptions, Arg}; +use wamp_async::{Client, ClientConfig, OptionBuilder, SubscribeOptions, Arg}; #[tokio::main] async fn main() -> Result<(), Box> { @@ -45,9 +45,9 @@ async fn main() -> Result<(), Box> { "Subscribing to peer.heartbeat events. Start another instance with a 'pub' argument" ); // Prefix Match - let (sub_id, mut heartbeat_queue) = client.subscribe("peer.heartbeat", SubOptions::new().with_match("prefix")).await?; + let (sub_id, mut heartbeat_queue) = client.subscribe("peer.heartbeat", SubscribeOptions::new().with_match("prefix")).await?; // Wildcard match with empty uri part - let (sub_id, mut heartbeat_last) = client.subscribe("peer..9", SubOptions::new().with_match("wildcard")).await?; + let (last_sub_id, mut heartbeat_last) = client.subscribe("peer..9", SubscribeOptions::new().with_match("wildcard")).await?; println!("Waiting for {} heartbeats...", max_events); while cur_event_num < max_events { @@ -55,24 +55,26 @@ async fn main() -> Result<(), Box> { pre = heartbeat_queue.recv() => match pre { Some((pub_id, details, args, kwargs)) => { println!("\tGot {} (details: {:?} args: {:?}, kwargs: {:?})", pub_id, details, args, kwargs); + // The publisher gives us the current event number in the topic. cur_event_num = match &details["topic"] { Arg::Uri(topic) => topic.split(".").collect::>().last().unwrap().parse::().unwrap(), - _ => 0 - }; + _ => panic!("We got an event with no topic") + } + 1; }, None => println!("Subscription is done"), } last = heartbeat_last.recv() => match last { Some((pub_id, details, args, kwargs)) => { + // We know we are done here. + client.unsubscribe(last_sub_id).await?; + client.unsubscribe(sub_id).await?; println!("\tLast Heartbeat: {} (details: {:?} args: {:?}, kwargs: {:?})", pub_id, details, args, kwargs) }, None => println!("Subscription is done"), } } } - - client.unsubscribe(sub_id).await?; } println!("Leaving realm"); diff --git a/examples/pubsub.rs b/examples/pubsub.rs index 31b3326..0bba7ec 100644 --- a/examples/pubsub.rs +++ b/examples/pubsub.rs @@ -1,5 +1,5 @@ use std::error::Error; -use wamp_async::{Client, ClientConfig, SubOptions}; +use wamp_async::{Client, ClientConfig, OptionBuilder, SubscribeOptions}; #[tokio::main] async fn main() -> Result<(), Box> { @@ -44,7 +44,7 @@ async fn main() -> Result<(), Box> { println!( "Subscribing to peer.heartbeat events. Start another instance with a 'pub' argument" ); - let (sub_id, mut heartbeat_queue) = client.subscribe("peer.heartbeat", SubOptions::new()).await?; + let (sub_id, mut heartbeat_queue) = client.subscribe("peer.heartbeat", SubscribeOptions::new()).await?; println!("Waiting for {} heartbeats...", max_events); while cur_event_num < max_events { diff --git a/src/common.rs b/src/common.rs index 4235474..da8f695 100644 --- a/src/common.rs +++ b/src/common.rs @@ -108,6 +108,7 @@ impl ClientRole { } } + /// Creates a features dictionary to declare for the role pub fn get_features(&self) -> WampDict { match self { ClientRole::Subscriber => { @@ -121,7 +122,8 @@ impl ClientRole { } } - pub fn has_features(&self) -> WampBool { + /// Returns true if the role has features that need to be declared + pub fn has_features(&self) -> bool { match self { ClientRole::Subscriber => true, _ => false diff --git a/src/options/option.rs b/src/options/option.rs index 1e3d476..c7fc3e7 100644 --- a/src/options/option.rs +++ b/src/options/option.rs @@ -5,16 +5,26 @@ use crate::{ }; #[derive(Debug, Clone)] +/// Options specific to roles for key/value pairs pub enum WampOption { + /// A publisher role feature option PublishOption(K, V), + /// A Subscriber role feature option SubscribeOption(K, V), + /// A Caller role feature option CallOption(K, V), + /// A Callee role feature option RegisterOption(K, V), + /// An empty option None } +/// Provides generic functionality for role options dictionary generation pub trait OptionBuilder { + /// Clones or creates a WampDict and inserts the key/value pair from the supplied WampOption + /// + /// * `option` - The key/value pair to insert into the dictionary fn with_option(&self, option: WampOption) -> Self where Self: OptionBuilder + Sized { let mut next_options = match &self.get_dict() { Some(opts) => opts.clone(), @@ -32,6 +42,10 @@ pub trait OptionBuilder { } // TODO: Actual validation per role here + /// WIP (currently not functional) + /// Validate that the option being passed in is relevant for the role, and that they type of the value is correct for the given key. + /// + /// * `option` - The key/value pair to validate fn validate_option(option: WampOption) -> Option<(WampString, Arg)> { match option { WampOption::PublishOption(key, value) => Some((key, value)), @@ -42,15 +56,21 @@ pub trait OptionBuilder { } } + /// Create a new empty builder - provided for convention fn new() -> Self where Self: OptionBuilder + Sized { Self::empty() } + /// Create a new empty builder fn empty() -> Self where Self: OptionBuilder + Sized { Self::create(None) } + /// Create an OptionBuilder using the provided WampDict + /// Must implement fn create(options: Option) -> Self where Self: OptionBuilder + Sized; + /// Return the current builder WampDict + /// Must implement fn get_dict(&self) -> Option; - + } diff --git a/src/options/subscription.rs b/src/options/subscription.rs index a55041b..3a7d919 100644 --- a/src/options/subscription.rs +++ b/src/options/subscription.rs @@ -7,28 +7,37 @@ use crate::options::option::{ WampOption, }; +/// Base struct for storing WampDict value pub struct SubscriptionOptionItem(Option); +/// Provides functions for adding defined options to the WampDict impl SubscriptionOptionItem { + /// Add an option for pattern matching the topic of the subscription pub fn with_match(&self, match_option: &str) -> Self { self.with_option(WampOption::SubscribeOption("match".to_owned(), Arg::String(match_option.to_owned()))) } } +/// Add base OptionBuilder functionality impl OptionBuilder for SubscriptionOptionItem { + /// Build a new SubscriptionOptionItem from a provided Option fn create(options: Option) -> Self where Self: OptionBuilder + Sized { Self(options) } + /// Return the WampDict being operated on and stored by SubscriptionOptionItem fn get_dict(&self) -> Option { self.0.clone() } } +/// Default impl Default for SubscriptionOptionItem { + /// Create a new empty SubscriptionOptionItem fn default() -> Self { Self::empty() } } +/// Alias for SubscriptionOptionItem pub type SubscribeOptions = SubscriptionOptionItem;