Skip to content

Commit 6daaf4d

Browse files
YaoYingLonglaurit
andauthored
Fix the bug where Redisson loses context during command execution. (#18701)
Co-authored-by: Lauri Tulmin <ltulmin@splunk.com>
1 parent 0f6b98d commit 6daaf4d

8 files changed

Lines changed: 259 additions & 12 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.redisson.v3_0;
7+
8+
import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext;
9+
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
10+
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface;
11+
import static net.bytebuddy.matcher.ElementMatchers.named;
12+
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
13+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
14+
15+
import io.opentelemetry.context.Context;
16+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
17+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
18+
import io.opentelemetry.javaagent.instrumentation.redisson.common.v3_0.ContextPropagatingCompletableFuture;
19+
import java.util.concurrent.CompletableFuture;
20+
import javax.annotation.Nullable;
21+
import net.bytebuddy.asm.Advice;
22+
import net.bytebuddy.asm.Advice.AssignReturned;
23+
import net.bytebuddy.description.type.TypeDescription;
24+
import net.bytebuddy.implementation.bytecode.assign.Assigner;
25+
import net.bytebuddy.matcher.ElementMatcher;
26+
import org.redisson.api.RFuture;
27+
28+
class ConnectionManagerConnectionFutureInstrumentation implements TypeInstrumentation {
29+
30+
@Override
31+
public ElementMatcher<TypeDescription> typeMatcher() {
32+
return implementsInterface(named("org.redisson.connection.ConnectionManager"));
33+
}
34+
35+
@Override
36+
public ElementMatcher<ClassLoader> classLoaderOptimization() {
37+
return hasClassesNamed("org.redisson.connection.ConnectionManager");
38+
}
39+
40+
@Override
41+
public void transform(TypeTransformer transformer) {
42+
transformer.applyAdviceToMethod(
43+
namedOneOf("connectionReadOp", "connectionWriteOp").and(takesArguments(2)),
44+
getClass().getName() + "$WrapConnectionFutureAdvice");
45+
}
46+
47+
@SuppressWarnings("unused")
48+
public static class WrapConnectionFutureAdvice {
49+
50+
@AssignReturned.ToReturned(typing = Assigner.Typing.DYNAMIC)
51+
@Advice.OnMethodExit(suppress = Throwable.class, inline = false)
52+
@Nullable
53+
public static Object onExit(
54+
@Advice.Return(typing = Assigner.Typing.DYNAMIC) @Nullable Object future) {
55+
Context context = currentContext();
56+
if (future instanceof RFuture) {
57+
return ContextPropagatingRFuture.wrap((RFuture<?>) future, context);
58+
}
59+
if (future instanceof CompletableFuture) {
60+
return ContextPropagatingCompletableFuture.wrap((CompletableFuture<?>) future, context);
61+
}
62+
return future;
63+
}
64+
}
65+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.redisson.v3_0;
7+
8+
import io.opentelemetry.context.Context;
9+
import io.opentelemetry.context.Scope;
10+
import org.redisson.api.RFuture;
11+
import org.redisson.misc.RedissonPromise;
12+
13+
public class ContextPropagatingRFuture<T> extends RedissonPromise<T> {
14+
15+
private ContextPropagatingRFuture(RFuture<T> delegate, Context context) {
16+
delegate.whenComplete(
17+
(result, error) -> {
18+
try (Scope ignored = context.makeCurrent()) {
19+
if (delegate.isCancelled()) {
20+
cancel(false);
21+
} else if (error != null) {
22+
tryFailure(error);
23+
} else {
24+
trySuccess(result);
25+
}
26+
}
27+
});
28+
}
29+
30+
public static <T> RFuture<T> wrap(RFuture<T> delegate, Context context) {
31+
if (context == Context.root() || delegate instanceof ContextPropagatingRFuture) {
32+
return delegate;
33+
}
34+
return new ContextPropagatingRFuture<>(delegate, context);
35+
}
36+
}

instrumentation/redisson/redisson-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/redisson/v3_0/RedissonInstrumentationModule.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
3030

3131
@Override
3232
public List<TypeInstrumentation> typeInstrumentations() {
33-
return asList(new RedisConnectionInstrumentation(), new RedisCommandDataInstrumentation());
33+
return asList(
34+
new ConnectionManagerConnectionFutureInstrumentation(),
35+
new RedisConnectionInstrumentation(),
36+
new RedisCommandDataInstrumentation());
3437
}
3538
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.redisson.v3_17;
7+
8+
import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext;
9+
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.extendsClass;
10+
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
11+
import static net.bytebuddy.matcher.ElementMatchers.named;
12+
import static net.bytebuddy.matcher.ElementMatchers.returns;
13+
14+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
15+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
16+
import io.opentelemetry.javaagent.instrumentation.redisson.common.v3_0.ContextPropagatingCompletableFuture;
17+
import java.util.concurrent.CompletableFuture;
18+
import net.bytebuddy.asm.Advice;
19+
import net.bytebuddy.asm.Advice.AssignReturned;
20+
import net.bytebuddy.description.type.TypeDescription;
21+
import net.bytebuddy.matcher.ElementMatcher;
22+
23+
class RedisExecutorConnectionFutureInstrumentation implements TypeInstrumentation {
24+
25+
@Override
26+
public ElementMatcher<TypeDescription> typeMatcher() {
27+
return extendsClass(named("org.redisson.command.RedisExecutor"));
28+
}
29+
30+
@Override
31+
public ElementMatcher<ClassLoader> classLoaderOptimization() {
32+
return hasClassesNamed("org.redisson.command.RedisExecutor");
33+
}
34+
35+
@Override
36+
public void transform(TypeTransformer transformer) {
37+
transformer.applyAdviceToMethod(
38+
named("getConnection").and(returns(CompletableFuture.class)),
39+
getClass().getName() + "$WrapConnectionFutureAdvice");
40+
}
41+
42+
@SuppressWarnings("unused")
43+
public static class WrapConnectionFutureAdvice {
44+
45+
@AssignReturned.ToReturned
46+
@Advice.OnMethodExit(suppress = Throwable.class, inline = false)
47+
public static CompletableFuture<?> onExit(@Advice.Return CompletableFuture<?> future) {
48+
return ContextPropagatingCompletableFuture.wrap(future, currentContext());
49+
}
50+
}
51+
}

instrumentation/redisson/redisson-3.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/redisson/v3_17/RedissonInstrumentationModule.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
2929

3030
@Override
3131
public List<TypeInstrumentation> typeInstrumentations() {
32-
return asList(new RedisConnectionInstrumentation(), new RedisCommandDataInstrumentation());
32+
return asList(
33+
new RedisExecutorConnectionFutureInstrumentation(),
34+
new RedisConnectionInstrumentation(),
35+
new RedisCommandDataInstrumentation());
3336
}
3437
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.redisson.common.v3_0;
7+
8+
import io.opentelemetry.context.Context;
9+
import io.opentelemetry.context.Scope;
10+
import java.util.concurrent.CompletableFuture;
11+
12+
public final class ContextPropagatingCompletableFuture<T> extends CompletableFuture<T> {
13+
14+
private ContextPropagatingCompletableFuture(CompletableFuture<T> delegate, Context context) {
15+
delegate.whenComplete(
16+
(result, error) -> {
17+
try (Scope ignored = context.makeCurrent()) {
18+
if (delegate.isCancelled()) {
19+
cancel(false);
20+
} else if (error != null) {
21+
completeExceptionally(error);
22+
} else {
23+
complete(result);
24+
}
25+
}
26+
});
27+
}
28+
29+
public static <T> CompletableFuture<T> wrap(CompletableFuture<T> delegate, Context context) {
30+
if (context == Context.root() || delegate instanceof ContextPropagatingCompletableFuture) {
31+
return delegate;
32+
}
33+
return new ContextPropagatingCompletableFuture<>(delegate, context);
34+
}
35+
}

instrumentation/redisson/redisson-common-3.0/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/redisson/AbstractRedissonAsyncClientTest.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@
4242
import org.junit.jupiter.api.Assumptions;
4343
import org.junit.jupiter.api.BeforeAll;
4444
import org.junit.jupiter.api.BeforeEach;
45+
import org.junit.jupiter.api.Tag;
4546
import org.junit.jupiter.api.Test;
47+
import org.junit.jupiter.api.TestInfo;
4648
import org.junit.jupiter.api.TestInstance;
4749
import org.junit.jupiter.api.extension.RegisterExtension;
4850
import org.redisson.Redisson;
@@ -65,15 +67,14 @@ public abstract class AbstractRedissonAsyncClientTest {
6567
@RegisterExtension
6668
protected static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
6769

70+
private static final String TEST_RECONNECT = "testReconnect";
6871
private static final Duration TIMEOUT = Duration.ofSeconds(30);
6972

7073
private final GenericContainer<?> redisServer =
7174
new GenericContainer<>("redis:6.2.3-alpine").withExposedPorts(6379);
7275

7376
private String ip;
74-
7577
private int port;
76-
7778
private String address;
7879
private RedissonClient redisson;
7980

@@ -91,7 +92,7 @@ void cleanupAll() {
9192
}
9293

9394
@BeforeEach
94-
void setup() throws InvocationTargetException, IllegalAccessException {
95+
void setup(TestInfo testInfo) throws InvocationTargetException, IllegalAccessException {
9596
String newAddress = address;
9697
if (useRedisProtocol()) {
9798
// Newer versions of redisson require scheme, older versions forbid it
@@ -101,6 +102,11 @@ void setup() throws InvocationTargetException, IllegalAccessException {
101102
SingleServerConfig singleServerConfig = config.useSingleServer();
102103
singleServerConfig.setAddress(newAddress);
103104
singleServerConfig.setTimeout(30_000);
105+
if (testInfo.getTags().contains(TEST_RECONNECT)) {
106+
// When verifying the futureCallback test case, simulate reconnection during Redis command
107+
// execution.
108+
singleServerConfig.setConnectionMinimumIdleSize(0);
109+
}
104110
try {
105111
// disable connection ping if it exists
106112
singleServerConfig
@@ -144,23 +150,24 @@ void futureSet() {
144150
}
145151

146152
@Test
147-
void futureWhenComplete() {
153+
@Tag(TEST_RECONNECT)
154+
void futureCallback() {
148155
RSet<String> set = redisson.getSet("set1");
149156
CompletionStage<Boolean> result =
150157
testing.runWithSpan(
151158
"parent",
152159
() -> {
153160
RFuture<Boolean> future = set.addAsync("s1");
154-
return future.whenComplete(
155-
(res, throwable) -> {
161+
return future.thenApply(
162+
res -> {
156163
assertThat(Span.current().getSpanContext().isValid()).isTrue();
157164
testing.runWithSpan("callback", () -> {});
165+
return res;
158166
});
159167
});
160168
assertThat(result.toCompletableFuture()).succeedsWithin(TIMEOUT);
161169

162-
testing.waitAndAssertSortedTraces(
163-
orderByRootSpanName("parent", "SADD", "callback"),
170+
testing.waitAndAssertTraces(
164171
trace ->
165172
trace.hasSpansSatisfyingExactly(
166173
span -> span.hasName("parent").hasKind(INTERNAL).hasNoParent(),

instrumentation/redisson/redisson-common-3.0/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/redisson/AbstractRedissonClientTest.java

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@
4646
import org.junit.jupiter.api.Assumptions;
4747
import org.junit.jupiter.api.BeforeAll;
4848
import org.junit.jupiter.api.BeforeEach;
49+
import org.junit.jupiter.api.Tag;
4950
import org.junit.jupiter.api.Test;
51+
import org.junit.jupiter.api.TestInfo;
5052
import org.junit.jupiter.api.TestInstance;
5153
import org.junit.jupiter.api.extension.RegisterExtension;
5254
import org.redisson.Redisson;
@@ -75,11 +77,12 @@ public abstract class AbstractRedissonClientTest {
7577
@RegisterExtension
7678
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
7779

80+
private static final String TEST_RECONNECT = "testReconnect";
81+
7882
private final GenericContainer<?> redisServer =
7983
new GenericContainer<>("redis:6.2.3-alpine").withExposedPorts(6379);
8084

8185
private String ip;
82-
8386
private int port;
8487
private String address;
8588
private RedissonClient redisson;
@@ -98,7 +101,7 @@ void cleanupAll() {
98101
}
99102

100103
@BeforeEach
101-
void setup() throws InvocationTargetException, IllegalAccessException {
104+
void setup(TestInfo testInfo) throws InvocationTargetException, IllegalAccessException {
102105
String newAddress = address;
103106
if (useRedisProtocol()) {
104107
// Newer versions of redisson require scheme, older versions forbid it
@@ -115,6 +118,12 @@ void setup() throws InvocationTargetException, IllegalAccessException {
115118
SingleServerConfig singleServerConfig = config.useSingleServer();
116119
singleServerConfig.setAddress(newAddress);
117120
singleServerConfig.setTimeout(30_000);
121+
if (testInfo.getTags().contains(TEST_RECONNECT)) {
122+
// When verifying the stringCommandLazyConnection test case, simulate reconnection during
123+
// Redis
124+
// command execution.
125+
singleServerConfig.setConnectionMinimumIdleSize(0);
126+
}
118127
try {
119128
// disable connection ping if it exists
120129
singleServerConfig
@@ -135,6 +144,44 @@ void cleanup() {
135144
}
136145
}
137146

147+
@Test
148+
@Tag(TEST_RECONNECT)
149+
void stringCommandLazyConnection() {
150+
testing.runWithSpan(
151+
"parent",
152+
() -> {
153+
RBucket<String> keyObject = redisson.getBucket("foo");
154+
keyObject.set("bar");
155+
keyObject.get();
156+
});
157+
testing.waitAndAssertTraces(
158+
trace ->
159+
trace.hasSpansSatisfyingExactly(
160+
span -> span.hasName("parent").hasKind(INTERNAL).hasNoParent(),
161+
span ->
162+
span.hasName("SET")
163+
.hasKind(CLIENT)
164+
.hasParent(trace.getSpan(0))
165+
.hasAttributesSatisfyingExactly(
166+
equalTo(NETWORK_TYPE, emitOldDatabaseSemconv() ? IPV4 : null),
167+
equalTo(NETWORK_PEER_ADDRESS, ip),
168+
equalTo(NETWORK_PEER_PORT, port),
169+
equalTo(maybeStable(DB_SYSTEM), REDIS),
170+
equalTo(maybeStable(DB_STATEMENT), "SET foo ?"),
171+
equalTo(maybeStable(DB_OPERATION), "SET")),
172+
span ->
173+
span.hasName("GET")
174+
.hasKind(CLIENT)
175+
.hasParent(trace.getSpan(0))
176+
.hasAttributesSatisfyingExactly(
177+
equalTo(NETWORK_TYPE, emitOldDatabaseSemconv() ? IPV4 : null),
178+
equalTo(NETWORK_PEER_ADDRESS, ip),
179+
equalTo(NETWORK_PEER_PORT, port),
180+
equalTo(maybeStable(DB_SYSTEM), REDIS),
181+
equalTo(maybeStable(DB_STATEMENT), "GET foo"),
182+
equalTo(maybeStable(DB_OPERATION), "GET"))));
183+
}
184+
138185
@Test
139186
void testDurationMetric() {
140187
AtomicReference<String> instrumentationName = new AtomicReference<>();

0 commit comments

Comments
 (0)