Skip to content

Commit e9a2821

Browse files
committed
[Fix #1395] Refining approach
There were some implicit constraints in the map to be returned by retrieveEvents. I think it is better to pass the Map already populated with the expected registrations associated to an empty modifiable array and let the implementor just add the cloud events to every array. Also, the event can be stored at the end, reducing the likeness of using it for calculations in a different cluster Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent 411dc95 commit e9a2821

4 files changed

Lines changed: 45 additions & 29 deletions

File tree

impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,10 @@ public ScheduledEventConsumer(
6060
builderInfo.registrations().registrations();
6161
allStrategyCorrelationInfo.init(registrationBuilders, this::start);
6262
registrationBuilders.forEach(
63-
reg -> {
64-
registrations.add(
65-
eventConsumer.register(
66-
reg, ce -> allStrategyCorrelationInfo.correlate(reg, (CloudEvent) ce)));
67-
});
63+
reg ->
64+
registrations.add(
65+
eventConsumer.register(
66+
reg, ce -> allStrategyCorrelationInfo.correlate(reg, (CloudEvent) ce))));
6867
} else {
6968
builderInfo
7069
.registrations()

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAllStrategyCorrelationInfo.java

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -90,19 +90,33 @@ private Collection<Map<EventRegistrationBuilder, CloudEvent>> eventAdded(
9090
CorrelationOperations operations, String reg, CloudEvent event) {
9191
logger.debug(
9292
"Received event {} for definition {} and registration {}", event, definition.id(), reg);
93+
Map<String, List<CloudEvent>> events = initMap();
94+
operations.retrieveEvents(events);
95+
events.get(reg).add(event);
96+
Collection<Map<EventRegistrationBuilder, CloudEvent>> result = checkCorrelation(events);
9397
operations.storeEvent(reg, event);
94-
return checkCorrelation(operations);
98+
markProcessed(operations, result);
99+
return result;
100+
}
101+
102+
private Map<String, List<CloudEvent>> initMap() {
103+
return id2RegMapping.keySet().stream()
104+
.collect(Collectors.toMap(k -> k, k -> new ArrayList<>()));
95105
}
96106

97107
private Collection<Map<EventRegistrationBuilder, CloudEvent>> startupCheck(
98108
CorrelationOperations operations) {
109+
logger.debug("Checking cloud events for definition {}", definition.id());
99110
operations.clearProcessed();
100-
return checkCorrelation(operations);
111+
Map<String, List<CloudEvent>> events = initMap();
112+
operations.retrieveEvents(events);
113+
Collection<Map<EventRegistrationBuilder, CloudEvent>> result = checkCorrelation(events);
114+
markProcessed(operations, result);
115+
return result;
101116
}
102117

103118
private final Collection<Map<EventRegistrationBuilder, CloudEvent>> checkCorrelation(
104-
CorrelationOperations operations) {
105-
Map<String, List<CloudEvent>> events = operations.retrieveEvents(id2RegMapping.keySet());
119+
Map<String, List<CloudEvent>> events) {
106120
logger.debug("Stored CloudEvents for definition {} are {}", definition.id(), events);
107121
if (events.isEmpty()) {
108122
return List.of();
@@ -117,13 +131,18 @@ private final Collection<Map<EventRegistrationBuilder, CloudEvent>> checkCorrela
117131
notDone = false;
118132
break;
119133
}
120-
CloudEvent retrieved = list.remove(0);
121-
row.put(id2RegMapping.get(item.getKey()), retrieved);
134+
row.put(id2RegMapping.get(item.getKey()), list.remove(0));
122135
}
123136
if (notDone) {
124137
result.add(row);
125138
}
126139
}
140+
return result;
141+
}
142+
143+
private void markProcessed(
144+
CorrelationOperations operations,
145+
Collection<Map<EventRegistrationBuilder, CloudEvent>> result) {
127146
if (!result.isEmpty()) {
128147
Map<String, Collection<String>> processed = new HashMap<>();
129148
for (Map<EventRegistrationBuilder, CloudEvent> item : result) {
@@ -135,7 +154,6 @@ private final Collection<Map<EventRegistrationBuilder, CloudEvent>> checkCorrela
135154
}
136155
operations.markAsProcessed(processed);
137156
}
138-
return result;
139157
}
140158

141159
public void addMetadata(

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CorrelationOperations.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,7 @@
2222

2323
interface CorrelationOperations {
2424

25-
default Map<String, List<CloudEvent>> retrieveEvents(Collection<String> targetRegIds) {
26-
return Map.of();
27-
}
25+
default void retrieveEvents(Map<String, List<CloudEvent>> reg2EventsMap) {}
2826

2927
default void storeEvent(String regId, CloudEvent event) {}
3028

impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,7 @@
2727
import io.serverlessworkflow.impl.persistence.PersistenceInstanceTransaction;
2828
import io.serverlessworkflow.impl.persistence.PersistenceTaskInfo;
2929
import io.serverlessworkflow.impl.persistence.PersistenceWorkflowInfo;
30-
import java.util.ArrayList;
3130
import java.util.Collection;
32-
import java.util.HashMap;
3331
import java.util.List;
3432
import java.util.Map;
3533
import java.util.Map.Entry;
@@ -113,26 +111,28 @@ public void clearStatus(WorkflowContextData workflowContext) {
113111
clearStatus(workflowContext.definition(), key(workflowContext));
114112
}
115113

116-
public Map<String, List<CloudEvent>> retrieveEvents(Collection<String> targetRegIds) {
117-
Map<String, List<CloudEvent>> result = new HashMap<>();
118-
targetRegIds.forEach(
119-
regId -> {
120-
Map<String, P> processedCes = processedCloudEvents(regId);
121-
Map<String, C> ces = cloudEvents(regId);
122-
result.put(
123-
regId,
124-
ces.values().stream()
114+
@Override
115+
public void retrieveEvents(Map<String, List<CloudEvent>> events) {
116+
events
117+
.entrySet()
118+
.forEach(
119+
e -> {
120+
String regId = e.getKey();
121+
List<CloudEvent> cloudEvents = e.getValue();
122+
Map<String, P> processedCes = processedCloudEvents(regId);
123+
cloudEvents(regId).values().stream()
125124
.map(this::unmarshallCloudEvent)
126125
.filter(ce -> !processedCes.containsKey(ce.getId()))
127-
.collect(Collectors.toCollection(ArrayList::new)));
128-
});
129-
return result;
126+
.forEach(cloudEvents::add);
127+
});
130128
}
131129

130+
@Override
132131
public void storeEvent(String regId, CloudEvent event) {
133132
cloudEvents(regId).put(event.getId(), marshallCloudEvent(event));
134133
}
135134

135+
@Override
136136
public void markAsProcessed(Map<String, Collection<String>> regCeIds) {
137137
regCeIds.forEach(
138138
(k, v) -> {
@@ -141,6 +141,7 @@ public void markAsProcessed(Map<String, Collection<String>> regCeIds) {
141141
});
142142
}
143143

144+
@Override
144145
public void clearProcessed() {
145146
deleteAllProcessedMaps();
146147
}

0 commit comments

Comments
 (0)