Skip to content

Commit d44376e

Browse files
committed
Reduce flakiness in io.opentelemetry.javaagent.instrumentation.vertx.rx.v3_5.server.VertxReactivePropagationTest.highConcurrency()
Automated fix attempt based on Develocity flaky-test analysis.
1 parent 723a8a0 commit d44376e

3 files changed

Lines changed: 90 additions & 57 deletions

File tree

instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/io/opentelemetry/javaagent/instrumentation/vertx/rx/v3_5/server/VertxReactivePropagationTest.java

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,14 @@
3535
import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_SYSTEM;
3636
import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_USER;
3737
import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DbSystemNameIncubatingValues.HSQLDB;
38+
import static java.util.concurrent.TimeUnit.SECONDS;
3839
import static org.assertj.core.api.Assertions.assertThat;
3940

4041
import io.opentelemetry.api.GlobalOpenTelemetry;
4142
import io.opentelemetry.api.trace.Span;
4243
import io.opentelemetry.api.trace.SpanKind;
4344
import io.opentelemetry.context.Context;
45+
import io.opentelemetry.context.Scope;
4446
import io.opentelemetry.context.propagation.TextMapPropagator;
4547
import io.opentelemetry.context.propagation.TextMapSetter;
4648
import io.opentelemetry.instrumentation.test.utils.PortUtils;
@@ -59,6 +61,7 @@
5961
import java.util.concurrent.ExecutionException;
6062
import java.util.concurrent.ExecutorService;
6163
import java.util.concurrent.Executors;
64+
import java.util.concurrent.Future;
6265
import java.util.concurrent.TimeoutException;
6366
import java.util.function.Consumer;
6467
import org.junit.jupiter.api.BeforeAll;
@@ -149,7 +152,7 @@ void contextPropagation() {
149152

150153
@SuppressWarnings("deprecation") // uses deprecated db semconv
151154
@Test
152-
void highConcurrency() {
155+
void highConcurrency() throws Exception {
153156
int count = 100;
154157
String baseUrl = "/listProducts";
155158
CountDownLatch latch = new CountDownLatch(1);
@@ -160,29 +163,37 @@ void highConcurrency() {
160163
TextMapSetter<HttpRequestBuilder> setter =
161164
(carrier, name, value) -> carrier.header(name, value);
162165

166+
List<Future<?>> futures = new ArrayList<>();
163167
for (int i = 0; i < count; i++) {
164168
int index = i;
165-
pool.submit(
166-
() -> {
167-
try {
168-
latch.await();
169-
} catch (InterruptedException e) {
170-
Thread.currentThread().interrupt();
171-
}
172-
testing.runWithSpan(
173-
"client " + index,
174-
() -> {
175-
HttpRequestBuilder builder =
176-
HttpRequest.builder()
177-
.get(baseUrl + "?" + TEST_REQUEST_ID_PARAMETER + "=" + index);
178-
Span.current().setAttribute(TEST_REQUEST_ID_ATTRIBUTE, index);
179-
propagator.inject(Context.current(), builder, setter);
180-
client.execute(builder.build()).aggregate().join();
181-
});
182-
});
169+
futures.add(
170+
pool.submit(
171+
() -> {
172+
try {
173+
assertThat(latch.await(10, SECONDS)).isTrue();
174+
} catch (InterruptedException e) {
175+
Thread.currentThread().interrupt();
176+
throw new AssertionError(e);
177+
}
178+
try (Scope ignored = Context.root().makeCurrent()) {
179+
testing.runWithSpan(
180+
"client " + index,
181+
() -> {
182+
HttpRequestBuilder builder =
183+
HttpRequest.builder()
184+
.get(baseUrl + "?" + TEST_REQUEST_ID_PARAMETER + "=" + index);
185+
Span.current().setAttribute(TEST_REQUEST_ID_ATTRIBUTE, index);
186+
propagator.inject(Context.current(), builder, setter);
187+
client.execute(builder.build()).aggregate().join();
188+
});
189+
}
190+
}));
183191
}
184192

185193
latch.countDown();
194+
for (Future<?> future : futures) {
195+
future.get(30, SECONDS);
196+
}
186197

187198
List<Consumer<TraceAssert>> assertions = new ArrayList<>();
188199
for (int i = 0; i < count; i++) {

instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version41Test/java/io/opentelemetry/javaagent/instrumentation/vertx/rx/v3_5/server/VertxReactivePropagationTest.java

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,14 @@
3535
import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_SYSTEM;
3636
import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_USER;
3737
import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DbSystemNameIncubatingValues.HSQLDB;
38+
import static java.util.concurrent.TimeUnit.SECONDS;
3839
import static org.assertj.core.api.Assertions.assertThat;
3940

4041
import io.opentelemetry.api.GlobalOpenTelemetry;
4142
import io.opentelemetry.api.trace.Span;
4243
import io.opentelemetry.api.trace.SpanKind;
4344
import io.opentelemetry.context.Context;
45+
import io.opentelemetry.context.Scope;
4446
import io.opentelemetry.context.propagation.TextMapPropagator;
4547
import io.opentelemetry.context.propagation.TextMapSetter;
4648
import io.opentelemetry.instrumentation.test.utils.PortUtils;
@@ -59,6 +61,7 @@
5961
import java.util.concurrent.ExecutionException;
6062
import java.util.concurrent.ExecutorService;
6163
import java.util.concurrent.Executors;
64+
import java.util.concurrent.Future;
6265
import java.util.concurrent.TimeoutException;
6366
import java.util.function.Consumer;
6467
import org.junit.jupiter.api.BeforeAll;
@@ -149,7 +152,7 @@ void contextPropagation() {
149152

150153
@SuppressWarnings("deprecation") // uses deprecated db semconv
151154
@Test
152-
void highConcurrency() {
155+
void highConcurrency() throws Exception {
153156
int count = 100;
154157
String baseUrl = "/listProducts";
155158
CountDownLatch latch = new CountDownLatch(1);
@@ -160,29 +163,37 @@ void highConcurrency() {
160163
TextMapSetter<HttpRequestBuilder> setter =
161164
(carrier, name, value) -> carrier.header(name, value);
162165

166+
List<Future<?>> futures = new ArrayList<>();
163167
for (int i = 0; i < count; i++) {
164168
int index = i;
165-
pool.submit(
166-
() -> {
167-
try {
168-
latch.await();
169-
} catch (InterruptedException e) {
170-
Thread.currentThread().interrupt();
171-
}
172-
testing.runWithSpan(
173-
"client " + index,
174-
() -> {
175-
HttpRequestBuilder builder =
176-
HttpRequest.builder()
177-
.get(baseUrl + "?" + TEST_REQUEST_ID_PARAMETER + "=" + index);
178-
Span.current().setAttribute(TEST_REQUEST_ID_ATTRIBUTE, index);
179-
propagator.inject(Context.current(), builder, setter);
180-
client.execute(builder.build()).aggregate().join();
181-
});
182-
});
169+
futures.add(
170+
pool.submit(
171+
() -> {
172+
try {
173+
assertThat(latch.await(10, SECONDS)).isTrue();
174+
} catch (InterruptedException e) {
175+
Thread.currentThread().interrupt();
176+
throw new AssertionError(e);
177+
}
178+
try (Scope ignored = Context.root().makeCurrent()) {
179+
testing.runWithSpan(
180+
"client " + index,
181+
() -> {
182+
HttpRequestBuilder builder =
183+
HttpRequest.builder()
184+
.get(baseUrl + "?" + TEST_REQUEST_ID_PARAMETER + "=" + index);
185+
Span.current().setAttribute(TEST_REQUEST_ID_ATTRIBUTE, index);
186+
propagator.inject(Context.current(), builder, setter);
187+
client.execute(builder.build()).aggregate().join();
188+
});
189+
}
190+
}));
183191
}
184192

185193
latch.countDown();
194+
for (Future<?> future : futures) {
195+
future.get(30, SECONDS);
196+
}
186197

187198
List<Consumer<TraceAssert>> assertions = new ArrayList<>();
188199
for (int i = 0; i < count; i++) {

instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/rx/v3_5/server/VertxReactivePropagationTest.java

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,14 @@
3535
import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_SYSTEM;
3636
import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_USER;
3737
import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DbSystemNameIncubatingValues.HSQLDB;
38+
import static java.util.concurrent.TimeUnit.SECONDS;
3839
import static org.assertj.core.api.Assertions.assertThat;
3940

4041
import io.opentelemetry.api.GlobalOpenTelemetry;
4142
import io.opentelemetry.api.trace.Span;
4243
import io.opentelemetry.api.trace.SpanKind;
4344
import io.opentelemetry.context.Context;
45+
import io.opentelemetry.context.Scope;
4446
import io.opentelemetry.context.propagation.TextMapPropagator;
4547
import io.opentelemetry.context.propagation.TextMapSetter;
4648
import io.opentelemetry.instrumentation.test.utils.PortUtils;
@@ -59,6 +61,7 @@
5961
import java.util.concurrent.ExecutionException;
6062
import java.util.concurrent.ExecutorService;
6163
import java.util.concurrent.Executors;
64+
import java.util.concurrent.Future;
6265
import java.util.concurrent.TimeoutException;
6366
import java.util.function.Consumer;
6467
import org.junit.jupiter.api.BeforeAll;
@@ -149,7 +152,7 @@ void contextPropagation() {
149152

150153
@SuppressWarnings("deprecation") // uses deprecated db semconv
151154
@Test
152-
void highConcurrency() {
155+
void highConcurrency() throws Exception {
153156
int count = 100;
154157
String baseUrl = "/listProducts";
155158
CountDownLatch latch = new CountDownLatch(1);
@@ -160,29 +163,37 @@ void highConcurrency() {
160163
TextMapSetter<HttpRequestBuilder> setter =
161164
(carrier, name, value) -> carrier.header(name, value);
162165

166+
List<Future<?>> futures = new ArrayList<>();
163167
for (int i = 0; i < count; i++) {
164168
int index = i;
165-
pool.submit(
166-
() -> {
167-
try {
168-
latch.await();
169-
} catch (InterruptedException e) {
170-
Thread.currentThread().interrupt();
171-
}
172-
testing.runWithSpan(
173-
"client " + index,
174-
() -> {
175-
HttpRequestBuilder builder =
176-
HttpRequest.builder()
177-
.get(baseUrl + "?" + TEST_REQUEST_ID_PARAMETER + "=" + index);
178-
Span.current().setAttribute(TEST_REQUEST_ID_ATTRIBUTE, index);
179-
propagator.inject(Context.current(), builder, setter);
180-
client.execute(builder.build()).aggregate().join();
181-
});
182-
});
169+
futures.add(
170+
pool.submit(
171+
() -> {
172+
try {
173+
assertThat(latch.await(10, SECONDS)).isTrue();
174+
} catch (InterruptedException e) {
175+
Thread.currentThread().interrupt();
176+
throw new AssertionError(e);
177+
}
178+
try (Scope ignored = Context.root().makeCurrent()) {
179+
testing.runWithSpan(
180+
"client " + index,
181+
() -> {
182+
HttpRequestBuilder builder =
183+
HttpRequest.builder()
184+
.get(baseUrl + "?" + TEST_REQUEST_ID_PARAMETER + "=" + index);
185+
Span.current().setAttribute(TEST_REQUEST_ID_ATTRIBUTE, index);
186+
propagator.inject(Context.current(), builder, setter);
187+
client.execute(builder.build()).aggregate().join();
188+
});
189+
}
190+
}));
183191
}
184192

185193
latch.countDown();
194+
for (Future<?> future : futures) {
195+
future.get(30, SECONDS);
196+
}
186197

187198
List<Consumer<TraceAssert>> assertions = new ArrayList<>();
188199
for (int i = 0; i < count; i++) {

0 commit comments

Comments
 (0)