Skip to content

Commit ab31afa

Browse files
committed
Generalize indexing schema versions
1 parent 198a1cb commit ab31afa

35 files changed

Lines changed: 372 additions & 153 deletions

File tree

elasticgraph-indexer/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,3 +74,7 @@ def decode(payload)
7474
# return an array of ElasticGraph indexing event hashes
7575
end
7676
```
77+
78+
Decoded event hashes do not need to provide a schema version. When a version is omitted, the latest
79+
available schema artifact version is used for validation and record preparation. Decoders may include
80+
`schema_version` to request a specific schema artifact version.

elasticgraph-indexer/lib/elastic_graph/indexer/indexing_event_decoder.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ def initialize(config:, schema_artifacts:, logger:)
2424
end
2525

2626
# @param payload [String] a raw payload from the transport
27-
# @return [Array<Hash<String, Object>>] the decoded ElasticGraph indexing events
27+
# @return [Array<Hash<String, Object>>] the decoded ElasticGraph indexing events. Events do not
28+
# need to include a schema version; when omitted, the latest available schema version is used.
2829
def decode(payload)
2930
# :nocov: -- must return an array to satisfy Steep type checking but never called
3031
[]

elasticgraph-indexer/lib/elastic_graph/indexer/operation/factory.rb

Lines changed: 34 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -28,37 +28,30 @@ class Factory < Support::MemoizableData.define(
2828
def build(event)
2929
event = prepare_event(event)
3030

31-
selected_json_schema_version = select_json_schema_version(event) { |failure| return failure }
31+
requested_schema_version = schema_version_from(event)
32+
selected_schema_version = select_schema_version(event, requested_schema_version) { |failure| return failure }
33+
event = event.merge(SCHEMA_VERSION_KEY => requested_schema_version)
3234

33-
# Because the `select_json_schema_version` picks the closest-matching json schema version, the incoming
34-
# event might not match the expected json_schema_version value in the json schema (which is a `const` field).
35-
# This is by design, since we're picking a schema based on best-effort, so to avoid that by-design validation error,
36-
# performing the envelope validation on a "patched" version of the event.
37-
event_with_patched_envelope = event.merge({JSON_SCHEMA_VERSION_KEY => selected_json_schema_version})
35+
event_for_validation = schema_artifacts.event_for_schema_version_validation(event, selected_schema_version)
3836

39-
if (error_message = validator(EVENT_ENVELOPE_JSON_SCHEMA_NAME, selected_json_schema_version).validate_with_error_message(event_with_patched_envelope))
37+
if (error_message = validator(EVENT_ENVELOPE_JSON_SCHEMA_NAME, selected_schema_version).validate_with_error_message(event_for_validation))
4038
return build_failed_result(event, "event payload", error_message)
4139
end
4240

43-
failed_result = validate_record_returning_failure(event, selected_json_schema_version)
41+
failed_result = validate_record_returning_failure(event, selected_schema_version)
4442
failed_result || BuildResult.success(build_all_operations_for(
4543
event,
46-
record_preparer_factory.for_json_schema_version(selected_json_schema_version)
44+
record_preparer_factory.for_schema_version(selected_schema_version)
4745
))
4846
end
4947

5048
private
5149

52-
def select_json_schema_version(event)
53-
available_json_schema_versions = schema_artifacts.available_json_schema_versions
50+
def select_schema_version(event, requested_schema_version)
51+
available_schema_versions = schema_artifacts.available_schema_versions
5452

55-
requested_json_schema_version = event[JSON_SCHEMA_VERSION_KEY]
56-
57-
# First check that a valid value has been requested (a positive integer)
58-
if !event.key?(JSON_SCHEMA_VERSION_KEY)
59-
yield build_failed_result(event, JSON_SCHEMA_VERSION_KEY, "Event lacks a `#{JSON_SCHEMA_VERSION_KEY}`")
60-
elsif !requested_json_schema_version.is_a?(Integer) || requested_json_schema_version < 1
61-
yield build_failed_result(event, JSON_SCHEMA_VERSION_KEY, "#{JSON_SCHEMA_VERSION_KEY} (#{requested_json_schema_version}) must be a positive integer.")
53+
unless requested_schema_version.is_a?(Integer) && requested_schema_version >= 1
54+
yield build_failed_result(event, SCHEMA_VERSION_KEY, "#{SCHEMA_VERSION_KEY} (#{requested_schema_version}) must be a positive integer.")
6255
end
6356

6457
# The requested version might not necessarily be available (if the publisher is deployed ahead of the indexer, or an old schema
@@ -67,46 +60,46 @@ def select_json_schema_version(event)
6760
# the event can still be indexed.
6861
#
6962
# This min_by block will take the closest version in the list. If a tie occurs, the first value in the list wins. The desired
70-
# behavior is in the event of a tie (highly unlikely, there shouldn't be a gap in available json schema versions), the higher version
63+
# behavior is in the event of a tie (highly unlikely, there shouldn't be a gap in available schema versions), the higher version
7164
# should be selected. So to get that behavior, the list is sorted in descending order.
7265
#
73-
selected_json_schema_version = available_json_schema_versions.sort.reverse.min_by { |version| (requested_json_schema_version - version).abs }
66+
selected_schema_version = available_schema_versions.sort.reverse.min_by { |version| (requested_schema_version - version).abs }
7467

75-
if selected_json_schema_version != requested_json_schema_version
68+
if selected_schema_version != requested_schema_version
7669
logger.info({
77-
"message_type" => "ElasticGraphMissingJSONSchemaVersion",
70+
"message_type" => "ElasticGraphMissingSchemaVersion",
7871
"message_id" => event["message_id"],
7972
"event_id" => EventID.from_event(event),
8073
"event_type" => event["type"],
81-
"requested_json_schema_version" => requested_json_schema_version,
82-
"selected_json_schema_version" => selected_json_schema_version
74+
"requested_schema_version" => requested_schema_version,
75+
"selected_schema_version" => selected_schema_version
8376
})
8477
end
8578

86-
if selected_json_schema_version.nil?
79+
if selected_schema_version.nil?
8780
yield build_failed_result(
88-
event, JSON_SCHEMA_VERSION_KEY,
89-
"Failed to select json schema version. Requested version: #{event[JSON_SCHEMA_VERSION_KEY]}. \
90-
Available json schema versions: #{available_json_schema_versions.sort.join(", ")}"
81+
event, SCHEMA_VERSION_KEY,
82+
"Failed to select schema version. Requested version: #{requested_schema_version}. \
83+
Available schema versions: #{available_schema_versions.sort.join(", ")}"
9184
)
9285
end
9386

94-
selected_json_schema_version
87+
selected_schema_version
9588
end
9689

97-
def validator(type, selected_json_schema_version)
98-
factory = validator_factories_by_version[selected_json_schema_version] # : Support::JSONSchema::ValidatorFactory
90+
def validator(type, selected_schema_version)
91+
factory = validator_factories_by_version[selected_schema_version] # : Support::JSONSchema::ValidatorFactory
9992
factory.validator_for(type)
10093
end
10194

10295
def validator_factories_by_version
103-
@validator_factories_by_version ||= ::Hash.new do |hash, json_schema_version|
96+
@validator_factories_by_version ||= ::Hash.new do |hash, schema_version|
10497
factory = Support::JSONSchema::ValidatorFactory.new(
105-
schema: schema_artifacts.json_schemas_for(json_schema_version),
98+
schema: schema_artifacts.json_schemas_for(schema_version),
10699
sanitize_pii: true
107100
)
108101
factory = configure_record_validator.call(factory) if configure_record_validator
109-
hash[json_schema_version] = factory
102+
hash[schema_version] = factory
110103
end
111104
end
112105

@@ -117,10 +110,14 @@ def prepare_event(event)
117110
event.merge("record" => event["record"].merge("id" => event.fetch("id")))
118111
end
119112

120-
def validate_record_returning_failure(event, selected_json_schema_version)
113+
def schema_version_from(event)
114+
event.fetch(SCHEMA_VERSION_KEY) { schema_artifacts.available_schema_versions.max }
115+
end
116+
117+
def validate_record_returning_failure(event, selected_schema_version)
121118
record = event.fetch("record")
122119
graphql_type_name = event.fetch("type")
123-
validator = validator(graphql_type_name, selected_json_schema_version)
120+
validator = validator(graphql_type_name, selected_schema_version)
124121

125122
if (error_message = validator.validate_with_error_message(record))
126123
build_failed_result(event, "#{graphql_type_name} record", error_message)
@@ -130,7 +127,7 @@ def validate_record_returning_failure(event, selected_json_schema_version)
130127
def build_failed_result(event, payload_description, validation_message)
131128
message = "Malformed #{payload_description}. #{validation_message}"
132129

133-
# Here we use the `RecordPreparer::Identity` record preparer because we may not have a valid JSON schema
130+
# Here we use the `RecordPreparer::Identity` record preparer because we may not have a valid schema
134131
# version number in this case (which is usually required to get a `RecordPreparer` from the factory), and
135132
# we won't wind up using the record preparer for real on these operations, anyway.
136133
operations = build_all_operations_for(event, RecordPreparer::Identity)

elasticgraph-indexer/lib/elastic_graph/indexer/processor.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ def calculate_latency_metrics(successful_operations, noop_results)
125125
"message_id" => event["message_id"],
126126
"event_type" => event.fetch("type"),
127127
"event_id" => EventID.from_event(event).to_s,
128-
JSON_SCHEMA_VERSION_KEY => event.fetch(JSON_SCHEMA_VERSION_KEY),
128+
SCHEMA_VERSION_KEY => event.fetch(SCHEMA_VERSION_KEY),
129129
"latencies_in_ms_from" => latencies_in_ms_from,
130130
"slo_results" => slo_results,
131131
"result" => result

elasticgraph-indexer/lib/elastic_graph/indexer/record_preparer.rb

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
module ElasticGraph
1212
class Indexer
1313
class RecordPreparer
14-
# Provides the ability to get a `RecordPreparer` for a specific JSON schema version.
14+
# Provides the ability to get a `RecordPreparer` for a specific schema artifact version.
1515
class Factory
1616
def initialize(schema_artifacts)
1717
@schema_artifacts = schema_artifacts
@@ -21,23 +21,23 @@ def initialize(schema_artifacts)
2121
hash[type_name] = scalar_types_by_name[type_name]&.load_indexing_preparer&.extension_class
2222
end # : ::Hash[::String, SchemaArtifacts::RuntimeMetadata::extensionClass?]
2323

24-
@preparers_by_json_schema_version = ::Hash.new do |hash, version|
24+
@preparers_by_schema_version = ::Hash.new do |hash, version|
2525
hash[version] = RecordPreparer.new(
2626
indexing_preparer_by_scalar_type_name,
2727
build_type_metas_from(@schema_artifacts.json_schemas_for(version))
2828
)
2929
end
3030
end
3131

32-
# Gets the `RecordPreparer` for the given JSON schema version.
33-
def for_json_schema_version(json_schema_version)
34-
@preparers_by_json_schema_version[json_schema_version] # : RecordPreparer
32+
# Gets the `RecordPreparer` for the given schema artifact version.
33+
def for_schema_version(schema_version)
34+
@preparers_by_schema_version[schema_version] # : RecordPreparer
3535
end
3636

37-
# Gets the `RecordPreparer` for the latest JSON schema version. Intended primarily
37+
# Gets the `RecordPreparer` for the latest schema artifact version. Intended primarily
3838
# for use in tests for convenience.
39-
def for_latest_json_schema_version
40-
for_json_schema_version(@schema_artifacts.latest_json_schema_version)
39+
def for_latest_schema_version
40+
for_schema_version(@schema_artifacts.latest_schema_version)
4141
end
4242

4343
private

elasticgraph-indexer/lib/elastic_graph/indexer/test_support/converters.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ def self.upsert_event_for(record)
2121
"id" => record.fetch("id"),
2222
"type" => record.fetch("__typename"),
2323
"version" => record.fetch("__version"),
24-
"record" => record.except("__typename", "__version", "__json_schema_version"),
25-
JSON_SCHEMA_VERSION_KEY => record.fetch("__json_schema_version")
24+
"record" => record.except("__typename", "__version", "__schema_version"),
25+
SCHEMA_VERSION_KEY => record.fetch("__schema_version")
2626
}
2727
end
2828

elasticgraph-indexer/sig/elastic_graph/indexer/operation/factory.rbs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,9 @@ module ElasticGraph
4242
@validator_factories_by_version: ::Hash[::Integer, Support::JSONSchema::ValidatorFactory]?
4343
def validator_factories_by_version: () -> ::Hash[::Integer, Support::JSONSchema::ValidatorFactory]
4444

45-
def select_json_schema_version: (event) { (BuildResult) -> bot } -> (::Integer | bot)
45+
def select_schema_version: (event, untyped) { (BuildResult) -> bot } -> (::Integer | bot)
4646
def prepare_event: (event) -> event
47+
def schema_version_from: (event) -> untyped
4748
def validate_record_returning_failure: (event, ::Integer) -> BuildResult?
4849
def build_failed_result: (event, ::String, ::String) -> BuildResult
4950
def build_all_operations_for: (event, _RecordPreparer) -> ::Array[_Operation]

elasticgraph-indexer/sig/elastic_graph/indexer/record_preparer.rbs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,13 @@ module ElasticGraph
1111
class Factory
1212
def initialize: (schemaArtifacts) -> void
1313

14-
def for_json_schema_version: (::Integer) -> RecordPreparer
15-
def for_latest_json_schema_version: () -> RecordPreparer
14+
def for_schema_version: (::Integer) -> RecordPreparer
15+
def for_latest_schema_version: () -> RecordPreparer
1616

1717
private
1818

1919
@schema_artifacts: schemaArtifacts
20-
@preparers_by_json_schema_version: ::Hash[::Integer, RecordPreparer]
20+
@preparers_by_schema_version: ::Hash[::Integer, RecordPreparer]
2121

2222
def build_type_metas_from: (::Hash[::String, untyped]) -> ::Array[TypeMetadata]
2323
end

elasticgraph-indexer/spec/acceptance/schema_evolution_spec.rb

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ def build_address_event_without_geolocation
9797
end
9898

9999
def build_widget(json_schema_version:)
100-
event = build_upsert_event(:widget, __json_schema_version: json_schema_version)
100+
event = build_upsert_event(:widget, __schema_version: json_schema_version)
101101
event.merge("record" => (yield event.fetch("record")))
102102
end
103103
end
@@ -117,7 +117,7 @@ def build_widget(json_schema_version:)
117117
write_address_schema_def(json_schema_version: 2, address_extras: "t.deleted_field 'deprecated'")
118118
dump_artifacts
119119

120-
event = build_upsert_event(:address, id: "abc", deprecated: "foo", __json_schema_version: 1)
120+
event = build_upsert_event(:address, id: "abc", deprecated: "foo", __schema_version: 1)
121121
expect(event.dig("record", "deprecated")).to eq("foo")
122122

123123
boot_indexer.processor.process([event], refresh_indices: true)
@@ -163,8 +163,8 @@ def get_address_payload(id)
163163
# included at that part of the JSON schema. So here we verify that the factory includes that.
164164
expect(build(:team_season)).to include(__typename: "TeamSeason")
165165

166-
v1_event = build_upsert_event(:team, __json_schema_version: 1)
167-
v2_event = build_upsert_event(:team, __json_schema_version: 2)
166+
v1_event = build_upsert_event(:team, __schema_version: 1)
167+
v2_event = build_upsert_event(:team, __schema_version: 2)
168168
.then { |event| ::JSON.generate(event) }
169169
# Fix the event to align with the v2 schema, since `build_upsert_event` doesn't automatically
170170
# know that the `__typename` should be `SeasonOfATeam` instead of `TeamSeason`.
@@ -201,8 +201,8 @@ def get_address_payload(id)
201201
end
202202
dump_artifacts
203203

204-
v1_event = build_upsert_event(:team, __json_schema_version: 1)
205-
v2_event = build_upsert_event(:team, __json_schema_version: 2)
204+
v1_event = build_upsert_event(:team, __schema_version: 1)
205+
v2_event = build_upsert_event(:team, __schema_version: 2)
206206

207207
expect {
208208
boot_indexer.processor.process([v1_event, v2_event], refresh_indices: true)
@@ -245,9 +245,9 @@ def get_address_payload(id)
245245
end
246246
dump_artifacts
247247

248-
v1_event = build_upsert_event(:team, __json_schema_version: 1)
248+
v1_event = build_upsert_event(:team, __schema_version: 1)
249249
v1_event = ::JSON.parse(::JSON.generate(v1_event).gsub('"name":', '"full_name":'))
250-
v2_event = build_upsert_event(:team, __json_schema_version: 2)
250+
v2_event = build_upsert_event(:team, __schema_version: 2)
251251

252252
expect {
253253
boot_indexer.processor.process([v1_event, v2_event], refresh_indices: true)
@@ -289,7 +289,7 @@ def get_address_payload(id)
289289
end
290290
dump_artifacts
291291

292-
v1_event = build_upsert_event(:team, __json_schema_version: 1)
292+
v1_event = build_upsert_event(:team, __schema_version: 1)
293293

294294
expect {
295295
boot_indexer.processor.process([v1_event], refresh_indices: true)
@@ -324,7 +324,7 @@ def get_address_payload(id)
324324
write_address_schema_def(json_schema_version: 2, schema_extras: 'schema.deleted_type "Team"')
325325
dump_artifacts
326326

327-
v1_event = build_upsert_event(:team, __json_schema_version: 1)
327+
v1_event = build_upsert_event(:team, __schema_version: 1)
328328
boot_indexer.processor.process([v1_event], refresh_indices: true)
329329

330330
expect(search_for_ids("teams")).to be_empty

elasticgraph-indexer/spec/support/indexing_preparer.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
schema.object_type "Object" do |t|
3030
t.field "scalar", scalar_type
3131
end
32-
end).record_preparer_factory.for_latest_json_schema_version
32+
end).record_preparer_factory.for_latest_schema_version
3333
end
3434

3535
def prepare_scalar_value(value)

0 commit comments

Comments
 (0)