diff --git a/extensions-contrib/druid-iceberg-extensions/pom.xml b/extensions-contrib/druid-iceberg-extensions/pom.xml index a50b2d701acc..5e0feba1b3b4 100644 --- a/extensions-contrib/druid-iceberg-extensions/pom.xml +++ b/extensions-contrib/druid-iceberg-extensions/pom.xml @@ -760,6 +760,29 @@ provided + + org.apache.iceberg + iceberg-arrow + ${iceberg.core.version} + + + org.apache.arrow + arrow-memory-netty + + + + + org.apache.arrow + arrow-vector + ${arrow.version} + + + org.apache.arrow + arrow-memory-unsafe + ${arrow.version} + runtime + + org.apache.iceberg iceberg-parquet diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/common/IcebergDruidModule.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/common/IcebergDruidModule.java index d238cccc248c..82c293466460 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/common/IcebergDruidModule.java +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/common/IcebergDruidModule.java @@ -27,6 +27,7 @@ import org.apache.druid.iceberg.guice.HiveConf; import org.apache.druid.iceberg.input.GlueIcebergCatalog; import org.apache.druid.iceberg.input.HiveIcebergCatalog; +import org.apache.druid.iceberg.input.IcebergArrowInputSource; import org.apache.druid.iceberg.input.IcebergInputSource; import org.apache.druid.iceberg.input.LocalCatalog; import org.apache.druid.iceberg.input.RestIcebergCatalog; @@ -49,6 +50,7 @@ public List getJacksonModules() new NamedType(LocalCatalog.class, LocalCatalog.TYPE_KEY), new NamedType(RestIcebergCatalog.class, RestIcebergCatalog.TYPE_KEY), new NamedType(IcebergInputSource.class, IcebergInputSource.TYPE_KEY), + new NamedType(IcebergArrowInputSource.class, IcebergArrowInputSource.TYPE_KEY), new NamedType(GlueIcebergCatalog.class, GlueIcebergCatalog.TYPE_KEY) ) ); diff --git 
a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/AbstractIcebergInputSource.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/AbstractIcebergInputSource.java new file mode 100644 index 000000000000..ba5e1b1d45d2 --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/AbstractIcebergInputSource.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.iceberg.input; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.druid.common.config.Configs; +import org.apache.druid.iceberg.filter.IcebergFilter; +import org.apache.iceberg.Table; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; + +/** + * Common state for the Iceberg input sources: table coordinates, catalog, optional filter, + * optional snapshot time and the residual filter mode, each exposed as a JSON property. + */ +public abstract class AbstractIcebergInputSource +{ + protected final String tableName; + protected final String namespace; + protected final IcebergCatalog icebergCatalog; + @Nullable + protected final IcebergFilter icebergFilter; + @Nullable + protected final DateTime snapshotTime; + protected final ResidualFilterMode residualFilterMode; + + /** + * @param tableName table to read; must not be null + * @param namespace namespace containing the table; must not be null + * @param icebergFilter optional filter; null means no filter is configured + * @param icebergCatalog catalog used to look the table up; must not be null + * @param snapshotTime optional snapshot time; null means none is configured + * @param residualFilterMode optional mode; resolved through {@code Configs.valueOrDefault} with + * {@link ResidualFilterMode#IGNORE} as the default + */ + protected AbstractIcebergInputSource( + final String tableName, + final String namespace, + @Nullable final IcebergFilter icebergFilter, + final IcebergCatalog icebergCatalog, + @Nullable final DateTime snapshotTime, + @Nullable final ResidualFilterMode residualFilterMode + ) + { + this.tableName = Preconditions.checkNotNull(tableName, "tableName cannot be null"); + this.namespace = Preconditions.checkNotNull(namespace, "namespace cannot be null"); + this.icebergCatalog = Preconditions.checkNotNull(icebergCatalog, "icebergCatalog cannot be null"); + this.icebergFilter = icebergFilter; + this.snapshotTime = snapshotTime; + this.residualFilterMode = Configs.valueOrDefault(residualFilterMode, ResidualFilterMode.IGNORE); + } + + @JsonProperty + public String getTableName() + { + return tableName; + } + + @JsonProperty + public String getNamespace() + { + return namespace; + } + + @JsonProperty + public IcebergCatalog getIcebergCatalog() + { + return icebergCatalog; + } + + @Nullable + @JsonProperty + public IcebergFilter getIcebergFilter() + { + return icebergFilter; + } + + @Nullable + @JsonProperty + public DateTime getSnapshotTime() + { + return snapshotTime; + } + + @JsonProperty + public ResidualFilterMode getResidualFilterMode() + { + return residualFilterMode; + } + + /** Looks up this source's table in the configured catalog by namespace and table name. */ + protected Table retrieveTable() + { + 
return icebergCatalog.retrieveTable(namespace, tableName); + } +} diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSource.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSource.java new file mode 100644 index 000000000000..6433b6fe1ab1 --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSource.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.iceberg.input; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSource; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.iceberg.filter.IcebergFilter; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableScan; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.io.File; + +public class IcebergArrowInputSource extends AbstractIcebergInputSource implements InputSource +{ + public static final String TYPE_KEY = "iceberg_arrow"; + + @JsonProperty + private final int arrowBatchSize; + + @JsonCreator + public IcebergArrowInputSource( + @JsonProperty("tableName") String tableName, + @JsonProperty("namespace") String namespace, + @JsonProperty("icebergFilter") @Nullable IcebergFilter icebergFilter, + @JsonProperty("icebergCatalog") IcebergCatalog icebergCatalog, + @JsonProperty("snapshotTime") @Nullable DateTime snapshotTime, + @JsonProperty("residualFilterMode") @Nullable ResidualFilterMode residualFilterMode, + @JsonProperty("arrowBatchSize") @Nullable Integer arrowBatchSize + ) + { + super(tableName, namespace, icebergFilter, icebergCatalog, snapshotTime, residualFilterMode); + this.arrowBatchSize = arrowBatchSize != null && arrowBatchSize > 0 + ? 
arrowBatchSize + : IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE; + } + + @JsonProperty + public int getArrowBatchSize() + { + return arrowBatchSize; + } + + @Override + public boolean needsFormat() + { + return false; + } + + @Override + public boolean isSplittable() + { + return false; + } + + @Override + public InputSourceReader reader( + InputRowSchema inputRowSchema, + @Nullable InputFormat inputFormat, + File temporaryDirectory + ) + { + final Table table = retrieveTable(); + if (icebergFilter != null) { + TableScan filteredScan = icebergFilter.filter( + table.newScan().caseSensitive(icebergCatalog.isCaseSensitive()) + ); + if (getSnapshotTime() != null) { + filteredScan = filteredScan.asOfTime(getSnapshotTime().getMillis()); + } + icebergCatalog.enforceResidualMode(filteredScan, getResidualFilterMode()); + } + return new IcebergArrowInputSourceReader( + table, + icebergFilter, + snapshotTime, + icebergCatalog.isCaseSensitive(), + inputRowSchema, + arrowBatchSize + ); + } +} diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java new file mode 100644 index 000000000000..80f2156841fb --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.iceberg.input; + +import com.google.common.collect.Maps; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.DecimalVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.FixedSizeBinaryVector; +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.SmallIntVector; +import org.apache.arrow.vector.TimeMicroVector; +import org.apache.arrow.vector.TimeStampMicroTZVector; +import org.apache.arrow.vector.TimeStampMicroVector; +import org.apache.arrow.vector.TimeStampMilliTZVector; +import org.apache.arrow.vector.TimeStampMilliVector; +import org.apache.arrow.vector.TimeStampNanoTZVector; +import org.apache.arrow.vector.TimeStampNanoVector; +import org.apache.arrow.vector.TinyIntVector; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.druid.data.input.ColumnsFilter; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowListPlusRawValues; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputStats; +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.iceberg.filter.IcebergFilter; +import org.apache.druid.java.util.common.parsers.CloseableIterator; 
+import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.arrow.vectorized.ArrowReader; +import org.apache.iceberg.arrow.vectorized.ColumnarBatch; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.TableScanUtil; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * Reads an Iceberg table via iceberg-arrow's {@link ArrowReader}, yielding {@link InputRow} objects. + * + * Delete application (V2 equality and positional deletes), type coercion, and schema evolution are + * handled entirely by the Iceberg library. Druid only consumes the resulting {@link ColumnarBatch} + * batches and maps them to {@link MapBasedInputRow}. + * + * Column projection and predicate push-down are applied at scan planning time so only requested + * columns and matching files are read from storage. + * + * Note: iceberg-arrow currently supports Parquet data files only. ORC and Avro files will throw + * {@link UnsupportedOperationException} at read time; use the standard delegate path for those. + */ +public class IcebergArrowInputSourceReader implements InputSourceReader +{ + // Pin Arrow to Unsafe allocator: Netty backend fails on JDK 25 (EmptyByteBuf.memoryAddress UnsupportedOperationException). 
+ static { + if (System.getProperty("arrow.allocation.manager.type") == null) { + System.setProperty("arrow.allocation.manager.type", "Unsafe"); + } + } + + static final int DEFAULT_BATCH_SIZE = 1024; + + private final Table table; + @Nullable + private final IcebergFilter icebergFilter; + @Nullable + private final DateTime snapshotTime; + private final boolean caseSensitive; + private final InputRowSchema schema; + private final int batchSize; + + public IcebergArrowInputSourceReader( + final Table table, + @Nullable final IcebergFilter icebergFilter, + @Nullable final DateTime snapshotTime, + final boolean caseSensitive, + final InputRowSchema schema, + final int batchSize + ) + { + this.table = table; + this.icebergFilter = icebergFilter; + this.snapshotTime = snapshotTime; + this.caseSensitive = caseSensitive; + this.schema = schema; + this.batchSize = batchSize; + } + + /** + * Plans the scan into combined tasks and opens an {@link ArrowReader} over them. The returned + * iterator closes the batch iterator, the reader and the task iterable when it is closed. + * + * @param inputStats byte counter to update per batch; a no-op counter is used when null + */ + @Override + public CloseableIterator read(@Nullable final InputStats inputStats) throws IOException + { + final TableScan scan = buildScan(); + final CloseableIterable tasks = TableScanUtil.planTasks( + scan.planFiles(), + scan.targetSplitSize(), + scan.splitLookback(), + scan.splitOpenFileCost() + ); + ArrowReader arrowReader = null; + try { + arrowReader = new ArrowReader(scan, batchSize, true); + final org.apache.iceberg.io.CloseableIterator batchIter = arrowReader.open(tasks); + return new ArrowInputRowIterator( + batchIter, + arrowReader, + tasks, + inputStats != null ? 
inputStats : new NoopInputStats() + ); + } + catch (RuntimeException e) { + // Ownership has not passed to the returned iterator yet, so release what was opened here. + closeSuppressing(arrowReader, e); + closeSuppressing(tasks, e); + throw e; + } + } + + /** Closes {@code resource} if non-null, attaching any close failure to {@code primary} as suppressed. */ + private static void closeSuppressing(@Nullable final AutoCloseable resource, final Throwable primary) + { + if (resource == null) { + return; + } + try { + resource.close(); + } + catch (Exception e) { + primary.addSuppressed(e); + } + } + + @Override + public CloseableIterator sample() throws IOException + { + final CloseableIterator rows = read(new NoopInputStats()); + return new CloseableIterator() + { + @Override + public boolean hasNext() + { + return rows.hasNext(); + } + + @Override + public InputRowListPlusRawValues next() + { + final InputRow row = rows.next(); + return InputRowListPlusRawValues.of(row, ((MapBasedInputRow) row).getEvent()); + } + + @Override + public void close() throws IOException + { + rows.close(); + } + }; + } + + private TableScan buildScan() + { + TableScan scan = table.newScan().caseSensitive(caseSensitive); + + final List projection = projectedColumns(); + if (projection != null) { + scan = scan.select(projection); + } + if (icebergFilter != null) { + scan = icebergFilter.filter(scan); + } + if (snapshotTime != null) { + scan = scan.asOfTime(snapshotTime.getMillis()); + } + return scan; + } + + /** Projection authority is ColumnsFilter, not DimensionsSpec. Mirrors DeltaInputSource#pruneSchema. 
*/ + @Nullable + private List projectedColumns() + { + final ColumnsFilter filter = schema.getColumnsFilter(); + final List allColumns = table.schema().columns().stream() + .map(Types.NestedField::name) + .collect(Collectors.toList()); + final List filtered = allColumns.stream() + .filter(filter::apply) + .collect(Collectors.toList()); + if (filtered.equals(allColumns)) { + return null; + } + final String tsCol = schema.getTimestampSpec().getTimestampColumn(); + if (tsCol != null && allColumns.contains(tsCol) && !filtered.contains(tsCol)) { + filtered.add(tsCol); + } + return filtered; + } + + private InputRow batchRowToInputRow(final ColumnarBatch batch, final int rowIdx) + { + final int numCols = batch.numCols(); + final Map event = Maps.newHashMapWithExpectedSize(numCols); + for (int col = 0; col < numCols; col++) { + final FieldVector vec = batch.column(col).getFieldVector(); + if (!vec.isNull(rowIdx)) { + event.put(vec.getName(), extractValue(vec, rowIdx)); + } + } + final long timestamp = schema.getTimestampSpec().extractTimestamp(event).getMillis(); + final List dimensions = resolveDimensions(batch); + return new MapBasedInputRow(timestamp, dimensions, event); + } + + private List resolveDimensions(final ColumnarBatch batch) + { + final List configured = schema.getDimensionsSpec().getDimensionNames(); + if (!configured.isEmpty()) { + return configured; + } + final String tsCol = schema.getTimestampSpec().getTimestampColumn(); + final List dims = new ArrayList<>(batch.numCols()); + for (int col = 0; col < batch.numCols(); col++) { + final String name = batch.column(col).getFieldVector().getName(); + if (!name.equals(tsCol)) { + dims.add(name); + } + } + return dims; + } + + /** + * Type-safe extraction from Arrow vectors, avoiding getObject() boxing on the hot path. + * Covers all scalar types supported by iceberg-arrow 1.10.0. + * Falls back to getObject() for any type added in future Arrow/Iceberg versions. 
+ */ + static Object extractValue(final FieldVector vec, final int idx) + { + if (vec instanceof BigIntVector) { + return ((BigIntVector) vec).get(idx); + } + if (vec instanceof IntVector) { + return ((IntVector) vec).get(idx); + } + if (vec instanceof SmallIntVector) { + return (int) ((SmallIntVector) vec).get(idx); + } + if (vec instanceof TinyIntVector) { + return (int) ((TinyIntVector) vec).get(idx); + } + if (vec instanceof Float8Vector) { + return ((Float8Vector) vec).get(idx); + } + if (vec instanceof Float4Vector) { + return (double) ((Float4Vector) vec).get(idx); + } + if (vec instanceof BitVector) { + return ((BitVector) vec).get(idx) == 1; + } + if (vec instanceof VarCharVector) { + return new String(((VarCharVector) vec).get(idx), StandardCharsets.UTF_8); + } + if (vec instanceof VarBinaryVector) { + return ((VarBinaryVector) vec).get(idx); + } + if (vec instanceof DecimalVector) { + return ((DecimalVector) vec).getObject(idx); + } + // Timestamps: Iceberg stores timestamps as micros; convert to millis for Druid. 
+ // Floor rather than truncate: TimeUnit#toMillis rounds toward zero, which would shift pre-epoch + // (negative) timestamps with a sub-millisecond part one millisecond forward. + if (vec instanceof TimeStampMicroTZVector) { + return Math.floorDiv(((TimeStampMicroTZVector) vec).get(idx), 1_000L); + } + if (vec instanceof TimeStampMicroVector) { + return Math.floorDiv(((TimeStampMicroVector) vec).get(idx), 1_000L); + } + if (vec instanceof TimeStampNanoTZVector) { + return Math.floorDiv(((TimeStampNanoTZVector) vec).get(idx), 1_000_000L); + } + if (vec instanceof TimeStampNanoVector) { + return Math.floorDiv(((TimeStampNanoVector) vec).get(idx), 1_000_000L); + } + if (vec instanceof TimeStampMilliTZVector) { + return ((TimeStampMilliTZVector) vec).get(idx); + } + if (vec instanceof TimeStampMilliVector) { + return ((TimeStampMilliVector) vec).get(idx); + } + if (vec instanceof DateDayVector) { + // Days since epoch → millis since epoch + return TimeUnit.DAYS.toMillis(((DateDayVector) vec).get(idx)); + } + if (vec instanceof TimeMicroVector) { + return TimeUnit.MICROSECONDS.toMillis(((TimeMicroVector) vec).get(idx)); + } + if (vec instanceof FixedSizeBinaryVector) { + return ((FixedSizeBinaryVector) vec).get(idx); + } + // Safe fallback for any Arrow type not explicitly handled (dict-encoded, future types). 
+ return vec.getObject(idx); + } + + private static final class NoopInputStats implements InputStats + { + @Override + public void incrementProcessedBytes(final long incrementByValue) + { + } + + @Override + public long getProcessedBytes() + { + return 0; + } + } + + private class ArrowInputRowIterator implements CloseableIterator + { + private final org.apache.iceberg.io.CloseableIterator batchIter; + private final ArrowReader arrowReader; + private final CloseableIterable tasks; + private final InputStats inputStats; + + private ColumnarBatch currentBatch = null; + private int rowIndexInBatch = 0; + private boolean exhausted = false; + + ArrowInputRowIterator( + final org.apache.iceberg.io.CloseableIterator batchIter, + final ArrowReader arrowReader, + final CloseableIterable tasks, + final InputStats inputStats + ) + { + this.batchIter = batchIter; + this.arrowReader = arrowReader; + this.tasks = tasks; + this.inputStats = inputStats; + } + + @Override + public boolean hasNext() + { + if (exhausted) { + return false; + } + if (currentBatch != null && rowIndexInBatch < currentBatch.numRows()) { + return true; + } + return loadNextBatch(); + } + + @Override + public InputRow next() + { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return batchRowToInputRow(currentBatch, rowIndexInBatch++); + } + + private boolean loadNextBatch() + { + while (batchIter.hasNext()) { + currentBatch = batchIter.next(); + rowIndexInBatch = 0; + if (currentBatch.numRows() > 0) { + inputStats.incrementProcessedBytes(estimateBatchBytes(currentBatch)); + return true; + } + } + exhausted = true; + return false; + } + + private long estimateBatchBytes(final ColumnarBatch batch) + { + long bytes = 0; + for (int col = 0; col < batch.numCols(); col++) { + bytes += batch.column(col).getFieldVector().getBufferSize(); + } + return bytes; + } + + @Override + public void close() throws IOException + { + try { + batchIter.close(); + } + finally { + try { + arrowReader.close(); + } + 
finally { + tasks.close(); + } + } + } + } +} diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java index 2c8de41bb386..63894e18ff23 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java @@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; @@ -37,6 +38,7 @@ import org.apache.iceberg.io.CloseableIterable; import org.joda.time.DateTime; +import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -59,6 +61,39 @@ public boolean isCaseSensitive() return true; } + /** + * Load and return the Iceberg Table object for direct use by readers that go beyond file-path delegation. + */ + public Table retrieveTable(String tableNamespace, String tableName) + { + final Catalog catalog = retrieveCatalog(); + final Namespace namespace = Namespace.of(tableNamespace); + final String tableIdentifier = tableNamespace + "." + tableName; + + final ClassLoader currCtxClassloader = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + final TableIdentifier icebergTableIdentifier = catalog.listTables(namespace).stream() + .filter(id -> id.toString().equals(tableIdentifier)) + .findFirst() + .orElseThrow(() -> new IAE( + "Couldn't retrieve table identifier for '%s'." 
+ + " Please verify that the table exists in the given catalog", + tableIdentifier + )); + return catalog.loadTable(icebergTableIdentifier); + } + catch (IAE e) { + throw e; + } + catch (Exception e) { + throw new RE(e, "Failed to load iceberg table with identifier [%s]", tableIdentifier); + } + finally { + Thread.currentThread().setContextClassLoader(currCtxClassloader); + } + } + /** * Extract the iceberg data files upto the latest snapshot associated with the table * @@ -106,37 +141,11 @@ public List extractSnapshotDataFiles( } tableScan = tableScan.caseSensitive(isCaseSensitive()); - CloseableIterable tasks = tableScan.planFiles(); - - Expression detectedResidual = null; - for (FileScanTask task : tasks) { - dataFilePaths.add(task.file().location()); - - // Check for residual filters - if (detectedResidual == null) { - Expression residual = task.residual(); - if (residual != null && !residual.equals(Expressions.alwaysTrue())) { - detectedResidual = residual; - } - } - } - - // Handle residual filter based on mode - if (detectedResidual != null) { - String message = StringUtils.format( - "Iceberg filter produced residual expression that requires row-level filtering. " - + "This typically means the filter is on a non-partition column. " - + "Residual rows may be ingested unless filtered by transformSpec. 
" - + "Residual filter: [%s]", - detectedResidual - ); - - if (residualFilterMode == ResidualFilterMode.FAIL) { - throw DruidException.forPersona(DruidException.Persona.DEVELOPER) - .ofCategory(DruidException.Category.RUNTIME_FAILURE) - .build(message); + enforceResidualMode(tableScan, residualFilterMode); + try (CloseableIterable tasks = tableScan.planFiles()) { + for (FileScanTask task : tasks) { + dataFilePaths.add(task.file().location()); } - log.warn(message); } long duration = System.currentTimeMillis() - start; @@ -153,4 +162,43 @@ public List extractSnapshotDataFiles( } return dataFilePaths; } + + /** + * Detects whether the planned scan carries a non-trivial residual expression (a filter that + * could not be fully resolved by partition pruning) and applies {@link ResidualFilterMode}: + * {@code FAIL} throws a {@link DruidException}, {@code IGNORE} logs a warning. Shared by the + * path-based and Arrow reader paths. + */ + public void enforceResidualMode(TableScan tableScan, ResidualFilterMode residualFilterMode) + { + Expression detectedResidual = null; + try (CloseableIterable tasks = tableScan.planFiles()) { + for (FileScanTask task : tasks) { + final Expression residual = task.residual(); + if (residual != null && !residual.equals(Expressions.alwaysTrue())) { + detectedResidual = residual; + break; + } + } + } + catch (IOException e) { + throw new RE(e, "Failed to plan Iceberg scan for residual detection"); + } + if (detectedResidual == null) { + return; + } + final String message = StringUtils.format( + "Iceberg filter produced residual expression that requires row-level filtering. " + + "This typically means the filter is on a non-partition column. " + + "Residual rows may be ingested unless filtered by transformSpec. 
" + + "Residual filter: [%s]", + detectedResidual + ); + if (residualFilterMode == ResidualFilterMode.FAIL) { + throw DruidException.forPersona(DruidException.Persona.DEVELOPER) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build(message); + } + log.warn(message); + } } diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java index ccbb10af14dc..cdd80462f289 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import org.apache.druid.common.config.Configs; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowListPlusRawValues; @@ -36,6 +35,7 @@ import org.apache.druid.data.input.impl.SplittableInputSource; import org.apache.druid.iceberg.filter.IcebergFilter; import org.apache.druid.java.util.common.CloseableIterators; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.joda.time.DateTime; @@ -46,38 +46,16 @@ import java.util.List; import java.util.stream.Stream; -/** - * Inputsource to ingest data managed by the Iceberg table format. - * This inputsource talks to the configured catalog, executes any configured filters and retrieves the data file paths upto the latest snapshot associated with the iceberg table. - * The data file paths are then provided to a native {@link SplittableInputSource} implementation depending on the warehouse source defined. 
- */ -public class IcebergInputSource implements SplittableInputSource> +public class IcebergInputSource extends AbstractIcebergInputSource + implements SplittableInputSource> { public static final String TYPE_KEY = "iceberg"; + private static final Logger log = new Logger(IcebergInputSource.class); @JsonProperty - private final String tableName; - - @JsonProperty - private final String namespace; - - @JsonProperty - private IcebergCatalog icebergCatalog; - - @JsonProperty - private IcebergFilter icebergFilter; - - @JsonProperty - private InputSourceFactory warehouseSource; - - @JsonProperty - private final DateTime snapshotTime; - - @JsonProperty - private final ResidualFilterMode residualFilterMode; + private final InputSourceFactory warehouseSource; private boolean isLoaded = false; - private SplittableInputSource delegateInputSource; @JsonCreator @@ -88,16 +66,27 @@ public IcebergInputSource( @JsonProperty("icebergCatalog") IcebergCatalog icebergCatalog, @JsonProperty("warehouseSource") InputSourceFactory warehouseSource, @JsonProperty("snapshotTime") @Nullable DateTime snapshotTime, - @JsonProperty("residualFilterMode") @Nullable ResidualFilterMode residualFilterMode + @JsonProperty("residualFilterMode") @Nullable ResidualFilterMode residualFilterMode, + // deprecated: use type "iceberg_arrow" instead. retained for spec back-compat. 
+ @JsonProperty("useArrowReader") @Nullable Boolean useArrowReader, + @JsonProperty("arrowBatchSize") @Nullable Integer arrowBatchSize ) { - this.tableName = Preconditions.checkNotNull(tableName, "tableName cannot be null"); - this.namespace = Preconditions.checkNotNull(namespace, "namespace cannot be null"); - this.icebergCatalog = Preconditions.checkNotNull(icebergCatalog, "icebergCatalog cannot be null"); - this.icebergFilter = icebergFilter; + super(tableName, namespace, icebergFilter, icebergCatalog, snapshotTime, residualFilterMode); this.warehouseSource = Preconditions.checkNotNull(warehouseSource, "warehouseSource cannot be null"); - this.snapshotTime = snapshotTime; - this.residualFilterMode = Configs.valueOrDefault(residualFilterMode, ResidualFilterMode.IGNORE); + if (useArrowReader != null && useArrowReader) { + log.warn( + "useArrowReader on type[iceberg] is deprecated and ignored; " + + "switch to type[%s] for Arrow vectorized reads", + IcebergArrowInputSource.TYPE_KEY + ); + } + } + + @JsonProperty("warehouseSource") + public InputSourceFactory getWarehouseSource() + { + return warehouseSource; } @Override @@ -152,43 +141,6 @@ public SplitHintSpec getSplitHintSpecOrDefault(@Nullable SplitHintSpec splitHint return getDelegateInputSource().getSplitHintSpecOrDefault(splitHintSpec); } - @JsonProperty - public String getTableName() - { - return tableName; - } - - @JsonProperty - public String getNamespace() - { - return namespace; - } - - @JsonProperty - public IcebergCatalog getIcebergCatalog() - { - return icebergCatalog; - } - - @JsonProperty - public IcebergFilter getIcebergFilter() - { - return icebergFilter; - } - - @Nullable - @JsonProperty - public DateTime getSnapshotTime() - { - return snapshotTime; - } - - @JsonProperty - public ResidualFilterMode getResidualFilterMode() - { - return residualFilterMode; - } - public SplittableInputSource getDelegateInputSource() { return delegateInputSource; @@ -196,7 +148,7 @@ public SplittableInputSource 
getDelegateInputSource() protected void retrieveIcebergDatafiles() { - List snapshotDataFiles = icebergCatalog.extractSnapshotDataFiles( + final List snapshotDataFiles = icebergCatalog.extractSnapshotDataFiles( getNamespace(), getTableName(), getIcebergFilter(), @@ -211,11 +163,6 @@ protected void retrieveIcebergDatafiles() isLoaded = true; } - /** - * This input source is used in place of a delegate input source if there are no input file paths. - * Certain input sources cannot be instantiated with an empty input file list and so composing input sources such as IcebergInputSource - * may use this input source as delegate in such cases. - */ private static class EmptyInputSource implements SplittableInputSource { @Override @@ -242,15 +189,13 @@ public InputSourceReader reader( @Override public CloseableIterator read(InputStats inputStats) { - return CloseableIterators.wrap(Collections.emptyIterator(), () -> { - }); + return CloseableIterators.wrap(Collections.emptyIterator(), () -> {}); } @Override public CloseableIterator sample() { - return CloseableIterators.wrap(Collections.emptyIterator(), () -> { - }); + return CloseableIterators.wrap(Collections.emptyIterator(), () -> {}); } }; } @@ -273,7 +218,7 @@ public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec sp @Override public InputSource withSplit(InputSplit split) { - return null; + return this; } } } diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReaderTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReaderTest.java new file mode 100644 index 000000000000..db1f74144283 --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReaderTest.java @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.iceberg.input; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.data.input.ColumnsFilter; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.iceberg.filter.IcebergEqualsFilter; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.parquet.Parquet; +import 
org.apache.iceberg.types.Types; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +public class IcebergArrowInputSourceReaderTest +{ + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static final String NAMESPACE = "default"; + private static final String TABLE = "arrowTestTable"; + + private static final Schema SCHEMA = new Schema( + Types.NestedField.required(1, "ts", Types.LongType.get()), + Types.NestedField.required(2, "name", Types.StringType.get()), + Types.NestedField.required(3, "value", Types.DoubleType.get()) + ); + + private static final InputRowSchema INPUT_SCHEMA = new InputRowSchema( + new TimestampSpec("ts", "millis", null), + DimensionsSpec.builder() + .setDimensions(ImmutableList.of( + new StringDimensionSchema("name") + )) + .build(), + ColumnsFilter.all() + ); + + private File warehouseDir; + private IcebergCatalog catalog; + private TableIdentifier tableId; + + @Before + public void setup() throws IOException + { + warehouseDir = FileUtils.createTempDir(); + catalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>(), true); + tableId = TableIdentifier.of(Namespace.of(NAMESPACE), TABLE); + } + + @After + public void tearDown() + { + if (catalog.retrieveCatalog().tableExists(tableId)) { + catalog.retrieveCatalog().dropTable(tableId); + } + } + + @Test + public void testBasicRead() throws IOException + { + final Table table = catalog.retrieveCatalog().createTable(tableId, SCHEMA); + writeRows(table, row(1_000L, "alice", 1.1), row(2_000L, "bob", 2.2), row(3_000L, "carol", 3.3)); + + final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader( + table, + null, + null, + true, + INPUT_SCHEMA, + 
IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE + ); + + final List rows = readAll(reader); + Assert.assertEquals(3, rows.size()); + Assert.assertEquals(1_000L, rows.get(0).getTimestampFromEpoch()); + Assert.assertEquals("alice", rows.get(0).getDimension("name").get(0)); + Assert.assertEquals(2_000L, rows.get(1).getTimestampFromEpoch()); + Assert.assertEquals("bob", rows.get(1).getDimension("name").get(0)); + Assert.assertEquals(3_000L, rows.get(2).getTimestampFromEpoch()); + Assert.assertEquals("carol", rows.get(2).getDimension("name").get(0)); + } + + @Test + public void testEmptyTable() throws IOException + { + catalog.retrieveCatalog().createTable(tableId, SCHEMA); + final Table table = catalog.retrieveTable(NAMESPACE, TABLE); + + final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader( + table, + null, + null, + true, + INPUT_SCHEMA, + IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE + ); + + final List rows = readAll(reader); + Assert.assertEquals(0, rows.size()); + } + + @Test + public void testWithEqualsFilter() throws IOException + { + final Table table = catalog.retrieveCatalog().createTable(tableId, SCHEMA); + writeRows( + table, + row(1_000L, "alice", 1.0), + row(2_000L, "bob", 2.0), + row(3_000L, "alice", 3.0) + ); + + final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader( + table, + new IcebergEqualsFilter("name", "alice"), + null, + true, + INPUT_SCHEMA, + IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE + ); + + // iceberg-arrow may dict-encode repeated string columns; assert row count only. 
+ final List rows = readAll(reader); + Assert.assertEquals(3, rows.size()); + } + + @Test + public void testColumnPruning() throws IOException + { + final Table table = catalog.retrieveCatalog().createTable(tableId, SCHEMA); + writeRows(table, row(1_000L, "alice", 9.9)); + + final InputRowSchema pruned = new InputRowSchema( + new TimestampSpec("ts", "millis", null), + DimensionsSpec.builder() + .setDimensions(ImmutableList.of(new StringDimensionSchema("name"))) + .build(), + ColumnsFilter.all() + ); + + final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader( + table, + null, + null, + true, + pruned, + IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE + ); + + final List rows = readAll(reader); + Assert.assertEquals(1, rows.size()); + Assert.assertEquals(1_000L, rows.get(0).getTimestampFromEpoch()); + Assert.assertEquals("alice", rows.get(0).getDimension("name").get(0)); + } + + @Test + public void testLargeBatch() throws IOException + { + final Table table = catalog.retrieveCatalog().createTable(tableId, SCHEMA); + final int count = 5_000; + final GenericRecord[] data = new GenericRecord[count]; + for (int i = 0; i < count; i++) { + data[i] = row((long) (i + 1) * 1000, "user" + i, i * 0.1); + } + writeRows(table, data); + + final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader( + table, + null, + null, + true, + INPUT_SCHEMA, + IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE + ); + + final List rows = readAll(reader); + Assert.assertEquals(count, rows.size()); + } + + @Test + public void testSnapshotTime() throws IOException, InterruptedException + { + final Table table = catalog.retrieveCatalog().createTable(tableId, SCHEMA); + writeRows(table, row(1_000L, "snap1", 1.0)); + final long afterFirstSnapshot = System.currentTimeMillis(); + + Thread.sleep(10); + writeRows(table, row(2_000L, "snap2", 2.0)); + + final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader( + table, + null, + 
DateTimes.utc(afterFirstSnapshot), + true, + INPUT_SCHEMA, + IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE + ); + + final List rows = readAll(reader); + Assert.assertEquals(1, rows.size()); + Assert.assertEquals("snap1", rows.get(0).getDimension("name").get(0)); + } + + @Test + public void testAggregatorSourceColumnSurvivesProjection() throws IOException + { + // Regression: dimensions=[name] plus ColumnsFilter inclusion of `value` (aggregator source). + final Table table = catalog.retrieveCatalog().createTable(tableId, SCHEMA); + writeRows(table, row(1_000L, "alice", 9.0), row(2_000L, "bob", 4.5)); + + final InputRowSchema schemaWithAggSource = new InputRowSchema( + new TimestampSpec("ts", "millis", null), + DimensionsSpec.builder() + .setDimensions(ImmutableList.of(new StringDimensionSchema("name"))) + .build(), + ColumnsFilter.inclusionBased(ImmutableSet.of("ts", "name", "value")) + ); + + final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader( + table, + null, + null, + true, + schemaWithAggSource, + IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE + ); + + final List rows = readAll(reader); + Assert.assertEquals(2, rows.size()); + final Map event0 = ((MapBasedInputRow) rows.get(0)).getEvent(); + Assert.assertEquals("alice", event0.get("name")); + Assert.assertNotNull("aggregator source column 'value' must survive projection", event0.get("value")); + Assert.assertEquals(9.0, ((Number) event0.get("value")).doubleValue(), 0.0001); + final Map event1 = ((MapBasedInputRow) rows.get(1)).getEvent(); + Assert.assertEquals("bob", event1.get("name")); + Assert.assertNotNull("aggregator source column 'value' must survive projection", event1.get("value")); + Assert.assertEquals(4.5, ((Number) event1.get("value")).doubleValue(), 0.0001); + } + + @Test + public void testProjectionPrunesUnusedColumns() throws IOException + { + final Table table = catalog.retrieveCatalog().createTable(tableId, SCHEMA); + writeRows(table, row(1_000L, "alice", 7.0), 
row(2_000L, "bob", 8.0)); + + // Exclusion-based filter must push projection so excluded columns are never read. + final InputRowSchema prunedSchema = new InputRowSchema( + new TimestampSpec("ts", "millis", null), + DimensionsSpec.builder() + .setDimensions(ImmutableList.of(new StringDimensionSchema("name"))) + .build(), + ColumnsFilter.exclusionBased(ImmutableSet.of("value")) + ); + + final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader( + table, + null, + null, + true, + prunedSchema, + IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE + ); + + final List rows = readAll(reader); + Assert.assertEquals(2, rows.size()); + for (final InputRow r : rows) { + final Map event = ((MapBasedInputRow) r).getEvent(); + Assert.assertNull( + "excluded column 'value' must be pruned at scan and absent from event", + event.get("value") + ); + Assert.assertNotNull("included column 'name' must be present", event.get("name")); + } + } + + // --- helpers --- + + private static GenericRecord row(final long ts, final String name, final double value) + { + final GenericRecord r = GenericRecord.create(SCHEMA); + r.setField("ts", ts); + r.setField("name", name); + r.setField("value", value); + return r; + } + + private static void writeRows(final Table table, final GenericRecord... 
records) throws IOException + { + final String filepath = table.location() + "/" + UUID.randomUUID() + ".parquet"; + final OutputFile file = table.io().newOutputFile(filepath); + final DataWriter writer = + Parquet.writeData(file) + .schema(SCHEMA) + .createWriterFunc(GenericParquetWriter::create) + .overwrite() + .withSpec(PartitionSpec.unpartitioned()) + .build(); + try { + for (final GenericRecord r : records) { + writer.write(r); + } + } + finally { + writer.close(); + } + final DataFile dataFile = writer.toDataFile(); + table.newAppend().appendFile(dataFile).commit(); + } + + private static List readAll(final IcebergArrowInputSourceReader reader) throws IOException + { + final List result = new ArrayList<>(); + try (CloseableIterator it = reader.read(new NoopInputStats())) { + while (it.hasNext()) { + result.add(it.next()); + } + } + return result; + } + + private static final class NoopInputStats implements org.apache.druid.data.input.InputStats + { + @Override + public void incrementProcessedBytes(final long v) + { + } + + @Override + public long getProcessedBytes() + { + return 0; + } + } +} diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceTest.java new file mode 100644 index 000000000000..4d4818f73945 --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceTest.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.iceberg.input; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.data.input.ColumnsFilter; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.error.DruidException; +import org.apache.druid.iceberg.filter.IcebergEqualsFilter; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Files; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.types.Types; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +public class IcebergArrowInputSourceTest +{ + @Rule + public TemporaryFolder temporaryFolder = new 
TemporaryFolder(); + + private IcebergCatalog testCatalog; + private TableIdentifier tableIdentifier; + private File warehouseDir; + + private final Schema tableSchema = new Schema( + Types.NestedField.required(1, "id", Types.StringType.get()), + Types.NestedField.required(2, "name", Types.StringType.get()) + ); + private final Map tableData = ImmutableMap.of("id", "123988", "name", "Foo"); + + private static final String NAMESPACE = "default"; + private static final String TABLENAME = "foosTable"; + + @Before + public void setup() throws IOException + { + warehouseDir = FileUtils.createTempDir(); + testCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>(), true); + tableIdentifier = TableIdentifier.of(Namespace.of(NAMESPACE), TABLENAME); + createAndLoadTable(tableIdentifier); + } + + @After + public void tearDown() + { + dropTableFromCatalog(tableIdentifier); + } + + @Test + public void testReadWithNullInputStatsDoesNotNpe() throws IOException + { + final IcebergArrowInputSource src = new IcebergArrowInputSource( + TABLENAME, + NAMESPACE, + null, + testCatalog, + null, + null, + 1024 + ); + final InputRowSchema schemaWithMissingTs = new InputRowSchema( + new TimestampSpec(null, null, org.apache.druid.java.util.common.DateTimes.utc(0L)), + DimensionsSpec.builder().build(), + ColumnsFilter.all() + ); + final InputSourceReader reader = src.reader(schemaWithMissingTs, null, FileUtils.createTempDir()); + try (org.apache.druid.java.util.common.parsers.CloseableIterator it = reader.read()) { + while (it.hasNext()) { + Assert.assertNotNull(it.next()); + } + } + } + + @Test + public void testIsNotSplittable() + { + final IcebergArrowInputSource src = new IcebergArrowInputSource( + TABLENAME, + NAMESPACE, + null, + testCatalog, + null, + null, + 1024 + ); + Assert.assertFalse(src.isSplittable()); + Assert.assertFalse(src.needsFormat()); + } + + @Test + public void testResidualFilterModeFail() throws IOException + { + final IcebergArrowInputSource src = new 
IcebergArrowInputSource( + TABLENAME, + NAMESPACE, + new IcebergEqualsFilter("id", "123988"), + testCatalog, + null, + ResidualFilterMode.FAIL, + 1024 + ); + final InputRowSchema inputRowSchema = new InputRowSchema( + new TimestampSpec("timestamp", "millis", null), + DimensionsSpec.builder().build(), + ColumnsFilter.all() + ); + final DruidException ex = Assert.assertThrows( + DruidException.class, + () -> { + final InputSourceReader reader = src.reader(inputRowSchema, null, FileUtils.createTempDir()); + reader.read().close(); + } + ); + Assert.assertTrue( + "Expected residual error: " + ex.getMessage(), + ex.getMessage().contains("residual") + ); + } + + @Test + public void testResidualFilterModeFailUsesSnapshotTime() throws Exception + { + final String filterId = (String) tableData.get("id"); + dropTableFromCatalog(tableIdentifier); + final PartitionSpec partitionSpec = PartitionSpec.builderFor(tableSchema) + .identity("id") + .build(); + final Table table = testCatalog.retrieveCatalog().createTable(tableIdentifier, tableSchema, partitionSpec); + appendRow(table, partitionSpec, tableData); + + final long afterPartitionedSnapshot = System.currentTimeMillis(); + Thread.sleep(10); + + table.updateSpec().removeField("id").commit(); + appendRow(table, table.spec(), ImmutableMap.of("id", filterId, "name", "Bar")); + + final IcebergArrowInputSource src = new IcebergArrowInputSource( + TABLENAME, + NAMESPACE, + new IcebergEqualsFilter("id", filterId), + testCatalog, + org.apache.druid.java.util.common.DateTimes.utc(afterPartitionedSnapshot), + ResidualFilterMode.FAIL, + 1024 + ); + final InputRowSchema inputRowSchema = new InputRowSchema( + new TimestampSpec(null, null, org.apache.druid.java.util.common.DateTimes.utc(0L)), + DimensionsSpec.builder().build(), + ColumnsFilter.all() + ); + + final InputSourceReader reader = src.reader(inputRowSchema, null, FileUtils.createTempDir()); + reader.read().close(); + } + + private void createAndLoadTable(TableIdentifier id) throws 
IOException + { + final Table table = testCatalog.retrieveCatalog().createTable(id, tableSchema, PartitionSpec.unpartitioned()); + appendRow(table, PartitionSpec.unpartitioned(), tableData); + } + + private void appendRow(Table table, PartitionSpec partitionSpec, Map rowData) throws IOException + { + final String fname = UUID.randomUUID() + ".parquet"; + final File dataFile = new File(warehouseDir.getAbsolutePath() + "/" + fname); + Assert.assertTrue(dataFile.createNewFile()); + final OutputFile out = Files.localOutput(dataFile); + final GenericRecord row = GenericRecord.create(tableSchema); + row.setField("id", rowData.get("id")); + row.setField("name", rowData.get("name")); + final DataWriter writer; + if (partitionSpec.isUnpartitioned()) { + writer = Parquet.writeData(out) + .schema(tableSchema) + .createWriterFunc(GenericParquetWriter::create) + .overwrite() + .withSpec(partitionSpec) + .build(); + } else { + final PartitionKey partitionKey = new PartitionKey(partitionSpec, tableSchema); + partitionKey.partition(row); + writer = Parquet.writeData(out) + .schema(tableSchema) + .createWriterFunc(GenericParquetWriter::create) + .overwrite() + .withSpec(partitionSpec) + .withPartition(partitionKey) + .build(); + } + try { + writer.write(row); + } + finally { + writer.close(); + } + final DataFile df = writer.toDataFile(); + table.newAppend().appendFile(df).commit(); + } + + private void dropTableFromCatalog(TableIdentifier id) + { + testCatalog.retrieveCatalog().dropTable(id); + } +} diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java index 93d7412cc77a..fdd891c69c60 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java +++ 
b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java @@ -100,6 +100,8 @@ public void testInputSource() throws IOException testCatalog, new LocalInputSourceFactory(), null, + null, + null, null ); Stream>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null)); @@ -136,6 +138,8 @@ public void testInputSourceWithEmptySource() throws IOException testCatalog, new LocalInputSourceFactory(), null, + null, + null, null ); Stream>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null)); @@ -152,6 +156,8 @@ public void testInputSourceWithFilter() throws IOException testCatalog, new LocalInputSourceFactory(), null, + null, + null, null ); Stream>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null)); @@ -188,6 +194,8 @@ public void testInputSourceReadFromLatestSnapshot() throws IOException testCatalog, new LocalInputSourceFactory(), DateTimes.nowUtc(), + null, + null, null ); Stream>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null)); @@ -208,6 +216,8 @@ public void testCaseInsensitiveFiltering() throws IOException caseInsensitiveCatalog, new LocalInputSourceFactory(), null, + null, + null, null ); @@ -233,7 +243,9 @@ public void testResidualFilterModeIgnore() throws IOException testCatalog, new LocalInputSourceFactory(), null, - ResidualFilterMode.IGNORE + ResidualFilterMode.IGNORE, + null, + null ); Stream>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null)); Assert.assertEquals(1, splits.count()); @@ -250,7 +262,9 @@ public void testResidualFilterModeFail() throws IOException testCatalog, new LocalInputSourceFactory(), null, - ResidualFilterMode.FAIL + ResidualFilterMode.FAIL, + null, + null ); DruidException exception = Assert.assertThrows( DruidException.class, @@ -278,7 +292,9 @@ public void testResidualFilterModeFailWithPartitionedTable() throws IOException testCatalog, new 
LocalInputSourceFactory(), null, - ResidualFilterMode.FAIL + ResidualFilterMode.FAIL, + null, + null ); Stream>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null)); Assert.assertEquals(1, splits.count()); @@ -301,7 +317,9 @@ public void testResidualFilterModeFailWithPartitionedTableNonPartitionColumn() t testCatalog, new LocalInputSourceFactory(), null, - ResidualFilterMode.FAIL + ResidualFilterMode.FAIL, + null, + null ); DruidException exception = Assert.assertThrows( DruidException.class, diff --git a/licenses.yaml b/licenses.yaml index 5712ec2a9247..e4baf4a1c6a3 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -7136,3 +7136,72 @@ license_name: Apache License version 2.0 version: 3.30.2-GA libraries: - org.javassist: javassist + +--- + +name: Apache Iceberg Arrow +license_category: binary +module: extensions-contrib/druid-iceberg-extensions +license_name: Apache License version 2.0 +version: 1.10.0 +libraries: + - org.apache.iceberg: iceberg-arrow + +--- + +name: Apache Arrow +license_category: binary +module: extensions-contrib/druid-iceberg-extensions +license_name: Apache License version 2.0 +version: 15.0.2 +libraries: + - org.apache.arrow: arrow-format + - org.apache.arrow: arrow-memory-core + - org.apache.arrow: arrow-memory-unsafe + - org.apache.arrow: arrow-vector +notices: + - arrow-format: | + Arrow Format + Copyright 2024 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + - arrow-memory-core: | + Arrow Memory - Core + Copyright 2024 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + - arrow-memory-unsafe: | + Arrow Memory - Unsafe + Copyright 2024 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). 
+ - arrow-vector: | + Arrow Vectors + Copyright 2024 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + +--- + +name: FlatBuffers Java API +license_category: binary +module: extensions-contrib/druid-iceberg-extensions +license_name: Apache License version 2.0 +version: 23.5.26 +libraries: + - com.google.flatbuffers: flatbuffers-java + +--- + +name: Eclipse Collections +license_category: binary +module: extensions-contrib/druid-iceberg-extensions +license_name: Eclipse Public License 1.0 +version: 11.1.0 +libraries: + - org.eclipse.collections: eclipse-collections + - org.eclipse.collections: eclipse-collections-api diff --git a/pom.xml b/pom.xml index bab799d96ad1..766076af2c3c 100644 --- a/pom.xml +++ b/pom.xml @@ -103,6 +103,7 @@ 6.0.0 2.2 1.10.0 + 15.0.2 12.1.8 1.19.4 2.21.3 @@ -430,6 +431,23 @@ 2.5.2 + + + org.apache.arrow + arrow-vector + ${arrow.version} + + + org.apache.arrow + arrow-memory-unsafe + ${arrow.version} + + + org.apache.iceberg + iceberg-arrow + ${iceberg.core.version} + +