Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 23 additions & 15 deletions aws/logs_monitoring/forwarder.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,21 @@
logger.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper()))


class Forwarder(object):
class Forwarder:
def __init__(self, function_prefix):
self.trace_connection = TraceConnection(
DD_TRACE_INTAKE_URL, DD_API_KEY, DD_SKIP_SSL_VALIDATION
)
self.storage = create_storage(function_prefix)
self._scrubber = DatadogScrubber(SCRUBBING_RULE_CONFIGS)
self._matcher = DatadogMatcher(
include_pattern=INCLUDE_AT_MATCH, exclude_pattern=EXCLUDE_AT_MATCH
)
self._batcher = DatadogBatcher(
max_item_size_bytes=512 * 1000,
max_batch_size_bytes=4 * 1000 * 1000,
max_items_count=400,
)

def forward(self, logs, metrics, traces):
"""
Expand Down Expand Up @@ -81,11 +90,6 @@ def _forward_logs(self, logs, key=None):
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Forwarding {len(logs)} logs")

scrubber = DatadogScrubber(SCRUBBING_RULE_CONFIGS)
matcher = DatadogMatcher(
include_pattern=INCLUDE_AT_MATCH, exclude_pattern=EXCLUDE_AT_MATCH
)

logs_to_forward = []
for log in logs:
if key:
Expand All @@ -101,20 +105,24 @@ def _forward_logs(self, logs, key=None):
to_forward = dump_event(log)
evaluated_log = to_forward

if matcher.match(evaluated_log):
if self._matcher.match(evaluated_log):
if to_forward is None:
logs_to_forward.append(dump_event(log))
else:
logs_to_forward.append(to_forward)

batcher = DatadogBatcher(512 * 1000, 4 * 1000 * 1000, 400)
cli = DatadogHTTPClient(
DD_URL, DD_PORT, DD_NO_SSL, DD_SKIP_SSL_VALIDATION, DD_API_KEY, scrubber
DD_URL,
DD_PORT,
DD_NO_SSL,
DD_SKIP_SSL_VALIDATION,
DD_API_KEY,
self._scrubber,
)

failed_logs = []
with DatadogClient(cli) as client:
for batch in batcher.batch(logs_to_forward):
for batch in self._batcher.batch(logs_to_forward):
try:
client.send(batch)
except Exception as e:
Expand All @@ -126,10 +134,10 @@ def _forward_logs(self, logs, key=None):
if key:
self.storage.delete_data(key)

if DD_STORE_FAILED_EVENTS and len(failed_logs) > 0 and not key:
if DD_STORE_FAILED_EVENTS and failed_logs and not key:
self.storage.store_data(RetryPrefix.LOGS, failed_logs)

if len(failed_logs) > 0:
if failed_logs:
send_event_metric("logs_failed", failed_logs)

send_event_metric("logs_forwarded", len(logs_to_forward) - len(failed_logs))
Expand Down Expand Up @@ -157,16 +165,16 @@ def _forward_metrics(self, metrics, key=None):
if key:
self.storage.delete_data(key)

if DD_STORE_FAILED_EVENTS and len(failed_metrics) > 0 and not key:
if DD_STORE_FAILED_EVENTS and failed_metrics and not key:
self.storage.store_data(RetryPrefix.METRICS, failed_metrics)

if len(failed_metrics) > 0:
if failed_metrics:
send_event_metric("metrics_failed", failed_metrics)

send_event_metric("metrics_forwarded", len(metrics) - len(failed_metrics))

def _forward_traces(self, traces, key=None):
if not len(traces) > 0:
if not traces:
return

if logger.isEnabledFor(logging.DEBUG):
Expand Down
Loading