Skip to content

Commit 7835a7c

Browse files
Support persistence locking in RPC (#206)
1 parent 33cb531 commit 7835a7c

7 files changed

Lines changed: 117 additions & 8 deletions

File tree

script/.env

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@ CASSANDRA_VERSION=3.11.9
22
ELASTICSEARCH_VERSION=7.16.2
33
MYSQL_VERSION=8
44
POSTGRESQL_VERSION=13
5-
TEMPORAL_VERSION=1.20.1
6-
TEMPORAL_UI_VERSION=2.10.1
5+
TEMPORAL_VERSION=1.21.4
6+
TEMPORAL_UI_VERSION=2.18.0

script/dynamicconfig/development-cass.yaml

Lines changed: 0 additions & 3 deletions
This file was deleted.

script/dynamicconfig/development-sql.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,5 @@ limit.maxIDLength:
44
system.forceSearchAttributesCacheRefreshOnRead:
55
- value: true # Dev setup only. Please don't turn this on in production.
66
constraints: {}
7+
frontend.enableUpdateWorkflowExecution:
8+
- value: true

src/main/java/io/iworkflow/core/RPC.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,15 @@
2525
// used when dataAttributesLoadingType is PARTIAL_WITHOUT_LOCKING
2626
String[] dataAttributesPartialLoadingKeys() default {};
2727

28+
String[] dataAttributesLockingKeys() default {};
29+
2830
PersistenceLoadingType searchAttributesLoadingType() default PersistenceLoadingType.ALL_WITHOUT_LOCKING;
2931

3032
// used when searchAttributesPartialLoadingKeys is PARTIAL_WITHOUT_LOCKING
3133
String[] searchAttributesPartialLoadingKeys() default {};
3234

35+
String[] searchAttributesLockingKeys() default {};
36+
3337
/**
3438
* Only used when workflow has enabled {@link PersistenceOptions} CachingPersistenceByMemo
3539
* By default, it's false for high throughput support

src/main/java/io/iworkflow/core/RpcInvocationHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,11 @@ public Object intercept(@AllArguments Object[] allArguments,
5656
final Object output = unregisteredClient.invokeRpc(outputType, input, workflowId, workflowRunId, method.getName(), rpcAnno.timeoutSeconds(),
5757
new PersistenceLoadingPolicy()
5858
.persistenceLoadingType(rpcAnno.dataAttributesLoadingType())
59-
.partialLoadingKeys(Arrays.asList(rpcAnno.dataAttributesPartialLoadingKeys())),
59+
.partialLoadingKeys(Arrays.asList(rpcAnno.dataAttributesPartialLoadingKeys()))
60+
.lockingKeys(Arrays.asList(rpcAnno.dataAttributesLockingKeys())),
6061
new PersistenceLoadingPolicy()
6162
.persistenceLoadingType(rpcAnno.searchAttributesLoadingType())
63+
.lockingKeys(Arrays.asList(rpcAnno.searchAttributesLockingKeys()))
6264
.partialLoadingKeys(Arrays.asList(rpcAnno.searchAttributesPartialLoadingKeys())),
6365
useMemo,
6466
searchAttributeKeyAndTypes

src/test/java/io/iworkflow/integ/RpcTest.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55
import io.iworkflow.core.ClientOptions;
66
import io.iworkflow.core.ClientSideException;
77
import io.iworkflow.core.ImmutableStopWorkflowOptions;
8+
import io.iworkflow.core.ImmutableWorkflowOptions;
89
import io.iworkflow.gen.models.ErrorResponse;
10+
import io.iworkflow.gen.models.WorkflowConfig;
911
import io.iworkflow.gen.models.WorkflowStopType;
1012
import io.iworkflow.integ.persistence.BasicPersistenceWorkflow;
1113
import io.iworkflow.integ.rpc.NoStateWorkflow;
@@ -17,9 +19,13 @@
1719
import org.junit.jupiter.api.BeforeEach;
1820
import org.junit.jupiter.api.Test;
1921

22+
import java.util.ArrayList;
2023
import java.util.Arrays;
2124
import java.util.Map;
2225
import java.util.concurrent.ExecutionException;
26+
import java.util.concurrent.ExecutorService;
27+
import java.util.concurrent.Executors;
28+
import java.util.concurrent.Future;
2329

2430
import static io.iworkflow.integ.persistence.BasicPersistenceWorkflow.TEST_SEARCH_ATTRIBUTE_INT;
2531
import static io.iworkflow.integ.persistence.BasicPersistenceWorkflow.TEST_SEARCH_ATTRIBUTE_KEYWORD;
@@ -37,6 +43,63 @@ public void setup() throws ExecutionException, InterruptedException {
3743
TestSingletonWorkerService.startWorkerIfNotUp();
3844
}
3945

46+
@Test
47+
public void testRPCLocking() throws InterruptedException, ExecutionException {
48+
final Client client = new Client(WorkflowRegistry.registry, ClientOptions.localDefault);
49+
final String wfId = "testRPCLocking" + System.currentTimeMillis() / 1000;
50+
client.startWorkflow(
51+
NoStateWorkflow.class, wfId, 1000, 999,
52+
ImmutableWorkflowOptions.builder()
53+
.workflowConfigOverride(
54+
new WorkflowConfig()
55+
.continueAsNewThreshold(1)
56+
)
57+
.build());
58+
59+
final NoStateWorkflow rpcStub = client.newRpcStub(NoStateWorkflow.class, wfId, "");
60+
61+
ExecutorService executor = Executors.newFixedThreadPool(10);
62+
final ArrayList<Future<String>> futures = new ArrayList<>();
63+
int total = 1000;
64+
for (int i = 0; i < total; i++) {
65+
66+
final Future<String> future = executor.submit(() -> {
67+
try {
68+
return client.invokeRPC(rpcStub::increaseCounter);
69+
} catch (ClientSideException e) {
70+
if (e.getStatusCode() != 450) {
71+
throw e;
72+
}
73+
}
74+
return "fail";
75+
}
76+
);
77+
futures.add(future);
78+
}
79+
80+
int succ = 0;
81+
for (int i = 0; i < total; i++) {
82+
;
83+
try {
84+
final String done = futures.get(i).get();
85+
if (done.equals("done")) {
86+
succ++;
87+
}
88+
} catch (Exception ignored) {
89+
}
90+
}
91+
92+
Assertions.assertTrue(succ > 0);
93+
Assertions.assertEquals(succ, client.invokeRPC(rpcStub::getCounter));
94+
95+
executor.shutdown();
96+
97+
// TODO make sure continue as new is happening when no state is executed
98+
// https://github.com/indeedeng/iwf/issues/339
99+
100+
client.stopWorkflow(wfId, null);
101+
}
102+
40103
@Test
41104
public void testRPCWorkflowFunc1() throws InterruptedException {
42105
final Client client = new Client(WorkflowRegistry.registry, ClientOptions.localDefault);

src/test/java/io/iworkflow/integ/rpc/NoStateWorkflow.java

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,64 @@
44
import io.iworkflow.core.ObjectWorkflow;
55
import io.iworkflow.core.RPC;
66
import io.iworkflow.core.communication.Communication;
7+
import io.iworkflow.core.persistence.DataAttributeDef;
78
import io.iworkflow.core.persistence.Persistence;
9+
import io.iworkflow.core.persistence.PersistenceFieldDef;
10+
import io.iworkflow.gen.models.PersistenceLoadingType;
811
import org.springframework.beans.factory.annotation.Autowired;
912
import org.springframework.stereotype.Component;
1013

14+
import java.util.Arrays;
15+
import java.util.List;
16+
1117
import static io.iworkflow.integ.RpcTest.RPC_OUTPUT;
1218

1319
@Component
1420
public class NoStateWorkflow implements ObjectWorkflow {
1521

22+
public static final String DA_COUNTER = "counter";
1623
private RpcWorkflow rpcWorkflow;
1724
private NoStartStateWorkflow noStartStateWorkflow;
1825

1926
@Autowired
20-
public NoStateWorkflow(RpcWorkflow rpcWorkflow, NoStartStateWorkflow noStartStateWorkflow ){
21-
this.rpcWorkflow=rpcWorkflow;
27+
public NoStateWorkflow(RpcWorkflow rpcWorkflow, NoStartStateWorkflow noStartStateWorkflow) {
28+
this.rpcWorkflow = rpcWorkflow;
2229
this.noStartStateWorkflow = noStartStateWorkflow;
2330
}
31+
32+
@Override
33+
public List<PersistenceFieldDef> getPersistenceSchema() {
34+
return Arrays.asList(
35+
DataAttributeDef.create(Integer.class, DA_COUNTER)
36+
);
37+
}
38+
39+
@RPC(
40+
dataAttributesLoadingType = PersistenceLoadingType.PARTIAL_WITH_EXCLUSIVE_LOCK,
41+
dataAttributesPartialLoadingKeys = {DA_COUNTER},
42+
dataAttributesLockingKeys = {DA_COUNTER}
43+
)
44+
public String increaseCounter(Context context, Persistence persistence, Communication communication) {
45+
Integer current = persistence.getDataAttribute(DA_COUNTER, Integer.class);
46+
if (current == null) {
47+
current = 0;
48+
}
49+
current++;
50+
persistence.setDataAttribute(DA_COUNTER, current);
51+
return "done";
52+
}
53+
54+
@RPC
55+
public String testWrite(Context context, Persistence persistence, Communication communication) {
56+
persistence.setDataAttribute(DA_COUNTER, 123);
57+
return "done";
58+
}
59+
60+
@RPC
61+
public Integer getCounter(Context context, Persistence persistence, Communication communication) {
62+
return persistence.getDataAttribute(DA_COUNTER, Integer.class);
63+
}
64+
2465
@RPC
2566
public Long testRpcFunc1(Context context, String input, Persistence persistence, Communication communication) {
2667
if (context.getWorkflowId().isEmpty() || context.getWorkflowRunId().isEmpty()) {

0 commit comments

Comments
 (0)