Skip to content

Commit c7a516b

Browse files
authored
File Storage Support for Java SDK #99
File Storage Support for Java SDK
2 parents b271697 + 72e7a21 commit c7a516b

63 files changed

Lines changed: 4903 additions & 10 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

conductor-client-spring/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ repositories {
2020
dependencies {
2121
api project(":conductor-client")
2222
implementation 'org.springframework.boot:spring-boot-starter:3.5.13'
23+
24+
testImplementation 'org.springframework.boot:spring-boot-starter-test:3.3.13'
2325
}
2426

2527
java {

conductor-client-spring/src/main/java/com/netflix/conductor/client/spring/ConductorClientAutoConfiguration.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818
import java.util.Optional;
1919

2020
import org.apache.commons.lang3.StringUtils;
21+
import org.conductoross.conductor.client.FileClient;
22+
import org.conductoross.conductor.client.FileClientProperties;
2123
import org.springframework.boot.autoconfigure.AutoConfiguration;
2224
import org.springframework.boot.autoconfigure.AutoConfigureOrder;
2325
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
2426
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
27+
import org.springframework.boot.context.properties.ConfigurationProperties;
2528
import org.springframework.boot.context.properties.EnableConfigurationProperties;
2629
import org.springframework.context.annotation.Bean;
2730
import org.springframework.core.Ordered;
@@ -82,6 +85,7 @@ public TaskRunnerConfigurer taskRunnerConfigurer(Environment env,
8285
TaskClient taskClient,
8386
ClientProperties clientProperties,
8487
List<Worker> workers,
88+
Optional<FileClient> fileClient,
8589
Optional<MetricsCollector> metricsCollector) {
8690
Map<String, Integer> taskThreadCount = new HashMap<>();
8791
for (Worker worker : workers) {
@@ -105,6 +109,7 @@ public TaskRunnerConfigurer taskRunnerConfigurer(Environment env,
105109
.withTaskToDomain(clientProperties.getTaskToDomain())
106110
.withShutdownGracePeriodSeconds(clientProperties.getShutdownGracePeriodSeconds())
107111
.withTaskPollTimeout(clientProperties.getTaskPollTimeout());
112+
fileClient.ifPresent(builder::withFileClient);
108113
metricsCollector.ifPresent(builder::withMetricsCollector);
109114
return builder.build();
110115
}
@@ -122,4 +127,19 @@ public WorkflowExecutor workflowExecutor(ConductorClient client, AnnotatedWorker
122127
public WorkflowClient workflowClient(ConductorClient client) {
123128
return new WorkflowClient(client);
124129
}
130+
131+
@Bean
132+
@ConfigurationProperties("conductor.file-client")
133+
@ConditionalOnBean(ConductorClient.class)
134+
@ConditionalOnMissingBean
135+
public FileClientProperties fileClientProperties() {
136+
return new FileClientProperties();
137+
}
138+
139+
@Bean
140+
@ConditionalOnBean(ConductorClient.class)
141+
@ConditionalOnMissingBean
142+
public FileClient fileClient(ConductorClient client, FileClientProperties fileClientProperties) {
143+
return new FileClient(client, fileClientProperties, null);
144+
}
125145
}

conductor-client-spring/src/main/java/io/orkes/conductor/client/spring/OrkesConductorClientAutoConfiguration.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,12 @@
1313
package io.orkes.conductor.client.spring;
1414

1515
import org.apache.commons.lang3.StringUtils;
16+
import org.conductoross.conductor.client.FileClient;
17+
import org.conductoross.conductor.client.FileClientProperties;
1618
import org.springframework.boot.autoconfigure.AutoConfiguration;
1719
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
1820
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
21+
import org.springframework.boot.context.properties.ConfigurationProperties;
1922
import org.springframework.boot.context.properties.EnableConfigurationProperties;
2023
import org.springframework.context.annotation.Bean;
2124
import org.springframework.context.annotation.Import;
@@ -128,4 +131,18 @@ public SecretClient orkesSecretClient(OrkesClients clients) {
128131
return clients.getSecretClient();
129132
}
130133

134+
@Bean
135+
@ConfigurationProperties("conductor.file-client")
136+
@ConditionalOnBean(ApiClient.class)
137+
@ConditionalOnMissingBean
138+
public FileClientProperties fileClientProperties() {
139+
return new FileClientProperties();
140+
}
141+
142+
@Bean
143+
@ConditionalOnBean(ApiClient.class)
144+
@ConditionalOnMissingBean
145+
public FileClient fileClient(ApiClient client, FileClientProperties fileClientProperties) {
146+
return new FileClient(client, fileClientProperties, null);
147+
}
131148
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2026 Conductor Authors.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
package com.netflix.conductor.client.spring;
14+
15+
import org.conductoross.conductor.client.FileClient;
16+
import org.conductoross.conductor.client.FileClientProperties;
17+
import org.junit.jupiter.api.Test;
18+
import org.springframework.boot.autoconfigure.AutoConfigurations;
19+
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
20+
21+
import static org.assertj.core.api.Assertions.assertThat;
22+
23+
class ConductorClientAutoConfigurationTest {
24+
25+
private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
26+
.withConfiguration(AutoConfigurations.of(ConductorClientAutoConfiguration.class));
27+
28+
@Test
29+
void zeroConfigYieldsWorkingFileClientBean() {
30+
contextRunner
31+
.withPropertyValues("conductor.client.base-path=http://localhost:8080/api")
32+
.run(context -> {
33+
assertThat(context).hasSingleBean(FileClient.class);
34+
FileClientProperties properties = context.getBean(FileClientProperties.class);
35+
assertThat(properties.getLocalCacheDirectory())
36+
.startsWith(System.getProperty("java.io.tmpdir"));
37+
});
38+
}
39+
40+
@Test
41+
void cacheDirectoryOverrideIsRespected() {
42+
contextRunner
43+
.withPropertyValues(
44+
"conductor.client.base-path=http://localhost:8080/api",
45+
"conductor.file-client.local-cache-directory=/tmp/custom-cache")
46+
.run(context -> {
47+
FileClientProperties properties = context.getBean(FileClientProperties.class);
48+
assertThat(properties.getLocalCacheDirectory()).isEqualTo("/tmp/custom-cache");
49+
});
50+
}
51+
52+
}

conductor-client/src/main/java/com/netflix/conductor/client/automator/TaskRunner.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import java.io.PrintWriter;
1616
import java.io.StringWriter;
17+
import java.nio.file.Path;
1718
import java.util.ArrayList;
1819
import java.util.Collections;
1920
import java.util.LinkedList;
@@ -34,6 +35,11 @@
3435
import java.util.function.Function;
3536

3637
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
38+
import org.conductoross.conductor.client.FileClient;
39+
import org.conductoross.conductor.sdk.file.FileHandler;
40+
import org.conductoross.conductor.sdk.file.FileUploadOptions;
41+
import org.conductoross.conductor.sdk.file.LocalFileHandler;
42+
import org.conductoross.conductor.sdk.file.WorkflowFileClient;
3743
import org.slf4j.Logger;
3844
import org.slf4j.LoggerFactory;
3945

@@ -58,6 +64,7 @@
5864
import com.netflix.conductor.common.metadata.tasks.Task;
5965
import com.netflix.conductor.common.metadata.tasks.TaskResult;
6066

67+
import com.google.common.annotations.VisibleForTesting;
6168
import com.google.common.base.Stopwatch;
6269
import com.google.common.util.concurrent.Uninterruptibles;
6370

@@ -80,6 +87,7 @@ class TaskRunner {
8087
private final EventDispatcher<TaskRunnerEvent> eventDispatcher;
8188
private final LinkedBlockingQueue<Task> tasksTobeExecuted;
8289
private final boolean enableUpdateV2;
90+
private final FileClient fileClient;
8391
private static final int LEASE_EXTEND_RETRY_COUNT = 3;
8492
private static final double LEASE_EXTEND_DURATION_FACTOR = 0.8;
8593
private final ScheduledExecutorService leaseExtendExecutorService;
@@ -95,9 +103,11 @@ class TaskRunner {
95103
int taskPollTimeout,
96104
List<PollFilter> pollFilters,
97105
EventDispatcher<TaskRunnerEvent> eventDispatcher,
98-
boolean useVirtualThreads) {
106+
boolean useVirtualThreads,
107+
FileClient fileClient) {
99108
this.worker = worker;
100109
this.taskClient = taskClient;
110+
this.fileClient = fileClient;
101111
this.updateRetryCount = updateRetryCount;
102112
this.taskPollTimeout = taskPollTimeout;
103113
this.pollingIntervalInMillis = worker.getPollingInterval();
@@ -376,6 +386,11 @@ private void executeTask(Worker worker, Task task) {
376386
return;
377387
}
378388

389+
// Set FileClient on task for file storage support
390+
if (fileClient != null) {
391+
task.setWorkflowFileClient(new WorkflowFileClient(fileClient, task.getWorkflowInstanceId()));
392+
}
393+
379394
// Calculate inbound network latency
380395
try {
381396
if(task.getExecutionMetadata().getServerSendTime() != null ){
@@ -398,6 +413,7 @@ private void executeTask(Worker worker, Task task) {
398413
worker.getIdentity());
399414
result = worker.execute(task);
400415
stopwatch.stop();
416+
uploadFilesToFileStorage(result, task.getWorkflowInstanceId(), task.getTaskId());
401417
eventDispatcher.publish(new TaskExecutionCompleted(taskType, task.getTaskId(), worker.getIdentity(), stopwatch.elapsed(TimeUnit.MILLISECONDS)));
402418
// record execution end time in task
403419
task.getExecutionMetadata().setExecutionEndTime(System.currentTimeMillis());
@@ -499,6 +515,26 @@ private void updateTaskResult(int count, Task task, TaskResult result, Worker wo
499515
}
500516
}
501517

518+
@VisibleForTesting
519+
void uploadFilesToFileStorage(TaskResult result, String workflowId, String taskId) {
520+
if (fileClient == null || result.getOutputData() == null) {
521+
return;
522+
}
523+
for (var entry : result.getOutputData().entrySet()) {
524+
if (!(entry.getValue() instanceof FileHandler fh)) {
525+
continue;
526+
}
527+
if (fh.getFileHandleId() == null) {
528+
Path path = ((LocalFileHandler) fh).getPath();
529+
FileUploadOptions options = new FileUploadOptions()
530+
.setContentType(fh.getContentType())
531+
.setTaskId(taskId);
532+
FileHandler uploaded = fileClient.upload(workflowId, path, options);
533+
entry.setValue(uploaded);
534+
}
535+
}
536+
}
537+
502538
private Optional<String> upload(TaskResult result, String taskType) {
503539
try {
504540
return taskClient.evaluateAndUploadLargePayload(result.getOutputData(), taskType);

conductor-client/src/main/java/com/netflix/conductor/client/automator/TaskRunnerConfigurer.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.function.Consumer;
2222

2323
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
24+
import org.conductoross.conductor.client.FileClient;
2425
import org.slf4j.Logger;
2526
import org.slf4j.LoggerFactory;
2627

@@ -55,6 +56,7 @@ public class TaskRunnerConfigurer {
5556
private final EventDispatcher<TaskRunnerEvent> eventDispatcher;
5657
private final MetricsCollector metricsCollector;
5758
private final boolean useVirtualThreads;
59+
private final FileClient fileClient;
5860

5961
/**
6062
* @see TaskRunnerConfigurer.Builder
@@ -77,6 +79,7 @@ private TaskRunnerConfigurer(TaskRunnerConfigurer.Builder builder) {
7779
this.eventDispatcher = builder.eventDispatcher;
7880
this.metricsCollector = builder.metricsCollector;
7981
this.useVirtualThreads = builder.useVirtualThreads;
82+
this.fileClient = builder.fileClient;
8083
builder.workers.forEach(this.workers::add);
8184
taskRunners = new LinkedList<>();
8285
}
@@ -179,7 +182,8 @@ private void startWorker(Worker worker) {
179182
taskPollTimeout,
180183
pollFilters,
181184
eventDispatcher,
182-
useVirtualThreads);
185+
useVirtualThreads,
186+
fileClient);
183187
// startWorker(worker) is executed by several threads.
184188
// taskRunners.add(taskRunner) without synchronization could lead to a race condition and unpredictable behavior,
185189
// including potential null values being inserted or corrupted state.
@@ -212,6 +216,7 @@ public static class Builder {
212216
private final List<PollFilter> pollFilters = new LinkedList<>();
213217
private final EventDispatcher<TaskRunnerEvent> eventDispatcher = new EventDispatcher<>();
214218
private boolean useVirtualThreads;
219+
private FileClient fileClient;
215220

216221
/**
217222
* Returns the event dispatcher used by this builder, allowing direct
@@ -382,5 +387,10 @@ public Builder withUseVirtualThreads(boolean useVirtualThreads) {
382387
this.useVirtualThreads = useVirtualThreads;
383388
return this;
384389
}
390+
391+
public Builder withFileClient(FileClient fileClient) {
392+
this.fileClient = fileClient;
393+
return this;
394+
}
385395
}
386396
}

conductor-client/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,16 @@
1818
import java.util.Optional;
1919

2020
import org.apache.commons.lang3.StringUtils;
21+
import org.conductoross.conductor.sdk.file.FileHandler;
22+
import org.conductoross.conductor.sdk.file.FileStorageException;
23+
import org.conductoross.conductor.sdk.file.FileUploader;
24+
import org.conductoross.conductor.sdk.file.ManagedFileHandler;
25+
import org.conductoross.conductor.sdk.file.WorkflowFileClient;
2126

2227
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
2328
import com.netflix.conductor.common.run.tasks.TypedTask;
2429

30+
import com.fasterxml.jackson.annotation.JsonIgnore;
2531
import lombok.*;
2632

2733
@Data
@@ -179,6 +185,32 @@ public boolean isRetriable() {
179185

180186
private long firstStartTime;
181187

188+
@JsonIgnore
189+
private transient WorkflowFileClient workflowFileClient;
190+
191+
@JsonIgnore
192+
public FileUploader getFileUploader() {
193+
return workflowFileClient;
194+
}
195+
196+
public void setWorkflowFileClient(WorkflowFileClient workflowFileClient) {
197+
this.workflowFileClient = workflowFileClient;
198+
}
199+
200+
public FileHandler getInputFileHandler(String key) {
201+
if (workflowFileClient == null) {
202+
throw new FileStorageException("FileClient is not configured; cannot access file input for key '" + key + "'");
203+
}
204+
Object value = getInputData().get(key);
205+
String fileHandleId = FileHandler.extractFileHandleId(value);
206+
if (FileHandler.isFileHandleId(fileHandleId)) {
207+
return new ManagedFileHandler(fileHandleId, workflowFileClient);
208+
}
209+
throw new FileStorageException(
210+
"Expected " + FileHandler.PREFIX
211+
+ " reference for key '" + key + "', got: " + value);
212+
}
213+
182214
public void setInputData(Map<String, Object> inputData) {
183215
if (inputData == null) {
184216
inputData = new HashMap<>();

0 commit comments

Comments
 (0)