Skip to content

Commit 4381e27

Browse files
committed
refactor: remove OTEL explicit integration
1 parent 28fa441 commit 4381e27

File tree

2 files changed

+13
-32
lines changed

2 files changed

+13
-32
lines changed

README.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,16 +150,17 @@ taskiq_broker.task(
150150

151151
## OpenTelemetry Support
152152

153-
**taskiq-faststream** supports distributed tracing with OpenTelemetry. To enable it, install the `otel` extra and pass `enable_otel=True` when creating the broker wrapper:
153+
**taskiq-faststream** supports taskiq's OpenTelemetry middleware. To enable it, pass `OpenTelemetryMiddleware` when creating the broker wrapper:
154154

155155
```python
156156
from faststream.nats import NatsBroker
157157
from taskiq_faststream import BrokerWrapper
158+
from taskiq.middlewares.otel_middleware import OpenTelemetryMiddleware
158159

159160
broker = NatsBroker()
160161

161162
# Enable OpenTelemetry middleware
162-
taskiq_broker = BrokerWrapper(broker, enable_otel=True)
163+
taskiq_broker = BrokerWrapper(broker, middlewares=[OpenTelemetryMiddleware()])
163164
```
164165

165166
This will automatically add OpenTelemetry middleware to track task execution, providing insights into:
@@ -175,9 +176,10 @@ The same applies to `AppWrapper`:
175176
```python
176177
from faststream import FastStream
177178
from taskiq_faststream import AppWrapper
179+
from taskiq.middlewares.otel_middleware import OpenTelemetryMiddleware
178180

179181
app = FastStream(broker)
180182

181183
# Enable OpenTelemetry middleware
182-
taskiq_broker = AppWrapper(app, enable_otel=True)
184+
taskiq_broker = AppWrapper(app, middlewares=[OpenTelemetryMiddleware()])
183185
```

taskiq_faststream/broker.py

Lines changed: 8 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,20 @@
11
import typing
22
import warnings
3+
from collections.abc import Iterable
34
from typing import Any, TypeAlias
45

56
import anyio
67
from faststream._internal.application import Application
78
from faststream.types import SendableMessage
89
from taskiq import AsyncBroker
10+
from taskiq.abc.middleware import TaskiqMiddleware
911
from taskiq.acks import AckableMessage
1012
from taskiq.decor import AsyncTaskiqDecoratedTask
1113

1214
from taskiq_faststream.formatter import PatchedFormatter, PatchedMessage
1315
from taskiq_faststream.types import ScheduledTask
1416
from taskiq_faststream.utils import resolve_msg
1517

16-
try:
17-
from taskiq.middlewares.otel_middleware import OpenTelemetryMiddleware
18-
except ImportError:
19-
OpenTelemetryMiddleware = None # type: ignore[assignment,misc]
20-
2118
PublishParameters: TypeAlias = typing.Any
2219

2320

@@ -39,27 +36,18 @@ def __init__(
3936
self,
4037
broker: Any,
4138
*,
42-
enable_otel: bool = False,
39+
middlewares: Iterable[TaskiqMiddleware] = (),
4340
) -> None:
4441
"""Initialize BrokerWrapper.
4542
4643
Args:
4744
broker: FastStream broker instance to wrap.
48-
enable_otel: Enable OpenTelemetry middleware for distributed tracing.
49-
Requires taskiq[otel] to be installed.
45+
middlewares: Middlewares to add to the broker.
5046
"""
5147
super().__init__()
5248
self.formatter = PatchedFormatter()
5349
self.broker = broker
54-
55-
if enable_otel:
56-
if OpenTelemetryMiddleware is None:
57-
msg = (
58-
"OpenTelemetry middleware requires taskiq[otel] to be installed. "
59-
"Install it with: pip install taskiq-faststream[otel]"
60-
)
61-
raise ImportError(msg)
62-
self.middlewares.append(OpenTelemetryMiddleware())
50+
self.add_middlewares(*middlewares)
6351

6452
async def startup(self) -> None:
6553
"""Startup wrapped FastStream broker."""
@@ -135,27 +123,18 @@ def __init__(
135123
self,
136124
app: Application,
137125
*,
138-
enable_otel: bool = False,
126+
middlewares: Iterable[TaskiqMiddleware] = (),
139127
) -> None:
140128
"""Initialize AppWrapper.
141129
142130
Args:
143131
app: FastStream application instance to wrap.
144-
enable_otel: Enable OpenTelemetry middleware for distributed tracing.
145-
Requires taskiq[otel] to be installed.
132+
middlewares: Middlewares to add to the broker.
146133
"""
147134
super(BrokerWrapper, self).__init__()
148135
self.formatter = PatchedFormatter()
149136
self.app = app
150-
151-
if enable_otel:
152-
if OpenTelemetryMiddleware is None:
153-
msg = (
154-
"OpenTelemetry middleware requires taskiq[otel] to be installed. "
155-
"Install it with: pip install taskiq-faststream[otel]"
156-
)
157-
raise ImportError(msg)
158-
self.middlewares.append(OpenTelemetryMiddleware())
137+
self.add_middlewares(*middlewares)
159138

160139
async def startup(self) -> None:
161140
"""Startup wrapped FastStream."""

0 commit comments

Comments
 (0)