diff --git a/aws/logs_monitoring/forwarder.py b/aws/logs_monitoring/forwarder.py index 5cc1a02f3..015a55e48 100644 --- a/aws/logs_monitoring/forwarder.py +++ b/aws/logs_monitoring/forwarder.py @@ -36,12 +36,21 @@ logger.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper())) -class Forwarder(object): +class Forwarder: def __init__(self, function_prefix): self.trace_connection = TraceConnection( DD_TRACE_INTAKE_URL, DD_API_KEY, DD_SKIP_SSL_VALIDATION ) self.storage = create_storage(function_prefix) + self._scrubber = DatadogScrubber(SCRUBBING_RULE_CONFIGS) + self._matcher = DatadogMatcher( + include_pattern=INCLUDE_AT_MATCH, exclude_pattern=EXCLUDE_AT_MATCH + ) + self._batcher = DatadogBatcher( + max_item_size_bytes=512 * 1000, + max_batch_size_bytes=4 * 1000 * 1000, + max_items_count=400, + ) def forward(self, logs, metrics, traces): """ @@ -81,11 +90,6 @@ def _forward_logs(self, logs, key=None): if logger.isEnabledFor(logging.DEBUG): logger.debug(f"Forwarding {len(logs)} logs") - scrubber = DatadogScrubber(SCRUBBING_RULE_CONFIGS) - matcher = DatadogMatcher( - include_pattern=INCLUDE_AT_MATCH, exclude_pattern=EXCLUDE_AT_MATCH - ) - logs_to_forward = [] for log in logs: if key: @@ -101,20 +105,24 @@ def _forward_logs(self, logs, key=None): to_forward = dump_event(log) evaluated_log = to_forward - if matcher.match(evaluated_log): + if self._matcher.match(evaluated_log): if to_forward is None: logs_to_forward.append(dump_event(log)) else: logs_to_forward.append(to_forward) - batcher = DatadogBatcher(512 * 1000, 4 * 1000 * 1000, 400) cli = DatadogHTTPClient( - DD_URL, DD_PORT, DD_NO_SSL, DD_SKIP_SSL_VALIDATION, DD_API_KEY, scrubber + DD_URL, + DD_PORT, + DD_NO_SSL, + DD_SKIP_SSL_VALIDATION, + DD_API_KEY, + self._scrubber, ) failed_logs = [] with DatadogClient(cli) as client: - for batch in batcher.batch(logs_to_forward): + for batch in self._batcher.batch(logs_to_forward): try: client.send(batch) except Exception as e: @@ -126,10 +134,10 @@ def _forward_logs(self, logs, key=None): if key: self.storage.delete_data(key) - if DD_STORE_FAILED_EVENTS and len(failed_logs) > 0 and not key: + if DD_STORE_FAILED_EVENTS and failed_logs and not key: self.storage.store_data(RetryPrefix.LOGS, failed_logs) - if len(failed_logs) > 0: + if failed_logs: send_event_metric("logs_failed", failed_logs) send_event_metric("logs_forwarded", len(logs_to_forward) - len(failed_logs)) @@ -157,16 +165,16 @@ def _forward_metrics(self, metrics, key=None): if key: self.storage.delete_data(key) - if DD_STORE_FAILED_EVENTS and len(failed_metrics) > 0 and not key: + if DD_STORE_FAILED_EVENTS and failed_metrics and not key: self.storage.store_data(RetryPrefix.METRICS, failed_metrics) - if len(failed_metrics) > 0: + if failed_metrics: send_event_metric("metrics_failed", failed_metrics) send_event_metric("metrics_forwarded", len(metrics) - len(failed_metrics)) def _forward_traces(self, traces, key=None): - if not len(traces) > 0: + if not traces: return if logger.isEnabledFor(logging.DEBUG):