diff --git a/instrumentation/apache-dubbo-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apachedubbo/v2_7/DubboInstrumentationModule.java b/instrumentation/apache-dubbo-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apachedubbo/v2_7/DubboInstrumentationModule.java index 3e6ec6f5fd23..4073f67d7aa6 100644 --- a/instrumentation/apache-dubbo-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apachedubbo/v2_7/DubboInstrumentationModule.java +++ b/instrumentation/apache-dubbo-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apachedubbo/v2_7/DubboInstrumentationModule.java @@ -31,13 +31,17 @@ public void registerHelperResources(HelperResourceBuilder helperResourceBuilder) helperResourceBuilder.register( "META-INF/services/org.apache.dubbo.rpc.Filter", "apache-dubbo-2.7/META-INF/services/org.apache.dubbo.rpc.Filter"); + helperResourceBuilder.register( + "META-INF/services/org.apache.dubbo.rpc.cluster.Cluster", + "apache-dubbo-2.7/META-INF/services/org.apache.dubbo.rpc.cluster.Cluster"); } @Override public List exposedClassNames() { return asList( "io.opentelemetry.javaagent.instrumentation.apachedubbo.v2_7.OpenTelemetryClientFilter", - "io.opentelemetry.javaagent.instrumentation.apachedubbo.v2_7.OpenTelemetryServerFilter"); + "io.opentelemetry.javaagent.instrumentation.apachedubbo.v2_7.OpenTelemetryServerFilter", + "io.opentelemetry.javaagent.instrumentation.apachedubbo.v2_7.RegistryCapturingClusterWrapperProxy"); } @Override diff --git a/instrumentation/apache-dubbo-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apachedubbo/v2_7/RegistryCapturingClusterWrapperProxy.java b/instrumentation/apache-dubbo-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apachedubbo/v2_7/RegistryCapturingClusterWrapperProxy.java new file mode 100644 index 000000000000..a368826c8dbc --- /dev/null +++ b/instrumentation/apache-dubbo-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apachedubbo/v2_7/RegistryCapturingClusterWrapperProxy.java @@ -0,0 +1,31 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.apachedubbo.v2_7; + +import io.opentelemetry.instrumentation.apachedubbo.v2_7.internal.RegistryCapturingClusterWrapper; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.cluster.Cluster; +import org.apache.dubbo.rpc.cluster.Directory; + +public final class RegistryCapturingClusterWrapperProxy implements Cluster { + + private final RegistryCapturingClusterWrapper delegate; + + @SuppressWarnings("unused") + public RegistryCapturingClusterWrapperProxy(Cluster cluster) { + this.delegate = new RegistryCapturingClusterWrapper(cluster); + } + + @Override + public Invoker join(Directory directory) { + return delegate.join(directory); + } + + @SuppressWarnings("unused") + public Invoker join(Directory directory, boolean buildFilterChain) { + return delegate.join(directory, buildFilterChain); + } +} diff --git a/instrumentation/apache-dubbo-2.7/javaagent/src/main/resources/apache-dubbo-2.7/META-INF/services/org.apache.dubbo.rpc.cluster.Cluster b/instrumentation/apache-dubbo-2.7/javaagent/src/main/resources/apache-dubbo-2.7/META-INF/services/org.apache.dubbo.rpc.cluster.Cluster new file mode 100644 index 000000000000..76d5f86920ef --- /dev/null +++ b/instrumentation/apache-dubbo-2.7/javaagent/src/main/resources/apache-dubbo-2.7/META-INF/services/org.apache.dubbo.rpc.cluster.Cluster @@ -0,0 +1 @@ +io.opentelemetry.javaagent.instrumentation.apachedubbo.v2_7.RegistryCapturingClusterWrapperProxy diff --git a/instrumentation/apache-dubbo-2.7/javaagent/src/testDubbo/java/io/opentelemetry/javaagent/instrumentation/apachedubbo/v2_7/DubboAgentRegistryTest.java b/instrumentation/apache-dubbo-2.7/javaagent/src/testDubbo/java/io/opentelemetry/javaagent/instrumentation/apachedubbo/v2_7/DubboAgentRegistryTest.java new file mode 100644 index 000000000000..6a022484ba3e --- /dev/null +++ b/instrumentation/apache-dubbo-2.7/javaagent/src/testDubbo/java/io/opentelemetry/javaagent/instrumentation/apachedubbo/v2_7/DubboAgentRegistryTest.java @@ -0,0 +1,27 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.apachedubbo.v2_7; + +import io.opentelemetry.instrumentation.apachedubbo.v2_7.AbstractDubboRegistryTest; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import org.junit.jupiter.api.extension.RegisterExtension; + +class DubboAgentRegistryTest extends AbstractDubboRegistryTest { + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension testing() { + return testing; + } + + @Override + protected boolean hasServicePeerName() { + return true; + } +} diff --git a/instrumentation/apache-dubbo-2.7/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/apachedubbo/v2_7/DubboRequest.java b/instrumentation/apache-dubbo-2.7/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/apachedubbo/v2_7/DubboRequest.java index 0733b744c867..1d8046e12de5 100644 --- a/instrumentation/apache-dubbo-2.7/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/apachedubbo/v2_7/DubboRequest.java +++ b/instrumentation/apache-dubbo-2.7/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/apachedubbo/v2_7/DubboRequest.java @@ -6,6 +6,7 @@ package io.opentelemetry.instrumentation.apachedubbo.v2_7; import com.google.auto.value.AutoValue; +import io.opentelemetry.instrumentation.apachedubbo.v2_7.internal.DubboRegistryUtil; import java.net.InetSocketAddress; import javax.annotation.Nullable; import org.apache.dubbo.common.URL; @@ -16,14 +17,15 @@ public abstract class DubboRequest { static DubboRequest create(RpcInvocation invocation, RpcContext context) { - // In dubbo 3 RpcContext delegates to a ThreadLocal context. We copy the url and remote address - // here to ensure we can access them from the thread that ends the span. + // In dubbo 3 RpcContext delegates to a ThreadLocal context. We copy the url, remote address, + // and registry address here to ensure we can access them from the thread that ends the span. return new AutoValue_DubboRequest( invocation, context, context.getUrl(), context.getRemoteAddress(), - context.getLocalAddress()); + context.getLocalAddress(), + DubboRegistryUtil.extractRegistryAddress(invocation)); } abstract RpcInvocation invocation(); @@ -37,4 +39,7 @@ static DubboRequest create(RpcInvocation invocation, RpcContext context) { @Nullable public abstract InetSocketAddress localAddress(); + + @Nullable + public abstract String registryAddress(); } diff --git a/instrumentation/apache-dubbo-2.7/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/apachedubbo/v2_7/internal/DubboClientNetworkAttributesGetter.java b/instrumentation/apache-dubbo-2.7/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/apachedubbo/v2_7/internal/DubboClientNetworkAttributesGetter.java index 1077ee19c7ce..4931ab542027 100644 --- a/instrumentation/apache-dubbo-2.7/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/apachedubbo/v2_7/internal/DubboClientNetworkAttributesGetter.java +++ b/instrumentation/apache-dubbo-2.7/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/apachedubbo/v2_7/internal/DubboClientNetworkAttributesGetter.java @@ -5,6 +5,8 @@ package io.opentelemetry.instrumentation.apachedubbo.v2_7.internal; +import static io.opentelemetry.instrumentation.apachedubbo.v2_7.internal.DubboRegistryUtil.buildServiceTarget; + import io.opentelemetry.instrumentation.apachedubbo.v2_7.DubboRequest; import io.opentelemetry.instrumentation.api.semconv.network.NetworkAttributesGetter; import io.opentelemetry.instrumentation.api.semconv.network.ServerAttributesGetter; @@ -22,11 +24,19 @@ public final class DubboClientNetworkAttributesGetter @Nullable @Override public String getServerAddress(DubboRequest request) { + String registryAddress = request.registryAddress(); + if (registryAddress != null) { + return registryAddress + "/" + buildServiceTarget(request.url()); + } return request.url().getHost(); } + @Nullable @Override public Integer getServerPort(DubboRequest request) { + if (request.registryAddress() != null) { + return null; + } return request.url().getPort(); } diff --git a/instrumentation/apache-dubbo-2.7/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/apachedubbo/v2_7/internal/DubboRegistryUtil.java b/instrumentation/apache-dubbo-2.7/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/apachedubbo/v2_7/internal/DubboRegistryUtil.java new file mode 100644 index 000000000000..8736ab5f6632 --- /dev/null +++ b/instrumentation/apache-dubbo-2.7/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/apachedubbo/v2_7/internal/DubboRegistryUtil.java @@ -0,0 +1,199 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.apachedubbo.v2_7.internal; + +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.Optional; +import javax.annotation.Nullable; +import org.apache.dubbo.common.URL; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.RpcInvocation; +import org.apache.dubbo.rpc.cluster.Directory; + +/** + * This class is internal and is hence not for public use. Its APIs are unstable and can change at + * any time. + */ +public final class DubboRegistryUtil { + + private static final MethodHandles.Lookup LOOKUP = MethodHandles.lookup(); + + private static final ClassValue> DIRECTORY_ACCESSOR = + createAccessor("getDirectory", "directory"); + private static final ClassValue> REGISTRY_ACCESSOR = + createAccessor("getRegistry", "registry"); + private static final ClassValue> URL_ACCESSOR = + createAccessor("getUrl", null); + + private static final ThreadLocal CAPTURED_REGISTRY_ADDRESS = new ThreadLocal<>(); + + /** + * Used by {@link RegistryCapturingInvoker} while the cluster delegate runs (including into + * consumer protocol filters). Returns the previous value so callers can restore it. + */ + @Nullable + static String pushCapturedRegistryAddress(String address) { + String previous = CAPTURED_REGISTRY_ADDRESS.get(); + CAPTURED_REGISTRY_ADDRESS.set(address); + return previous; + } + + static void restoreCapturedRegistryAddress(@Nullable String previous) { + if (previous == null) { + CAPTURED_REGISTRY_ADDRESS.remove(); + } else { + CAPTURED_REGISTRY_ADDRESS.set(previous); + } + } + + @Nullable + public static String extractRegistryAddress(RpcInvocation invocation) { + String captured = CAPTURED_REGISTRY_ADDRESS.get(); + if (captured != null) { + return captured; + } + + Invoker invoker = invocation.getInvoker(); + if (invoker == null) { + return null; + } + + Directory directory = getDirectory(invoker); + if (directory != null) { + String address = tryExtractRegistryAddressFromDirectory(directory); + if (address != null) { + return address; + } + } + + return null; + } + + public static String buildServiceTarget(URL url) { + String interfaceName = url.getServiceInterface(); + if (interfaceName == null || interfaceName.isEmpty()) { + interfaceName = url.getPath(); + } + if (interfaceName == null || interfaceName.isEmpty()) { + return ""; + } + + String version = url.getParameter("version"); + String group = url.getParameter("group"); + boolean hasVersion = version != null && !version.isEmpty(); + boolean hasGroup = group != null && !group.isEmpty(); + + if (!hasVersion && !hasGroup) { + return interfaceName; + } + + StringBuilder sb = new StringBuilder(interfaceName); + sb.append(':'); + if (hasVersion) { + sb.append(version); + } + if (hasGroup) { + sb.append(':').append(group); + } + return sb.toString(); + } + + @Nullable + private static Directory getDirectory(Invoker invoker) { + MethodHandle mh = DIRECTORY_ACCESSOR.get(invoker.getClass()).orElse(null); + if (mh == null) { + return null; + } + try { + Object obj = mh.invoke(invoker); + return obj instanceof Directory ? (Directory) obj : null; + } catch (Throwable t) { + return null; + } + } + + /** + * Resolves {@code protocol://host:port} from a registry-backed directory (for example {@code + * RegistryDirectory}), using {@code getRegistry()} when present and otherwise the {@code + * registry} field. Called once per consumer refer when {@link RegistryCapturingClusterWrapper} + * wraps the cluster invoker. + */ + @Nullable + static String tryExtractRegistryAddressFromDirectory(Directory directory) { + MethodHandle getRegistry = REGISTRY_ACCESSOR.get(directory.getClass()).orElse(null); + if (getRegistry == null) { + return null; + } + try { + Object registry = getRegistry.invoke(directory); + if (registry == null) { + return null; + } + MethodHandle getUrl = URL_ACCESSOR.get(registry.getClass()).orElse(null); + if (getUrl == null) { + return null; + } + Object urlObj = getUrl.invoke(registry); + if (!(urlObj instanceof URL)) { + return null; + } + URL url = (URL) urlObj; + return url.getProtocol() + "://" + url.getAddress(); + } catch (Throwable t) { + return null; + } + } + + private static ClassValue> createAccessor( + String methodName, @Nullable String fieldName) { + return new ClassValue>() { + @Override + protected Optional computeValue(Class type) { + MethodHandle mh = resolveMethod(type, methodName); + if (mh != null) { + return Optional.of(mh); + } + if (fieldName != null) { + mh = resolveField(type, fieldName); + if (mh != null) { + return Optional.of(mh); + } + } + return Optional.empty(); + } + }; + } + + @Nullable + private static MethodHandle resolveMethod(Class clazz, String name) { + try { + Method m = clazz.getMethod(name); + return LOOKUP.unreflect(m); + } catch (NoSuchMethodException | IllegalAccessException ignored) { + // ignore + } + return null; + } + + @Nullable + private static MethodHandle resolveField(Class clazz, String name) { + for (Class c = clazz; c != null && c != Object.class; c = c.getSuperclass()) { + try { + Field f = c.getDeclaredField(name); + f.setAccessible(true); + return LOOKUP.unreflectGetter(f); + } catch (NoSuchFieldException | IllegalAccessException ignored) { + // ignore + } + } + return null; + } + + private DubboRegistryUtil() {} +} diff --git a/instrumentation/apache-dubbo-2.7/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/apachedubbo/v2_7/internal/RegistryCapturingClusterWrapper.java b/instrumentation/apache-dubbo-2.7/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/apachedubbo/v2_7/internal/RegistryCapturingClusterWrapper.java new file mode 100644 index 000000000000..d4001e1e0e8e --- /dev/null +++ b/instrumentation/apache-dubbo-2.7/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/apachedubbo/v2_7/internal/RegistryCapturingClusterWrapper.java @@ -0,0 +1,105 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.apachedubbo.v2_7.internal; + +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.reflect.Method; +import javax.annotation.Nullable; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.RpcException; +import org.apache.dubbo.rpc.cluster.Cluster; +import org.apache.dubbo.rpc.cluster.Directory; + +/** + * Dubbo {@link Cluster} SPI wrapper that records the registry URL for each registry-backed {@link + * Directory} when the cluster invoker is entered, so consumer {@link org.apache.dubbo.rpc.Filter} + * implementations can read it from {@link DubboRegistryUtil}. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public final class RegistryCapturingClusterWrapper implements Cluster { + + private static final MethodHandles.Lookup LOOKUP = MethodHandles.lookup(); + + /** Dubbo 3.0.4+: join(Directory, boolean). Absent on 2.7 and 3.0.0-3.0.3. */ + @Nullable private static final MethodHandle JOIN_TWO_ARG; + + /** Dubbo 2.7 and 3.0.0-3.0.3: join(Directory). Absent on 3.0.4+. */ + @Nullable private static final MethodHandle JOIN_ONE_ARG; + + static { + MethodHandle two = null; + MethodHandle one = null; + try { + Method m = Cluster.class.getMethod("join", Directory.class, boolean.class); + two = LOOKUP.unreflect(m); + } catch (ReflectiveOperationException ignored) { + // Dubbo 2.7 / 3.0.0-3.0.3 + } + try { + Method m = Cluster.class.getMethod("join", Directory.class); + one = LOOKUP.unreflect(m); + } catch (ReflectiveOperationException ignored) { + // Dubbo 3.0.4+ + } + JOIN_TWO_ARG = two; + JOIN_ONE_ARG = one; + } + + private final Cluster cluster; + + /** SPI wrapper constructor — must accept the delegate {@link Cluster}. */ + @SuppressWarnings("unused") // Used by Dubbo ExtensionLoader via reflection + public RegistryCapturingClusterWrapper(Cluster cluster) { + this.cluster = cluster; + } + + @Override + public Invoker join(Directory directory) { + return wrapIfNeeded(directory, delegateJoin(cluster, directory, true)); + } + + /** + * Dubbo 3.0.4+ entry point. Not an {@code @Override} when compiling against Dubbo 2.7, but + * required for {@link Cluster} at runtime on 3.0.4+. + */ + @SuppressWarnings("unused") + public Invoker join(Directory directory, boolean buildFilterChain) { + return wrapIfNeeded(directory, delegateJoin(cluster, directory, buildFilterChain)); + } + + /** + * {@link MethodHandle#invoke} is untyped; the returned invoker matches the generic {@code T} from + * the {@code directory} argument and delegate {@link Cluster#join}. + */ + @SuppressWarnings("unchecked") + private static Invoker delegateJoin( + Cluster cluster, Directory directory, boolean buildFilterChain) { + try { + if (JOIN_TWO_ARG != null) { + return (Invoker) JOIN_TWO_ARG.invoke(cluster, directory, buildFilterChain); + } + if (JOIN_ONE_ARG != null) { + return (Invoker) JOIN_ONE_ARG.invoke(cluster, directory); + } + } catch (RpcException e) { + throw e; + } catch (Throwable t) { + throw new RpcException(t.getMessage(), t); + } + throw new RpcException("No join(Directory) or join(Directory,boolean) on Cluster"); + } + + private static Invoker wrapIfNeeded(Directory directory, Invoker invoker) { + String registryAddress = DubboRegistryUtil.tryExtractRegistryAddressFromDirectory(directory); + if (registryAddress == null) { + return invoker; + } + return new RegistryCapturingInvoker<>(invoker, registryAddress); + } +} diff --git a/instrumentation/apache-dubbo-2.7/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/apachedubbo/v2_7/internal/RegistryCapturingInvoker.java b/instrumentation/apache-dubbo-2.7/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/apachedubbo/v2_7/internal/RegistryCapturingInvoker.java new file mode 100644 index 000000000000..cf0410c431ff --- /dev/null +++ b/instrumentation/apache-dubbo-2.7/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/apachedubbo/v2_7/internal/RegistryCapturingInvoker.java @@ -0,0 +1,59 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.apachedubbo.v2_7.internal; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.Result; + +/** + * Wraps a cluster invoker to publish the consumer registry address for the current thread while the + * delegate chain runs (for example into the Dubbo consumer protocol filter chain). + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +final class RegistryCapturingInvoker implements Invoker { + + private final Invoker delegate; + private final String registryAddress; + + RegistryCapturingInvoker(Invoker delegate, String registryAddress) { + this.delegate = delegate; + this.registryAddress = registryAddress; + } + + @Override + public Class getInterface() { + return delegate.getInterface(); + } + + @Override + public URL getUrl() { + return delegate.getUrl(); + } + + @Override + public boolean isAvailable() { + return delegate.isAvailable(); + } + + @Override + public void destroy() { + delegate.destroy(); + } + + @Override + public Result invoke(Invocation invocation) { + String previous = DubboRegistryUtil.pushCapturedRegistryAddress(registryAddress); + try { + return delegate.invoke(invocation); + } finally { + DubboRegistryUtil.restoreCapturedRegistryAddress(previous); + } + } +} diff --git a/instrumentation/apache-dubbo-2.7/library-autoconfigure/src/main/resources/META-INF/services/org.apache.dubbo.rpc.cluster.Cluster b/instrumentation/apache-dubbo-2.7/library-autoconfigure/src/main/resources/META-INF/services/org.apache.dubbo.rpc.cluster.Cluster new file mode 100644 index 000000000000..91645ea1aa83 --- /dev/null +++ b/instrumentation/apache-dubbo-2.7/library-autoconfigure/src/main/resources/META-INF/services/org.apache.dubbo.rpc.cluster.Cluster @@ -0,0 +1 @@ +io.opentelemetry.instrumentation.apachedubbo.v2_7.internal.RegistryCapturingClusterWrapper diff --git a/instrumentation/apache-dubbo-2.7/library-autoconfigure/src/test/java/io/opentelemetry/instrumentation/apachedubbo/v2_7/DubboRegistryTest.java b/instrumentation/apache-dubbo-2.7/library-autoconfigure/src/test/java/io/opentelemetry/instrumentation/apachedubbo/v2_7/DubboRegistryTest.java new file mode 100644 index 000000000000..61c01d0e768f --- /dev/null +++ b/instrumentation/apache-dubbo-2.7/library-autoconfigure/src/test/java/io/opentelemetry/instrumentation/apachedubbo/v2_7/DubboRegistryTest.java @@ -0,0 +1,26 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.apachedubbo.v2_7; + +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import org.junit.jupiter.api.extension.RegisterExtension; + +class DubboRegistryTest extends AbstractDubboRegistryTest { + + @RegisterExtension + static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension testing() { + return testing; + } + + @Override + protected boolean hasServicePeerName() { + return false; + } +} diff --git a/instrumentation/apache-dubbo-2.7/library-autoconfigure/src/test/java/io/opentelemetry/instrumentation/apachedubbo/v2_7/internal/DubboRegistryUtilTest.java b/instrumentation/apache-dubbo-2.7/library-autoconfigure/src/test/java/io/opentelemetry/instrumentation/apachedubbo/v2_7/internal/DubboRegistryUtilTest.java new file mode 100644 index 000000000000..337e1d4e7821 --- /dev/null +++ b/instrumentation/apache-dubbo-2.7/library-autoconfigure/src/test/java/io/opentelemetry/instrumentation/apachedubbo/v2_7/internal/DubboRegistryUtilTest.java @@ -0,0 +1,125 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.apachedubbo.v2_7.internal; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +import java.lang.reflect.Field; +import java.util.stream.Stream; +import org.apache.dubbo.common.URL; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.Result; +import org.apache.dubbo.rpc.RpcInvocation; +import org.apache.dubbo.rpc.cluster.Directory; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +class DubboRegistryUtilTest { + + private static final URL DUMMY_URL = + URL.valueOf("dubbo://192.168.1.100:20880/com.example.Service"); + + @ParameterizedTest + @MethodSource("serviceTargetProvider") + void buildServiceTarget(String urlString, String expected) { + assertThat(DubboRegistryUtil.buildServiceTarget(URL.valueOf(urlString))).isEqualTo(expected); + } + + static Stream serviceTargetProvider() { + return Stream.of( + Arguments.of( + "dubbo://192.168.1.100:20880/com.example.HelloService", "com.example.HelloService"), + Arguments.of( + "dubbo://192.168.1.100:20880/com.example.HelloService?version=2.0.0", + "com.example.HelloService:2.0.0"), + Arguments.of( + "dubbo://192.168.1.100:20880/com.example.HelloService?version=2.0.0&group=gray", + "com.example.HelloService:2.0.0:gray"), + Arguments.of( + "dubbo://192.168.1.100:20880/com.example.HelloService?group=gray", + "com.example.HelloService::gray"), + Arguments.of( + "dubbo://192.168.1.100:20880/com.example.HelloService?version=&group=", + "com.example.HelloService"), + Arguments.of("dubbo://192.168.1.100:20880/", ""), + Arguments.of("dubbo://192.168.1.100:20880", "")); + } + + @Test + void extractRegistryAddressReturnsNullWhenNoInvoker() { + RpcInvocation invocation = new RpcInvocation(); + assertThat(DubboRegistryUtil.extractRegistryAddress(invocation)).isNull(); + } + + /** Tests the field-based reflection fallback when no {@code getDirectory()} method exists. */ + @Test + void extractRegistryAddressFieldFallback() throws Exception { + Directory dir = mockDirectoryWithRegistry(new FakeZkRegistry()); + FieldOnlyClusterInvoker invoker = new FieldOnlyClusterInvoker(dir, DUMMY_URL); + RpcInvocation invocation = new RpcInvocation(); + invocation.setInvoker(invoker); + assertThat(DubboRegistryUtil.extractRegistryAddress(invocation)) + .isEqualTo("zookeeper://10.0.0.1:2181"); + } + + @SuppressWarnings("UnusedVariable") + abstract static class MockableRegistryDirectory implements Directory { + Object registry; + } + + private static Directory mockDirectoryWithRegistry(Object registry) throws Exception { + MockableRegistryDirectory dir = mock(MockableRegistryDirectory.class); + Field f = MockableRegistryDirectory.class.getDeclaredField("registry"); + f.setAccessible(true); + f.set(dir, registry); + return dir; + } + + @SuppressWarnings({"unused", "EffectivelyPrivate"}) + private static class FieldOnlyClusterInvoker implements Invoker { + private final Directory directory; + private final URL url; + + FieldOnlyClusterInvoker(Directory directory, URL url) { + this.directory = directory; + this.url = url; + } + + @Override + public Class getInterface() { + return Object.class; + } + + @Override + public Result invoke(Invocation invocation) { + throw new UnsupportedOperationException(); + } + + @Override + public URL getUrl() { + return url; + } + + @Override + public boolean isAvailable() { + return true; + } + + @Override + public void destroy() {} + } + + static class FakeZkRegistry { + @SuppressWarnings("unused") + public URL getUrl() { + return URL.valueOf("zookeeper://10.0.0.1:2181/org.apache.dubbo.registry.RegistryService"); + } + } +} diff --git a/instrumentation/apache-dubbo-2.7/library-autoconfigure/src/test/java/io/opentelemetry/instrumentation/apachedubbo/v2_7/internal/RegistryCapturingClusterWrapperTest.java b/instrumentation/apache-dubbo-2.7/library-autoconfigure/src/test/java/io/opentelemetry/instrumentation/apachedubbo/v2_7/internal/RegistryCapturingClusterWrapperTest.java new file mode 100644 index 000000000000..a806be55600f --- /dev/null +++ b/instrumentation/apache-dubbo-2.7/library-autoconfigure/src/test/java/io/opentelemetry/instrumentation/apachedubbo/v2_7/internal/RegistryCapturingClusterWrapperTest.java @@ -0,0 +1,126 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.apachedubbo.v2_7.internal; + +import static java.util.Collections.singletonList; +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.Result; +import org.apache.dubbo.rpc.cluster.Cluster; +import org.apache.dubbo.rpc.cluster.Directory; +import org.apache.dubbo.rpc.cluster.directory.StaticDirectory; +import org.junit.jupiter.api.Test; + +class RegistryCapturingClusterWrapperTest { + + private static final URL DUMMY_URL = + URL.valueOf("dubbo://192.168.1.100:20880/com.example.Service"); + + @Test + void joinDoesNotWrapStaticDirectory() { + Invoker innerInvoker = new NoopInvoker(DUMMY_URL); + RegistryCapturingClusterWrapper wrapper = + new RegistryCapturingClusterWrapper(new FakeCluster(innerInvoker)); + StubInvoker stub = new StubInvoker(DUMMY_URL); + StaticDirectory staticDir = new StaticDirectory<>(singletonList(stub)); + + Invoker out = wrapper.join(staticDir); + assertThat(out).isSameAs(innerInvoker); + } + + /** + * Fake {@link Cluster} that provides both the Dubbo 2.7 {@code join(Directory)} and 3.0.4+ {@code + * join(Directory, boolean)} signatures, following the same pattern as {@link + * RegistryCapturingClusterWrapper}. + */ + @SuppressWarnings("unchecked") + private static class FakeCluster implements Cluster { + private final Invoker invoker; + + FakeCluster(Invoker invoker) { + this.invoker = invoker; + } + + // Dubbo 2.7 signature + // @Override — present in 2.7, removed in 3.0.4+ + @SuppressWarnings({"MissingOverride", "UnusedMethod", "UnusedVariable", "EffectivelyPrivate"}) + public Invoker join(Directory directory) { + return (Invoker) invoker; + } + + // Dubbo 3.0.4+ signature + // @Override — present in 3.0.4+, absent in 2.7 + @SuppressWarnings({"MissingOverride", "UnusedMethod", "UnusedVariable", "EffectivelyPrivate"}) + public Invoker join(Directory directory, boolean buildFilterChain) { + return (Invoker) invoker; + } + } + + private static class NoopInvoker implements Invoker { + private final URL url; + + NoopInvoker(URL url) { + this.url = url; + } + + @Override + public Class getInterface() { + return Object.class; + } + + @Override + public Result invoke(Invocation invocation) { + return null; + } + + @Override + public URL getUrl() { + return url; + } + + @Override + public boolean isAvailable() { + return true; + } + + @Override + public void destroy() {} + } + + private static class StubInvoker implements Invoker { + private final URL url; + + StubInvoker(URL url) { + this.url = url; + } + + @Override + public Class getInterface() { + return Object.class; + } + + @Override + public Result invoke(Invocation invocation) { + throw new UnsupportedOperationException(); + } + + @Override + public URL getUrl() { + return url; + } + + @Override + public boolean isAvailable() { + return true; + } + + @Override + public void destroy() {} + } +} diff --git a/instrumentation/apache-dubbo-2.7/testing/build.gradle.kts b/instrumentation/apache-dubbo-2.7/testing/build.gradle.kts index 2184a92a0608..def329fc28ec 100644 --- a/instrumentation/apache-dubbo-2.7/testing/build.gradle.kts +++ b/instrumentation/apache-dubbo-2.7/testing/build.gradle.kts @@ -10,5 +10,11 @@ dependencies { api("org.apache.dubbo:dubbo:$apacheDubboVersion") api("org.apache.dubbo:dubbo-config-api:$apacheDubboVersion") + api("org.apache.dubbo:dubbo-registry-zookeeper:$apacheDubboVersion") + api("org.apache.curator:curator-test:5.9.0") + api("org.apache.curator:curator-recipes:5.9.0") + + implementation("javax.annotation:javax.annotation-api:1.3.2") + implementation("io.opentelemetry:opentelemetry-api") } diff --git a/instrumentation/apache-dubbo-2.7/testing/src/main/java/io/opentelemetry/instrumentation/apachedubbo/v2_7/AbstractDubboRegistryTest.java b/instrumentation/apache-dubbo-2.7/testing/src/main/java/io/opentelemetry/instrumentation/apachedubbo/v2_7/AbstractDubboRegistryTest.java new file mode 100644 index 000000000000..699bba6e464c --- /dev/null +++ b/instrumentation/apache-dubbo-2.7/testing/src/main/java/io/opentelemetry/instrumentation/apachedubbo/v2_7/AbstractDubboRegistryTest.java @@ -0,0 +1,219 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.apachedubbo.v2_7; + +import static io.opentelemetry.instrumentation.apachedubbo.v2_7.AbstractDubboTest.assertLatestDeps; +import static io.opentelemetry.instrumentation.api.internal.SemconvStability.emitOldRpcSemconv; +import static io.opentelemetry.instrumentation.api.internal.SemconvStability.emitStableRpcSemconv; +import static io.opentelemetry.instrumentation.testing.GlobalTraceUtil.runWithSpan; +import static io.opentelemetry.instrumentation.testing.junit.service.SemconvServiceStabilityUtil.maybeStablePeerService; +import static io.opentelemetry.instrumentation.testing.util.TestLatestDeps.testLatestDeps; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static io.opentelemetry.semconv.NetworkAttributes.NETWORK_PEER_ADDRESS; +import static io.opentelemetry.semconv.NetworkAttributes.NETWORK_PEER_PORT; +import static io.opentelemetry.semconv.NetworkAttributes.NETWORK_TYPE; +import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS; +import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT; +import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_METHOD; +import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SERVICE; +import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SYSTEM; +import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SYSTEM_NAME; + +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.apachedubbo.v2_7.api.HelloService; +import io.opentelemetry.instrumentation.apachedubbo.v2_7.impl.HelloServiceImpl; +import io.opentelemetry.instrumentation.test.utils.PortUtils; +import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import java.lang.reflect.Field; +import java.net.InetAddress; +import org.apache.curator.test.TestingServer; +import org.apache.dubbo.common.utils.NetUtils; +import org.apache.dubbo.config.ApplicationConfig; +import org.apache.dubbo.config.ProtocolConfig; +import org.apache.dubbo.config.ReferenceConfig; +import org.apache.dubbo.config.RegistryConfig; +import org.apache.dubbo.config.ServiceConfig; +import org.apache.dubbo.config.bootstrap.DubboBootstrap; +import org.apache.dubbo.rpc.service.GenericService; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +/** + * Integration test that verifies the registry-mode end-to-end flow: provider registers to + * ZooKeeper, consumer discovers via ZooKeeper, and the {@code SERVER_ADDRESS} span attribute + * contains the registry address (with service interface, version, and group) instead of the + * provider host. + */ +@SuppressWarnings("deprecation") // using deprecated semconv +public abstract class AbstractDubboRegistryTest { + + private static final String SERVICE_VERSION = "1.0.0"; + private static final String SERVICE_GROUP = "testGroup"; + + private static TestingServer zkServer; + + @RegisterExtension static final AutoCleanupExtension cleanup = AutoCleanupExtension.create(); + + protected abstract InstrumentationExtension testing(); + + protected abstract boolean hasServicePeerName(); + + @BeforeAll + static void setUp() throws Exception { + zkServer = new TestingServer(); + cleanup.deferAfterAll(zkServer); + zkServer.start(); + + System.setProperty("dubbo.application.qos-enable", "false"); + cleanup.deferAfterAll(() -> System.clearProperty("dubbo.application.qos-enable")); + Field field = NetUtils.class.getDeclaredField("LOCAL_ADDRESS"); + field.setAccessible(true); + InetAddress originalLocalAddress = (InetAddress) field.get(null); + field.set(null, InetAddress.getLoopbackAddress()); + cleanup.deferAfterAll(() -> field.set(null, originalLocalAddress)); + } + + private static String zkAddress() { + return "zookeeper://127.0.0.1:" + zkServer.getPort(); + } + + private static RegistryConfig newZkRegistryConfig(String zkAddr) { + RegistryConfig config = new RegistryConfig(); + config.setAddress(zkAddr); + config.setUseAsConfigCenter(false); + config.setUseAsMetadataCenter(false); + return config; + } + + @Test + void testRegistryModeServerAddress() throws Exception { + int port = PortUtils.findOpenPort(); + String zkAddr = zkAddress(); + + ProtocolConfig protocolConfig = new ProtocolConfig(); + protocolConfig.setPort(port); + + ServiceConfig service = new ServiceConfig<>(); + service.setInterface(HelloService.class); + service.setRef(new HelloServiceImpl()); + service.setVersion(SERVICE_VERSION); + service.setGroup(SERVICE_GROUP); + + DubboBootstrap providerBootstrap = DubboTestUtil.newDubboBootstrap(); + cleanup.deferCleanup(providerBootstrap::destroy); + providerBootstrap + .application(new ApplicationConfig("dubbo-registry-test-provider")) + .registry(newZkRegistryConfig(zkAddr)) + .service(service) + .protocol(protocolConfig) + .start(); + + // --- consumer: discover from ZooKeeper (must disable injvm to avoid same-JVM shortcut) --- + ReferenceConfig referenceConfig = new ReferenceConfig<>(); + referenceConfig.setInterface(HelloService.class); + referenceConfig.setGeneric("true"); + referenceConfig.setVersion(SERVICE_VERSION); + referenceConfig.setGroup(SERVICE_GROUP); + referenceConfig.setTimeout(30000); + referenceConfig.setInjvm(false); + + ProtocolConfig consumerProtocol = new ProtocolConfig(); + consumerProtocol.setRegister(false); + + DubboBootstrap consumerBootstrap = DubboTestUtil.newDubboBootstrap(); + cleanup.deferCleanup(consumerBootstrap::destroy); + consumerBootstrap + .application(new ApplicationConfig("dubbo-registry-test-consumer")) + .registry(newZkRegistryConfig(zkAddr)) + .reference(referenceConfig) + .protocol(consumerProtocol) + .start(); + + @SuppressWarnings({"rawtypes", "unchecked"}) + ReferenceConfig reference = (ReferenceConfig) referenceConfig; + GenericService genericService = reference.get(); + + Object response = + runWithSpan( + "parent", + () -> + genericService.$invoke( + "hello", new String[] {String.class.getName()}, new Object[] {"hello"})); + + assertThat(response).isEqualTo("hello"); + + // In registry mode, SERVER_ADDRESS = "registryProtocol://host:port/interface:version:group" + // and SERVER_PORT is absent (null). + // See https://github.com/open-telemetry/semantic-conventions/pull/3317 + String expectedServiceTarget = + HelloService.class.getName() + ":" + SERVICE_VERSION + ":" + SERVICE_GROUP; + String expectedServerAddress = zkAddr + "/" + expectedServiceTarget; + + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName("org.apache.dubbo.rpc.service.GenericService/$invoke") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(RPC_SYSTEM, emitOldRpcSemconv() ? "apache_dubbo" : null), + equalTo(RPC_SYSTEM_NAME, emitStableRpcSemconv() ? "dubbo" : null), + equalTo( + RPC_SERVICE, + emitOldRpcSemconv() + ? "org.apache.dubbo.rpc.service.GenericService" + : null), + equalTo( + RPC_METHOD, + emitStableRpcSemconv() + ? "org.apache.dubbo.rpc.service.GenericService/$invoke" + : "$invoke"), + equalTo(SERVER_ADDRESS, expectedServerAddress), + equalTo(SERVER_PORT, null), + satisfies( + NETWORK_PEER_ADDRESS, + val -> + assertLatestDeps(val, v -> v.isInstanceOf(String.class))), + satisfies( + NETWORK_PEER_PORT, + val -> assertLatestDeps(val, v -> v.isInstanceOf(Long.class))), + satisfies(NETWORK_TYPE, AbstractDubboTest::assertNetworkType)), + span -> + span.hasName( + "io.opentelemetry.instrumentation.apachedubbo.v2_7.api.HelloService/hello") + .hasKind(SpanKind.SERVER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(RPC_SYSTEM, emitOldRpcSemconv() ? "apache_dubbo" : null), + equalTo(RPC_SYSTEM_NAME, emitStableRpcSemconv() ? "dubbo" : null), + equalTo( + RPC_SERVICE, + emitOldRpcSemconv() + ? "io.opentelemetry.instrumentation.apachedubbo.v2_7.api.HelloService" + : null), + equalTo( + RPC_METHOD, + emitStableRpcSemconv() + ? "io.opentelemetry.instrumentation.apachedubbo.v2_7.api.HelloService/hello" + : "hello"), + equalTo( + maybeStablePeerService(), + hasServicePeerName() && testLatestDeps() + ? "test-peer-service" + : null), + satisfies( + NETWORK_PEER_ADDRESS, val -> val.isInstanceOf(String.class)), + satisfies( + NETWORK_PEER_PORT, val -> val.isInstanceOf(Long.class))))); + } +}