Skip to content

Commit fb47849

Browse files
tsafinclaude
andcommitted
Fix stream consumer failure hang in Lance streaming writer
When the Rust streaming task exits early (disk error, write failure), push() would block indefinitely: the queue stays full, nothing notifies not_full_cv_, and the C++ producer hangs forever. Fix: - Add lance_writer_stream_is_alive() Rust FFI: polls JoinHandle::is_finished() to detect whether the background task is still consuming the stream. - Add StreamState::set_health_check(fn): stores a callback invoked when a push() timeout fires without a real queue-drain notification. - Rewrite the stall wait from bare wait() to a wait_for(500ms) loop: on each timeout, call the health check; if the task is gone, call set_error() which unblocks both not_full_cv_ and not_empty_cv_. - Wire up in initialize_lance_dataset(): register a lambda that calls lance_writer_stream_is_alive(rust_writer_) after stream start. The 500 ms poll interval means producers unblock within half a second of a Rust task crash, rather than hanging until process kill. Normal operation (no task failure) adds zero overhead: the health check is only reached on a wait_for timeout, which only fires when the queue stays full for 500 ms without a pop — i.e., already a stall event. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 9d4f425 commit fb47849

3 files changed

Lines changed: 55 additions & 3 deletions

File tree

include/tpch/lance_ffi.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,17 @@ LanceWriter* lance_writer_create(const char* uri, const void* arrow_schema_ptr,
4646
*/
4747
int lance_writer_start_stream(LanceWriter* writer, void* arrow_stream_ptr);
4848

49+
/**
50+
* Check whether the background streaming task is still running.
51+
*
52+
* Used by C++ producers to detect early task exit (e.g., write error) so they
53+
* can unblock from a full queue rather than hanging indefinitely.
54+
*
55+
* @param writer Pointer to LanceWriter from lance_writer_create()
56+
* @return 1 if the background task is running, 0 if finished or not started
57+
*/
58+
int lance_writer_stream_is_alive(const LanceWriter* writer);
59+
4960
/**
5061
* Configure write parameters for Lance writes.
5162
*

src/writers/lance_writer.cpp

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
#include <mutex>
88
#include <condition_variable>
99
#include <atomic>
10+
#include <chrono>
11+
#include <functional>
1012
#include <cstdlib>
1113
#include <cstring>
1214
#include <arrow/type.h>
@@ -23,13 +25,29 @@ struct StreamState {
2325
explicit StreamState(size_t max_queue_batches)
2426
: max_queue_batches_(max_queue_batches) {}
2527

28+
void set_health_check(std::function<bool()> fn) {
29+
health_check_fn_ = std::move(fn);
30+
}
31+
2632
void push(const std::shared_ptr<arrow::RecordBatch>& batch, int64_t batch_bytes) {
2733
std::unique_lock<std::mutex> lock(mu_);
2834
if (!closed_ && status_.ok() && queue_.size() >= max_queue_batches_) {
2935
auto wait_start = std::chrono::steady_clock::now();
30-
not_full_cv_.wait(lock, [&] {
31-
return closed_ || queue_.size() < max_queue_batches_ || !status_.ok();
32-
});
36+
// Poll with a timeout so we can detect early stream-consumer exit.
37+
// A bare wait() would hang indefinitely if the Rust task fails before
38+
// draining the queue, because nothing would notify not_full_cv_.
39+
while (!closed_ && status_.ok() && queue_.size() >= max_queue_batches_) {
40+
bool signaled = not_full_cv_.wait_for(lock, std::chrono::milliseconds(500), [&] {
41+
return closed_ || queue_.size() < max_queue_batches_ || !status_.ok();
42+
});
43+
if (!signaled && health_check_fn_ && !health_check_fn_()) {
44+
status_ = arrow::Status::IOError(
45+
"Lance streaming task exited unexpectedly (write error or crash)");
46+
not_empty_cv_.notify_all();
47+
not_full_cv_.notify_all();
48+
break;
49+
}
50+
}
3351
auto wait_end = std::chrono::steady_clock::now();
3452
auto waited = std::chrono::duration_cast<std::chrono::nanoseconds>(wait_end - wait_start).count();
3553
stall_ns_.fetch_add(static_cast<uint64_t>(waited), std::memory_order_relaxed);
@@ -119,6 +137,7 @@ struct StreamState {
119137
size_t max_queue_batches_;
120138
bool closed_ = false;
121139
arrow::Status status_ = arrow::Status::OK();
140+
std::function<bool()> health_check_fn_; // returns false when Rust task has exited
122141
std::atomic<uint64_t> stall_ns_{0};
123142
std::atomic<uint64_t> stall_count_{0};
124143
int64_t current_bytes_ = 0;
@@ -268,6 +287,13 @@ void LanceWriter::initialize_lance_dataset(
268287
streaming_started_ = true;
269288
stream_state_ = std::move(state);
270289
stream_reader_ = std::move(reader);
290+
291+
// Register health check: if the Rust task exits early (write error etc.),
292+
// push() will detect it within 500 ms and unblock producers via set_error().
293+
auto* raw = reinterpret_cast<::LanceWriter*>(rust_writer_);
294+
stream_state_->set_health_check([raw]() -> bool {
295+
return lance_writer_stream_is_alive(raw) != 0;
296+
});
271297
}
272298

273299
std::cout << "Lance: Initialized dataset at " << dataset_path_ << "\n";

third_party/lance-ffi/src/lib.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,21 @@ pub extern "C" fn lance_writer_write_batch(writer_ptr: *mut LanceWriterHandle, a
371371
})).unwrap_or(7)
372372
}
373373

374+
/// Returns 1 if the background streaming task is still running, 0 if it has finished
375+
/// (either successfully or with an error) or was never started. C++ uses this to detect
376+
/// early task exit and unblock producers waiting on a full queue.
377+
#[no_mangle]
378+
pub extern "C" fn lance_writer_stream_is_alive(writer_ptr: *const LanceWriterHandle) -> c_int {
379+
if writer_ptr.is_null() { return 0; }
380+
let writer = unsafe { &*writer_ptr };
381+
match &writer.backend {
382+
WriterBackend::Streaming { task } => {
383+
task.as_ref().map(|t| if t.is_finished() { 0 } else { 1 }).unwrap_or(0)
384+
}
385+
_ => 0,
386+
}
387+
}
388+
374389
#[no_mangle]
375390
pub extern "C" fn lance_writer_start_stream(writer_ptr: *mut LanceWriterHandle, arrow_stream_ptr: *const c_void) -> c_int {
376391
catch_unwind(AssertUnwindSafe(|| {

0 commit comments

Comments
 (0)