Skip to content

Commit dd1cd8c

Browse files
Apply Palantir Java Format
1 parent 5895320 commit dd1cd8c

28 files changed

Lines changed: 266 additions & 291 deletions

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageTemplate.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,4 +112,3 @@ default MessageBroker getMessageBroker() {
112112
return null;
113113
}
114114
}
115-

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/BaseMessageSender.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,7 @@ abstract class BaseMessageSender {
8080
protected Object storeMessageMetadata(
8181
RqueueMessage rqueueMessage, Long delayInMillis, boolean reactive, boolean isUnique) {
8282
com.github.sonus21.rqueue.core.spi.MessageBroker broker = messageTemplate.getMessageBroker();
83-
boolean skipMetadata =
84-
broker != null && !broker.capabilities().usesPrimaryHandlerDispatch();
83+
boolean skipMetadata = broker != null && !broker.capabilities().usesPrimaryHandlerDispatch();
8584
if (skipMetadata) {
8685
return reactive ? reactor.core.publisher.Mono.just(true) : null;
8786
}
@@ -120,9 +119,7 @@ protected Object enqueue(
120119
boolean reactive) {
121120
com.github.sonus21.rqueue.core.spi.MessageBroker broker = messageTemplate.getMessageBroker();
122121
boolean useBroker =
123-
!reactive
124-
&& broker != null
125-
&& !broker.capabilities().usesPrimaryHandlerDispatch();
122+
!reactive && broker != null && !broker.capabilities().usesPrimaryHandlerDispatch();
126123
if (delayInMilliSecs == null || delayInMilliSecs <= MIN_DELAY) {
127124
if (useBroker) {
128125
broker.enqueue(queueDetail, priority, rqueueMessage);

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/ReactiveRqueueMessageEnqueuerImpl.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,8 @@ private <T> Mono<T> pushReactiveMessage(
103103
|| delayInMilliSecs <= com.github.sonus21.rqueue.utils.Constants.MIN_DELAY) {
104104
brokerMono = messageBroker.enqueueReactive(queueDetail, rqueueMessage);
105105
} else {
106-
brokerMono =
107-
messageBroker.enqueueWithDelayReactive(
108-
queueDetail, rqueueMessage, delayInMilliSecs);
106+
brokerMono = messageBroker.enqueueWithDelayReactive(
107+
queueDetail, rqueueMessage, delayInMilliSecs);
109108
}
110109
return brokerMono.then(Mono.defer(() -> monoConverter.apply(rqueueMessage)));
111110
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/Capabilities.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ public record Capabilities(
2020
boolean supportsDelayedEnqueue,
2121
boolean supportsScheduledIntrospection,
2222
boolean supportsCronJobs,
23-
boolean usesPrimaryHandlerDispatch
24-
) {
23+
boolean usesPrimaryHandlerDispatch) {
2524
public static final Capabilities REDIS_DEFAULTS = new Capabilities(true, true, true, true);
2625
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/MessageBrokerLoader.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@
2121

2222
public final class MessageBrokerLoader {
2323

24-
private MessageBrokerLoader() {
25-
}
24+
private MessageBrokerLoader() {}
2625

2726
public static MessageBroker load(String name, Map<String, String> config) {
2827
for (MessageBrokerFactory f : ServiceLoader.load(MessageBrokerFactory.class)) {

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/redis/RedisMessageBroker.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,8 @@ public long size(QueueDetail q) {
123123
public AutoCloseable subscribe(String channel, Consumer<String> handler) {
124124
if (pubSubContainer == null) {
125125
throw new IllegalStateException(
126-
"RedisMessageListenerContainer not configured for RedisMessageBroker; subscribe is unavailable");
126+
"RedisMessageListenerContainer not configured for RedisMessageBroker; subscribe is"
127+
+ " unavailable");
127128
}
128129
final ChannelTopic topic = new ChannelTopic(channel);
129130
final MessageListener listener = new MessageListener() {

rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java

Lines changed: 30 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -414,36 +414,31 @@ private void validateConsumerNameUniqueness() {
414414
for (RqueueMessageHandler.HandlerMethodWithPrimary hmp : e.getValue()) {
415415
Object beanRef = hmp.method.getBean();
416416
String beanName =
417-
beanRef instanceof String
418-
? (String) beanRef
419-
: beanRef.getClass().getSimpleName();
417+
beanRef instanceof String ? (String) beanRef : beanRef.getClass().getSimpleName();
420418
String methodName = hmp.method.getMethod().getName();
421419
com.github.sonus21.rqueue.annotation.RqueueListener ann =
422420
org.springframework.core.annotation.AnnotationUtils.findAnnotation(
423421
hmp.method.getMethod(), com.github.sonus21.rqueue.annotation.RqueueListener.class);
424422
if (ann == null) {
425-
ann =
426-
org.springframework.core.annotation.AnnotationUtils.findAnnotation(
427-
hmp.method.getBeanType(),
428-
com.github.sonus21.rqueue.annotation.RqueueListener.class);
423+
ann = org.springframework.core.annotation.AnnotationUtils.findAnnotation(
424+
hmp.method.getBeanType(), com.github.sonus21.rqueue.annotation.RqueueListener.class);
429425
}
430426
for (String queue : mapping.getQueueNames()) {
431427
String consumerName =
432428
ConsumerNameResolver.resolveConsumerName(ann, beanName, methodName, queue);
433429
String key = queue + "::" + consumerName;
434430
String prior = seen.putIfAbsent(key, beanName + "#" + methodName);
435431
if (prior != null) {
436-
collisions.add(
437-
"queue='"
438-
+ queue
439-
+ "' consumerName='"
440-
+ consumerName
441-
+ "' between "
442-
+ prior
443-
+ " and "
444-
+ beanName
445-
+ "#"
446-
+ methodName);
432+
collisions.add("queue='"
433+
+ queue
434+
+ "' consumerName='"
435+
+ consumerName
436+
+ "' between "
437+
+ prior
438+
+ " and "
439+
+ beanName
440+
+ "#"
441+
+ methodName);
447442
}
448443
}
449444
}
@@ -679,10 +674,8 @@ protected void startBrokerPollers() {
679674
org.springframework.core.annotation.AnnotationUtils.findAnnotation(
680675
hmp.method.getMethod(), com.github.sonus21.rqueue.annotation.RqueueListener.class);
681676
if (ann == null) {
682-
ann =
683-
org.springframework.core.annotation.AnnotationUtils.findAnnotation(
684-
hmp.method.getBeanType(),
685-
com.github.sonus21.rqueue.annotation.RqueueListener.class);
677+
ann = org.springframework.core.annotation.AnnotationUtils.findAnnotation(
678+
hmp.method.getBeanType(), com.github.sonus21.rqueue.annotation.RqueueListener.class);
686679
}
687680
for (String queue : mapping.getQueueNames()) {
688681
QueueDetail qd = queueByName.get(queue);
@@ -709,9 +702,9 @@ protected void startBrokerPollers() {
709702
&& !qd.getPriorityGroup().equals(qd.getName())
710703
&& !qd.getPriorityGroup().equals(Constants.DEFAULT_PRIORITY_GROUP)) {
711704
log.warn(
712-
"Queue '{}' is part of cross-queue priorityGroup='{}'. The NATS backend does not "
713-
+ "support cross-queue priority groups in v1; the priority hint will be honored "
714-
+ "on the same queue but cross-queue weighting is ignored.",
705+
"Queue '{}' is part of cross-queue priorityGroup='{}'. The NATS backend does not"
706+
+ " support cross-queue priority groups in v1; the priority hint will be"
707+
+ " honored on the same queue but cross-queue weighting is ignored.",
715708
queue,
716709
qd.getPriorityGroup());
717710
}
@@ -720,22 +713,18 @@ protected void startBrokerPollers() {
720713
String consumerName =
721714
priority == null ? baseConsumerName : baseConsumerName + "-" + priority;
722715
for (int i = 0; i < threadCount; i++) {
723-
BrokerMessagePoller poller =
724-
new BrokerMessagePoller(
725-
messageBroker,
726-
qd,
727-
priority,
728-
consumerName,
729-
hmp.method,
730-
rqueueMessageHandler.getMessageConverter(),
731-
taskExecutionBackOff,
732-
queueStateMgr,
733-
Math.max(
734-
1,
735-
qd.getBatchSize() > 0
736-
? qd.getBatchSize()
737-
: DEFAULT_BROKER_POLLER_BATCH),
738-
DEFAULT_BROKER_POLLER_FETCH_WAIT);
716+
BrokerMessagePoller poller = new BrokerMessagePoller(
717+
messageBroker,
718+
qd,
719+
priority,
720+
consumerName,
721+
hmp.method,
722+
rqueueMessageHandler.getMessageConverter(),
723+
taskExecutionBackOff,
724+
queueStateMgr,
725+
Math.max(
726+
1, qd.getBatchSize() > 0 ? qd.getBatchSize() : DEFAULT_BROKER_POLLER_BATCH),
727+
DEFAULT_BROKER_POLLER_FETCH_WAIT);
739728
brokerPollers.add(poller);
740729
Future<?> future = taskExecutor.submit(poller);
741730
String key =

rqueue-core/src/main/java/com/github/sonus21/rqueue/web/service/impl/RqueueQDetailServiceImpl.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
2727
import com.github.sonus21.rqueue.core.spi.MessageBroker;
2828
import com.github.sonus21.rqueue.core.support.RqueueMessageUtils;
29-
import com.github.sonus21.rqueue.listener.QueueDetail;
3029
import com.github.sonus21.rqueue.exception.UnknownSwitchCase;
30+
import com.github.sonus21.rqueue.listener.QueueDetail;
3131
import com.github.sonus21.rqueue.models.db.DeadLetterQueue;
3232
import com.github.sonus21.rqueue.models.db.MessageMetadata;
3333
import com.github.sonus21.rqueue.models.db.QueueConfig;
@@ -108,7 +108,6 @@ public void setMessageBroker(MessageBroker messageBroker) {
108108
this.messageBroker = messageBroker;
109109
}
110110

111-
112111
/**
113112
* Visible for tests and pluggable backends.
114113
*/

rqueue-core/src/test/java/com/github/sonus21/rqueue/core/impl/ReactiveRqueueMessageEnqueuerBrokerRoutingTest.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -124,15 +124,14 @@ void enqueueInReactive_routesThroughBrokerDelayed_whenBrokerSet() {
124124
.verifyComplete();
125125

126126
verify(messageBroker, times(1))
127-
.enqueueWithDelayReactive(
128-
any(QueueDetail.class), any(RqueueMessage.class), eq(5_000L));
129-
verify(messageTemplate, never())
130-
.addReactiveMessageWithDelay(any(), any(), any());
127+
.enqueueWithDelayReactive(any(QueueDetail.class), any(RqueueMessage.class), eq(5_000L));
128+
verify(messageTemplate, never()).addReactiveMessageWithDelay(any(), any(), any());
131129
}
132130

133131
@Test
134132
void enqueueReactive_fallsBackToRedisTemplate_whenBrokerNull() {
135-
when(messageTemplate.addReactiveMessage(eq(queueDetail.getQueueName()), any(RqueueMessage.class)))
133+
when(messageTemplate.addReactiveMessage(
134+
eq(queueDetail.getQueueName()), any(RqueueMessage.class)))
136135
.thenReturn(Mono.just(1L));
137136

138137
StepVerifier.create(enqueuer.enqueue(queue, "payload"))

rqueue-core/src/test/java/com/github/sonus21/rqueue/core/spi/redis/RedisMessageBrokerDelegationTest.java

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,9 @@ void enqueueWithDelayDelegatesToAddMessageWithDelay() {
9797
RqueueMessage m = RqueueMessage.builder().id("a").message("msg").build();
9898
broker.enqueueWithDelay(QUEUE, m, 5000L);
9999

100-
verify(template).addMessageWithDelay(
101-
QUEUE.getScheduledQueueName(), QUEUE.getScheduledQueueChannelName(), m);
100+
verify(template)
101+
.addMessageWithDelay(
102+
QUEUE.getScheduledQueueName(), QUEUE.getScheduledQueueChannelName(), m);
102103
}
103104

104105
@Test
@@ -109,12 +110,13 @@ void popDelegatesToTemplatePop() {
109110
List<RqueueMessage> out = broker.pop(QUEUE, "consumer", 5, Duration.ofSeconds(1));
110111

111112
assertNotNull(out);
112-
verify(template).pop(
113-
QUEUE.getQueueName(),
114-
QUEUE.getProcessingQueueName(),
115-
QUEUE.getProcessingQueueChannelName(),
116-
QUEUE.getVisibilityTimeout(),
117-
5);
113+
verify(template)
114+
.pop(
115+
QUEUE.getQueueName(),
116+
QUEUE.getProcessingQueueName(),
117+
QUEUE.getProcessingQueueChannelName(),
118+
QUEUE.getVisibilityTimeout(),
119+
5);
118120
}
119121

120122
@Test
@@ -138,35 +140,33 @@ void ackReturnsFalseWhenNotRemoved() {
138140
void nackWithNoDelayDelegatesToMoveMessage() {
139141
RqueueMessage m = RqueueMessage.builder().id("a").message("msg").build();
140142
assertTrue(broker.nack(QUEUE, m, 0L));
141-
verify(template).moveMessage(
142-
QUEUE.getProcessingQueueName(), QUEUE.getQueueName(), m, m);
143+
verify(template).moveMessage(QUEUE.getProcessingQueueName(), QUEUE.getQueueName(), m, m);
143144
}
144145

145146
@Test
146147
void nackWithDelayDelegatesToMoveMessageWithDelay() {
147148
RqueueMessage m = RqueueMessage.builder().id("a").message("msg").build();
148149
assertTrue(broker.nack(QUEUE, m, 1500L));
149-
verify(template).moveMessageWithDelay(
150-
QUEUE.getProcessingQueueName(), QUEUE.getScheduledQueueName(), m, m, 1500L);
150+
verify(template)
151+
.moveMessageWithDelay(
152+
QUEUE.getProcessingQueueName(), QUEUE.getScheduledQueueName(), m, m, 1500L);
151153
}
152154

153155
@Test
154156
void moveExpiredDelegatesToMoveMessageZsetToList() {
155157
when(template.moveMessageZsetToList(
156-
eq(QUEUE.getScheduledQueueName()), eq(QUEUE.getQueueName()), eq(10)))
158+
eq(QUEUE.getScheduledQueueName()), eq(QUEUE.getQueueName()), eq(10)))
157159
.thenReturn(new MessageMoveResult(7, true));
158160

159161
long moved = broker.moveExpired(QUEUE, System.currentTimeMillis(), 10);
160162

161163
assertEquals(7L, moved);
162-
verify(template).moveMessageZsetToList(
163-
QUEUE.getScheduledQueueName(), QUEUE.getQueueName(), 10);
164+
verify(template).moveMessageZsetToList(QUEUE.getScheduledQueueName(), QUEUE.getQueueName(), 10);
164165
}
165166

166167
@Test
167168
void peekDelegatesToReadFromList() {
168-
when(template.readFromList(QUEUE.getQueueName(), 0L, 4L))
169-
.thenReturn(Collections.emptyList());
169+
when(template.readFromList(QUEUE.getQueueName(), 0L, 4L)).thenReturn(Collections.emptyList());
170170

171171
broker.peek(QUEUE, 0L, 5L);
172172

@@ -203,7 +203,8 @@ void subscribeRegistersListenerOnContainerAndCloseRemovesIt() throws Exception {
203203
org.mockito.ArgumentCaptor<Topic> topicCaptor =
204204
org.mockito.ArgumentCaptor.forClass(Topic.class);
205205
verify(pubSubContainer).addMessageListener(listenerCaptor.capture(), topicCaptor.capture());
206-
assertEquals(new ChannelTopic("ch").getTopic(), ((ChannelTopic) topicCaptor.getValue()).getTopic());
206+
assertEquals(
207+
new ChannelTopic("ch").getTopic(), ((ChannelTopic) topicCaptor.getValue()).getTopic());
207208

208209
// simulate a delivered message
209210
Message message = new Message() {
@@ -221,8 +222,8 @@ public byte[] getChannel() {
221222
assertEquals("payload", received[0]);
222223

223224
handle.close();
224-
verify(pubSubContainer, times(1)).removeMessageListener(listenerCaptor.getValue(),
225-
topicCaptor.getValue());
225+
verify(pubSubContainer, times(1))
226+
.removeMessageListener(listenerCaptor.getValue(), topicCaptor.getValue());
226227
}
227228

228229
@Test

0 commit comments

Comments
 (0)