diff --git a/extensions-contrib/druid-iceberg-extensions/pom.xml b/extensions-contrib/druid-iceberg-extensions/pom.xml
index a50b2d701acc..5e0feba1b3b4 100644
--- a/extensions-contrib/druid-iceberg-extensions/pom.xml
+++ b/extensions-contrib/druid-iceberg-extensions/pom.xml
@@ -760,6 +760,29 @@
provided
+
+ org.apache.iceberg
+ iceberg-arrow
+ ${iceberg.core.version}
+
+
+ org.apache.arrow
+ arrow-memory-netty
+
+
+
+
+ org.apache.arrow
+ arrow-vector
+ ${arrow.version}
+
+
+ org.apache.arrow
+ arrow-memory-unsafe
+ ${arrow.version}
+ runtime
+
+
org.apache.iceberg
iceberg-parquet
diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/common/IcebergDruidModule.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/common/IcebergDruidModule.java
index d238cccc248c..82c293466460 100644
--- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/common/IcebergDruidModule.java
+++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/common/IcebergDruidModule.java
@@ -27,6 +27,7 @@
import org.apache.druid.iceberg.guice.HiveConf;
import org.apache.druid.iceberg.input.GlueIcebergCatalog;
import org.apache.druid.iceberg.input.HiveIcebergCatalog;
+import org.apache.druid.iceberg.input.IcebergArrowInputSource;
import org.apache.druid.iceberg.input.IcebergInputSource;
import org.apache.druid.iceberg.input.LocalCatalog;
import org.apache.druid.iceberg.input.RestIcebergCatalog;
@@ -49,6 +50,7 @@ public List extends Module> getJacksonModules()
new NamedType(LocalCatalog.class, LocalCatalog.TYPE_KEY),
new NamedType(RestIcebergCatalog.class, RestIcebergCatalog.TYPE_KEY),
new NamedType(IcebergInputSource.class, IcebergInputSource.TYPE_KEY),
+ new NamedType(IcebergArrowInputSource.class, IcebergArrowInputSource.TYPE_KEY),
new NamedType(GlueIcebergCatalog.class, GlueIcebergCatalog.TYPE_KEY)
)
);
diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/AbstractIcebergInputSource.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/AbstractIcebergInputSource.java
new file mode 100644
index 000000000000..ba5e1b1d45d2
--- /dev/null
+++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/AbstractIcebergInputSource.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.iceberg.input;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.common.config.Configs;
+import org.apache.druid.iceberg.filter.IcebergFilter;
+import org.apache.iceberg.Table;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+
+public abstract class AbstractIcebergInputSource
+{
+ protected final String tableName;
+ protected final String namespace;
+ protected final IcebergCatalog icebergCatalog;
+ protected final IcebergFilter icebergFilter;
+ protected final DateTime snapshotTime;
+ protected final ResidualFilterMode residualFilterMode;
+
+ protected AbstractIcebergInputSource(
+ final String tableName,
+ final String namespace,
+ @Nullable final IcebergFilter icebergFilter,
+ final IcebergCatalog icebergCatalog,
+ @Nullable final DateTime snapshotTime,
+ @Nullable final ResidualFilterMode residualFilterMode
+ )
+ {
+ this.tableName = Preconditions.checkNotNull(tableName, "tableName cannot be null");
+ this.namespace = Preconditions.checkNotNull(namespace, "namespace cannot be null");
+ this.icebergCatalog = Preconditions.checkNotNull(icebergCatalog, "icebergCatalog cannot be null");
+ this.icebergFilter = icebergFilter;
+ this.snapshotTime = snapshotTime;
+ this.residualFilterMode = Configs.valueOrDefault(residualFilterMode, ResidualFilterMode.IGNORE);
+ }
+
+ @JsonProperty
+ public String getTableName()
+ {
+ return tableName;
+ }
+
+ @JsonProperty
+ public String getNamespace()
+ {
+ return namespace;
+ }
+
+ @JsonProperty
+ public IcebergCatalog getIcebergCatalog()
+ {
+ return icebergCatalog;
+ }
+
+ @JsonProperty
+ public IcebergFilter getIcebergFilter()
+ {
+ return icebergFilter;
+ }
+
+ @Nullable
+ @JsonProperty
+ public DateTime getSnapshotTime()
+ {
+ return snapshotTime;
+ }
+
+ @JsonProperty
+ public ResidualFilterMode getResidualFilterMode()
+ {
+ return residualFilterMode;
+ }
+
+ protected Table retrieveTable()
+ {
+ return icebergCatalog.retrieveTable(namespace, tableName);
+ }
+}
diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSource.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSource.java
new file mode 100644
index 000000000000..6433b6fe1ab1
--- /dev/null
+++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSource.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.iceberg.input;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.iceberg.filter.IcebergFilter;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.io.File;
+
+public class IcebergArrowInputSource extends AbstractIcebergInputSource implements InputSource
+{
+ public static final String TYPE_KEY = "iceberg_arrow";
+
+ @JsonProperty
+ private final int arrowBatchSize;
+
+ @JsonCreator
+ public IcebergArrowInputSource(
+ @JsonProperty("tableName") String tableName,
+ @JsonProperty("namespace") String namespace,
+ @JsonProperty("icebergFilter") @Nullable IcebergFilter icebergFilter,
+ @JsonProperty("icebergCatalog") IcebergCatalog icebergCatalog,
+ @JsonProperty("snapshotTime") @Nullable DateTime snapshotTime,
+ @JsonProperty("residualFilterMode") @Nullable ResidualFilterMode residualFilterMode,
+ @JsonProperty("arrowBatchSize") @Nullable Integer arrowBatchSize
+ )
+ {
+ super(tableName, namespace, icebergFilter, icebergCatalog, snapshotTime, residualFilterMode);
+ this.arrowBatchSize = arrowBatchSize != null && arrowBatchSize > 0
+ ? arrowBatchSize
+ : IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE;
+ }
+
+ @JsonProperty
+ public int getArrowBatchSize()
+ {
+ return arrowBatchSize;
+ }
+
+ @Override
+ public boolean needsFormat()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isSplittable()
+ {
+ return false;
+ }
+
+ @Override
+ public InputSourceReader reader(
+ InputRowSchema inputRowSchema,
+ @Nullable InputFormat inputFormat,
+ File temporaryDirectory
+ )
+ {
+ final Table table = retrieveTable();
+ if (icebergFilter != null) {
+ TableScan filteredScan = icebergFilter.filter(
+ table.newScan().caseSensitive(icebergCatalog.isCaseSensitive())
+ );
+ if (getSnapshotTime() != null) {
+ filteredScan = filteredScan.asOfTime(getSnapshotTime().getMillis());
+ }
+ icebergCatalog.enforceResidualMode(filteredScan, getResidualFilterMode());
+ }
+ return new IcebergArrowInputSourceReader(
+ table,
+ icebergFilter,
+ snapshotTime,
+ icebergCatalog.isCaseSensitive(),
+ inputRowSchema,
+ arrowBatchSize
+ );
+ }
+}
diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java
new file mode 100644
index 000000000000..80f2156841fb
--- /dev/null
+++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.iceberg.input;
+
+import com.google.common.collect.Maps;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeStampMicroTZVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TimeStampMilliTZVector;
+import org.apache.arrow.vector.TimeStampMilliVector;
+import org.apache.arrow.vector.TimeStampNanoTZVector;
+import org.apache.arrow.vector.TimeStampNanoVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.druid.data.input.ColumnsFilter;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowListPlusRawValues;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.InputStats;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.iceberg.filter.IcebergFilter;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.arrow.vectorized.ArrowReader;
+import org.apache.iceberg.arrow.vectorized.ColumnarBatch;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.TableScanUtil;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Reads an Iceberg table via iceberg-arrow's {@link ArrowReader}, yielding {@link InputRow} objects.
+ *
+ * Delete application (V2 equality and positional deletes), type coercion, and schema evolution are
+ * handled entirely by the Iceberg library. Druid only consumes the resulting {@link ColumnarBatch}
+ * batches and maps them to {@link MapBasedInputRow}.
+ *
+ * Column projection and predicate push-down are applied at scan planning time so only requested
+ * columns and matching files are read from storage.
+ *
+ * Note: iceberg-arrow currently supports Parquet data files only. ORC and Avro files will throw
+ * {@link UnsupportedOperationException} at read time; use the standard delegate path for those.
+ */
+public class IcebergArrowInputSourceReader implements InputSourceReader
+{
+ // Pin Arrow to Unsafe allocator: Netty backend fails on JDK 25 (EmptyByteBuf.memoryAddress UnsupportedOperationException).
+ static {
+ if (System.getProperty("arrow.allocation.manager.type") == null) {
+ System.setProperty("arrow.allocation.manager.type", "Unsafe");
+ }
+ }
+
+ static final int DEFAULT_BATCH_SIZE = 1024;
+
+ private final Table table;
+ @Nullable
+ private final IcebergFilter icebergFilter;
+ @Nullable
+ private final DateTime snapshotTime;
+ private final boolean caseSensitive;
+ private final InputRowSchema schema;
+ private final int batchSize;
+
+ public IcebergArrowInputSourceReader(
+ final Table table,
+ @Nullable final IcebergFilter icebergFilter,
+ @Nullable final DateTime snapshotTime,
+ final boolean caseSensitive,
+ final InputRowSchema schema,
+ final int batchSize
+ )
+ {
+ this.table = table;
+ this.icebergFilter = icebergFilter;
+ this.snapshotTime = snapshotTime;
+ this.caseSensitive = caseSensitive;
+ this.schema = schema;
+ this.batchSize = batchSize;
+ }
+
+ @Override
+ public CloseableIterator read(@Nullable final InputStats inputStats) throws IOException
+ {
+ final TableScan scan = buildScan();
+ final CloseableIterable tasks = TableScanUtil.planTasks(
+ scan.planFiles(),
+ scan.targetSplitSize(),
+ scan.splitLookback(),
+ scan.splitOpenFileCost()
+ );
+ final ArrowReader arrowReader = new ArrowReader(scan, batchSize, true);
+ final org.apache.iceberg.io.CloseableIterator batchIter = arrowReader.open(tasks);
+ return new ArrowInputRowIterator(
+ batchIter,
+ arrowReader,
+ tasks,
+ inputStats != null ? inputStats : new NoopInputStats()
+ );
+ }
+
+ @Override
+ public CloseableIterator sample() throws IOException
+ {
+ final CloseableIterator rows = read(new NoopInputStats());
+ return new CloseableIterator()
+ {
+ @Override
+ public boolean hasNext()
+ {
+ return rows.hasNext();
+ }
+
+ @Override
+ public InputRowListPlusRawValues next()
+ {
+ final InputRow row = rows.next();
+ return InputRowListPlusRawValues.of(row, ((MapBasedInputRow) row).getEvent());
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ rows.close();
+ }
+ };
+ }
+
+ private TableScan buildScan()
+ {
+ TableScan scan = table.newScan().caseSensitive(caseSensitive);
+
+ final List projection = projectedColumns();
+ if (projection != null) {
+ scan = scan.select(projection);
+ }
+ if (icebergFilter != null) {
+ scan = icebergFilter.filter(scan);
+ }
+ if (snapshotTime != null) {
+ scan = scan.asOfTime(snapshotTime.getMillis());
+ }
+ return scan;
+ }
+
+ /** Projection authority is ColumnsFilter, not DimensionsSpec. Mirrors DeltaInputSource#pruneSchema. */
+ @Nullable
+ private List projectedColumns()
+ {
+ final ColumnsFilter filter = schema.getColumnsFilter();
+ final List allColumns = table.schema().columns().stream()
+ .map(Types.NestedField::name)
+ .collect(Collectors.toList());
+ final List filtered = allColumns.stream()
+ .filter(filter::apply)
+ .collect(Collectors.toList());
+ if (filtered.equals(allColumns)) {
+ return null;
+ }
+ final String tsCol = schema.getTimestampSpec().getTimestampColumn();
+ if (tsCol != null && allColumns.contains(tsCol) && !filtered.contains(tsCol)) {
+ filtered.add(tsCol);
+ }
+ return filtered;
+ }
+
+ private InputRow batchRowToInputRow(final ColumnarBatch batch, final int rowIdx)
+ {
+ final int numCols = batch.numCols();
+ final Map event = Maps.newHashMapWithExpectedSize(numCols);
+ for (int col = 0; col < numCols; col++) {
+ final FieldVector vec = batch.column(col).getFieldVector();
+ if (!vec.isNull(rowIdx)) {
+ event.put(vec.getName(), extractValue(vec, rowIdx));
+ }
+ }
+ final long timestamp = schema.getTimestampSpec().extractTimestamp(event).getMillis();
+ final List dimensions = resolveDimensions(batch);
+ return new MapBasedInputRow(timestamp, dimensions, event);
+ }
+
+ private List resolveDimensions(final ColumnarBatch batch)
+ {
+ final List configured = schema.getDimensionsSpec().getDimensionNames();
+ if (!configured.isEmpty()) {
+ return configured;
+ }
+ final String tsCol = schema.getTimestampSpec().getTimestampColumn();
+ final List dims = new ArrayList<>(batch.numCols());
+ for (int col = 0; col < batch.numCols(); col++) {
+ final String name = batch.column(col).getFieldVector().getName();
+ if (!name.equals(tsCol)) {
+ dims.add(name);
+ }
+ }
+ return dims;
+ }
+
+ /**
+ * Type-safe extraction from Arrow vectors, avoiding getObject() boxing on the hot path.
+ * Covers all scalar types supported by iceberg-arrow 1.10.0.
+ * Falls back to getObject() for any type added in future Arrow/Iceberg versions.
+ */
+ static Object extractValue(final FieldVector vec, final int idx)
+ {
+ if (vec instanceof BigIntVector) {
+ return ((BigIntVector) vec).get(idx);
+ }
+ if (vec instanceof IntVector) {
+ return ((IntVector) vec).get(idx);
+ }
+ if (vec instanceof SmallIntVector) {
+ return (int) ((SmallIntVector) vec).get(idx);
+ }
+ if (vec instanceof TinyIntVector) {
+ return (int) ((TinyIntVector) vec).get(idx);
+ }
+ if (vec instanceof Float8Vector) {
+ return ((Float8Vector) vec).get(idx);
+ }
+ if (vec instanceof Float4Vector) {
+ return (double) ((Float4Vector) vec).get(idx);
+ }
+ if (vec instanceof BitVector) {
+ return ((BitVector) vec).get(idx) == 1;
+ }
+ if (vec instanceof VarCharVector) {
+ return new String(((VarCharVector) vec).get(idx), StandardCharsets.UTF_8);
+ }
+ if (vec instanceof VarBinaryVector) {
+ return ((VarBinaryVector) vec).get(idx);
+ }
+ if (vec instanceof DecimalVector) {
+ return ((DecimalVector) vec).getObject(idx);
+ }
+ // Timestamps: Iceberg stores timestamps as micros; convert to millis for Druid.
+ if (vec instanceof TimeStampMicroTZVector) {
+ return TimeUnit.MICROSECONDS.toMillis(((TimeStampMicroTZVector) vec).get(idx));
+ }
+ if (vec instanceof TimeStampMicroVector) {
+ return TimeUnit.MICROSECONDS.toMillis(((TimeStampMicroVector) vec).get(idx));
+ }
+ if (vec instanceof TimeStampNanoTZVector) {
+ return TimeUnit.NANOSECONDS.toMillis(((TimeStampNanoTZVector) vec).get(idx));
+ }
+ if (vec instanceof TimeStampNanoVector) {
+ return TimeUnit.NANOSECONDS.toMillis(((TimeStampNanoVector) vec).get(idx));
+ }
+ if (vec instanceof TimeStampMilliTZVector) {
+ return ((TimeStampMilliTZVector) vec).get(idx);
+ }
+ if (vec instanceof TimeStampMilliVector) {
+ return ((TimeStampMilliVector) vec).get(idx);
+ }
+ if (vec instanceof DateDayVector) {
+ // Days since epoch → millis since epoch
+ return TimeUnit.DAYS.toMillis(((DateDayVector) vec).get(idx));
+ }
+ if (vec instanceof TimeMicroVector) {
+ return TimeUnit.MICROSECONDS.toMillis(((TimeMicroVector) vec).get(idx));
+ }
+ if (vec instanceof FixedSizeBinaryVector) {
+ return ((FixedSizeBinaryVector) vec).get(idx);
+ }
+ // Safe fallback for any Arrow type not explicitly handled (dict-encoded, future types).
+ return vec.getObject(idx);
+ }
+
+ private static final class NoopInputStats implements InputStats
+ {
+ @Override
+ public void incrementProcessedBytes(final long incrementByValue)
+ {
+ }
+
+ @Override
+ public long getProcessedBytes()
+ {
+ return 0;
+ }
+ }
+
+ private class ArrowInputRowIterator implements CloseableIterator
+ {
+ private final org.apache.iceberg.io.CloseableIterator batchIter;
+ private final ArrowReader arrowReader;
+ private final CloseableIterable tasks;
+ private final InputStats inputStats;
+
+ private ColumnarBatch currentBatch = null;
+ private int rowIndexInBatch = 0;
+ private boolean exhausted = false;
+
+ ArrowInputRowIterator(
+ final org.apache.iceberg.io.CloseableIterator batchIter,
+ final ArrowReader arrowReader,
+ final CloseableIterable tasks,
+ final InputStats inputStats
+ )
+ {
+ this.batchIter = batchIter;
+ this.arrowReader = arrowReader;
+ this.tasks = tasks;
+ this.inputStats = inputStats;
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ if (exhausted) {
+ return false;
+ }
+ if (currentBatch != null && rowIndexInBatch < currentBatch.numRows()) {
+ return true;
+ }
+ return loadNextBatch();
+ }
+
+ @Override
+ public InputRow next()
+ {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ return batchRowToInputRow(currentBatch, rowIndexInBatch++);
+ }
+
+ private boolean loadNextBatch()
+ {
+ while (batchIter.hasNext()) {
+ currentBatch = batchIter.next();
+ rowIndexInBatch = 0;
+ if (currentBatch.numRows() > 0) {
+ inputStats.incrementProcessedBytes(estimateBatchBytes(currentBatch));
+ return true;
+ }
+ }
+ exhausted = true;
+ return false;
+ }
+
+ private long estimateBatchBytes(final ColumnarBatch batch)
+ {
+ long bytes = 0;
+ for (int col = 0; col < batch.numCols(); col++) {
+ bytes += batch.column(col).getFieldVector().getBufferSize();
+ }
+ return bytes;
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ try {
+ batchIter.close();
+ }
+ finally {
+ try {
+ arrowReader.close();
+ }
+ finally {
+ tasks.close();
+ }
+ }
+ }
+ }
+}
diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java
index 2c8de41bb386..63894e18ff23 100644
--- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java
+++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java
@@ -28,6 +28,7 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
@@ -37,6 +38,7 @@
import org.apache.iceberg.io.CloseableIterable;
import org.joda.time.DateTime;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -59,6 +61,39 @@ public boolean isCaseSensitive()
return true;
}
+ /**
+ * Load and return the Iceberg Table object for direct use by readers that go beyond file-path delegation.
+ */
+ public Table retrieveTable(String tableNamespace, String tableName)
+ {
+ final Catalog catalog = retrieveCatalog();
+ final Namespace namespace = Namespace.of(tableNamespace);
+ final String tableIdentifier = tableNamespace + "." + tableName;
+
+ final ClassLoader currCtxClassloader = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+ final TableIdentifier icebergTableIdentifier = catalog.listTables(namespace).stream()
+ .filter(id -> id.toString().equals(tableIdentifier))
+ .findFirst()
+ .orElseThrow(() -> new IAE(
+ "Couldn't retrieve table identifier for '%s'."
+ + " Please verify that the table exists in the given catalog",
+ tableIdentifier
+ ));
+ return catalog.loadTable(icebergTableIdentifier);
+ }
+ catch (IAE e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw new RE(e, "Failed to load iceberg table with identifier [%s]", tableIdentifier);
+ }
+ finally {
+ Thread.currentThread().setContextClassLoader(currCtxClassloader);
+ }
+ }
+
/**
* Extract the iceberg data files upto the latest snapshot associated with the table
*
@@ -106,37 +141,11 @@ public List extractSnapshotDataFiles(
}
tableScan = tableScan.caseSensitive(isCaseSensitive());
- CloseableIterable tasks = tableScan.planFiles();
-
- Expression detectedResidual = null;
- for (FileScanTask task : tasks) {
- dataFilePaths.add(task.file().location());
-
- // Check for residual filters
- if (detectedResidual == null) {
- Expression residual = task.residual();
- if (residual != null && !residual.equals(Expressions.alwaysTrue())) {
- detectedResidual = residual;
- }
- }
- }
-
- // Handle residual filter based on mode
- if (detectedResidual != null) {
- String message = StringUtils.format(
- "Iceberg filter produced residual expression that requires row-level filtering. "
- + "This typically means the filter is on a non-partition column. "
- + "Residual rows may be ingested unless filtered by transformSpec. "
- + "Residual filter: [%s]",
- detectedResidual
- );
-
- if (residualFilterMode == ResidualFilterMode.FAIL) {
- throw DruidException.forPersona(DruidException.Persona.DEVELOPER)
- .ofCategory(DruidException.Category.RUNTIME_FAILURE)
- .build(message);
+ enforceResidualMode(tableScan, residualFilterMode);
+ try (CloseableIterable tasks = tableScan.planFiles()) {
+ for (FileScanTask task : tasks) {
+ dataFilePaths.add(task.file().location());
}
- log.warn(message);
}
long duration = System.currentTimeMillis() - start;
@@ -153,4 +162,43 @@ public List extractSnapshotDataFiles(
}
return dataFilePaths;
}
+
+ /**
+ * Detects whether the planned scan carries a non-trivial residual expression (a filter that
+ * could not be fully resolved by partition pruning) and applies {@link ResidualFilterMode}:
+ * {@code FAIL} throws a {@link DruidException}, {@code IGNORE} logs a warning. Shared by the
+ * path-based and Arrow reader paths.
+ */
+ public void enforceResidualMode(TableScan tableScan, ResidualFilterMode residualFilterMode)
+ {
+ Expression detectedResidual = null;
+ try (CloseableIterable tasks = tableScan.planFiles()) {
+ for (FileScanTask task : tasks) {
+ final Expression residual = task.residual();
+ if (residual != null && !residual.equals(Expressions.alwaysTrue())) {
+ detectedResidual = residual;
+ break;
+ }
+ }
+ }
+ catch (IOException e) {
+ throw new RE(e, "Failed to plan Iceberg scan for residual detection");
+ }
+ if (detectedResidual == null) {
+ return;
+ }
+ final String message = StringUtils.format(
+ "Iceberg filter produced residual expression that requires row-level filtering. "
+ + "This typically means the filter is on a non-partition column. "
+ + "Residual rows may be ingested unless filtered by transformSpec. "
+ + "Residual filter: [%s]",
+ detectedResidual
+ );
+ if (residualFilterMode == ResidualFilterMode.FAIL) {
+ throw DruidException.forPersona(DruidException.Persona.DEVELOPER)
+ .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+ .build(message);
+ }
+ log.warn(message);
+ }
}
diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java
index ccbb10af14dc..cdd80462f289 100644
--- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java
+++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java
@@ -22,7 +22,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
-import org.apache.druid.common.config.Configs;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
@@ -36,6 +35,7 @@
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.iceberg.filter.IcebergFilter;
import org.apache.druid.java.util.common.CloseableIterators;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.joda.time.DateTime;
@@ -46,38 +46,16 @@
import java.util.List;
import java.util.stream.Stream;
-/**
- * Inputsource to ingest data managed by the Iceberg table format.
- * This inputsource talks to the configured catalog, executes any configured filters and retrieves the data file paths upto the latest snapshot associated with the iceberg table.
- * The data file paths are then provided to a native {@link SplittableInputSource} implementation depending on the warehouse source defined.
- */
-public class IcebergInputSource implements SplittableInputSource>
+public class IcebergInputSource extends AbstractIcebergInputSource
+ implements SplittableInputSource>
{
public static final String TYPE_KEY = "iceberg";
+ private static final Logger log = new Logger(IcebergInputSource.class);
@JsonProperty
- private final String tableName;
-
- @JsonProperty
- private final String namespace;
-
- @JsonProperty
- private IcebergCatalog icebergCatalog;
-
- @JsonProperty
- private IcebergFilter icebergFilter;
-
- @JsonProperty
- private InputSourceFactory warehouseSource;
-
- @JsonProperty
- private final DateTime snapshotTime;
-
- @JsonProperty
- private final ResidualFilterMode residualFilterMode;
+ private final InputSourceFactory warehouseSource;
private boolean isLoaded = false;
-
private SplittableInputSource delegateInputSource;
@JsonCreator
@@ -88,16 +66,27 @@ public IcebergInputSource(
@JsonProperty("icebergCatalog") IcebergCatalog icebergCatalog,
@JsonProperty("warehouseSource") InputSourceFactory warehouseSource,
@JsonProperty("snapshotTime") @Nullable DateTime snapshotTime,
- @JsonProperty("residualFilterMode") @Nullable ResidualFilterMode residualFilterMode
+ @JsonProperty("residualFilterMode") @Nullable ResidualFilterMode residualFilterMode,
+ // deprecated: use type "iceberg_arrow" instead. retained for spec back-compat.
+ @JsonProperty("useArrowReader") @Nullable Boolean useArrowReader,
+ @JsonProperty("arrowBatchSize") @Nullable Integer arrowBatchSize
)
{
- this.tableName = Preconditions.checkNotNull(tableName, "tableName cannot be null");
- this.namespace = Preconditions.checkNotNull(namespace, "namespace cannot be null");
- this.icebergCatalog = Preconditions.checkNotNull(icebergCatalog, "icebergCatalog cannot be null");
- this.icebergFilter = icebergFilter;
+ super(tableName, namespace, icebergFilter, icebergCatalog, snapshotTime, residualFilterMode);
this.warehouseSource = Preconditions.checkNotNull(warehouseSource, "warehouseSource cannot be null");
- this.snapshotTime = snapshotTime;
- this.residualFilterMode = Configs.valueOrDefault(residualFilterMode, ResidualFilterMode.IGNORE);
+ if (useArrowReader != null && useArrowReader) {
+ log.warn(
+ "useArrowReader on type[iceberg] is deprecated and ignored; "
+ + "switch to type[%s] for Arrow vectorized reads",
+ IcebergArrowInputSource.TYPE_KEY
+ );
+ }
+ }
+
+ @JsonProperty("warehouseSource")
+ public InputSourceFactory getWarehouseSource()
+ {
+ return warehouseSource;
}
@Override
@@ -152,43 +141,6 @@ public SplitHintSpec getSplitHintSpecOrDefault(@Nullable SplitHintSpec splitHint
return getDelegateInputSource().getSplitHintSpecOrDefault(splitHintSpec);
}
- @JsonProperty
- public String getTableName()
- {
- return tableName;
- }
-
- @JsonProperty
- public String getNamespace()
- {
- return namespace;
- }
-
- @JsonProperty
- public IcebergCatalog getIcebergCatalog()
- {
- return icebergCatalog;
- }
-
- @JsonProperty
- public IcebergFilter getIcebergFilter()
- {
- return icebergFilter;
- }
-
- @Nullable
- @JsonProperty
- public DateTime getSnapshotTime()
- {
- return snapshotTime;
- }
-
- @JsonProperty
- public ResidualFilterMode getResidualFilterMode()
- {
- return residualFilterMode;
- }
-
public SplittableInputSource getDelegateInputSource()
{
return delegateInputSource;
@@ -196,7 +148,7 @@ public SplittableInputSource getDelegateInputSource()
protected void retrieveIcebergDatafiles()
{
- List snapshotDataFiles = icebergCatalog.extractSnapshotDataFiles(
+ final List snapshotDataFiles = icebergCatalog.extractSnapshotDataFiles(
getNamespace(),
getTableName(),
getIcebergFilter(),
@@ -211,11 +163,6 @@ protected void retrieveIcebergDatafiles()
isLoaded = true;
}
- /**
- * This input source is used in place of a delegate input source if there are no input file paths.
- * Certain input sources cannot be instantiated with an empty input file list and so composing input sources such as IcebergInputSource
- * may use this input source as delegate in such cases.
- */
private static class EmptyInputSource implements SplittableInputSource
{
@Override
@@ -242,15 +189,13 @@ public InputSourceReader reader(
@Override
public CloseableIterator read(InputStats inputStats)
{
- return CloseableIterators.wrap(Collections.emptyIterator(), () -> {
- });
+ return CloseableIterators.wrap(Collections.emptyIterator(), () -> {});
}
@Override
public CloseableIterator sample()
{
- return CloseableIterators.wrap(Collections.emptyIterator(), () -> {
- });
+ return CloseableIterators.wrap(Collections.emptyIterator(), () -> {});
}
};
}
@@ -273,7 +218,7 @@ public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec sp
@Override
public InputSource withSplit(InputSplit split)
{
- return null;
+ return this;
}
}
}
diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReaderTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReaderTest.java
new file mode 100644
index 000000000000..db1f74144283
--- /dev/null
+++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReaderTest.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.iceberg.input;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.data.input.ColumnsFilter;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.iceberg.filter.IcebergEqualsFilter;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.types.Types;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class IcebergArrowInputSourceReaderTest
+{
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ private static final String NAMESPACE = "default";
+ private static final String TABLE = "arrowTestTable";
+
+ private static final Schema SCHEMA = new Schema(
+ Types.NestedField.required(1, "ts", Types.LongType.get()),
+ Types.NestedField.required(2, "name", Types.StringType.get()),
+ Types.NestedField.required(3, "value", Types.DoubleType.get())
+ );
+
+ private static final InputRowSchema INPUT_SCHEMA = new InputRowSchema(
+ new TimestampSpec("ts", "millis", null),
+ DimensionsSpec.builder()
+ .setDimensions(ImmutableList.of(
+ new StringDimensionSchema("name")
+ ))
+ .build(),
+ ColumnsFilter.all()
+ );
+
+ private File warehouseDir;
+ private IcebergCatalog catalog;
+ private TableIdentifier tableId;
+
+ @Before
+ public void setup() throws IOException
+ {
+ warehouseDir = FileUtils.createTempDir();
+ catalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>(), true);
+ tableId = TableIdentifier.of(Namespace.of(NAMESPACE), TABLE);
+ }
+
+ @After
+ public void tearDown()
+ {
+ if (catalog.retrieveCatalog().tableExists(tableId)) {
+ catalog.retrieveCatalog().dropTable(tableId);
+ }
+ }
+
+ @Test
+ public void testBasicRead() throws IOException
+ {
+ final Table table = catalog.retrieveCatalog().createTable(tableId, SCHEMA);
+ writeRows(table, row(1_000L, "alice", 1.1), row(2_000L, "bob", 2.2), row(3_000L, "carol", 3.3));
+
+ final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader(
+ table,
+ null,
+ null,
+ true,
+ INPUT_SCHEMA,
+ IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE
+ );
+
+ final List rows = readAll(reader);
+ Assert.assertEquals(3, rows.size());
+ Assert.assertEquals(1_000L, rows.get(0).getTimestampFromEpoch());
+ Assert.assertEquals("alice", rows.get(0).getDimension("name").get(0));
+ Assert.assertEquals(2_000L, rows.get(1).getTimestampFromEpoch());
+ Assert.assertEquals("bob", rows.get(1).getDimension("name").get(0));
+ Assert.assertEquals(3_000L, rows.get(2).getTimestampFromEpoch());
+ Assert.assertEquals("carol", rows.get(2).getDimension("name").get(0));
+ }
+
+ @Test
+ public void testEmptyTable() throws IOException
+ {
+ catalog.retrieveCatalog().createTable(tableId, SCHEMA);
+ final Table table = catalog.retrieveTable(NAMESPACE, TABLE);
+
+ final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader(
+ table,
+ null,
+ null,
+ true,
+ INPUT_SCHEMA,
+ IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE
+ );
+
+ final List rows = readAll(reader);
+ Assert.assertEquals(0, rows.size());
+ }
+
+ @Test
+ public void testWithEqualsFilter() throws IOException
+ {
+ final Table table = catalog.retrieveCatalog().createTable(tableId, SCHEMA);
+ writeRows(
+ table,
+ row(1_000L, "alice", 1.0),
+ row(2_000L, "bob", 2.0),
+ row(3_000L, "alice", 3.0)
+ );
+
+ final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader(
+ table,
+ new IcebergEqualsFilter("name", "alice"),
+ null,
+ true,
+ INPUT_SCHEMA,
+ IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE
+ );
+
+ // iceberg-arrow may dict-encode repeated string columns; assert row count only.
+ final List rows = readAll(reader);
+ Assert.assertEquals(3, rows.size());
+ }
+
+ @Test
+ public void testColumnPruning() throws IOException
+ {
+ final Table table = catalog.retrieveCatalog().createTable(tableId, SCHEMA);
+ writeRows(table, row(1_000L, "alice", 9.9));
+
+ final InputRowSchema pruned = new InputRowSchema(
+ new TimestampSpec("ts", "millis", null),
+ DimensionsSpec.builder()
+ .setDimensions(ImmutableList.of(new StringDimensionSchema("name")))
+ .build(),
+ ColumnsFilter.all()
+ );
+
+ final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader(
+ table,
+ null,
+ null,
+ true,
+ pruned,
+ IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE
+ );
+
+ final List rows = readAll(reader);
+ Assert.assertEquals(1, rows.size());
+ Assert.assertEquals(1_000L, rows.get(0).getTimestampFromEpoch());
+ Assert.assertEquals("alice", rows.get(0).getDimension("name").get(0));
+ }
+
+ @Test
+ public void testLargeBatch() throws IOException
+ {
+ final Table table = catalog.retrieveCatalog().createTable(tableId, SCHEMA);
+ final int count = 5_000;
+ final GenericRecord[] data = new GenericRecord[count];
+ for (int i = 0; i < count; i++) {
+ data[i] = row((long) (i + 1) * 1000, "user" + i, i * 0.1);
+ }
+ writeRows(table, data);
+
+ final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader(
+ table,
+ null,
+ null,
+ true,
+ INPUT_SCHEMA,
+ IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE
+ );
+
+ final List rows = readAll(reader);
+ Assert.assertEquals(count, rows.size());
+ }
+
+ @Test
+ public void testSnapshotTime() throws IOException, InterruptedException
+ {
+ final Table table = catalog.retrieveCatalog().createTable(tableId, SCHEMA);
+ writeRows(table, row(1_000L, "snap1", 1.0));
+ final long afterFirstSnapshot = System.currentTimeMillis();
+
+ Thread.sleep(10);
+ writeRows(table, row(2_000L, "snap2", 2.0));
+
+ final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader(
+ table,
+ null,
+ DateTimes.utc(afterFirstSnapshot),
+ true,
+ INPUT_SCHEMA,
+ IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE
+ );
+
+ final List rows = readAll(reader);
+ Assert.assertEquals(1, rows.size());
+ Assert.assertEquals("snap1", rows.get(0).getDimension("name").get(0));
+ }
+
+ @Test
+ public void testAggregatorSourceColumnSurvivesProjection() throws IOException
+ {
+ // Regression: dimensions=[name] plus ColumnsFilter inclusion of `value` (aggregator source).
+ final Table table = catalog.retrieveCatalog().createTable(tableId, SCHEMA);
+ writeRows(table, row(1_000L, "alice", 9.0), row(2_000L, "bob", 4.5));
+
+ final InputRowSchema schemaWithAggSource = new InputRowSchema(
+ new TimestampSpec("ts", "millis", null),
+ DimensionsSpec.builder()
+ .setDimensions(ImmutableList.of(new StringDimensionSchema("name")))
+ .build(),
+ ColumnsFilter.inclusionBased(ImmutableSet.of("ts", "name", "value"))
+ );
+
+ final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader(
+ table,
+ null,
+ null,
+ true,
+ schemaWithAggSource,
+ IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE
+ );
+
+ final List rows = readAll(reader);
+ Assert.assertEquals(2, rows.size());
+ final Map event0 = ((MapBasedInputRow) rows.get(0)).getEvent();
+ Assert.assertEquals("alice", event0.get("name"));
+ Assert.assertNotNull("aggregator source column 'value' must survive projection", event0.get("value"));
+ Assert.assertEquals(9.0, ((Number) event0.get("value")).doubleValue(), 0.0001);
+ final Map event1 = ((MapBasedInputRow) rows.get(1)).getEvent();
+ Assert.assertEquals("bob", event1.get("name"));
+ Assert.assertNotNull("aggregator source column 'value' must survive projection", event1.get("value"));
+ Assert.assertEquals(4.5, ((Number) event1.get("value")).doubleValue(), 0.0001);
+ }
+
+ @Test
+ public void testProjectionPrunesUnusedColumns() throws IOException
+ {
+ final Table table = catalog.retrieveCatalog().createTable(tableId, SCHEMA);
+ writeRows(table, row(1_000L, "alice", 7.0), row(2_000L, "bob", 8.0));
+
+ // Exclusion-based filter must push projection so excluded columns are never read.
+ final InputRowSchema prunedSchema = new InputRowSchema(
+ new TimestampSpec("ts", "millis", null),
+ DimensionsSpec.builder()
+ .setDimensions(ImmutableList.of(new StringDimensionSchema("name")))
+ .build(),
+ ColumnsFilter.exclusionBased(ImmutableSet.of("value"))
+ );
+
+ final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader(
+ table,
+ null,
+ null,
+ true,
+ prunedSchema,
+ IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE
+ );
+
+ final List rows = readAll(reader);
+ Assert.assertEquals(2, rows.size());
+ for (final InputRow r : rows) {
+ final Map event = ((MapBasedInputRow) r).getEvent();
+ Assert.assertNull(
+ "excluded column 'value' must be pruned at scan and absent from event",
+ event.get("value")
+ );
+ Assert.assertNotNull("included column 'name' must be present", event.get("name"));
+ }
+ }
+
+ // --- helpers ---
+
+ private static GenericRecord row(final long ts, final String name, final double value)
+ {
+ final GenericRecord r = GenericRecord.create(SCHEMA);
+ r.setField("ts", ts);
+ r.setField("name", name);
+ r.setField("value", value);
+ return r;
+ }
+
+ private static void writeRows(final Table table, final GenericRecord... records) throws IOException
+ {
+ final String filepath = table.location() + "/" + UUID.randomUUID() + ".parquet";
+ final OutputFile file = table.io().newOutputFile(filepath);
+ final DataWriter writer =
+ Parquet.writeData(file)
+ .schema(SCHEMA)
+ .createWriterFunc(GenericParquetWriter::create)
+ .overwrite()
+ .withSpec(PartitionSpec.unpartitioned())
+ .build();
+ try {
+ for (final GenericRecord r : records) {
+ writer.write(r);
+ }
+ }
+ finally {
+ writer.close();
+ }
+ final DataFile dataFile = writer.toDataFile();
+ table.newAppend().appendFile(dataFile).commit();
+ }
+
+ private static List readAll(final IcebergArrowInputSourceReader reader) throws IOException
+ {
+ final List result = new ArrayList<>();
+ try (CloseableIterator it = reader.read(new NoopInputStats())) {
+ while (it.hasNext()) {
+ result.add(it.next());
+ }
+ }
+ return result;
+ }
+
+ private static final class NoopInputStats implements org.apache.druid.data.input.InputStats
+ {
+ @Override
+ public void incrementProcessedBytes(final long v)
+ {
+ }
+
+ @Override
+ public long getProcessedBytes()
+ {
+ return 0;
+ }
+ }
+}
diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceTest.java
new file mode 100644
index 000000000000..4d4818f73945
--- /dev/null
+++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceTest.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.iceberg.input;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.data.input.ColumnsFilter;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.iceberg.filter.IcebergEqualsFilter;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.types.Types;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+public class IcebergArrowInputSourceTest
+{
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ private IcebergCatalog testCatalog;
+ private TableIdentifier tableIdentifier;
+ private File warehouseDir;
+
+ private final Schema tableSchema = new Schema(
+ Types.NestedField.required(1, "id", Types.StringType.get()),
+ Types.NestedField.required(2, "name", Types.StringType.get())
+ );
+ private final Map tableData = ImmutableMap.of("id", "123988", "name", "Foo");
+
+ private static final String NAMESPACE = "default";
+ private static final String TABLENAME = "foosTable";
+
+ @Before
+ public void setup() throws IOException
+ {
+ warehouseDir = FileUtils.createTempDir();
+ testCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>(), true);
+ tableIdentifier = TableIdentifier.of(Namespace.of(NAMESPACE), TABLENAME);
+ createAndLoadTable(tableIdentifier);
+ }
+
+ @After
+ public void tearDown()
+ {
+ dropTableFromCatalog(tableIdentifier);
+ }
+
+ @Test
+ public void testReadWithNullInputStatsDoesNotNpe() throws IOException
+ {
+ final IcebergArrowInputSource src = new IcebergArrowInputSource(
+ TABLENAME,
+ NAMESPACE,
+ null,
+ testCatalog,
+ null,
+ null,
+ 1024
+ );
+ final InputRowSchema schemaWithMissingTs = new InputRowSchema(
+ new TimestampSpec(null, null, org.apache.druid.java.util.common.DateTimes.utc(0L)),
+ DimensionsSpec.builder().build(),
+ ColumnsFilter.all()
+ );
+ final InputSourceReader reader = src.reader(schemaWithMissingTs, null, FileUtils.createTempDir());
+ try (org.apache.druid.java.util.common.parsers.CloseableIterator it = reader.read()) {
+ while (it.hasNext()) {
+ Assert.assertNotNull(it.next());
+ }
+ }
+ }
+
+ @Test
+ public void testIsNotSplittable()
+ {
+ final IcebergArrowInputSource src = new IcebergArrowInputSource(
+ TABLENAME,
+ NAMESPACE,
+ null,
+ testCatalog,
+ null,
+ null,
+ 1024
+ );
+ Assert.assertFalse(src.isSplittable());
+ Assert.assertFalse(src.needsFormat());
+ }
+
+ @Test
+ public void testResidualFilterModeFail() throws IOException
+ {
+ final IcebergArrowInputSource src = new IcebergArrowInputSource(
+ TABLENAME,
+ NAMESPACE,
+ new IcebergEqualsFilter("id", "123988"),
+ testCatalog,
+ null,
+ ResidualFilterMode.FAIL,
+ 1024
+ );
+ final InputRowSchema inputRowSchema = new InputRowSchema(
+ new TimestampSpec("timestamp", "millis", null),
+ DimensionsSpec.builder().build(),
+ ColumnsFilter.all()
+ );
+ final DruidException ex = Assert.assertThrows(
+ DruidException.class,
+ () -> {
+ final InputSourceReader reader = src.reader(inputRowSchema, null, FileUtils.createTempDir());
+ reader.read().close();
+ }
+ );
+ Assert.assertTrue(
+ "Expected residual error: " + ex.getMessage(),
+ ex.getMessage().contains("residual")
+ );
+ }
+
+ @Test
+ public void testResidualFilterModeFailUsesSnapshotTime() throws Exception
+ {
+ final String filterId = (String) tableData.get("id");
+ dropTableFromCatalog(tableIdentifier);
+ final PartitionSpec partitionSpec = PartitionSpec.builderFor(tableSchema)
+ .identity("id")
+ .build();
+ final Table table = testCatalog.retrieveCatalog().createTable(tableIdentifier, tableSchema, partitionSpec);
+ appendRow(table, partitionSpec, tableData);
+
+ final long afterPartitionedSnapshot = System.currentTimeMillis();
+ Thread.sleep(10);
+
+ table.updateSpec().removeField("id").commit();
+ appendRow(table, table.spec(), ImmutableMap.of("id", filterId, "name", "Bar"));
+
+ final IcebergArrowInputSource src = new IcebergArrowInputSource(
+ TABLENAME,
+ NAMESPACE,
+ new IcebergEqualsFilter("id", filterId),
+ testCatalog,
+ org.apache.druid.java.util.common.DateTimes.utc(afterPartitionedSnapshot),
+ ResidualFilterMode.FAIL,
+ 1024
+ );
+ final InputRowSchema inputRowSchema = new InputRowSchema(
+ new TimestampSpec(null, null, org.apache.druid.java.util.common.DateTimes.utc(0L)),
+ DimensionsSpec.builder().build(),
+ ColumnsFilter.all()
+ );
+
+ final InputSourceReader reader = src.reader(inputRowSchema, null, FileUtils.createTempDir());
+ reader.read().close();
+ }
+
+ private void createAndLoadTable(TableIdentifier id) throws IOException
+ {
+ final Table table = testCatalog.retrieveCatalog().createTable(id, tableSchema, PartitionSpec.unpartitioned());
+ appendRow(table, PartitionSpec.unpartitioned(), tableData);
+ }
+
+ private void appendRow(Table table, PartitionSpec partitionSpec, Map rowData) throws IOException
+ {
+ final String fname = UUID.randomUUID() + ".parquet";
+ final File dataFile = new File(warehouseDir.getAbsolutePath() + "/" + fname);
+ Assert.assertTrue(dataFile.createNewFile());
+ final OutputFile out = Files.localOutput(dataFile);
+ final GenericRecord row = GenericRecord.create(tableSchema);
+ row.setField("id", rowData.get("id"));
+ row.setField("name", rowData.get("name"));
+ final DataWriter writer;
+ if (partitionSpec.isUnpartitioned()) {
+ writer = Parquet.writeData(out)
+ .schema(tableSchema)
+ .createWriterFunc(GenericParquetWriter::create)
+ .overwrite()
+ .withSpec(partitionSpec)
+ .build();
+ } else {
+ final PartitionKey partitionKey = new PartitionKey(partitionSpec, tableSchema);
+ partitionKey.partition(row);
+ writer = Parquet.writeData(out)
+ .schema(tableSchema)
+ .createWriterFunc(GenericParquetWriter::create)
+ .overwrite()
+ .withSpec(partitionSpec)
+ .withPartition(partitionKey)
+ .build();
+ }
+ try {
+ writer.write(row);
+ }
+ finally {
+ writer.close();
+ }
+ final DataFile df = writer.toDataFile();
+ table.newAppend().appendFile(df).commit();
+ }
+
+ private void dropTableFromCatalog(TableIdentifier id)
+ {
+ testCatalog.retrieveCatalog().dropTable(id);
+ }
+}
diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java
index 93d7412cc77a..fdd891c69c60 100644
--- a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java
+++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java
@@ -100,6 +100,8 @@ public void testInputSource() throws IOException
testCatalog,
new LocalInputSourceFactory(),
null,
+ null,
+ null,
null
);
Stream>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
@@ -136,6 +138,8 @@ public void testInputSourceWithEmptySource() throws IOException
testCatalog,
new LocalInputSourceFactory(),
null,
+ null,
+ null,
null
);
Stream>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
@@ -152,6 +156,8 @@ public void testInputSourceWithFilter() throws IOException
testCatalog,
new LocalInputSourceFactory(),
null,
+ null,
+ null,
null
);
Stream>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
@@ -188,6 +194,8 @@ public void testInputSourceReadFromLatestSnapshot() throws IOException
testCatalog,
new LocalInputSourceFactory(),
DateTimes.nowUtc(),
+ null,
+ null,
null
);
Stream>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
@@ -208,6 +216,8 @@ public void testCaseInsensitiveFiltering() throws IOException
caseInsensitiveCatalog,
new LocalInputSourceFactory(),
null,
+ null,
+ null,
null
);
@@ -233,7 +243,9 @@ public void testResidualFilterModeIgnore() throws IOException
testCatalog,
new LocalInputSourceFactory(),
null,
- ResidualFilterMode.IGNORE
+ ResidualFilterMode.IGNORE,
+ null,
+ null
);
Stream>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
Assert.assertEquals(1, splits.count());
@@ -250,7 +262,9 @@ public void testResidualFilterModeFail() throws IOException
testCatalog,
new LocalInputSourceFactory(),
null,
- ResidualFilterMode.FAIL
+ ResidualFilterMode.FAIL,
+ null,
+ null
);
DruidException exception = Assert.assertThrows(
DruidException.class,
@@ -278,7 +292,9 @@ public void testResidualFilterModeFailWithPartitionedTable() throws IOException
testCatalog,
new LocalInputSourceFactory(),
null,
- ResidualFilterMode.FAIL
+ ResidualFilterMode.FAIL,
+ null,
+ null
);
Stream>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
Assert.assertEquals(1, splits.count());
@@ -301,7 +317,9 @@ public void testResidualFilterModeFailWithPartitionedTableNonPartitionColumn() t
testCatalog,
new LocalInputSourceFactory(),
null,
- ResidualFilterMode.FAIL
+ ResidualFilterMode.FAIL,
+ null,
+ null
);
DruidException exception = Assert.assertThrows(
DruidException.class,
diff --git a/licenses.yaml b/licenses.yaml
index 5712ec2a9247..e4baf4a1c6a3 100644
--- a/licenses.yaml
+++ b/licenses.yaml
@@ -7136,3 +7136,72 @@ license_name: Apache License version 2.0
version: 3.30.2-GA
libraries:
- org.javassist: javassist
+
+---
+
+name: Apache Iceberg Arrow
+license_category: binary
+module: extensions-contrib/druid-iceberg-extensions
+license_name: Apache License version 2.0
+version: 1.10.0
+libraries:
+ - org.apache.iceberg: iceberg-arrow
+
+---
+
+name: Apache Arrow
+license_category: binary
+module: extensions-contrib/druid-iceberg-extensions
+license_name: Apache License version 2.0
+version: 15.0.2
+libraries:
+ - org.apache.arrow: arrow-format
+ - org.apache.arrow: arrow-memory-core
+ - org.apache.arrow: arrow-memory-unsafe
+ - org.apache.arrow: arrow-vector
+notices:
+ - arrow-format: |
+ Arrow Format
+ Copyright 2024 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+ - arrow-memory-core: |
+ Arrow Memory - Core
+ Copyright 2024 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+ - arrow-memory-unsafe: |
+ Arrow Memory - Unsafe
+ Copyright 2024 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+ - arrow-vector: |
+ Arrow Vectors
+ Copyright 2024 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
+---
+
+name: FlatBuffers Java API
+license_category: binary
+module: extensions-contrib/druid-iceberg-extensions
+license_name: Apache License version 2.0
+version: 23.5.26
+libraries:
+ - com.google.flatbuffers: flatbuffers-java
+
+---
+
+name: Eclipse Collections
+license_category: binary
+module: extensions-contrib/druid-iceberg-extensions
+license_name: Eclipse Public License 1.0
+version: 11.1.0
+libraries:
+ - org.eclipse.collections: eclipse-collections
+ - org.eclipse.collections: eclipse-collections-api
diff --git a/pom.xml b/pom.xml
index bab799d96ad1..766076af2c3c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,6 +103,7 @@
6.0.0
2.2
1.10.0
+ 15.0.2
12.1.8
1.19.4
2.21.3
@@ -430,6 +431,23 @@
2.5.2
+
+
+ org.apache.arrow
+ arrow-vector
+ ${arrow.version}
+
+
+ org.apache.arrow
+ arrow-memory-unsafe
+ ${arrow.version}
+
+
+ org.apache.iceberg
+ iceberg-arrow
+ ${iceberg.core.version}
+
+