diff --git a/dynamic-control/src/main/java/io/opentelemetry/contrib/dynamic/policy/PolicyImplementer.java b/dynamic-control/src/main/java/io/opentelemetry/contrib/dynamic/policy/PolicyImplementer.java new file mode 100644 index 000000000..0d97da07f --- /dev/null +++ b/dynamic-control/src/main/java/io/opentelemetry/contrib/dynamic/policy/PolicyImplementer.java @@ -0,0 +1,43 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.dynamic.policy; + +import java.util.List; + +/** + * Applies validated telemetry policies to runtime components. + * + *

Implementers are notified when policies change and are responsible for translating those + * policies into concrete runtime behavior (for example, updating a sampler). Implementers also + * declare the {@link PolicyValidator}s they support so that only relevant policies are delivered to + * them. + */ +public interface PolicyImplementer { + + /** + * Called when the relevant policies have changed. + * + *

Implementers should treat the provided list as authoritative for their policy types and + * update runtime state accordingly. + * + *

The upstream policy pipeline is assumed to have already merged policies for prioritization + * and conflict resolution. The provided list is expected to be consistent and not contain + * conflicting policies for the same type. + * + * @param policies the set of policies that apply to this implementer + */ + void onPoliciesChanged(List policies); + + /** + * Returns the validators that this implementer supports. + * + *

These validators define which policy types and aliases the implementer can accept and + * process. + * + * @return the list of supported validators + */ + List getValidators(); +} diff --git a/dynamic-control/src/main/java/io/opentelemetry/contrib/dynamic/policy/TraceSamplingRatePolicyImplementer.java b/dynamic-control/src/main/java/io/opentelemetry/contrib/dynamic/policy/TraceSamplingRatePolicyImplementer.java new file mode 100644 index 000000000..18b38c90b --- /dev/null +++ b/dynamic-control/src/main/java/io/opentelemetry/contrib/dynamic/policy/TraceSamplingRatePolicyImplementer.java @@ -0,0 +1,76 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.dynamic.policy; + +import com.fasterxml.jackson.databind.JsonNode; +import io.opentelemetry.contrib.dynamic.sampler.DelegatingSampler; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +/** + * Implements the {@code trace-sampling} policy by updating a {@link DelegatingSampler}. + * + *

This implementer listens for validated {@link TelemetryPolicy} updates of type {@code + * "trace-sampling"} and applies the {@code probability} field to the delegate sampler using {@link + * Sampler#traceIdRatioBased(double)} wrapped by {@link Sampler#parentBased(Sampler)}. + * + *

If the policy spec is {@code null} (policy removal), the delegate falls back to {@link + * Sampler#alwaysOn()}. + * + *

Validation is performed by {@link TraceSamplingValidator}; this implementer only consumes + * policies produced by that validator. + * + *

Policies with a non-null spec that omit {@code probability} are ignored, which should not + * occur if validation is functioning correctly. + * + *

This class is thread-safe. Calls to {@link #onPoliciesChanged(List)} can occur concurrently + * with sampling operations on the associated {@link DelegatingSampler}. + */ +public final class TraceSamplingRatePolicyImplementer implements PolicyImplementer { + + private static final String TRACE_SAMPLING_TYPE = "trace-sampling"; + private static final String PROBABILITY_FIELD = "probability"; + private static final List VALIDATORS = + Collections.singletonList(new TraceSamplingValidator()); + + private final DelegatingSampler delegatingSampler; + + /** + * Creates a new implementer that updates the provided {@link DelegatingSampler}. + * + * @param delegatingSampler the sampler to update when policies change + */ + public TraceSamplingRatePolicyImplementer(DelegatingSampler delegatingSampler) { + Objects.requireNonNull(delegatingSampler, "delegatingSampler cannot be null"); + this.delegatingSampler = delegatingSampler; + } + + @Override + public List getValidators() { + return VALIDATORS; + } + + @Override + public void onPoliciesChanged(List policies) { + for (TelemetryPolicy policy : policies) { + if (!TRACE_SAMPLING_TYPE.equals(policy.getType())) { + continue; + } + JsonNode spec = policy.getSpec(); + if (spec == null) { + delegatingSampler.setDelegate(Sampler.alwaysOn()); + continue; + } + if (spec.has(PROBABILITY_FIELD)) { + double ratio = spec.get(PROBABILITY_FIELD).asDouble(1.0); + Sampler sampler = Sampler.parentBased(Sampler.traceIdRatioBased(ratio)); + delegatingSampler.setDelegate(sampler); + } + } + } +} diff --git a/dynamic-control/src/test/java/io/opentelemetry/contrib/dynamic/policy/TraceSamplingRatePolicyImplementerTest.java b/dynamic-control/src/test/java/io/opentelemetry/contrib/dynamic/policy/TraceSamplingRatePolicyImplementerTest.java new file mode 100644 index 000000000..729dd88e6 --- /dev/null +++ b/dynamic-control/src/test/java/io/opentelemetry/contrib/dynamic/policy/TraceSamplingRatePolicyImplementerTest.java @@ -0,0 +1,107 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.dynamic.policy; + +import static java.util.Collections.singletonList; +import static org.assertj.core.api.Assertions.assertThat; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.context.Context; +import io.opentelemetry.contrib.dynamic.sampler.DelegatingSampler; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import io.opentelemetry.sdk.trace.samplers.SamplingDecision; +import io.opentelemetry.sdk.trace.samplers.SamplingResult; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.junit.jupiter.api.Test; + +class TraceSamplingRatePolicyImplementerTest { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + @Test + void nullSpecFallsBackToAlwaysOn() { + DelegatingSampler delegatingSampler = new DelegatingSampler(Sampler.alwaysOff()); + TraceSamplingRatePolicyImplementer implementer = + new TraceSamplingRatePolicyImplementer(delegatingSampler); + + implementer.onPoliciesChanged(singletonList(new TelemetryPolicy("trace-sampling", null))); + + assertThat(decisionFor(delegatingSampler)).isEqualTo(SamplingDecision.RECORD_AND_SAMPLE); + } + + @Test + void appliesProbabilityToDelegate() { + DelegatingSampler delegatingSampler = new DelegatingSampler(Sampler.alwaysOff()); + TraceSamplingRatePolicyImplementer implementer = + new TraceSamplingRatePolicyImplementer(delegatingSampler); + + implementer.onPoliciesChanged( + singletonList(new TelemetryPolicy("trace-sampling", spec("probability", 1.0)))); + + assertThat(decisionFor(delegatingSampler)).isEqualTo(SamplingDecision.RECORD_AND_SAMPLE); + } + + @Test + void ignoresUnrelatedPolicyTypes() { + DelegatingSampler delegatingSampler = new DelegatingSampler(Sampler.alwaysOff()); + TraceSamplingRatePolicyImplementer implementer = + new TraceSamplingRatePolicyImplementer(delegatingSampler); + + implementer.onPoliciesChanged( + singletonList(new TelemetryPolicy("other-policy", spec("value", 1.0)))); + + assertThat(decisionFor(delegatingSampler)).isEqualTo(SamplingDecision.DROP); + } + + @Test + void ignoresTraceSamplingPolicyWithoutProbability() { + DelegatingSampler delegatingSampler = new DelegatingSampler(Sampler.alwaysOff()); + TraceSamplingRatePolicyImplementer implementer = + new TraceSamplingRatePolicyImplementer(delegatingSampler); + + implementer.onPoliciesChanged( + singletonList(new TelemetryPolicy("trace-sampling", spec("other-field", 1.0)))); + + assertThat(decisionFor(delegatingSampler)).isEqualTo(SamplingDecision.DROP); + } + + @Test + void lastTraceSamplingPolicyWins() { + DelegatingSampler delegatingSampler = new DelegatingSampler(Sampler.alwaysOff()); + TraceSamplingRatePolicyImplementer implementer = + new TraceSamplingRatePolicyImplementer(delegatingSampler); + + List policies = + Arrays.asList( + new TelemetryPolicy("trace-sampling", spec("probability", 0.0)), + new TelemetryPolicy("trace-sampling", spec("probability", 1.0))); + + implementer.onPoliciesChanged(policies); + + assertThat(decisionFor(delegatingSampler)).isEqualTo(SamplingDecision.RECORD_AND_SAMPLE); + } + + private static SamplingDecision decisionFor(DelegatingSampler sampler) { + SamplingResult result = + sampler.shouldSample( + Context.root(), + "00000000000000000000000000000001", + "test-span", + SpanKind.INTERNAL, + Attributes.empty(), + Collections.emptyList()); + return result.getDecision(); + } + + private static JsonNode spec(String field, double value) { + return MAPPER.createObjectNode().put(field, value); + } +}