Skip to content

Commit 667df25

Browse files
committed
Respect readiness in read replica gRPC service
1 parent c48e368 commit 667df25

1 file changed

Lines changed: 18 additions & 10 deletions

File tree

quickwit/quickwit-proto/src/grpc_read_replica.rs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ pub fn read_replica_header_interceptor() -> quickwit_common::tower::GrpcIntercep
2727
quickwit_common::tower::fixed_headers_interceptor(headers)
2828
}
2929

30-
#[derive(Debug, Clone)]
30+
#[derive(Debug)]
3131
pub struct ReadReplicaGrpcService<S> {
3232
primary: S,
3333
read_replica: Option<S>,
@@ -42,6 +42,12 @@ impl<S> ReadReplicaGrpcService<S> {
4242
}
4343
}
4444

45+
impl<S: Clone> Clone for ReadReplicaGrpcService<S> {
46+
fn clone(&self) -> Self {
47+
Self::new(self.primary.clone(), self.read_replica.clone())
48+
}
49+
}
50+
4551
impl<S> tonic::server::NamedService for ReadReplicaGrpcService<S>
4652
where S: tonic::server::NamedService
4753
{
@@ -63,9 +69,13 @@ where
6369
{
6470
type Response = http::Response<tonic::body::Body>;
6571
type Error = Infallible;
66-
type Future = tonic::codegen::BoxFuture<Self::Response, Self::Error>;
72+
type Future = S::Future;
6773

68-
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
74+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
75+
std::task::ready!(tower::Service::poll_ready(&mut self.primary, cx))?;
76+
if let Some(read_replica) = self.read_replica.as_mut() {
77+
std::task::ready!(tower::Service::poll_ready(read_replica, cx))?;
78+
}
6979
Poll::Ready(Ok(()))
7080
}
7181

@@ -75,13 +85,11 @@ where
7585
.get(READ_REPLICA_HEADER_NAME)
7686
.and_then(|value| value.to_str().ok())
7787
== Some(READ_REPLICA_HEADER_VALUE);
78-
let mut service = if use_read_replica {
79-
self.read_replica
80-
.clone()
81-
.unwrap_or_else(|| self.primary.clone())
88+
89+
if use_read_replica && let Some(read_replica) = self.read_replica.as_mut() {
90+
tower::Service::call(read_replica, request)
8291
} else {
83-
self.primary.clone()
84-
};
85-
Box::pin(async move { tower::Service::call(&mut service, request).await })
92+
tower::Service::call(&mut self.primary, request)
93+
}
8694
}
8795
}

0 commit comments

Comments
 (0)