Skip to content

Commit 1bacaa1

Browse files
google-genai-botcopybara-github
authored andcommitted
feat: refactor OpenTelemetry (OTel) instrumentation within the ADK core, moving from manual span management to structured helper classes
### Key Changes * **Structured Instrumentation:** Replaces manual `Tracing.trace` calls and explicit `Scope` management with `Flowable.using` and `Maybe.using` patterns. It introduces helper classes like `AgentInvocation` and `ToolExecution` to encapsulate telemetry logic. * **Metrics Integration:** Adds support for tracking new metrics during agent execution: * `gen_ai.agent.invocation.duration` * `gen_ai.agent.request.size` / `gen_ai.agent.response.size` * `gen_ai.agent.workflow.steps` * **Reactive API Improvements:** Leverages `Tracing.withContext()` and `doOnNext`/`doOnError` hooks within `AgentInvocation` and `ToolExecution` to automatically capture events and errors without polluting the core logic. * **Trace Hierarchy Refinement:** Updates how spans are nested. For example, in `ContextPropagationTest`, the child agent span is now correctly parented to the specific LLM call span that triggered it, rather than the parent agent span. * **Testing:** Significantly enhances `BaseAgentTest` to verify metric collection using `InMemoryMetricReader`. Updates various tests to match the new span naming convention (e.g., removing brackets from tool execution spans). ### Impact This refactor simplifies the core agent and flow logic by removing boilerplate telemetry code, making the instrumentation more robust and easier to maintain while expanding the observability of the ADK through new metrics. PiperOrigin-RevId: 918116236
1 parent 020499b commit 1bacaa1

9 files changed

Lines changed: 166 additions & 82 deletions

File tree

core/src/main/java/com/google/adk/agents/BaseAgent.java

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,12 @@
2424
import com.google.adk.agents.Callbacks.BeforeAgentCallback;
2525
import com.google.adk.events.Event;
2626
import com.google.adk.plugins.Plugin;
27-
import com.google.adk.telemetry.Tracing;
27+
import com.google.adk.telemetry.Instrumentation.AgentInvocation;
2828
import com.google.adk.utils.AgentEnums.AgentOrigin;
2929
import com.google.common.collect.ImmutableList;
3030
import com.google.errorprone.annotations.CanIgnoreReturnValue;
3131
import com.google.errorprone.annotations.DoNotCall;
3232
import com.google.genai.types.Content;
33-
import io.opentelemetry.context.Context;
3433
import io.reactivex.rxjava3.core.Completable;
3534
import io.reactivex.rxjava3.core.Flowable;
3635
import io.reactivex.rxjava3.core.Maybe;
@@ -322,11 +321,10 @@ public Flowable<Event> runAsync(InvocationContext parentContext) {
322321
private Flowable<Event> run(
323322
InvocationContext parentContext,
324323
Function<InvocationContext, Flowable<Event>> runImplementation) {
325-
Context parentSpanContext = Context.current();
326-
return Flowable.defer(
327-
() -> {
328-
InvocationContext invocationContext = createInvocationContext(parentContext);
329-
324+
return Flowable.using(
325+
() -> new AgentInvocation(createInvocationContext(parentContext), this),
326+
agentInvocation -> {
327+
InvocationContext invocationContext = agentInvocation.getCtx();
330328
Flowable<Event> mainAndAfterEvents =
331329
Flowable.defer(() -> runImplementation.apply(invocationContext))
332330
.concatWith(
@@ -350,14 +348,10 @@ private Flowable<Event> run(
350348
return Flowable.just(beforeEvent).concatWith(mainAndAfterEvents);
351349
})
352350
.switchIfEmpty(mainAndAfterEvents)
353-
.compose(
354-
Tracing.<Event>trace("invoke_agent " + name())
355-
.setParent(parentSpanContext)
356-
.configure(
357-
span ->
358-
Tracing.traceAgentInvocation(
359-
span, name(), description(), invocationContext)));
360-
});
351+
.doOnNext(agentInvocation::addEvent)
352+
.doOnError(agentInvocation::setError);
353+
},
354+
AgentInvocation::close);
361355
}
362356

363357
/**

core/src/main/java/com/google/adk/flows/llmflows/BaseLlmFlow.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -435,12 +435,10 @@ private Flowable<Event> runOneStep(Context spanContext, InvocationContext contex
435435
"Agent not found: " + agentToTransfer)));
436436
}
437437
return postProcessedEvents.concatWith(
438-
Flowable.defer(
439-
() -> {
440-
try (Scope s = spanContext.makeCurrent()) {
441-
return nextAgent.get().runAsync(context);
442-
}
443-
}));
438+
nextAgent
439+
.get()
440+
.runAsync(context)
441+
.compose(Tracing.withContext(spanContext)));
444442
}
445443
return postProcessedEvents;
446444
});
@@ -622,12 +620,10 @@ public void onError(Throwable e) {
622620
"Agent not found: " + event.actions().transferToAgent().get());
623621
}
624622
Flowable<Event> nextAgentEvents =
625-
Flowable.defer(
626-
() -> {
627-
try (Scope s = spanContext.makeCurrent()) {
628-
return nextAgent.get().runLive(invocationContext);
629-
}
630-
});
623+
nextAgent
624+
.get()
625+
.runLive(invocationContext)
626+
.compose(Tracing.withContext(spanContext));
631627
events = Flowable.concat(events, nextAgentEvents);
632628
}
633629
return events;

core/src/main/java/com/google/adk/flows/llmflows/Functions.java

Lines changed: 35 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import com.google.adk.events.Event;
3030
import com.google.adk.events.EventActions;
3131
import com.google.adk.events.ToolConfirmation;
32+
import com.google.adk.telemetry.Instrumentation;
33+
import com.google.adk.telemetry.Instrumentation.ToolExecution;
3234
import com.google.adk.telemetry.Tracing;
3335
import com.google.adk.tools.BaseTool;
3436
import com.google.adk.tools.FunctionTool;
@@ -417,57 +419,50 @@ private static Maybe<Event> postProcessFunctionResult(
417419
ToolContext toolContext,
418420
boolean isLive,
419421
Context parentContext) {
422+
return Maybe.using(
423+
() ->
424+
Instrumentation.recordToolExecution(
425+
tool, invocationContext.agent(), functionArgs, parentContext),
426+
toolExecution ->
427+
processFunctionResult(
428+
maybeFunctionResult, invocationContext, tool, functionArgs, toolContext, isLive)
429+
.doOnSuccess(event -> toolExecution.context().setFunctionResponseEvent(event))
430+
.doOnError(toolExecution::setError),
431+
ToolExecution::close);
432+
}
433+
434+
private static Maybe<Event> processFunctionResult(
435+
Maybe<Map<String, Object>> maybeFunctionResult,
436+
InvocationContext invocationContext,
437+
BaseTool tool,
438+
Map<String, Object> functionArgs,
439+
ToolContext toolContext,
440+
boolean isLive) {
420441
return maybeFunctionResult
421442
.map(Optional::of)
422443
.defaultIfEmpty(Optional.empty())
423444
.onErrorResumeNext(
424-
t -> {
425-
Maybe<Map<String, Object>> errorCallbackResult =
426-
handleOnToolErrorCallback(invocationContext, tool, functionArgs, toolContext, t);
427-
Maybe<Optional<Map<String, Object>>> mappedResult;
428-
if (isLive) {
429-
// In live mode, handle null results from the error callback gracefully.
430-
mappedResult = errorCallbackResult.map(Optional::ofNullable);
431-
} else {
432-
// In non-live mode, a null result from the error callback will cause an NPE
433-
// when wrapped with Optional.of(), potentially matching prior behavior.
434-
mappedResult = errorCallbackResult.map(Optional::of);
435-
}
436-
return mappedResult.switchIfEmpty(Single.error(t));
437-
})
445+
t ->
446+
handleOnToolErrorCallback(invocationContext, tool, functionArgs, toolContext, t)
447+
.map(res -> isLive ? Optional.ofNullable(res) : Optional.of(res))
448+
.switchIfEmpty(Single.error(t)))
438449
.flatMapMaybe(
439450
optionalInitialResult -> {
440451
Map<String, Object> initialFunctionResult = optionalInitialResult.orElse(null);
441-
442452
return maybeInvokeAfterToolCall(
443453
invocationContext, tool, functionArgs, toolContext, initialFunctionResult)
444454
.map(Optional::of)
445-
.defaultIfEmpty(Optional.ofNullable(initialFunctionResult))
446-
.flatMapMaybe(
447-
finalOptionalResult -> {
448-
Map<String, Object> finalFunctionResult = finalOptionalResult.orElse(null);
449-
if (tool.longRunning() && finalFunctionResult == null) {
450-
return Maybe.empty();
451-
}
452-
Event event =
453-
buildResponseEvent(
454-
tool, finalFunctionResult, toolContext, invocationContext);
455-
return Maybe.just(event);
456-
});
455+
.defaultIfEmpty(Optional.ofNullable(initialFunctionResult));
457456
})
458-
.compose(
459-
Tracing.<Event>trace("execute_tool [" + tool.name() + "]")
460-
.setParent(parentContext)
461-
.onSuccess(
462-
(span, event) ->
463-
Tracing.traceToolExecution(
464-
span,
465-
tool.name(),
466-
tool.description(),
467-
tool.getClass().getSimpleName(),
468-
functionArgs,
469-
event,
470-
null)));
457+
.flatMapMaybe(
458+
finalOptionalResult -> {
459+
if (tool.longRunning() && finalOptionalResult.isEmpty()) {
460+
return Maybe.empty();
461+
}
462+
return Maybe.just(
463+
buildResponseEvent(
464+
tool, finalOptionalResult.orElse(null), toolContext, invocationContext));
465+
});
471466
}
472467

473468
private static Optional<Event> mergeParallelFunctionResponseEvents(

core/src/main/java/com/google/adk/telemetry/Instrumentation.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,13 @@ public static final class ToolExecution extends ClosableTelemetryScope {
160160
private final BaseAgent agent;
161161
private final Map<String, Object> functionArgs;
162162

163-
public ToolExecution(BaseTool tool, BaseAgent agent, Map<String, Object> functionArgs) {
164-
super(Tracing.getTracer().spanBuilder("execute_tool " + tool.name()).startSpan());
163+
public ToolExecution(
164+
BaseTool tool, BaseAgent agent, Map<String, Object> functionArgs, Context parentContext) {
165+
super(
166+
Tracing.getTracer()
167+
.spanBuilder("execute_tool " + tool.name())
168+
.setParent(parentContext)
169+
.startSpan());
165170
this.tool = tool;
166171
this.agent = agent;
167172
this.functionArgs = functionArgs;
@@ -202,6 +207,11 @@ public static AgentInvocation recordAgentInvocation(InvocationContext ctx, BaseA
202207
/** Creates a ToolExecution context to record tool execution telemetry. */
203208
public static ToolExecution recordToolExecution(
204209
BaseTool tool, BaseAgent agent, Map<String, Object> functionArgs) {
205-
return new ToolExecution(tool, agent, functionArgs);
210+
return recordToolExecution(tool, agent, functionArgs, Context.current());
211+
}
212+
213+
public static ToolExecution recordToolExecution(
214+
BaseTool tool, BaseAgent agent, Map<String, Object> functionArgs, Context parentContext) {
215+
return new ToolExecution(tool, agent, functionArgs, parentContext);
206216
}
207217
}

core/src/test/java/com/google/adk/agents/BaseAgentTest.java

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,26 +22,42 @@
2222
import com.google.adk.agents.Callbacks.AfterAgentCallback;
2323
import com.google.adk.agents.Callbacks.BeforeAgentCallback;
2424
import com.google.adk.events.Event;
25+
import com.google.adk.telemetry.Metrics;
2526
import com.google.adk.testing.TestBaseAgent;
2627
import com.google.adk.testing.TestCallback;
2728
import com.google.adk.testing.TestUtils;
2829
import com.google.common.collect.ImmutableList;
2930
import com.google.genai.types.Content;
3031
import com.google.genai.types.Part;
32+
import io.opentelemetry.api.GlobalOpenTelemetry;
33+
import io.opentelemetry.api.common.AttributeKey;
34+
import io.opentelemetry.api.metrics.Meter;
35+
import io.opentelemetry.sdk.OpenTelemetrySdk;
36+
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
37+
import io.opentelemetry.sdk.metrics.data.HistogramPointData;
38+
import io.opentelemetry.sdk.metrics.data.MetricData;
39+
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
40+
import io.opentelemetry.sdk.testing.time.TestClock;
41+
import io.opentelemetry.sdk.trace.SdkTracerProvider;
3142
import io.reactivex.rxjava3.core.Completable;
3243
import io.reactivex.rxjava3.core.Maybe;
3344
import java.util.List;
3445
import java.util.concurrent.atomic.AtomicBoolean;
46+
import org.junit.After;
47+
import org.junit.Before;
3548
import org.junit.Test;
3649
import org.junit.runner.RunWith;
3750
import org.junit.runners.JUnit4;
3851

3952
@RunWith(JUnit4.class)
4053
public final class BaseAgentTest {
41-
4254
private static final String TEST_AGENT_NAME = "testAgent";
4355
private static final String TEST_AGENT_DESCRIPTION = "A test agent";
4456

57+
private InMemoryMetricReader inMemoryMetricReader;
58+
private TestClock testClock;
59+
private Meter originalMeter;
60+
4561
private static class ClosableTestAgent extends TestBaseAgent {
4662
final AtomicBoolean closed = new AtomicBoolean(false);
4763

@@ -56,6 +72,35 @@ public Completable close() {
5672
}
5773
}
5874

75+
@Before
76+
public void setUp() {
77+
GlobalOpenTelemetry.resetForTest();
78+
testClock = TestClock.create();
79+
inMemoryMetricReader = InMemoryMetricReader.create();
80+
SdkMeterProvider sdkMeterProvider =
81+
SdkMeterProvider.builder()
82+
.registerMetricReader(inMemoryMetricReader)
83+
.setClock(testClock)
84+
.build();
85+
86+
OpenTelemetrySdk openTelemetrySdk =
87+
OpenTelemetrySdk.builder()
88+
.setTracerProvider(SdkTracerProvider.builder().build())
89+
.setMeterProvider(sdkMeterProvider)
90+
.build();
91+
92+
GlobalOpenTelemetry.set(openTelemetrySdk);
93+
originalMeter = GlobalOpenTelemetry.getMeter("gcp.vertex.agent");
94+
Metrics.setMeterForTesting(openTelemetrySdk.getMeter("gcp.vertex.agent"));
95+
}
96+
97+
@After
98+
public void tearDown() {
99+
if (originalMeter != null) {
100+
Metrics.setMeterForTesting(originalMeter);
101+
}
102+
}
103+
59104
@Test
60105
public void constructor_setsNameAndDescription() {
61106
String name = "testName";
@@ -173,6 +218,36 @@ public void runAsync_noCallbacks_invokesRunAsyncImpl() {
173218
assertThat(results).hasSize(1);
174219
assertThat(results.get(0).content()).hasValue(runAsyncImplContent);
175220
assertThat(runAsyncImpl.wasCalled()).isTrue();
221+
MetricData durationMetric = findMetricByName("gen_ai.agent.invocation.duration");
222+
assertThat(durationMetric.getUnit()).isEqualTo("ms");
223+
HistogramPointData durationPoint =
224+
(HistogramPointData) durationMetric.getHistogramData().getPoints().iterator().next();
225+
assertThat(durationPoint.getAttributes().get(AttributeKey.stringKey("gen_ai.agent.name")))
226+
.isEqualTo("testAgent");
227+
228+
MetricData reqSizeMetric = findMetricByName("gen_ai.agent.request.size");
229+
assertThat(reqSizeMetric.getUnit()).isEqualTo("By");
230+
HistogramPointData reqSizePoint =
231+
(HistogramPointData) reqSizeMetric.getHistogramData().getPoints().iterator().next();
232+
assertThat(reqSizePoint.getSum()).isEqualTo(12.0);
233+
assertThat(reqSizePoint.getAttributes().get(AttributeKey.stringKey("gen_ai.agent.name")))
234+
.isEqualTo("testAgent");
235+
236+
MetricData respSizeMetric = findMetricByName("gen_ai.agent.response.size");
237+
assertThat(respSizeMetric.getUnit()).isEqualTo("By");
238+
HistogramPointData respSizePoint =
239+
(HistogramPointData) respSizeMetric.getHistogramData().getPoints().iterator().next();
240+
assertThat(respSizePoint.getSum()).isEqualTo(11.0);
241+
assertThat(respSizePoint.getAttributes().get(AttributeKey.stringKey("gen_ai.agent.name")))
242+
.isEqualTo("testAgent");
243+
244+
MetricData workflowStepsMetric = findMetricByName("gen_ai.agent.workflow.steps");
245+
assertThat(workflowStepsMetric.getUnit()).isEqualTo("1");
246+
HistogramPointData workflowStepsPoint =
247+
(HistogramPointData) workflowStepsMetric.getHistogramData().getPoints().iterator().next();
248+
assertThat(workflowStepsPoint.getSum()).isEqualTo(1.0);
249+
assertThat(workflowStepsPoint.getAttributes().get(AttributeKey.stringKey("gen_ai.agent.name")))
250+
.isEqualTo("testAgent");
176251
}
177252

178253
@Test
@@ -627,4 +702,11 @@ public void close_twoLevelsSubAgents_closesAllSubAgents() {
627702
assertThat(subAgent.closed.get()).isTrue();
628703
assertThat(subSubAgent.closed.get()).isTrue();
629704
}
705+
706+
private MetricData findMetricByName(String name) {
707+
return inMemoryMetricReader.collectAllMetrics().stream()
708+
.filter(m -> m.getName().equals(name))
709+
.findFirst()
710+
.orElseThrow(() -> new AssertionError("Metric not found: " + name));
711+
}
630712
}

core/src/test/java/com/google/adk/agents/LlmAgentTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,7 @@ public void runAsync_withTools_createsToolSpans() throws InterruptedException {
494494
List<SpanData> spans = openTelemetryRule.getSpans();
495495
SpanData agentSpan = findSpanByName(spans, "invoke_agent test agent");
496496
List<SpanData> llmSpans = findSpansByName(spans, "call_llm");
497-
List<SpanData> toolSpans = findSpansByName(spans, "execute_tool [echo_tool]");
497+
List<SpanData> toolSpans = findSpansByName(spans, "execute_tool echo_tool");
498498

499499
assertThat(llmSpans).hasSize(2);
500500
assertThat(toolSpans).hasSize(1);

core/src/test/java/com/google/adk/runner/RunnerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1330,7 +1330,7 @@ public void runAsync_createsToolSpansWithCorrectParent() {
13301330
List<SpanData> spans = openTelemetryRule.getSpans();
13311331
List<SpanData> llmSpans = spans.stream().filter(s -> s.getName().equals("call_llm")).toList();
13321332
List<SpanData> toolSpans =
1333-
spans.stream().filter(s -> s.getName().equals("execute_tool [echo_tool]")).toList();
1333+
spans.stream().filter(s -> s.getName().equals("execute_tool echo_tool")).toList();
13341334

13351335
assertThat(llmSpans).hasSize(2);
13361336
assertThat(toolSpans).hasSize(1);
@@ -1365,7 +1365,7 @@ public void runLive_createsToolSpansWithCorrectParent() throws Exception {
13651365
List<SpanData> spans = openTelemetryRule.getSpans();
13661366
List<SpanData> llmSpans = spans.stream().filter(s -> s.getName().equals("call_llm")).toList();
13671367
List<SpanData> toolSpans =
1368-
spans.stream().filter(s -> s.getName().equals("execute_tool [echo_tool]")).toList();
1368+
spans.stream().filter(s -> s.getName().equals("execute_tool echo_tool")).toList();
13691369

13701370
// In runLive, there is one call_llm span for the execution
13711371
assertThat(llmSpans).hasSize(1);

0 commit comments

Comments
 (0)