Skip to content
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 60 additions & 21 deletions sentry_sdk/integrations/redis/_async_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
_set_pipeline_data,
)
from sentry_sdk.tracing import Span
from sentry_sdk.tracing_utils import has_span_streaming_enabled
from sentry_sdk.utils import capture_internal_exceptions

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from collections.abc import Callable
from typing import Any, Union
from typing import Any, Optional, Union
from sentry_sdk.traces import StreamedSpan
from redis.asyncio.client import Pipeline, StrictRedis
from redis.asyncio.cluster import ClusterPipeline, RedisCluster

Expand All @@ -26,21 +28,36 @@
pipeline_cls: "Union[type[Pipeline[Any]], type[ClusterPipeline[Any]]]",
is_cluster: bool,
get_command_args_fn: "Any",
set_db_data_fn: "Callable[[Span, Any], None]",
set_db_data_fn: "Callable[[Union[Span, StreamedSpan], Any], None]",
) -> None:
old_execute = pipeline_cls.execute

from sentry_sdk.integrations.redis import RedisIntegration

async def _sentry_execute(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
if sentry_sdk.get_client().get_integration(RedisIntegration) is None:
client = sentry_sdk.get_client()
if client.get_integration(RedisIntegration) is None:
return await old_execute(self, *args, **kwargs)

with sentry_sdk.start_span(
op=OP.DB_REDIS,
name="redis.pipeline.execute",
origin=SPAN_ORIGIN,
) as span:
span_streaming = has_span_streaming_enabled(client.options)

span: "Union[Span, StreamedSpan]"
if span_streaming:
span = sentry_sdk.traces.start_span(
name="redis.pipeline.execute",
attributes={
"sentry.origin": SPAN_ORIGIN,
"sentry.op": OP.DB_REDIS,
},
)
else:
span = sentry_sdk.start_span(
op=OP.DB_REDIS,
name="redis.pipeline.execute",
origin=SPAN_ORIGIN,
)

with span:
with capture_internal_exceptions():
try:
command_seq = self._execution_strategy._command_queue
Expand All @@ -67,7 +84,7 @@
def patch_redis_async_client(
cls: "Union[type[StrictRedis[Any]], type[RedisCluster[Any]]]",
is_cluster: bool,
set_db_data_fn: "Callable[[Span, Any], None]",
set_db_data_fn: "Callable[[Union[Span, StreamedSpan], Any], None]",
) -> None:
old_execute_command = cls.execute_command

Expand All @@ -76,45 +93,67 @@
async def _sentry_execute_command(
self: "Any", name: str, *args: "Any", **kwargs: "Any"
) -> "Any":
integration = sentry_sdk.get_client().get_integration(RedisIntegration)
client = sentry_sdk.get_client()
integration = client.get_integration(RedisIntegration)
if integration is None:
return await old_execute_command(self, name, *args, **kwargs)

span_streaming = has_span_streaming_enabled(client.options)

cache_properties = _compile_cache_span_properties(
name,
args,
kwargs,
integration,
)

cache_span = None
cache_span: "Optional[Union[Span, StreamedSpan]]" = None
if cache_properties["is_cache_key"] and cache_properties["op"] is not None:
cache_span = sentry_sdk.start_span(
op=cache_properties["op"],
name=cache_properties["description"],
origin=SPAN_ORIGIN,
)
if span_streaming:
cache_span = sentry_sdk.traces.start_span(
name=cache_properties["description"],
attributes={
"sentry.op": cache_properties["op"],
"sentry.origin": SPAN_ORIGIN,
},
)
else:
cache_span = sentry_sdk.start_span(
op=cache_properties["op"],
name=cache_properties["description"],
origin=SPAN_ORIGIN,
)
cache_span.__enter__()

db_properties = _compile_db_span_properties(integration, name, args)

db_span = sentry_sdk.start_span(
op=db_properties["op"],
name=db_properties["description"],
origin=SPAN_ORIGIN,
)
db_span: "Union[Span, StreamedSpan]"
if span_streaming:
db_span = sentry_sdk.traces.start_span(
name=db_properties["description"],
attributes={
"sentry.op": db_properties["op"],
"sentry.origin": SPAN_ORIGIN,
},
)
else:
db_span = sentry_sdk.start_span(
op=db_properties["op"],
name=db_properties["description"],
origin=SPAN_ORIGIN,
)
db_span.__enter__()

set_db_data_fn(db_span, self)
Comment thread
sentry-warden[bot] marked this conversation as resolved.
_set_client_data(db_span, is_cluster, name, *args)

value = await old_execute_command(self, name, *args, **kwargs)

db_span.__exit__(None, None, None)

if cache_span:
_set_cache_data(cache_span, self, cache_properties, value)
cache_span.__exit__(None, None, None)

Check warning on line 156 in sentry_sdk/integrations/redis/_async_common.py

View check run for this annotation

@sentry/warden / warden: find-bugs

Spans not properly closed on exception in _sentry_execute_command

In `_sentry_execute_command`, spans are opened via manual `__enter__()` calls (lines 126, 145) but their corresponding `__exit__()` calls (lines 152, 156) are not wrapped in try/finally. If `old_execute_command` raises an exception at line 150, both `db_span` and `cache_span` will never be closed, resulting in spans not being sent to Sentry, error status not being recorded, and the scope's span reference not being restored.

return value

Expand Down
83 changes: 62 additions & 21 deletions sentry_sdk/integrations/redis/_sync_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,34 +11,51 @@
_set_pipeline_data,
)
from sentry_sdk.tracing import Span
from sentry_sdk.tracing_utils import has_span_streaming_enabled
from sentry_sdk.utils import capture_internal_exceptions

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from collections.abc import Callable
from typing import Any
from typing import Any, Optional, Union
from sentry_sdk.traces import StreamedSpan


def patch_redis_pipeline(
pipeline_cls: "Any",
is_cluster: bool,
get_command_args_fn: "Any",
set_db_data_fn: "Callable[[Span, Any], None]",
set_db_data_fn: "Callable[[Union[Span, StreamedSpan], Any], None]",
) -> None:
old_execute = pipeline_cls.execute

from sentry_sdk.integrations.redis import RedisIntegration

def sentry_patched_execute(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
if sentry_sdk.get_client().get_integration(RedisIntegration) is None:
client = sentry_sdk.get_client()
if client.get_integration(RedisIntegration) is None:
return old_execute(self, *args, **kwargs)

with sentry_sdk.start_span(
op=OP.DB_REDIS,
name="redis.pipeline.execute",
origin=SPAN_ORIGIN,
) as span:
span_streaming = has_span_streaming_enabled(client.options)

span: "Union[Span, StreamedSpan]"
if span_streaming:
span = sentry_sdk.traces.start_span(
name="redis.pipeline.execute",
attributes={
"sentry.origin": SPAN_ORIGIN,
"sentry.op": OP.DB_REDIS,
},
)
else:
span = sentry_sdk.start_span(
op=OP.DB_REDIS,
name="redis.pipeline.execute",
origin=SPAN_ORIGIN,
)
Comment thread
sentrivana marked this conversation as resolved.

with span:
with capture_internal_exceptions():
command_seq = None
try:
Expand All @@ -61,7 +78,9 @@


def patch_redis_client(
cls: "Any", is_cluster: bool, set_db_data_fn: "Callable[[Span, Any], None]"
cls: "Any",
is_cluster: bool,
set_db_data_fn: "Callable[[Union[Span, StreamedSpan], Any], None]",
) -> None:
"""
This function can be used to instrument custom redis client classes or
Expand All @@ -74,36 +93,58 @@
def sentry_patched_execute_command(
self: "Any", name: str, *args: "Any", **kwargs: "Any"
) -> "Any":
integration = sentry_sdk.get_client().get_integration(RedisIntegration)
client = sentry_sdk.get_client()
integration = client.get_integration(RedisIntegration)
if integration is None:
return old_execute_command(self, name, *args, **kwargs)

span_streaming = has_span_streaming_enabled(client.options)

cache_properties = _compile_cache_span_properties(
name,
args,
kwargs,
integration,
)

cache_span = None
cache_span: "Optional[Union[Span, StreamedSpan]]" = None
if cache_properties["is_cache_key"] and cache_properties["op"] is not None:
cache_span = sentry_sdk.start_span(
op=cache_properties["op"],
name=cache_properties["description"],
origin=SPAN_ORIGIN,
)
if span_streaming:
cache_span = sentry_sdk.traces.start_span(
name=cache_properties["description"],
attributes={
"sentry.op": cache_properties["op"],
"sentry.origin": SPAN_ORIGIN,
},
)
else:
cache_span = sentry_sdk.start_span(
op=cache_properties["op"],
name=cache_properties["description"],
origin=SPAN_ORIGIN,
)
cache_span.__enter__()

db_properties = _compile_db_span_properties(integration, name, args)

db_span = sentry_sdk.start_span(
op=db_properties["op"],
name=db_properties["description"],
origin=SPAN_ORIGIN,
)
db_span: "Union[Span, StreamedSpan]"
if span_streaming:
db_span = sentry_sdk.traces.start_span(
name=db_properties["description"],
attributes={
"sentry.op": db_properties["op"],
"sentry.origin": SPAN_ORIGIN,
},
)
else:
db_span = sentry_sdk.start_span(
op=db_properties["op"],
name=db_properties["description"],
origin=SPAN_ORIGIN,
)
db_span.__enter__()

set_db_data_fn(db_span, self)

Check warning on line 147 in sentry_sdk/integrations/redis/_sync_common.py

View check run for this annotation

@sentry/warden / warden: find-bugs

[UNR-74R] Spans not properly closed on exception in _sentry_execute_command (additional location)

In `_sentry_execute_command`, spans are opened via manual `__enter__()` calls (lines 126, 145) but their corresponding `__exit__()` calls (lines 152, 156) are not wrapped in try/finally. If `old_execute_command` raises an exception at line 150, both `db_span` and `cache_span` will never be closed, resulting in spans not being sent to Sentry, error status not being recorded, and the scope's span reference not being restored.
_set_client_data(db_span, is_cluster, name, *args)

value = old_execute_command(self, name, *args, **kwargs)
Expand Down
24 changes: 15 additions & 9 deletions sentry_sdk/integrations/redis/modules/caches.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.integrations.redis.utils import _get_safe_key, _key_as_string
from sentry_sdk.traces import StreamedSpan
from sentry_sdk.utils import capture_internal_exceptions

GET_COMMANDS = ("get", "mget")
Expand All @@ -14,7 +15,7 @@
if TYPE_CHECKING:
from sentry_sdk.integrations.redis import RedisIntegration
from sentry_sdk.tracing import Span
from typing import Any, Optional
from typing import Any, Optional, Union


def _get_op(name: str) -> "Optional[str]":
Expand Down Expand Up @@ -80,25 +81,30 @@ def _get_cache_span_description(


def _set_cache_data(
span: "Span",
span: "Union[Span, StreamedSpan]",
redis_client: "Any",
properties: "dict[str, Any]",
return_value: "Optional[Any]",
) -> None:
if isinstance(span, StreamedSpan):
set_on_span = span.set_attribute
else:
set_on_span = span.set_data

with capture_internal_exceptions():
span.set_data(SPANDATA.CACHE_KEY, properties["key"])
set_on_span(SPANDATA.CACHE_KEY, properties["key"])

if properties["redis_command"] in GET_COMMANDS:
if return_value is not None:
span.set_data(SPANDATA.CACHE_HIT, True)
set_on_span(SPANDATA.CACHE_HIT, True)
size = (
len(str(return_value).encode("utf-8"))
if not isinstance(return_value, bytes)
else len(return_value)
)
span.set_data(SPANDATA.CACHE_ITEM_SIZE, size)
set_on_span(SPANDATA.CACHE_ITEM_SIZE, size)
else:
span.set_data(SPANDATA.CACHE_HIT, False)
set_on_span(SPANDATA.CACHE_HIT, False)

elif properties["redis_command"] in SET_COMMANDS:
if properties["value"] is not None:
Expand All @@ -107,7 +113,7 @@ def _set_cache_data(
if not isinstance(properties["value"], bytes)
else len(properties["value"])
)
span.set_data(SPANDATA.CACHE_ITEM_SIZE, size)
set_on_span(SPANDATA.CACHE_ITEM_SIZE, size)

try:
connection_params = redis_client.connection_pool.connection_kwargs
Expand All @@ -122,8 +128,8 @@ def _set_cache_data(

host = connection_params.get("host")
if host is not None:
span.set_data(SPANDATA.NETWORK_PEER_ADDRESS, host)
set_on_span(SPANDATA.NETWORK_PEER_ADDRESS, host)

port = connection_params.get("port")
if port is not None:
span.set_data(SPANDATA.NETWORK_PEER_PORT, port)
set_on_span(SPANDATA.NETWORK_PEER_PORT, port)
Loading
Loading