File tree Expand file tree Collapse file tree
sandbox/plugins/analytics-backend-datafusion/rust/src Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -69,6 +69,10 @@ pub struct QueryStreamHandle {
6969 /// Held for its `Drop` impl — marks the query completed when the
7070 /// stream is closed.
7171 _query_tracking_context : QueryTrackingContext ,
72+ /// Keeps the SessionContext alive while the stream is being consumed.
73+ /// The physical plan may reference state (e.g. RuntimeEnv, caches) owned
74+ /// by the session; dropping it prematurely causes use-after-free.
75+ _session_ctx : Option < datafusion:: prelude:: SessionContext > ,
7276}
7377
7478impl QueryStreamHandle {
@@ -79,6 +83,19 @@ impl QueryStreamHandle {
7983 Self {
8084 stream,
8185 _query_tracking_context : query_context,
86+ _session_ctx : None ,
87+ }
88+ }
89+
90+ pub fn with_session_context (
91+ stream : RecordBatchStreamAdapter < CrossRtStream > ,
92+ query_context : QueryTrackingContext ,
93+ ctx : datafusion:: prelude:: SessionContext ,
94+ ) -> Self {
95+ Self {
96+ stream,
97+ _query_tracking_context : query_context,
98+ _session_ctx : Some ( ctx) ,
8299 }
83100 }
84101}
Original file line number Diff line number Diff line change @@ -624,28 +624,17 @@ pub unsafe extern "C" fn df_execute_with_context(
624624 plan_ptr : * const u8 ,
625625 plan_len : i64 ,
626626) -> i64 {
627- // Consume the session context handle on entry. Ownership transfers here
628- // regardless of whether the remainder of this function succeeds, returns an
629- // error via `?`, or panics — RAII (or `catch_unwind` drop-during-unwind)
630- // drops `session_handle` and frees the underlying SessionContext resources.
631- //
632- // This matches the Java-side contract: SessionContextHandle.markConsumed() is
633- // invoked in a `finally` after the FFM downcall, so every observable path from
634- // Java's perspective ("call.invoke ran") maps to "Rust consumed the handle".
635- // If we were to run fallible or panic-prone code (e.g. `get_rt_manager()?`)
636- // before Box::from_raw, the handle would leak on those paths.
637627 let session_handle = * Box :: from_raw ( session_ctx_ptr as * mut crate :: session_context:: SessionContextHandle ) ;
638628
639629 let mgr = get_rt_manager ( ) ?;
640630 let plan_bytes = slice:: from_raw_parts ( plan_ptr, plan_len as usize ) ;
641631 let cpu_executor = mgr. cpu_executor ( ) ;
642632
643- // Route based on whether the session was configured for indexed execution
644- let handle_ref = & * ( session_ctx_ptr as * const crate :: session_context:: SessionContextHandle ) ;
645- if handle_ref. indexed_config . is_some ( ) {
633+ if session_handle. indexed_config . is_some ( ) {
634+ let ptr = Box :: into_raw ( Box :: new ( session_handle) ) as i64 ;
646635 mgr. io_runtime
647636 . block_on ( crate :: indexed_executor:: execute_indexed_with_context (
648- session_ctx_ptr ,
637+ ptr ,
649638 plan_bytes. to_vec ( ) ,
650639 cpu_executor,
651640 ) )
Original file line number Diff line number Diff line change @@ -692,6 +692,6 @@ pub async unsafe fn execute_indexed_with_context(
692692 let cross_rt_stream = CrossRtStream :: new_with_df_error_stream ( df_stream, cpu_executor) ;
693693 let schema = cross_rt_stream. schema ( ) ;
694694 let wrapped = RecordBatchStreamAdapter :: new ( schema, cross_rt_stream) ;
695- let stream_handle = crate :: api:: QueryStreamHandle :: new ( wrapped, query_context) ;
695+ let stream_handle = crate :: api:: QueryStreamHandle :: with_session_context ( wrapped, query_context, ctx ) ;
696696 Ok ( Box :: into_raw ( Box :: new ( stream_handle) ) as i64 )
697697}
Original file line number Diff line number Diff line change @@ -185,6 +185,6 @@ pub async fn execute_with_context(
185185 cross_rt_stream,
186186 ) ;
187187
188- let stream_handle = crate :: api:: QueryStreamHandle :: new ( wrapped, handle. query_context ) ;
188+ let stream_handle = crate :: api:: QueryStreamHandle :: with_session_context ( wrapped, handle. query_context , handle . ctx ) ;
189189 Ok ( Box :: into_raw ( Box :: new ( stream_handle) ) as i64 )
190190}
You can’t perform that action at this time.
0 commit comments