Skip to content

Commit 0720530

Browse files
committed
Add configurable indexing event decoder
1 parent ef120ae commit 0720530

17 files changed

Lines changed: 368 additions & 32 deletions

File tree

elasticgraph-indexer/README.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,29 @@ indexer = ElasticGraph::Indexer.from_yaml_file("config/settings/local.yaml")
4848
events = [] # JSON events read from an async datastream
4949
indexer.processor.process(events)
5050
```
51+
52+
## Custom Payload Decoding
53+
54+
`ElasticGraph::Indexer` can be configured with an indexing event decoder extension. Decoders turn raw payload strings
55+
from a transport into ElasticGraph indexing event hashes before the normal validation and indexing pipeline runs. The
56+
default decoder expects JSON Lines.
57+
58+
```yaml
59+
indexer:
60+
indexing_event_decoder:
61+
name: MyCompany::ElasticGraph::CSVIndexingEventDecoder
62+
require_path: ./lib/my_company/elastic_graph/csv_indexing_event_decoder
63+
config:
64+
delimiter: ","
65+
```
66+
67+
Decoder extensions must implement:
68+
69+
```ruby
70+
def initialize(config:, schema_artifacts:, logger:)
71+
end
72+
73+
def decode(payload)
74+
# return an array of ElasticGraph indexing event hashes
75+
end
76+
```

elasticgraph-indexer/lib/elastic_graph/indexer.rb

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,19 @@ def operation_factory
8686
end
8787
end
8888

89+
def indexing_event_decoder
90+
@indexing_event_decoder ||= begin
91+
extension = config.indexing_event_decoder
92+
decoder_class = extension.extension_class # : ::Class
93+
94+
__skip__ = decoder_class.new(
95+
config: extension.config,
96+
schema_artifacts: schema_artifacts,
97+
logger: logger
98+
)
99+
end
100+
end
101+
89102
def monotonic_clock
90103
@monotonic_clock ||= begin
91104
require "elastic_graph/support/monotonic_clock"

elasticgraph-indexer/lib/elastic_graph/indexer/config.rb

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,17 @@
88

99
require "elastic_graph/support/config"
1010
require "elastic_graph/errors"
11+
require "elastic_graph/indexer/indexing_event_decoder"
12+
require "elastic_graph/schema_artifacts/runtime_metadata/extension_loader"
1113

1214
module ElasticGraph
1315
class Indexer
14-
class Config < Support::Config.define(:latency_slo_thresholds_by_timestamp_in_ms, :skip_derived_indexing_type_updates)
16+
class Config < Support::Config.define(:latency_slo_thresholds_by_timestamp_in_ms, :skip_derived_indexing_type_updates, :indexing_event_decoder)
17+
DEFAULT_INDEXING_EVENT_DECODER = {
18+
"name" => "ElasticGraph::Indexer::IndexingEventDecoder::JSONLines",
19+
"require_path" => "elastic_graph/indexer/indexing_event_decoder"
20+
}
21+
1522
json_schema at: "indexer",
1623
optional: false,
1724
description: "Configuration for indexing operations and metrics used by `elasticgraph-indexer`.",
@@ -42,17 +49,64 @@ class Config < Support::Config.define(:latency_slo_thresholds_by_timestamp_in_ms
4249
{}, # : untyped
4350
{"WidgetWorkspace" => ["ABC12345678"]}
4451
]
52+
},
53+
indexing_event_decoder: {
54+
description: "Extension object used to decode raw indexing payloads into ElasticGraph indexing event hashes. The default decoder expects JSON Lines.",
55+
type: "object",
56+
properties: {
57+
name: {
58+
description: "The name of the indexing event decoder extension class.",
59+
type: "string",
60+
minLength: 1,
61+
examples: ["MyCompany::ElasticGraph::CSVIndexingEventDecoder"]
62+
},
63+
require_path: {
64+
description: "The path to require to load the indexing event decoder extension.",
65+
type: "string",
66+
minLength: 1,
67+
examples: ["./lib/my_company/elastic_graph/csv_indexing_event_decoder"]
68+
},
69+
config: {
70+
description: "Configuration for the indexing event decoder. Will be passed into the decoder's `#initialize` method.",
71+
type: "object",
72+
default: {}, # : untyped
73+
examples: [
74+
{}, # : untyped
75+
{"delimiter" => ","}
76+
]
77+
}
78+
},
79+
required: ["name", "require_path"],
80+
default: DEFAULT_INDEXING_EVENT_DECODER,
81+
examples: [
82+
DEFAULT_INDEXING_EVENT_DECODER,
83+
{
84+
"name" => "MyCompany::ElasticGraph::CSVIndexingEventDecoder",
85+
"require_path" => "./lib/my_company/elastic_graph/csv_indexing_event_decoder",
86+
"config" => {"delimiter" => ","}
87+
}
88+
]
4589
}
4690
}
4791

4892
private
4993

50-
def convert_values(skip_derived_indexing_type_updates:, latency_slo_thresholds_by_timestamp_in_ms:)
94+
def convert_values(skip_derived_indexing_type_updates:, latency_slo_thresholds_by_timestamp_in_ms:, indexing_event_decoder:)
5195
{
5296
skip_derived_indexing_type_updates: skip_derived_indexing_type_updates.transform_values(&:to_set),
53-
latency_slo_thresholds_by_timestamp_in_ms: latency_slo_thresholds_by_timestamp_in_ms
97+
latency_slo_thresholds_by_timestamp_in_ms: latency_slo_thresholds_by_timestamp_in_ms,
98+
indexing_event_decoder: load_indexing_event_decoder(indexing_event_decoder)
5499
}
55100
end
101+
102+
def load_indexing_event_decoder(config)
103+
loader = SchemaArtifacts::RuntimeMetadata::ExtensionLoader.new(IndexingEventDecoder::Interface)
104+
loader.load(
105+
config.fetch("name"),
106+
from: config.fetch("require_path"),
107+
config: config["config"] || {}
108+
)
109+
end
56110
end
57111
end
58112
end

elasticgraph-indexer/lib/elastic_graph/indexer/event_id.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ module ElasticGraph
1212
class Indexer
1313
# A unique identifier for an event ingested by the indexer. As a string, takes the form of
1414
# "[type]:[id]@v[version]", such as "Widget:123abc@v7". This format was designed to make it
15-
# easy to put these ids in a comma-seperated list.
15+
# easy to put these ids in a comma-separated list.
1616
EventID = ::Data.define(:type, :id, :version) do
1717
# @implements EventID
1818
def self.from_event(event)
@@ -26,7 +26,7 @@ def to_s
2626

2727
# Steep weirdly expects them here...
2828
# @dynamic initialize, config, datastore_core, schema_artifacts, datastore_router, monotonic_clock
29-
# @dynamic record_preparer_factory, processor, operation_factory, logger
29+
# @dynamic record_preparer_factory, processor, operation_factory, indexing_event_decoder, logger
3030
# @dynamic self.from_parsed_yaml
3131
end
3232
end
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Copyright 2024 - 2026 Block, Inc.
2+
#
3+
# Use of this source code is governed by an MIT-style
4+
# license that can be found in the LICENSE file or at
5+
# https://opensource.org/licenses/MIT.
6+
#
7+
# frozen_string_literal: true
8+
9+
require "json"
10+
11+
module ElasticGraph
12+
class Indexer
13+
module IndexingEventDecoder
14+
# Defines the extension interface implemented by indexing event decoders.
15+
#
16+
# @api private
17+
class Interface
18+
# :nocov:
19+
def initialize(config:, schema_artifacts:, logger:)
20+
end
21+
22+
def decode(payload)
23+
[]
24+
end
25+
# :nocov:
26+
end
27+
28+
# The default indexing event decoder, which expects newline-delimited JSON objects.
29+
#
30+
# @api private
31+
class JSONLines
32+
def initialize(config:, schema_artifacts:, logger:)
33+
end
34+
35+
def decode(payload)
36+
payload.split("\n").map { |event| JSON.parse(event) }
37+
end
38+
end
39+
end
40+
end
41+
end

elasticgraph-indexer/sig/elastic_graph/indexer.rbs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ module ElasticGraph
2828
@operation_factory: Operation::Factory?
2929
def operation_factory: () -> Operation::Factory
3030

31+
@indexing_event_decoder: indexingEventDecoder?
32+
def indexing_event_decoder: () -> indexingEventDecoder
33+
3134
@monotonic_clock: Support::MonotonicClock?
3235
def monotonic_clock: () -> Support::MonotonicClock
3336

elasticgraph-indexer/sig/elastic_graph/indexer/config.rbs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,25 +5,33 @@ module ElasticGraph
55

66
attr_reader latency_slo_thresholds_by_timestamp_in_ms: ::Hash[::String, ::Integer]
77
attr_reader skip_derived_indexing_type_updates: ::Hash[::String, ::Set[::String]]
8+
attr_reader indexing_event_decoder: SchemaArtifacts::RuntimeMetadata::Extension
89

910
def initialize: (
1011
?latency_slo_thresholds_by_timestamp_in_ms: ::Hash[::String, ::Integer],
11-
?skip_derived_indexing_type_updates: ::Hash[::String, ::Set[::String]]) -> void
12+
?skip_derived_indexing_type_updates: ::Hash[::String, ::Set[::String]],
13+
?indexing_event_decoder: ::Hash[::String, untyped]) -> void
1214

1315
def with: (
1416
?latency_slo_thresholds_by_timestamp_in_ms: ::Hash[::String, ::Integer],
15-
?skip_derived_indexing_type_updates: ::Hash[::String, ::Set[::String]]) -> Config
17+
?skip_derived_indexing_type_updates: ::Hash[::String, ::Set[::String]],
18+
?indexing_event_decoder: SchemaArtifacts::RuntimeMetadata::Extension) -> Config
1619

1720
def self.members: () -> ::Array[::Symbol]
1821
end
1922

2023
class Config < ConfigSupertype
24+
DEFAULT_INDEXING_EVENT_DECODER: ::Hash[::String, untyped]
25+
2126
private
2227

2328
def convert_values: (
2429
latency_slo_thresholds_by_timestamp_in_ms: untyped,
25-
skip_derived_indexing_type_updates: untyped
30+
skip_derived_indexing_type_updates: untyped,
31+
indexing_event_decoder: untyped
2632
) -> ::Hash[::Symbol, untyped]
33+
34+
def load_indexing_event_decoder: (::Hash[::String, untyped]) -> SchemaArtifacts::RuntimeMetadata::Extension
2735
end
2836
end
2937
end
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
module ElasticGraph
2+
class Indexer
3+
type indexingEventDecoder = IndexingEventDecoder::Interface
4+
5+
module IndexingEventDecoder
6+
class Interface
7+
def initialize: (
8+
config: ::Hash[::Symbol | ::String, untyped],
9+
schema_artifacts: schemaArtifacts?,
10+
logger: ::Logger
11+
) -> void
12+
13+
def decode: (::String) -> ::Array[event]
14+
end
15+
16+
class JSONLines < Interface
17+
end
18+
end
19+
end
20+
end
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# Copyright 2024 - 2026 Block, Inc.
2+
#
3+
# Use of this source code is governed by an MIT-style
4+
# license that can be found in the LICENSE file or at
5+
# https://opensource.org/licenses/MIT.
6+
#
7+
# frozen_string_literal: true
8+
9+
class ExampleIndexingEventDecoder
10+
attr_reader :config, :schema_artifacts, :logger
11+
12+
def initialize(config:, schema_artifacts:, logger:)
13+
@config = config
14+
@schema_artifacts = schema_artifacts
15+
@logger = logger
16+
end
17+
18+
def decode(payload)
19+
payload.split(config.fetch("delimiter")).map { |value| {"value" => value} }
20+
end
21+
end
22+
23+
class InvalidIndexingEventDecoder
24+
def initialize(config:, schema_artifacts:, logger:)
25+
end
26+
end

elasticgraph-indexer/spec/unit/elastic_graph/indexer/config_spec.rb

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,34 @@ class Indexer
3131

3232
expect(config.skip_derived_indexing_type_updates).to eq("WidgetCurrency" => ["USD"].to_set)
3333
end
34+
35+
it "uses the JSON Lines indexing event decoder by default" do
36+
expect(Config.new.indexing_event_decoder.extension_class).to be(IndexingEventDecoder::JSONLines)
37+
end
38+
39+
it "loads a configured indexing event decoder" do
40+
config = Config.from_parsed_yaml("indexer" => {
41+
"indexing_event_decoder" => {
42+
"name" => "ExampleIndexingEventDecoder",
43+
"require_path" => "support/example_extensions/indexing_event_decoder",
44+
"config" => {"delimiter" => "|"}
45+
}
46+
})
47+
48+
expect(config.indexing_event_decoder.extension_class).to be(ExampleIndexingEventDecoder)
49+
expect(config.indexing_event_decoder.config).to eq({"delimiter" => "|"})
50+
end
51+
52+
it "verifies that a configured indexing event decoder implements the expected interface" do
53+
expect {
54+
Config.from_parsed_yaml("indexer" => {
55+
"indexing_event_decoder" => {
56+
"name" => "InvalidIndexingEventDecoder",
57+
"require_path" => "support/example_extensions/indexing_event_decoder"
58+
}
59+
})
60+
}.to raise_error Errors::InvalidExtensionError, a_string_including("InvalidIndexingEventDecoder", "Missing instance methods: `decode`")
61+
end
3462
end
3563
end
3664
end

0 commit comments

Comments
 (0)