Skip to content

Commit 34a7657

Browse files
committed
Add metastore read replica routing
1 parent 1ca4177 commit 34a7657

55 files changed

Lines changed: 973 additions & 193 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

config/quickwit.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,11 @@ version: 0.8
103103
# metastore_uri: s3://your-bucket/indexes
104104
# metastore_uri: postgres://username:password@host:port/db
105105
#
106+
# Optional PostgreSQL read replica URI used by metastore nodes for gRPC requests
107+
# carrying the `qw-use-read-replica: true` metadata. Defaults to unset.
108+
#
109+
# metastore_read_replica_uri: postgres://username:password@read-replica-host:port/db
110+
#
106111
# When using a file-backed metastore, the state of the metastore will be cached forever.
107112
# If you are indexing and searching from different processes, it is possible to periodically
108113
# refresh the state of the metastore on the searcher using the `polling_interval` hashtag.

docs/configuration/node-config.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ A commented example is available here: [quickwit.yaml](https://github.com/quickw
3030
| `peer_seeds` | List of IP addresses or hostnames used to bootstrap the cluster and discover the complete set of nodes. This list may contain the current node address and does not need to be exhaustive. If the list of peer seeds contains a hostname, Quickwit will resolve it by querying the DNS every minute. On Kubernetes, for instance, it is good practice to set it to a [headless service](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services). | `QW_PEER_SEEDS` | |
3131
| `data_dir` | Path to directory where data (tmp data, splits kept for caching purpose) is persisted. This is mostly used in indexing. | `QW_DATA_DIR` | `./qwdata` |
3232
| `metastore_uri` | Metastore URI. Can be a local directory or `s3://my-bucket/indexes` or `postgres://username:password@localhost:5432/metastore`. [Learn more about the metastore configuration](metastore-config.md). | `QW_METASTORE_URI` | `{data_dir}/indexes` |
33+
| `metastore_read_replica_uri` | Optional PostgreSQL read replica URI used by metastore nodes when a gRPC request carries the `qw-use-read-replica: true` metadata. If unset, those requests use `metastore_uri`. | `QW_METASTORE_READ_REPLICA_URI` | |
3334
| `default_index_root_uri` | Default index root URI that defines the location where index data (splits) is stored. The index URI is built following the scheme: `{default_index_root_uri}/{index-id}` | `QW_DEFAULT_INDEX_ROOT_URI` | `{data_dir}/indexes` |
3435
| environment variable only | Log level of Quickwit. Can be a direct log level, or a comma separated list of `module_name=level` | `RUST_LOG` | `info` |
3536

quickwit/quickwit-cluster/src/grpc_service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ pub(crate) async fn cluster_grpc_client(
4444

4545
ClusterServiceClient::tower()
4646
.stack_layer(CLUSTER_GRPC_CLIENT_METRICS_LAYER.clone())
47-
.build_from_channel(socket_addr, channel, MAX_MESSAGE_SIZE, None)
47+
.build_from_channel(socket_addr, channel, MAX_MESSAGE_SIZE, None, [])
4848
}
4949

5050
pub fn cluster_grpc_server(

quickwit/quickwit-codegen/example/src/codegen/hello.rs

Lines changed: 14 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-codegen/example/src/lib.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,8 @@ mod tests {
282282
"127.0.0.1:6666".parse().unwrap(),
283283
Endpoint::from_static("http://127.0.0.1:6666").connect_lazy(),
284284
);
285-
let grpc_client = HelloClient::from_balance_channel(channel, MAX_GRPC_MESSAGE_SIZE, None);
285+
let grpc_client =
286+
HelloClient::from_balance_channel(channel, MAX_GRPC_MESSAGE_SIZE, None, []);
286287

287288
assert_eq!(
288289
grpc_client
@@ -342,7 +343,7 @@ mod tests {
342343
// The connectivity check fails if there is no client behind the channel.
343344
let (balance_channel, _): (BalanceChannel<SocketAddr>, _) = BalanceChannel::new();
344345
let grpc_client =
345-
HelloClient::from_balance_channel(balance_channel, MAX_GRPC_MESSAGE_SIZE, None);
346+
HelloClient::from_balance_channel(balance_channel, MAX_GRPC_MESSAGE_SIZE, None, []);
346347
assert_eq!(
347348
grpc_client
348349
.check_connectivity()
@@ -441,6 +442,7 @@ mod tests {
441442
channel,
442443
MAX_GRPC_MESSAGE_SIZE,
443444
Some(CompressionEncoding::Zstd),
445+
[],
444446
);
445447

446448
assert_eq!(
@@ -788,7 +790,7 @@ mod tests {
788790
"127.0.0.1:7777".parse().unwrap(),
789791
Endpoint::from_static("http://127.0.0.1:7777").connect_lazy(),
790792
);
791-
HelloClient::from_balance_channel(balance_channed, MAX_GRPC_MESSAGE_SIZE, None);
793+
HelloClient::from_balance_channel(balance_channed, MAX_GRPC_MESSAGE_SIZE, None, []);
792794
}
793795

794796
#[tokio::test]
@@ -876,7 +878,7 @@ mod tests {
876878
.timeout(Duration::from_millis(100))
877879
.connect_lazy();
878880
let max_message_size = ByteSize::mib(1);
879-
let grpc_client = HelloClient::from_channel(addr, channel, max_message_size, None);
881+
let grpc_client = HelloClient::from_channel(addr, channel, max_message_size, None, []);
880882

881883
let error = grpc_client
882884
.hello(HelloRequest {
@@ -955,7 +957,7 @@ mod tests {
955957
// this test hangs forever if we comment out the TimeoutLayer, which
956958
// shows that a request without explicit timeout might hang forever
957959
.stack_layer(TimeoutLayer::new(Duration::from_secs(3)))
958-
.build_from_balance_channel(balance_channel, ByteSize::mib(1), None);
960+
.build_from_balance_channel(balance_channel, ByteSize::mib(1), None, []);
959961

960962
let response_fut = async move {
961963
grpc_client

quickwit/quickwit-codegen/src/codegen.rs

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,10 @@ impl CodegenContext {
316316
}
317317
}
318318

319+
fn is_metastore_service(context: &CodegenContext) -> bool {
320+
context.package_name == "quickwit.metastore" && context.service_name == "MetastoreService"
321+
}
322+
319323
fn generate_all(
320324
service: &Service,
321325
result_type_path: &str,
@@ -637,6 +641,21 @@ fn generate_client(context: &CodegenContext) -> TokenStream {
637641
} else {
638642
TokenStream::new()
639643
};
644+
let metastore_read_replica_client_methods = if is_metastore_service(context) {
645+
quote! {
646+
pub fn as_grpc_service_with_read_replica(
647+
&self,
648+
read_replica: Option<Self>,
649+
max_message_size: bytesize::ByteSize,
650+
) -> crate::grpc_read_replica::ReadReplicaGrpcService<#grpc_server_package_name::#grpc_server_name<#grpc_server_adapter_name>> {
651+
let primary = self.as_grpc_service(max_message_size);
652+
let read_replica = read_replica.map(|client| client.as_grpc_service(max_message_size));
653+
crate::grpc_read_replica::ReadReplicaGrpcService::new(primary, read_replica)
654+
}
655+
}
656+
} else {
657+
TokenStream::new()
658+
};
640659

641660
quote! {
642661
#[derive(Debug, Clone)]
@@ -671,15 +690,21 @@ fn generate_client(context: &CodegenContext) -> TokenStream {
671690
.max_encoding_message_size(max_message_size.0 as usize)
672691
}
673692

693+
#metastore_read_replica_client_methods
694+
674695
pub fn from_channel(
675696
addr: std::net::SocketAddr,
676697
channel: tonic::transport::Channel,
677698
max_message_size: bytesize::ByteSize,
678699
compression_encoding_opt: Option<tonic::codec::CompressionEncoding>,
700+
interceptors: impl IntoIterator<Item = quickwit_common::tower::GrpcInterceptor>,
679701
) -> Self
680702
{
681703
let (_, connection_keys_watcher) = tokio::sync::watch::channel(std::collections::HashSet::from_iter([addr]));
682-
let mut client = #grpc_client_package_name::#grpc_client_name::new(channel)
704+
let mut client = #grpc_client_package_name::#grpc_client_name::with_interceptor(
705+
channel,
706+
quickwit_common::tower::GrpcInterceptors::new(interceptors),
707+
)
683708
.max_decoding_message_size(max_message_size.0 as usize)
684709
.max_encoding_message_size(max_message_size.0 as usize);
685710
if let Some(compression_encoding) = compression_encoding_opt {
@@ -695,10 +720,14 @@ fn generate_client(context: &CodegenContext) -> TokenStream {
695720
balance_channel: quickwit_common::tower::BalanceChannel<std::net::SocketAddr>,
696721
max_message_size: bytesize::ByteSize,
697722
compression_encoding_opt: Option<tonic::codec::CompressionEncoding>,
723+
interceptors: impl IntoIterator<Item = quickwit_common::tower::GrpcInterceptor>,
698724
) -> #client_name
699725
{
700726
let connection_keys_watcher = balance_channel.connection_keys_watcher();
701-
let mut client = #grpc_client_package_name::#grpc_client_name::new(balance_channel)
727+
let mut client = #grpc_client_package_name::#grpc_client_name::with_interceptor(
728+
balance_channel,
729+
quickwit_common::tower::GrpcInterceptors::new(interceptors),
730+
)
702731
.max_decoding_message_size(max_message_size.0 as usize)
703732
.max_encoding_message_size(max_message_size.0 as usize);
704733
if let Some(compression_encoding) = compression_encoding_opt {
@@ -1024,7 +1053,6 @@ fn generate_layer_stack_impl(context: &CodegenContext) -> TokenStream {
10241053

10251054
svc_attribute_idents.push(svc_attribute_name);
10261055
}
1027-
10281056
quote! {
10291057
impl #tower_layer_stack_name {
10301058
pub fn stack_layer<L>(mut self, layer: L) -> Self
@@ -1051,9 +1079,10 @@ fn generate_layer_stack_impl(context: &CodegenContext) -> TokenStream {
10511079
channel: tonic::transport::Channel,
10521080
max_message_size: bytesize::ByteSize,
10531081
compression_encoding_opt: Option<tonic::codec::CompressionEncoding>,
1082+
interceptors: impl IntoIterator<Item = quickwit_common::tower::GrpcInterceptor>,
10541083
) -> #client_name
10551084
{
1056-
let client = #client_name::from_channel(addr, channel, max_message_size, compression_encoding_opt);
1085+
let client = #client_name::from_channel(addr, channel, max_message_size, compression_encoding_opt, interceptors);
10571086
let inner_client = client.inner;
10581087
self.build_from_inner_client(inner_client)
10591088
}
@@ -1063,9 +1092,10 @@ fn generate_layer_stack_impl(context: &CodegenContext) -> TokenStream {
10631092
balance_channel: quickwit_common::tower::BalanceChannel<std::net::SocketAddr>,
10641093
max_message_size: bytesize::ByteSize,
10651094
compression_encoding_opt: Option<tonic::codec::CompressionEncoding>,
1095+
interceptors: impl IntoIterator<Item = quickwit_common::tower::GrpcInterceptor>,
10661096
) -> #client_name
10671097
{
1068-
let client = #client_name::from_balance_channel(balance_channel, max_message_size, compression_encoding_opt);
1098+
let client = #client_name::from_balance_channel(balance_channel, max_message_size, compression_encoding_opt, interceptors);
10691099
let inner_client = client.inner;
10701100
self.build_from_inner_client(inner_client)
10711101
}
@@ -1279,7 +1309,7 @@ fn generate_grpc_client_adapter(context: &CodegenContext) -> TokenStream {
12791309
pub fn new(instance: T, connection_addrs_rx: tokio::sync::watch::Receiver<std::collections::HashSet<std::net::SocketAddr>>) -> Self {
12801310
Self {
12811311
inner: instance,
1282-
connection_addrs_rx
1312+
connection_addrs_rx,
12831313
}
12841314
}
12851315
}
@@ -1435,8 +1465,7 @@ fn generate_grpc_server_adapter_methods(context: &CodegenContext) -> TokenStream
14351465
let span = #span_macro;
14361466
let _ = <tracing::Span as tracing_opentelemetry::OpenTelemetrySpanExt>::set_parent(&span, parent_context);
14371467
let fut = async move {
1438-
self.inner
1439-
.0
1468+
self.inner.0
14401469
.#method_name(request)
14411470
.await
14421471
.map(#into_response_type)
quickwit/quickwit-common/src/tower/interceptor.rs

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
// Copyright 2021-Present Datadog, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use tonic::metadata::KeyAndValueRef;
18+
use tonic::service::Interceptor;
19+
20+
pub type GrpcInterceptor =
21+
Arc<dyn Fn(tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> + Send + Sync>;
22+
23+
#[derive(Clone, Default)]
24+
pub struct GrpcInterceptors {
25+
interceptors: Vec<GrpcInterceptor>,
26+
}
27+
28+
impl GrpcInterceptors {
29+
pub fn new(interceptors: impl IntoIterator<Item = GrpcInterceptor>) -> Self {
30+
Self {
31+
interceptors: interceptors.into_iter().collect(),
32+
}
33+
}
34+
}
35+
36+
impl Interceptor for GrpcInterceptors {
37+
fn call(&mut self, request: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
38+
self.interceptors
39+
.iter()
40+
.try_fold(request, |request, interceptor| interceptor(request))
41+
}
42+
}
43+
44+
pub fn fixed_headers_interceptor(headers: tonic::metadata::MetadataMap) -> GrpcInterceptor {
45+
Arc::new(move |mut request: tonic::Request<()>| {
46+
for key_and_value in headers.iter() {
47+
match key_and_value {
48+
KeyAndValueRef::Ascii(key, value) => {
49+
request.metadata_mut().insert(key, value.to_owned());
50+
}
51+
KeyAndValueRef::Binary(key, value) => {
52+
request.metadata_mut().insert_bin(key, value.to_owned());
53+
}
54+
}
55+
}
56+
Ok(request)
57+
})
58+
}
59+
60+
#[cfg(test)]
61+
mod tests {
62+
use tonic::metadata::{BinaryMetadataValue, MetadataMap, MetadataValue};
63+
64+
use super::*;
65+
66+
#[test]
67+
fn fixed_headers_interceptor_copies_ascii_and_binary_metadata() {
68+
let mut headers = MetadataMap::new();
69+
headers.insert("x-ascii", MetadataValue::from_static("ascii-value"));
70+
headers.insert_bin("x-bin", BinaryMetadataValue::from_bytes(b"binary-value"));
71+
72+
let mut interceptors = GrpcInterceptors::new([fixed_headers_interceptor(headers)]);
73+
let request = interceptors.call(tonic::Request::new(())).unwrap();
74+
75+
assert_eq!(
76+
request.metadata().get("x-ascii").unwrap(),
77+
MetadataValue::from_static("ascii-value")
78+
);
79+
assert_eq!(
80+
request
81+
.metadata()
82+
.get_bin("x-bin")
83+
.unwrap()
84+
.to_bytes()
85+
.unwrap()
86+
.as_ref(),
87+
b"binary-value"
88+
);
89+
}
90+
91+
#[test]
92+
fn grpc_interceptors_apply_multiple_interceptors_in_order() {
93+
let first: GrpcInterceptor = Arc::new(|mut request| {
94+
request
95+
.metadata_mut()
96+
.insert("x-order", MetadataValue::from_static("first"));
97+
Ok(request)
98+
});
99+
let second: GrpcInterceptor = Arc::new(|mut request| {
100+
let value = request.metadata().get("x-order").unwrap().to_str().unwrap();
101+
assert_eq!(value, "first");
102+
request
103+
.metadata_mut()
104+
.insert("x-order", MetadataValue::from_static("second"));
105+
Ok(request)
106+
});
107+
108+
let mut interceptors = GrpcInterceptors::new([first, second]);
109+
let request = interceptors.call(tonic::Request::new(())).unwrap();
110+
111+
assert_eq!(
112+
request.metadata().get("x-order").unwrap(),
113+
MetadataValue::from_static("second")
114+
);
115+
}
116+
}

quickwit/quickwit-common/src/tower/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ mod circuit_breaker;
2020
mod delay;
2121
mod estimate_rate;
2222
mod event_listener;
23+
mod interceptor;
2324
mod load_shed;
2425
mod metrics;
2526
mod one_task_per_call_layer;
@@ -43,6 +44,7 @@ pub use delay::{Delay, DelayLayer};
4344
pub use estimate_rate::{EstimateRate, EstimateRateLayer};
4445
pub use event_listener::{EventListener, EventListenerLayer};
4546
use futures::Future;
47+
pub use interceptor::{GrpcInterceptor, GrpcInterceptors, fixed_headers_interceptor};
4648
pub use load_shed::{LoadShed, LoadShedLayer, MakeLoadShedError};
4749
pub use metrics::{GrpcMetrics, GrpcMetricsLayer, RpcName};
4850
pub use one_task_per_call_layer::{OneTaskPerCallLayer, TaskCancelled};

0 commit comments

Comments
 (0)