Skip to content

Commit 76e194a

Browse files
[FLINK-39530][s3] Replace S3 compatible auto-detection with explicit chunked-encoding and checksum-validation config options
1 parent a014737 commit 76e194a

3 files changed

Lines changed: 310 additions & 471 deletions

File tree

flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java

Lines changed: 18 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -90,6 +90,22 @@ public class NativeS3FileSystemFactory implements FileSystemFactory {
9090
.withFallbackKeys("s3.path.style.access")
9191
.withDescription("Use path-style access for S3 (for S3-compatible storage)");
9292

93+
public static final ConfigOption<Boolean> CHUNKED_ENCODING_ENABLED =
94+
ConfigOptions.key("s3.chunked-encoding.enabled")
95+
.booleanType()
96+
.defaultValue(true)
97+
.withDescription(
98+
"Enable chunked encoding for S3 requests. "
99+
+ "Disable for S3-compatible servers that do not support chunked encoding.");
100+
101+
public static final ConfigOption<Boolean> CHECKSUM_VALIDATION_ENABLED =
102+
ConfigOptions.key("s3.checksum-validation.enabled")
103+
.booleanType()
104+
.defaultValue(true)
105+
.withDescription(
106+
"Enable checksum validation for S3 requests. "
107+
+ "Disable for S3-compatible servers that do not support checksum validation.");
108+
93109
public static final ConfigOption<Long> PART_UPLOAD_MIN_SIZE =
94110
ConfigOptions.key("s3.upload.min.part.size")
95111
.longType()
@@ -295,10 +311,6 @@ public FileSystem create(URI fsUri) throws IOException {
295311
String endpoint = config.get(ENDPOINT);
296312
boolean pathStyleAccess = config.get(PATH_STYLE_ACCESS);
297313

298-
if (endpoint != null && !pathStyleAccess) {
299-
pathStyleAccess = true;
300-
}
301-
302314
S3EncryptionConfig encryptionConfig =
303315
S3EncryptionConfig.fromConfig(config.get(SSE_TYPE), config.get(SSE_KMS_KEY_ID));
304316
String entropyInjectionKey = config.get(ENTROPY_INJECT_KEY_OPTION);
@@ -371,6 +383,8 @@ public FileSystem create(URI fsUri) throws IOException {
371383
.region(region)
372384
.endpoint(endpoint)
373385
.pathStyleAccess(pathStyleAccess)
386+
.chunkedEncoding(config.get(CHUNKED_ENCODING_ENABLED))
387+
.checksumValidation(config.get(CHECKSUM_VALIDATION_ENABLED))
374388
.maxConnections(maxConnections)
375389
.connectionTimeout(config.get(CONNECTION_TIMEOUT))
376390
.socketTimeout(config.get(SOCKET_TIMEOUT))

flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java

Lines changed: 65 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -80,6 +80,11 @@ class S3ClientProvider implements AutoCloseableAsync {
8080
private final Duration connectionTimeout;
8181
private final Duration socketTimeout;
8282
private final Duration connectionMaxIdleTime;
83+
private final boolean pathStyleAccess;
84+
private final boolean chunkedEncoding;
85+
private final boolean checksumValidation;
86+
private final int maxConnections;
87+
private final int maxRetries;
8388
private final AtomicBoolean closed = new AtomicBoolean(false);
8489

8590
private S3ClientProvider(
@@ -91,7 +96,12 @@ private S3ClientProvider(
9196
Duration clientCloseTimeout,
9297
Duration connectionTimeout,
9398
Duration socketTimeout,
94-
Duration connectionMaxIdleTime) {
99+
Duration connectionMaxIdleTime,
100+
boolean pathStyleAccess,
101+
boolean chunkedEncoding,
102+
boolean checksumValidation,
103+
int maxConnections,
104+
int maxRetries) {
95105
this.s3Client = s3Client;
96106
this.transferManager = transferManager;
97107
this.encryptionConfig =
@@ -102,6 +112,11 @@ private S3ClientProvider(
102112
this.connectionTimeout = connectionTimeout;
103113
this.socketTimeout = socketTimeout;
104114
this.connectionMaxIdleTime = connectionMaxIdleTime;
115+
this.pathStyleAccess = pathStyleAccess;
116+
this.chunkedEncoding = chunkedEncoding;
117+
this.checksumValidation = checksumValidation;
118+
this.maxConnections = maxConnections;
119+
this.maxRetries = maxRetries;
105120
}
106121

107122
public S3Client getS3Client() {
@@ -144,6 +159,31 @@ Duration getConnectionMaxIdleTime() {
144159
return connectionMaxIdleTime;
145160
}
146161

162+
@VisibleForTesting
163+
boolean isPathStyleAccess() {
164+
return pathStyleAccess;
165+
}
166+
167+
@VisibleForTesting
168+
boolean isChunkedEncoding() {
169+
return chunkedEncoding;
170+
}
171+
172+
@VisibleForTesting
173+
boolean isChecksumValidation() {
174+
return checksumValidation;
175+
}
176+
177+
@VisibleForTesting
178+
int getMaxConnections() {
179+
return maxConnections;
180+
}
181+
182+
@VisibleForTesting
183+
int getMaxRetries() {
184+
return maxRetries;
185+
}
186+
147187
@Override
148188
public CompletableFuture<Void> closeAsync() {
149189
if (!closed.compareAndSet(false, true)) {
@@ -204,11 +244,12 @@ public static class Builder {
204244
private String region;
205245
private String endpoint;
206246
private boolean pathStyleAccess = false;
247+
private boolean chunkedEncoding = true;
248+
private boolean checksumValidation = true;
207249
private int maxConnections = 50;
208250
private Duration connectionTimeout = Duration.ofSeconds(60);
209251
private Duration socketTimeout = Duration.ofSeconds(60);
210252
private Duration connectionMaxIdleTime = Duration.ofSeconds(60);
211-
private boolean disableCertCheck = false;
212253
private int maxRetries = 3;
213254
private Duration clientCloseTimeout = Duration.ofSeconds(30);
214255

@@ -249,6 +290,16 @@ public Builder pathStyleAccess(boolean pathStyleAccess) {
249290
return this;
250291
}
251292

293+
public Builder chunkedEncoding(boolean chunkedEncoding) {
294+
this.chunkedEncoding = chunkedEncoding;
295+
return this;
296+
}
297+
298+
public Builder checksumValidation(boolean checksumValidation) {
299+
this.checksumValidation = checksumValidation;
300+
return this;
301+
}
302+
252303
public Builder maxConnections(int maxConnections) {
253304
this.maxConnections = maxConnections;
254305
return this;
@@ -274,11 +325,6 @@ public Builder clientCloseTimeout(Duration clientCloseTimeout) {
274325
return this;
275326
}
276327

277-
public Builder disableCertCheck(boolean disableCertCheck) {
278-
this.disableCertCheck = disableCertCheck;
279-
return this;
280-
}
281-
282328
public Builder maxRetries(int maxRetries) {
283329
this.maxRetries = maxRetries;
284330
return this;
@@ -326,14 +372,6 @@ public S3ClientProvider build() {
326372
}
327373

328374
URI endpointUri = (endpoint != null) ? URI.create(endpoint) : null;
329-
boolean isS3Compatible = endpointUri != null;
330-
331-
if (isS3Compatible && !pathStyleAccess) {
332-
pathStyleAccess = true;
333-
}
334-
if (isS3Compatible && "http".equalsIgnoreCase(endpointUri.getScheme())) {
335-
disableCertCheck = true;
336-
}
337375

338376
Region awsRegion = resolveRegion(region);
339377
StsClient stsClient = null;
@@ -347,12 +385,12 @@ public S3ClientProvider build() {
347385
credentialsProvider = baseProvider;
348386
}
349387

350-
S3Configuration.Builder s3ConfigBuilder =
351-
S3Configuration.builder().pathStyleAccessEnabled(pathStyleAccess);
352-
if (isS3Compatible) {
353-
s3ConfigBuilder.chunkedEncodingEnabled(false).checksumValidationEnabled(false);
354-
}
355-
S3Configuration s3Config = s3ConfigBuilder.build();
388+
S3Configuration s3Config =
389+
S3Configuration.builder()
390+
.pathStyleAccessEnabled(pathStyleAccess)
391+
.chunkedEncodingEnabled(chunkedEncoding)
392+
.checksumValidationEnabled(checksumValidation)
393+
.build();
356394

357395
ClientOverrideConfiguration overrideConfig =
358396
ClientOverrideConfiguration.builder()
@@ -406,7 +444,12 @@ public S3ClientProvider build() {
406444
clientCloseTimeout,
407445
connectionTimeout,
408446
socketTimeout,
409-
connectionMaxIdleTime);
447+
connectionMaxIdleTime,
448+
pathStyleAccess,
449+
chunkedEncoding,
450+
checksumValidation,
451+
maxConnections,
452+
maxRetries);
410453
}
411454

412455
private AwsCredentialsProvider buildBaseCredentialsProvider() {

0 commit comments

Comments
 (0)