Skip to content

Commit 82adc73

Browse files
authored
feat: add support to select columns in table scan planning (#550)
1 parent 257f1dc commit 82adc73

File tree

4 files changed

+143
-5
lines changed

4 files changed

+143
-5
lines changed

src/iceberg/table_scan.cc

Lines changed: 29 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,9 @@
2020
#include "iceberg/table_scan.h"
2121

2222
#include <cstring>
23+
#include <iterator>
2324

25+
#include "iceberg/expression/binder.h"
2426
#include "iceberg/expression/expression.h"
2527
#include "iceberg/file_reader.h"
2628
#include "iceberg/manifest/manifest_entry.h"
@@ -32,6 +34,7 @@
3234
#include "iceberg/util/macros.h"
3335
#include "iceberg/util/snapshot_util_internal.h"
3436
#include "iceberg/util/timepoint.h"
37+
#include "iceberg/util/type_util.h"
3538

3639
namespace iceberg {
3740

@@ -414,11 +417,32 @@ TableScan::ResolveProjectedSchema() const {
414417
}
415418

416419
if (!context_.selected_columns.empty()) {
417-
/// TODO(gangwu): port Java BaseScan.lazyColumnProjection to collect field ids
418-
/// from selected column names and bound references in the filter, and then create
419-
/// projected schema based on the collected field ids.
420-
return NotImplemented(
421-
"Selecting columns by name to create projected schema is not yet implemented");
420+
std::unordered_set<int32_t> required_field_ids;
421+
422+
// Include columns referenced by filter
423+
if (context_.filter != nullptr) {
424+
ICEBERG_ASSIGN_OR_RAISE(auto is_bound, IsBoundVisitor::IsBound(context_.filter));
425+
if (is_bound) {
426+
ICEBERG_ASSIGN_OR_RAISE(required_field_ids,
427+
ReferenceVisitor::GetReferencedFieldIds(context_.filter));
428+
} else {
429+
ICEBERG_ASSIGN_OR_RAISE(auto filter, Binder::Bind(*schema_, context_.filter,
430+
context_.case_sensitive));
431+
ICEBERG_ASSIGN_OR_RAISE(required_field_ids,
432+
ReferenceVisitor::GetReferencedFieldIds(filter));
433+
}
434+
}
435+
436+
// Include columns selected by option
437+
ICEBERG_ASSIGN_OR_RAISE(auto selected, schema_->Select(context_.selected_columns,
438+
context_.case_sensitive));
439+
ICEBERG_ASSIGN_OR_RAISE(
440+
auto selected_field_ids,
441+
GetProjectedIdsVisitor::GetProjectedIds(*selected, /*include_struct_ids=*/true));
442+
required_field_ids.insert(std::make_move_iterator(selected_field_ids.begin()),
443+
std::make_move_iterator(selected_field_ids.end()));
444+
445+
ICEBERG_ASSIGN_OR_RAISE(projected_schema_, schema_->Project(required_field_ids));
422446
} else if (context_.projected_schema != nullptr) {
423447
projected_schema_ = context_.projected_schema;
424448
} else {

src/iceberg/test/table_scan_test.cc

Lines changed: 103 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -666,6 +666,109 @@ TEST_P(TableScanTest, PlanFilesWithDeleteFiles) {
666666
}
667667
}
668668

669+
// Verifies the projected schema produced when columns are chosen via Select(),
// alone or together with a Filter(): each case below asserts the number of projected
// fields and looks the expected fields up by name.
TEST_P(TableScanTest, SchemaWithSelectedColumnsAndFilter) {
670+
auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
671+
SchemaField::MakeRequired(/*field_id=*/1, "id", int32()),
672+
SchemaField::MakeRequired(/*field_id=*/2, "data", string()),
673+
SchemaField::MakeRequired(/*field_id=*/3, "value", int64())});
674+
auto timestamp_ms = TimePointMsFromUnixMs(1609459200000L);
675+
auto metadata = std::make_shared<TableMetadata>(TableMetadata{
676+
.format_version = 2,
677+
.table_uuid = "test-table-uuid",
678+
.location = "/tmp/table",
679+
.last_sequence_number = 1L,
680+
.last_updated_ms = timestamp_ms,
681+
.last_column_id = 3,
682+
.schemas = {schema},
683+
.current_schema_id = schema->schema_id(),
684+
.partition_specs = {unpartitioned_spec_},
685+
.default_spec_id = unpartitioned_spec_->spec_id(),
686+
.last_partition_id = 1000,
687+
.current_snapshot_id = 1000L,
688+
.snapshots = {std::make_shared<Snapshot>(Snapshot{
689+
.snapshot_id = 1000L,
690+
.parent_snapshot_id = std::nullopt,
691+
.sequence_number = 1L,
692+
.timestamp_ms = timestamp_ms,
693+
.manifest_list = "/tmp/metadata/snap-1000-1-manifest-list.avro",
694+
.schema_id = schema->schema_id(),
695+
})},
696+
.snapshot_log = {SnapshotLogEntry{.timestamp_ms = timestamp_ms,
697+
.snapshot_id = 1000L}},
698+
.default_sort_order_id = 0,
699+
.refs = {{"main", std::make_shared<SnapshotRef>(SnapshotRef{
700+
.snapshot_id = 1000L,
701+
.retention = SnapshotRef::Branch{},
702+
})}},
703+
});
704+
705+
// Select "data" column, filter on "id" column
706+
{
707+
ICEBERG_UNWRAP_OR_FAIL(auto builder, TableScanBuilder::Make(metadata, file_io_));
708+
builder->Select({"data"}).Filter(Expressions::Equal("id", Literal::Int(42)));
709+
ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
710+
ICEBERG_UNWRAP_OR_FAIL(auto projected_schema, scan->schema());
711+
712+
ASSERT_EQ(projected_schema->fields().size(), 2);
713+
714+
ICEBERG_UNWRAP_OR_FAIL(auto id_field, projected_schema->FindFieldByName("id"));
715+
// ASSERT (not EXPECT): the next line dereferences the lookup result, so the test
// must stop here if the field is missing instead of reading an empty value.
ASSERT_TRUE(id_field.has_value());
716+
EXPECT_EQ(id_field->get().field_id(), 1);
717+
718+
ICEBERG_UNWRAP_OR_FAIL(auto data_field, projected_schema->FindFieldByName("data"));
719+
// ASSERT for the same reason: data_field is dereferenced immediately below.
ASSERT_TRUE(data_field.has_value());
720+
EXPECT_EQ(data_field->get().field_id(), 2);
721+
}
722+
723+
// Select "id" and "value", filter on "data"
724+
{
725+
ICEBERG_UNWRAP_OR_FAIL(auto builder, TableScanBuilder::Make(metadata, file_io_));
726+
builder->Select({"id", "value"})
727+
.Filter(Expressions::Equal("data", Literal::String("test")));
728+
ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
729+
ICEBERG_UNWRAP_OR_FAIL(auto projected_schema, scan->schema());
730+
731+
ASSERT_EQ(projected_schema->fields().size(), 3);
732+
733+
// Presence-only checks: nothing is dereferenced afterwards, so EXPECT is enough
// and lets all three lookups report in a single run.
ICEBERG_UNWRAP_OR_FAIL(auto id_field, projected_schema->FindFieldByName("id"));
734+
EXPECT_TRUE(id_field.has_value());
735+
736+
ICEBERG_UNWRAP_OR_FAIL(auto data_field, projected_schema->FindFieldByName("data"));
737+
EXPECT_TRUE(data_field.has_value());
738+
739+
ICEBERG_UNWRAP_OR_FAIL(auto value_field, projected_schema->FindFieldByName("value"));
740+
EXPECT_TRUE(value_field.has_value());
741+
}
742+
743+
// Select "id", filter on "id" - should only have "id" once
744+
{
745+
ICEBERG_UNWRAP_OR_FAIL(auto builder, TableScanBuilder::Make(metadata, file_io_));
746+
builder->Select({"id"}).Filter(Expressions::Equal("id", Literal::Int(42)));
747+
ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
748+
ICEBERG_UNWRAP_OR_FAIL(auto projected_schema, scan->schema());
749+
750+
ASSERT_EQ(projected_schema->fields().size(), 1);
751+
752+
ICEBERG_UNWRAP_OR_FAIL(auto id_field, projected_schema->FindFieldByName("id"));
753+
// ASSERT: id_field is dereferenced on the following line.
ASSERT_TRUE(id_field.has_value());
754+
EXPECT_EQ(id_field->get().field_id(), 1);
755+
}
756+
757+
// Select columns without filter
758+
{
759+
ICEBERG_UNWRAP_OR_FAIL(auto builder, TableScanBuilder::Make(metadata, file_io_));
760+
builder->Select({"data"});
761+
ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
762+
ICEBERG_UNWRAP_OR_FAIL(auto projected_schema, scan->schema());
763+
764+
ASSERT_EQ(projected_schema->fields().size(), 1);
765+
766+
ICEBERG_UNWRAP_OR_FAIL(auto data_field, projected_schema->FindFieldByName("data"));
767+
// ASSERT: data_field is dereferenced on the following line.
ASSERT_TRUE(data_field.has_value());
768+
EXPECT_EQ(data_field->get().field_id(), 2);
769+
}
770+
}
771+
669772
INSTANTIATE_TEST_SUITE_P(TableScanVersions, TableScanTest, testing::Values(1, 2, 3));
670773

671774
} // namespace iceberg

src/iceberg/util/type_util.cc

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,7 @@
2525
#include "iceberg/schema.h"
2626
#include "iceberg/util/checked_cast.h"
2727
#include "iceberg/util/formatter_internal.h"
28+
#include "iceberg/util/macros.h"
2829
#include "iceberg/util/string_util.h"
2930
#include "iceberg/util/visit_type.h"
3031

@@ -300,6 +301,13 @@ Status GetProjectedIdsVisitor::VisitPrimitive(const PrimitiveType& type) { retur
300301

301302
std::unordered_set<int32_t> GetProjectedIdsVisitor::Finish() const { return ids_; }
302303

304+
/// Static convenience wrapper: constructs a GetProjectedIdsVisitor configured with
/// `include_struct_ids`, runs it over `type`, and returns the id set from Finish().
///
/// \param type Type to visit.
/// \param include_struct_ids Forwarded unchanged to the visitor's constructor.
/// \return The ids collected by the visitor (Finish() returns them by value).
///         NOTE(review): presumably a failed Visit() is propagated to the caller by
///         ICEBERG_RETURN_UNEXPECTED - confirm against iceberg/util/macros.h.
Result<std::unordered_set<int32_t>> GetProjectedIdsVisitor::GetProjectedIds(
305+
const Type& type, bool include_struct_ids) {
306+
GetProjectedIdsVisitor visitor(include_struct_ids);
307+
// Visit before Finish(): the ids are only read after the traversal has run.
ICEBERG_RETURN_UNEXPECTED(visitor.Visit(type));
308+
return visitor.Finish();
309+
}
310+
303311
std::unordered_map<int32_t, int32_t> IndexParents(const StructType& root_struct) {
304312
std::unordered_map<int32_t, int32_t> id_to_parent;
305313
std::stack<int32_t> parent_id_stack;

src/iceberg/util/type_util.h

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -132,6 +132,9 @@ class GetProjectedIdsVisitor {
132132
Status VisitPrimitive(const PrimitiveType& type);
133133
std::unordered_set<int32_t> Finish() const;
134134

135+
/// One-shot helper that runs a GetProjectedIdsVisitor over `type` and returns the
/// resulting set of ids, so callers do not have to construct the visitor themselves.
///
/// \param type Type to visit.
/// \param include_struct_ids Passed to the visitor's constructor; defaults to false.
/// \return The collected ids wrapped in a Result.
static Result<std::unordered_set<int32_t>> GetProjectedIds(
136+
const Type& type, bool include_struct_ids = false);
137+
135138
private:
136139
const bool include_struct_ids_;
137140
std::unordered_set<int32_t> ids_;

0 commit comments

Comments
 (0)