Skip to content

Commit ceeb9bc

Browse files
authored
[feat](Iceberg)Rest & S3Table Support Iam-role (apache#60498)
FYI apache#59893 The initial implementation was contributed by https://github.com/Sbaia . Many thanks for his contributions.
1 parent accd8a0 commit ceeb9bc

13 files changed

Lines changed: 1028 additions & 80 deletions

File tree

fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/s3tables/CustomAwsCredentialsProvider.java

Lines changed: 0 additions & 43 deletions
This file was deleted.

fe/fe-core/src/main/java/org/apache/doris/datasource/property/common/AwsCredentialsProviderFactory.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,4 +177,32 @@ public static String getV2ClassName(AwsCredentialsProviderMode mode, boolean inc
177177
"AWS SDK V2 does not support credentials provider mode: " + mode);
178178
}
179179
}
180+
181+
/**
182+
* Get the AWS credentials provider class name.
183+
* For DEFAULT mode, returns AWS SDK native DefaultCredentialsProvider.
184+
* For other modes, returns the specific provider class name.
185+
*/
186+
public static String getV2ClassName(AwsCredentialsProviderMode mode) {
187+
switch (mode) {
188+
case ENV:
189+
return EnvironmentVariableCredentialsProvider.class.getName();
190+
case SYSTEM_PROPERTIES:
191+
return SystemPropertyCredentialsProvider.class.getName();
192+
case WEB_IDENTITY:
193+
return WebIdentityTokenFileCredentialsProvider.class.getName();
194+
case CONTAINER:
195+
return ContainerCredentialsProvider.class.getName();
196+
case INSTANCE_PROFILE:
197+
return InstanceProfileCredentialsProvider.class.getName();
198+
case ANONYMOUS:
199+
return AnonymousCredentialsProvider.class.getName();
200+
case DEFAULT:
201+
// For Iceberg REST, use AWS SDK native DefaultCredentialsProvider
202+
return "software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider";
203+
default:
204+
throw new UnsupportedOperationException(
205+
"AWS SDK V2 does not support credentials provider mode: " + mode);
206+
}
207+
}
180208
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.datasource.property.common;
19+
20+
import org.apache.doris.datasource.property.storage.S3Properties;
21+
22+
import org.apache.commons.lang3.StringUtils;
23+
import org.apache.iceberg.aws.AssumeRoleAwsClientFactory;
24+
import org.apache.iceberg.aws.AwsProperties;
25+
26+
import java.util.Map;
27+
28+
/**
29+
* Shared util for putting Iceberg AWS assume-role properties into a map.
30+
* Used by Iceberg REST (glue/s3tables signing), S3 Tables catalog, and S3 FileIO.
31+
*/
32+
public final class IcebergAwsAssumeRoleProperties {
33+
34+
private IcebergAwsAssumeRoleProperties() {}
35+
36+
/**
37+
* Puts assume-role related Iceberg client properties into the target map when roleArn is present.
38+
* No-op if roleArn is blank.
39+
*/
40+
public static void putAssumeRoleProperties(Map<String, String> target, S3Properties s3Properties) {
41+
if (StringUtils.isBlank(s3Properties.getS3IAMRole())) {
42+
return;
43+
}
44+
target.put(AwsProperties.CLIENT_FACTORY, AssumeRoleAwsClientFactory.class.getName());
45+
target.put("aws.region", s3Properties.getRegion());
46+
target.put(AwsProperties.CLIENT_ASSUME_ROLE_REGION, s3Properties.getRegion());
47+
target.put(AwsProperties.CLIENT_ASSUME_ROLE_ARN, s3Properties.getS3IAMRole());
48+
if (StringUtils.isNotBlank(s3Properties.getS3ExternalId())) {
49+
target.put(AwsProperties.CLIENT_ASSUME_ROLE_EXTERNAL_ID, s3Properties.getS3ExternalId());
50+
}
51+
}
52+
}
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.datasource.property.common;
19+
20+
import org.apache.doris.datasource.property.storage.S3Properties;
21+
22+
import org.apache.commons.lang3.StringUtils;
23+
import org.apache.iceberg.aws.AwsClientProperties;
24+
import org.apache.iceberg.aws.AwsProperties;
25+
import org.apache.iceberg.aws.s3.S3FileIOProperties;
26+
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
27+
28+
import java.util.Map;
29+
30+
public final class IcebergAwsClientCredentialsProperties {
31+
32+
private IcebergAwsClientCredentialsProperties() {}
33+
34+
public static void putCredentialProviderProperties(Map<String, String> target, S3Properties s3Properties) {
35+
switch (getCredentialType(s3Properties)) {
36+
case EXPLICIT:
37+
putExplicitRestCredentials(target, s3Properties.getAccessKey(), s3Properties.getSecretKey(),
38+
s3Properties.getSessionToken());
39+
return;
40+
case ASSUME_ROLE:
41+
IcebergAwsAssumeRoleProperties.putAssumeRoleProperties(target, s3Properties);
42+
return;
43+
case PROVIDER_CHAIN:
44+
putCredentialsProvider(target, s3Properties.getAwsCredentialsProviderMode());
45+
return;
46+
default:
47+
throw new IllegalStateException("Unsupported Iceberg AWS credential type");
48+
}
49+
}
50+
51+
public static void putCredentialProviderProperties(Map<String, String> target,
52+
String accessKey, String secretKey, String sessionToken, AwsCredentialsProviderMode providerMode) {
53+
if (StringUtils.isNotBlank(accessKey) && StringUtils.isNotBlank(secretKey)) {
54+
putExplicitRestCredentials(target, accessKey, secretKey, sessionToken);
55+
return;
56+
}
57+
putCredentialsProvider(target, providerMode);
58+
}
59+
60+
public static void putS3FileIOCredentialProperties(Map<String, String> target,
61+
S3Properties s3Properties) {
62+
putS3FileIOProperties(target, s3Properties);
63+
switch (getCredentialType(s3Properties)) {
64+
case EXPLICIT:
65+
return;
66+
case ASSUME_ROLE:
67+
IcebergAwsAssumeRoleProperties.putAssumeRoleProperties(target, s3Properties);
68+
return;
69+
case PROVIDER_CHAIN:
70+
putCredentialsProvider(target, s3Properties.getAwsCredentialsProviderMode());
71+
return;
72+
default:
73+
throw new IllegalStateException("Unsupported Iceberg AWS credential type");
74+
}
75+
}
76+
77+
public static AwsCredentialsProvider createAwsCredentialsProvider(S3Properties s3Properties,
78+
boolean includeAnonymousInDefault) {
79+
switch (getCredentialType(s3Properties)) {
80+
case EXPLICIT:
81+
case ASSUME_ROLE:
82+
return s3Properties.getAwsCredentialsProvider();
83+
case PROVIDER_CHAIN:
84+
return AwsCredentialsProviderFactory.createV2(
85+
s3Properties.getAwsCredentialsProviderMode(), includeAnonymousInDefault);
86+
default:
87+
throw new IllegalStateException("Unsupported Iceberg AWS credential type");
88+
}
89+
}
90+
91+
private static CredentialType getCredentialType(S3Properties s3Properties) {
92+
if (StringUtils.isNotBlank(s3Properties.getAccessKey())
93+
&& StringUtils.isNotBlank(s3Properties.getSecretKey())) {
94+
return CredentialType.EXPLICIT;
95+
}
96+
if (StringUtils.isNotBlank(s3Properties.getS3IAMRole())) {
97+
return CredentialType.ASSUME_ROLE;
98+
}
99+
return CredentialType.PROVIDER_CHAIN;
100+
}
101+
102+
private static void putExplicitRestCredentials(Map<String, String> target,
103+
String accessKey, String secretKey, String sessionToken) {
104+
target.put(AwsProperties.REST_ACCESS_KEY_ID, accessKey);
105+
target.put(AwsProperties.REST_SECRET_ACCESS_KEY, secretKey);
106+
if (StringUtils.isNotBlank(sessionToken)) {
107+
target.put(AwsProperties.REST_SESSION_TOKEN, sessionToken);
108+
}
109+
}
110+
111+
private static void putS3FileIOProperties(Map<String, String> target,
112+
S3Properties s3Properties) {
113+
if (StringUtils.isNotBlank(s3Properties.getEndpoint())) {
114+
target.put(S3FileIOProperties.ENDPOINT, s3Properties.getEndpoint());
115+
}
116+
if (StringUtils.isNotBlank(s3Properties.getUsePathStyle())) {
117+
target.put(S3FileIOProperties.PATH_STYLE_ACCESS, s3Properties.getUsePathStyle());
118+
}
119+
if (StringUtils.isNotBlank(s3Properties.getAccessKey())) {
120+
target.put(S3FileIOProperties.ACCESS_KEY_ID, s3Properties.getAccessKey());
121+
}
122+
if (StringUtils.isNotBlank(s3Properties.getSecretKey())) {
123+
target.put(S3FileIOProperties.SECRET_ACCESS_KEY, s3Properties.getSecretKey());
124+
}
125+
if (StringUtils.isNotBlank(s3Properties.getSessionToken())) {
126+
target.put(S3FileIOProperties.SESSION_TOKEN, s3Properties.getSessionToken());
127+
}
128+
}
129+
130+
private static void putCredentialsProvider(Map<String, String> target,
131+
AwsCredentialsProviderMode providerMode) {
132+
if (providerMode == null || providerMode == AwsCredentialsProviderMode.DEFAULT) {
133+
return;
134+
}
135+
target.put(AwsClientProperties.CLIENT_CREDENTIALS_PROVIDER,
136+
AwsCredentialsProviderFactory.getV2ClassName(providerMode));
137+
}
138+
139+
private enum CredentialType {
140+
EXPLICIT,
141+
ASSUME_ROLE,
142+
PROVIDER_CHAIN
143+
}
144+
}

fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.doris.common.security.authentication.ExecutionAuthenticator;
2121
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
2222
import org.apache.doris.datasource.metacache.CacheSpec;
23+
import org.apache.doris.datasource.property.common.IcebergAwsAssumeRoleProperties;
2324
import org.apache.doris.datasource.property.storage.AbstractS3CompatibleProperties;
2425
import org.apache.doris.datasource.property.storage.S3Properties;
2526
import org.apache.doris.datasource.property.storage.StorageProperties;
@@ -266,6 +267,10 @@ private void toS3FileIOProperties(AbstractS3CompatibleProperties s3Properties, M
266267
if (StringUtils.isNotBlank(s3Properties.getSessionToken())) {
267268
options.put(S3FileIOProperties.SESSION_TOKEN, s3Properties.getSessionToken());
268269
}
270+
if (s3Properties instanceof S3Properties) {
271+
S3Properties awsProperties = (S3Properties) s3Properties;
272+
IcebergAwsAssumeRoleProperties.putAssumeRoleProperties(options, awsProperties);
273+
}
269274
}
270275

271276
protected Catalog buildIcebergCatalog(String catalogName, Map<String, String> options, Configuration conf) {

0 commit comments

Comments
 (0)