From d83292d49b926778c99de8d38a41395b7b5d15a5 Mon Sep 17 00:00:00 2001 From: Georgi Date: Wed, 14 Jan 2026 10:43:10 +0100 Subject: [PATCH 1/2] [AWSX] fix(logs fowarder): Skip header line on VPC flow logs parsing --- aws/logs_monitoring/steps/common.py | 3 ++ .../steps/handlers/s3_handler.py | 15 +++++++-- aws/logs_monitoring/tests/test_s3_handler.py | 31 +++++++++++++++++++ 3 files changed, 46 insertions(+), 3 deletions(-) diff --git a/aws/logs_monitoring/steps/common.py b/aws/logs_monitoring/steps/common.py index f6247e2ce..c50366c31 100644 --- a/aws/logs_monitoring/steps/common.py +++ b/aws/logs_monitoring/steps/common.py @@ -55,6 +55,9 @@ def is_cloudtrail(key): match = CLOUDTRAIL_REGEX.search(key) return bool(match) +def is_vpc_flowlog(key): + return "vpcflowlogs" in key + def find_cloudwatch_source(log_group): for prefix in AwsCwEventSourcePrefix: diff --git a/aws/logs_monitoring/steps/handlers/s3_handler.py b/aws/logs_monitoring/steps/handlers/s3_handler.py index 39ac2b0d5..bda5ee18f 100644 --- a/aws/logs_monitoring/steps/handlers/s3_handler.py +++ b/aws/logs_monitoring/steps/handlers/s3_handler.py @@ -18,7 +18,7 @@ DD_USE_VPC, GOV_STRING, ) -from steps.common import add_service_tag, is_cloudtrail, merge_dicts, parse_event_source +from steps.common import add_service_tag, is_cloudtrail, is_vpc_flowlog, merge_dicts, parse_event_source class S3EventDataStore: @@ -63,6 +63,7 @@ def handle(self, event): add_service_tag(self.metadata) self._extract_data() + yield from self._get_structured_lines_for_s3_handler() def _extract_event(self, event): @@ -178,6 +179,9 @@ def _extract_cloudtrail_logs(self): self.logger.debug("Unable to parse cloudtrail log: %s" % e) def _extract_other_logs(self): + # VPC flow logs have a header line that should be skipped + skip_first_line = is_vpc_flowlog(self.data_store.key) + # Check if using multiline log regex pattern # and determine whether line or pattern separated logs if self.multiline_regex_start_pattern and self.multiline_regex_pattern: @@ -197,7 +201,9 @@ def _extract_other_logs(self): ) self.data_store.data = self.data_store.data.splitlines() - for line in self.data_store.data: + for i, line in enumerate(self.data_store.data): + if skip_first_line and i == 0: + continue yield self._format_event(line) else: @@ -206,7 +212,10 @@ def _extract_other_logs(self): # # https://docs.python.org/3/library/stdtypes.html#str.splitlines # https://docs.python.org/3/library/stdtypes.html#bytes.splitlines - for line in self.data_store.data.splitlines(): + for i, line in enumerate(self.data_store.data.splitlines()): + if skip_first_line and i == 0: + continue + line = line.decode("utf-8", errors="ignore").strip() if len(line) == 0: continue diff --git a/aws/logs_monitoring/tests/test_s3_handler.py b/aws/logs_monitoring/tests/test_s3_handler.py index d0daecddd..9981d5d17 100644 --- a/aws/logs_monitoring/tests/test_s3_handler.py +++ b/aws/logs_monitoring/tests/test_s3_handler.py @@ -300,6 +300,37 @@ def test_set_source_cloudfront(self): "s3", ) + def test_vpc_flowlog_skips_header_line(self): + """Test that VPC flow logs skip the first header line""" + key = "AWSLogs/123456789012/vpcflowlogs/us-east-1/2024/01/01/123456789012_vpcflowlogs_us-east-1_fl-abc123.log.gz" + source = "vpc" + data = ( + "version account-id interface-id srcaddr dstaddr srcport dstport protocol packets bytes start end action log-status\n" + "2 123456789012 eni-abc123 10.0.0.1 10.0.0.2 443 49152 6 10 840 1620000000 1620000060 ACCEPT OK\n" + "2 123456789012 eni-abc123 10.0.0.2 10.0.0.1 49152 443 6 8 640 1620000000 1620000060 ACCEPT OK" + ) + structured_lines = self.parse_lines(data, key, source) + + + self.assertEqual(len(structured_lines), 2) + self.assertIn("10.0.0.1", structured_lines[0]["message"]) + self.assertNotIn("version account-id", structured_lines[0]["message"]) + + def test_non_vpc_flowlog_includes_first_line(self): + """Test that non-VPC flow logs include all lines""" + key = "AWSLogs/123456789012/elasticloadbalancing/us-east-1/2024/01/01/log.gz" + source = "elb" + data = ( + "first line of data\n" + "second line of data\n" + "third line of data" + ) + structured_lines = self.parse_lines(data, key, source) + + + self.assertEqual(len(structured_lines), 3) + self.assertEqual(structured_lines[0]["message"], "first line of data") + if __name__ == "__main__": unittest.main() From d2bdbc1bad566a3d6e98ec5d94e74c66f89c0b35 Mon Sep 17 00:00:00 2001 From: Georgi Date: Wed, 14 Jan 2026 10:44:56 +0100 Subject: [PATCH 2/2] black --- aws/logs_monitoring/steps/common.py | 1 + aws/logs_monitoring/steps/handlers/s3_handler.py | 10 ++++++++-- aws/logs_monitoring/tests/test_s3_handler.py | 8 +------- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/aws/logs_monitoring/steps/common.py b/aws/logs_monitoring/steps/common.py index c50366c31..91c5045e2 100644 --- a/aws/logs_monitoring/steps/common.py +++ b/aws/logs_monitoring/steps/common.py @@ -55,6 +55,7 @@ def is_cloudtrail(key): match = CLOUDTRAIL_REGEX.search(key) return bool(match) + def is_vpc_flowlog(key): return "vpcflowlogs" in key diff --git a/aws/logs_monitoring/steps/handlers/s3_handler.py b/aws/logs_monitoring/steps/handlers/s3_handler.py index bda5ee18f..be40c0e68 100644 --- a/aws/logs_monitoring/steps/handlers/s3_handler.py +++ b/aws/logs_monitoring/steps/handlers/s3_handler.py @@ -18,7 +18,13 @@ DD_USE_VPC, GOV_STRING, ) -from steps.common import add_service_tag, is_cloudtrail, is_vpc_flowlog, merge_dicts, parse_event_source +from steps.common import ( + add_service_tag, + is_cloudtrail, + is_vpc_flowlog, + merge_dicts, + parse_event_source, +) class S3EventDataStore: @@ -63,7 +69,7 @@ def handle(self, event): add_service_tag(self.metadata) self._extract_data() - + yield from self._get_structured_lines_for_s3_handler() def _extract_event(self, event): diff --git a/aws/logs_monitoring/tests/test_s3_handler.py b/aws/logs_monitoring/tests/test_s3_handler.py index 9981d5d17..a3360d276 100644 --- a/aws/logs_monitoring/tests/test_s3_handler.py +++ b/aws/logs_monitoring/tests/test_s3_handler.py @@ -311,7 +311,6 @@ def test_vpc_flowlog_skips_header_line(self): ) structured_lines = self.parse_lines(data, key, source) - self.assertEqual(len(structured_lines), 2) self.assertIn("10.0.0.1", structured_lines[0]["message"]) self.assertNotIn("version account-id", structured_lines[0]["message"]) @@ -320,14 +319,9 @@ def test_non_vpc_flowlog_includes_first_line(self): """Test that non-VPC flow logs include all lines""" key = "AWSLogs/123456789012/elasticloadbalancing/us-east-1/2024/01/01/log.gz" source = "elb" - data = ( - "first line of data\n" - "second line of data\n" - "third line of data" - ) + data = "first line of data\n" "second line of data\n" "third line of data" structured_lines = self.parse_lines(data, key, source) - self.assertEqual(len(structured_lines), 3) self.assertEqual(structured_lines[0]["message"], "first line of data")