Skip to content

Commit 59a387b

Browse files
authored
[AWSX] fix(logs fowarder): Skip header line on VPC flow logs parsing (#1044)
* [AWSX] fix(logs fowarder): Skip header line on VPC flow logs parsing * black
1 parent 0861c5a commit 59a387b

3 files changed

Lines changed: 47 additions & 3 deletions

File tree

aws/logs_monitoring/steps/common.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ def is_cloudtrail(key):
5656
return bool(match)
5757

5858

59+
def is_vpc_flowlog(key):
60+
return "vpcflowlogs" in key
61+
62+
5963
def find_cloudwatch_source(log_group):
6064
for prefix in AwsCwEventSourcePrefix:
6165
if log_group.startswith(str(prefix)):

aws/logs_monitoring/steps/handlers/s3_handler.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,13 @@
1818
DD_USE_VPC,
1919
GOV_STRING,
2020
)
21-
from steps.common import add_service_tag, is_cloudtrail, merge_dicts, parse_event_source
21+
from steps.common import (
22+
add_service_tag,
23+
is_cloudtrail,
24+
is_vpc_flowlog,
25+
merge_dicts,
26+
parse_event_source,
27+
)
2228

2329

2430
class S3EventDataStore:
@@ -63,6 +69,7 @@ def handle(self, event):
6369
add_service_tag(self.metadata)
6470

6571
self._extract_data()
72+
6673
yield from self._get_structured_lines_for_s3_handler()
6774

6875
def _extract_event(self, event):
@@ -178,6 +185,9 @@ def _extract_cloudtrail_logs(self):
178185
self.logger.debug("Unable to parse cloudtrail log: %s" % e)
179186

180187
def _extract_other_logs(self):
188+
# VPC flow logs have a header line that should be skipped
189+
skip_first_line = is_vpc_flowlog(self.data_store.key)
190+
181191
# Check if using multiline log regex pattern
182192
# and determine whether line or pattern separated logs
183193
if self.multiline_regex_start_pattern and self.multiline_regex_pattern:
@@ -197,7 +207,9 @@ def _extract_other_logs(self):
197207
)
198208
self.data_store.data = self.data_store.data.splitlines()
199209

200-
for line in self.data_store.data:
210+
for i, line in enumerate(self.data_store.data):
211+
if skip_first_line and i == 0:
212+
continue
201213
yield self._format_event(line)
202214

203215
else:
@@ -206,7 +218,10 @@ def _extract_other_logs(self):
206218
#
207219
# https://docs.python.org/3/library/stdtypes.html#str.splitlines
208220
# https://docs.python.org/3/library/stdtypes.html#bytes.splitlines
209-
for line in self.data_store.data.splitlines():
221+
for i, line in enumerate(self.data_store.data.splitlines()):
222+
if skip_first_line and i == 0:
223+
continue
224+
210225
line = line.decode("utf-8", errors="ignore").strip()
211226
if len(line) == 0:
212227
continue

aws/logs_monitoring/tests/test_s3_handler.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,31 @@ def test_set_source_cloudfront(self):
300300
"s3",
301301
)
302302

303+
def test_vpc_flowlog_skips_header_line(self):
304+
"""Test that VPC flow logs skip the first header line"""
305+
key = "AWSLogs/123456789012/vpcflowlogs/us-east-1/2024/01/01/123456789012_vpcflowlogs_us-east-1_fl-abc123.log.gz"
306+
source = "vpc"
307+
data = (
308+
"version account-id interface-id srcaddr dstaddr srcport dstport protocol packets bytes start end action log-status\n"
309+
"2 123456789012 eni-abc123 10.0.0.1 10.0.0.2 443 49152 6 10 840 1620000000 1620000060 ACCEPT OK\n"
310+
"2 123456789012 eni-abc123 10.0.0.2 10.0.0.1 49152 443 6 8 640 1620000000 1620000060 ACCEPT OK"
311+
)
312+
structured_lines = self.parse_lines(data, key, source)
313+
314+
self.assertEqual(len(structured_lines), 2)
315+
self.assertIn("10.0.0.1", structured_lines[0]["message"])
316+
self.assertNotIn("version account-id", structured_lines[0]["message"])
317+
318+
def test_non_vpc_flowlog_includes_first_line(self):
319+
"""Test that non-VPC flow logs include all lines"""
320+
key = "AWSLogs/123456789012/elasticloadbalancing/us-east-1/2024/01/01/log.gz"
321+
source = "elb"
322+
data = "first line of data\n" "second line of data\n" "third line of data"
323+
structured_lines = self.parse_lines(data, key, source)
324+
325+
self.assertEqual(len(structured_lines), 3)
326+
self.assertEqual(structured_lines[0]["message"], "first line of data")
327+
303328

304329
if __name__ == "__main__":
305330
unittest.main()

0 commit comments

Comments
 (0)