Skip to content

Commit 439af17

Browse files
authored
fix: store failed log events on intake errors (#1131)
1 parent 65f12d5 commit 439af17

2 files changed

Lines changed: 94 additions & 13 deletions

File tree

aws/logs_monitoring/logs/datadog_http_client.py

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
import logging
88
import os
9-
from concurrent.futures import as_completed
109

1110
from requests_futures.sessions import FuturesSession
1211

@@ -67,7 +66,6 @@ def __init__(
6766
self._timeout = timeout
6867
self._session = None
6968
self._ssl_validation = not skip_ssl_validation
70-
self._futures = []
7169

7270
if logger.isEnabledFor(logging.DEBUG):
7371
logger.debug(
@@ -81,13 +79,6 @@ def _connect(self):
8179
self._session.headers.update(self._HEADERS)
8280

8381
def _close(self):
84-
# Resolve all the futures and log exceptions if any
85-
for future in as_completed(self._futures):
86-
try:
87-
future.result()
88-
except Exception as e:
89-
logger.error(f"Exception while forwarding logs: {e}")
90-
9182
self._session.close()
9283

9384
def send(self, logs):
@@ -101,11 +92,11 @@ def send(self, logs):
10192
if DD_USE_COMPRESSION:
10293
data = compress_logs(data, DD_COMPRESSION_LEVEL)
10394

104-
# FuturesSession returns immediately with a future object
105-
future = self._session.post(
95+
# Resolve the future here so callers can attribute failures to this batch.
96+
response = self._session.post(
10697
self._url, data, timeout=self._timeout, verify=self._ssl_validation
107-
)
108-
self._futures.append(future)
98+
).result()
99+
response.raise_for_status()
109100

110101
def __enter__(self):
111102
self._connect()
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
import unittest
2+
import sys
3+
import types
4+
from unittest.mock import MagicMock, patch
5+
6+
botocore = types.ModuleType("botocore")
7+
botocore.config = types.ModuleType("botocore.config")
8+
botocore.config.Config = MagicMock()
9+
sys.modules["boto3"] = MagicMock()
10+
sys.modules["botocore"] = botocore
11+
sys.modules["botocore.config"] = botocore.config
12+
sys.modules["requests"] = MagicMock()
13+
sys.modules["requests_futures.sessions"] = MagicMock()
14+
15+
16+
class TestDatadogHTTPClient(unittest.TestCase):
17+
def _client(self, session):
18+
from logs.datadog_http_client import DatadogHTTPClient
19+
20+
scrubber = MagicMock()
21+
scrubber.scrub.side_effect = lambda payload: payload
22+
23+
client = DatadogHTTPClient("example.com", 443, False, False, "apikey", scrubber)
24+
client._session = session
25+
return client
26+
27+
def test_send_raises_future_exception(self):
28+
future = MagicMock()
29+
future.result.side_effect = Exception("network error")
30+
session = MagicMock()
31+
session.post.return_value = future
32+
33+
with self.assertRaisesRegex(Exception, "network error"):
34+
self._client(session).send(['{"message":"hello"}'])
35+
36+
def test_send_raises_http_error_response(self):
37+
response = MagicMock()
38+
response.raise_for_status.side_effect = Exception("403 Client Error")
39+
future = MagicMock()
40+
future.result.return_value = response
41+
session = MagicMock()
42+
session.post.return_value = future
43+
44+
with self.assertRaisesRegex(Exception, "403 Client Error"):
45+
self._client(session).send(['{"message":"hello"}'])
46+
47+
def test_send_returns_after_successful_response(self):
48+
response = MagicMock()
49+
future = MagicMock()
50+
future.result.return_value = response
51+
session = MagicMock()
52+
session.post.return_value = future
53+
54+
self._client(session).send(['{"message":"hello"}'])
55+
56+
response.raise_for_status.assert_called_once_with()
57+
58+
59+
class TestForwarderFailedLogs(unittest.TestCase):
60+
@patch("forwarder.send_event_metric")
61+
@patch("forwarder.DatadogHTTPClient")
62+
@patch("forwarder.DD_STORE_FAILED_EVENTS", True)
63+
def test_forward_logs_stores_failed_batch(self, mock_http_client, mock_send_metric):
64+
from forwarder import Forwarder
65+
from retry.enums import RetryPrefix
66+
67+
client = MagicMock()
68+
client.__enter__.return_value = client
69+
client.send.side_effect = Exception("send failed")
70+
mock_http_client.return_value = client
71+
72+
forwarder = Forwarder.__new__(Forwarder)
73+
forwarder.storage = MagicMock()
74+
forwarder._scrubber = MagicMock()
75+
forwarder._matcher = MagicMock()
76+
forwarder._matcher.match.return_value = True
77+
forwarder._batcher = MagicMock()
78+
forwarder._batcher.batch.return_value = [['"hello"']]
79+
80+
forwarder._forward_logs(["hello"])
81+
82+
forwarder.storage.store_data.assert_called_once_with(
83+
RetryPrefix.LOGS, ['"hello"']
84+
)
85+
mock_send_metric.assert_any_call("logs_failed", ['"hello"'])
86+
mock_send_metric.assert_any_call("logs_forwarded", 0)
87+
88+
89+
if __name__ == "__main__":
90+
unittest.main()

0 commit comments

Comments
 (0)