Skip to content

Commit 327b53f

Browse files
committed
Refactor indexer lambda event decoding
1 parent 89b5446 commit 327b53f

4 files changed

Lines changed: 92 additions & 6 deletions

File tree

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Copyright 2024 - 2026 Block, Inc.
2+
#
3+
# Use of this source code is governed by an MIT-style
4+
# license that can be found in the LICENSE file or at
5+
# https://opensource.org/licenses/MIT.
6+
#
7+
# frozen_string_literal: true
8+
9+
require "json"
10+
11+
module ElasticGraph
  module IndexerLambda
    # Decodes SQS message payloads encoded as JSON Lines into ElasticGraph events.
    #
    # @private
    class JSONLDecoder
      # Decodes the given message payload into zero or more ElasticGraph events.
      #
      # Each non-blank line of `body` is parsed as one JSON document. Blank lines
      # (empty or whitespace-only) are skipped instead of being handed to
      # `JSON.parse`, which raises on empty input. `String#split` already drops
      # trailing blank lines, so this makes leading and interior blank lines
      # behave the same way.
      #
      # @param sqs_record [Hash] full SQS record carrying the payload (unused by this decoder)
      # @param body [String] resolved SQS message body
      # @return [Array<Hash>] decoded ElasticGraph events, in the order their lines appear
      # @raise [JSON::ParserError] if a non-blank line is not valid JSON
      def decode_events(sqs_record:, body:)
        # Not needed to decode JSON Lines; assigned to `_` to mark it as intentionally unused.
        _ = sqs_record

        body
          .split("\n")
          .reject { |line| line.strip.empty? }
          .map { |line| JSON.parse(line) }
      end
    end
  end
end

elasticgraph-indexer_lambda/lib/elastic_graph/indexer_lambda/sqs_processor.rb

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
require "elastic_graph/errors"
1010
require "elastic_graph/indexer/indexing_failures_error"
11+
require "elastic_graph/indexer_lambda/jsonl_decoder"
1112
require "json"
1213

1314
module ElasticGraph
@@ -19,11 +20,12 @@ class SqsProcessor
1920
# @dynamic ignore_sqs_latency_timestamps_from_arns
2021
attr_reader :ignore_sqs_latency_timestamps_from_arns
2122

# Stores the collaborators this processor is built from.
#
# @param indexer_processor [Object] stored as `@indexer_processor`
# @param logger [Object] stored as `@logger`
# @param ignore_sqs_latency_timestamps_from_arns [Object] exposed through the
#   `ignore_sqs_latency_timestamps_from_arns` reader
# @param event_payload_decoder [#decode_events] object that turns a message body into
#   events via `decode_events(sqs_record:, body:)`; defaults to a new `JSONLDecoder`
# @param s3_client [Object, nil] stored as `@s3_client`; presumably used when resolving
#   S3-offloaded payloads (confirm against `get_payload_from_s3`)
def initialize(
  indexer_processor,
  logger:,
  ignore_sqs_latency_timestamps_from_arns:,
  event_payload_decoder: JSONLDecoder.new,
  s3_client: nil
)
  @event_payload_decoder = event_payload_decoder
  @ignore_sqs_latency_timestamps_from_arns = ignore_sqs_latency_timestamps_from_arns
  @s3_client = s3_client
  @logger = logger
  @indexer_processor = indexer_processor
end
2830

2931
# Processes the ElasticGraph events in the given `lambda_event`, indexing the data in the datastore.
@@ -80,7 +82,10 @@ def events_from(lambda_event)
8082
sqs_metadata = sqs_metadata.except("latency_timestamps")
8183
end
8284

83-
parse_jsonl(record.fetch("body")).map do |event|
85+
@event_payload_decoder.decode_events(
86+
sqs_record: record,
87+
body: body_from(record.fetch("body"))
88+
).map do |event|
8489
ElasticGraph::Support::HashUtil.deep_merge(event, sqs_metadata)
8590
end
8691
end.tap do
@@ -93,11 +98,11 @@ def events_from(lambda_event)
9398

9499
S3_OFFLOADING_INDICATOR = '["software.amazon.payloadoffloading.PayloadS3Pointer"'
95100

# Resolves the raw `body` of an SQS record into the payload that should be decoded.
#
# A body that begins with `S3_OFFLOADING_INDICATOR` is passed to `get_payload_from_s3`
# and that result is returned; any other body is returned unchanged.
#
# @param body [String] raw body of an SQS record
# @return [String] the body to decode (for an offloaded message, whatever
#   `get_payload_from_s3` returns, expected to be a String)
def body_from(body)
  return body unless body.start_with?(S3_OFFLOADING_INDICATOR)

  get_payload_from_s3(body)
end
102107

103108
def extract_sqs_metadata(record)
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Copyright 2024 - 2026 Block, Inc.
2+
#
3+
# Use of this source code is governed by an MIT-style
4+
# license that can be found in the LICENSE file or at
5+
# https://opensource.org/licenses/MIT.
6+
#
7+
# frozen_string_literal: true
8+
9+
require "elastic_graph/indexer_lambda/jsonl_decoder"
10+
11+
module ElasticGraph
  module IndexerLambda
    RSpec.describe JSONLDecoder do
      describe "#decode_events" do
        it "parses JSON Lines payloads into ElasticGraph events" do
          # Two newline-separated JSON documents; the second carries a nested object.
          jsonl_body = %({"id":"1"}\n{"id":"2","record":{"name":"Widget"}})
          expected_events = [
            {"id" => "1"},
            {"id" => "2", "record" => {"name" => "Widget"}}
          ]

          actual_events = described_class.new.decode_events(
            sqs_record: {"messageId" => "123"},
            body: jsonl_body
          )

          expect(actual_events).to eq(expected_events)
        end
      end
    end
  end
end

elasticgraph-indexer_lambda/spec/unit/elastic_graph/indexer_lambda/sqs_processor_spec.rb

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,28 @@ module IndexerLambda
7575
], refresh_indices: false)
7676
end
7777

it "decodes message bodies using the configured event payload decoder" do
  # The stubbed decoder supplies the events, so this verifies both what the processor
  # hands to the decoder and that the decoder's output is what gets indexed.
  stub_decoder = instance_double(JSONLDecoder, decode_events: [{"field1" => {}}])
  record = sqs_message("a", {"field1" => {}})
  lambda_event = {"Records" => [record]}

  build_sqs_processor(event_payload_decoder: stub_decoder).process(lambda_event)

  expect(stub_decoder).to have_received(:decode_events).with(
    sqs_record: record,
    body: "{\"field1\":{}}"
  )

  indexed_events = [{"field1" => {}, "message_id" => "a"}]
  expect(indexer_processor).to have_received(:process_returning_failures).with(
    indexed_events,
    refresh_indices: false
  )
end
99+
78100
it "logs the SQS message ids received in the lambda event and the `sqs_received_at` if available" do
79101
sent_timestamp_millis = "796010423456"
80102
sent_timestamp_iso8601 = "1995-03-24T02:00:23.456Z"

0 commit comments

Comments
 (0)