Skip to content

Commit d598724

Browse files
committed
RequestIdInfo and links changes in test server
1 parent 6c961a0 commit d598724

9 files changed

Lines changed: 224 additions & 36 deletions

File tree

temporal-test-server/src/main/java/io/temporal/internal/testservice/RequestContext.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,7 @@
1111
import io.temporal.internal.testservice.TestWorkflowStore.WorkflowTask;
1212
import io.temporal.workflow.Functions;
1313
import java.time.Duration;
14-
import java.util.ArrayList;
15-
import java.util.List;
16-
import java.util.Objects;
14+
import java.util.*;
1715
import java.util.function.LongSupplier;
1816
import javax.annotation.Nonnull;
1917
import javax.annotation.Nullable;

temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1346,7 +1346,9 @@ private static void startWorkflow(
13461346
if (request.hasUserMetadata()) {
13471347
event.setUserMetadata(request.getUserMetadata());
13481348
}
1349-
ctx.addEvent(event.build());
1349+
long eventId = ctx.addEvent(event.build());
1350+
ctx.getWorkflowMutableState()
1351+
.attachRequestId(request.getRequestId(), event.getEventType(), eventId);
13501352
}
13511353

13521354
private static void completeWorkflow(

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,18 @@
55
import io.temporal.api.common.v1.Callback;
66
import io.temporal.api.common.v1.Payload;
77
import io.temporal.api.common.v1.Payloads;
8+
import io.temporal.api.enums.v1.EventType;
89
import io.temporal.api.enums.v1.SignalExternalWorkflowExecutionFailedCause;
910
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
1011
import io.temporal.api.failure.v1.Failure;
1112
import io.temporal.api.history.v1.*;
1213
import io.temporal.api.nexus.v1.Link;
1314
import io.temporal.api.nexus.v1.StartOperationResponse;
1415
import io.temporal.api.taskqueue.v1.StickyExecutionAttributes;
16+
import io.temporal.api.workflow.v1.RequestIdInfo;
1517
import io.temporal.api.workflowservice.v1.*;
1618
import java.util.List;
19+
import java.util.Map;
1720
import java.util.Optional;
1821
import java.util.function.Consumer;
1922
import javax.annotation.Nonnull;
@@ -132,7 +135,11 @@ PollWorkflowExecutionUpdateResponse pollUpdateWorkflowExecution(
132135

133136
boolean isTerminalState();
134137

135-
boolean isRequestIdAttached(String requestId);
138+
RequestIdInfo getRequestIdInfo(String requestId);
139+
140+
void attachRequestId(@Nonnull String requestId, EventType eventType, long eventId);
136141

137142
List<Callback> getCompletionCallbacks();
143+
144+
void updateRequestIdToEventId(Map<String, RequestIdInfo> requestIdToEventId);
138145
}

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import static io.temporal.internal.testservice.StateMachines.*;
88
import static io.temporal.internal.testservice.StateUtils.mergeMemo;
99
import static io.temporal.internal.testservice.TestServiceRetryState.validateAndOverrideRetryPolicy;
10+
import static io.temporal.internal.testservice.TestWorkflowStore.BUFFERED_EVENT_ID;
1011

1112
import com.google.common.base.Preconditions;
1213
import com.google.common.base.Strings;
@@ -118,7 +119,7 @@ private interface UpdateProcedure {
118119
new ConcurrentHashMap<>();
119120
public StickyExecutionAttributes stickyExecutionAttributes;
120121
private Map<String, Payload> currentMemo;
121-
private final Set<String> attachedRequestIds = new HashSet<>();
122+
private final Map<String, RequestIdInfo> requestIdInfos = new HashMap<>();
122123
private final List<Callback> completionCallbacks = new ArrayList<>();
123124

124125
/**
@@ -2044,8 +2045,19 @@ public boolean isTerminalState() {
20442045
}
20452046

20462047
@Override
2047-
public boolean isRequestIdAttached(@Nonnull String requestId) {
2048-
return attachedRequestIds.contains(requestId);
2048+
public RequestIdInfo getRequestIdInfo(@Nonnull String requestId) {
2049+
return this.requestIdInfos.get(requestId);
2050+
}
2051+
2052+
@Override
2053+
public void attachRequestId(@Nonnull String requestId, EventType eventType, long eventId) {
2054+
this.requestIdInfos.put(
2055+
requestId,
2056+
RequestIdInfo.newBuilder()
2057+
.setEventType(eventType)
2058+
.setEventId(eventId)
2059+
.setBuffered(eventId == BUFFERED_EVENT_ID)
2060+
.build());
20492061
}
20502062

20512063
private void updateHeartbeatTimer(
@@ -3208,13 +3220,25 @@ private DescribeWorkflowExecutionResponse describeWorkflowExecutionInsideLock()
32083220
.map(TestWorkflowMutableStateImpl::constructPendingChildExecutionInfo)
32093221
.collect(Collectors.toList());
32103222

3223+
WorkflowExecutionExtendedInfo.Builder extendedInfo = WorkflowExecutionExtendedInfo.newBuilder();
3224+
extendedInfo.putAllRequestIdInfos(
3225+
this.requestIdInfos.entrySet().stream()
3226+
.collect(
3227+
Collectors.toMap(
3228+
Map.Entry::getKey,
3229+
e ->
3230+
e.getValue().toBuilder()
3231+
.setEventId(e.getValue().getBuffered() ? 0 : e.getValue().getEventId())
3232+
.build())));
3233+
32113234
return DescribeWorkflowExecutionResponse.newBuilder()
32123235
.setExecutionConfig(executionConfig)
32133236
.setWorkflowExecutionInfo(executionInfo)
32143237
.addAllPendingActivities(pendingActivities)
32153238
.addAllPendingNexusOperations(pendingNexusOperations)
32163239
.addAllPendingChildren(pendingChildren)
32173240
.addAllCallbacks(callbacks)
3241+
.setWorkflowExtendedInfo(extendedInfo)
32183242
.build();
32193243
}
32203244

@@ -3502,26 +3526,26 @@ private void addExecutionSignaledByExternalEvent(
35023526

35033527
private void addWorkflowExecutionOptionsUpdatedEvent(
35043528
RequestContext ctx, String requestId, List<Callback> completionCallbacks, List<Link> links) {
3529+
HistoryEvent.Builder event =
3530+
HistoryEvent.newBuilder()
3531+
.setWorkerMayIgnore(true)
3532+
.setEventType(EVENT_TYPE_WORKFLOW_EXECUTION_OPTIONS_UPDATED);
3533+
35053534
WorkflowExecutionOptionsUpdatedEventAttributes.Builder attrs =
35063535
WorkflowExecutionOptionsUpdatedEventAttributes.newBuilder();
35073536
if (requestId != null) {
35083537
attrs.setAttachedRequestId(requestId);
3509-
this.attachedRequestIds.add(requestId);
3538+
this.attachRequestId(requestId, event.getEventType(), BUFFERED_EVENT_ID);
35103539
}
35113540
if (completionCallbacks != null) {
35123541
attrs.addAllAttachedCompletionCallbacks(completionCallbacks);
35133542
this.completionCallbacks.addAll(completionCallbacks);
35143543
}
35153544

3516-
HistoryEvent.Builder event =
3517-
HistoryEvent.newBuilder()
3518-
.setWorkerMayIgnore(true)
3519-
.setEventType(EVENT_TYPE_WORKFLOW_EXECUTION_OPTIONS_UPDATED)
3520-
.setWorkflowExecutionOptionsUpdatedEventAttributes(attrs);
3545+
event.setWorkflowExecutionOptionsUpdatedEventAttributes(attrs);
35213546
if (links != null) {
35223547
event.addAllLinks(links);
35233548
}
3524-
35253549
ctx.addEvent(event.build());
35263550
}
35273551

@@ -3656,4 +3680,9 @@ private boolean isTerminalState(State workflowState) {
36563680
public List<Callback> getCompletionCallbacks() {
36573681
return completionCallbacks;
36583682
}
3683+
3684+
@Override
3685+
public void updateRequestIdToEventId(Map<String, RequestIdInfo> requestIdToEventId) {
3686+
requestIdInfos.putAll(requestIdToEventId);
3687+
}
36593688
}

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java

Lines changed: 82 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.nexusrpc.Header;
1919
import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
2020
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
21+
import io.temporal.api.common.v1.Link;
2122
import io.temporal.api.common.v1.Payload;
2223
import io.temporal.api.common.v1.Payloads;
2324
import io.temporal.api.common.v1.RetryPolicy;
@@ -36,6 +37,7 @@
3637
import io.temporal.api.testservice.v1.TestServiceGrpc;
3738
import io.temporal.api.testservice.v1.UnlockTimeSkippingRequest;
3839
import io.temporal.api.workflow.v1.OnConflictOptions;
40+
import io.temporal.api.workflow.v1.RequestIdInfo;
3941
import io.temporal.api.workflow.v1.WorkflowExecutionInfo;
4042
import io.temporal.api.workflowservice.v1.*;
4143
import io.temporal.internal.common.ProtoUtils;
@@ -275,13 +277,26 @@ StartWorkflowExecutionResponse startWorkflowExecutionImpl(
275277
case WORKFLOW_ID_CONFLICT_POLICY_FAIL:
276278
return throwDuplicatedWorkflow(startRequest, existing);
277279
case WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING:
280+
final String existingRunId = existing.getExecutionId().getExecution().getRunId();
281+
StartWorkflowExecutionResponse.Builder response =
282+
StartWorkflowExecutionResponse.newBuilder()
283+
.setStarted(false)
284+
.setRunId(existingRunId);
278285
if (startRequest.hasOnConflictOptions()) {
279286
existing.applyOnConflictOptions(startRequest);
287+
response.setLink(
288+
generateRequestIdRefLink(
289+
startRequest.getNamespace(),
290+
startRequest.getWorkflowId(),
291+
existingRunId,
292+
startRequest.getRequestId(),
293+
EventType.EVENT_TYPE_WORKFLOW_EXECUTION_OPTIONS_UPDATED));
294+
} else {
295+
response.setLink(
296+
generateStartEventRefLink(
297+
startRequest.getNamespace(), startRequest.getWorkflowId(), existingRunId));
280298
}
281-
return StartWorkflowExecutionResponse.newBuilder()
282-
.setStarted(false)
283-
.setRunId(existing.getExecutionId().getExecution().getRunId())
284-
.build();
299+
return response.build();
285300
case WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING:
286301
existing.terminateWorkflowExecution(
287302
TerminateWorkflowExecutionRequest.newBuilder()
@@ -456,7 +471,14 @@ private StartWorkflowExecutionResponse startWorkflowExecutionNoRunningCheckLocke
456471
mutableState.startWorkflow(
457472
continuedExecutionRunId.isPresent(), eagerWorkflowTaskPollRequest, withStart);
458473
StartWorkflowExecutionResponse.Builder response =
459-
StartWorkflowExecutionResponse.newBuilder().setRunId(execution.getRunId()).setStarted(true);
474+
StartWorkflowExecutionResponse.newBuilder()
475+
.setRunId(execution.getRunId())
476+
.setStarted(true)
477+
.setLink(
478+
generateStartEventRefLink(
479+
startRequest.getNamespace(),
480+
startRequest.getWorkflowId(),
481+
execution.getRunId()));
460482
if (eagerWorkflowTask != null) {
461483
response.setEagerWorkflowTask(eagerWorkflowTask);
462484
}
@@ -952,7 +974,10 @@ public void respondNexusTaskFailed(
952974
}
953975

954976
public void completeNexusOperation(
955-
NexusOperationRef ref, String operationID, Link startLink, HistoryEvent completionEvent) {
977+
NexusOperationRef ref,
978+
String operationID,
979+
io.temporal.api.nexus.v1.Link startLink,
980+
HistoryEvent completionEvent) {
956981
TestWorkflowMutableState target = getMutableState(ref.getExecutionId());
957982

958983
switch (completionEvent.getEventType()) {
@@ -2083,26 +2108,64 @@ public WorkflowServiceStubs newClientStub() {
20832108

20842109
private StartWorkflowExecutionResponse dedupeRequest(
20852110
StartWorkflowExecutionRequest startRequest, TestWorkflowMutableState existingWorkflow) {
2086-
String requestId = startRequest.getRequestId();
2087-
String existingRequestId = existingWorkflow.getStartRequest().getRequestId();
2088-
if (existingRequestId.equals(requestId)) {
2089-
return StartWorkflowExecutionResponse.newBuilder()
2090-
.setStarted(true)
2091-
.setRunId(existingWorkflow.getExecutionId().getExecution().getRunId())
2092-
.build();
2111+
final String requestId = startRequest.getRequestId();
2112+
final String existingRunId = existingWorkflow.getExecutionId().getExecution().getRunId();
2113+
final RequestIdInfo requestIdInfo = existingWorkflow.getRequestIdInfo(requestId);
2114+
if (requestIdInfo == null) {
2115+
return null;
20932116
}
2094-
2095-
if (existingWorkflow.isRequestIdAttached(requestId)) {
2117+
if (requestIdInfo.getEventType() == EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED) {
20962118
return StartWorkflowExecutionResponse.newBuilder()
2097-
.setStarted(false)
2098-
.setRunId(existingWorkflow.getExecutionId().getExecution().getRunId())
2119+
.setStarted(true)
2120+
.setRunId(existingRunId)
2121+
.setLink(
2122+
generateStartEventRefLink(
2123+
startRequest.getNamespace(), startRequest.getWorkflowId(), existingRunId))
20992124
.build();
21002125
}
2101-
2102-
return null;
2126+
return StartWorkflowExecutionResponse.newBuilder()
2127+
.setStarted(false)
2128+
.setRunId(existingRunId)
2129+
.setLink(
2130+
generateRequestIdRefLink(
2131+
startRequest.getNamespace(),
2132+
startRequest.getWorkflowId(),
2133+
existingRunId,
2134+
requestId,
2135+
requestIdInfo.getEventType()))
2136+
.build();
21032137
}
21042138

21052139
private static StatusRuntimeException createInvalidArgument(String description) {
21062140
throw Status.INVALID_ARGUMENT.withDescription(description).asRuntimeException();
21072141
}
2142+
2143+
private static Link generateStartEventRefLink(String namespace, String workflowId, String runId) {
2144+
return Link.newBuilder()
2145+
.setWorkflowEvent(
2146+
Link.WorkflowEvent.newBuilder()
2147+
.setNamespace(namespace)
2148+
.setWorkflowId(workflowId)
2149+
.setRunId(runId)
2150+
.setEventRef(
2151+
Link.WorkflowEvent.EventReference.newBuilder()
2152+
.setEventId(1)
2153+
.setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)))
2154+
.build();
2155+
}
2156+
2157+
private static Link generateRequestIdRefLink(
2158+
String namespace, String workflowId, String runId, String requestId, EventType eventType) {
2159+
return Link.newBuilder()
2160+
.setWorkflowEvent(
2161+
Link.WorkflowEvent.newBuilder()
2162+
.setNamespace(namespace)
2163+
.setWorkflowId(workflowId)
2164+
.setRunId(runId)
2165+
.setRequestIdRef(
2166+
Link.WorkflowEvent.RequestIdReference.newBuilder()
2167+
.setRequestId(requestId)
2168+
.setEventType(eventType)))
2169+
.build();
2170+
}
21082171
}

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowStore.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ enum WorkflowState {
1818
CLOSED
1919
}
2020

21+
long BUFFERED_EVENT_ID = -123L;
22+
2123
class TaskQueueId {
2224

2325
private final String namespace;

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowStoreImpl.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ private void checkNextEventId(long nextEventId) {
7979
}
8080
}
8181

82-
void addAllLocked(List<HistoryEvent> events, Timestamp eventTime) {
82+
List<HistoryEvent> addAllLocked(List<HistoryEvent> events, Timestamp eventTime) {
83+
int currentSize = history.size();
8384
for (HistoryEvent event : events) {
8485
HistoryEvent.Builder eBuilder = event.toBuilder();
8586
if (completed) {
@@ -94,6 +95,7 @@ void addAllLocked(List<HistoryEvent> events, Timestamp eventTime) {
9495
completed = completed || WorkflowExecutionUtils.isWorkflowExecutionClosedEvent(eBuilder);
9596
}
9697
newEventsCondition.signalAll();
98+
return history.subList(currentSize, history.size());
9799
}
98100

99101
long getNextEventIdLocked() {
@@ -174,10 +176,19 @@ public long save(RequestContext ctx) {
174176
histories.put(executionId, history);
175177
}
176178
history.checkNextEventId(ctx.getInitialEventId());
177-
history.addAllLocked(events, ctx.currentTime());
179+
List<HistoryEvent> newEvents = history.addAllLocked(events, ctx.currentTime());
178180
result = history.getNextEventIdLocked();
179181
selfAdvancingTimer.updateLocks(ctx.getTimerLocks());
180182
ctx.fireCallbacks(history.getEventsLocked().size());
183+
184+
TestWorkflowMutableState mutableState = ctx.getWorkflowMutableState();
185+
for (HistoryEvent event : newEvents) {
186+
if (event.getEventType() == EventType.EVENT_TYPE_WORKFLOW_EXECUTION_OPTIONS_UPDATED) {
187+
final String requestId =
188+
event.getWorkflowExecutionOptionsUpdatedEventAttributes().getAttachedRequestId();
189+
mutableState.attachRequestId(requestId, event.getEventType(), event.getEventId());
190+
}
191+
}
181192
} finally {
182193
lock.unlock();
183194
}

temporal-test-server/src/test/java/io/temporal/testserver/functional/DescribeWorkflowAsserter.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import io.temporal.api.common.v1.Payloads;
88
import io.temporal.api.common.v1.WorkflowExecution;
99
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
10+
import io.temporal.api.workflow.v1.RequestIdInfo;
1011
import io.temporal.api.workflow.v1.WorkflowExecutionConfig;
1112
import io.temporal.api.workflow.v1.WorkflowExecutionInfo;
1213
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse;
@@ -187,4 +188,12 @@ public DescribeWorkflowAsserter assertPendingChildrenCount(int expected) {
187188
"child workflow count should match", expected, actual.getPendingChildrenCount());
188189
return this;
189190
}
191+
192+
public DescribeWorkflowAsserter assertRequestIdInfos(Map<String, RequestIdInfo> expected) {
193+
Assert.assertEquals(
194+
"request id infos should match",
195+
expected,
196+
actual.getWorkflowExtendedInfo().getRequestIdInfosMap());
197+
return this;
198+
}
190199
}

0 commit comments

Comments
 (0)