Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ public static void start(Instrumentation inst, SharedCommunicationObjects sco) {
return;
}

if (!config.isTraceEnabled()) {
LOGGER.debug("LLM Observability is disabled: tracing is disabled");
return;
}

sco.createRemaining(config);

String mlApp = config.getLlmObsMlApp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTraceApiInfo;
import datadog.trace.api.DDTraceId;
import datadog.trace.api.ProductTraceSource;
import datadog.trace.api.WellKnownTags;
import datadog.trace.api.internal.TraceSegment;
import datadog.trace.api.llmobs.LLMObs;
import datadog.trace.api.llmobs.LLMObsContext;
import datadog.trace.api.llmobs.LLMObsSpan;
Expand Down Expand Up @@ -110,6 +112,12 @@ public DDLLMObsSpan(
}
span.setTag(LLMOBS_TAG_PREFIX + PARENT_ID_TAG_INTERNAL, parentSpanID);
scope = LLMObsContext.attach(span.context());

// Mark this span as originating from LLM Observability product
TraceSegment segment = AgentTracer.get().getTraceSegment();
if (segment != null) {
segment.setTagTop(Tags.PROPAGATED_TRACE_SOURCE, ProductTraceSource.LLMOBS);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package datadog.trace.llmobs

import datadog.communication.ddagent.SharedCommunicationObjects
import datadog.trace.test.util.DDSpecification
import okhttp3.HttpUrl

/**
 * Exercises {@code LLMObsSystem.start} under different configuration flags and checks,
 * through a mocked {@code SharedCommunicationObjects}, whether start-up proceeds far
 * enough to call {@code createRemaining}.
 */
class LLMObsSystemTest extends DDSpecification {

  void 'start disabled when llmobs is disabled'() {
    given: 'LLM Observability is switched off'
    injectSysConfig('llmobs.enabled', 'false')
    rebuildConfig()
    def instrumentation = Mock(java.lang.instrument.Instrumentation)
    def sharedObjects = Mock(SharedCommunicationObjects)

    when:
    LLMObsSystem.start(instrumentation, sharedObjects)

    then: 'the shared communication objects are never touched'
    0 * sharedObjects._
  }

  void 'start disabled when trace is disabled'() {
    given: 'LLM Observability is on but tracing is off'
    injectSysConfig('llmobs.enabled', 'true')
    injectSysConfig('trace.enabled', 'false')
    rebuildConfig()
    def instrumentation = Mock(java.lang.instrument.Instrumentation)
    def sharedObjects = Mock(SharedCommunicationObjects)

    when:
    LLMObsSystem.start(instrumentation, sharedObjects)

    then: 'the shared communication objects are never touched'
    0 * sharedObjects._
  }

  void 'start enabled when apm tracing disabled but llmobs enabled'() {
    given: 'LLM Observability is on and only APM tracing is off'
    injectSysConfig('llmobs.enabled', 'true')
    injectSysConfig('apm.tracing.enabled', 'false')
    rebuildConfig()
    def instrumentation = Mock(java.lang.instrument.Instrumentation)
    def sharedObjects = Mock(SharedCommunicationObjects)
    sharedObjects.agentUrl = HttpUrl.parse('http://localhost:8126')

    when:
    LLMObsSystem.start(instrumentation, sharedObjects)

    then: 'start-up reaches the createRemaining call exactly once'
    1 * sharedObjects.createRemaining(_)
  }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package datadog.smoketest.apmtracingdisabled;

import datadog.trace.api.llmobs.LLMObs;
import datadog.trace.api.llmobs.LLMObsSpan;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.opentracing.Span;
import io.opentracing.util.GlobalTracer;
Expand Down Expand Up @@ -73,6 +75,17 @@ public void write(
}
}

/**
 * Smoke-test endpoint that creates a single LLM Observability span through the public
 * {@code LLMObs} API, annotates it with fixed input/output text and finishes it.
 *
 * @return a fixed confirmation message
 */
@GetMapping("/llmobs/test")
public String llmobsTest() {
  // Create LLMObs span using public API
  final LLMObsSpan llmSpan =
      LLMObs.startLLMSpan("llmobs-test-operation", "gpt-4", "openai", null, null);
  try {
    llmSpan.annotateIO("test input", "test output");
  } finally {
    // Finish in a finally block so the span is not left open if the annotation throws.
    llmSpan.finish();
  }

  return "LLMObs test completed";
}

private String forceKeepSpan() {
final Span span = GlobalTracer.get().activeSpan();
if (span != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package datadog.smoketest.apmtracingdisabled

import datadog.trace.api.sampling.PrioritySampling
import okhttp3.Request

/**
 * Smoke test for the application started with {@code dd.apm.tracing.enabled=false} and
 * {@code dd.llmobs.enabled=true}: traces that contain an LLMObs span are expected to be
 * kept, while plain APM traces are expected to be dropped.
 */
class LlmObsApmDisabledSmokeTest extends AbstractApmTracingDisabledSmokeTest {

  static final String LLMOBS_SERVICE_NAME = "llmobs-apm-disabled-test"

  // JVM system properties passed to the application under test.
  static final String[] LLMOBS_APM_DISABLED_PROPERTIES = [
    "-Ddd.apm.tracing.enabled=false",
    "-Ddd.llmobs.enabled=true",
    "-Ddd.llmobs.ml-app=test-app",
    "-Ddd.service.name=${LLMOBS_SERVICE_NAME}",
  ]

  @Override
  ProcessBuilder createProcessBuilder() {
    return createProcess(LLMOBS_APM_DISABLED_PROPERTIES)
  }

  void 'When APM disabled and LLMObs enabled, LLMObs spans should be kept and APM spans should be dropped'() {
    setup:
    final llmobsUrl = "http://localhost:${httpPort}/rest-api/llmobs/test"
    final llmobsRequest = new Request.Builder().url(llmobsUrl).get().build()

    final apmUrl = "http://localhost:${httpPort}/rest-api/greetings"
    final apmRequest = new Request.Builder().url(apmUrl).get().build()

    when: "Create LLMObs span"
    final llmobsResponse = client.newCall(llmobsRequest).execute()

    then: "LLMObs request should succeed"
    llmobsResponse.successful

    when: "Create regular APM span"
    final apmResponse = client.newCall(apmRequest).execute()

    then: "APM request should succeed"
    apmResponse.successful

    and: "Wait for traces"
    waitForTraceCount(2)

    and: "LLMObs trace should be kept (SAMPLER_KEEP)"
    def llmobsTrace = traces.find { trace ->
      trace.spans.find { span ->
        span.meta["http.url"] == llmobsUrl
      }
    }
    assert llmobsTrace != null
    // The LLMObs child span should have LLMObs tags
    def llmobsChildSpan = llmobsTrace.spans.find { span ->
      span.meta["_ml_obs_tag.model_name"] == "gpt-4"
    }
    assert llmobsChildSpan != null : "LLMObs child span with model_name=gpt-4 should exist"
    // Actually verify the priority this section claims, mirroring the SAMPLER_DROP check below.
    checkRootSpanPrioritySampling(llmobsTrace, PrioritySampling.SAMPLER_KEEP)

    and: "Regular APM trace should be dropped (SAMPLER_DROP)"
    def apmTrace = traces.find { trace ->
      trace.spans.find { span ->
        span.meta["http.url"] == apmUrl
      }
    }
    assert apmTrace != null
    checkRootSpanPrioritySampling(apmTrace, PrioritySampling.SAMPLER_DROP)

    and: "No NPE or errors in logs"
    !isLogPresent { it.contains("NullPointerException") }
    !isLogPresent { it.contains("ERROR") }
  }

  void 'LLMObs spans should have PROPAGATED_TRACE_SOURCE tag set'() {
    setup:
    final llmobsUrl = "http://localhost:${httpPort}/rest-api/llmobs/test"
    final llmobsRequest = new Request.Builder().url(llmobsUrl).get().build()

    when:
    final response = client.newCall(llmobsRequest).execute()

    then:
    response.successful
    waitForTraceCount(1)

    and: "LLMObs span should be created successfully"
    // Look the trace up by request URL rather than relying on its position in the list.
    def llmobsTrace = traces.find { candidate ->
      candidate.spans.find { span ->
        span.meta["http.url"] == llmobsUrl
      }
    }
    assert llmobsTrace != null
    def llmobsSpan = llmobsTrace.spans.find { span ->
      span.meta["_ml_obs_tag.model_name"] == "gpt-4"
    }
    assert llmobsSpan != null : "LLMObs span with model_name should exist"

    and: "Propagated trace source tag should be present on the trace"
    // NOTE(review): assumes Tags.PROPAGATED_TRACE_SOURCE is serialized as the "_dd.p.ts"
    // meta entry on at least one span of the trace - confirm against the decoded payload.
    assert llmobsTrace.spans.any { span ->
      span.meta["_dd.p.ts"] != null
    } : "LLMObs trace should carry the propagated trace source tag"
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package datadog.smoketest.apmtracingdisabled

import okhttp3.Request

/**
 * Smoke test for the application started with {@code dd.trace.enabled=false} and
 * {@code dd.llmobs.enabled=true}: the LLMObs endpoint must still answer successfully and
 * the log must contain the "LLM Observability is disabled" message.
 */
class LlmObsTraceDisabledSmokeTest extends AbstractApmTracingDisabledSmokeTest {

  // JVM system properties passed to the application under test.
  static final String[] LLMOBS_TRACE_DISABLED_PROPERTIES = [
    "-Ddd.trace.enabled=false",
    "-Ddd.llmobs.enabled=true",
    "-Ddd.llmobs.ml-app=test-app",
    "-Ddd.service.name=llmobs-trace-disabled-test",
  ]

  @Override
  ProcessBuilder createProcessBuilder() {
    createProcess(LLMOBS_TRACE_DISABLED_PROPERTIES)
  }

  void 'DD_TRACE_ENABLED=false with DD_LLMOBS_ENABLED=true should disable LLMObs gracefully'() {
    given:
    def endpoint = "http://localhost:${httpPort}/rest-api/llmobs/test"
    def endpointRequest = new Request.Builder().url(endpoint).get().build()

    when: "Call LLMObs endpoint"
    def reply = client.newCall(endpointRequest).execute()

    then: "Request should succeed"
    reply.successful
    reply.code() == 200

    and: "LLMObs disabled message in logs"
    isLogPresent { logLine ->
      logLine.contains("LLM Observability is disabled: tracing is disabled")
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package datadog.trace.common.sampling;

import static datadog.trace.api.sampling.PrioritySampling.SAMPLER_DROP;
import static datadog.trace.api.sampling.PrioritySampling.SAMPLER_KEEP;

import datadog.trace.api.ProductTraceSource;
import datadog.trace.api.sampling.SamplingMechanism;
import datadog.trace.core.CoreSpan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sampler used when APM tracing is disabled while LLM Observability is enabled.
 *
 * <p>ASM standalone mode only needs 1 trace per minute for billing/catalog purposes. LLM
 * Observability, by contrast, needs every LLM interaction in order to track costs, latency and
 * quality metrics. This sampler therefore keeps every trace marked with the LLMOBS product
 * flag and drops all APM-only traces.
 */
public class LlmObsStandaloneSampler implements Sampler, PrioritySampler {

  private static final Logger log = LoggerFactory.getLogger(LlmObsStandaloneSampler.class);

  /**
   * Always accepts the span. With priority sampling every trace is sent to the core agent, even
   * those marked as dropped, so that the agent can collect stats on all traces.
   *
   * @param span the span being sampled
   * @return always {@code true}
   */
  @Override
  public <T extends CoreSpan<T>> boolean sample(final T span) {
    return true;
  }

  /**
   * Sets {@code SAMPLER_KEEP} when the local root span of {@code span} is marked with the LLMOBS
   * product flag and {@code SAMPLER_DROP} otherwise.
   *
   * @param span the span whose sampling priority is set
   */
  @Override
  public <T extends CoreSpan<T>> void setSamplingPriority(final T span) {
    if (hasLlmObsTraceSource(span.getLocalRootSpan())) {
      log.debug("Set SAMPLER_KEEP for LLMObs span {}", span.getSpanId());
      span.setSamplingPriority(SAMPLER_KEEP, SamplingMechanism.DEFAULT);
    } else {
      // Drop APM-only traces when APM tracing is disabled
      log.debug("Set SAMPLER_DROP for APM-only span {}", span.getSpanId());
      span.setSamplingPriority(SAMPLER_DROP, SamplingMechanism.DEFAULT);
    }
  }

  /**
   * Tells whether the given local root span is a {@code DDSpan} whose propagated trace source is
   * marked with the LLMOBS product flag.
   */
  private static boolean hasLlmObsTraceSource(final Object localRoot) {
    if (!(localRoot instanceof datadog.trace.core.DDSpan)) {
      return false;
    }
    final datadog.trace.core.DDSpan ddRoot = (datadog.trace.core.DDSpan) localRoot;
    final int productBits = ddRoot.context().getPropagationTags().getTraceSource();
    return ProductTraceSource.isProductMarked(productBits, ProductTraceSource.LLMOBS);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,17 @@ final class Builder {
public static Sampler forConfig(final Config config, final TraceConfig traceConfig) {
Sampler sampler;
if (config != null) {
if (!config.isApmTracingEnabled() && isAsmEnabled(config)) {
log.debug("APM is disabled. Only 1 trace per minute will be sent.");
return new AsmStandaloneSampler(Clock.systemUTC());
if (!config.isApmTracingEnabled()) {
if (config.isLlmObsEnabled()) {
log.debug("APM is disabled, but LLMObs is enabled. All LLMObs traces will be kept.");
return new LlmObsStandaloneSampler();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears this would break ASM. If ASM is enabled, we still need to let at least one APM/ASM trace through per minute.

} else if (isAsmEnabled(config)) {
log.debug("APM is disabled, but ASM is enabled. Only 1 trace per minute will be sent.");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
log.debug("APM is disabled, but ASM is enabled. Only 1 trace per minute will be sent.");
log.debug("APM is disabled, but ASM is enabled. Only 1 APM trace per minute will be sent, all ASM traces will be kept.");

From the rest of the comments in the PR, it seems this was not obvious.

return new AsmStandaloneSampler(Clock.systemUTC());
}
// APM disabled and no other products enabled - drop all APM traces
log.debug("APM is disabled. All APM traces will be dropped.");
return new ForcePrioritySampler(PrioritySampling.SAMPLER_DROP, SamplingMechanism.DEFAULT);
}
final Map<String, String> serviceRules = config.getTraceSamplingServiceRules();
final Map<String, String> operationRules = config.getTraceSamplingOperationRules();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,13 @@ public void setSamplingPriorityIfNecessary() {
DDSpan rootSpan = getRootSpan();
if (traceConfig.sampler instanceof PrioritySampler && rootSpan != null) {
// Ignore the force-keep priority in the absence of propagated _dd.p.ts span tag marked for
// ASM.
// ASM or LLMOBS.
if ((!Config.get().isApmTracingEnabled()
&& !ProductTraceSource.isProductMarked(
rootSpan.context().getPropagationTags().getTraceSource(), ProductTraceSource.ASM))
rootSpan.context().getPropagationTags().getTraceSource(), ProductTraceSource.ASM)
&& !ProductTraceSource.isProductMarked(
rootSpan.context().getPropagationTags().getTraceSource(),
ProductTraceSource.LLMOBS))
|| rootSpan.context().getSamplingPriority() == PrioritySampling.UNSET) {
((PrioritySampler) traceConfig.sampler).setSamplingPriority(rootSpan);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package datadog.trace.common.sampling

import datadog.trace.api.ProductTraceSource
import datadog.trace.bootstrap.instrumentation.api.Tags
import datadog.trace.common.writer.ListWriter
import datadog.trace.core.test.DDCoreSpecification
import datadog.trace.api.sampling.PrioritySampling

/**
 * Unit tests for {@code LlmObsStandaloneSampler}: spans whose trace segment is marked with
 * the LLMOBS trace source get SAMPLER_KEEP, unmarked spans get SAMPLER_DROP, and
 * {@code sample} accepts every span.
 */
class LlmObsStandaloneSamplerTest extends DDCoreSpecification {

  def writer = new ListWriter()

  void "test LLMOBS spans are kept"() {
    given:
    def standaloneSampler = new LlmObsStandaloneSampler()
    def tracer = tracerBuilder().writer(writer).sampler(standaloneSampler).build()

    when: "LLMOBS span"
    def llmSpan = tracer.buildSpan("llm-call").start()
    def activation = tracer.activateSpan(llmSpan)
    // Mark the active trace segment as originating from LLM Observability.
    def segment = tracer.getTraceSegment()
    segment.setTagTop(Tags.PROPAGATED_TRACE_SOURCE, ProductTraceSource.LLMOBS)
    standaloneSampler.setSamplingPriority(llmSpan)
    activation.close()

    then:
    PrioritySampling.SAMPLER_KEEP == llmSpan.getSamplingPriority()

    cleanup:
    tracer.close()
  }

  void "test APM-only spans are dropped"() {
    given:
    def standaloneSampler = new LlmObsStandaloneSampler()
    def tracer = tracerBuilder().writer(writer).sampler(standaloneSampler).build()

    when: "APM-only span (no LLMOBS flag)"
    def httpSpan = tracer.buildSpan("http-request").start()
    standaloneSampler.setSamplingPriority(httpSpan)

    then:
    PrioritySampling.SAMPLER_DROP == httpSpan.getSamplingPriority()

    cleanup:
    tracer.close()
  }

  void "test sample method always returns true"() {
    given:
    def standaloneSampler = new LlmObsStandaloneSampler()
    def tracer = tracerBuilder().writer(writer).sampler(standaloneSampler).build()

    when:
    def probeSpan = tracer.buildSpan("test").start()

    then:
    standaloneSampler.sample(probeSpan)

    cleanup:
    tracer.close()
  }
}
Loading