diff --git a/aws/logs_monitoring/steps/common.py b/aws/logs_monitoring/steps/common.py index f6247e2ce..91c5045e2 100644 --- a/aws/logs_monitoring/steps/common.py +++ b/aws/logs_monitoring/steps/common.py @@ -56,6 +56,10 @@ def is_cloudtrail(key): return bool(match) +def is_vpc_flowlog(key): + return "vpcflowlogs" in key + + def find_cloudwatch_source(log_group): for prefix in AwsCwEventSourcePrefix: if log_group.startswith(str(prefix)): diff --git a/aws/logs_monitoring/steps/handlers/s3_handler.py b/aws/logs_monitoring/steps/handlers/s3_handler.py index 39ac2b0d5..be40c0e68 100644 --- a/aws/logs_monitoring/steps/handlers/s3_handler.py +++ b/aws/logs_monitoring/steps/handlers/s3_handler.py @@ -18,7 +18,13 @@ DD_USE_VPC, GOV_STRING, ) -from steps.common import add_service_tag, is_cloudtrail, merge_dicts, parse_event_source +from steps.common import ( + add_service_tag, + is_cloudtrail, + is_vpc_flowlog, + merge_dicts, + parse_event_source, +) class S3EventDataStore: @@ -63,6 +69,7 @@ def handle(self, event): add_service_tag(self.metadata) self._extract_data() + yield from self._get_structured_lines_for_s3_handler() def _extract_event(self, event): @@ -178,6 +185,9 @@ def _extract_cloudtrail_logs(self): self.logger.debug("Unable to parse cloudtrail log: %s" % e) def _extract_other_logs(self): + # VPC flow logs have a header line that should be skipped + skip_first_line = is_vpc_flowlog(self.data_store.key) + # Check if using multiline log regex pattern # and determine whether line or pattern separated logs if self.multiline_regex_start_pattern and self.multiline_regex_pattern: @@ -197,7 +207,9 @@ def _extract_other_logs(self): ) self.data_store.data = self.data_store.data.splitlines() - for line in self.data_store.data: + for i, line in enumerate(self.data_store.data): + if skip_first_line and i == 0: + continue yield self._format_event(line) else: @@ -206,7 +218,10 @@ def _extract_other_logs(self): # # https://docs.python.org/3/library/stdtypes.html#str.splitlines # https://docs.python.org/3/library/stdtypes.html#bytes.splitlines - for line in self.data_store.data.splitlines(): + for i, line in enumerate(self.data_store.data.splitlines()): + if skip_first_line and i == 0: + continue + line = line.decode("utf-8", errors="ignore").strip() if len(line) == 0: continue diff --git a/aws/logs_monitoring/tests/test_s3_handler.py b/aws/logs_monitoring/tests/test_s3_handler.py index d0daecddd..a3360d276 100644 --- a/aws/logs_monitoring/tests/test_s3_handler.py +++ b/aws/logs_monitoring/tests/test_s3_handler.py @@ -300,6 +300,31 @@ def test_set_source_cloudfront(self): "s3", ) + def test_vpc_flowlog_skips_header_line(self): + """Test that VPC flow logs skip the first header line""" + key = "AWSLogs/123456789012/vpcflowlogs/us-east-1/2024/01/01/123456789012_vpcflowlogs_us-east-1_fl-abc123.log.gz" + source = "vpc" + data = ( + "version account-id interface-id srcaddr dstaddr srcport dstport protocol packets bytes start end action log-status\n" + "2 123456789012 eni-abc123 10.0.0.1 10.0.0.2 443 49152 6 10 840 1620000000 1620000060 ACCEPT OK\n" + "2 123456789012 eni-abc123 10.0.0.2 10.0.0.1 49152 443 6 8 640 1620000000 1620000060 ACCEPT OK" + ) + structured_lines = self.parse_lines(data, key, source) + + self.assertEqual(len(structured_lines), 2) + self.assertIn("10.0.0.1", structured_lines[0]["message"]) + self.assertNotIn("version account-id", structured_lines[0]["message"]) + + def test_non_vpc_flowlog_includes_first_line(self): + """Test that non-VPC flow logs include all lines""" + key = "AWSLogs/123456789012/elasticloadbalancing/us-east-1/2024/01/01/log.gz" + source = "elb" + data = "first line of data\n" "second line of data\n" "third line of data" + structured_lines = self.parse_lines(data, key, source) + + self.assertEqual(len(structured_lines), 3) + self.assertEqual(structured_lines[0]["message"], "first line of data") + if __name__ == "__main__": unittest.main()