Skip to content

Commit 60f2cbe

Browse files
Add an interceptor for listExecutions
1 parent b7c72a2 commit 60f2cbe

7 files changed

Lines changed: 137 additions & 20 deletions

File tree

temporal-sdk/src/main/java/io/temporal/client/ListWorkflowExecutionIterator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@
1111
import javax.annotation.Nonnull;
1212
import javax.annotation.Nullable;
1313

14-
class ListWorkflowExecutionIterator
14+
public class ListWorkflowExecutionIterator
1515
extends EagerPaginator<ListWorkflowExecutionsResponse, WorkflowExecutionInfo> {
1616
private final @Nullable String query;
1717
private final @Nonnull String namespace;
1818
private final @Nullable Integer pageSize;
1919
private final @Nonnull GenericWorkflowClient genericClient;
2020

21-
ListWorkflowExecutionIterator(
21+
public ListWorkflowExecutionIterator(
2222
@Nullable String query,
2323
@Nonnull String namespace,
2424
@Nullable Integer pageSize,

temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
import com.google.common.base.Preconditions;
66
import com.google.common.base.Strings;
7-
import com.google.common.collect.Iterators;
87
import com.google.common.reflect.TypeToken;
98
import com.uber.m3.tally.Scope;
109
import io.temporal.api.common.v1.WorkflowExecution;
@@ -243,22 +242,10 @@ public Stream<WorkflowExecutionMetadata> listExecutions(@Nullable String query)
243242

244243
Stream<WorkflowExecutionMetadata> listExecutions(
245244
@Nullable String query, @Nullable Integer pageSize) {
246-
ListWorkflowExecutionIterator iterator =
247-
new ListWorkflowExecutionIterator(query, options.getNamespace(), pageSize, genericClient);
248-
iterator.init();
249-
Iterator<WorkflowExecutionMetadata> wrappedIterator =
250-
Iterators.transform(
251-
iterator, info -> new WorkflowExecutionMetadata(info, options.getDataConverter()));
252-
253-
// IMMUTABLE here means that "interference" (in Java Streams terms) to this spliterator is
254-
// impossible
255-
// TODO We don't add DISTINCT to be safe. It's not explicitly stated if Temporal Server list
256-
// API
257-
// guarantees absence of duplicates
258-
final int CHARACTERISTICS = Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.IMMUTABLE;
259-
260-
return StreamSupport.stream(
261-
Spliterators.spliteratorUnknownSize(wrappedIterator, CHARACTERISTICS), false);
245+
return workflowClientCallsInvoker
246+
.listWorkflowExecutions(
247+
new WorkflowClientCallsInterceptor.ListWorkflowExecutionsInput(query, pageSize))
248+
.getStream();
262249
}
263250

264251
@Override

temporal-sdk/src/main/java/io/temporal/client/WorkflowExecutionMetadata.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,12 @@
2020
import javax.annotation.Nonnull;
2121
import javax.annotation.Nullable;
2222

23+
/** WorkflowExecutionMetadata contains information about a workflow execution. */
2324
public class WorkflowExecutionMetadata {
2425
private final @Nonnull WorkflowExecutionInfo info;
2526
private final @Nonnull DataConverter dataConverter;
2627

27-
WorkflowExecutionMetadata(
28+
public WorkflowExecutionMetadata(
2829
@Nonnull WorkflowExecutionInfo info, @Nonnull DataConverter dataConverter) {
2930
this.info = Preconditions.checkNotNull(info, "info");
3031
this.dataConverter = Preconditions.checkNotNull(dataConverter, "dataConverter");

temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.util.concurrent.CompletableFuture;
1111
import java.util.concurrent.TimeUnit;
1212
import java.util.concurrent.TimeoutException;
13+
import java.util.stream.Stream;
1314
import javax.annotation.Nonnull;
1415
import javax.annotation.Nullable;
1516

@@ -75,6 +76,40 @@ public interface WorkflowClientCallsInterceptor {
7576

7677
DescribeWorkflowOutput describe(DescribeWorkflowInput input);
7778

79+
ListWorkflowExecutionsOutput listWorkflowExecutions(ListWorkflowExecutionsInput input);
80+
81+
final class ListWorkflowExecutionsInput {
82+
private final String query;
83+
private final Integer pageSize;
84+
85+
public ListWorkflowExecutionsInput(@Nullable String query, @Nullable Integer pageSize) {
86+
this.query = query;
87+
this.pageSize = pageSize;
88+
}
89+
90+
@Nullable
91+
public String getQuery() {
92+
return query;
93+
}
94+
95+
@Nullable
96+
public Integer getPageSize() {
97+
return pageSize;
98+
}
99+
}
100+
101+
final class ListWorkflowExecutionsOutput {
102+
private final Stream<WorkflowExecutionMetadata> stream;
103+
104+
public ListWorkflowExecutionsOutput(Stream<WorkflowExecutionMetadata> stream) {
105+
this.stream = stream;
106+
}
107+
108+
public Stream<WorkflowExecutionMetadata> getStream() {
109+
return stream;
110+
}
111+
}
112+
78113
final class WorkflowStartInput {
79114
private final String workflowId;
80115
private final String workflowType;

temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptorBase.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,4 +72,9 @@ public TerminateOutput terminate(TerminateInput input) {
7272
public DescribeWorkflowOutput describe(DescribeWorkflowInput input) {
7373
return next.describe(input);
7474
}
75+
76+
@Override
77+
public ListWorkflowExecutionsOutput listWorkflowExecutions(ListWorkflowExecutionsInput input) {
78+
return next.listWorkflowExecutions(input);
79+
}
7580
}

temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,13 @@
3030
import java.util.concurrent.CompletableFuture;
3131
import java.util.concurrent.TimeUnit;
3232
import java.util.concurrent.TimeoutException;
33+
import java.util.stream.StreamSupport;
3334
import javax.annotation.Nullable;
3435
import org.slf4j.Logger;
3536
import org.slf4j.LoggerFactory;
3637

38+
import com.google.common.collect.Iterators;
39+
3740
public class RootWorkflowClientInvoker implements WorkflowClientCallsInterceptor {
3841
private static final Logger log = LoggerFactory.getLogger(RootWorkflowClientInvoker.class);
3942
private static final long POLL_UPDATE_TIMEOUT_S = 60L;
@@ -676,6 +679,29 @@ public DescribeWorkflowOutput describe(DescribeWorkflowInput input) {
676679
new WorkflowExecutionDescription(response, dataConverterWithWorkflowContext));
677680
}
678681

682+
@Override
683+
public ListWorkflowExecutionsOutput listWorkflowExecutions(ListWorkflowExecutionsInput input) {
684+
ListWorkflowExecutionIterator iterator =
685+
new ListWorkflowExecutionIterator(
686+
input.getQuery(), clientOptions.getNamespace(), input.getPageSize(), genericClient);
687+
iterator.init();
688+
Iterator<WorkflowExecutionMetadata> wrappedIterator =
689+
Iterators.transform(
690+
iterator,
691+
info -> new WorkflowExecutionMetadata(info, clientOptions.getDataConverter()));
692+
693+
// IMMUTABLE here means that "interference" (in Java Streams terms) to this spliterator is
694+
// impossible
695+
// TODO We don't add DISTINCT to be safe. It's not explicitly stated if Temporal Server list
696+
// API
697+
// guarantees absence of duplicates
698+
final int CHARACTERISTICS = Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.IMMUTABLE;
699+
700+
return new ListWorkflowExecutionsOutput(
701+
StreamSupport.stream(
702+
Spliterators.spliteratorUnknownSize(wrappedIterator, CHARACTERISTICS), false));
703+
}
704+
679705
private static <R> R convertResultPayloads(
680706
Optional<Payloads> resultValue,
681707
Class<R> resultClass,
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package io.temporal.client;
2+
3+
import static org.junit.Assert.assertEquals;
4+
import static org.junit.Assert.assertFalse;
5+
import static org.junit.Assume.assumeTrue;
6+
7+
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
8+
import io.temporal.common.interceptors.WorkflowClientCallsInterceptorBase;
9+
import io.temporal.common.interceptors.WorkflowClientInterceptorBase;
10+
import io.temporal.testing.internal.SDKTestWorkflowRule;
11+
import io.temporal.workflow.shared.TestWorkflows;
12+
import java.util.concurrent.atomic.AtomicInteger;
13+
import org.junit.Rule;
14+
import org.junit.Test;
15+
16+
public class ListWorkflowExecutionsInterceptorTest {
17+
@Rule
18+
public SDKTestWorkflowRule testWorkflowRule =
19+
SDKTestWorkflowRule.newBuilder()
20+
.setWorkflowTypes(TestWorkflows.DoNothingNoArgsWorkflow.class)
21+
.build();
22+
23+
@Test
24+
public void listExecutions_isIntercepted() throws InterruptedException {
25+
assumeTrue(
26+
"Test Server doesn't support listWorkflowExecutions endpoint yet",
27+
SDKTestWorkflowRule.useExternalService);
28+
29+
AtomicInteger intercepted = new AtomicInteger();
30+
WorkflowClient workflowClient =
31+
WorkflowClient.newInstance(
32+
testWorkflowRule.getWorkflowServiceStubs(),
33+
WorkflowClientOptions.newBuilder(testWorkflowRule.getWorkflowClient().getOptions())
34+
.setInterceptors(
35+
new WorkflowClientInterceptorBase() {
36+
@Override
37+
public WorkflowClientCallsInterceptor workflowClientCallsInterceptor(
38+
WorkflowClientCallsInterceptor next) {
39+
return new WorkflowClientCallsInterceptorBase(next) {
40+
@Override
41+
public ListWorkflowExecutionsOutput listWorkflowExecutions(
42+
ListWorkflowExecutionsInput input) {
43+
intercepted.incrementAndGet();
44+
return super.listWorkflowExecutions(input);
45+
}
46+
};
47+
}
48+
})
49+
.validateAndBuildWithDefaults());
50+
51+
WorkflowStub.fromTyped(testWorkflowRule.newWorkflowStub(TestWorkflows.NoArgsWorkflow.class))
52+
.start();
53+
54+
// Visibility API is eventually consistent
55+
Thread.sleep(2_000);
56+
java.util.List<WorkflowExecutionMetadata> result =
57+
workflowClient
58+
.listExecutions("TaskQueue='" + testWorkflowRule.getTaskQueue() + "'")
59+
.collect(java.util.stream.Collectors.toList());
60+
assertFalse(result.isEmpty());
61+
assertEquals(1, intercepted.get());
62+
}
63+
}

0 commit comments

Comments
 (0)