Skip to content

Commit 268279e

Browse files
Add NexusInfo
1 parent bf0b196 commit 268279e

9 files changed

Lines changed: 127 additions & 2 deletions

temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationOutboundCallsInterceptor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.uber.m3.tally.Scope;
44
import io.temporal.client.WorkflowClient;
55
import io.temporal.common.Experimental;
6+
import io.temporal.nexus.NexusInfo;
67

78
/**
89
* Can be used to intercept calls from a Nexus operation into the Temporal APIs.
@@ -20,6 +21,9 @@
2021
*/
2122
@Experimental
2223
public interface NexusOperationOutboundCallsInterceptor {
24+
/** Intercepts call to get the Nexus info in a Nexus operation. */
25+
NexusInfo getInfo();
26+
2327
/** Intercepts call to get the metric scope in a Nexus operation. */
2428
Scope getMetricsScope();
2529

temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationOutboundCallsInterceptorBase.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.uber.m3.tally.Scope;
44
import io.temporal.client.WorkflowClient;
55
import io.temporal.common.Experimental;
6+
import io.temporal.nexus.NexusInfo;
67

78
/** Convenience base class for {@link NexusOperationOutboundCallsInterceptor} implementations. */
89
@Experimental
@@ -14,6 +15,11 @@ public NexusOperationOutboundCallsInterceptorBase(NexusOperationOutboundCallsInt
1415
this.next = next;
1516
}
1617

18+
@Override
19+
public NexusInfo getInfo() {
20+
return next.getInfo();
21+
}
22+
1723
@Override
1824
public Scope getMetricsScope() {
1925
return next.getMetricsScope();

temporal-sdk/src/main/java/io/temporal/internal/nexus/InternalNexusOperationContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import io.temporal.api.common.v1.Link;
55
import io.temporal.client.WorkflowClient;
66
import io.temporal.common.interceptors.NexusOperationOutboundCallsInterceptor;
7+
import io.temporal.nexus.NexusInfo;
78
import io.temporal.nexus.NexusOperationContext;
89

910
public class InternalNexusOperationContext {
@@ -58,6 +59,11 @@ public Link getStartWorkflowResponseLink() {
5859
}
5960

6061
private class NexusOperationContextImpl implements NexusOperationContext {
62+
@Override
63+
public NexusInfo getInfo() {
64+
return outboundCalls.getInfo();
65+
}
66+
6167
@Override
6268
public Scope getMetricsScope() {
6369
return outboundCalls.getMetricsScope();
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package io.temporal.internal.nexus;
2+
3+
import io.temporal.nexus.NexusInfo;
4+
5+
class NexusInfoImpl implements NexusInfo {
6+
private final String namespace;
7+
private final String taskQueue;
8+
9+
NexusInfoImpl(String namespace, String taskQueue) {
10+
this.namespace = namespace;
11+
this.taskQueue = taskQueue;
12+
}
13+
14+
@Override
15+
public String getNamespace() {
16+
return namespace;
17+
}
18+
19+
@Override
20+
public String getTaskQueue() {
21+
return taskQueue;
22+
}
23+
}

temporal-sdk/src/main/java/io/temporal/internal/nexus/RootNexusOperationOutboundCallsInterceptor.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,24 @@
33
import com.uber.m3.tally.Scope;
44
import io.temporal.client.WorkflowClient;
55
import io.temporal.common.interceptors.NexusOperationOutboundCallsInterceptor;
6+
import io.temporal.nexus.NexusInfo;
67

78
public class RootNexusOperationOutboundCallsInterceptor
89
implements NexusOperationOutboundCallsInterceptor {
910
private final Scope scope;
1011
private final WorkflowClient client;
12+
private final NexusInfo nexusInfo;
1113

12-
RootNexusOperationOutboundCallsInterceptor(Scope scope, WorkflowClient client) {
14+
RootNexusOperationOutboundCallsInterceptor(
15+
Scope scope, WorkflowClient client, NexusInfo nexusInfo) {
1316
this.scope = scope;
1417
this.client = client;
18+
this.nexusInfo = nexusInfo;
19+
}
20+
21+
@Override
22+
public NexusInfo getInfo() {
23+
return nexusInfo;
1524
}
1625

1726
@Override

temporal-sdk/src/main/java/io/temporal/internal/nexus/TemporalInterceptorMiddleware.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@ public OperationHandler<Object, Object> intercept(
2727
InternalNexusOperationContext temporalNexusContext = CurrentNexusOperationContext.get();
2828
inboundCallsInterceptor.init(
2929
new RootNexusOperationOutboundCallsInterceptor(
30-
temporalNexusContext.getMetricsScope(), temporalNexusContext.getWorkflowClient()));
30+
temporalNexusContext.getMetricsScope(),
31+
temporalNexusContext.getWorkflowClient(),
32+
new NexusInfoImpl(
33+
temporalNexusContext.getNamespace(), temporalNexusContext.getTaskQueue())));
3134
return new OperationInterceptorConverter(inboundCallsInterceptor);
3235
}
3336

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package io.temporal.nexus;
2+
3+
/**
4+
* Temporal information about the Nexus Operation. Use {@link NexusOperationContext#getInfo()} from
5+
* a Nexus Operation implementation to access.
6+
*/
7+
public interface NexusInfo {
8+
/**
9+
* @return Namespace of the worker that is executing the Nexus Operation
10+
*/
11+
String getNamespace();
12+
13+
/**
14+
* @return Nexus Task Queue of the worker that is executing the Nexus Operation
15+
*/
16+
String getTaskQueue();
17+
}

temporal-sdk/src/main/java/io/temporal/nexus/NexusOperationContext.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@
1010
*/
1111
public interface NexusOperationContext {
1212

13+
/** Get Temporal information about the Nexus Operation. */
14+
NexusInfo getInfo();
15+
1316
/**
1417
* Get scope for reporting business metrics in a nexus handler. This scope is tagged with the
1518
* service and operation.
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package io.temporal.workflow.nexus;
2+
3+
import io.nexusrpc.handler.OperationHandler;
4+
import io.nexusrpc.handler.OperationImpl;
5+
import io.nexusrpc.handler.ServiceImpl;
6+
import io.temporal.nexus.Nexus;
7+
import io.temporal.nexus.NexusInfo;
8+
import io.temporal.testing.internal.SDKTestWorkflowRule;
9+
import io.temporal.workflow.*;
10+
import io.temporal.workflow.shared.TestNexusServices;
11+
import io.temporal.workflow.shared.TestWorkflows;
12+
import org.junit.Assert;
13+
import org.junit.Rule;
14+
import org.junit.Test;
15+
16+
public class NexusOperationInfoTest {
17+
@Rule
18+
public SDKTestWorkflowRule testWorkflowRule =
19+
SDKTestWorkflowRule.newBuilder()
20+
.setWorkflowTypes(TestNexus.class)
21+
.setNexusServiceImplementation(new TestNexusServiceImpl())
22+
.build();
23+
24+
@Test
25+
public void testOperationHeaders() {
26+
TestWorkflows.TestWorkflow1 workflowStub =
27+
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class);
28+
Assert.assertEquals(
29+
"UnitTest:" + testWorkflowRule.getTaskQueue(),
30+
workflowStub.execute(testWorkflowRule.getTaskQueue()));
31+
}
32+
33+
public static class TestNexus implements TestWorkflows.TestWorkflow1 {
34+
@Override
35+
public String execute(String input) {
36+
// Try to call with the typed stub
37+
TestNexusServices.TestNexusService1 serviceStub =
38+
Workflow.newNexusServiceStub(TestNexusServices.TestNexusService1.class);
39+
return serviceStub.operation(input);
40+
}
41+
}
42+
43+
@ServiceImpl(service = TestNexusServices.TestNexusService1.class)
44+
public static class TestNexusServiceImpl {
45+
@OperationImpl
46+
public OperationHandler<String, String> operation() {
47+
return OperationHandler.sync(
48+
(context, details, input) -> {
49+
NexusInfo info = Nexus.getOperationContext().getInfo();
50+
return info.getNamespace() + ":" + info.getTaskQueue();
51+
});
52+
}
53+
}
54+
}

0 commit comments

Comments
 (0)