Skip to content

Commit 5cfcf3b

Browse files
authored
Merge branch 'jg/optimizer_refactor' into master
2 parents c6fa787 + ec0a6af commit 5cfcf3b

13 files changed

Lines changed: 83 additions & 19 deletions

File tree

helm/transfer-service/Charts.yaml

Whitespace-only changes.

src/main/java/org/onedatashare/transferservice/odstransferservice/config/OptimizerConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@
77
import org.springframework.web.client.RestTemplate;
88
import org.springframework.web.util.DefaultUriBuilderFactory;
99

10+
import java.util.concurrent.Executor;
11+
import java.util.concurrent.ExecutorService;
12+
import java.util.concurrent.Executors;
13+
1014
@Configuration
1115
public class OptimizerConfig {
1216

@@ -19,4 +23,9 @@ public RestTemplate optimizerTemplate() {
1923
.uriTemplateHandler(new DefaultUriBuilderFactory(optimizerUrl))
2024
.build();
2125
}
26+
27+
@Bean(name ="optimizerTaskExecutor")
28+
public Executor optimizerTaskExecutor(){
29+
return Executors.newVirtualThreadPerTaskExecutor();
30+
}
2231
}

src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,13 @@
1717
import org.springframework.amqp.core.Message;
1818
import org.springframework.amqp.core.Queue;
1919
import org.springframework.amqp.rabbit.annotation.RabbitListener;
20-
import org.springframework.batch.core.JobExecution;
2120
import org.springframework.batch.core.JobParameters;
2221
import org.springframework.batch.core.JobParametersBuilder;
2322
import org.springframework.batch.core.launch.JobLauncher;
24-
import org.springframework.beans.factory.annotation.Autowired;
2523
import org.springframework.stereotype.Service;
2624

2725
import java.util.ArrayList;
2826
import java.util.List;
29-
import java.util.Set;
3027

3128
@Service
3229
public class RabbitMQConsumer {
@@ -52,35 +49,38 @@ public RabbitMQConsumer(VfsExpander vfsExpander, Queue userQueue, JobParamServic
5249
this.jobLauncher = asyncJobLauncher;
5350
this.jc = jc;
5451
this.objectMapper = new ObjectMapper();
55-
this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
52+
this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true);
5653
this.objectMapper.setDefaultPropertyInclusion(JsonInclude.Include.ALWAYS);
5754
this.threadPoolManager = threadPoolManager;
5855
}
5956

6057
@RabbitListener(queues = "#{userQueue}")
6158
public void consumeDefaultMessage(final Message message) {
6259
String jsonStr = new String(message.getBody());
60+
6361
logger.info("Message recv: {}", jsonStr);
6462
try {
6563
TransferJobRequest request = objectMapper.readValue(jsonStr, TransferJobRequest.class);
6664
logger.info("Job Recieved: {}", request.toString());
65+
6766
if (request.getSource().getType().equals(EndpointType.vfs)) {
6867
List<EntityInfo> fileExpandedList = vfsExpander.expandDirectory(request.getSource().getInfoList(), request.getSource().getFileSourcePath());
6968
request.getSource().setInfoList(new ArrayList<>(fileExpandedList));
7069
}
7170
JobParameters parameters = jobParamService.translate(new JobParametersBuilder(), request);
7271
jc.setRequest(request);
7372
jobLauncher.run(jc.concurrentJobDefinition(), parameters);
73+
7474
return;
7575
} catch (Exception e) {
76-
logger.debug("Failed to parse jsonStr: {} to TransferJobRequest.java", jsonStr);
76+
logger.error("Failed to parse jsonStr: {} to TransferJobRequest.java", jsonStr);
7777
}
7878
try {
7979
TransferApplicationParams params = objectMapper.readValue(jsonStr, TransferApplicationParams.class);
8080
logger.info("Parsed TransferApplicationParams: {}", params);
8181
this.threadPoolManager.applyOptimizer(params.getConcurrency(), params.getParallelism());
8282
} catch (Exception e) {
83-
logger.info("Did not apply transfer params due to parsing message failure");
83+
logger.error("Did not apply transfer params due to parsing message failure");
8484
}
8585
}
8686
}

src/main/java/org/onedatashare/transferservice/odstransferservice/model/EntityInfo.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@
22

33
import com.fasterxml.jackson.core.JsonProcessingException;
44
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
56
import lombok.AllArgsConstructor;
67
import lombok.Data;
78
import lombok.NoArgsConstructor;
89

910
@Data
1011
@AllArgsConstructor
1112
@NoArgsConstructor
13+
@JsonIgnoreProperties(ignoreUnknown = true)
1214
public class EntityInfo {
1315
private String id;
1416
private String path;

src/main/java/org/onedatashare/transferservice/odstransferservice/model/TransferJobRequest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ public class TransferJobRequest {
2222
private TransferOptions options;
2323
@JsonInclude(JsonInclude.Include.NON_NULL)
2424
private UUID jobUuid;
25+
public String transferNodeName;
2526

2627
@Data
2728
@AllArgsConstructor

src/main/java/org/onedatashare/transferservice/odstransferservice/model/credential/AccountEndpointCredential.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
11
package org.onedatashare.transferservice.odstransferservice.model.credential;
22

3+
import com.fasterxml.jackson.annotation.JsonIgnore;
4+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
35
import lombok.Data;
46
import lombok.ToString;
57
import org.onedatashare.transferservice.odstransferservice.Enum.EndpointType;
68

79
@Data
10+
@JsonIgnoreProperties(ignoreUnknown = true)
811
public class AccountEndpointCredential extends EndpointCredential{
912
private String uri; //the hostname and port to reach the server
1013
private String username; //this should be the username for the client
1114
@ToString.Exclude
1215
private String secret; //This will contain the password of the resource you
16+
@JsonIgnore
1317
byte[] encryptedSecret;
1418

1519
public static String[] uriFormat(AccountEndpointCredential credential, EndpointType type) {

src/main/java/org/onedatashare/transferservice/odstransferservice/model/credential/EndpointCredential.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package org.onedatashare.transferservice.odstransferservice.model.credential;
22

3+
import com.fasterxml.jackson.annotation.JsonIgnore;
4+
35
/**
46
* Base class for storing one user credential
57
*/
68
public class EndpointCredential{
7-
protected String accountId;
89

10+
String accountId;
911
public EndpointCredential(){}
1012
public EndpointCredential(String accountId){
1113
this.accountId = accountId;
Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,26 @@
11
package org.onedatashare.transferservice.odstransferservice.model.optimizer;
22

3+
import lombok.AllArgsConstructor;
34
import lombok.Data;
45

6+
import java.util.UUID;
7+
58
@Data
69
public class OptimizerCreateRequest {
710
String nodeId;
811
int maxConcurrency;
912
int maxParallelism;
1013
int maxPipelining;
1114
int maxChunkSize;
12-
1315
String optimizerType;
14-
1516
long fileCount;
1617
Long jobId;
1718
String dbType;
19+
String jobUuid;
20+
String userId;
1821

19-
public OptimizerCreateRequest(String nodeId, int maxConcurrency, int maxParallelism, int maxPipelining, String optimizerType, long fileCount, long jobId, String dbType) {
22+
public OptimizerCreateRequest(String userId,String nodeId, int maxConcurrency, int maxParallelism, int maxPipelining, String optimizerType, long fileCount, long jobId, String dbType, String jobUuid) {
23+
this.userId = userId;
2024
this.maxConcurrency = maxConcurrency;
2125
this.maxChunkSize = Integer.MAX_VALUE;
2226
this.maxParallelism = maxParallelism;
@@ -26,5 +30,6 @@ public OptimizerCreateRequest(String nodeId, int maxConcurrency, int maxParallel
2630
this.fileCount = fileCount;
2731
this.jobId = jobId;
2832
this.dbType = dbType;
33+
this.jobUuid = jobUuid;
2934
}
3035
}

src/main/java/org/onedatashare/transferservice/odstransferservice/pools/ThreadPoolManager.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,19 +66,34 @@ public void applyOptimizer(int concurrency, int parallel) {
6666
if (key.contains(STEP_POOL_PREFIX)) {
6767
logger.info("Changing {} pool size from {} to {}", pool.getThreadNamePrefix(), pool.getConcurrencyLimit(), concurrency);
6868
if (concurrency > 0 && concurrency != pool.getConcurrencyLimit()) {
69-
// pool.setMaxPoolSize(concurrency);
7069
pool.setConcurrencyLimit(concurrency);
7170
logger.info("Set {} pool size to {}", pool.getThreadNamePrefix(), concurrency);
7271
}
7372
}
73+
if (key.contains(PARALLEL_POOL_PREFIX)) {
74+
logger.info("Changing {} pool size from {} to {}", pool.getThreadNamePrefix(), pool.getConcurrencyLimit(), parallel);
75+
if (parallel > 0 && parallel != pool.getConcurrencyLimit()) {
76+
pool.setConcurrencyLimit(parallel);
77+
logger.info("Set {} pool size to {}", pool.getThreadNamePrefix(), parallel);
78+
}
79+
}
7480
}
81+
7582
for (String key : this.platformThreadMap.keySet()) {
7683
ThreadPoolTaskExecutor pool = this.platformThreadMap.get(key);
84+
if (key.contains(STEP_POOL_PREFIX)) {
85+
logger.info("Changing {} pool size from {} to {}", pool.getThreadNamePrefix(), pool.getCorePoolSize(), concurrency);
86+
if (concurrency > 0 && concurrency != pool.getCorePoolSize()) {
87+
pool.setCorePoolSize(concurrency);
88+
logger.info("Set {} pool size to {}", pool.getThreadNamePrefix(), concurrency);
89+
}
90+
}
7791
if (key.contains(PARALLEL_POOL_PREFIX)) {
7892
logger.info("Changing {} pool size from {} to {}", pool.getThreadNamePrefix(), pool.getCorePoolSize(), parallel);
79-
if (parallel > 0 && parallel != pool.getPoolSize()) {
80-
// pool.setMaxPoolSize(parallel);
93+
if (parallel > 0 && parallel != pool.getCorePoolSize()) {
8194
pool.setCorePoolSize(parallel);
95+
logger.info("Set {} pool size to {}", pool.getThreadNamePrefix(), parallel);
96+
8297
}
8398
}
8499
}
@@ -100,7 +115,7 @@ public void clearJobPool() {
100115
logger.info("Cleared all thread pools");
101116
}
102117

103-
// public SimpleAsyncTaskExecutor sequentialThreadPool() {
118+
// public SimpleAsyncTaskExecutor sequentialThreadPool() {
104119
// return this.createVirtualThreadExecutor(1, SEQUENTIAL_POOL_PREFIX);
105120
// }
106121
//

src/main/java/org/onedatashare/transferservice/odstransferservice/service/OptimizerService.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,13 @@
99
import org.springframework.http.HttpEntity;
1010
import org.springframework.http.HttpHeaders;
1111
import org.springframework.http.MediaType;
12+
import org.springframework.scheduling.annotation.Async;
1213
import org.springframework.stereotype.Service;
1314
import org.springframework.web.client.RestClientException;
1415
import org.springframework.web.client.RestTemplate;
1516

17+
import java.util.concurrent.CompletableFuture;
18+
1619
@Service
1720
public class OptimizerService {
1821

@@ -31,14 +34,17 @@ public OptimizerService() {
3134
headers.setContentType(MediaType.APPLICATION_JSON);
3235
}
3336

37+
@Async("optimizerTaskExecutor")
3438
public void createOptimizerBlocking(OptimizerCreateRequest optimizerCreateRequest) throws RestClientException {
3539
optimizerCreateRequest.setNodeId(this.appName);
3640
logger.info("Sending OptimizerCreateRequest {}", optimizerCreateRequest);
3741
HttpEntity<OptimizerCreateRequest> createRequestHttpEntity = new HttpEntity<>(optimizerCreateRequest, this.headers);
3842
logger.info(createRequestHttpEntity.getBody().toString());
3943
this.optimizerTemplate.postForLocation("/optimizer/create", createRequestHttpEntity, Void.class);
44+
CompletableFuture.completedFuture(null);
4045
}
4146

47+
@Async("optimizerTaskExecutor")
4248
public void deleteOptimizerBlocking(OptimizerDeleteRequest optimizerDeleteRequest) {
4349
optimizerDeleteRequest.setNodeId(this.appName);
4450
try {
@@ -47,5 +53,6 @@ public void deleteOptimizerBlocking(OptimizerDeleteRequest optimizerDeleteReques
4753
logger.error("Failed to Delete optimizer. {}", optimizerDeleteRequest);
4854
}
4955
logger.info("Deleted {}", optimizerDeleteRequest);
56+
CompletableFuture.completedFuture(null);
5057
}
5158
}

0 commit comments

Comments
 (0)