diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/io/opentelemetry/javaagent/instrumentation/vertx/rx/v3_5/server/VertxReactivePropagationTest.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/io/opentelemetry/javaagent/instrumentation/vertx/rx/v3_5/server/VertxReactivePropagationTest.java index d9af4ef43811..c71079950851 100644 --- a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/io/opentelemetry/javaagent/instrumentation/vertx/rx/v3_5/server/VertxReactivePropagationTest.java +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version35Test/java/io/opentelemetry/javaagent/instrumentation/vertx/rx/v3_5/server/VertxReactivePropagationTest.java @@ -35,12 +35,14 @@ import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_SYSTEM; import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_USER; import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DbSystemNameIncubatingValues.HSQLDB; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import io.opentelemetry.context.propagation.TextMapPropagator; import io.opentelemetry.context.propagation.TextMapSetter; import io.opentelemetry.instrumentation.test.utils.PortUtils; @@ -59,6 +61,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import org.junit.jupiter.api.BeforeAll; @@ -149,7 +152,7 @@ void contextPropagation() { @SuppressWarnings("deprecation") // uses deprecated db semconv @Test - void highConcurrency() { + void highConcurrency() throws Exception { int count = 100; String baseUrl = "/listProducts"; CountDownLatch latch = new CountDownLatch(1); @@ -160,29 +163,37 @@ void highConcurrency() { TextMapSetter setter = (carrier, name, value) -> carrier.header(name, value); + List> futures = new ArrayList<>(); for (int i = 0; i < count; i++) { int index = i; - pool.submit( - () -> { - try { - latch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - testing.runWithSpan( - "client " + index, - () -> { - HttpRequestBuilder builder = - HttpRequest.builder() - .get(baseUrl + "?" + TEST_REQUEST_ID_PARAMETER + "=" + index); - Span.current().setAttribute(TEST_REQUEST_ID_ATTRIBUTE, index); - propagator.inject(Context.current(), builder, setter); - client.execute(builder.build()).aggregate().join(); - }); - }); + futures.add( + pool.submit( + () -> { + try { + assertThat(latch.await(10, SECONDS)).isTrue(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError(e); + } + try (Scope ignored = Context.root().makeCurrent()) { + testing.runWithSpan( + "client " + index, + () -> { + HttpRequestBuilder builder = + HttpRequest.builder() + .get(baseUrl + "?" + TEST_REQUEST_ID_PARAMETER + "=" + index); + Span.current().setAttribute(TEST_REQUEST_ID_ATTRIBUTE, index); + propagator.inject(Context.current(), builder, setter); + client.execute(builder.build()).aggregate().join(); + }); + } + })); } latch.countDown(); + for (Future future : futures) { + future.get(30, SECONDS); + } List> assertions = new ArrayList<>(); for (int i = 0; i < count; i++) { diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version41Test/java/io/opentelemetry/javaagent/instrumentation/vertx/rx/v3_5/server/VertxReactivePropagationTest.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version41Test/java/io/opentelemetry/javaagent/instrumentation/vertx/rx/v3_5/server/VertxReactivePropagationTest.java index d9af4ef43811..c71079950851 100644 --- a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version41Test/java/io/opentelemetry/javaagent/instrumentation/vertx/rx/v3_5/server/VertxReactivePropagationTest.java +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version41Test/java/io/opentelemetry/javaagent/instrumentation/vertx/rx/v3_5/server/VertxReactivePropagationTest.java @@ -35,12 +35,14 @@ import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_SYSTEM; import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_USER; import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DbSystemNameIncubatingValues.HSQLDB; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import io.opentelemetry.context.propagation.TextMapPropagator; import io.opentelemetry.context.propagation.TextMapSetter; import io.opentelemetry.instrumentation.test.utils.PortUtils; @@ -59,6 +61,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import org.junit.jupiter.api.BeforeAll; @@ -149,7 +152,7 @@ void contextPropagation() { @SuppressWarnings("deprecation") // uses deprecated db semconv @Test - void highConcurrency() { + void highConcurrency() throws Exception { int count = 100; String baseUrl = "/listProducts"; CountDownLatch latch = new CountDownLatch(1); @@ -160,29 +163,37 @@ void highConcurrency() { TextMapSetter setter = (carrier, name, value) -> carrier.header(name, value); + List> futures = new ArrayList<>(); for (int i = 0; i < count; i++) { int index = i; - pool.submit( - () -> { - try { - latch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - testing.runWithSpan( - "client " + index, - () -> { - HttpRequestBuilder builder = - HttpRequest.builder() - .get(baseUrl + "?" + TEST_REQUEST_ID_PARAMETER + "=" + index); - Span.current().setAttribute(TEST_REQUEST_ID_ATTRIBUTE, index); - propagator.inject(Context.current(), builder, setter); - client.execute(builder.build()).aggregate().join(); - }); - }); + futures.add( + pool.submit( + () -> { + try { + assertThat(latch.await(10, SECONDS)).isTrue(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError(e); + } + try (Scope ignored = Context.root().makeCurrent()) { + testing.runWithSpan( + "client " + index, + () -> { + HttpRequestBuilder builder = + HttpRequest.builder() + .get(baseUrl + "?" + TEST_REQUEST_ID_PARAMETER + "=" + index); + Span.current().setAttribute(TEST_REQUEST_ID_ATTRIBUTE, index); + propagator.inject(Context.current(), builder, setter); + client.execute(builder.build()).aggregate().join(); + }); + } + })); } latch.countDown(); + for (Future future : futures) { + future.get(30, SECONDS); + } List> assertions = new ArrayList<>(); for (int i = 0; i < count; i++) { diff --git a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/rx/v3_5/server/VertxReactivePropagationTest.java b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/rx/v3_5/server/VertxReactivePropagationTest.java index 9a14b2f02289..052f1485e6af 100644 --- a/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/rx/v3_5/server/VertxReactivePropagationTest.java +++ b/instrumentation/vertx/vertx-rx-java-3.5/javaagent/src/version5Test/java/io/opentelemetry/javaagent/instrumentation/vertx/rx/v3_5/server/VertxReactivePropagationTest.java @@ -35,12 +35,14 @@ import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_SYSTEM; import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_USER; import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DbSystemNameIncubatingValues.HSQLDB; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import io.opentelemetry.context.propagation.TextMapPropagator; import io.opentelemetry.context.propagation.TextMapSetter; import io.opentelemetry.instrumentation.test.utils.PortUtils; @@ -59,6 +61,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import org.junit.jupiter.api.BeforeAll; @@ -149,7 +152,7 @@ void contextPropagation() { @SuppressWarnings("deprecation") // uses deprecated db semconv @Test - void highConcurrency() { + void highConcurrency() throws Exception { int count = 100; String baseUrl = "/listProducts"; CountDownLatch latch = new CountDownLatch(1); @@ -160,29 +163,37 @@ void highConcurrency() { TextMapSetter setter = (carrier, name, value) -> carrier.header(name, value); + List> futures = new ArrayList<>(); for (int i = 0; i < count; i++) { int index = i; - pool.submit( - () -> { - try { - latch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - testing.runWithSpan( - "client " + index, - () -> { - HttpRequestBuilder builder = - HttpRequest.builder() - .get(baseUrl + "?" + TEST_REQUEST_ID_PARAMETER + "=" + index); - Span.current().setAttribute(TEST_REQUEST_ID_ATTRIBUTE, index); - propagator.inject(Context.current(), builder, setter); - client.execute(builder.build()).aggregate().join(); - }); - }); + futures.add( + pool.submit( + () -> { + try { + assertThat(latch.await(10, SECONDS)).isTrue(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError(e); + } + try (Scope ignored = Context.root().makeCurrent()) { + testing.runWithSpan( + "client " + index, + () -> { + HttpRequestBuilder builder = + HttpRequest.builder() + .get(baseUrl + "?" + TEST_REQUEST_ID_PARAMETER + "=" + index); + Span.current().setAttribute(TEST_REQUEST_ID_ATTRIBUTE, index); + propagator.inject(Context.current(), builder, setter); + client.execute(builder.build()).aggregate().join(); + }); + } + })); } latch.countDown(); + for (Future future : futures) { + future.get(30, SECONDS); + } List> assertions = new ArrayList<>(); for (int i = 0; i < count; i++) {