Skip to content

Commit 5efc5e5

Browse files
Add Apache Arrow result streaming to DuckDBCommand (#334)
* Add Apache Arrow result streaming to DuckDBCommand Adds a managed Apache Arrow read surface built on DuckDB's current Arrow C Data Interface (duckdb_to_arrow_schema / duckdb_data_chunk_to_arrow), addressing #26. - Bindings: duckdb_result_get_arrow_options, duckdb_to_arrow_schema, duckdb_data_chunk_to_arrow, error-data helpers, and a DuckDBArrowOptions safe handle. - Data: DuckDBArrowArrayStream (IArrowArrayStream) that builds the schema once and converts each data chunk into an Arrow RecordBatch via Apache.Arrow's C importers. - DuckDBCommand.ExecuteArrowStream() and ExecuteArrowBatchesAsync() public APIs. - Apache.Arrow dependency added to DuckDB.NET.Data only. - Tests covering schema, scalar values, nulls, multi-chunk streaming, and the stream API. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Honor UseStreamingMode in Arrow APIs and fix result stream cleanup Pass UseStreamingMode through ExecuteArrowStream and fetch chunks via the streaming or materialized path accordingly. Dispose Arrow options and close the result if schema building fails in the stream constructor. Add tests for streaming mode, cancellation, and dispose semantics. * Refactor Arrow error handling to typed DuckDBErrorData handle - Add DuckDBErrorData SafeHandle wrapping duckdb_error_data - Move error-data P/Invokes to NativeMethods.ErrorData - Retype duckdb_to_arrow_schema/data_chunk_to_arrow returns to DuckDBErrorData - Add reusable ThrowOnError extension throwing DuckDBException - Make DuckDBArrowArrayStream internal --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 38ddceb commit 5efc5e5

8 files changed

Lines changed: 486 additions & 0 deletions

File tree

DuckDB.NET.Bindings/DuckDBWrapperObjects.cs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,28 @@ protected override bool ReleaseHandle()
6868
}
6969
}
7070

71+
public class DuckDBArrowOptions() : SafeHandleZeroOrMinusOneIsInvalid(true)
72+
{
73+
protected override bool ReleaseHandle()
74+
{
75+
NativeMethods.Arrow.DuckDBDestroyArrowOptions(ref handle);
76+
return true;
77+
}
78+
}
79+
80+
public class DuckDBErrorData() : SafeHandleZeroOrMinusOneIsInvalid(true)
81+
{
82+
public bool HasError => !IsInvalid && NativeMethods.ErrorData.DuckDBErrorDataHasError(this);
83+
84+
public string? Message => IsInvalid ? null : NativeMethods.ErrorData.DuckDBErrorDataMessage(this);
85+
86+
protected override bool ReleaseHandle()
87+
{
88+
NativeMethods.ErrorData.DuckDBDestroyErrorData(ref handle);
89+
return true;
90+
}
91+
}
92+
7193
public class DuckDBDataChunk : SafeHandleZeroOrMinusOneIsInvalid
7294
{
7395
public DuckDBDataChunk() : base(true)
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
namespace DuckDB.NET.Native;
2+
3+
public partial class NativeMethods
4+
{
5+
//https://duckdb.org/docs/stable/clients/c/api#arrow-interface
6+
public static partial class Arrow
7+
{
8+
[LibraryImport(DuckDbLibrary, EntryPoint = "duckdb_result_get_arrow_options")]
9+
[UnmanagedCallConv(CallConvs = [typeof(CallConvCdecl)])]
10+
public static partial DuckDBArrowOptions DuckDBResultGetArrowOptions(ref DuckDBResult result);
11+
12+
[LibraryImport(DuckDbLibrary, EntryPoint = "duckdb_destroy_arrow_options")]
13+
[UnmanagedCallConv(CallConvs = [typeof(CallConvCdecl)])]
14+
public static partial void DuckDBDestroyArrowOptions(ref IntPtr arrowOptions);
15+
16+
// duckdb_error_data duckdb_to_arrow_schema(duckdb_arrow_options, duckdb_logical_type *types,
17+
// const char **names, idx_t column_count, ArrowSchema *out_schema)
18+
[LibraryImport(DuckDbLibrary, EntryPoint = "duckdb_to_arrow_schema")]
19+
[UnmanagedCallConv(CallConvs = [typeof(CallConvCdecl)])]
20+
public static partial DuckDBErrorData DuckDBToArrowSchema(DuckDBArrowOptions arrowOptions, IntPtr types, IntPtr names, ulong columnCount, IntPtr outSchema);
21+
22+
// duckdb_error_data duckdb_data_chunk_to_arrow(duckdb_arrow_options, duckdb_data_chunk, ArrowArray *out_arrow_array)
23+
[LibraryImport(DuckDbLibrary, EntryPoint = "duckdb_data_chunk_to_arrow")]
24+
[UnmanagedCallConv(CallConvs = [typeof(CallConvCdecl)])]
25+
public static partial DuckDBErrorData DuckDBDataChunkToArrow(DuckDBArrowOptions arrowOptions, DuckDBDataChunk chunk, IntPtr outArray);
26+
}
27+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
namespace DuckDB.NET.Native;
2+
3+
public partial class NativeMethods
4+
{
5+
//https://duckdb.org/docs/stable/clients/c/api#error-data
6+
public static partial class ErrorData
7+
{
8+
[SuppressGCTransition]
9+
[LibraryImport(DuckDbLibrary, EntryPoint = "duckdb_error_data_has_error")]
10+
[UnmanagedCallConv(CallConvs = [typeof(CallConvCdecl)])]
11+
[return: MarshalAs(UnmanagedType.I1)]
12+
public static partial bool DuckDBErrorDataHasError(DuckDBErrorData errorData);
13+
14+
[LibraryImport(DuckDbLibrary, EntryPoint = "duckdb_error_data_message")]
15+
[UnmanagedCallConv(CallConvs = [typeof(CallConvCdecl)])]
16+
[return: MarshalUsing(typeof(DuckDBOwnedStringMarshaller))]
17+
public static partial string DuckDBErrorDataMessage(DuckDBErrorData errorData);
18+
19+
[LibraryImport(DuckDbLibrary, EntryPoint = "duckdb_destroy_error_data")]
20+
[UnmanagedCallConv(CallConvs = [typeof(CallConvCdecl)])]
21+
public static partial void DuckDBDestroyErrorData(ref IntPtr errorData);
22+
}
23+
}
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
using System.Runtime.InteropServices;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using Apache.Arrow;
5+
using Apache.Arrow.C;
6+
using Apache.Arrow.Ipc;
7+
using DuckDB.NET.Native;
8+
9+
namespace DuckDB.NET.Data.Arrow;
10+
11+
/// <summary>
12+
/// Streams the rows of a DuckDB query result as Apache Arrow <see cref="RecordBatch"/> values using
13+
/// DuckDB's Arrow C Data Interface (<c>duckdb_to_arrow_schema</c> / <c>duckdb_data_chunk_to_arrow</c>).
14+
/// Each DuckDB data chunk is converted into one Arrow record batch and imported with no row-by-row marshaling.
15+
/// </summary>
16+
internal sealed class DuckDBArrowArrayStream : IArrowArrayStream
17+
{
18+
private DuckDBResult result;
19+
private readonly DuckDBArrowOptions arrowOptions;
20+
private readonly bool streaming;
21+
private bool disposed;
22+
23+
public Schema Schema { get; }
24+
25+
internal DuckDBArrowArrayStream(DuckDBResult result)
26+
{
27+
this.result = result;
28+
29+
arrowOptions = NativeMethods.Arrow.DuckDBResultGetArrowOptions(ref this.result);
30+
if (arrowOptions.IsInvalid)
31+
{
32+
this.result.Close();
33+
throw new InvalidOperationException("Failed to obtain Arrow options from the DuckDB result.");
34+
}
35+
36+
streaming = NativeMethods.Types.DuckDBResultIsStreaming(this.result) > 0;
37+
38+
try
39+
{
40+
Schema = BuildSchema();
41+
}
42+
catch
43+
{
44+
arrowOptions.Dispose();
45+
this.result.Close();
46+
throw;
47+
}
48+
}
49+
50+
private unsafe Schema BuildSchema()
51+
{
52+
var columnCount = NativeMethods.Query.DuckDBColumnCount(ref result);
53+
54+
var logicalTypes = new DuckDBLogicalType[columnCount];
55+
var typeHandles = new IntPtr[columnCount];
56+
var namePointers = new IntPtr[columnCount];
57+
58+
try
59+
{
60+
for (var index = 0UL; index < columnCount; index++)
61+
{
62+
var logicalType = NativeMethods.Query.DuckDBColumnLogicalType(ref result, (long)index);
63+
logicalTypes[index] = logicalType;
64+
typeHandles[index] = logicalType.DangerousGetHandle();
65+
66+
var name = NativeMethods.Query.DuckDBColumnName(ref result, (long)index);
67+
namePointers[index] = Marshal.StringToCoTaskMemUTF8(name);
68+
}
69+
70+
var cSchema = CArrowSchema.Create();
71+
72+
try
73+
{
74+
fixed (IntPtr* typesPointer = typeHandles)
75+
fixed (IntPtr* namesPointer = namePointers)
76+
{
77+
var error = NativeMethods.Arrow.DuckDBToArrowSchema(arrowOptions, (IntPtr)typesPointer, (IntPtr)namesPointer, columnCount, (IntPtr)cSchema);
78+
error.ThrowOnError("Failed to convert the DuckDB result schema to an Arrow schema.");
79+
}
80+
81+
return CArrowSchemaImporter.ImportSchema(cSchema);
82+
}
83+
finally
84+
{
85+
CArrowSchema.Free(cSchema);
86+
}
87+
}
88+
finally
89+
{
90+
foreach (var pointer in namePointers)
91+
{
92+
if (pointer != IntPtr.Zero)
93+
{
94+
Marshal.FreeCoTaskMem(pointer);
95+
}
96+
}
97+
98+
foreach (var logicalType in logicalTypes)
99+
{
100+
logicalType?.Dispose();
101+
}
102+
}
103+
}
104+
105+
public ValueTask<RecordBatch?> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
106+
{
107+
ObjectDisposedException.ThrowIf(disposed, this);
108+
109+
if (cancellationToken.IsCancellationRequested)
110+
{
111+
return new ValueTask<RecordBatch?>(Task.FromCanceled<RecordBatch?>(cancellationToken));
112+
}
113+
114+
var chunk = streaming
115+
? NativeMethods.StreamingResult.DuckDBStreamFetchChunk(result)
116+
: NativeMethods.Query.DuckDBFetchChunk(result);
117+
118+
if (chunk.IsInvalid)
119+
{
120+
chunk.Dispose();
121+
return new ValueTask<RecordBatch?>((RecordBatch?)null);
122+
}
123+
124+
try
125+
{
126+
return new ValueTask<RecordBatch?>(ConvertChunk(chunk));
127+
}
128+
finally
129+
{
130+
chunk.Dispose();
131+
}
132+
}
133+
134+
private unsafe RecordBatch ConvertChunk(DuckDBDataChunk chunk)
135+
{
136+
var cArray = CArrowArray.Create();
137+
138+
try
139+
{
140+
var error = NativeMethods.Arrow.DuckDBDataChunkToArrow(arrowOptions, chunk, (IntPtr)cArray);
141+
error.ThrowOnError("Failed to convert a DuckDB data chunk to an Arrow array.");
142+
143+
return CArrowArrayImporter.ImportRecordBatch(cArray, Schema);
144+
}
145+
finally
146+
{
147+
CArrowArray.Free(cArray);
148+
}
149+
}
150+
151+
public void Dispose()
152+
{
153+
if (disposed)
154+
{
155+
return;
156+
}
157+
158+
disposed = true;
159+
160+
arrowOptions.Dispose();
161+
result.Close();
162+
}
163+
}

DuckDB.NET.Data/Data.csproj

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ Fixes:
3232
<InternalsVisibleTo Include="DuckDB.NET.Test, PublicKey=0024000004800000940000000602000000240000525341310004000001000100852ebcffb69a0dfb906bc0377ec8608f4a00ba9af2e29300d1ed77dcb2583f08116b1a1006d202d53a48ec0561c1816738368378f7c5d335fec3daa63ba1b5413298153f886aafc75304e7653715f2395ad370fe3b2f4bc44a36a2f6b958fd500a2f7eea9a69c6ab5819e0933db962630c56c1610c7c87ed6a3c2b36e1ca4ed2" />
3333
</ItemGroup>
3434

35+
<ItemGroup>
36+
<PackageReference Include="Apache.Arrow" Version="23.0.0" />
37+
</ItemGroup>
38+
3539
<ItemGroup>
3640
<ProjectReference Include="..\DuckDB.NET.Bindings\Bindings.csproj" />
3741
</ItemGroup>

DuckDB.NET.Data/DuckDBCommand.cs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
using System.ComponentModel;
22
using System.Diagnostics.CodeAnalysis;
33
using System.Runtime.CompilerServices;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using Apache.Arrow;
7+
using Apache.Arrow.Ipc;
8+
using DuckDB.NET.Data.Arrow;
49

510
namespace DuckDB.NET.Data;
611

@@ -109,6 +114,57 @@ protected override DbDataReader ExecuteDbDataReader(CommandBehavior behavior)
109114
return reader;
110115
}
111116

117+
/// <summary>
118+
/// Executes the command and returns the first result set as an Apache Arrow
119+
/// <see cref="IArrowArrayStream"/>. Each DuckDB data chunk is converted to an Arrow record batch
120+
/// using DuckDB's Arrow C Data Interface, with no row-by-row marshaling.
121+
/// When <see cref="UseStreamingMode"/> is enabled, record batches are produced lazily from a
122+
/// streaming result (bounded memory), otherwise the result is materialized first.
123+
/// The caller owns the returned stream and must dispose it.
124+
/// </summary>
125+
public IArrowArrayStream ExecuteArrowStream()
126+
{
127+
EnsureConnectionOpen();
128+
129+
var results = PreparedStatement.PreparedStatement.PrepareMultiple(connection!.NativeConnection, CommandText, parameters, UseStreamingMode);
130+
131+
foreach (var result in results)
132+
{
133+
var current = result;
134+
135+
if (NativeMethods.Query.DuckDBResultReturnType(current) == DuckDBResultType.QueryResult)
136+
{
137+
return new DuckDBArrowArrayStream(current);
138+
}
139+
140+
current.Close();
141+
}
142+
143+
throw new InvalidOperationException("The command did not return a result set.");
144+
}
145+
146+
/// <summary>
147+
/// Executes the command and asynchronously streams the first result set as Apache Arrow
148+
/// <see cref="RecordBatch"/> values. The batches are produced lazily, one per DuckDB data chunk.
149+
/// Set <see cref="UseStreamingMode"/> to stream from a streaming result with bounded memory.
150+
/// </summary>
151+
public async IAsyncEnumerable<RecordBatch> ExecuteArrowBatchesAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
152+
{
153+
var stream = ExecuteArrowStream();
154+
155+
try
156+
{
157+
while (await stream.ReadNextRecordBatchAsync(cancellationToken).ConfigureAwait(false) is { } batch)
158+
{
159+
yield return batch;
160+
}
161+
}
162+
finally
163+
{
164+
stream.Dispose();
165+
}
166+
}
167+
112168
public override void Prepare() { }
113169

114170
protected override DbParameter CreateDbParameter() => new DuckDBParameter();
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
namespace DuckDB.NET.Data.Extensions;
2+
3+
internal static class DuckDBErrorDataExtensions
4+
{
5+
public static void ThrowOnError(this DuckDBErrorData errorData, string message)
6+
{
7+
using (errorData)
8+
{
9+
if (errorData.HasError)
10+
{
11+
throw new DuckDBException($"{message} {errorData.Message}".TrimEnd());
12+
}
13+
}
14+
}
15+
}

0 commit comments

Comments
 (0)