22
33import com .fasterxml .jackson .core .JsonProcessingException ;
44import com .google .common .base .Defaults ;
5+ import com .google .common .base .Strings ;
56import io .nexusrpc .Header ;
67import io .nexusrpc .handler .HandlerException ;
78import io .nexusrpc .handler .ServiceImplInstance ;
@@ -88,23 +89,6 @@ public static NexusWorkflowStarter createNexusBoundStub(
8889 HandlerException .ErrorType .BAD_REQUEST ,
8990 new IllegalArgumentException ("failed to generate workflow operation token" , e ));
9091 }
91- // Add the Nexus operation ID to the headers if it is not already present to support fabricating
92- // a NexusOperationStarted event if the completion is received before the response to a
93- // StartOperation request.
94- Map <String , String > headers =
95- request .getCallbackHeaders ().entrySet ().stream ()
96- .collect (
97- Collectors .toMap (
98- (k ) -> k .getKey ().toLowerCase (),
99- Map .Entry ::getValue ,
100- (a , b ) -> a ,
101- () -> new TreeMap <>(String .CASE_INSENSITIVE_ORDER )));
102- if (!headers .containsKey (Header .OPERATION_ID )) {
103- headers .put (Header .OPERATION_ID .toLowerCase (), operationToken );
104- }
105- if (!headers .containsKey (Header .OPERATION_TOKEN )) {
106- headers .put (Header .OPERATION_TOKEN .toLowerCase (), operationToken );
107- }
10892 List <Link > links =
10993 request .getLinks () == null
11094 ? null
@@ -127,21 +111,42 @@ public static NexusWorkflowStarter createNexusBoundStub(
127111 })
128112 .filter (Objects ::nonNull )
129113 .collect (Collectors .toList ());
130- Callback .Builder cbBuilder =
131- Callback .newBuilder ()
132- .setNexus (
133- Callback .Nexus .newBuilder ()
134- .setUrl (request .getCallbackUrl ())
135- .putAllHeader (headers )
136- .build ());
137- if (links != null ) {
138- cbBuilder .addAllLinks (links );
139- }
140114 WorkflowOptions .Builder nexusWorkflowOptions =
141- WorkflowOptions .newBuilder (options )
142- .setRequestId (request .getRequestId ())
143- .setCompletionCallbacks (Collections .singletonList (cbBuilder .build ()))
144- .setLinks (links );
115+ WorkflowOptions .newBuilder (options ).setRequestId (request .getRequestId ()).setLinks (links );
116+
117+ // If a callback URL is provided, pass it as a completion callback.
118+ if (!Strings .isNullOrEmpty (request .getCallbackUrl ())) {
119+ // Add the Nexus operation ID to the headers if it is not already present to support
120+ // fabricating
121+ // a NexusOperationStarted event if the completion is received before the response to a
122+ // StartOperation request.
123+ Map <String , String > headers =
124+ request .getCallbackHeaders ().entrySet ().stream ()
125+ .collect (
126+ Collectors .toMap (
127+ (k ) -> k .getKey ().toLowerCase (),
128+ Map .Entry ::getValue ,
129+ (a , b ) -> a ,
130+ () -> new TreeMap <>(String .CASE_INSENSITIVE_ORDER )));
131+ if (!headers .containsKey (Header .OPERATION_ID )) {
132+ headers .put (Header .OPERATION_ID .toLowerCase (), operationToken );
133+ }
134+ if (!headers .containsKey (Header .OPERATION_TOKEN )) {
135+ headers .put (Header .OPERATION_TOKEN .toLowerCase (), operationToken );
136+ }
137+ Callback .Builder cbBuilder =
138+ Callback .newBuilder ()
139+ .setNexus (
140+ Callback .Nexus .newBuilder ()
141+ .setUrl (request .getCallbackUrl ())
142+ .putAllHeader (headers )
143+ .build ());
144+ if (links != null ) {
145+ cbBuilder .addAllLinks (links );
146+ }
147+ nexusWorkflowOptions .setCompletionCallbacks (Collections .singletonList (cbBuilder .build ()));
148+ }
149+
145150 if (options .getTaskQueue () == null ) {
146151 nexusWorkflowOptions .setTaskQueue (request .getTaskQueue ());
147152 }
0 commit comments