Skip to content

Commit 3e9b52f

Browse files
committed
[Fix #1395] Handling duplicated id
Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent e9a2821 commit 3e9b52f

12 files changed

Lines changed: 87 additions & 37 deletions

File tree

impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/AbstractInputBuffer.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
import java.net.URI;
1919
import java.time.Instant;
20-
import java.time.OffsetDateTime;
21-
import java.time.ZoneOffset;
2220
import java.util.ArrayList;
2321
import java.util.Collection;
2422
import java.util.LinkedHashMap;
@@ -116,10 +114,10 @@ public Object readObject() {
116114
return readCustomObject();
117115

118116
case URI:
119-
return URI.create(readString());
117+
return readURI();
120118

121119
case OFFSET_DATE_TIME:
122-
return OffsetDateTime.ofInstant(readInstant(), ZoneOffset.of(readString()));
120+
return readOffsetDateTime();
123121

124122
default:
125123
throw new IllegalStateException("Unsupported type " + type);

impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/AbstractOutputBuffer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,7 @@ public WorkflowOutputBuffer writeObject(Object object) {
104104
writeInstant(value);
105105
} else if (object instanceof OffsetDateTime value) {
106106
writeType(Type.OFFSET_DATE_TIME);
107-
writeInstant(value.toInstant());
108-
writeString(value.getOffset().toString());
107+
writeOffsetDateTime(value);
109108
} else if (object instanceof URI value) {
110109
writeType(Type.URI);
111110
writeString(value.toString());

impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/MarshallingUtils.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,14 @@ public static byte[] writeString(WorkflowBufferFactory factory, String value) {
7878
return writeValue(factory, value, (b, v) -> b.writeString(v));
7979
}
8080

81+
public static byte[] writeOffsetDataTime(WorkflowBufferFactory factory, OffsetDateTime value) {
82+
return writeValue(factory, value, (b, v) -> b.writeOffsetDateTime(value));
83+
}
84+
85+
public static byte[] writeURI(WorkflowBufferFactory factory, URI value) {
86+
return writeValue(factory, value, (b, v) -> b.writeURI(value));
87+
}
88+
8189
public static byte[] writeCloudEventExtensions(WorkflowBufferFactory factory, CloudEvent event) {
8290
try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
8391
WorkflowOutputBuffer out = factory.output(bytesOut)) {
@@ -162,6 +170,14 @@ public static Instant readInstant(WorkflowBufferFactory factory, byte[] value) {
162170
return readValue(factory, value, WorkflowInputBuffer::readInstant);
163171
}
164172

173+
public static OffsetDateTime readOffsetDateTime(WorkflowBufferFactory factory, byte[] value) {
174+
return readValue(factory, value, WorkflowInputBuffer::readOffsetDateTime);
175+
}
176+
177+
public static URI readURI(WorkflowBufferFactory factory, byte[] value) {
178+
return readValue(factory, value, WorkflowInputBuffer::readURI);
179+
}
180+
165181
public static <T extends Enum<T>> T readEnum(
166182
WorkflowBufferFactory factory, byte[] value, Class<T> enumClass) {
167183
return readValue(factory, value, b -> b.readEnum(enumClass));

impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/WorkflowInputBuffer.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@
1616
package io.serverlessworkflow.impl.marshaller;
1717

1818
import java.io.Closeable;
19+
import java.net.URI;
1920
import java.time.Instant;
21+
import java.time.OffsetDateTime;
22+
import java.time.ZoneOffset;
2023
import java.util.Collection;
2124
import java.util.Map;
2225

@@ -40,6 +43,14 @@ public interface WorkflowInputBuffer extends Closeable {
4043

4144
byte[] readBytes();
4245

46+
default OffsetDateTime readOffsetDateTime() {
47+
return OffsetDateTime.ofInstant(readInstant(), ZoneOffset.of(readString()));
48+
}
49+
50+
default URI readURI() {
51+
return URI.create(readString());
52+
}
53+
4354
<T extends Enum<T>> T readEnum(Class<T> enumClass);
4455

4556
Instant readInstant();

impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/WorkflowOutputBuffer.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
*/
1616
package io.serverlessworkflow.impl.marshaller;
1717

18+
import java.net.URI;
1819
import java.time.Instant;
20+
import java.time.OffsetDateTime;
1921
import java.util.Collection;
2022
import java.util.Map;
2123

@@ -41,6 +43,17 @@ public interface WorkflowOutputBuffer extends AutoCloseable {
4143

4244
WorkflowOutputBuffer writeInstant(Instant instant);
4345

46+
default WorkflowOutputBuffer writeOffsetDateTime(OffsetDateTime time) {
47+
writeInstant(time.toInstant());
48+
writeString(time.getOffset().toString());
49+
return this;
50+
}
51+
52+
default WorkflowOutputBuffer writeURI(URI uri) {
53+
writeString(uri.toString());
54+
return this;
55+
}
56+
4457
WorkflowOutputBuffer writeMap(Map<String, Object> map);
4558

4659
WorkflowOutputBuffer writeCollection(Collection<Object> col);

impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/InMemoryAllStrategyCorrelationInfo.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@
1717

1818
import io.cloudevents.CloudEvent;
1919
import io.serverlessworkflow.impl.events.EventRegistrationBuilder;
20-
import java.util.ArrayList;
2120
import java.util.Collection;
2221
import java.util.HashMap;
23-
import java.util.List;
22+
import java.util.Iterator;
23+
import java.util.LinkedHashSet;
2424
import java.util.Map;
2525
import java.util.function.Consumer;
2626

@@ -37,7 +37,7 @@ public static AllStrategyCorrelationInfo instance() {
3737

3838
private InMemoryAllStrategyCorrelationInfo() {}
3939

40-
private Map<EventRegistrationBuilder, List<CloudEvent>> correlatedEvents;
40+
private Map<EventRegistrationBuilder, Collection<CloudEvent>> correlatedEvents;
4141
private Consumer<Map<EventRegistrationBuilder, CloudEvent>> starter;
4242

4343
@Override
@@ -48,9 +48,11 @@ public void correlate(EventRegistrationBuilder reg, CloudEvent event) {
4848
synchronized (correlatedEvents) {
4949
correlatedEvents.get(reg).add(event);
5050
if (satisfyCondition(correlatedEvents)) {
51-
for (java.util.Map.Entry<EventRegistrationBuilder, List<CloudEvent>> values :
51+
for (java.util.Map.Entry<EventRegistrationBuilder, Collection<CloudEvent>> values :
5252
correlatedEvents.entrySet()) {
53-
result.put(values.getKey(), values.getValue().remove(0));
53+
Iterator<CloudEvent> iter = values.getValue().iterator();
54+
result.put(values.getKey(), iter.next());
55+
iter.remove();
5456
}
5557
}
5658
}
@@ -65,11 +67,11 @@ public void init(
6567
Consumer<Map<EventRegistrationBuilder, CloudEvent>> starter) {
6668
correlatedEvents = new HashMap<>();
6769
this.starter = starter;
68-
regs.forEach(reg -> correlatedEvents.put(reg, new ArrayList<CloudEvent>()));
70+
regs.forEach(reg -> correlatedEvents.put(reg, new LinkedHashSet<CloudEvent>()));
6971
}
7072

71-
private boolean satisfyCondition(Map<EventRegistrationBuilder, List<CloudEvent>> events) {
72-
for (List<CloudEvent> values : events.values()) {
73+
private boolean satisfyCondition(Map<EventRegistrationBuilder, Collection<CloudEvent>> events) {
74+
for (Collection<CloudEvent> values : events.values()) {
7375
if (values.isEmpty()) {
7476
return false;
7577
}

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAllStrategyCorrelationInfo.java

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import java.util.ArrayList;
2424
import java.util.Collection;
2525
import java.util.HashMap;
26+
import java.util.Iterator;
27+
import java.util.LinkedHashSet;
2628
import java.util.List;
2729
import java.util.Map;
2830
import java.util.Map.Entry;
@@ -82,15 +84,22 @@ private void queueCorrelation(
8284
this.completableFuture =
8385
completableFuture.thenCompose(
8486
v -> executor.execute(() -> doTransaction(function), definition));
85-
completableFuture.thenAccept(events -> events.forEach(starter));
87+
completableFuture.whenComplete(
88+
(events, ex) -> {
89+
if (ex != null) {
90+
logger.error("Exception processing correlation task ", ex);
91+
} else {
92+
events.forEach(starter);
93+
}
94+
});
8695
}
8796
}
8897

8998
private Collection<Map<EventRegistrationBuilder, CloudEvent>> eventAdded(
9099
CorrelationOperations operations, String reg, CloudEvent event) {
91100
logger.debug(
92101
"Received event {} for definition {} and registration {}", event, definition.id(), reg);
93-
Map<String, List<CloudEvent>> events = initMap();
102+
Map<String, Collection<CloudEvent>> events = initMap();
94103
operations.retrieveEvents(events);
95104
events.get(reg).add(event);
96105
Collection<Map<EventRegistrationBuilder, CloudEvent>> result = checkCorrelation(events);
@@ -99,39 +108,40 @@ private Collection<Map<EventRegistrationBuilder, CloudEvent>> eventAdded(
99108
return result;
100109
}
101110

102-
private Map<String, List<CloudEvent>> initMap() {
111+
private Map<String, Collection<CloudEvent>> initMap() {
103112
return id2RegMapping.keySet().stream()
104-
.collect(Collectors.toMap(k -> k, k -> new ArrayList<>()));
113+
.collect(Collectors.toMap(k -> k, k -> new LinkedHashSet<>()));
105114
}
106115

107116
private Collection<Map<EventRegistrationBuilder, CloudEvent>> startupCheck(
108117
CorrelationOperations operations) {
109118
logger.debug("Checking cloud events for definition {}", definition.id());
110119
operations.clearProcessed();
111-
Map<String, List<CloudEvent>> events = initMap();
120+
Map<String, Collection<CloudEvent>> events = initMap();
112121
operations.retrieveEvents(events);
113122
Collection<Map<EventRegistrationBuilder, CloudEvent>> result = checkCorrelation(events);
114123
markProcessed(operations, result);
115124
return result;
116125
}
117126

118127
private final Collection<Map<EventRegistrationBuilder, CloudEvent>> checkCorrelation(
119-
Map<String, List<CloudEvent>> events) {
128+
Map<String, Collection<CloudEvent>> events) {
120129
logger.debug("Stored CloudEvents for definition {} are {}", definition.id(), events);
121-
if (events.isEmpty()) {
122-
return List.of();
123-
}
124130
Collection<Map<EventRegistrationBuilder, CloudEvent>> result = new ArrayList<>();
131+
Map<String, Iterator<CloudEvent>> iteratingEvents =
132+
events.entrySet().stream()
133+
.collect(Collectors.toMap(Entry::getKey, e -> e.getValue().iterator()));
125134
boolean notDone = true;
126135
while (notDone) {
127136
Map<EventRegistrationBuilder, CloudEvent> row = new HashMap<>();
128-
for (Entry<String, List<CloudEvent>> item : events.entrySet()) {
129-
List<CloudEvent> list = item.getValue();
130-
if (list.isEmpty()) {
137+
for (Entry<String, Iterator<CloudEvent>> item : iteratingEvents.entrySet()) {
138+
Iterator<CloudEvent> iter = item.getValue();
139+
if (!iter.hasNext()) {
131140
notDone = false;
132141
break;
133142
}
134-
row.put(id2RegMapping.get(item.getKey()), list.remove(0));
143+
row.put(id2RegMapping.get(item.getKey()), iter.next());
144+
iter.remove();
135145
}
136146
if (notDone) {
137147
result.add(row);

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CorrelationOperations.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,11 @@
1717

1818
import io.cloudevents.CloudEvent;
1919
import java.util.Collection;
20-
import java.util.List;
2120
import java.util.Map;
2221

23-
interface CorrelationOperations {
22+
public interface CorrelationOperations {
2423

25-
default void retrieveEvents(Map<String, List<CloudEvent>> reg2EventsMap) {}
24+
default void retrieveEvents(Map<String, Collection<CloudEvent>> reg2EventsMap) {}
2625

2726
default void storeEvent(String regId, CloudEvent event) {}
2827

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/OperationAllStrategyCorrelationInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import java.util.Map;
2323
import java.util.function.Function;
2424

25-
class OperationAllStrategyCorrelationInfo extends AbstractAllStrategyCorrelationInfo {
25+
public class OperationAllStrategyCorrelationInfo extends AbstractAllStrategyCorrelationInfo {
2626

2727
private final PersistenceInstanceOperations operations;
2828

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/StoreAllStrategyCorrelationInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.slf4j.Logger;
2525
import org.slf4j.LoggerFactory;
2626

27-
class StoreAllStrategyCorrelationInfo extends AbstractAllStrategyCorrelationInfo {
27+
public class StoreAllStrategyCorrelationInfo extends AbstractAllStrategyCorrelationInfo {
2828

2929
private static final Logger logger =
3030
LoggerFactory.getLogger(StoreAllStrategyCorrelationInfo.class);

0 commit comments

Comments
 (0)