Skip to content

Commit a493eeb

Browse files
committed
Delete BrokerMessagePoller; route NATS through DefaultRqueuePoller
Replace the separate BrokerMessagePoller/ConsumerNameResolver path with the existing DefaultRqueuePoller + QueueThreadPool + RqueueExecutor pipeline used by the Redis backend. Key changes: - Delete BrokerMessagePoller, ConsumerNameResolver and their tests. - Add backend-neutral methods to MessageBroker SPI: parkForRetry, moveToDlq, scheduleNext, getVisibilityTimeoutScore, extendVisibilityTimeout. RedisMessageBroker overrides each with the existing Redis template calls. - Refactor PostProcessingHandler, JobImpl, RqueueExecutor and RqueueMessagePoller to use MessageBroker instead of the Redis-specific RqueueMessageTemplate. - RqueueMessageListenerContainer.initialize() resolves an effective broker (injected or a RedisMessageBroker wrapper) and removes the usesPrimaryHandlerDispatch==false branch that wired the old path. - JetStreamMessageBroker: set usesPrimaryHandlerDispatch=true so NATS queues flow through the standard poller; add resolveConsumerName fallback so a null consumerName from RqueueMessagePoller gets a stable "rqueue-<queue>" default. - Update all affected tests to mock MessageBroker instead of RqueueMessageTemplate. Assisted-By: Claude Code
1 parent 2512e44 commit a493eeb

21 files changed

Lines changed: 220 additions & 1632 deletions

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueBeanProvider.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.github.sonus21.rqueue.common.RqueueLockManager;
2020
import com.github.sonus21.rqueue.config.RqueueConfig;
2121
import com.github.sonus21.rqueue.config.RqueueWebConfig;
22+
import com.github.sonus21.rqueue.core.spi.MessageBroker;
2223
import com.github.sonus21.rqueue.core.support.MessageProcessor;
2324
import com.github.sonus21.rqueue.dao.RqueueJobDao;
2425
import com.github.sonus21.rqueue.dao.RqueueSystemConfigDao;
@@ -69,4 +70,7 @@ public class RqueueBeanProvider {
6970

7071
@Autowired
7172
private RqueueConfig rqueueConfig;
73+
74+
/** Set by the container during initialization; never null after {@code afterPropertiesSet}. */
75+
private MessageBroker messageBroker;
7276
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/MessageBroker.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,60 @@ default List<RqueueMessage> pop(
9191

9292
List<RqueueMessage> peek(QueueDetail q, long offset, long count);
9393

94+
/**
95+
* Remove {@code old} from the processing store and re-enqueue {@code updated} for retry.
96+
* {@code delayMs <= 0} means immediate; {@code delayMs > 0} means schedule after that delay.
97+
* Backends without a processing store (e.g. NATS) default to a plain nack.
98+
*/
99+
default void parkForRetry(QueueDetail q, RqueueMessage old, RqueueMessage updated, long delayMs) {
100+
nack(q, updated, delayMs);
101+
}
102+
103+
/**
104+
* Remove {@code old} from the processing store and enqueue {@code updated} to {@code targetQueue}.
105+
* {@code delayMs <= 0} means immediate (list push); {@code delayMs > 0} means schedule (sorted-set).
106+
* Backends without a processing store default to a plain enqueue to the DLQ.
107+
*/
108+
default void moveToDlq(
109+
QueueDetail source,
110+
String targetQueue,
111+
RqueueMessage old,
112+
RqueueMessage updated,
113+
long delayMs) {
114+
if (delayMs > 0) {
115+
enqueueWithDelay(source, updated, delayMs);
116+
} else {
117+
enqueue(source, updated);
118+
}
119+
}
120+
121+
/**
122+
* Schedule the next execution of a periodic message.
123+
* {@code messageKey} is the deduplication key; {@code expirySeconds} is the TTL for that key.
124+
* Backends that don't support server-side scheduling default to a delayed enqueue.
125+
*/
126+
default void scheduleNext(
127+
QueueDetail q, String messageKey, RqueueMessage message, long expirySeconds) {
128+
long delayMs = Math.max(0, message.getProcessAt() - System.currentTimeMillis());
129+
enqueueWithDelay(q, message, delayMs);
130+
}
131+
132+
/**
133+
* Returns the score (epoch-ms deadline) of {@code m} in the processing store, or {@code null}
134+
* if the backend does not track per-message visibility (e.g. NATS uses consumer-level AckWait).
135+
*/
136+
default Long getVisibilityTimeoutScore(QueueDetail q, RqueueMessage m) {
137+
return null;
138+
}
139+
140+
/**
141+
* Adds {@code deltaMs} to the visibility timeout of {@code m} in the processing store.
142+
* Returns {@code false} if the backend does not support per-message lease extension.
143+
*/
144+
default boolean extendVisibilityTimeout(QueueDetail q, RqueueMessage m, long deltaMs) {
145+
return false;
146+
}
147+
94148
long size(QueueDetail q);
95149

96150
AutoCloseable subscribe(String channel, Consumer<String> handler);

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/redis/RedisMessageBroker.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.github.sonus21.rqueue.core.spi.MessageBroker;
2323
import com.github.sonus21.rqueue.listener.QueueDetail;
2424
import com.github.sonus21.rqueue.models.MessageMoveResult;
25+
import com.github.sonus21.rqueue.utils.RedisUtils;
2526
import java.time.Duration;
2627
import java.util.List;
2728
import java.util.function.Consumer;
@@ -146,6 +147,56 @@ public void publish(String channel, String payload) {
146147
template.getTemplate().convertAndSend(channel, payload);
147148
}
148149

150+
@Override
151+
public void parkForRetry(QueueDetail q, RqueueMessage old, RqueueMessage updated, long delayMs) {
152+
if (delayMs <= 0) {
153+
template.moveMessage(q.getProcessingQueueName(), q.getQueueName(), old, updated);
154+
} else {
155+
template.moveMessageWithDelay(
156+
q.getProcessingQueueName(), q.getScheduledQueueName(), old, updated, delayMs);
157+
}
158+
}
159+
160+
@Override
161+
public void moveToDlq(
162+
QueueDetail source,
163+
String targetQueue,
164+
RqueueMessage old,
165+
RqueueMessage updated,
166+
long delayMs) {
167+
RedisUtils.executePipeLine(
168+
template.getTemplate(),
169+
(connection, keySerializer, valueSerializer) -> {
170+
byte[] updatedBytes = valueSerializer.serialize(updated);
171+
byte[] oldBytes = valueSerializer.serialize(old);
172+
byte[] processingQueueBytes =
173+
keySerializer.serialize(source.getProcessingQueueName());
174+
byte[] targetQueueBytes = keySerializer.serialize(targetQueue);
175+
if (delayMs > 0) {
176+
connection.zAdd(targetQueueBytes, delayMs, updatedBytes);
177+
} else {
178+
connection.lPush(targetQueueBytes, updatedBytes);
179+
}
180+
connection.zRem(processingQueueBytes, oldBytes);
181+
});
182+
}
183+
184+
@Override
185+
public void scheduleNext(
186+
QueueDetail q, String messageKey, RqueueMessage message, long expirySeconds) {
187+
template.scheduleMessage(q.getScheduledQueueName(), messageKey, message, expirySeconds);
188+
}
189+
190+
@Override
191+
public Long getVisibilityTimeoutScore(QueueDetail q, RqueueMessage m) {
192+
return template.getScore(q.getProcessingQueueName(), m);
193+
}
194+
195+
@Override
196+
public boolean extendVisibilityTimeout(QueueDetail q, RqueueMessage m, long deltaMs) {
197+
return template.addScore(q.getProcessingQueueName(), m, deltaMs);
198+
}
199+
149200
@Override
150201
public Capabilities capabilities() {
151202
return Capabilities.REDIS_DEFAULTS;

0 commit comments

Comments
 (0)