From 3dc6bfb0a429cae46455b170000216ffde724801 Mon Sep 17 00:00:00 2001 From: Thomas Segismont Date: Thu, 5 Jun 2025 18:36:23 +0200 Subject: [PATCH 1/2] HTTP server started on a duplicated context shares contextual data See #5589 If the server is started from a duplicated context, the listen context should be the underlying context (unwrapped). And then we should duplicate the unwrapped context. This is to make sure the creating context and the request context are not sharing local data. Signed-off-by: Thomas Segismont --- .../vertx/core/http/impl/HttpServerImpl.java | 4 ++-- .../java/io/vertx/tests/http/HttpTest.java | 23 +++++++++++++++++++ 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpServerImpl.java b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpServerImpl.java index a899b7a4ba4..3828549f30b 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpServerImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpServerImpl.java @@ -185,7 +185,7 @@ public synchronized Future listen(SocketAddress address) { ContextInternal listenContext; // Not sure of this if (context.isEventLoopContext()) { - listenContext = context; + listenContext = context.unwrap(); } else { listenContext = context.toBuilder() .withThreadingModel(ThreadingModel.EVENT_LOOP) @@ -197,7 +197,7 @@ public synchronized Future listen(SocketAddress address) { server.exceptionHandler(exceptionHandler); server.connectHandler(so -> { NetSocketImpl soi = (NetSocketImpl) so; - Supplier streamContextSupplier = context::duplicate; + Supplier streamContextSupplier = context.unwrap()::duplicate; String host = address.isInetSocket() ? address.host() : "localhost"; int port = address.port(); String serverOrigin = (tcpOptions.isSsl() ? "https" : "http") + "://" + host + ":" + port; diff --git a/vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java b/vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java index dcb1866a77a..512d364c9c1 100644 --- a/vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java @@ -7049,4 +7049,27 @@ public void testHttpServerResponseWriteHead() throws Exception { await(); } + @Test + public void testServerStartedFromDuplicatedContext() throws Exception { + ContextInternal context = (ContextInternal) vertx.getOrCreateContext(); + ContextInternal duplicated = context.duplicate(); + ContextInternal.LOCAL_MAP.get(duplicated, ConcurrentHashMap::new).put("foo", "bar"); + + server.requestHandler(req -> { + ContextInternal current = ContextInternal.current(); + assertTrue("Not a duplicated context", current.isDuplicate()); + assertNotSame("Request should be handled on a different duplicated context", duplicated, current); + ConcurrentMap localMap = ContextInternal.LOCAL_MAP.get(current, ConcurrentHashMap::new); + assertFalse("Local map shouldn't have an entry for the key 'foo'", localMap.containsKey("foo")); + req.response().end(); + }); + startServer(duplicated); + + client.request(requestOptions) + .compose(HttpClientRequest::send) + .expecting(HttpResponseExpectation.SC_OK) + .onComplete(onSuccess(v -> testComplete())); + + await(); + } } From 7688f6ef317d75cd5335871d1fa3fbafc4dda9ab Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Thu, 12 Jun 2025 15:46:31 +0200 Subject: [PATCH 2/2] More duplicated context unwrapping --- .../datagram/impl/DatagramSocketImpl.java | 9 ++- .../io/vertx/core/dns/impl/DnsClientImpl.java | 7 +-- .../java/io/vertx/core/eventbus/EventBus.java | 8 ++- .../core/eventbus/impl/EventBusImpl.java | 19 +++--- .../core/eventbus/impl/EventBusInternal.java | 7 +++ .../eventbus/impl/MessageConsumerImpl.java | 5 ++ .../core/http/impl/ClientWebSocketImpl.java | 2 +- .../http/impl/Http1xServerConnection.java | 2 +- .../core/http/impl/Http1xServerRequest.java | 15 ++--- .../http/impl/Http1xServerRequestHandler.java | 2 +- .../core/http/impl/HttpChannelConnector.java | 3 + .../vertx/core/http/impl/HttpClientImpl.java | 2 +- .../vertx/core/http/impl/HttpServerImpl.java | 45 +++++++------- .../http/impl/ServerWebSocketHandshaker.java | 12 ++-- .../core/http/impl/WebSocketClientImpl.java | 44 ++----------- .../core/http/impl/WebSocketImplBase.java | 10 ++- .../java/io/vertx/core/impl/VertxImpl.java | 5 +- .../io/vertx/core/internal/VertxInternal.java | 5 ++ .../io/vertx/core/internal/VertxWrapper.java | 3 +- .../vertx/core/net/impl/ConnectionBase.java | 4 ++ .../io/vertx/core/net/impl/NetClientImpl.java | 5 +- .../io/vertx/core/net/impl/NetServerImpl.java | 6 +- .../io/vertx/tests/datagram/DatagramTest.java | 26 ++++++++ .../tests/eventbus/EventBusTestBase.java | 27 ++++++++ .../java/io/vertx/tests/http/HttpTest.java | 61 +++++++++++++++++-- .../io/vertx/tests/http/WebSocketTest.java | 33 ++++++++++ .../test/java/io/vertx/tests/net/NetTest.java | 19 ++++++ .../java/io/vertx/tests/timer/TimerTest.java | 32 ++++++++++ 28 files changed, 303 insertions(+), 115 deletions(-) diff --git a/vertx-core/src/main/java/io/vertx/core/datagram/impl/DatagramSocketImpl.java b/vertx-core/src/main/java/io/vertx/core/datagram/impl/DatagramSocketImpl.java index a6cdf58a9b2..0ec16a87dd7 100644 --- a/vertx-core/src/main/java/io/vertx/core/datagram/impl/DatagramSocketImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/datagram/impl/DatagramSocketImpl.java @@ -51,7 +51,7 @@ */ public class DatagramSocketImpl implements DatagramSocket, MetricsProvider, Closeable { - public static DatagramSocketImpl create(VertxInternal vertx, CloseFuture closeFuture, DatagramSocketOptions options) { + public static DatagramSocketImpl create(ContextInternal vertx, CloseFuture closeFuture, DatagramSocketOptions options) { DatagramSocketImpl socket = new DatagramSocketImpl(vertx, closeFuture, options); // Make sure object is fully initiliased to avoid race with async registration socket.init(); @@ -65,11 +65,10 @@ public static DatagramSocketImpl create(VertxInternal vertx, CloseFuture closeFu private Handler exceptionHandler; private final CloseFuture closeFuture; - private DatagramSocketImpl(VertxInternal vertx, CloseFuture closeFuture, DatagramSocketOptions options) { - Transport transport = vertx.transport(); + private DatagramSocketImpl(ContextInternal context, CloseFuture closeFuture, DatagramSocketOptions options) { + Transport transport = context.owner().transport(); DatagramChannel channel = transport.datagramChannel(options.isIpV6() ? InternetProtocolFamily.IPv6 : InternetProtocolFamily.IPv4); transport.configure(channel, new DatagramSocketOptions(options)); - ContextInternal context = vertx.getOrCreateContext(); channel.config().setOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true); MaxMessagesRecvByteBufAllocator bufAllocator = channel.config().getRecvByteBufAllocator(); bufAllocator.maxMessagesPerRead(1); @@ -77,7 +76,7 @@ private DatagramSocketImpl(VertxInternal vertx, CloseFuture closeFuture, Datagra if (options.getLogActivity()) { channel.pipeline().addLast("logging", new LoggingHandler(options.getActivityLogDataFormat())); } - VertxMetrics metrics = vertx.metrics(); + VertxMetrics metrics = context.owner().metrics(); this.metrics = metrics != null ? metrics.createDatagramSocketMetrics(options) : null; this.channel = channel; this.context = context; diff --git a/vertx-core/src/main/java/io/vertx/core/dns/impl/DnsClientImpl.java b/vertx-core/src/main/java/io/vertx/core/dns/impl/DnsClientImpl.java index fb499889e83..3543411903f 100644 --- a/vertx-core/src/main/java/io/vertx/core/dns/impl/DnsClientImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/dns/impl/DnsClientImpl.java @@ -64,8 +64,7 @@ public DnsClientImpl(VertxInternal vertx, DnsClientOptions options) { this.vertx = vertx; } - private DnsNameResolver resolver(ContextInternal ctx, InternetProtocolFamily ipFamily) { - EventLoop el = ctx.nettyEventLoop(); + private DnsNameResolver resolver(EventLoop el, InternetProtocolFamily ipFamily) { DnsNameResolver resolver; synchronized (this) { if (closed) { @@ -169,7 +168,7 @@ public Future> resolveSRV(String name) { private Future> resolveAll(String name, InternetProtocolFamily ipFamily) { Objects.requireNonNull(name); ContextInternal ctx = vertx.getOrCreateContext(); - DnsNameResolver resolver = resolver(ctx, ipFamily); + DnsNameResolver resolver = resolver(ctx.nettyEventLoop(), ipFamily); if (resolver == null) { return ctx.failedFuture("DNS client is closed"); } @@ -200,7 +199,7 @@ private Future> resolveAll(String name, InternetProtocolFamily ipFa private Future> queryAll(String name, DnsRecordType recordType, Function mapper) { Objects.requireNonNull(name); ContextInternal ctx = vertx.getOrCreateContext(); - DnsNameResolver resolver = resolver(ctx, InternetProtocolFamily.IPv4); + DnsNameResolver resolver = resolver(ctx.nettyEventLoop(), InternetProtocolFamily.IPv4); if (resolver == null) { return ctx.failedFuture("DNS client is closed"); } diff --git a/vertx-core/src/main/java/io/vertx/core/eventbus/EventBus.java b/vertx-core/src/main/java/io/vertx/core/eventbus/EventBus.java index cb2e0976556..2aa13da0bb5 100644 --- a/vertx-core/src/main/java/io/vertx/core/eventbus/EventBus.java +++ b/vertx-core/src/main/java/io/vertx/core/eventbus/EventBus.java @@ -21,6 +21,7 @@ import io.vertx.core.eventbus.impl.DefaultSerializableChecker; import io.vertx.core.metrics.Measured; +import java.util.Objects; import java.util.function.Function; import static io.vertx.codegen.annotations.GenIgnore.PERMITTED_TYPE; @@ -175,7 +176,12 @@ default Future> request(String address, @Nullable Object message) * @param handler the handler that will process the received messages * @return the event bus message consumer */ - MessageConsumer localConsumer(String address, Handler> handler); + default MessageConsumer localConsumer(String address, Handler> handler) { + Objects.requireNonNull(handler, "handler"); + MessageConsumer consumer = localConsumer(address); + consumer.handler(handler); + return consumer; + } /** * Create a message sender against the specified address. diff --git a/vertx-core/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java b/vertx-core/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java index f5acb39438b..e6151c9556b 100644 --- a/vertx-core/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java @@ -187,7 +187,7 @@ public MessageConsumer consumer(MessageConsumerOptions options, Handler MessageConsumer consumer(String address) { checkStarted(); Objects.requireNonNull(address, "address"); - return new MessageConsumerImpl<>(vertx.getOrCreateContext(), this, address, false, MessageConsumerOptions.DEFAULT_MAX_BUFFERED_MESSAGES); + return new MessageConsumerImpl<>(vertx.getOrCreateContext().unwrap(), this, address, false, MessageConsumerOptions.DEFAULT_MAX_BUFFERED_MESSAGES); } @Override @@ -200,17 +200,18 @@ public MessageConsumer consumer(String address, Handler> handl @Override public MessageConsumer localConsumer(String address) { - checkStarted(); - Objects.requireNonNull(address, "address"); - return new MessageConsumerImpl<>(vertx.getOrCreateContext(), this, address, true, MessageConsumerOptions.DEFAULT_MAX_BUFFERED_MESSAGES); + return localConsumer(vertx.getOrCreateContext(), address); } @Override - public MessageConsumer localConsumer(String address, Handler> handler) { - Objects.requireNonNull(handler, "handler"); - MessageConsumer consumer = localConsumer(address); - consumer.handler(handler); - return consumer; + public MessageConsumer localConsumer(Context context, String address) { + checkStarted(); + Objects.requireNonNull(context, "context"); + Objects.requireNonNull(address, "address"); + if (context.owner() != vertx) { + throw new IllegalArgumentException("Invalid context instance"); + } + return new MessageConsumerImpl<>((ContextInternal) context, this, address, true, MessageConsumerOptions.DEFAULT_MAX_BUFFERED_MESSAGES); } @Override diff --git a/vertx-core/src/main/java/io/vertx/core/eventbus/impl/EventBusInternal.java b/vertx-core/src/main/java/io/vertx/core/eventbus/impl/EventBusInternal.java index e4b2aaea452..b3ac19968f9 100644 --- a/vertx-core/src/main/java/io/vertx/core/eventbus/impl/EventBusInternal.java +++ b/vertx-core/src/main/java/io/vertx/core/eventbus/impl/EventBusInternal.java @@ -11,8 +11,13 @@ package io.vertx.core.eventbus.impl; +import io.vertx.core.Context; import io.vertx.core.Promise; import io.vertx.core.eventbus.EventBus; +import io.vertx.core.eventbus.MessageConsumer; +import io.vertx.core.eventbus.MessageConsumerOptions; + +import java.util.Objects; public interface EventBusInternal extends EventBus { @@ -21,6 +26,8 @@ public interface EventBusInternal extends EventBus { */ void start(Promise promise); + MessageConsumer localConsumer(Context context, String address); + /** * Close the event bus and release any resources held. */ diff --git a/vertx-core/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java b/vertx-core/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java index f9dc75654d3..3e4c976273f 100644 --- a/vertx-core/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java @@ -38,6 +38,11 @@ public class MessageConsumerImpl extends HandlerRegistration implements Me MessageConsumerImpl(ContextInternal context, EventBusImpl eventBus, String address, boolean localOnly, int maxBufferedMessages) { super(context, eventBus, address, false); + + if (context.isDuplicate()) { + throw new IllegalArgumentException("Duplicated context are not allowed"); + } + this.localOnly = localOnly; this.result = context.promise(); this.maxBufferedMessages = maxBufferedMessages; diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/ClientWebSocketImpl.java b/vertx-core/src/main/java/io/vertx/core/http/impl/ClientWebSocketImpl.java index 8d9a3a17b2d..3a6ced12ddc 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/ClientWebSocketImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/ClientWebSocketImpl.java @@ -52,7 +52,7 @@ public class ClientWebSocketImpl implements ClientWebSocketInternal { @Override public Future connect(WebSocketConnectOptions options) { - return connect(client.vertx().getOrCreateContext(), options); + return connect(client.vertx().getOrCreateContext().unwrap(), options); } @Override diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/Http1xServerConnection.java b/vertx-core/src/main/java/io/vertx/core/http/impl/Http1xServerConnection.java index ad95de480e1..bfd218a93d2 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/Http1xServerConnection.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/Http1xServerConnection.java @@ -297,7 +297,7 @@ void createWebSocket(Http1xServerRequest request, PromiseInternal toWebSocket() { - return webSocketHandshake().compose(handshake -> handshake.accept()); - } - - /** - * @return a future of the un-accepted WebSocket - */ - Future webSocketHandshake() { - PromiseInternal promise = context.promise(); - webSocketHandshake(promise); - return promise.future(); + return webSocketHandshake(context.unwrap()).compose(handshake -> handshake.accept()); } /** * Handle the request when a WebSocket upgrade header is present. */ - private void webSocketHandshake(PromiseInternal promise) { + Future webSocketHandshake(ContextInternal context) { + PromiseInternal promise = context.promise(); BufferInternal body = BufferInternal.buffer(); boolean[] failed = new boolean[1]; handler(buff -> { @@ -482,6 +474,7 @@ private void webSocketHandshake(PromiseInternal promis }); // In case we were paused resume(); + return promise.future(); } @Override diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/Http1xServerRequestHandler.java b/vertx-core/src/main/java/io/vertx/core/http/impl/Http1xServerRequestHandler.java index 30aea6b8018..05ba849a6a0 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/Http1xServerRequestHandler.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/Http1xServerRequestHandler.java @@ -44,7 +44,7 @@ public void handle(HttpServerRequest req) { if (wsHandler != null || wsHandshakeHandler != null) { if (req.headers().contains(UPGRADE, WEBSOCKET, true) && handlers.server.wsAccept()) { // Missing upgrade header + null request handler will be handled when creating the handshake by sending a 400 error - ((Http1xServerRequest)req).webSocketHandshake().onComplete(ar -> { + ((Http1xServerRequest)req).webSocketHandshake(((Http1xServerRequest)req).context.unwrap()).onComplete(ar -> { if (ar.succeeded()) { ServerWebSocketHandshaker handshake = (ServerWebSocketHandshaker) ar.result(); if (wsHandshakeHandler == null) { diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java index 16c5e76b930..caac92d5c91 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java @@ -166,6 +166,9 @@ public Future wrap(ContextInternal context, NetSoc } public Future httpConnect(ContextInternal context) { + if (context.isDuplicate()) { + throw new IllegalArgumentException("Cannot accept duplicate contexts"); + } Promise promise = context.promise(); Future future = promise.future(); // We perform the compose operation before calling connect to be sure that the composition happens diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java index 34b4a17fdea..622b9fd22df 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java @@ -242,7 +242,7 @@ public Future connect(HttpConnectOptions connect) { server, false, 0); - return (Future) connector.httpConnect(vertx.getOrCreateContext()).map(conn -> new UnpooledHttpClientConnection(conn).init()); + return (Future) connector.httpConnect(vertx.getOrCreateContext().unwrap()).map(conn -> new UnpooledHttpClientConnection(conn).init()); } @Override diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpServerImpl.java b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpServerImpl.java index 3828549f30b..8ba636d7014 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpServerImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpServerImpl.java @@ -169,7 +169,27 @@ public Future listen() { } @Override - public synchronized Future listen(SocketAddress address) { + public Future listen(SocketAddress address) { + ContextInternal context = vertx.getOrCreateContext(); + ContextInternal listenContext; + if (context.isEventLoopContext()) { + listenContext = context.unwrap(); + } else { + listenContext = context.toBuilder() + .withThreadingModel(ThreadingModel.EVENT_LOOP) + .build(); + } + Promise promise = context.promise(); + Supplier streamContextSupplier = context.unwrap()::duplicate; + listen(listenContext, context.threadingModel(), streamContextSupplier, address, promise); + return promise.future(); + } + + private synchronized void listen(ContextInternal listenContext, + ThreadingModel threadingModel, + Supplier streamContextSupplier, + SocketAddress address, + Promise promise) { if (requestHandler == null && webSocketHandler == null && webSocketHandhakeHandler == null) { throw new IllegalStateException("Set request or WebSocket handler first"); } @@ -181,23 +201,12 @@ public synchronized Future listen(SocketAddress address) { if (tcpOptions.getSslOptions() != null) { configureApplicationLayerProtocols(tcpOptions.getSslOptions()); } - ContextInternal context = vertx.getOrCreateContext(); - ContextInternal listenContext; - // Not sure of this - if (context.isEventLoopContext()) { - listenContext = context.unwrap(); - } else { - listenContext = context.toBuilder() - .withThreadingModel(ThreadingModel.EVENT_LOOP) - .build(); - } NetServerInternal server = vertx.createNetServer(tcpOptions); Handler h = exceptionHandler; Handler exceptionHandler = h != null ? h : DEFAULT_EXCEPTION_HANDLER; server.exceptionHandler(exceptionHandler); server.connectHandler(so -> { NetSocketImpl soi = (NetSocketImpl) so; - Supplier streamContextSupplier = context.unwrap()::duplicate; String host = address.isInetSocket() ? address.host() : "localhost"; int port = address.port(); String serverOrigin = (tcpOptions.isSsl() ? "https" : "http") + "://" + host + ":" + port; @@ -211,7 +220,7 @@ public synchronized Future listen(SocketAddress address) { exceptionHandler); HttpServerConnectionInitializer initializer = new HttpServerConnectionInitializer( listenContext, - context.threadingModel(), + threadingModel, streamContextSupplier, this, vertx, @@ -224,15 +233,7 @@ public synchronized Future listen(SocketAddress address) { }); tcpServer = server; closeSequence = new CloseSequence(p -> doClose(server, p), p -> doShutdown(server, p )); - Promise result = context.promise(); - tcpServer.listen(listenContext, address).onComplete(ar -> { - if (ar.succeeded()) { - result.complete(this); - } else { - result.fail(ar.cause()); - } - }); - return result.future(); + tcpServer.listen(listenContext, address).map(this).onComplete(promise); } private void doShutdown(NetServer netServer, Completable p) { diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/ServerWebSocketHandshaker.java b/vertx-core/src/main/java/io/vertx/core/http/impl/ServerWebSocketHandshaker.java index e0903459b45..9fc552f43d4 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/ServerWebSocketHandshaker.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/ServerWebSocketHandshaker.java @@ -52,8 +52,8 @@ public class ServerWebSocketHandshaker extends FutureImpl imple private final WebSocketServerHandshaker handshaker; private boolean done; - public ServerWebSocketHandshaker(Http1xServerRequest request, WebSocketServerHandshaker handshaker, HttpServerOptions options) { - super(request.context); + public ServerWebSocketHandshaker(Http1xServerRequest request, ContextInternal context, WebSocketServerHandshaker handshaker, HttpServerOptions options) { + super(context); this.request = request; this.handshaker = handshaker; this.options = options; @@ -94,7 +94,7 @@ public Future accept() { } ServerWebSocket ws; try { - ws = acceptHandshake(); + ws = acceptHandshake(context); } catch (Exception e) { return rejectHandshake(BAD_REQUEST.code()) .transform(ar -> { @@ -159,7 +159,7 @@ private Future rejectHandshake(int sc) { return response.setStatusCode(sc).end(status.reasonPhrase()); } - private ServerWebSocket acceptHandshake() { + private ServerWebSocket acceptHandshake(ContextInternal context) { Http1xServerConnection httpConn = (Http1xServerConnection) request.connection(); ChannelHandlerContext chctx = httpConn.channelHandlerContext(); Channel channel = chctx.channel(); @@ -175,9 +175,9 @@ private ServerWebSocket acceptHandshake() { } VertxHandler handler = VertxHandler.create(ctx -> { long closingTimeoutMS = options.getWebSocketClosingTimeout() >= 0 ? options.getWebSocketClosingTimeout() * 1000L : 0L; - WebSocketConnectionImpl webSocketConn = new WebSocketConnectionImpl(request.context, ctx, true, closingTimeoutMS,httpConn.metrics); + WebSocketConnectionImpl webSocketConn = new WebSocketConnectionImpl(context, ctx, true, closingTimeoutMS,httpConn.metrics); ServerWebSocketImpl webSocket = new ServerWebSocketImpl( - request.context(), + context, webSocketConn, handshaker.version() != WebSocketVersion.V00, request, diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/WebSocketClientImpl.java b/vertx-core/src/main/java/io/vertx/core/http/impl/WebSocketClientImpl.java index e1cf3db9792..25cd877237b 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/WebSocketClientImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/WebSocketClientImpl.java @@ -59,6 +59,9 @@ public Future connect(WebSocketConnectOptions options) { } void webSocket(ContextInternal ctx, WebSocketConnectOptions connectOptions, Promise promise) { + if (ctx.isDuplicate()) { + throw new IllegalArgumentException(); + } int port = getPort(connectOptions); String host = getHost(connectOptions); SocketAddress addr = SocketAddress.inetSocketAddress(port, host); @@ -91,47 +94,10 @@ public Future webSocket(int port, String host, String requestURI) { } public Future webSocket(WebSocketConnectOptions options) { - return webSocket(vertx.getOrCreateContext(), options); - } - - static WebSocketConnectOptions webSocketConnectOptionsAbs(String url, MultiMap headers, WebSocketVersion version, List subProtocols) { - URI uri; - try { - uri = new URI(url); - } catch (URISyntaxException e) { - throw new IllegalArgumentException(e); - } - String scheme = uri.getScheme(); - if (!"ws".equals(scheme) && !"wss".equals(scheme)) { - throw new IllegalArgumentException("Scheme: " + scheme); - } - boolean ssl = scheme.length() == 3; - int port = uri.getPort(); - if (port == -1) port = ssl ? 443 : 80; - StringBuilder relativeUri = new StringBuilder(); - if (uri.getRawPath() != null) { - relativeUri.append(uri.getRawPath()); - } - if (uri.getRawQuery() != null) { - relativeUri.append('?').append(uri.getRawQuery()); - } - if (uri.getRawFragment() != null) { - relativeUri.append('#').append(uri.getRawFragment()); - } - return new WebSocketConnectOptions() - .setHost(uri.getHost()) - .setPort(port).setSsl(ssl) - .setURI(relativeUri.toString()) - .setHeaders(headers) - .setVersion(version) - .setSubProtocols(subProtocols); - } - - public Future webSocketAbs(String url, MultiMap headers, WebSocketVersion version, List subProtocols) { - return webSocket(webSocketConnectOptionsAbs(url, headers, version, subProtocols)); + return webSocket(vertx.getOrCreateContext().unwrap(), options); } - Future webSocket(ContextInternal ctx, WebSocketConnectOptions connectOptions) { + private Future webSocket(ContextInternal ctx, WebSocketConnectOptions connectOptions) { PromiseInternal promise = ctx.promise(); webSocket(ctx, connectOptions, promise); return promise.andThen(ar -> { diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/WebSocketImplBase.java b/vertx-core/src/main/java/io/vertx/core/http/impl/WebSocketImplBase.java index 35bf96ce98f..42f502d2ee9 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/WebSocketImplBase.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/WebSocketImplBase.java @@ -25,6 +25,7 @@ import io.vertx.core.MultiMap; import io.vertx.core.Promise; import io.vertx.core.buffer.Buffer; +import io.vertx.core.eventbus.impl.EventBusInternal; import io.vertx.core.internal.buffer.BufferInternal; import io.vertx.core.eventbus.EventBus; import io.vertx.core.eventbus.Message; @@ -88,6 +89,9 @@ public abstract class WebSocketImplBase implements WebSocke int maxWebSocketFrameSize, int maxWebSocketMessageSize, boolean registerWebSocketWriteHandlers) { + if (context.isDuplicate()) { + throw new IllegalArgumentException(); + } this.supportsContinuation = supportsContinuation; if (registerWebSocketWriteHandlers) { textHandlerID = "__vertx.ws." + UUID.randomUUID(); @@ -117,12 +121,12 @@ protected void handleMessage(WebSocketFrameInternal msg) { this.headers = headers; } - void registerHandler(EventBus eventBus) { + void registerHandler(EventBusInternal eventBus) { if (binaryHandlerID != null) { Handler> binaryHandler = msg -> writeBinaryFrameInternal(msg.body()); Handler> textHandler = msg -> writeTextFrameInternal(msg.body()); - binaryHandlerRegistration = eventBus.localConsumer(binaryHandlerID).handler(binaryHandler); - textHandlerRegistration = eventBus.localConsumer(textHandlerID).handler(textHandler); + binaryHandlerRegistration = eventBus.localConsumer(context, binaryHandlerID).handler(binaryHandler); + textHandlerRegistration = eventBus.localConsumer(context, textHandlerID).handler(textHandler); } } diff --git a/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java b/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java index 7e9f3397bd1..6b662696494 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java @@ -338,7 +338,8 @@ public TimeUnit maxEventLoopExecTimeUnit() { @Override public DatagramSocket createDatagramSocket(DatagramSocketOptions options) { CloseFuture closeFuture = new CloseFuture(log); - DatagramSocketImpl so = DatagramSocketImpl.create(this, closeFuture, options); + ContextInternal context = getOrCreateContext().unwrap(); + DatagramSocketImpl so = DatagramSocketImpl.create(context, closeFuture, options); closeFuture.add(so); CloseFuture fut = resolveCloseFuture(); fut.add(closeFuture); @@ -426,7 +427,7 @@ public HttpClientBuilder httpClientBuilder() { return new HttpClientBuilderInternal(this); } - public EventBus eventBus() { + public EventBusInternal eventBus() { return eventBus; } diff --git a/vertx-core/src/main/java/io/vertx/core/internal/VertxInternal.java b/vertx-core/src/main/java/io/vertx/core/internal/VertxInternal.java index c432f963398..ac94c7d07fa 100644 --- a/vertx-core/src/main/java/io/vertx/core/internal/VertxInternal.java +++ b/vertx-core/src/main/java/io/vertx/core/internal/VertxInternal.java @@ -14,6 +14,8 @@ import io.netty.channel.EventLoopGroup; import io.vertx.core.*; +import io.vertx.core.eventbus.EventBus; +import io.vertx.core.eventbus.impl.EventBusInternal; import io.vertx.core.impl.*; import io.vertx.core.internal.deployment.DeploymentManager; import io.vertx.core.internal.resolver.NameResolver; @@ -102,6 +104,9 @@ default NetServerInternal createNetServer() { @Override ContextInternal getOrCreateContext(); + @Override + EventBusInternal eventBus(); + EventLoopGroup eventLoopGroup(); EventLoopGroup acceptorEventLoopGroup(); diff --git a/vertx-core/src/main/java/io/vertx/core/internal/VertxWrapper.java b/vertx-core/src/main/java/io/vertx/core/internal/VertxWrapper.java index 8f93268ad00..b0a55cdf099 100644 --- a/vertx-core/src/main/java/io/vertx/core/internal/VertxWrapper.java +++ b/vertx-core/src/main/java/io/vertx/core/internal/VertxWrapper.java @@ -17,6 +17,7 @@ import io.vertx.core.dns.DnsClient; import io.vertx.core.dns.DnsClientOptions; import io.vertx.core.eventbus.EventBus; +import io.vertx.core.eventbus.impl.EventBusInternal; import io.vertx.core.file.FileSystem; import io.vertx.core.http.*; import io.vertx.core.internal.deployment.DeploymentManager; @@ -106,7 +107,7 @@ public FileSystem fileSystem() { } @Override - public EventBus eventBus() { + public EventBusInternal eventBus() { return delegate.eventBus(); } diff --git a/vertx-core/src/main/java/io/vertx/core/net/impl/ConnectionBase.java b/vertx-core/src/main/java/io/vertx/core/net/impl/ConnectionBase.java index 14de56b5322..00eec0fd921 100644 --- a/vertx-core/src/main/java/io/vertx/core/net/impl/ConnectionBase.java +++ b/vertx-core/src/main/java/io/vertx/core/net/impl/ConnectionBase.java @@ -81,6 +81,10 @@ public abstract class ConnectionBase { protected ConnectionBase(ContextInternal context, ChannelHandlerContext chctx) { + if (context.isDuplicate()) { + throw new IllegalArgumentException("Cannot accept duplicate contexts"); + } + PromiseInternal f = context.promise(); chctx .channel() diff --git a/vertx-core/src/main/java/io/vertx/core/net/impl/NetClientImpl.java b/vertx-core/src/main/java/io/vertx/core/net/impl/NetClientImpl.java index f0036c8bdac..d6af716128a 100644 --- a/vertx-core/src/main/java/io/vertx/core/net/impl/NetClientImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/net/impl/NetClientImpl.java @@ -139,7 +139,7 @@ public Future connect(SocketAddress remoteAddress, String serverName) @Override public Future connect(ConnectOptions connectOptions) { - ContextInternal context = vertx.getOrCreateContext(); + ContextInternal context = vertx.getOrCreateContext().unwrap(); Promise promise = context.promise(); connectInternal(connectOptions, options.isRegisterWriteHandler(), promise, context, options.getReconnectAttempts()); return promise.future(); @@ -237,6 +237,9 @@ private void connectInternal(ConnectOptions connectOptions, Promise connectHandler, ContextInternal context, int remainingAttempts) { + if (context.isDuplicate()) { + throw new IllegalArgumentException("Cannot accept duplicate contexts"); + } if (closeSequence.started()) { connectHandler.fail(new IllegalStateException("Client is closed")); } else { diff --git a/vertx-core/src/main/java/io/vertx/core/net/impl/NetServerImpl.java b/vertx-core/src/main/java/io/vertx/core/net/impl/NetServerImpl.java index dbb9c511ebc..150c76dcdac 100644 --- a/vertx-core/src/main/java/io/vertx/core/net/impl/NetServerImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/net/impl/NetServerImpl.java @@ -141,7 +141,8 @@ public Future shutdown(long timeout, TimeUnit unit) { @Override public Future listen(SocketAddress localAddress) { - return listen(vertx.getOrCreateContext(), localAddress); + ContextInternal context = vertx.getOrCreateContext(); + return listen(context.unwrap(), localAddress); } @Override @@ -152,6 +153,9 @@ public Future listen(ContextInternal context, SocketAddress localAddr if (handler == null) { throw new IllegalStateException("Set connect handler first"); } + if (context.isDuplicate()) { + throw new IllegalArgumentException("Duplicate context are not allowed"); + } return bind(context, localAddress).map(this); } diff --git a/vertx-core/src/test/java/io/vertx/tests/datagram/DatagramTest.java b/vertx-core/src/test/java/io/vertx/tests/datagram/DatagramTest.java index 181069ce249..b89cde14951 100644 --- a/vertx-core/src/test/java/io/vertx/tests/datagram/DatagramTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/datagram/DatagramTest.java @@ -16,6 +16,7 @@ import io.vertx.core.buffer.Buffer; import io.vertx.core.datagram.DatagramSocket; import io.vertx.core.datagram.DatagramSocketOptions; +import io.vertx.core.internal.ContextInternal; import io.vertx.core.internal.buffer.BufferInternal; import io.vertx.core.impl.Utils; import io.vertx.core.json.JsonObject; @@ -31,6 +32,7 @@ import java.net.InetAddress; import java.net.NetworkInterface; import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; @@ -571,4 +573,28 @@ public void start(Promise startPromise) { })); await(); } + + @Test + public void testSocketWithDuplicatedContext() { + waitFor(2); + ContextInternal context = (ContextInternal) vertx.getOrCreateContext(); + ContextInternal duplicated = context.duplicate(); + duplicated.getLocal(ContextInternal.LOCAL_MAP, ConcurrentHashMap::new).put("foo", "bar"); + Promise cont = Promise.promise(); + duplicated.runOnContext(v -> { + peer2 = vertx.createDatagramSocket(new DatagramSocketOptions()); + peer2.exceptionHandler(t -> fail(t.getMessage())); + peer2.handler(packet -> { + assertSame(context, Vertx.currentContext()); + complete(); + }); + peer2.listen(1234, "127.0.0.1").onComplete(cont); + }); + cont.future().await(); + peer1 = vertx.createDatagramSocket(new DatagramSocketOptions()); + peer1 + .send(TestUtils.randomBuffer(128), 1234, "127.0.0.1") + .onComplete(onSuccess(s -> complete())); + await(); + } } diff --git a/vertx-core/src/test/java/io/vertx/tests/eventbus/EventBusTestBase.java b/vertx-core/src/test/java/io/vertx/tests/eventbus/EventBusTestBase.java index 4f5bbf3576c..df8a6de974f 100644 --- a/vertx-core/src/test/java/io/vertx/tests/eventbus/EventBusTestBase.java +++ b/vertx-core/src/test/java/io/vertx/tests/eventbus/EventBusTestBase.java @@ -15,6 +15,7 @@ import io.vertx.core.*; import io.vertx.core.buffer.Buffer; import io.vertx.core.eventbus.*; +import io.vertx.core.internal.ContextInternal; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.tests.shareddata.AsyncMapTest.SomeClusterSerializableObject; @@ -28,6 +29,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -976,4 +978,29 @@ public void testConsumerUnregistrationContextCallback() throws Exception { }); await(); } + + @Test + public void testConsumerWithDuplicatedContext() { + Vertx[] vertices = vertices(2); + + ContextInternal context = (ContextInternal) vertices[0].getOrCreateContext(); + ContextInternal duplicated = context.duplicate(); + duplicated.getLocal(ContextInternal.LOCAL_MAP, ConcurrentHashMap::new).put("foo", "bar"); + + duplicated.runOnContext(v1 -> { + vertices[0] + .eventBus() + .consumer(ADDRESS1, msg -> { + ContextInternal current = ContextInternal.current(); + assertTrue("Not a duplicated context", current.isDuplicate()); + assertNull("Local map shouldn't have an entry for the key 'foo'", current.getLocal(ContextInternal.LOCAL_MAP)); + testComplete(); + }).completion() + .onComplete(onSuccess(v2 -> { + vertices[1].eventBus().send(ADDRESS1, "ping"); + })); + }); + + await(); + } } diff --git a/vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java b/vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java index 512d364c9c1..47892116307 100644 --- a/vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java @@ -7050,25 +7050,74 @@ public void testHttpServerResponseWriteHead() throws Exception { } @Test - public void testServerStartedFromDuplicatedContext() throws Exception { + public void testServerWithDuplicatedContext() throws Exception { ContextInternal context = (ContextInternal) vertx.getOrCreateContext(); ContextInternal duplicated = context.duplicate(); - ContextInternal.LOCAL_MAP.get(duplicated, ConcurrentHashMap::new).put("foo", "bar"); + duplicated.getLocal(ContextInternal.LOCAL_MAP, ConcurrentHashMap::new).put("foo", "bar"); server.requestHandler(req -> { ContextInternal current = ContextInternal.current(); assertTrue("Not a duplicated context", current.isDuplicate()); assertNotSame("Request should be handled on a different duplicated context", duplicated, current); - ConcurrentMap localMap = ContextInternal.LOCAL_MAP.get(current, ConcurrentHashMap::new); - assertFalse("Local map shouldn't have an entry for the key 'foo'", localMap.containsKey("foo")); + assertNull("Local map shouldn't have an entry for the key 'foo'", current.getLocal(ContextInternal.LOCAL_MAP)); req.response().end(); }); - startServer(duplicated); + startServer(testAddress, duplicated); client.request(requestOptions) .compose(HttpClientRequest::send) .expecting(HttpResponseExpectation.SC_OK) - .onComplete(onSuccess(v -> testComplete())); + .await(); + } + + @Test + public void testClientPoolWithDuplicatedContext() throws Exception { + ContextInternal context = (ContextInternal) vertx.getOrCreateContext(); + ContextInternal duplicated = context.duplicate(); + duplicated.getLocal(ContextInternal.LOCAL_MAP, ConcurrentHashMap::new).put("foo", "bar"); + + server.requestHandler(req -> { + req.response().end(); + }); + startServer(testAddress); + + duplicated.runOnContext(v -> { + client.request(requestOptions) + .compose(req -> { + HttpConnection conn = req.connection(); + conn.closeHandler(v2 -> { + assertFalse("A duplicated context", context.isDuplicate()); + testComplete(); + }); + return req + .send() + .expecting(HttpResponseExpectation.SC_OK) + .onComplete(onSuccess(v2 -> { + conn.close(); + })); + }); + }); + + await(); + } + + @Test + public void testClientConnectWithDuplicatedContext() throws Exception { + ContextInternal context = (ContextInternal) vertx.getOrCreateContext(); + ContextInternal duplicated = context.duplicate(); + duplicated.getLocal(ContextInternal.LOCAL_MAP, ConcurrentHashMap::new).put("foo", "bar"); + + server.requestHandler(req -> { + req.response().end(); + }); + startServer(testAddress); + + duplicated.runOnContext(v -> { + client.connect(requestOptions).onComplete(conn -> { + assertSame(duplicated.unwrap(), Vertx.currentContext()); + testComplete(); + }); + }); await(); } diff --git a/vertx-core/src/test/java/io/vertx/tests/http/WebSocketTest.java b/vertx-core/src/test/java/io/vertx/tests/http/WebSocketTest.java index 914b1c5e96d..57acb7054da 100644 --- a/vertx-core/src/test/java/io/vertx/tests/http/WebSocketTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/http/WebSocketTest.java @@ -36,6 +36,7 @@ import io.vertx.core.http.WebSocketVersion; import io.vertx.core.http.impl.Http1xClientConnection; import io.vertx.core.http.impl.Http1xServerConnection; +import io.vertx.core.internal.ContextInternal; import io.vertx.core.internal.http.WebSocketInternal; import io.vertx.core.http.impl.ws.WebSocketFrameImpl; import io.vertx.core.internal.VertxInternal; @@ -85,6 +86,7 @@ import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -4058,4 +4060,35 @@ public void testPoolShouldNotStarveOnConnectError() throws Exception { awaitLatch(latch, 10, TimeUnit.SECONDS); } + + @Test + public void testClientWebSocketWithDuplicatedContext1() { + testClientWebSocketWithDuplicatedContext(() -> client.connect(DEFAULT_HTTP_PORT, HttpTestBase.DEFAULT_HTTP_HOST, "/")); + } + + @Test + public void testClientWebSocketWithDuplicatedContext2() { + testClientWebSocketWithDuplicatedContext(() -> client.webSocket().connect(DEFAULT_HTTP_PORT, HttpTestBase.DEFAULT_HTTP_HOST, "/")); + } + + private void testClientWebSocketWithDuplicatedContext(Supplier> sup) { + server = vertx.createHttpServer(new HttpServerOptions().setPort(DEFAULT_HTTP_PORT)).webSocketHandler(ws -> { + ws.write(Buffer.buffer("ping")); + }); + server.listen(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST).await(); + client = vertx.createWebSocketClient(); + ContextInternal context = (ContextInternal) vertx.getOrCreateContext(); + ContextInternal duplicated = context.duplicate(); + duplicated.runOnContext(v -> { + sup.get() + .onComplete(onSuccess(ws -> { + assertSame(context, Vertx.currentContext()); + ws.handler(data -> { + assertSame(context, Vertx.currentContext()); + testComplete(); + }); + })); + }); + await(); + } } diff --git a/vertx-core/src/test/java/io/vertx/tests/net/NetTest.java b/vertx-core/src/test/java/io/vertx/tests/net/NetTest.java index ac5b1e3e12b..cdb39a331c9 100755 --- a/vertx-core/src/test/java/io/vertx/tests/net/NetTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/net/NetTest.java @@ -30,6 +30,7 @@ import io.vertx.core.eventbus.MessageConsumer; import io.vertx.core.http.*; import io.vertx.core.impl.Utils; +import io.vertx.core.internal.ContextInternal; import io.vertx.core.internal.VertxInternal; import io.vertx.core.internal.buffer.BufferInternal; import io.vertx.core.internal.net.NetClientInternal; @@ -4659,4 +4660,22 @@ public void testSNIServerSSLEnginePeerHost() throws Exception { assertEquals("host2.com", test.indicatedServerName); assertTrue("X509ExtendedKeyManager.chooseEngineServerAlias is not called", called.get()); } + + @Test + public void testServerWithDuplicatedContext() throws Exception { + ContextInternal context = (ContextInternal) vertx.getOrCreateContext(); + ContextInternal duplicated = context.duplicate(); + duplicated.getLocal(ContextInternal.LOCAL_MAP, ConcurrentHashMap::new).put("foo", "bar"); + + server.connectHandler(so -> { + ContextInternal current = ContextInternal.current(); + assertFalse("A duplicated context", current.isDuplicate()); + assertNull("Local map shouldn't have an entry for the key 'foo'", current.getLocal(ContextInternal.LOCAL_MAP)); + so.end(); + }); + startServer(testAddress, duplicated); + + NetSocket so = client.connect(testAddress).await(); + so.end().await(); + } } diff --git a/vertx-core/src/test/java/io/vertx/tests/timer/TimerTest.java b/vertx-core/src/test/java/io/vertx/tests/timer/TimerTest.java index 5b4328e898e..fb64ed4d908 100644 --- a/vertx-core/src/test/java/io/vertx/tests/timer/TimerTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/timer/TimerTest.java @@ -19,7 +19,9 @@ import io.vertx.test.core.VertxTestBase; import org.junit.Test; +import java.util.Collections; import java.util.concurrent.CancellationException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -320,6 +322,36 @@ public void handle(Long l) { await(); } + @Test + public void testPeriodicWithDuplicateContext() { + + waitFor(2); + + ContextInternal context = (ContextInternal) vertx.getOrCreateContext(); + ContextInternal duplicated = context.duplicate(); + duplicated.getLocal(ContextInternal.LOCAL_MAP, ConcurrentHashMap::new).put("foo", "bar"); + + duplicated.runOnContext(v -> { + vertx.setPeriodic(10, id -> { + ContextInternal current = (ContextInternal) Vertx.currentContext(); + assertTrue("Not a duplicated context", current.isDuplicate()); + assertNull("Local map shouldn't have an entry for the key 'foo'", current.getLocal(ContextInternal.LOCAL_MAP)); + vertx.cancelTimer(id); + complete(); + }); + }); + + duplicated.setPeriodic(10, id -> { + ContextInternal current = (ContextInternal) Vertx.currentContext(); + assertTrue("Not a duplicated context", current.isDuplicate()); + assertNull("Local map shouldn't have an entry for the key 'foo'", current.getLocal(ContextInternal.LOCAL_MAP)); + vertx.cancelTimer(id); + complete(); + }); + + await(); + } + @Repeat(times = 100) @Test public void testRaceWhenTimerCreatedOutsideEventLoop() {