Skip to content

Commit a5f73bc

Browse files
Enable parallel fetching of property sources in AwsS3EnvironmentRepository
This change introduces an optimization to fetch configuration files from S3 concurrently using CompletableFuture. It includes a new 'pool-size' property to configure the level of parallelism, while maintaining the original precedence order and providing backward-compatible sequential execution by default. Fixes gh-3222 Signed-off-by: Noah Hanka <noah.hanka@veeva.com>
1 parent 236a0d7 commit a5f73bc

4 files changed

Lines changed: 173 additions & 59 deletions

File tree

spring-cloud-config-server/src/main/java/org/springframework/cloud/config/server/environment/AwsS3EnvironmentProperties.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
/**
2323
* @author Clay McCoy
24+
* @author Noah Hanka
2425
*/
2526
@ConfigurationProperties("spring.cloud.config.server.awss3")
2627
public class AwsS3EnvironmentProperties implements EnvironmentRepositoryProperties {
@@ -41,11 +42,17 @@ public class AwsS3EnvironmentProperties implements EnvironmentRepositoryProperti
4142
private String bucket;
4243

4344
/**
44-
* Use application name as intermediate directory. Analogous to `searchPaths:
45-
* {application}` from Git backend.
45+
* Use application name as intermediate directory. Analogous to \`searchPaths:
46+
* {application}\` from Git backend.
4647
*/
4748
private boolean useDirectoryLayout;
4849

50+
/**
51+
* Thread pool size for fetching properties from S3 in parallel. If set to 0 or less,
52+
* fetching will be sequential.
53+
*/
54+
private int poolSize = 0;
55+
4956
private int order = DEFAULT_ORDER;
5057

5158
public String getRegion() {
@@ -80,6 +87,14 @@ public void setUseDirectoryLayout(boolean useDirectoryLayout) {
8087
this.useDirectoryLayout = useDirectoryLayout;
8188
}
8289

90+
public int getPoolSize() {
91+
return poolSize;
92+
}
93+
94+
public void setPoolSize(int poolSize) {
95+
this.poolSize = poolSize;
96+
}
97+
8398
public int getOrder() {
8499
return order;
85100
}

spring-cloud-config-server/src/main/java/org/springframework/cloud/config/server/environment/AwsS3EnvironmentRepository.java

Lines changed: 58 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
import java.util.Collections;
2323
import java.util.List;
2424
import java.util.Properties;
25-
import java.util.function.Consumer;
25+
import java.util.concurrent.CompletableFuture;
26+
import java.util.concurrent.Executor;
27+
import java.util.stream.Collectors;
2628

2729
import org.apache.commons.logging.Log;
2830
import org.apache.commons.logging.LogFactory;
@@ -45,6 +47,7 @@
4547

4648
/**
4749
* @author Clay McCoy
50+
* @author Noah Hanka
4851
* @author Scott Frederick
4952
* @author Daniel Aiken
5053
*/
@@ -64,18 +67,26 @@ public class AwsS3EnvironmentRepository implements EnvironmentRepository, Ordere
6467

6568
private final boolean useApplicationAsDirectory;
6669

70+
private final Executor executor;
71+
6772
protected int order = Ordered.LOWEST_PRECEDENCE;
6873

6974
public AwsS3EnvironmentRepository(S3Client s3Client, String bucketName, ConfigServerProperties server) {
70-
this(s3Client, bucketName, false, server);
75+
this(s3Client, bucketName, false, server, null);
7176
}
7277

7378
public AwsS3EnvironmentRepository(S3Client s3Client, String bucketName, boolean useApplicationAsDirectory,
7479
ConfigServerProperties server) {
80+
this(s3Client, bucketName, useApplicationAsDirectory, server, null);
81+
}
82+
83+
public AwsS3EnvironmentRepository(S3Client s3Client, String bucketName, boolean useApplicationAsDirectory,
84+
ConfigServerProperties server, Executor executor) {
7585
this.s3Client = s3Client;
7686
this.bucketName = bucketName;
7787
this.serverProperties = server;
7888
this.useApplicationAsDirectory = useApplicationAsDirectory;
89+
this.executor = (executor != null) ? executor : Runnable::run;
7990
}
8091

8192
@Override
@@ -126,77 +137,70 @@ public Environment findOne(String specifiedApplication, String specifiedProfiles
126137

127138
private void addPropertySources(Environment environment, List<String> apps, String[] profiles,
128139
List<String> labels) {
140+
List<S3ConfigFile> allConfigs = new ArrayList<>();
129141
for (String label : labels) {
130142
// If we have profiles, add property sources with those profiles
131143
for (String profile : profiles) {
132-
addPropertySourcesForApps(apps,
133-
app -> addProfileSpecificPropertySource(environment, app, profile, label));
144+
apps.forEach(app -> allConfigs.addAll(getProfileSpecificS3ConfigFiles(app, profile, label)));
134145
}
135146
}
136147

137148
// If we have no profiles just add property sources for all apps
138149
if (profiles.length == 0) {
139150
for (String label : labels) {
140-
addPropertySourcesForApps(apps,
141-
app -> addNonProfileSpecificPropertySource(environment, app, null, label));
151+
apps.forEach(app -> allConfigs.addAll(getNonProfileSpecificS3ConfigFiles(app, null, label)));
142152
}
143153
}
144154
else {
145155
for (String label : labels) {
146156
// If we have profiles, we still need to add property sources from files
147-
// that
148-
// are not profile specific but we pass
149-
// along the profiles as well so we can check if any non-profile specific
150-
// YAML
151-
// files have profile specific documents
152-
// within them
157+
// that are not profile specific but we pass along the profiles as well
158+
// so we can check if any non-profile specific YAML files have profile
159+
// specific documents within them
153160
for (String profile : profiles) {
154-
addPropertySourcesForApps(apps,
155-
app -> addNonProfileSpecificPropertySource(environment, app, profile, label));
161+
apps.forEach(app -> allConfigs.addAll(getNonProfileSpecificS3ConfigFiles(app, profile, label)));
156162
}
157163
}
158164
}
159-
}
160165

161-
private void addPropertySourcesForApps(List<String> apps, Consumer<String> addPropertySource) {
162-
apps.forEach(addPropertySource);
166+
List<CompletableFuture<Void>> futures = allConfigs.stream()
167+
.map(config -> CompletableFuture.runAsync(config::read, executor))
168+
.collect(Collectors.toList());
169+
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
170+
171+
for (S3ConfigFile s3ConfigFile : allConfigs) {
172+
addPropertySource(environment, s3ConfigFile);
173+
}
163174
}
164175

165-
private void addProfileSpecificPropertySource(Environment environment, String app, String profile, String label) {
166-
List<S3ConfigFile> s3ConfigFiles = getS3ConfigFile(app, profile, label, this::getS3PropertiesOrJsonConfigFile,
176+
private List<S3ConfigFile> getProfileSpecificS3ConfigFiles(String app, String profile, String label) {
177+
return getS3ConfigFile(app, profile, label, this::getS3PropertiesOrJsonConfigFile,
167178
this::getProfileSpecificS3ConfigFileYaml);
168-
addPropertySource(environment, s3ConfigFiles);
169179
}
170180

171-
private void addNonProfileSpecificPropertySource(Environment environment, String app, String profile,
172-
String label) {
173-
List<S3ConfigFile> s3ConfigFiles = getS3ConfigFile(app, profile, label,
174-
this::getNonProfileSpecificPropertiesOrJsonConfigFile, this::getNonProfileSpecificS3ConfigFileYaml);
175-
addPropertySource(environment, s3ConfigFiles);
176-
}
177-
178-
private void addPropertySource(Environment environment, List<S3ConfigFile> s3ConfigFiles) {
179-
for (S3ConfigFile s3ConfigFile : s3ConfigFiles) {
180-
final Properties config = s3ConfigFile.read();
181-
// This logic handles the case where the s3 file is a YAML file that is
182-
// not profile specific (ie it does not have -<profile> in the name)
183-
// and does not have any profile specific documents in it. In this case we do
184-
// not want to include this
185-
// property source we only want to include the document for the default
186-
// profile. When we create
187-
// the S3ConfigFile for this file we set the
188-
// shouldIncludeWithEmptyProperties to false
189-
// in ProfileSpecificYamlDocumentS3ConfigFile for this specific case.
190-
if (config != null) {
191-
if (!config.isEmpty() || s3ConfigFile.isShouldIncludeWithEmptyProperties()) {
192-
environment.setVersion(s3ConfigFile.getVersion());
193-
config.putAll(serverProperties.getOverrides());
194-
PropertySource propertySource = new PropertySource(s3ConfigFile.getName(), config);
195-
if (LOG.isDebugEnabled()) {
196-
LOG.debug("Adding property source to environment " + propertySource);
197-
}
198-
environment.add(propertySource);
181+
private List<S3ConfigFile> getNonProfileSpecificS3ConfigFiles(String app, String profile, String label) {
182+
return getS3ConfigFile(app, profile, label, this::getNonProfileSpecificPropertiesOrJsonConfigFile,
183+
this::getNonProfileSpecificS3ConfigFileYaml);
184+
}
185+
186+
private void addPropertySource(Environment environment, S3ConfigFile s3ConfigFile) {
187+
final Properties config = s3ConfigFile.read();
188+
// This logic handles the case where the s3 file is a YAML file that is
189+
// not profile specific (ie it does not have -<profile> in the name)
190+
// and does not have any profile specific documents in it. In this case we do
191+
// not want to include this property source we only want to include the
192+
// document for the default profile. When we create the S3ConfigFile for
193+
// this file we set the shouldIncludeWithEmptyProperties to false
194+
// in ProfileSpecificYamlDocumentS3ConfigFile for this specific case.
195+
if (config != null) {
196+
if (!config.isEmpty() || s3ConfigFile.isShouldIncludeWithEmptyProperties()) {
197+
environment.setVersion(s3ConfigFile.getVersion());
198+
config.putAll(serverProperties.getOverrides());
199+
PropertySource propertySource = new PropertySource(s3ConfigFile.getName(), config);
200+
if (LOG.isDebugEnabled()) {
201+
LOG.debug("Adding property source to environment " + propertySource);
199202
}
203+
environment.add(propertySource);
200204
}
201205
}
202206
}
@@ -348,8 +352,8 @@ protected S3ConfigFile(String application, String profile, String label, String
348352
this.profile = profile;
349353
this.label = label;
350354
this.bucketName = bucketName;
351-
this.s3Client = s3Client;
352355
this.useApplicationAsDirectory = useApplicationAsDirectory;
356+
this.s3Client = s3Client;
353357
}
354358

355359
String getVersion() {
@@ -437,11 +441,10 @@ class PropertyS3ConfigFile extends S3ConfigFile {
437441
PropertyS3ConfigFile(String application, String profile, String label, String bucketName,
438442
boolean useApplicationAsDirectory, S3Client s3Client) {
439443
super(application, profile, label, bucketName, useApplicationAsDirectory, s3Client);
440-
this.properties = read();
441444
}
442445

443446
@Override
444-
public Properties read() {
447+
public synchronized Properties read() {
445448
if (this.properties != null) {
446449
return this.properties;
447450
}
@@ -453,6 +456,7 @@ public Properties read() {
453456
LOG.warn("Exception thrown when reading property file", e);
454457
throw new IllegalStateException("Cannot load environment", e);
455458
}
459+
this.properties = props;
456460
return props;
457461
}
458462

@@ -478,8 +482,6 @@ class YamlS3ConfigFile extends S3ConfigFile {
478482
final YamlProcessor.DocumentMatcher... documentMatchers) {
479483
super(application, profile, label, bucketName, useApplicationAsDirectory, s3Client);
480484
this.documentMatchers = documentMatchers;
481-
this.properties = read();
482-
483485
}
484486

485487
protected static boolean profileMatchesActivateProperty(String profile, Properties properties) {
@@ -493,15 +495,16 @@ protected static boolean onProfilePropertyExists(Properties properties) {
493495
}
494496

495497
@Override
496-
public Properties read() {
498+
public synchronized Properties read() {
497499
if (properties != null) {
498500
return properties;
499501
}
500502
final YamlPropertiesFactoryBean yaml = new YamlPropertiesFactoryBean();
501503
try (InputStream in = getObject()) {
502504
yaml.setResources(new InputStreamResource(in));
503505
yaml.setDocumentMatchers(documentMatchers);
504-
return yaml.getObject();
506+
this.properties = yaml.getObject();
507+
return this.properties;
505508
}
506509
catch (Exception e) {
507510
LOG.warn("Could not read YAML file", e);
@@ -567,7 +570,6 @@ class JsonS3ConfigFile extends YamlS3ConfigFile {
567570
JsonS3ConfigFile(String application, String profile, String label, String bucketName,
568571
boolean useApplicationAsDirectory, S3Client s3Client) {
569572
super(application, profile, label, bucketName, useApplicationAsDirectory, s3Client);
570-
this.properties = read();
571573
}
572574

573575
@Override

spring-cloud-config-server/src/main/java/org/springframework/cloud/config/server/environment/AwsS3EnvironmentRepositoryFactory.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616

1717
package org.springframework.cloud.config.server.environment;
1818

19+
import java.util.concurrent.Executor;
20+
import java.util.concurrent.Executors;
21+
1922
import software.amazon.awssdk.services.s3.S3Client;
2023
import software.amazon.awssdk.services.s3.S3ClientBuilder;
2124

@@ -38,8 +41,13 @@ public AwsS3EnvironmentRepository build(AwsS3EnvironmentProperties environmentPr
3841
configureClientBuilder(clientBuilder, environmentProperties.getRegion(), environmentProperties.getEndpoint());
3942
final S3Client client = clientBuilder.build();
4043

44+
Executor executor = null;
45+
if (environmentProperties.getPoolSize() > 0) {
46+
executor = Executors.newFixedThreadPool(environmentProperties.getPoolSize());
47+
}
48+
4149
AwsS3EnvironmentRepository repository = new AwsS3EnvironmentRepository(client,
42-
environmentProperties.getBucket(), environmentProperties.isUseDirectoryLayout(), server);
50+
environmentProperties.getBucket(), environmentProperties.isUseDirectoryLayout(), server, executor);
4351
repository.setOrder(environmentProperties.getOrder());
4452
return repository;
4553
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright 2013-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.config.server.environment;
18+
19+
import java.io.ByteArrayInputStream;
20+
import java.util.concurrent.Executor;
21+
import java.util.concurrent.Executors;
22+
23+
import org.junit.jupiter.api.Test;
24+
import software.amazon.awssdk.core.ResponseInputStream;
25+
import software.amazon.awssdk.http.AbortableInputStream;
26+
import software.amazon.awssdk.services.s3.S3Client;
27+
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
28+
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
29+
30+
import org.springframework.cloud.config.environment.Environment;
31+
import org.springframework.cloud.config.server.config.ConfigServerProperties;
32+
33+
import static org.assertj.core.api.Assertions.assertThat;
34+
import static org.mockito.ArgumentMatchers.any;
35+
import static org.mockito.Mockito.atLeastOnce;
36+
import static org.mockito.Mockito.mock;
37+
import static org.mockito.Mockito.verify;
38+
import static org.mockito.Mockito.when;
39+
40+
/**
41+
* @author Noah Hanka
42+
*/
43+
public class ParallelAwsS3EnvironmentRepositoryTests {
44+
45+
@Test
46+
public void testParallelFetching() {
47+
S3Client s3Client = mock(S3Client.class);
48+
ConfigServerProperties server = new ConfigServerProperties();
49+
Executor executor = Executors.newFixedThreadPool(2);
50+
51+
AwsS3EnvironmentRepository repo = new AwsS3EnvironmentRepository(s3Client, "bucket", false, server, executor);
52+
53+
String content = "foo: bar";
54+
GetObjectResponse response = GetObjectResponse.builder().build();
55+
56+
when(s3Client.getObject(any(GetObjectRequest.class))).thenAnswer(invocation -> {
57+
return new ResponseInputStream<>(response,
58+
AbortableInputStream.create(new ByteArrayInputStream(content.getBytes())));
59+
});
60+
61+
// Request with 2 profiles
62+
Environment env = repo.findOne("app", "p1,p2", "label");
63+
64+
assertThat(env.getPropertySources()).isNotEmpty();
65+
verify(s3Client, atLeastOnce()).getObject(any(GetObjectRequest.class));
66+
}
67+
68+
@Test
69+
public void testSequentialFetchingByDefault() {
70+
S3Client s3Client = mock(S3Client.class);
71+
ConfigServerProperties server = new ConfigServerProperties();
72+
73+
AwsS3EnvironmentRepository repo = new AwsS3EnvironmentRepository(s3Client, "bucket", server);
74+
75+
String content = "foo: bar";
76+
GetObjectResponse response = GetObjectResponse.builder().build();
77+
78+
when(s3Client.getObject(any(GetObjectRequest.class))).thenAnswer(invocation -> {
79+
return new ResponseInputStream<>(response,
80+
AbortableInputStream.create(new ByteArrayInputStream(content.getBytes())));
81+
});
82+
83+
Environment env = repo.findOne("app", "p1", "label");
84+
85+
assertThat(env.getPropertySources()).isNotEmpty();
86+
verify(s3Client, atLeastOnce()).getObject(any(GetObjectRequest.class));
87+
}
88+
89+
}

0 commit comments

Comments
 (0)