|
| 1 | +# NATS backend port — task tracker |
| 2 | + |
| 3 | +Snapshot of `nats-backend` branch progress and what's left to land. Kept here so a fresh session can resume cleanly. |
| 4 | + |
| 5 | +## What's done |
| 6 | + |
| 7 | +### Phase 1 — internal SPI in `rqueue-core` |
| 8 | +- `com.github.sonus21.rqueue.core.spi` package with: |
| 9 | + - `MessageBroker` interface — `enqueue / enqueueWithDelay / pop / ack / nack / moveExpired / peek / size / subscribe / publish / capabilities` and the `default` reactive overloads (`enqueueReactive`, `enqueueWithDelayReactive`). |
| 10 | + - `Capabilities` record (`supportsDelayedEnqueue`, `supportsScheduledIntrospection`, `supportsCronJobs`, `usesPrimaryHandlerDispatch`). |
| 11 | + - `MessageBrokerFactory` + `MessageBrokerLoader` (ServiceLoader). |
| 12 | +- `RedisMessageBroker` thin delegate over the existing `RqueueMessageTemplate`. |
| 13 | +- Public-API additions only — no removals: `setMessageBroker` / `getMessageBroker` on `RqueueMessageTemplateImpl`, `SimpleRqueueListenerContainerFactory`, `RqueueMessageListenerContainer`. `RqueueMessageTemplate` interface frozen. |
| 14 | +- Existing 461+ `:rqueue-core:test` cases pass unchanged; 14 new `RedisMessageBrokerDelegationTest` cases lock the delegation contract. |
| 15 | + |
| 16 | +### Phase 2 — `rqueue-nats` module + `JetStreamMessageBroker` |
| 17 | +- New module `rqueue-nats` (broker-impl only, no Spring/Boot deps; jnats 2.25.2 as `api`). Auto-config and `@Conditional` wiring live in `rqueue-spring-boot-starter` and `rqueue-spring`, gated by `@ConditionalOnClass(io.nats.client.JetStream.class)` + `@ConditionalOnProperty(rqueue.backend=nats)`. |
| 18 | +- `JetStreamMessageBroker`: |
| 19 | + - Builder API (`builder().connection(...).jetStream(...).management(...).config(...).build()`). |
| 20 | + - `enqueue` → `js.publish(subject, headers, payload)` with `Nats-Msg-Id` header for dedup. |
| 21 | + - `enqueueWithDelay` → throws `UnsupportedOperationException` (NATS v1 doesn't support arbitrary delay). |
| 22 | + - `pop` → ensures stream + durable consumer, caches `JetStreamSubscription`, `sub.fetch(batch, wait)`, stashes raw `Message` in `inFlight` keyed by `RqueueMessage.id` for ack/nack lookup. |
| 23 | + - `ack` / `nack(delayMs)` → `Message.ack()` / `Message.nakWithDelay(...)`. |
| 24 | + - `peek` → ephemeral pull consumer with `AckPolicy.None`, fetch + unsubscribe (no perturbation of the durable's ack-pending state). |
| 25 | + - `size` → `jsm.getStreamInfo(stream).getStreamState().getMsgCount()`. |
| 26 | + - `subscribe` / `publish` → core NATS `Dispatcher`, returns `AutoCloseable` that calls `closeDispatcher`. |
| 27 | + - Reactive overrides via `js.publishAsync(...)` wrapped in `Mono.fromFuture`. |
| 28 | + - DLQ bridge: `installDeadLetterBridge(QueueDetail, consumerName)` subscribes to `$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.>` and republishes exhausted messages to the DLQ subject. |
| 29 | +- `RqueueNatsConfig` POJO + nested `StreamDefaults` / `ConsumerDefaults` for stream replication/storage/retention/dedup-window and consumer ack-wait/max-deliver/max-ack-pending. |
| 30 | +- `NatsProvisioner` (in `rqueue-nats/.../internal/`) — idempotent `ensureStream`, `ensureConsumer`, `ensureDlqStream`. Logs WARN if existing config drifts from desired (doesn't mutate). |
| 31 | +- `JetStreamMessageBrokerFactory` + `META-INF/services/com.github.sonus21.rqueue.core.spi.MessageBrokerFactory` for ServiceLoader discovery (`name() == "nats"`). |
| 32 | +- `RqueueNatsException` (RuntimeException) wraps `IOException` / `JetStreamApiException` with stream/subject/consumer context in the message. |
| 33 | +- 9 Docker-gated ITs in `rqueue-nats/src/test/`: `EnqueueAck`, `Retry+DLQ`, `CompetingConsumers`, `IndependentConsumers`, `Dedup`, `Peek`, `PubSub`, `ReactiveEnqueue`, `DelayThrows`. All pass against `nats:2.10-alpine -js` via Testcontainers. |
| 34 | + |
| 35 | +### Phase 3 — Spring/Boot wiring + listener-container branch |
| 36 | +- `RqueueNatsAutoConfig` (Boot) registered via `META-INF/spring/...AutoConfiguration.imports`, gated `@ConditionalOnClass(JetStream.class) + @ConditionalOnProperty(rqueue.backend=nats)`. Provides `Connection`, `JetStream`, `JetStreamManagement`, `MessageBroker`, `RqueueQueueMetricsProvider` beans. |
| 37 | +- `RqueueNatsListenerConfig` (non-Boot) activated by `NatsBackendCondition`. `@EnableRqueue(backend=NATS)` opts in via the new `Backend` enum. |
| 38 | +- `RqueueListenerAutoConfig`'s default Redis broker uses `@ConditionalOnMissingBean(MessageBroker.class)`; the NATS bean wins when present. |
| 39 | +- `@RqueueListener.consumerName()` attribute (additive). `ConsumerNameResolver` resolves: `consumerName` if set, else `"rqueue-" + queue + "-" + bean + "_" + method` with everything outside `[A-Za-z0-9_-]` collapsed to `_` (NATS durable-name constraint). |
| 40 | +- `RqueueMessageHandler` skips primary-validation and logs one boot WARN listing `@RqueueHandler` annotated methods when capability says no primary dispatch. |
| 41 | +- Cross-handler validation: `(queue, consumerName)` collisions fail boot fast. |
| 42 | + |
| 43 | +### Phase 3.5 — runtime path |
| 44 | +- `BrokerMessagePoller` in `rqueue-core/listener/`. One thread per `(queue, consumerName, priority)` triple per `@RqueueListener.concurrency.max`. Loop: `broker.pop` → deserialize via `MessageConverter` → reflection-invoke the bound `HandlerMethod` (calls `createWithResolvedBean()` so bean-name lookup works) → `broker.ack` / `broker.nack(delayMs)` with `TaskExecutionBackOff`. |
| 45 | +- `RqueueMessageListenerContainer` branches on `messageBroker != null && !capabilities.usesPrimaryHandlerDispatch()`: |
| 46 | + - `startBrokerPollers()` enumerates active queues + handler methods, resolves consumer names, spawns pollers. |
| 47 | + - `MessageScheduler` not started; `RqueueMessageHandler` primary loop bypassed. |
| 48 | + - `doStop()` signals every poller; `doDestroy()` calls `broker.close()` if `AutoCloseable`. |
| 49 | +- `BaseMessageSender.enqueue` routes through `MessageBroker.enqueue` when the active broker has `!usesPrimaryHandlerDispatch`. `storeMessageMetadata` short-circuits on the same flag. |
| 50 | +- `RqueueListenerAutoConfig.rqueueMessageTemplate` propagates the autowired `MessageBroker` onto the template bean — without that, `BaseMessageSender#enqueue` would silently fall back to the Redis publish path. |
| 51 | +- `SimpleRqueueListenerContainerFactory.createMessageListenerContainer()` skips the `redisConnectionFactory != null` assertion when a non-Redis broker is wired. |
| 52 | + |
| 53 | +### Phase 4 — dashboard + `QueueDetail` NATS fields |
| 54 | +- `QueueDetail` adds nullable NATS fields with `resolved*` helpers: `natsStream`, `natsSubject`, `natsDlqStream`, `natsDlqSubject`, `natsAckWaitOverride`, `natsMaxDeliverOverride`, `natsDedupWindow`. Defaults derived from `queueName` when null. |
| 55 | +- `RqueueQDetailServiceImpl` routes `size` / `peek` through `MessageBroker` when set; falls back to existing Redis path otherwise. |
| 56 | +- `DataViewResponse` adds `hideScheduledPanel` / `hideCronJobs` flags. Pebble template `base.html` hides the "Scheduled" sidebar entry when the flag is set. |
| 57 | +- Dashboard chain (`RqueueRestController`, `RqueueDashboardChartServiceImpl`, etc.) gated `@Conditional(RedisBackendCondition)`; on NATS the dashboard reports broker-derived sizes only. |
| 58 | + |
| 59 | +### Cross-phase summary |
| 60 | +- Pluggable selection via `rqueue.backend=redis|nats` (default `redis`) and classpath presence. |
| 61 | +- `RqueueConfig` carries the active `Backend` enum; downstream beans branch on that instead of probing the classpath. |
| 62 | +- `Backend.AUTO` removed; `@EnableRqueue.backend()` defaults to `REDIS`. |
| 63 | + |
| 64 | +### Backend wiring split |
| 65 | +- New module `rqueue-redis` with the Redis-shaped impls (DAOs, lock manager, KV-shaped beans). |
| 66 | +- `Backend` enum + `RedisBackendCondition` / `NatsBackendCondition` in `rqueue-core`. |
| 67 | +- `RqueueConfig.backend` field (default `REDIS`) bound from the `rqueue.backend` property; `Backend.AUTO` removed. |
| 68 | +- `RqueueListenerBaseConfig.rqueueConfig(...)` factory tolerates a missing `RedisConnectionFactory`. |
| 69 | +- `RqueueRedisTemplate` and `RqueueMessageTemplateImpl` constructors tolerate null Redis connection factory (NATS path constructs them for type satisfaction but never invokes Redis ops on them). |
| 70 | +- `SimpleRqueueListenerContainerFactory` skips its `redisConnectionFactory != null` assertion when a non-Redis broker is set. |
| 71 | +- `BaseMessageSender` routes producer enqueue through `MessageBroker.enqueue` when broker has `!usesPrimaryHandlerDispatch`; `storeMessageMetadata` short-circuits on the same flag. |
| 72 | +- `RqueueQueueMetricsProvider` is the new backend-agnostic interface for queue-depth gauges; `RedisRqueueQueueMetricsProvider` (rqueue-redis) and `NatsRqueueQueueMetricsProvider` (rqueue-nats) supply impls. `RqueueMetrics` now reads through this provider, decoupled from `RqueueStringDao`. |
| 73 | + |
| 74 | +### NATS-native impls (KV-backed) |
| 75 | +- `NatsRqueueLockManager` — KV bucket `rqueue-locks`, atomic create/release with revisioned delete, 6 ITs. |
| 76 | +- `NatsRqueueSystemConfigDao` — KV bucket `rqueue-queue-config`, in-process cache, 6 ITs. |
| 77 | +- `NatsRqueueJobDao` — KV bucket `rqueue-jobs`, scan-by-message-id, 7 ITs. |
| 78 | +- `NatsRqueueMessageMetadataService` — KV bucket `rqueue-message-metadata`, 8 ITs. |
| 79 | +- `NatsRqueueUtilityService` — admin-only stub returning "not supported" responses. |
| 80 | +- `NatsRqueueStringDao` deleted — no consumer on the NATS path needs it. |
| 81 | + |
| 82 | +### Bean-graph cleanup |
| 83 | +- `RqueueStringDao` consumers (`RqueueLockManagerImpl`, `RqueueJobDaoImpl`, `RqueueMessageMetadataServiceImpl`, `RqueueSystemManagerServiceImpl`, `RqueueUtilityServiceImpl`, `RqueueMetrics`'s old size getter) all gated `@Conditional(RedisBackendCondition.class)` or refactored to use `RqueueQueueMetricsProvider`. |
| 84 | +- `RqueueStringDao` is now strictly internal to the Redis backend; no NATS-path bean autowires it. |
| 85 | +- `BaseMessageSender`, `RqueueMessageManagerImpl`, `RqueueEndpointManagerImpl`, `RqueueBeanProvider` reverted to plain `@Autowired` (no more `required=false` shotgun) — every required interface has either a Redis impl or a `NatsRqueueXxx` impl. |
| 86 | + |
| 87 | +### CI |
| 88 | +- `nats_integration_test` job in `.github/workflows/java-ci.yaml` installs nats-server v2.10.22 binary directly (no Docker), sets `NATS_RUNNING=true` + `NATS_URL`, mirrors the `redis_cluster_test` pattern. |
| 89 | +- `AbstractNatsBootIT` and `AbstractJetStreamIT` honor `NATS_RUNNING` (CI path) and fall back to Testcontainers (local Docker path). |
| 90 | +- Tests are tagged `@Tag("nats")` via `NatsIntegrationTest` / `NatsUnitTest` meta-annotations. |
| 91 | + |
| 92 | +### Module moves to `rqueue-redis` |
| 93 | +- DAO impls: `RqueueStringDaoImpl`, `RqueueJobDaoImpl`, `RqueueMessageMetadataDaoImpl`, `RqueueQStatsDaoImpl`, `RqueueSystemConfigDaoImpl` → `rqueue-redis/src/main/java/com/github/sonus21/rqueue/redis/dao/`. |
| 94 | +- Metrics: `RedisRqueueQueueMetricsProvider` → `rqueue-redis/.../redis/metrics/`. |
| 95 | +- 5 `@Bean` factories from `RqueueListenerBaseConfig` → `RqueueRedisListenerConfig`: `rqueueRedisLongTemplate`, `rqueueRedisListenerContainerFactory`, `stringRqueueRedisTemplate`, `rqueueInternalPubSubChannel`, `rqueueStringDao`. |
| 96 | +- 5 more `@Bean` factories moved most recently: `scheduledMessageScheduler`, `processingMessageScheduler`, `rqueueWorkerRegistry`, `rqueueLockManager`, `rqueueQueueMetrics`. |
| 97 | +- 6 service impls (in flight, see Pending): `RqueueDashboardChartServiceImpl`, `RqueueJobServiceImpl`, `RqueueMessageMetadataServiceImpl`, `RqueueQDetailServiceImpl`, `RqueueSystemManagerServiceImpl`, `RqueueUtilityServiceImpl` — moved to `rqueue-redis/.../redis/web/service/impl/`. |
| 98 | + |
| 99 | +### CI & PR |
| 100 | +- Branch `nats-backend` pushed to `origin`. ~50 commits, all carry `Assisted-By: Claude Code` only (no `Co-Authored-By:`). `CLAUDE.md` documents the rule. |
| 101 | + |
| 102 | +## Pending items |
| 103 | + |
| 104 | +### In flight — finish service impl move (slice B of `@Conditional` cleanup) |
| 105 | + |
| 106 | +The 6 `*ServiceImpl` files have moved to `rqueue-redis`. Their unit tests have been moved alongside. **Compilation in `rqueue-redis:test` is failing** because the moved tests depend on `CoreUnitTest` (annotation) and `QueueStatisticsTest` (fixture data) which still live in `rqueue-core/src/test`. Two paths: |
| 107 | + |
| 108 | +1. Promote `CoreUnitTest` + `QueueStatisticsTest` to `rqueue-test-util/src/main` so they're visible across modules. Smallest change. |
| 109 | +2. Add Gradle `java-test-fixtures` to `rqueue-core` and pull from `rqueue-redis` via `testFixtures(project(":rqueue-core"))`. More canonical Gradle but bigger setup. |
| 110 | + |
| 111 | +Pick **option 1** for simplicity. Move: |
| 112 | +- `rqueue-core/src/test/java/com/github/sonus21/rqueue/CoreUnitTest.java` → `rqueue-test-util/src/main/java/com/github/sonus21/rqueue/CoreUnitTest.java` |
| 113 | +- `rqueue-core/src/test/java/com/github/sonus21/rqueue/models/db/QueueStatisticsTest.java` → split into a fixture helper in `rqueue-test-util` and a thin test that re-exercises it in core (or just keep both copies during transition). |
| 114 | + |
| 115 | +Then re-run `./gradlew :rqueue-core:test :rqueue-redis:test :rqueue-nats:test -DincludeTags=unit` and `./gradlew :rqueue-spring-boot-starter:test --tests NatsBackendEndToEndIT`. |
| 116 | + |
| 117 | +### Other open follow-ups |
| 118 | + |
| 119 | +- **`RqueueStringDao` interface** itself — analysis recommended keeping it Redis-only (don't split into smaller cross-backend interfaces). It already has zero NATS-path consumers; just document it as Redis-internal in javadoc. |
| 120 | +- **`RqueueMessageMetadataDao`, `RqueueQStatsDao`** — no NATS impls needed; all consumers are Redis-only gated. Verified, no action. |
| 121 | +- **Reactive listener container** — only enqueue side is reactive in v1. Phase 5 territory. |
| 122 | +- **Delayed/scheduled/cron messages on NATS** — throws `UnsupportedOperationException`. Out of scope for v1. |
| 123 | +- **Cross-queue `priorityGroup` weighting on NATS** — boot WARN, not honored. Acceptable for v1. |
| 124 | +- **Elastic `@RqueueListener.concurrency` (min < max)** — falls back to fixed `max` on NATS. Acceptable. |
| 125 | +- **`@RqueueHandler(primary)` on NATS** — ignored, single boot WARN. |
| 126 | +- **PR open on `sonus21/rqueue:nats-backend`** — branch pushed; user opened the PR through the GitHub UI. |
| 127 | + |
| 128 | +## Local verification commands |
| 129 | + |
| 130 | +``` |
| 131 | +./gradlew :rqueue-core:test :rqueue-redis:test :rqueue-nats:test -DincludeTags=unit |
| 132 | +./gradlew :rqueue-spring-boot-starter:test --tests "com.github.sonus21.rqueue.spring.boot.integration.NatsBackendEndToEndIT" |
| 133 | +./gradlew :rqueue-nats:test --tests "com.github.sonus21.rqueue.nats.lock.NatsRqueueLockManagerIT" |
| 134 | +./gradlew :rqueue-nats:test --tests "com.github.sonus21.rqueue.nats.dao.NatsRqueueSystemConfigDaoIT" |
| 135 | +./gradlew :rqueue-nats:test --tests "com.github.sonus21.rqueue.nats.dao.NatsRqueueJobDaoIT" |
| 136 | +./gradlew :rqueue-nats:test --tests "com.github.sonus21.rqueue.nats.service.NatsRqueueMessageMetadataServiceIT" |
| 137 | +``` |
| 138 | + |
| 139 | +## Commit-rule reminder |
| 140 | + |
| 141 | +`CLAUDE.md` at the repo root forbids `Co-Authored-By:` for any AI tool. Use `Assisted-By: Claude Code` as a single trailer per commit. The trailer rewrite has already been applied to historical commits; new commits just need the right form. |
0 commit comments