Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions config/schema/artifacts/datastore_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1463,6 +1463,9 @@ index_templates:
required: true
_size:
enabled: true
_source:
excludes:
- workspace_id2
settings:
index.mapping.ignore_malformed: false
index.mapping.coerce: false
Expand Down Expand Up @@ -1505,6 +1508,9 @@ indices:
dynamic: 'false'
_size:
enabled: true
_source:
excludes:
- full_address
settings:
index.mapping.ignore_malformed: false
index.mapping.coerce: false
Expand Down
2 changes: 2 additions & 0 deletions config/schema/artifacts/runtime_metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1446,6 +1446,7 @@ index_definitions_by_name:
__counts.shapes|type:
source: __self
full_address:
retrieved_from: doc_values
source: __self
geo_location.lat:
source: __self
Expand Down Expand Up @@ -2613,6 +2614,7 @@ index_definitions_by_name:
weight_in_ng_str:
source: __self
workspace_id2:
retrieved_from: doc_values
source: __self
workspace_name:
source: workspace
Expand Down
6 changes: 6 additions & 0 deletions config/schema/artifacts_with_apollo/datastore_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1463,6 +1463,9 @@ index_templates:
required: true
_size:
enabled: true
_source:
excludes:
- workspace_id2
settings:
index.mapping.ignore_malformed: false
index.mapping.coerce: false
Expand Down Expand Up @@ -1505,6 +1508,9 @@ indices:
dynamic: 'false'
_size:
enabled: true
_source:
excludes:
- full_address
settings:
index.mapping.ignore_malformed: false
index.mapping.coerce: false
Expand Down
2 changes: 2 additions & 0 deletions config/schema/artifacts_with_apollo/runtime_metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1475,6 +1475,7 @@ index_definitions_by_name:
__counts.shapes|type:
source: __self
full_address:
retrieved_from: doc_values
source: __self
geo_location.lat:
source: __self
Expand Down Expand Up @@ -2642,6 +2643,7 @@ index_definitions_by_name:
weight_in_ng_str:
source: __self
workspace_id2:
retrieved_from: doc_values
source: __self
workspace_name:
source: workspace
Expand Down
6 changes: 3 additions & 3 deletions config/schema/widgets.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@
t.field "id", "ID!"

# Here we use an alternate name for this field since it's the routing field and want to verify
# that `name_in_index` works correctly on routing fields.
t.field "workspace_id", "ID", name_in_index: "workspace_id2"
# that `name_in_index` works correctly on routing fields, including when fetched from doc values.
t.field "workspace_id", "ID", name_in_index: "workspace_id2", retrieved_from: :doc_values

# It's a bit funny we have both `amount_cents` and `cost` but it's nice to be able to test
# aggregations on both a root numeric field and on a nested one, so we are keeping both here.
Expand Down Expand Up @@ -367,7 +367,7 @@
# We use `indexing_only: true` here to verify that `id` can be an indexing-only field.
t.field "id", "ID!", indexing_only: true

t.field "full_address", "String!"
t.field "full_address", "String!", retrieved_from: :doc_values
t.field "timestamps", "AddressTimestamps"
t.field "geo_location", "GeoLocation"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ def ignored_values_for_routing
def to_datastore_body
@to_datastore_body ||= aggregations_datastore_body
.merge(document_paginator.to_datastore_body)
.merge({highlight: highlight, query: filter_interpreter.build_query(all_filters), _source: source}.compact)
.merge({docvalue_fields: docvalue_fields, highlight: highlight, query: filter_interpreter.build_query(all_filters), _source: source}.compact)
end

def aggregations_datastore_body
Expand All @@ -323,13 +323,33 @@ def aggregations_datastore_body
# we only ask for the fields we need to return.
def source
return true if request_all_fields
requested_source_fields = requested_fields - ["id"]
requested_source_fields = requested_fields_for_source - ["id"]
return false if requested_source_fields.empty?
# Merging in requested_fields as _source:{includes:} based on Elasticsearch documentation:
# https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-source-field.html#include-exclude
{includes: requested_source_fields.to_a}
end

# Builds the `docvalue_fields` portion of the datastore request body, or returns
# `nil` when no field needs doc-values retrieval. Fields marked
# `retrieved_from: :doc_values` are excluded from the stored `_source`, so they
# must be requested via `docvalue_fields` instead.
def docvalue_fields
  paths =
    if request_all_fields
      # When requesting all fields we send `_source: true`, but fields excluded from stored
      # `_source` still need an alternative retrieval path. We therefore request
      # docvalue_fields for any field that ANY index definition stores in doc values,
      # unlike the selective branch below which requires unanimity across all index
      # definitions.
      all_docvalue_fields
    else
      requested_fields.select { |field_path| requested_via_doc_values?(field_path) }
    end

  paths.empty? ? nil : paths.to_a
end

def highlight
return nil if !request_all_highlights && requested_highlights.empty?

Expand All @@ -343,6 +363,35 @@ def highlight
{fields:, highlight_query:}.compact
end

# The subset of `requested_fields` that must be fetched from `_source` — i.e.
# everything not retrieved via doc values. Memoized since it is consulted by
# both the `_source` and `docvalue_fields` request-body builders.
def requested_fields_for_source
  @requested_fields_for_source ||= requested_fields.select do |field_path|
    !requested_via_doc_values?(field_path)
  end
end

# The set of every field path that at least one participating index definition
# retrieves from doc values.
def all_docvalue_fields
  @all_docvalue_fields ||= search_index_definitions.each_with_object(::Set.new) do |index_def, paths|
    index_def.fields_by_path.each do |field_path, field|
      paths << field_path if field.retrieved_from_doc_values?
    end
  end
end

# Indicates whether `field_path` should be fetched via doc values. Returns true only
# when every participating index definition that defines the field agrees it should be
# retrieved via doc values. When they disagree we fall back to `_source` so that
# source-backed indices can return the field normally; the doc-values-backed index
# will also have the value available in `_source` in that case (a disagreement like
# this should not happen in practice, since `retrieved_from` is set once per field
# definition and propagates to all index definitions).
def requested_via_doc_values?(field_path)
  # `id` is never sourced from doc values (elsewhere it is served from the `_id` metadata).
  return false if field_path == "id"

  defs = search_index_definitions.filter_map do |index_def|
    index_def.fields_by_path[field_path]
  end

  !defs.empty? && defs.all?(&:retrieved_from_doc_values?)
end

# Encapsulates dependencies of `Query`, giving us something we can expose off of `application`
# to build queries when desired.
class Builder < Support::MemoizableData.define(:runtime_metadata, :logger, :filter_interpreter, :filter_node_interpreter, :default_page_size, :max_page_size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,23 @@

require "elastic_graph/graphql/decoded_cursor"
require "elastic_graph/support/memoizable_data"
require "elastic_graph/support/hash_util"
require "forwardable"

module ElasticGraph
class GraphQL
module DatastoreResponse
# @private
# Sentinel value to distinguish "no default given" from an explicit `nil` default in {Document#fetch_value_at}.
UNSET = ::Object.new.freeze

# Represents a document fetched from the datastore. Exposes both the raw metadata
# provided by the datastore and the doc payload itself. In addition, you can treat
# it just like a document hash using `#[]` or `#fetch`.
Document = Support::MemoizableData.define(:raw_data, :payload, :decoded_cursor_factory) do
# @implements Document
extend Forwardable

def_delegators :payload, :[], :fetch

def self.build(raw_data, decoded_cursor_factory: DecodedCursor::Factory::Null)
source = raw_data.fetch("_source") do
{} # : ::Hash[::String, untyped]
Expand Down Expand Up @@ -51,6 +54,38 @@ def id
raw_data["_id"]
end

# Hash-like access: prefers the `_source` payload, falling back to doc values for
# fields excluded from `_source`. Doc values arrive as arrays, so we expose the
# first element.
def [](key)
  if payload.key?(key)
    payload[key]
  else
    docvalue_field(key)&.first
  end
end

# `Hash#fetch`-like access with doc-values fallback. Resolution order: `_source`
# payload, then doc values (first element of the returned array), then the given
# block, then the explicit default, and finally a `KeyError`.
def fetch(key, default = UNSET)
  return payload[key] if payload.key?(key)

  values = docvalue_field(key)
  return values.first if values

  if block_given?
    yield(key)
  elsif default.equal?(UNSET)
    raise KeyError, "key not found: #{key.inspect}"
  else
    default
  end
end

# Fetches the value at `path` (an array of path segments), mirroring `Hash#fetch`
# semantics: doc values are consulted when `_source` lacks the path, then a given
# block takes precedence, then `default_value:`, and otherwise a `KeyError` is raised.
def fetch_value_at(path, default_value: UNSET)
  Support::HashUtil.fetch_value_at_path(payload, path) do
    values = docvalue_field(path.join("."))

    if values
      values.first
    elsif block_given?
      yield(path)
    elsif default_value.equal?(UNSET)
      raise KeyError, "path not found: #{path.join(".")}"
    else
      default_value
    end
  end
end

# Like `fetch_value_at`, but resolves to `nil` instead of raising when the path is
# absent from both `_source` and doc values.
def value_at(path)
  Support::HashUtil.fetch_value_at_path(payload, path) do
    field_values = docvalue_field(path.join("."))
    field_values && field_values.first
  end
end

def sort
raw_data["sort"]
end
Expand All @@ -77,6 +112,14 @@ def to_s
"#<#{self.class.name} #{datastore_path}>"
end
alias_method :inspect, :to_s

private

# Returns the doc_values array for the given key, or nil if the response carries
# no such field. Datastore doc_values are always returned as arrays
# (e.g. `{"name" => ["Bob"]}`).
def docvalue_field(key)
  fields = raw_data["fields"]
  fields ? fields[key] : nil
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def filter_results(field_path, values, size)
# `id` within `_source`, given it's available as `_id`.
->(hit) { values.include?(hit.fetch("_id")) }
else
->(hit) { values.intersect?(Support::HashUtil.fetch_leaf_values_at_path(hit.fetch("_source"), field_path).to_set) }
->(hit) { values.intersect?(hit_values_at_path(hit, field_path).to_set) }
end

hits = raw_data.fetch("hits").fetch("hits").select(&filter).first(size)
Expand All @@ -131,6 +131,28 @@ def docs_description
(documents.size < 3) ? documents.inspect : "[#{documents.first}, ..., #{documents.last}]"
end

# Extracts leaf values from a hit, checking `_source` first and falling back to
# `fields` (populated by `docvalue_fields` in the query). When a field is excluded
# from `_source` (e.g. `retrieved_from: :doc_values`), the datastore still returns
# it under the `fields` key because the query explicitly requested it via
# `docvalue_fields`.
def hit_values_at_path(hit, field_path)
  source = hit["_source"]
  return docvalue_fields_for(hit, field_path) unless source

  Support::HashUtil.fetch_leaf_values_at_path(source, field_path) do
    docvalue_fields_for(hit, field_path)
  end
end

def docvalue_fields_for(hit, field_path)
fields = hit["fields"]
joined_path = field_path.join(".")
raise KeyError, "key not found: #{joined_path}" unless fields
fields.fetch(joined_path)
end

def total_document_count(default: nil)
super() || default || raise(Errors::CountUnavailableError, "#{__method__} is unavailable; set `query.total_document_count_needed = true` to make it available")
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ def initialize(elasticgraph_graphql:, config:)
end

def resolve(field:, object:, args:, context:)
data =
value =
case object
when DatastoreResponse::Document
object.payload
object.value_at(field.path_in_index)
else
object
Support::HashUtil.fetch_value_at_path(object, field.path_in_index) { nil }
end

value = Support::HashUtil.fetch_value_at_path(data, field.path_in_index) { nil }
value = [] if value.nil? && field.type.list?

if field.type.relay_connection?
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module ElasticGraph
class GraphQL
module DatastoreResponse
UNSET: ::Object

class Document
extend Forwardable

Expand Down Expand Up @@ -29,6 +31,12 @@ module ElasticGraph

def []: (::String) -> untyped
def fetch: (::String) -> untyped
| (::String, untyped) -> untyped
| (::String) { (::String) -> untyped } -> untyped
def fetch_value_at: (::Array[::String]) -> untyped
| (::Array[::String], default_value: untyped) -> untyped
| (::Array[::String]) { (::Array[::String]) -> untyped } -> untyped
def value_at: (::Array[::String]) -> untyped
def index_name: () -> ::String
def index_definition_name: () -> ::String
def id: () -> ::String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ module ElasticGraph
def blank_value: () -> (nil | DatastoreResponse::SearchResponse)

def extract_id_or_ids_from: (
::Hash[::String, untyped],
^(document: ::Hash[::String, untyped], problem: ::String) -> void
DatastoreResponse::Document | ::Hash[::String, untyped],
^(document: DatastoreResponse::Document | ::Hash[::String, untyped], problem: ::String) -> void
) -> (nil | ::String | ::Enumerable[::String])

def normalize_documents: (
Expand Down
31 changes: 31 additions & 0 deletions elasticgraph-graphql/spec/acceptance/datastore_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,23 @@ module ElasticGraph
])
end

it "returns direct leaf fields configured to be fetched from doc values" do
index_records(
widget = build(:widget, workspace_id: "workspace_1"),
address = build(:address, full_address: "123 Main St")
)

widgets = list_widgets(fields: <<~EOS)
id
#{case_correctly("workspace_id")}
EOS

addresses = list_addresses(fields: case_correctly("full_address"))

expect(widgets).to eq([string_hash_of(widget, :id, :workspace_id)])
expect(addresses).to eq([string_hash_of(address, :full_address)])
end

describe "timeout behavior" do
it "raises `Errors::RequestExceededDeadlineError` if the specified timeout is exceeded by a datastore query" do
expect {
Expand Down Expand Up @@ -170,6 +187,20 @@ def list_addresses(fields:, gql: graphql, **query_args)
QUERY
end

# Runs a GraphQL `widgets` query requesting the given `fields` selection on each
# node, and returns the array of node hashes (unwrapping the relay
# `edges`/`node` wrapper). `query_args` are rendered as GraphQL arguments on the
# `widgets` field via `graphql_args`.
def list_widgets(fields:, gql: graphql, **query_args)
  call_graphql_query(<<~QUERY, gql: gql).dig("data", "widgets", "edges").map { |we| we.fetch("node") }
    query {
      widgets#{graphql_args(query_args)} {
        edges {
          node {
            #{fields}
          }
        }
      }
    }
  QUERY
end

def query_all_indexed_type_counts
call_graphql_query(<<~QUERY).fetch("data")
query {
Expand Down
Loading
Loading