3434import io .nats .client .impl .Headers ;
3535import java .io .IOException ;
3636import java .time .Duration ;
37+ import java .time .Instant ;
38+ import java .time .ZoneOffset ;
39+ import java .time .format .DateTimeFormatter ;
3740import java .util .ArrayList ;
3841import java .util .Collections ;
3942import java .util .List ;
5962public class JetStreamMessageBroker implements MessageBroker , AutoCloseable {
6063
6164 private static final Logger log = Logger .getLogger (JetStreamMessageBroker .class .getName ());
62- private static final Capabilities CAPS = new Capabilities (false , false , false , false , false , false );
65+
66+ /**
67+ * JetStream publish header used by NATS >= 2.12 to hold a message until a UTC delivery time
68+ * (ADR-51). Value must be RFC 3339 UTC, e.g. {@code 2026-05-09T12:30:00Z}.
69+ */
70+ static final String HDR_NEXT_DELIVER_TIME = "Nats-Next-Deliver-Time" ;
71+
72+ /**
73+ * Enrichment header: epoch-ms at which this message was scheduled to be processed.
74+ * Written at scheduling publish time; read back in {@code enrichFromDelivery} so that
75+ * {@link RqueueMessage#getProcessAt()} can be populated without deserializing the payload.
76+ */
77+ static final String HDR_PROCESS_AT = "Rqueue-Process-At" ;
78+
79+ /**
80+ * Enrichment header: period in milliseconds for periodic messages.
81+ * Written at scheduling publish time; read back in {@code enrichFromDelivery} to restore
82+ * {@link RqueueMessage#getPeriod()} for payloads that have it as zero.
83+ */
84+ static final String HDR_PERIOD = "Rqueue-Period" ;
6385
6486 /**
6587 * Lower bound for fetch wait when the caller passes a non-positive duration. JetStream rejects
@@ -70,12 +92,18 @@ public class JetStreamMessageBroker implements MessageBroker, AutoCloseable {
7092 */
7193 private static final Duration MIN_FETCH_WAIT = Duration .ofMillis (50 );
7294
95+ /** RFC 3339 UTC formatter for the {@code Nats-Next-Deliver-Time} header value. */
96+ private static final DateTimeFormatter RFC3339_UTC =
97+ DateTimeFormatter .ofPattern ("yyyy-MM-dd'T'HH:mm:ss'Z'" ).withZone (ZoneOffset .UTC );
98+
7399 private final Connection connection ;
74100 private final JetStream js ;
75101 private final JetStreamManagement jsm ;
76102 private final RqueueNatsConfig config ;
77103 private final RqueueSerDes serdes ;
78104 private final NatsProvisioner provisioner ;
105+ private final boolean schedulingSupported ;
106+ private final Capabilities caps ;
79107
80108 /**
81109 * keyed by {@code "<consumerName>::<RqueueMessage.id>"}, value is the underlying NATS
@@ -113,6 +141,14 @@ public JetStreamMessageBroker(
113141 this .config = config ;
114142 this .serdes = serdes ;
115143 this .provisioner = provisioner ;
144+ this .schedulingSupported = provisioner != null && provisioner .isMessageSchedulingSupported ();
145+ this .caps = new Capabilities (
146+ schedulingSupported , // supportsDelayedEnqueue — requires NATS >= 2.12
147+ false , // supportsScheduledIntrospection — no inspectable scheduled-zset
148+ false , // supportsCronJobs — no server-side cron
149+ false , // usesPrimaryHandlerDispatch — no Redis processing-ZSET
150+ true , // supportsViewData — peek() reads from JetStream stream
151+ true ); // supportsMoveMessage — NatsRqueueUtilityService.moveMessage()
116152 }
117153
118154 public static Builder builder () {
@@ -251,9 +287,40 @@ public void enqueue(QueueDetail q, String priority, RqueueMessage m) {
251287
252288 @ Override
253289 public void enqueueWithDelay (QueueDetail q , RqueueMessage m , long delayMs ) {
254- throw new UnsupportedOperationException (
255- "delayed enqueue not supported by NATS backend in this version; "
256- + "use the Redis backend for scheduled messages" );
290+ if (!schedulingSupported ) {
291+ throw new RqueueNatsException (
292+ "NATS message scheduling (ADR-51) is not available: the connected server is older than "
293+ + NatsProvisioner .SCHEDULING_MIN_VERSION
294+ + ". Upgrade NATS to "
295+ + NatsProvisioner .SCHEDULING_MIN_VERSION
296+ + "+ or use the Redis backend for delayed messages." );
297+ }
298+ String subject = subjectFor (q );
299+ String stream = streamFor (q );
300+ provisioner .ensureStream (stream , List .of (subject ), q .getType (), streamDescription (q ));
301+ Headers headers = buildSchedulingHeaders (m , delayMs );
302+ try {
303+ byte [] payload = serdes .serialize (m );
304+ js .publish (subject , headers , payload );
305+ } catch (IOException | JetStreamApiException e ) {
306+ throw new RqueueNatsException (
307+ "Failed to enqueue scheduled message id="
308+ + m .getId ()
309+ + " queue="
310+ + q .getName ()
311+ + " subject="
312+ + subject ,
313+ e );
314+ } catch (RuntimeException e ) {
315+ throw new RqueueNatsException (
316+ "Failed to serialize/enqueue scheduled message id="
317+ + m .getId ()
318+ + " queue="
319+ + q .getName ()
320+ + " subject="
321+ + subject ,
322+ e );
323+ }
257324 }
258325
259326 @ Override
@@ -303,9 +370,52 @@ public Mono<Void> enqueueReactive(QueueDetail q, RqueueMessage m) {
303370
304371 @ Override
305372 public Mono <Void > enqueueWithDelayReactive (QueueDetail q , RqueueMessage m , long delayMs ) {
306- return Mono .error (new UnsupportedOperationException (
307- "delayed enqueue not supported by NATS backend in this version; "
308- + "use the Redis backend for scheduled messages" ));
373+ if (!schedulingSupported ) {
374+ return Mono .error (new RqueueNatsException (
375+ "NATS message scheduling (ADR-51) is not available: the connected server is older than "
376+ + NatsProvisioner .SCHEDULING_MIN_VERSION
377+ + ". Upgrade NATS to "
378+ + NatsProvisioner .SCHEDULING_MIN_VERSION
379+ + "+ or use the Redis backend for delayed messages." ));
380+ }
381+ String subject = subjectFor (q );
382+ String stream = streamFor (q );
383+ try {
384+ provisioner .ensureStream (stream , List .of (subject ), q .getType (), streamDescription (q ));
385+ } catch (Exception e ) {
386+ return Mono .error (new RqueueNatsException (
387+ "Failed to provision stream for reactive scheduled enqueue id="
388+ + m .getId ()
389+ + " queue="
390+ + q .getName (),
391+ e ));
392+ }
393+ Headers headers = buildSchedulingHeaders (m , delayMs );
394+ byte [] payload ;
395+ try {
396+ payload = serdes .serialize (m );
397+ } catch (RuntimeException | IOException e ) {
398+ return Mono .error (new RqueueNatsException (
399+ "Failed to serialize scheduled message id="
400+ + m .getId ()
401+ + " queue="
402+ + q .getName ()
403+ + " subject="
404+ + subject ,
405+ e ));
406+ }
407+ return Mono .fromFuture (() -> js .publishAsync (subject , headers , payload ))
408+ .onErrorMap (e -> e instanceof RqueueNatsException
409+ ? e
410+ : new RqueueNatsException (
411+ "Failed to enqueue scheduled message id="
412+ + m .getId ()
413+ + " queue="
414+ + q .getName ()
415+ + " subject="
416+ + subject ,
417+ e ))
418+ .then ();
309419 }
310420
311421 @ Override
@@ -415,13 +525,7 @@ private List<RqueueMessage> popInternal(
415525 for (Message nm : msgs ) {
416526 try {
417527 RqueueMessage rm = serdes .deserialize (nm .getData (), RqueueMessage .class );
418- // derive failure count from JetStream redelivery metadata
419- try {
420- long deliveredCount = nm .metaData ().deliveredCount ();
421- rm .setFailureCount ((int ) Math .max (0 , deliveredCount - 1 ));
422- } catch (Exception ignored ) {
423- // defensive: metadata unavailable on non-JetStream messages
424- }
528+ enrichFromDelivery (rm , nm );
425529 if (rm .getId () != null ) {
426530 inFlight .put (inFlightKey (consumerName , rm .getId ()), nm );
427531 }
@@ -456,6 +560,37 @@ public boolean ack(QueueDetail q, RqueueMessage m) {
456560 return true ;
457561 }
458562
563+ /**
564+ * Extend the visibility timeout for a message that is still being processed. Sends a NATS
565+ * WIP (work-in-progress) signal to the server, which resets the consumer's {@code ackWait}
566+ * timer back to its configured value. Call this periodically from a long-running handler to
567+ * prevent JetStream from redelivering the message to another consumer while work is in flight.
568+ *
569+ * <p>The {@code deltaMs} hint from the caller is ignored — NATS always resets to the consumer's
570+ * fixed {@code ackWait}; there is no per-message extension API in JetStream.
571+ *
572+ * @return {@code true} if the WIP signal was sent; {@code false} if the message is no longer
573+ * tracked (already acked, nacked, or the process restarted).
574+ */
575+ @ Override
576+ public boolean extendVisibilityTimeout (QueueDetail q , RqueueMessage m , long deltaMs ) {
577+ if (m .getId () == null ) {
578+ return false ;
579+ }
580+ Message nm = inFlight .get (inFlightKey (q .resolvedConsumerName (), m .getId ()));
581+ if (nm == null ) {
582+ return false ;
583+ }
584+ try {
585+ nm .inProgress ();
586+ return true ;
587+ } catch (RuntimeException e ) {
588+ log .log (Level .WARNING ,
589+ "inProgress failed for message id=" + m .getId () + " queue=" + q .getName (), e );
590+ return false ;
591+ }
592+ }
593+
459594 @ Override
460595 public boolean nack (QueueDetail q , RqueueMessage m , long retryDelayMs ) {
461596 if (m .getId () == null ) {
@@ -838,7 +973,107 @@ public void validateQueueName(String queueName) {
838973
839974 @ Override
840975 public Capabilities capabilities () {
841- return CAPS ;
976+ return caps ;
977+ }
978+
979+ /**
980+ * Headers for a scheduled (or periodic) JetStream publish.
981+ *
982+ * <p><b>Dedup key strategy — {@code Nats-Msg-Id}:</b>
983+ * <ul>
984+ * <li>Scheduled messages ({@code processAt > 0}) — key is {@code id@processAt}.
985+ * Each period of a recurring message has a unique {@code processAt} (advances by
986+ * {@code period} each time), so consecutive periods never share a key and are never
987+ * suppressed. Retries of the same period reuse the identical {@code processAt}, so
988+ * the duplicate {@code scheduleNext} publish is correctly deduplicated by JetStream —
989+ * preventing double-execution of a period when the handler fails and is redelivered.
990+ * <li>Non-scheduled messages ({@code processAt == 0}) — plain {@code m.getId()}; each
991+ * enqueue generates a fresh UUID so there is no collision risk.
992+ * </ul>
993+ *
994+ * <p><b>Enrichment headers</b> — written at publish time so they can be read back on
995+ * {@link #popInternal} without deserializing the payload:
996+ * <ul>
997+ * <li>{@code Rqueue-Process-At} — epoch-ms at which the message should be processed;
998+ * used to set {@link RqueueMessage#setProcessAt} if the deserialized payload lacks it.
999+ * <li>{@code Rqueue-Period} — period in ms for periodic messages; used to set
1000+ * {@link RqueueMessage#setPeriod} if the deserialized payload lacks it.
1001+ * </ul>
1002+ */
1003+ private Headers buildSchedulingHeaders (RqueueMessage m , long delayMs ) {
1004+ Headers headers = new Headers ();
1005+ if (m .getId () != null ) {
1006+ // Dedup key: id-at-processAt for scheduled messages (processAt > 0).
1007+ // - Each period of a recurring message has a unique processAt → unique key → no
1008+ // cross-period suppression.
1009+ // - If scheduleNext is called again for the same period on redelivery after a handler
1010+ // failure, processAt is identical → same key → JetStream deduplicates the second
1011+ // publish and the period executes exactly once.
1012+ // For non-scheduled messages (processAt == 0) the plain id is used; each enqueue
1013+ // generates a fresh UUID so there is no collision risk.
1014+ String dedupKey = m .getProcessAt () > 0
1015+ ? m .getId () + "-at-" + m .getProcessAt ()
1016+ : m .getId ();
1017+ headers .add ("Nats-Msg-Id" , dedupKey );
1018+ }
1019+ long deliverAtMs = m .getProcessAt () > 0
1020+ ? m .getProcessAt ()
1021+ : System .currentTimeMillis () + delayMs ;
1022+ String deliverAt = RFC3339_UTC .format (Instant .ofEpochMilli (deliverAtMs ));
1023+ headers .add (HDR_NEXT_DELIVER_TIME , deliverAt );
1024+ // Enrichment headers — readable on pop without deserializing the payload
1025+ headers .add (HDR_PROCESS_AT , String .valueOf (deliverAtMs ));
1026+ if (m .isPeriodic ()) {
1027+ headers .add (HDR_PERIOD , String .valueOf (m .getPeriod ()));
1028+ }
1029+ return headers ;
1030+ }
1031+
1032+ /**
1033+ * Enrich a deserialized {@link RqueueMessage} with delivery metadata from the NATS message.
1034+ *
1035+ * <p><b>Failure count</b> — derived from {@code metaData().deliveredCount() - 1} (JetStream
1036+ * tracks redeliveries in the reply-to subject). This is the authoritative source; we never
1037+ * re-publish on retry, so a static header set at publish time would always show 0.
1038+ *
1039+ * <p><b>Scheduling fields</b> — {@code processAt} and {@code period} are read from the
1040+ * enrichment headers ({@code Rqueue-Process-At}, {@code Rqueue-Period}) when the deserialized
1041+ * message has them as zero. This handles payloads published by older broker versions that
1042+ * predate per-field population, or any case where the JSON payload was trimmed.
1043+ */
1044+ private static void enrichFromDelivery (RqueueMessage rm , Message nm ) {
1045+ // Failure count from JetStream redelivery metadata (the authoritative retry counter).
1046+ try {
1047+ long deliveredCount = nm .metaData ().deliveredCount ();
1048+ rm .setFailureCount ((int ) Math .max (0 , deliveredCount - 1 ));
1049+ } catch (Exception ignored ) {
1050+ // defensive: metadata absent on non-JetStream or synthetic messages
1051+ }
1052+ // Scheduling fields from publish-time enrichment headers.
1053+ if (rm .getProcessAt () <= 0 ) {
1054+ String processAtHdr = nm .getHeaders () == null ? null
1055+ : nm .getHeaders ().getFirst (HDR_PROCESS_AT );
1056+ if (processAtHdr != null ) {
1057+ try {
1058+ rm .setProcessAt (Long .parseLong (processAtHdr ));
1059+ } catch (NumberFormatException ignored ) {
1060+ // malformed header; leave processAt as-is
1061+ }
1062+ }
1063+ }
1064+ if (!rm .isPeriodic () && nm .getHeaders () != null ) {
1065+ String periodHdr = nm .getHeaders ().getFirst (HDR_PERIOD );
1066+ if (periodHdr != null ) {
1067+ try {
1068+ long period = Long .parseLong (periodHdr );
1069+ if (period > 0 ) {
1070+ rm .setPeriod (period );
1071+ }
1072+ } catch (NumberFormatException ignored ) {
1073+ // malformed header; leave period as-is
1074+ }
1075+ }
1076+ }
8421077 }
8431078
8441079 @ Override
0 commit comments