55package io .modelcontextprotocol .server ;
66
77import java .util .UUID ;
8- import java .util .concurrent .CountDownLatch ;
9- import java .util .concurrent .TimeUnit ;
108
119import io .modelcontextprotocol .MockMcpServerTransport ;
1210import io .modelcontextprotocol .MockMcpServerTransportProvider ;
@@ -42,21 +40,6 @@ private static McpAsyncServer buildServer(MockMcpServerTransportProvider transpo
4240 .build ();
4341 }
4442
45- /**
46- * Sends a request through the transport provider and blocks until the session has
47- * sent a response for that request ID, guaranteeing the handler has fully executed.
48- */
49- private static void sendAndAwait (MockMcpServerTransport transport , MockMcpServerTransportProvider transportProvider ,
50- McpSchema .JSONRPCRequest request ) throws InterruptedException {
51- String requestId = request .id ().toString ();
52- CountDownLatch latch = new CountDownLatch (1 );
53- transport .setInterceptorForNextResponse (requestId , latch );
54- transportProvider .simulateIncomingMessage (request );
55- assertThat (latch .await (5 , TimeUnit .SECONDS ))
56- .as ("server should have responded to request " + requestId + " within 5 s" )
57- .isTrue ();
58- }
59-
6043 private static McpSchema .JSONRPCRequest initRequest () {
6144 return new McpSchema .JSONRPCRequest (McpSchema .JSONRPC_VERSION , McpSchema .METHOD_INITIALIZE ,
6245 UUID .randomUUID ().toString (),
@@ -79,12 +62,12 @@ private static McpSchema.JSONRPCRequest unsubscribeRequest(String uri) {
7962 }
8063
8164 @ Test
82- void notifyResourcesUpdated_noSubscribers_completesEmpty () throws InterruptedException {
65+ void notifyResourcesUpdated_noSubscribers_completesEmpty () {
8366 MockMcpServerTransport transport = new MockMcpServerTransport ();
8467 MockMcpServerTransportProvider transportProvider = new MockMcpServerTransportProvider (transport );
8568 McpAsyncServer server = buildServer (transportProvider );
8669
87- sendAndAwait ( transport , transportProvider , initRequest ());
70+ transportProvider . simulateIncomingMessage ( initRequest ());
8871 transportProvider .simulateIncomingMessage (initializedNotification ());
8972 transport .clearSentMessages ();
9073
@@ -98,14 +81,14 @@ void notifyResourcesUpdated_noSubscribers_completesEmpty() throws InterruptedExc
9881 }
9982
10083 @ Test
101- void notifyResourcesUpdated_afterSubscribe_notifiesSession () throws InterruptedException {
84+ void notifyResourcesUpdated_afterSubscribe_notifiesSession () {
10285 MockMcpServerTransport transport = new MockMcpServerTransport ();
10386 MockMcpServerTransportProvider transportProvider = new MockMcpServerTransportProvider (transport );
10487 McpAsyncServer server = buildServer (transportProvider );
10588
106- sendAndAwait ( transport , transportProvider , initRequest ());
89+ transportProvider . simulateIncomingMessage ( initRequest ());
10790 transportProvider .simulateIncomingMessage (initializedNotification ());
108- sendAndAwait ( transport , transportProvider , subscribeRequest (RESOURCE_URI ));
91+ transportProvider . simulateIncomingMessage ( subscribeRequest (RESOURCE_URI ));
10992 transport .clearSentMessages ();
11093
11194 StepVerifier .create (server .notifyResourcesUpdated (new McpSchema .ResourcesUpdatedNotification (RESOURCE_URI )))
@@ -120,14 +103,14 @@ void notifyResourcesUpdated_afterSubscribe_notifiesSession() throws InterruptedE
120103 }
121104
122105 @ Test
123- void notifyResourcesUpdated_differentUri_doesNotNotifySession () throws InterruptedException {
106+ void notifyResourcesUpdated_differentUri_doesNotNotifySession () {
124107 MockMcpServerTransport transport = new MockMcpServerTransport ();
125108 MockMcpServerTransportProvider transportProvider = new MockMcpServerTransportProvider (transport );
126109 McpAsyncServer server = buildServer (transportProvider );
127110
128- sendAndAwait ( transport , transportProvider , initRequest ());
111+ transportProvider . simulateIncomingMessage ( initRequest ());
129112 transportProvider .simulateIncomingMessage (initializedNotification ());
130- sendAndAwait ( transport , transportProvider , subscribeRequest (RESOURCE_URI ));
113+ transportProvider . simulateIncomingMessage ( subscribeRequest (RESOURCE_URI ));
131114 transport .clearSentMessages ();
132115
133116 StepVerifier
@@ -142,15 +125,15 @@ void notifyResourcesUpdated_differentUri_doesNotNotifySession() throws Interrupt
142125 }
143126
144127 @ Test
145- void notifyResourcesUpdated_afterUnsubscribe_doesNotNotifySession () throws InterruptedException {
128+ void notifyResourcesUpdated_afterUnsubscribe_doesNotNotifySession () {
146129 MockMcpServerTransport transport = new MockMcpServerTransport ();
147130 MockMcpServerTransportProvider transportProvider = new MockMcpServerTransportProvider (transport );
148131 McpAsyncServer server = buildServer (transportProvider );
149132
150- sendAndAwait ( transport , transportProvider , initRequest ());
133+ transportProvider . simulateIncomingMessage ( initRequest ());
151134 transportProvider .simulateIncomingMessage (initializedNotification ());
152- sendAndAwait ( transport , transportProvider , subscribeRequest (RESOURCE_URI ));
153- sendAndAwait ( transport , transportProvider , unsubscribeRequest (RESOURCE_URI ));
135+ transportProvider . simulateIncomingMessage ( subscribeRequest (RESOURCE_URI ));
136+ transportProvider . simulateIncomingMessage ( unsubscribeRequest (RESOURCE_URI ));
154137 transport .clearSentMessages ();
155138
156139 StepVerifier .create (server .notifyResourcesUpdated (new McpSchema .ResourcesUpdatedNotification (RESOURCE_URI )))
@@ -163,14 +146,14 @@ void notifyResourcesUpdated_afterUnsubscribe_doesNotNotifySession() throws Inter
163146 }
164147
165148 @ Test
166- void notifyResourcesUpdated_afterSessionClose_doesNotNotifySession () throws InterruptedException {
149+ void notifyResourcesUpdated_afterSessionClose_doesNotNotifySession () {
167150 MockMcpServerTransport transport = new MockMcpServerTransport ();
168151 MockMcpServerTransportProvider transportProvider = new MockMcpServerTransportProvider (transport );
169152 McpAsyncServer server = buildServer (transportProvider );
170153
171- sendAndAwait ( transport , transportProvider , initRequest ());
154+ transportProvider . simulateIncomingMessage ( initRequest ());
172155 transportProvider .simulateIncomingMessage (initializedNotification ());
173- sendAndAwait ( transport , transportProvider , subscribeRequest (RESOURCE_URI ));
156+ transportProvider . simulateIncomingMessage ( subscribeRequest (RESOURCE_URI ));
174157
175158 // Close the session; onClose must fire and remove the subscription
176159 transportProvider .closeGracefully ().block ();
0 commit comments