-
Notifications
You must be signed in to change notification settings - Fork 610
Expand file tree
/
Copy pathcaches.py
More file actions
135 lines (110 loc) · 4.11 KB
/
caches.py
File metadata and controls
135 lines (110 loc) · 4.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"""
Code used for the Caches module in Sentry
"""
from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.integrations.redis.utils import _get_safe_key, _key_as_string
from sentry_sdk.traces import StreamedSpan
from sentry_sdk.utils import capture_internal_exceptions
GET_COMMANDS = ("get", "mget")
SET_COMMANDS = ("set", "setex")
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from sentry_sdk.integrations.redis import RedisIntegration
from sentry_sdk.tracing import Span
from typing import Any, Optional, Union
def _get_op(name: str) -> "Optional[str]":
op = None
if name.lower() in GET_COMMANDS:
op = OP.CACHE_GET
elif name.lower() in SET_COMMANDS:
op = OP.CACHE_PUT
return op
def _compile_cache_span_properties(
redis_command: str,
args: "tuple[Any, ...]",
kwargs: "dict[str, Any]",
integration: "RedisIntegration",
) -> "dict[str, Any]":
key = _get_safe_key(redis_command, args, kwargs)
key_as_string = _key_as_string(key)
keys_as_string = key_as_string.split(", ")
is_cache_key = False
for prefix in integration.cache_prefixes:
for kee in keys_as_string:
if kee.startswith(prefix):
is_cache_key = True
break
if is_cache_key:
break
value = None
if redis_command.lower() in SET_COMMANDS:
value = args[-1]
properties = {
"op": _get_op(redis_command),
"description": _get_cache_span_description(
redis_command, args, kwargs, integration
),
"key": key,
"key_as_string": key_as_string,
"redis_command": redis_command.lower(),
"is_cache_key": is_cache_key,
"value": value,
}
return properties
def _get_cache_span_description(
redis_command: str,
args: "tuple[Any, ...]",
kwargs: "dict[str, Any]",
integration: "RedisIntegration",
) -> str:
description = _key_as_string(_get_safe_key(redis_command, args, kwargs))
if integration.max_data_size and len(description) > integration.max_data_size:
description = description[: integration.max_data_size - len("...")] + "..."
return description
def _set_cache_data(
span: "Union[Span, StreamedSpan]",
redis_client: "Any",
properties: "dict[str, Any]",
return_value: "Optional[Any]",
) -> None:
if isinstance(span, StreamedSpan):
set_on_span = span.set_attribute
else:
set_on_span = span.set_data
with capture_internal_exceptions():
set_on_span(SPANDATA.CACHE_KEY, properties["key"])
if properties["redis_command"] in GET_COMMANDS:
if return_value is not None:
set_on_span(SPANDATA.CACHE_HIT, True)
size = (
len(str(return_value).encode("utf-8"))
if not isinstance(return_value, bytes)
else len(return_value)
)
set_on_span(SPANDATA.CACHE_ITEM_SIZE, size)
else:
set_on_span(SPANDATA.CACHE_HIT, False)
elif properties["redis_command"] in SET_COMMANDS:
if properties["value"] is not None:
size = (
len(properties["value"].encode("utf-8"))
if not isinstance(properties["value"], bytes)
else len(properties["value"])
)
set_on_span(SPANDATA.CACHE_ITEM_SIZE, size)
try:
connection_params = redis_client.connection_pool.connection_kwargs
except AttributeError:
# If it is a cluster, there is no connection_pool attribute so we
# need to get the default node from the cluster instance
default_node = redis_client.get_default_node()
connection_params = {
"host": default_node.host,
"port": default_node.port,
}
host = connection_params.get("host")
if host is not None:
set_on_span(SPANDATA.NETWORK_PEER_ADDRESS, host)
port = connection_params.get("port")
if port is not None:
set_on_span(SPANDATA.NETWORK_PEER_PORT, port)