Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 60 additions & 21 deletions sentry_sdk/integrations/redis/_async_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
_set_pipeline_data,
)
from sentry_sdk.tracing import Span
from sentry_sdk.tracing_utils import has_span_streaming_enabled
from sentry_sdk.utils import capture_internal_exceptions

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from collections.abc import Callable
from typing import Any, Union
from typing import Any, Optional, Union
from sentry_sdk.traces import StreamedSpan
from redis.asyncio.client import Pipeline, StrictRedis
from redis.asyncio.cluster import ClusterPipeline, RedisCluster

Expand All @@ -26,21 +28,36 @@
pipeline_cls: "Union[type[Pipeline[Any]], type[ClusterPipeline[Any]]]",
is_cluster: bool,
get_command_args_fn: "Any",
set_db_data_fn: "Callable[[Span, Any], None]",
set_db_data_fn: "Callable[[Union[Span, StreamedSpan], Any], None]",
) -> None:
old_execute = pipeline_cls.execute

from sentry_sdk.integrations.redis import RedisIntegration

async def _sentry_execute(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
if sentry_sdk.get_client().get_integration(RedisIntegration) is None:
client = sentry_sdk.get_client()
if client.get_integration(RedisIntegration) is None:
return await old_execute(self, *args, **kwargs)

with sentry_sdk.start_span(
op=OP.DB_REDIS,
name="redis.pipeline.execute",
origin=SPAN_ORIGIN,
) as span:
span_streaming = has_span_streaming_enabled(client.options)

span: "Union[Span, StreamedSpan]"
if span_streaming:
span = sentry_sdk.traces.start_span(
name="redis.pipeline.execute",
attributes={
"sentry.origin": SPAN_ORIGIN,
"sentry.op": OP.DB_REDIS,
},
)
else:
span = sentry_sdk.start_span(
op=OP.DB_REDIS,
name="redis.pipeline.execute",
origin=SPAN_ORIGIN,
)

with span:
with capture_internal_exceptions():
try:
command_seq = self._execution_strategy._command_queue
Expand All @@ -67,7 +84,7 @@
def patch_redis_async_client(
cls: "Union[type[StrictRedis[Any]], type[RedisCluster[Any]]]",
is_cluster: bool,
set_db_data_fn: "Callable[[Span, Any], None]",
set_db_data_fn: "Callable[[Union[Span, StreamedSpan], Any], None]",
) -> None:
old_execute_command = cls.execute_command

Expand All @@ -76,45 +93,67 @@
async def _sentry_execute_command(
self: "Any", name: str, *args: "Any", **kwargs: "Any"
) -> "Any":
integration = sentry_sdk.get_client().get_integration(RedisIntegration)
client = sentry_sdk.get_client()
integration = client.get_integration(RedisIntegration)
if integration is None:
return await old_execute_command(self, name, *args, **kwargs)

span_streaming = has_span_streaming_enabled(client.options)

cache_properties = _compile_cache_span_properties(
name,
args,
kwargs,
integration,
)

cache_span = None
cache_span: "Optional[Union[Span, StreamedSpan]]" = None
if cache_properties["is_cache_key"] and cache_properties["op"] is not None:
cache_span = sentry_sdk.start_span(
op=cache_properties["op"],
name=cache_properties["description"],
origin=SPAN_ORIGIN,
)
if span_streaming:
cache_span = sentry_sdk.traces.start_span(
name=cache_properties["description"],
attributes={
"sentry.op": cache_properties["op"],
"sentry.origin": SPAN_ORIGIN,
},
)
else:
cache_span = sentry_sdk.start_span(
op=cache_properties["op"],
name=cache_properties["description"],
origin=SPAN_ORIGIN,
)
cache_span.__enter__()

db_properties = _compile_db_span_properties(integration, name, args)

db_span = sentry_sdk.start_span(
op=db_properties["op"],
name=db_properties["description"],
origin=SPAN_ORIGIN,
)
db_span: "Union[Span, StreamedSpan]"
if span_streaming:
db_span = sentry_sdk.traces.start_span(
name=db_properties["description"],
attributes={
"sentry.op": db_properties["op"],
"sentry.origin": SPAN_ORIGIN,
},
)
else:
db_span = sentry_sdk.start_span(
op=db_properties["op"],
name=db_properties["description"],
origin=SPAN_ORIGIN,
)
db_span.__enter__()

set_db_data_fn(db_span, self)
Comment thread
sentry-warden[bot] marked this conversation as resolved.
_set_client_data(db_span, is_cluster, name, *args)

value = await old_execute_command(self, name, *args, **kwargs)

db_span.__exit__(None, None, None)

if cache_span:
_set_cache_data(cache_span, self, cache_properties, value)
cache_span.__exit__(None, None, None)

Check warning on line 156 in sentry_sdk/integrations/redis/_async_common.py

View check run for this annotation

@sentry/warden / warden: find-bugs

Spans leak when Redis command raises exception in async execute_command

In `_sentry_execute_command`, spans are entered manually via `__enter__()` but if `await old_execute_command()` raises an exception (e.g., connection error, timeout), neither `db_span.__exit__()` nor `cache_span.__exit__()` are called. This causes spans to never be closed/sent, leading to incorrect timing data and potential memory leaks. The code should use try/finally or context managers to ensure proper cleanup.

return value

Expand Down
93 changes: 68 additions & 25 deletions sentry_sdk/integrations/redis/_sync_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,34 +11,51 @@
_set_pipeline_data,
)
from sentry_sdk.tracing import Span
from sentry_sdk.tracing_utils import has_span_streaming_enabled
from sentry_sdk.utils import capture_internal_exceptions

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from collections.abc import Callable
from typing import Any
from typing import Any, Optional, Union
from sentry_sdk.traces import StreamedSpan


def patch_redis_pipeline(
pipeline_cls: "Any",
is_cluster: bool,
get_command_args_fn: "Any",
set_db_data_fn: "Callable[[Span, Any], None]",
set_db_data_fn: "Callable[[Union[Span, StreamedSpan], Any], None]",
) -> None:
old_execute = pipeline_cls.execute

from sentry_sdk.integrations.redis import RedisIntegration

def sentry_patched_execute(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
if sentry_sdk.get_client().get_integration(RedisIntegration) is None:
client = sentry_sdk.get_client()
if client.get_integration(RedisIntegration) is None:
return old_execute(self, *args, **kwargs)

with sentry_sdk.start_span(
op=OP.DB_REDIS,
name="redis.pipeline.execute",
origin=SPAN_ORIGIN,
) as span:
span_streaming = has_span_streaming_enabled(client.options)

span: "Union[Span, StreamedSpan]"
if span_streaming:
span = sentry_sdk.traces.start_span(
name="redis.pipeline.execute",
attributes={
"sentry.origin": SPAN_ORIGIN,
"sentry.op": OP.DB_REDIS,
},
)
else:
span = sentry_sdk.start_span(
op=OP.DB_REDIS,
name="redis.pipeline.execute",
origin=SPAN_ORIGIN,
)

Check warning on line 56 in sentry_sdk/integrations/redis/_sync_common.py

View check run for this annotation

@sentry/warden / warden: code-review

Missing test coverage for new span streaming functionality

The PR introduces significant new functionality for span streaming support but the PR description explicitly indicates tests have not been added yet. This includes conditional logic for streaming spans in both `patch_redis_pipeline` and `patch_redis_client` functions. Without tests, regressions could occur when the streaming path is exercised.
Comment thread
sentrivana marked this conversation as resolved.

with span:
with capture_internal_exceptions():
command_seq = None
try:
Expand All @@ -61,7 +78,9 @@


def patch_redis_client(
cls: "Any", is_cluster: bool, set_db_data_fn: "Callable[[Span, Any], None]"
cls: "Any",
is_cluster: bool,
set_db_data_fn: "Callable[[Union[Span, StreamedSpan], Any], None]",
) -> None:
"""
This function can be used to instrument custom redis client classes or
Expand All @@ -74,44 +93,68 @@
def sentry_patched_execute_command(
self: "Any", name: str, *args: "Any", **kwargs: "Any"
) -> "Any":
integration = sentry_sdk.get_client().get_integration(RedisIntegration)
client = sentry_sdk.get_client()
integration = client.get_integration(RedisIntegration)
if integration is None:
return old_execute_command(self, name, *args, **kwargs)

span_streaming = has_span_streaming_enabled(client.options)

cache_properties = _compile_cache_span_properties(
name,
args,
kwargs,
integration,
)

cache_span = None
cache_span: "Optional[Union[Span, StreamedSpan]]" = None
if cache_properties["is_cache_key"] and cache_properties["op"] is not None:
cache_span = sentry_sdk.start_span(
op=cache_properties["op"],
name=cache_properties["description"],
origin=SPAN_ORIGIN,
)
if span_streaming:
cache_span = sentry_sdk.traces.start_span(
name=cache_properties["description"],
attributes={
"sentry.op": cache_properties["op"],
"sentry.origin": SPAN_ORIGIN,
},
)
else:
cache_span = sentry_sdk.start_span(
op=cache_properties["op"],
name=cache_properties["description"],
origin=SPAN_ORIGIN,
)
cache_span.__enter__()

db_properties = _compile_db_span_properties(integration, name, args)

db_span = sentry_sdk.start_span(
op=db_properties["op"],
name=db_properties["description"],
origin=SPAN_ORIGIN,
)
db_span: "Union[Span, StreamedSpan]"
if span_streaming:
db_span = sentry_sdk.traces.start_span(
name=db_properties["description"],
attributes={
"sentry.op": db_properties["op"],
"sentry.origin": SPAN_ORIGIN,
},
)
else:
db_span = sentry_sdk.start_span(
op=db_properties["op"],
name=db_properties["description"],
origin=SPAN_ORIGIN,
)
db_span.__enter__()

set_db_data_fn(db_span, self)
_set_client_data(db_span, is_cluster, name, *args)
with capture_internal_exceptions():
set_db_data_fn(db_span, self)
_set_client_data(db_span, is_cluster, name, *args)

value = old_execute_command(self, name, *args, **kwargs)

db_span.__exit__(None, None, None)

Check warning on line 152 in sentry_sdk/integrations/redis/_sync_common.py

View check run for this annotation

@sentry/warden / warden: find-bugs

[F4C-LTU] Spans leak when Redis command raises exception in sync execute_command (additional location)

In `sentry_patched_execute_command`, spans are entered manually via `__enter__()` but if `old_execute_command()` raises an exception (e.g., connection error, timeout), neither `db_span.__exit__()` nor `cache_span.__exit__()` are called. This causes spans to never be closed/sent, leading to incorrect timing data and potential memory leaks. The code should use try/finally or context managers to ensure proper cleanup.

if cache_span:
_set_cache_data(cache_span, self, cache_properties, value)
with capture_internal_exceptions():
_set_cache_data(cache_span, self, cache_properties, value)

cache_span.__exit__(None, None, None)

return value
Expand Down
24 changes: 15 additions & 9 deletions sentry_sdk/integrations/redis/modules/caches.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.integrations.redis.utils import _get_safe_key, _key_as_string
from sentry_sdk.traces import StreamedSpan
from sentry_sdk.utils import capture_internal_exceptions

GET_COMMANDS = ("get", "mget")
Expand All @@ -14,7 +15,7 @@
if TYPE_CHECKING:
from sentry_sdk.integrations.redis import RedisIntegration
from sentry_sdk.tracing import Span
from typing import Any, Optional
from typing import Any, Optional, Union


def _get_op(name: str) -> "Optional[str]":
Expand Down Expand Up @@ -80,25 +81,30 @@ def _get_cache_span_description(


def _set_cache_data(
span: "Span",
span: "Union[Span, StreamedSpan]",
redis_client: "Any",
properties: "dict[str, Any]",
return_value: "Optional[Any]",
) -> None:
if isinstance(span, StreamedSpan):
set_on_span = span.set_attribute
else:
set_on_span = span.set_data

with capture_internal_exceptions():
span.set_data(SPANDATA.CACHE_KEY, properties["key"])
set_on_span(SPANDATA.CACHE_KEY, properties["key"])

if properties["redis_command"] in GET_COMMANDS:
if return_value is not None:
span.set_data(SPANDATA.CACHE_HIT, True)
set_on_span(SPANDATA.CACHE_HIT, True)
size = (
len(str(return_value).encode("utf-8"))
if not isinstance(return_value, bytes)
else len(return_value)
)
span.set_data(SPANDATA.CACHE_ITEM_SIZE, size)
set_on_span(SPANDATA.CACHE_ITEM_SIZE, size)
else:
span.set_data(SPANDATA.CACHE_HIT, False)
set_on_span(SPANDATA.CACHE_HIT, False)

elif properties["redis_command"] in SET_COMMANDS:
if properties["value"] is not None:
Expand All @@ -107,7 +113,7 @@ def _set_cache_data(
if not isinstance(properties["value"], bytes)
else len(properties["value"])
)
span.set_data(SPANDATA.CACHE_ITEM_SIZE, size)
set_on_span(SPANDATA.CACHE_ITEM_SIZE, size)

try:
connection_params = redis_client.connection_pool.connection_kwargs
Expand All @@ -122,8 +128,8 @@ def _set_cache_data(

host = connection_params.get("host")
if host is not None:
span.set_data(SPANDATA.NETWORK_PEER_ADDRESS, host)
set_on_span(SPANDATA.NETWORK_PEER_ADDRESS, host)

port = connection_params.get("port")
if port is not None:
span.set_data(SPANDATA.NETWORK_PEER_PORT, port)
set_on_span(SPANDATA.NETWORK_PEER_PORT, port)
Loading
Loading