Skip to content

Commit 07e5243

Browse files
authored
[Fix #1395] Refining All strategy correlation persistence approach (#1398)
* [Fix #1395] Refining approach There were some implicit constraints in the map to be returned by retrieveEvents. I think it is better to pass the Map already populated with the expected registrations associated to an empty modifiable array and let the implementor just add the cloud events to every array. Also, the event can be stored at the end, reducing the likeness of using it for calculations in a different cluster Signed-off-by: fjtirado <ftirados@redhat.com> * [Fix #1395] Handling duplicated id Signed-off-by: fjtirado <ftirados@redhat.com> --------- Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent 88e068b commit 07e5243

13 files changed

Lines changed: 128 additions & 58 deletions

File tree

impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/AbstractInputBuffer.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
import java.net.URI;
1919
import java.time.Instant;
20-
import java.time.OffsetDateTime;
21-
import java.time.ZoneOffset;
2220
import java.util.ArrayList;
2321
import java.util.Collection;
2422
import java.util.LinkedHashMap;
@@ -116,10 +114,10 @@ public Object readObject() {
116114
return readCustomObject();
117115

118116
case URI:
119-
return URI.create(readString());
117+
return readURI();
120118

121119
case OFFSET_DATE_TIME:
122-
return OffsetDateTime.ofInstant(readInstant(), ZoneOffset.of(readString()));
120+
return readOffsetDateTime();
123121

124122
default:
125123
throw new IllegalStateException("Unsupported type " + type);

impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/AbstractOutputBuffer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,7 @@ public WorkflowOutputBuffer writeObject(Object object) {
104104
writeInstant(value);
105105
} else if (object instanceof OffsetDateTime value) {
106106
writeType(Type.OFFSET_DATE_TIME);
107-
writeInstant(value.toInstant());
108-
writeString(value.getOffset().toString());
107+
writeOffsetDateTime(value);
109108
} else if (object instanceof URI value) {
110109
writeType(Type.URI);
111110
writeString(value.toString());

impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/MarshallingUtils.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,14 @@ public static byte[] writeString(WorkflowBufferFactory factory, String value) {
7878
return writeValue(factory, value, (b, v) -> b.writeString(v));
7979
}
8080

81+
public static byte[] writeOffsetDateTime(WorkflowBufferFactory factory, OffsetDateTime value) {
82+
return writeValue(factory, value, (b, v) -> b.writeOffsetDateTime(v));
83+
}
84+
85+
public static byte[] writeURI(WorkflowBufferFactory factory, URI value) {
86+
return writeValue(factory, value, (b, v) -> b.writeURI(v));
87+
}
88+
8189
public static byte[] writeCloudEventExtensions(WorkflowBufferFactory factory, CloudEvent event) {
8290
try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
8391
WorkflowOutputBuffer out = factory.output(bytesOut)) {
@@ -99,6 +107,9 @@ public static void writeCloudEventExtensions(WorkflowOutputBuffer out, CloudEven
99107

100108
public static CloudEventBuilder readCloudEventExtensions(
101109
WorkflowBufferFactory factory, byte[] value, CloudEventBuilder builder) {
110+
if (value == null) {
111+
return builder;
112+
}
102113
try (ByteArrayInputStream bytesInt = new ByteArrayInputStream(value);
103114
WorkflowInputBuffer in = factory.input(bytesInt)) {
104115
return readCloudEventExtenstions(in, value, builder);
@@ -162,6 +173,14 @@ public static Instant readInstant(WorkflowBufferFactory factory, byte[] value) {
162173
return readValue(factory, value, WorkflowInputBuffer::readInstant);
163174
}
164175

176+
public static OffsetDateTime readOffsetDateTime(WorkflowBufferFactory factory, byte[] value) {
177+
return readValue(factory, value, WorkflowInputBuffer::readOffsetDateTime);
178+
}
179+
180+
public static URI readURI(WorkflowBufferFactory factory, byte[] value) {
181+
return readValue(factory, value, WorkflowInputBuffer::readURI);
182+
}
183+
165184
public static <T extends Enum<T>> T readEnum(
166185
WorkflowBufferFactory factory, byte[] value, Class<T> enumClass) {
167186
return readValue(factory, value, b -> b.readEnum(enumClass));

impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/WorkflowInputBuffer.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@
1616
package io.serverlessworkflow.impl.marshaller;
1717

1818
import java.io.Closeable;
19+
import java.net.URI;
1920
import java.time.Instant;
21+
import java.time.OffsetDateTime;
22+
import java.time.ZoneOffset;
2023
import java.util.Collection;
2124
import java.util.Map;
2225

@@ -40,6 +43,14 @@ public interface WorkflowInputBuffer extends Closeable {
4043

4144
byte[] readBytes();
4245

46+
default OffsetDateTime readOffsetDateTime() {
47+
return OffsetDateTime.ofInstant(readInstant(), ZoneOffset.of(readString()));
48+
}
49+
50+
default URI readURI() {
51+
return URI.create(readString());
52+
}
53+
4354
<T extends Enum<T>> T readEnum(Class<T> enumClass);
4455

4556
Instant readInstant();

impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/WorkflowOutputBuffer.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
*/
1616
package io.serverlessworkflow.impl.marshaller;
1717

18+
import java.net.URI;
1819
import java.time.Instant;
20+
import java.time.OffsetDateTime;
1921
import java.util.Collection;
2022
import java.util.Map;
2123

@@ -41,6 +43,17 @@ public interface WorkflowOutputBuffer extends AutoCloseable {
4143

4244
WorkflowOutputBuffer writeInstant(Instant instant);
4345

46+
default WorkflowOutputBuffer writeOffsetDateTime(OffsetDateTime time) {
47+
writeInstant(time.toInstant());
48+
writeString(time.getOffset().toString());
49+
return this;
50+
}
51+
52+
default WorkflowOutputBuffer writeURI(URI uri) {
53+
writeString(uri.toString());
54+
return this;
55+
}
56+
4457
WorkflowOutputBuffer writeMap(Map<String, Object> map);
4558

4659
WorkflowOutputBuffer writeCollection(Collection<Object> col);

impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/InMemoryAllStrategyCorrelationInfo.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@
1717

1818
import io.cloudevents.CloudEvent;
1919
import io.serverlessworkflow.impl.events.EventRegistrationBuilder;
20-
import java.util.ArrayList;
2120
import java.util.Collection;
2221
import java.util.HashMap;
23-
import java.util.List;
22+
import java.util.Iterator;
23+
import java.util.LinkedHashSet;
2424
import java.util.Map;
2525
import java.util.function.Consumer;
2626

@@ -37,7 +37,7 @@ public static AllStrategyCorrelationInfo instance() {
3737

3838
private InMemoryAllStrategyCorrelationInfo() {}
3939

40-
private Map<EventRegistrationBuilder, List<CloudEvent>> correlatedEvents;
40+
private Map<EventRegistrationBuilder, Collection<CloudEvent>> correlatedEvents;
4141
private Consumer<Map<EventRegistrationBuilder, CloudEvent>> starter;
4242

4343
@Override
@@ -48,9 +48,11 @@ public void correlate(EventRegistrationBuilder reg, CloudEvent event) {
4848
synchronized (correlatedEvents) {
4949
correlatedEvents.get(reg).add(event);
5050
if (satisfyCondition(correlatedEvents)) {
51-
for (java.util.Map.Entry<EventRegistrationBuilder, List<CloudEvent>> values :
51+
for (java.util.Map.Entry<EventRegistrationBuilder, Collection<CloudEvent>> values :
5252
correlatedEvents.entrySet()) {
53-
result.put(values.getKey(), values.getValue().remove(0));
53+
Iterator<CloudEvent> iter = values.getValue().iterator();
54+
result.put(values.getKey(), iter.next());
55+
iter.remove();
5456
}
5557
}
5658
}
@@ -65,11 +67,11 @@ public void init(
6567
Consumer<Map<EventRegistrationBuilder, CloudEvent>> starter) {
6668
correlatedEvents = new HashMap<>();
6769
this.starter = starter;
68-
regs.forEach(reg -> correlatedEvents.put(reg, new ArrayList<CloudEvent>()));
70+
regs.forEach(reg -> correlatedEvents.put(reg, new LinkedHashSet<CloudEvent>()));
6971
}
7072

71-
private boolean satisfyCondition(Map<EventRegistrationBuilder, List<CloudEvent>> events) {
72-
for (List<CloudEvent> values : events.values()) {
73+
private boolean satisfyCondition(Map<EventRegistrationBuilder, Collection<CloudEvent>> events) {
74+
for (Collection<CloudEvent> values : events.values()) {
7375
if (values.isEmpty()) {
7476
return false;
7577
}

impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,10 @@ public ScheduledEventConsumer(
6060
builderInfo.registrations().registrations();
6161
allStrategyCorrelationInfo.init(registrationBuilders, this::start);
6262
registrationBuilders.forEach(
63-
reg -> {
64-
registrations.add(
65-
eventConsumer.register(
66-
reg, ce -> allStrategyCorrelationInfo.correlate(reg, (CloudEvent) ce)));
67-
});
63+
reg ->
64+
registrations.add(
65+
eventConsumer.register(
66+
reg, ce -> allStrategyCorrelationInfo.correlate(reg, (CloudEvent) ce))));
6867
} else {
6968
builderInfo
7069
.registrations()

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAllStrategyCorrelationInfo.java

Lines changed: 44 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import java.util.ArrayList;
2424
import java.util.Collection;
2525
import java.util.HashMap;
26+
import java.util.Iterator;
27+
import java.util.LinkedHashSet;
2628
import java.util.List;
2729
import java.util.Map;
2830
import java.util.Map.Entry;
@@ -80,8 +82,16 @@ private void queueCorrelation(
8082
Consumer<Map<EventRegistrationBuilder, CloudEvent>> starter) {
8183
synchronized (this) {
8284
this.completableFuture =
83-
completableFuture.thenCompose(
84-
v -> executor.execute(() -> doTransaction(function), definition));
85+
completableFuture
86+
.thenCompose(v -> executor.execute(() -> doTransaction(function), definition))
87+
.exceptionally(
88+
ex -> {
89+
logger.error(
90+
"Exception processing correlation task for definition {}",
91+
definition.id(),
92+
ex);
93+
return List.of();
94+
});
8595
completableFuture.thenAccept(events -> events.forEach(starter));
8696
}
8797
}
@@ -90,40 +100,60 @@ private Collection<Map<EventRegistrationBuilder, CloudEvent>> eventAdded(
90100
CorrelationOperations operations, String reg, CloudEvent event) {
91101
logger.debug(
92102
"Received event {} for definition {} and registration {}", event, definition.id(), reg);
103+
Map<String, Collection<CloudEvent>> events = initMap();
104+
operations.retrieveEvents(events);
105+
events.get(reg).add(event);
106+
Collection<Map<EventRegistrationBuilder, CloudEvent>> result = checkCorrelation(events);
93107
operations.storeEvent(reg, event);
94-
return checkCorrelation(operations);
108+
markProcessed(operations, result);
109+
return result;
110+
}
111+
112+
private Map<String, Collection<CloudEvent>> initMap() {
113+
return id2RegMapping.keySet().stream()
114+
.collect(Collectors.toMap(k -> k, k -> new LinkedHashSet<>()));
95115
}
96116

97117
private Collection<Map<EventRegistrationBuilder, CloudEvent>> startupCheck(
98118
CorrelationOperations operations) {
119+
logger.debug("Checking cloud events for definition {}", definition.id());
99120
operations.clearProcessed();
100-
return checkCorrelation(operations);
121+
Map<String, Collection<CloudEvent>> events = initMap();
122+
operations.retrieveEvents(events);
123+
Collection<Map<EventRegistrationBuilder, CloudEvent>> result = checkCorrelation(events);
124+
markProcessed(operations, result);
125+
return result;
101126
}
102127

103128
private final Collection<Map<EventRegistrationBuilder, CloudEvent>> checkCorrelation(
104-
CorrelationOperations operations) {
105-
Map<String, List<CloudEvent>> events = operations.retrieveEvents(id2RegMapping.keySet());
129+
Map<String, Collection<CloudEvent>> events) {
106130
logger.debug("Stored CloudEvents for definition {} are {}", definition.id(), events);
107-
if (events.isEmpty()) {
108-
return List.of();
109-
}
110131
Collection<Map<EventRegistrationBuilder, CloudEvent>> result = new ArrayList<>();
132+
Map<String, Iterator<CloudEvent>> iteratingEvents =
133+
events.entrySet().stream()
134+
.collect(Collectors.toMap(Entry::getKey, e -> e.getValue().iterator()));
111135
boolean notDone = true;
112136
while (notDone) {
113137
Map<EventRegistrationBuilder, CloudEvent> row = new HashMap<>();
114-
for (Entry<String, List<CloudEvent>> item : events.entrySet()) {
115-
List<CloudEvent> list = item.getValue();
116-
if (list.isEmpty()) {
138+
for (Entry<String, Iterator<CloudEvent>> item : iteratingEvents.entrySet()) {
139+
Iterator<CloudEvent> iter = item.getValue();
140+
if (!iter.hasNext()) {
117141
notDone = false;
118142
break;
119143
}
120-
CloudEvent retrieved = list.remove(0);
121-
row.put(id2RegMapping.get(item.getKey()), retrieved);
144+
row.put(id2RegMapping.get(item.getKey()), iter.next());
145+
iter.remove();
122146
}
123147
if (notDone) {
124148
result.add(row);
125149
}
126150
}
151+
return result;
152+
}
153+
154+
private void markProcessed(
155+
CorrelationOperations operations,
156+
Collection<Map<EventRegistrationBuilder, CloudEvent>> result) {
127157
if (!result.isEmpty()) {
128158
Map<String, Collection<String>> processed = new HashMap<>();
129159
for (Map<EventRegistrationBuilder, CloudEvent> item : result) {
@@ -135,7 +165,6 @@ private final Collection<Map<EventRegistrationBuilder, CloudEvent>> checkCorrela
135165
}
136166
operations.markAsProcessed(processed);
137167
}
138-
return result;
139168
}
140169

141170
public void addMetadata(

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CorrelationOperations.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,11 @@
1717

1818
import io.cloudevents.CloudEvent;
1919
import java.util.Collection;
20-
import java.util.List;
2120
import java.util.Map;
2221

23-
interface CorrelationOperations {
22+
public interface CorrelationOperations {
2423

25-
default Map<String, List<CloudEvent>> retrieveEvents(Collection<String> targetRegIds) {
26-
return Map.of();
27-
}
24+
default void retrieveEvents(Map<String, Collection<CloudEvent>> reg2EventsMap) {}
2825

2926
default void storeEvent(String regId, CloudEvent event) {}
3027

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/OperationAllStrategyCorrelationInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import java.util.Map;
2323
import java.util.function.Function;
2424

25-
class OperationAllStrategyCorrelationInfo extends AbstractAllStrategyCorrelationInfo {
25+
public class OperationAllStrategyCorrelationInfo extends AbstractAllStrategyCorrelationInfo {
2626

2727
private final PersistenceInstanceOperations operations;
2828

0 commit comments

Comments
 (0)