feat(data): add MOR file scan task reader #657
Add FileScanTaskReader as the delete-aware Arrow stream entrypoint for file scan tasks, covering both no-delete reader passthrough and merge-on-read filtering with position and equality deletes.
Introduce reusable Arrow C Data utilities for stream wrapping and batch projection, including ProjectionContext caching and optional Arrow compute registration via arrow::RegisterAll. Move Arrow IO registration into arrow_register and remove FileScanTask::ToArrow.
Add coverage for projection behavior across nanoarrow and Arrow compute paths, plus end-to-end FileScanTaskReader tests for projected reads, position deletes, equality deletes, dropped equality fields, and fully deleted batches.
WZhuo
Nice work! This is a well-structured PR with solid test coverage. The design cleanly separates the nanoarrow fallback from the optional Arrow compute path, and the ArrowArrayStreamProvider concept is a good generalization. A few comments below — mostly minor suggestions and documentation requests.
| ProjectionContext() = default; | ||
|
|
||
| const Schema* input_schema_ = nullptr; | ||
| const Schema* output_schema_ = nullptr; |
nit: Storing raw pointers here with the requirement that callers ensure the schemas outlive the context is fragile. A future refactor could easily break this invariant. Consider either storing shared_ptr<const Schema> or at minimum adding a prominent comment at each call site (e.g. in FileScanTaskReader::Impl::Open) explaining why the lifetime is guaranteed.
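A minimal sketch of the shared-ownership option, for illustration only. `Schema` here is a hypothetical stand-in for the library's schema type, and `OwningProjectionContext` and `MakeContextFromTemporaries` are invented names, not part of this PR:

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the real Schema class; only here so the sketch compiles.
struct Schema {
  std::vector<std::string> field_names;
};

// Holds shared ownership of both schemas, so the context can never outlive them.
class OwningProjectionContext {
 public:
  OwningProjectionContext(std::shared_ptr<const Schema> input,
                          std::shared_ptr<const Schema> output)
      : input_schema_(std::move(input)), output_schema_(std::move(output)) {
    assert(input_schema_ != nullptr && output_schema_ != nullptr);
  }

  const Schema& input_schema() const { return *input_schema_; }
  const Schema& output_schema() const { return *output_schema_; }

 private:
  std::shared_ptr<const Schema> input_schema_;
  std::shared_ptr<const Schema> output_schema_;
};

// The caller's handles go out of scope on return; the context keeps the schemas alive.
inline OwningProjectionContext MakeContextFromTemporaries() {
  Schema in;
  in.field_names = {"id", "name", "ts"};
  Schema out;
  out.field_names = {"id", "ts"};
  std::shared_ptr<const Schema> input = std::make_shared<Schema>(std::move(in));
  std::shared_ptr<const Schema> output = std::make_shared<Schema>(std::move(out));
  return OwningProjectionContext(input, output);
}
```

The cost is two reference-count increments per context, which is paid once per schema pair rather than per batch.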
| #include <span> | ||
| #include <string> | ||
| #include <tuple> | ||
| #include <utility> |
nit: <tuple> does not appear to be used in this header. Can be removed.
| const Schema* input_schema_ = nullptr; | ||
| const Schema* output_schema_ = nullptr; | ||
| std::vector<int> selected_field_indices_; | ||
| ArrowSchema input_arrow_schema_{}; |
nit: selected_field_indices_ is vector<int> but row_indices in ProjectBatch is span<const int32_t>. Using int32_t consistently for both would avoid implicit conversions in the Arrow compute path.
|
|
||
| /// \brief Cached state for ProjectBatch over one input/output schema pair. | ||
| class ICEBERG_EXPORT ProjectionContext { | ||
| public: |
nit: This is an internal header (suffix _internal.h) but uses ICEBERG_EXPORT. This is needed because the symbols are used across TUs within the library, but it is a bit surprising. A brief comment explaining why would help future readers.
| } | ||
|
|
||
| void ProjectionContext::RegisterProjectBatchFunction( | ||
| ProjectionContext::ProjectBatchFunction project_batch_function) { |
The global function pointer with mutex is a process-wide singleton — if two libraries in the same process register different implementations, the last one wins silently. Worth documenting this "last writer wins" behavior, either here or in the header doc for RegisterProjectBatchFunction.
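To make the behavior concrete, here is a simplified sketch of a mutex-protected registration slot. The `int (*)(int)` signature, the free functions, and `TryRegisterProjectBatchFunction` are all hypothetical; the real `ProjectBatchFunction` takes Arrow C Data arguments and lives on `ProjectionContext`:

```cpp
#include <cassert>
#include <mutex>

// Hypothetical, simplified signature used only for this sketch.
using ProjectBatchFunction = int (*)(int);

namespace {
std::mutex g_registry_mutex;
ProjectBatchFunction g_project_batch_function = nullptr;
}  // namespace

// Last writer wins: any earlier registration is overwritten without notice.
void RegisterProjectBatchFunction(ProjectBatchFunction fn) {
  std::lock_guard<std::mutex> lock(g_registry_mutex);
  g_project_batch_function = fn;
}

ProjectBatchFunction GetProjectBatchFunction() {
  std::lock_guard<std::mutex> lock(g_registry_mutex);
  return g_project_batch_function;
}

// Possible alternative: refuse to overwrite and report the conflict to the caller.
bool TryRegisterProjectBatchFunction(ProjectBatchFunction fn) {
  std::lock_guard<std::mutex> lock(g_registry_mutex);
  if (g_project_batch_function != nullptr) {
    return false;
  }
  g_project_batch_function = fn;
  return true;
}

// Two placeholder implementations standing in for competing registrations.
int FirstImpl(int x) { return x; }
int SecondImpl(int x) { return x * 2; }
```

Documenting the overwrite is the smaller change; a `Try...` variant would only be worth it if conflicting registrations are expected in practice.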
| ProjectionContext& projection) { | ||
| ICEBERG_PRECHECK(input_batch != nullptr, "input_batch must not be null"); | ||
| internal::ArrowArrayGuard input_batch_guard(input_batch); | ||
|
|
Suggestion: add a comment explaining why the ArrowArrayGuard here does not double-release when the Arrow compute path calls ImportRecordBatch(input_batch, ...). The reason is that ImportRecordBatch zeroes the release pointer after import, so the guard becomes a no-op. Without this note, a reader might worry about double-free.
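The interaction can be illustrated with a self-contained model. `MiniArray`, `MiniArrayGuard`, and `MoveImport` are hypothetical stand-ins for `ArrowArray`, `internal::ArrowArrayGuard`, and the import call; they only mimic the C Data Interface convention that a move marks the source as released by setting `release` to null:

```cpp
#include <cassert>

// Minimal stand-in for the release-callback part of the ArrowArray C struct.
struct MiniArray {
  void (*release)(MiniArray*) = nullptr;
  void* private_data = nullptr;
};

// Releases the array on scope exit unless it was already released or moved away.
class MiniArrayGuard {
 public:
  explicit MiniArrayGuard(MiniArray* array) : array_(array) {}
  MiniArrayGuard(const MiniArrayGuard&) = delete;
  MiniArrayGuard& operator=(const MiniArrayGuard&) = delete;
  ~MiniArrayGuard() {
    if (array_ != nullptr && array_->release != nullptr) {
      array_->release(array_);
    }
  }

 private:
  MiniArray* array_;
};

// Mimics a moving import: copy the struct, then mark the source as released.
MiniArray MoveImport(MiniArray* source) {
  assert(source != nullptr && source->release != nullptr);
  MiniArray imported = *source;
  source->release = nullptr;
  return imported;
}

// Release callback that counts how often it runs via private_data.
void CountingRelease(MiniArray* array) {
  ++*static_cast<int*>(array->private_data);
  array->release = nullptr;
}

// Returns the number of release calls observed after a guarded import.
int ReleaseCountAfterGuardedImport() {
  int release_count = 0;
  MiniArray source;
  source.release = &CountingRelease;
  source.private_data = &release_count;
  {
    MiniArrayGuard guard(&source);
    MiniArray imported = MoveImport(&source);
    imported.release(&imported);  // the new owner releases exactly once
  }  // guard sees release == nullptr and does nothing
  return release_count;
}
```

In this model the guard only matters on the early-return paths before the import, which is presumably the role it plays in the real code as well.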
| ArrowArrayViewSetArray(&input_view, &input_batch, &error), error); | ||
| ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR( | ||
| ArrowArrayViewValidate(&input_view, NANOARROW_VALIDATION_LEVEL_FULL, &error), | ||
| error); |
NANOARROW_VALIDATION_LEVEL_FULL on every batch is expensive for production workloads. Consider making this configurable (e.g. via a debug flag or build option), or using NANOARROW_VALIDATION_LEVEL_DEFAULT by default and only enabling full validation in debug/test builds.
| ::arrow::ImportRecordBatch(input_batch, state->input_schema)); | ||
|
|
||
| const int32_t empty_index = 0; | ||
| const int32_t* row_indices_data = |
nit: empty_index is a bit misleading — it is just a non-null pointer to satisfy Buffer::Wrap when row_indices is empty (the value 0 is never actually used). A brief comment would clarify intent, e.g.:
// Provide a valid pointer for Buffer::Wrap; the array length is 0 so this is never read.
| if (!cached_schema_.has_value()) { | ||
| ICEBERG_ASSIGN_OR_RAISE(cached_schema_, reader_->Schema()); | ||
| } | ||
| ArrowSchema& input_arrow_schema = cached_schema_.value(); |
The schema returned by reader_->Schema() is cached on first call and reused for all subsequent batches. This is correct for Parquet (schema is fixed), but if a future format reader could return varying schemas across batches, this would break. A brief comment noting the assumption would be helpful.
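As a rough illustration of where such a comment could sit, here is a reduced model of the caching pattern. `FakeReader` and `CachingBatchSource` are hypothetical, and a `std::string` stands in for the cached `ArrowSchema`:

```cpp
#include <cassert>
#include <optional>
#include <string>

// Hypothetical reader that counts how often its schema is requested.
class FakeReader {
 public:
  std::string Schema() {
    ++schema_calls_;
    return "id:int64,ts:timestamp";
  }
  int schema_calls() const { return schema_calls_; }

 private:
  int schema_calls_ = 0;
};

class CachingBatchSource {
 public:
  explicit CachingBatchSource(FakeReader* reader) : reader_(reader) {
    assert(reader_ != nullptr);
  }

  // Assumption: the reader's schema is fixed for the whole file (true for
  // Parquet). A reader whose schema can change between batches would need
  // this cache to be invalidated or removed.
  const std::string& InputSchema() {
    if (!cached_schema_.has_value()) {
      cached_schema_ = reader_->Schema();
    }
    return *cached_schema_;
  }

 private:
  FakeReader* reader_;
  std::optional<std::string> cached_schema_;
};
```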