|
33 | 33 | import com.tencent.trpc.core.worker.handler.TrpcThreadExceptionHandler; |
34 | 34 | import com.tencent.trpc.core.worker.spi.WorkerPool; |
35 | 35 | import com.tencent.trpc.core.worker.support.thread.ThreadWorkerPool; |
| 36 | +import java.lang.reflect.Field; |
36 | 37 | import java.util.HashMap; |
37 | 38 | import java.util.Map; |
38 | 39 | import java.util.Objects; |
39 | 40 | import java.util.concurrent.CompletableFuture; |
40 | 41 | import java.util.concurrent.CompletionException; |
41 | 42 | import java.util.concurrent.CompletionStage; |
| 43 | +import java.util.concurrent.ConcurrentMap; |
42 | 44 | import java.util.concurrent.ExecutorService; |
43 | 45 | import java.util.concurrent.Executors; |
44 | 46 | import java.util.concurrent.atomic.AtomicLong; |
@@ -248,4 +250,55 @@ public void testDoInvokeWithCompletedFuture() { |
248 | 250 | } |
249 | 251 | } |
250 | 252 |
|
| 253 | + @Test |
| 254 | + @SuppressWarnings("unchecked") |
| 255 | + public void testGetInvokerSkipsUnavailableCachedInvoker() throws Exception { |
| 256 | + Assert.assertFalse(consumerInvokerProxy.isAvailable()); |
| 257 | + |
| 258 | + // Put the unavailable invoker into invokerCache via reflection |
| 259 | + Field cacheField = DefClusterInvoker.class.getDeclaredField("invokerCache"); |
| 260 | + cacheField.setAccessible(true); |
| 261 | + ConcurrentMap<String, ConsumerInvokerProxy<GenericClient>> cache = |
| 262 | + (ConcurrentMap<String, ConsumerInvokerProxy<GenericClient>>) cacheField.get(defClusterInvoker); |
| 263 | + |
| 264 | + ServiceInstance instance = new ServiceInstance("127.0.0.1", 12345); |
| 265 | + String key = "127.0.0.1:12345:null"; |
| 266 | + cache.put(key, consumerInvokerProxy); |
| 267 | + |
| 268 | + // Spy on defClusterInvoker and mock createInvoker to return a new available proxy |
| 269 | + DefClusterInvoker<GenericClient> spy = PowerMockito.spy(defClusterInvoker); |
| 270 | + ConsumerInvokerProxy<GenericClient> newProxy = PowerMockito.mock(ConsumerInvokerProxy.class); |
| 271 | + PowerMockito.when(newProxy.isAvailable()).thenReturn(true); |
| 272 | + PowerMockito.doReturn(newProxy).when(spy).createInvoker(instance); |
| 273 | + |
| 274 | + ConsumerInvokerProxy<GenericClient> result = spy.getInvoker(instance); |
| 275 | + |
| 276 | + // Should not return the unavailable cached invoker |
| 277 | + Assert.assertNotSame(consumerInvokerProxy, result); |
| 278 | + // Should return the new available one from createInvoker |
| 279 | + Assert.assertSame(newProxy, result); |
| 280 | + } |
| 281 | + |
| 282 | + @Test |
| 283 | + @SuppressWarnings("unchecked") |
| 284 | + public void testGetInvokerReturnsAvailableCachedInvoker() throws Exception { |
| 285 | + // Put an available invoker into invokerCache via reflection |
| 286 | + Field cacheField = DefClusterInvoker.class.getDeclaredField("invokerCache"); |
| 287 | + cacheField.setAccessible(true); |
| 288 | + ConcurrentMap<String, ConsumerInvokerProxy<GenericClient>> cache = |
| 289 | + (ConcurrentMap<String, ConsumerInvokerProxy<GenericClient>>) cacheField.get(defClusterInvoker); |
| 290 | + |
| 291 | + ServiceInstance instance = new ServiceInstance("127.0.0.1", 12345); |
| 292 | + String key = "127.0.0.1:12345:null"; |
| 293 | + |
| 294 | + // Create an available proxy (client.isAvailable() = true) |
| 295 | + ConsumerInvokerProxy<GenericClient> availableProxy = PowerMockito.mock(ConsumerInvokerProxy.class); |
| 296 | + PowerMockito.when(availableProxy.isAvailable()).thenReturn(true); |
| 297 | + cache.put(key, availableProxy); |
| 298 | + |
| 299 | + // getInvoker should return the cached available proxy directly |
| 300 | + ConsumerInvokerProxy<GenericClient> result = defClusterInvoker.getInvoker(instance); |
| 301 | + Assert.assertSame(availableProxy, result); |
| 302 | + } |
| 303 | + |
251 | 304 | } |
0 commit comments