Skip to content

Commit 7898804

Browse files
authored
[Fix #1395] persistence helper classes for allcorrelation (#1396)
Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent fcd26da commit 7898804

38 files changed

Lines changed: 1002 additions & 77 deletions

File tree

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,8 @@ public WorkflowApplication build() {
444444
.orElseGet(() -> new DefaultCloudEventPredicateFactory());
445445
}
446446
if (allStrategyCorrelationInfoFactory == null) {
447-
allStrategyCorrelationInfoFactory = definition -> new InMemoryAllStrategyCorrelationInfo();
447+
allStrategyCorrelationInfoFactory =
448+
definition -> InMemoryAllStrategyCorrelationInfo.instance();
448449
}
449450

450451
if (defaultCatalogURI == null) {

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.serverlessworkflow.impl;
1717

1818
import java.util.concurrent.CompletableFuture;
19+
import java.util.function.Supplier;
1920

2021
public interface WorkflowInstance extends WorkflowInstanceData {
2122
CompletableFuture<WorkflowModel> start();
@@ -49,4 +50,6 @@ public interface WorkflowInstance extends WorkflowInstanceData {
4950
boolean cancel();
5051

5152
boolean resume();
53+
54+
<T> T addMetadataIfAbsent(String key, Supplier<T> supplier);
5255
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstanceData.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.serverlessworkflow.impl;
1717

1818
import java.time.Instant;
19+
import java.util.Optional;
1920

2021
public interface WorkflowInstanceData {
2122
String id();
@@ -29,4 +30,6 @@ public interface WorkflowInstanceData {
2930
WorkflowStatus status();
3031

3132
WorkflowModel context();
33+
34+
<T> Optional<T> findMetadata(String key, Class<T> objectClass);
3235
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -350,9 +350,16 @@ public void addCancelable(CompletableFuture<?> cancelable) {
350350
}
351351
}
352352

353-
public <T> T additionalObject(String key, Supplier<T> supplier) {
353+
@Override
354+
public <T> T addMetadataIfAbsent(String key, Supplier<T> supplier) {
354355
return (T) additionalObjects.computeIfAbsent(key, k -> supplier.get());
355356
}
356357

358+
@Override
359+
public <T> Optional<T> findMetadata(String key, Class<T> objectClass) {
360+
Object value = additionalObjects.get(key);
361+
return objectClass.isInstance(value) ? Optional.of(objectClass.cast(value)) : Optional.empty();
362+
}
363+
357364
public void restoreContext(WorkflowContext workflow, TaskContext context) {}
358365
}

impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/AbstractInputBuffer.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515
*/
1616
package io.serverlessworkflow.impl.marshaller;
1717

18+
import java.net.URI;
1819
import java.time.Instant;
20+
import java.time.OffsetDateTime;
21+
import java.time.ZoneOffset;
1922
import java.util.ArrayList;
2023
import java.util.Collection;
2124
import java.util.LinkedHashMap;
@@ -112,6 +115,12 @@ public Object readObject() {
112115
case CUSTOM:
113116
return readCustomObject();
114117

118+
case URI:
119+
return URI.create(readString());
120+
121+
case OFFSET_DATE_TIME:
122+
return OffsetDateTime.ofInstant(readInstant(), ZoneOffset.of(readString()));
123+
115124
default:
116125
throw new IllegalStateException("Unsupported type " + type);
117126
}

impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/AbstractOutputBuffer.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
package io.serverlessworkflow.impl.marshaller;
1717

1818
import io.serverlessworkflow.impl.WorkflowModel;
19+
import java.net.URI;
1920
import java.time.Instant;
21+
import java.time.OffsetDateTime;
2022
import java.util.Collection;
2123
import java.util.Map;
2224
import java.util.concurrent.ConcurrentHashMap;
@@ -100,9 +102,16 @@ public WorkflowOutputBuffer writeObject(Object object) {
100102
} else if (object instanceof Instant value) {
101103
writeType(Type.INSTANT);
102104
writeInstant(value);
103-
} else if (object instanceof byte[] bytes) {
105+
} else if (object instanceof OffsetDateTime value) {
106+
writeType(Type.OFFSET_DATE_TIME);
107+
writeInstant(value.toInstant());
108+
writeString(value.getOffset().toString());
109+
} else if (object instanceof URI value) {
110+
writeType(Type.URI);
111+
writeString(value.toString());
112+
} else if (object instanceof byte[] value) {
104113
writeType(Type.BYTES);
105-
writeBytes(bytes);
114+
writeBytes(value);
106115
} else {
107116
internalWriteObject(object);
108117
}

impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/MarshallingUtils.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,21 @@
1515
*/
1616
package io.serverlessworkflow.impl.marshaller;
1717

18+
import io.cloudevents.CloudEvent;
19+
import io.cloudevents.core.builder.CloudEventBuilder;
1820
import io.serverlessworkflow.impl.WorkflowModel;
1921
import java.io.ByteArrayInputStream;
2022
import java.io.ByteArrayOutputStream;
23+
import java.io.IOException;
24+
import java.io.UncheckedIOException;
2125
import java.lang.reflect.Modifier;
26+
import java.net.URI;
2227
import java.time.Instant;
28+
import java.time.OffsetDateTime;
2329
import java.util.ArrayList;
2430
import java.util.Collection;
2531
import java.util.List;
32+
import java.util.Set;
2633
import java.util.function.BiConsumer;
2734
import java.util.function.Function;
2835
import org.slf4j.Logger;
@@ -71,6 +78,58 @@ public static byte[] writeString(WorkflowBufferFactory factory, String value) {
7178
return writeValue(factory, value, (b, v) -> b.writeString(v));
7279
}
7380

81+
public static byte[] writeCloudEventExtensions(WorkflowBufferFactory factory, CloudEvent event) {
82+
try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
83+
WorkflowOutputBuffer out = factory.output(bytesOut)) {
84+
writeCloudEventExtensions(out, event);
85+
return bytesOut.toByteArray();
86+
} catch (IOException e) {
87+
throw new UncheckedIOException(e);
88+
}
89+
}
90+
91+
public static void writeCloudEventExtensions(WorkflowOutputBuffer out, CloudEvent event) {
92+
Set<String> extensionNames = event.getExtensionNames();
93+
out.writeInt(extensionNames.size());
94+
for (String extensionName : extensionNames) {
95+
out.writeString(extensionName);
96+
out.writeObject(event.getExtension(extensionName));
97+
}
98+
}
99+
100+
public static CloudEventBuilder readCloudEventExtensions(
101+
WorkflowBufferFactory factory, byte[] value, CloudEventBuilder builder) {
102+
try (ByteArrayInputStream bytesInt = new ByteArrayInputStream(value);
103+
WorkflowInputBuffer in = factory.input(bytesInt)) {
104+
return readCloudEventExtenstions(in, value, builder);
105+
} catch (IOException e) {
106+
throw new UncheckedIOException(e);
107+
}
108+
}
109+
110+
public static CloudEventBuilder readCloudEventExtenstions(
111+
WorkflowInputBuffer in, byte[] value, CloudEventBuilder builder) {
112+
int size = in.readInt();
113+
while (size-- > 0) {
114+
String extensionName = in.readString();
115+
Object extensionValue = in.readObject();
116+
if (extensionValue instanceof Number extValue) {
117+
builder.withExtension(extensionName, extValue);
118+
} else if (extensionValue instanceof String extValue) {
119+
builder.withExtension(extensionName, extValue);
120+
} else if (extensionValue instanceof Boolean extValue) {
121+
builder.withExtension(extensionName, extValue);
122+
} else if (extensionValue instanceof byte[] extValue) {
123+
builder.withExtension(extensionName, extValue);
124+
} else if (extensionValue instanceof OffsetDateTime extValue) {
125+
builder.withExtension(extensionName, extValue);
126+
} else if (extensionValue instanceof URI extValue) {
127+
builder.withExtension(extensionName, extValue);
128+
}
129+
}
130+
return builder;
131+
}
132+
74133
public static String readString(WorkflowBufferFactory factory, byte[] value) {
75134
return readValue(factory, value, WorkflowInputBuffer::readString);
76135
}

impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/Type.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,7 @@ enum Type {
2929
MAP,
3030
COLLECTION,
3131
NULL,
32-
CUSTOM
32+
CUSTOM,
33+
OFFSET_DATE_TIME,
34+
URI
3335
}

impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/AllStrategyCorrelationInfo.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,22 @@
1616
package io.serverlessworkflow.impl.scheduler;
1717

1818
import io.cloudevents.CloudEvent;
19+
import io.serverlessworkflow.impl.WorkflowInstance;
1920
import io.serverlessworkflow.impl.events.EventRegistrationBuilder;
2021
import java.util.Collection;
22+
import java.util.Map;
2123
import java.util.function.Consumer;
2224

2325
public interface AllStrategyCorrelationInfo extends AutoCloseable {
24-
void correlate(
25-
EventRegistrationBuilder reg, CloudEvent event, Consumer<Collection<CloudEvent>> starter);
2626

27-
void register(EventRegistrationBuilder reg);
27+
void init(
28+
Collection<EventRegistrationBuilder> reg,
29+
Consumer<Map<EventRegistrationBuilder, CloudEvent>> starter);
30+
31+
void correlate(EventRegistrationBuilder reg, CloudEvent event);
32+
33+
default void addMetadata(
34+
WorkflowInstance instance, Map<EventRegistrationBuilder, CloudEvent> events) {}
2835

2936
default void close() {}
3037
}

impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/DefaultWorkflowScheduler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
package io.serverlessworkflow.impl.scheduler;
1717

1818
import io.serverlessworkflow.impl.WorkflowDefinition;
19-
import io.serverlessworkflow.impl.WorkflowModel;
19+
import io.serverlessworkflow.impl.WorkflowInstance;
2020
import java.util.concurrent.Executors;
2121
import java.util.concurrent.ScheduledExecutorService;
2222
import java.util.concurrent.ScheduledFuture;
@@ -83,10 +83,10 @@ protected CronResolverIntanceRunner(WorkflowDefinition definition) {
8383
}
8484

8585
@Override
86-
public void accept(WorkflowModel model) {
86+
public void accept(WorkflowInstance instance) {
8787
if (!cancelled.get()) {
8888
scheduleNext();
89-
super.accept(model);
89+
super.accept(instance);
9090
}
9191
}
9292
}

0 commit comments

Comments
 (0)