Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 25 additions & 7 deletions flowlogs_reader/flowlogs_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,16 @@ class FlowRecord:
'packets_lost_blackhole',
'packets_lost_mtu_exceeded',
'packets_lost_ttl_expired',
'ecs_task_id',
'ecs_task_arn',
'ecs_task_definition_arn',
'ecs_cluster_name',
'ecs_cluster_arn',
'ecs_container_instance_id',
'ecs_container_instance_arn',
'ecs_container_id',
'ecs_second_container_id',
'ecs_service_name',
]

def __init__(self, event_data, EPOCH_32_MAX=2147483647):
Expand Down Expand Up @@ -180,6 +190,16 @@ def __init__(self, event_data, EPOCH_32_MAX=2147483647):
('packets_lost_blackhole', int),
('packets_lost_mtu_exceeded', int),
('packets_lost_ttl_expired', int),
('ecs_task_id', str),
('ecs_task_arn', str),
('ecs_task_definition_arn', str),
('ecs_cluster_name', str),
('ecs_cluster_arn', str),
('ecs_container_instance_id', str),
('ecs_container_instance_arn', str),
('ecs_container_id', str),
('ecs_second_container_id', str),
('ecs_service_name', str),
):
value = event_data.get(key, '-')
if value == '-' or value == 'None' or value is None:
Expand Down Expand Up @@ -249,7 +269,7 @@ def __init__(
start_time=None,
end_time=None,
boto_client=None,
raise_on_error=False
raise_on_error=False,
):
self.region_name = region_name
if boto_client is not None:
Expand Down Expand Up @@ -392,23 +412,22 @@ def _reader(self):
func = lambda x: list(self._read_streams(x))
for events in executor.map(func, all_streams):
for event in events:
try:
try:
yield FlowRecord.from_cwl_event(event, self.fields)
except Exception:
self.skipped_records += 1
if self.raise_on_error:
raise
raise
else:
for event in self._read_streams():
try:
try:
yield FlowRecord.from_cwl_event(event, self.fields)
except Exception:
self.skipped_records += 1
if self.raise_on_error:
raise



class S3FlowLogsReader(BaseReader):
def __init__(
self,
Expand Down Expand Up @@ -542,10 +561,9 @@ def _read_streams(self):

def _reader(self):
for event_data in self._read_streams():
try:
try:
yield FlowRecord(event_data)
except Exception:
self.skipped_records += 1
if self.raise_on_error:
raise

55 changes: 55 additions & 0 deletions tests/test_flowlogs_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,29 @@
'00000006 00000007 00000008 00000009 00000010 00000011 00000012 13 14 15 '
'16\n'
)
V7_FILE = (
'ecs_task_id '
'ecs_task_arn '
'ecs_task_definition_arn '
'ecs_cluster_name '
'ecs_cluster_arn '
'ecs_container_instance_id '
'ecs_container_instance_arn '
'ecs_container_id '
'ecs_second_container_id '
'ecs_service_name\n'
'6c8cdac4c319 '
'arn:aws:ecs:us-east-1:000000000000:task/OBSRVBL-ECS/6c8cdac4c319 '
'arn:aws:ecs:us-east-1:000000000000:task-definition/onadef:2 '
'OBSRVBL-ECS '
'arn:aws:ecs:us-east-1:000000000000:cluster/OBSRVBL-ECS '
'6cc25ce996cf '
'arn:aws:ecs:us-east-1:000000000000:container-instance/OBSRVBL-ECS'
'/6cc25ce996cf '
'f33bbef0bd73 '
'83d172ddeb44 '
'ona\n'
)

PARQUET_FILE = 'tests/data/flows.parquet'

Expand Down Expand Up @@ -799,6 +822,38 @@ def test_serial_v6(self):
reader.compressed_bytes_processed, len(compress(V6_FILE.encode()))
)

def test_serial_v7(self):
expected = [
{
'ecs_task_id': '6c8cdac4c319',
'ecs_task_arn': (
'arn:aws:ecs:us-east-1:000000000000:task/OBSRVBL-ECS'
'/6c8cdac4c319'
),
'ecs_task_definition_arn': (
'arn:aws:ecs:us-east-1:000000000000:task-definition'
'/onadef:2'
),
'ecs_cluster_name': 'OBSRVBL-ECS',
'ecs_cluster_arn': (
'arn:aws:ecs:us-east-1:000000000000:cluster/OBSRVBL-ECS'
),
'ecs_container_instance_id': '6cc25ce996cf',
'ecs_container_instance_arn': (
'arn:aws:ecs:us-east-1:000000000000:container-instance'
'/OBSRVBL-ECS/6cc25ce996cf'
),
'ecs_container_id': 'f33bbef0bd73',
'ecs_second_container_id': '83d172ddeb44',
'ecs_service_name': 'ona',
},
]
reader = self._test_iteration(V7_FILE, expected)
self.assertEqual(reader.bytes_processed, len(V7_FILE.encode()))
self.assertEqual(
reader.compressed_bytes_processed, len(compress(V7_FILE.encode()))
)

def _test_parquet_reader(self, data, expected):
boto_client = boto3.client('s3')
with Stubber(boto_client) as stubbed_client:
Expand Down
15 changes: 10 additions & 5 deletions tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,32 @@
(
'2 123456789010 eni-102010ab 198.51.100.1 192.0.2.1 '
'443 49152 6 10 840 1439387263 1439387264 ACCEPT OK '
'- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -'
'- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - '
'- - - - - - - - - -'
),
(
'2 123456789010 eni-102010ab 192.0.2.1 198.51.100.1 '
'49152 443 6 20 1680 1439387264 1439387265 ACCEPT OK '
'- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -'
'- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - '
'- - - - - - - - - -'
),
(
'2 123456789010 eni-102010ab 192.0.2.1 198.51.100.2 '
'49152 443 6 20 1680 1439387265 1439387266 REJECT OK '
'- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -'
'- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - '
'- - - - - - - - - -'
),
(
'2 123456789010 eni-1a2b3c4d - - - - - - - '
'1431280876 1431280934 - NODATA '
'- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -'
'- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - '
'- - - - - - - - - -'
),
(
'2 123456789010 eni-4b118871 - - - - - - - '
'1431280876 1431280934 - SKIPDATA '
'- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -'
'- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - '
'- - - - - - - - - -'
),
]
SAMPLE_RECORDS = [
Expand Down