Skip to content

Commit e8e1a67

Browse files
committed
Address review feedback on indexing event decoder PR
- Make `indexing_event_decoder` a required `SqsProcessor` kwarg instead of a nilable one with a second, duplicate default-construction path. The warehouse lambda now passes its indexer's configured decoder, so there is a single source of truth for the default and decoders can rely on receiving non-nil `schema_artifacts`. - Make `JSONLines` inherit from `Interface` so its RBS superclass declaration matches the runtime class. - Validate the decoder's `name` config with the same class-name pattern used for query interceptors, and regenerate the config schema artifact. - Match the established extension-interface style (explanatory comments, `:nocov:` only around the body that must return a value) and document the interface publicly since it is the contract decoder authors implement. - Replace the `_ =` cast with an inline type annotation, alphabetize requires, and simplify the SQS processor spec helper.
1 parent 5bd5f28 commit e8e1a67

10 files changed

Lines changed: 34 additions & 31 deletions

File tree

elasticgraph-indexer/lib/elastic_graph/indexer.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,8 @@ def operation_factory
8989
def indexing_event_decoder
9090
@indexing_event_decoder ||= begin
9191
extension = config.indexing_event_decoder
92-
(_ = extension.extension_class).new(
92+
decoder_class = extension.extension_class # : untyped
93+
decoder_class.new(
9394
config: extension.config,
9495
schema_artifacts: schema_artifacts,
9596
logger: logger

elasticgraph-indexer/lib/elastic_graph/indexer/config.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@
66
#
77
# frozen_string_literal: true
88

9-
require "elastic_graph/support/config"
109
require "elastic_graph/errors"
1110
require "elastic_graph/indexer/indexing_event_decoder"
1211
require "elastic_graph/schema_artifacts/runtime_metadata/extension_loader"
12+
require "elastic_graph/support/config"
1313

1414
module ElasticGraph
1515
class Indexer
@@ -57,7 +57,7 @@ class Config < Support::Config.define(:latency_slo_thresholds_by_timestamp_in_ms
5757
name: {
5858
description: "The name of the indexing event decoder extension class.",
5959
type: "string",
60-
minLength: 1,
60+
pattern: /^[A-Z]\w+(::[A-Z]\w+)*$/.source, # https://rubular.com/r/UuqAz4fR3kdMip
6161
examples: ["MyCompany::ElasticGraph::CSVIndexingEventDecoder"]
6262
},
6363
require_path: {

elasticgraph-indexer/lib/elastic_graph/indexer/indexing_event_decoder.rb

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,28 +10,36 @@
1010

1111
module ElasticGraph
1212
class Indexer
13+
# Namespace for indexing event decoders, which turn raw payload strings from a transport into
14+
# ElasticGraph indexing event hashes. The decoder to use is configured via the
15+
# `indexer.indexing_event_decoder` setting.
1316
module IndexingEventDecoder
14-
# Defines the extension interface implemented by indexing event decoders.
15-
#
16-
# @api private
17+
# Defines the indexing event decoder interface, which our extension loader will validate against.
1718
class Interface
18-
# :nocov:
19+
# @param config [Hash<String, Object>] configuration from the `indexing_event_decoder.config` setting
20+
# @param schema_artifacts [SchemaArtifacts::FromDisk] the schema artifacts
21+
# @param logger [Logger] the ElasticGraph logger
1922
def initialize(config:, schema_artifacts:, logger:)
23+
# must be defined, but nothing to do
2024
end
2125

26+
# @param payload [String] a raw payload from the transport
27+
# @return [Array<Hash<String, Object>>] the decoded ElasticGraph indexing events
2228
def decode(payload)
29+
# :nocov: -- must return an array to satisfy Steep type checking but never called
2330
[]
31+
# :nocov:
2432
end
25-
# :nocov:
2633
end
2734

2835
# The default indexing event decoder, which expects newline-delimited JSON objects.
29-
#
30-
# @api private
31-
class JSONLines
36+
class JSONLines < Interface
37+
# (see Interface#initialize)
3238
def initialize(config:, schema_artifacts:, logger:)
39+
# must be defined for extension interface verification, but nothing to do
3340
end
3441

42+
# (see Interface#decode)
3543
def decode(payload)
3644
payload.split("\n").map { |event| JSON.parse(event) }
3745
end

elasticgraph-indexer/sig/elastic_graph/indexer/indexing_event_decoder.rbs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ module ElasticGraph
66
class Interface
77
def initialize: (
88
config: ::Hash[::Symbol | ::String, untyped],
9-
schema_artifacts: schemaArtifacts?,
9+
schema_artifacts: schemaArtifacts,
1010
logger: ::Logger
1111
) -> void
1212

elasticgraph-indexer/spec/unit/elastic_graph/indexer/indexing_event_decoder_spec.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@
1010

1111
module ElasticGraph
1212
class Indexer
13-
RSpec.describe IndexingEventDecoder::JSONLines do
13+
RSpec.describe IndexingEventDecoder::JSONLines, :capture_logs do
1414
it "decodes newline-delimited JSON objects" do
15-
decoder = described_class.new(config: {}, schema_artifacts: nil, logger: nil)
15+
decoder = described_class.new(config: {}, schema_artifacts: nil, logger: logger)
1616
payload = <<~JSONL
1717
{"op":"upsert","id":"1"}
1818
{"op":"upsert","id":"2"}

elasticgraph-indexer_lambda/lib/elastic_graph/indexer_lambda/sqs_processor.rb

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@ class SqsProcessor
1919
# @dynamic ignore_sqs_latency_timestamps_from_arns
2020
attr_reader :ignore_sqs_latency_timestamps_from_arns
2121

22-
def initialize(indexer_processor, logger:, ignore_sqs_latency_timestamps_from_arns:, indexing_event_decoder: nil, s3_client: nil)
22+
def initialize(indexer_processor, logger:, ignore_sqs_latency_timestamps_from_arns:, indexing_event_decoder:, s3_client: nil)
2323
@indexer_processor = indexer_processor
2424
@logger = logger
25-
@indexing_event_decoder = indexing_event_decoder || default_indexing_event_decoder
25+
@indexing_event_decoder = indexing_event_decoder
2626
@s3_client = s3_client
2727
@ignore_sqs_latency_timestamps_from_arns = ignore_sqs_latency_timestamps_from_arns
2828
end
@@ -94,16 +94,6 @@ def decoded_events_from(payload)
9494
@indexing_event_decoder.decode(payload)
9595
end
9696

97-
def default_indexing_event_decoder
98-
require "elastic_graph/indexer/indexing_event_decoder"
99-
100-
Indexer::IndexingEventDecoder::JSONLines.new(
101-
config: {},
102-
schema_artifacts: nil,
103-
logger: @logger
104-
)
105-
end
106-
10797
def extract_sqs_metadata(record)
10898
sqs_timestamps = {
10999
"processing_first_attempted_at" => millis_to_iso8601(record.dig("attributes", "ApproximateFirstReceiveTimestamp")),

elasticgraph-indexer_lambda/sig/elastic_graph/indexer_lambda/sqs_processor.rbs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ module ElasticGraph
55
Indexer::Processor,
66
logger: ::Logger,
77
ignore_sqs_latency_timestamps_from_arns: ::Set[::String],
8-
?indexing_event_decoder: Indexer::indexingEventDecoder?,
8+
indexing_event_decoder: Indexer::indexingEventDecoder,
99
?s3_client: Aws::S3::Client?,
1010
) -> void
1111

@@ -25,7 +25,6 @@ module ElasticGraph
2525
def extract_sqs_metadata: (::Hash[String, untyped]) -> ::Hash[::String, untyped]
2626
def millis_to_iso8601: (::String) -> ::String?
2727
def decoded_events_from: (::String) -> ::Array[::Hash[::String, untyped]]
28-
def default_indexing_event_decoder: () -> Indexer::indexingEventDecoder
2928
def get_payload_from_s3: (::String) -> ::String
3029
def s3_client: () -> Aws::S3::Client
3130
def format_response: (

elasticgraph-indexer_lambda/spec/unit/elastic_graph/indexer_lambda/sqs_processor_spec.rb

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -394,12 +394,16 @@ def jsonl(*items)
394394
end
395395

396396
def build_sqs_processor(**options)
397-
event_decoder = options.delete(:indexing_event_decoder) { nil }
397+
json_lines_decoder = Indexer::IndexingEventDecoder::JSONLines.new(
398+
config: {},
399+
schema_artifacts: nil, # not used by `JSONLines`
400+
logger: logger
401+
)
398402

399403
SqsProcessor.new(
400404
indexer_processor,
401405
logger: logger,
402-
indexing_event_decoder: event_decoder,
406+
indexing_event_decoder: json_lines_decoder,
403407
ignore_sqs_latency_timestamps_from_arns: ignore_sqs_latency_timestamps_from_arns,
404408
**options
405409
)

elasticgraph-local/lib/elastic_graph/local/spec_support/config_schema.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -516,7 +516,7 @@ properties:
516516
name:
517517
description: The name of the indexing event decoder extension class.
518518
type: string
519-
minLength: 1
519+
pattern: "^[A-Z]\\w+(::[A-Z]\\w+)*$"
520520
examples:
521521
- MyCompany::ElasticGraph::CSVIndexingEventDecoder
522522
require_path:

elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda/lambda_function.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ def initialize
2828
@sqs_processor = IndexerLambda::SqsProcessor.new(
2929
warehouse_lambda.processor,
3030
ignore_sqs_latency_timestamps_from_arns: ignore_sqs_latency_timestamps_from_arns,
31+
indexing_event_decoder: warehouse_lambda.indexer.indexing_event_decoder,
3132
logger: warehouse_lambda.logger
3233
)
3334
end

0 commit comments

Comments
 (0)