Skip to content

Commit 426f6cc

Browse files
committed
Pull materialized CDCs through the engine again for arrow conversion with a live connection / transaction
1 parent 50d2b28 commit 426f6cc

4 files changed

Lines changed: 527 additions & 306 deletions

File tree

src/duckdb_py/include/duckdb_python/pyresult.hpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,15 @@ struct DuckDBPyResult {
7171
unique_ptr<DataChunk> FetchNextRaw(QueryResult &result);
7272
unique_ptr<NumpyResultConversion> InitializeNumpyConversion(bool pandas = false);
7373

74+
//! Re-feed an already-MATERIALIZED result (a ColumnDataCollection, e.g. from
75+
//! rel.execute()) back through the engine on the user's own context. The eager
76+
//! variant installs a PhysicalArrowCollector to produce an ArrowQueryResult
77+
//! (parallel); the stream variant produces a lazy StreamQueryResult that co-owns
78+
//! the context (so it survives `del conn`). Never call these on a StreamQueryResult:
79+
//! a lazy result already has a live context and is converted/wrapped directly.
80+
void PromoteMaterializedToArrow(idx_t batch_size);
81+
void PromoteMaterializedToStream();
82+
7483
private:
7584
idx_t chunk_offset = 0;
7685

src/duckdb_py/pyrelation.cpp

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -801,6 +801,8 @@ unique_ptr<DuckDBPyRelation> DuckDBPyRelation::Distinct() {
801801

802802
duckdb::pyarrow::RecordBatchReader DuckDBPyRelation::FetchRecordBatchReader(idx_t rows_per_batch) {
803803
AssertResult();
804+
// FetchRecordBatchReader routes by the stored result's type: a con.execute() stream is
805+
// wrapped directly; a pre-executed relation's MaterializedQueryResult is re-fed through the engine.
804806
return result->FetchRecordBatchReader(rows_per_batch);
805807
}
806808

@@ -991,20 +993,15 @@ py::object DuckDBPyRelation::ToArrowCapsule(const py::object &requested_schema)
991993
if (!rel) {
992994
return py::none();
993995
}
994-
// The PyCapsule protocol doesn't allow custom parameters, so we use the same
995-
// default batch size as fetch_arrow_table / fetch_record_batch.
996-
idx_t batch_size = 1000000;
997-
auto &config = ClientConfig::GetConfig(*rel->context->GetContext());
998-
ScopedConfigSetting scoped_setting(
999-
config,
1000-
[&batch_size](ClientConfig &config) {
1001-
config.get_result_collector = [&batch_size](ClientContext &context, PreparedStatementData &data) {
1002-
return PhysicalArrowCollector::Create(context, data, batch_size);
1003-
};
1004-
},
1005-
[](ClientConfig &config) { config.get_result_collector = nullptr; });
1006-
ExecuteOrThrow();
996+
// Fresh relation: execute lazily as a stream on the user's own context. The
997+
// resulting StreamQueryResult co-owns that context, so the capsule survives a
998+
// later `del conn` (natively fixing #492); it shares the connection's single
999+
// active-stream slot, so it must be consumed before reusing the connection.
1000+
ExecuteOrThrow(true);
10071001
}
1002+
// FetchArrowCapsule routes by the result's type: a StreamQueryResult (fresh or a
1003+
// con.execute() cursor) is wrapped directly; a pre-executed relation's
1004+
// MaterializedQueryResult is re-fed through the engine as a stream.
10081005
AssertResultOpen();
10091006
auto res = result->FetchArrowCapsule();
10101007
result = nullptr;
@@ -1049,8 +1046,10 @@ duckdb::pyarrow::RecordBatchReader DuckDBPyRelation::ToRecordBatch(idx_t batch_s
10491046
if (!rel) {
10501047
return py::none();
10511048
}
1049+
// Fresh relation: stream lazily on the user's own context (survives `del conn`).
10521050
ExecuteOrThrow(true);
10531051
}
1052+
// FetchRecordBatchReader routes by type (stream wrapped directly, materialized re-fed).
10541053
AssertResultOpen();
10551054
auto res = result->FetchRecordBatchReader(batch_size);
10561055
result = nullptr;

0 commit comments

Comments
 (0)