Skip to content

Commit ec9a3f3

Browse files
committed
review fixes
1 parent 7ad1730 commit ec9a3f3

5 files changed

Lines changed: 32 additions & 25 deletions

File tree

vortex-ffi/build.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use std::process::Command;
88
use std::process::exit;
99

1010
fn main() {
11+
println!("cargo::rustc-check-cfg=cfg(vortex_nightly)");
1112
println!("cargo:rerun-if-changed=src");
1213
println!("cargo:rerun-if-changed=cbindgen.toml");
1314
println!("cargo:rerun-if-changed=Cargo.toml");

vortex-ffi/cbindgen.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ struct ArrowArrayStream {
5151
void (*release)(struct ArrowArrayStream*);
5252
void* private_data;
5353
};
54-
typedef ArrowSchema FFI_ArrowSchema;
55-
typedef ArrowArrayStream FFI_ArrowArrayStream;
54+
typedef struct ArrowSchema FFI_ArrowSchema;
55+
typedef struct ArrowArrayStream FFI_ArrowArrayStream;
5656
#endif
5757
"""
5858

vortex-ffi/src/scan.rs

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use std::ffi::c_int;
66
use std::ops::Range;
77
use std::ptr;
88
use std::sync::Arc;
9-
use std::sync::Mutex;
109

1110
use arrow_array::RecordBatch;
1211
use arrow_array::cast::AsArray;
@@ -43,12 +42,11 @@ use crate::error::vx_error;
4342
use crate::expression::vx_expression;
4443
use crate::session::vx_session;
4544

46-
pub enum VxScanState {
45+
pub enum VxScan {
4746
Pending(Box<dyn DataSourceScan>),
4847
Started(PartitionStream),
4948
Finished,
5049
}
51-
pub type VxScan = Mutex<VxScanState>;
5250
crate::box_wrapper!(VxScan, vx_scan);
5351

5452
pub enum VxPartitionScan {
@@ -233,7 +231,7 @@ pub unsafe extern "C-unwind" fn vx_data_source_scan(
233231
unsafe { &mut *estimate },
234232
);
235233
}
236-
Ok(vx_scan::new(Mutex::new(VxScanState::Pending(scan))))
234+
Ok(vx_scan::new(VxScan::Pending(scan)))
237235
})
238236
})
239237
}
@@ -249,8 +247,8 @@ pub unsafe extern "C-unwind" fn vx_scan_dtype(
249247
err: *mut *mut vx_error,
250248
) -> *const vx_dtype {
251249
try_or(err, ptr::null(), || {
252-
let scan = vx_scan::as_ref(scan).lock().expect("failed to lock mutex");
253-
let VxScanState::Pending(ref scan) = *scan else {
250+
let scan = vx_scan::as_ref(scan);
251+
let VxScan::Pending(ref scan) = *scan else {
254252
vortex_bail!("dtype unavailable: scan already started");
255253
};
256254
Ok(vx_dtype::new_ref(scan.dtype()))
@@ -265,22 +263,21 @@ pub unsafe extern "C-unwind" fn vx_scan_dtype(
265263
/// "err".
266264
/// On error returns NULL and sets "err".
267265
///
268-
/// This function is thread-safe: callers running a multi-threaded pipeline
269-
/// should call it concurrently and dispatch each partition to a dedicated
270-
/// worker thread.
266+
/// This function is thread-unsafe. Callers running a multi-threaded pipeline
267+
/// should synchronise on calls to this function and dispatch each produced
268+
/// partition to a dedicated worker thread.
271269
#[unsafe(no_mangle)]
272270
pub unsafe extern "C-unwind" fn vx_scan_next_partition(
273271
scan: *mut vx_scan,
274272
err: *mut *mut vx_error,
275273
) -> *mut vx_partition {
276274
let scan = vx_scan::as_mut(scan);
277-
let mut scan = scan.lock().expect("failed to lock mutex");
278275
let scan = &mut *scan;
279276
unsafe {
280-
let ptr = scan as *mut VxScanState;
277+
let ptr = scan as *mut VxScan;
281278

282279
let on_finish = || -> VortexResult<*mut vx_partition> {
283-
ptr::write(ptr, VxScanState::Finished);
280+
ptr::write(ptr, VxScan::Finished);
284281
Ok(ptr::null_mut())
285282
};
286283

@@ -289,7 +286,7 @@ pub unsafe extern "C-unwind" fn vx_scan_next_partition(
289286
Some(partition) => {
290287
let partition = VxPartitionScan::Pending(partition?);
291288
let partition = vx_partition::new(partition);
292-
ptr::write(ptr, VxScanState::Started(stream));
289+
ptr::write(ptr, VxScan::Started(stream));
293290
Ok(partition)
294291
}
295292
None => on_finish(),
@@ -298,9 +295,9 @@ pub unsafe extern "C-unwind" fn vx_scan_next_partition(
298295

299296
let owned = ptr::read(ptr);
300297
try_or_default(err, || match owned {
301-
VxScanState::Pending(scan) => on_stream(scan.partitions()),
302-
VxScanState::Started(stream) => on_stream(stream),
303-
VxScanState::Finished => on_finish(),
298+
VxScan::Pending(scan) => on_stream(scan.partitions()),
299+
VxScan::Started(stream) => on_stream(stream),
300+
VxScan::Finished => on_finish(),
304301
})
305302
}
306303
}

vortex-ffi/test/common.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,4 @@ struct Defer {
3838
};
3939
#define CONCAT(x, y) x##y
4040
#define CONCAT2(x, y) CONCAT(x, y)
41-
#define defer Defer CONCAT2(defer_, __LINE__) = [&]
41+
#define defer Defer CONCAT2(defer_, __LINE__) = [&]

vortex-ffi/test/scan.cpp

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,14 @@ using nanoarrow::UniqueSchema;
2828

2929
struct TempPath : fs::path {
3030
TempPath() = default;
31-
explicit TempPath(fs::path p) : fs::path(std::move(p)) {}
31+
explicit TempPath(fs::path p) : fs::path(std::move(p)) {
32+
}
3233

3334
TempPath(const TempPath &) = delete;
3435
TempPath &operator=(const TempPath &) = delete;
3536

36-
TempPath(TempPath &&other) noexcept : fs::path(std::move(other)) {}
37+
TempPath(TempPath &&other) noexcept : fs::path(std::move(other)) {
38+
}
3739
TempPath &operator=(TempPath &&other) noexcept {
3840
if (this != &other) {
3941
fs::remove(*this);
@@ -179,7 +181,7 @@ UniqueArrayStream sample_array_stream() {
179181
require_no_error(error);
180182

181183
INFO("Written vortex file "s + path.generic_string());
182-
return TempPath{path};
184+
return TempPath {path};
183185
}
184186

185187
TEST_CASE("Creating datasources", "[datasource]") {
@@ -216,7 +218,7 @@ TEST_CASE("Creating datasources", "[datasource]") {
216218
vx_error_free(error);
217219

218220
TempPath file = write_sample(session);
219-
for (const char *files : { file.c_str(), "/tmp/*.vortex"}) {
221+
for (const char *files : {file.c_str(), "/tmp/*.vortex"}) {
220222
INFO("reading "s + files);
221223
opts.paths = files;
222224
ds = vx_data_source_new(session, &opts, &error);
@@ -479,11 +481,18 @@ TEST_CASE("Multithreaded scan", "[datasource]") {
479481
REQUIRE(estimate.type == VX_ESTIMATE_INEXACT);
480482
REQUIRE(estimate.estimate == NUM_FILES);
481483

484+
std::mutex mutex;
482485
std::vector<std::jthread> threads;
483486
for (size_t i = 0; i < NUM_FILES; ++i) {
484487
threads.emplace_back([&] {
485-
vx_partition *partition = vx_scan_next_partition(scan, &error);
486-
require_no_error(error);
488+
vx_partition *partition = nullptr;
489+
{
490+
std::lock_guard _(mutex);
491+
partition = vx_scan_next_partition(scan, &error);
492+
require_no_error(error);
493+
REQUIRE(partition != nullptr);
494+
}
495+
487496
defer {
488497
vx_partition_free(partition);
489498
};

0 commit comments

Comments
 (0)