Skip to content

Commit f9107cf

Browse files
committed
[Fix #1395] Refining approach
There were some implicit constraints in the map to be returned by retrieveEvents. I think it is better to pass the Map already populated with the expected registrations associated to an empty modifiable array and let the implementor just add the cloud events to every array. Also, the event can be stored at the end, reducing the likeness of using it for calculations in a different cluster Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent 411dc95 commit f9107cf

4 files changed

Lines changed: 48 additions & 30 deletions

File tree

impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,10 @@ public ScheduledEventConsumer(
6060
builderInfo.registrations().registrations();
6161
allStrategyCorrelationInfo.init(registrationBuilders, this::start);
6262
registrationBuilders.forEach(
63-
reg -> {
64-
registrations.add(
65-
eventConsumer.register(
66-
reg, ce -> allStrategyCorrelationInfo.correlate(reg, (CloudEvent) ce)));
67-
});
63+
reg ->
64+
registrations.add(
65+
eventConsumer.register(
66+
reg, ce -> allStrategyCorrelationInfo.correlate(reg, (CloudEvent) ce))));
6867
} else {
6968
builderInfo
7069
.registrations()

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAllStrategyCorrelationInfo.java

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -90,19 +90,33 @@ private Collection<Map<EventRegistrationBuilder, CloudEvent>> eventAdded(
9090
CorrelationOperations operations, String reg, CloudEvent event) {
9191
logger.debug(
9292
"Received event {} for definition {} and registration {}", event, definition.id(), reg);
93+
Map<String, List<CloudEvent>> events = initMap();
94+
operations.retrieveEvents(events);
95+
events.get(reg).add(event);
96+
Collection<Map<EventRegistrationBuilder, CloudEvent>> result = checkCorrelation(events);
9397
operations.storeEvent(reg, event);
94-
return checkCorrelation(operations);
98+
markProcessed(operations, result);
99+
return result;
100+
}
101+
102+
private Map<String, List<CloudEvent>> initMap() {
103+
return id2RegMapping.keySet().stream()
104+
.collect(Collectors.toMap(k -> k, k -> new ArrayList<>()));
95105
}
96106

97107
private Collection<Map<EventRegistrationBuilder, CloudEvent>> startupCheck(
98108
CorrelationOperations operations) {
109+
logger.debug("Checking cloud events for definition {}", definition.id());
99110
operations.clearProcessed();
100-
return checkCorrelation(operations);
111+
Map<String, List<CloudEvent>> events = initMap();
112+
operations.retrieveEvents(events);
113+
Collection<Map<EventRegistrationBuilder, CloudEvent>> result = checkCorrelation(events);
114+
markProcessed(operations, result);
115+
return result;
101116
}
102117

103118
private final Collection<Map<EventRegistrationBuilder, CloudEvent>> checkCorrelation(
104-
CorrelationOperations operations) {
105-
Map<String, List<CloudEvent>> events = operations.retrieveEvents(id2RegMapping.keySet());
119+
Map<String, List<CloudEvent>> events) {
106120
logger.debug("Stored CloudEvents for definition {} are {}", definition.id(), events);
107121
if (events.isEmpty()) {
108122
return List.of();
@@ -117,13 +131,18 @@ private final Collection<Map<EventRegistrationBuilder, CloudEvent>> checkCorrela
117131
notDone = false;
118132
break;
119133
}
120-
CloudEvent retrieved = list.remove(0);
121-
row.put(id2RegMapping.get(item.getKey()), retrieved);
134+
row.put(id2RegMapping.get(item.getKey()), list.remove(0));
122135
}
123136
if (notDone) {
124137
result.add(row);
125138
}
126139
}
140+
return result;
141+
}
142+
143+
private void markProcessed(
144+
CorrelationOperations operations,
145+
Collection<Map<EventRegistrationBuilder, CloudEvent>> result) {
127146
if (!result.isEmpty()) {
128147
Map<String, Collection<String>> processed = new HashMap<>();
129148
for (Map<EventRegistrationBuilder, CloudEvent> item : result) {
@@ -135,7 +154,6 @@ private final Collection<Map<EventRegistrationBuilder, CloudEvent>> checkCorrela
135154
}
136155
operations.markAsProcessed(processed);
137156
}
138-
return result;
139157
}
140158

141159
public void addMetadata(

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CorrelationOperations.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,7 @@
2222

2323
interface CorrelationOperations {
2424

25-
default Map<String, List<CloudEvent>> retrieveEvents(Collection<String> targetRegIds) {
26-
return Map.of();
27-
}
25+
default void retrieveEvents(Map<String, List<CloudEvent>> reg2EventsMap) {}
2826

2927
default void storeEvent(String regId, CloudEvent event) {}
3028

impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import io.serverlessworkflow.impl.persistence.PersistenceWorkflowInfo;
3030
import java.util.ArrayList;
3131
import java.util.Collection;
32-
import java.util.HashMap;
3332
import java.util.List;
3433
import java.util.Map;
3534
import java.util.Map.Entry;
@@ -113,26 +112,29 @@ public void clearStatus(WorkflowContextData workflowContext) {
113112
clearStatus(workflowContext.definition(), key(workflowContext));
114113
}
115114

116-
public Map<String, List<CloudEvent>> retrieveEvents(Collection<String> targetRegIds) {
117-
Map<String, List<CloudEvent>> result = new HashMap<>();
118-
targetRegIds.forEach(
119-
regId -> {
120-
Map<String, P> processedCes = processedCloudEvents(regId);
121-
Map<String, C> ces = cloudEvents(regId);
122-
result.put(
123-
regId,
124-
ces.values().stream()
125-
.map(this::unmarshallCloudEvent)
126-
.filter(ce -> !processedCes.containsKey(ce.getId()))
127-
.collect(Collectors.toCollection(ArrayList::new)));
128-
});
129-
return result;
115+
@Override
116+
public void retrieveEvents(Map<String, List<CloudEvent>> events) {
117+
events
118+
.entrySet()
119+
.forEach(
120+
e -> {
121+
String regId = e.getKey();
122+
Map<String, P> processedCes = processedCloudEvents(regId);
123+
Map<String, C> ces = cloudEvents(regId);
124+
e.setValue(
125+
ces.values().stream()
126+
.map(this::unmarshallCloudEvent)
127+
.filter(ce -> !processedCes.containsKey(ce.getId()))
128+
.collect(Collectors.toCollection(ArrayList::new)));
129+
});
130130
}
131131

132+
@Override
132133
public void storeEvent(String regId, CloudEvent event) {
133134
cloudEvents(regId).put(event.getId(), marshallCloudEvent(event));
134135
}
135136

137+
@Override
136138
public void markAsProcessed(Map<String, Collection<String>> regCeIds) {
137139
regCeIds.forEach(
138140
(k, v) -> {
@@ -141,6 +143,7 @@ public void markAsProcessed(Map<String, Collection<String>> regCeIds) {
141143
});
142144
}
143145

146+
@Override
144147
public void clearProcessed() {
145148
deleteAllProcessedMaps();
146149
}

0 commit comments

Comments
 (0)