Skip to content

Commit 8692012

Browse files
Improve tracing interceptor
1 parent 1036823 commit 8692012

11 files changed

Lines changed: 296 additions & 40 deletions

File tree

temporal-opentracing/src/main/java/io/temporal/opentracing/OpenTracingClientInterceptor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public WorkflowClientCallsInterceptor workflowClientCallsInterceptor(
3131
}
3232

3333
@Override
34-
public NexusServiceClientCallsInterceptor nexusServiceClientInterceptor(
34+
public NexusServiceClientCallsInterceptor nexusServiceClientCallsInterceptor(
3535
NexusServiceClientCallsInterceptor next) {
3636
return new OpenTracingNexusServiceClientCallsInterceptor(
3737
next, options, spanFactory, contextAccessor);

temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,13 @@ public enum SpanOperationType {
1717
HANDLE_UPDATE("HandleUpdate"),
1818
START_NEXUS_OPERATION("StartNexusOperation"),
1919
RUN_START_NEXUS_OPERATION("RunStartNexusOperationHandler"),
20-
RUN_CANCEL_NEXUS_OPERATION("RunCancelNexusOperationHandler");
20+
RUN_CANCEL_NEXUS_OPERATION("RunCancelNexusOperationHandler"),
21+
RUN_FETCH_NEXUS_OPERATION_INFO("RunFetchNexusOperationInfoHandler"),
22+
RUN_FETCH_NEXUS_OPERATION_RESULT("RunFetchNexusOperationResultHandler"),
23+
CLIENT_START_NEXUS_OPERATION("ClientStartNexusOperation"),
24+
CLIENT_CANCEL_NEXUS_OPERATION("ClientCancelNexusOperation"),
25+
CLIENT_FETCH_NEXUS_OPERATION_INFO("ClientFetchNexusOperationInfo"),
26+
CLIENT_FETCH_NEXUS_OPERATION_RESULT("ClientFetchNexusOperationResult");
2127

2228
private final String defaultPrefix;
2329

temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,13 @@ protected Map<String, String> getSpanTags(SpanCreationContext context) {
8686
StandardTagNames.RUN_ID, context.getRunId());
8787
case RUN_START_NEXUS_OPERATION:
8888
case RUN_CANCEL_NEXUS_OPERATION:
89+
case RUN_FETCH_NEXUS_OPERATION_INFO:
90+
case RUN_FETCH_NEXUS_OPERATION_RESULT:
8991
case HANDLE_QUERY:
92+
case CLIENT_START_NEXUS_OPERATION:
93+
case CLIENT_CANCEL_NEXUS_OPERATION:
94+
case CLIENT_FETCH_NEXUS_OPERATION_INFO:
95+
case CLIENT_FETCH_NEXUS_OPERATION_RESULT:
9096
return ImmutableMap.of();
9197
}
9298
throw new IllegalArgumentException("Unknown span operation type provided");

temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingNexusOperationInboundCallsInterceptor.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.temporal.opentracing.internal;
22

33
import io.nexusrpc.OperationException;
4+
import io.nexusrpc.OperationStillRunningException;
45
import io.opentracing.Scope;
56
import io.opentracing.Span;
67
import io.opentracing.SpanContext;
@@ -73,4 +74,48 @@ public CancelOperationOutput cancelOperation(CancelOperationInput input) {
7374
operationCancelSpan.finish();
7475
}
7576
}
77+
78+
@Override
79+
public FetchOperationResultOutput fetchOperationResult(FetchOperationResultInput input)
80+
throws OperationException, OperationStillRunningException {
81+
SpanContext rootSpanContext =
82+
contextAccessor.readSpanContextFromHeader(input.getOperationContext().getHeaders(), tracer);
83+
84+
Span operationFetchResultSpan =
85+
spanFactory
86+
.createFetchNexusOperationResultSpan(
87+
tracer,
88+
input.getOperationContext().getService(),
89+
input.getOperationContext().getOperation(),
90+
rootSpanContext)
91+
.start();
92+
try (Scope scope = tracer.scopeManager().activate(operationFetchResultSpan)) {
93+
return super.fetchOperationResult(input);
94+
} catch (Throwable t) {
95+
spanFactory.logFail(operationFetchResultSpan, t);
96+
throw t;
97+
} finally {
98+
operationFetchResultSpan.finish();
99+
}
100+
}
101+
102+
@Override
103+
public FetchOperationInfoResponse fetchOperationInfo(FetchOperationInfoInput input) {
104+
SpanContext rootSpanContext =
105+
contextAccessor.readSpanContextFromHeader(input.getOperationContext().getHeaders(), tracer);
106+
107+
Span operationFetchInfoSpan =
108+
spanFactory
109+
.createFetchNexusOperationInfoSpan(
110+
tracer,
111+
input.getOperationContext().getService(),
112+
input.getOperationContext().getOperation(),
113+
rootSpanContext)
114+
.start();
115+
try (Scope scope = tracer.scopeManager().activate(operationFetchInfoSpan)) {
116+
return super.fetchOperationInfo(input);
117+
} finally {
118+
operationFetchInfoSpan.finish();
119+
}
120+
}
76121
}

temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingNexusServiceClientCallsInterceptor.java

Lines changed: 86 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ public StartOperationOutput startOperation(StartOperationInput input) throws Ope
3838
contextAccessor.writeSpanContextToHeader(
3939
() ->
4040
spanFactory
41-
.createStartNexusOperationSpan(
42-
tracer, input.getServiceName(), input.getOperationName(), null, null)
41+
.createClientStartNexusOperationSpan(
42+
tracer, input.getServiceName(), input.getOperationName())
4343
.start(),
4444
input.getOptions().getHeaders(),
4545
tracer);
@@ -59,8 +59,8 @@ public CompletableFuture<StartOperationOutput> startOperationAsync(StartOperatio
5959
contextAccessor.writeSpanContextToHeader(
6060
() ->
6161
spanFactory
62-
.createStartNexusOperationSpan(
63-
tracer, input.getServiceName(), input.getOperationName(), null, null)
62+
.createClientStartNexusOperationSpan(
63+
tracer, input.getServiceName(), input.getOperationName())
6464
.start(),
6565
input.getOptions().getHeaders(),
6666
tracer);
@@ -82,8 +82,8 @@ public CancelOperationOutput cancelOperation(CancelOperationInput input) {
8282
contextAccessor.writeSpanContextToHeader(
8383
() ->
8484
spanFactory
85-
.createCancelNexusOperationSpan(
86-
tracer, input.getServiceName(), input.getOperationName(), null)
85+
.createClientCancelNexusOperationSpan(
86+
tracer, input.getServiceName(), input.getOperationName())
8787
.start(),
8888
input.getOptions().getHeaders(),
8989
tracer);
@@ -103,8 +103,8 @@ public CompletableFuture<CancelOperationOutput> cancelOperationAsync(CancelOpera
103103
contextAccessor.writeSpanContextToHeader(
104104
() ->
105105
spanFactory
106-
.createCancelNexusOperationSpan(
107-
tracer, input.getServiceName(), input.getOperationName(), null)
106+
.createClientCancelNexusOperationSpan(
107+
tracer, input.getServiceName(), input.getOperationName())
108108
.start(),
109109
input.getOptions().getHeaders(),
110110
tracer);
@@ -123,14 +123,47 @@ public CompletableFuture<CancelOperationOutput> cancelOperationAsync(CancelOpera
123123
@Override
124124
public FetchOperationResultOutput fetchOperationResult(FetchOperationResultInput input)
125125
throws OperationException, OperationStillRunningException {
126-
propagate(input.getOptions().getHeaders());
127-
return super.fetchOperationResult(input);
126+
Span span =
127+
contextAccessor.writeSpanContextToHeader(
128+
() ->
129+
spanFactory
130+
.createClientFetchNexusOperationResultSpan(
131+
tracer,
132+
input.getServiceName(),
133+
input.getOperationName(),
134+
input.getOperationToken())
135+
.start(),
136+
input.getOptions().getHeaders(),
137+
tracer);
138+
try (Scope ignored = tracer.scopeManager().activate(span)) {
139+
return super.fetchOperationResult(input);
140+
} catch (Throwable t) {
141+
spanFactory.logFail(span, t);
142+
throw t;
143+
} finally {
144+
span.finish();
145+
}
128146
}
129147

130148
@Override
131149
public FetchOperationInfoOutput fetchOperationInfo(FetchOperationInfoInput input) {
132-
propagate(input.getOptions().getHeaders());
133-
return super.fetchOperationInfo(input);
150+
Span span =
151+
contextAccessor.writeSpanContextToHeader(
152+
() ->
153+
spanFactory
154+
.createClientFetchNexusOperationInfoSpan(
155+
tracer, input.getServiceName(), input.getOperationName())
156+
.start(),
157+
input.getOptions().getHeaders(),
158+
tracer);
159+
try (Scope ignored = tracer.scopeManager().activate(span)) {
160+
return super.fetchOperationInfo(input);
161+
} catch (Throwable t) {
162+
spanFactory.logFail(span, t);
163+
throw t;
164+
} finally {
165+
span.finish();
166+
}
134167
}
135168

136169
@Override
@@ -142,15 +175,52 @@ public CompleteOperationOutput completeOperation(CompleteOperationInput input) {
142175
@Override
143176
public CompletableFuture<FetchOperationResultOutput> fetchOperationResultAsync(
144177
FetchOperationResultInput input) {
145-
propagate(input.getOptions().getHeaders());
146-
return super.fetchOperationResultAsync(input);
178+
Span span =
179+
contextAccessor.writeSpanContextToHeader(
180+
() ->
181+
spanFactory
182+
.createClientFetchNexusOperationResultSpan(
183+
tracer,
184+
input.getServiceName(),
185+
input.getOperationName(),
186+
input.getOperationToken())
187+
.start(),
188+
input.getOptions().getHeaders(),
189+
tracer);
190+
try (Scope ignored = tracer.scopeManager().activate(span)) {
191+
return super.fetchOperationResultAsync(input)
192+
.whenComplete(
193+
(r, e) -> {
194+
if (e != null) {
195+
spanFactory.logFail(span, e);
196+
}
197+
span.finish();
198+
});
199+
}
147200
}
148201

149202
@Override
150203
public CompletableFuture<FetchOperationInfoOutput> fetchOperationInfoAsync(
151204
FetchOperationInfoInput input) {
152-
propagate(input.getOptions().getHeaders());
153-
return super.fetchOperationInfoAsync(input);
205+
Span span =
206+
contextAccessor.writeSpanContextToHeader(
207+
() ->
208+
spanFactory
209+
.createClientFetchNexusOperationInfoSpan(
210+
tracer, input.getServiceName(), input.getOperationName())
211+
.start(),
212+
input.getOptions().getHeaders(),
213+
tracer);
214+
try (Scope ignored = tracer.scopeManager().activate(span)) {
215+
return super.fetchOperationInfoAsync(input)
216+
.whenComplete(
217+
(r, e) -> {
218+
if (e != null) {
219+
spanFactory.logFail(span, e);
220+
}
221+
span.finish();
222+
});
223+
}
154224
}
155225

156226
@Override

temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,46 @@ public Tracer.SpanBuilder createChildWorkflowStartSpan(
6969
return createSpan(context, tracer, null, References.CHILD_OF);
7070
}
7171

72+
public Tracer.SpanBuilder createClientStartNexusOperationSpan(
73+
Tracer tracer, String serviceName, String operationName) {
74+
SpanCreationContext context =
75+
SpanCreationContext.newBuilder()
76+
.setSpanOperationType(SpanOperationType.CLIENT_START_NEXUS_OPERATION)
77+
.setActionName(serviceName + "/" + operationName)
78+
.build();
79+
return createSpan(context, tracer, null, References.CHILD_OF);
80+
}
81+
82+
public Tracer.SpanBuilder createClientCancelNexusOperationSpan(
83+
Tracer tracer, String serviceName, String operationName) {
84+
SpanCreationContext context =
85+
SpanCreationContext.newBuilder()
86+
.setSpanOperationType(SpanOperationType.CLIENT_CANCEL_NEXUS_OPERATION)
87+
.setActionName(serviceName + "/" + operationName)
88+
.build();
89+
return createSpan(context, tracer, null, References.CHILD_OF);
90+
}
91+
92+
public Tracer.SpanBuilder createClientFetchNexusOperationInfoSpan(
93+
Tracer tracer, String serviceName, String operationName) {
94+
SpanCreationContext context =
95+
SpanCreationContext.newBuilder()
96+
.setSpanOperationType(SpanOperationType.CLIENT_FETCH_NEXUS_OPERATION_INFO)
97+
.setActionName(serviceName + "/" + operationName)
98+
.build();
99+
return createSpan(context, tracer, null, References.CHILD_OF);
100+
}
101+
102+
public Tracer.SpanBuilder createClientFetchNexusOperationResultSpan(
103+
Tracer tracer, String serviceName, String operationName, String operationToken) {
104+
SpanCreationContext context =
105+
SpanCreationContext.newBuilder()
106+
.setSpanOperationType(SpanOperationType.CLIENT_FETCH_NEXUS_OPERATION_RESULT)
107+
.setActionName(serviceName + "/" + operationName)
108+
.build();
109+
return createSpan(context, tracer, null, References.CHILD_OF);
110+
}
111+
72112
public Tracer.SpanBuilder createExternalWorkflowSignalSpan(
73113
Tracer tracer, String signalName, String workflowId, String runId) {
74114
SpanCreationContext context =
@@ -185,6 +225,26 @@ public Tracer.SpanBuilder createCancelNexusOperationSpan(
185225
return createSpan(context, tracer, nexusStartSpanContext, References.FOLLOWS_FROM);
186226
}
187227

228+
public Tracer.SpanBuilder createFetchNexusOperationResultSpan(
229+
Tracer tracer, String serviceName, String operationName, SpanContext nexusStartSpanContext) {
230+
SpanCreationContext context =
231+
SpanCreationContext.newBuilder()
232+
.setSpanOperationType(SpanOperationType.RUN_FETCH_NEXUS_OPERATION_RESULT)
233+
.setActionName(serviceName + "/" + operationName)
234+
.build();
235+
return createSpan(context, tracer, nexusStartSpanContext, References.FOLLOWS_FROM);
236+
}
237+
238+
public Tracer.SpanBuilder createFetchNexusOperationInfoSpan(
239+
Tracer tracer, String serviceName, String operationName, SpanContext nexusStartSpanContext) {
240+
SpanCreationContext context =
241+
SpanCreationContext.newBuilder()
242+
.setSpanOperationType(SpanOperationType.RUN_FETCH_NEXUS_OPERATION_INFO)
243+
.setActionName(serviceName + "/" + operationName)
244+
.build();
245+
return createSpan(context, tracer, nexusStartSpanContext, References.FOLLOWS_FROM);
246+
}
247+
188248
public Tracer.SpanBuilder createWorkflowStartUpdateSpan(
189249
Tracer tracer, String updateName, String workflowId, String runId) {
190250
SpanCreationContext context =

0 commit comments

Comments
 (0)