| 1 | +<!-- |
| 2 | +Licensed to the Apache Software Foundation (ASF) under one |
| 3 | +or more contributor license agreements. See the NOTICE file |
| 4 | +distributed with this work for additional information |
| 5 | +regarding copyright ownership. The ASF licenses this file |
| 6 | +to you under the Apache License, Version 2.0 (the |
| 7 | +"License"); you may not use this file except in compliance |
| 8 | +with the License. You may obtain a copy of the License at |
| 9 | + |
| 10 | + http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | + |
| 12 | +Unless required by applicable law or agreed to in writing, |
| 13 | +software distributed under the License is distributed on an |
| 14 | +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 15 | +KIND, either express or implied. See the License for the |
| 16 | +specific language governing permissions and limitations |
| 17 | +under the License. |
| 18 | +--> |
| 19 | + |
| 20 | +# Public API for Apache Iceberg Integration |
| 21 | + |
| 22 | +This document describes the Comet classes and methods that form the public API used by |
| 23 | +[Apache Iceberg](https://iceberg.apache.org/). These APIs enable Iceberg to leverage Comet's |
| 24 | +native Parquet reader for vectorized reads in Spark. |
| 25 | + |
| 26 | +**Important**: Changes to these APIs may break Iceberg's Comet integration. Contributors should |
| 27 | +exercise caution when modifying these classes and consider backward compatibility. |
| 28 | + |
| 29 | +## Overview |
| 30 | + |
| 31 | +Iceberg uses Comet's Parquet reading infrastructure to accelerate table scans. The integration |
| 32 | +uses two approaches: |
| 33 | + |
| 34 | +1. **Hybrid Reader**: Native Parquet decoding with JVM-based I/O (requires building Iceberg |
| 35 | + from source with Comet patches) |
| 36 | +2. **Native Reader**: Fully-native Iceberg scans using iceberg-rust |
| 37 | + |
| 38 | +This document focuses on the Hybrid Reader API, which is used when Iceberg is configured with |
| 39 | +`spark.sql.iceberg.parquet.reader-type=COMET`. |
| 40 | + |
| 41 | +## Package: `org.apache.comet.parquet` |
| 42 | + |
| 43 | +### FileReader |
| 44 | + |
| 45 | +Main class for reading Parquet files with native decoding. |
| 46 | + |
| 47 | +```java |
| 48 | +// Constructor |
| 49 | +public FileReader( |
| 50 | + WrappedInputFile inputFile, |
| 51 | + ReadOptions options, |
| 52 | + Map<String, String> properties, |
| 53 | + Long start, |
| 54 | + Long length, |
| 55 | + byte[] fileEncryptionKey, |
| 56 | + byte[] fileAADPrefix |
| 57 | +) throws IOException |
| 58 | + |
| 59 | +// Methods used by Iceberg |
| 60 | +public void setRequestedSchemaFromSpecs(List<ParquetColumnSpec> specs) |
| 61 | +public RowGroupReader readNextRowGroup() throws IOException |
| 62 | +public void skipNextRowGroup() |
| 63 | +public void close() throws IOException |
| 64 | +``` |
| 65 | + |
| 66 | +### RowGroupReader |
| 67 | + |
| 68 | +Provides access to row group data. |
| 69 | + |
| 70 | +```java |
| 71 | +// Methods used by Iceberg |
| 72 | +public long getRowCount() |
| 73 | +``` |
| 74 | + |
| 75 | +### ReadOptions |
| 76 | + |
| 77 | +Configuration for Parquet read operations. |
| 78 | + |
| 79 | +```java |
| 80 | +// Builder pattern |
| 81 | +public static Builder builder(Configuration conf) |
| 82 | + |
| 83 | +public class Builder { |
| 84 | + public ReadOptions build() |
| 85 | +} |
| 86 | +``` |
| 87 | + |
| 88 | +### WrappedInputFile |
| 89 | + |
| 90 | +Wrapper that adapts Iceberg's `InputFile` interface to Comet's file reading infrastructure. |
| 91 | + |
| 92 | +```java |
| 93 | +// Constructor |
| 94 | +public WrappedInputFile(org.apache.iceberg.io.InputFile inputFile) |
| 95 | +``` |
| 96 | + |
| 97 | +### ParquetColumnSpec |
| 98 | + |
| 99 | +Specification describing a Parquet column's schema information. |
| 100 | + |
| 101 | +```java |
| 102 | +// Constructor |
| 103 | +public ParquetColumnSpec( |
| 104 | + int fieldId, |
| 105 | + String[] path, |
| 106 | + String physicalType, |
| 107 | + int typeLength, |
| 108 | + boolean isRepeated, |
| 109 | + int maxDefinitionLevel, |
| 110 | + int maxRepetitionLevel, |
| 111 | + String logicalTypeName, |
| 112 | + Map<String, String> logicalTypeParams |
| 113 | +) |
| 114 | + |
| 115 | +// Getters used by Iceberg |
| 116 | +public int getFieldId() |
| 117 | +public String[] getPath() |
| 118 | +public String getPhysicalType() |
| 119 | +public int getTypeLength() |
| 120 | +public int getMaxDefinitionLevel() |
| 121 | +public int getMaxRepetitionLevel() |
| 122 | +public String getLogicalTypeName() |
| 123 | +public Map<String, String> getLogicalTypeParams() |
| 124 | +``` |
| 125 | + |
| 126 | +### AbstractColumnReader |
| 127 | + |
| 128 | +Base class for column readers. |
| 129 | + |
| 130 | +```java |
| 131 | +// Methods used by Iceberg |
| 132 | +public void setBatchSize(int batchSize) |
| 133 | +public void close() |
| 134 | +``` |
| 135 | + |
| 136 | +### ColumnReader |
| 137 | + |
| 138 | +Column reader for regular Parquet columns (extends `AbstractColumnReader`). |
| 139 | + |
| 140 | +```java |
| 141 | +// Methods used by Iceberg |
| 142 | +public void setPageReader(PageReader pageReader) throws IOException |
| 143 | +``` |
| 144 | + |
| 145 | +### BatchReader |
| 146 | + |
| 147 | +Coordinates reading batches across multiple column readers. |
| 148 | + |
| 149 | +```java |
| 150 | +// Constructor |
| 151 | +public BatchReader(AbstractColumnReader[] columnReaders) |
| 152 | + |
| 153 | +// Methods used by Iceberg |
| 154 | +public void setSparkSchema(StructType schema) |
| 155 | +public AbstractColumnReader[] getColumnReaders() |
| 156 | +public void nextBatch(int batchSize) |
| 157 | +``` |
| 158 | + |
| 159 | +### MetadataColumnReader |
| 160 | + |
| 161 | +Reader for metadata columns (used for Iceberg's delete and position columns). |
| 162 | + |
| 163 | +```java |
| 164 | +// Constructor |
| 165 | +public MetadataColumnReader( |
| 166 | + DataType sparkType, |
| 167 | + ColumnDescriptor descriptor, |
| 168 | + boolean useDecimal128, |
| 169 | + boolean isConstant |
| 170 | +) |
| 171 | + |
| 172 | +// Methods used by Iceberg |
| 173 | +public void readBatch(int total) |
| 174 | +public CometVector currentBatch() |
| 175 | + |
| 176 | +// Protected field accessed by subclasses |
| 177 | +protected long nativeHandle |
| 178 | +``` |
| 179 | + |
| 180 | +### ConstantColumnReader |
| 181 | + |
| 182 | +Reader for columns with constant/default values (extends `MetadataColumnReader`). |
| 183 | + |
| 184 | +```java |
| 185 | +// Constructor |
| 186 | +public ConstantColumnReader( |
| 187 | + DataType sparkType, |
| 188 | + ColumnDescriptor descriptor, |
| 189 | + Object value, |
| 190 | + boolean useDecimal128 |
| 191 | +) |
| 192 | +``` |
| 193 | + |
| 194 | +### Native |
| 195 | + |
| 196 | +JNI interface for native operations. |
| 197 | + |
| 198 | +```java |
| 199 | +// Static methods used by Iceberg |
| 200 | +public static void resetBatch(long nativeHandle) |
| 201 | +public static void setIsDeleted(long nativeHandle, boolean[] isDeleted) |
| 202 | +public static void setPosition(long nativeHandle, long position, int total) |
| 203 | +``` |
| 204 | + |
| 205 | +### TypeUtil |
| 206 | + |
| 207 | +Utilities for Parquet type conversions. |
| 208 | + |
| 209 | +```java |
| 210 | +// Methods used by Iceberg |
| 211 | +public static ColumnDescriptor convertToParquet(StructField sparkField) |
| 212 | +``` |
| 213 | + |
| 214 | +### Utils |
| 215 | + |
| 216 | +General utility methods. |
| 217 | + |
| 218 | +```java |
| 219 | +// Methods used by Iceberg |
| 220 | +public static AbstractColumnReader getColumnReader( |
| 221 | + DataType sparkType, |
| 222 | + ColumnDescriptor descriptor, |
| 223 | + CometSchemaImporter importer, |
| 224 | + int batchSize, |
| 225 | + boolean useDecimal128, |
| 226 | + boolean isConstant |
| 227 | +) |
| 228 | +``` |
| 229 | + |
| 230 | +## Package: `org.apache.comet` |
| 231 | + |
| 232 | +### CometSchemaImporter |
| 233 | + |
| 234 | +Imports and converts schemas between Arrow and Spark formats. |
| 235 | + |
| 236 | +```java |
| 237 | +// Constructor |
| 238 | +public CometSchemaImporter(RootAllocator allocator) |
| 239 | + |
| 240 | +// Methods used by Iceberg |
| 241 | +public void close() |
| 242 | +``` |
| 243 | + |
| 244 | +## Package: `org.apache.comet.vector` |
| 245 | + |
| 246 | +### CometVector |
| 247 | + |
| 248 | +Base class for Comet's columnar vectors (extends Spark's `ColumnVector`). |
| 249 | + |
| 250 | +```java |
| 251 | +// Constructor |
| 252 | +public CometVector(DataType type, boolean useDecimal128) |
| 253 | + |
| 254 | +// Abstract methods that subclasses must implement |
| 255 | +public abstract int numValues() |
| 256 | +public abstract ValueVector getValueVector() |
| 257 | +public abstract CometVector slice(int offset, int length) |
| 258 | +public abstract void setNumNulls(int numNulls) |
| 259 | +public abstract void setNumValues(int numValues) |
| 260 | + |
| 261 | +// Inherited from Spark ColumnVector - commonly overridden |
| 262 | +public abstract void close() |
| 263 | +public abstract boolean hasNull() |
| 264 | +public abstract int numNulls() |
| 265 | +public abstract boolean isNullAt(int rowId) |
| 266 | +public abstract boolean getBoolean(int rowId) |
| 267 | +// ... other type-specific getters |
| 268 | +``` |
| 269 | + |
| 270 | +## Package: `org.apache.comet.shaded.arrow.memory` |
| 271 | + |
| 272 | +### RootAllocator |
| 273 | + |
| 274 | +Arrow memory allocator (shaded to avoid conflicts). |
| 275 | + |
| 276 | +```java |
| 277 | +// Constructor used by Iceberg |
| 278 | +public RootAllocator() |
| 279 | +``` |
| 280 | + |
| 281 | +## Package: `org.apache.comet.shaded.arrow.vector` |
| 282 | + |
| 283 | +### ValueVector |
| 284 | + |
| 285 | +Arrow's base vector interface (shaded). Used as return type in `CometVector.getValueVector()`. |
| 286 | + |
| 287 | +## How Iceberg Uses These APIs |
| 288 | + |
| 289 | +### Parquet File Reading Flow |
| 290 | + |
| 291 | +1. Iceberg creates a `WrappedInputFile` from its `InputFile` |
| 292 | +2. Creates `ReadOptions` via builder pattern |
| 293 | +3. Instantiates `FileReader` with the wrapped input file |
| 294 | +4. Converts Parquet `ColumnDescriptor`s to `ParquetColumnSpec`s using `CometTypeUtils` |
| 295 | +5. Calls `setRequestedSchemaFromSpecs()` to specify which columns to read |
| 296 | +6. Iterates through row groups via `readNextRowGroup()` and `skipNextRowGroup()` |
| 297 | + |
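The file-reading steps above can be sketched as a loop. The block below is a minimal, self-contained illustration rather than Comet or Iceberg code: `SketchFileReader` and `SketchRowGroup` are hypothetical in-memory stand-ins that only borrow the method names `readNextRowGroup()`, `skipNextRowGroup()`, `getRowCount()`, and `close()` from the listings above. It assumes that a `null` return marks the end of the file and that the caller decides per row group whether to read or skip.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.IntPredicate;

// Hypothetical stand-in for a row group handle; only getRowCount() mirrors the listing above.
final class SketchRowGroup {
  private final long rowCount;

  SketchRowGroup(long rowCount) {
    this.rowCount = rowCount;
  }

  long getRowCount() {
    return rowCount;
  }
}

// Hypothetical stand-in for FileReader: same method names, in-memory data instead of a file.
final class SketchFileReader implements AutoCloseable {
  private final Iterator<SketchRowGroup> groups;

  SketchFileReader(List<SketchRowGroup> groups) {
    this.groups = groups.iterator();
  }

  // Assumption: null signals that no row groups remain.
  SketchRowGroup readNextRowGroup() {
    return groups.hasNext() ? groups.next() : null;
  }

  // Assumption: skipping when nothing remains is a no-op.
  void skipNextRowGroup() {
    if (groups.hasNext()) {
      groups.next();
    }
  }

  @Override
  public void close() {
    // Nothing to release in this in-memory sketch.
  }
}

final class RowGroupLoopSketch {
  // Reads the row groups whose index passes the filter, skips the others,
  // and returns the total number of rows in the groups that were read.
  static long readSelected(SketchFileReader reader, int rowGroupCount, IntPredicate shouldRead) {
    long rows = 0;
    for (int i = 0; i < rowGroupCount; i++) {
      if (!shouldRead.test(i)) {
        reader.skipNextRowGroup();
        continue;
      }
      SketchRowGroup group = reader.readNextRowGroup();
      if (group == null) {
        break; // fewer row groups than expected
      }
      rows += group.getRowCount();
    }
    return rows;
  }

  public static void main(String[] args) {
    List<SketchRowGroup> groups =
        List.of(new SketchRowGroup(100), new SketchRowGroup(250), new SketchRowGroup(50));
    // try-with-resources mirrors the close() call listed for FileReader.
    try (SketchFileReader reader = new SketchFileReader(groups)) {
      System.out.println(readSelected(reader, groups.size(), i -> i != 1));
    }
  }
}
```

In a real integration the filter would typically come from row group statistics or split boundaries; that decision logic is outside the API surface listed here.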
| 298 | +### Column Reading Flow |
| 299 | + |
| 300 | +1. Creates `CometSchemaImporter` with a `RootAllocator` |
| 301 | +2. Uses `Utils.getColumnReader()` to create appropriate column readers |
| 302 | +3. Calls `reset()` and `setPageReader()` for each row group |
| 303 | +4. Uses `BatchReader` to coordinate reading batches across all columns |
| 304 | +5. Retrieves results via `delegate().currentBatch()` |
| 305 | + |
| 306 | +### Metadata Columns |
| 307 | + |
| 308 | +Iceberg uses `MetadataColumnReader` subclasses for special columns: |
| 309 | + |
| 310 | +- **Delete tracking**: Uses `Native.setIsDeleted()` to mark deleted rows |
| 311 | +- **Position tracking**: Uses `Native.setPosition()` to track row positions |
| 312 | + |
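As an illustration of the delete-tracking input, the sketch below builds the kind of `boolean[]` that `Native.setIsDeleted()` accepts. It is plain Java with no Comet dependency. The helper `buildIsDeleted`, the class `DeleteMaskSketch`, and the idea of deriving the flags from a set of deleted row positions are assumptions made for the example; this document does not describe how Iceberg actually computes the array.

```java
import java.util.Arrays;
import java.util.Set;

final class DeleteMaskSketch {
  // Hypothetical helper: element i of the result is true when the row at
  // file position (batchStart + i) appears in deletedPositions.
  static boolean[] buildIsDeleted(long batchStart, int batchSize, Set<Long> deletedPositions) {
    boolean[] isDeleted = new boolean[batchSize];
    for (int i = 0; i < batchSize; i++) {
      isDeleted[i] = deletedPositions.contains(batchStart + i);
    }
    return isDeleted;
  }

  public static void main(String[] args) {
    // Batch covers positions 100..103; position 500 lies outside the batch and is ignored.
    boolean[] mask = buildIsDeleted(100L, 4, Set.of(101L, 103L, 500L));
    System.out.println(Arrays.toString(mask));
  }
}
```

A `MetadataColumnReader` subclass would then hand an array like this, together with its `nativeHandle`, to `Native.setIsDeleted(long, boolean[])`.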
| 313 | +## Compatibility Considerations |
| 314 | + |
| 315 | +When modifying these APIs, consider: |
| 316 | + |
| 317 | +1. **Constructor signatures**: Adding required parameters breaks Iceberg |
| 318 | +2. **Method signatures**: Changing return types or parameters breaks Iceberg |
| 319 | +3. **Protected fields**: `MetadataColumnReader.nativeHandle` is accessed by Iceberg subclasses |
| 320 | +4. **Shaded dependencies**: Arrow classes are shaded under `org.apache.comet.shaded` |
| 321 | + |
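One common way to respect the constructor-signature point above is to add an overload and keep the existing signature as a delegating constructor. The sketch below shows that pattern on a hypothetical class; `SketchColumnReader`, its fields, and `DEFAULT_BATCH_SIZE` are invented for illustration and are not part of Comet.

```java
// Hypothetical class used only to illustrate a backward-compatible constructor change.
final class SketchColumnReader {
  // Assumed default for callers that still use the original constructor.
  static final int DEFAULT_BATCH_SIZE = 8192;

  private final String columnName;
  private final int batchSize;

  // Original signature: kept so already-compiled callers continue to link.
  SketchColumnReader(String columnName) {
    this(columnName, DEFAULT_BATCH_SIZE);
  }

  // New overload: the additional parameter is available without breaking existing callers.
  SketchColumnReader(String columnName, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    this.columnName = columnName;
    this.batchSize = batchSize;
  }

  String columnName() {
    return columnName;
  }

  int batchSize() {
    return batchSize;
  }

  public static void main(String[] args) {
    // Both call styles work side by side.
    System.out.println(new SketchColumnReader("id").batchSize());
    System.out.println(new SketchColumnReader("id", 1024).batchSize());
  }
}
```

The same idea applies to methods: adding an overload is generally safe, whereas changing a parameter list or return type in place changes the method descriptor that compiled Iceberg code links against.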
| 322 | +## Testing Iceberg Integration |
| 323 | + |
| 324 | +See the [Iceberg user guide](../user-guide/latest/iceberg.md) for instructions on testing |
| 325 | +Comet with Iceberg. |