diff --git a/elasticgraph-indexer/README.md b/elasticgraph-indexer/README.md index e4f82753b..52a3be8c9 100644 --- a/elasticgraph-indexer/README.md +++ b/elasticgraph-indexer/README.md @@ -48,3 +48,29 @@ indexer = ElasticGraph::Indexer.from_yaml_file("config/settings/local.yaml") events = [] # JSON events read from an async datastream indexer.processor.process(events) ``` + +## Custom Payload Decoding + +`ElasticGraph::Indexer` can be configured with an indexing event decoder extension. Decoders turn raw payload strings +from a transport into ElasticGraph indexing event hashes before the normal validation and indexing pipeline runs. The +default decoder expects JSON Lines. + +```yaml +indexer: + indexing_event_decoder: + name: MyCompany::ElasticGraph::CSVIndexingEventDecoder + require_path: ./lib/my_company/elastic_graph/csv_indexing_event_decoder + config: + delimiter: "," +``` + +Decoder extensions must implement: + +```ruby +def initialize(config:, schema_artifacts:, logger:) +end + +def decode(payload) + # return an array of ElasticGraph indexing event hashes +end +``` diff --git a/elasticgraph-indexer/lib/elastic_graph/indexer.rb b/elasticgraph-indexer/lib/elastic_graph/indexer.rb index 2d64978bd..da4d0ad6c 100644 --- a/elasticgraph-indexer/lib/elastic_graph/indexer.rb +++ b/elasticgraph-indexer/lib/elastic_graph/indexer.rb @@ -86,6 +86,18 @@ def operation_factory end end + def indexing_event_decoder + @indexing_event_decoder ||= begin + extension = config.indexing_event_decoder + decoder_class = extension.extension_class # : untyped + decoder_class.new( + config: extension.config, + schema_artifacts: schema_artifacts, + logger: logger + ) + end + end + def monotonic_clock @monotonic_clock ||= begin require "elastic_graph/support/monotonic_clock" diff --git a/elasticgraph-indexer/lib/elastic_graph/indexer/config.rb b/elasticgraph-indexer/lib/elastic_graph/indexer/config.rb index 2b760f0ca..2b0146b03 100644 --- 
a/elasticgraph-indexer/lib/elastic_graph/indexer/config.rb +++ b/elasticgraph-indexer/lib/elastic_graph/indexer/config.rb @@ -6,12 +6,19 @@ # # frozen_string_literal: true -require "elastic_graph/support/config" require "elastic_graph/errors" +require "elastic_graph/indexer/indexing_event_decoder" +require "elastic_graph/schema_artifacts/runtime_metadata/extension_loader" +require "elastic_graph/support/config" module ElasticGraph class Indexer - class Config < Support::Config.define(:latency_slo_thresholds_by_timestamp_in_ms, :skip_derived_indexing_type_updates) + class Config < Support::Config.define(:latency_slo_thresholds_by_timestamp_in_ms, :skip_derived_indexing_type_updates, :indexing_event_decoder) + DEFAULT_INDEXING_EVENT_DECODER = { + "name" => "ElasticGraph::Indexer::IndexingEventDecoder::JSONLines", + "require_path" => "elastic_graph/indexer/indexing_event_decoder" + } + json_schema at: "indexer", optional: false, description: "Configuration for indexing operations and metrics used by `elasticgraph-indexer`.", @@ -42,17 +49,64 @@ class Config < Support::Config.define(:latency_slo_thresholds_by_timestamp_in_ms {}, # : untyped {"WidgetWorkspace" => ["ABC12345678"]} ] + }, + indexing_event_decoder: { + description: "Extension object used to decode raw indexing payloads into ElasticGraph indexing event hashes. The default decoder expects JSON Lines.", + type: "object", + properties: { + name: { + description: "The name of the indexing event decoder extension class.", + type: "string", + pattern: /^[A-Z]\w+(::[A-Z]\w+)*$/.source, # https://rubular.com/r/UuqAz4fR3kdMip + examples: ["MyCompany::ElasticGraph::CSVIndexingEventDecoder"] + }, + require_path: { + description: "The path to require to load the indexing event decoder extension.", + type: "string", + minLength: 1, + examples: ["./lib/my_company/elastic_graph/csv_indexing_event_decoder"] + }, + config: { + description: "Configuration for the indexing event decoder. 
Will be passed into the decoder's `#initialize` method.", + type: "object", + default: {}, # : untyped + examples: [ + {}, # : untyped + {"delimiter" => ","} + ] + } + }, + required: ["name", "require_path"], + default: DEFAULT_INDEXING_EVENT_DECODER, + examples: [ + DEFAULT_INDEXING_EVENT_DECODER, + { + "name" => "MyCompany::ElasticGraph::CSVIndexingEventDecoder", + "require_path" => "./lib/my_company/elastic_graph/csv_indexing_event_decoder", + "config" => {"delimiter" => ","} + } + ] } } private - def convert_values(skip_derived_indexing_type_updates:, latency_slo_thresholds_by_timestamp_in_ms:) + def convert_values(skip_derived_indexing_type_updates:, latency_slo_thresholds_by_timestamp_in_ms:, indexing_event_decoder:) { skip_derived_indexing_type_updates: skip_derived_indexing_type_updates.transform_values(&:to_set), - latency_slo_thresholds_by_timestamp_in_ms: latency_slo_thresholds_by_timestamp_in_ms + latency_slo_thresholds_by_timestamp_in_ms: latency_slo_thresholds_by_timestamp_in_ms, + indexing_event_decoder: load_indexing_event_decoder(indexing_event_decoder) } end + + def load_indexing_event_decoder(config) + loader = SchemaArtifacts::RuntimeMetadata::ExtensionLoader.new(IndexingEventDecoder::Interface) + loader.load( + config.fetch("name"), + from: config.fetch("require_path"), + config: config["config"] || {} + ) + end end end end diff --git a/elasticgraph-indexer/lib/elastic_graph/indexer/event_id.rb b/elasticgraph-indexer/lib/elastic_graph/indexer/event_id.rb index 133ff5d7d..a3e3e1e9c 100644 --- a/elasticgraph-indexer/lib/elastic_graph/indexer/event_id.rb +++ b/elasticgraph-indexer/lib/elastic_graph/indexer/event_id.rb @@ -12,7 +12,7 @@ module ElasticGraph class Indexer # A unique identifier for an event ingested by the indexer. As a string, takes the form of # "[type]:[id]@v[version]", such as "Widget:123abc@v7". This format was designed to make it - # easy to put these ids in a comma-seperated list. 
+ # easy to put these ids in a comma-separated list. EventID = ::Data.define(:type, :id, :version) do # @implements EventID def self.from_event(event) @@ -26,7 +26,7 @@ def to_s # Steep weirdly expects them here... # @dynamic initialize, config, datastore_core, schema_artifacts, datastore_router, monotonic_clock - # @dynamic record_preparer_factory, processor, operation_factory, logger + # @dynamic record_preparer_factory, processor, operation_factory, indexing_event_decoder, logger # @dynamic self.from_parsed_yaml end end diff --git a/elasticgraph-indexer/lib/elastic_graph/indexer/indexing_event_decoder.rb b/elasticgraph-indexer/lib/elastic_graph/indexer/indexing_event_decoder.rb new file mode 100644 index 000000000..2fe8b9162 --- /dev/null +++ b/elasticgraph-indexer/lib/elastic_graph/indexer/indexing_event_decoder.rb @@ -0,0 +1,49 @@ +# Copyright 2024 - 2026 Block, Inc. +# +# Use of this source code is governed by an MIT-style +# license that can be found in the LICENSE file or at +# https://opensource.org/licenses/MIT. +# +# frozen_string_literal: true + +require "json" + +module ElasticGraph + class Indexer + # Namespace for indexing event decoders, which turn raw payload strings from a transport into + # ElasticGraph indexing event hashes. The decoder to use is configured via the + # `indexer.indexing_event_decoder` setting. + module IndexingEventDecoder + # Defines the indexing event decoder interface, which our extension loader will validate against. 
+ class Interface + # @param config [Hash] configuration from the `indexing_event_decoder.config` setting + # @param schema_artifacts [SchemaArtifacts::FromDisk] the schema artifacts + # @param logger [Logger] the ElasticGraph logger + def initialize(config:, schema_artifacts:, logger:) + # must be defined, but nothing to do + end + + # @param payload [String] a raw payload from the transport + # @return [Array<Hash<String, Object>>] the decoded ElasticGraph indexing events + def decode(payload) + # :nocov: -- must return an array to satisfy Steep type checking but never called + [] + # :nocov: + end + end + + # The default indexing event decoder, which expects newline-delimited JSON objects. + class JSONLines < Interface + # (see Interface#initialize) + def initialize(config:, schema_artifacts:, logger:) + # must be defined for extension interface verification, but nothing to do + end + + # (see Interface#decode) + def decode(payload) + payload.split("\n").map { |event| JSON.parse(event) } + end + end + end + end +end diff --git a/elasticgraph-indexer/sig/elastic_graph/indexer.rbs b/elasticgraph-indexer/sig/elastic_graph/indexer.rbs index 6f1d1bc40..e516c6eba 100644 --- a/elasticgraph-indexer/sig/elastic_graph/indexer.rbs +++ b/elasticgraph-indexer/sig/elastic_graph/indexer.rbs @@ -28,6 +28,9 @@ module ElasticGraph @operation_factory: Operation::Factory? def operation_factory: () -> Operation::Factory + @indexing_event_decoder: indexingEventDecoder? + def indexing_event_decoder: () -> indexingEventDecoder + @monotonic_clock: Support::MonotonicClock? 
def monotonic_clock: () -> Support::MonotonicClock diff --git a/elasticgraph-indexer/sig/elastic_graph/indexer/config.rbs b/elasticgraph-indexer/sig/elastic_graph/indexer/config.rbs index 1228afd4d..a33c4b567 100644 --- a/elasticgraph-indexer/sig/elastic_graph/indexer/config.rbs +++ b/elasticgraph-indexer/sig/elastic_graph/indexer/config.rbs @@ -5,25 +5,33 @@ module ElasticGraph attr_reader latency_slo_thresholds_by_timestamp_in_ms: ::Hash[::String, ::Integer] attr_reader skip_derived_indexing_type_updates: ::Hash[::String, ::Set[::String]] + attr_reader indexing_event_decoder: SchemaArtifacts::RuntimeMetadata::Extension def initialize: ( ?latency_slo_thresholds_by_timestamp_in_ms: ::Hash[::String, ::Integer], - ?skip_derived_indexing_type_updates: ::Hash[::String, ::Set[::String]]) -> void + ?skip_derived_indexing_type_updates: ::Hash[::String, ::Set[::String]], + ?indexing_event_decoder: ::Hash[::String, untyped]) -> void def with: ( ?latency_slo_thresholds_by_timestamp_in_ms: ::Hash[::String, ::Integer], - ?skip_derived_indexing_type_updates: ::Hash[::String, ::Set[::String]]) -> Config + ?skip_derived_indexing_type_updates: ::Hash[::String, ::Set[::String]], + ?indexing_event_decoder: SchemaArtifacts::RuntimeMetadata::Extension) -> Config def self.members: () -> ::Array[::Symbol] end class Config < ConfigSupertype + DEFAULT_INDEXING_EVENT_DECODER: ::Hash[::String, untyped] + private def convert_values: ( latency_slo_thresholds_by_timestamp_in_ms: untyped, - skip_derived_indexing_type_updates: untyped + skip_derived_indexing_type_updates: untyped, + indexing_event_decoder: untyped ) -> ::Hash[::Symbol, untyped] + + def load_indexing_event_decoder: (::Hash[::String, untyped]) -> SchemaArtifacts::RuntimeMetadata::Extension end end end diff --git a/elasticgraph-indexer/sig/elastic_graph/indexer/indexing_event_decoder.rbs b/elasticgraph-indexer/sig/elastic_graph/indexer/indexing_event_decoder.rbs new file mode 100644 index 000000000..2aba3cc05 --- /dev/null +++ 
b/elasticgraph-indexer/sig/elastic_graph/indexer/indexing_event_decoder.rbs @@ -0,0 +1,20 @@ +module ElasticGraph + class Indexer + type indexingEventDecoder = IndexingEventDecoder::Interface + + module IndexingEventDecoder + class Interface + def initialize: ( + config: ::Hash[::Symbol | ::String, untyped], + schema_artifacts: schemaArtifacts, + logger: ::Logger + ) -> void + + def decode: (::String) -> ::Array[event] + end + + class JSONLines < Interface + end + end + end +end diff --git a/elasticgraph-indexer/spec/support/example_extensions/indexing_event_decoder.rb b/elasticgraph-indexer/spec/support/example_extensions/indexing_event_decoder.rb new file mode 100644 index 000000000..b9c777ce6 --- /dev/null +++ b/elasticgraph-indexer/spec/support/example_extensions/indexing_event_decoder.rb @@ -0,0 +1,26 @@ +# Copyright 2024 - 2026 Block, Inc. +# +# Use of this source code is governed by an MIT-style +# license that can be found in the LICENSE file or at +# https://opensource.org/licenses/MIT. 
+# +# frozen_string_literal: true + +class ExampleIndexingEventDecoder + attr_reader :config, :schema_artifacts, :logger + + def initialize(config:, schema_artifacts:, logger:) + @config = config + @schema_artifacts = schema_artifacts + @logger = logger + end + + def decode(payload) + payload.split(config.fetch("delimiter")).map { |value| {"value" => value} } + end +end + +class InvalidIndexingEventDecoder + def initialize(config:, schema_artifacts:, logger:) + end +end diff --git a/elasticgraph-indexer/spec/unit/elastic_graph/indexer/config_spec.rb b/elasticgraph-indexer/spec/unit/elastic_graph/indexer/config_spec.rb index 072d43707..72f591ec6 100644 --- a/elasticgraph-indexer/spec/unit/elastic_graph/indexer/config_spec.rb +++ b/elasticgraph-indexer/spec/unit/elastic_graph/indexer/config_spec.rb @@ -31,6 +31,34 @@ class Indexer expect(config.skip_derived_indexing_type_updates).to eq("WidgetCurrency" => ["USD"].to_set) end + + it "uses the JSON Lines indexing event decoder by default" do + expect(Config.new.indexing_event_decoder.extension_class).to be(IndexingEventDecoder::JSONLines) + end + + it "loads a configured indexing event decoder" do + config = Config.from_parsed_yaml("indexer" => { + "indexing_event_decoder" => { + "name" => "ExampleIndexingEventDecoder", + "require_path" => "support/example_extensions/indexing_event_decoder", + "config" => {"delimiter" => "|"} + } + }) + + expect(config.indexing_event_decoder.extension_class).to be(ExampleIndexingEventDecoder) + expect(config.indexing_event_decoder.config).to eq({"delimiter" => "|"}) + end + + it "verifies that a configured indexing event decoder implements the expected interface" do + expect { + Config.from_parsed_yaml("indexer" => { + "indexing_event_decoder" => { + "name" => "InvalidIndexingEventDecoder", + "require_path" => "support/example_extensions/indexing_event_decoder" + } + }) + }.to raise_error Errors::InvalidExtensionError, a_string_including("InvalidIndexingEventDecoder", "Missing instance 
methods: `decode`") + end end end end diff --git a/elasticgraph-indexer/spec/unit/elastic_graph/indexer/indexing_event_decoder_spec.rb b/elasticgraph-indexer/spec/unit/elastic_graph/indexer/indexing_event_decoder_spec.rb new file mode 100644 index 000000000..14b3674a5 --- /dev/null +++ b/elasticgraph-indexer/spec/unit/elastic_graph/indexer/indexing_event_decoder_spec.rb @@ -0,0 +1,28 @@ +# Copyright 2024 - 2026 Block, Inc. +# +# Use of this source code is governed by an MIT-style +# license that can be found in the LICENSE file or at +# https://opensource.org/licenses/MIT. +# +# frozen_string_literal: true + +require "elastic_graph/indexer/indexing_event_decoder" + +module ElasticGraph + class Indexer + RSpec.describe IndexingEventDecoder::JSONLines, :capture_logs do + it "decodes newline-delimited JSON objects" do + decoder = described_class.new(config: {}, schema_artifacts: nil, logger: logger) + payload = <<~JSONL + {"op":"upsert","id":"1"} + {"op":"upsert","id":"2"} + JSONL + + expect(decoder.decode(payload)).to eq([ + {"op" => "upsert", "id" => "1"}, + {"op" => "upsert", "id" => "2"} + ]) + end + end + end +end diff --git a/elasticgraph-indexer/spec/unit/elastic_graph/indexer_spec.rb b/elasticgraph-indexer/spec/unit/elastic_graph/indexer_spec.rb index abd32bc88..42e5b2f95 100644 --- a/elasticgraph-indexer/spec/unit/elastic_graph/indexer_spec.rb +++ b/elasticgraph-indexer/spec/unit/elastic_graph/indexer_spec.rb @@ -28,6 +28,28 @@ module ElasticGraph expect(indexer).to be_a(Indexer) end + + it "builds a configured indexing event decoder" do + config = Indexer::Config.from_parsed_yaml("indexer" => { + "indexing_event_decoder" => { + "name" => "ExampleIndexingEventDecoder", + "require_path" => "support/example_extensions/indexing_event_decoder", + "config" => {"delimiter" => "|"} + } + }) + indexer = Indexer.new(config: config, datastore_core: build_datastore_core) + + decoder = indexer.indexing_event_decoder + + expect(decoder).to 
be_a(ExampleIndexingEventDecoder) + expect(decoder.config).to eq({"delimiter" => "|"}) + expect(decoder.schema_artifacts).to be(indexer.schema_artifacts) + expect(decoder.logger).to be(indexer.logger) + expect(decoder.decode("one|two")).to eq([ + {"value" => "one"}, + {"value" => "two"} + ]) + end end end end diff --git a/elasticgraph-indexer_lambda/lib/elastic_graph/indexer_lambda/lambda_function.rb b/elasticgraph-indexer_lambda/lib/elastic_graph/indexer_lambda/lambda_function.rb index b5ea6b4d0..b90f48d6a 100644 --- a/elasticgraph-indexer_lambda/lib/elastic_graph/indexer_lambda/lambda_function.rb +++ b/elasticgraph-indexer_lambda/lib/elastic_graph/indexer_lambda/lambda_function.rb @@ -27,6 +27,7 @@ def initialize @sqs_processor = ElasticGraph::IndexerLambda::SqsProcessor.new( indexer.processor, + indexing_event_decoder: indexer.indexing_event_decoder, ignore_sqs_latency_timestamps_from_arns: ignore_sqs_latency_timestamps_from_arns, logger: indexer.logger ) diff --git a/elasticgraph-indexer_lambda/lib/elastic_graph/indexer_lambda/sqs_processor.rb b/elasticgraph-indexer_lambda/lib/elastic_graph/indexer_lambda/sqs_processor.rb index 635b6dd0c..45144fede 100644 --- a/elasticgraph-indexer_lambda/lib/elastic_graph/indexer_lambda/sqs_processor.rb +++ b/elasticgraph-indexer_lambda/lib/elastic_graph/indexer_lambda/sqs_processor.rb @@ -19,9 +19,10 @@ class SqsProcessor # @dynamic ignore_sqs_latency_timestamps_from_arns attr_reader :ignore_sqs_latency_timestamps_from_arns - def initialize(indexer_processor, logger:, ignore_sqs_latency_timestamps_from_arns:, s3_client: nil) + def initialize(indexer_processor, logger:, ignore_sqs_latency_timestamps_from_arns:, indexing_event_decoder:, s3_client: nil) @indexer_processor = indexer_processor @logger = logger + @indexing_event_decoder = indexing_event_decoder @s3_client = s3_client @ignore_sqs_latency_timestamps_from_arns = ignore_sqs_latency_timestamps_from_arns end @@ -41,32 +42,24 @@ def process(lambda_event, refresh_indices: 
false) private - # Given a lambda event payload, returns an array of raw operations in JSON format. + # Given a lambda event payload, returns an array of raw ElasticGraph indexing events. # # The SQS payload is wrapped in the following format already: # See https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#example-standard-queue-message-event for more details # { - # Records: { - # [ - # { body: }, - # { body: }, - # ... - # ] - # } + # "Records" => [ + # {"body" => <raw payload string>}, + # {"body" => <raw payload string>}, + # ... + # ] # } # - # Each entry in "Records" is a SQS entry. Since lambda handles some batching + # Each entry in "Records" is a SQS entry. Since lambda handles some batching # for you (with some limits), you can get multiple. # # We also want to do our own batching in order to cram more into a given payload - # and issue fewer SQS entries and Lambda invocations when possible. As such, we - # encoded multiple JSON with JSON Lines (http://jsonlines.org/) in record body. - # Each JSON Lines object under 'body' should be of the form: - # - # {op: 'upsert', __typename: 'Payment', id: "123", version: "1", record: {...} } \n - # {op: 'upsert', __typename: 'Payment', id: "123", version: "2", record: {...} } \n - # ... - # Note: "\n" at the end of each line is a single byte newline control character, instead of a string sequence + # and issue fewer SQS entries and Lambda invocations when possible. The default decoder expects + # JSON Lines (http://jsonlines.org/), but custom decoders can support other payload formats. 
def events_from(lambda_event) sqs_received_at_by_message_id = {} # : Hash[String, String] lambda_event.fetch("Records").flat_map do |record| @@ -80,7 +73,7 @@ def events_from(lambda_event) sqs_metadata = sqs_metadata.except("latency_timestamps") end - parse_jsonl(record.fetch("body")).map do |event| + decoded_events_from(record.fetch("body")).map do |event| ElasticGraph::Support::HashUtil.deep_merge(event, sqs_metadata) end end.tap do @@ -93,11 +86,12 @@ def events_from(lambda_event) S3_OFFLOADING_INDICATOR = '["software.amazon.payloadoffloading.PayloadS3Pointer"' - def parse_jsonl(jsonl_string) - if jsonl_string.start_with?(S3_OFFLOADING_INDICATOR) - jsonl_string = get_payload_from_s3(jsonl_string) + def decoded_events_from(payload) + if payload.start_with?(S3_OFFLOADING_INDICATOR) + payload = get_payload_from_s3(payload) end - jsonl_string.split("\n").map { |event| JSON.parse(event) } + + @indexing_event_decoder.decode(payload) end def extract_sqs_metadata(record) diff --git a/elasticgraph-indexer_lambda/sig/elastic_graph/indexer_lambda/sqs_processor.rbs b/elasticgraph-indexer_lambda/sig/elastic_graph/indexer_lambda/sqs_processor.rbs index 65df06668..9c34781fe 100644 --- a/elasticgraph-indexer_lambda/sig/elastic_graph/indexer_lambda/sqs_processor.rbs +++ b/elasticgraph-indexer_lambda/sig/elastic_graph/indexer_lambda/sqs_processor.rbs @@ -5,6 +5,7 @@ module ElasticGraph Indexer::Processor, logger: ::Logger, ignore_sqs_latency_timestamps_from_arns: ::Set[::String], + indexing_event_decoder: Indexer::indexingEventDecoder, ?s3_client: Aws::S3::Client?, ) -> void @@ -14,6 +15,7 @@ module ElasticGraph @indexer_processor: Indexer::Processor @logger: ::Logger + @indexing_event_decoder: Indexer::indexingEventDecoder @s3_client: Aws::S3::Client? 
attr_reader ignore_sqs_latency_timestamps_from_arns: ::Set[::String] @@ -22,7 +24,7 @@ module ElasticGraph S3_OFFLOADING_INDICATOR: String def extract_sqs_metadata: (::Hash[String, untyped]) -> ::Hash[::String, untyped] def millis_to_iso8601: (::String) -> ::String? - def parse_jsonl: (::String) -> ::Array[::Hash[::String, untyped]] + def decoded_events_from: (::String) -> ::Array[::Hash[::String, untyped]] def get_payload_from_s3: (::String) -> ::String def s3_client: () -> Aws::S3::Client def format_response: ( diff --git a/elasticgraph-indexer_lambda/spec/unit/elastic_graph/indexer_lambda/sqs_processor_spec.rb b/elasticgraph-indexer_lambda/spec/unit/elastic_graph/indexer_lambda/sqs_processor_spec.rb index eb1343e5d..9e34eedfd 100644 --- a/elasticgraph-indexer_lambda/spec/unit/elastic_graph/indexer_lambda/sqs_processor_spec.rb +++ b/elasticgraph-indexer_lambda/spec/unit/elastic_graph/indexer_lambda/sqs_processor_spec.rb @@ -8,6 +8,7 @@ require "elastic_graph/errors" require "elastic_graph/indexer/failed_event_error" +require "elastic_graph/indexer/indexing_event_decoder" require "elastic_graph/indexer/processor" require "elastic_graph/indexer_lambda/sqs_processor" require "elastic_graph/spec_support/lambda_function" @@ -75,6 +76,22 @@ module IndexerLambda ], refresh_indices: false) end + it "uses the configured indexing event decoder" do + custom_decoder = instance_spy(Indexer::IndexingEventDecoder::Interface, decode: [{"field1" => {}}]) + lambda_event = { + "Records" => [ + sqs_message("a", "not-json") + ] + } + + build_sqs_processor(indexing_event_decoder: custom_decoder).process(lambda_event) + + expect(custom_decoder).to have_received(:decode).with("not-json") + expect(indexer_processor).to have_received(:process_returning_failures).with([ + {"field1" => {}, "message_id" => "a"} + ], refresh_indices: false) + end + it "logs the SQS message ids received in the lambda event and the `sqs_received_at` if available" do sent_timestamp_millis = "796010423456" 
sent_timestamp_iso8601 = "1995-03-24T02:00:23.456Z" @@ -377,9 +394,16 @@ def jsonl(*items) end def build_sqs_processor(**options) + json_lines_decoder = Indexer::IndexingEventDecoder::JSONLines.new( + config: {}, + schema_artifacts: nil, # not used by `JSONLines` + logger: logger + ) + SqsProcessor.new( indexer_processor, logger: logger, + indexing_event_decoder: json_lines_decoder, ignore_sqs_latency_timestamps_from_arns: ignore_sqs_latency_timestamps_from_arns, **options ) diff --git a/elasticgraph-local/lib/elastic_graph/local/spec_support/config_schema.yaml b/elasticgraph-local/lib/elastic_graph/local/spec_support/config_schema.yaml index b42bfc9f5..139ca1cf5 100644 --- a/elasticgraph-local/lib/elastic_graph/local/spec_support/config_schema.yaml +++ b/elasticgraph-local/lib/elastic_graph/local/spec_support/config_schema.yaml @@ -508,6 +508,45 @@ properties: - {} - WidgetWorkspace: - ABC12345678 + indexing_event_decoder: + description: Extension object used to decode raw indexing payloads into ElasticGraph + indexing event hashes. The default decoder expects JSON Lines. + type: object + properties: + name: + description: The name of the indexing event decoder extension class. + type: string + pattern: "^[A-Z]\\w+(::[A-Z]\\w+)*$" + examples: + - MyCompany::ElasticGraph::CSVIndexingEventDecoder + require_path: + description: The path to require to load the indexing event decoder extension. + type: string + minLength: 1 + examples: + - "./lib/my_company/elastic_graph/csv_indexing_event_decoder" + config: + description: Configuration for the indexing event decoder. Will be passed + into the decoder's `#initialize` method. 
+ type: object + default: {} + examples: + - {} + - delimiter: "," + required: + - name + - require_path + default: + name: ElasticGraph::Indexer::IndexingEventDecoder::JSONLines + require_path: elastic_graph/indexer/indexing_event_decoder + examples: + - name: ElasticGraph::Indexer::IndexingEventDecoder::JSONLines + require_path: elastic_graph/indexer/indexing_event_decoder + - name: MyCompany::ElasticGraph::CSVIndexingEventDecoder + require_path: "./lib/my_company/elastic_graph/csv_indexing_event_decoder" + config: + delimiter: "," + additionalProperties: false additionalProperties: false logger: description: Configuration for logging used by all parts of ElasticGraph. diff --git a/elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda/lambda_function.rb b/elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda/lambda_function.rb index 40e81863d..d58479f39 100644 --- a/elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda/lambda_function.rb +++ b/elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda/lambda_function.rb @@ -28,6 +28,7 @@ def initialize @sqs_processor = IndexerLambda::SqsProcessor.new( warehouse_lambda.processor, ignore_sqs_latency_timestamps_from_arns: ignore_sqs_latency_timestamps_from_arns, + indexing_event_decoder: warehouse_lambda.indexer.indexing_event_decoder, logger: warehouse_lambda.logger ) end