Skip to content

Commit e332733

Browse files
authored
fix(arrow/cdata): make nativeCRecordBatchReader deterministic (#793)
### Rationale for this change Instead of relying on a finalizer, make the `nativeCRecordBatchReader` use atomic Retain and Release to make releasing deterministic. ### What changes are included in this PR? Remove the finalizer, call C.ArrowArrayRelease and C.ArrowArrayStreamRelease based on refcount. ### Are these changes tested? Yes, tests cover this already. ### Are there any user-facing changes? Only that retain and release now properly control the determinism of releasing the C memory instead of relying on a finalizer.
1 parent 2b2aa6b commit e332733

2 files changed

Lines changed: 38 additions & 13 deletions

File tree

arrow/cdata/cdata.go

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,14 @@ import (
4747
"runtime"
4848
"strconv"
4949
"strings"
50+
"sync/atomic"
5051
"syscall"
5152
"unsafe"
5253

5354
"github.com/apache/arrow-go/v18/arrow"
5455
"github.com/apache/arrow-go/v18/arrow/array"
5556
"github.com/apache/arrow-go/v18/arrow/bitutil"
57+
"github.com/apache/arrow-go/v18/arrow/internal/debug"
5658
"github.com/apache/arrow-go/v18/arrow/memory"
5759
)
5860

@@ -903,18 +905,19 @@ func importCArrayAsType(arr *CArrowArray, dt arrow.DataType) (imp *cimporter, er
903905
}
904906

905907
func initReader(rdr *nativeCRecordBatchReader, stream *CArrowArrayStream) error {
908+
rdr.refCount.Store(1)
906909
rdr.stream = C.get_stream()
907910
C.ArrowArrayStreamMove(stream, rdr.stream)
908911
rdr.arr = C.get_arr()
909-
runtime.SetFinalizer(rdr, func(r *nativeCRecordBatchReader) {
910-
if r.cur != nil {
911-
r.cur.Release()
912-
}
913-
C.ArrowArrayStreamRelease(r.stream)
914-
C.ArrowArrayRelease(r.arr)
915-
C.free(unsafe.Pointer(r.stream))
916-
C.free(unsafe.Pointer(r.arr))
917-
})
912+
913+
rdr.cleanUps[0] = runtime.AddCleanup(rdr, func(s *CArrowArrayStream) {
914+
C.ArrowArrayStreamRelease(s)
915+
C.free(unsafe.Pointer(s))
916+
}, rdr.stream)
917+
rdr.cleanUps[1] = runtime.AddCleanup(rdr, func(a *CArrowArray) {
918+
C.ArrowArrayRelease(a)
919+
C.free(unsafe.Pointer(a))
920+
}, rdr.arr)
918921

919922
var sc CArrowSchema
920923
errno := C.stream_get_schema(rdr.stream, &sc)
@@ -940,12 +943,32 @@ type nativeCRecordBatchReader struct {
940943

941944
cur arrow.RecordBatch
942945
err error
946+
947+
refCount atomic.Int64
948+
cleanUps [2]runtime.Cleanup
949+
}
950+
951+
func (n *nativeCRecordBatchReader) Retain() {
952+
n.refCount.Add(1)
943953
}
944954

945-
// No need to implement retain and release here as we used runtime.SetFinalizer when constructing
946-
// the reader to free up the ArrowArrayStream memory when the garbage collector cleans it up.
947-
func (n *nativeCRecordBatchReader) Retain() {}
948-
func (n *nativeCRecordBatchReader) Release() {}
955+
func (n *nativeCRecordBatchReader) Release() {
956+
rc := n.refCount.Add(-1)
957+
debug.Assert(rc >= 0, "too many releases")
958+
959+
if rc == 0 {
960+
n.cleanUps[0].Stop()
961+
n.cleanUps[1].Stop()
962+
if n.cur != nil {
963+
n.cur.Release()
964+
}
965+
966+
C.ArrowArrayStreamRelease(n.stream)
967+
C.ArrowArrayRelease(n.arr)
968+
C.free(unsafe.Pointer(n.stream))
969+
C.free(unsafe.Pointer(n.arr))
970+
}
971+
}
949972

950973
func (n *nativeCRecordBatchReader) Err() error { return n.err }
951974
func (n *nativeCRecordBatchReader) RecordBatch() arrow.RecordBatch { return n.cur }

arrow/cdata/interface.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,8 +189,10 @@ func ImportCArrayStream(stream *CArrowArrayStream, schema *arrow.Schema) arrio.R
189189
func ImportCRecordReader(stream *CArrowArrayStream, schema *arrow.Schema) (arrio.Reader, error) {
190190
out := &nativeCRecordBatchReader{schema: schema}
191191
if err := initReader(out, stream); err != nil {
192+
out.Release()
192193
return nil, err
193194
}
195+
194196
return out, nil
195197
}
196198

0 commit comments

Comments
 (0)