diff --git a/flowlogs_reader/flowlogs_reader.py b/flowlogs_reader/flowlogs_reader.py index b40afae..4230655 100644 --- a/flowlogs_reader/flowlogs_reader.py +++ b/flowlogs_reader/flowlogs_reader.py @@ -112,6 +112,16 @@ class FlowRecord: 'packets_lost_blackhole', 'packets_lost_mtu_exceeded', 'packets_lost_ttl_expired', + 'ecs_task_id', + 'ecs_task_arn', + 'ecs_task_definition_arn', + 'ecs_cluster_name', + 'ecs_cluster_arn', + 'ecs_container_instance_id', + 'ecs_container_instance_arn', + 'ecs_container_id', + 'ecs_second_container_id', + 'ecs_service_name', ] def __init__(self, event_data, EPOCH_32_MAX=2147483647): @@ -180,6 +190,16 @@ def __init__(self, event_data, EPOCH_32_MAX=2147483647): ('packets_lost_blackhole', int), ('packets_lost_mtu_exceeded', int), ('packets_lost_ttl_expired', int), + ('ecs_task_id', str), + ('ecs_task_arn', str), + ('ecs_task_definition_arn', str), + ('ecs_cluster_name', str), + ('ecs_cluster_arn', str), + ('ecs_container_instance_id', str), + ('ecs_container_instance_arn', str), + ('ecs_container_id', str), + ('ecs_second_container_id', str), + ('ecs_service_name', str), ): value = event_data.get(key, '-') if value == '-' or value == 'None' or value is None: @@ -249,7 +269,7 @@ def __init__( start_time=None, end_time=None, boto_client=None, - raise_on_error=False + raise_on_error=False, ): self.region_name = region_name if boto_client is not None: @@ -392,15 +412,15 @@ def _reader(self): func = lambda x: list(self._read_streams(x)) for events in executor.map(func, all_streams): for event in events: - try: + try: yield FlowRecord.from_cwl_event(event, self.fields) except Exception: self.skipped_records += 1 if self.raise_on_error: - raise + raise else: for event in self._read_streams(): - try: + try: yield FlowRecord.from_cwl_event(event, self.fields) except Exception: self.skipped_records += 1 @@ -408,7 +428,6 @@ def _reader(self): raise - class S3FlowLogsReader(BaseReader): def __init__( self, @@ -542,10 +561,9 @@ def _read_streams(self): def _reader(self): for event_data in self._read_streams(): - try: + try: yield FlowRecord(event_data) except Exception: self.skipped_records += 1 if self.raise_on_error: raise - diff --git a/tests/test_flowlogs_reader.py b/tests/test_flowlogs_reader.py index 5f2c8d5..045f6cf 100644 --- a/tests/test_flowlogs_reader.py +++ b/tests/test_flowlogs_reader.py @@ -108,6 +108,29 @@ '00000006 00000007 00000008 00000009 00000010 00000011 00000012 13 14 15 ' '16\n' ) +V7_FILE = ( + 'ecs_task_id ' + 'ecs_task_arn ' + 'ecs_task_definition_arn ' + 'ecs_cluster_name ' + 'ecs_cluster_arn ' + 'ecs_container_instance_id ' + 'ecs_container_instance_arn ' + 'ecs_container_id ' + 'ecs_second_container_id ' + 'ecs_service_name\n' + '6c8cdac4c319 ' + 'arn:aws:ecs:us-east-1:000000000000:task/OBSRVBL-ECS/6c8cdac4c319 ' + 'arn:aws:ecs:us-east-1:000000000000:task-definition/onadef:2 ' + 'OBSRVBL-ECS ' + 'arn:aws:ecs:us-east-1:000000000000:cluster/OBSRVBL-ECS ' + '6cc25ce996cf ' + 'arn:aws:ecs:us-east-1:000000000000:container-instance/OBSRVBL-ECS' + '/6cc25ce996cf ' + 'f33bbef0bd73 ' + '83d172ddeb44 ' + 'ona\n' +) PARQUET_FILE = 'tests/data/flows.parquet' @@ -799,6 +822,38 @@ def test_serial_v6(self): reader.compressed_bytes_processed, len(compress(V6_FILE.encode())) ) + def test_serial_v7(self): + expected = [ + { + 'ecs_task_id': '6c8cdac4c319', + 'ecs_task_arn': ( + 'arn:aws:ecs:us-east-1:000000000000:task/OBSRVBL-ECS' + '/6c8cdac4c319' + ), + 'ecs_task_definition_arn': ( + 'arn:aws:ecs:us-east-1:000000000000:task-definition' + '/onadef:2' + ), + 'ecs_cluster_name': 'OBSRVBL-ECS', + 'ecs_cluster_arn': ( + 'arn:aws:ecs:us-east-1:000000000000:cluster/OBSRVBL-ECS' + ), + 'ecs_container_instance_id': '6cc25ce996cf', + 'ecs_container_instance_arn': ( + 'arn:aws:ecs:us-east-1:000000000000:container-instance' + '/OBSRVBL-ECS/6cc25ce996cf' + ), + 'ecs_container_id': 'f33bbef0bd73', + 'ecs_second_container_id': '83d172ddeb44', + 'ecs_service_name': 'ona', + }, + ] + reader = self._test_iteration(V7_FILE, expected) + self.assertEqual(reader.bytes_processed, len(V7_FILE.encode())) + self.assertEqual( + reader.compressed_bytes_processed, len(compress(V7_FILE.encode())) + ) + def _test_parquet_reader(self, data, expected): boto_client = boto3.client('s3') with Stubber(boto_client) as stubbed_client: diff --git a/tests/test_main.py b/tests/test_main.py index 4527cc9..49d8e54 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -27,27 +27,32 @@ ( '2 123456789010 eni-102010ab 198.51.100.1 192.0.2.1 ' '443 49152 6 10 840 1439387263 1439387264 ACCEPT OK ' - '- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -' + '- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - ' + '- - - - - - - - - -' ), ( '2 123456789010 eni-102010ab 192.0.2.1 198.51.100.1 ' '49152 443 6 20 1680 1439387264 1439387265 ACCEPT OK ' - '- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -' + '- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - ' + '- - - - - - - - - -' ), ( '2 123456789010 eni-102010ab 192.0.2.1 198.51.100.2 ' '49152 443 6 20 1680 1439387265 1439387266 REJECT OK ' - '- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -' + '- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - ' + '- - - - - - - - - -' ), ( '2 123456789010 eni-1a2b3c4d - - - - - - - ' '1431280876 1431280934 - NODATA ' - '- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -' + '- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - ' + '- - - - - - - - - -' ), ( '2 123456789010 eni-4b118871 - - - - - - - ' '1431280876 1431280934 - SKIPDATA ' - '- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -' + '- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - ' + '- - - - - - - - - -' ), ] SAMPLE_RECORDS = [