|
70 | 70 | import java.util.UUID; |
71 | 71 | import java.util.concurrent.Callable; |
72 | 72 | import java.util.concurrent.Executor; |
| 73 | +import java.util.concurrent.ExecutorService; |
73 | 74 | import java.util.concurrent.Executors; |
74 | 75 | import lombok.extern.slf4j.Slf4j; |
75 | 76 | import org.junit.jupiter.api.BeforeEach; |
@@ -328,19 +329,26 @@ void logAndRateLimiterMiddleware() throws TimedOutException { |
328 | 329 | }) |
329 | 330 | .when(messageHandler) |
330 | 331 | .handleMessage(any()); |
331 | | - Executor executor = Executors.newSingleThreadExecutor(); |
| 332 | + ExecutorService executor = Executors.newFixedThreadPool(4); |
332 | 333 | long startTime = System.currentTimeMillis(); |
333 | | - for (RqueueMessage message : messages) { |
334 | | - executor.execute(new RqueueExecutor( |
335 | | - rqueueBeanProvider, |
336 | | - queueStateMgr, |
337 | | - newArrayList(logMiddleware, testRateLimiter), |
338 | | - postProcessingHandler, |
339 | | - message, |
340 | | - queueDetail, |
341 | | - queueThreadPool)); |
| 334 | + try { |
| 335 | + for (RqueueMessage message : messages) { |
| 336 | + executor.execute(new RqueueExecutor( |
| 337 | + rqueueBeanProvider, |
| 338 | + queueStateMgr, |
| 339 | + newArrayList(logMiddleware, testRateLimiter), |
| 340 | + postProcessingHandler, |
| 341 | + message, |
| 342 | + queueDetail, |
| 343 | + queueThreadPool)); |
| 344 | + } |
| 345 | + TimeoutUtils.waitFor( |
| 346 | + () -> testRateLimiter.jobs.size() == jobCount, |
| 347 | + 20_000L, |
| 348 | + "all jobs to proceed"); |
| 349 | + } finally { |
| 350 | + executor.shutdownNow(); |
342 | 351 | } |
343 | | - TimeoutUtils.waitFor(() -> testRateLimiter.jobs.size() == jobCount, "all jobs to proceed"); |
344 | 352 | long endTime = System.currentTimeMillis(); |
345 | 353 | // we need to round since rate is at second resolution and execution in millis so we round this |
346 | 354 | // to next second |
|
0 commit comments