Skip to content
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 60 additions & 21 deletions sentry_sdk/integrations/redis/_async_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
_set_pipeline_data,
)
from sentry_sdk.tracing import Span
from sentry_sdk.tracing_utils import has_span_streaming_enabled
from sentry_sdk.utils import capture_internal_exceptions

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from collections.abc import Callable
from typing import Any, Union
from typing import Any, Optional, Union
from sentry_sdk.traces import StreamedSpan
from redis.asyncio.client import Pipeline, StrictRedis
from redis.asyncio.cluster import ClusterPipeline, RedisCluster

Expand All @@ -26,21 +28,36 @@ def patch_redis_async_pipeline(
pipeline_cls: "Union[type[Pipeline[Any]], type[ClusterPipeline[Any]]]",
is_cluster: bool,
get_command_args_fn: "Any",
set_db_data_fn: "Callable[[Span, Any], None]",
set_db_data_fn: "Callable[[Union[Span, StreamedSpan], Any], None]",
) -> None:
old_execute = pipeline_cls.execute

from sentry_sdk.integrations.redis import RedisIntegration

async def _sentry_execute(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
if sentry_sdk.get_client().get_integration(RedisIntegration) is None:
client = sentry_sdk.get_client()
if client.get_integration(RedisIntegration) is None:
return await old_execute(self, *args, **kwargs)

with sentry_sdk.start_span(
op=OP.DB_REDIS,
name="redis.pipeline.execute",
origin=SPAN_ORIGIN,
) as span:
span_streaming = has_span_streaming_enabled(client.options)

span: "Union[Span, StreamedSpan]"
if span_streaming:
span = sentry_sdk.traces.start_span(
name="redis.pipeline.execute",
attributes={
"sentry.origin": SPAN_ORIGIN,
"sentry.op": OP.DB_REDIS,
},
)
else:
span = sentry_sdk.start_span(
op=OP.DB_REDIS,
name="redis.pipeline.execute",
origin=SPAN_ORIGIN,
)

with span:
with capture_internal_exceptions():
try:
command_seq = self._execution_strategy._command_queue
Expand All @@ -67,7 +84,7 @@ async def _sentry_execute(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
def patch_redis_async_client(
cls: "Union[type[StrictRedis[Any]], type[RedisCluster[Any]]]",
is_cluster: bool,
set_db_data_fn: "Callable[[Span, Any], None]",
set_db_data_fn: "Callable[[Union[Span, StreamedSpan], Any], None]",
) -> None:
old_execute_command = cls.execute_command

Expand All @@ -76,33 +93,55 @@ def patch_redis_async_client(
async def _sentry_execute_command(
self: "Any", name: str, *args: "Any", **kwargs: "Any"
) -> "Any":
integration = sentry_sdk.get_client().get_integration(RedisIntegration)
client = sentry_sdk.get_client()
integration = client.get_integration(RedisIntegration)
if integration is None:
return await old_execute_command(self, name, *args, **kwargs)

span_streaming = has_span_streaming_enabled(client.options)

cache_properties = _compile_cache_span_properties(
name,
args,
kwargs,
integration,
)

cache_span = None
cache_span: "Optional[Union[Span, StreamedSpan]]" = None
if cache_properties["is_cache_key"] and cache_properties["op"] is not None:
cache_span = sentry_sdk.start_span(
op=cache_properties["op"],
name=cache_properties["description"],
origin=SPAN_ORIGIN,
)
if span_streaming:
cache_span = sentry_sdk.traces.start_span(
name=cache_properties["description"],
attributes={
"sentry.op": cache_properties["op"],
"sentry.origin": SPAN_ORIGIN,
},
)
else:
cache_span = sentry_sdk.start_span(
op=cache_properties["op"],
name=cache_properties["description"],
origin=SPAN_ORIGIN,
)
cache_span.__enter__()

db_properties = _compile_db_span_properties(integration, name, args)

db_span = sentry_sdk.start_span(
op=db_properties["op"],
name=db_properties["description"],
origin=SPAN_ORIGIN,
)
db_span: "Union[Span, StreamedSpan]"
if span_streaming:
db_span = sentry_sdk.traces.start_span(
name=db_properties["description"],
attributes={
"sentry.op": db_properties["op"],
"sentry.origin": SPAN_ORIGIN,
},
)
else:
db_span = sentry_sdk.start_span(
op=db_properties["op"],
name=db_properties["description"],
origin=SPAN_ORIGIN,
)
db_span.__enter__()

set_db_data_fn(db_span, self)
Comment thread
sentry-warden[bot] marked this conversation as resolved.
Expand Down
83 changes: 62 additions & 21 deletions sentry_sdk/integrations/redis/_sync_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,34 +11,51 @@
_set_pipeline_data,
)
from sentry_sdk.tracing import Span
from sentry_sdk.tracing_utils import has_span_streaming_enabled
from sentry_sdk.utils import capture_internal_exceptions

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from collections.abc import Callable
from typing import Any
from typing import Any, Optional, Union
from sentry_sdk.traces import StreamedSpan


def patch_redis_pipeline(
pipeline_cls: "Any",
is_cluster: bool,
get_command_args_fn: "Any",
set_db_data_fn: "Callable[[Span, Any], None]",
set_db_data_fn: "Callable[[Union[Span, StreamedSpan], Any], None]",
) -> None:
old_execute = pipeline_cls.execute

from sentry_sdk.integrations.redis import RedisIntegration

def sentry_patched_execute(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
if sentry_sdk.get_client().get_integration(RedisIntegration) is None:
client = sentry_sdk.get_client()
if client.get_integration(RedisIntegration) is None:
return old_execute(self, *args, **kwargs)

with sentry_sdk.start_span(
op=OP.DB_REDIS,
name="redis.pipeline.execute",
origin=SPAN_ORIGIN,
) as span:
span_streaming = has_span_streaming_enabled(client.options)

span: "Union[Span, StreamedSpan]"
if span_streaming:
span = sentry_sdk.traces.start_span(
name="redis.pipeline.execute",
attributes={
"sentry.origin": SPAN_ORIGIN,
"sentry.op": OP.DB_REDIS,
},
)
else:
span = sentry_sdk.start_span(
op=OP.DB_REDIS,
name="redis.pipeline.execute",
origin=SPAN_ORIGIN,
)
Comment thread
sentrivana marked this conversation as resolved.

with span:
with capture_internal_exceptions():
command_seq = None
try:
Expand All @@ -61,7 +78,9 @@


def patch_redis_client(
cls: "Any", is_cluster: bool, set_db_data_fn: "Callable[[Span, Any], None]"
cls: "Any",
is_cluster: bool,
set_db_data_fn: "Callable[[Union[Span, StreamedSpan], Any], None]",
) -> None:
"""
This function can be used to instrument custom redis client classes or
Expand All @@ -74,39 +93,61 @@
def sentry_patched_execute_command(
self: "Any", name: str, *args: "Any", **kwargs: "Any"
) -> "Any":
integration = sentry_sdk.get_client().get_integration(RedisIntegration)
client = sentry_sdk.get_client()
integration = client.get_integration(RedisIntegration)
if integration is None:
return old_execute_command(self, name, *args, **kwargs)

span_streaming = has_span_streaming_enabled(client.options)

cache_properties = _compile_cache_span_properties(
name,
args,
kwargs,
integration,
)

cache_span = None
cache_span: "Optional[Union[Span, StreamedSpan]]" = None
if cache_properties["is_cache_key"] and cache_properties["op"] is not None:
cache_span = sentry_sdk.start_span(
op=cache_properties["op"],
name=cache_properties["description"],
origin=SPAN_ORIGIN,
)
if span_streaming:
cache_span = sentry_sdk.traces.start_span(
name=cache_properties["description"],
attributes={
"sentry.op": cache_properties["op"],
"sentry.origin": SPAN_ORIGIN,
},
)
else:
cache_span = sentry_sdk.start_span(
op=cache_properties["op"],
name=cache_properties["description"],
origin=SPAN_ORIGIN,
)
cache_span.__enter__()

db_properties = _compile_db_span_properties(integration, name, args)

db_span = sentry_sdk.start_span(
op=db_properties["op"],
name=db_properties["description"],
origin=SPAN_ORIGIN,
)
db_span: "Union[Span, StreamedSpan]"
if span_streaming:
db_span = sentry_sdk.traces.start_span(
name=db_properties["description"],
attributes={
"sentry.op": db_properties["op"],
"sentry.origin": SPAN_ORIGIN,
},
)
else:
db_span = sentry_sdk.start_span(
op=db_properties["op"],
name=db_properties["description"],
origin=SPAN_ORIGIN,
)
db_span.__enter__()

set_db_data_fn(db_span, self)
_set_client_data(db_span, is_cluster, name, *args)

value = old_execute_command(self, name, *args, **kwargs)

Check warning on line 150 in sentry_sdk/integrations/redis/_sync_common.py

View check run for this annotation

@sentry/warden / warden: find-bugs

Spans are never closed when Redis command raises an exception

The code manually enters spans using `__enter__()` (lines 126, 145) and exits them using `__exit__(None, None, None)` (lines 152, 156), but if `old_execute_command()` (line 150) raises an exception, the `__exit__` calls are never reached. This causes spans to remain open/unfinished, corrupts the scope's current span, and fails to record the error status on the spans.

db_span.__exit__(None, None, None)

Expand Down
24 changes: 15 additions & 9 deletions sentry_sdk/integrations/redis/modules/caches.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.integrations.redis.utils import _get_safe_key, _key_as_string
from sentry_sdk.traces import StreamedSpan
from sentry_sdk.utils import capture_internal_exceptions

GET_COMMANDS = ("get", "mget")
Expand All @@ -14,7 +15,7 @@
if TYPE_CHECKING:
from sentry_sdk.integrations.redis import RedisIntegration
from sentry_sdk.tracing import Span
from typing import Any, Optional
from typing import Any, Optional, Union


def _get_op(name: str) -> "Optional[str]":
Expand Down Expand Up @@ -80,25 +81,30 @@ def _get_cache_span_description(


def _set_cache_data(
span: "Span",
span: "Union[Span, StreamedSpan]",
redis_client: "Any",
properties: "dict[str, Any]",
return_value: "Optional[Any]",
) -> None:
if isinstance(span, StreamedSpan):
set_on_span = span.set_attribute
else:
set_on_span = span.set_data

with capture_internal_exceptions():
span.set_data(SPANDATA.CACHE_KEY, properties["key"])
set_on_span(SPANDATA.CACHE_KEY, properties["key"])

if properties["redis_command"] in GET_COMMANDS:
if return_value is not None:
span.set_data(SPANDATA.CACHE_HIT, True)
set_on_span(SPANDATA.CACHE_HIT, True)
size = (
len(str(return_value).encode("utf-8"))
if not isinstance(return_value, bytes)
else len(return_value)
)
span.set_data(SPANDATA.CACHE_ITEM_SIZE, size)
set_on_span(SPANDATA.CACHE_ITEM_SIZE, size)
else:
span.set_data(SPANDATA.CACHE_HIT, False)
set_on_span(SPANDATA.CACHE_HIT, False)

elif properties["redis_command"] in SET_COMMANDS:
if properties["value"] is not None:
Expand All @@ -107,7 +113,7 @@ def _set_cache_data(
if not isinstance(properties["value"], bytes)
else len(properties["value"])
)
span.set_data(SPANDATA.CACHE_ITEM_SIZE, size)
set_on_span(SPANDATA.CACHE_ITEM_SIZE, size)

try:
connection_params = redis_client.connection_pool.connection_kwargs
Expand All @@ -122,8 +128,8 @@ def _set_cache_data(

host = connection_params.get("host")
if host is not None:
span.set_data(SPANDATA.NETWORK_PEER_ADDRESS, host)
set_on_span(SPANDATA.NETWORK_PEER_ADDRESS, host)

port = connection_params.get("port")
if port is not None:
span.set_data(SPANDATA.NETWORK_PEER_PORT, port)
set_on_span(SPANDATA.NETWORK_PEER_PORT, port)
Loading
Loading