Skip to content

Commit c64e469

Browse files
implement advanced pub/sub (#537)
* implement advanced pub/sub * add examples * Merge branch 'main' into advanced-pubsub * fix examples and add test * format fix * format fix * fix docs for __new__ --------- Co-authored-by: Denis Biryukov <denis.biryukov@zettascale.tech> Co-authored-by: DenisBiryukov91 <155981813+DenisBiryukov91@users.noreply.github.com>
1 parent 567f82d commit c64e469

15 files changed

Lines changed: 962 additions & 41 deletions

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ crate-type = ["cdylib"]
3535

3636
[features]
3737
default = ["zenoh/default", "zenoh-ext"]
38+
zenoh-ext = ["dep:zenoh-ext", "zenoh-ext/unstable", "zenoh-ext/internal"]
3839

3940
[badges]
4041
maintenance = { status = "actively-developed" }

docs/stubs_to_sources.py

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
PACKAGE = (Path(__file__) / "../../zenoh").resolve()
3030
__INIT__ = PACKAGE / "__init__.py"
31+
EXT = PACKAGE / "ext.py"
3132

3233

3334
def _unstable(item):
@@ -98,23 +99,26 @@ def visit_FunctionDef(self, node: ast.FunctionDef):
9899

99100

100101
def main():
101-
# remove __init__.pyi
102-
__INIT__.unlink()
102+
fnames = [__INIT__, EXT]
103+
for fname in fnames:
104+
# remove *.py
105+
fname.unlink()
103106
# rename stubs
104107
for entry in PACKAGE.glob("*.pyi"):
105108
entry.rename(PACKAGE / f"{entry.stem}.py")
106-
# read stub code
107-
with open(__INIT__) as f:
108-
stub: ast.Module = ast.parse(f.read())
109-
# replace _unstable
110-
for i, stmt in enumerate(stub.body):
111-
if isinstance(stmt, ast.FunctionDef) and stmt.name == "_unstable":
112-
stub.body[i] = ast.parse(inspect.getsource(_unstable))
113-
# remove overload
114-
stub = RemoveOverload().visit(stub)
115-
# write modified code
116-
with open(__INIT__, "w") as f:
117-
f.write(ast.unparse(stub))
109+
for fname in fnames:
110+
# read stub code
111+
with open(fname) as f:
112+
stub: ast.Module = ast.parse(f.read())
113+
# replace _unstable
114+
for i, stmt in enumerate(stub.body):
115+
if isinstance(stmt, ast.FunctionDef) and stmt.name == "_unstable":
116+
stub.body[i] = ast.parse(inspect.getsource(_unstable))
117+
# remove overload
118+
stub = RemoveOverload().visit(stub)
119+
# write modified code
120+
with open(fname, "w") as f:
121+
f.write(ast.unparse(stub))
118122

119123

120124
if __name__ == "__main__":

examples/z_advanced_pub.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
#
2+
# Copyright (c) 2022 ZettaScale Technology
3+
#
4+
# This program and the accompanying materials are made available under the
5+
# terms of the Eclipse Public License 2.0 which is available at
6+
# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7+
# which is available at https://www.apache.org/licenses/LICENSE-2.0.
8+
#
9+
# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10+
#
11+
# Contributors:
12+
# ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13+
#
14+
import time
15+
from typing import Optional
16+
17+
import zenoh
18+
from zenoh.ext import CacheConfig, MissDetectionConfig, declare_advanced_publisher
19+
20+
21+
def main(conf: zenoh.Config, key: str, payload: str, history: int):
22+
# initiate logging
23+
zenoh.init_log_from_env_or("error")
24+
25+
print("Opening session...")
26+
with zenoh.open(conf) as session:
27+
print(f"Declaring AdvancedPublisher on '{key}'...")
28+
pub = declare_advanced_publisher(
29+
session,
30+
key,
31+
cache=CacheConfig(max_samples=history),
32+
sample_miss_detection=MissDetectionConfig(heartbeat=5),
33+
publisher_detection=True,
34+
)
35+
36+
print("Press CTRL-C to quit...")
37+
for idx in itertools.count():
38+
time.sleep(1)
39+
buf = f"[{idx:4d}] {payload}"
40+
print(f"Putting Data ('{key}': '{buf}')...")
41+
pub.put(buf)
42+
43+
44+
# --- Command line argument parsing --- --- --- --- --- ---
45+
if __name__ == "__main__":
46+
import argparse
47+
import itertools
48+
49+
import common
50+
51+
parser = argparse.ArgumentParser(
52+
prog="z_advanced_pub", description="zenoh advanced pub example"
53+
)
54+
common.add_config_arguments(parser)
55+
parser.add_argument(
56+
"--key",
57+
"-k",
58+
dest="key",
59+
default="demo/example/zenoh-python-pub",
60+
type=str,
61+
help="The key expression to publish onto.",
62+
)
63+
parser.add_argument(
64+
"--payload",
65+
"-p",
66+
dest="payload",
67+
default="Pub from Python!",
68+
type=str,
69+
help="The payload to publish.",
70+
)
71+
parser.add_argument(
72+
"--history",
73+
dest="history",
74+
type=int,
75+
default=1,
76+
help="The number of publications to keep in cache",
77+
)
78+
79+
args = parser.parse_args()
80+
conf = common.get_config_from_args(args)
81+
82+
main(conf, args.key, args.payload, args.history)

examples/z_advanced_sub.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
#
2+
# Copyright (c) 2022 ZettaScale Technology
3+
#
4+
# This program and the accompanying materials are made available under the
5+
# terms of the Eclipse Public License 2.0 which is available at
6+
# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7+
# which is available at https://www.apache.org/licenses/LICENSE-2.0.
8+
#
9+
# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10+
#
11+
# Contributors:
12+
# ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13+
#
14+
import time
15+
16+
import zenoh
17+
from zenoh.ext import HistoryConfig, Miss, RecoveryConfig, declare_advanced_subscriber
18+
19+
20+
def main(conf: zenoh.Config, key: str):
21+
# initiate logging
22+
zenoh.init_log_from_env_or("error")
23+
24+
print("Opening session...")
25+
with zenoh.open(conf) as session:
26+
print(f"Declaring Subscriber on '{key}'...")
27+
28+
def listener(sample: zenoh.Sample):
29+
print(
30+
f">> [Subscriber] Received {sample.kind} ('{sample.key_expr}': '{sample.payload.to_string()}')"
31+
)
32+
33+
advanced_sub = declare_advanced_subscriber(
34+
session,
35+
key,
36+
listener,
37+
history=HistoryConfig(detect_late_publishers=True),
38+
recovery=RecoveryConfig(heartbeat=True),
39+
subscriber_detection=True,
40+
)
41+
42+
def miss_listener(miss: Miss):
43+
print(f">> [Subscriber] Missed {miss.nb} samples from {miss.source} !!!")
44+
45+
advanced_sub.sample_miss_listener(miss_listener)
46+
47+
print("Press CTRL-C to quit...")
48+
while True:
49+
time.sleep(1)
50+
51+
52+
# --- Command line argument parsing --- --- --- --- --- ---
53+
if __name__ == "__main__":
54+
import argparse
55+
56+
import common
57+
58+
parser = argparse.ArgumentParser(
59+
prog="z_advanced_sub", description="zenoh advanced sub example"
60+
)
61+
common.add_config_arguments(parser)
62+
parser.add_argument(
63+
"--key",
64+
"-k",
65+
dest="key",
66+
default="demo/example/**",
67+
type=str,
68+
help="The key expression to subscribe to.",
69+
)
70+
71+
args = parser.parse_args()
72+
conf = common.get_config_from_args(args)
73+
74+
main(conf, args.key)

src/bytes.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,8 @@ downcast_or_new!(Encoding => Option<String>);
9797
#[pymethods]
9898
impl Encoding {
9999
#[new]
100-
fn new(s: Option<String>) -> PyResult<Self> {
101-
Ok(s.map_into().map(Self).unwrap_or_default())
100+
fn new(s: Option<String>) -> Self {
101+
s.map_into().map(Self).unwrap_or_default()
102102
}
103103

104104
fn with_schema(&self, schema: String) -> Self {

0 commit comments

Comments
 (0)