Skip to content

Commit c8c35eb

Browse files
committed
Get the arrow schema in a separate transaction for materialized data only
1 parent 7ddc75f commit c8c35eb

5 files changed

Lines changed: 102 additions & 147 deletions

File tree

src/duckdb_py/arrow/arrow_export_utils.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ namespace duckdb {
1717

1818
namespace pyarrow {
1919

20-
py::object ToPyArrowSchema(ArrowSchema &schema) {
20+
py::object ToPyArrowSchema(const ArrowSchema &schema) {
2121
py::gil_scoped_acquire acquire;
2222

2323
auto pyarrow_lib_module = py::module::import("pyarrow").attr("lib");

src/duckdb_py/include/duckdb_python/arrow/arrow_export_utils.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ namespace duckdb {
66

77
namespace pyarrow {
88

9-
py::object ToPyArrowSchema(ArrowSchema &schema);
9+
py::object ToPyArrowSchema(const ArrowSchema &schema);
1010

1111
py::object ToArrowTable(const vector<LogicalType> &types, const vector<string> &names, const py::list &batches,
1212
ClientProperties &options);

src/duckdb_py/include/duckdb_python/pyresult.hpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,13 @@ struct DuckDBPyResult {
3636

3737
PandasDataFrame FetchDF(bool date_as_object);
3838

39-
duckdb::pyarrow::Table FetchArrowTable(idx_t rows_per_batch, bool to_polars);
40-
4139
PandasDataFrame FetchDFChunk(const idx_t vectors_per_chunk = 1, bool date_as_object = false);
4240

4341
py::dict FetchPyTorch();
4442

4543
py::dict FetchTF();
4644

47-
ArrowArrayStream FetchArrowArrayStream(idx_t rows_per_batch = 1000000);
45+
duckdb::pyarrow::Table FetchArrowTable(idx_t rows_per_batch, bool to_polars);
4846
duckdb::pyarrow::RecordBatchReader FetchRecordBatchReader(idx_t rows_per_batch = 1000000);
4947
py::object FetchArrowCapsule(idx_t rows_per_batch = 1000000);
5048

@@ -78,7 +76,11 @@ struct DuckDBPyResult {
7876
//! the context (so it survives `del conn`). Never call these on a StreamQueryResult:
7977
//! a lazy result already has a live context and is converted/wrapped directly.
8078
void PromoteMaterializedToArrow(idx_t batch_size);
81-
void PromoteMaterializedToStream();
79+
80+
template <typename T>
81+
T RunWithArrowSchema(const std::function<T(const ArrowSchema &)> &fun, bool dedup_col_names);
82+
duckdb::pyarrow::Table MaterializedResultToArrowTable(const ArrowSchema &arrow_schema, idx_t rows_per_batch);
83+
ArrowArrayStream FetchArrowArrayStream(idx_t rows_per_batch);
8284

8385
private:
8486
idx_t chunk_offset = 0;

src/duckdb_py/pyrelation.cpp

Lines changed: 1 addition & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -23,59 +23,6 @@
2323
#include "duckdb/common/arrow/physical_arrow_collector.hpp"
2424
#include "duckdb_python/arrow/arrow_export_utils.hpp"
2525

26-
namespace {
27-
28-
// A helper for arrow conversion. We want to be able to fetch a result's schema in the same transaction that
29-
// creates the result, so we have to wrap both calls in the same transaction. This helper always reverts the
30-
// transaction if we haven't committed it explicitly. Note that this is not the same as RunFunctionInTransaction:
31-
// we run _queries_ in a transaction (where each query acquires the context lock) while RFIT runs a function
32-
// while holding the context lock for that duration.
33-
// Note: this is a workaround that is intended to be temporary. We should really just cache the schema in the
34-
// ArrowQueryResult.
35-
36-
void RunOrThrow(duckdb::ClientContext &context, const char *sql) {
37-
auto result = context.Query(sql, duckdb::QueryParameters(false));
38-
if (result->HasError()) {
39-
result->ThrowError();
40-
}
41-
}
42-
43-
class ArrowConversionTransaction {
44-
public:
45-
explicit ArrowConversionTransaction(duckdb::ClientContext &context_p) : context(context_p), owns(false) {
46-
auto &txn = context.transaction;
47-
if (txn.IsAutoCommit() && !txn.HasActiveTransaction()) {
48-
RunOrThrow(context, "BEGIN TRANSACTION");
49-
owns = true;
50-
}
51-
}
52-
53-
~ArrowConversionTransaction() {
54-
if (owns) {
55-
try {
56-
RunOrThrow(context, "ROLLBACK");
57-
} catch (...) { // NOLINT
58-
}
59-
}
60-
}
61-
62-
void Commit() {
63-
if (owns) {
64-
RunOrThrow(context, "COMMIT");
65-
owns = false;
66-
}
67-
}
68-
69-
ArrowConversionTransaction(const ArrowConversionTransaction &) = delete;
70-
ArrowConversionTransaction &operator=(const ArrowConversionTransaction &) = delete;
71-
72-
private:
73-
duckdb::ClientContext &context;
74-
bool owns;
75-
};
76-
77-
} // namespace
78-
7926
namespace duckdb {
8027

8128
DuckDBPyRelation::DuckDBPyRelation(shared_ptr<Relation> rel_p) : rel(std::move(rel_p)) {
@@ -1013,22 +960,10 @@ PandasDataFrame DuckDBPyRelation::FetchDFChunk(idx_t vectors_per_chunk, bool dat
1013960
return result->FetchDFChunk(vectors_per_chunk, date_as_object);
1014961
}
1015962

1016-
duckdb::pyarrow::Table DuckDBPyRelation::ToArrowTableInternal(idx_t batch_size, bool to_polars) {
963+
pyarrow::Table DuckDBPyRelation::ToArrowTableInternal(idx_t batch_size, bool to_polars) {
1017964
if (!result && !rel) {
1018965
return py::none();
1019966
}
1020-
// Make sure we have a valid client context
1021-
shared_ptr<ClientContext> context;
1022-
if (rel) {
1023-
context = rel->context->GetContext();
1024-
} else if (auto cc = result->GetClientProperties().client_context) {
1025-
context = cc->shared_from_this();
1026-
} else {
1027-
throw ConnectionException("Cannot fetch an arrow table without a valid connection");
1028-
}
1029-
// Start (or piggyback on) a transaction for the conversion
1030-
ArrowConversionTransaction conversion_txn(*context);
1031-
1032967
if (!result) {
1033968
auto &config = ClientConfig::GetConfig(*rel->context->GetContext());
1034969
ScopedConfigSetting scoped_setting(
@@ -1044,8 +979,6 @@ duckdb::pyarrow::Table DuckDBPyRelation::ToArrowTableInternal(idx_t batch_size,
1044979
AssertResultOpen();
1045980
auto res = result->FetchArrowTable(batch_size, to_polars);
1046981
result = nullptr;
1047-
// We must commit the transaction before returning
1048-
conversion_txn.Commit();
1049982
return res;
1050983
}
1051984

src/duckdb_py/pyresult.cpp

Lines changed: 93 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -483,104 +483,112 @@ void DuckDBPyResult::PromoteMaterializedToArrow(idx_t batch_size) {
483483
result = std::move(new_result);
484484
}
485485

486-
// Re-feed a materialized result as a lazy stream on the user's own context. The
487-
// StreamQueryResult co-owns the context, so conversion survives `del conn` and runs under a
488-
// live transaction (geometry/extension correctness, #492).
489-
void DuckDBPyResult::PromoteMaterializedToStream() {
490-
D_ASSERT(result->type == QueryResultType::MATERIALIZED_RESULT);
491-
auto client_context = result->client_properties.client_context;
492-
if (!client_context) {
493-
throw InternalException("Cannot promote result to an Arrow stream: the originating client context is gone");
486+
template <typename T>
487+
T DuckDBPyResult::RunWithArrowSchema(const std::function<T(const ArrowSchema &)> &fun, bool dedup_col_names) {
488+
D_ASSERT(result);
489+
if (!result->client_properties.client_context) {
490+
throw ConnectionException("Cannot fetch arrow schema without a valid connection");
494491
}
495-
auto context = client_context->shared_from_this();
496-
auto &materialized = result->Cast<MaterializedQueryResult>();
497-
auto names = result->names;
498-
auto select = MakeColumnDataScanStatement(materialized.TakeCollection(), names);
492+
auto ctx = result->client_properties.client_context->shared_from_this();
499493

500-
unique_ptr<QueryResult> new_result;
501-
{
502-
D_ASSERT(py::gil_check());
503-
py::gil_scoped_release release;
504-
auto pending_query = context->PendingQuery(std::move(select), QueryParameters(true));
505-
new_result = DuckDBPyConnection::CompletePendingQuery(*pending_query);
506-
}
507-
if (new_result->HasError()) {
508-
new_result->ThrowError();
494+
auto names = result->names;
495+
if (dedup_col_names) {
496+
QueryResult::DeduplicateColumns(names);
509497
}
510-
new_result->names = std::move(names);
511-
result = std::move(new_result);
498+
499+
ArrowSchema arrow_schema;
500+
ctx->RunFunctionInTransaction(
501+
[&] { ArrowConverter::ToArrowSchema(&arrow_schema, result->types, names, result->client_properties); });
502+
503+
return fun(arrow_schema);
512504
}
513505

514-
duckdb::pyarrow::Table DuckDBPyResult::FetchArrowTable(idx_t rows_per_batch, bool to_polars) {
515-
if (!result) {
516-
throw InvalidInputException("There is no query result");
517-
}
518-
// ARROW_RESULT: fresh collector output. MATERIALIZED: re-feed for parallel conversion.
519-
// STREAM: a live result, converted directly below (never materialized to re-feed).
506+
duckdb::pyarrow::Table DuckDBPyResult::MaterializedResultToArrowTable(const ArrowSchema &arrow_schema,
507+
const idx_t rows_per_batch) {
508+
D_ASSERT(result);
509+
D_ASSERT(result->type == QueryResultType::MATERIALIZED_RESULT || result->type == QueryResultType::ARROW_RESULT);
510+
511+
auto pyarrow_schema = pyarrow::ToPyArrowSchema(arrow_schema);
520512
if (result->type == QueryResultType::MATERIALIZED_RESULT) {
521513
PromoteMaterializedToArrow(rows_per_batch);
522514
}
523-
524-
auto names = result->names;
525-
if (to_polars) {
526-
QueryResult::DeduplicateColumns(names);
515+
py::list batches;
516+
auto &arrow_result = result->Cast<ArrowQueryResult>();
517+
auto arrays = arrow_result.ConsumeArrays();
518+
for (auto &array : arrays) {
519+
ArrowArray data = array->arrow_array;
520+
array->arrow_array.release = nullptr;
521+
TransformDuckToArrowChunk(pyarrow_schema, data, batches);
527522
}
523+
return pyarrow::ToArrowTable(std::move(batches), pyarrow_schema);
524+
}
528525

529-
// Fetch the schema once
530-
ArrowSchema arrow_schema;
531-
ArrowConverter::ToArrowSchema(&arrow_schema, result->types, names, result->client_properties);
532-
auto pyarrow_schema = pyarrow::ToPyArrowSchema(arrow_schema);
533-
534-
py::list batches;
535-
if (result->type == QueryResultType::ARROW_RESULT) {
536-
auto &arrow_result = result->Cast<ArrowQueryResult>();
537-
auto arrays = arrow_result.ConsumeArrays();
538-
for (auto &array : arrays) {
539-
ArrowArray data = array->arrow_array;
540-
array->arrow_array.release = nullptr;
541-
TransformDuckToArrowChunk(pyarrow_schema, data, batches);
542-
}
543-
} else {
544-
// STREAM_RESULT: pull the live stream directly into Arrow batches.
545-
QueryResultChunkScanState scan_state(*result);
546-
while (true) {
547-
ArrowArray data;
548-
idx_t count;
549-
{
550-
D_ASSERT(py::gil_check());
551-
py::gil_scoped_release release;
552-
count = ArrowUtil::FetchChunk(scan_state, result->client_properties, rows_per_batch, &data,
553-
ArrowTypeExtensionData::GetExtensionTypes(
554-
*result->client_properties.client_context, result->types));
555-
}
556-
if (count == 0) {
557-
break;
558-
}
559-
TransformDuckToArrowChunk(pyarrow_schema, data, batches);
560-
}
526+
duckdb::pyarrow::Table DuckDBPyResult::FetchArrowTable(const idx_t rows_per_batch, const bool to_polars) {
527+
if (!result) {
528+
throw InvalidInputException("There is no query result");
561529
}
562530

563-
return pyarrow::ToArrowTable(std::move(batches), pyarrow_schema);
531+
return RunWithArrowSchema<duckdb::pyarrow::Table>(
532+
[&](const ArrowSchema &schema) -> duckdb::pyarrow::Table {
533+
if (result->type == QueryResultType::MATERIALIZED_RESULT || result->type == QueryResultType::ARROW_RESULT) {
534+
return MaterializedResultToArrowTable(schema, rows_per_batch);
535+
}
536+
if (result->type != QueryResultType::STREAM_RESULT) {
537+
throw InternalException("FetchArrowTable called with unsupported query result: %d", result->type);
538+
}
539+
auto pyarrow_schema = pyarrow::ToPyArrowSchema(schema);
540+
py::list batches;
541+
QueryResultChunkScanState scan_state(*result);
542+
while (true) {
543+
ArrowArray data;
544+
idx_t count;
545+
{
546+
D_ASSERT(py::gil_check());
547+
py::gil_scoped_release release;
548+
count = ArrowUtil::FetchChunk(scan_state, result->client_properties, rows_per_batch, &data,
549+
ArrowTypeExtensionData::GetExtensionTypes(
550+
*result->client_properties.client_context, result->types));
551+
}
552+
if (count == 0) {
553+
break;
554+
}
555+
TransformDuckToArrowChunk(pyarrow_schema, data, batches);
556+
}
557+
return pyarrow::ToArrowTable(std::move(batches), pyarrow_schema);
558+
},
559+
to_polars);
564560
}
565561

566562
ArrowArrayStream DuckDBPyResult::FetchArrowArrayStream(idx_t rows_per_batch) {
567563
if (!result) {
568564
throw InvalidInputException("There is no query result");
569565
}
570-
// Re-feed a materialized result to get a context-owning stream; a StreamQueryResult is
571-
// wrapped directly (already has a live context).
572-
if (result->type == QueryResultType::MATERIALIZED_RESULT) {
573-
PromoteMaterializedToStream();
566+
if (result->type != QueryResultType::STREAM_RESULT) {
567+
throw InternalException("FetchArrowArrayStream called with unsupported query result: %d", result->type);
574568
}
575569
// The wrapper is owned by the ArrowArrayStream's private_data (released with the stream).
576-
ResultArrowArrayStreamWrapper *result_stream = new ResultArrowArrayStreamWrapper(std::move(result), rows_per_batch);
570+
const auto result_stream = new ResultArrowArrayStreamWrapper(std::move(result), rows_per_batch);
577571
return result_stream->stream;
578572
}
579573

580574
duckdb::pyarrow::RecordBatchReader DuckDBPyResult::FetchRecordBatchReader(idx_t rows_per_batch) {
581575
if (!result) {
582576
throw InvalidInputException("There is no query result");
583577
}
578+
579+
if (result->type == QueryResultType::MATERIALIZED_RESULT || result->type == QueryResultType::ARROW_RESULT) {
580+
constexpr bool dedup_column_names = false;
581+
return RunWithArrowSchema<duckdb::pyarrow::RecordBatchReader>(
582+
[&](const ArrowSchema &schema) -> duckdb::pyarrow::RecordBatchReader {
583+
const auto table = MaterializedResultToArrowTable(schema, rows_per_batch);
584+
return py::cast<duckdb::pyarrow::RecordBatchReader>(
585+
table.attr("to_reader")(py::arg("max_chunksize") = rows_per_batch));
586+
},
587+
dedup_column_names);
588+
}
589+
if (result->type != QueryResultType::STREAM_RESULT) {
590+
throw InternalException("FetchRecordBatchReader called with unsupported query result: %d", result->type);
591+
}
584592
py::gil_scoped_acquire acquire;
585593
auto pyarrow_lib_module = py::module::import("pyarrow").attr("lib");
586594
auto record_batch_reader_func = pyarrow_lib_module.attr("RecordBatchReader").attr("_import_from_c");
@@ -601,11 +609,23 @@ static void ArrowArrayStreamPyCapsuleDestructor(PyObject *object) {
601609
delete stream;
602610
}
603611

604-
py::object DuckDBPyResult::FetchArrowCapsule(idx_t rows_per_batch) {
612+
py::object DuckDBPyResult::FetchArrowCapsule(const idx_t rows_per_batch) {
605613
if (!result) {
606614
throw InvalidInputException("There is no query result");
607615
}
608-
// Lazy streaming capsule backed by a context-owning stream (see FetchArrowArrayStream).
616+
617+
constexpr bool dedup_column_names = false;
618+
if (result->type == QueryResultType::MATERIALIZED_RESULT || result->type == QueryResultType::ARROW_RESULT) {
619+
return RunWithArrowSchema<py::object>(
620+
[&](const ArrowSchema &schema) -> py::object {
621+
const auto table = MaterializedResultToArrowTable(schema, rows_per_batch);
622+
return table.attr("__arrow_c_stream__")();
623+
},
624+
dedup_column_names);
625+
}
626+
if (result->type != QueryResultType::STREAM_RESULT) {
627+
throw InternalException("FetchArrowCapsule called with unsupported query result: %d", result->type);
628+
}
609629
auto inner_stream = FetchArrowArrayStream(rows_per_batch);
610630
auto stream = new ArrowArrayStream();
611631
*stream = inner_stream;

0 commit comments

Comments
 (0)