From fafad639d016966e544365a98d559ddf154c209b Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Mon, 2 Jun 2025 17:12:59 +0200 Subject: [PATCH 1/7] implement advanced pub/sub --- Cargo.lock | 54 +++---- Cargo.toml | 1 + src/bytes.rs | 4 +- src/ext.rs | 385 ++++++++++++++++++++++++++++++++++++++++++++- src/lib.rs | 2 +- src/liveliness.rs | 4 +- src/macros.rs | 3 +- src/query.rs | 6 +- src/session.rs | 25 ++- src/utils.rs | 20 ++- zenoh/__init__.pyi | 19 ++- zenoh/ext.pyi | 307 +++++++++++++++++++++++++++++++++++- 12 files changed, 777 insertions(+), 53 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 92332fd5..803323b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -2940,7 +2940,7 @@ dependencies = [ [[package]] name = "zenoh" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c528884fd6f3ce73a8c11cdb9c596a5cbdaa378e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" dependencies = [ "ahash", "arc-swap", @@ -2989,7 +2989,7 @@ dependencies = [ [[package]] name = "zenoh-buffers" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c528884fd6f3ce73a8c11cdb9c596a5cbdaa378e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" dependencies = [ "zenoh-collections", ] @@ -2997,7 +2997,7 @@ dependencies = [ [[package]] name = "zenoh-codec" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c528884fd6f3ce73a8c11cdb9c596a5cbdaa378e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" dependencies = [ "tracing", "uhlc", @@ -3008,7 +3008,7 @@ dependencies = [ [[package]] name = "zenoh-collections" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c528884fd6f3ce73a8c11cdb9c596a5cbdaa378e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" dependencies = [ "ahash", ] @@ -3016,7 +3016,7 @@ dependencies = [ [[package]] name = "zenoh-config" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c528884fd6f3ce73a8c11cdb9c596a5cbdaa378e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" dependencies = [ "json5", "nonempty-collections", @@ -3040,7 +3040,7 @@ dependencies = [ [[package]] name = "zenoh-core" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c528884fd6f3ce73a8c11cdb9c596a5cbdaa378e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" dependencies = [ "lazy_static", "tokio", @@ -3051,7 +3051,7 @@ dependencies = [ [[package]] name = "zenoh-crypto" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c528884fd6f3ce73a8c11cdb9c596a5cbdaa378e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" dependencies = [ "aes", "hmac", @@ -3064,7 +3064,7 @@ dependencies = [ [[package]] name = "zenoh-ext" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c528884fd6f3ce73a8c11cdb9c596a5cbdaa378e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" dependencies = [ "async-trait", "bincode", @@ -3083,7 +3083,7 @@ dependencies = [ [[package]] name = "zenoh-keyexpr" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c528884fd6f3ce73a8c11cdb9c596a5cbdaa378e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" dependencies = [ "getrandom", "hashbrown 0.14.5", @@ -3098,7 +3098,7 @@ dependencies = [ [[package]] name = "zenoh-link" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c528884fd6f3ce73a8c11cdb9c596a5cbdaa378e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" dependencies = [ "zenoh-config", "zenoh-link-commons", @@ -3115,7 +3115,7 @@ dependencies = [ [[package]] name = "zenoh-link-commons" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c528884fd6f3ce73a8c11cdb9c596a5cbdaa378e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" dependencies = [ "async-trait", "flume", @@ -3140,7 +3140,7 @@ dependencies = [ [[package]] name = "zenoh-link-quic" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c528884fd6f3ce73a8c11cdb9c596a5cbdaa378e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" dependencies = [ "async-trait", "base64 0.22.1", @@ -3167,7 +3167,7 @@ dependencies = [ [[package]] name = "zenoh-link-tcp" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c528884fd6f3ce73a8c11cdb9c596a5cbdaa378e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" dependencies = [ "async-trait", "socket2", @@ -3184,7 +3184,7 @@ dependencies = [ [[package]] name = "zenoh-link-tls" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c528884fd6f3ce73a8c11cdb9c596a5cbdaa378e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" dependencies = [ "async-trait", "base64 0.22.1", @@ -3213,7 +3213,7 @@ dependencies = [ [[package]] name = "zenoh-link-udp" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c528884fd6f3ce73a8c11cdb9c596a5cbdaa378e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" dependencies = [ "async-trait", "socket2", @@ -3232,7 +3232,7 @@ dependencies = [ [[package]] name = "zenoh-link-unixsock_stream" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c528884fd6f3ce73a8c11cdb9c596a5cbdaa378e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" dependencies = [ "async-trait", "nix", @@ -3250,7 +3250,7 @@ dependencies = [ [[package]] name = "zenoh-link-ws" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c528884fd6f3ce73a8c11cdb9c596a5cbdaa378e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" dependencies = [ "async-trait", "futures-util", @@ -3270,7 +3270,7 @@ dependencies = [ [[package]] name = "zenoh-macros" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c528884fd6f3ce73a8c11cdb9c596a5cbdaa378e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" dependencies = [ "proc-macro2", "quote", @@ -3281,7 +3281,7 @@ dependencies = [ [[package]] name = "zenoh-plugin-trait" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c528884fd6f3ce73a8c11cdb9c596a5cbdaa378e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" dependencies = [ "git-version", "libloading", @@ -3297,7 +3297,7 @@ dependencies = [ [[package]] name = "zenoh-protocol" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c528884fd6f3ce73a8c11cdb9c596a5cbdaa378e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" dependencies = [ "const_format", "rand", @@ -3324,7 +3324,7 @@ dependencies = [ [[package]] name = "zenoh-result" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c528884fd6f3ce73a8c11cdb9c596a5cbdaa378e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" dependencies = [ "anyhow", ] @@ -3332,7 +3332,7 @@ dependencies = [ [[package]] name = "zenoh-runtime" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c528884fd6f3ce73a8c11cdb9c596a5cbdaa378e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" dependencies = [ "lazy_static", "ron", @@ -3346,7 +3346,7 @@ dependencies = [ [[package]] name = "zenoh-sync" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c528884fd6f3ce73a8c11cdb9c596a5cbdaa378e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" dependencies = [ "arc-swap", "event-listener", @@ -3360,7 +3360,7 @@ dependencies = [ [[package]] name = "zenoh-task" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c528884fd6f3ce73a8c11cdb9c596a5cbdaa378e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" dependencies = [ "futures", "tokio", @@ -3373,7 +3373,7 @@ dependencies = [ [[package]] name = "zenoh-transport" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c528884fd6f3ce73a8c11cdb9c596a5cbdaa378e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" dependencies = [ "async-trait", "crossbeam-utils", @@ -3406,7 +3406,7 @@ dependencies = [ [[package]] name = "zenoh-util" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c528884fd6f3ce73a8c11cdb9c596a5cbdaa378e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" dependencies = [ "async-trait", "const_format", diff --git a/Cargo.toml b/Cargo.toml index 09c87a75..41100d1d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ crate-type = ["cdylib"] [features] default = ["zenoh/default", "zenoh-ext"] +zenoh-ext = ["dep:zenoh-ext", "zenoh-ext/unstable", "zenoh-ext/internal"] [badges] maintenance = { status = "actively-developed" } diff --git a/src/bytes.rs b/src/bytes.rs index eecb6450..2dcfc1d3 100644 --- a/src/bytes.rs +++ b/src/bytes.rs @@ -92,8 +92,8 @@ downcast_or_new!(Encoding => Option); #[pymethods] impl Encoding { #[new] - fn new(s: Option) -> PyResult { - Ok(s.map_into().map(Self).unwrap_or_default()) + fn new(s: Option) -> Self { + s.map_into().map(Self).unwrap_or_default() } fn with_schema(&self, schema: String) -> Self { diff --git a/src/ext.rs b/src/ext.rs index ef927c2c..ec32ed84 100644 --- a/src/ext.rs +++ b/src/ext.rs @@ -1,17 +1,29 @@ +use std::time::Duration; + use pyo3::{ exceptions::{PyTypeError, PyValueError}, prelude::*, types::{ - PyBool, PyByteArray, PyBytes, PyDict, PyFloat, PyFrozenSet, PyInt, PyList, PySet, PyString, - PyTuple, PyType, + PyBool, PyByteArray, PyBytes, PyDict, PyFloat, PyFrozenSet, PyInt, PyIterator, PyList, + PySet, PyString, PyTuple, PyType, }, PyTypeInfo, }; -use zenoh_ext::{Deserialize, VarInt, ZDeserializer, ZSerializer}; +use zenoh_ext::{ + AdvancedPublisherBuilderExt, AdvancedSubscriberBuilderExt, Deserialize, VarInt, ZDeserializer, + ZSerializer, +}; use crate::{ - bytes::ZBytes, - macros::{import, py_static, try_import}, + bytes::{Encoding, ZBytes}, + handlers::{into_handler, HandlerImpl}, + key_expr::KeyExpr, + macros::{build, downcast_or_new, import, option_wrapper, py_static, try_import, wrapper}, + pubsub::Subscriber, + qos::{CongestionControl, Priority, Reliability}, + sample::Sample, + session::{EntityGlobalId, Session}, + utils::{duration, generic, wait, MapInto}, ZDeserializeError, }; @@ -426,3 +438,366 @@ pub(crate) fn z_deserialize(tp: &Bound, zbytes: &ZBytes) -> PyResult, + "Undeclared publisher" +); + +#[pymethods] +impl AdvancedPublisher { + fn __enter__<'a, 'py>(this: &'a Bound<'py, Self>) -> PyResult<&'a Bound<'py, Self>> { + Self::check(this) + } + + #[pyo3(signature = (*_args, **_kwargs))] + fn __exit__( + &mut self, + py: Python, + _args: &Bound, + _kwargs: Option<&Bound>, + ) -> PyResult { + self.undeclare(py)?; + Ok(py.None()) + } + + #[getter] + fn key_expr(&self) -> PyResult { + Ok(self.get_ref()?.key_expr().clone().into()) + } + + #[getter] + fn encoding(&self) -> PyResult { + Ok(self.get_ref()?.encoding().clone().into()) + } + + #[getter] + fn congestion_control(&self) -> PyResult { + Ok(self.get_ref()?.congestion_control().into()) + } + + #[getter] + fn priority(&self) -> PyResult { + Ok(self.get_ref()?.priority().into()) + } + + // TODO add timestamp + #[pyo3(signature = (payload, *, encoding = None, attachment = None))] + fn put( + &self, + py: Python, + #[pyo3(from_py_with = "ZBytes::from_py")] payload: ZBytes, + #[pyo3(from_py_with = "Encoding::from_py_opt")] encoding: Option, + #[pyo3(from_py_with = "ZBytes::from_py_opt")] attachment: Option, + ) -> PyResult<()> { + let this = self.get_ref()?; + wait(py, build!(this.put(payload), encoding, attachment)) + } + + #[pyo3(signature = (*, attachment = None))] + fn delete( + &self, + py: Python, + #[pyo3(from_py_with = "ZBytes::from_py_opt")] attachment: Option, + ) -> PyResult<()> { + wait(py, build!(self.get_ref()?.delete(), attachment)) + } + + fn undeclare(&mut self, py: Python) -> PyResult<()> { + wait(py, self.take()?.undeclare()) + } +} + +option_wrapper!( + zenoh_ext::AdvancedSubscriber>, + "Undeclared subscriber" +); + +#[pymethods] +impl AdvancedSubscriber { + #[classmethod] + fn __class_getitem__(cls: &Bound, args: &Bound) -> PyObject { + generic(cls, args) + } + + fn __enter__<'a, 'py>(this: &'a Bound<'py, Self>) -> &'a Bound<'py, Self> { + this + } + + #[pyo3(signature = (*_args, **_kwargs))] + fn __exit__( + &mut self, + py: Python, + _args: &Bound, + _kwargs: Option<&Bound>, + ) -> PyResult { + self.undeclare(py)?; + Ok(py.None()) + } + + #[getter] + fn key_expr(&self) -> PyResult { + Ok(self.get_ref()?.key_expr().clone().into()) + } + + #[getter] + fn handler(&self, py: Python) -> PyResult { + Ok(self.get_ref()?.handler().to_object(py)) + } + + #[pyo3(signature = (handler = None))] + fn sample_miss_listener( + &self, + py: Python, + handler: Option<&Bound>, + ) -> PyResult { + let (handler, background) = into_handler(py, handler)?; + let builder = self.get_ref()?.sample_miss_listener(); + let mut listener = wait(py, builder.with(handler))?; + if background { + listener.set_background(true); + } + Ok(listener.into()) + } + + #[pyo3(signature = (handler = None, *, history = None))] + fn detect_publishers( + &self, + py: Python, + handler: Option<&Bound>, + history: Option, + ) -> PyResult { + let (handler, background) = into_handler(py, handler)?; + let builder = build!(self.get_ref()?.detect_publishers(), history); + let mut subscriber = wait(py, builder.with(handler))?; + if background { + subscriber.set_background(true); + } + Ok(subscriber.into()) + } + + fn try_recv(&self, py: Python) -> PyResult { + self.get_ref()?.handler().try_recv(py) + } + + fn recv(&self, py: Python) -> PyResult { + self.get_ref()?.handler().recv(py) + } + + fn undeclare(&mut self, py: Python) -> PyResult<()> { + wait(py, self.take()?.undeclare()) + } + + fn __iter__<'py>(&self, py: Python<'py>) -> PyResult> { + self.handler(py)?.bind(py).iter() + } +} + +wrapper!(zenoh_ext::CacheConfig: Clone); +downcast_or_new!(CacheConfig => Option, None); + +#[pymethods] +impl CacheConfig { + #[new] + #[pyo3(signature = (max_samples = None, *, replies_config = None))] + fn new(max_samples: Option, replies_config: Option) -> Self { + Self(build!( + zenoh_ext::CacheConfig::default(), + max_samples, + replies_config, + )) + } +} + +wrapper!(zenoh_ext::HistoryConfig: Clone); + +#[pymethods] +impl HistoryConfig { + #[new] + #[pyo3(signature = (*, detect_late_publishers = None, max_samples = None, max_age = None))] + fn new( + detect_late_publishers: Option, + max_samples: Option, + max_age: Option, + ) -> Self { + let mut config = build!(zenoh_ext::HistoryConfig::default(), max_samples, max_age); + if matches!(detect_late_publishers, Some(true)) { + config = config.detect_late_publishers(); + } + Self(config) + } +} + +wrapper!(zenoh_ext::Miss); + +#[pymethods] +impl Miss { + #[getter] + fn source(&self) -> EntityGlobalId { + self.source().into() + } + + #[getter] + fn nb(&self) -> usize { + self.nb() + } +} + +wrapper!(zenoh_ext::MissDetectionConfig: Clone); + +#[pymethods] +impl MissDetectionConfig { + #[new] + #[pyo3(signature = (*, heartbeat = None, sporadic_heartbeat = None))] + fn new( + #[pyo3(from_py_with = "duration")] heartbeat: Option, + #[pyo3(from_py_with = "duration")] sporadic_heartbeat: Option, + ) -> Self { + Self(build!( + zenoh_ext::MissDetectionConfig::default(), + heartbeat, + sporadic_heartbeat, + )) + } +} + +wrapper!(zenoh_ext::RecoveryConfig: Clone); + +#[pymethods] +impl RecoveryConfig { + #[new] + #[pyo3(signature = (*, periodic_queries = None, heartbeat = None))] + fn new(periodic_queries: Option, heartbeat: Option) -> PyResult { + let config = zenoh_ext::RecoveryConfig::default(); + Ok(Self(match (periodic_queries, heartbeat) { + (Some(periodic_queries), None) => config.periodic_queries(periodic_queries), + (None, Some(true)) => config.heartbeat(), + _ => return Err(PyValueError::new_err("invalid parameters")), + })) + } +} + +wrapper!(zenoh_ext::RepliesConfig: Clone); + +#[pymethods] +impl RepliesConfig { + #[new] + #[pyo3(signature = (*, congestion_control = None, priority = None, express = None))] + fn new( + congestion_control: Option, + priority: Option, + express: Option, + ) -> Self { + Self(build!( + zenoh_ext::RepliesConfig::default(), + congestion_control, + priority, + express, + express, + )) + } +} + +option_wrapper!( + zenoh_ext::SampleMissListener>, + "Undeclared sample-miss listener" +); + +#[pymethods] +impl SampleMissListener { + #[classmethod] + fn __class_getitem__(cls: &Bound, args: &Bound) -> PyObject { + generic(cls, args) + } + + fn __enter__<'a, 'py>(this: &'a Bound<'py, Self>) -> &'a Bound<'py, Self> { + this + } + + #[pyo3(signature = (*_args, **_kwargs))] + fn __exit__( + &mut self, + py: Python, + _args: &Bound, + _kwargs: Option<&Bound>, + ) -> PyResult { + self.undeclare(py)?; + Ok(py.None()) + } + + fn try_recv(&self, py: Python) -> PyResult { + self.get_ref()?.try_recv(py) + } + + fn recv(&self, py: Python) -> PyResult { + self.get_ref()?.recv(py) + } + + fn undeclare(&mut self, py: Python) -> PyResult<()> { + wait(py, self.take()?.undeclare()) + } + + fn __iter__<'py>(&self, py: Python<'py>) -> PyResult> { + (**self.get_ref()?).to_object(py).bind(py).iter() + } +} + +#[allow(clippy::too_many_arguments)] +#[pyfunction] +#[pyo3(signature = (session, key_expr, *, encoding = None, congestion_control = None, priority = None, express = None, reliability = None, cache = None, sample_miss_detection = None, publisher_detection = None))] +fn declare_publisher( + py: Python, + session: &Session, + #[pyo3(from_py_with = "KeyExpr::from_py")] key_expr: KeyExpr, + #[pyo3(from_py_with = "Encoding::from_py_opt")] encoding: Option, + congestion_control: Option, + priority: Option, + express: Option, + reliability: Option, + cache: Option, + sample_miss_detection: Option, + publisher_detection: Option, +) -> PyResult { + let mut builder = build!( + session.0.declare_publisher(key_expr).advanced(), + encoding, + congestion_control, + priority, + express, + reliability, + cache, + sample_miss_detection, + ); + if matches!(publisher_detection, Some(true)) { + builder = builder.publisher_detection(); + } + wait(py, builder).map_into() +} + +#[allow(clippy::too_many_arguments)] +#[pyfunction] +#[pyo3(signature = (session, key_expr, handler = None, *, history = None, recovery = None, subscriber_detection = None))] +fn declare_subscriber( + session: &Session, + py: Python, + #[pyo3(from_py_with = "KeyExpr::from_py")] key_expr: KeyExpr, + handler: Option<&Bound>, + history: Option, + recovery: Option, + subscriber_detection: Option, +) -> PyResult { + let (handler, background) = into_handler(py, handler)?; + let mut builder = build!( + session.0.declare_subscriber(key_expr).advanced(), + history, + recovery + ); + if matches!(subscriber_detection, Some(true)) { + builder = builder.subscriber_detection(); + } + let mut subscriber = wait(py, builder.with(handler))?; + if background { + subscriber.set_background(true); + } + Ok(subscriber.into()) +} diff --git a/src/lib.rs b/src/lib.rs index 3a17f716..122b058e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -65,7 +65,7 @@ pub(crate) mod zenoh { }, sample::{Sample, SampleKind}, scouting::{scout, Hello, Scout}, - session::{open, Session, SessionInfo}, + session::{open, EntityGlobalId, Session, SessionInfo}, time::Timestamp, ZError, }; diff --git a/src/liveliness.rs b/src/liveliness.rs index 2d00f349..ec45b5fe 100644 --- a/src/liveliness.rs +++ b/src/liveliness.rs @@ -11,7 +11,7 @@ use crate::{ macros::{build, option_wrapper}, pubsub::Subscriber, query::Reply, - utils::{timeout, wait, MapInto}, + utils::{duration, wait, MapInto}, }; #[pyclass] @@ -53,7 +53,7 @@ impl Liveliness { py: Python, #[pyo3(from_py_with = "KeyExpr::from_py")] key_expr: KeyExpr, handler: Option<&Bound>, - #[pyo3(from_py_with = "timeout")] timeout: Option, + #[pyo3(from_py_with = "duration")] timeout: Option, ) -> PyResult> { let (handler, _) = into_handler(py, handler)?; let liveliness = self.0.liveliness(); diff --git a/src/macros.rs b/src/macros.rs index f0e8e3a9..cc94427f 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -71,7 +71,8 @@ macro_rules! downcast_or_new { if let Ok(obj) = ::extract_bound(obj) { return Ok(obj); } - Self::new(PyResult::Ok(obj)$(.and_then(<$new>::extract_bound))??.into(), $($other)?) + let this = Self::new(PyResult::Ok(obj)$(.and_then(<$new>::extract_bound))??.into(), $($other)?); + $crate::utils::IntoResult::into_result(this) } pub(crate) fn from_py_opt(obj: &Bound) -> PyResult> { if obj.is_none() { diff --git a/src/query.rs b/src/query.rs index 449a33be..342b436e 100644 --- a/src/query.rs +++ b/src/query.rs @@ -64,11 +64,11 @@ impl QueryConsolidation { const DEFAULT: Self = Self(zenoh::query::QueryConsolidation::DEFAULT); #[new] - fn new(mode: Option) -> PyResult { + fn new(mode: Option) -> Self { let Some(mode) = mode else { - return Ok(Self::DEFAULT); + return Self::DEFAULT; }; - Ok(Self(mode.into_rust().into())) + Self(mode.into_rust().into()) } fn mode(&self) -> ConsolidationMode { diff --git a/src/session.rs b/src/session.rs index 0fe930c1..26b6844b 100644 --- a/src/session.rs +++ b/src/session.rs @@ -17,7 +17,7 @@ use pyo3::{ prelude::*, types::{PyDict, PyList, PyTuple}, }; -use zenoh::Wait; +use zenoh::{session::EntityId, Wait}; use crate::{ bytes::{Encoding, ZBytes}, @@ -30,7 +30,7 @@ use crate::{ qos::{CongestionControl, Priority, Reliability}, query::{Querier, QueryConsolidation, QueryTarget, Queryable, Reply, Selector}, time::Timestamp, - utils::{timeout, wait, IntoPython, MapInto}, + utils::{duration, wait, IntoPython, MapInto}, }; #[pyclass] @@ -140,7 +140,7 @@ impl Session { #[pyo3(from_py_with = "QueryConsolidation::from_py_opt")] consolidation: Option< QueryConsolidation, >, - #[pyo3(from_py_with = "timeout")] timeout: Option, + #[pyo3(from_py_with = "duration")] timeout: Option, congestion_control: Option, priority: Option, express: Option, @@ -235,7 +235,7 @@ impl Session { #[pyo3(from_py_with = "QueryConsolidation::from_py_opt")] consolidation: Option< QueryConsolidation, >, - #[pyo3(from_py_with = "timeout")] timeout: Option, + #[pyo3(from_py_with = "duration")] timeout: Option, congestion_control: Option, priority: Option, express: Option, @@ -298,3 +298,20 @@ impl SessionInfo { // TODO __repr__ } + +wrapper!(zenoh::session::EntityGlobalId); + +#[pymethods] +impl EntityGlobalId { + #[getter] + fn zid(&self) -> ZenohId { + self.0.zid().into() + } + + #[getter] + fn eid(&self) -> EntityId { + self.0.eid() + } + + // TODO __repr__ +} diff --git a/src/utils.rs b/src/utils.rs index 24edd7a5..9c3d239e 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -20,6 +20,22 @@ use crate::{ ZError, }; +pub(crate) trait IntoResult { + fn into_result(self) -> Result; +} + +impl IntoResult for T { + fn into_result(self) -> Result { + Ok(self) + } +} + +impl IntoResult for Result { + fn into_result(self) -> Result { + self + } +} + pub(crate) trait IntoPyErr { fn into_pyerr(self) -> PyErr; } @@ -42,7 +58,7 @@ pub(crate) trait IntoRust: 'static { fn into_rust(self) -> Self::Into; } -into_rust!(bool, Duration); +into_rust!(bool, usize, f64, Duration); pub(crate) trait IntoPython: Sized + Send + Sync + 'static { type Into: IntoPy; @@ -105,7 +121,7 @@ pub(crate) fn wait( py.allow_threads(|| resolve.wait()).into_pyres() } -pub(crate) fn timeout(obj: &Bound) -> PyResult> { +pub(crate) fn duration(obj: &Bound) -> PyResult> { if obj.is_none() { return Ok(None); } diff --git a/zenoh/__init__.pyi b/zenoh/__init__.pyi index 649aa0b5..676e0c9c 100644 --- a/zenoh/__init__.pyi +++ b/zenoh/__init__.pyi @@ -12,23 +12,21 @@ # ZettaScale Zenoh Team, # -from collections.abc import Callable, Iterable +from collections.abc import Callable from datetime import datetime from enum import Enum, auto from pathlib import Path -from typing import Any, Generic, Literal, Never, Self, TypeVar, final, overload +from typing import Any, Generic, Never, Self, TypeVar, final, overload from . import handlers as handlers from .handlers import Handler as Handler _T = TypeVar("_T") _H = TypeVar("_H") -_F = TypeVar("_F", bound=Callable) _RustHandler = ( handlers.DefaultHandler[_T] | handlers.FifoChannel[_T] | handlers.RingChannel[_T] ) - _PythonCallback = Callable[[_T], Any] _PythonHandler = tuple[_PythonCallback[_T], _H] @@ -304,6 +302,19 @@ class Encoding: _IntoEncoding = Encoding | str +EntityId = int + +@_unstable +@final +class EntityGlobalId: + @property + def zid(self) -> ZenohId: + """Returns the `ZenohId`, i.e. the Zenoh session, this ID is associated to.""" + + @property + def eid(self) -> EntityId: + """Returns the `EntityId` used to identify the entity in a Zenoh session.""" + @final class Hello: @property diff --git a/zenoh/ext.pyi b/zenoh/ext.pyi index a497f783..4b762458 100644 --- a/zenoh/ext.pyi +++ b/zenoh/ext.pyi @@ -1,8 +1,36 @@ -from typing import Any, TypeVar +from collections.abc import Callable +from typing import Any, Generic, Literal, Never, Self, TypeVar, final, overload -from zenoh import ZBytes +from zenoh import ( + CongestionControl, + Encoding, + EntityGlobalId, + Handler, + KeyExpr, + Priority, + Reliability, + Sample, + Subscriber, + ZBytes, + handlers, +) +from zenoh.zenoh import Session _T = TypeVar("_T") +_H = TypeVar("_H") + +_RustHandler = ( + handlers.DefaultHandler[_T] | handlers.FifoChannel[_T] | handlers.RingChannel[_T] +) +_PythonCallback = Callable[[_T], Any] +_PythonHandler = tuple[_PythonCallback[_T], _H] + +def _unstable(item: _T) -> _T: + """marker for unstable functionality""" + +_IntoEncoding = Encoding | str +_IntoKeyExpr = KeyExpr | str +_IntoZBytes = Any class Int8(int): """int subclass enabling to (de)serialize 8bit signed integer.""" @@ -45,3 +73,278 @@ class ZDeserializeError(Exception): def z_serialize(obj: Any) -> ZBytes: ... def z_deserialize(tp: type[_T], zbytes: ZBytes) -> _T: ... +@_unstable +@final +class AdvancedPublisher: + def __enter__(self) -> Self: ... + def __exit__(self, *_args, **_kwargs): ... + @property + def key_expr(self) -> KeyExpr: ... + @property + def encoding(self) -> Encoding: ... + @property + def congestion_control(self) -> CongestionControl: ... + @property + def priority(self) -> Priority: ... + def put( + self, + payload: _IntoZBytes, + *, + encoding: _IntoEncoding | None = None, + attachment: _IntoZBytes | None = None, + ): ... + def delete(self, *, attachment: _IntoZBytes | None = None): ... + def undeclare(self): ... + +@_unstable +@final +class AdvancedSubscriber(Generic[_H]): + def __enter__(self) -> Self: ... + def __exit__(self, *_args, **_kwargs): ... + @property + def key_expr(self) -> KeyExpr: ... + @property + def handler(self) -> _H: ... + @overload + def sample_miss_listener( + self, handler: _RustHandler[Miss] | None = None + ) -> SampleMissListener[Handler[Miss]]: + """Declares a listener to detect missed samples. + + Missed samples can only be detected from `AdvancedPublisher` that enable `sample_miss_detection`. + """ + + @overload + def sample_miss_listener( + self, handler: _PythonHandler[Miss, _H] + ) -> SampleMissListener[_H]: + """Declares a listener to detect missed samples. + + Missed samples can only be detected from `AdvancedPublisher` that enable `sample_miss_detection`. + """ + + @overload + def sample_miss_listener( + self, handler: _PythonCallback[Miss] + ) -> SampleMissListener[None]: + """Declares a listener to detect missed samples. + + Missed samples can only be detected from `AdvancedPublisher` that enable `sample_miss_detection`. + """ + + @overload + def detect_publishers( + self, + handler: _RustHandler[Sample] | None = None, + *, + history: bool | None = None, + ) -> Subscriber[Handler[Sample]]: + """Declares a listener to detect matching publishers. + + Only `AdvancedPublisher` that enable `publisher_detection` can be detected. + """ + + @overload + def detect_publishers( + self, handler: _PythonHandler[Sample, _H], *, history: bool | None = None + ) -> Subscriber[_H]: + """Declares a listener to detect matching publishers. + + Only `AdvancedPublisher` that enable `publisher_detection` can be detected. + """ + + @overload + def detect_publishers( + self, handler: _PythonCallback[Sample], *, history: bool | None = None + ) -> Subscriber[None]: + """Declares a listener to detect matching publishers. + + Only `AdvancedPublisher` that enable `publisher_detection` can be detected. + """ + + @property + def undeclare(self): ... + @overload + def try_recv(self: AdvancedSubscriber[Handler[Sample]]) -> Sample | None: ... + @overload + def try_recv(self) -> Never: ... + @overload + def recv(self: AdvancedSubscriber[Handler[Sample]]) -> Sample: ... + @overload + def recv(self) -> Never: ... + @overload + def __iter__(self: AdvancedSubscriber[Handler[Sample]]) -> Handler[Sample]: ... + @overload + def __iter__(self) -> Never: ... + +@_unstable +@final +class CacheConfig: + def __new__( + cls, + max_samples: int | None = None, + *, + replies_config: RepliesConfig | None = None, + ) -> Self: + """ + :param max_samples: specify how many samples to keep for each resource, default to 1 + :param replies_config: the QoS to apply to replies + """ + +@_unstable +@final +class HistoryConfig: + def __new__( + cls, + *, + detect_late_publishers: bool | None = None, + max_samples: int | None = None, + max_age: float | int | None = None, + ) -> Self: + """ + :param detect_late_publishers: enable detection of late joiner publishers and query for their historical data; + late joiner detection can only be achieved for `AdvancedPublisher` that enable `publisher_detection` + history can only be retransmitted by `AdvancedPublisher` that enable `cache` + :param max_samples: specify how many samples to query for each resource + :param max_age: specify the maximum age of samples to query in seconds + """ + +@_unstable +@final +class Miss: + @property + def source(self) -> EntityGlobalId: + """The source of missed samples.""" + + @property + def nb(self) -> int: + """The number of missed samples.""" + +@_unstable +@final +class MissDetectionConfig: + @overload + def __new__(cls) -> Self: ... + @overload + def __new__(cls, *, heartbeat: float | int) -> Self: + """ + :param heartbeat: period in seconds, allow last sample miss detection through periodic heartbeat; + periodically send the last published Sample's sequence number to allow last sample recovery + `AdvancedSubscriber`(crate::AdvancedSubscriber) can recover the last sample with the `heartbeat` option + """ + + @overload + def __new__(cls, sporadic_heartbeat: float | int) -> Self: + """ + :param sporadic_heartbeat: period in seconds, allow last sample miss detection through sporadic heartbeat; + each period, the last published Sample's sequence number is sent with `CongestionControl.Block` but only if + it has changed since the last period. + """ + +@_unstable +@final +class RecoveryConfig: + @overload + def __new__(cls) -> Self: ... + @overload + def __new__(cls, *, periodic_queries: float | int) -> Self: + """ + :param periodic_queries: enable periodic queries for not yet received Samples and specify their period; + it allows retrieving the last Sample(s) if the last Sample(s) is/are lost, + so it is useful for sporadic publications but useless for periodic publications + with a period smaller or equal to this period + retransmission can only be achieved by `AdvancedPublisher` that enable `cache` and `sample_miss_detection` + """ + + @overload + def __new__(cls, *, heartbeat: Literal[True]) -> Self: + """ + :param heartbeat: subscribe to heartbeats of `AdvancedPublisher`; + it allows receiving the last published Sample's sequence number and check for misses + heartbeat subscriber must be paired with `AdvancedPublishers` that enable `cache` and + `sample_miss_detection` with `heartbeat` or `sporadic_heartbeat` + """ + +@_unstable +@final +class RepliesConfig: + def __new__( + cls, + *, + congestion_control: CongestionControl | None = None, + priority: Priority | None = None, + express: bool | None = None, + ) -> Self: ... + +@_unstable +@final +class SampleMissListener(Generic[_H]): + @property + def undeclare(self): ... + @overload + def try_recv(self: SampleMissListener[Handler[Miss]]) -> Miss | None: ... + @overload + def try_recv(self) -> Never: ... + @overload + def recv(self: SampleMissListener[Handler[Miss]]) -> Miss: ... + @overload + def recv(self) -> Never: ... + @overload + def __iter__(self: SampleMissListener[Handler[Miss]]) -> Handler[Miss]: ... + @overload + def __iter__(self) -> Never: ... + +@_unstable +def declare_advanced_publisher( + session: Session, + key_expr: _IntoKeyExpr, + *, + encoding: _IntoEncoding | None = None, + congestion_control: CongestionControl | None = None, + priority: Priority | None = None, + express: bool | None = None, + reliability: Reliability | None = None, + cache: CacheConfig | int | None = None, + sample_miss_detection: MissDetectionConfig | None = None, + publisher_detection: bool | None = None, +) -> AdvancedPublisher: + """Create an AdvancedPublisher for the given key expression.""" + +@_unstable +@overload +def declare_advanced_subscriber( + session: Session, + key_expr: _IntoKeyExpr, + handler: _RustHandler[Sample] | None = None, + *, + history: HistoryConfig | None = None, + recovery: RecoveryConfig | None = None, + subscriber_detection: bool | None = None, +) -> AdvancedSubscriber[Handler[Sample]]: + """Create an AdvancedSubscriber for the given key expression.""" + +@_unstable +@overload +def declare_advanced_subscriber( + session: Session, + key_expr: _IntoKeyExpr, + handler: _PythonHandler[Sample, _H], + *, + history: HistoryConfig | None = None, + recovery: RecoveryConfig | None = None, + subscriber_detection: bool | None = None, +) -> AdvancedSubscriber[_H]: + """Create an AdvancedSubscriber for the given key expression.""" + +@_unstable +@overload +def declare_advanced_subscriber( + session: Session, + key_expr: _IntoKeyExpr, + handler: _PythonCallback[Sample], + *, + history: HistoryConfig | None = None, + recovery: RecoveryConfig | None = None, + subscriber_detection: bool | None = None, +) -> AdvancedSubscriber[None]: + """Create an AdvancedSubscriber for the given key expression.""" From b3b164272f8a1d1e1877089d0052ebe9343df204 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Mon, 23 Jun 2025 09:09:19 +0200 Subject: [PATCH 2/7] add examples --- examples/z_advanced_pub.py | 79 ++++++++++++++++++++++++++++++++++++++ examples/z_advanced_sub.py | 72 ++++++++++++++++++++++++++++++++++ src/ext.rs | 13 +++---- src/lib.rs | 6 ++- src/session.rs | 4 +- zenoh/ext.pyi | 2 +- 6 files changed, 166 insertions(+), 10 deletions(-) create mode 100644 examples/z_advanced_pub.py create mode 100644 examples/z_advanced_sub.py diff --git a/examples/z_advanced_pub.py b/examples/z_advanced_pub.py new file mode 100644 index 00000000..149b0bd9 --- /dev/null +++ b/examples/z_advanced_pub.py @@ -0,0 +1,79 @@ +# +# Copyright (c) 2022 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# +import time +from typing import Optional + +import zenoh +from zenoh.ext import CacheConfig, MissDetectionConfig, declare_advanced_publisher + + +def main(conf: zenoh.Config, key: str, payload: str, history: int): + # initiate logging + zenoh.init_log_from_env_or("error") + + print("Opening session...") + with zenoh.open(conf) as session: + print(f"Declaring AdvancedPublisher on '{key}'...") + pub = declare_advanced_publisher( + session, + key, + cache=CacheConfig(max_samples=history), + sample_miss_detection=MissDetectionConfig(heartbeat=5), + publisher_detection=True, + ) + + print("Press CTRL-C to quit...") + for idx in itertools.count(): + buf = f"[{idx:4d}] {payload}" + print(f"Putting Data ('{key}': '{buf}')...") + pub.put(buf) + + +# --- Command line argument parsing --- --- --- --- --- --- +if __name__ == "__main__": + import argparse + import itertools + + import common + + parser = argparse.ArgumentParser(prog="z_pub", description="zenoh pub example") + common.add_config_arguments(parser) + parser.add_argument( + "--key", + "-k", + dest="key", + default="demo/example/zenoh-python-pub", + type=str, + help="The key expression to publish onto.", + ) + parser.add_argument( + "--payload", + "-p", + dest="payload", + default="Pub from Python!", + type=str, + help="The payload to publish.", + ) + parser.add_argument( + "--history", + dest="history", + type=int, + default=1, + help="The number of publications to keep in cache", + ) + + args = parser.parse_args() + conf = common.get_config_from_args(args) + + main(conf, args.key, args.payload, args.history, args.iter, args.interval) diff --git a/examples/z_advanced_sub.py b/examples/z_advanced_sub.py new file mode 100644 index 00000000..2373ea57 --- /dev/null +++ b/examples/z_advanced_sub.py @@ -0,0 +1,72 @@ +# +# Copyright (c) 2022 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# +import time + +import zenoh +from zenoh.ext import HistoryConfig, Miss, RecoveryConfig, declare_advanced_subscriber + + +def main(conf: zenoh.Config, key: str): + # initiate logging + zenoh.init_log_from_env_or("error") + + print("Opening session...") + with zenoh.open(conf) as session: + print(f"Declaring Subscriber on '{key}'...") + + def listener(sample: zenoh.Sample): + print( + f">> [Subscriber] Received {sample.kind} ('{sample.key_expr}': '{sample.payload.to_string()}')" + ) + + advanced_sub = declare_advanced_subscriber( + session, + key, + listener, + history=HistoryConfig(detect_late_publishers=True), + recovery=RecoveryConfig(heartbeat=True), + subscriber_detection=True, + ) + + def miss_listener(miss: Miss): + print(f">> [Subscriber] Missed {miss.nb} samples from {miss.source} !!!") + + advanced_sub.sample_miss_listener(miss_listener) + + print("Press CTRL-C to quit...") + while True: + time.sleep(1) + + +# --- Command line argument parsing --- --- --- --- --- --- +if __name__ == "__main__": + import argparse + + import common + + parser = argparse.ArgumentParser(prog="z_sub", description="zenoh sub example") + common.add_config_arguments(parser) + parser.add_argument( + "--key", + "-k", + dest="key", + default="demo/example/**", + type=str, + help="The key expression to subscribe to.", + ) + + args = parser.parse_args() + conf = common.get_config_from_args(args) + + main(conf, args.key) diff --git a/src/ext.rs b/src/ext.rs index ec32ed84..c3fa1632 100644 --- a/src/ext.rs +++ b/src/ext.rs @@ -18,7 +18,7 @@ use crate::{ bytes::{Encoding, ZBytes}, handlers::{into_handler, HandlerImpl}, key_expr::KeyExpr, - macros::{build, downcast_or_new, import, option_wrapper, py_static, try_import, wrapper}, + macros::{build, import, option_wrapper, py_static, try_import, wrapper}, pubsub::Subscriber, qos::{CongestionControl, Priority, Reliability}, sample::Sample, @@ -594,7 +594,6 @@ impl AdvancedSubscriber { } wrapper!(zenoh_ext::CacheConfig: Clone); -downcast_or_new!(CacheConfig => Option, None); #[pymethods] impl CacheConfig { @@ -634,12 +633,12 @@ wrapper!(zenoh_ext::Miss); impl Miss { #[getter] fn source(&self) -> EntityGlobalId { - self.source().into() + self.0.source().into() } #[getter] - fn nb(&self) -> usize { - self.nb() + fn nb(&self) -> u32 { + self.0.nb() } } @@ -745,7 +744,7 @@ impl SampleMissListener { #[allow(clippy::too_many_arguments)] #[pyfunction] #[pyo3(signature = (session, key_expr, *, encoding = None, congestion_control = None, priority = None, express = None, reliability = None, cache = None, sample_miss_detection = None, publisher_detection = None))] -fn declare_publisher( +pub(crate) fn declare_advanced_publisher( py: Python, session: &Session, #[pyo3(from_py_with = "KeyExpr::from_py")] key_expr: KeyExpr, @@ -777,7 +776,7 @@ fn declare_publisher( #[allow(clippy::too_many_arguments)] #[pyfunction] #[pyo3(signature = (session, key_expr, handler = None, *, history = None, recovery = None, subscriber_detection = None))] -fn declare_subscriber( +pub(crate) fn declare_advanced_subscriber( session: &Session, py: Python, #[pyo3(from_py_with = "KeyExpr::from_py")] key_expr: KeyExpr, diff --git a/src/lib.rs b/src/lib.rs index 122b058e..f568a890 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -80,7 +80,11 @@ pub(crate) mod zenoh { #[pymodule] mod _ext { #[pymodule_export] - use crate::ext::{z_deserialize, z_serialize}; + use crate::ext::{ + declare_advanced_publisher, declare_advanced_subscriber, z_deserialize, z_serialize, + AdvancedPublisher, AdvancedSubscriber, CacheConfig, HistoryConfig, Miss, + MissDetectionConfig, RecoveryConfig, RepliesConfig, SampleMissListener, + }; #[pymodule_export] use crate::ZDeserializeError; } diff --git a/src/session.rs b/src/session.rs index 26b6844b..df54643d 100644 --- a/src/session.rs +++ b/src/session.rs @@ -313,5 +313,7 @@ impl EntityGlobalId { self.0.eid() } - // TODO __repr__ + fn __repr__(&self) -> String { + format!("{:?}", self.0) + } } diff --git a/zenoh/ext.pyi b/zenoh/ext.pyi index 4b762458..b921326e 100644 --- a/zenoh/ext.pyi +++ b/zenoh/ext.pyi @@ -304,7 +304,7 @@ def declare_advanced_publisher( priority: Priority | None = None, express: bool | None = None, reliability: Reliability | None = None, - cache: CacheConfig | int | None = None, + cache: CacheConfig | None = None, sample_miss_detection: MissDetectionConfig | None = None, publisher_detection: bool | None = None, ) -> AdvancedPublisher: From 62c3d0e679e2fd374054fb106854d288291743aa Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Wed, 2 Jul 2025 12:44:25 +0200 Subject: [PATCH 3/7] Merge branch 'main' into advanced-pubsub --- Cargo.lock | 60 ++++++++++++++++++++++++++------------------------ docs/conf.py | 1 + docs/index.rst | 7 ++++++ src/bytes.rs | 15 ++++++++----- zenoh/ext.pyi | 28 +++++++++++++++++++++-- 5 files changed, 75 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 803323b6..8b3138bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 4 +version = 3 [[package]] name = "addr2line" @@ -1426,9 +1426,9 @@ dependencies = [ [[package]] name = "portable-atomic" -version = "1.11.0" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "350e9b48cbc6b0e028b0473b114454c6316e57336ee184ceab6e53f72c178b3e" +checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" [[package]] name = "powerfmt" @@ -2940,7 +2940,7 @@ dependencies = [ [[package]] name = "zenoh" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#287d5aca455be005e735d71a206ce9878eac6e12" dependencies = [ "ahash", "arc-swap", @@ -2989,7 +2989,7 @@ dependencies = [ [[package]] name = "zenoh-buffers" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#287d5aca455be005e735d71a206ce9878eac6e12" dependencies = [ "zenoh-collections", ] @@ -2997,7 +2997,7 @@ dependencies = [ [[package]] name = "zenoh-codec" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#287d5aca455be005e735d71a206ce9878eac6e12" dependencies = [ "tracing", "uhlc", @@ -3008,7 +3008,7 @@ dependencies = [ [[package]] name = "zenoh-collections" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#287d5aca455be005e735d71a206ce9878eac6e12" dependencies = [ "ahash", ] @@ -3016,7 +3016,7 @@ dependencies = [ [[package]] name = "zenoh-config" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#287d5aca455be005e735d71a206ce9878eac6e12" dependencies = [ "json5", "nonempty-collections", @@ -3040,7 +3040,7 @@ dependencies = [ [[package]] name = "zenoh-core" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#287d5aca455be005e735d71a206ce9878eac6e12" dependencies = [ "lazy_static", "tokio", @@ -3051,7 +3051,7 @@ dependencies = [ [[package]] name = "zenoh-crypto" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#287d5aca455be005e735d71a206ce9878eac6e12" dependencies = [ "aes", "hmac", @@ -3064,7 +3064,7 @@ dependencies = [ [[package]] name = "zenoh-ext" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#287d5aca455be005e735d71a206ce9878eac6e12" dependencies = [ "async-trait", "bincode", @@ -3083,7 +3083,7 @@ dependencies = [ [[package]] name = "zenoh-keyexpr" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#287d5aca455be005e735d71a206ce9878eac6e12" dependencies = [ "getrandom", "hashbrown 0.14.5", @@ -3098,7 +3098,7 @@ dependencies = [ [[package]] name = "zenoh-link" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#287d5aca455be005e735d71a206ce9878eac6e12" dependencies = [ "zenoh-config", "zenoh-link-commons", @@ -3115,7 +3115,7 @@ dependencies = [ [[package]] name = "zenoh-link-commons" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#287d5aca455be005e735d71a206ce9878eac6e12" dependencies = [ "async-trait", "flume", @@ -3140,7 +3140,7 @@ dependencies = [ [[package]] name = "zenoh-link-quic" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#287d5aca455be005e735d71a206ce9878eac6e12" dependencies = [ "async-trait", "base64 0.22.1", @@ -3167,7 +3167,7 @@ dependencies = [ [[package]] name = "zenoh-link-tcp" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#287d5aca455be005e735d71a206ce9878eac6e12" dependencies = [ "async-trait", "socket2", @@ -3184,7 +3184,7 @@ dependencies = [ [[package]] name = "zenoh-link-tls" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#287d5aca455be005e735d71a206ce9878eac6e12" dependencies = [ "async-trait", "base64 0.22.1", @@ -3213,13 +3213,15 @@ dependencies = [ [[package]] name = "zenoh-link-udp" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#287d5aca455be005e735d71a206ce9878eac6e12" dependencies = [ "async-trait", + "libc", "socket2", "tokio", "tokio-util", "tracing", + "windows-sys 0.59.0", "zenoh-buffers", "zenoh-core", "zenoh-link-commons", @@ -3232,7 +3234,7 @@ dependencies = [ [[package]] name = "zenoh-link-unixsock_stream" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#287d5aca455be005e735d71a206ce9878eac6e12" dependencies = [ "async-trait", "nix", @@ -3250,7 +3252,7 @@ dependencies = [ [[package]] name = "zenoh-link-ws" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#287d5aca455be005e735d71a206ce9878eac6e12" dependencies = [ "async-trait", "futures-util", @@ -3270,7 +3272,7 @@ dependencies = [ [[package]] name = "zenoh-macros" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#287d5aca455be005e735d71a206ce9878eac6e12" dependencies = [ "proc-macro2", "quote", @@ -3281,7 +3283,7 @@ dependencies = [ [[package]] name = "zenoh-plugin-trait" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#287d5aca455be005e735d71a206ce9878eac6e12" dependencies = [ "git-version", "libloading", @@ -3297,7 +3299,7 @@ dependencies = [ [[package]] name = "zenoh-protocol" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#287d5aca455be005e735d71a206ce9878eac6e12" dependencies = [ "const_format", "rand", @@ -3324,7 +3326,7 @@ dependencies = [ [[package]] name = "zenoh-result" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#287d5aca455be005e735d71a206ce9878eac6e12" dependencies = [ "anyhow", ] @@ -3332,7 +3334,7 @@ dependencies = [ [[package]] name = "zenoh-runtime" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#287d5aca455be005e735d71a206ce9878eac6e12" dependencies = [ "lazy_static", "ron", @@ -3346,7 +3348,7 @@ dependencies = [ [[package]] name = "zenoh-sync" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#287d5aca455be005e735d71a206ce9878eac6e12" dependencies = [ "arc-swap", "event-listener", @@ -3360,7 +3362,7 @@ dependencies = [ [[package]] name = "zenoh-task" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#287d5aca455be005e735d71a206ce9878eac6e12" dependencies = [ "futures", "tokio", @@ -3373,7 +3375,7 @@ dependencies = [ [[package]] name = "zenoh-transport" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#287d5aca455be005e735d71a206ce9878eac6e12" dependencies = [ "async-trait", "crossbeam-utils", @@ -3406,7 +3408,7 @@ dependencies = [ [[package]] name = "zenoh-util" version = "1.4.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#b33ddc35210faf949ecb8f68c5fcff520d8b1d2e" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#287d5aca455be005e735d71a206ce9878eac6e12" dependencies = [ "async-trait", "const_format", diff --git a/docs/conf.py b/docs/conf.py index 6b441a4f..1f6d3957 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -26,6 +26,7 @@ import tomllib import zenoh +import zenoh.ext # -- Project information ----------------------------------------------------- diff --git a/docs/index.rst b/docs/index.rst index 2933bf0e..48e511c9 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -74,3 +74,10 @@ module zenoh.handlers .. automodule:: zenoh.handlers :members: :undoc-members: + +module zenoh.ext +================ + +.. automodule:: zenoh.ext + :members: + :undoc-members: diff --git a/src/bytes.rs b/src/bytes.rs index 2dcfc1d3..b5830b6f 100644 --- a/src/bytes.rs +++ b/src/bytes.rs @@ -49,8 +49,11 @@ impl ZBytes { } } - fn to_bytes(&self) -> Cow<[u8]> { - self.0.to_bytes() + fn to_bytes<'py>(&self, py: Python<'py>) -> PyResult> { + // Not using `ZBytes::to_bytes` + PyBytes::new_bound_with(py, self.0.len(), |bytes| { + self.0.reader().read_exact(bytes).into_pyres() + }) } fn to_string(&self) -> PyResult> { @@ -68,9 +71,11 @@ impl ZBytes { } fn __bytes__<'py>(&self, py: Python<'py>) -> PyResult> { - PyBytes::new_bound_with(py, self.0.len(), |bytes| { - self.0.reader().read_exact(bytes).into_pyres() - }) + self.to_bytes(py) + } + + fn __str__(&self) -> PyResult> { + self.to_string() } fn __eq__(&self, other: &Bound) -> PyResult { diff --git a/zenoh/ext.pyi b/zenoh/ext.pyi index b921326e..39891737 100644 --- a/zenoh/ext.pyi +++ b/zenoh/ext.pyi @@ -71,8 +71,32 @@ class Float64(float): class ZDeserializeError(Exception): pass -def z_serialize(obj: Any) -> ZBytes: ... -def z_deserialize(tp: type[_T], zbytes: ZBytes) -> _T: ... +def z_serialize(obj: Any) -> ZBytes: + """Serialize an object of supported type according to the `Zenoh serialization format `_. + + Supported types are: + + * UInt8, UInt16, Uint32, UInt64, UInt128, Int8, Int16, Int32, Int64, Int128, int (handled as int32), Float32, Float64, float (handled as Float64), bool; + + * Str, Bytes, ByteArray; + + * List, Dict, Set, FrozenSet and Tuple of supported types. + """ + pass + +def z_deserialize(tp: type[_T], zbytes: ZBytes) -> _T: + """Deserialize into an object of supported type according to the `Zenoh serialization format `_. + + Supported types are: + + * UInt8, UInt16, Uint32, UInt64, UInt128, Int8, Int16, Int32, Int64, Int128, int (handled as int32), Float32, Float64, float (handled as Float64), bool; + + * Str, Bytes, ByteArray; + + * List, Dict, Set, FrozenSet and Tuple of supported types. + """ + pass + @_unstable @final class AdvancedPublisher: From 6987c5871a153b96b482864d902a74b3e6422fb7 Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Wed, 2 Jul 2025 13:01:39 +0200 Subject: [PATCH 4/7] fix examples and add test --- examples/z_advanced_pub.py | 5 +++-- examples/z_advanced_sub.py | 2 +- tests/examples_check.py | 28 ++++++++++++++++++++++++++++ 3 files changed, 32 insertions(+), 3 deletions(-) diff --git a/examples/z_advanced_pub.py b/examples/z_advanced_pub.py index 149b0bd9..5da12c67 100644 --- a/examples/z_advanced_pub.py +++ b/examples/z_advanced_pub.py @@ -35,6 +35,7 @@ def main(conf: zenoh.Config, key: str, payload: str, history: int): print("Press CTRL-C to quit...") for idx in itertools.count(): + time.sleep(1) buf = f"[{idx:4d}] {payload}" print(f"Putting Data ('{key}': '{buf}')...") pub.put(buf) @@ -47,7 +48,7 @@ def main(conf: zenoh.Config, key: str, payload: str, history: int): import common - parser = argparse.ArgumentParser(prog="z_pub", description="zenoh pub example") + parser = argparse.ArgumentParser(prog="z_advanced_pub", description="zenoh advanced pub example") common.add_config_arguments(parser) parser.add_argument( "--key", @@ -76,4 +77,4 @@ def main(conf: zenoh.Config, key: str, payload: str, history: int): args = parser.parse_args() conf = common.get_config_from_args(args) - main(conf, args.key, args.payload, args.history, args.iter, args.interval) + main(conf, args.key, args.payload, args.history) diff --git a/examples/z_advanced_sub.py b/examples/z_advanced_sub.py index 2373ea57..9131179d 100644 --- a/examples/z_advanced_sub.py +++ b/examples/z_advanced_sub.py @@ -55,7 +55,7 @@ def miss_listener(miss: Miss): import common - parser = argparse.ArgumentParser(prog="z_sub", description="zenoh sub example") + parser = argparse.ArgumentParser(prog="z_advanced_sub", description="zenoh advanced sub example") common.add_config_arguments(parser) parser.add_argument( "--key", diff --git a/tests/examples_check.py b/tests/examples_check.py index 4bd6aafe..e0cb9d72 100644 --- a/tests/examples_check.py +++ b/tests/examples_check.py @@ -356,3 +356,31 @@ def test_z_sub_thr_z_pub_thr(): assert not sub_thr.errors assert not pub_thr.errors + + +def test_z_advanced_pub_z_advanced_sub(): + """Test z_advanced_pub & z_advanced_sub.""" + ## Run z_advanced_pub and z_advanced_sub + ## z_advanced_pub: Start publishing messages + pub = Pyrun("z_advanced_pub.py", ["--history=10"]) + time.sleep(5) # wait 5 seconds to ensure that we miss few messages + sub = Pyrun("z_advanced_sub.py", []) + time.sleep(5) + + if error := pub.interrupt(): + pub.dbg() + pub.errors.append(error) + if error := sub.interrupt(): + sub.dbg() + sub.errors.append(error) + + sub_out = "".join(sub.stdout) + for i in range(0, 8): + if not ( + F"Received SampleKind.PUT ('demo/example/zenoh-python-pub': '[ {i}] Pub from Python!')" + in sub_out + ): + sub.errors.append(F"z_advanced_sub didn't catch the {i}-th z_advanced_pub message") + + assert not pub.errors + assert not sub.errors \ No newline at end of file From a78159ecff05f9f8921ee57f30f6801bb320b414 Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Wed, 2 Jul 2025 13:07:39 +0200 Subject: [PATCH 5/7] format fix --- tests/examples_check.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/examples_check.py b/tests/examples_check.py index e0cb9d72..38118318 100644 --- a/tests/examples_check.py +++ b/tests/examples_check.py @@ -363,7 +363,7 @@ def test_z_advanced_pub_z_advanced_sub(): ## Run z_advanced_pub and z_advanced_sub ## z_advanced_pub: Start publishing messages pub = Pyrun("z_advanced_pub.py", ["--history=10"]) - time.sleep(5) # wait 5 seconds to ensure that we miss few messages + time.sleep(5) # wait 5 seconds to ensure that we miss few messages sub = Pyrun("z_advanced_sub.py", []) time.sleep(5) @@ -377,10 +377,12 @@ def test_z_advanced_pub_z_advanced_sub(): sub_out = "".join(sub.stdout) for i in range(0, 8): if not ( - F"Received SampleKind.PUT ('demo/example/zenoh-python-pub': '[ {i}] Pub from Python!')" + f"Received SampleKind.PUT ('demo/example/zenoh-python-pub': '[ {i}] Pub from Python!')" in sub_out ): - sub.errors.append(F"z_advanced_sub didn't catch the {i}-th z_advanced_pub message") + sub.errors.append( + f"z_advanced_sub didn't catch the {i}-th z_advanced_pub message" + ) assert not pub.errors - assert not sub.errors \ No newline at end of file + assert not sub.errors From 1256929128fc994fabefd462dfe3db855cbcdc8a Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Wed, 2 Jul 2025 13:11:46 +0200 Subject: [PATCH 6/7] format fix --- examples/z_advanced_pub.py | 4 +++- examples/z_advanced_sub.py | 4 +++- zenoh/ext.pyi | 1 - 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/examples/z_advanced_pub.py b/examples/z_advanced_pub.py index 5da12c67..a91d9ea4 100644 --- a/examples/z_advanced_pub.py +++ b/examples/z_advanced_pub.py @@ -48,7 +48,9 @@ def main(conf: zenoh.Config, key: str, payload: str, history: int): import common - parser = argparse.ArgumentParser(prog="z_advanced_pub", description="zenoh advanced pub example") + parser = argparse.ArgumentParser( + prog="z_advanced_pub", description="zenoh advanced pub example" + ) common.add_config_arguments(parser) parser.add_argument( "--key", diff --git a/examples/z_advanced_sub.py b/examples/z_advanced_sub.py index 9131179d..24df7552 100644 --- a/examples/z_advanced_sub.py +++ b/examples/z_advanced_sub.py @@ -55,7 +55,9 @@ def miss_listener(miss: Miss): import common - parser = argparse.ArgumentParser(prog="z_advanced_sub", description="zenoh advanced sub example") + parser = argparse.ArgumentParser( + prog="z_advanced_sub", description="zenoh advanced sub example" + ) common.add_config_arguments(parser) parser.add_argument( "--key", diff --git a/zenoh/ext.pyi b/zenoh/ext.pyi index 39891737..52139c41 100644 --- a/zenoh/ext.pyi +++ b/zenoh/ext.pyi @@ -186,7 +186,6 @@ class AdvancedSubscriber(Generic[_H]): Only `AdvancedPublisher` that enable `publisher_detection` can be detected. """ - @property def undeclare(self): ... @overload def try_recv(self: AdvancedSubscriber[Handler[Sample]]) -> Sample | None: ... From 9feddb6018c223ea0ebd778d4ac51dcec6d15753 Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Wed, 2 Jul 2025 17:39:55 +0200 Subject: [PATCH 7/7] fix docs for __new__ --- docs/stubs_to_sources.py | 32 +++++++------ zenoh/ext.pyi | 100 ++++++++++++++++++++------------------- 2 files changed, 69 insertions(+), 63 deletions(-) diff --git a/docs/stubs_to_sources.py b/docs/stubs_to_sources.py index 2476f560..3e5a99fe 100644 --- a/docs/stubs_to_sources.py +++ b/docs/stubs_to_sources.py @@ -28,6 +28,7 @@ PACKAGE = (Path(__file__) / "../../zenoh").resolve() __INIT__ = PACKAGE / "__init__.py" +EXT = PACKAGE / "ext.py" def _unstable(item): @@ -98,23 +99,26 @@ def visit_FunctionDef(self, node: ast.FunctionDef): def main(): - # remove __init__.pyi - __INIT__.unlink() + fnames = [__INIT__, EXT] + for fname in fnames: + # remove *.py + fname.unlink() # rename stubs for entry in PACKAGE.glob("*.pyi"): entry.rename(PACKAGE / f"{entry.stem}.py") - # read stub code - with open(__INIT__) as f: - stub: ast.Module = ast.parse(f.read()) - # replace _unstable - for i, stmt in enumerate(stub.body): - if isinstance(stmt, ast.FunctionDef) and stmt.name == "_unstable": - stub.body[i] = ast.parse(inspect.getsource(_unstable)) - # remove overload - stub = RemoveOverload().visit(stub) - # write modified code - with open(__INIT__, "w") as f: - f.write(ast.unparse(stub)) + for fname in fnames: + # read stub code + with open(fname) as f: + stub: ast.Module = ast.parse(f.read()) + # replace _unstable + for i, stmt in enumerate(stub.body): + if isinstance(stmt, ast.FunctionDef) and stmt.name == "_unstable": + stub.body[i] = ast.parse(inspect.getsource(_unstable)) + # remove overload + stub = RemoveOverload().visit(stub) + # write modified code + with open(fname, "w") as f: + f.write(ast.unparse(stub)) if __name__ == "__main__": diff --git a/zenoh/ext.pyi b/zenoh/ext.pyi index 52139c41..0e782dc6 100644 --- a/zenoh/ext.pyi +++ b/zenoh/ext.pyi @@ -203,34 +203,36 @@ class AdvancedSubscriber(Generic[_H]): @_unstable @final class CacheConfig: + """ + :param max_samples: specify how many samples to keep for each resource, default to 1 + :param replies_config: the QoS to apply to replies + """ + def __new__( cls, max_samples: int | None = None, *, replies_config: RepliesConfig | None = None, - ) -> Self: - """ - :param max_samples: specify how many samples to keep for each resource, default to 1 - :param replies_config: the QoS to apply to replies - """ + ) -> Self: ... @_unstable @final class HistoryConfig: + """ + :param detect_late_publishers: enable detection of late joiner publishers and query for their historical data; + late joiner detection can only be achieved for `AdvancedPublisher` that enable `publisher_detection` + history can only be retransmitted by `AdvancedPublisher` that enable `cache` + :param max_samples: specify how many samples to query for each resource + :param max_age: specify the maximum age of samples to query in seconds + """ + def __new__( cls, *, detect_late_publishers: bool | None = None, max_samples: int | None = None, max_age: float | int | None = None, - ) -> Self: - """ - :param detect_late_publishers: enable detection of late joiner publishers and query for their historical data; - late joiner detection can only be achieved for `AdvancedPublisher` that enable `publisher_detection` - history can only be retransmitted by `AdvancedPublisher` that enable `cache` - :param max_samples: specify how many samples to query for each resource - :param max_age: specify the maximum age of samples to query in seconds - """ + ) -> Self: ... @_unstable @final @@ -246,47 +248,48 @@ class Miss: @_unstable @final class MissDetectionConfig: - @overload - def __new__(cls) -> Self: ... - @overload - def __new__(cls, *, heartbeat: float | int) -> Self: - """ - :param heartbeat: period in seconds, allow last sample miss detection through periodic heartbeat; - periodically send the last published Sample's sequence number to allow last sample recovery - `AdvancedSubscriber`(crate::AdvancedSubscriber) can recover the last sample with the `heartbeat` option - """ + """ + :param heartbeat: period in seconds, allow last sample miss detection through periodic heartbeat; + periodically send the last published Sample's sequence number to allow last sample recovery. + `AdvancedSubscriber can only recover the last sample with the `heartbeat` option enabled. - @overload - def __new__(cls, sporadic_heartbeat: float | int) -> Self: - """ - :param sporadic_heartbeat: period in seconds, allow last sample miss detection through sporadic heartbeat; - each period, the last published Sample's sequence number is sent with `CongestionControl.Block` but only if - it has changed since the last period. - """ + **This option can not be enabled simultaneously with `sporadic_heartbeat`.** + + :param sporadic_heartbeat: period in seconds, allow last sample miss detection through sporadic heartbeat; + each period, the last published Sample's sequence number is sent with `CongestionControl.Block` but only if + it has changed since the last period. + `AdvancedSubscriber can only recover the last sample with the `heartbeat` option enabled. + + **This option can not be enabled simultaneously with `heartbeat`.** + """ + + def __new__( + cls, *, heartbeat: float | int | None, sporadic_heartbeat: float | int | None + ) -> Self: ... @_unstable @final class RecoveryConfig: - @overload - def __new__(cls) -> Self: ... - @overload - def __new__(cls, *, periodic_queries: float | int) -> Self: - """ - :param periodic_queries: enable periodic queries for not yet received Samples and specify their period; - it allows retrieving the last Sample(s) if the last Sample(s) is/are lost, - so it is useful for sporadic publications but useless for periodic publications - with a period smaller or equal to this period - retransmission can only be achieved by `AdvancedPublisher` that enable `cache` and `sample_miss_detection` - """ + """ + :param periodic_queries: enable periodic queries for not yet received Samples and specify their period; + it allows retrieving the last Sample(s) if the last Sample(s) is/are lost, + so it is useful for sporadic publications but useless for periodic publications + with a period smaller or equal to this period. + Retransmission can only be achieved by `AdvancedPublisher` that enable `cache` and `sample_miss_detection`. - @overload - def __new__(cls, *, heartbeat: Literal[True]) -> Self: - """ - :param heartbeat: subscribe to heartbeats of `AdvancedPublisher`; - it allows receiving the last published Sample's sequence number and check for misses - heartbeat subscriber must be paired with `AdvancedPublishers` that enable `cache` and - `sample_miss_detection` with `heartbeat` or `sporadic_heartbeat` - """ + **This option can not be enabled simultaneously with `heartbeat`.** + + :param heartbeat: subscribe to heartbeats of `AdvancedPublisher`; + it allows receiving the last published Sample's sequence number and check for misses. + Heartbeat subscriber must be paired with `AdvancedPublishers` that enable `cache` and + `sample_miss_detection` with `heartbeat` or `sporadic_heartbeat`. + + **This option can not be enabled simultaneously with `periodic_queries`.** + """ + + def __new__( + cls, *, periodic_queries: float | int | None, heartbeat: Literal[True] | None + ) -> Self: ... @_unstable @final @@ -302,7 +305,6 @@ class RepliesConfig: @_unstable @final class SampleMissListener(Generic[_H]): - @property def undeclare(self): ... @overload def try_recv(self: SampleMissListener[Handler[Miss]]) -> Miss | None: ...