Commit cafdc8b

Move Redis-only impls from rqueue-core to rqueue-redis
Continues the rqueue-core/rqueue-redis split. Anything that's intrinsically Redis-shaped (Lua scripts, ZSET pollers, RedisTemplate-bound DAOs) moves to rqueue-redis; anything backend-agnostic (registry orchestration, metrics SPI) stays in core and gains a backend-shaped store interface.

Moved to rqueue-redis under com.github.sonus21.rqueue.redis.*:
- core/MessageScheduler (abstract base)
- core/ScheduledQueueMessageScheduler (delayed -> ready ZSET poller)
- core/ProcessingQueueMessageScheduler (ack-window expiry poller)
- core/RedisScheduleTriggerHandler (pub/sub trigger helper)
- common/impl/RqueueLockManagerImpl (Lua-script-backed distributed lock)

Tests for these moved alongside; only added imports for things that were package-local before the move and switched the package declarations.

RqueueQueueMetrics (the Redis-only convenience class) is deleted; its callers (SpringTestBase, MetricTest) now go through RqueueQueueMetricsProvider, which is the existing backend-agnostic SPI. The Provider gains three default-method overloads taking (queueName, priority); RedisRqueueQueueMetricsProvider overrides them to read priority sub-queues, NATS keeps the default (no priority sub-queues there). Both providers now swallow QueueDoesNotExist and return 0 so callers can use the values directly as gauge readings.

Worker registry split (parallel change kept in this commit): the impl is backend-agnostic and lives in rqueue-core; storage moves behind a new WorkerRegistryStore SPI with Redis (RedisWorkerRegistryStore) and NATS JetStream KV (NatsWorkerRegistryStore) implementations. The stale RqueueWorkerRegistryImplTest (written against the old single-arg constructor) is removed; coverage will come back via store-shaped tests.

RqueueRedisListenerConfig is updated to wire the new Redis impls and the RedisWorkerRegistryStore. RqueueNatsAutoConfig wires the NATS counterpart.

All 490 unit tests across rqueue-core (368), rqueue-redis (93), and rqueue-nats (29) pass.

Assisted-By: Claude Code
1 parent 4f2fec9 commit cafdc8b

28 files changed

Lines changed: 708 additions & 532 deletions

README.md

Lines changed: 93 additions & 0 deletions
@@ -267,6 +267,99 @@ Micrometer based dashboard for queue

---

## NATS backend

Rqueue can use NATS JetStream as the message broker instead of Redis by setting
`rqueue.backend=nats` and including the `rqueue-nats` module on the classpath. JetStream streams
(one per queue) are provisioned by `JetStreamMessageBrokerFactory` / `NatsProvisioner`. State
that Redis stores in keys, hashes, and sorted-sets is mapped onto JetStream **KV buckets**,
one bucket per concern. All buckets use the default replicas / storage settings of the JetStream
account unless noted; per-entry TTL relies on the bucket's `ttl` (NATS' name for `maxAge`),
which is set once at bucket creation.

| Bucket name | Purpose | TTL behaviour | Created in |
|---|---|---|---|
| `rqueue-queue-config` | Per-queue `QueueConfig` records (registered queues, DLQ wiring, flags). | No TTL. Entries persist until explicitly overwritten. | [`NatsRqueueSystemConfigDao`](rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueSystemConfigDao.java) (`@Conditional(NatsBackendCondition)`) |
| `rqueue-jobs` | `RqueueJob` execution history per message id. | TTL captured from the first `createJob`/`save` call's `expiry` argument; bucket-level so it applies uniformly. | [`NatsRqueueJobDao`](rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueJobDao.java) |
| `rqueue-locks` | Distributed locks (scheduler leadership, message-level locks). | TTL captured from the first `acquireLock` call's `duration` argument. | [`NatsRqueueLockManager`](rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/lock/NatsRqueueLockManager.java) |
| `rqueue-message-metadata` | Per-message metadata (delivery status, retry count, dead-letter flags). | No TTL at the bucket. Per-write `ttl` arguments are ignored in this v1 implementation. | [`NatsRqueueMessageMetadataService`](rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/service/NatsRqueueMessageMetadataService.java) |
| `rqueue-workers` | Worker process info (host, pid, version, last-seen). | TTL = `rqueue.workerRegistry.workerTtl` (captured on first heartbeat). | [`NatsWorkerRegistryStore`](rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/worker/NatsWorkerRegistryStore.java) |
| `rqueue-worker-heartbeats` | Per-(queue, worker) heartbeats. Keys flattened as `<queue>__<worker>`. | TTL = `rqueue.workerRegistry.queueTtl` (captured on first refresh; falls back to 1 h if registry not enabled). | [`NatsWorkerRegistryStore`](rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/worker/NatsWorkerRegistryStore.java) |

### How buckets are configured

- **Lazy, code-driven creation.** Each store / dao calls `kvm.create(KeyValueConfiguration...)`
  the first time it is touched after startup. There is no `application.yml` switch to disable
  this, and there is no provisioning step you need to run by hand, but the JetStream account
  used by your `Connection` bean must have permission to create KV buckets (i.e. JetStream must
  be enabled and account limits must allow it).
- **TTL is fixed at bucket creation.** All buckets that take a `ttl` snapshot the value at
  creation. Changing the corresponding rqueue property after the bucket exists has no effect
  until the bucket is deleted out-of-band and recreated. This matches NATS KV semantics: the
  bucket's `maxAge` is immutable.
- **No bucket per queue.** All queues share the same buckets above; per-queue scoping is done
  via the key prefix (`rqueue.workerRegistry.queueKey(queueName)`, etc.).
- **Connection wiring.** The `io.nats.client.Connection` bean comes from
  [`RqueueNatsAutoConfig`](rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueNatsAutoConfig.java)
  (Spring Boot) when `rqueue.backend=nats` and `io.nats.client.JetStream` is on the classpath.
  All KV stores receive that same `Connection` and call `connection.keyValueManagement()` /
  `connection.keyValue(name)` against it.

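
The lazy-creation and fixed-TTL behaviour described in the list above can be sketched without a NATS server. This is a minimal illustration only: `InMemoryBucketAdmin` and `LazyBucket` are hypothetical stand-ins written for this example, not the jnats `KeyValueManagement` API and not Rqueue's actual `ensureBucket(...)` code. It shows why a TTL passed after the bucket already exists has no effect.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for a KV management API (not the jnats interface). */
final class InMemoryBucketAdmin {
  private final Map<String, Duration> ttlByBucket = new HashMap<>();
  private int createCalls = 0;

  /** Returns the TTL the bucket was created with, or null when the bucket does not exist. */
  Duration status(String name) {
    return ttlByBucket.get(name);
  }

  /** Creates the bucket; in this sketch Duration.ZERO stands for "no TTL". */
  void create(String name, Duration ttl) {
    createCalls++;
    ttlByBucket.put(name, ttl);
  }

  int createCalls() {
    return createCalls;
  }
}

/** Hypothetical store-side helper: open the bucket, creating it only on first use. */
final class LazyBucket {
  private final InMemoryBucketAdmin admin;
  private final String name;

  LazyBucket(InMemoryBucketAdmin admin, String name) {
    this.admin = admin;
    this.name = name;
  }

  /** Returns the TTL actually in force for the bucket. */
  Duration ensureBucket(Duration requestedTtl) {
    Duration existing = admin.status(name);
    if (existing != null) {
      // Bucket already exists: the requested TTL is ignored, the original one stays.
      return existing;
    }
    admin.create(name, requestedTtl);
    return requestedTtl;
  }
}

final class LazyBucketDemo {
  public static void main(String[] args) {
    InMemoryBucketAdmin admin = new InMemoryBucketAdmin();
    LazyBucket locks = new LazyBucket(admin, "rqueue-locks");
    // First call creates the bucket with a 10 minute TTL.
    System.out.println(locks.ensureBucket(Duration.ofMinutes(10)));
    // A later call with a different TTL still reports the TTL from creation.
    System.out.println(locks.ensureBucket(Duration.ofMinutes(30)));
    System.out.println("create calls: " + admin.createCalls());
  }
}
```

In this sketch the only way to change the TTL is to remove the bucket from `InMemoryBucketAdmin` and create it again, which mirrors the delete-and-recreate procedure this section describes.
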
### Pre-creating buckets (restricted JetStream accounts)

In managed or locked-down JetStream deployments the credentials your application uses may not
have permission to create KV buckets at runtime. In that case the lazy `kvm.create(...)` call
on first use will fail with `JetStreamApiException` ("permission violation" or "stream not
found"), and depending on the call site the failure may be logged and swallowed (registry,
metadata) or surface as a missing record.

To run against such a NATS deployment, **pre-create every bucket below before starting the
application**. The commands assume the [`nats` CLI](https://docs.nats.io/using-nats/nats-tools/nats_cli) is
configured against the same account and creds your application uses. Substitute your own
values for replicas, storage, and TTL; the values shown match the defaults Rqueue would use if
it created the bucket itself.

```bash
# State that must persist (no TTL).
nats kv add rqueue-queue-config --replicas=3 --storage=file
nats kv add rqueue-message-metadata --replicas=3 --storage=file

# Job history. Use the same value as rqueue.job.durability (default 7 days).
nats kv add rqueue-jobs --replicas=3 --storage=file --ttl=7d

# Distributed locks. Use a value at least as large as your longest expected lock hold.
nats kv add rqueue-locks --replicas=3 --storage=file --ttl=10m

# Worker registry. Match rqueue.workerRegistry.workerTtl / queueTtl exactly.
nats kv add rqueue-workers --replicas=3 --storage=file --ttl=5m
nats kv add rqueue-worker-heartbeats --replicas=3 --storage=file --ttl=10m
```

Once the buckets exist, Rqueue's lazy initialiser short-circuits: `kvm.getStatus(name)` returns
non-null, so the existing bucket is opened and no `create` call is made. The application
credentials only need read/write on the buckets, not management privileges.

> Today there is no `rqueue.nats.autoCreateKvBuckets` flag to fail fast at startup if a bucket
> is missing; the failure is observed lazily on first use. If you want stricter validation,
> file an issue / PR; the hook point is the `ensureBucket(...)` method in each store and dao.

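
Until such a flag exists, an application could run its own startup check. The following is a rough sketch under stated assumptions, not part of Rqueue: `BucketPreflight` is a hypothetical helper, and the `bucketExists` predicate is a placeholder for whatever existence check your NATS client offers (for example, something built on the `kvm.getStatus(name)` call mentioned above). The bucket names are the six listed in the table in this section.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical startup check; not an Rqueue class. */
final class BucketPreflight {
  /** Bucket names taken from the table in this README section. */
  static final List<String> REQUIRED_BUCKETS = List.of(
      "rqueue-queue-config",
      "rqueue-jobs",
      "rqueue-locks",
      "rqueue-message-metadata",
      "rqueue-workers",
      "rqueue-worker-heartbeats");

  private BucketPreflight() {}

  /** Returns the required buckets for which the predicate reports "does not exist". */
  static List<String> missingBuckets(Predicate<String> bucketExists) {
    List<String> missing = new ArrayList<>();
    for (String name : REQUIRED_BUCKETS) {
      if (!bucketExists.test(name)) {
        missing.add(name);
      }
    }
    return missing;
  }

  /** Fails fast with one message naming every missing bucket. */
  static void requireAll(Predicate<String> bucketExists) {
    List<String> missing = missingBuckets(bucketExists);
    if (!missing.isEmpty()) {
      throw new IllegalStateException("Missing NATS KV buckets: " + missing);
    }
  }
}

final class BucketPreflightDemo {
  public static void main(String[] args) {
    // Pretend only two buckets were pre-created.
    List<String> present = List.of("rqueue-jobs", "rqueue-locks");
    System.out.println(BucketPreflight.missingBuckets(present::contains));
  }
}
```

Calling `requireAll` from an application startup hook would turn the lazy first-use failure into a single early error, at the cost of one existence check per bucket.
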
### Re-creating a bucket with new settings

If you need to change a bucket's TTL or replication settings after deployment, delete the
bucket via the NATS CLI and either let Rqueue recreate it on the next startup (open accounts)
or recreate it yourself with the new flags (restricted accounts):

```bash
nats kv del rqueue-worker-heartbeats --force
nats kv add rqueue-worker-heartbeats --replicas=3 --storage=file --ttl=20m
```

Be aware that any data in the bucket is lost. This is acceptable for the worker registry and
locks, but **not** for `rqueue-queue-config`; back it up first if you have configured queues
through the dashboard.

---

## Status

Rqueue is stable and production ready, processing millions of messages daily in production

rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueQueueMetrics.java

Lines changed: 0 additions & 131 deletions
This file was deleted.

rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueQueueMetricsProvider.java

Lines changed: 19 additions & 0 deletions
@@ -53,4 +53,23 @@ public interface RqueueQueueMetricsProvider {
   * {@code 0} when no DLQ is configured for the queue or the backend does not surface DLQ depth.
   */
  long getDeadLetterMessageCount(String queueName);

  /**
   * Priority-aware variant of {@link #getPendingMessageCount(String)}. The default implementation
   * ignores priority and returns the parent queue depth, which is the right behaviour for backends
   * that don't model per-priority sub-queues.
   */
  default long getPendingMessageCount(String queueName, String priority) {
    return getPendingMessageCount(queueName);
  }

  /** Priority-aware variant of {@link #getScheduledMessageCount(String)}. */
  default long getScheduledMessageCount(String queueName, String priority) {
    return getScheduledMessageCount(queueName);
  }

  /** Priority-aware variant of {@link #getProcessingMessageCount(String)}. */
  default long getProcessingMessageCount(String queueName, String priority) {
    return getProcessingMessageCount(queueName);
  }
}
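
To illustrate how the default overloads behave for the two kinds of backend named in the commit message, here is a small self-contained sketch. It is not the real code: `PendingCountProvider` is a trimmed-down stand-in for `RqueueQueueMetricsProvider` that models only the pending count, `FlatProvider` and `PriorityProvider` are hypothetical implementations backed by in-memory maps, and the `queue_priority` key format is an assumption made for the example.

```java
import java.util.Map;

/** Trimmed-down stand-in for RqueueQueueMetricsProvider; only the pending count is modelled. */
interface PendingCountProvider {
  long getPendingMessageCount(String queueName);

  /** Default: ignore priority and report the parent queue depth. */
  default long getPendingMessageCount(String queueName, String priority) {
    return getPendingMessageCount(queueName);
  }
}

/** Hypothetical backend without priority sub-queues: inherits the default overload. */
final class FlatProvider implements PendingCountProvider {
  private final Map<String, Long> depthByQueue;

  FlatProvider(Map<String, Long> depthByQueue) {
    this.depthByQueue = depthByQueue;
  }

  @Override
  public long getPendingMessageCount(String queueName) {
    // Unknown queues read as 0 so the value can be used directly as a gauge reading.
    return depthByQueue.getOrDefault(queueName, 0L);
  }
}

/** Hypothetical backend with priority sub-queues: overrides the overload. */
final class PriorityProvider implements PendingCountProvider {
  private final Map<String, Long> depthByKey;

  PriorityProvider(Map<String, Long> depthByKey) {
    this.depthByKey = depthByKey;
  }

  @Override
  public long getPendingMessageCount(String queueName) {
    return depthByKey.getOrDefault(queueName, 0L);
  }

  @Override
  public long getPendingMessageCount(String queueName, String priority) {
    // The "queue_priority" key format is an assumption made for this sketch.
    return depthByKey.getOrDefault(queueName + "_" + priority, 0L);
  }
}

final class PriorityOverloadDemo {
  public static void main(String[] args) {
    PendingCountProvider flat = new FlatProvider(Map.of("orders", 7L));
    PendingCountProvider tiered =
        new PriorityProvider(Map.of("orders", 7L, "orders_high", 2L));
    System.out.println(flat.getPendingMessageCount("orders", "high"));
    System.out.println(tiered.getPendingMessageCount("orders", "high"));
  }
}
```

Callers can always use the two-argument form; whether the priority is honoured depends on whether the backend overrides the default.
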
