Skip to content

Commit 100f783

Browse files
committed
More Blobs
Signed-off-by: Nicholas Gates <nick@nickgates.com>
1 parent 024e435 commit 100f783

23 files changed

Lines changed: 142 additions & 85 deletions

File tree

encodings/bytebool/src/array.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,7 @@ impl From<Vec<Option<bool>>> for ByteBoolData {
352352
mod tests {
353353
use vortex_array::ArrayContext;
354354
use vortex_array::IntoArray;
355+
use vortex_array::LEGACY_SESSION;
355356
use vortex_array::assert_arrays_eq;
356357
use vortex_array::serde::SerializeOptions;
357358
use vortex_array::serde::SerializedArray;
@@ -406,11 +407,7 @@ mod tests {
406407
let serialized = array
407408
.clone()
408409
.into_array()
409-
.serialize(
410-
&ctx,
411-
&vortex_session::VortexSession::empty(),
412-
&SerializeOptions::default(),
413-
)
410+
.serialize(&ctx, &LEGACY_SESSION, &SerializeOptions::default())
414411
.unwrap();
415412

416413
let mut concat = ByteBufferMut::empty();

vortex-cuda/src/layout.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -445,7 +445,7 @@ impl LayoutStrategy for CudaFlatLayoutStrategy {
445445
segment_sink: SegmentSinkRef,
446446
mut stream: SendableSequentialStream,
447447
_eof: SequencePointer,
448-
_handle: vortex::io::runtime::Handle,
448+
session: &VortexSession,
449449
) -> VortexResult<LayoutRef> {
450450
let ctx = ctx.clone();
451451
let options = self.clone();
@@ -507,10 +507,9 @@ impl LayoutStrategy for CudaFlatLayoutStrategy {
507507
// Scan for constant array buffers before serialization (while data is still on host).
508508
let host_buffers = extract_constant_buffers(&chunk);
509509

510-
let session = VortexSession::empty();
511510
let buffers = chunk.serialize(
512511
&ctx,
513-
&session,
512+
session,
514513
&SerializeOptions {
515514
offset: 0,
516515
include_padding: options.include_padding,

vortex-file/src/v2/file_stats_reader.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ mod tests {
219219
use vortex_error::VortexResult;
220220
use vortex_io::runtime::single::block_on;
221221
use vortex_io::session::RuntimeSession;
222+
use vortex_io::session::RuntimeSessionExt;
222223
use vortex_layout::LayoutReader;
223224
use vortex_layout::LayoutStrategy;
224225
use vortex_layout::layouts::flat::writer::FlatLayoutStrategy;
@@ -253,6 +254,7 @@ mod tests {
253254
#[test]
254255
fn pruning_when_filter_out_of_range() -> VortexResult<()> {
255256
block_on(|handle| async {
257+
let session = SESSION.clone().with_handle(handle);
256258
let ctx = ArrayContext::empty();
257259
let segments = Arc::new(TestSegments::default());
258260
let (ptr, eof) = SequenceId::root().split();
@@ -269,7 +271,7 @@ mod tests {
269271
Arc::<TestSegments>::clone(&segments),
270272
struct_array.into_array().to_array_stream().sequenced(ptr),
271273
eof,
272-
handle,
274+
&session,
273275
)
274276
.await?;
275277

@@ -291,6 +293,7 @@ mod tests {
291293
#[test]
292294
fn no_pruning_when_filter_in_range() -> VortexResult<()> {
293295
block_on(|handle| async {
296+
let session = SESSION.clone().with_handle(handle);
294297
let ctx = ArrayContext::empty();
295298
let segments = Arc::new(TestSegments::default());
296299
let (ptr, eof) = SequenceId::root().split();
@@ -307,7 +310,7 @@ mod tests {
307310
Arc::<TestSegments>::clone(&segments),
308311
struct_array.into_array().to_array_stream().sequenced(ptr),
309312
eof,
310-
handle,
313+
&session,
311314
)
312315
.await?;
313316

vortex-file/src/writer.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,15 +179,17 @@ impl VortexWriteOptions {
179179
// We spawn the layout future so it is driven in the background while we write the
180180
// buffer stream, so we don't need to poll it until all buffers have been drained.
181181
let ctx2 = ctx.clone();
182-
let layout_fut = self.session.handle().spawn_nested(|h| async move {
182+
let session = self.session.clone();
183+
let layout_fut = self.session.handle().spawn_nested(move |h| async move {
184+
let session = session.with_handle(h);
183185
let layout = self
184186
.strategy
185187
.write_stream(
186188
ctx2,
187189
Arc::<BufferedSegmentSink>::clone(&segments),
188190
stream,
189191
eof,
190-
h,
192+
&session,
191193
)
192194
.await?;
193195
Ok::<_, VortexError>((layout, segments.segment_specs()))

vortex-ipc/src/messages/encoder.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use vortex_error::vortex_err;
1414
use vortex_flatbuffers::FlatBuffer;
1515
use vortex_flatbuffers::WriteFlatBufferExt;
1616
use vortex_flatbuffers::message as fb;
17+
use vortex_session::VortexSession;
1718

1819
/// An IPC message ready to be passed to the encoder.
1920
pub enum EncoderMessage<'a> {
@@ -25,17 +26,17 @@ pub enum EncoderMessage<'a> {
2526
pub struct MessageEncoder {
2627
/// A reusable buffer of zeros used for padding.
2728
zeros: Bytes,
29+
session: VortexSession,
2830
}
2931

30-
impl Default for MessageEncoder {
31-
fn default() -> Self {
32+
impl MessageEncoder {
33+
pub fn new(session: VortexSession) -> Self {
3234
Self {
35+
session,
3336
zeros: BytesMut::zeroed(u16::MAX as usize).freeze(),
3437
}
3538
}
36-
}
3739

38-
impl MessageEncoder {
3940
/// Encode an IPC message for writing to a byte stream.
4041
///
4142
/// The returned buffers should be written contiguously to the stream.

vortex-layout/src/display.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ mod tests {
230230
use vortex_buffer::BitBufferMut;
231231
use vortex_buffer::buffer;
232232
use vortex_io::runtime::single::block_on;
233+
use vortex_io::session::RuntimeSessionExt;
233234
use vortex_utils::env::EnvVarGuard;
234235

235236
use crate::IntoLayout;
@@ -242,12 +243,14 @@ mod tests {
242243
use crate::sequence::SequenceId;
243244
use crate::sequence::SequentialArrayStreamExt;
244245
use crate::strategy::LayoutStrategy;
246+
use crate::test::SESSION;
245247

246248
/// Test display_tree with inline array_tree metadata (no segment source needed).
247249
#[test]
248250
fn test_display_tree_inline_array_tree() {
249251
let _guard = EnvVarGuard::set("FLAT_LAYOUT_INLINE_ARRAY_NODE", "1");
250252
block_on(|handle| async move {
253+
let session = SESSION.clone().with_handle(handle);
251254
let ctx = ArrayContext::empty();
252255
let segments = Arc::new(TestSegments::default());
253256

@@ -267,7 +270,7 @@ mod tests {
267270
Arc::<TestSegments>::clone(&segments),
268271
array1.into_array().to_array_stream().sequenced(ptr1),
269272
eof1,
270-
handle.clone(),
273+
&session,
271274
)
272275
.await
273276
.unwrap();
@@ -294,7 +297,7 @@ mod tests {
294297
.to_array_stream()
295298
.sequenced(ptr2),
296299
eof2,
297-
handle.clone(),
300+
&session,
298301
)
299302
.await
300303
.unwrap();
@@ -342,6 +345,7 @@ vortex.struct, dtype: {numbers=i64?, strings=utf8}, children: 2, rows: 5
342345
// Ensure inline array node is disabled for this test
343346
let _guard = EnvVarGuard::remove("FLAT_LAYOUT_INLINE_ARRAY_NODE");
344347
block_on(|handle| async move {
348+
let session = SESSION.clone().with_handle(handle);
345349
let ctx = ArrayContext::empty();
346350
let segments = Arc::new(TestSegments::default());
347351

@@ -354,7 +358,7 @@ vortex.struct, dtype: {numbers=i64?, strings=utf8}, children: 2, rows: 5
354358
Arc::<TestSegments>::clone(&segments),
355359
array1.into_array().to_array_stream().sequenced(ptr1),
356360
eof1,
357-
handle.clone(),
361+
&session,
358362
)
359363
.await
360364
.unwrap();
@@ -368,7 +372,7 @@ vortex.struct, dtype: {numbers=i64?, strings=utf8}, children: 2, rows: 5
368372
Arc::<TestSegments>::clone(&segments),
369373
array2.into_array().to_array_stream().sequenced(ptr2),
370374
eof2,
371-
handle.clone(),
375+
&session,
372376
)
373377
.await
374378
.unwrap();
@@ -407,13 +411,14 @@ vortex.chunked, dtype: i32, children: 2, rows: 10
407411
// Create a simple primitive array
408412
let array = PrimitiveArray::new(buffer![1i32, 2, 3, 4, 5], Validity::AllValid);
409413
let layout = block_on(|handle| async {
414+
let session = SESSION.clone().with_handle(handle);
410415
FlatLayoutStrategy::default()
411416
.write_stream(
412417
ctx.clone(),
413418
Arc::<TestSegments>::clone(&segments),
414419
array.into_array().to_array_stream().sequenced(ptr),
415420
eof,
416-
handle,
421+
&session,
417422
)
418423
.await
419424
.unwrap()
@@ -449,13 +454,14 @@ vortex.flat, dtype: i32?, segment 0, buffers=[20B], total=20B
449454
// Create a simple primitive array
450455
let array = PrimitiveArray::new(buffer![10i64, 20, 30], Validity::NonNullable);
451456
let layout = block_on(|handle| async {
457+
let session = SESSION.clone().with_handle(handle);
452458
FlatLayoutStrategy::default()
453459
.write_stream(
454460
ctx,
455461
Arc::<TestSegments>::clone(&segments),
456462
array.into_array().to_array_stream().sequenced(ptr),
457463
eof,
458-
handle,
464+
&session,
459465
)
460466
.await
461467
.unwrap()

vortex-layout/src/layouts/buffered.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use futures::StreamExt as _;
1212
use futures::pin_mut;
1313
use vortex_array::ArrayContext;
1414
use vortex_error::VortexResult;
15-
use vortex_io::runtime::Handle;
15+
use vortex_session::VortexSession;
1616

1717
use crate::LayoutRef;
1818
use crate::LayoutStrategy;
@@ -47,7 +47,7 @@ impl LayoutStrategy for BufferedStrategy {
4747
segment_sink: SegmentSinkRef,
4848
stream: SendableSequentialStream,
4949
eof: SequencePointer,
50-
handle: Handle,
50+
session: &VortexSession,
5151
) -> VortexResult<LayoutRef> {
5252
let dtype = stream.dtype().clone();
5353
let buffer_size = self.buffer_size;
@@ -102,7 +102,7 @@ impl LayoutStrategy for BufferedStrategy {
102102
segment_sink,
103103
SequentialStreamAdapter::new(dtype, buffered_stream).sendable(),
104104
eof,
105-
handle,
105+
session,
106106
)
107107
.await
108108
}

vortex-layout/src/layouts/chunked/reader.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,7 @@ mod test {
343343
use vortex_array::expr::root;
344344
use vortex_buffer::buffer;
345345
use vortex_io::runtime::single::block_on;
346+
use vortex_io::session::RuntimeSessionExt;
346347

347348
use crate::LayoutRef;
348349
use crate::LayoutStrategy;
@@ -364,6 +365,7 @@ mod test {
364365
let strategy = ChunkedLayoutStrategy::new(FlatLayoutStrategy::default());
365366
let (mut sequence_id, eof) = SequenceId::root().split();
366367
let layout = block_on(|handle| {
368+
let session = SESSION.clone().with_handle(handle);
367369
strategy.write_stream(
368370
ctx,
369371
Arc::<TestSegments>::clone(&segments),
@@ -377,7 +379,7 @@ mod test {
377379
)
378380
.sendable(),
379381
eof,
380-
handle,
382+
&session,
381383
)
382384
})
383385
.unwrap();

vortex-layout/src/layouts/chunked/writer.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ use futures::stream;
1111
use vortex_array::ArrayContext;
1212
use vortex_error::VortexExpect;
1313
use vortex_error::VortexResult;
14-
use vortex_io::runtime::Handle;
14+
use vortex_io::session::RuntimeSessionExt;
15+
use vortex_session::VortexSession;
1516

1617
use crate::IntoLayout;
1718
use crate::LayoutRef;
@@ -46,11 +47,12 @@ impl LayoutStrategy for ChunkedLayoutStrategy {
4647
segment_sink: SegmentSinkRef,
4748
stream: SendableSequentialStream,
4849
mut eof: SequencePointer,
49-
handle: Handle,
50+
session: &VortexSession,
5051
) -> VortexResult<LayoutRef> {
5152
let dtype = stream.dtype().clone();
5253
let dtype2 = dtype.clone();
5354
let chunk_strategy = Arc::clone(&self.chunk_strategy);
55+
let handle = session.handle();
5456

5557
// We spawn each child to allow parallelism when processing chunks.
5658
let stream = stream! {
@@ -62,8 +64,10 @@ impl LayoutStrategy for ChunkedLayoutStrategy {
6264
let ctx = ctx.clone();
6365
let segment_sink = Arc::clone(&segment_sink);
6466
let dtype = dtype2.clone();
67+
let session = session.clone();
6568

6669
yield handle.spawn_nested(move |handle| async move {
70+
let session = session.with_handle(handle);
6771
chunk_strategy
6872
.write_stream(
6973
ctx,
@@ -74,7 +78,7 @@ impl LayoutStrategy for ChunkedLayoutStrategy {
7478
)
7579
.sendable(),
7680
chunk_eof,
77-
handle,
81+
&session,
7882
)
7983
.await
8084
})

vortex-layout/src/layouts/collect.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use vortex_array::IntoArray;
1212
use vortex_array::arrays::ChunkedArray;
1313
use vortex_error::VortexExpect;
1414
use vortex_error::VortexResult;
15-
use vortex_io::runtime::Handle;
15+
use vortex_session::VortexSession;
1616

1717
use crate::LayoutRef;
1818
use crate::LayoutStrategy;
@@ -44,7 +44,7 @@ impl LayoutStrategy for CollectStrategy {
4444
segment_sink: SegmentSinkRef,
4545
stream: SendableSequentialStream,
4646
eof: SequencePointer,
47-
handle: Handle,
47+
session: &VortexSession,
4848
) -> VortexResult<LayoutRef> {
4949
// Read the whole stream, then write one Chunked stream to the inner thing
5050
let dtype = stream.dtype().clone();
@@ -68,7 +68,7 @@ impl LayoutStrategy for CollectStrategy {
6868
let adapted = Box::pin(SequentialStreamAdapter::new(dtype, collected_stream));
6969

7070
self.child
71-
.write_stream(ctx, segment_sink, adapted, eof, handle)
71+
.write_stream(ctx, segment_sink, adapted, eof, session)
7272
.await
7373
}
7474

0 commit comments

Comments
 (0)