Skip to content

Commit 9bef2f0

Browse files
authored
More flexibility for implementors of persistence plugin (#1186)
Operations is now a parent interface of transactions. This allow more flexibility in commit strategies without having to duplicate default writer logic. Test now expect a PersistenceHandlers class rather than a store. Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent 055c2be commit 9bef2f0

File tree

8 files changed

+362
-249
lines changed

8 files changed

+362
-249
lines changed
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.persistence;
17+
18+
import io.serverlessworkflow.impl.WorkflowDefinition;
19+
import io.serverlessworkflow.impl.WorkflowInstance;
20+
import java.util.Optional;
21+
import java.util.stream.Stream;
22+
23+
public abstract class AbstractPersistenceInstanceReader implements PersistenceInstanceReader {
24+
25+
protected final Stream<WorkflowInstance> scanAll(
26+
PersistenceInstanceOperations operations,
27+
WorkflowDefinition definition,
28+
String applicationId) {
29+
return operations
30+
.scanAll(applicationId, definition)
31+
.map(v -> new WorkflowPersistenceInstance(definition, v));
32+
}
33+
34+
protected final Optional<WorkflowInstance> find(
35+
PersistenceInstanceOperations operations, WorkflowDefinition definition, String instanceId) {
36+
return operations
37+
.readWorkflowInfo(definition, instanceId)
38+
.map(i -> new WorkflowPersistenceInstance(definition, i));
39+
}
40+
41+
@Override
42+
public void close() {}
43+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.persistence;
17+
18+
import io.serverlessworkflow.impl.TaskContextData;
19+
import io.serverlessworkflow.impl.WorkflowContextData;
20+
import io.serverlessworkflow.impl.WorkflowStatus;
21+
import java.util.concurrent.CompletableFuture;
22+
import java.util.function.Consumer;
23+
24+
public abstract class AbstractPersistenceInstanceWriter implements PersistenceInstanceWriter {
25+
26+
@Override
27+
public CompletableFuture<Void> started(WorkflowContextData workflowContext) {
28+
return doTransaction(t -> t.writeInstanceData(workflowContext), workflowContext);
29+
}
30+
31+
@Override
32+
public CompletableFuture<Void> completed(WorkflowContextData workflowContext) {
33+
return removeProcessInstance(workflowContext);
34+
}
35+
36+
@Override
37+
public CompletableFuture<Void> failed(WorkflowContextData workflowContext, Throwable ex) {
38+
return removeProcessInstance(workflowContext);
39+
}
40+
41+
@Override
42+
public CompletableFuture<Void> aborted(WorkflowContextData workflowContext) {
43+
return removeProcessInstance(workflowContext);
44+
}
45+
46+
protected CompletableFuture<Void> removeProcessInstance(WorkflowContextData workflowContext) {
47+
return doTransaction(t -> t.removeProcessInstance(workflowContext), workflowContext);
48+
}
49+
50+
@Override
51+
public CompletableFuture<Void> taskStarted(
52+
WorkflowContextData workflowContext, TaskContextData taskContext) {
53+
return CompletableFuture.completedFuture(null);
54+
}
55+
56+
@Override
57+
public CompletableFuture<Void> taskRetried(
58+
WorkflowContextData workflowContext, TaskContextData taskContext) {
59+
return doTransaction(t -> t.writeRetryTask(workflowContext, taskContext), workflowContext);
60+
}
61+
62+
@Override
63+
public CompletableFuture<Void> taskCompleted(
64+
WorkflowContextData workflowContext, TaskContextData taskContext) {
65+
return doTransaction(t -> t.writeCompletedTask(workflowContext, taskContext), workflowContext);
66+
}
67+
68+
@Override
69+
public CompletableFuture<Void> suspended(WorkflowContextData workflowContext) {
70+
return doTransaction(
71+
t -> t.writeStatus(workflowContext, WorkflowStatus.SUSPENDED), workflowContext);
72+
}
73+
74+
@Override
75+
public CompletableFuture<Void> resumed(WorkflowContextData workflowContext) {
76+
return doTransaction(t -> t.clearStatus(workflowContext), workflowContext);
77+
}
78+
79+
@Override
80+
public void close() throws Exception {}
81+
82+
protected abstract CompletableFuture<Void> doTransaction(
83+
Consumer<PersistenceInstanceOperations> operation, WorkflowContextData context);
84+
}

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import java.util.Optional;
2121
import java.util.stream.Stream;
2222

23-
public class DefaultPersistenceInstanceReader implements PersistenceInstanceReader {
23+
public class DefaultPersistenceInstanceReader extends AbstractPersistenceInstanceReader {
2424

2525
private final PersistenceInstanceStore store;
2626

@@ -32,7 +32,7 @@ protected DefaultPersistenceInstanceReader(PersistenceInstanceStore store) {
3232
public Optional<WorkflowInstance> find(WorkflowDefinition definition, String instanceId) {
3333
PersistenceInstanceTransaction transaction = store.begin();
3434
try {
35-
Optional<WorkflowInstance> instance = read(transaction, definition, instanceId);
35+
Optional<WorkflowInstance> instance = find(transaction, definition, instanceId);
3636
transaction.commit(definition);
3737
return instance;
3838
} catch (Exception ex) {
@@ -41,21 +41,10 @@ public Optional<WorkflowInstance> find(WorkflowDefinition definition, String ins
4141
}
4242
}
4343

44-
private Optional<WorkflowInstance> read(
45-
PersistenceInstanceTransaction t, WorkflowDefinition definition, String instanceId) {
46-
return t.readWorkflowInfo(definition, instanceId)
47-
.map(i -> new WorkflowPersistenceInstance(definition, i));
48-
}
49-
5044
@Override
5145
public Stream<WorkflowInstance> scanAll(WorkflowDefinition definition, String applicationId) {
5246
PersistenceInstanceTransaction transaction = store.begin();
53-
return transaction
54-
.scanAll(applicationId, definition)
55-
.onClose(() -> transaction.commit(definition))
56-
.map(v -> new WorkflowPersistenceInstance(definition, v));
47+
return super.scanAll(transaction, definition, applicationId)
48+
.onClose(() -> transaction.commit(definition));
5749
}
58-
59-
@Override
60-
public void close() {}
6150
}

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java

Lines changed: 15 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -15,26 +15,28 @@
1515
*/
1616
package io.serverlessworkflow.impl.persistence;
1717

18-
import io.serverlessworkflow.impl.TaskContextData;
1918
import io.serverlessworkflow.impl.WorkflowContextData;
2019
import io.serverlessworkflow.impl.WorkflowDefinitionData;
21-
import io.serverlessworkflow.impl.WorkflowStatus;
2220
import java.time.Duration;
2321
import java.util.Map;
2422
import java.util.Optional;
2523
import java.util.concurrent.CompletableFuture;
2624
import java.util.concurrent.ConcurrentHashMap;
2725
import java.util.concurrent.ExecutorService;
28-
import java.util.concurrent.TimeUnit;
2926
import java.util.function.Consumer;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
3029

31-
public class DefaultPersistenceInstanceWriter implements PersistenceInstanceWriter {
30+
public class DefaultPersistenceInstanceWriter extends AbstractPersistenceInstanceWriter {
3231

3332
private final PersistenceInstanceStore store;
3433
private final Map<String, CompletableFuture<Void>> futuresMap = new ConcurrentHashMap<>();
3534
private final Optional<ExecutorService> executorService;
3635
private final Duration closeTimeout;
3736

37+
private static final Logger logger =
38+
LoggerFactory.getLogger(DefaultPersistenceInstanceWriter.class);
39+
3840
protected DefaultPersistenceInstanceWriter(
3941
PersistenceInstanceStore store,
4042
Optional<ExecutorService> executorService,
@@ -45,61 +47,14 @@ protected DefaultPersistenceInstanceWriter(
4547
}
4648

4749
@Override
48-
public CompletableFuture<Void> started(WorkflowContextData workflowContext) {
49-
return doTransaction(t -> t.writeInstanceData(workflowContext), workflowContext);
50-
}
51-
52-
@Override
53-
public CompletableFuture<Void> completed(WorkflowContextData workflowContext) {
54-
return removeProcessInstance(workflowContext);
55-
}
56-
57-
@Override
58-
public CompletableFuture<Void> failed(WorkflowContextData workflowContext, Throwable ex) {
59-
return removeProcessInstance(workflowContext);
60-
}
61-
62-
@Override
63-
public CompletableFuture<Void> aborted(WorkflowContextData workflowContext) {
64-
return removeProcessInstance(workflowContext);
65-
}
66-
6750
protected CompletableFuture<Void> removeProcessInstance(WorkflowContextData workflowContext) {
68-
return doTransaction(t -> t.removeProcessInstance(workflowContext), workflowContext)
51+
return super.removeProcessInstance(workflowContext)
6952
.thenRun(() -> futuresMap.remove(workflowContext.instanceData().id()));
7053
}
7154

7255
@Override
73-
public CompletableFuture<Void> taskStarted(
74-
WorkflowContextData workflowContext, TaskContextData taskContext) {
75-
return CompletableFuture.completedFuture(null);
76-
}
77-
78-
@Override
79-
public CompletableFuture<Void> taskRetried(
80-
WorkflowContextData workflowContext, TaskContextData taskContext) {
81-
return doTransaction(t -> t.writeRetryTask(workflowContext, taskContext), workflowContext);
82-
}
83-
84-
@Override
85-
public CompletableFuture<Void> taskCompleted(
86-
WorkflowContextData workflowContext, TaskContextData taskContext) {
87-
return doTransaction(t -> t.writeCompletedTask(workflowContext, taskContext), workflowContext);
88-
}
89-
90-
@Override
91-
public CompletableFuture<Void> suspended(WorkflowContextData workflowContext) {
92-
return doTransaction(
93-
t -> t.writeStatus(workflowContext, WorkflowStatus.SUSPENDED), workflowContext);
94-
}
95-
96-
@Override
97-
public CompletableFuture<Void> resumed(WorkflowContextData workflowContext) {
98-
return doTransaction(t -> t.clearStatus(workflowContext), workflowContext);
99-
}
100-
101-
private CompletableFuture<Void> doTransaction(
102-
Consumer<PersistenceInstanceTransaction> operation, WorkflowContextData context) {
56+
protected CompletableFuture<Void> doTransaction(
57+
Consumer<PersistenceInstanceOperations> operation, WorkflowContextData context) {
10358
final ExecutorService service =
10459
this.executorService.orElse(context.definition().application().executorService());
10560
final Runnable runnable = () -> executeTransaction(operation, context.definition());
@@ -112,28 +67,23 @@ private CompletableFuture<Void> doTransaction(
11267
}
11368

11469
private void executeTransaction(
115-
Consumer<PersistenceInstanceTransaction> operation, WorkflowDefinitionData definition) {
70+
Consumer<PersistenceInstanceOperations> operation, WorkflowDefinitionData definition) {
11671
PersistenceInstanceTransaction transaction = store.begin();
11772
try {
11873
operation.accept(transaction);
11974
transaction.commit(definition);
12075
} catch (Exception ex) {
121-
transaction.rollback(definition);
76+
try {
77+
transaction.rollback(definition);
78+
} catch (Exception rollEx) {
79+
logger.warn("Exception during rollback. Ignoring it", ex);
80+
}
12281
throw ex;
12382
}
12483
}
12584

12685
@Override
12786
public void close() {
12887
futuresMap.clear();
129-
executorService.ifPresent(
130-
e -> {
131-
try {
132-
e.awaitTermination(closeTimeout.toMillis(), TimeUnit.MILLISECONDS);
133-
e.shutdown();
134-
} catch (InterruptedException ex) {
135-
Thread.currentThread().interrupt();
136-
}
137-
});
13888
}
13989
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.persistence;
17+
18+
import io.serverlessworkflow.impl.TaskContextData;
19+
import io.serverlessworkflow.impl.WorkflowContextData;
20+
import io.serverlessworkflow.impl.WorkflowDefinition;
21+
import io.serverlessworkflow.impl.WorkflowStatus;
22+
import java.util.Optional;
23+
import java.util.stream.Stream;
24+
25+
public interface PersistenceInstanceOperations {
26+
void writeInstanceData(WorkflowContextData workflowContext);
27+
28+
void writeRetryTask(WorkflowContextData workflowContext, TaskContextData taskContext);
29+
30+
void writeCompletedTask(WorkflowContextData workflowContext, TaskContextData taskContext);
31+
32+
void writeStatus(WorkflowContextData workflowContext, WorkflowStatus suspended);
33+
34+
void removeProcessInstance(WorkflowContextData workflowContext);
35+
36+
void clearStatus(WorkflowContextData workflowContext);
37+
38+
Stream<PersistenceWorkflowInfo> scanAll(String applicationId, WorkflowDefinition definition);
39+
40+
Optional<PersistenceWorkflowInfo> readWorkflowInfo(
41+
WorkflowDefinition definition, String instanceId);
42+
}

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceTransaction.java

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,34 +15,11 @@
1515
*/
1616
package io.serverlessworkflow.impl.persistence;
1717

18-
import io.serverlessworkflow.impl.TaskContextData;
19-
import io.serverlessworkflow.impl.WorkflowContextData;
20-
import io.serverlessworkflow.impl.WorkflowDefinition;
2118
import io.serverlessworkflow.impl.WorkflowDefinitionData;
22-
import io.serverlessworkflow.impl.WorkflowStatus;
23-
import java.util.Optional;
24-
import java.util.stream.Stream;
2519

26-
public interface PersistenceInstanceTransaction {
20+
public interface PersistenceInstanceTransaction extends PersistenceInstanceOperations {
2721

2822
void commit(WorkflowDefinitionData definition);
2923

3024
void rollback(WorkflowDefinitionData definition);
31-
32-
void writeInstanceData(WorkflowContextData workflowContext);
33-
34-
void writeRetryTask(WorkflowContextData workflowContext, TaskContextData taskContext);
35-
36-
void writeCompletedTask(WorkflowContextData workflowContext, TaskContextData taskContext);
37-
38-
void writeStatus(WorkflowContextData workflowContext, WorkflowStatus suspended);
39-
40-
void removeProcessInstance(WorkflowContextData workflowContext);
41-
42-
void clearStatus(WorkflowContextData workflowContext);
43-
44-
Stream<PersistenceWorkflowInfo> scanAll(String applicationId, WorkflowDefinition definition);
45-
46-
Optional<PersistenceWorkflowInfo> readWorkflowInfo(
47-
WorkflowDefinition definition, String instanceId);
4825
}

0 commit comments

Comments
 (0)