From 77e68d57084b8c042531efebc90c935b782fec1c Mon Sep 17 00:00:00 2001 From: Loris Friedel Date: Mon, 16 Mar 2026 11:27:52 +0100 Subject: [PATCH] perf(forwarder): reuse boto3 S3 client across SQS batch records S3EventHandler._get_s3_client() created a new boto3.client("s3") per record. For a 10-record SQS batch that means 10 client constructions (service model parsing, session setup, connection pool init). Extract create_s3_client() as a module-level function, accept an optional s3_client parameter in S3EventHandler.__init__, and create the client once in sqs_handler() before the loop. Backward compatible: callers that omit s3_client get a client created at construction time. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../steps/handlers/s3_handler.py | 30 +++++++++---------- aws/logs_monitoring/steps/parsing.py | 5 ++-- aws/logs_monitoring/tests/test_s3_handler.py | 5 ++-- 3 files changed, 19 insertions(+), 21 deletions(-) diff --git a/aws/logs_monitoring/steps/handlers/s3_handler.py b/aws/logs_monitoring/steps/handlers/s3_handler.py index 63b963b4c..488452624 100644 --- a/aws/logs_monitoring/steps/handlers/s3_handler.py +++ b/aws/logs_monitoring/steps/handlers/s3_handler.py @@ -38,6 +38,17 @@ ) +def create_s3_client(): + """Create a boto3 S3 client with VPC-aware configuration when applicable.""" + if DD_USE_VPC: + return boto3.client( + "s3", + os.environ["AWS_REGION"], + config=botocore.config.Config(s3={"addressing_style": "path"}), + ) + return boto3.client("s3") + + class S3EventDataStore: def __init__(self): self.bucket = None @@ -48,7 +59,7 @@ def __init__(self): class S3EventHandler: - def __init__(self, context, metadata, cache_layer): + def __init__(self, context, metadata, cache_layer, s3_client=None): self.logger = logging.getLogger() self.logger.setLevel( logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper()) @@ -56,6 +67,7 @@ def __init__(self, context, metadata, cache_layer): self.context = context self.metadata = metadata self.cache_layer = cache_layer + self._s3_client = s3_client or create_s3_client() self.multiline_regex_start_pattern = _MULTILINE_REGEX_START_PATTERN self.multiline_regex_pattern = _MULTILINE_REGEX_PATTERN self.data_store = S3EventDataStore() @@ -122,26 +134,12 @@ def _add_s3_tags_from_cache(self): ) def _extract_data(self): - s3_client = self._get_s3_client() - response = s3_client.get_object( + response = self._s3_client.get_object( Bucket=self.data_store.bucket, Key=self.data_store.key ) body = response.get("Body") self.data_store.data = body.read() - def _get_s3_client(self): - # Need to use path style to access s3 via VPC Endpoints - # https://github.com/gford1000-aws/lambda_s3_access_using_vpc_endpoint#boto3-specific-notes - if DD_USE_VPC: - s3 = boto3.client( - "s3", - os.environ["AWS_REGION"], - config=botocore.config.Config(s3={"addressing_style": "path"}), - ) - else: - s3 = boto3.client("s3") - return s3 - def _get_structured_lines_for_s3_handler(self): self._decompress_data() diff --git a/aws/logs_monitoring/steps/parsing.py b/aws/logs_monitoring/steps/parsing.py index 738fa8971..93dcb45b5 100644 --- a/aws/logs_monitoring/steps/parsing.py +++ b/aws/logs_monitoring/steps/parsing.py @@ -16,7 +16,7 @@ ) from steps.enums import AwsEventSource, AwsEventType, AwsEventTypeKeyword from steps.handlers.awslogs_handler import AwsLogsHandler -from steps.handlers.s3_handler import S3EventHandler +from steps.handlers.s3_handler import S3EventHandler, create_s3_client from telemetry import send_event_metric, set_forwarder_telemetry_tags logger = logging.getLogger() @@ -97,6 +97,7 @@ def parse_event_type(event): # Handle S3 events delivered via SQS (S3 -> SQS or S3 -> SNS -> SQS) def sqs_handler(event, context, cache_layer): + s3_client = create_s3_client() for record in event["Records"]: inner_event = _extract_inner_event_from_sqs(record) if inner_event is None: @@ -104,7 +105,7 @@ def sqs_handler(event, context, cache_layer): # Fresh metadata per SQS record: S3EventHandler mutates metadata # (DD_SOURCE, tags, service), so each record needs its own copy. metadata = generate_metadata(context) - s3_handler = S3EventHandler(context, metadata, cache_layer) + s3_handler = S3EventHandler(context, metadata, cache_layer, s3_client=s3_client) for log_event in s3_handler.handle(inner_event): if isinstance(log_event, dict): yield merge_dicts(log_event, metadata) diff --git a/aws/logs_monitoring/tests/test_s3_handler.py b/aws/logs_monitoring/tests/test_s3_handler.py index a3360d276..b19fe1fc1 100644 --- a/aws/logs_monitoring/tests/test_s3_handler.py +++ b/aws/logs_monitoring/tests/test_s3_handler.py @@ -232,17 +232,16 @@ def test_s3_handler_with_sns(self): self.assertEqual(self.s3_handler.metadata["ddsource"], "s3") @patch("caching.cloudwatch_log_group_cache.CloudwatchLogGroupTagsCache.__init__") - @patch("steps.handlers.s3_handler.S3EventHandler._get_s3_client") def test_s3_tags_added_to_metadata( self, - mock_get_s3_client, mock_cache_init, ): - mock_get_s3_client.side_effect = MagicMock() mock_cache_init.return_value = None cache_layer = CacheLayer("") cache_layer._s3_tags_cache.get = MagicMock(return_value=["s3_tag:tag_value"]) self.s3_handler.cache_layer = cache_layer + self.s3_handler._extract_data = MagicMock() + self.s3_handler.data_store.data = b"" event = { "Records": [ {