Skip to content

Commit 5574e57

Browse files
Update broker.py
1 parent 93619fa commit 5574e57

File tree

1 file changed

+49
-2
lines changed

1 file changed

+49
-2
lines changed

taskiq_faststream/broker.py

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@
1313
from taskiq_faststream.types import ScheduledTask
1414
from taskiq_faststream.utils import resolve_msg
1515

16+
try:
17+
from taskiq.middlewares.otel_middleware import OpenTelemetryMiddleware
18+
except ImportError:
19+
OpenTelemetryMiddleware = None # type: ignore[assignment,misc]
20+
1621
PublishParameters: TypeAlias = typing.Any
1722

1823

@@ -30,11 +35,32 @@ class BrokerWrapper(AsyncBroker):
3035
task : Register FastStream scheduled task.
3136
"""
3237

33-
def __init__(self, broker: Any) -> None:
38+
def __init__(
39+
self,
40+
broker: Any,
41+
*,
42+
enable_otel: bool = False,
43+
) -> None:
44+
"""Initialize BrokerWrapper.
45+
46+
Args:
47+
broker: FastStream broker instance to wrap.
48+
enable_otel: Enable OpenTelemetry middleware for distributed tracing.
49+
Requires taskiq[otel] to be installed.
50+
"""
3451
super().__init__()
3552
self.formatter = PatchedFormatter()
3653
self.broker = broker
3754

55+
if enable_otel:
56+
if OpenTelemetryMiddleware is None:
57+
msg = (
58+
"OpenTelemetry middleware requires taskiq[otel] to be installed. "
59+
"Install it with: pip install taskiq-faststream[otel]"
60+
)
61+
raise ImportError(msg)
62+
self.middlewares.append(OpenTelemetryMiddleware())
63+
3864
async def startup(self) -> None:
3965
"""Startup wrapped FastStream broker."""
4066
await super().startup()
@@ -105,11 +131,32 @@ class AppWrapper(BrokerWrapper):
105131
task : Register FastStream scheduled task.
106132
"""
107133

108-
def __init__(self, app: Application) -> None:
134+
def __init__(
135+
self,
136+
app: Application,
137+
*,
138+
enable_otel: bool = False,
139+
) -> None:
140+
"""Initialize AppWrapper.
141+
142+
Args:
143+
app: FastStream application instance to wrap.
144+
enable_otel: Enable OpenTelemetry middleware for distributed tracing.
145+
Requires taskiq[otel] to be installed.
146+
"""
109147
super(BrokerWrapper, self).__init__()
110148
self.formatter = PatchedFormatter()
111149
self.app = app
112150

151+
if enable_otel:
152+
if OpenTelemetryMiddleware is None:
153+
msg = (
154+
"OpenTelemetry middleware requires taskiq[otel] to be installed. "
155+
"Install it with: pip install taskiq-faststream[otel]"
156+
)
157+
raise ImportError(msg)
158+
self.middlewares.append(OpenTelemetryMiddleware())
159+
113160
async def startup(self) -> None:
114161
"""Startup wrapped FastStream."""
115162
await super(BrokerWrapper, self).startup()

0 commit comments

Comments
 (0)