Skip to content

Commit c2b1b40

Browse files
LorisFriedel and claude committed
perf(forwarder): reuse boto3 S3 client across SQS batch records
S3EventHandler._get_s3_client() created a new boto3.client("s3") per record. For a 10-record SQS batch that means 10 client constructions (service model parsing, session setup, connection pool init).

Extract create_s3_client() as a module-level function, accept an optional s3_client parameter in S3EventHandler.__init__, and create the client once in sqs_handler() before the loop.

Backward compatible: callers that omit s3_client get a client created at construction time.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent b6366a3 commit c2b1b40

3 files changed

Lines changed: 19 additions & 21 deletions

File tree

aws/logs_monitoring/steps/handlers/s3_handler.py

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,17 @@
2727
)
2828

2929

30+
def create_s3_client():
31+
"""Create a boto3 S3 client with VPC-aware configuration when applicable."""
32+
if DD_USE_VPC:
33+
return boto3.client(
34+
"s3",
35+
os.environ["AWS_REGION"],
36+
config=botocore.config.Config(s3={"addressing_style": "path"}),
37+
)
38+
return boto3.client("s3")
39+
40+
3041
class S3EventDataStore:
3142
def __init__(self):
3243
self.bucket = None
@@ -37,14 +48,15 @@ def __init__(self):
3748

3849

3950
class S3EventHandler:
40-
def __init__(self, context, metadata, cache_layer):
51+
def __init__(self, context, metadata, cache_layer, s3_client=None):
4152
self.logger = logging.getLogger()
4253
self.logger.setLevel(
4354
logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper())
4455
)
4556
self.context = context
4657
self.metadata = metadata
4758
self.cache_layer = cache_layer
59+
self._s3_client = s3_client or create_s3_client()
4860
self.multiline_regex_start_pattern = (
4961
re.compile("^{}".format(DD_MULTILINE_LOG_REGEX_PATTERN))
5062
if DD_MULTILINE_LOG_REGEX_PATTERN
@@ -120,26 +132,12 @@ def _add_s3_tags_from_cache(self):
120132
)
121133

122134
def _extract_data(self):
123-
s3_client = self._get_s3_client()
124-
response = s3_client.get_object(
135+
response = self._s3_client.get_object(
125136
Bucket=self.data_store.bucket, Key=self.data_store.key
126137
)
127138
body = response.get("Body")
128139
self.data_store.data = body.read()
129140

130-
def _get_s3_client(self):
131-
# Need to use path style to access s3 via VPC Endpoints
132-
# https://github.com/gford1000-aws/lambda_s3_access_using_vpc_endpoint#boto3-specific-notes
133-
if DD_USE_VPC:
134-
s3 = boto3.client(
135-
"s3",
136-
os.environ["AWS_REGION"],
137-
config=botocore.config.Config(s3={"addressing_style": "path"}),
138-
)
139-
else:
140-
s3 = boto3.client("s3")
141-
return s3
142-
143141
def _get_structured_lines_for_s3_handler(self):
144142
self._decompress_data()
145143

aws/logs_monitoring/steps/parsing.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
)
1717
from steps.enums import AwsEventSource, AwsEventType, AwsEventTypeKeyword
1818
from steps.handlers.awslogs_handler import AwsLogsHandler
19-
from steps.handlers.s3_handler import S3EventHandler
19+
from steps.handlers.s3_handler import S3EventHandler, create_s3_client
2020
from telemetry import send_event_metric, set_forwarder_telemetry_tags
2121

2222
logger = logging.getLogger()
@@ -97,14 +97,15 @@ def parse_event_type(event):
9797

9898
# Handle S3 events delivered via SQS (S3 -> SQS or S3 -> SNS -> SQS)
9999
def sqs_handler(event, context, cache_layer):
100+
s3_client = create_s3_client()
100101
for record in event["Records"]:
101102
inner_event = _extract_inner_event_from_sqs(record)
102103
if inner_event is None:
103104
continue
104105
# Fresh metadata per SQS record: S3EventHandler mutates metadata
105106
# (DD_SOURCE, tags, service), so each record needs its own copy.
106107
metadata = generate_metadata(context)
107-
s3_handler = S3EventHandler(context, metadata, cache_layer)
108+
s3_handler = S3EventHandler(context, metadata, cache_layer, s3_client=s3_client)
108109
for log_event in s3_handler.handle(inner_event):
109110
if isinstance(log_event, dict):
110111
yield merge_dicts(log_event, metadata)

aws/logs_monitoring/tests/test_s3_handler.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -232,17 +232,16 @@ def test_s3_handler_with_sns(self):
232232
self.assertEqual(self.s3_handler.metadata["ddsource"], "s3")
233233

234234
@patch("caching.cloudwatch_log_group_cache.CloudwatchLogGroupTagsCache.__init__")
235-
@patch("steps.handlers.s3_handler.S3EventHandler._get_s3_client")
236235
def test_s3_tags_added_to_metadata(
237236
self,
238-
mock_get_s3_client,
239237
mock_cache_init,
240238
):
241-
mock_get_s3_client.side_effect = MagicMock()
242239
mock_cache_init.return_value = None
243240
cache_layer = CacheLayer("")
244241
cache_layer._s3_tags_cache.get = MagicMock(return_value=["s3_tag:tag_value"])
245242
self.s3_handler.cache_layer = cache_layer
243+
self.s3_handler._extract_data = MagicMock()
244+
self.s3_handler.data_store.data = b""
246245
event = {
247246
"Records": [
248247
{

0 commit comments

Comments
 (0)