Skip to content

Commit 7ec9a90

Browse files
merge master
2 parents f502225 + 485aa6d commit 7ec9a90

31 files changed

+1147
-653
lines changed

.craft.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
minVersion: 0.34.1
1+
minVersion: 2.17.0
22
targets:
33
- name: pypi
44
includeNames: /^sentry[_\-]sdk.*$/

.github/release.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,28 @@ changelog:
1010
- "Changelog: Feature"
1111
commit_patterns:
1212
- "^feat\\b"
13+
semver: minor
1314
- title: Bug Fixes 🐛
1415
labels:
1516
- "Changelog: Bugfix"
1617
commit_patterns:
1718
- "^(fix|bugfix)\\b"
19+
semver: patch
1820
- title: Deprecations 🏗️
1921
labels:
2022
- "Changelog: Deprecation"
2123
commit_patterns:
2224
- "deprecat" # deprecation, deprecated
25+
semver: patch
2326
- title: Documentation 📚
2427
labels:
2528
- "Changelog: Docs"
2629
commit_patterns:
2730
- "^docs?\\b"
31+
semver: patch
2832
- title: Internal Changes 🔧
2933
labels:
3034
- "Changelog: Internal"
3135
commit_patterns:
3236
- "^(build|ref|chore|ci|tests?)\\b"
37+
semver: patch

scripts/populate_tox/package_dependencies.jsonl

Lines changed: 8 additions & 7 deletions
Large diffs are not rendered by default.

scripts/populate_tox/releases.jsonl

Lines changed: 29 additions & 30 deletions
Large diffs are not rendered by default.

sentry_sdk/_batcher.py

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
import os
2+
import random
3+
import threading
4+
from datetime import datetime, timezone
5+
from typing import TYPE_CHECKING, TypeVar, Generic
6+
7+
from sentry_sdk.utils import format_timestamp, safe_repr, serialize_attribute
8+
from sentry_sdk.envelope import Envelope, Item, PayloadRef
9+
10+
if TYPE_CHECKING:
11+
from typing import Optional, Callable, Any
12+
13+
T = TypeVar("T")


class Batcher(Generic[T]):
    """Generic background batcher.

    Buffers items of type ``T`` and periodically flushes them to Sentry as a
    single envelope item from a daemon thread. Subclasses set ``TYPE`` and
    ``CONTENT_TYPE`` and implement ``_to_transport_format()`` (and optionally
    ``_record_lost()`` to report dropped items).
    """

    # Flush as soon as the buffer reaches MAX_BEFORE_FLUSH; once it reaches
    # MAX_BEFORE_DROP, new items are dropped (and reported via _record_lost).
    MAX_BEFORE_FLUSH = 100
    MAX_BEFORE_DROP = 1_000
    # Base interval (seconds) between periodic flushes; jittered in _flush_loop.
    FLUSH_WAIT_TIME = 5.0

    # Envelope item type and content type; subclasses override these.
    TYPE = ""
    CONTENT_TYPE = ""

    def __init__(
        self,
        capture_func: "Callable[[Envelope], None]",
        record_lost_func: "Callable[..., None]",
    ) -> None:
        # capture_func sends a finished envelope; record_lost_func reports
        # client-side data loss (e.g. queue overflow) to the transport.
        self._buffer: "list[T]" = []
        self._capture_func = capture_func
        self._record_lost_func = record_lost_func
        self._running = True
        self._lock = threading.Lock()

        self._flush_event: "threading.Event" = threading.Event()

        self._flusher: "Optional[threading.Thread]" = None
        self._flusher_pid: "Optional[int]" = None

    def _ensure_thread(self) -> bool:
        """For forking processes we might need to restart this thread.
        This ensures that our process actually has that thread running.
        """
        if not self._running:
            return False

        pid = os.getpid()
        if self._flusher_pid == pid:
            return True

        with self._lock:
            # Recheck to make sure another thread didn't get here and start
            # the flusher in the meantime
            if self._flusher_pid == pid:
                return True

            self._flusher_pid = pid

            self._flusher = threading.Thread(target=self._flush_loop)
            self._flusher.daemon = True

            try:
                self._flusher.start()
            except RuntimeError:
                # Unfortunately at this point the interpreter is in a state that no
                # longer allows us to spawn a thread and we have to bail.
                self._running = False
                return False

        return True

    def _flush_loop(self) -> None:
        # The random jitter spreads flushes from many processes over time.
        while self._running:
            self._flush_event.wait(self.FLUSH_WAIT_TIME + random.random())
            self._flush_event.clear()
            self._flush()

    def add(self, item: "T") -> None:
        """Queue *item* for the next flush.

        Drops the item (reporting it via ``_record_lost``) when the buffer is
        already at ``MAX_BEFORE_DROP``; requests an early flush once the
        buffer reaches ``MAX_BEFORE_FLUSH``.
        """
        if not self._ensure_thread() or self._flusher is None:
            return None

        with self._lock:
            if len(self._buffer) >= self.MAX_BEFORE_DROP:
                self._record_lost(item)
                return None

            self._buffer.append(item)
            if len(self._buffer) >= self.MAX_BEFORE_FLUSH:
                self._flush_event.set()

    def kill(self) -> None:
        """Stop the flusher thread. Items still buffered are not flushed."""
        if self._flusher is None:
            return

        self._running = False
        self._flush_event.set()
        self._flusher = None

    def flush(self) -> None:
        """Synchronously flush all currently buffered items."""
        self._flush()

    def _add_to_envelope(self, envelope: "Envelope") -> None:
        # Serialize the whole buffer into one envelope item.
        # Caller must hold self._lock, since this reads self._buffer.
        envelope.add_item(
            Item(
                type=self.TYPE,
                content_type=self.CONTENT_TYPE,
                headers={
                    "item_count": len(self._buffer),
                },
                payload=PayloadRef(
                    json={
                        "items": [
                            self._to_transport_format(item) for item in self._buffer
                        ]
                    }
                ),
            )
        )

    def _flush(self) -> "Optional[Envelope]":
        """Build and capture an envelope from the buffer.

        Returns the captured envelope, or None when the buffer was empty.
        """
        with self._lock:
            # Check before building the envelope: otherwise every periodic
            # wakeup of an idle batcher allocates an Envelope just to throw
            # it away.
            if not self._buffer:
                return None

            envelope = Envelope(
                headers={"sent_at": format_timestamp(datetime.now(timezone.utc))}
            )
            self._add_to_envelope(envelope)
            self._buffer.clear()

        # Capture outside the lock so a slow transport doesn't block add().
        self._capture_func(envelope)
        return envelope

    def _record_lost(self, item: "T") -> None:
        # Hook for subclasses: report *item* as lost client-side.
        # Default behavior is to drop silently.
        pass

    @staticmethod
    def _to_transport_format(item: "T") -> "Any":
        # Subclasses must convert *item* into its JSON-serializable wire
        # format. The base class has no sensible default.
        pass

sentry_sdk/_log_batcher.py

Lines changed: 35 additions & 154 deletions
Original file line numberDiff line numberDiff line change
@@ -1,175 +1,56 @@
1-
import os
2-
import random
3-
import threading
4-
from datetime import datetime, timezone
5-
from typing import Optional, List, Callable, TYPE_CHECKING, Any
1+
from typing import TYPE_CHECKING
62

7-
from sentry_sdk.utils import format_timestamp, safe_repr
3+
from sentry_sdk._batcher import Batcher
4+
from sentry_sdk.utils import serialize_attribute
85
from sentry_sdk.envelope import Envelope, Item, PayloadRef
96

107
if TYPE_CHECKING:
8+
from typing import Any
119
from sentry_sdk._types import Log
1210

1311

14-
class LogBatcher(Batcher["Log"]):
    """Batches Sentry log records and ships them as "log" envelope items."""

    MAX_BEFORE_FLUSH = 100
    MAX_BEFORE_DROP = 1_000
    FLUSH_WAIT_TIME = 5.0

    TYPE = "log"
    CONTENT_TYPE = "application/vnd.sentry.items.log+json"

    @staticmethod
    def _to_transport_format(item: "Log") -> "Any":
        """Convert a Log record into the JSON wire format for the envelope."""
        attributes = item["attributes"]

        # Mirror the severity fields into the attributes unless the caller
        # already supplied them explicitly.
        if "sentry.severity_number" not in attributes:
            attributes["sentry.severity_number"] = item["severity_number"]
        if "sentry.severity_text" not in attributes:
            attributes["sentry.severity_text"] = item["severity_text"]

        return {
            "timestamp": int(item["time_unix_nano"]) / 1.0e9,
            "trace_id": item.get("trace_id", "00000000-0000-0000-0000-000000000000"),
            "span_id": item.get("span_id"),
            "level": str(item["severity_text"]),
            "body": str(item["body"]),
            "attributes": {
                key: serialize_attribute(value) for key, value in attributes.items()
            },
        }

    def _record_lost(self, item: "Log") -> None:
        """Report a dropped log to the transport as client-side data loss."""
        # Construct log envelope item without sending it to report lost bytes
        lost_item = Item(
            type=self.TYPE,
            content_type=self.CONTENT_TYPE,
            headers={"item_count": 1},
            payload=PayloadRef(json={"items": [self._to_transport_format(item)]}),
        )

        self._record_lost_func(
            reason="queue_overflow",
            data_category="log_item",
            item=lost_item,
            quantity=1,
        )

0 commit comments

Comments
 (0)