Skip to content

Commit d4a37d9

Browse files
Add locality and timestamp options (#558)
* add locality and timestamp options * clippy fix * docs fix
1 parent 953b5a8 commit d4a37d9

8 files changed

Lines changed: 134 additions & 31 deletions

File tree

src/ext.rs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ use crate::{
2121
macros::{build, import, option_wrapper, py_static, try_import, wrapper},
2222
pubsub::Subscriber,
2323
qos::{CongestionControl, Priority, Reliability},
24-
sample::Sample,
24+
sample::{Locality, Sample},
2525
session::{EntityGlobalId, Session},
26+
time::Timestamp,
2627
utils::{duration, generic, wait, MapInto},
2728
ZDeserializeError,
2829
};
@@ -481,26 +482,30 @@ impl AdvancedPublisher {
481482
Ok(self.get_ref()?.priority().into())
482483
}
483484

484-
// TODO add timestamp
485-
#[pyo3(signature = (payload, *, encoding = None, attachment = None))]
485+
#[pyo3(signature = (payload, *, encoding = None, attachment = None, timestamp = None))]
486486
fn put(
487487
&self,
488488
py: Python,
489489
#[pyo3(from_py_with = "ZBytes::from_py")] payload: ZBytes,
490490
#[pyo3(from_py_with = "Encoding::from_py_opt")] encoding: Option<Encoding>,
491491
#[pyo3(from_py_with = "ZBytes::from_py_opt")] attachment: Option<ZBytes>,
492+
timestamp: Option<Timestamp>,
492493
) -> PyResult<()> {
493494
let this = self.get_ref()?;
494-
wait(py, build!(this.put(payload), encoding, attachment))
495+
wait(
496+
py,
497+
build!(this.put(payload), encoding, attachment, timestamp),
498+
)
495499
}
496500

497-
#[pyo3(signature = (*, attachment = None))]
501+
#[pyo3(signature = (*, attachment = None, timestamp = None))]
498502
fn delete(
499503
&self,
500504
py: Python,
501505
#[pyo3(from_py_with = "ZBytes::from_py_opt")] attachment: Option<ZBytes>,
506+
timestamp: Option<Timestamp>,
502507
) -> PyResult<()> {
503-
wait(py, build!(self.get_ref()?.delete(), attachment))
508+
wait(py, build!(self.get_ref()?.delete(), attachment, timestamp))
504509
}
505510

506511
fn undeclare(&mut self, py: Python) -> PyResult<()> {
@@ -743,7 +748,7 @@ impl SampleMissListener {
743748

744749
#[allow(clippy::too_many_arguments)]
745750
#[pyfunction]
746-
#[pyo3(signature = (session, key_expr, *, encoding = None, congestion_control = None, priority = None, express = None, reliability = None, cache = None, sample_miss_detection = None, publisher_detection = None))]
751+
#[pyo3(signature = (session, key_expr, *, encoding = None, congestion_control = None, priority = None, express = None, reliability = None, allowed_destination = None, cache = None, sample_miss_detection = None, publisher_detection = None))]
747752
pub(crate) fn declare_advanced_publisher(
748753
py: Python,
749754
session: &Session,
@@ -753,6 +758,7 @@ pub(crate) fn declare_advanced_publisher(
753758
priority: Option<Priority>,
754759
express: Option<bool>,
755760
reliability: Option<Reliability>,
761+
allowed_destination: Option<Locality>,
756762
cache: Option<CacheConfig>,
757763
sample_miss_detection: Option<MissDetectionConfig>,
758764
publisher_detection: Option<bool>,
@@ -764,6 +770,7 @@ pub(crate) fn declare_advanced_publisher(
764770
priority,
765771
express,
766772
reliability,
773+
allowed_destination,
767774
cache,
768775
sample_miss_detection,
769776
);
@@ -775,19 +782,21 @@ pub(crate) fn declare_advanced_publisher(
775782

776783
#[allow(clippy::too_many_arguments)]
777784
#[pyfunction]
778-
#[pyo3(signature = (session, key_expr, handler = None, *, history = None, recovery = None, subscriber_detection = None))]
785+
#[pyo3(signature = (session, key_expr, handler = None, *, allowed_origin = None, history = None, recovery = None, subscriber_detection = None))]
779786
pub(crate) fn declare_advanced_subscriber(
780787
session: &Session,
781788
py: Python,
782789
#[pyo3(from_py_with = "KeyExpr::from_py")] key_expr: KeyExpr,
783790
handler: Option<&Bound<PyAny>>,
791+
allowed_origin: Option<Locality>,
784792
history: Option<HistoryConfig>,
785793
recovery: Option<RecoveryConfig>,
786794
subscriber_detection: Option<bool>,
787795
) -> PyResult<AdvancedSubscriber> {
788796
let (handler, background) = into_handler(py, handler)?;
789797
let mut builder = build!(
790798
session.0.declare_subscriber(key_expr).advanced(),
799+
allowed_origin,
791800
history,
792801
recovery
793802
);

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ pub(crate) mod zenoh {
6565
ConsolidationMode, Parameters, Querier, Query, QueryConsolidation, QueryTarget,
6666
Queryable, Reply, ReplyError, Selector,
6767
},
68-
sample::{Sample, SampleKind},
68+
sample::{Locality, Sample, SampleKind},
6969
scouting::{scout, Hello, Scout},
7070
session::{open, EntityGlobalId, Session, SessionInfo},
7171
time::Timestamp,

src/pubsub.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use crate::{
2424
matching::{MatchingListener, MatchingStatus},
2525
qos::{CongestionControl, Priority, Reliability},
2626
sample::Sample,
27+
time::Timestamp,
2728
utils::{generic, wait},
2829
};
2930

@@ -76,26 +77,30 @@ impl Publisher {
7677
Ok(wait(py, self.get_ref()?.matching_status())?.into())
7778
}
7879

79-
// TODO add timestamp
80-
#[pyo3(signature = (payload, *, encoding = None, attachment = None))]
80+
#[pyo3(signature = (payload, *, encoding = None, attachment = None, timestamp = None))]
8181
fn put(
8282
&self,
8383
py: Python,
8484
#[pyo3(from_py_with = "ZBytes::from_py")] payload: ZBytes,
8585
#[pyo3(from_py_with = "Encoding::from_py_opt")] encoding: Option<Encoding>,
8686
#[pyo3(from_py_with = "ZBytes::from_py_opt")] attachment: Option<ZBytes>,
87+
timestamp: Option<Timestamp>,
8788
) -> PyResult<()> {
8889
let this = self.get_ref()?;
89-
wait(py, build!(this.put(payload), encoding, attachment))
90+
wait(
91+
py,
92+
build!(this.put(payload), encoding, attachment, timestamp),
93+
)
9094
}
9195

92-
#[pyo3(signature = (*, attachment = None))]
96+
#[pyo3(signature = (*, attachment = None, timestamp = None))]
9397
fn delete(
9498
&self,
9599
py: Python,
96100
#[pyo3(from_py_with = "ZBytes::from_py_opt")] attachment: Option<ZBytes>,
101+
timestamp: Option<Timestamp>,
97102
) -> PyResult<()> {
98-
wait(py, build!(self.get_ref()?.delete(), attachment))
103+
wait(py, build!(self.get_ref()?.delete(), attachment, timestamp))
99104
}
100105

101106
#[pyo3(signature = (handler = None))]

src/query.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use crate::{
2626
macros::{build, downcast_or_new, enum_mapper, option_wrapper, wrapper},
2727
matching::{MatchingListener, MatchingStatus},
2828
qos::{CongestionControl, Priority},
29+
time::Timestamp,
2930
utils::{generic, wait, IntoPyResult, IntoPython, IntoRust, MapInto},
3031
};
3132

@@ -126,9 +127,8 @@ impl Query {
126127
Ok(self.get_ref()?.attachment().cloned().map_into())
127128
}
128129

129-
// TODO timestamp
130130
#[allow(clippy::too_many_arguments)]
131-
#[pyo3(signature = (key_expr, payload, *, encoding = None, congestion_control = None, priority = None, express = None, attachment = None))]
131+
#[pyo3(signature = (key_expr, payload, *, encoding = None, congestion_control = None, priority = None, express = None, attachment = None, timestamp = None))]
132132
fn reply(
133133
&self,
134134
py: Python,
@@ -139,6 +139,7 @@ impl Query {
139139
priority: Option<Priority>,
140140
express: Option<bool>,
141141
#[pyo3(from_py_with = "ZBytes::from_py_opt")] attachment: Option<ZBytes>,
142+
timestamp: Option<Timestamp>,
142143
) -> PyResult<()> {
143144
let build = build!(
144145
self.get_ref()?.reply(key_expr, payload),
@@ -147,6 +148,7 @@ impl Query {
147148
priority,
148149
express,
149150
attachment,
151+
timestamp,
150152
);
151153
wait(py, build)
152154
}
@@ -162,7 +164,8 @@ impl Query {
162164
wait(py, build)
163165
}
164166

165-
#[pyo3(signature = (key_expr, *, congestion_control = None, priority = None, express = None, attachment = None))]
167+
#[allow(clippy::too_many_arguments)]
168+
#[pyo3(signature = (key_expr, *, congestion_control = None, priority = None, express = None, attachment = None, timestamp = None))]
166169
fn reply_del(
167170
&self,
168171
py: Python,
@@ -171,13 +174,15 @@ impl Query {
171174
priority: Option<Priority>,
172175
express: Option<bool>,
173176
#[pyo3(from_py_with = "ZBytes::from_py_opt")] attachment: Option<ZBytes>,
177+
timestamp: Option<Timestamp>,
174178
) -> PyResult<()> {
175179
let build = build!(
176180
self.get_ref()?.reply_del(key_expr),
177181
congestion_control,
178182
priority,
179183
express,
180184
attachment,
185+
timestamp,
181186
);
182187
wait(py, build)
183188
}

src/sample.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,18 @@ enum_mapper!(zenoh::sample::SampleKind: u8 {
2727
Delete = 1,
2828
});
2929

30+
enum_mapper!(zenoh::sample::Locality: u8 {
31+
SessionLocal,
32+
Remote,
33+
Any,
34+
});
35+
36+
#[pymethods]
37+
impl Locality {
38+
#[classattr]
39+
const DEFAULT: Self = Self::Any;
40+
}
41+
3042
wrapper!(zenoh::sample::Sample);
3143

3244
#[pymethods]

src/session.rs

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use crate::{
2929
pubsub::{Publisher, Subscriber},
3030
qos::{CongestionControl, Priority, Reliability},
3131
query::{Querier, QueryConsolidation, QueryTarget, Queryable, Reply, Selector},
32+
sample::Locality,
3233
time::Timestamp,
3334
utils::{duration, wait, IntoPython, MapInto},
3435
};
@@ -86,7 +87,7 @@ impl Session {
8687
}
8788

8889
#[allow(clippy::too_many_arguments)]
89-
#[pyo3(signature = (key_expr, payload, *, encoding = None, congestion_control = None, priority = None, express = None, attachment = None))]
90+
#[pyo3(signature = (key_expr, payload, *, encoding = None, congestion_control = None, priority = None, express = None, attachment = None, timestamp = None, allowed_destination = None))]
9091
fn put(
9192
&self,
9293
py: Python,
@@ -97,6 +98,8 @@ impl Session {
9798
priority: Option<Priority>,
9899
express: Option<bool>,
99100
#[pyo3(from_py_with = "ZBytes::from_py_opt")] attachment: Option<ZBytes>,
101+
timestamp: Option<Timestamp>,
102+
allowed_destination: Option<Locality>,
100103
) -> PyResult<()> {
101104
let build = build!(
102105
self.0.put(key_expr, payload),
@@ -105,11 +108,14 @@ impl Session {
105108
priority,
106109
express,
107110
attachment,
111+
timestamp,
112+
allowed_destination,
108113
);
109114
wait(py, build)
110115
}
111116

112-
#[pyo3(signature = (key_expr, *, congestion_control = None, priority = None, express = None, attachment = None))]
117+
#[allow(clippy::too_many_arguments)]
118+
#[pyo3(signature = (key_expr, *, congestion_control = None, priority = None, express = None, attachment = None, timestamp = None, allowed_destination = None))]
113119
fn delete(
114120
&self,
115121
py: Python,
@@ -118,19 +124,23 @@ impl Session {
118124
priority: Option<Priority>,
119125
express: Option<bool>,
120126
#[pyo3(from_py_with = "ZBytes::from_py_opt")] attachment: Option<ZBytes>,
127+
timestamp: Option<Timestamp>,
128+
allowed_destination: Option<Locality>,
121129
) -> PyResult<()> {
122130
let build = build!(
123131
self.0.delete(key_expr),
124132
congestion_control,
125133
priority,
126134
express,
127135
attachment,
136+
timestamp,
137+
allowed_destination,
128138
);
129139
wait(py, build)
130140
}
131141

132142
#[allow(clippy::too_many_arguments)]
133-
#[pyo3(signature = (selector, handler = None, *, target = None, consolidation = None, timeout = None, congestion_control = None, priority = None, express = None, payload = None, encoding = None, attachment = None))]
143+
#[pyo3(signature = (selector, handler = None, *, target = None, consolidation = None, timeout = None, congestion_control = None, priority = None, express = None, payload = None, encoding = None, attachment = None, allowed_destination = None))]
134144
fn get(
135145
&self,
136146
py: Python,
@@ -147,6 +157,7 @@ impl Session {
147157
#[pyo3(from_py_with = "ZBytes::from_py_opt")] payload: Option<ZBytes>,
148158
#[pyo3(from_py_with = "Encoding::from_py_opt")] encoding: Option<Encoding>,
149159
#[pyo3(from_py_with = "ZBytes::from_py_opt")] attachment: Option<ZBytes>,
160+
allowed_destination: Option<Locality>,
150161
) -> PyResult<HandlerImpl<Reply>> {
151162
let (handler, _) = into_handler(py, handler)?;
152163
let builder = build!(
@@ -160,6 +171,7 @@ impl Session {
160171
payload,
161172
encoding,
162173
attachment,
174+
allowed_destination,
163175
);
164176
wait(py, builder.with(handler)).map_into()
165177
}
@@ -169,32 +181,34 @@ impl Session {
169181
self.0.info().into()
170182
}
171183

172-
#[pyo3(signature = (key_expr, handler = None))]
184+
#[pyo3(signature = (key_expr, handler = None, *, allowed_origin = None))]
173185
fn declare_subscriber(
174186
&self,
175187
py: Python,
176188
#[pyo3(from_py_with = "KeyExpr::from_py")] key_expr: KeyExpr,
177189
handler: Option<&Bound<PyAny>>,
190+
allowed_origin: Option<Locality>,
178191
) -> PyResult<Subscriber> {
179192
let (handler, background) = into_handler(py, handler)?;
180-
let builder = self.0.declare_subscriber(key_expr);
193+
let builder = build!(self.0.declare_subscriber(key_expr), allowed_origin);
181194
let mut subscriber = wait(py, builder.with(handler))?;
182195
if background {
183196
subscriber.set_background(true);
184197
}
185198
Ok(subscriber.into())
186199
}
187200

188-
#[pyo3(signature = (key_expr, handler = None, *, complete = None))]
201+
#[pyo3(signature = (key_expr, handler = None, *, complete = None, allowed_origin = None))]
189202
fn declare_queryable(
190203
&self,
191204
py: Python,
192205
#[pyo3(from_py_with = "KeyExpr::from_py")] key_expr: KeyExpr,
193206
handler: Option<&Bound<PyAny>>,
194207
complete: Option<bool>,
208+
allowed_origin: Option<Locality>,
195209
) -> PyResult<Queryable> {
196210
let (handler, background) = into_handler(py, handler)?;
197-
let builder = build!(self.0.declare_queryable(key_expr), complete);
211+
let builder = build!(self.0.declare_queryable(key_expr), complete, allowed_origin);
198212
let mut queryable = wait(py, builder.with(handler))?;
199213
if background {
200214
queryable.set_background(true);
@@ -203,7 +217,7 @@ impl Session {
203217
}
204218

205219
#[allow(clippy::too_many_arguments)]
206-
#[pyo3(signature = (key_expr, *, encoding = None, congestion_control = None, priority = None, express = None, reliability = None))]
220+
#[pyo3(signature = (key_expr, *, encoding = None, congestion_control = None, priority = None, express = None, reliability = None, allowed_destination = None))]
207221
fn declare_publisher(
208222
&self,
209223
py: Python,
@@ -213,6 +227,7 @@ impl Session {
213227
priority: Option<Priority>,
214228
express: Option<bool>,
215229
reliability: Option<Reliability>,
230+
allowed_destination: Option<Locality>,
216231
) -> PyResult<Publisher> {
217232
let builder = build!(
218233
self.0.declare_publisher(key_expr),
@@ -221,12 +236,13 @@ impl Session {
221236
priority,
222237
express,
223238
reliability,
239+
allowed_destination,
224240
);
225241
wait(py, builder).map_into()
226242
}
227243

228244
#[allow(clippy::too_many_arguments)]
229-
#[pyo3(signature = (key_expr, *, target = None, consolidation = None, timeout = None, congestion_control = None, priority = None, express = None))]
245+
#[pyo3(signature = (key_expr, *, target = None, consolidation = None, timeout = None, congestion_control = None, priority = None, express = None, allowed_destination = None))]
230246
fn declare_querier(
231247
&self,
232248
py: Python,
@@ -239,6 +255,7 @@ impl Session {
239255
congestion_control: Option<CongestionControl>,
240256
priority: Option<Priority>,
241257
express: Option<bool>,
258+
allowed_destination: Option<Locality>,
242259
) -> PyResult<Querier> {
243260
let builder = build!(
244261
self.0.declare_querier(key_expr),
@@ -248,6 +265,7 @@ impl Session {
248265
congestion_control,
249266
priority,
250267
express,
268+
allowed_destination,
251269
);
252270
wait(py, builder).map_into()
253271
}

0 commit comments

Comments
 (0)