1111 _set_pipeline_data ,
1212)
1313from sentry_sdk .tracing import Span
14+ from sentry_sdk .tracing_utils import has_span_streaming_enabled
1415from sentry_sdk .utils import capture_internal_exceptions
1516
1617from typing import TYPE_CHECKING
1718
1819if TYPE_CHECKING :
1920 from collections .abc import Callable
20- from typing import Any , Union
21+ from typing import Any , Optional , Union
22+ from sentry_sdk .traces import StreamedSpan
2123 from redis .asyncio .client import Pipeline , StrictRedis
2224 from redis .asyncio .cluster import ClusterPipeline , RedisCluster
2325
@@ -26,21 +28,36 @@ def patch_redis_async_pipeline(
2628 pipeline_cls : "Union[type[Pipeline[Any]], type[ClusterPipeline[Any]]]" ,
2729 is_cluster : bool ,
2830 get_command_args_fn : "Any" ,
29- set_db_data_fn : "Callable[[Span, Any], None]" ,
31+ set_db_data_fn : "Callable[[Union[ Span, StreamedSpan] , Any], None]" ,
3032) -> None :
3133 old_execute = pipeline_cls .execute
3234
3335 from sentry_sdk .integrations .redis import RedisIntegration
3436
3537 async def _sentry_execute (self : "Any" , * args : "Any" , ** kwargs : "Any" ) -> "Any" :
36- if sentry_sdk .get_client ().get_integration (RedisIntegration ) is None :
38+ client = sentry_sdk .get_client ()
39+ if client .get_integration (RedisIntegration ) is None :
3740 return await old_execute (self , * args , ** kwargs )
3841
39- with sentry_sdk .start_span (
40- op = OP .DB_REDIS ,
41- name = "redis.pipeline.execute" ,
42- origin = SPAN_ORIGIN ,
43- ) as span :
42+ span_streaming = has_span_streaming_enabled (client .options )
43+
44+ span : "Union[Span, StreamedSpan]"
45+ if span_streaming :
46+ span = sentry_sdk .traces .start_span (
47+ name = "redis.pipeline.execute" ,
48+ attributes = {
49+ "sentry.origin" : SPAN_ORIGIN ,
50+ "sentry.op" : OP .DB_REDIS ,
51+ },
52+ )
53+ else :
54+ span = sentry_sdk .start_span (
55+ op = OP .DB_REDIS ,
56+ name = "redis.pipeline.execute" ,
57+ origin = SPAN_ORIGIN ,
58+ )
59+
60+ with span :
4461 with capture_internal_exceptions ():
4562 try :
4663 command_seq = self ._execution_strategy ._command_queue
@@ -67,7 +84,7 @@ async def _sentry_execute(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
6784def patch_redis_async_client (
6885 cls : "Union[type[StrictRedis[Any]], type[RedisCluster[Any]]]" ,
6986 is_cluster : bool ,
70- set_db_data_fn : "Callable[[Span, Any], None]" ,
87+ set_db_data_fn : "Callable[[Union[ Span, StreamedSpan] , Any], None]" ,
7188) -> None :
7289 old_execute_command = cls .execute_command
7390
@@ -76,33 +93,55 @@ def patch_redis_async_client(
7693 async def _sentry_execute_command (
7794 self : "Any" , name : str , * args : "Any" , ** kwargs : "Any"
7895 ) -> "Any" :
79- integration = sentry_sdk .get_client ().get_integration (RedisIntegration )
96+ client = sentry_sdk .get_client ()
97+ integration = client .get_integration (RedisIntegration )
8098 if integration is None :
8199 return await old_execute_command (self , name , * args , ** kwargs )
82100
101+ span_streaming = has_span_streaming_enabled (client .options )
102+
83103 cache_properties = _compile_cache_span_properties (
84104 name ,
85105 args ,
86106 kwargs ,
87107 integration ,
88108 )
89109
90- cache_span = None
110+ cache_span : "Optional[Union[Span, StreamedSpan]]" = None
91111 if cache_properties ["is_cache_key" ] and cache_properties ["op" ] is not None :
92- cache_span = sentry_sdk .start_span (
93- op = cache_properties ["op" ],
94- name = cache_properties ["description" ],
95- origin = SPAN_ORIGIN ,
96- )
112+ if span_streaming :
113+ cache_span = sentry_sdk .traces .start_span (
114+ name = cache_properties ["description" ],
115+ attributes = {
116+ "sentry.op" : cache_properties ["op" ],
117+ "sentry.origin" : SPAN_ORIGIN ,
118+ },
119+ )
120+ else :
121+ cache_span = sentry_sdk .start_span (
122+ op = cache_properties ["op" ],
123+ name = cache_properties ["description" ],
124+ origin = SPAN_ORIGIN ,
125+ )
97126 cache_span .__enter__ ()
98127
99128 db_properties = _compile_db_span_properties (integration , name , args )
100129
101- db_span = sentry_sdk .start_span (
102- op = db_properties ["op" ],
103- name = db_properties ["description" ],
104- origin = SPAN_ORIGIN ,
105- )
130+ db_span : "Union[Span, StreamedSpan]"
131+ if span_streaming :
132+ db_span = sentry_sdk .traces .start_span (
133+ name = db_properties ["description" ],
134+ attributes = {
135+ "sentry.op" : db_properties ["op" ],
136+ "sentry.origin" : SPAN_ORIGIN ,
137+ },
138+ )
139+ else :
140+ db_span = sentry_sdk .start_span (
141+ op = db_properties ["op" ],
142+ name = db_properties ["description" ],
143+ origin = SPAN_ORIGIN ,
144+ )
106145 db_span .__enter__ ()
107146
108147 set_db_data_fn (db_span , self )
0 commit comments