Skip to content

Commit 9addd1a

Browse files
committed
[FLINK-39166] Add bucket-level configuration parity in Native S3 FileSystem
1 parent 8b71634 commit 9addd1a

File tree

7 files changed

+1255
-37
lines changed

7 files changed

+1255
-37
lines changed

flink-filesystems/flink-s3-fs-native/README.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,29 @@ input.sinkTo(FileSink.forRowFormat(new Path("s3://my-bucket/output"),
8989
| s3.assume-role.session-name | flink-s3-session | Session name for the assumed role |
9090
| s3.assume-role.session-duration | 3600 | Session duration in seconds (900-43200) |
9191

92+
## Bucket-Level Configuration
93+
94+
The Native S3 FileSystem supports per-bucket configuration overrides, allowing different S3 buckets to use different connection settings within the same Flink cluster. This enables scenarios like:
95+
96+
- **Checkpointing to one bucket** with specific credentials
97+
- **Savepoints to another bucket** with different region/endpoint
98+
- **Data sinks to third-party buckets** with cross-account IAM roles
99+
100+
### Format
101+
102+
Bucket-level configuration uses the format: `s3.bucket.<bucket-name>.<property>`
103+
104+
Bucket names containing dots (e.g., `my.company.data`) are fully supported through longest-suffix matching.
105+
106+
### Supported Properties
107+
108+
All global S3 configuration properties can be overridden at the bucket level:
109+
110+
- **Connection:** `region`, `endpoint`, `path-style-access`
111+
- **Credentials:** `access-key`, `secret-key`, `credentials.provider`
112+
- **Encryption:** `sse.type`, `sse.kms-key-id`
113+
- **IAM Assume Role:** `assume-role.arn`, `assume-role.external-id`, `assume-role.session-name`, `assume-role.session-duration`
114+
92115
## Server-Side Encryption (SSE)
93116

94117
The filesystem supports server-side encryption for data at rest:
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.fs.s3native;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.annotation.VisibleForTesting;
23+
import org.apache.flink.configuration.Configuration;
24+
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
import javax.annotation.Nullable;
29+
30+
import java.util.Collections;
31+
import java.util.HashMap;
32+
import java.util.Map;
33+
34+
/**
35+
* Parses bucket-specific S3 configuration using format {@code s3.bucket.<bucket-name>.<property>}.
36+
*
37+
* <p>Enables per-bucket overrides for endpoints, credentials, encryption, and IAM roles. Bucket
38+
* names containing dots are supported; properties are matched by longest suffix first.
39+
*
40+
* <p>Immutable and thread-safe after construction.
41+
*/
42+
@Internal
43+
final class BucketConfigProvider {
44+
45+
private static final Logger LOG = LoggerFactory.getLogger(BucketConfigProvider.class);
46+
47+
static final String BUCKET_CONFIG_PREFIX = "s3.bucket.";
48+
49+
/**
50+
* Known bucket-level properties, sorted by descending length so that the longest match wins.
51+
*/
52+
private static final String[] KNOWN_PROPERTIES =
53+
new String[] {
54+
"assume-role.session-duration",
55+
"assume-role.session-name",
56+
"assume-role.external-id",
57+
"credentials.provider",
58+
"path-style-access",
59+
"sse.kms-key-id",
60+
"assume-role.arn",
61+
"access-key",
62+
"secret-key",
63+
"sse.type",
64+
"endpoint",
65+
"region"
66+
};
67+
68+
private final Map<String, S3BucketConfig> bucketConfigs;
69+
70+
BucketConfigProvider(Configuration flinkConfig) {
71+
this.bucketConfigs = Collections.unmodifiableMap(parseBucketConfigs(flinkConfig));
72+
}
73+
74+
@Nullable
75+
S3BucketConfig getBucketConfig(String bucketName) {
76+
return bucketConfigs.get(bucketName);
77+
}
78+
79+
@VisibleForTesting
80+
boolean hasBucketConfig(String bucketName) {
81+
return bucketConfigs.containsKey(bucketName);
82+
}
83+
84+
@VisibleForTesting
85+
int size() {
86+
return bucketConfigs.size();
87+
}
88+
89+
private static Map<String, S3BucketConfig> parseBucketConfigs(Configuration flinkConfig) {
90+
Map<String, Map<String, String>> rawConfigs = new HashMap<>();
91+
92+
for (String key : flinkConfig.keySet()) {
93+
if (!key.startsWith(BUCKET_CONFIG_PREFIX)) {
94+
continue;
95+
}
96+
String suffix = key.substring(BUCKET_CONFIG_PREFIX.length());
97+
String value = flinkConfig.getString(key, null);
98+
if (value == null) {
99+
continue;
100+
}
101+
102+
for (String prop : KNOWN_PROPERTIES) {
103+
if (suffix.endsWith("." + prop)) {
104+
String bucketName = suffix.substring(0, suffix.length() - prop.length() - 1);
105+
if (!bucketName.isEmpty()) {
106+
rawConfigs
107+
.computeIfAbsent(bucketName, k -> new HashMap<>())
108+
.put(prop, value);
109+
}
110+
break;
111+
}
112+
}
113+
}
114+
115+
Map<String, S3BucketConfig> result = new HashMap<>();
116+
for (Map.Entry<String, Map<String, String>> entry : rawConfigs.entrySet()) {
117+
String bucketName = entry.getKey();
118+
Map<String, String> props = entry.getValue();
119+
120+
S3BucketConfig bucketConfig = buildBucketConfig(bucketName, props);
121+
if (bucketConfig.hasAnyOverride()) {
122+
result.put(bucketName, bucketConfig);
123+
LOG.info(
124+
"Registered bucket-specific configuration for bucket '{}': {}",
125+
bucketName,
126+
bucketConfig);
127+
}
128+
}
129+
130+
return result;
131+
}
132+
133+
private static S3BucketConfig buildBucketConfig(String bucketName, Map<String, String> props) {
134+
S3BucketConfig.Builder builder = S3BucketConfig.builder(bucketName);
135+
136+
applyIfPresent(props, "region", builder::region);
137+
applyIfPresent(props, "endpoint", builder::endpoint);
138+
applyIfPresent(props, "access-key", builder::accessKey);
139+
applyIfPresent(props, "secret-key", builder::secretKey);
140+
applyIfPresent(props, "sse.type", builder::sseType);
141+
applyIfPresent(props, "sse.kms-key-id", builder::sseKmsKeyId);
142+
applyIfPresent(props, "assume-role.arn", builder::assumeRoleArn);
143+
applyIfPresent(props, "assume-role.external-id", builder::assumeRoleExternalId);
144+
applyIfPresent(props, "assume-role.session-name", builder::assumeRoleSessionName);
145+
applyIfPresent(props, "credentials.provider", builder::credentialsProvider);
146+
147+
String pathStyleStr = props.get("path-style-access");
148+
if (pathStyleStr != null) {
149+
builder.pathStyleAccess(Boolean.parseBoolean(pathStyleStr));
150+
}
151+
152+
String durationStr = props.get("assume-role.session-duration");
153+
if (durationStr != null) {
154+
try {
155+
builder.assumeRoleSessionDurationSeconds(Integer.parseInt(durationStr));
156+
} catch (NumberFormatException e) {
157+
throw new IllegalConfigurationException(
158+
String.format(
159+
"Invalid assume-role.session-duration '%s' for bucket '%s'. "
160+
+ "Must be a valid integer (e.g., 3600)",
161+
durationStr,
162+
bucketName),
163+
e);
164+
}
165+
}
166+
167+
return builder.build();
168+
}
169+
170+
private static void applyIfPresent(
171+
Map<String, String> props, String key, java.util.function.Consumer<String> setter) {
172+
String value = props.get(key);
173+
if (value != null) {
174+
setter.accept(value);
175+
}
176+
}
177+
}

0 commit comments

Comments
 (0)