@@ -183,26 +183,37 @@ fn insert_datafusion_worker(
183183/// Adapter that appends the DataFusion query and worker gRPC services to the
184184/// tonic `Router`. Construct via [`build_datafusion_mount`]; [`grpc.rs`] calls
185185/// [`Self::apply`] on the assembled router.
186- ///
187- /// Hiding the closure behind a struct avoids exposing the
188- /// `WorkerServiceServer<Worker>` type publicly (it comes from a
189- /// `pub(crate)` module inside `datafusion-distributed`).
190186pub ( crate ) struct DataFusionMount {
191- router_mod : Option < Box < dyn FnOnce ( Router ) -> Router + Send > > ,
187+ session_builder : Option < Arc < DataFusionSessionBuilder > > ,
188+ max_message_size_bytes : usize ,
192189}
193190
194191impl DataFusionMount {
195192 /// Returns a no-op mount — used when `services.datafusion_session_builder`
196193 /// is `None` (startup toggle off).
197194 fn noop ( ) -> Self {
198- Self { router_mod : None }
195+ Self {
196+ session_builder : None ,
197+ max_message_size_bytes : 0 ,
198+ }
199199 }
200200
201- pub fn apply ( self , router : Router ) -> Router {
202- match self . router_mod {
203- Some ( f) => f ( router) ,
204- None => router,
205- }
201+ pub fn apply < L > ( self , router : Router < L > ) -> Router < L > {
202+ let Some ( session_builder) = self . session_builder else {
203+ return router;
204+ } ;
205+
206+ let query_server = DataFusionServiceServer :: new ( DataFusionServiceGrpcImpl :: new (
207+ DataFusionService :: new ( Arc :: clone ( & session_builder) ) ,
208+ ) )
209+ . max_decoding_message_size ( self . max_message_size_bytes )
210+ . max_encoding_message_size ( self . max_message_size_bytes ) ;
211+
212+ let worker = build_worker ( session_builder) ;
213+
214+ router
215+ . add_service ( query_server)
216+ . add_service ( worker. into_worker_server ( ) )
206217 }
207218}
208219
@@ -225,34 +236,28 @@ pub(crate) fn build_datafusion_mount(
225236 enabled. insert ( "datafusion-worker" ) ;
226237 file_descriptor_sets. push ( quickwit_datafusion:: proto:: DATAFUSION_FILE_DESCRIPTOR_SET ) ;
227238
228- let max_size = max_message_size_bytes;
229- let session_builder = Arc :: clone ( session_builder) ;
230-
231- let router_mod = Box :: new ( move |router : Router | {
232- let query_server = DataFusionServiceServer :: new ( DataFusionServiceGrpcImpl :: new (
233- DataFusionService :: new ( Arc :: clone ( & session_builder) ) ,
234- ) )
235- . max_decoding_message_size ( max_size)
236- . max_encoding_message_size ( max_size) ;
237-
238- let worker = build_worker ( Arc :: clone ( & session_builder) ) ;
239-
240- router
241- . add_service ( query_server)
242- . add_service ( worker. into_worker_server ( ) )
243- } ) ;
244-
245239 DataFusionMount {
246- router_mod : Some ( router_mod) ,
240+ session_builder : Some ( Arc :: clone ( session_builder) ) ,
241+ max_message_size_bytes,
247242 }
248243}
249244
250245#[ cfg( test) ]
251246mod tests {
252247 use quickwit_proto:: ingest:: ingester:: IngesterStatus ;
248+ use tower:: layer:: util:: Identity ;
253249
254250 use super :: * ;
255251
252+ #[ test]
253+ fn datafusion_mount_accepts_layered_tonic_router ( ) {
254+ let ( _health_reporter, health_service) = tonic_health:: server:: health_reporter ( ) ;
255+ let mut server = tonic:: transport:: Server :: builder ( ) . layer ( Identity :: new ( ) ) ;
256+ let router = server. add_service ( health_service) ;
257+
258+ let _router = DataFusionMount :: noop ( ) . apply ( router) ;
259+ }
260+
256261 #[ tokio:: test]
257262 async fn datafusion_worker_changes_ignore_non_searcher_adds ( ) {
258263 let node = ClusterNode :: for_test (
0 commit comments