Skip to content

Commit a4cbc2c

Browse files
committed
Extract S3 body loading from indexer lambda processor
1 parent 327b53f commit a4cbc2c

File tree

7 files changed

+197
-104
lines changed

7 files changed

+197
-104
lines changed
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
# Copyright 2024 - 2026 Block, Inc.
2+
#
3+
# Use of this source code is governed by an MIT-style
4+
# license that can be found in the LICENSE file or at
5+
# https://opensource.org/licenses/MIT.
6+
#
7+
# frozen_string_literal: true
8+
9+
require "elastic_graph/errors"
10+
require "json"
11+
12+
module ElasticGraph
  module IndexerLambda
    # Resolves the raw body of an SQS message, including fetching offloaded payloads from S3.
    #
    # @private
    class SqsMessageBodyLoader
      # Prefix that marks a body as an S3 payload-offloading pointer (as written by the
      # AWS SQS extended client) rather than an inline message payload.
      S3_OFFLOADING_INDICATOR = '["software.amazon.payloadoffloading.PayloadS3Pointer"'

      # @param s3_client [Aws::S3::Client, nil] optional injected client; built lazily when `nil`
      def initialize(s3_client: nil)
        @s3_client = s3_client
      end

      # Loads the message body for the given SQS record.
      #
      # @param sqs_record [Hash] full SQS record carrying the body
      # @return [String] resolved SQS message body
      def load_body(sqs_record:)
        raw_body = sqs_record.fetch("body")

        if raw_body.start_with?(S3_OFFLOADING_INDICATOR)
          get_payload_from_s3(raw_body)
        else
          raw_body
        end
      end

      private

      # Reads an offloaded payload out of S3, using the bucket/key pointer encoded in
      # `pointer_json` (a 2-element JSON array: [marker string, pointer hash]).
      def get_payload_from_s3(pointer_json)
        _marker, pointer = JSON.parse(pointer_json)
        bucket = pointer.fetch("s3BucketName")
        key = pointer.fetch("s3Key")

        # Note: the rescue is intentionally scoped to just the S3 call — the AWS error
        # class is only loadable once `s3_client` has required the SDK.
        begin
          s3_client.get_object(bucket: bucket, key: key).body.read
        rescue Aws::S3::Errors::ServiceError => e
          raise Errors::S3OperationFailedError, "Error reading large message from S3. bucket: `#{bucket}` key: `#{key}` message: `#{e.message}`"
        end
      end

      # The S3 client is lazily initialized because loading the AWS SDK is relatively expensive,
      # and offloaded SQS messages should be uncommon.
      def s3_client
        return @s3_client if @s3_client

        require "aws-sdk-s3"
        @s3_client = Aws::S3::Client.new
      end
    end
  end
end

elasticgraph-indexer_lambda/lib/elastic_graph/indexer_lambda/sqs_processor.rb

Lines changed: 4 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
require "elastic_graph/errors"
1010
require "elastic_graph/indexer/indexing_failures_error"
1111
require "elastic_graph/indexer_lambda/jsonl_decoder"
12+
require "elastic_graph/indexer_lambda/sqs_message_body_loader"
1213
require "json"
1314

1415
module ElasticGraph
@@ -20,12 +21,12 @@ class SqsProcessor
2021
# @dynamic ignore_sqs_latency_timestamps_from_arns
2122
attr_reader :ignore_sqs_latency_timestamps_from_arns
2223

23-
def initialize(indexer_processor, logger:, ignore_sqs_latency_timestamps_from_arns:, event_payload_decoder: JSONLDecoder.new, s3_client: nil)
24+
def initialize(indexer_processor, logger:, ignore_sqs_latency_timestamps_from_arns:, s3_client: nil, event_payload_decoder: JSONLDecoder.new, message_body_loader: SqsMessageBodyLoader.new(s3_client: s3_client))
2425
@indexer_processor = indexer_processor
2526
@logger = logger
26-
@s3_client = s3_client
2727
@ignore_sqs_latency_timestamps_from_arns = ignore_sqs_latency_timestamps_from_arns
2828
@event_payload_decoder = event_payload_decoder
29+
@message_body_loader = message_body_loader
2930
end
3031

3132
# Processes the ElasticGraph events in the given `lambda_event`, indexing the data in the datastore.
@@ -84,7 +85,7 @@ def events_from(lambda_event)
8485

8586
@event_payload_decoder.decode_events(
8687
sqs_record: record,
87-
body: body_from(record.fetch("body"))
88+
body: @message_body_loader.load_body(sqs_record: record)
8889
).map do |event|
8990
ElasticGraph::Support::HashUtil.deep_merge(event, sqs_metadata)
9091
end
@@ -96,15 +97,6 @@ def events_from(lambda_event)
9697
end
9798
end
9899

99-
S3_OFFLOADING_INDICATOR = '["software.amazon.payloadoffloading.PayloadS3Pointer"'
100-
101-
def body_from(body)
102-
if body.start_with?(S3_OFFLOADING_INDICATOR)
103-
body = get_payload_from_s3(body)
104-
end
105-
body
106-
end
107-
108100
def extract_sqs_metadata(record)
109101
sqs_timestamps = {
110102
"processing_first_attempted_at" => millis_to_iso8601(record.dig("attributes", "ApproximateFirstReceiveTimestamp")),
@@ -123,28 +115,6 @@ def millis_to_iso8601(millis)
123115
Time.at(seconds, millis, :millisecond).getutc.iso8601(3)
124116
end
125117

126-
def get_payload_from_s3(json_string)
127-
s3_pointer = JSON.parse(json_string)[1]
128-
bucket_name = s3_pointer.fetch("s3BucketName")
129-
object_key = s3_pointer.fetch("s3Key")
130-
131-
begin
132-
s3_client.get_object(bucket: bucket_name, key: object_key).body.read
133-
rescue Aws::S3::Errors::ServiceError => e
134-
raise Errors::S3OperationFailedError, "Error reading large message from S3. bucket: `#{bucket_name}` key: `#{object_key}` message: `#{e.message}`"
135-
end
136-
end
137-
138-
# The s3 client is being lazily initialized, as it's slow to import/init and it will only be used
139-
# in rare scenarios where large messages need offloaded from SQS -> S3.
140-
# See: (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-s3-messages.html)
141-
def s3_client
142-
@s3_client ||= begin
143-
require "aws-sdk-s3"
144-
Aws::S3::Client.new
145-
end
146-
end
147-
148118
# Formats the response, including any failures, based on
149119
# https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting
150120
def format_response(failures)
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
module ElasticGraph
  module IndexerLambda
    # Signature for the JSONL event decoder used by the SQS processor.
    class JSONLDecoder
      # Decodes a JSONL message body into the list of ElasticGraph event hashes it contains.
      def decode_events: (
        sqs_record: ::Hash[::String, untyped],
        body: ::String
      ) -> ::Array[::Hash[::String, untyped]]
    end
  end
end
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
module ElasticGraph
  module IndexerLambda
    # Signature for the loader that resolves SQS message bodies,
    # fetching payloads offloaded to S3 when needed.
    class SqsMessageBodyLoader
      # The client is optional; when omitted it is lazily built on first S3 fetch.
      def initialize: (?s3_client: Aws::S3::Client?) -> void
      def load_body: (sqs_record: ::Hash[::String, untyped]) -> ::String

      private

      @s3_client: Aws::S3::Client?

      S3_OFFLOADING_INDICATOR: ::String
      def get_payload_from_s3: (::String) -> ::String
      def s3_client: () -> Aws::S3::Client
    end
  end
end

elasticgraph-indexer_lambda/sig/elastic_graph/indexer_lambda/sqs_processor.rbs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,26 +5,26 @@ module ElasticGraph
55
Indexer::Processor,
66
logger: ::Logger,
77
ignore_sqs_latency_timestamps_from_arns: ::Set[::String],
8+
?event_payload_decoder: untyped,
9+
?message_body_loader: untyped,
810
?s3_client: Aws::S3::Client?,
911
) -> void
1012

11-
def process: (::Hash[::String, untyped], ?refresh_indices: bool) -> void
13+
def process: (
14+
::Hash[::String, untyped],
15+
?refresh_indices: bool
16+
) -> {"batchItemFailures" => ::Array[{"itemIdentifier" => ::String}]}
1217

1318
private
1419

1520
@indexer_processor: Indexer::Processor
1621
@logger: ::Logger
17-
@s3_client: Aws::S3::Client?
1822

1923
attr_reader ignore_sqs_latency_timestamps_from_arns: ::Set[::String]
2024

2125
def events_from: (::Hash[::String, untyped]) -> ::Array[::Hash[::String, untyped]]
22-
S3_OFFLOADING_INDICATOR: String
2326
def extract_sqs_metadata: (::Hash[String, untyped]) -> ::Hash[::String, untyped]
2427
def millis_to_iso8601: (::String) -> ::String?
25-
def parse_jsonl: (::String) -> ::Array[::Hash[::String, untyped]]
26-
def get_payload_from_s3: (::String) -> ::String
27-
def s3_client: () -> Aws::S3::Client
2828
def format_response: (
2929
::Array[Indexer::FailedEventError]
3030
) -> {"batchItemFailures" => ::Array[{"itemIdentifier" => ::String}]}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
# Copyright 2024 - 2026 Block, Inc.
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
#
# frozen_string_literal: true

require "aws-sdk-s3"
require "elastic_graph/errors"
require "elastic_graph/indexer_lambda/sqs_message_body_loader"
require "elastic_graph/spec_support/lambda_function"
require "json"

module ElasticGraph
  module IndexerLambda
    RSpec.describe SqsMessageBodyLoader do
      describe "#load_body" do
        it "returns inline SQS message bodies unchanged" do
          result = described_class.new.load_body(sqs_record: {"body" => '{"field1":{}}'})

          expect(result).to eq('{"field1":{}}')
        end

        it "retrieves large messages from S3 when an ElasticGraph event was offloaded there" do
          offloaded_body = "{\"field1\":{}}\n{\"field2\":{}}"
          stubbed_client = Aws::S3::Client.new(stub_responses: true)
          stubbed_client.stub_responses(:get_object, ->(context) {
            expect(context.params).to include(bucket: "test-bucket-name", key: "88680f6d-53d4-4143-b8c7-f5b1189213b6")
            {body: offloaded_body}
          })

          loader = described_class.new(s3_client: stubbed_client)
          record = offloaded_sqs_record(
            bucket: "test-bucket-name",
            key: "88680f6d-53d4-4143-b8c7-f5b1189213b6"
          )

          expect(loader.load_body(sqs_record: record)).to eq(offloaded_body)
        end

        it "raises a detailed error when fetching from S3 fails" do
          stubbed_client = Aws::S3::Client.new(stub_responses: true)
          stubbed_client.stub_responses(:get_object, "NoSuchkey")

          loader = described_class.new(s3_client: stubbed_client)
          record = offloaded_sqs_record(
            bucket: "test-bucket-name",
            key: "88680f6d-53d4-4143-b8c7-f5b1189213b6"
          )

          expect {
            loader.load_body(sqs_record: record)
          }.to raise_error Errors::S3OperationFailedError, a_string_including(
            "Error reading large message from S3. bucket: `test-bucket-name` key: `88680f6d-53d4-4143-b8c7-f5b1189213b6` message: `stubbed-response-error-message`"
          )
        end

        # Builds an SQS record whose body is a payload-offloading pointer to the given S3 location.
        def offloaded_sqs_record(bucket:, key:)
          {
            "body" => JSON.generate([
              "software.amazon.payloadoffloading.PayloadS3Pointer",
              {"s3BucketName" => bucket, "s3Key" => key}
            ])
          }
        end
      end

      context "when instantiated without an S3 client injection" do
        include_context "lambda function"

        it "lazily creates the S3 client when needed" do
          expect(described_class.new.send(:s3_client)).to be_a Aws::S3::Client
        end
      end
    end
  end
end

elasticgraph-indexer_lambda/spec/unit/elastic_graph/indexer_lambda/sqs_processor_spec.rb

Lines changed: 21 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@
1111
require "elastic_graph/indexer/processor"
1212
require "elastic_graph/indexer_lambda/sqs_processor"
1313
require "elastic_graph/spec_support/lambda_function"
14-
require "json"
15-
require "aws-sdk-s3"
1614

1715
module ElasticGraph
1816
module IndexerLambda
@@ -21,7 +19,6 @@ module IndexerLambda
2119
let(:indexer_processor) { instance_double(Indexer::Processor, process_returning_failures: []) }
2220

2321
describe "#process" do
24-
let(:s3_client) { Aws::S3::Client.new(stub_responses: true) }
2522
let(:sqs_processor) { build_sqs_processor }
2623

2724
it "processes a lambda event containing a single SQS message with a single ElasticGraph event" do
@@ -97,6 +94,27 @@ module IndexerLambda
9794
], refresh_indices: false)
9895
end
9996

97+
it "loads message bodies using the configured message body loader" do
98+
message_body_loader = instance_double(SqsMessageBodyLoader, load_body: "{\"field1\":{}}")
99+
sqs_processor = build_sqs_processor(message_body_loader: message_body_loader)
100+
101+
lambda_event = {
102+
"Records" => [
103+
sqs_message("a", {"field1" => {}})
104+
]
105+
}
106+
107+
sqs_processor.process(lambda_event)
108+
109+
expect(message_body_loader).to have_received(:load_body).with(
110+
sqs_record: lambda_event.fetch("Records").first
111+
)
112+
113+
expect(indexer_processor).to have_received(:process_returning_failures).with([
114+
{"field1" => {}, "message_id" => "a"}
115+
], refresh_indices: false)
116+
end
117+
100118
it "logs the SQS message ids received in the lambda event and the `sqs_received_at` if available" do
101119
sent_timestamp_millis = "796010423456"
102120
sent_timestamp_iso8601 = "1995-03-24T02:00:23.456Z"
@@ -147,55 +165,6 @@ module IndexerLambda
147165
expect(indexer_processor).not_to have_received(:process_returning_failures)
148166
end
149167

150-
it "retrieves large messages from s3 when an ElasticGraph event was offloaded there" do
151-
bucket_name = "test-bucket-name"
152-
s3_key = "88680f6d-53d4-4143-b8c7-f5b1189213b6"
153-
event_payload = {"test" => "data"}
154-
155-
lambda_event = {
156-
"Records" => [
157-
sqs_message("a", JSON.generate([
158-
"software.amazon.payloadoffloading.PayloadS3Pointer",
159-
{"s3BucketName" => bucket_name, "s3Key" => s3_key}
160-
]))
161-
]
162-
}
163-
164-
s3_client.stub_responses(:get_object, ->(context) {
165-
expect(context.params).to include(bucket: bucket_name, key: s3_key)
166-
{body: jsonl(event_payload)}
167-
})
168-
169-
sqs_processor.process(lambda_event)
170-
171-
expect(indexer_processor).to have_received(:process_returning_failures).with(
172-
[event_payload.merge("message_id" => "a")],
173-
refresh_indices: false
174-
)
175-
end
176-
177-
it "throws a detailed error when fetching from s3 fails" do
178-
bucket_name = "test-bucket-name"
179-
s3_key = "88680f6d-53d4-4143-b8c7-f5b1189213b6"
180-
181-
lambda_event = {
182-
"Records" => [
183-
sqs_message("a", JSON.generate([
184-
"software.amazon.payloadoffloading.PayloadS3Pointer",
185-
{"s3BucketName" => bucket_name, "s3Key" => s3_key}
186-
]))
187-
]
188-
}
189-
190-
s3_client.stub_responses(:get_object, "NoSuchkey")
191-
192-
expect {
193-
sqs_processor.process(lambda_event)
194-
}.to raise_error Errors::S3OperationFailedError, a_string_including(
195-
"Error reading large message from S3. bucket: `#{bucket_name}` key: `#{s3_key}` message: `stubbed-response-error-message`"
196-
)
197-
end
198-
199168
it "parses and merges SQS timestamps into non-existing `latency_timestamps` field" do
200169
approximate_first_receive_timestamp_millis = "1696334412345"
201170
sent_timestamp_millis = "796010423456"
@@ -361,18 +330,6 @@ module IndexerLambda
361330
def failure_of(id, message: "boom", event: {})
362331
instance_double(Indexer::FailedEventError, id: id, message: message, event: event)
363332
end
364-
365-
def build_sqs_processor(**options)
366-
super(s3_client: s3_client, **options)
367-
end
368-
end
369-
370-
context "when instantiated without an S3 client injection" do
371-
include_context "lambda function"
372-
373-
it "lazily creates the S3 client when needed" do
374-
expect(build_sqs_processor.send(:s3_client)).to be_a Aws::S3::Client
375-
end
376333
end
377334

378335
def sqs_message(message_id, *body, event_source_arn: "arn:aws:sqs:us-east-2:123456789012:my-queue", attributes: nil)

0 commit comments

Comments
 (0)