@@ -5,6 +5,7 @@ use std::sync::Arc;
55
66use async_trait:: async_trait;
77use futures:: StreamExt as _;
8+ use parking_lot:: Mutex ;
89use vortex_array:: ArrayContext ;
910use vortex_array:: IntoArray ;
1011use vortex_array:: arrays:: PrimitiveArray ;
@@ -27,7 +28,6 @@ use crate::IntoLayout;
2728use crate :: LayoutRef ;
2829use crate :: LayoutStrategy ;
2930use crate :: layouts:: array_tree:: ArrayTreeLayout ;
30- use crate :: layouts:: array_tree:: flat:: ArrayTreeFlat ;
3131use crate :: layouts:: array_tree:: flat:: ArrayTreeFlatLayout ;
3232use crate :: layouts:: flat:: FlatLayout ;
3333use crate :: layouts:: flat:: writer:: FlatLayoutStrategy ;
@@ -37,34 +37,48 @@ use crate::sequence::SendableSequentialStream;
3737use crate :: sequence:: SequencePointer ;
3838use crate :: sequence:: SequentialArrayStreamExt ;
3939
40+ /// Side channel for shipping `(segment_id, compact_tree)` pairs from leaf strategies to the
41+ /// collector strategy.
42+ ///
43+ /// Each leaf pushes after `segment_sink.write` resolves (so the leaf's `SequenceId` has been
44+ /// dropped before we touch the sink). The collector drains the sink only after the entire
45+ /// data subtree has completed, which means every leaf has already pushed.
46+ type Sink = Arc < Mutex < Vec < ( SegmentId , ByteBuffer ) > > > ; // one `Mutex`-guarded `Vec`, shared via `Arc` by the leaf and collector that `writer` builds
47+
4048/// Creates a cooperating pair of strategies for array tree collection.
4149///
4250/// Returns `(collector, leaf)` where:
43- /// - `leaf` replaces [`FlatLayoutStrategy`] in the data pipeline — it serializes chunks and
44- /// produces compact flatbuffers attached to [`ArrayTreeFlatLayout`] .
45- /// - `collector` wraps the data pipeline — after data is written, it walks the layout tree to
46- /// collect compact flatbuffers from all [`ArrayTreeFlatLayout`] leaves and writes them as a
47- /// struct array (`{segment_id, compact_tree}`) via the configured `array_trees_strategy`.
51+ /// - `leaf` replaces [`FlatLayoutStrategy`] in the data pipeline — it serializes chunks,
52+ /// produces compact flatbuffers, and pushes them onto the shared sink .
53+ /// - `collector` wraps the data pipeline — after data is written, it drains the sink and
54+ /// writes the collected pairs as a struct array (`{segment_id, compact_tree}`) via the
55+ /// configured `array_trees_strategy`.
4856pub fn writer (
4957 flat : FlatLayoutStrategy ,
5058 array_trees_strategy : Arc < dyn LayoutStrategy > ,
5159) -> ( ArrayTreeCollectorStrategy , ArrayTreeFlatStrategy ) {
52- let leaf = ArrayTreeFlatStrategy { flat } ;
60+ let sink: Sink = Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ; // a fresh, empty sink is created on every `writer` call
61+ let leaf = ArrayTreeFlatStrategy {
62+ flat,
63+ sink : Arc :: clone ( & sink) , // the leaf holds a second handle to the same underlying `Vec`
64+ } ;
5365 let collector = ArrayTreeCollectorStrategy {
5466 child : None , // the collector starts out without a data child strategy
5567 array_trees_strategy,
68+ sink, // the collector takes the original handle; leaf and collector now share the sink
5669 } ;
5770 ( collector, leaf)
5871}
5972
6073/// Leaf strategy (TX) that replaces [`FlatLayoutStrategy`].
6174///
6275/// For each chunk, it produces both the compact flatbuffer (encoding tree + buffer
63- /// descriptors, no stats) and the full data segment, and returns an [`ArrayTreeFlatLayout`]
64- /// with the compact tree attached for later collection .
76+ /// descriptors, no stats) and the full data segment, then pushes `(segment_id, compact_tree)`
77+ /// onto the shared sink for the collector to consume .
6778#[ derive( Clone ) ]
6879pub struct ArrayTreeFlatStrategy {
6980 flat : FlatLayoutStrategy , // flat strategy handed to `writer`; NOTE(review): how it is used is outside this view
81+ sink : Sink , // `Sink` is an `Arc`, so clones made through `#[derive(Clone)]` all push into the same list
7182}
6983
7084#[ async_trait]
@@ -117,21 +131,28 @@ impl LayoutStrategy for ArrayTreeFlatStrategy {
117131 } ,
118132 ) ?;
119133 assert ! ( buffers. len( ) >= 2 ) ;
134+
135+ // IMPORTANT ORDERING CONSTRAINT: write the segment first, then push to the sink.
136+ //
137+ // `segment_sink.write` consumes our `SequenceId` and only drops it on return. Pushing
138+ // to the sink before that point would risk holding the sink mutex while later leaves
139+ // are blocked on `SequenceId::collapse`, creating a dependency from "later leaf is
140+ // ready to write" → "earlier leaf must drop its SequenceId" → "earlier leaf must
141+ // finish its sink push" → mutex contention with the later leaf. Doing the push after
142+ // `await?` resolves means our SequenceId is already gone before we touch the sink.
120143 let segment_id = segment_sink. write ( sequence_id, buffers) . await ?;
144+ self . sink . lock ( ) . push ( ( segment_id, compact_tree) ) ;
121145
122146 let None = stream. next ( ) . await else {
123147 vortex_bail ! ( "array tree flat layout received stream with more than a single chunk" ) ;
124148 } ;
125149
126- Ok ( ArrayTreeFlatLayout :: new (
127- FlatLayout :: new (
128- row_count,
129- stream. dtype ( ) . clone ( ) ,
130- segment_id,
131- ReadContext :: new ( ctx. to_ids ( ) ) ,
132- ) ,
133- compact_tree,
134- )
150+ Ok ( ArrayTreeFlatLayout :: new ( FlatLayout :: new (
151+ row_count,
152+ stream. dtype ( ) . clone ( ) ,
153+ segment_id,
154+ ReadContext :: new ( ctx. to_ids ( ) ) ,
155+ ) )
135156 . into_layout ( ) )
136157 }
137158
@@ -142,13 +163,13 @@ impl LayoutStrategy for ArrayTreeFlatStrategy {
142163
143164/// Collector strategy (RX) that wraps the data pipeline.
144165///
145- /// After the data child completes, walks the returned layout tree to extract compact
146- /// flatbuffers and segment IDs from all [`ArrayTreeFlatLayout`] leaves, builds a struct
147- /// array of `{segment_id, compact_tree}`, and writes it as an auxiliary child via the
148- /// configured `array_trees_strategy`.
166+ /// After the data child completes, drains the shared sink and writes the collected
167+ /// `(segment_id, compact_tree)` pairs as a struct array via the configured
168+ /// `array_trees_strategy`.
149169pub struct ArrayTreeCollectorStrategy {
150170 child : Option < Arc < dyn LayoutStrategy > > , // data child strategy; `writer` initialises this to `None`
151171 array_trees_strategy : Arc < dyn LayoutStrategy > , // strategy used to write the collected struct array
172+ sink : Sink , // same sink the paired leaf pushes to; emptied with `std::mem::take` once the data child has completed
152173}
153174
154175impl ArrayTreeCollectorStrategy {
@@ -186,17 +207,9 @@ impl LayoutStrategy for ArrayTreeCollectorStrategy {
186207 )
187208 . await ?;
188209
189- // Walk the layout tree to collect (segment_id, compact_tree) pairs from
190- // ArrayTreeFlatLayout leaves.
191- let mut entries: Vec < ( SegmentId , ByteBuffer ) > = Vec :: new ( ) ;
192- for layout_ref in data_layout. depth_first_traversal ( ) {
193- let layout_ref = layout_ref?;
194- if let Some ( atf) = layout_ref. as_opt :: < ArrayTreeFlat > ( )
195- && let Some ( tree) = atf. compact_tree ( )
196- {
197- entries. push ( ( atf. inner ( ) . segment_id ( ) , tree. clone ( ) ) ) ;
198- }
199- }
210+ // By the time the data subtree future resolves, every leaf has finished its
211+ // `segment_sink.write().await?` and pushed onto the sink. Drain it now.
212+ let mut entries = std:: mem:: take ( & mut * self . sink . lock ( ) ) ;
200213
201214 // Sort by segment ID so the on-disk order matches segment-write order — this gives
202215 // good locality and predictable lookup-table layout.
0 commit comments