Skip to content

Commit 89201d4

Browse files
committed
[Fix #1395] Handling duplicated id
Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent e9a2821 commit 89201d4

4 files changed

Lines changed: 28 additions & 25 deletions

File tree

impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/InMemoryAllStrategyCorrelationInfo.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@
1717

1818
import io.cloudevents.CloudEvent;
1919
import io.serverlessworkflow.impl.events.EventRegistrationBuilder;
20-
import java.util.ArrayList;
2120
import java.util.Collection;
2221
import java.util.HashMap;
23-
import java.util.List;
22+
import java.util.Iterator;
23+
import java.util.LinkedHashSet;
2424
import java.util.Map;
2525
import java.util.function.Consumer;
2626

@@ -37,7 +37,7 @@ public static AllStrategyCorrelationInfo instance() {
3737

3838
private InMemoryAllStrategyCorrelationInfo() {}
3939

40-
private Map<EventRegistrationBuilder, List<CloudEvent>> correlatedEvents;
40+
private Map<EventRegistrationBuilder, Collection<CloudEvent>> correlatedEvents;
4141
private Consumer<Map<EventRegistrationBuilder, CloudEvent>> starter;
4242

4343
@Override
@@ -48,9 +48,11 @@ public void correlate(EventRegistrationBuilder reg, CloudEvent event) {
4848
synchronized (correlatedEvents) {
4949
correlatedEvents.get(reg).add(event);
5050
if (satisfyCondition(correlatedEvents)) {
51-
for (java.util.Map.Entry<EventRegistrationBuilder, List<CloudEvent>> values :
51+
for (java.util.Map.Entry<EventRegistrationBuilder, Collection<CloudEvent>> values :
5252
correlatedEvents.entrySet()) {
53-
result.put(values.getKey(), values.getValue().remove(0));
53+
Iterator<CloudEvent> iter = values.getValue().iterator();
54+
result.put(values.getKey(), iter.next());
55+
iter.remove();
5456
}
5557
}
5658
}
@@ -65,11 +67,11 @@ public void init(
6567
Consumer<Map<EventRegistrationBuilder, CloudEvent>> starter) {
6668
correlatedEvents = new HashMap<>();
6769
this.starter = starter;
68-
regs.forEach(reg -> correlatedEvents.put(reg, new ArrayList<CloudEvent>()));
70+
regs.forEach(reg -> correlatedEvents.put(reg, new LinkedHashSet<CloudEvent>()));
6971
}
7072

71-
private boolean satisfyCondition(Map<EventRegistrationBuilder, List<CloudEvent>> events) {
72-
for (List<CloudEvent> values : events.values()) {
73+
private boolean satisfyCondition(Map<EventRegistrationBuilder, Collection<CloudEvent>> events) {
74+
for (Collection<CloudEvent> values : events.values()) {
7375
if (values.isEmpty()) {
7476
return false;
7577
}

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAllStrategyCorrelationInfo.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import java.util.ArrayList;
2424
import java.util.Collection;
2525
import java.util.HashMap;
26+
import java.util.Iterator;
27+
import java.util.LinkedHashSet;
2628
import java.util.List;
2729
import java.util.Map;
2830
import java.util.Map.Entry;
@@ -90,7 +92,7 @@ private Collection<Map<EventRegistrationBuilder, CloudEvent>> eventAdded(
9092
CorrelationOperations operations, String reg, CloudEvent event) {
9193
logger.debug(
9294
"Received event {} for definition {} and registration {}", event, definition.id(), reg);
93-
Map<String, List<CloudEvent>> events = initMap();
95+
Map<String, Collection<CloudEvent>> events = initMap();
9496
operations.retrieveEvents(events);
9597
events.get(reg).add(event);
9698
Collection<Map<EventRegistrationBuilder, CloudEvent>> result = checkCorrelation(events);
@@ -99,39 +101,40 @@ private Collection<Map<EventRegistrationBuilder, CloudEvent>> eventAdded(
99101
return result;
100102
}
101103

102-
private Map<String, List<CloudEvent>> initMap() {
104+
private Map<String, Collection<CloudEvent>> initMap() {
103105
return id2RegMapping.keySet().stream()
104-
.collect(Collectors.toMap(k -> k, k -> new ArrayList<>()));
106+
.collect(Collectors.toMap(k -> k, k -> new LinkedHashSet<>()));
105107
}
106108

107109
private Collection<Map<EventRegistrationBuilder, CloudEvent>> startupCheck(
108110
CorrelationOperations operations) {
109111
logger.debug("Checking cloud events for definition {}", definition.id());
110112
operations.clearProcessed();
111-
Map<String, List<CloudEvent>> events = initMap();
113+
Map<String, Collection<CloudEvent>> events = initMap();
112114
operations.retrieveEvents(events);
113115
Collection<Map<EventRegistrationBuilder, CloudEvent>> result = checkCorrelation(events);
114116
markProcessed(operations, result);
115117
return result;
116118
}
117119

118120
private final Collection<Map<EventRegistrationBuilder, CloudEvent>> checkCorrelation(
119-
Map<String, List<CloudEvent>> events) {
121+
Map<String, Collection<CloudEvent>> events) {
120122
logger.debug("Stored CloudEvents for definition {} are {}", definition.id(), events);
121-
if (events.isEmpty()) {
122-
return List.of();
123-
}
124123
Collection<Map<EventRegistrationBuilder, CloudEvent>> result = new ArrayList<>();
124+
Map<String, Iterator<CloudEvent>> iteratingEvents =
125+
events.entrySet().stream()
126+
.collect(Collectors.toMap(Entry::getKey, e -> e.getValue().iterator()));
125127
boolean notDone = true;
126128
while (notDone) {
127129
Map<EventRegistrationBuilder, CloudEvent> row = new HashMap<>();
128-
for (Entry<String, List<CloudEvent>> item : events.entrySet()) {
129-
List<CloudEvent> list = item.getValue();
130-
if (list.isEmpty()) {
130+
for (Entry<String, Iterator<CloudEvent>> item : iteratingEvents.entrySet()) {
131+
Iterator<CloudEvent> iter = item.getValue();
132+
if (!iter.hasNext()) {
131133
notDone = false;
132134
break;
133135
}
134-
row.put(id2RegMapping.get(item.getKey()), list.remove(0));
136+
row.put(id2RegMapping.get(item.getKey()), iter.next());
137+
iter.remove();
135138
}
136139
if (notDone) {
137140
result.add(row);

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CorrelationOperations.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,11 @@
1717

1818
import io.cloudevents.CloudEvent;
1919
import java.util.Collection;
20-
import java.util.List;
2120
import java.util.Map;
2221

2322
interface CorrelationOperations {
2423

25-
default void retrieveEvents(Map<String, List<CloudEvent>> reg2EventsMap) {}
24+
default void retrieveEvents(Map<String, Collection<CloudEvent>> reg2EventsMap) {}
2625

2726
default void storeEvent(String regId, CloudEvent event) {}
2827

impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import io.serverlessworkflow.impl.persistence.PersistenceTaskInfo;
2929
import io.serverlessworkflow.impl.persistence.PersistenceWorkflowInfo;
3030
import java.util.Collection;
31-
import java.util.List;
3231
import java.util.Map;
3332
import java.util.Map.Entry;
3433
import java.util.Optional;
@@ -112,13 +111,13 @@ public void clearStatus(WorkflowContextData workflowContext) {
112111
}
113112

114113
@Override
115-
public void retrieveEvents(Map<String, List<CloudEvent>> events) {
114+
public void retrieveEvents(Map<String, Collection<CloudEvent>> events) {
116115
events
117116
.entrySet()
118117
.forEach(
119118
e -> {
120119
String regId = e.getKey();
121-
List<CloudEvent> cloudEvents = e.getValue();
120+
Collection<CloudEvent> cloudEvents = e.getValue();
122121
Map<String, P> processedCes = processedCloudEvents(regId);
123122
cloudEvents(regId).values().stream()
124123
.map(this::unmarshallCloudEvent)

0 commit comments

Comments
 (0)