Skip to content

Commit 914e432

Browse files
committed
Increase broker unit coverage
1 parent 421ac70 commit 914e432

2 files changed

Lines changed: 383 additions & 2 deletions

File tree

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
/*
2+
* Copyright (c) 2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*/
10+
package com.github.sonus21.rqueue.core.spi;
11+
12+
import static org.junit.jupiter.api.Assertions.assertEquals;
13+
import static org.junit.jupiter.api.Assertions.assertFalse;
14+
import static org.junit.jupiter.api.Assertions.assertNull;
15+
import static org.junit.jupiter.api.Assertions.assertTrue;
16+
17+
import com.github.sonus21.rqueue.CoreUnitTest;
18+
import com.github.sonus21.rqueue.core.RqueueMessage;
19+
import com.github.sonus21.rqueue.listener.QueueDetail;
20+
import com.github.sonus21.rqueue.utils.TestUtils;
21+
import java.time.Duration;
22+
import java.util.Collections;
23+
import java.util.List;
24+
import java.util.function.Consumer;
25+
import org.junit.jupiter.api.Test;
26+
import reactor.test.StepVerifier;
27+
28+
@CoreUnitTest
29+
class MessageBrokerDefaultMethodsTest {
30+
31+
private final QueueDetail queue = TestUtils.createQueueDetail("queue", 1, 30_000L, null);
32+
private final RqueueMessage oldMessage =
33+
RqueueMessage.builder().id("old").message("old").build();
34+
private final RqueueMessage updatedMessage =
35+
RqueueMessage.builder().id("updated").message("updated").processAt(1L).build();
36+
37+
@Test
38+
void priorityAndReactiveDefaultsDelegateToBlockingOperations() {
39+
RecordingBroker broker = new RecordingBroker();
40+
41+
broker.enqueue(queue, "high", updatedMessage);
42+
StepVerifier.create(broker.enqueueReactive(queue, oldMessage)).verifyComplete();
43+
StepVerifier.create(broker.enqueueWithDelayReactive(queue, updatedMessage, 17L))
44+
.verifyComplete();
45+
broker.pop(queue, "high", "consumer", 3, Duration.ofMillis(25L));
46+
47+
assertEquals(2, broker.enqueueCalls);
48+
assertEquals(oldMessage, broker.lastEnqueued);
49+
assertEquals(1, broker.delayCalls);
50+
assertEquals(17L, broker.lastDelayMs);
51+
assertEquals(1, broker.popCalls);
52+
assertEquals("consumer", broker.lastConsumerName);
53+
assertEquals(Duration.ofMillis(25L), broker.lastWait);
54+
}
55+
56+
@Test
57+
void retryDlqAndScheduleDefaultsUseBackendPrimitives() {
58+
RecordingBroker broker = new RecordingBroker();
59+
60+
broker.parkForRetry(queue, oldMessage, updatedMessage, 123L);
61+
broker.moveToDlq(queue, "dlq", oldMessage, updatedMessage, 0L);
62+
broker.moveToDlq(queue, "dlq", oldMessage, updatedMessage, 99L);
63+
broker.scheduleNext(queue, "period-key", updatedMessage, 60L);
64+
65+
assertEquals(1, broker.nackCalls);
66+
assertEquals(updatedMessage, broker.lastNacked);
67+
assertEquals(123L, broker.lastRetryDelayMs);
68+
assertEquals(1, broker.enqueueCalls);
69+
assertEquals(2, broker.delayCalls);
70+
assertEquals(updatedMessage, broker.lastDelayed);
71+
}
72+
73+
@Test
74+
void dashboardDefaultsExposeRedisLabelsAndSingleSubscriberFallback() {
75+
RecordingBroker broker = new RecordingBroker();
76+
77+
assertEquals("Redis", broker.storageKicker());
78+
assertEquals(
79+
"Underlying Redis structures for the queues visible on this page.",
80+
broker.storageDescription());
81+
assertNull(broker.storageDisplayName(queue));
82+
assertNull(broker.dlqStorageDisplayName(queue));
83+
assertNull(broker.consumerPendingSizes(queue));
84+
assertNull(broker.dataTypeLabel(null, null));
85+
assertFalse(broker.isSizeApproximate(queue));
86+
assertNull(broker.getVisibilityTimeoutScore(queue, oldMessage));
87+
assertFalse(broker.extendVisibilityTimeout(queue, oldMessage, 1L));
88+
89+
List<SubscriberView> subscribers = broker.subscribers(queue);
90+
91+
assertEquals(1, subscribers.size());
92+
assertEquals(queue.resolvedConsumerName(), subscribers.get(0).consumerName());
93+
assertEquals(42L, subscribers.get(0).pending());
94+
assertEquals(0L, subscribers.get(0).inFlight());
95+
assertTrue(subscribers.get(0).pendingShared());
96+
}
97+
98+
@Test
99+
void subscribersDefaultFallsBackToZeroWhenSizeFails() {
100+
RecordingBroker broker = new RecordingBroker();
101+
broker.failSize = true;
102+
103+
List<SubscriberView> subscribers = broker.subscribers(queue);
104+
105+
assertEquals(1, subscribers.size());
106+
assertEquals(0L, subscribers.get(0).pending());
107+
}
108+
109+
private static final class RecordingBroker implements MessageBroker {
110+
111+
int enqueueCalls;
112+
int delayCalls;
113+
int popCalls;
114+
int nackCalls;
115+
boolean failSize;
116+
String lastConsumerName;
117+
Duration lastWait;
118+
long lastDelayMs;
119+
long lastRetryDelayMs;
120+
RqueueMessage lastEnqueued;
121+
RqueueMessage lastDelayed;
122+
RqueueMessage lastNacked;
123+
124+
@Override
125+
public void enqueue(QueueDetail q, RqueueMessage m) {
126+
enqueueCalls++;
127+
lastEnqueued = m;
128+
}
129+
130+
@Override
131+
public void enqueueWithDelay(QueueDetail q, RqueueMessage m, long delayMs) {
132+
delayCalls++;
133+
lastDelayed = m;
134+
lastDelayMs = delayMs;
135+
}
136+
137+
@Override
138+
public List<RqueueMessage> pop(QueueDetail q, String consumerName, int batch, Duration wait) {
139+
popCalls++;
140+
lastConsumerName = consumerName;
141+
lastWait = wait;
142+
return Collections.emptyList();
143+
}
144+
145+
@Override
146+
public boolean ack(QueueDetail q, RqueueMessage m) {
147+
return true;
148+
}
149+
150+
@Override
151+
public boolean nack(QueueDetail q, RqueueMessage m, long retryDelayMs) {
152+
nackCalls++;
153+
lastNacked = m;
154+
lastRetryDelayMs = retryDelayMs;
155+
return true;
156+
}
157+
158+
@Override
159+
public long moveExpired(QueueDetail q, long now, int batch) {
160+
return 0;
161+
}
162+
163+
@Override
164+
public List<RqueueMessage> peek(QueueDetail q, long offset, long count) {
165+
return Collections.emptyList();
166+
}
167+
168+
@Override
169+
public long size(QueueDetail q) {
170+
if (failSize) {
171+
throw new IllegalStateException("backend down");
172+
}
173+
return 42L;
174+
}
175+
176+
@Override
177+
public AutoCloseable subscribe(String channel, Consumer<String> handler) {
178+
return () -> {};
179+
}
180+
181+
@Override
182+
public void publish(String channel, String payload) {}
183+
184+
@Override
185+
public Capabilities capabilities() {
186+
return Capabilities.REDIS_DEFAULTS;
187+
}
188+
}
189+
}

0 commit comments

Comments
 (0)