@@ -272,6 +272,46 @@ async fn build_metastore_client(
272272 . build_from_balance_channel ( balance_channel, max_message_size, None ) )
273273}
274274
275+ /// Builds the list of metastore clients whose connectivity gates this node's readiness.
276+ ///
277+ /// `metastore` is the node's main metastore client (`metastore_through_control_plane`). The store
278+ /// it resolves to depends on the role:
279+ /// - on a `metastore` node, the locally served read-write primary;
280+ /// - on a `metastore_read_replica` node, the locally served read-only replica;
281+ /// - on any other node, a remote client to the primary `metastore` pool.
282+ ///
283+ /// `metastore_read_replica_opt` is the separate remote read-replica client built only for
284+ /// searchers that route their reads to a replica.
285+ fn build_metastore_readiness_clients (
286+ node_config : & NodeConfig ,
287+ metastore : MetastoreServiceClient ,
288+ metastore_read_replica_opt : Option < MetastoreServiceClient > ,
289+ ) -> Vec < MetastoreServiceClient > {
290+ let mut readiness_clients = Vec :: with_capacity ( 2 ) ;
291+
292+ // Every role relies on the main metastore client except a searcher routed to a read replica,
293+ // which reads from `metastore_read_replica_opt` below instead. A read-replica node is *not* an
294+ // exception: its `metastore` is the replica it serves locally (see above), so it is checked
295+ // here too.
296+ let should_check_main_metastore =
297+ node_config
298+ . enabled_services
299+ . iter ( )
300+ . any ( |service| match service {
301+ QuickwitService :: Searcher => {
302+ !node_config. searcher_config . use_metastore_read_replica
303+ }
304+ _ => true ,
305+ } ) ;
306+ if should_check_main_metastore {
307+ readiness_clients. push ( metastore) ;
308+ }
309+ if let Some ( read_replica_metastore) = metastore_read_replica_opt {
310+ readiness_clients. push ( read_replica_metastore) ;
311+ }
312+ readiness_clients
313+ }
314+
275315async fn balance_channel_for_service (
276316 cluster : & Cluster ,
277317 service : QuickwitService ,
@@ -733,18 +773,29 @@ pub async fn serve_quickwit(
733773 let use_metastore_read_replica_for_search = node_config
734774 . is_service_enabled ( QuickwitService :: Searcher )
735775 && node_config. searcher_config . use_metastore_read_replica ;
736- let search_metastore_client : MetastoreReadServiceClient =
737- if use_metastore_read_replica_for_search {
738- let read_replica_metastore = build_metastore_client (
776+ let search_read_replica_metastore_opt = if use_metastore_read_replica_for_search {
777+ Some (
778+ build_metastore_client (
739779 & cluster,
740780 QuickwitService :: MetastoreReadReplica ,
741781 grpc_config. max_message_size ,
742782 )
743- . await ?;
744- Arc :: new ( read_replica_metastore)
783+ . await ?,
784+ )
785+ } else {
786+ None
787+ } ;
788+ let search_metastore_client: MetastoreReadServiceClient =
789+ if let Some ( read_replica_metastore) = & search_read_replica_metastore_opt {
790+ Arc :: new ( read_replica_metastore. clone ( ) )
745791 } else {
746792 Arc :: new ( metastore_through_control_plane. clone ( ) )
747793 } ;
794+ let metastore_readiness_clients = build_metastore_readiness_clients (
795+ & node_config,
796+ metastore_through_control_plane. clone ( ) ,
797+ search_read_replica_metastore_opt,
798+ ) ;
748799
749800 let ( search_job_placer, search_service, searcher_pool) = setup_searcher (
750801 & node_config,
@@ -951,7 +1002,7 @@ pub async fn serve_quickwit(
9511002 spawn_named_task (
9521003 node_readiness_reporting_task (
9531004 cluster. clone ( ) ,
954- metastore_through_control_plane ,
1005+ metastore_readiness_clients ,
9551006 ingester_opt. clone ( ) ,
9561007 grpc_readiness_signal_rx,
9571008 rest_readiness_signal_rx,
@@ -1498,10 +1549,32 @@ fn with_arg<T: Clone + Send>(arg: T) -> impl Filter<Extract = (T,), Error = Infa
14981549 warp:: any ( ) . map ( move || arg. clone ( ) )
14991550}
15001551
1552+ async fn metastores_are_available ( metastores : & [ MetastoreServiceClient ] ) -> bool {
1553+ if metastores. is_empty ( ) {
1554+ warn ! ( "no metastore configured for readiness checks" ) ;
1555+ return false ;
1556+ }
1557+ futures:: future:: join_all ( metastores. iter ( ) . map ( |metastore| async move {
1558+ match metastore. check_connectivity ( ) . await {
1559+ Ok ( ( ) ) => {
1560+ debug ! ( metastore_endpoints=?metastore. endpoints( ) , "metastore service is available" ) ;
1561+ true
1562+ }
1563+ Err ( error) => {
1564+ warn ! ( metastore_endpoints=?metastore. endpoints( ) , error=?error, "metastore service is unavailable" ) ;
1565+ false
1566+ }
1567+ }
1568+ } ) )
1569+ . await
1570+ . into_iter ( )
1571+ . all ( |metastore_is_available| metastore_is_available)
1572+ }
1573+
15011574/// Reports node readiness to chitchat cluster every 10 seconds (25 ms for tests).
15021575async fn node_readiness_reporting_task (
15031576 cluster : Cluster ,
1504- metastore : MetastoreServiceClient ,
1577+ metastores : Vec < MetastoreServiceClient > ,
15051578 ingester_opt : Option < impl IngesterService > ,
15061579 grpc_readiness_signal_rx : oneshot:: Receiver < ( ) > ,
15071580 rest_readiness_signal_rx : oneshot:: Receiver < ( ) > ,
@@ -1532,16 +1605,7 @@ async fn node_readiness_reporting_task(
15321605 loop {
15331606 interval. tick ( ) . await ;
15341607
1535- let metastore_is_available = match metastore. check_connectivity ( ) . await {
1536- Ok ( ( ) ) => {
1537- debug ! ( metastore_endpoints=?metastore. endpoints( ) , "metastore service is available" ) ;
1538- true
1539- }
1540- Err ( error) => {
1541- warn ! ( metastore_endpoints=?metastore. endpoints( ) , error=?error, "metastore service is unavailable" ) ;
1542- false
1543- }
1544- } ;
1608+ let metastore_is_available = metastores_are_available ( & metastores) . await ;
15451609 let ingester_is_available = if let Some ( ingester) = & ingester_opt {
15461610 match try_get_ingester_status ( ingester) . await {
15471611 Ok ( status) => {
@@ -1639,6 +1703,26 @@ mod tests {
16391703
16401704 use super :: * ;
16411705
1706+ fn metastore_readiness_client (
1707+ readiness_rx : watch:: Receiver < bool > ,
1708+ uri : & ' static str ,
1709+ ) -> MetastoreServiceClient {
1710+ let mut mock_metastore = MockMetastoreService :: new ( ) ;
1711+ mock_metastore
1712+ . expect_check_connectivity ( )
1713+ . returning ( move || {
1714+ if * readiness_rx. borrow ( ) {
1715+ Ok ( ( ) )
1716+ } else {
1717+ Err ( anyhow:: anyhow!( "metastore `{uri}` not ready" ) )
1718+ }
1719+ } ) ;
1720+ mock_metastore
1721+ . expect_endpoints ( )
1722+ . return_const ( vec ! [ Uri :: for_test( uri) ] ) ;
1723+ MetastoreServiceClient :: from_mock ( mock_metastore)
1724+ }
1725+
16421726 #[ tokio:: test]
16431727 async fn test_check_cluster_configuration ( ) {
16441728 let services = HashSet :: from_iter ( [ QuickwitService :: Metastore ] ) ;
@@ -1672,16 +1756,7 @@ mod tests {
16721756 . await
16731757 . unwrap ( ) ;
16741758 let ( metastore_readiness_tx, metastore_readiness_rx) = watch:: channel ( false ) ;
1675- let mut mock_metastore = MockMetastoreService :: new ( ) ;
1676- mock_metastore
1677- . expect_check_connectivity ( )
1678- . returning ( move || {
1679- if * metastore_readiness_rx. borrow ( ) {
1680- Ok ( ( ) )
1681- } else {
1682- Err ( anyhow:: anyhow!( "Metastore not ready" ) )
1683- }
1684- } ) ;
1759+ let mock_metastore = metastore_readiness_client ( metastore_readiness_rx, "ram:///metastore" ) ;
16851760 let ( ingester_status_tx, ingester_status_rx) = watch:: channel ( IngesterStatus :: Initializing ) ;
16861761 let mut mock_ingester = MockIngesterService :: new ( ) ;
16871762 mock_ingester
@@ -1723,7 +1798,7 @@ mod tests {
17231798
17241799 tokio:: spawn ( node_readiness_reporting_task (
17251800 cluster. clone ( ) ,
1726- MetastoreServiceClient :: from_mock ( mock_metastore) ,
1801+ vec ! [ mock_metastore] ,
17271802 Some ( mock_ingester) ,
17281803 grpc_readiness_signal_rx,
17291804 rest_readiness_signal_rx,
@@ -1755,6 +1830,145 @@ mod tests {
17551830 assert_eq ! ( response. status( ) , ServingStatus :: NotServing . into( ) ) ;
17561831 }
17571832
1833+ #[ tokio:: test]
1834+ async fn test_readiness_requires_all_metastores ( ) {
1835+ let transport = ChitchatTransport :: default ( ) ;
1836+ let cluster = create_cluster_for_test ( Vec :: new ( ) , & [ ] , & transport, false )
1837+ . await
1838+ . unwrap ( ) ;
1839+ let ( primary_readiness_tx, primary_readiness_rx) = watch:: channel ( false ) ;
1840+ let ( replica_readiness_tx, replica_readiness_rx) = watch:: channel ( false ) ;
1841+ let primary_metastore =
1842+ metastore_readiness_client ( primary_readiness_rx, "ram:///primary-metastore" ) ;
1843+ let replica_metastore =
1844+ metastore_readiness_client ( replica_readiness_rx, "ram:///replica-metastore" ) ;
1845+ let ( grpc_readiness_trigger_tx, grpc_readiness_signal_rx) = oneshot:: channel ( ) ;
1846+ let ( rest_readiness_trigger_tx, rest_readiness_signal_rx) = oneshot:: channel ( ) ;
1847+ let ( health_reporter, _health_service) = health_reporter ( ) ;
1848+
1849+ tokio:: spawn ( node_readiness_reporting_task (
1850+ cluster. clone ( ) ,
1851+ vec ! [ primary_metastore, replica_metastore] ,
1852+ None :: < MockIngesterService > ,
1853+ grpc_readiness_signal_rx,
1854+ rest_readiness_signal_rx,
1855+ health_reporter,
1856+ ) ) ;
1857+ grpc_readiness_trigger_tx. send ( ( ) ) . unwrap ( ) ;
1858+ rest_readiness_trigger_tx. send ( ( ) ) . unwrap ( ) ;
1859+
1860+ primary_readiness_tx. send ( true ) . unwrap ( ) ;
1861+ tokio:: time:: sleep ( READINESS_REPORTING_INTERVAL * 3 ) . await ;
1862+ assert ! ( !cluster. is_self_node_ready( ) . await ) ;
1863+
1864+ replica_readiness_tx. send ( true ) . unwrap ( ) ;
1865+ assert_eventually ! ( cluster. is_self_node_ready( ) . await ) ;
1866+
1867+ primary_readiness_tx. send ( false ) . unwrap ( ) ;
1868+ assert_eventually ! ( !cluster. is_self_node_ready( ) . await ) ;
1869+ }
1870+
/// Table-driven check of which metastore clients gate readiness for each service layout.
///
/// Clients are identified by the endpoint their mock advertises, so the expectation of each case
/// is the ordered list of endpoints of the clients that must be returned.
#[test]
fn test_build_metastore_readiness_clients() {
    const MAIN_ENDPOINT: &str = "ram:///main-metastore";
    const READ_REPLICA_ENDPOINT: &str = "ram:///read-replica-metastore";

    struct TestCase {
        comment: &'static str,
        enabled_services: &'static [QuickwitService],
        use_metastore_read_replica: bool,
        with_read_replica_client: bool,
        expected_endpoints: &'static [&'static str],
    }

    /// Builds a mock client that only answers `endpoints()`, with the given endpoint.
    fn client_with_endpoint(endpoint: &'static str) -> MetastoreServiceClient {
        let mut mock = MockMetastoreService::new();
        mock.expect_endpoints()
            .return_const(vec![Uri::for_test(endpoint)]);
        MetastoreServiceClient::from_mock(mock)
    }

    let test_cases = [
        TestCase {
            comment: "indexer depends on the main metastore",
            enabled_services: &[QuickwitService::Indexer],
            use_metastore_read_replica: false,
            with_read_replica_client: false,
            expected_endpoints: &[MAIN_ENDPOINT],
        },
        TestCase {
            comment: "searcher without a read replica depends on the main metastore",
            enabled_services: &[QuickwitService::Searcher],
            use_metastore_read_replica: false,
            with_read_replica_client: false,
            expected_endpoints: &[MAIN_ENDPOINT],
        },
        TestCase {
            comment: "searcher routed to a read replica skips the main metastore",
            enabled_services: &[QuickwitService::Searcher],
            use_metastore_read_replica: true,
            with_read_replica_client: true,
            expected_endpoints: &[READ_REPLICA_ENDPOINT],
        },
        TestCase {
            comment: "searcher routed to a read replica still checks the main metastore for \
                      its other roles",
            enabled_services: &[QuickwitService::Searcher, QuickwitService::Indexer],
            use_metastore_read_replica: true,
            with_read_replica_client: true,
            expected_endpoints: &[MAIN_ENDPOINT, READ_REPLICA_ENDPOINT],
        },
        TestCase {
            comment: "standalone read replica node checks its locally served metastore",
            enabled_services: &[QuickwitService::MetastoreReadReplica],
            use_metastore_read_replica: false,
            with_read_replica_client: false,
            expected_endpoints: &[MAIN_ENDPOINT],
        },
        TestCase {
            comment: "metastore node checks its locally served metastore",
            enabled_services: &[QuickwitService::Metastore],
            use_metastore_read_replica: false,
            with_read_replica_client: false,
            expected_endpoints: &[MAIN_ENDPOINT],
        },
    ];

    for TestCase {
        comment,
        enabled_services,
        use_metastore_read_replica,
        with_read_replica_client,
        expected_endpoints,
    } in test_cases
    {
        let mut node_config = NodeConfig::for_test();
        node_config.searcher_config.use_metastore_read_replica = use_metastore_read_replica;
        node_config.enabled_services = enabled_services.iter().copied().collect();

        let readiness_clients = build_metastore_readiness_clients(
            &node_config,
            client_with_endpoint(MAIN_ENDPOINT),
            with_read_replica_client.then(|| client_with_endpoint(READ_REPLICA_ENDPOINT)),
        );

        // Compare by advertised endpoints, which also pins the order of the returned clients.
        let expected: Vec<Uri> = expected_endpoints
            .iter()
            .map(|endpoint| Uri::for_test(endpoint))
            .collect();
        let actual: Vec<Uri> = readiness_clients
            .iter()
            .flat_map(|metastore| metastore.endpoints())
            .collect();
        assert_eq!(actual, expected, "{}", comment);
    }
}
1971+
17581972 #[ tokio:: test]
17591973 async fn test_setup_indexer_pool ( ) {
17601974 let universe = Universe :: with_accelerated_time ( ) ;
0 commit comments