@@ -263,13 +263,28 @@ const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024;
263263const ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE : & str =
264264 "LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE" ;
265265const ENV_LANCE_READ_CACHE_REPETITION_INDEX : & str = "LANCE_READ_CACHE_REPETITION_INDEX" ;
266+ const ENV_LANCE_INLINE_SCHEDULING_THRESHOLD : & str = "LANCE_INLINE_SCHEDULING_THRESHOLD" ;
267+
268+ // If a request is for at most this many rows we skip the scheduler-task spawn
269+ // and run scheduling inline as part of the stream's first poll.
270+ const DEFAULT_INLINE_SCHEDULING_THRESHOLD : u64 = 16 * 1024 ;
266271
267272fn default_cache_repetition_index ( ) -> bool {
268273 static DEFAULT_CACHE_REPETITION_INDEX : OnceLock < bool > = OnceLock :: new ( ) ;
269274 * DEFAULT_CACHE_REPETITION_INDEX
270275 . get_or_init ( || parse_env_as_bool ( ENV_LANCE_READ_CACHE_REPETITION_INDEX , true ) )
271276}
272277
278+ fn inline_scheduling_threshold ( ) -> u64 {
279+ static THRESHOLD : OnceLock < u64 > = OnceLock :: new ( ) ;
280+ * THRESHOLD . get_or_init ( || {
281+ std:: env:: var ( ENV_LANCE_INLINE_SCHEDULING_THRESHOLD )
282+ . ok ( )
283+ . and_then ( |v| v. trim ( ) . parse :: < u64 > ( ) . ok ( ) )
284+ . unwrap_or ( DEFAULT_INLINE_SCHEDULING_THRESHOLD )
285+ } )
286+ }
287+
273288/// Top-level encoding message for a page. Wraps both the
274289/// legacy pb::ArrayEncoding and the newer pb::PageLayout
275290///
@@ -1956,13 +1971,24 @@ pub struct DecoderConfig {
19561971 pub cache_repetition_index : bool ,
19571972 /// Whether to validate decoded data
19581973 pub validate_on_decode : bool ,
1974+ /// Override the strategy used to dispatch the scheduling work in
1975+ /// [`schedule_and_decode`].
1976+ ///
1977+ /// * `None` - default behavior: scheduling runs inline on the stream's
1978+ /// first poll when the request is small (controlled by the
1979+ /// `LANCE_INLINE_SCHEDULING_THRESHOLD` env var) and on a spawned task
1980+ /// otherwise.
1981+ /// * `Some(true)` - always run scheduling inline.
1982+ /// * `Some(false)` - always spawn a task for scheduling.
1983+ pub inline_scheduling : Option < bool > ,
19591984}
19601985
19611986impl Default for DecoderConfig {
19621987 fn default ( ) -> Self {
19631988 Self {
19641989 cache_repetition_index : default_cache_repetition_index ( ) ,
19651990 validate_on_decode : false ,
1991+ inline_scheduling : None ,
19661992 }
19671993 }
19681994}
@@ -2120,7 +2146,17 @@ fn create_scheduler_decoder(
21202146 config. batch_size_bytes ,
21212147 ) ?;
21222148
2123- let scheduler_handle = tokio:: task:: spawn ( async move {
2149+ // For small requests the scheduling cost is dwarfed by the overhead of
2150+ // spawning a task, so run scheduling inline as part of the stream's first
2151+ // poll instead. The threshold is configurable via
2152+ // `LANCE_INLINE_SCHEDULING_THRESHOLD`, and callers can force either
2153+ // strategy via `DecoderConfig::inline_scheduling`.
2154+ let inline_scheduling = config
2155+ . decoder_config
2156+ . inline_scheduling
2157+ . unwrap_or_else ( || num_rows <= inline_scheduling_threshold ( ) ) ;
2158+
2159+ let scheduling = async move {
21242160 let mut decode_scheduler = match DecodeBatchScheduler :: try_new (
21252161 target_schema. as_ref ( ) ,
21262162 & column_indices,
@@ -2150,9 +2186,19 @@ fn create_scheduler_decoder(
21502186 decode_scheduler. schedule_take ( & indices, & filter, tx, config. io )
21512187 }
21522188 }
2153- } ) ;
2189+ } ;
21542190
2155- Ok ( check_scheduler_on_drop ( decode_stream, scheduler_handle) )
2191+ if inline_scheduling {
2192+ Ok ( async move {
2193+ scheduling. await ;
2194+ decode_stream
2195+ }
2196+ . flatten_stream ( )
2197+ . boxed ( ) )
2198+ } else {
2199+ let scheduler_handle = tokio:: task:: spawn ( scheduling) ;
2200+ Ok ( check_scheduler_on_drop ( decode_stream, scheduler_handle) )
2201+ }
21562202}
21572203
21582204/// Launches a scheduler on a dedicated (spawned) task and creates a decoder to
0 commit comments