Skip to content

Commit 1a8f607

Browse files
committed
Added otlp instrumentation.
1 parent 16ee1a7 commit 1a8f607

33 files changed

+596
-216
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/target
2+
delete-me-*
23

34
# Byte-compiled / optimized / DLL files
45
__pycache__/

.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ repos:
1818
name: python mypy
1919
always_run: true
2020
pass_filenames: false
21-
args: ["python"]
21+
args: ["python", "examples"]
2222
- repo: https://github.com/astral-sh/ruff-pre-commit
2323
rev: v0.15.7
2424
hooks:

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ async-nats = "0.46"
1818
bytes = "1.11.1"
1919
futures-util = "0.3.32"
2020
log = "0.4.29"
21-
pyo3 = { version = "0.28", features = ["experimental-inspect"] }
21+
pyo3 = { version = "0.28", features = ["abi3", "experimental-inspect"] }
2222
pyo3-async-runtimes = { version = "0.28", features = ["tokio-runtime"] }
2323
pyo3-log = "0.13.3"
2424
serde = { version = "1.0.228", features = ["derive"] }

examples/consumers.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,13 @@ async def main() -> None:
4444
# We use messages() to get async iterator which we
4545
# use to get messages for push_consumer.
4646
async for push_message in await push_consumer.messages():
47-
print(f"[FROM_PUSH] {push_message.payload}") # noqa: T201
47+
print(f"[FROM_PUSH] {push_message.payload!r}") # noqa: T201
4848
await push_message.ack()
4949
break
5050

5151
# Pull consumers have to request batches of messages.
5252
for pull_message in await pull_consumer.fetch(max_messages=10):
53-
print(f"[FROM_PULL] {pull_message.payload}") # noqa: T201
53+
print(f"[FROM_PULL] {pull_message.payload!r}") # noqa: T201
5454
await pull_message.ack()
5555

5656
# Cleanup

examples/kv.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@ async def main() -> None:
3030

3131
await kv.delete("test-key")
3232

33-
# Alternatively you can
34-
# use await watcher.next()
3533
async for event in watcher:
3634
print("[EVENT]", event) # noqa: T201
3735
break

examples/request_reply.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@ async def main() -> None:
1212
# Here we create responder, that will be
1313
# answering to our requests.
1414
async def responder(message: Message) -> None:
15-
print(f"[REQUEST]: {message.payload}, headers={message.headers}") # noqa: T201
15+
print(f"[REQUEST]: {message.payload!r}, headers={message.headers}") # noqa: T201
1616
if message.reply:
1717
await nats.publish(
1818
message.reply,
19-
f"reply to {message.payload}",
19+
f"reply to {message.payload!r}",
2020
headers=message.headers,
2121
)
2222

examples/subscriptions.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ async def main() -> None:
1111
cb_lock = asyncio.Event()
1212

1313
async def callback(message: Message) -> None:
14-
print(f"[FROM_CALLBACK] {message.payload}") # noqa: T201
14+
print(f"[FROM_CALLBACK] {message.payload!r}") # noqa: T201
1515
cb_lock.set()
1616

1717
# When subscribing you can set callback.
@@ -21,8 +21,6 @@ async def callback(message: Message) -> None:
2121

2222
# When callback is not set, you get a subscription
2323
# that should be used along with `async for`
24-
# loop, or alternatively you can call
25-
# `await iter_sub.next()` to get a single message.
2624
iter_sub = await nats.subscribe("iter-subj")
2725

2826
# Subscriptions with queue argument create
@@ -40,10 +38,10 @@ async def callback(message: Message) -> None:
4038
await queue_sub.unsubscribe(limit=1)
4139

4240
async for message in iter_sub:
43-
print(f"[FROM_ITERATOR] {message.payload}") # noqa: T201
41+
print(f"[FROM_ITERATOR] {message.payload!r}") # noqa: T201
4442

4543
async for message in queue_sub:
46-
print(f"[FROM_QUEUED] {message.payload}") # noqa: T201
44+
print(f"[FROM_QUEUED] {message.payload!r}") # noqa: T201
4745

4846
# Making sure that the message in callback is received.
4947
await cb_lock.wait()

pyproject.toml

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,16 @@ dynamic = ["version"]
2323
name = "Pavel Kirilin"
2424
email = "s3riussan@gmail.com"
2525

26+
[project.entry-points.opentelemetry_instrumentor]
27+
natsrpy = "natsrpy.instrumentation:NatsrpyInstrumentor"
28+
29+
[project.optional-dependencies]
30+
opentelemetry = [
31+
"opentelemetry-api (>=1.38.0,<2.0.0)",
32+
"opentelemetry-instrumentation (>=0.59b0,<1)",
33+
"opentelemetry-semantic-conventions (>=0.59b0,<1)",
34+
]
35+
2636
[dependency-groups]
2737
dev = [
2838
"anyio>=4,<5",
@@ -38,9 +48,6 @@ dev = [
3848
requires = ["maturin>=1.12,<2.0"]
3949
build-backend = "maturin"
4050

41-
[tool.uv]
42-
package = false
43-
4451
[tool.maturin]
4552
bindings = "pyo3"
4653
features = ["pyo3/extension-module"]
@@ -55,6 +62,7 @@ packages = ["natsrpy"]
5562
pretty = true
5663
implicit_reexport = true
5764
allow_untyped_decorators = true
65+
namespace_packages = true
5866
warn_return_any = false
5967

6068
[tool.pytest]
@@ -128,3 +136,6 @@ ignore-decorators = ["typing.overload"]
128136

129137
[tool.ruff.lint.pylint]
130138
allow-magic-value-types = ["int", "str", "float"]
139+
140+
[tool.uv]
141+
package = false

python/natsrpy/_natsrpy_rs/__init__.pyi

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -33,27 +33,12 @@ class Message:
3333
description: str | None
3434
length: int
3535

36+
def __len__(self) -> int: ...
37+
3638
@final
3739
class IteratorSubscription:
38-
"""Async iterator subscription for receiving NATS messages.
39-
40-
Returned by :meth:`Nats.subscribe` when no callback is provided.
41-
Messages can be received using ``async for`` or by calling :meth:`next`
42-
directly.
43-
"""
44-
45-
def __aiter__(self) -> IteratorSubscription: ...
40+
def __aiter__(self) -> Self: ...
4641
def __anext__(self) -> Future[Message]: ...
47-
def next(self, timeout: float | timedelta | None = None) -> Future[Message]:
48-
"""Receive the next message from the subscription.
49-
50-
:param timeout: maximum time to wait for a message in seconds
51-
or as a timedelta, defaults to None (wait indefinitely).
52-
:return: the next message.
53-
:raises StopAsyncIteration: when the subscription is drained or
54-
unsubscribed.
55-
"""
56-
5742
def unsubscribe(self, limit: int | None = None) -> Future[None]:
5843
"""Unsubscribe from the subject.
5944
@@ -96,6 +81,22 @@ class Nats:
9681
access over a connection to one or more NATS servers.
9782
"""
9883

84+
@property
85+
def addr(self) -> list[str]: ...
86+
@property
87+
def user_and_pass(self) -> tuple[str, str]: ...
88+
@property
89+
def nkey(self) -> str | None: ...
90+
@property
91+
def token(self) -> str | None: ...
92+
@property
93+
def custom_inbox_prefix(self) -> str | None: ...
94+
@property
95+
def read_buffer_capacity(self) -> int: ...
96+
@property
97+
def sender_capacity(self) -> int: ...
98+
@property
99+
def max_reconnects(self) -> int | None: ...
99100
def __new__(
100101
cls,
101102
/,

python/natsrpy/_natsrpy_rs/js/__init__.pyi

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,10 @@ class JetStreamMessage:
159159
def token(self) -> str | None:
160160
"""Authentication token, if applicable."""
161161

162+
@property
163+
def length(self) -> int:
164+
"""Message's payload length."""
165+
162166
def ack(self, double: bool = False) -> Future[None]:
163167
"""Acknowledge that a message was handled.
164168
@@ -208,3 +212,5 @@ class JetStreamMessage:
208212
209213
:param double: whether to wait for server response, defaults to False.
210214
"""
215+
216+
def __len__(self) -> int: ...

0 commit comments

Comments
 (0)