Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,17 @@ public void registerHelperResources(HelperResourceBuilder helperResourceBuilder)
helperResourceBuilder.register(
"META-INF/services/org.apache.dubbo.rpc.Filter",
"apache-dubbo-2.7/META-INF/services/org.apache.dubbo.rpc.Filter");
helperResourceBuilder.register(
"META-INF/services/org.apache.dubbo.rpc.cluster.Cluster",
"apache-dubbo-2.7/META-INF/services/org.apache.dubbo.rpc.cluster.Cluster");
}

@Override
public List<String> exposedClassNames() {
return asList(
"io.opentelemetry.javaagent.instrumentation.apachedubbo.v2_7.OpenTelemetryClientFilter",
"io.opentelemetry.javaagent.instrumentation.apachedubbo.v2_7.OpenTelemetryServerFilter");
"io.opentelemetry.javaagent.instrumentation.apachedubbo.v2_7.OpenTelemetryServerFilter",
"io.opentelemetry.javaagent.instrumentation.apachedubbo.v2_7.RegistryCapturingClusterWrapperProxy");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.apachedubbo.v2_7;

import io.opentelemetry.instrumentation.apachedubbo.v2_7.internal.RegistryCapturingClusterWrapper;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.cluster.Cluster;
import org.apache.dubbo.rpc.cluster.Directory;

public final class RegistryCapturingClusterWrapperProxy implements Cluster {

private final RegistryCapturingClusterWrapper delegate;

@SuppressWarnings("unused")
public RegistryCapturingClusterWrapperProxy(Cluster cluster) {
this.delegate = new RegistryCapturingClusterWrapper(cluster);
}

@Override
public <T> Invoker<T> join(Directory<T> directory) {
return delegate.join(directory);
}

@SuppressWarnings("unused")
public <T> Invoker<T> join(Directory<T> directory, boolean buildFilterChain) {
return delegate.join(directory, buildFilterChain);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.opentelemetry.javaagent.instrumentation.apachedubbo.v2_7.RegistryCapturingClusterWrapperProxy
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.apachedubbo.v2_7;

import io.opentelemetry.instrumentation.apachedubbo.v2_7.AbstractDubboRegistryTest;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import org.junit.jupiter.api.extension.RegisterExtension;

class DubboAgentRegistryTest extends AbstractDubboRegistryTest {

@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();

@Override
protected InstrumentationExtension testing() {
return testing;
}

@Override
protected boolean hasServicePeerName() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.instrumentation.apachedubbo.v2_7;

import com.google.auto.value.AutoValue;
import io.opentelemetry.instrumentation.apachedubbo.v2_7.internal.DubboRegistryUtil;
import java.net.InetSocketAddress;
import javax.annotation.Nullable;
import org.apache.dubbo.common.URL;
Expand All @@ -16,14 +17,15 @@
public abstract class DubboRequest {

static DubboRequest create(RpcInvocation invocation, RpcContext context) {
// In dubbo 3 RpcContext delegates to a ThreadLocal context. We copy the url and remote address
// here to ensure we can access them from the thread that ends the span.
// In dubbo 3 RpcContext delegates to a ThreadLocal context. We copy the url, remote address,
// and registry address here to ensure we can access them from the thread that ends the span.
return new AutoValue_DubboRequest(
invocation,
context,
context.getUrl(),
context.getRemoteAddress(),
context.getLocalAddress());
context.getLocalAddress(),
DubboRegistryUtil.extractRegistryAddress(invocation));
}

abstract RpcInvocation invocation();
Expand All @@ -37,4 +39,7 @@ static DubboRequest create(RpcInvocation invocation, RpcContext context) {

@Nullable
public abstract InetSocketAddress localAddress();

@Nullable
public abstract String registryAddress();
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package io.opentelemetry.instrumentation.apachedubbo.v2_7.internal;

import static io.opentelemetry.instrumentation.apachedubbo.v2_7.internal.DubboRegistryUtil.buildServiceTarget;

import io.opentelemetry.instrumentation.apachedubbo.v2_7.DubboRequest;
import io.opentelemetry.instrumentation.api.semconv.network.NetworkAttributesGetter;
import io.opentelemetry.instrumentation.api.semconv.network.ServerAttributesGetter;
Expand All @@ -22,11 +24,19 @@ public final class DubboClientNetworkAttributesGetter
@Nullable
@Override
public String getServerAddress(DubboRequest request) {
String registryAddress = request.registryAddress();
if (registryAddress != null) {
return registryAddress + "/" + buildServiceTarget(request.url());
}
return request.url().getHost();
}

@Nullable
@Override
public Integer getServerPort(DubboRequest request) {
if (request.registryAddress() != null) {
return null;
}
return request.url().getPort();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.apachedubbo.v2_7.internal;

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.cluster.Directory;

/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
public final class DubboRegistryUtil {

private static final MethodHandles.Lookup LOOKUP = MethodHandles.lookup();

private static final ClassValue<Optional<MethodHandle>> DIRECTORY_ACCESSOR =
createAccessor("getDirectory", "directory");
private static final ClassValue<Optional<MethodHandle>> REGISTRY_ACCESSOR =
createAccessor("getRegistry", "registry");
private static final ClassValue<Optional<MethodHandle>> URL_ACCESSOR =
createAccessor("getUrl", null);

private static final ThreadLocal<String> CAPTURED_REGISTRY_ADDRESS = new ThreadLocal<>();

/**
* Used by {@link RegistryCapturingInvoker} while the cluster delegate runs (including into
* consumer protocol filters). Returns the previous value so callers can restore it.
*/
@Nullable
static String pushCapturedRegistryAddress(String address) {
String previous = CAPTURED_REGISTRY_ADDRESS.get();
CAPTURED_REGISTRY_ADDRESS.set(address);
return previous;
}

static void restoreCapturedRegistryAddress(@Nullable String previous) {
if (previous == null) {
CAPTURED_REGISTRY_ADDRESS.remove();
} else {
CAPTURED_REGISTRY_ADDRESS.set(previous);
}
}
Comment thread
steverao marked this conversation as resolved.

@Nullable
public static String extractRegistryAddress(RpcInvocation invocation) {
String captured = CAPTURED_REGISTRY_ADDRESS.get();
if (captured != null) {
return captured;
}

Invoker<?> invoker = invocation.getInvoker();
if (invoker == null) {
return null;
}

Directory<?> directory = getDirectory(invoker);
if (directory != null) {
String address = tryExtractRegistryAddressFromDirectory(directory);
if (address != null) {
return address;
}
}

return null;
}

public static String buildServiceTarget(URL url) {
String interfaceName = url.getServiceInterface();
if (interfaceName == null || interfaceName.isEmpty()) {
interfaceName = url.getPath();
}
if (interfaceName == null || interfaceName.isEmpty()) {
return "";
}

String version = url.getParameter("version");
String group = url.getParameter("group");
boolean hasVersion = version != null && !version.isEmpty();
boolean hasGroup = group != null && !group.isEmpty();

if (!hasVersion && !hasGroup) {
return interfaceName;
}

StringBuilder sb = new StringBuilder(interfaceName);
sb.append(':');
if (hasVersion) {
sb.append(version);
}
if (hasGroup) {
sb.append(':').append(group);
}
return sb.toString();
}

@Nullable
private static Directory<?> getDirectory(Invoker<?> invoker) {
MethodHandle mh = DIRECTORY_ACCESSOR.get(invoker.getClass()).orElse(null);
if (mh == null) {
return null;
}
try {
Object obj = mh.invoke(invoker);
return obj instanceof Directory ? (Directory<?>) obj : null;
} catch (Throwable t) {
return null;
}
}

/**
* Resolves {@code protocol://host:port} from a registry-backed directory (for example {@code
* RegistryDirectory}), using {@code getRegistry()} when present and otherwise the {@code
* registry} field. Called once per consumer refer when {@link RegistryCapturingClusterWrapper}
* wraps the cluster invoker.
*/
@Nullable
static String tryExtractRegistryAddressFromDirectory(Directory<?> directory) {
MethodHandle getRegistry = REGISTRY_ACCESSOR.get(directory.getClass()).orElse(null);
if (getRegistry == null) {
return null;
}
try {
Object registry = getRegistry.invoke(directory);
if (registry == null) {
return null;
}
MethodHandle getUrl = URL_ACCESSOR.get(registry.getClass()).orElse(null);
if (getUrl == null) {
return null;
}
Object urlObj = getUrl.invoke(registry);
if (!(urlObj instanceof URL)) {
return null;
}
URL url = (URL) urlObj;
return url.getProtocol() + "://" + url.getAddress();
} catch (Throwable t) {
return null;
}
}

private static ClassValue<Optional<MethodHandle>> createAccessor(
String methodName, @Nullable String fieldName) {
return new ClassValue<Optional<MethodHandle>>() {
@Override
protected Optional<MethodHandle> computeValue(Class<?> type) {
MethodHandle mh = resolveMethod(type, methodName);
if (mh != null) {
return Optional.of(mh);
}
if (fieldName != null) {
mh = resolveField(type, fieldName);
if (mh != null) {
return Optional.of(mh);
}
}
return Optional.empty();
}
};
}

@Nullable
private static MethodHandle resolveMethod(Class<?> clazz, String name) {
try {
Method m = clazz.getMethod(name);
return LOOKUP.unreflect(m);
} catch (NoSuchMethodException | IllegalAccessException ignored) {
// ignore
}
return null;
}

@Nullable
private static MethodHandle resolveField(Class<?> clazz, String name) {
for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
try {
Field f = c.getDeclaredField(name);
f.setAccessible(true);
return LOOKUP.unreflectGetter(f);
} catch (NoSuchFieldException | IllegalAccessException ignored) {
// ignore
}
}
return null;
}

private DubboRegistryUtil() {}
}
Loading
Loading