Skip to content

Commit b009df3

Browse files
authored
Redis 8.8: Add XNACK support (#3728)
* Add XNACK support * Remove XNackParams and related methods because params are internal * Address review suggestions * Fix template
1 parent 6cd1a96 commit b009df3

15 files changed

Lines changed: 484 additions & 1 deletion

src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3114,6 +3114,16 @@ public RedisFuture<List<StreamEntryDeletionResult>> xackdel(K key, K group, Stre
31143114
return dispatch(commandBuilder.xackdel(key, group, policy, messageIds));
31153115
}
31163116

3117+
@Override
3118+
public RedisFuture<Long> xnack(K key, K group, XNackMode mode, String messageId) {
3119+
return dispatch(commandBuilder.xnack(key, group, mode, messageId));
3120+
}
3121+
3122+
@Override
3123+
public RedisFuture<Long> xnack(K key, K group, XNackMode mode, String... messageIds) {
3124+
return dispatch(commandBuilder.xnack(key, group, mode, messageIds));
3125+
}
3126+
31173127
@Override
31183128
public RedisFuture<String> xadd(K key, Map<K, V> body) {
31193129
return dispatch(commandBuilder.xadd(key, null, body));

src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3176,6 +3176,16 @@ public Flux<StreamEntryDeletionResult> xackdel(K key, K group, StreamDeletionPol
31763176
return createDissolvingFlux(() -> commandBuilder.xackdel(key, group, policy, messageIds));
31773177
}
31783178

3179+
@Override
3180+
public Mono<Long> xnack(K key, K group, XNackMode mode, String messageId) {
3181+
return createMono(() -> commandBuilder.xnack(key, group, mode, messageId));
3182+
}
3183+
3184+
@Override
3185+
public Mono<Long> xnack(K key, K group, XNackMode mode, String... messageIds) {
3186+
return createMono(() -> commandBuilder.xnack(key, group, mode, messageIds));
3187+
}
3188+
31793189
@Override
31803190
public Mono<String> xadd(K key, Map<K, V> body) {
31813191
return createMono(() -> commandBuilder.xadd(key, null, body));

src/main/java/io/lettuce/core/RedisCommandBuilder.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3269,6 +3269,36 @@ public Command<K, V, List<StreamEntryDeletionResult>> xackdel(K key, K group, St
32693269
return createCommand(XACKDEL, new StreamEntryDeletionResultListOutput<>(codec), args);
32703270
}
32713271

3272+
public Command<K, V, Long> xnack(K key, K group, XNackMode mode, String messageId) {
3273+
notNullKey(key);
3274+
LettuceAssert.notNull(group, "Group " + MUST_NOT_BE_NULL);
3275+
LettuceAssert.notNull(mode, "XNackMode " + MUST_NOT_BE_NULL);
3276+
LettuceAssert.notNull(messageId, "MessageId " + MUST_NOT_BE_NULL);
3277+
3278+
CommandArgs<K, V> args = new CommandArgs<>(codec).addKey(key).addKey(group).add(mode).add(CommandKeyword.IDS).add(1)
3279+
.add(messageId);
3280+
3281+
return createCommand(XNACK, new IntegerOutput<>(codec), args);
3282+
}
3283+
3284+
public Command<K, V, Long> xnack(K key, K group, XNackMode mode, String[] messageIds) {
3285+
notNullKey(key);
3286+
LettuceAssert.notNull(group, "Group " + MUST_NOT_BE_NULL);
3287+
LettuceAssert.notNull(mode, "XNackMode " + MUST_NOT_BE_NULL);
3288+
LettuceAssert.notEmpty(messageIds, "MessageIds " + MUST_NOT_BE_EMPTY);
3289+
LettuceAssert.noNullElements(messageIds, "MessageIds " + MUST_NOT_CONTAIN_NULL_ELEMENTS);
3290+
3291+
CommandArgs<K, V> args = new CommandArgs<>(codec).addKey(key).addKey(group).add(mode);
3292+
3293+
args.add(CommandKeyword.IDS).add(messageIds.length);
3294+
3295+
for (String messageId : messageIds) {
3296+
args.add(messageId);
3297+
}
3298+
3299+
return createCommand(XNACK, new IntegerOutput<>(codec), args);
3300+
}
3301+
32723302
public Command<K, V, ClaimedMessages<K, V>> xautoclaim(K key, XAutoClaimArgs<K> xAutoClaimArgs) {
32733303
notNullKey(key);
32743304
LettuceAssert.notNull(xAutoClaimArgs, "XAutoClaimArgs " + MUST_NOT_BE_NULL);
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2026, Redis Ltd. and Contributors
3+
* All rights reserved.
4+
*
5+
* Licensed under the MIT License.
6+
*/
7+
package io.lettuce.core;
8+
9+
import io.lettuce.core.protocol.ProtocolKeyword;
10+
11+
import java.nio.charset.StandardCharsets;
12+
13+
/**
14+
* Nacking mode for the {@literal XNACK} command. Determines how the delivery counter of the negatively-acknowledged messages is
15+
* adjusted in the consumer group's Pending Entries List.
16+
*
17+
* @since 7.6
18+
*/
19+
public enum XNackMode implements ProtocolKeyword {
20+
21+
/**
22+
* Used when the consumer is NACKing due to internal errors or shutdown, not because the message is problematic. The
23+
* delivery counter of each specified message is decremented by 1.
24+
*/
25+
SILENT,
26+
27+
/**
28+
* Used when the message causes problems for this consumer but may succeed for other consumers (e.g., requires more
29+
* resources than available). The delivery counter stays the same.
30+
*/
31+
FAIL,
32+
33+
/**
34+
* Used for invalid or suspected malicious messages. The delivery counter is set to {@literal LLONG_MAX}.
35+
*/
36+
FATAL;
37+
38+
public final byte[] bytes;
39+
40+
XNackMode() {
41+
bytes = name().getBytes(StandardCharsets.US_ASCII);
42+
}
43+
44+
@Override
45+
public byte[] getBytes() {
46+
return bytes;
47+
}
48+
49+
}

src/main/java/io/lettuce/core/api/async/RedisStreamAsyncCommands.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,37 @@ public interface RedisStreamAsyncCommands<K, V> {
7676
*/
7777
RedisFuture<List<StreamEntryDeletionResult>> xackdel(K key, K group, StreamDeletionPolicy policy, String... messageIds);
7878

79+
/**
80+
* Negatively acknowledge a single pending message in a consumer group, making it immediately available for reconsumption by
81+
* other consumers (using {@code XREADGROUP CLAIM}). Depending on the {@link XNackMode}, the delivery counter is adjusted to
82+
* reflect the reason for the NACK.
83+
* <p>
84+
* NACKing a single message is expected to be the dominant case; this overload avoids the array allocation incurred by the
85+
* varargs variant.
86+
*
87+
* @param key the stream key.
88+
* @param group name of the consumer group.
89+
* @param mode the nacking mode.
90+
* @param messageId message Id to negatively acknowledge.
91+
* @return the number of messages successfully NACKed.
92+
* @since 7.6
93+
*/
94+
RedisFuture<Long> xnack(K key, K group, XNackMode mode, String messageId);
95+
96+
/**
97+
* Negatively acknowledge one or more pending messages in a consumer group, making them immediately available for
98+
* reconsumption by other consumers (using {@code XREADGROUP CLAIM}). Depending on the {@link XNackMode}, the delivery
99+
* counter is adjusted to reflect the reason for the NACK.
100+
*
101+
* @param key the stream key.
102+
* @param group name of the consumer group.
103+
* @param mode the nacking mode.
104+
* @param messageIds message Id's to negatively acknowledge.
105+
* @return the number of messages successfully NACKed.
106+
* @since 7.6
107+
*/
108+
RedisFuture<Long> xnack(K key, K group, XNackMode mode, String... messageIds);
109+
79110
/**
80111
* Append a message to the stream {@code key}.
81112
*

src/main/java/io/lettuce/core/api/reactive/RedisStreamReactiveCommands.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,37 @@ public interface RedisStreamReactiveCommands<K, V> {
7777
*/
7878
Flux<StreamEntryDeletionResult> xackdel(K key, K group, StreamDeletionPolicy policy, String... messageIds);
7979

80+
/**
81+
* Negatively acknowledge a single pending message in a consumer group, making it immediately available for reconsumption by
82+
* other consumers (using {@code XREADGROUP CLAIM}). Depending on the {@link XNackMode}, the delivery counter is adjusted to
83+
* reflect the reason for the NACK.
84+
* <p>
85+
* NACKing a single message is expected to be the dominant case; this overload avoids the array allocation incurred by the
86+
* varargs variant.
87+
*
88+
* @param key the stream key.
89+
* @param group name of the consumer group.
90+
* @param mode the nacking mode.
91+
* @param messageId message Id to negatively acknowledge.
92+
* @return the number of messages successfully NACKed.
93+
* @since 7.6
94+
*/
95+
Mono<Long> xnack(K key, K group, XNackMode mode, String messageId);
96+
97+
/**
98+
* Negatively acknowledge one or more pending messages in a consumer group, making them immediately available for
99+
* reconsumption by other consumers (using {@code XREADGROUP CLAIM}). Depending on the {@link XNackMode}, the delivery
100+
* counter is adjusted to reflect the reason for the NACK.
101+
*
102+
* @param key the stream key.
103+
* @param group name of the consumer group.
104+
* @param mode the nacking mode.
105+
* @param messageIds message Id's to negatively acknowledge.
106+
* @return the number of messages successfully NACKed.
107+
* @since 7.6
108+
*/
109+
Mono<Long> xnack(K key, K group, XNackMode mode, String... messageIds);
110+
80111
/**
81112
* Append a message to the stream {@code key}.
82113
*

src/main/java/io/lettuce/core/api/sync/RedisStreamCommands.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,37 @@ public interface RedisStreamCommands<K, V> {
7676
*/
7777
List<StreamEntryDeletionResult> xackdel(K key, K group, StreamDeletionPolicy policy, String... messageIds);
7878

79+
/**
80+
* Negatively acknowledge a single pending message in a consumer group, making it immediately available for reconsumption by
81+
* other consumers (using {@code XREADGROUP CLAIM}). Depending on the {@link XNackMode}, the delivery counter is adjusted to
82+
* reflect the reason for the NACK.
83+
* <p>
84+
* NACKing a single message is expected to be the dominant case; this overload avoids the array allocation incurred by the
85+
* varargs variant.
86+
*
87+
* @param key the stream key.
88+
* @param group name of the consumer group.
89+
* @param mode the nacking mode.
90+
* @param messageId message Id to negatively acknowledge.
91+
* @return the number of messages successfully NACKed.
92+
* @since 7.6
93+
*/
94+
Long xnack(K key, K group, XNackMode mode, String messageId);
95+
96+
/**
97+
* Negatively acknowledge one or more pending messages in a consumer group, making them immediately available for
98+
* reconsumption by other consumers (using {@code XREADGROUP CLAIM}). Depending on the {@link XNackMode}, the delivery
99+
* counter is adjusted to reflect the reason for the NACK.
100+
*
101+
* @param key the stream key.
102+
* @param group name of the consumer group.
103+
* @param mode the nacking mode.
104+
* @param messageIds message Id's to negatively acknowledge.
105+
* @return the number of messages successfully NACKed.
106+
* @since 7.6
107+
*/
108+
Long xnack(K key, K group, XNackMode mode, String... messageIds);
109+
79110
/**
80111
* Append a message to the stream {@code key}.
81112
*

src/main/java/io/lettuce/core/cluster/api/async/NodeSelectionStreamAsyncCommands.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,37 @@ public interface NodeSelectionStreamAsyncCommands<K, V> {
7676
*/
7777
AsyncExecutions<List<StreamEntryDeletionResult>> xackdel(K key, K group, StreamDeletionPolicy policy, String... messageIds);
7878

79+
/**
80+
* Negatively acknowledge a single pending message in a consumer group, making it immediately available for reconsumption by
81+
* other consumers (using {@code XREADGROUP CLAIM}). Depending on the {@link XNackMode}, the delivery counter is adjusted to
82+
* reflect the reason for the NACK.
83+
* <p>
84+
* NACKing a single message is expected to be the dominant case; this overload avoids the array allocation incurred by the
85+
* varargs variant.
86+
*
87+
* @param key the stream key.
88+
* @param group name of the consumer group.
89+
* @param mode the nacking mode.
90+
* @param messageId message Id to negatively acknowledge.
91+
* @return the number of messages successfully NACKed.
92+
* @since 7.6
93+
*/
94+
AsyncExecutions<Long> xnack(K key, K group, XNackMode mode, String messageId);
95+
96+
/**
97+
* Negatively acknowledge one or more pending messages in a consumer group, making them immediately available for
98+
* reconsumption by other consumers (using {@code XREADGROUP CLAIM}). Depending on the {@link XNackMode}, the delivery
99+
* counter is adjusted to reflect the reason for the NACK.
100+
*
101+
* @param key the stream key.
102+
* @param group name of the consumer group.
103+
* @param mode the nacking mode.
104+
* @param messageIds message Id's to negatively acknowledge.
105+
* @return the number of messages successfully NACKed.
106+
* @since 7.6
107+
*/
108+
AsyncExecutions<Long> xnack(K key, K group, XNackMode mode, String... messageIds);
109+
79110
/**
80111
* Append a message to the stream {@code key}.
81112
*

src/main/java/io/lettuce/core/cluster/api/sync/NodeSelectionStreamCommands.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,37 @@ public interface NodeSelectionStreamCommands<K, V> {
7676
*/
7777
Executions<List<StreamEntryDeletionResult>> xackdel(K key, K group, StreamDeletionPolicy policy, String... messageIds);
7878

79+
/**
80+
* Negatively acknowledge a single pending message in a consumer group, making it immediately available for reconsumption by
81+
* other consumers (using {@code XREADGROUP CLAIM}). Depending on the {@link XNackMode}, the delivery counter is adjusted to
82+
* reflect the reason for the NACK.
83+
* <p>
84+
* NACKing a single message is expected to be the dominant case; this overload avoids the array allocation incurred by the
85+
* varargs variant.
86+
*
87+
* @param key the stream key.
88+
* @param group name of the consumer group.
89+
* @param mode the nacking mode.
90+
* @param messageId message Id to negatively acknowledge.
91+
* @return the number of messages successfully NACKed.
92+
* @since 7.6
93+
*/
94+
Executions<Long> xnack(K key, K group, XNackMode mode, String messageId);
95+
96+
/**
97+
* Negatively acknowledge one or more pending messages in a consumer group, making them immediately available for
98+
* reconsumption by other consumers (using {@code XREADGROUP CLAIM}). Depending on the {@link XNackMode}, the delivery
99+
* counter is adjusted to reflect the reason for the NACK.
100+
*
101+
* @param key the stream key.
102+
* @param group name of the consumer group.
103+
* @param mode the nacking mode.
104+
* @param messageIds message Id's to negatively acknowledge.
105+
* @return the number of messages successfully NACKed.
106+
* @since 7.6
107+
*/
108+
Executions<Long> xnack(K key, K group, XNackMode mode, String... messageIds);
109+
79110
/**
80111
* Append a message to the stream {@code key}.
81112
*

src/main/java/io/lettuce/core/protocol/CommandType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public enum CommandType implements ProtocolKeyword {
101101

102102
// Stream
103103

104-
XACK, XACKDEL, XADD, XAUTOCLAIM, XCFGSET, XCLAIM, XDEL, XDELEX, XGROUP, XINFO, XLEN, XPENDING, XRANGE, XREVRANGE, XREAD, XREADGROUP, XTRIM,
104+
XACK, XACKDEL, XADD, XAUTOCLAIM, XCFGSET, XCLAIM, XDEL, XDELEX, XGROUP, XINFO, XLEN, XNACK, XPENDING, XRANGE, XREVRANGE, XREAD, XREADGROUP, XTRIM,
105105

106106
// JSON
107107

0 commit comments

Comments
 (0)