diff --git a/dynamic-control/src/main/java/io/opentelemetry/contrib/dynamic/policy/PolicyStore.java b/dynamic-control/src/main/java/io/opentelemetry/contrib/dynamic/policy/PolicyStore.java index cd0f2f3d5..b5ee87164 100644 --- a/dynamic-control/src/main/java/io/opentelemetry/contrib/dynamic/policy/PolicyStore.java +++ b/dynamic-control/src/main/java/io/opentelemetry/contrib/dynamic/policy/PolicyStore.java @@ -10,14 +10,20 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Objects; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; /** * Holds the latest validated policy snapshot and reports whether an update changed effective * configuration. */ public final class PolicyStore { + private static final Logger logger = Logger.getLogger(PolicyStore.class.getName()); private List policies = Collections.emptyList(); + private final List implementers = new ArrayList<>(); + private long policyVersion; /** * Replaces the stored policies when the new snapshot is not equal to the current one. @@ -31,17 +37,91 @@ public final class PolicyStore { * * @return {@code true} if the store was updated, {@code false} if the snapshot was unchanged */ - public synchronized boolean updatePolicies(List newPolicies) { + public boolean updatePolicies(List newPolicies) { Objects.requireNonNull(newPolicies, "newPolicies cannot be null"); LinkedHashSet newPolicySet = new LinkedHashSet<>(newPolicies); - if (new LinkedHashSet<>(policies).equals(newPolicySet)) { - return false; + List policiesSnapshot; + List implementersSnapshot; + long snapshotVersion; + synchronized (this) { + if (new LinkedHashSet<>(policies).equals(newPolicySet)) { + return false; + } + policies = new ArrayList<>(newPolicySet); + policyVersion++; + snapshotVersion = policyVersion; + policiesSnapshot = new ArrayList<>(policies); + implementersSnapshot = new ArrayList<>(implementers); + } + for (RegisteredImplementer implementer : implementersSnapshot) { + notifyImplementer(implementer, policiesSnapshot, snapshotVersion); } - policies = new ArrayList<>(newPolicySet); return true; } + public void registerImplementer(PolicyImplementer implementer) { + Objects.requireNonNull(implementer, "implementer cannot be null"); + RegisteredImplementer registeredImplementer = new RegisteredImplementer(implementer); + List policiesSnapshot; + long snapshotVersion; + synchronized (this) { + implementers.add(registeredImplementer); + snapshotVersion = policyVersion; + policiesSnapshot = new ArrayList<>(policies); + } + notifyImplementer(registeredImplementer, policiesSnapshot, snapshotVersion); + } + public synchronized List getPolicies() { return Collections.unmodifiableList(new ArrayList<>(policies)); } + + public synchronized void clear() { + policies = Collections.emptyList(); + implementers.clear(); + policyVersion = 0; + } + + private static List relevantPoliciesFor( + PolicyImplementer implementer, List policies) { + Set supportedTypes = new LinkedHashSet<>(); + for (PolicyValidator validator : implementer.getValidators()) { + if (validator != null && validator.getPolicyType() != null) { + supportedTypes.add(validator.getPolicyType()); + } + } + ArrayList relevant = new ArrayList<>(); + for (TelemetryPolicy policy : policies) { + if (supportedTypes.contains(policy.getType())) { + relevant.add(policy); + } + } + return Collections.unmodifiableList(relevant); + } + + private static void notifyImplementer( + RegisteredImplementer registration, List policiesSnapshot, long version) { + synchronized (registration) { + if (version <= registration.lastDeliveredVersion) { + return; + } + try { + registration.implementer.onPoliciesChanged( + relevantPoliciesFor(registration.implementer, policiesSnapshot)); + } catch (RuntimeException e) { + logger.log(Level.WARNING, "Policy implementer failed to apply policy update", e); + } finally { + registration.lastDeliveredVersion = version; + } + } + } + + private static final class RegisteredImplementer { + private final PolicyImplementer implementer; + private long lastDeliveredVersion = -1; + + private RegisteredImplementer(PolicyImplementer implementer) { + this.implementer = implementer; + } + } } diff --git a/dynamic-control/src/main/java/io/opentelemetry/contrib/dynamic/policy/registry/PolicyInit.java b/dynamic-control/src/main/java/io/opentelemetry/contrib/dynamic/policy/registry/PolicyInit.java index 6d1b15247..2964f70e0 100644 --- a/dynamic-control/src/main/java/io/opentelemetry/contrib/dynamic/policy/registry/PolicyInit.java +++ b/dynamic-control/src/main/java/io/opentelemetry/contrib/dynamic/policy/registry/PolicyInit.java @@ -204,8 +204,9 @@ private static void resolveAndInitializeConfiguredPolicyTypes( } /** - * Initializes one policy class exactly once for this init pass. eg run {@code - * TraceSamplingRatePolicy::initialize} and cache the returned implementer. + * Initializes one policy class exactly once until shutdown. eg run {@code + * TraceSamplingRatePolicy::initialize}, cache the returned implementer, and register it with the + * policy store. * * @param policyClass policy class to initialize * @param autoConfiguration OpenTelemetry auto-configuration customizer @@ -218,21 +219,32 @@ private static void initializePolicyClass( if (!initializedPolicyClasses.add(policyClass)) { return; } - PolicyTypeInitializer policyTypeInitializer = POLICY_TYPE_INITIALIZERS.get(policyClass); - if (policyTypeInitializer == null) { - throw new IllegalStateException( - "No policyTypeInitializer registered for policy class '" + policyClass.getName() + "'"); - } + PolicyImplementer implementer; try { - PolicyImplementer implementer = policyTypeInitializer.initialize(autoConfiguration); - initializedImplementers.put(policyClass, implementer); - // TODO: register implementer with policyStore, not yet implemented - // policyStore.registerImplementer(implementer); - logger.log(Level.INFO, "Initialized policy class ''{0}''", policyClass.getName()); + synchronized (initializedImplementers) { + PolicyImplementer existing = initializedImplementers.get(policyClass); + if (existing != null) { + return; + } + PolicyTypeInitializer policyTypeInitializer = POLICY_TYPE_INITIALIZERS.get(policyClass); + if (policyTypeInitializer == null) { + throw new IllegalStateException( + "No policyTypeInitializer registered for policy class '" + + policyClass.getName() + + "'"); + } + implementer = + Objects.requireNonNull( + policyTypeInitializer.initialize(autoConfiguration), + "policyTypeInitializer returned null"); + initializedImplementers.put(policyClass, implementer); + } } catch (RuntimeException e) { throw new IllegalStateException( "Policy initializer failed for class '" + policyClass.getName() + "'", e); } + policyStore.registerImplementer(implementer); + logger.log(Level.INFO, "Initialized policy class ''{0}''", policyClass.getName()); } /** @@ -318,6 +330,11 @@ private static List createSourceValidators(PolicySourceConfig s *

Each source contributes an immutable snapshot; the store receives the flattened union. eg * OpAMPPolicyProvider and FilePolicyProvider both produce a new TraceSamplingRatePolicy, this * will combine them all into a single list. + * + *

TODO: implement spec-compliant merge semantics: matching policies' keep values must be + * combined using the most restrictive result; overlapping policy effects must be commutative, + * idempotent, and deterministic; duplicate policy IDs across providers must be resolved by + * provider priority. */ private static void updatePoliciesForSource( PolicyProvider provider, List policiesFromSource) { @@ -374,6 +391,7 @@ public static void shutdown() { sourcePolicies.clear(); sourcesActivated.set(false); initializedImplementers.clear(); + policyStore.clear(); } /** diff --git a/dynamic-control/src/test/java/io/opentelemetry/contrib/dynamic/policy/PolicyStoreTest.java b/dynamic-control/src/test/java/io/opentelemetry/contrib/dynamic/policy/PolicyStoreTest.java index 949a9d018..085d263d3 100644 --- a/dynamic-control/src/test/java/io/opentelemetry/contrib/dynamic/policy/PolicyStoreTest.java +++ b/dynamic-control/src/test/java/io/opentelemetry/contrib/dynamic/policy/PolicyStoreTest.java @@ -7,6 +7,11 @@ import static java.util.Collections.singletonList; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import io.opentelemetry.contrib.dynamic.policy.tracesampling.TraceSamplingRatePolicy; import java.util.Arrays; @@ -66,4 +71,81 @@ void updatePoliciesIgnoresDuplicatePoliciesInInput() { void getPoliciesReturnsEmptyWhenNeverUpdated() { assertThat(new PolicyStore().getPolicies()).isEqualTo(Collections.emptyList()); } + + @Test + void registerImplementerReceivesCurrentRelevantPolicies() { + PolicyStore store = new PolicyStore(); + store.updatePolicies( + Arrays.asList(new TraceSamplingRatePolicy(0.5), new TelemetryPolicy("other-policy"))); + + PolicyImplementer implementer = mock(PolicyImplementer.class); + PolicyValidator validator = mock(PolicyValidator.class); + when(validator.getPolicyType()).thenReturn(TraceSamplingRatePolicy.POLICY_TYPE); + when(implementer.getValidators()).thenReturn(singletonList(validator)); + + store.registerImplementer(implementer); + + verify(implementer).onPoliciesChanged(singletonList(new TraceSamplingRatePolicy(0.5))); + } + + @Test + void updatePoliciesNotifiesRegisteredImplementerWithRelevantPolicies() { + PolicyStore store = new PolicyStore(); + PolicyImplementer implementer = traceSamplingImplementer(); + + store.registerImplementer(implementer); + clearInvocations(implementer); + store.updatePolicies( + Arrays.asList(new TelemetryPolicy("other-policy"), new TraceSamplingRatePolicy(0.25))); + + verify(implementer).onPoliciesChanged(singletonList(new TraceSamplingRatePolicy(0.25))); + } + + @Test + void updatePoliciesContinuesWhenImplementerThrows() { + PolicyStore store = new PolicyStore(); + PolicyImplementer failingImplementer = traceSamplingImplementer(); + PolicyImplementer nextImplementer = traceSamplingImplementer(); + List updatedPolicies = singletonList(new TraceSamplingRatePolicy(0.25)); + doThrow(new IllegalStateException("boom")) + .when(failingImplementer) + .onPoliciesChanged(updatedPolicies); + + store.registerImplementer(failingImplementer); + store.registerImplementer(nextImplementer); + clearInvocations(failingImplementer, nextImplementer); + + assertThat(store.updatePolicies(updatedPolicies)).isTrue(); + + verify(failingImplementer).onPoliciesChanged(updatedPolicies); + verify(nextImplementer).onPoliciesChanged(updatedPolicies); + assertThat(store.getPolicies()).isEqualTo(updatedPolicies); + } + + @Test + void registerImplementerContinuesAfterPreviousImplementerThrows() { + PolicyStore store = new PolicyStore(); + List currentPolicies = singletonList(new TraceSamplingRatePolicy(0.5)); + assertThat(store.updatePolicies(currentPolicies)).isTrue(); + + PolicyImplementer failingImplementer = traceSamplingImplementer(); + PolicyImplementer nextImplementer = traceSamplingImplementer(); + doThrow(new IllegalStateException("boom")) + .when(failingImplementer) + .onPoliciesChanged(currentPolicies); + + store.registerImplementer(failingImplementer); + store.registerImplementer(nextImplementer); + + verify(failingImplementer).onPoliciesChanged(currentPolicies); + verify(nextImplementer).onPoliciesChanged(currentPolicies); + } + + private static PolicyImplementer traceSamplingImplementer() { + PolicyImplementer implementer = mock(PolicyImplementer.class); + PolicyValidator validator = mock(PolicyValidator.class); + when(validator.getPolicyType()).thenReturn(TraceSamplingRatePolicy.POLICY_TYPE); + when(implementer.getValidators()).thenReturn(singletonList(validator)); + return implementer; + } } diff --git a/dynamic-control/src/test/java/io/opentelemetry/contrib/dynamic/policy/registry/PolicyInitTest.java b/dynamic-control/src/test/java/io/opentelemetry/contrib/dynamic/policy/registry/PolicyInitTest.java index da0331c88..a6499a827 100644 --- a/dynamic-control/src/test/java/io/opentelemetry/contrib/dynamic/policy/registry/PolicyInitTest.java +++ b/dynamic-control/src/test/java/io/opentelemetry/contrib/dynamic/policy/registry/PolicyInitTest.java @@ -8,10 +8,13 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import io.opentelemetry.api.incubator.config.DeclarativeConfigProperties; +import io.opentelemetry.contrib.dynamic.policy.PolicyImplementer; +import io.opentelemetry.contrib.dynamic.policy.TelemetryPolicy; import io.opentelemetry.contrib.dynamic.policy.tracesampling.TraceSamplingRatePolicy; import io.opentelemetry.sdk.autoconfigure.spi.AutoConfigurationCustomizer; import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties; @@ -21,6 +24,7 @@ import java.nio.file.Path; import java.util.Collections; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; @@ -95,6 +99,28 @@ void throwsWhenDeclarativeConfigUsesUnknownPolicyType() { .hasMessageContaining("Unknown policyType"); } + @Test + void initializesPolicyClassOnlyOnceAcrossRepeatedInitCalls() { + String policyType = "test-policy-idempotent"; + AtomicInteger initializeCount = new AtomicInteger(); + PolicyImplementer implementer = mock(PolicyImplementer.class); + when(implementer.getValidators()).thenReturn(Collections.emptyList()); + PolicyInit.registerPolicyType( + policyType, + IdempotentTestPolicy.class, + autoConfiguration -> { + initializeCount.incrementAndGet(); + return implementer; + }); + ConfigProperties config = mock(ConfigProperties.class); + + PolicyInit.initFromDeclarativeConfig(telemetryPolicyNodeConfig(policyType), config); + PolicyInit.initFromDeclarativeConfig(telemetryPolicyNodeConfig(policyType), config); + + assertThat(initializeCount.get()).isEqualTo(1); + verify(implementer, times(1)).onPoliciesChanged(Collections.emptyList()); + } + private static Function> capturePropertiesCustomizer( AutoConfigurationCustomizer customizer) { @SuppressWarnings("unchecked") @@ -134,4 +160,10 @@ private static String minimalJsonInitConfig() { + TraceSamplingRatePolicy.POLICY_TYPE + "\"}]}]}"; } + + private static final class IdempotentTestPolicy extends TelemetryPolicy { + private IdempotentTestPolicy() { + super("test-policy-idempotent"); + } + } }