Skip to content

Commit ad61890

Browse files
authored
Add automatic X-Opaque-Id headers to datastore msearch requests (#1135)
1 parent dd16218 commit ad61890

26 files changed

Lines changed: 317 additions & 39 deletions

File tree

elasticgraph-graphql/lib/elastic_graph/graphql/client.rb

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,13 @@ class GraphQL
1313
# meant to be a friendly/human readable string (such as a service name)
1414
# where as `source_description` is meant to be an opaque string describing
1515
# where `name` came from.
16-
class Client < Data.define(:source_description, :name)
16+
class Client < Data.define(:source_description, :name, :extra_opaque_id_parts)
1717
# `Data.define` provides the following methods:
18-
# @dynamic initialize, name, source_description, with
18+
# @dynamic name, source_description, extra_opaque_id_parts, with
1919

20-
ANONYMOUS = new("(anonymous)", "(anonymous)")
21-
ELASTICGRAPH_INTERNAL = new("(ElasticGraphInternal)", "(ElasticGraphInternal)")
20+
def initialize(source_description:, name:, extra_opaque_id_parts: [])
21+
super
22+
end
2223

2324
def description
2425
if source_description == name
@@ -38,6 +39,9 @@ def resolve(http_request)
3839
Client::ANONYMOUS
3940
end
4041
end
42+
43+
ANONYMOUS = new("(anonymous)", "(anonymous)")
44+
ELASTICGRAPH_INTERNAL = new("(ElasticGraphInternal)", "(ElasticGraphInternal)")
4145
end
4246
end
4347
end

elasticgraph-graphql/lib/elastic_graph/graphql/datastore_search_router.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
require "elastic_graph/errors"
1111
require "elastic_graph/graphql/datastore_response/search_response"
1212
require "elastic_graph/graphql/query_details_tracker"
13+
require "elastic_graph/support/opaque_id"
1314
require "elastic_graph/support/threading"
1415

1516
module ElasticGraph
@@ -32,7 +33,7 @@ def initialize(
3233

3334
# Sends the datastore a multi-search request based on the given queries.
3435
# Returns a hash of responses keyed by the query.
35-
def msearch(queries, query_tracker: QueryDetailsTracker.empty)
36+
def msearch(queries, query_tracker: QueryDetailsTracker.empty, opaque_id_parts: ["elasticgraph-graphql"])
3637
DatastoreQuery.perform(queries) do |header_body_tuples_by_query|
3738
# Here we set a client-side timeout, which causes the client to give up and close the connection.
3839
# According to [1]--"We have a new way to cancel search requests efficiently from the client
@@ -64,6 +65,9 @@ def msearch(queries, query_tracker: QueryDetailsTracker.empty)
6465
# even though Faraday (the underlying HTTP client) does. To work around this, we pass our desired
6566
# timeout in a specific header that the `SupportTimeouts` Faraday middleware will use.
6667
headers = {TIMEOUT_MS_HEADER => msearch_request_timeout_from(queries)&.to_s}.compact
68+
if (opaque_id = Support::OpaqueID.build_header(opaque_id_parts))
69+
headers[OPAQUE_ID_HEADER] = opaque_id
70+
end
6771

6872
queries_and_header_body_tuples_by_datastore_client = header_body_tuples_by_query.group_by do |(query, header_body_tuples)|
6973
@datastore_clients_by_name.fetch(query.cluster_name)

elasticgraph-graphql/lib/elastic_graph/graphql/query_executor.rb

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ def execute(
5959
client: client,
6060
context: context.merge({
6161
monotonic_clock_deadline: timeout_in_ms&.+(start_time_in_ms),
62-
elastic_graph_query_tracker: query_tracker
62+
elastic_graph_query_tracker: query_tracker,
63+
elastic_graph_client: client
6364
}.compact)
6465
)
6566

@@ -89,7 +90,7 @@ def execute(
8990
@logger.info({
9091
"message_type" => "ElasticGraphQueryExecutorQueryDuration",
9192
"client" => client.name,
92-
"query_fingerprint" => fingerprint_for(query),
93+
"query_fingerprint" => query.fingerprint,
9394
"query_name" => query.selected_operation_name,
9495
"duration_ms" => duration,
9596
# How long the datastore queries took according to what the datastore itself reported.
@@ -142,7 +143,7 @@ def build_and_execute_query(query_string:, variables:, operation_name:, context:
142143
def execute_query(query, client:)
143144
# Log the query before starting to execute it, in case there's a lambda timeout, in which case
144145
# we won't get any other logged messages for the query.
145-
@logger.info "Starting to execute query #{fingerprint_for(query)} for client #{client.description}."
146+
@logger.info "Starting to execute query #{query.fingerprint} for client #{client.description}."
146147

147148
query.result
148149
rescue => ex
@@ -163,11 +164,7 @@ def execute_query(query, client:)
163164
# the fingerprint to make sure that we at least have some identification information
164165
# about the query.
165166
def full_description_of(query)
166-
"#{fingerprint_for(query)} #{query.sanitized_query_string}"
167-
end
168-
169-
def fingerprint_for(query)
170-
query.query_string ? query.fingerprint : "(no query string)"
167+
"#{query.fingerprint} #{query.sanitized_query_string}"
171168
end
172169

173170
def slo_result_for(query, duration)

elasticgraph-graphql/lib/elastic_graph/graphql/resolvers/query_source.rb

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,28 +20,48 @@ module Resolvers
2020
# Note: `NestedRelationshipsSource` implements further optimizations on top of this, and should
2121
# be used rather than this class when applicable.
2222
class QuerySource < ::GraphQL::Dataloader::Source
23-
def initialize(datastore_router, query_tracker)
23+
def initialize(datastore_router, query_tracker, opaque_id_parts = [])
2424
@datastore_router = datastore_router
2525
@query_tracker = query_tracker
26+
@opaque_id_parts = opaque_id_parts
2627
end
2728

2829
def fetch(queries)
29-
responses_by_query = @datastore_router.msearch(queries, query_tracker: @query_tracker)
30+
responses_by_query = @datastore_router.msearch(
31+
queries,
32+
query_tracker: @query_tracker,
33+
opaque_id_parts: @opaque_id_parts
34+
)
3035
queries.map { |q| responses_by_query.fetch(q) }
3136
end
3237

3338
def self.execute_many(queries, for_context:)
3439
datastore_router = for_context.fetch(:datastore_search_router)
3540
query_tracker = for_context.fetch(:elastic_graph_query_tracker)
41+
opaque_id_parts = datastore_opaque_id_parts_for(for_context)
3642
dataloader = for_context.dataloader
3743

38-
responses = dataloader.with(self, datastore_router, query_tracker).load_all(queries)
44+
responses = dataloader.with(self, datastore_router, query_tracker, opaque_id_parts).load_all(queries)
3945
queries.zip(responses).to_h
4046
end
4147

4248
def self.execute_one(query, for_context:)
4349
execute_many([query], for_context: for_context).fetch(query)
4450
end
51+
52+
# `QueryExecutor` adds `:elastic_graph_client` to the GraphQL context before
53+
# resolver execution begins, so resolver-side datastore queries can reuse the
54+
# same client identity in their datastore `X-Opaque-Id` headers.
55+
private_class_method def self.datastore_opaque_id_parts_for(for_context)
56+
client = for_context.fetch(:elastic_graph_client)
57+
graphql_query = for_context.query
58+
[
59+
"elasticgraph-graphql",
60+
"client=#{client.name}",
61+
*client.extra_opaque_id_parts,
62+
"query=#{graphql_query.fingerprint}"
63+
]
64+
end
4565
end
4666
end
4767
end

elasticgraph-graphql/sig/elastic_graph/graphql/client.rbs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@ module ElasticGraph
33
class Client
44
attr_reader name: ::String
55
attr_reader source_description: ::String
6-
def initialize: (name: ::String, source_description: ::String) -> void
7-
def with: (?name: ::String, ?source_description: ::String) -> Client
6+
attr_reader extra_opaque_id_parts: ::Array[::String]
7+
def initialize: (source_description: ::String, name: ::String, ?extra_opaque_id_parts: ::Array[::String]) -> void
8+
def with: (?source_description: ::String, ?name: ::String, ?extra_opaque_id_parts: ::Array[::String]) -> Client
89

910
def self.new:
10-
(name: ::String, source_description: ::String) -> instance
11-
| (::String, ::String) -> instance
11+
(source_description: ::String, name: ::String, ?extra_opaque_id_parts: ::Array[::String]) -> instance
12+
| (::String, ::String, ?::Array[::String]) -> instance
1213

1314
ANONYMOUS: Client
1415
ELASTICGRAPH_INTERNAL: Client

elasticgraph-graphql/sig/elastic_graph/graphql/datastore_search_router.rbs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,11 @@ module ElasticGraph
88
config: Config
99
) -> void
1010

11-
def msearch: (::Array[DatastoreQuery], ?query_tracker: QueryDetailsTracker) -> ::Hash[DatastoreQuery, DatastoreResponse::SearchResponse]
11+
def msearch: (
12+
::Array[DatastoreQuery],
13+
?query_tracker: QueryDetailsTracker,
14+
?opaque_id_parts: ::Array[::String?]
15+
) -> ::Hash[DatastoreQuery, DatastoreResponse::SearchResponse]
1216
end
1317
end
1418
end

elasticgraph-graphql/sig/elastic_graph/graphql/query_executor.rbs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ module ElasticGraph
3737

3838
def execute_query: (::GraphQL::Query, client: Client) -> ::GraphQL::Query::Result
3939
def full_description_of: (::GraphQL::Query) -> ::String
40-
def fingerprint_for: (::GraphQL::Query) -> ::String
4140
def directives_from_query_operation: (::GraphQL::Query) -> ::Hash[::String, ::Hash[::String, untyped]]
4241
def slo_result_for: (::GraphQL::Query, ::Integer) -> ::String?
4342
end

elasticgraph-graphql/sig/elastic_graph/graphql/resolvers/query_source.rbs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@ module ElasticGraph
22
class GraphQL
33
module Resolvers
44
class QuerySource < ::GraphQL::Dataloader::Source
5-
def initialize: (DatastoreSearchRouter, QueryDetailsTracker) -> void
5+
def initialize: (DatastoreSearchRouter, QueryDetailsTracker, ?::Array[::String]) -> void
66
@datastore_router: DatastoreSearchRouter
77
@query_tracker: QueryDetailsTracker
8+
@opaque_id_parts: ::Array[::String]
89
def fetch: (::Array[DatastoreQuery]) -> ::Array[DatastoreResponse::SearchResponse]
910

1011
def self.execute_many: (
@@ -16,6 +17,8 @@ module ElasticGraph
1617
DatastoreQuery,
1718
for_context: ::GraphQL::Query::Context
1819
) -> DatastoreResponse::SearchResponse
20+
21+
def self.datastore_opaque_id_parts_for: (::GraphQL::Query::Context) -> ::Array[::String]
1922
end
2023
end
2124
end

elasticgraph-graphql/spec/integration/elastic_graph/graphql/resolvers/nested_relationships_source_spec.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
# frozen_string_literal: true
88

99
require "elastic_graph/graphql/query_details_tracker"
10+
require "elastic_graph/graphql/client"
1011
require "elastic_graph/graphql/resolvers/nested_relationships_source"
1112
require "graphql"
1213
require "support/aggregations_helpers"
@@ -217,13 +218,14 @@ def resolve_field(field, *value_sets, **query_args)
217218

218219
::GraphQL::Dataloader.with_dataloading do |dataloader|
219220
context = ::GraphQL::Query::Context.new(
220-
query: instance_double(::GraphQL::Query),
221+
query: instance_double(::GraphQL::Query, fingerprint: "NestedRelationshipsSource/test"),
221222
schema: graphql.schema.graphql_schema,
222223
values: {
223224
elastic_graph_schema: graphql.schema,
224225
dataloader: dataloader,
225226
datastore_search_router: graphql.datastore_search_router,
226-
elastic_graph_query_tracker: QueryDetailsTracker.empty
227+
elastic_graph_query_tracker: QueryDetailsTracker.empty,
228+
elastic_graph_client: Client::ANONYMOUS
227229
}
228230
)
229231

elasticgraph-graphql/spec/support/resolver.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
# frozen_string_literal: true
88

99
require "elastic_graph/graphql/query_details_tracker"
10+
require "elastic_graph/graphql/client"
1011
require "elastic_graph/graphql/resolvers/query_adapter"
1112
require "elastic_graph/graphql/resolvers/query_source"
1213
require "graphql"
@@ -21,13 +22,14 @@ def resolve(type_name, field_name, document = nil, **options)
2122

2223
::GraphQL::Dataloader.with_dataloading do |dataloader|
2324
context = ::GraphQL::Query::Context.new(
24-
query: nil,
25+
query: instance_double(::GraphQL::Query, fingerprint: "ResolverHelperQuery/test"),
2526
schema: graphql.schema.graphql_schema,
2627
values: {
2728
elastic_graph_schema: graphql.schema,
2829
dataloader: dataloader,
2930
elastic_graph_query_tracker: query_details_tracker,
30-
datastore_search_router: graphql.datastore_search_router
31+
datastore_search_router: graphql.datastore_search_router,
32+
elastic_graph_client: ElasticGraph::GraphQL::Client::ANONYMOUS
3133
}
3234
)
3335

0 commit comments

Comments
 (0)