Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 60 additions & 21 deletions sentry_sdk/integrations/redis/_async_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
_set_pipeline_data,
)
from sentry_sdk.tracing import Span
from sentry_sdk.tracing_utils import has_span_streaming_enabled
from sentry_sdk.utils import capture_internal_exceptions

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from collections.abc import Callable
from typing import Any, Union
from typing import Any, Optional, Union
from sentry_sdk.traces import StreamedSpan
from redis.asyncio.client import Pipeline, StrictRedis
from redis.asyncio.cluster import ClusterPipeline, RedisCluster

Expand All @@ -26,21 +28,36 @@
pipeline_cls: "Union[type[Pipeline[Any]], type[ClusterPipeline[Any]]]",
is_cluster: bool,
get_command_args_fn: "Any",
set_db_data_fn: "Callable[[Span, Any], None]",
set_db_data_fn: "Callable[[Union[Span, StreamedSpan], Any], None]",
) -> None:
old_execute = pipeline_cls.execute

from sentry_sdk.integrations.redis import RedisIntegration

async def _sentry_execute(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
if sentry_sdk.get_client().get_integration(RedisIntegration) is None:
client = sentry_sdk.get_client()
if client.get_integration(RedisIntegration) is None:
return await old_execute(self, *args, **kwargs)

with sentry_sdk.start_span(
op=OP.DB_REDIS,
name="redis.pipeline.execute",
origin=SPAN_ORIGIN,
) as span:
span_streaming = has_span_streaming_enabled(client.options)

span: "Union[Span, StreamedSpan]"
if span_streaming:
span = sentry_sdk.traces.start_span(
name="redis.pipeline.execute",
attributes={
"sentry.origin": SPAN_ORIGIN,
"sentry.op": OP.DB_REDIS,
},
)
else:
span = sentry_sdk.start_span(
op=OP.DB_REDIS,
name="redis.pipeline.execute",
origin=SPAN_ORIGIN,
)

with span:
with capture_internal_exceptions():
try:
command_seq = self._execution_strategy._command_queue
Expand All @@ -67,7 +84,7 @@
def patch_redis_async_client(
cls: "Union[type[StrictRedis[Any]], type[RedisCluster[Any]]]",
is_cluster: bool,
set_db_data_fn: "Callable[[Span, Any], None]",
set_db_data_fn: "Callable[[Union[Span, StreamedSpan], Any], None]",
) -> None:
old_execute_command = cls.execute_command

Expand All @@ -76,37 +93,59 @@
async def _sentry_execute_command(
self: "Any", name: str, *args: "Any", **kwargs: "Any"
) -> "Any":
integration = sentry_sdk.get_client().get_integration(RedisIntegration)
client = sentry_sdk.get_client()
integration = client.get_integration(RedisIntegration)
if integration is None:
return await old_execute_command(self, name, *args, **kwargs)

span_streaming = has_span_streaming_enabled(client.options)

cache_properties = _compile_cache_span_properties(
name,
args,
kwargs,
integration,
)

cache_span = None
cache_span: "Optional[Union[Span, StreamedSpan]]" = None
if cache_properties["is_cache_key"] and cache_properties["op"] is not None:
cache_span = sentry_sdk.start_span(
op=cache_properties["op"],
name=cache_properties["description"],
origin=SPAN_ORIGIN,
)
if span_streaming:
cache_span = sentry_sdk.traces.start_span(
name=cache_properties["description"],
attributes={
"sentry.op": cache_properties["op"],
"sentry.origin": SPAN_ORIGIN,
},
)
else:
cache_span = sentry_sdk.start_span(
op=cache_properties["op"],
name=cache_properties["description"],
origin=SPAN_ORIGIN,
)
cache_span.__enter__()

db_properties = _compile_db_span_properties(integration, name, args)

db_span = sentry_sdk.start_span(
op=db_properties["op"],
name=db_properties["description"],
origin=SPAN_ORIGIN,
)
db_span: "Union[Span, StreamedSpan]"
if span_streaming:
db_span = sentry_sdk.traces.start_span(
name=db_properties["description"],
attributes={
"sentry.op": db_properties["op"],
"sentry.origin": SPAN_ORIGIN,
},
)
else:
db_span = sentry_sdk.start_span(
op=db_properties["op"],
name=db_properties["description"],
origin=SPAN_ORIGIN,
)
db_span.__enter__()

set_db_data_fn(db_span, self)
Comment thread
sentry-warden[bot] marked this conversation as resolved.
_set_client_data(db_span, is_cluster, name, *args)

Check failure on line 148 in sentry_sdk/integrations/redis/_async_common.py

View check run for this annotation

@sentry/warden / warden: find-bugs

[6LE-2VT] Spans leak and miss error status when Redis commands raise exceptions (additional location)

The `sentry_patched_execute_command` function manually calls `__enter__()` and `__exit__(None, None, None)` on spans without try/finally protection. When `old_execute_command()` raises an exception (e.g., connection error, timeout), the spans are never closed because `__exit__()` is skipped. Additionally, even on normal exit, `__exit__` is called with `(None, None, None)` instead of exception info, preventing the span's error status from being set. This results in resource leaks (unclosed spans) and missing error tracking when Redis operations fail.

value = await old_execute_command(self, name, *args, **kwargs)

Expand Down
93 changes: 68 additions & 25 deletions sentry_sdk/integrations/redis/_sync_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,34 +11,51 @@
_set_pipeline_data,
)
from sentry_sdk.tracing import Span
from sentry_sdk.tracing_utils import has_span_streaming_enabled
from sentry_sdk.utils import capture_internal_exceptions

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from collections.abc import Callable
from typing import Any
from typing import Any, Optional, Union
from sentry_sdk.traces import StreamedSpan


def patch_redis_pipeline(
pipeline_cls: "Any",
is_cluster: bool,
get_command_args_fn: "Any",
set_db_data_fn: "Callable[[Span, Any], None]",
set_db_data_fn: "Callable[[Union[Span, StreamedSpan], Any], None]",
) -> None:
old_execute = pipeline_cls.execute

from sentry_sdk.integrations.redis import RedisIntegration

def sentry_patched_execute(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
if sentry_sdk.get_client().get_integration(RedisIntegration) is None:
client = sentry_sdk.get_client()
if client.get_integration(RedisIntegration) is None:
return old_execute(self, *args, **kwargs)

with sentry_sdk.start_span(
op=OP.DB_REDIS,
name="redis.pipeline.execute",
origin=SPAN_ORIGIN,
) as span:
span_streaming = has_span_streaming_enabled(client.options)

span: "Union[Span, StreamedSpan]"
if span_streaming:
span = sentry_sdk.traces.start_span(
name="redis.pipeline.execute",
attributes={
"sentry.origin": SPAN_ORIGIN,
"sentry.op": OP.DB_REDIS,
},
)
else:
span = sentry_sdk.start_span(
op=OP.DB_REDIS,
name="redis.pipeline.execute",
origin=SPAN_ORIGIN,
)
Comment thread
sentrivana marked this conversation as resolved.

with span:
with capture_internal_exceptions():
command_seq = None
try:
Expand All @@ -61,7 +78,9 @@


def patch_redis_client(
cls: "Any", is_cluster: bool, set_db_data_fn: "Callable[[Span, Any], None]"
cls: "Any",
is_cluster: bool,
set_db_data_fn: "Callable[[Union[Span, StreamedSpan], Any], None]",
) -> None:
"""
This function can be used to instrument custom redis client classes or
Expand All @@ -74,45 +93,69 @@
def sentry_patched_execute_command(
self: "Any", name: str, *args: "Any", **kwargs: "Any"
) -> "Any":
integration = sentry_sdk.get_client().get_integration(RedisIntegration)
client = sentry_sdk.get_client()
integration = client.get_integration(RedisIntegration)
if integration is None:
return old_execute_command(self, name, *args, **kwargs)

span_streaming = has_span_streaming_enabled(client.options)

cache_properties = _compile_cache_span_properties(
name,
args,
kwargs,
integration,
)

cache_span = None
cache_span: "Optional[Union[Span, StreamedSpan]]" = None
if cache_properties["is_cache_key"] and cache_properties["op"] is not None:
cache_span = sentry_sdk.start_span(
op=cache_properties["op"],
name=cache_properties["description"],
origin=SPAN_ORIGIN,
)
if span_streaming:
cache_span = sentry_sdk.traces.start_span(
name=cache_properties["description"],
attributes={
"sentry.op": cache_properties["op"],
"sentry.origin": SPAN_ORIGIN,
},
)
else:
cache_span = sentry_sdk.start_span(
op=cache_properties["op"],
name=cache_properties["description"],
origin=SPAN_ORIGIN,
)
cache_span.__enter__()

db_properties = _compile_db_span_properties(integration, name, args)

db_span = sentry_sdk.start_span(
op=db_properties["op"],
name=db_properties["description"],
origin=SPAN_ORIGIN,
)
db_span: "Union[Span, StreamedSpan]"
if span_streaming:
db_span = sentry_sdk.traces.start_span(
name=db_properties["description"],
attributes={
"sentry.op": db_properties["op"],
"sentry.origin": SPAN_ORIGIN,
},
)
else:
db_span = sentry_sdk.start_span(
op=db_properties["op"],
name=db_properties["description"],
origin=SPAN_ORIGIN,
)
db_span.__enter__()

set_db_data_fn(db_span, self)
_set_client_data(db_span, is_cluster, name, *args)
with capture_internal_exceptions():
set_db_data_fn(db_span, self)
_set_client_data(db_span, is_cluster, name, *args)

value = old_execute_command(self, name, *args, **kwargs)

db_span.__exit__(None, None, None)

if cache_span:
_set_cache_data(cache_span, self, cache_properties, value)
with capture_internal_exceptions():
_set_cache_data(cache_span, self, cache_properties, value)

cache_span.__exit__(None, None, None)

Check failure on line 158 in sentry_sdk/integrations/redis/_sync_common.py

View check run for this annotation

@sentry/warden / warden: find-bugs

Spans leak and miss error status when Redis commands raise exceptions

The `sentry_patched_execute_command` function manually calls `__enter__()` and `__exit__(None, None, None)` on spans without try/finally protection. When `old_execute_command()` raises an exception (e.g., connection error, timeout), the spans are never closed because `__exit__()` is skipped. Additionally, even on normal exit, `__exit__` is called with `(None, None, None)` instead of exception info, preventing the span's error status from being set. This results in resource leaks (unclosed spans) and missing error tracking when Redis operations fail.

return value

Expand Down
24 changes: 15 additions & 9 deletions sentry_sdk/integrations/redis/modules/caches.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.integrations.redis.utils import _get_safe_key, _key_as_string
from sentry_sdk.traces import StreamedSpan
from sentry_sdk.utils import capture_internal_exceptions

GET_COMMANDS = ("get", "mget")
Expand All @@ -14,7 +15,7 @@
if TYPE_CHECKING:
from sentry_sdk.integrations.redis import RedisIntegration
from sentry_sdk.tracing import Span
from typing import Any, Optional
from typing import Any, Optional, Union


def _get_op(name: str) -> "Optional[str]":
Expand Down Expand Up @@ -80,25 +81,30 @@ def _get_cache_span_description(


def _set_cache_data(
span: "Span",
span: "Union[Span, StreamedSpan]",
redis_client: "Any",
properties: "dict[str, Any]",
return_value: "Optional[Any]",
) -> None:
if isinstance(span, StreamedSpan):
set_on_span = span.set_attribute
else:
set_on_span = span.set_data

with capture_internal_exceptions():
span.set_data(SPANDATA.CACHE_KEY, properties["key"])
set_on_span(SPANDATA.CACHE_KEY, properties["key"])

if properties["redis_command"] in GET_COMMANDS:
if return_value is not None:
span.set_data(SPANDATA.CACHE_HIT, True)
set_on_span(SPANDATA.CACHE_HIT, True)
size = (
len(str(return_value).encode("utf-8"))
if not isinstance(return_value, bytes)
else len(return_value)
)
span.set_data(SPANDATA.CACHE_ITEM_SIZE, size)
set_on_span(SPANDATA.CACHE_ITEM_SIZE, size)
else:
span.set_data(SPANDATA.CACHE_HIT, False)
set_on_span(SPANDATA.CACHE_HIT, False)

elif properties["redis_command"] in SET_COMMANDS:
if properties["value"] is not None:
Expand All @@ -107,7 +113,7 @@ def _set_cache_data(
if not isinstance(properties["value"], bytes)
else len(properties["value"])
)
span.set_data(SPANDATA.CACHE_ITEM_SIZE, size)
set_on_span(SPANDATA.CACHE_ITEM_SIZE, size)

try:
connection_params = redis_client.connection_pool.connection_kwargs
Expand All @@ -122,8 +128,8 @@ def _set_cache_data(

host = connection_params.get("host")
if host is not None:
span.set_data(SPANDATA.NETWORK_PEER_ADDRESS, host)
set_on_span(SPANDATA.NETWORK_PEER_ADDRESS, host)

port = connection_params.get("port")
if port is not None:
span.set_data(SPANDATA.NETWORK_PEER_PORT, port)
set_on_span(SPANDATA.NETWORK_PEER_PORT, port)
Loading
Loading