Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/plugins-jdk17-test.1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ jobs:
- c3p0-0.9.2.x-0.10.x-scenario
- spring-scheduled-6.x-scenario
- caffeine-3.x-scenario
- lettuce-webflux-6x-scenario
steps:
- uses: actions/checkout@v2
with:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/plugins-test.1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ jobs:
- kotlin-coroutine-scenario
- lettuce-scenario
- lettuce-6.5.x-scenario
- lettuce-webflux-5x-scenario
- mongodb-3.x-scenario
- mongodb-4.x-scenario
- netty-socketio-scenario
Expand Down
2 changes: 1 addition & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Release Notes.
9.7.0
------------------


* Added support for Lettuce reactive Redis commands.

All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/249?closed=1)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,17 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
}
EnhancedInstance enhancedCommand = (EnhancedInstance) spanCarrierCommand;

RedisCommandEnhanceInfo redisCommandEnhanceInfo = (RedisCommandEnhanceInfo) enhancedCommand.getSkyWalkingDynamicField();

if (redisCommandEnhanceInfo == null) {
redisCommandEnhanceInfo = new RedisCommandEnhanceInfo();
}

// command has been handle by another channel writer (cluster or sentinel case)
if (enhancedCommand.getSkyWalkingDynamicField() != null) {
if (redisCommandEnhanceInfo.getSpan() != null) {
//set peer in last channel writer (delegate)
if (peer != null) {
AbstractSpan span = (AbstractSpan) enhancedCommand.getSkyWalkingDynamicField();
span.setPeer(peer);
redisCommandEnhanceInfo.getSpan().setPeer(peer);
}
return;
}
Expand All @@ -81,7 +86,16 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
operationName = operationName + "BATCH_WRITE";
command = "BATCH_WRITE";
}

if (redisCommandEnhanceInfo.getSnapshot() != null) {
AbstractSpan localSpan = ContextManager.createLocalSpan("RedisReactive/local");
localSpan.setComponent(ComponentsDefine.LETTUCE);
SpanLayer.asCache(localSpan);
ContextManager.continued(redisCommandEnhanceInfo.getSnapshot());
}

AbstractSpan span = ContextManager.createExitSpan(operationName, peer);

span.setComponent(ComponentsDefine.LETTUCE);
Tags.CACHE_TYPE.set(span, "Redis");
if (StringUtil.isNotEmpty(key)) {
Expand All @@ -92,7 +106,12 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
SpanLayer.asCache(span);
span.prepareForAsync();
ContextManager.stopSpan();
enhancedCommand.setSkyWalkingDynamicField(span);

if (redisCommandEnhanceInfo.getSnapshot() != null) {
ContextManager.stopSpan();
}

enhancedCommand.setSkyWalkingDynamicField(redisCommandEnhanceInfo.setSpan(span));
}

private String getArgsKey(RedisCommand<?, ?, ?> redisCommand) {
Expand Down Expand Up @@ -124,7 +143,7 @@ public void handleMethodException(EnhancedInstance objInst, Method method, Objec
RedisCommand<?, ?, ?> redisCommand = getSpanCarrierCommand(allArguments[0]);
if (redisCommand instanceof EnhancedInstance && ((EnhancedInstance) redisCommand).getSkyWalkingDynamicField() != null) {
EnhancedInstance enhancedRedisCommand = (EnhancedInstance) redisCommand;
AbstractSpan abstractSpan = (AbstractSpan) enhancedRedisCommand.getSkyWalkingDynamicField();
AbstractSpan abstractSpan = ((RedisCommandEnhanceInfo) enhancedRedisCommand.getSkyWalkingDynamicField()).getSpan();
enhancedRedisCommand.setSkyWalkingDynamicField(null);
abstractSpan.log(t);
abstractSpan.asyncFinish();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) {
if (objInst.getSkyWalkingDynamicField() != null) {
AbstractSpan span = (AbstractSpan) objInst.getSkyWalkingDynamicField();
AbstractSpan span = ((RedisCommandEnhanceInfo) objInst.getSkyWalkingDynamicField()).getSpan();
span.errorOccurred();
span.tag(new StringTag(CANCEL_SIGNAL_TAG), COMMAND_CANCEL_VALUE);
span.asyncFinish();
Expand All @@ -49,7 +49,7 @@ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allA
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
if (objInst.getSkyWalkingDynamicField() != null) {
AbstractSpan span = (AbstractSpan) objInst.getSkyWalkingDynamicField();
AbstractSpan span = ((RedisCommandEnhanceInfo) objInst.getSkyWalkingDynamicField()).getSpan();
span.log(t);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) {
if (objInst.getSkyWalkingDynamicField() != null) {
Throwable t = (Throwable) allArguments[0];
AbstractSpan span = (AbstractSpan) objInst.getSkyWalkingDynamicField();
AbstractSpan span = ((RedisCommandEnhanceInfo) objInst.getSkyWalkingDynamicField()).getSpan();
span.log(t);
span.asyncFinish();
objInst.setSkyWalkingDynamicField(null);
Expand All @@ -46,7 +46,7 @@ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allA
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
if (objInst.getSkyWalkingDynamicField() != null) {
AbstractSpan span = (AbstractSpan) objInst.getSkyWalkingDynamicField();
AbstractSpan span = ((RedisCommandEnhanceInfo) objInst.getSkyWalkingDynamicField()).getSpan();
span.log(t);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) {
if (objInst.getSkyWalkingDynamicField() != null) {
AbstractSpan span = (AbstractSpan) objInst.getSkyWalkingDynamicField();
AbstractSpan span = ((RedisCommandEnhanceInfo) objInst.getSkyWalkingDynamicField()).getSpan();
span.asyncFinish();
objInst.setSkyWalkingDynamicField(null);
}
Expand All @@ -45,7 +45,7 @@ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allA
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
if (objInst.getSkyWalkingDynamicField() != null) {
AbstractSpan span = (AbstractSpan) objInst.getSkyWalkingDynamicField();
AbstractSpan span = ((RedisCommandEnhanceInfo) objInst.getSkyWalkingDynamicField()).getSpan();
span.log(t);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.lettuce.common;

import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;

/**
* RedisCommandEnhanceInfo holds SkyWalking tracing data for Lettuce commands
* executed in different asynchronous models.
*
* <p>The {@link AbstractSpan} is used for non-reactive (blocking) commands
* that are executed asynchronously, where the span needs to be created
* at command submission time and finished when the command completes.</p>
*
* <p>The {@link ContextSnapshot} is used for reactive commands, where the
* tracing context is captured from Reactor {@code Context} and later
* continued at subscription or execution time to bridge reactive
* boundaries.</p>
*/
class RedisCommandEnhanceInfo {

private AbstractSpan span;
private ContextSnapshot snapshot;

public AbstractSpan getSpan() {
return span;
}

public RedisCommandEnhanceInfo setSpan(AbstractSpan span) {
Comment thread
wuwen5 marked this conversation as resolved.
Outdated
this.span = span;
return this;
}

public ContextSnapshot getSnapshot() {
return snapshot;
}

public RedisCommandEnhanceInfo setSnapshot(ContextSnapshot snapshot) {
this.snapshot = snapshot;
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.lettuce.common;

import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;

/**
* Interceptor for RedisSubscription constructor.
* <p>
* This interceptor captures the {@link io.lettuce.core.protocol.RedisCommand} instance
* at subscription construction time and stores it into SkyWalking dynamic field.
* </p>
*/
public class RedisSubscriptionConstructorInterceptor implements InstanceConstructorInterceptor {

@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
// allArguments[1] is the RedisCommand passed to the RedisSubscription constructor
objInst.setSkyWalkingDynamicField(allArguments[1]);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.lettuce.common;

import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.InstanceMethodsAroundInterceptorV2;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.MethodInvocationContext;
import reactor.core.CoreSubscriber;

import java.lang.reflect.Method;

/**
* Interceptor for {@code RedisPublisher.RedisSubscription#subscribe(Subscriber)} method.
*
* <p>
* This interceptor works together with the constructor interceptor of
* {@code RedisSubscription}:
* </p>
*/
public class RedisSubscriptionSubscribeMethodInterceptor implements InstanceMethodsAroundInterceptorV2 {

@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInvocationContext context) {
if (allArguments[0] instanceof CoreSubscriber) {
CoreSubscriber<?> subscriber = (CoreSubscriber<?>) allArguments[0];
// get ContextSnapshot from reactor context, the snapshot is set to reactor context by any other plugin
// such as DispatcherHandlerHandleMethodInterceptor in spring-webflux-5.x-plugin
Object skywalkingContextSnapshot = subscriber.currentContext().getOrDefault("SKYWALKING_CONTEXT_SNAPSHOT", null);
if (skywalkingContextSnapshot != null) {
((EnhancedInstance) objInst.getSkyWalkingDynamicField()).setSkyWalkingDynamicField(new RedisCommandEnhanceInfo()
.setSnapshot((ContextSnapshot) skywalkingContextSnapshot));
}
}
}

@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret, MethodInvocationContext context) {
return ret;
}

@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t, MethodInvocationContext context) {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.lettuce.common.define;

import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.ClassInstanceMethodsEnhancePluginDefineV2;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.v2.InstanceMethodsInterceptV2Point;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;

import static net.bytebuddy.matcher.ElementMatchers.any;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;

/**
*
*/
public class RedisSubscriptionInstrumentation extends ClassInstanceMethodsEnhancePluginDefineV2 {

private static final String ENHANCE_CLASS = "io.lettuce.core.RedisPublisher$RedisSubscription";

private static final String REDIS_SUBSCRIPTION_SUBSCRIBE_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.plugin.lettuce.common.RedisSubscriptionSubscribeMethodInterceptor";
private static final String REDIS_SUBSCRIPTION_CONST_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.plugin.lettuce.common.RedisSubscriptionConstructorInterceptor";

@Override
public InstanceMethodsInterceptV2Point[] getInstanceMethodsInterceptV2Points() {
return new InstanceMethodsInterceptV2Point[]{
new InstanceMethodsInterceptV2Point() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("subscribe");
}

@Override
public String getMethodsInterceptorV2() {
return REDIS_SUBSCRIPTION_SUBSCRIBE_METHOD_INTERCEPTOR;
}

@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}

@Override
public ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}

@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[] {
new ConstructorInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getConstructorMatcher() {
return any().and(takesArgument(1, named("io.lettuce.core.protocol.RedisCommand")));
}

@Override
public String getConstructorInterceptor() {
return REDIS_SUBSCRIPTION_CONST_METHOD_INTERCEPTOR;
}
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@
# limitations under the License.

lettuce-common=org.apache.skywalking.apm.plugin.lettuce.common.define.DefaultEndpointInstrumentation
lettuce-common=org.apache.skywalking.apm.plugin.lettuce.common.define.RedisCommandInstrumentation
lettuce-common=org.apache.skywalking.apm.plugin.lettuce.common.define.RedisCommandInstrumentation
lettuce-common=org.apache.skywalking.apm.plugin.lettuce.common.define.RedisSubscriptionInstrumentation
Loading
Loading