Skip to content

Commit de86687

Browse files
committed
Merge branch 'main' into api/make-replier-id-global
2 parents c4a0845 + d4a37d9 commit de86687

11 files changed

Lines changed: 403 additions & 38 deletions

File tree

examples/z_pub.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,12 @@
1818

1919

2020
def main(
21-
conf: zenoh.Config, key: str, payload: str, iter: Optional[int], interval: int
21+
conf: zenoh.Config,
22+
key: str,
23+
payload: str,
24+
iter: Optional[int],
25+
interval: int,
26+
add_matching_listener: bool,
2227
):
2328
# initiate logging
2429
zenoh.init_log_from_env_or("error")
@@ -28,6 +33,16 @@ def main(
2833
print(f"Declaring Publisher on '{key}'...")
2934
pub = session.declare_publisher(key)
3035

36+
if add_matching_listener:
37+
38+
def on_matching_status_update(status: zenoh.MatchingStatus):
39+
if status.matching:
40+
print("Publisher has matching subscribers.")
41+
else:
42+
print("Publisher has NO MORE matching subscribers")
43+
44+
pub.declare_matching_listener(on_matching_status_update)
45+
3146
print("Press CTRL-C to quit...")
3247
for idx in itertools.count() if iter is None else range(iter):
3348
time.sleep(interval)
@@ -71,8 +86,21 @@ def main(
7186
default=1.0,
7287
help="Interval between each put",
7388
)
89+
parser.add_argument(
90+
"--add-matching-listener",
91+
default=False,
92+
action="store_true",
93+
help="Add matching listener",
94+
)
7495

7596
args = parser.parse_args()
7697
conf = common.get_config_from_args(args)
7798

78-
main(conf, args.key, args.payload, args.iter, args.interval)
99+
main(
100+
conf,
101+
args.key,
102+
args.payload,
103+
args.iter,
104+
args.interval,
105+
args.add_matching_listener,
106+
)

examples/z_querier.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
#
1414
import itertools
1515
import time
16-
from typing import Optional, Tuple
16+
from typing import Optional
1717

1818
import zenoh
1919

@@ -25,6 +25,7 @@ def main(
2525
payload: str,
2626
timeout: float,
2727
iter: Optional[int],
28+
add_matching_listener: bool,
2829
):
2930
# initiate logging
3031
zenoh.init_log_from_env_or("error")
@@ -37,6 +38,16 @@ def main(
3738
query_selector.key_expr, target=target, timeout=timeout
3839
)
3940

41+
if add_matching_listener:
42+
43+
def on_matching_status_update(status: zenoh.MatchingStatus):
44+
if status.matching:
45+
print("Querier has matching queryables.")
46+
else:
47+
print("Querier has NO MORE matching queryables")
48+
49+
querier.declare_matching_listener(on_matching_status_update)
50+
4051
print("Press CTRL-C to quit...")
4152
for idx in itertools.count() if iter is None else range(iter):
4253
time.sleep(1.0)
@@ -99,6 +110,12 @@ def main(
99110
parser.add_argument(
100111
"--iter", dest="iter", type=int, help="How many gets to perform"
101112
)
113+
parser.add_argument(
114+
"--add-matching-listener",
115+
default=False,
116+
action="store_true",
117+
help="Add matching listener",
118+
)
102119

103120
args = parser.parse_args()
104121
conf = common.get_config_from_args(args)
@@ -109,4 +126,12 @@ def main(
109126
"ALL_COMPLETE": zenoh.QueryTarget.ALL_COMPLETE,
110127
}.get(args.target)
111128

112-
main(conf, args.selector, target, args.payload, args.timeout, args.iter)
129+
main(
130+
conf,
131+
args.selector,
132+
target,
133+
args.payload,
134+
args.timeout,
135+
args.iter,
136+
args.add_matching_listener,
137+
)

src/ext.rs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ use crate::{
2121
macros::{build, import, option_wrapper, py_static, try_import, wrapper},
2222
pubsub::Subscriber,
2323
qos::{CongestionControl, Priority, Reliability},
24-
sample::Sample,
24+
sample::{Locality, Sample},
2525
session::{EntityGlobalId, Session},
26+
time::Timestamp,
2627
utils::{duration, generic, wait, MapInto},
2728
ZDeserializeError,
2829
};
@@ -481,26 +482,30 @@ impl AdvancedPublisher {
481482
Ok(self.get_ref()?.priority().into())
482483
}
483484

484-
// TODO add timestamp
485-
#[pyo3(signature = (payload, *, encoding = None, attachment = None))]
485+
#[pyo3(signature = (payload, *, encoding = None, attachment = None, timestamp = None))]
486486
fn put(
487487
&self,
488488
py: Python,
489489
#[pyo3(from_py_with = "ZBytes::from_py")] payload: ZBytes,
490490
#[pyo3(from_py_with = "Encoding::from_py_opt")] encoding: Option<Encoding>,
491491
#[pyo3(from_py_with = "ZBytes::from_py_opt")] attachment: Option<ZBytes>,
492+
timestamp: Option<Timestamp>,
492493
) -> PyResult<()> {
493494
let this = self.get_ref()?;
494-
wait(py, build!(this.put(payload), encoding, attachment))
495+
wait(
496+
py,
497+
build!(this.put(payload), encoding, attachment, timestamp),
498+
)
495499
}
496500

497-
#[pyo3(signature = (*, attachment = None))]
501+
#[pyo3(signature = (*, attachment = None, timestamp = None))]
498502
fn delete(
499503
&self,
500504
py: Python,
501505
#[pyo3(from_py_with = "ZBytes::from_py_opt")] attachment: Option<ZBytes>,
506+
timestamp: Option<Timestamp>,
502507
) -> PyResult<()> {
503-
wait(py, build!(self.get_ref()?.delete(), attachment))
508+
wait(py, build!(self.get_ref()?.delete(), attachment, timestamp))
504509
}
505510

506511
fn undeclare(&mut self, py: Python) -> PyResult<()> {
@@ -743,7 +748,7 @@ impl SampleMissListener {
743748

744749
#[allow(clippy::too_many_arguments)]
745750
#[pyfunction]
746-
#[pyo3(signature = (session, key_expr, *, encoding = None, congestion_control = None, priority = None, express = None, reliability = None, cache = None, sample_miss_detection = None, publisher_detection = None))]
751+
#[pyo3(signature = (session, key_expr, *, encoding = None, congestion_control = None, priority = None, express = None, reliability = None, allowed_destination = None, cache = None, sample_miss_detection = None, publisher_detection = None))]
747752
pub(crate) fn declare_advanced_publisher(
748753
py: Python,
749754
session: &Session,
@@ -753,6 +758,7 @@ pub(crate) fn declare_advanced_publisher(
753758
priority: Option<Priority>,
754759
express: Option<bool>,
755760
reliability: Option<Reliability>,
761+
allowed_destination: Option<Locality>,
756762
cache: Option<CacheConfig>,
757763
sample_miss_detection: Option<MissDetectionConfig>,
758764
publisher_detection: Option<bool>,
@@ -764,6 +770,7 @@ pub(crate) fn declare_advanced_publisher(
764770
priority,
765771
express,
766772
reliability,
773+
allowed_destination,
767774
cache,
768775
sample_miss_detection,
769776
);
@@ -775,19 +782,21 @@ pub(crate) fn declare_advanced_publisher(
775782

776783
#[allow(clippy::too_many_arguments)]
777784
#[pyfunction]
778-
#[pyo3(signature = (session, key_expr, handler = None, *, history = None, recovery = None, subscriber_detection = None))]
785+
#[pyo3(signature = (session, key_expr, handler = None, *, allowed_origin = None, history = None, recovery = None, subscriber_detection = None))]
779786
pub(crate) fn declare_advanced_subscriber(
780787
session: &Session,
781788
py: Python,
782789
#[pyo3(from_py_with = "KeyExpr::from_py")] key_expr: KeyExpr,
783790
handler: Option<&Bound<PyAny>>,
791+
allowed_origin: Option<Locality>,
784792
history: Option<HistoryConfig>,
785793
recovery: Option<RecoveryConfig>,
786794
subscriber_detection: Option<bool>,
787795
) -> PyResult<AdvancedSubscriber> {
788796
let (handler, background) = into_handler(py, handler)?;
789797
let mut builder = build!(
790798
session.0.declare_subscriber(key_expr).advanced(),
799+
allowed_origin,
791800
history,
792801
recovery
793802
);

src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ mod handlers;
2121
mod key_expr;
2222
mod liveliness;
2323
mod macros;
24+
mod matching;
2425
mod pubsub;
2526
mod qos;
2627
mod query;
@@ -57,13 +58,14 @@ pub(crate) mod zenoh {
5758
handlers::Handler,
5859
key_expr::{KeyExpr, SetIntersectionLevel},
5960
liveliness::{Liveliness, LivelinessToken},
61+
matching::{MatchingListener, MatchingStatus},
6062
pubsub::{Publisher, Subscriber},
6163
qos::{CongestionControl, Priority, Reliability},
6264
query::{
6365
ConsolidationMode, Parameters, Querier, Query, QueryConsolidation, QueryTarget,
6466
Queryable, Reply, ReplyError, Selector,
6567
},
66-
sample::{Sample, SampleKind},
68+
sample::{Locality, Sample, SampleKind},
6769
scouting::{scout, Hello, Scout},
6870
session::{open, EntityGlobalId, Session, SessionInfo},
6971
time::Timestamp,

src/matching.rs

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
//
2+
// Copyright (c) 2025 ZettaScale Technology
3+
//
4+
// This program and the accompanying materials are made available under the
5+
// terms of the Eclipse Public License 2.0 which is available at
6+
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7+
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8+
//
9+
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10+
//
11+
// Contributors:
12+
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13+
//
14+
use pyo3::{
15+
prelude::*,
16+
types::{PyDict, PyIterator, PyTuple, PyType},
17+
};
18+
19+
use crate::{
20+
handlers::HandlerImpl,
21+
macros::{option_wrapper, wrapper},
22+
utils::{generic, wait},
23+
};
24+
25+
wrapper!(zenoh::matching::MatchingStatus);
26+
27+
#[pymethods]
28+
impl MatchingStatus {
29+
#[getter]
30+
fn matching(&self) -> bool {
31+
self.0.matching()
32+
}
33+
34+
fn __repr__(&self) -> String {
35+
format!("{:?}", self.0)
36+
}
37+
}
38+
39+
option_wrapper!(
40+
zenoh::matching::MatchingListener<HandlerImpl<MatchingStatus>>,
41+
"Undeclared matching listener"
42+
);
43+
44+
#[pymethods]
45+
impl MatchingListener {
46+
#[classmethod]
47+
fn __class_getitem__(cls: &Bound<PyType>, args: &Bound<PyAny>) -> PyObject {
48+
generic(cls, args)
49+
}
50+
51+
fn __enter__<'a, 'py>(this: &'a Bound<'py, Self>) -> &'a Bound<'py, Self> {
52+
this
53+
}
54+
55+
#[pyo3(signature = (*_args, **_kwargs))]
56+
fn __exit__(
57+
&mut self,
58+
py: Python,
59+
_args: &Bound<PyTuple>,
60+
_kwargs: Option<&Bound<PyDict>>,
61+
) -> PyResult<PyObject> {
62+
self.undeclare(py)?;
63+
Ok(py.None())
64+
}
65+
66+
#[getter]
67+
fn handler(&self, py: Python) -> PyResult<PyObject> {
68+
Ok(self.get_ref()?.handler().to_object(py))
69+
}
70+
71+
fn try_recv(&self, py: Python) -> PyResult<PyObject> {
72+
self.get_ref()?.handler().try_recv(py)
73+
}
74+
75+
fn recv(&self, py: Python) -> PyResult<PyObject> {
76+
self.get_ref()?.handler().recv(py)
77+
}
78+
79+
fn undeclare(&mut self, py: Python) -> PyResult<()> {
80+
wait(py, self.take()?.undeclare())
81+
}
82+
83+
fn __iter__<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyIterator>> {
84+
self.handler(py)?.bind(py).iter()
85+
}
86+
87+
fn __repr__(&self) -> PyResult<String> {
88+
Ok(format!("{:?}", self.get_ref()?))
89+
}
90+
}

src/pubsub.rs

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@ use pyo3::{
1818

1919
use crate::{
2020
bytes::{Encoding, ZBytes},
21-
handlers::HandlerImpl,
21+
handlers::{into_handler, HandlerImpl},
2222
key_expr::KeyExpr,
2323
macros::{build, option_wrapper},
24+
matching::{MatchingListener, MatchingStatus},
2425
qos::{CongestionControl, Priority, Reliability},
2526
sample::Sample,
27+
time::Timestamp,
2628
utils::{generic, wait},
2729
};
2830

@@ -70,26 +72,49 @@ impl Publisher {
7072
Ok(self.get_ref()?.reliability().into())
7173
}
7274

73-
// TODO add timestamp
74-
#[pyo3(signature = (payload, *, encoding = None, attachment = None))]
75+
#[getter]
76+
fn matching_status(&self, py: Python) -> PyResult<MatchingStatus> {
77+
Ok(wait(py, self.get_ref()?.matching_status())?.into())
78+
}
79+
80+
#[pyo3(signature = (payload, *, encoding = None, attachment = None, timestamp = None))]
7581
fn put(
7682
&self,
7783
py: Python,
7884
#[pyo3(from_py_with = "ZBytes::from_py")] payload: ZBytes,
7985
#[pyo3(from_py_with = "Encoding::from_py_opt")] encoding: Option<Encoding>,
8086
#[pyo3(from_py_with = "ZBytes::from_py_opt")] attachment: Option<ZBytes>,
87+
timestamp: Option<Timestamp>,
8188
) -> PyResult<()> {
8289
let this = self.get_ref()?;
83-
wait(py, build!(this.put(payload), encoding, attachment))
90+
wait(
91+
py,
92+
build!(this.put(payload), encoding, attachment, timestamp),
93+
)
8494
}
8595

86-
#[pyo3(signature = (*, attachment = None))]
96+
#[pyo3(signature = (*, attachment = None, timestamp = None))]
8797
fn delete(
8898
&self,
8999
py: Python,
90100
#[pyo3(from_py_with = "ZBytes::from_py_opt")] attachment: Option<ZBytes>,
101+
timestamp: Option<Timestamp>,
91102
) -> PyResult<()> {
92-
wait(py, build!(self.get_ref()?.delete(), attachment))
103+
wait(py, build!(self.get_ref()?.delete(), attachment, timestamp))
104+
}
105+
106+
#[pyo3(signature = (handler = None))]
107+
fn declare_matching_listener(
108+
&self,
109+
py: Python,
110+
handler: Option<&Bound<PyAny>>,
111+
) -> PyResult<MatchingListener> {
112+
let (handler, background) = into_handler(py, handler)?;
113+
let mut listener = wait(py, self.get_ref()?.matching_listener().with(handler))?;
114+
if background {
115+
listener.set_background(true);
116+
}
117+
Ok(listener.into())
93118
}
94119

95120
fn undeclare(&mut self, py: Python) -> PyResult<()> {

0 commit comments

Comments
 (0)