Skip to content

Commit 74c386d

Browse files
LorisFriedel and claude authored
perf(forwarder): reuse boto3 S3 client across SQS batch records (#1079)
S3EventHandler._get_s3_client() created a new boto3.client("s3") per record. For a 10-record SQS batch that means 10 client constructions (service model parsing, session setup, connection pool init). Extract create_s3_client() as a module-level function, accept an optional s3_client parameter in S3EventHandler.__init__, and create the client once in sqs_handler() before the loop. Backward compatible: callers that omit s3_client get a client created at construction time. Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 7694bbe commit 74c386d

3 files changed

Lines changed: 19 additions & 21 deletions

File tree

aws/logs_monitoring/steps/handlers/s3_handler.py

Lines changed: 14 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -38,6 +38,17 @@
3838
)
3939

4040

41+
def create_s3_client():
    """Build the boto3 S3 client used to download objects.

    When ``DD_USE_VPC`` is truthy, the client is created for the region named
    by the ``AWS_REGION`` environment variable and configured for path-style
    addressing. Otherwise a default ``boto3.client("s3")`` is returned.

    Raises:
        KeyError: if ``DD_USE_VPC`` is truthy and ``AWS_REGION`` is not set.
    """
    if not DD_USE_VPC:
        return boto3.client("s3")

    # Need to use path style to access s3 via VPC Endpoints
    # https://github.com/gford1000-aws/lambda_s3_access_using_vpc_endpoint#boto3-specific-notes
    region = os.environ["AWS_REGION"]
    path_style_config = botocore.config.Config(s3={"addressing_style": "path"})
    return boto3.client("s3", region_name=region, config=path_style_config)
50+
51+
4152
class S3EventDataStore:
4253
def __init__(self):
4354
self.bucket = None
@@ -48,14 +59,15 @@ def __init__(self):
4859

4960

5061
class S3EventHandler:
51-
def __init__(self, context, metadata, cache_layer):
62+
def __init__(self, context, metadata, cache_layer, s3_client=None):
5263
self.logger = logging.getLogger()
5364
self.logger.setLevel(
5465
logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper())
5566
)
5667
self.context = context
5768
self.metadata = metadata
5869
self.cache_layer = cache_layer
70+
self._s3_client = s3_client or create_s3_client()
5971
self.multiline_regex_start_pattern = _MULTILINE_REGEX_START_PATTERN
6072
self.multiline_regex_pattern = _MULTILINE_REGEX_PATTERN
6173
self.data_store = S3EventDataStore()
@@ -122,26 +134,12 @@ def _add_s3_tags_from_cache(self):
122134
)
123135

124136
def _extract_data(self):
125-
s3_client = self._get_s3_client()
126-
response = s3_client.get_object(
137+
response = self._s3_client.get_object(
127138
Bucket=self.data_store.bucket, Key=self.data_store.key
128139
)
129140
body = response.get("Body")
130141
self.data_store.data = body.read()
131142

132-
def _get_s3_client(self):
133-
# Need to use path style to access s3 via VPC Endpoints
134-
# https://github.com/gford1000-aws/lambda_s3_access_using_vpc_endpoint#boto3-specific-notes
135-
if DD_USE_VPC:
136-
s3 = boto3.client(
137-
"s3",
138-
os.environ["AWS_REGION"],
139-
config=botocore.config.Config(s3={"addressing_style": "path"}),
140-
)
141-
else:
142-
s3 = boto3.client("s3")
143-
return s3
144-
145143
def _get_structured_lines_for_s3_handler(self):
146144
self._decompress_data()
147145

aws/logs_monitoring/steps/parsing.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
)
1717
from steps.enums import AwsEventSource, AwsEventType, AwsEventTypeKeyword
1818
from steps.handlers.awslogs_handler import AwsLogsHandler
19-
from steps.handlers.s3_handler import S3EventHandler
19+
from steps.handlers.s3_handler import S3EventHandler, create_s3_client
2020
from telemetry import send_event_metric, set_forwarder_telemetry_tags
2121

2222
logger = logging.getLogger()
@@ -97,14 +97,15 @@ def parse_event_type(event):
9797

9898
# Handle S3 events delivered via SQS (S3 -> SQS or S3 -> SNS -> SQS)
9999
def sqs_handler(event, context, cache_layer):
100+
s3_client = create_s3_client()
100101
for record in event["Records"]:
101102
inner_event = _extract_inner_event_from_sqs(record)
102103
if inner_event is None:
103104
continue
104105
# Fresh metadata per SQS record: S3EventHandler mutates metadata
105106
# (DD_SOURCE, tags, service), so each record needs its own copy.
106107
metadata = generate_metadata(context)
107-
s3_handler = S3EventHandler(context, metadata, cache_layer)
108+
s3_handler = S3EventHandler(context, metadata, cache_layer, s3_client=s3_client)
108109
for log_event in s3_handler.handle(inner_event):
109110
if isinstance(log_event, dict):
110111
yield merge_dicts(log_event, metadata)

aws/logs_monitoring/tests/test_s3_handler.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -232,17 +232,16 @@ def test_s3_handler_with_sns(self):
232232
self.assertEqual(self.s3_handler.metadata["ddsource"], "s3")
233233

234234
@patch("caching.cloudwatch_log_group_cache.CloudwatchLogGroupTagsCache.__init__")
235-
@patch("steps.handlers.s3_handler.S3EventHandler._get_s3_client")
236235
def test_s3_tags_added_to_metadata(
237236
self,
238-
mock_get_s3_client,
239237
mock_cache_init,
240238
):
241-
mock_get_s3_client.side_effect = MagicMock()
242239
mock_cache_init.return_value = None
243240
cache_layer = CacheLayer("")
244241
cache_layer._s3_tags_cache.get = MagicMock(return_value=["s3_tag:tag_value"])
245242
self.s3_handler.cache_layer = cache_layer
243+
self.s3_handler._extract_data = MagicMock()
244+
self.s3_handler.data_store.data = b""
246245
event = {
247246
"Records": [
248247
{

0 commit comments

Comments
 (0)