Skip to content

Commit 9928961

Browse files
committed
Fixed opentelemetry instrumentation.
1 parent 92461c2 commit 9928961

File tree

2 files changed

+27
-21
lines changed

2 files changed

+27
-21
lines changed

python/natsrpy/instrumentation/nats_core.py

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
import contextlib
2-
import sys
31
from collections.abc import AsyncGenerator, Awaitable, Callable
42
from functools import wraps
53
from types import TracebackType
@@ -62,6 +60,26 @@ async def __anext__(self) -> Any:
6260
return next_msg
6361

6462

63+
class SubscriptionCtxProxy(ObjectProxy): # type: ignore
64+
"""Proxy object for subscription context manager."""
65+
66+
def __init__(self, wrapped: Any, tracer: Tracer) -> None:
67+
super().__init__(wrapped)
68+
self._self_tracer = tracer
69+
self._self_sub = None
70+
71+
async def __aenter__(self) -> Any:
72+
sub = await self.__wrapped__.__aenter__()
73+
if isinstance(sub, IteratorSubscription):
74+
sub = IterableSubscriptionProxy(sub, self._self_tracer)
75+
self._self_sub = sub
76+
return sub
77+
78+
def __aexit__(self, *args: Any, **kwargs: dict[Any, Any]) -> Any:
79+
if self._self_sub and isinstance(self._self_sub, IterableSubscriptionProxy):
80+
self._self_sub.__cancel_ctx__(*args, **kwargs)
81+
82+
6583
class NatsCoreInstrumentator:
6684
"""Instrument core nats methods."""
6785

@@ -127,7 +145,7 @@ def _publish_decorator(
127145

128146
wrap_function_wrapper("natsrpy._natsrpy_rs", "Nats.publish", _publish_decorator)
129147

130-
def _instrument_subscriptions(self) -> None: # noqa: C901
148+
def _instrument_subscriptions(self) -> None:
131149
"""Create instrumentation for."""
132150

133151
def callback_wrapper(
@@ -170,27 +188,15 @@ def process_args(
170188
callback = callback_wrapper(callback)
171189
return (subject, callback, queue)
172190

173-
@contextlib.asynccontextmanager
174-
async def wrapper(
191+
def wrapper(
175192
wrapper: Any,
176193
_: Nats,
177194
args: tuple[Any, ...],
178195
kwargs: dict[str, Any],
179196
) -> AsyncGenerator[Any, None]:
180-
181-
async with wrapper(*process_args(*args, **kwargs)) as original_sub:
182-
if isinstance(original_sub, IteratorSubscription):
183-
ret = IterableSubscriptionProxy(original_sub, self.tracer)
184-
else:
185-
ret = original_sub
186-
try:
187-
yield ret
188-
except BaseException:
189-
if isinstance(ret, IterableSubscriptionProxy):
190-
ret.__cancel_ctx__(*sys.exc_info())
191-
raise
192-
finally:
193-
if isinstance(ret, IterableSubscriptionProxy):
194-
ret.__cancel_ctx__()
197+
return SubscriptionCtxProxy(
198+
wrapper(*process_args(*args, **kwargs)),
199+
self.tracer,
200+
)
195201

196202
wrap_function_wrapper("natsrpy._natsrpy_rs", "Nats.subscribe", wrapper)

src/subscriptions/callback.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ async fn process_message(message: async_nats::message::Message, py_callback: Arc
3636
Ok(())
3737
};
3838
if let Err(err) = task().await {
39-
log::error!("Cannot process message {message:#?}. Error: {err}");
39+
log::error!("Cannot process message {message:?}. Error: {err}");
4040
}
4141
}
4242

0 commit comments

Comments
 (0)