1111 _set_pipeline_data ,
1212)
1313from sentry_sdk .tracing import Span
14+ from sentry_sdk .tracing_utils import has_span_streaming_enabled
1415from sentry_sdk .utils import capture_internal_exceptions
1516
1617from typing import TYPE_CHECKING
1718
1819if TYPE_CHECKING :
1920 from collections .abc import Callable
20- from typing import Any
21+ from typing import Any , Optional , Union
22+ from sentry_sdk .traces import StreamedSpan
2123
2224
2325def patch_redis_pipeline (
2426 pipeline_cls : "Any" ,
2527 is_cluster : bool ,
2628 get_command_args_fn : "Any" ,
27- set_db_data_fn : "Callable[[Span, Any], None]" ,
29+ set_db_data_fn : "Callable[[Union[ Span, StreamedSpan] , Any], None]" ,
2830) -> None :
2931 old_execute = pipeline_cls .execute
3032
3133 from sentry_sdk .integrations .redis import RedisIntegration
3234
3335 def sentry_patched_execute (self : "Any" , * args : "Any" , ** kwargs : "Any" ) -> "Any" :
34- if sentry_sdk .get_client ().get_integration (RedisIntegration ) is None :
36+ client = sentry_sdk .get_client ()
37+ if client .get_integration (RedisIntegration ) is None :
3538 return old_execute (self , * args , ** kwargs )
3639
37- with sentry_sdk .start_span (
38- op = OP .DB_REDIS ,
39- name = "redis.pipeline.execute" ,
40- origin = SPAN_ORIGIN ,
41- ) as span :
40+ span_streaming = has_span_streaming_enabled (client .options )
41+
42+ span : "Union[Span, StreamedSpan]"
43+ if span_streaming :
44+ span = sentry_sdk .traces .start_span (
45+ name = "redis.pipeline.execute" ,
46+ attributes = {
47+ "sentry.origin" : SPAN_ORIGIN ,
48+ "sentry.op" : OP .DB_REDIS ,
49+ },
50+ )
51+ else :
52+ span = sentry_sdk .start_span (
53+ op = OP .DB_REDIS ,
54+ name = "redis.pipeline.execute" ,
55+ origin = SPAN_ORIGIN ,
56+ )
57+
58+ with span :
4259 with capture_internal_exceptions ():
4360 command_seq = None
4461 try :
@@ -61,7 +78,9 @@ def sentry_patched_execute(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
6178
6279
6380def patch_redis_client (
64- cls : "Any" , is_cluster : bool , set_db_data_fn : "Callable[[Span, Any], None]"
81+ cls : "Any" ,
82+ is_cluster : bool ,
83+ set_db_data_fn : "Callable[[Union[Span, StreamedSpan], Any], None]" ,
6584) -> None :
6685 """
6786 This function can be used to instrument custom redis client classes or
@@ -74,45 +93,71 @@ def patch_redis_client(
7493 def sentry_patched_execute_command (
7594 self : "Any" , name : str , * args : "Any" , ** kwargs : "Any"
7695 ) -> "Any" :
77- integration = sentry_sdk .get_client ().get_integration (RedisIntegration )
96+ client = sentry_sdk .get_client ()
97+ integration = client .get_integration (RedisIntegration )
7898 if integration is None :
7999 return old_execute_command (self , name , * args , ** kwargs )
80100
101+ span_streaming = has_span_streaming_enabled (client .options )
102+
81103 cache_properties = _compile_cache_span_properties (
82104 name ,
83105 args ,
84106 kwargs ,
85107 integration ,
86108 )
87109
88- cache_span = None
110+ cache_span : "Optional[Union[Span, StreamedSpan]]" = None
89111 if cache_properties ["is_cache_key" ] and cache_properties ["op" ] is not None :
90- cache_span = sentry_sdk .start_span (
91- op = cache_properties ["op" ],
92- name = cache_properties ["description" ],
93- origin = SPAN_ORIGIN ,
94- )
112+ if span_streaming :
113+ cache_span = sentry_sdk .traces .start_span (
114+ name = cache_properties ["description" ],
115+ attributes = {
116+ "sentry.op" : cache_properties ["op" ],
117+ "sentry.origin" : SPAN_ORIGIN ,
118+ },
119+ )
120+ else :
121+ cache_span = sentry_sdk .start_span (
122+ op = cache_properties ["op" ],
123+ name = cache_properties ["description" ],
124+ origin = SPAN_ORIGIN ,
125+ )
95126 cache_span .__enter__ ()
96127
97128 db_properties = _compile_db_span_properties (integration , name , args )
98129
99- db_span = sentry_sdk .start_span (
100- op = db_properties ["op" ],
101- name = db_properties ["description" ],
102- origin = SPAN_ORIGIN ,
103- )
130+ db_span : "Union[Span, StreamedSpan]"
131+ if span_streaming :
132+ db_span = sentry_sdk .traces .start_span (
133+ name = db_properties ["description" ],
134+ attributes = {
135+ "sentry.op" : db_properties ["op" ],
136+ "sentry.origin" : SPAN_ORIGIN ,
137+ },
138+ )
139+ else :
140+ db_span = sentry_sdk .start_span (
141+ op = db_properties ["op" ],
142+ name = db_properties ["description" ],
143+ origin = SPAN_ORIGIN ,
144+ )
104145 db_span .__enter__ ()
105146
106- set_db_data_fn (db_span , self )
107- _set_client_data (db_span , is_cluster , name , * args )
147+ with capture_internal_exceptions ():
148+ set_db_data_fn (db_span , self )
149+ _set_client_data (db_span , is_cluster , name , * args )
108150
109- value = old_execute_command (self , name , * args , ** kwargs )
151+ try :
152+ value = old_execute_command (self , name , * args , ** kwargs )
153+ finally :
154+ db_span .__exit__ (None , None , None )
110155
111- db_span .__exit__ (None , None , None )
156+ if cache_span :
157+ with capture_internal_exceptions ():
158+ _set_cache_data (cache_span , self , cache_properties , value )
112159
113- if cache_span :
114- _set_cache_data (cache_span , self , cache_properties , value )
115- cache_span .__exit__ (None , None , None )
160+ cache_span .__exit__ (None , None , None )
116161
117162 return value
118163
0 commit comments