Skip to content

Commit 8554198

Browse files
committed
Add publishToInternalChannel API
1 parent 7fed832 commit 8554198

File tree

6 files changed

+182
-40
lines changed

6 files changed

+182
-40
lines changed

iwf-idl

src/main/java/io/iworkflow/core/Client.java

Lines changed: 75 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,7 @@
55
import io.iworkflow.core.exceptions.WorkflowAlreadyStartedException;
66
import io.iworkflow.core.exceptions.WorkflowNotExistsException;
77
import io.iworkflow.core.persistence.PersistenceOptions;
8-
import io.iworkflow.gen.models.ErrorSubStatus;
9-
import io.iworkflow.gen.models.KeyValue;
10-
import io.iworkflow.gen.models.SearchAttribute;
11-
import io.iworkflow.gen.models.SearchAttributeKeyAndType;
12-
import io.iworkflow.gen.models.SearchAttributeValueType;
13-
import io.iworkflow.gen.models.StateCompletionOutput;
14-
import io.iworkflow.gen.models.WorkflowGetDataObjectsResponse;
15-
import io.iworkflow.gen.models.WorkflowGetResponse;
16-
import io.iworkflow.gen.models.WorkflowGetSearchAttributesResponse;
17-
import io.iworkflow.gen.models.WorkflowSearchRequest;
18-
import io.iworkflow.gen.models.WorkflowSearchResponse;
8+
import io.iworkflow.gen.models.*;
199
import io.iworkflow.gen.models.WorkflowStateOptions;
2010
import net.bytebuddy.ByteBuddy;
2111
import net.bytebuddy.implementation.MethodDelegation;
@@ -505,6 +495,80 @@ public void signalWorkflow(
505495
signalWorkflow(workflowClass, workflowId, "", signalChannelName, signalValue);
506496
}
507497

498+
499+
/**
500+
* Send a single message to internalChannel
501+
*
502+
* @param workflowClass required
503+
* @param workflowId required
504+
* @param internalChannelName required
505+
* @param channelMessage optional, can be null.
506+
* @throws NoRunningWorkflowException if the workflow is not existing or not running
507+
*/
508+
public void publishToInternalChannel(
509+
final Class<? extends ObjectWorkflow> workflowClass,
510+
final String workflowId,
511+
final String internalChannelName,
512+
final Object channelMessage) {
513+
publishToInternalChannel(workflowClass, workflowId, "", internalChannelName, channelMessage);
514+
}
515+
516+
/**
517+
* Send a single message to internalChannel
518+
*
519+
* @param workflowClass required
520+
* @param workflowId required
521+
* @param workflowRunId optional, can be empty
522+
* @param internalChannelName required
523+
* @param channelMessage optional, can be null.
524+
* @throws NoRunningWorkflowException if the workflow is not existing or not running
525+
*/
526+
public void publishToInternalChannel(
527+
final Class<? extends ObjectWorkflow> workflowClass,
528+
final String workflowId,
529+
final String workflowRunId,
530+
final String internalChannelName,
531+
final Object channelMessage) {
532+
publishToInternalChannelBatch(workflowClass, workflowId, workflowRunId, internalChannelName, channelMessage);
533+
}
534+
535+
/**
536+
* Send a batch of messages to internalChannel
537+
*
538+
* @param workflowClass required
539+
* @param workflowId required
540+
* @param workflowRunId optional, can be empty
541+
* @param internalChannelName required
542+
* @param channelMessages optional, can be null. messages in batch
543+
* @throws NoRunningWorkflowException if the workflow is not existing or not running
544+
*/
545+
public void publishToInternalChannelBatch(
546+
final Class<? extends ObjectWorkflow> workflowClass,
547+
final String workflowId,
548+
final String workflowRunId,
549+
final String internalChannelName,
550+
final Object... channelMessages) {
551+
final String wfType = workflowClass.getSimpleName();
552+
553+
checkWorkflowTypeExists(wfType);
554+
555+
final Class<?> channelValueType = registry.getInternalChannelTypeStore(wfType).getType(internalChannelName);
556+
557+
List<InterStateChannelPublishing> rawMessages = new ArrayList<>(channelMessages.length);
558+
for (Object channelValue : channelMessages) {
559+
if (channelValue != null && !channelValueType.isInstance(channelValue)) {
560+
throw new IllegalArgumentException(String.format("message value is not of channel type %s", channelValueType.getName()));
561+
}
562+
rawMessages.add(
563+
new InterStateChannelPublishing()
564+
.channelName(internalChannelName)
565+
.value(clientOptions.getObjectEncoder().encode(channelValue))
566+
);
567+
}
568+
569+
unregisteredClient.publishToInternalChannel(workflowId, workflowRunId, rawMessages);
570+
}
571+
508572
/**
509573
* @param workflowId required
510574
* @param workflowRunId optional, can be empty

src/main/java/io/iworkflow/core/UnregisteredClient.java

Lines changed: 18 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -8,34 +8,7 @@
88
import io.iworkflow.core.validator.CronScheduleValidator;
99
import io.iworkflow.gen.api.ApiClient;
1010
import io.iworkflow.gen.api.DefaultApi;
11-
import io.iworkflow.gen.models.EncodedObject;
12-
import io.iworkflow.gen.models.KeyValue;
13-
import io.iworkflow.gen.models.PersistenceLoadingPolicy;
14-
import io.iworkflow.gen.models.SearchAttribute;
15-
import io.iworkflow.gen.models.SearchAttributeKeyAndType;
16-
import io.iworkflow.gen.models.StateCompletionOutput;
17-
import io.iworkflow.gen.models.WorkflowGetDataObjectsRequest;
18-
import io.iworkflow.gen.models.WorkflowGetDataObjectsResponse;
19-
import io.iworkflow.gen.models.WorkflowGetRequest;
20-
import io.iworkflow.gen.models.WorkflowGetResponse;
21-
import io.iworkflow.gen.models.WorkflowGetSearchAttributesRequest;
22-
import io.iworkflow.gen.models.WorkflowGetSearchAttributesResponse;
23-
import io.iworkflow.gen.models.WorkflowResetRequest;
24-
import io.iworkflow.gen.models.WorkflowResetResponse;
25-
import io.iworkflow.gen.models.WorkflowRpcRequest;
26-
import io.iworkflow.gen.models.WorkflowRpcResponse;
27-
import io.iworkflow.gen.models.WorkflowSearchRequest;
28-
import io.iworkflow.gen.models.WorkflowSearchResponse;
29-
import io.iworkflow.gen.models.WorkflowSetDataObjectsRequest;
30-
import io.iworkflow.gen.models.WorkflowSetSearchAttributesRequest;
31-
import io.iworkflow.gen.models.WorkflowSignalRequest;
32-
import io.iworkflow.gen.models.WorkflowSkipTimerRequest;
33-
import io.iworkflow.gen.models.WorkflowStartOptions;
34-
import io.iworkflow.gen.models.WorkflowStartRequest;
35-
import io.iworkflow.gen.models.WorkflowStartResponse;
36-
import io.iworkflow.gen.models.WorkflowStatus;
37-
import io.iworkflow.gen.models.WorkflowStopRequest;
38-
import io.iworkflow.gen.models.WorkflowWaitForStateCompletionRequest;
11+
import io.iworkflow.gen.models.*;
3912

4013
import java.util.List;
4114
import java.util.Map;
@@ -449,6 +422,23 @@ public void signalWorkflow(
449422
}
450423
}
451424

425+
public void publishToInternalChannel(
426+
final String workflowId,
427+
final String workflowRunId,
428+
final List<InterStateChannelPublishing> messages){
429+
430+
try {
431+
defaultApi.apiV1WorkflowPublishToInternalChannelPost(
432+
new PublishToInternalChannelRequest()
433+
.messages(messages)
434+
.workflowId(workflowId)
435+
.workflowRunId(workflowRunId)
436+
);
437+
} catch (FeignException.FeignClientException exp) {
438+
throw IwfHttpException.fromFeignException(clientOptions.getObjectEncoder(), exp);
439+
}
440+
}
441+
452442
/**
453443
* @param workflowId workflowId
454444
* @param workflowRunId workflowRunId

src/test/java/io/iworkflow/integ/InternalChannelTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.iworkflow.core.Client;
44
import io.iworkflow.core.ClientOptions;
55
import io.iworkflow.integ.internalchannel.BasicInternalChannelWorkflow;
6+
import io.iworkflow.integ.internalchannel.WaitingInternalChannelWorkflow;
67
import io.iworkflow.spring.TestSingletonWorkerService;
78
import io.iworkflow.spring.controller.WorkflowRegistry;
89
import org.junit.jupiter.api.Assertions;
@@ -28,4 +29,16 @@ public void testBasicInternalWorkflow() throws InterruptedException {
2829
final Integer output = client.getSimpleWorkflowResultWithWait(Integer.class, wfId);
2930
Assertions.assertEquals(3, output);
3031
}
32+
33+
@Test
34+
public void testWaitingInternalWorkflow() throws InterruptedException {
35+
final Client client = new Client(WorkflowRegistry.registry, ClientOptions.localDefault);
36+
final String wfId = "waiting-internal-test-id" + System.currentTimeMillis() / 1000;
37+
final Integer input = 1;
38+
final String runId = client.startWorkflow(
39+
WaitingInternalChannelWorkflow.class, wfId, 10, input);
40+
client.publishToInternalChannelBatch(WaitingInternalChannelWorkflow.class, wfId, "", WaitingInternalChannelWorkflow.INTER_STATE_CHANNEL_NAME, 2, 3);
41+
final Integer output = client.getSimpleWorkflowResultWithWait(Integer.class, wfId);
42+
Assertions.assertEquals(6, output);
43+
}
3144
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package io.iworkflow.integ.internalchannel;
2+
3+
import io.iworkflow.core.ObjectWorkflow;
4+
import io.iworkflow.core.StateDef;
5+
import io.iworkflow.core.communication.CommunicationMethodDef;
6+
import io.iworkflow.core.communication.InternalChannelDef;
7+
import org.springframework.stereotype.Component;
8+
9+
import java.util.Arrays;
10+
import java.util.List;
11+
12+
@Component
13+
public class WaitingInternalChannelWorkflow implements ObjectWorkflow {
14+
public static final String INTER_STATE_CHANNEL_NAME = "test-inter-state-channel-1";
15+
16+
@Override
17+
public List<CommunicationMethodDef> getCommunicationSchema() {
18+
return Arrays.asList(
19+
InternalChannelDef.create(Integer.class, INTER_STATE_CHANNEL_NAME)
20+
);
21+
}
22+
23+
@Override
24+
public List<StateDef> getWorkflowStates() {
25+
return Arrays.asList(
26+
StateDef.startingState(new WaitingInternalChannelWorkflowState())
27+
);
28+
}
29+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package io.iworkflow.integ.internalchannel;
2+
3+
import io.iworkflow.core.Context;
4+
import io.iworkflow.core.StateDecision;
5+
import io.iworkflow.core.WorkflowState;
6+
import io.iworkflow.core.command.CommandRequest;
7+
import io.iworkflow.core.command.CommandResults;
8+
import io.iworkflow.core.communication.Communication;
9+
import io.iworkflow.core.communication.InternalChannelCommand;
10+
import io.iworkflow.core.communication.InternalChannelCommandResult;
11+
import io.iworkflow.core.persistence.Persistence;
12+
13+
public class WaitingInternalChannelWorkflowState implements WorkflowState<Integer> {
14+
15+
@Override
16+
public Class<Integer> getInputType() {
17+
return Integer.class;
18+
}
19+
20+
@Override
21+
public CommandRequest waitUntil(
22+
Context context,
23+
Integer input,
24+
Persistence persistence,
25+
final Communication communication) {
26+
return CommandRequest.forAllCommandCompleted(
27+
InternalChannelCommand.create(WaitingInternalChannelWorkflow.INTER_STATE_CHANNEL_NAME),
28+
InternalChannelCommand.create(WaitingInternalChannelWorkflow.INTER_STATE_CHANNEL_NAME)
29+
);
30+
}
31+
32+
@Override
33+
public StateDecision execute(
34+
Context context,
35+
Integer input,
36+
CommandResults commandResults,
37+
Persistence persistence,
38+
final Communication communication) {
39+
final InternalChannelCommandResult result1 = commandResults.getAllInternalChannelCommandResult().get(0);
40+
final InternalChannelCommandResult result2 = commandResults.getAllInternalChannelCommandResult().get(1);
41+
42+
Integer output = input + (Integer) result1.getValue().get() + (Integer) result2.getValue().get();
43+
44+
return StateDecision.gracefulCompleteWorkflow(output);
45+
}
46+
}

0 commit comments

Comments
 (0)