Skip to content

feat(data): add MOR file scan task reader#657

Open
wgtmac wants to merge 1 commit into
apache:mainfrom
wgtmac:scan_task_reader
Open

feat(data): add MOR file scan task reader#657
wgtmac wants to merge 1 commit into
apache:mainfrom
wgtmac:scan_task_reader

Conversation

@wgtmac
Copy link
Copy Markdown
Member

@wgtmac wgtmac commented May 18, 2026

Add FileScanTaskReader as the delete-aware Arrow stream entrypoint for file scan tasks, covering both no-delete reader passthrough and merge-on-read filtering with position and equality deletes.

Introduce reusable Arrow C Data utilities for stream wrapping and batch projection, including ProjectionContext caching and optional Arrow compute registration via arrow::RegisterAll. Move Arrow IO registration into arrow_register and remove FileScanTask::ToArrow.

Add coverage for projection behavior across nanoarrow and Arrow compute paths, plus end-to-end FileScanTaskReader tests for projected reads, position deletes, equality deletes, dropped equality fields, and fully deleted batches.

Add FileScanTaskReader as the delete-aware Arrow stream entrypoint for file scan
tasks, covering both no-delete reader passthrough and merge-on-read filtering with
position and equality deletes.

Introduce reusable Arrow C Data utilities for stream wrapping and batch
projection, including ProjectionContext caching and optional Arrow compute
registration via arrow::RegisterAll. Move Arrow IO registration into
arrow_register and remove FileScanTask::ToArrow.

Add coverage for projection behavior across nanoarrow and Arrow compute paths,
plus end-to-end FileScanTaskReader tests for projected reads, position deletes,
equality deletes, dropped equality fields, and fully deleted batches.
@wgtmac wgtmac force-pushed the scan_task_reader branch from 48c2739 to 91364da Compare May 18, 2026 05:42
Copy link
Copy Markdown
Contributor

@WZhuo WZhuo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work! This is a well-structured PR with solid test coverage. The design cleanly separates the nanoarrow fallback from the optional Arrow compute path, and the ArrowArrayStreamProvider concept is a good generalization. A few comments below — mostly minor suggestions and documentation requests.

ProjectionContext() = default;

const Schema* input_schema_ = nullptr;
const Schema* output_schema_ = nullptr;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Storing raw pointers here with the requirement that callers ensure the schemas outlive the context is fragile. A future refactor could easily break this invariant. Consider either storing shared_ptr<const Schema> or at minimum adding a prominent comment at each call site (e.g. in FileScanTaskReader::Impl::Open) explaining why the lifetime is guaranteed.

#include <span>
#include <string>
#include <tuple>
#include <utility>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: <tuple> does not appear to be used in this header. Can be removed.

const Schema* input_schema_ = nullptr;
const Schema* output_schema_ = nullptr;
std::vector<int> selected_field_indices_;
ArrowSchema input_arrow_schema_{};
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: selected_field_indices_ is vector<int> but row_indices in ProjectBatch is span<const int32_t>. Using int32_t consistently for both would avoid implicit conversions in the Arrow compute path.


/// \brief Cached state for ProjectBatch over one input/output schema pair.
class ICEBERG_EXPORT ProjectionContext {
public:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This is an internal header (suffix _internal.h) but uses ICEBERG_EXPORT. This is needed because the symbols are used across TUs within the library, but it is a bit surprising. A brief comment explaining why would help future readers.

}

void ProjectionContext::RegisterProjectBatchFunction(
ProjectionContext::ProjectBatchFunction project_batch_function) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The global function pointer with mutex is a process-wide singleton — if two libraries in the same process register different implementations, the last one wins silently. Worth documenting this "last writer wins" behavior, either here or in the header doc for RegisterProjectBatchFunction.

ProjectionContext& projection) {
ICEBERG_PRECHECK(input_batch != nullptr, "input_batch must not be null");
internal::ArrowArrayGuard input_batch_guard(input_batch);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: add a comment explaining why the ArrowArrayGuard here does not double-release when the Arrow compute path calls ImportRecordBatch(input_batch, ...). The reason is that ImportRecordBatch zeroes the release pointer after import, so the guard becomes a no-op. Without this note, a reader might worry about double-free.

ArrowArrayViewSetArray(&input_view, &input_batch, &error), error);
ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
ArrowArrayViewValidate(&input_view, NANOARROW_VALIDATION_LEVEL_FULL, &error),
error);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NANOARROW_VALIDATION_LEVEL_FULL on every batch is expensive for production workloads. Consider making this configurable (e.g. via a debug flag or build option), or using NANOARROW_VALIDATION_LEVEL_DEFAULT by default and only enabling full validation in debug/test builds.

::arrow::ImportRecordBatch(input_batch, state->input_schema));

const int32_t empty_index = 0;
const int32_t* row_indices_data =
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: empty_index is a bit misleading — it is just a non-null pointer to satisfy Buffer::Wrap when row_indices is empty (the value 0 is never actually used). A brief comment would clarify intent, e.g.:

// Provide a valid pointer for Buffer::Wrap; the array length is 0 so this is never read.

if (!cached_schema_.has_value()) {
ICEBERG_ASSIGN_OR_RAISE(cached_schema_, reader_->Schema());
}
ArrowSchema& input_arrow_schema = cached_schema_.value();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The schema returned by reader_->Schema() is cached on first call and reused for all subsequent batches. This is correct for Parquet (schema is fixed), but if a future format reader could return varying schemas across batches, this would break. A brief comment noting the assumption would be helpful.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants