diff --git a/Cargo.toml b/Cargo.toml index 09c87a75..41100d1d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ crate-type = ["cdylib"] [features] default = ["zenoh/default", "zenoh-ext"] +zenoh-ext = ["dep:zenoh-ext", "zenoh-ext/unstable", "zenoh-ext/internal"] [badges] maintenance = { status = "actively-developed" } diff --git a/docs/stubs_to_sources.py b/docs/stubs_to_sources.py index 2476f560..3e5a99fe 100644 --- a/docs/stubs_to_sources.py +++ b/docs/stubs_to_sources.py @@ -28,6 +28,7 @@ PACKAGE = (Path(__file__) / "../../zenoh").resolve() __INIT__ = PACKAGE / "__init__.py" +EXT = PACKAGE / "ext.py" def _unstable(item): @@ -98,23 +99,26 @@ def visit_FunctionDef(self, node: ast.FunctionDef): def main(): - # remove __init__.pyi - __INIT__.unlink() + fnames = [__INIT__, EXT] + for fname in fnames: + # remove *.py + fname.unlink() # rename stubs for entry in PACKAGE.glob("*.pyi"): entry.rename(PACKAGE / f"{entry.stem}.py") - # read stub code - with open(__INIT__) as f: - stub: ast.Module = ast.parse(f.read()) - # replace _unstable - for i, stmt in enumerate(stub.body): - if isinstance(stmt, ast.FunctionDef) and stmt.name == "_unstable": - stub.body[i] = ast.parse(inspect.getsource(_unstable)) - # remove overload - stub = RemoveOverload().visit(stub) - # write modified code - with open(__INIT__, "w") as f: - f.write(ast.unparse(stub)) + for fname in fnames: + # read stub code + with open(fname) as f: + stub: ast.Module = ast.parse(f.read()) + # replace _unstable + for i, stmt in enumerate(stub.body): + if isinstance(stmt, ast.FunctionDef) and stmt.name == "_unstable": + stub.body[i] = ast.parse(inspect.getsource(_unstable)) + # remove overload + stub = RemoveOverload().visit(stub) + # write modified code + with open(fname, "w") as f: + f.write(ast.unparse(stub)) if __name__ == "__main__": diff --git a/examples/z_advanced_pub.py b/examples/z_advanced_pub.py new file mode 100644 index 00000000..a91d9ea4 --- /dev/null +++ b/examples/z_advanced_pub.py @@ -0,0 +1,82 @@ +# +# Copyright (c) 2022 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# +import time +from typing import Optional + +import zenoh +from zenoh.ext import CacheConfig, MissDetectionConfig, declare_advanced_publisher + + +def main(conf: zenoh.Config, key: str, payload: str, history: int): + # initiate logging + zenoh.init_log_from_env_or("error") + + print("Opening session...") + with zenoh.open(conf) as session: + print(f"Declaring AdvancedPublisher on '{key}'...") + pub = declare_advanced_publisher( + session, + key, + cache=CacheConfig(max_samples=history), + sample_miss_detection=MissDetectionConfig(heartbeat=5), + publisher_detection=True, + ) + + print("Press CTRL-C to quit...") + for idx in itertools.count(): + time.sleep(1) + buf = f"[{idx:4d}] {payload}" + print(f"Putting Data ('{key}': '{buf}')...") + pub.put(buf) + + +# --- Command line argument parsing --- --- --- --- --- --- +if __name__ == "__main__": + import argparse + import itertools + + import common + + parser = argparse.ArgumentParser( + prog="z_advanced_pub", description="zenoh advanced pub example" + ) + common.add_config_arguments(parser) + parser.add_argument( + "--key", + "-k", + dest="key", + default="demo/example/zenoh-python-pub", + type=str, + help="The key expression to publish onto.", + ) + parser.add_argument( + "--payload", + "-p", + dest="payload", + default="Pub from Python!", + type=str, + help="The payload to publish.", + ) + parser.add_argument( + "--history", + dest="history", + type=int, + default=1, + help="The number of publications to keep in cache", + ) + + args = parser.parse_args() + conf = common.get_config_from_args(args) + + main(conf, args.key, args.payload, args.history) diff --git a/examples/z_advanced_sub.py b/examples/z_advanced_sub.py new file mode 100644 index 00000000..24df7552 --- /dev/null +++ b/examples/z_advanced_sub.py @@ -0,0 +1,74 @@ +# +# Copyright (c) 2022 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# +import time + +import zenoh +from zenoh.ext import HistoryConfig, Miss, RecoveryConfig, declare_advanced_subscriber + + +def main(conf: zenoh.Config, key: str): + # initiate logging + zenoh.init_log_from_env_or("error") + + print("Opening session...") + with zenoh.open(conf) as session: + print(f"Declaring Subscriber on '{key}'...") + + def listener(sample: zenoh.Sample): + print( + f">> [Subscriber] Received {sample.kind} ('{sample.key_expr}': '{sample.payload.to_string()}')" + ) + + advanced_sub = declare_advanced_subscriber( + session, + key, + listener, + history=HistoryConfig(detect_late_publishers=True), + recovery=RecoveryConfig(heartbeat=True), + subscriber_detection=True, + ) + + def miss_listener(miss: Miss): + print(f">> [Subscriber] Missed {miss.nb} samples from {miss.source} !!!") + + advanced_sub.sample_miss_listener(miss_listener) + + print("Press CTRL-C to quit...") + while True: + time.sleep(1) + + +# --- Command line argument parsing --- --- --- --- --- --- +if __name__ == "__main__": + import argparse + + import common + + parser = argparse.ArgumentParser( + prog="z_advanced_sub", description="zenoh advanced sub example" + ) + common.add_config_arguments(parser) + parser.add_argument( + "--key", + "-k", + dest="key", + default="demo/example/**", + type=str, + help="The key expression to subscribe to.", + ) + + args = parser.parse_args() + conf = common.get_config_from_args(args) + + main(conf, args.key) diff --git a/src/bytes.rs b/src/bytes.rs index 21852fca..b5830b6f 100644 --- a/src/bytes.rs +++ b/src/bytes.rs @@ -97,8 +97,8 @@ downcast_or_new!(Encoding => Option); #[pymethods] impl Encoding { #[new] - fn new(s: Option) -> PyResult { - Ok(s.map_into().map(Self).unwrap_or_default()) + fn new(s: Option) -> Self { + s.map_into().map(Self).unwrap_or_default() } fn with_schema(&self, schema: String) -> Self { diff --git a/src/ext.rs b/src/ext.rs index ef927c2c..c3fa1632 100644 --- a/src/ext.rs +++ b/src/ext.rs @@ -1,17 +1,29 @@ +use std::time::Duration; + use pyo3::{ exceptions::{PyTypeError, PyValueError}, prelude::*, types::{ - PyBool, PyByteArray, PyBytes, PyDict, PyFloat, PyFrozenSet, PyInt, PyList, PySet, PyString, - PyTuple, PyType, + PyBool, PyByteArray, PyBytes, PyDict, PyFloat, PyFrozenSet, PyInt, PyIterator, PyList, + PySet, PyString, PyTuple, PyType, }, PyTypeInfo, }; -use zenoh_ext::{Deserialize, VarInt, ZDeserializer, ZSerializer}; +use zenoh_ext::{ + AdvancedPublisherBuilderExt, AdvancedSubscriberBuilderExt, Deserialize, VarInt, ZDeserializer, + ZSerializer, +}; use crate::{ - bytes::ZBytes, - macros::{import, py_static, try_import}, + bytes::{Encoding, ZBytes}, + handlers::{into_handler, HandlerImpl}, + key_expr::KeyExpr, + macros::{build, import, option_wrapper, py_static, try_import, wrapper}, + pubsub::Subscriber, + qos::{CongestionControl, Priority, Reliability}, + sample::Sample, + session::{EntityGlobalId, Session}, + utils::{duration, generic, wait, MapInto}, ZDeserializeError, }; @@ -426,3 +438,365 @@ pub(crate) fn z_deserialize(tp: &Bound, zbytes: &ZBytes) -> PyResult, + "Undeclared publisher" +); + +#[pymethods] +impl AdvancedPublisher { + fn __enter__<'a, 'py>(this: &'a Bound<'py, Self>) -> PyResult<&'a Bound<'py, Self>> { + Self::check(this) + } + + #[pyo3(signature = (*_args, **_kwargs))] + fn __exit__( + &mut self, + py: Python, + _args: &Bound, + _kwargs: Option<&Bound>, + ) -> PyResult { + self.undeclare(py)?; + Ok(py.None()) + } + + #[getter] + fn key_expr(&self) -> PyResult { + Ok(self.get_ref()?.key_expr().clone().into()) + } + + #[getter] + fn encoding(&self) -> PyResult { + Ok(self.get_ref()?.encoding().clone().into()) + } + + #[getter] + fn congestion_control(&self) -> PyResult { + Ok(self.get_ref()?.congestion_control().into()) + } + + #[getter] + fn priority(&self) -> PyResult { + Ok(self.get_ref()?.priority().into()) + } + + // TODO add timestamp + #[pyo3(signature = (payload, *, encoding = None, attachment = None))] + fn put( + &self, + py: Python, + #[pyo3(from_py_with = "ZBytes::from_py")] payload: ZBytes, + #[pyo3(from_py_with = "Encoding::from_py_opt")] encoding: Option, + #[pyo3(from_py_with = "ZBytes::from_py_opt")] attachment: Option, + ) -> PyResult<()> { + let this = self.get_ref()?; + wait(py, build!(this.put(payload), encoding, attachment)) + } + + #[pyo3(signature = (*, attachment = None))] + fn delete( + &self, + py: Python, + #[pyo3(from_py_with = "ZBytes::from_py_opt")] attachment: Option, + ) -> PyResult<()> { + wait(py, build!(self.get_ref()?.delete(), attachment)) + } + + fn undeclare(&mut self, py: Python) -> PyResult<()> { + wait(py, self.take()?.undeclare()) + } +} + +option_wrapper!( + zenoh_ext::AdvancedSubscriber>, + "Undeclared subscriber" +); + +#[pymethods] +impl AdvancedSubscriber { + #[classmethod] + fn __class_getitem__(cls: &Bound, args: &Bound) -> PyObject { + generic(cls, args) + } + + fn __enter__<'a, 'py>(this: &'a Bound<'py, Self>) -> &'a Bound<'py, Self> { + this + } + + #[pyo3(signature = (*_args, **_kwargs))] + fn __exit__( + &mut self, + py: Python, + _args: &Bound, + _kwargs: Option<&Bound>, + ) -> PyResult { + self.undeclare(py)?; + Ok(py.None()) + } + + #[getter] + fn key_expr(&self) -> PyResult { + Ok(self.get_ref()?.key_expr().clone().into()) + } + + #[getter] + fn handler(&self, py: Python) -> PyResult { + Ok(self.get_ref()?.handler().to_object(py)) + } + + #[pyo3(signature = (handler = None))] + fn sample_miss_listener( + &self, + py: Python, + handler: Option<&Bound>, + ) -> PyResult { + let (handler, background) = into_handler(py, handler)?; + let builder = self.get_ref()?.sample_miss_listener(); + let mut listener = wait(py, builder.with(handler))?; + if background { + listener.set_background(true); + } + Ok(listener.into()) + } + + #[pyo3(signature = (handler = None, *, history = None))] + fn detect_publishers( + &self, + py: Python, + handler: Option<&Bound>, + history: Option, + ) -> PyResult { + let (handler, background) = into_handler(py, handler)?; + let builder = build!(self.get_ref()?.detect_publishers(), history); + let mut subscriber = wait(py, builder.with(handler))?; + if background { + subscriber.set_background(true); + } + Ok(subscriber.into()) + } + + fn try_recv(&self, py: Python) -> PyResult { + self.get_ref()?.handler().try_recv(py) + } + + fn recv(&self, py: Python) -> PyResult { + self.get_ref()?.handler().recv(py) + } + + fn undeclare(&mut self, py: Python) -> PyResult<()> { + wait(py, self.take()?.undeclare()) + } + + fn __iter__<'py>(&self, py: Python<'py>) -> PyResult> { + self.handler(py)?.bind(py).iter() + } +} + +wrapper!(zenoh_ext::CacheConfig: Clone); + +#[pymethods] +impl CacheConfig { + #[new] + #[pyo3(signature = (max_samples = None, *, replies_config = None))] + fn new(max_samples: Option, replies_config: Option) -> Self { + Self(build!( + zenoh_ext::CacheConfig::default(), + max_samples, + replies_config, + )) + } +} + +wrapper!(zenoh_ext::HistoryConfig: Clone); + +#[pymethods] +impl HistoryConfig { + #[new] + #[pyo3(signature = (*, detect_late_publishers = None, max_samples = None, max_age = None))] + fn new( + detect_late_publishers: Option, + max_samples: Option, + max_age: Option, + ) -> Self { + let mut config = build!(zenoh_ext::HistoryConfig::default(), max_samples, max_age); + if matches!(detect_late_publishers, Some(true)) { + config = config.detect_late_publishers(); + } + Self(config) + } +} + +wrapper!(zenoh_ext::Miss); + +#[pymethods] +impl Miss { + #[getter] + fn source(&self) -> EntityGlobalId { + self.0.source().into() + } + + #[getter] + fn nb(&self) -> u32 { + self.0.nb() + } +} + +wrapper!(zenoh_ext::MissDetectionConfig: Clone); + +#[pymethods] +impl MissDetectionConfig { + #[new] + #[pyo3(signature = (*, heartbeat = None, sporadic_heartbeat = None))] + fn new( + #[pyo3(from_py_with = "duration")] heartbeat: Option, + #[pyo3(from_py_with = "duration")] sporadic_heartbeat: Option, + ) -> Self { + Self(build!( + zenoh_ext::MissDetectionConfig::default(), + heartbeat, + sporadic_heartbeat, + )) + } +} + +wrapper!(zenoh_ext::RecoveryConfig: Clone); + +#[pymethods] +impl RecoveryConfig { + #[new] + #[pyo3(signature = (*, periodic_queries = None, heartbeat = None))] + fn new(periodic_queries: Option, heartbeat: Option) -> PyResult { + let config = zenoh_ext::RecoveryConfig::default(); + Ok(Self(match (periodic_queries, heartbeat) { + (Some(periodic_queries), None) => config.periodic_queries(periodic_queries), + (None, Some(true)) => config.heartbeat(), + _ => return Err(PyValueError::new_err("invalid parameters")), + })) + } +} + +wrapper!(zenoh_ext::RepliesConfig: Clone); + +#[pymethods] +impl RepliesConfig { + #[new] + #[pyo3(signature = (*, congestion_control = None, priority = None, express = None))] + fn new( + congestion_control: Option, + priority: Option, + express: Option, + ) -> Self { + Self(build!( + zenoh_ext::RepliesConfig::default(), + congestion_control, + priority, + express, + express, + )) + } +} + +option_wrapper!( + zenoh_ext::SampleMissListener>, + "Undeclared sample-miss listener" +); + +#[pymethods] +impl SampleMissListener { + #[classmethod] + fn __class_getitem__(cls: &Bound, args: &Bound) -> PyObject { + generic(cls, args) + } + + fn __enter__<'a, 'py>(this: &'a Bound<'py, Self>) -> &'a Bound<'py, Self> { + this + } + + #[pyo3(signature = (*_args, **_kwargs))] + fn __exit__( + &mut self, + py: Python, + _args: &Bound, + _kwargs: Option<&Bound>, + ) -> PyResult { + self.undeclare(py)?; + Ok(py.None()) + } + + fn try_recv(&self, py: Python) -> PyResult { + self.get_ref()?.try_recv(py) + } + + fn recv(&self, py: Python) -> PyResult { + self.get_ref()?.recv(py) + } + + fn undeclare(&mut self, py: Python) -> PyResult<()> { + wait(py, self.take()?.undeclare()) + } + + fn __iter__<'py>(&self, py: Python<'py>) -> PyResult> { + (**self.get_ref()?).to_object(py).bind(py).iter() + } +} + +#[allow(clippy::too_many_arguments)] +#[pyfunction] +#[pyo3(signature = (session, key_expr, *, encoding = None, congestion_control = None, priority = None, express = None, reliability = None, cache = None, sample_miss_detection = None, publisher_detection = None))] +pub(crate) fn declare_advanced_publisher( + py: Python, + session: &Session, + #[pyo3(from_py_with = "KeyExpr::from_py")] key_expr: KeyExpr, + #[pyo3(from_py_with = "Encoding::from_py_opt")] encoding: Option, + congestion_control: Option, + priority: Option, + express: Option, + reliability: Option, + cache: Option, + sample_miss_detection: Option, + publisher_detection: Option, +) -> PyResult { + let mut builder = build!( + session.0.declare_publisher(key_expr).advanced(), + encoding, + congestion_control, + priority, + express, + reliability, + cache, + sample_miss_detection, + ); + if matches!(publisher_detection, Some(true)) { + builder = builder.publisher_detection(); + } + wait(py, builder).map_into() +} + +#[allow(clippy::too_many_arguments)] +#[pyfunction] +#[pyo3(signature = (session, key_expr, handler = None, *, history = None, recovery = None, subscriber_detection = None))] +pub(crate) fn declare_advanced_subscriber( + session: &Session, + py: Python, + #[pyo3(from_py_with = "KeyExpr::from_py")] key_expr: KeyExpr, + handler: Option<&Bound>, + history: Option, + recovery: Option, + subscriber_detection: Option, +) -> PyResult { + let (handler, background) = into_handler(py, handler)?; + let mut builder = build!( + session.0.declare_subscriber(key_expr).advanced(), + history, + recovery + ); + if matches!(subscriber_detection, Some(true)) { + builder = builder.subscriber_detection(); + } + let mut subscriber = wait(py, builder.with(handler))?; + if background { + subscriber.set_background(true); + } + Ok(subscriber.into()) +} diff --git a/src/lib.rs b/src/lib.rs index 3a17f716..f568a890 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -65,7 +65,7 @@ pub(crate) mod zenoh { }, sample::{Sample, SampleKind}, scouting::{scout, Hello, Scout}, - session::{open, Session, SessionInfo}, + session::{open, EntityGlobalId, Session, SessionInfo}, time::Timestamp, ZError, }; @@ -80,7 +80,11 @@ pub(crate) mod zenoh { #[pymodule] mod _ext { #[pymodule_export] - use crate::ext::{z_deserialize, z_serialize}; + use crate::ext::{ + declare_advanced_publisher, declare_advanced_subscriber, z_deserialize, z_serialize, + AdvancedPublisher, AdvancedSubscriber, CacheConfig, HistoryConfig, Miss, + MissDetectionConfig, RecoveryConfig, RepliesConfig, SampleMissListener, + }; #[pymodule_export] use crate::ZDeserializeError; } diff --git a/src/liveliness.rs b/src/liveliness.rs index 2d00f349..ec45b5fe 100644 --- a/src/liveliness.rs +++ b/src/liveliness.rs @@ -11,7 +11,7 @@ use crate::{ macros::{build, option_wrapper}, pubsub::Subscriber, query::Reply, - utils::{timeout, wait, MapInto}, + utils::{duration, wait, MapInto}, }; #[pyclass] @@ -53,7 +53,7 @@ impl Liveliness { py: Python, #[pyo3(from_py_with = "KeyExpr::from_py")] key_expr: KeyExpr, handler: Option<&Bound>, - #[pyo3(from_py_with = "timeout")] timeout: Option, + #[pyo3(from_py_with = "duration")] timeout: Option, ) -> PyResult> { let (handler, _) = into_handler(py, handler)?; let liveliness = self.0.liveliness(); diff --git a/src/macros.rs b/src/macros.rs index f0e8e3a9..cc94427f 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -71,7 +71,8 @@ macro_rules! downcast_or_new { if let Ok(obj) = ::extract_bound(obj) { return Ok(obj); } - Self::new(PyResult::Ok(obj)$(.and_then(<$new>::extract_bound))??.into(), $($other)?) + let this = Self::new(PyResult::Ok(obj)$(.and_then(<$new>::extract_bound))??.into(), $($other)?); + $crate::utils::IntoResult::into_result(this) } pub(crate) fn from_py_opt(obj: &Bound) -> PyResult> { if obj.is_none() { diff --git a/src/query.rs b/src/query.rs index 449a33be..342b436e 100644 --- a/src/query.rs +++ b/src/query.rs @@ -64,11 +64,11 @@ impl QueryConsolidation { const DEFAULT: Self = Self(zenoh::query::QueryConsolidation::DEFAULT); #[new] - fn new(mode: Option) -> PyResult { + fn new(mode: Option) -> Self { let Some(mode) = mode else { - return Ok(Self::DEFAULT); + return Self::DEFAULT; }; - Ok(Self(mode.into_rust().into())) + Self(mode.into_rust().into()) } fn mode(&self) -> ConsolidationMode { diff --git a/src/session.rs b/src/session.rs index 0fe930c1..df54643d 100644 --- a/src/session.rs +++ b/src/session.rs @@ -17,7 +17,7 @@ use pyo3::{ prelude::*, types::{PyDict, PyList, PyTuple}, }; -use zenoh::Wait; +use zenoh::{session::EntityId, Wait}; use crate::{ bytes::{Encoding, ZBytes}, @@ -30,7 +30,7 @@ use crate::{ qos::{CongestionControl, Priority, Reliability}, query::{Querier, QueryConsolidation, QueryTarget, Queryable, Reply, Selector}, time::Timestamp, - utils::{timeout, wait, IntoPython, MapInto}, + utils::{duration, wait, IntoPython, MapInto}, }; #[pyclass] @@ -140,7 +140,7 @@ impl Session { #[pyo3(from_py_with = "QueryConsolidation::from_py_opt")] consolidation: Option< QueryConsolidation, >, - #[pyo3(from_py_with = "timeout")] timeout: Option, + #[pyo3(from_py_with = "duration")] timeout: Option, congestion_control: Option, priority: Option, express: Option, @@ -235,7 +235,7 @@ impl Session { #[pyo3(from_py_with = "QueryConsolidation::from_py_opt")] consolidation: Option< QueryConsolidation, >, - #[pyo3(from_py_with = "timeout")] timeout: Option, + #[pyo3(from_py_with = "duration")] timeout: Option, congestion_control: Option, priority: Option, express: Option, @@ -298,3 +298,22 @@ impl SessionInfo { // TODO __repr__ } + +wrapper!(zenoh::session::EntityGlobalId); + +#[pymethods] +impl EntityGlobalId { + #[getter] + fn zid(&self) -> ZenohId { + self.0.zid().into() + } + + #[getter] + fn eid(&self) -> EntityId { + self.0.eid() + } + + fn __repr__(&self) -> String { + format!("{:?}", self.0) + } +} diff --git a/src/utils.rs b/src/utils.rs index 24edd7a5..9c3d239e 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -20,6 +20,22 @@ use crate::{ ZError, }; +pub(crate) trait IntoResult { + fn into_result(self) -> Result; +} + +impl IntoResult for T { + fn into_result(self) -> Result { + Ok(self) + } +} + +impl IntoResult for Result { + fn into_result(self) -> Result { + self + } +} + pub(crate) trait IntoPyErr { fn into_pyerr(self) -> PyErr; } @@ -42,7 +58,7 @@ pub(crate) trait IntoRust: 'static { fn into_rust(self) -> Self::Into; } -into_rust!(bool, Duration); +into_rust!(bool, usize, f64, Duration); pub(crate) trait IntoPython: Sized + Send + Sync + 'static { type Into: IntoPy; @@ -105,7 +121,7 @@ pub(crate) fn wait( py.allow_threads(|| resolve.wait()).into_pyres() } -pub(crate) fn timeout(obj: &Bound) -> PyResult> { +pub(crate) fn duration(obj: &Bound) -> PyResult> { if obj.is_none() { return Ok(None); } diff --git a/tests/examples_check.py b/tests/examples_check.py index 4bd6aafe..38118318 100644 --- a/tests/examples_check.py +++ b/tests/examples_check.py @@ -356,3 +356,33 @@ def test_z_sub_thr_z_pub_thr(): assert not sub_thr.errors assert not pub_thr.errors + + +def test_z_advanced_pub_z_advanced_sub(): + """Test z_advanced_pub & z_advanced_sub.""" + ## Run z_advanced_pub and z_advanced_sub + ## z_advanced_pub: Start publishing messages + pub = Pyrun("z_advanced_pub.py", ["--history=10"]) + time.sleep(5) # wait 5 seconds to ensure that we miss few messages + sub = Pyrun("z_advanced_sub.py", []) + time.sleep(5) + + if error := pub.interrupt(): + pub.dbg() + pub.errors.append(error) + if error := sub.interrupt(): + sub.dbg() + sub.errors.append(error) + + sub_out = "".join(sub.stdout) + for i in range(0, 8): + if not ( + f"Received SampleKind.PUT ('demo/example/zenoh-python-pub': '[ {i}] Pub from Python!')" + in sub_out + ): + sub.errors.append( + f"z_advanced_sub didn't catch the {i}-th z_advanced_pub message" + ) + + assert not pub.errors + assert not sub.errors diff --git a/zenoh/__init__.pyi b/zenoh/__init__.pyi index 649aa0b5..676e0c9c 100644 --- a/zenoh/__init__.pyi +++ b/zenoh/__init__.pyi @@ -12,23 +12,21 @@ # ZettaScale Zenoh Team, # -from collections.abc import Callable, Iterable +from collections.abc import Callable from datetime import datetime from enum import Enum, auto from pathlib import Path -from typing import Any, Generic, Literal, Never, Self, TypeVar, final, overload +from typing import Any, Generic, Never, Self, TypeVar, final, overload from . import handlers as handlers from .handlers import Handler as Handler _T = TypeVar("_T") _H = TypeVar("_H") -_F = TypeVar("_F", bound=Callable) _RustHandler = ( handlers.DefaultHandler[_T] | handlers.FifoChannel[_T] | handlers.RingChannel[_T] ) - _PythonCallback = Callable[[_T], Any] _PythonHandler = tuple[_PythonCallback[_T], _H] @@ -304,6 +302,19 @@ class Encoding: _IntoEncoding = Encoding | str +EntityId = int + +@_unstable +@final +class EntityGlobalId: + @property + def zid(self) -> ZenohId: + """Returns the `ZenohId`, i.e. the Zenoh session, this ID is associated to.""" + + @property + def eid(self) -> EntityId: + """Returns the `EntityId` used to identify the entity in a Zenoh session.""" + @final class Hello: @property diff --git a/zenoh/ext.pyi b/zenoh/ext.pyi index d291ebe0..0e782dc6 100644 --- a/zenoh/ext.pyi +++ b/zenoh/ext.pyi @@ -1,8 +1,36 @@ -from typing import Any, TypeVar +from collections.abc import Callable +from typing import Any, Generic, Literal, Never, Self, TypeVar, final, overload -from zenoh import ZBytes +from zenoh import ( + CongestionControl, + Encoding, + EntityGlobalId, + Handler, + KeyExpr, + Priority, + Reliability, + Sample, + Subscriber, + ZBytes, + handlers, +) +from zenoh.zenoh import Session _T = TypeVar("_T") +_H = TypeVar("_H") + +_RustHandler = ( + handlers.DefaultHandler[_T] | handlers.FifoChannel[_T] | handlers.RingChannel[_T] +) +_PythonCallback = Callable[[_T], Any] +_PythonHandler = tuple[_PythonCallback[_T], _H] + +def _unstable(item: _T) -> _T: + """marker for unstable functionality""" + +_IntoEncoding = Encoding | str +_IntoKeyExpr = KeyExpr | str +_IntoZBytes = Any class Int8(int): """int subclass enabling to (de)serialize 8bit signed integer.""" @@ -68,3 +96,280 @@ def z_deserialize(tp: type[_T], zbytes: ZBytes) -> _T: * List, Dict, Set, FrozenSet and Tuple of supported types. """ pass + +@_unstable +@final +class AdvancedPublisher: + def __enter__(self) -> Self: ... + def __exit__(self, *_args, **_kwargs): ... + @property + def key_expr(self) -> KeyExpr: ... + @property + def encoding(self) -> Encoding: ... + @property + def congestion_control(self) -> CongestionControl: ... + @property + def priority(self) -> Priority: ... + def put( + self, + payload: _IntoZBytes, + *, + encoding: _IntoEncoding | None = None, + attachment: _IntoZBytes | None = None, + ): ... + def delete(self, *, attachment: _IntoZBytes | None = None): ... + def undeclare(self): ... + +@_unstable +@final +class AdvancedSubscriber(Generic[_H]): + def __enter__(self) -> Self: ... + def __exit__(self, *_args, **_kwargs): ... + @property + def key_expr(self) -> KeyExpr: ... + @property + def handler(self) -> _H: ... + @overload + def sample_miss_listener( + self, handler: _RustHandler[Miss] | None = None + ) -> SampleMissListener[Handler[Miss]]: + """Declares a listener to detect missed samples. + + Missed samples can only be detected from `AdvancedPublisher` that enable `sample_miss_detection`. + """ + + @overload + def sample_miss_listener( + self, handler: _PythonHandler[Miss, _H] + ) -> SampleMissListener[_H]: + """Declares a listener to detect missed samples. + + Missed samples can only be detected from `AdvancedPublisher` that enable `sample_miss_detection`. + """ + + @overload + def sample_miss_listener( + self, handler: _PythonCallback[Miss] + ) -> SampleMissListener[None]: + """Declares a listener to detect missed samples. + + Missed samples can only be detected from `AdvancedPublisher` that enable `sample_miss_detection`. + """ + + @overload + def detect_publishers( + self, + handler: _RustHandler[Sample] | None = None, + *, + history: bool | None = None, + ) -> Subscriber[Handler[Sample]]: + """Declares a listener to detect matching publishers. + + Only `AdvancedPublisher` that enable `publisher_detection` can be detected. + """ + + @overload + def detect_publishers( + self, handler: _PythonHandler[Sample, _H], *, history: bool | None = None + ) -> Subscriber[_H]: + """Declares a listener to detect matching publishers. + + Only `AdvancedPublisher` that enable `publisher_detection` can be detected. + """ + + @overload + def detect_publishers( + self, handler: _PythonCallback[Sample], *, history: bool | None = None + ) -> Subscriber[None]: + """Declares a listener to detect matching publishers. + + Only `AdvancedPublisher` that enable `publisher_detection` can be detected. + """ + + def undeclare(self): ... + @overload + def try_recv(self: AdvancedSubscriber[Handler[Sample]]) -> Sample | None: ... + @overload + def try_recv(self) -> Never: ... + @overload + def recv(self: AdvancedSubscriber[Handler[Sample]]) -> Sample: ... + @overload + def recv(self) -> Never: ... + @overload + def __iter__(self: AdvancedSubscriber[Handler[Sample]]) -> Handler[Sample]: ... + @overload + def __iter__(self) -> Never: ... + +@_unstable +@final +class CacheConfig: + """ + :param max_samples: specify how many samples to keep for each resource, default to 1 + :param replies_config: the QoS to apply to replies + """ + + def __new__( + cls, + max_samples: int | None = None, + *, + replies_config: RepliesConfig | None = None, + ) -> Self: ... + +@_unstable +@final +class HistoryConfig: + """ + :param detect_late_publishers: enable detection of late joiner publishers and query for their historical data; + late joiner detection can only be achieved for `AdvancedPublisher` that enable `publisher_detection` + history can only be retransmitted by `AdvancedPublisher` that enable `cache` + :param max_samples: specify how many samples to query for each resource + :param max_age: specify the maximum age of samples to query in seconds + """ + + def __new__( + cls, + *, + detect_late_publishers: bool | None = None, + max_samples: int | None = None, + max_age: float | int | None = None, + ) -> Self: ... + +@_unstable +@final +class Miss: + @property + def source(self) -> EntityGlobalId: + """The source of missed samples.""" + + @property + def nb(self) -> int: + """The number of missed samples.""" + +@_unstable +@final +class MissDetectionConfig: + """ + :param heartbeat: period in seconds, allow last sample miss detection through periodic heartbeat; + periodically send the last published Sample's sequence number to allow last sample recovery. + `AdvancedSubscriber can only recover the last sample with the `heartbeat` option enabled. + + **This option can not be enabled simultaneously with `sporadic_heartbeat`.** + + :param sporadic_heartbeat: period in seconds, allow last sample miss detection through sporadic heartbeat; + each period, the last published Sample's sequence number is sent with `CongestionControl.Block` but only if + it has changed since the last period. + `AdvancedSubscriber can only recover the last sample with the `heartbeat` option enabled. + + **This option can not be enabled simultaneously with `heartbeat`.** + """ + + def __new__( + cls, *, heartbeat: float | int | None, sporadic_heartbeat: float | int | None + ) -> Self: ... + +@_unstable +@final +class RecoveryConfig: + """ + :param periodic_queries: enable periodic queries for not yet received Samples and specify their period; + it allows retrieving the last Sample(s) if the last Sample(s) is/are lost, + so it is useful for sporadic publications but useless for periodic publications + with a period smaller or equal to this period. + Retransmission can only be achieved by `AdvancedPublisher` that enable `cache` and `sample_miss_detection`. + + **This option can not be enabled simultaneously with `heartbeat`.** + + :param heartbeat: subscribe to heartbeats of `AdvancedPublisher`; + it allows receiving the last published Sample's sequence number and check for misses. + Heartbeat subscriber must be paired with `AdvancedPublishers` that enable `cache` and + `sample_miss_detection` with `heartbeat` or `sporadic_heartbeat`. + + **This option can not be enabled simultaneously with `periodic_queries`.** + """ + + def __new__( + cls, *, periodic_queries: float | int | None, heartbeat: Literal[True] | None + ) -> Self: ... + +@_unstable +@final +class RepliesConfig: + def __new__( + cls, + *, + congestion_control: CongestionControl | None = None, + priority: Priority | None = None, + express: bool | None = None, + ) -> Self: ... + +@_unstable +@final +class SampleMissListener(Generic[_H]): + def undeclare(self): ... + @overload + def try_recv(self: SampleMissListener[Handler[Miss]]) -> Miss | None: ... + @overload + def try_recv(self) -> Never: ... + @overload + def recv(self: SampleMissListener[Handler[Miss]]) -> Miss: ... + @overload + def recv(self) -> Never: ... + @overload + def __iter__(self: SampleMissListener[Handler[Miss]]) -> Handler[Miss]: ... + @overload + def __iter__(self) -> Never: ... + +@_unstable +def declare_advanced_publisher( + session: Session, + key_expr: _IntoKeyExpr, + *, + encoding: _IntoEncoding | None = None, + congestion_control: CongestionControl | None = None, + priority: Priority | None = None, + express: bool | None = None, + reliability: Reliability | None = None, + cache: CacheConfig | None = None, + sample_miss_detection: MissDetectionConfig | None = None, + publisher_detection: bool | None = None, +) -> AdvancedPublisher: + """Create an AdvancedPublisher for the given key expression.""" + +@_unstable +@overload +def declare_advanced_subscriber( + session: Session, + key_expr: _IntoKeyExpr, + handler: _RustHandler[Sample] | None = None, + *, + history: HistoryConfig | None = None, + recovery: RecoveryConfig | None = None, + subscriber_detection: bool | None = None, +) -> AdvancedSubscriber[Handler[Sample]]: + """Create an AdvancedSubscriber for the given key expression.""" + +@_unstable +@overload +def declare_advanced_subscriber( + session: Session, + key_expr: _IntoKeyExpr, + handler: _PythonHandler[Sample, _H], + *, + history: HistoryConfig | None = None, + recovery: RecoveryConfig | None = None, + subscriber_detection: bool | None = None, +) -> AdvancedSubscriber[_H]: + """Create an AdvancedSubscriber for the given key expression.""" + +@_unstable +@overload +def declare_advanced_subscriber( + session: Session, + key_expr: _IntoKeyExpr, + handler: _PythonCallback[Sample], + *, + history: HistoryConfig | None = None, + recovery: RecoveryConfig | None = None, + subscriber_detection: bool | None = None, +) -> AdvancedSubscriber[None]: + """Create an AdvancedSubscriber for the given key expression."""