Skip to content

Commit e29bf65

Browse files
tokoko and mbutrovich authored
feat: pass vended credentials to Iceberg native scan (apache#3523)
* feat: pass vended credentials to Iceberg native scan
* test: add negative test for credential vending
* fix: address comments

---------

Co-authored-by: Matt Butrovich <mbutrovich@users.noreply.github.com>
1 parent fe06c6e commit e29bf65

9 files changed

Lines changed: 249 additions & 9 deletions

File tree

pom.xml

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,12 +477,43 @@ under the License.
477477
<version>${testcontainers.version}</version>
478478
<scope>test</scope>
479479
</dependency>
480+
<!--
481+
AWS SDK modules for Iceberg REST catalog + S3 tests.
482+
iceberg-spark-runtime treats the AWS SDK as provided scope, so tests
483+
that exercise Iceberg's S3FileIO (via ResolvingFileIO) must supply these.
484+
AwsProperties references all service client types in method signatures,
485+
and Java serialization introspection resolves them at class-load time.
486+
-->
480487
<dependency>
481488
<groupId>software.amazon.awssdk</groupId>
482489
<artifactId>s3</artifactId>
483490
<version>${amazon-awssdk-v2.version}</version>
484491
<scope>test</scope>
485492
</dependency>
493+
<dependency>
494+
<groupId>software.amazon.awssdk</groupId>
495+
<artifactId>sts</artifactId>
496+
<version>${amazon-awssdk-v2.version}</version>
497+
<scope>test</scope>
498+
</dependency>
499+
<dependency>
500+
<groupId>software.amazon.awssdk</groupId>
501+
<artifactId>dynamodb</artifactId>
502+
<version>${amazon-awssdk-v2.version}</version>
503+
<scope>test</scope>
504+
</dependency>
505+
<dependency>
506+
<groupId>software.amazon.awssdk</groupId>
507+
<artifactId>glue</artifactId>
508+
<version>${amazon-awssdk-v2.version}</version>
509+
<scope>test</scope>
510+
</dependency>
511+
<dependency>
512+
<groupId>software.amazon.awssdk</groupId>
513+
<artifactId>kms</artifactId>
514+
<version>${amazon-awssdk-v2.version}</version>
515+
<scope>test</scope>
516+
</dependency>
486517

487518
<dependency>
488519
<groupId>org.codehaus.jackson</groupId>

spark/pom.xml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,10 +169,27 @@ under the License.
169169
<groupId>org.testcontainers</groupId>
170170
<artifactId>minio</artifactId>
171171
</dependency>
172+
<!-- AWS SDK modules required by Iceberg's S3FileIO (see parent pom for details) -->
172173
<dependency>
173174
<groupId>software.amazon.awssdk</groupId>
174175
<artifactId>s3</artifactId>
175176
</dependency>
177+
<dependency>
178+
<groupId>software.amazon.awssdk</groupId>
179+
<artifactId>sts</artifactId>
180+
</dependency>
181+
<dependency>
182+
<groupId>software.amazon.awssdk</groupId>
183+
<artifactId>dynamodb</artifactId>
184+
</dependency>
185+
<dependency>
186+
<groupId>software.amazon.awssdk</groupId>
187+
<artifactId>glue</artifactId>
188+
</dependency>
189+
<dependency>
190+
<groupId>software.amazon.awssdk</groupId>
191+
<artifactId>kms</artifactId>
192+
</dependency>
176193
<!-- Jetty and Iceberg dependencies for testing native Iceberg scan -->
177194
<!-- Note: The specific versions are defined in profiles below based on Spark version -->
178195
</dependencies>

spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,32 @@ object IcebergReflection extends Logging {
237237
}
238238
}
239239

/**
 * Reads the string-valued properties exposed by an Iceberg table's FileIO.
 *
 * With REST catalog credential vending, the per-table temporary storage credentials returned by
 * the loadTable response are held by the table's FileIO (typically ResolvingFileIO), so this is
 * where they are picked up.
 *
 * `properties()` is not declared on the FileIO interface; it only exists on concrete
 * implementations such as ResolvingFileIO and S3FileIO, hence the reflective lookup.
 *
 * @param table
 *   the Iceberg table object to inspect
 * @return
 *   the String-to-String entries of the FileIO properties, or None when the FileIO or its
 *   `properties` method cannot be found, the result is not a `java.util.Map`, or no
 *   String-to-String entries remain
 */
def getFileIOProperties(table: Any): Option[Map[String, String]] = {
  import scala.jdk.CollectionConverters._
  for {
    io <- getFileIO(table)
    accessor <- findMethodInHierarchy(io.getClass, "properties")
    stringProps <- accessor.invoke(io) match {
      case raw: java.util.Map[_, _] =>
        // Entries whose key or value is not a String are silently dropped.
        val entries = raw.asScala.collect { case (name: String, value: String) =>
          name -> value
        }.toMap
        Some(entries).filter(_.nonEmpty)
      case _ =>
        // Covers both a null result and a non-Map return type.
        None
    }
  } yield stringProps
}
265+
240266
/**
241267
* Gets the schema from an Iceberg table.
242268
*/

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflecti
4949
import org.apache.comet.objectstore.NativeConfig
5050
import org.apache.comet.parquet.{Native, SupportsComet}
5151
import org.apache.comet.parquet.CometParquetUtils.{encryptionEnabled, isEncryptionConfigSupported}
52-
import org.apache.comet.serde.operator.CometNativeScan
52+
import org.apache.comet.serde.operator.{CometIcebergNativeScan, CometNativeScan}
5353
import org.apache.comet.shims.{CometTypeShim, ShimFileFormat, ShimSubqueryBroadcast}
5454

5555
/**
@@ -387,9 +387,18 @@ case class CometScanRule(session: SparkSession)
387387

388388
val hadoopS3Options = NativeConfig.extractObjectStoreOptions(hadoopConf, effectiveUri)
389389

390-
val catalogProperties =
391-
org.apache.comet.serde.operator.CometIcebergNativeScan
392-
.hadoopToIcebergS3Properties(hadoopS3Options)
390+
val hadoopDerivedProperties =
391+
CometIcebergNativeScan.hadoopToIcebergS3Properties(hadoopS3Options)
392+
393+
// Extract vended credentials from FileIO (REST catalog credential vending).
394+
// FileIO properties take precedence over Hadoop-derived properties because
395+
// they contain per-table credentials vended by the REST catalog.
396+
val fileIOProperties = tableOpt
397+
.flatMap(IcebergReflection.getFileIOProperties)
398+
.map(CometIcebergNativeScan.filterStorageProperties)
399+
.getOrElse(Map.empty)
400+
401+
val catalogProperties = hadoopDerivedProperties ++ fileIOProperties
393402

394403
val result = CometIcebergNativeScanMetadata
395404
.extract(scanExec.scan, effectiveLocation, catalogProperties)

spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,21 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
488488
}
489489
}
490490

/** Key prefixes that mark a property as storage-related and safe to hand to native FileIO. */
private val storagePropertyPrefixes =
  Seq("s3.", "gcs.", "adls.", "client.")

/**
 * Restricts a properties map to its storage-related entries.
 *
 * FileIO.properties() may also carry catalog URIs, bearer tokens, and other non-storage
 * settings; those must not reach the native FileIO builder, so only keys starting with one of
 * the storage prefixes are kept.
 *
 * @param props
 *   the properties to filter
 * @return
 *   the entries of `props` whose key starts with "s3.", "gcs.", "adls." or "client."
 */
def filterStorageProperties(props: Map[String, String]): Map[String, String] =
  for {
    (name, value) <- props
    if storagePropertyPrefixes.exists(name.startsWith(_))
  } yield name -> value
505+
491506
/**
492507
* Transforms Hadoop S3A configuration keys to Iceberg FileIO property keys.
493508
*

spark/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,14 @@ public class RESTCatalogAdapter implements RESTClient {
9898
private final SupportsNamespaces asNamespaceCatalog;
9999
private final ViewCatalog asViewCatalog;
100100

101+
// Optional credentials to inject into loadTable responses, simulating REST catalog
102+
// credential vending. When non-empty, these are added to LoadTableResponse.config().
103+
private Map<String, String> vendedCredentials = ImmutableMap.of();
104+
105+
public void setVendedCredentials(Map<String, String> credentials) {
106+
this.vendedCredentials = credentials;
107+
}
108+
101109
public RESTCatalogAdapter(Catalog catalog) {
102110
this.catalog = catalog;
103111
this.asNamespaceCatalog =
@@ -279,6 +287,26 @@ private static OAuthTokenResponse handleOAuthRequest(Object body) {
279287
@SuppressWarnings({"MethodLength", "checkstyle:CyclomaticComplexity"})
280288
public <T extends RESTResponse> T handleRequest(
281289
Route route, Map<String, String> vars, Object body, Class<T> responseType) {
290+
T response = doHandleRequest(route, vars, body, responseType);
291+
// Inject vended credentials into any LoadTableResponse, simulating REST catalog
292+
// credential vending. This covers CREATE_TABLE, LOAD_TABLE, UPDATE_TABLE, etc.
293+
if (!vendedCredentials.isEmpty() && response instanceof LoadTableResponse) {
294+
LoadTableResponse original = (LoadTableResponse) response;
295+
@SuppressWarnings("unchecked")
296+
T withCreds =
297+
(T)
298+
LoadTableResponse.builder()
299+
.withTableMetadata(original.tableMetadata())
300+
.addAllConfig(original.config())
301+
.addAllConfig(vendedCredentials)
302+
.build();
303+
return withCreds;
304+
}
305+
return response;
306+
}
307+
308+
private <T extends RESTResponse> T doHandleRequest(
309+
Route route, Map<String, String> vars, Object body, Class<T> responseType) {
282310
switch (route) {
283311
case TOKENS:
284312
return castResponse(responseType, handleOAuthRequest(body));

spark/src/test/scala/org/apache/comet/IcebergReadFromS3Suite.scala

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ import org.apache.spark.SparkConf
2323
import org.apache.spark.sql.comet.CometIcebergNativeScanExec
2424
import org.apache.spark.sql.execution.SparkPlan
2525

26-
class IcebergReadFromS3Suite extends CometS3TestBase {
26+
import org.apache.comet.iceberg.RESTCatalogHelper
27+
28+
class IcebergReadFromS3Suite extends CometS3TestBase with RESTCatalogHelper {
2729

2830
override protected val testBucketName = "test-iceberg-bucket"
2931

@@ -227,4 +229,74 @@ class IcebergReadFromS3Suite extends CometS3TestBase {
227229

228230
spark.sql("DROP TABLE s3_catalog.db.mor_delete_test")
229231
}
232+
// Negative test for credential vending: the REST catalog vends credentials that the object
// store should reject, so a write through the vended credentials is expected to fail with 403.
test("REST catalog credential vending rejects wrong credentials") {
  assume(icebergAvailable, "Iceberg not available in classpath")

  val wrongCreds = Map(
    "s3.access-key-id" -> "WRONG_ACCESS_KEY",
    "s3.secret-access-key" -> "WRONG_SECRET_KEY",
    "s3.endpoint" -> minioContainer.getS3URL,
    "s3.path-style-access" -> "true")
  val warehouse = s"s3a://$testBucketName/warehouse-bad-creds"

  withRESTCatalog(vendedCredentials = wrongCreds, warehouseLocation = Some(warehouse)) {
    (restUri, _, _) =>
      withSQLConf(
        "spark.sql.catalog.bad_cat" -> "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.bad_cat.catalog-impl" -> "org.apache.iceberg.rest.RESTCatalog",
        "spark.sql.catalog.bad_cat.uri" -> restUri,
        "spark.sql.catalog.bad_cat.warehouse" -> warehouse) {

        spark.sql("CREATE NAMESPACE bad_cat.db")

        // CREATE TABLE succeeds (metadata only, no S3 access needed)
        spark.sql("CREATE TABLE bad_cat.db.test (id INT) USING iceberg")

        // INSERT fails because S3FileIO uses the wrong vended credentials
        val e = intercept[Exception] {
          spark.sql("INSERT INTO bad_cat.db.test VALUES (1)")
        }

        // The S3 error may be wrapped by Spark, and any Throwable can have a null message, so
        // look through the whole cause chain instead of dereferencing only e.getMessage.
        // The chain walk is capped defensively in case of a self-referential cause.
        val messages = Iterator
          .iterate[Throwable](e)(_.getCause)
          .takeWhile(_ != null)
          .take(32)
          .flatMap(t => Option(t.getMessage))
          .toList
        assert(
          messages.exists(_.contains("403")),
          s"Expected S3 403 error but got: ${messages.mkString(" <- caused by: ")}")
      }
  }
}
264+
// Positive test for credential vending: the REST catalog vends S3 properties built from
// userName/password (presumably the MinIO credentials provided by the base suite -- confirm),
// and a query over the resulting table is checked with checkIcebergNativeScan.
test("REST catalog credential vending with native Iceberg scan on S3") {
  assume(icebergAvailable, "Iceberg not available in classpath")

  val minioCreds = Map(
    "s3.access-key-id" -> userName,
    "s3.secret-access-key" -> password,
    "s3.endpoint" -> minioContainer.getS3URL,
    "s3.path-style-access" -> "true")
  val warehousePath = s"s3a://$testBucketName/warehouse-vending"

  withRESTCatalog(vendedCredentials = minioCreds, warehouseLocation = Some(warehousePath)) {
    (catalogUri, _, _) =>
      // No S3 credentials are set in the catalog conf; only the vended ones are supplied here.
      val catalogConf = Seq(
        "spark.sql.catalog.vend_cat" -> "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.vend_cat.catalog-impl" -> "org.apache.iceberg.rest.RESTCatalog",
        "spark.sql.catalog.vend_cat.uri" -> catalogUri,
        "spark.sql.catalog.vend_cat.warehouse" -> warehousePath,
        CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true")

      withSQLConf(catalogConf: _*) {
        spark.sql("CREATE NAMESPACE vend_cat.db")

        spark.sql("""
CREATE TABLE vend_cat.db.simple (
id INT, name STRING, value DOUBLE
) USING iceberg
""")
        spark.sql("""
INSERT INTO vend_cat.db.simple
VALUES (1, 'Alice', 10.5), (2, 'Bob', 20.3), (3, 'Charlie', 30.7)
""")

        checkIcebergNativeScan("SELECT * FROM vend_cat.db.simple ORDER BY id")

        // Clean up so the namespace can be removed from the in-memory backend catalog.
        spark.sql("DROP TABLE vend_cat.db.simple")
        spark.sql("DROP NAMESPACE vend_cat.db")
      }
  }
}
230302
}

spark/src/test/spark-3.x/org/apache/comet/iceberg/RESTCatalogHelper.scala

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,22 @@ import java.nio.file.Files
2626
trait RESTCatalogHelper {
2727

2828
/** Helper to set up REST catalog with embedded Jetty server (Spark 3.x / Jetty 9.4) */
29-
def withRESTCatalog(f: (String, org.eclipse.jetty.server.Server, File) => Unit): Unit = {
// Convenience overload for callers that need neither credential vending nor a custom
// warehouse: forwards with no vended credentials and no warehouse override, so the
// catalog uses a local temp directory.
def withRESTCatalog(f: (String, org.eclipse.jetty.server.Server, File) => Unit): Unit = {
  withRESTCatalog(vendedCredentials = Map.empty, warehouseLocation = None)(f)
}
31+
32+
/**
33+
* Helper to set up REST catalog with optional credential vending.
34+
*
35+
* @param vendedCredentials
36+
* Storage credentials to inject into loadTable responses, simulating REST catalog credential
37+
* vending. When non-empty, these are added to every LoadTableResponse.config().
38+
* @param warehouseLocation
39+
* Override the warehouse location (e.g., for S3). Defaults to a local temp directory.
40+
*/
41+
def withRESTCatalog(
42+
vendedCredentials: Map[String, String] = Map.empty,
43+
warehouseLocation: Option[String] = None)(
44+
f: (String, org.eclipse.jetty.server.Server, File) => Unit): Unit = {
3045
import org.apache.iceberg.inmemory.InMemoryCatalog
3146
import org.apache.iceberg.CatalogProperties
3247
import org.apache.iceberg.rest.{RESTCatalogAdapter, RESTCatalogServlet}
@@ -35,12 +50,18 @@ trait RESTCatalogHelper {
3550
import org.eclipse.jetty.server.handler.gzip.GzipHandler
3651

3752
val warehouseDir = Files.createTempDirectory("comet-rest-catalog-test").toFile
53+
val effectiveWarehouse = warehouseLocation.getOrElse(warehouseDir.getAbsolutePath)
54+
3855
val backendCatalog = new InMemoryCatalog()
3956
backendCatalog.initialize(
4057
"in-memory",
41-
java.util.Map.of(CatalogProperties.WAREHOUSE_LOCATION, warehouseDir.getAbsolutePath))
58+
java.util.Map.of(CatalogProperties.WAREHOUSE_LOCATION, effectiveWarehouse))
4259

4360
val adapter = new RESTCatalogAdapter(backendCatalog)
61+
if (vendedCredentials.nonEmpty) {
62+
import scala.jdk.CollectionConverters._
63+
adapter.setVendedCredentials(vendedCredentials.asJava)
64+
}
4465
val servlet = new RESTCatalogServlet(adapter)
4566

4667
val servletContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)

spark/src/test/spark-4.0/org/apache/comet/iceberg/RESTCatalogHelper.scala

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,22 @@ import java.nio.file.Files
2626
trait RESTCatalogHelper {
2727

2828
/** Helper to set up REST catalog with embedded Jetty server (Spark 4.0 / Jetty 11) */
29-
def withRESTCatalog(f: (String, org.eclipse.jetty.server.Server, File) => Unit): Unit = {
// Convenience overload for callers that need neither credential vending nor a custom
// warehouse: forwards with no vended credentials and no warehouse override, so the
// catalog uses a local temp directory.
def withRESTCatalog(f: (String, org.eclipse.jetty.server.Server, File) => Unit): Unit = {
  withRESTCatalog(vendedCredentials = Map.empty, warehouseLocation = None)(f)
}
31+
32+
/**
33+
* Helper to set up REST catalog with optional credential vending.
34+
*
35+
* @param vendedCredentials
36+
* Storage credentials to inject into loadTable responses, simulating REST catalog credential
37+
* vending. When non-empty, these are added to every LoadTableResponse.config().
38+
* @param warehouseLocation
39+
* Override the warehouse location (e.g., for S3). Defaults to a local temp directory.
40+
*/
41+
def withRESTCatalog(
42+
vendedCredentials: Map[String, String] = Map.empty,
43+
warehouseLocation: Option[String] = None)(
44+
f: (String, org.eclipse.jetty.server.Server, File) => Unit): Unit = {
3045
import org.apache.iceberg.inmemory.InMemoryCatalog
3146
import org.apache.iceberg.CatalogProperties
3247
import org.apache.iceberg.rest.{RESTCatalogAdapter, RESTCatalogServlet}
@@ -35,12 +50,18 @@ trait RESTCatalogHelper {
3550
import org.eclipse.jetty.server.handler.gzip.GzipHandler
3651

3752
val warehouseDir = Files.createTempDirectory("comet-rest-catalog-test").toFile
53+
val effectiveWarehouse = warehouseLocation.getOrElse(warehouseDir.getAbsolutePath)
54+
3855
val backendCatalog = new InMemoryCatalog()
3956
backendCatalog.initialize(
4057
"in-memory",
41-
java.util.Map.of(CatalogProperties.WAREHOUSE_LOCATION, warehouseDir.getAbsolutePath))
58+
java.util.Map.of(CatalogProperties.WAREHOUSE_LOCATION, effectiveWarehouse))
4259

4360
val adapter = new RESTCatalogAdapter(backendCatalog)
61+
if (vendedCredentials.nonEmpty) {
62+
import scala.jdk.CollectionConverters._
63+
adapter.setVendedCredentials(vendedCredentials.asJava)
64+
}
4465
val servlet = new RESTCatalogServlet(adapter)
4566

4667
val servletContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)

0 commit comments

Comments
 (0)