Skip to content

Commit bb585e9

Browse files
authored
[Fix #1226] Waiting for pending writes before close (#1227)
Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent bcb099a commit bb585e9

2 files changed

Lines changed: 83 additions & 29 deletions

File tree

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.persistence;
17+
18+
import io.serverlessworkflow.impl.WorkflowContextData;
19+
import io.serverlessworkflow.impl.WorkflowDefinitionData;
20+
import java.util.Map;
21+
import java.util.Optional;
22+
import java.util.concurrent.CompletableFuture;
23+
import java.util.concurrent.ConcurrentHashMap;
24+
import java.util.concurrent.ExecutionException;
25+
import java.util.concurrent.ExecutorService;
26+
import java.util.function.Consumer;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
public abstract class AsyncPersistenceInstanceWriter extends AbstractPersistenceInstanceWriter {
31+
32+
private static final Logger logger =
33+
LoggerFactory.getLogger(AsyncPersistenceInstanceWriter.class);
34+
35+
private final Map<String, CompletableFuture<Void>> futuresMap = new ConcurrentHashMap<>();
36+
37+
@Override
38+
protected CompletableFuture<Void> doTransaction(
39+
Consumer<PersistenceInstanceOperations> operation, WorkflowContextData context) {
40+
final ExecutorService service =
41+
executorService().orElse(context.definition().application().executorService());
42+
final Runnable runnable = () -> doTransaction(operation, context.definition());
43+
return futuresMap.compute(
44+
context.instanceData().id(),
45+
(k, v) ->
46+
v == null
47+
? CompletableFuture.runAsync(runnable, service)
48+
: v.thenRunAsync(runnable, service));
49+
}
50+
51+
@Override
52+
protected CompletableFuture<Void> removeProcessInstance(WorkflowContextData workflowContext) {
53+
return super.removeProcessInstance(workflowContext)
54+
.thenRun(() -> futuresMap.remove(workflowContext.instanceData().id()));
55+
}
56+
57+
protected abstract void doTransaction(
58+
Consumer<PersistenceInstanceOperations> operation, WorkflowDefinitionData definition);
59+
60+
protected Optional<ExecutorService> executorService() {
61+
return Optional.empty();
62+
}
63+
64+
@Override
65+
public void close() {
66+
for (CompletableFuture<Void> future : futuresMap.values()) {
67+
try {
68+
future.get();
69+
} catch (InterruptedException ex) {
70+
logger.warn("Thread interrupted while writing to db", ex);
71+
Thread.currentThread().interrupt();
72+
} catch (ExecutionException ex) {
73+
logger.warn("Exception while writing to db", ex.getCause());
74+
}
75+
}
76+
futuresMap.clear();
77+
}
78+
}

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java

Lines changed: 5 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,53 +15,29 @@
1515
*/
1616
package io.serverlessworkflow.impl.persistence;
1717

18-
import io.serverlessworkflow.impl.WorkflowContextData;
1918
import io.serverlessworkflow.impl.WorkflowDefinitionData;
20-
import java.util.Map;
2119
import java.util.Optional;
22-
import java.util.concurrent.CompletableFuture;
23-
import java.util.concurrent.ConcurrentHashMap;
2420
import java.util.concurrent.ExecutorService;
2521
import java.util.function.Consumer;
2622
import org.slf4j.Logger;
2723
import org.slf4j.LoggerFactory;
2824

29-
public class DefaultPersistenceInstanceWriter extends AbstractPersistenceInstanceWriter {
25+
public class DefaultPersistenceInstanceWriter extends AsyncPersistenceInstanceWriter {
3026

3127
private final PersistenceInstanceStore store;
32-
private final Map<String, CompletableFuture<Void>> futuresMap = new ConcurrentHashMap<>();
3328
private final Optional<ExecutorService> executorService;
3429

3530
private static final Logger logger =
3631
LoggerFactory.getLogger(DefaultPersistenceInstanceWriter.class);
3732

3833
protected DefaultPersistenceInstanceWriter(
3934
PersistenceInstanceStore store, Optional<ExecutorService> executorService) {
40-
this.store = store;
4135
this.executorService = executorService;
36+
this.store = store;
4237
}
4338

4439
@Override
45-
protected CompletableFuture<Void> removeProcessInstance(WorkflowContextData workflowContext) {
46-
return super.removeProcessInstance(workflowContext)
47-
.thenRun(() -> futuresMap.remove(workflowContext.instanceData().id()));
48-
}
49-
50-
@Override
51-
protected CompletableFuture<Void> doTransaction(
52-
Consumer<PersistenceInstanceOperations> operation, WorkflowContextData context) {
53-
final ExecutorService service =
54-
this.executorService.orElse(context.definition().application().executorService());
55-
final Runnable runnable = () -> executeTransaction(operation, context.definition());
56-
return futuresMap.compute(
57-
context.instanceData().id(),
58-
(k, v) ->
59-
v == null
60-
? CompletableFuture.runAsync(runnable, service)
61-
: v.thenRunAsync(runnable, service));
62-
}
63-
64-
private void executeTransaction(
40+
protected void doTransaction(
6541
Consumer<PersistenceInstanceOperations> operation, WorkflowDefinitionData definition) {
6642
PersistenceInstanceTransaction transaction = store.begin();
6743
try {
@@ -78,7 +54,7 @@ private void executeTransaction(
7854
}
7955

8056
@Override
81-
public void close() {
82-
futuresMap.clear();
57+
protected Optional<ExecutorService> executorService() {
58+
return executorService;
8359
}
8460
}

0 commit comments

Comments
 (0)