Skip to content

Commit 995a3c2

Browse files
Merge branch 'main' into grpc-target-fix-1
2 parents fce0286 + d3d505a commit 995a3c2

3 files changed

Lines changed: 30 additions & 18 deletions

File tree

elasticapm/transport/base.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ def _flush(self, buffer, forced_flush=False) -> None:
250250
"""
251251
if not self.state.should_try():
252252
logger.error("dropping flushed data due to transport failure back-off")
253+
buffer.close()
253254
else:
254255
fileobj = buffer.fileobj # get a reference to the fileobj before closing the gzip file
255256
buffer.close()
@@ -261,6 +262,8 @@ def _flush(self, buffer, forced_flush=False) -> None:
261262
except Exception as e:
262263
self.handle_transport_fail(e)
263264

265+
data.release()
266+
264267
def start_thread(self, pid=None) -> None:
265268
super(Transport, self).start_thread(pid=pid)
266269
if (not self._thread or self.pid != self._thread.pid) and not self._closed:

elasticapm/utils/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ def get_name_from_func(func: FunctionType) -> str:
7878
return "partial({})".format(get_name_from_func(func.func))
7979
elif hasattr(func, "_partialmethod") and hasattr(func._partialmethod, "func"):
8080
return "partial({})".format(get_name_from_func(func._partialmethod.func))
81+
elif hasattr(func, "__partialmethod__") and hasattr(func.__partialmethod__, "func"):
82+
return "partial({})".format(get_name_from_func(func.__partialmethod__.func))
8183

8284
module = func.__module__
8385

tests/transports/test_base.py

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -107,18 +107,25 @@ def test_empty_queue_flush(mock_send, elasticapm_client):
107107
transport.close()
108108

109109

110-
@mock.patch("elasticapm.transport.base.Transport.send")
110+
@mock.patch("elasticapm.transport.base.Transport._flush")
111111
@pytest.mark.parametrize("elasticapm_client", [{"api_request_time": "5s"}], indirect=True)
112-
def test_metadata_prepended(mock_send, elasticapm_client):
112+
def test_metadata_prepended(mock_flush, elasticapm_client):
113113
transport = Transport(client=elasticapm_client, compress_level=0)
114114
transport.start_thread()
115115
transport.queue("error", {}, flush=True)
116116
transport.close()
117-
assert mock_send.call_count == 1
118-
args, kwargs = mock_send.call_args
119-
data = gzip.decompress(args[0])
117+
assert mock_flush.call_count == 1
118+
args, kwargs = mock_flush.call_args
119+
buffer = args[0]
120+
# this test used to mock send but after we fixed a leak for not releasing the memoryview containing
121+
# the gzipped data we cannot read it anymore. So reimplement _flush and read the data ourselves
122+
fileobj = buffer.fileobj
123+
buffer.close()
124+
compressed_data = fileobj.getbuffer()
125+
data = gzip.decompress(compressed_data)
120126
data = data.decode("utf-8").split("\n")
121127
assert "metadata" in data[0]
128+
compressed_data.release()
122129

123130

124131
@mock.patch("elasticapm.transport.base.Transport.send")
@@ -157,43 +164,43 @@ def test_api_request_time_dynamic(mock_send, caplog, elasticapm_client):
157164
assert mock_send.call_count == 0
158165

159166

160-
@pytest.mark.skipif(sys.version_info >= (3, 12), reason="Failing locally on 3.12.0rc1") # TODO py3.12
167+
def _cleanup_flush_mock_buffers(mock_flush):
168+
args, kwargs = mock_flush.call_args
169+
buffer = args[0]
170+
buffer.close()
171+
172+
161173
@mock.patch("elasticapm.transport.base.Transport._flush")
162174
def test_api_request_size_dynamic(mock_flush, caplog, elasticapm_client):
163-
elasticapm_client.config.update(version="1", api_request_size="100b")
175+
elasticapm_client.config.update(version="1", api_request_size="9b")
164176
transport = Transport(client=elasticapm_client, queue_chill_count=1)
165177
transport.start_thread()
166178
try:
167179
with caplog.at_level("DEBUG", "elasticapm.transport"):
168-
# we need to add lots of uncompressible data to fill up the gzip-internal buffer
169-
for i in range(12):
170-
transport.queue("error", "".join(random.choice(string.ascii_letters) for i in range(2000)))
180+
transport.queue("error", "".join(random.choice(string.ascii_letters) for i in range(2000)))
171181
transport._flushed.wait(timeout=0.1)
182+
_cleanup_flush_mock_buffers(mock_flush)
172183
assert mock_flush.call_count == 1
173184
elasticapm_client.config.update(version="1", api_request_size="1mb")
174185
with caplog.at_level("DEBUG", "elasticapm.transport"):
175-
# we need to add lots of uncompressible data to fill up the gzip-internal buffer
176-
for i in range(12):
177-
transport.queue("error", "".join(random.choice(string.ascii_letters) for i in range(2000)))
186+
transport.queue("error", "".join(random.choice(string.ascii_letters) for i in range(2000)))
178187
transport._flushed.wait(timeout=0.1)
179188
# Should be unchanged because our buffer limit is much higher.
180189
assert mock_flush.call_count == 1
181190
finally:
182191
transport.close()
183192

184193

185-
@pytest.mark.skipif(sys.version_info >= (3, 12), reason="Failing locally on 3.12.0rc1") # TODO py3.12
186194
@mock.patch("elasticapm.transport.base.Transport._flush")
187-
@pytest.mark.parametrize("elasticapm_client", [{"api_request_size": "100b"}], indirect=True)
195+
@pytest.mark.parametrize("elasticapm_client", [{"api_request_size": "9b"}], indirect=True)
188196
def test_flush_time_size(mock_flush, caplog, elasticapm_client):
189197
transport = Transport(client=elasticapm_client, queue_chill_count=1)
190198
transport.start_thread()
191199
try:
192200
with caplog.at_level("DEBUG", "elasticapm.transport"):
193-
# we need to add lots of uncompressible data to fill up the gzip-internal buffer
194-
for i in range(12):
195-
transport.queue("error", "".join(random.choice(string.ascii_letters) for i in range(2000)))
201+
transport.queue("error", "".join(random.choice(string.ascii_letters) for i in range(2000)))
196202
transport._flushed.wait(timeout=0.1)
203+
_cleanup_flush_mock_buffers(mock_flush)
197204
assert mock_flush.call_count == 1
198205
finally:
199206
transport.close()

0 commit comments

Comments
 (0)