Skip to content
Open
7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
[package]
authors = ["ElasT0ny <elast0ny00@gmail.com>"]
authors = [
"ElasT0ny <elast0ny00@gmail.com>",
"Devon Bagley <devonbagley@gmail.com>"
]
description = "An asynchronous WAMP implementation"
edition = "2018"
license = "MIT OR Apache-2.0"
name = "wamp_async"
version = "0.3.2-alpha.0"
version = "0.4.0-alpha.0"

categories = ["network-programming", "web-programming", "asynchronous"]
documentation = "https://docs.rs/wamp_async"
Expand Down
13 changes: 7 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ For usage examples, see :

```rust
// Register for events
let (_sub_id, mut event_queue) = client.subscribe("peer.heartbeat").await?;
let (_sub_id, mut event_queue) = client.subscribe("peer.heartbeat", SubscribeOptions::empty()).await?;
// Wait for the next event
match event_queue.recv().await {
Some((_pub_id, args, kwargs)) => println!("Event(args: {:?}, kwargs: {:?})", args, kwargs),
Some((_pub_id, _details, args, kwargs)) => println!("Event(args: {:?}, kwargs: {:?})", args, kwargs),
None => println!("Event queue closed"),
};
```
Expand Down Expand Up @@ -116,10 +116,11 @@ let rpc_id = client.register("peer.echo", rpc_echo).await?;

#### Advanced profile:

| Feature | Desciption | Status |
| --------------------- | --------------------------------------------------------------- | ----------- |
| Client Authentication | Low-level support for Client Authentication (Ticket-based, CRA) | ✔ |
| Progressive Calls | Partial results reported from Callee to Caller | help wanted |
| Feature | Desciption | Status |
| --------------------------- | --------------------------------------------------------------- | ----------- |
| Client Authentication | Low-level support for Client Authentication (Ticket-based, CRA) | ✔ |
| Pattern Based Subscriptions | Wildcard, and Prefix matches for channel topics | ✔ |
| Progressive Calls | Partial results reported from Callee to Caller | help wanted |

## License

Expand Down
86 changes: 86 additions & 0 deletions examples/pattern_based_subscription.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use std::error::Error;
use wamp_async::{Client, ClientConfig, OptionBuilder, SubscribeOptions, Arg};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::init();

// Connect to the server
let (mut client, (evt_loop, _rpc_evt_queue)) = Client::connect(
"wss://localhost:8080/ws",
Some(ClientConfig::default().set_ssl_verify(false)),
)
.await?;
println!("Connected !!");

// Spawn the event loop
tokio::spawn(evt_loop);

println!("Joining realm");
client.join_realm("realm1").await?;

let max_events = 10;
let mut cur_event_num: usize = 0;

// If one of the args is "pub", start as a publisher
if let Some(_) = std::env::args().find(|a| a == "pub") {
loop {
match client.publish(format!("peer.heartbeat.{}", cur_event_num), None, None, true).await {
Ok(pub_id) => println!("\tSent event id {}", pub_id.unwrap()),
Err(e) => {
println!("publish error {}", e);
break;
}
};
cur_event_num += 1;
// Exit before sleeping
if cur_event_num >= max_events {
break;
}
tokio::time::sleep(std::time::Duration::from_secs(1)).await
}
// Start as a subscriber
} else {
println!(
"Subscribing to peer.heartbeat events. Start another instance with a 'pub' argument"
);
// Prefix Match
let (sub_id, mut heartbeat_queue) = client.subscribe("peer.heartbeat", SubscribeOptions::new().with_match("prefix")).await?;
// Wildcard match with empty uri part
let (last_sub_id, mut heartbeat_last) = client.subscribe("peer..9", SubscribeOptions::new().with_match("wildcard")).await?;
println!("Waiting for {} heartbeats...", max_events);

while cur_event_num < max_events {
tokio::select! {
pre = heartbeat_queue.recv() => match pre {
Some((pub_id, details, args, kwargs)) => {
println!("\tGot {} (details: {:?} args: {:?}, kwargs: {:?})", pub_id, details, args, kwargs);
// The publisher gives us the current event number in the topic.
cur_event_num = match &details["topic"] {
Arg::Uri(topic) => topic.split(".").collect::<Vec<&str>>().last().unwrap().parse::<usize>().unwrap(),
_ => panic!("We got an event with no topic")
} + 1;
},
None => println!("Subscription is done"),
}

last = heartbeat_last.recv() => match last {
Some((pub_id, details, args, kwargs)) => {
// We know we are done here.
client.unsubscribe(last_sub_id).await?;
client.unsubscribe(sub_id).await?;
println!("\tLast Heartbeat: {} (details: {:?} args: {:?}, kwargs: {:?})", pub_id, details, args, kwargs)
},
None => println!("Subscription is done"),
}
}
}
}

println!("Leaving realm");
client.leave_realm().await?;

tokio::time::sleep(std::time::Duration::from_secs(1)).await;
client.disconnect().await;
Ok(())
}
8 changes: 4 additions & 4 deletions examples/pubsub.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::error::Error;
use wamp_async::{Client, ClientConfig};
use wamp_async::{Client, ClientConfig, OptionBuilder, SubscribeOptions};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
Expand Down Expand Up @@ -44,13 +44,13 @@ async fn main() -> Result<(), Box<dyn Error>> {
println!(
"Subscribing to peer.heartbeat events. Start another instance with a 'pub' argument"
);
let (sub_id, mut heartbeat_queue) = client.subscribe("peer.heartbeat").await?;
let (sub_id, mut heartbeat_queue) = client.subscribe("peer.heartbeat", SubscribeOptions::new()).await?;
println!("Waiting for {} heartbeats...", max_events);

while cur_event_num < max_events {
match heartbeat_queue.recv().await {
Some((pub_id, args, kwargs)) => {
println!("\tGot {} (args: {:?}, kwargs: {:?})", pub_id, args, kwargs)
Some((pub_id, details, args, kwargs)) => {
println!("\tGot {} (details: {:?}, args: {:?}, kwargs: {:?})", pub_id, details args, kwargs)
}
None => println!("Subscription is done"),
};
Expand Down
6 changes: 6 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use url::*;
pub use crate::common::*;
use crate::core::*;
use crate::error::*;
use crate::options::*;
use crate::serializer::SerializerType;

/// Options one can set when connecting to a WAMP server
Expand Down Expand Up @@ -397,11 +398,16 @@ impl<'a> Client<'a> {
pub async fn subscribe<T: AsRef<str>>(
&self,
topic: T,
options: SubscribeOptions
) -> Result<(WampId, SubscriptionQueue), WampError> {
// Send the request
let (res, result) = oneshot::channel();
if let Err(e) = self.ctl_channel.send(Request::Subscribe {
uri: topic.as_ref().to_string(),
options: match options.get_dict() {
Some(dict) => dict,
None => WampDict::new(),
},
res,
}) {
return Err(From::from(format!(
Expand Down
24 changes: 23 additions & 1 deletion src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub type WampArgs = Vec<WampPayloadValue>;
pub type WampKwArgs = serde_json::Map<String, WampPayloadValue>;

/// Generic enum that can hold any concrete WAMP value
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(untagged)]
pub enum Arg {
/// uri: a string URI as defined in URIs
Expand Down Expand Up @@ -107,6 +107,28 @@ impl ClientRole {
ClientRole::Subscriber => "subscriber",
}
}

/// Creates a features dictionary to declare for the role
pub fn get_features(&self) -> WampDict {
match self {
ClientRole::Subscriber => {
let mut features = WampDict::new();
for feature in vec!["pattern_based_subscription"] {
features.insert(feature.to_owned(), Arg::Bool(true));
}
features.clone()
},
_ => WampDict::new()
}
}

/// Returns true if the role has features that need to be declared
pub fn has_features(&self) -> bool {
match self {
ClientRole::Subscriber => true,
_ => false
}
}
}

/// All the supported roles a server can have
Expand Down
6 changes: 4 additions & 2 deletions src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ mod send;

use crate::client;
use crate::message::*;
use crate::Arg;
pub use send::Request;

pub enum Status {
Expand All @@ -34,6 +35,7 @@ pub type JoinResult = Sender<
>;
pub type SubscriptionQueue = UnboundedReceiver<(
WampId, // Publish event ID
WampDict, // Publish event Details
Option<WampArgs>, // Publish args
Option<WampKwArgs>,
)>; // publish kwargs
Expand Down Expand Up @@ -82,7 +84,7 @@ pub struct Core<'a> {
/// Pending subscription requests sent to the server
pending_sub: HashMap<WampId, PendingSubResult>,
/// Current subscriptions
subscriptions: HashMap<WampId, UnboundedSender<(WampId, Option<WampArgs>, Option<WampKwArgs>)>>,
subscriptions: HashMap<WampId, UnboundedSender<(WampId, WampDict, Option<WampArgs>, Option<WampKwArgs>)>>,

/// Pending RPC registration requests sent to the server
pending_register: HashMap<WampId, (RpcFunc<'a>, PendingRegisterResult)>,
Expand Down Expand Up @@ -320,7 +322,7 @@ impl<'a> Core<'a> {
.await
}
Request::Leave { res } => send::leave_realm(self, res).await,
Request::Subscribe { uri, res } => send::subscribe(self, uri, res).await,
Request::Subscribe { uri, options, res } => send::subscribe(self, uri, options, res).await,
Request::Unsubscribe { sub_id, res } => send::unsubscribe(self, sub_id, res).await,
Request::Publish {
uri,
Expand Down
4 changes: 2 additions & 2 deletions src/core/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub async fn event(
core: &mut Core<'_>,
subscription: WampId,
publication: WampId,
_details: WampDict,
details: WampDict,
arguments: Option<WampArgs>,
arguments_kw: Option<WampKwArgs>,
) -> Status {
Expand All @@ -79,7 +79,7 @@ pub async fn event(

// Forward the event to the client
if evt_queue
.send((publication, arguments, arguments_kw))
.send((publication, details, arguments, arguments_kw))
.is_err()
{
warn!(
Expand Down
19 changes: 16 additions & 3 deletions src/core/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub enum Request<'a> {
},
Subscribe {
uri: WampString,
options: WampDict,
res: PendingSubResult,
},
Unsubscribe {
Expand Down Expand Up @@ -74,7 +75,14 @@ pub async fn join_realm(
let mut client_roles: WampDict = WampDict::new();
// Add all of our roles
for role in &roles {
client_roles.insert(String::from(role.to_str()), Arg::Dict(WampDict::new()));
let mut roledict = WampDict::new();
// Support for pattern_based_subscription MUST be announced by Subscribers.
// Crossbar doesn't enforce this, but other brokers might.
if role.has_features() {
roledict.insert("features".to_owned(), Arg::Dict(role.get_features()));
}

client_roles.insert(String::from(role.to_str()), Arg::Dict(roledict.clone()));
}
details.insert("roles".to_owned(), Arg::Dict(client_roles));

Expand Down Expand Up @@ -187,14 +195,19 @@ pub async fn leave_realm(core: &mut Core<'_>, res: Sender<Result<(), WampError>>
Status::Ok
}

pub async fn subscribe(core: &mut Core<'_>, topic: WampString, res: PendingSubResult) -> Status {
pub async fn subscribe(
core: &mut Core<'_>,
topic: WampString,
options: WampDict,
res: PendingSubResult
) -> Status {
let request = core.create_request();

if let Err(e) = core
.send(&Msg::Subscribe {
request,
topic,
options: WampDict::new(),
options
})
.await
{
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ mod error;
mod message;
mod serializer;
mod transport;
mod options;

pub use client::{Client, ClientConfig, ClientState};
pub use common::*;
pub use error::*;
pub use serializer::SerializerType;
pub use options::*;
5 changes: 5 additions & 0 deletions src/options/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod subscription;
mod option;

pub use option::OptionBuilder;
pub use subscription::SubscribeOptions;
Loading