Skip to content

Commit c8e2ac9

Browse files
authored
[Fix #1296] Prevent double loading of the same process instance (#1297)
Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent fc6eb22 commit c8e2ac9

5 files changed

Lines changed: 28 additions & 3 deletions

File tree

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.Map;
4040
import java.util.Objects;
4141
import java.util.Optional;
42+
import java.util.concurrent.ConcurrentHashMap;
4243

4344
public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData {
4445

@@ -56,6 +57,7 @@ public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData
5657
private Cancellable everySchedule;
5758
private Cancellable cronSchedule;
5859
private Collection<WorkflowInstance> scheduledInstances = new ArrayList<>();
60+
private Map<String, WorkflowInstance> activeInstances = new ConcurrentHashMap<>();
5961

6062
private WorkflowDefinition(
6163
WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) {
@@ -177,6 +179,18 @@ public void addScheduledInstance(WorkflowInstance workflowInstance) {
177179
scheduledInstances.add(workflowInstance);
178180
}
179181

182+
void removeInstance(WorkflowInstance instance) {
183+
activeInstances.remove(instance.id());
184+
}
185+
186+
void addInstance(WorkflowInstance instance) {
187+
activeInstances.put(instance.id(), instance);
188+
}
189+
190+
public Optional<WorkflowInstance> activeInstance(String instanceId) {
191+
return Optional.ofNullable(activeInstances.get(instanceId));
192+
}
193+
180194
@Override
181195
public WorkflowDefinitionId id() {
182196
return definitionId;
@@ -194,6 +208,7 @@ public void close() {
194208
cronSchedule.cancel();
195209
}
196210
scheduledInstances.clear();
211+
activeInstances.clear();
197212
}
198213

199214
@Override

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ protected WorkflowMutableInstance(WorkflowDefinition definition, String id, Work
6363
this.input = input;
6464
this.status = new AtomicReference<>(WorkflowStatus.PENDING);
6565
this.workflowContext = new WorkflowContext(definition, this);
66+
definition.addInstance(this);
6667
}
6768

6869
@Override
@@ -120,6 +121,7 @@ private void whenCompleted(WorkflowModel result, Throwable ex) {
120121
if (ex != null) {
121122
handleException(ex instanceof CompletionException ? ex = ex.getCause() : ex);
122123
}
124+
workflowContext.definition().removeInstance(this);
123125
}
124126

125127
private void handleException(Throwable ex) {

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceReader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,14 @@ protected final Stream<WorkflowInstance> scanAll(
2828
String applicationId) {
2929
return operations
3030
.scanAll(applicationId, definition)
31-
.map(v -> new WorkflowPersistenceInstance(definition, v));
31+
.map(v -> WorkflowPersistenceInstance.of(definition, v));
3232
}
3333

3434
protected final Optional<WorkflowInstance> find(
3535
PersistenceInstanceOperations operations, WorkflowDefinition definition, String instanceId) {
3636
return operations
3737
.readWorkflowInfo(definition, instanceId)
38-
.map(i -> new WorkflowPersistenceInstance(definition, i));
38+
.map(i -> WorkflowPersistenceInstance.of(definition, i));
3939
}
4040

4141
@Override

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.serverlessworkflow.impl.TaskContext;
1919
import io.serverlessworkflow.impl.WorkflowContext;
2020
import io.serverlessworkflow.impl.WorkflowDefinition;
21+
import io.serverlessworkflow.impl.WorkflowInstance;
2122
import io.serverlessworkflow.impl.WorkflowModel;
2223
import io.serverlessworkflow.impl.WorkflowMutableInstance;
2324
import io.serverlessworkflow.impl.WorkflowStatus;
@@ -28,7 +29,13 @@ public class WorkflowPersistenceInstance extends WorkflowMutableInstance {
2829

2930
private final PersistenceWorkflowInfo info;
3031

31-
public WorkflowPersistenceInstance(WorkflowDefinition definition, PersistenceWorkflowInfo info) {
32+
public static WorkflowInstance of(WorkflowDefinition definition, PersistenceWorkflowInfo info) {
33+
return definition
34+
.activeInstance(info.id())
35+
.orElseGet(() -> new WorkflowPersistenceInstance(definition, info));
36+
}
37+
38+
private WorkflowPersistenceInstance(WorkflowDefinition definition, PersistenceWorkflowInfo info) {
3239
super(definition, info.id(), info.input());
3340
this.info = info;
3441
this.startedAt = info.startedAt();

impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractHandlerPersistenceTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ void testWorkflowInstance() throws InterruptedException {
152152
try (Stream<WorkflowInstance> stream = handlers.reader().scanAll(definition)) {
153153
assertThat(stream.count()).isEqualTo(1);
154154
}
155+
definition.close();
155156
instance =
156157
(WorkflowPersistenceInstance)
157158
handlers.reader().find(definition, workflowInstance.id()).orElseThrow();

0 commit comments

Comments
 (0)