Skip to content

Commit df9a5b7

Browse files
committed
Add rqueue-spring-boot-nats-example
Spring Boot example app on the NATS / JetStream backend, mirroring the existing rqueue-spring-boot-example. Backend selection is a property switch only (rqueue.backend=nats + rqueue.nats.connection.url); the listener / controller / domain code is unchanged from the redis example, which is the whole point of the pluggable-backend split. Two small differences from the redis example: - Delayed / periodic enqueue endpoints (job-delay, sch-job) and their listeners are removed. v1 NATS broker doesn't model delayed delivery — the redis ZSET-backed schedulers don't exist on the NATS side, and the broker throws UnsupportedOperationException for those calls. - application.properties selects NATS, points connection.url at a JetStream-enabled nats-server, and sets all four auto-create flags to true (the defaults; flipped explicitly for documentation). Side fix: rqueue-nats now declares api(":rqueue-web") because the NATS-shaped web service impls (NatsRqueueQDetailService etc., added in parallel work) implement interfaces from rqueue-web. Mirrors how rqueue-redis pulls rqueue-web for the same reason. Includes a README documenting how to run nats-server locally, the streams / KV buckets the app provisions, and the autoCreate*=false / pre-create flow for locked-down JetStream accounts. Assisted-By: Claude Code
1 parent 768a791 commit df9a5b7

11 files changed

Lines changed: 483 additions & 0 deletions

File tree

rqueue-nats/build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ dependencies {
4545
// rqueue-spring-boot-starter (auto-config), both of which reference this module's
4646
// types via compileOnly and gate them behind @ConditionalOnClass(JetStream.class).
4747
api project(":rqueue-core")
48+
// NATS-shaped web service impls (NatsRqueueQDetailService, etc.) implement interfaces
49+
// declared in rqueue-web. Mirrors how rqueue-redis pulls rqueue-web for the same reason.
50+
api project(":rqueue-web")
4851
api "io.nats:jnats:${natsVersion}"
4952
testImplementation project(":rqueue-test-util")
5053
testImplementation "org.testcontainers:testcontainers:${testcontainersVersion}"
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
# rqueue-spring-boot-nats-example
2+
3+
Spring Boot example app using **NATS / JetStream** as the Rqueue backend, mirroring
4+
[`rqueue-spring-boot-example`](../rqueue-spring-boot-example) (which uses Redis).
5+
6+
The application code is identical to the redis example modulo two small things:
7+
8+
- **Delayed and periodic enqueue endpoints are removed.** The v1 NATS broker doesn't model
9+
delayed or scheduled delivery — the redis backend's ZSET schedulers don't exist on the NATS
10+
side, and the broker throws `UnsupportedOperationException` for those calls.
11+
- **`application.properties` selects the NATS backend** via `rqueue.backend=nats` and points
12+
`rqueue.nats.connection.url` at a JetStream-enabled `nats-server`.
13+
14+
Backend selection is a property switch only — the listener / controller / domain code is
15+
unchanged from the redis example, which is the whole point of the pluggable-backend split.
16+
17+
## Running locally
18+
19+
Start a JetStream-enabled NATS server (any one of these works):
20+
21+
```sh
22+
# native binary
23+
nats-server -js
24+
25+
# docker
26+
docker run -p 4222:4222 nats:latest -js
27+
```
28+
29+
Then:
30+
31+
```sh
32+
./gradlew :rqueue-spring-boot-nats-example:bootRun
33+
```
34+
35+
Once the app is up:
36+
37+
```sh
38+
# enqueue a String to a queue (default queue is "simple-queue", from application.properties)
39+
curl 'http://localhost:8080/push?q=simple-queue&msg=hello'
40+
41+
# enqueue a Job object to job-queue (with DLQ wired to job-morgue)
42+
curl http://localhost:8080/job
43+
```
44+
45+
Watch the logs for `simple: hello` / `job-queue: Job(id=…)` from `MessageListener`.
46+
47+
## Inspecting the JetStream state
48+
49+
`nats stream ls` will show:
50+
51+
```
52+
rqueue-js-simple-queue
53+
rqueue-js-job-queue
54+
rqueue-js-job-queue-dlq
55+
rqueue-js-job-morgue
56+
```
57+
58+
(The `rqueue-js-` prefix is the default; configure via `rqueue.nats.naming.streamPrefix`.)
59+
60+
`nats kv ls` will show the six shared KV buckets used by the NATS-backed daos
61+
(`rqueue-jobs`, `rqueue-locks`, `rqueue-message-metadata`, etc.). See the README's
62+
"NATS backend" section in the repo root for the full table.
63+
64+
## Locked-down JetStream accounts
65+
66+
If your NATS account can't run `add_stream` / `kv_create` at runtime, set:
67+
68+
```properties
69+
rqueue.nats.auto-create-streams=false
70+
rqueue.nats.auto-create-consumers=false
71+
rqueue.nats.auto-create-dlq-stream=false
72+
rqueue.nats.auto-create-kv-buckets=false
73+
```
74+
75+
…and pre-create the streams + buckets per the root README. `NatsStreamValidator` /
76+
`NatsKvBucketValidator` will fail boot deterministically with the list of missing
77+
streams / buckets if any are not present.
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
plugins {
2+
id "org.springframework.boot" version "${springBootVersion}"
3+
id "war"
4+
}
5+
dependencies {
6+
implementation project(":rqueue-spring-boot-starter")
7+
// rqueue-nats brings the JetStream broker, KV-backed daos, NatsStreamValidator, etc.
8+
// It's a runtime-conditional dependency on the starter side (gated by rqueue.backend=nats),
9+
// so the example pins it explicitly.
10+
implementation project(":rqueue-nats")
11+
implementation "org.springframework.boot:spring-boot-starter-web:${springBootVersion}"
12+
// jnats is api-exposed by rqueue-nats, but pinning it here keeps the build readable.
13+
implementation "io.nats:jnats:${natsVersion}"
14+
// https://mvnrepository.com/artifact/ch.qos.logback/logback-core
15+
implementation "ch.qos.logback:logback-core:${logbackVersion}"
16+
// https://mvnrepository.com/artifact/ch.qos.logback/logback-classic
17+
implementation "ch.qos.logback:logback-classic:${logbackVersion}"
18+
annotationProcessor "org.springframework.boot:spring-boot-configuration-processor:${springBootVersion}"
19+
providedRuntime "org.springframework.boot:spring-boot-starter-tomcat:${springBootVersion}"
20+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright (c) 2024-2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and limitations under the License.
14+
*
15+
*/
16+
17+
package com.github.sonus21.rqueue.example;
18+
19+
import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer;
20+
import com.github.sonus21.rqueue.utils.StringUtils;
21+
import java.util.UUID;
22+
import lombok.AllArgsConstructor;
23+
import lombok.extern.slf4j.Slf4j;
24+
import org.springframework.beans.factory.annotation.Autowired;
25+
import org.springframework.web.bind.annotation.GetMapping;
26+
import org.springframework.web.bind.annotation.RequestParam;
27+
import org.springframework.web.bind.annotation.RestController;
28+
29+
/**
30+
* Same shape as the redis example's controller, with delayed / periodic endpoints removed —
31+
* the v1 NATS broker doesn't support delayed delivery or periodic enqueue and would throw
32+
* {@code UnsupportedOperationException} at runtime if those code paths were hit.
33+
*/
34+
@RestController
35+
@AllArgsConstructor(onConstructor = @__(@Autowired))
36+
@Slf4j
37+
public class Controller {
38+
39+
private final RqueueMessageEnqueuer rqueueMessageEnqueuer;
40+
41+
@GetMapping(value = "/push")
42+
public String push(String q, String msg) {
43+
String messageId = rqueueMessageEnqueuer.enqueue(q, msg);
44+
log.info("Message {}", msg);
45+
return "Message sent successfully, message id " + messageId;
46+
}
47+
48+
private String getQueue(String queue) {
49+
if (queue == null) {
50+
return "job-queue";
51+
}
52+
return queue;
53+
}
54+
55+
private Job getJob(String message) {
56+
Job job = new Job();
57+
job.setId(UUID.randomUUID().toString());
58+
if (!StringUtils.isEmpty(message)) {
59+
job.setMessage(message);
60+
} else {
61+
job.setMessage("Hi this is " + job.getId());
62+
}
63+
return job;
64+
}
65+
66+
@GetMapping("job")
67+
public String sendJobNotification(
68+
@RequestParam(required = false) String msg, @RequestParam(required = false) String q) {
69+
Job job = getJob(msg);
70+
String messageId = rqueueMessageEnqueuer.enqueue(getQueue(q), job);
71+
job.setMessage(messageId);
72+
return job.toString();
73+
}
74+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright (c) 2024-2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and limitations under the License.
14+
*
15+
*/
16+
17+
package com.github.sonus21.rqueue.example;
18+
19+
import lombok.Getter;
20+
import lombok.Setter;
21+
import lombok.ToString;
22+
23+
@Getter
24+
@Setter
25+
@ToString
26+
public class Job {
27+
28+
private String id;
29+
private String message;
30+
private String messageId;
31+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright (c) 2024-2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and limitations under the License.
14+
*
15+
*/
16+
17+
package com.github.sonus21.rqueue.example;
18+
19+
import com.github.sonus21.rqueue.annotation.RqueueListener;
20+
import com.github.sonus21.rqueue.utils.TimeoutUtils;
21+
import java.util.Random;
22+
import lombok.extern.slf4j.Slf4j;
23+
import org.springframework.beans.factory.annotation.Value;
24+
import org.springframework.stereotype.Component;
25+
26+
/**
27+
* Same listener shape as the redis example, minus the delayed-queue and scheduled-job
28+
* listeners (those rely on Redis-only ZSET-backed schedulers; the v1 NATS broker delegates
29+
* redelivery to JetStream's own {@code AckWait} timer instead).
30+
*/
31+
@Component
32+
@Slf4j
33+
public class MessageListener {
34+
35+
private static final Random random = new Random();
36+
37+
@Value("${job.fail.percentage:0}")
38+
private int percentageFailure;
39+
40+
@Value("${job.execution.interval:100}")
41+
private int jobExecutionTime;
42+
43+
protected boolean shouldFail() {
44+
if (percentageFailure == 0) {
45+
return false;
46+
}
47+
if (percentageFailure >= 100) {
48+
return true;
49+
}
50+
return random.nextInt(100) < percentageFailure;
51+
}
52+
53+
protected void execute(String msg, Object any, boolean failingEnabled) {
54+
log.info(msg, any);
55+
TimeoutUtils.sleep(random.nextInt(jobExecutionTime));
56+
if (failingEnabled && shouldFail()) {
57+
throw new IllegalArgumentException("Failing On Purpose " + any);
58+
}
59+
}
60+
61+
@RqueueListener(value = "${rqueue.simple.queue}")
62+
public void onSimpleMessage(String message) {
63+
execute("simple: {}", message, false);
64+
}
65+
66+
@RqueueListener(
67+
value = "job-queue",
68+
deadLetterQueue = "job-morgue",
69+
numRetries = "2",
70+
deadLetterQueueListenerEnabled = "false",
71+
concurrency = "10-20")
72+
public void onJobMessage(Job job) {
73+
execute("job-queue: {}", job, true);
74+
}
75+
76+
@RqueueListener(value = "job-morgue", numRetries = "1", concurrency = "1-3")
77+
public void onJobDlqMessage(Job job) {
78+
execute("job-morgue: {}", job, true);
79+
}
80+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright (c) 2024-2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and limitations under the License.
14+
*
15+
*/
16+
17+
package com.github.sonus21.rqueue.example;
18+
19+
import com.github.sonus21.rqueue.utils.StringUtils;
20+
import org.springframework.beans.factory.annotation.Value;
21+
import org.springframework.context.annotation.Configuration;
22+
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
23+
import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry;
24+
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
25+
26+
@EnableWebMvc
27+
@Configuration
28+
public class MvcConfig implements WebMvcConfigurer {
29+
30+
@Value("${rqueue.web.url.prefix:}")
31+
private String rqueueWebUrlPrefix;
32+
33+
@Override
34+
public void addResourceHandlers(ResourceHandlerRegistry registry) {
35+
if (!registry.hasMappingForPattern("/webjars/**")) {
36+
registry
37+
.addResourceHandler("/webjars/**")
38+
.addResourceLocations("classpath:/META-INF/resources/webjars/");
39+
}
40+
if (!StringUtils.isEmpty(rqueueWebUrlPrefix)) {
41+
registry
42+
.addResourceHandler(rqueueWebUrlPrefix + "/**")
43+
.addResourceLocations("classpath:/public/");
44+
}
45+
if (!registry.hasMappingForPattern("/**")) {
46+
registry.addResourceHandler("/**").addResourceLocations("classpath:/public/");
47+
}
48+
}
49+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright (c) 2024-2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and limitations under the License.
14+
*
15+
*/
16+
17+
package com.github.sonus21.rqueue.example;
18+
19+
import com.github.sonus21.rqueue.config.SimpleRqueueListenerContainerFactory;
20+
import com.github.sonus21.rqueue.utils.Constants;
21+
import org.springframework.beans.factory.annotation.Value;
22+
import org.springframework.boot.SpringApplication;
23+
import org.springframework.boot.autoconfigure.SpringBootApplication;
24+
import org.springframework.context.annotation.Bean;
25+
26+
/**
27+
* Mirror of the redis-backed {@code rqueue-spring-boot-example} for the NATS / JetStream backend.
28+
* Selecting the backend is a property switch only (see {@code application.properties}); the
29+
* application code is unchanged from the redis example, which is the whole point of the
30+
* pluggable-backend split.
31+
*/
32+
@SpringBootApplication
33+
public class RQueueApplication {
34+
35+
@Value("${workers.count:3}")
36+
private int workersCount;
37+
38+
public static void main(String[] args) {
39+
SpringApplication.run(RQueueApplication.class, args);
40+
}
41+
42+
@Bean
43+
public SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory() {
44+
SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory =
45+
new SimpleRqueueListenerContainerFactory();
46+
simpleRqueueListenerContainerFactory.setMaxNumWorkers(workersCount);
47+
simpleRqueueListenerContainerFactory.setPollingInterval(Constants.ONE_MILLI);
48+
return simpleRqueueListenerContainerFactory;
49+
}
50+
}

0 commit comments

Comments
 (0)