Skip to content

Commit 5026db6

Browse files
authored
feat(client): add XNACK command with options and tests (#3238)
1 parent 5f4331c commit 5026db6

3 files changed

Lines changed: 155 additions & 0 deletions

File tree

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
import { strict as assert } from 'node:assert';
2+
import XNACK from './XNACK';
3+
import { parseArgs } from './generic-transformers';
4+
import testUtils, { GLOBAL } from '../test-utils';
5+
6+
describe('XNACK', () => {
7+
describe('transformArguments', () => {
8+
it('string - SILENT', () => {
9+
assert.deepEqual(
10+
parseArgs(XNACK, 'key', 'group', 'SILENT', '0-0'),
11+
['XNACK', 'key', 'group', 'SILENT', 'IDS', '1', '0-0']
12+
);
13+
});
14+
15+
it('array - FAIL', () => {
16+
assert.deepEqual(
17+
parseArgs(XNACK, 'key', 'group', 'FAIL', ['0-0', '1-0']),
18+
['XNACK', 'key', 'group', 'FAIL', 'IDS', '2', '0-0', '1-0']
19+
);
20+
});
21+
22+
it('array - FATAL', () => {
23+
assert.deepEqual(
24+
parseArgs(XNACK, 'key', 'group', 'FATAL', ['0-0', '1-0', '2-0']),
25+
['XNACK', 'key', 'group', 'FATAL', 'IDS', '3', '0-0', '1-0', '2-0']
26+
);
27+
});
28+
29+
it('with RETRYCOUNT', () => {
30+
assert.deepEqual(
31+
parseArgs(XNACK, 'key', 'group', 'FAIL', '0-0', {
32+
RETRYCOUNT: 7
33+
}),
34+
['XNACK', 'key', 'group', 'FAIL', 'IDS', '1', '0-0', 'RETRYCOUNT', '7']
35+
);
36+
});
37+
38+
it('with FORCE', () => {
39+
assert.deepEqual(
40+
parseArgs(XNACK, 'key', 'group', 'FAIL', ['0-0', '1-0'], {
41+
FORCE: true
42+
}),
43+
['XNACK', 'key', 'group', 'FAIL', 'IDS', '2', '0-0', '1-0', 'FORCE']
44+
);
45+
});
46+
47+
it('with RETRYCOUNT and FORCE', () => {
48+
assert.deepEqual(
49+
parseArgs(XNACK, 'key', 'group', 'FAIL', ['0-0', '1-0'], {
50+
RETRYCOUNT: 3,
51+
FORCE: true
52+
}),
53+
['XNACK', 'key', 'group', 'FAIL', 'IDS', '2', '0-0', '1-0', 'RETRYCOUNT', '3', 'FORCE']
54+
);
55+
});
56+
});
57+
58+
testUtils.testWithClient('xNack', async client => {
59+
60+
const key = `xnack:tmp:${Date.now()}`;
61+
const group = 'group';
62+
const consumer = 'consumer-1';
63+
64+
await client.del(key);
65+
await client.xGroupCreate(key, group, '0', { MKSTREAM: true });
66+
67+
const id1 = await client.xAdd(key, '*', { field: '1' });
68+
const id2 = await client.xAdd(key, '*', { field: '2' });
69+
const id3 = await client.xAdd(key, '*', { field: '3' });
70+
const id4 = await client.xAdd(key, '*', { field: '4' });
71+
const id5 = await client.xAdd(key, '*', { field: '5' });
72+
const id6 = await client.xAdd(key, '*', { field: '6' });
73+
74+
await client.xReadGroup(group, consumer, {
75+
key,
76+
id: '>'
77+
});
78+
79+
const reply = await client.xNack(key, group, 'FAIL', [id1, id2]);
80+
const replyWithRetryCount = await client.xNack(key, group, 'FAIL', id3, {
81+
RETRYCOUNT: 7
82+
});
83+
const replyWithForce = await client.xNack(key, group, 'FAIL', id4, {
84+
FORCE: true
85+
});
86+
const replyWithRetryCountAndForce = await client.xNack(key, group, 'FAIL', [id5, id6], {
87+
RETRYCOUNT: 3,
88+
FORCE: true
89+
});
90+
91+
assert.equal(reply, 2);
92+
assert.equal(replyWithRetryCount, 1);
93+
assert.equal(replyWithForce, 1);
94+
assert.equal(replyWithRetryCountAndForce, 2);
95+
96+
assert.equal(typeof reply, 'number');
97+
assert.equal(typeof replyWithRetryCount, 'number');
98+
assert.equal(typeof replyWithForce, 'number');
99+
assert.equal(typeof replyWithRetryCountAndForce, 'number');
100+
}, {
101+
...GLOBAL.SERVERS.OPEN,
102+
minimumDockerVersion: [8, 8]
103+
});
104+
105+
});
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import { CommandParser } from '../client/parser';
2+
import { Command, NumberReply, RedisArgument } from '../RESP/types';
3+
import { RedisVariadicArgument } from './generic-transformers';
4+
5+
export type XNackMode = 'SILENT' | 'FAIL' | 'FATAL';
6+
export interface XNackOptions {
7+
RETRYCOUNT?: number;
8+
FORCE?: boolean;
9+
}
10+
11+
export default {
12+
IS_READ_ONLY: false,
13+
/**
14+
* Constructs the XNACK command to negatively acknowledge one or more pending stream entries.
15+
* Added since Redis 8.8.
16+
*
17+
* @param parser - The command parser
18+
* @param key - The stream key
19+
* @param group - The consumer group name
20+
* @param mode - NACK mode: SILENT, FAIL, or FATAL
21+
* @param id - One or more message IDs to nack
22+
* @param options - Additional options for retry count and force handling
23+
* @see https://redis.io/commands/xnack/
24+
*/
25+
parseCommand(
26+
parser: CommandParser,
27+
key: RedisArgument,
28+
group: RedisArgument,
29+
mode: XNackMode,
30+
id: RedisVariadicArgument,
31+
options?: XNackOptions
32+
) {
33+
parser.push('XNACK');
34+
parser.pushKey(key);
35+
parser.push(group, mode, 'IDS');
36+
parser.pushVariadicWithLength(id);
37+
38+
if (options?.RETRYCOUNT !== undefined) {
39+
parser.push('RETRYCOUNT', options.RETRYCOUNT.toString());
40+
}
41+
42+
if (options?.FORCE) {
43+
parser.push('FORCE');
44+
}
45+
},
46+
transformReply: undefined as unknown as () => NumberReply
47+
} as const satisfies Command;

packages/client/lib/commands/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,7 @@ import XINFO_CONSUMERS from './XINFO_CONSUMERS';
308308
import XINFO_GROUPS from './XINFO_GROUPS';
309309
import XINFO_STREAM from './XINFO_STREAM';
310310
import XLEN from './XLEN';
311+
import XNACK from './XNACK';
311312
import XPENDING_RANGE from './XPENDING_RANGE';
312313
import XPENDING from './XPENDING';
313314
import XRANGE from './XRANGE';
@@ -1006,6 +1007,8 @@ export default {
10061007
xInfoStream: XINFO_STREAM,
10071008
XLEN,
10081009
xLen: XLEN,
1010+
XNACK,
1011+
xNack: XNACK,
10091012
XPENDING_RANGE,
10101013
xPendingRange: XPENDING_RANGE,
10111014
XPENDING,

0 commit comments

Comments
 (0)