Skip to content

Commit b73ffaf

Browse files
impolitepandaEvaE-Filigrancamrrxsavacano28
authored
[backend] feat(wf-engine): init split service layer (#4842)
Co-authored-by: EvaE-Filigran <eva.ezzeribi@filigran.io> Co-authored-by: Camille Roux <camille.roux@filigran.io> Co-authored-by: savacano28 <stephanya.casanova@filigran.io>
1 parent 9e77020 commit b73ffaf

87 files changed

Lines changed: 16367 additions & 712 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
package io.openaev.aop;
2+
3+
import io.openaev.rest.settings.PreviewFeature;
4+
import io.openaev.service.PreviewFeatureService;
5+
import io.openaev.service.chaining.QueueChainingService;
6+
import io.openaev.service.chaining.StepService;
7+
import java.io.IOException;
8+
import java.util.*;
9+
import lombok.RequiredArgsConstructor;
10+
import lombok.extern.slf4j.Slf4j;
11+
import org.apache.commons.lang3.StringUtils;
12+
import org.aspectj.lang.JoinPoint;
13+
import org.aspectj.lang.annotation.AfterReturning;
14+
import org.aspectj.lang.annotation.Aspect;
15+
import org.aspectj.lang.reflect.MethodSignature;
16+
import org.springframework.expression.EvaluationContext;
17+
import org.springframework.expression.Expression;
18+
import org.springframework.expression.ExpressionParser;
19+
import org.springframework.expression.spel.standard.SpelExpressionParser;
20+
import org.springframework.expression.spel.support.StandardEvaluationContext;
21+
import org.springframework.stereotype.Component;
22+
23+
/**
24+
* Aspect that intercepts methods annotated with {@link WorkflowUpdateEvent} to trigger workflow
25+
* updates in the chaining engine.
26+
*
27+
* <p>This aspect uses SpEL expressions defined in the annotation to extract inject or expectation
28+
* IDs from method parameters, then sends update events to the workflow external update queue.
29+
*/
30+
@Aspect
31+
@Component
32+
@RequiredArgsConstructor
33+
@Slf4j
34+
public class WorkflowUpdateEventAspect {
35+
36+
/**
37+
* This list will contain IDs of any event that couldn't be sent due to a problem with the queue
38+
* (most likely a network error, or the queue system being down)
39+
*/
40+
private Set<String> unsentEventsCache = new HashSet<>();
41+
42+
private final PreviewFeatureService previewFeatureService;
43+
44+
private final QueueChainingService queueChainingService;
45+
private final StepService stepService;
46+
47+
private final ExpressionParser parser = new SpelExpressionParser();
48+
49+
/**
50+
* Advice executed after methods annotated with {@link WorkflowUpdateEvent} returns. If an
51+
* exception is throw by the annotated method, this advice will not be called.
52+
*
53+
* <p>Extracts the inject ID or expectation IDs from method parameters using SpEL expressions
54+
* defined in the annotation, then dispatches workflow update events accordingly.
55+
*
56+
* @param joinPoint the join point providing access to the method signature and arguments
57+
* @param annotation the {@link WorkflowUpdateEvent} annotation containing SpEL expressions
58+
* @throws IllegalStateException if the annotation does not specify exactly one of injectId or
59+
* expectationIds
60+
*/
61+
@AfterReturning("@annotation(annotation)")
62+
public void afterEventProcessed(JoinPoint joinPoint, WorkflowUpdateEvent annotation) {
63+
if (!previewFeatureService.isFeatureEnabled(PreviewFeature.INJECT_CHAINING)) {
64+
// No chaining enabled, does nothing
65+
return;
66+
}
67+
68+
String injectIdSPEL = annotation.injectId();
69+
String expectationIdsSPEL = annotation.expectationIds();
70+
71+
boolean hasInjectId = StringUtils.isNotBlank(injectIdSPEL);
72+
boolean hasExpectation = StringUtils.isNotBlank(expectationIdsSPEL);
73+
74+
if (hasInjectId == hasExpectation) {
75+
throw new IllegalStateException(
76+
"Annotation @WorkflowUpdateEvent on "
77+
+ joinPoint.getSignature().toShortString()
78+
+ " must set exactly one of injectId or expectationIds");
79+
}
80+
81+
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
82+
String[] parameterNames = signature.getParameterNames();
83+
Object[] args = joinPoint.getArgs();
84+
85+
// Create SpEL evaluation context to retrieve the resource ID if it exists
86+
EvaluationContext context = new StandardEvaluationContext();
87+
88+
// Add all method parameters to context
89+
for (int i = 0; i < parameterNames.length; i++) {
90+
context.setVariable(parameterNames[i], args[i]);
91+
}
92+
93+
if (hasInjectId) {
94+
this.handleInjectIdParam(context, injectIdSPEL);
95+
} else {
96+
this.handleExpectationTracesParam(context, expectationIdsSPEL);
97+
}
98+
99+
// Retry events that are stored in the unsent cache due to previous errors
100+
sendEvents(unsentEventsCache);
101+
}
102+
103+
/**
104+
* Send a workflow update event related to the given inject to the queue, and update the unsent
105+
* events cache if an error occurs
106+
*
107+
* @param context the SpEL evaluation context
108+
* @param injectIdSPEL the SpEL expression to fetch the injectId from the request
109+
*/
110+
private void handleInjectIdParam(EvaluationContext context, String injectIdSPEL) {
111+
Expression exp = parser.parseExpression(injectIdSPEL);
112+
String injectId =
113+
exp.getValue(context) != null
114+
? Objects.requireNonNull(exp.getValue(context)).toString()
115+
: "";
116+
117+
if (!injectId.isEmpty()) {
118+
String stepId = stepService.findStepIdByInjectId(injectId);
119+
try {
120+
queueChainingService.updateStep(stepId);
121+
} catch (IOException e) {
122+
// In case an error occurs, we store the inject in the unsent event cache to be retried
123+
// later, when other events will be sent
124+
unsentEventsCache.add(stepId);
125+
}
126+
}
127+
}
128+
129+
/**
130+
* Send a workflow update event related to all the injects related to the given expectation IDs to
131+
* the queue
132+
*
133+
* @param context the SpEL evaluation context
134+
* @param expectationIDsdSPEL the SpEL expression to fetch the injectId from the request
135+
*/
136+
private void handleExpectationTracesParam(EvaluationContext context, String expectationIDsdSPEL) {
137+
Expression exp = parser.parseExpression(expectationIDsdSPEL);
138+
Object expectationIdsFromSPEL =
139+
exp.getValue(context) != null ? Objects.requireNonNull(exp.getValue(context)) : null;
140+
141+
Set<String> expectationIds = new HashSet<>();
142+
if (expectationIdsFromSPEL instanceof Collection<?> c) {
143+
c.stream().map(Object::toString).forEach(expectationIds::add);
144+
} else if (expectationIdsFromSPEL instanceof String expectationId) {
145+
expectationIds.add(expectationId);
146+
} else {
147+
throw new IllegalStateException(
148+
"@WorkflowUpdateEvent.expectationIDsdSpEL must return a Collection or a String");
149+
}
150+
151+
Set<String> stepIds = stepService.findStepIdsByExpectationIds(expectationIds);
152+
sendEvents(stepIds);
153+
}
154+
155+
/**
156+
* Send a list of events in the queue, and update the unsent events cache if an error occurs
157+
*
158+
* @param stepIds step IDs to notify with an event
159+
*/
160+
private void sendEvents(Set<String> stepIds) {
161+
if (stepIds.isEmpty()) {
162+
return;
163+
}
164+
Set<String> remainingUnsetEvents = new HashSet<>(stepIds);
165+
try {
166+
for (String stepId : stepIds) {
167+
queueChainingService.updateStep(stepId);
168+
remainingUnsetEvents.remove(stepId);
169+
}
170+
unsentEventsCache.clear();
171+
} catch (IOException e) {
172+
// If a fail occurs, no need to continue the loop, just keep the remaining events in the
173+
// unsent events cache to be retried later, when other events will be sent
174+
unsentEventsCache = remainingUnsetEvents;
175+
}
176+
}
177+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package io.openaev.api.chaining;
2+
3+
import io.openaev.api.chaining.dto.StepsCreateInput;
4+
import io.openaev.database.model.Step;
5+
import io.openaev.database.model.Workflow;
6+
import io.openaev.rest.exception.ChainingException;
7+
import java.util.Optional;
8+
9+
/** The interface Action step. IMPLEMENTED BY: - InjectExecutionStep */
10+
public interface ActionStep {
11+
/**
12+
* Create step.
13+
*
14+
* @param stepInput the step input
15+
* @param workflow the workflow
16+
* @return the step
17+
*/
18+
Optional<Step> create(StepsCreateInput.StepCreateInput stepInput, Workflow workflow)
19+
throws ChainingException;
20+
21+
/**
22+
* Creates a Ready step. The step is created with status READY based on a step template.
23+
* Duplicates the template step and fills its content from the input.
24+
*
25+
* @param stepTemplate the stepTemplate
26+
* @param input the input for the new step
27+
* @param workflowRun the workflow run
28+
* @return the created Ready step
29+
*/
30+
Optional<Step> ready(Step stepTemplate, String input, Workflow workflowRun)
31+
throws ChainingException;
32+
33+
/**
34+
* Executes a Ready step. Changes the status from READY to RUN.
35+
*
36+
* @param readyStep the step currently in READY status
37+
* @return the step after being set to RUN
38+
*/
39+
Optional<Step> run(Step readyStep) throws ChainingException;
40+
41+
/**
42+
* Updates a step. Applies the necessary processing based on the new output.
43+
*
44+
* @param stepRun the step run to update
45+
* @return the updated step
46+
*/
47+
Optional<Step> update(Step stepRun) throws ChainingException;
48+
49+
/**
50+
* Ends a step. Checks if all expected outputs have been received and updates the status from RUN
51+
* to END.
52+
*
53+
* @param stepRun the step to end
54+
* @param workflow the workflow
55+
*/
56+
void end(Step stepRun, Workflow workflow) throws ChainingException;
57+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package io.openaev.api.chaining;
2+
3+
import com.fasterxml.jackson.annotation.JsonSubTypes;
4+
import com.fasterxml.jackson.annotation.JsonTypeInfo;
5+
import io.openaev.rest.inject.form.InjectInput;
6+
7+
/**
8+
* Interface representing the data associated with a step.
9+
*
10+
* <p>This is a polymorphic type used to handle different kinds of step data. The concrete type is
11+
* determined by the JSON property {@code "type"}.
12+
*
13+
* <p>Currently supported implementations:
14+
*
15+
* <ul>
16+
* <li>{@link InjectInput} with type name "inject"
17+
* </ul>
18+
*
19+
* <p>When serialized/deserialized with Jackson, the {@code type} property determines which concrete
20+
* implementation to use.
21+
*/
22+
@JsonTypeInfo(
23+
use = JsonTypeInfo.Id.NAME,
24+
include = JsonTypeInfo.As.PROPERTY,
25+
property = "type",
26+
defaultImpl = InjectInput.class)
27+
@JsonSubTypes({@JsonSubTypes.Type(value = InjectInput.class, name = "inject")})
28+
public interface DataInputStep {}

0 commit comments

Comments
 (0)