Skip to content

Commit 8e549d1

Browse files
Apply Palantir Java Format
1 parent a493eeb commit 8e549d1

10 files changed

Lines changed: 32 additions & 34 deletions

File tree

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/redis/RedisMessageBroker.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -165,12 +165,10 @@ public void moveToDlq(
165165
RqueueMessage updated,
166166
long delayMs) {
167167
RedisUtils.executePipeLine(
168-
template.getTemplate(),
169-
(connection, keySerializer, valueSerializer) -> {
168+
template.getTemplate(), (connection, keySerializer, valueSerializer) -> {
170169
byte[] updatedBytes = valueSerializer.serialize(updated);
171170
byte[] oldBytes = valueSerializer.serialize(old);
172-
byte[] processingQueueBytes =
173-
keySerializer.serialize(source.getProcessingQueueName());
171+
byte[] processingQueueBytes = keySerializer.serialize(source.getProcessingQueueName());
174172
byte[] targetQueueBytes = keySerializer.serialize(targetQueue);
175173
if (delayMs > 0) {
176174
connection.zAdd(targetQueueBytes, delayMs, updatedBytes);

rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/JobImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@
2020
import com.github.sonus21.rqueue.config.RqueueConfig;
2121
import com.github.sonus21.rqueue.core.Job;
2222
import com.github.sonus21.rqueue.core.RqueueMessage;
23-
import com.github.sonus21.rqueue.core.spi.MessageBroker;
2423
import com.github.sonus21.rqueue.core.context.Context;
2524
import com.github.sonus21.rqueue.core.context.DefaultContext;
2625
import com.github.sonus21.rqueue.core.middleware.TimeProviderMiddleware;
26+
import com.github.sonus21.rqueue.core.spi.MessageBroker;
2727
import com.github.sonus21.rqueue.dao.RqueueJobDao;
2828
import com.github.sonus21.rqueue.models.db.Execution;
2929
import com.github.sonus21.rqueue.models.db.MessageMetadata;

rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/PostProcessingHandler.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,8 @@ private void moveMessageToDlq(JobImpl job, int failureCount, Throwable throwable
137137
job.getRqueueMessage(),
138138
job.getQueueDetail().getDeadLetterQueueName());
139139
RqueueMessage rqueueMessage = job.getRqueueMessage();
140-
RqueueMessage newMessage = rqueueMessage.toBuilder().failureCount(failureCount).build();
140+
RqueueMessage newMessage =
141+
rqueueMessage.toBuilder().failureCount(failureCount).build();
141142
newMessage.updateReEnqueuedAt();
142143
QueueDetail queueDetail = job.getQueueDetail();
143144
Object userMessage = job.getMessage();
@@ -151,7 +152,8 @@ private void moveMessageToDlq(JobImpl job, int failureCount, Throwable throwable
151152
"Queue Config not found for queue {}",
152153
null,
153154
queueDetail.getDeadLetterQueue());
154-
broker.moveToDlq(queueDetail, queueDetail.getDeadLetterQueueName(), rqueueMessage, newMessage, -1);
155+
broker.moveToDlq(
156+
queueDetail, queueDetail.getDeadLetterQueueName(), rqueueMessage, newMessage, -1);
155157
} else {
156158
newMessage.setQueueName(queueConfig.getName());
157159
newMessage.setFailureCount(0);
@@ -161,10 +163,12 @@ private void moveMessageToDlq(JobImpl job, int failureCount, Throwable throwable
161163
backOff = (backOff == TaskExecutionBackOff.STOP)
162164
? FixedTaskExecutionBackOff.DEFAULT_INTERVAL
163165
: backOff;
164-
broker.moveToDlq(queueDetail, queueConfig.getScheduledQueueName(), rqueueMessage, newMessage, backOff);
166+
broker.moveToDlq(
167+
queueDetail, queueConfig.getScheduledQueueName(), rqueueMessage, newMessage, backOff);
165168
}
166169
} else {
167-
broker.moveToDlq(queueDetail, queueDetail.getDeadLetterQueueName(), rqueueMessage, newMessage, -1);
170+
broker.moveToDlq(
171+
queueDetail, queueDetail.getDeadLetterQueueName(), rqueueMessage, newMessage, -1);
168172
}
169173
publishEvent(job, newMessage, MessageStatus.MOVED_TO_DLQ);
170174
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessagePoller.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,7 @@ abstract class RqueueMessagePoller extends MessageContainerBase {
5959
}
6060

6161
private List<RqueueMessage> getMessages(QueueDetail queueDetail, int count) {
62-
return rqueueBeanProvider
63-
.getMessageBroker()
64-
.pop(queueDetail, null, count, Duration.ZERO);
62+
return rqueueBeanProvider.getMessageBroker().pop(queueDetail, null, count, Duration.ZERO);
6563
}
6664

6765
private void execute(

rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/JobImplTest.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -257,9 +257,7 @@ void updateExecutionTime() {
257257
void getVisibilityTimeout() {
258258
JobImpl job = instance();
259259
job.execute();
260-
doReturn(-10L)
261-
.when(messageBroker)
262-
.getVisibilityTimeoutScore(queueDetail, rqueueMessage);
260+
doReturn(-10L).when(messageBroker).getVisibilityTimeoutScore(queueDetail, rqueueMessage);
263261
assertEquals(Duration.ZERO, job.getVisibilityTimeout());
264262

265263
doReturn(System.currentTimeMillis() + 10_000L)
@@ -268,23 +266,17 @@ void getVisibilityTimeout() {
268266
Duration timeout = job.getVisibilityTimeout();
269267
assertTrue(timeout.toMillis() <= 10_000 && timeout.toMillis() >= 9_000);
270268

271-
doReturn(0L)
272-
.when(messageBroker)
273-
.getVisibilityTimeoutScore(queueDetail, rqueueMessage);
269+
doReturn(0L).when(messageBroker).getVisibilityTimeoutScore(queueDetail, rqueueMessage);
274270
assertEquals(Duration.ZERO, job.getVisibilityTimeout());
275271
}
276272

277273
@Test
278274
void updateVisibilityTimeout() {
279275
JobImpl job = instance();
280276
job.execute();
281-
doReturn(true)
282-
.when(messageBroker)
283-
.extendVisibilityTimeout(queueDetail, rqueueMessage, 5_000L);
277+
doReturn(true).when(messageBroker).extendVisibilityTimeout(queueDetail, rqueueMessage, 5_000L);
284278
assertTrue(job.updateVisibilityTimeout(Duration.ofSeconds(5)));
285-
doReturn(false)
286-
.when(messageBroker)
287-
.extendVisibilityTimeout(queueDetail, rqueueMessage, 5_000L);
279+
doReturn(false).when(messageBroker).extendVisibilityTimeout(queueDetail, rqueueMessage, 5_000L);
288280
assertFalse(job.updateVisibilityTimeout(Duration.ofSeconds(5)));
289281
}
290282

rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueExecutorTest.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -218,10 +218,7 @@ void messageIsParkedForRetry() {
218218
.run();
219219
verify(messageBroker, times(1))
220220
.parkForRetry(
221-
eq(queueDetail),
222-
any(RqueueMessage.class),
223-
any(RqueueMessage.class),
224-
eq(5000L));
221+
eq(queueDetail), any(RqueueMessage.class), any(RqueueMessage.class), eq(5000L));
225222
}
226223

227224
@Test

rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerBrokerBranchTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,8 @@ void brokerWithPrimaryHandlerDispatchUsesNormalStartQueuePath() throws Exception
186186
// All brokers now go through the normal startQueue/startGroup path.
187187
assertTrue(
188188
container.startQueueCalled.get() || container.startGroupCalled.get(),
189-
"startQueue or startGroup should be invoked for any broker using primary handler dispatch");
189+
"startQueue or startGroup should be invoked for any broker using primary handler"
190+
+ " dispatch");
190191
} finally {
191192
container.stop();
192193
container.destroy();
@@ -207,7 +208,8 @@ void redisDefaultsBrokerAlsoUsesNormalStartQueuePath() throws Exception {
207208
assertTrue(
208209
container.startQueueCalled.get() || container.startGroupCalled.get(),
209210
"legacy Redis-side wiring should run for REDIS_DEFAULTS capabilities");
210-
assertFalse(container.startBrokerPollersCalled.get(),
211+
assertFalse(
212+
container.startBrokerPollersCalled.get(),
211213
"startBrokerPollers no longer exists; flag must remain false");
212214
} finally {
213215
container.stop();

rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerPriorityTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ public Capabilities capabilities() {
9898
.numRetry(3)
9999
.priority(Collections.emptyMap())
100100
.build();
101-
RqueueMessage msg = RqueueMessage.builder().id("x").queueName("q1").message("p").build();
101+
RqueueMessage msg =
102+
RqueueMessage.builder().id("x").queueName("q1").message("p").build();
102103
broker.enqueue(qd, "high", msg);
103104
broker.enqueue(qd, msg);
104105
assertEquals(2, plain.get(), "default priority overload should delegate to enqueue(q, m)");

rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMiddlewareTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,14 @@
3737
import com.github.sonus21.rqueue.core.Job;
3838
import com.github.sonus21.rqueue.core.RqueueBeanProvider;
3939
import com.github.sonus21.rqueue.core.RqueueMessage;
40-
import com.github.sonus21.rqueue.core.spi.MessageBroker;
4140
import com.github.sonus21.rqueue.core.context.Context;
4241
import com.github.sonus21.rqueue.core.context.DefaultContext;
4342
import com.github.sonus21.rqueue.core.middleware.ContextMiddleware;
4443
import com.github.sonus21.rqueue.core.middleware.Middleware;
4544
import com.github.sonus21.rqueue.core.middleware.PermissionMiddleware;
4645
import com.github.sonus21.rqueue.core.middleware.ProfilerMiddleware;
4746
import com.github.sonus21.rqueue.core.middleware.RateLimiterMiddleware;
47+
import com.github.sonus21.rqueue.core.spi.MessageBroker;
4848
import com.github.sonus21.rqueue.core.support.MessageProcessor;
4949
import com.github.sonus21.rqueue.core.support.RqueueMessageUtils;
5050
import com.github.sonus21.rqueue.dao.RqueueJobDao;

rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,13 +255,19 @@ public Mono<Void> enqueueWithDelayReactive(QueueDetail q, RqueueMessage m, long
255255

256256
@Override
257257
public List<RqueueMessage> pop(QueueDetail q, String consumerName, int batch, Duration wait) {
258-
return popInternal(streamFor(q), subjectFor(q), resolveConsumerName(q.getName(), consumerName), batch, wait);
258+
return popInternal(
259+
streamFor(q), subjectFor(q), resolveConsumerName(q.getName(), consumerName), batch, wait);
259260
}
260261

261262
@Override
262263
public List<RqueueMessage> pop(
263264
QueueDetail q, String priority, String consumerName, int batch, Duration wait) {
264-
return popInternal(streamFor(q, priority), subjectFor(q, priority), resolveConsumerName(q.getName(), consumerName), batch, wait);
265+
return popInternal(
266+
streamFor(q, priority),
267+
subjectFor(q, priority),
268+
resolveConsumerName(q.getName(), consumerName),
269+
batch,
270+
wait);
265271
}
266272

267273
private static String resolveConsumerName(String queueName, String consumerName) {

0 commit comments

Comments
 (0)