Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
823018b
deps: add arrow 15.0.2 and iceberg-arrow to pom dependency management
Shekharrajak May 22, 2026
a873c4d
feat: add retrieveTable() to IcebergCatalog for direct Table object a…
Shekharrajak May 22, 2026
c420836
feat: add IcebergArrowInputSourceReader using iceberg-arrow vectorize…
Shekharrajak May 22, 2026
866a093
feat: wire useArrowReader + arrowBatchSize into IcebergInputSource
Shekharrajak May 22, 2026
2339330
test: add IcebergArrowInputSourceReaderTest; fix IcebergInputSourceTe…
Shekharrajak May 22, 2026
8447ce0
style: fix checkstyle and forbidden-apis violations (imports, argumen…
Shekharrajak May 23, 2026
f151f5b
fix: switch arrow-memory-netty to arrow-memory-unsafe to fix CI Arrow…
Shekharrajak May 23, 2026
87bdc7f
test: add regression for aggregator source column projection (current…
Shekharrajak May 24, 2026
c774363
fix: drive Iceberg scan projection from ColumnsFilter, mirroring Delt…
Shekharrajak May 24, 2026
6db57a7
test: ColumnsFilter exclusion prunes unused columns at Iceberg scan
Shekharrajak May 24, 2026
a7fa10c
style: drop redundant comments per AGENTS.md hygiene
Shekharrajak May 24, 2026
113948f
test: regression for residual FAIL mode bypassed by Arrow path
Shekharrajak May 24, 2026
6a3ad85
fix: enforce residualFilterMode in Arrow reader path
Shekharrajak May 24, 2026
ed6c2ff
test: regression for parallel ingestion bypassing Arrow reader
Shekharrajak May 24, 2026
498daa6
fix: route splittable contract through Arrow path when useArrowReader…
Shekharrajak May 24, 2026
484ebea
fix(iceberg-arrow): open jdk.internal.misc for Arrow allocator on JDK 21
Shekharrajak May 25, 2026
bcbfe67
fix(iceberg-arrow): drop module surefire override; root pom argLine a…
Shekharrajak May 25, 2026
2dedaa9
fix(iceberg-arrow): pin Arrow allocator to Unsafe to avoid Netty back…
Shekharrajak May 25, 2026
2111b0a
fix(iceberg-arrow): pin explicit arrow versions in iceberg pom for do…
Shekharrajak May 26, 2026
bf71afb
iceberg: split useArrowReader into dedicated IcebergArrowInputSource …
Shekharrajak Jun 2, 2026
3fbb73f
iceberg: add IcebergArrowInputSourceTest covering non-splittable and …
Shekharrajak Jun 2, 2026
2845d46
iceberg: drop unused InputRowSchema import in IcebergInputSourceTest
Shekharrajak Jun 2, 2026
d046573
iceberg-arrow: null-guard InputStats in read() per nullable contract
Shekharrajak Jun 2, 2026
197afce
iceberg-arrow: regression test for null InputStats via no-arg read()
Shekharrajak Jun 2, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions extensions-contrib/druid-iceberg-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,23 @@
<scope>provided</scope>
</dependency>

<dependency>
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Register new Arrow dependencies in licenses.yaml

This module adds iceberg-arrow, arrow-vector, and arrow-memory-unsafe, while the root pom also manages arrow-memory-netty, but licenses.yaml has no Arrow or Iceberg Arrow entries. dev/license.md says new library dependencies must be registered there, so binary license/notice generation is incomplete until these artifacts are added.

<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-arrow</artifactId>
<version>${iceberg.core.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>${arrow.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-unsafe</artifactId>
<version>${arrow.version}</version>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-parquet</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.druid.iceberg.guice.HiveConf;
import org.apache.druid.iceberg.input.GlueIcebergCatalog;
import org.apache.druid.iceberg.input.HiveIcebergCatalog;
import org.apache.druid.iceberg.input.IcebergArrowInputSource;
import org.apache.druid.iceberg.input.IcebergInputSource;
import org.apache.druid.iceberg.input.LocalCatalog;
import org.apache.druid.iceberg.input.RestIcebergCatalog;
Expand All @@ -49,6 +50,7 @@ public List<? extends Module> getJacksonModules()
new NamedType(LocalCatalog.class, LocalCatalog.TYPE_KEY),
new NamedType(RestIcebergCatalog.class, RestIcebergCatalog.TYPE_KEY),
new NamedType(IcebergInputSource.class, IcebergInputSource.TYPE_KEY),
new NamedType(IcebergArrowInputSource.class, IcebergArrowInputSource.TYPE_KEY),
new NamedType(GlueIcebergCatalog.class, GlueIcebergCatalog.TYPE_KEY)
)
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.iceberg.input;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.common.config.Configs;
import org.apache.druid.iceberg.filter.IcebergFilter;
import org.apache.iceberg.Table;
import org.joda.time.DateTime;

import javax.annotation.Nullable;

/**
 * Shared state and JSON accessors for Iceberg-backed input sources: the table coordinates
 * ({@code namespace} and {@code tableName}), the catalog used to resolve the table, an optional
 * filter, an optional snapshot time and the residual filter mode.
 */
public abstract class AbstractIcebergInputSource
{
  protected final String tableName;
  protected final String namespace;
  protected final IcebergCatalog icebergCatalog;
  @Nullable
  protected final IcebergFilter icebergFilter;
  @Nullable
  protected final DateTime snapshotTime;
  protected final ResidualFilterMode residualFilterMode;

  /**
   * @param tableName          name of the Iceberg table; must not be null
   * @param namespace          namespace containing the table; must not be null
   * @param icebergFilter      optional filter; stored as given and may be null
   * @param icebergCatalog     catalog used to resolve the table; must not be null
   * @param snapshotTime       optional snapshot time; stored as given and may be null
   * @param residualFilterMode optional residual filter mode; resolved through
   *                           {@code Configs.valueOrDefault} with {@link ResidualFilterMode#IGNORE}
   *                           as the default
   * @throws NullPointerException if {@code tableName}, {@code namespace} or {@code icebergCatalog} is null
   */
  protected AbstractIcebergInputSource(
      final String tableName,
      final String namespace,
      @Nullable final IcebergFilter icebergFilter,
      final IcebergCatalog icebergCatalog,
      @Nullable final DateTime snapshotTime,
      @Nullable final ResidualFilterMode residualFilterMode
  )
  {
    this.tableName = Preconditions.checkNotNull(tableName, "tableName cannot be null");
    this.namespace = Preconditions.checkNotNull(namespace, "namespace cannot be null");
    this.icebergCatalog = Preconditions.checkNotNull(icebergCatalog, "icebergCatalog cannot be null");
    this.icebergFilter = icebergFilter;
    this.snapshotTime = snapshotTime;
    this.residualFilterMode = Configs.valueOrDefault(residualFilterMode, ResidualFilterMode.IGNORE);
  }

  @JsonProperty
  public String getTableName()
  {
    return tableName;
  }

  @JsonProperty
  public String getNamespace()
  {
    return namespace;
  }

  @JsonProperty
  public IcebergCatalog getIcebergCatalog()
  {
    return icebergCatalog;
  }

  /**
   * Returns the configured filter, or null when none was supplied to the constructor.
   */
  @Nullable
  @JsonProperty
  public IcebergFilter getIcebergFilter()
  {
    return icebergFilter;
  }

  /**
   * Returns the configured snapshot time, or null when none was supplied to the constructor.
   */
  @Nullable
  @JsonProperty
  public DateTime getSnapshotTime()
  {
    return snapshotTime;
  }

  @JsonProperty
  public ResidualFilterMode getResidualFilterMode()
  {
    return residualFilterMode;
  }

  /**
   * Resolves the table by delegating to {@code icebergCatalog.retrieveTable(namespace, tableName)}.
   */
  protected Table retrieveTable()
  {
    return icebergCatalog.retrieveTable(namespace, tableName);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.iceberg.input;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.iceberg.filter.IcebergFilter;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.io.File;

public class IcebergArrowInputSource extends AbstractIcebergInputSource implements InputSource
{
public static final String TYPE_KEY = "iceberg_arrow";

@JsonProperty
private final int arrowBatchSize;

@JsonCreator
public IcebergArrowInputSource(
@JsonProperty("tableName") String tableName,
@JsonProperty("namespace") String namespace,
@JsonProperty("icebergFilter") @Nullable IcebergFilter icebergFilter,
@JsonProperty("icebergCatalog") IcebergCatalog icebergCatalog,
@JsonProperty("snapshotTime") @Nullable DateTime snapshotTime,
@JsonProperty("residualFilterMode") @Nullable ResidualFilterMode residualFilterMode,
@JsonProperty("arrowBatchSize") @Nullable Integer arrowBatchSize
)
{
super(tableName, namespace, icebergFilter, icebergCatalog, snapshotTime, residualFilterMode);
this.arrowBatchSize = arrowBatchSize != null && arrowBatchSize > 0
? arrowBatchSize
: IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE;
}

@JsonProperty
public int getArrowBatchSize()
{
return arrowBatchSize;
}

@Override
public boolean needsFormat()
{
return false;
}

@Override
public boolean isSplittable()
{
return false;
}

@Override
public InputSourceReader reader(
InputRowSchema inputRowSchema,
@Nullable InputFormat inputFormat,
File temporaryDirectory
)
{
final Table table = retrieveTable();
if (icebergFilter != null) {
final TableScan filteredScan = icebergFilter.filter(
table.newScan().caseSensitive(icebergCatalog.isCaseSensitive())
);
icebergCatalog.enforceResidualMode(filteredScan, getResidualFilterMode());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Apply snapshotTime before residual enforcement

reader() checks residuals on a filtered scan built from table.newScan(), but snapshotTime is only applied later inside IcebergArrowInputSourceReader.buildScan(). With residualFilterMode=FAIL, historical ingestion can throw or pass based on the current snapshot rather than the snapshot actually being read. Apply asOfTime(snapshotTime.getMillis()) before enforceResidualMode, or move enforcement into the reader after the snapshot is selected.

}
return new IcebergArrowInputSourceReader(
table,
icebergFilter,
snapshotTime,
icebergCatalog.isCaseSensitive(),
inputRowSchema,
arrowBatchSize
);
}
}
Loading
Loading