Skip to content

Commit 2d3f747

Browse files
committed
add stream delete mode to minid api
add xackdel
1 parent 606e30f commit 2d3f747

14 files changed

Lines changed: 355 additions & 43 deletions

File tree

StackExchange.Redis.sln

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
1919
docs\ReleaseNotes.md = docs\ReleaseNotes.md
2020
Shared.ruleset = Shared.ruleset
2121
version.json = version.json
22+
tests\RedisConfigs\docker-compose.yml = tests\RedisConfigs\docker-compose.yml
2223
EndProjectSection
2324
EndProject
2425
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "RedisConfigs", "RedisConfigs", "{96E891CD-2ED7-4293-A7AB-4C6F5D8D2B05}"

docs/ReleaseNotes.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ Current package versions:
1111
- Add support for new `BITOP` operations in CE 8.2 ([#2900 by atakavci](https://github.com/StackExchange/StackExchange.Redis/pull/2900))
1212
- Package updates ([#2906 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2906))
1313
- Fix handshake error with `CLIENT ID` ([#2909 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2909))
14+
- Add 8.2 stream commands ([#2842 by kijanawoodard](https://github.com/StackExchange/StackExchange.Redis/pull/2842), by [#xxxx by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/xxxx))
1415

1516
## 2.8.41
1617

src/StackExchange.Redis/Enums/RedisCommand.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,10 +206,12 @@ internal enum RedisCommand
206206
WATCH,
207207

208208
XACK,
209+
XACKDEL,
209210
XADD,
210211
XAUTOCLAIM,
211212
XCLAIM,
212213
XDEL,
214+
XDELEX,
213215
XGROUP,
214216
XINFO,
215217
XLEN,
@@ -496,9 +498,11 @@ internal static bool IsPrimaryOnly(this RedisCommand command)
496498
case RedisCommand.GEOADD:
497499
case RedisCommand.SORT:
498500
case RedisCommand.XACK:
501+
case RedisCommand.XACKDEL:
499502
case RedisCommand.XADD:
500503
case RedisCommand.XCLAIM:
501504
case RedisCommand.XDEL:
505+
case RedisCommand.XDELEX:
502506
case RedisCommand.XGROUP:
503507
case RedisCommand.XREADGROUP:
504508
case RedisCommand.XTRIM:
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
namespace StackExchange.Redis;
2+
3+
/// <summary>
4+
/// Determines how stream trimming works.
5+
/// </summary>
6+
public enum StreamDeleteMode
7+
{
8+
/// <summary>
9+
/// Trims the stream according to the specified policy (MAXLEN or MINID) regardless of whether entries are referenced by any consumer groups, but preserves existing references to these entries in all consumer groups' PEL.
10+
/// </summary>
11+
KeepReferences = 0,
12+
13+
/// <summary>
14+
/// Trims the stream according to the specified policy and also removes all references to the trimmed entries from all consumer groups' PEL.
15+
/// </summary>
16+
/// <remarks>Requires server 8.2 or above.</remarks>
17+
DeleteReferences = 1,
18+
19+
/// <summary>
20+
/// With ACKED: Only trims entries that were read and acknowledged by all consumer groups.
21+
/// </summary>
22+
/// <remarks>Requires server 8.2 or above.</remarks>
23+
Acknowledged = 2,
24+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
namespace StackExchange.Redis;
2+
3+
/// <summary>
4+
/// Determines how stream trimming works.
5+
/// </summary>
6+
public enum StreamDeleteResult
7+
{
8+
/// <summary>
9+
/// No such id exists in the provided stream key.
10+
/// </summary>
11+
NotFound = -1,
12+
13+
/// <summary>
14+
/// Entry was deleted from the stream.
15+
/// </summary>
16+
Deleted = 1,
17+
18+
/// <summary>
19+
/// Entry was not deleted, but there are still dangling references.
20+
/// </summary>
21+
/// <remarks>This response relates to the <see cref="StreamDeleteMode.Acknowledged"/> mode.</remarks>
22+
NotDeleted = 2,
23+
}

src/StackExchange.Redis/Interfaces/IDatabase.cs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2440,6 +2440,34 @@ IEnumerable<SortedSetEntry> SortedSetScan(
24402440
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
24412441
long StreamAcknowledge(RedisKey key, RedisValue groupName, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
24422442

2443+
/// <summary>
2444+
/// Allow the consumer to mark a pending message as correctly processed. Returns the number of messages acknowledged.
2445+
/// </summary>
2446+
/// <param name="key">The key of the stream.</param>
2447+
/// <param name="groupName">The name of the consumer group that received the message.</param>
2448+
/// <param name="mode">The delete mode to use when acknowledging the message.</param>
2449+
/// <param name="messageId">The ID of the message to acknowledge.</param>
2450+
/// <param name="flags">The flags to use for this operation.</param>
2451+
/// <returns>The outcome of the delete operation.</returns>
2452+
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
2453+
#pragma warning disable RS0026 // similar overloads
2454+
StreamDeleteResult StreamAcknowledgeAndDelete(RedisKey key, RedisValue groupName, StreamDeleteMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None);
2455+
#pragma warning restore RS0026
2456+
2457+
/// <summary>
2458+
/// Allow the consumer to mark a pending message as correctly processed. Returns the number of messages acknowledged.
2459+
/// </summary>
2460+
/// <param name="key">The key of the stream.</param>
2461+
/// <param name="groupName">The name of the consumer group that received the message.</param>
2462+
/// /// <param name="mode">The delete mode to use when acknowledging the message.</param>
2463+
/// <param name="messageIds">The IDs of the messages to acknowledge.</param>
2464+
/// <param name="flags">The flags to use for this operation.</param>
2465+
/// <returns>The outcome of each delete operation.</returns>
2466+
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
2467+
#pragma warning disable RS0026 // similar overloads
2468+
StreamDeleteResult[] StreamAcknowledgeAndDelete(RedisKey key, RedisValue groupName, StreamDeleteMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
2469+
#pragma warning restore RS0026
2470+
24432471
/// <summary>
24442472
/// Adds an entry using the specified values to the given stream key.
24452473
/// If key does not exist, a new key holding a stream is created.
@@ -2782,10 +2810,11 @@ IEnumerable<SortedSetEntry> SortedSetScan(
27822810
/// <param name="minId">All entries with an id (timestamp) earlier minId will be removed.</param>
27832811
/// <param name="useApproximateMaxLength">If true, the "~" argument is used to allow the stream to exceed minId by a small number. This improves performance when removing messages.</param>
27842812
/// <param name="limit">The maximum number of entries to remove per call when useApproximateMaxLength = true. If 0, the limiting mechanism is disabled entirely.</param>
2813+
/// <param name="mode">Determines how stream trimming should be performed.</param>
27852814
/// <param name="flags">The flags to use for this operation.</param>
27862815
/// <returns>The number of messages removed from the stream.</returns>
27872816
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
2788-
long StreamTrimByMinId(RedisKey key, RedisValue minId, bool useApproximateMaxLength = false, int? limit = null, CommandFlags flags = CommandFlags.None);
2817+
long StreamTrimByMinId(RedisKey key, RedisValue minId, bool useApproximateMaxLength = false, int? limit = null, StreamDeleteMode mode = StreamDeleteMode.KeepReferences, CommandFlags flags = CommandFlags.None);
27892818

27902819
/// <summary>
27912820
/// If key already exists and is a string, this command appends the value at the end of the string.

src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -594,6 +594,14 @@ IAsyncEnumerable<SortedSetEntry> SortedSetScanAsync(
594594
/// <inheritdoc cref="IDatabase.StreamAcknowledge(RedisKey, RedisValue, RedisValue[], CommandFlags)"/>
595595
Task<long> StreamAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
596596

597+
/// <inheritdoc cref="IDatabase.StreamAcknowledgeAndDelete(RedisKey, RedisValue, StreamDeleteMode, RedisValue, CommandFlags)"/>
598+
#pragma warning disable RS0026 // similar overloads
599+
Task<StreamDeleteResult> StreamAcknowledgeAndDeleteAsync(RedisKey key, RedisValue groupName, StreamDeleteMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None);
600+
601+
/// <inheritdoc cref="IDatabase.StreamAcknowledgeAndDelete(RedisKey, RedisValue, StreamDeleteMode, RedisValue[], CommandFlags)"/>
602+
Task<StreamDeleteResult[]> StreamAcknowledgeAndDeleteAsync(RedisKey key, RedisValue groupName, StreamDeleteMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
603+
#pragma warning restore RS0026
604+
597605
/// <inheritdoc cref="IDatabase.StreamAdd(RedisKey, RedisValue, RedisValue, RedisValue?, int?, bool, CommandFlags)"/>
598606
Task<RedisValue> StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None);
599607

@@ -672,8 +680,8 @@ IAsyncEnumerable<SortedSetEntry> SortedSetScanAsync(
672680
/// <inheritdoc cref="IDatabase.StreamTrim(RedisKey, int, bool, CommandFlags)"/>
673681
Task<long> StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None);
674682

675-
/// <inheritdoc cref="IDatabase.StreamTrimByMinId(RedisKey, RedisValue, bool, int?, CommandFlags)"/>
676-
Task<long> StreamTrimByMinIdAsync(RedisKey key, RedisValue minId, bool useApproximateMaxLength = false, int? limit = null, CommandFlags flags = CommandFlags.None);
683+
/// <inheritdoc cref="IDatabase.StreamTrimByMinId(RedisKey, RedisValue, bool, int?, StreamDeleteMode, CommandFlags)"/>
684+
Task<long> StreamTrimByMinIdAsync(RedisKey key, RedisValue minId, bool useApproximateMaxLength = false, int? limit = null, StreamDeleteMode mode = StreamDeleteMode.KeepReferences, CommandFlags flags = CommandFlags.None);
677685

678686
/// <inheritdoc cref="IDatabase.StringAppend(RedisKey, RedisValue, CommandFlags)"/>
679687
Task<long> StringAppendAsync(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None);

src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -564,6 +564,12 @@ public Task<long> StreamAcknowledgeAsync(RedisKey key, RedisValue groupName, Red
564564
public Task<long> StreamAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) =>
565565
Inner.StreamAcknowledgeAsync(ToInner(key), groupName, messageIds, flags);
566566

567+
public Task<StreamDeleteResult> StreamAcknowledgeAndDeleteAsync(RedisKey key, RedisValue groupName, StreamDeleteMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None) =>
568+
Inner.StreamAcknowledgeAndDeleteAsync(ToInner(key), groupName, mode, messageId, flags);
569+
570+
public Task<StreamDeleteResult[]> StreamAcknowledgeAndDeleteAsync(RedisKey key, RedisValue groupName, StreamDeleteMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) =>
571+
Inner.StreamAcknowledgeAndDeleteAsync(ToInner(key), groupName, mode, messageIds, flags);
572+
567573
public Task<RedisValue> StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) =>
568574
Inner.StreamAddAsync(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, flags);
569575

@@ -642,8 +648,8 @@ public Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions
642648
public Task<long> StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) =>
643649
Inner.StreamTrimAsync(ToInner(key), maxLength, useApproximateMaxLength, flags);
644650

645-
public Task<long> StreamTrimByMinIdAsync(RedisKey key, RedisValue minId, bool useApproximateMaxLength = false, int? limit = null, CommandFlags flags = CommandFlags.None) =>
646-
Inner.StreamTrimByMinIdAsync(ToInner(key), minId, useApproximateMaxLength, limit, flags);
651+
public Task<long> StreamTrimByMinIdAsync(RedisKey key, RedisValue minId, bool useApproximateMaxLength = false, int? limit = null, StreamDeleteMode mode = StreamDeleteMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
652+
Inner.StreamTrimByMinIdAsync(ToInner(key), minId, useApproximateMaxLength, limit, mode, flags);
647653

648654
public Task<long> StringAppendAsync(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None) =>
649655
Inner.StringAppendAsync(ToInner(key), value, flags);

src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -546,6 +546,12 @@ public long StreamAcknowledge(RedisKey key, RedisValue groupName, RedisValue mes
546546
public long StreamAcknowledge(RedisKey key, RedisValue groupName, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) =>
547547
Inner.StreamAcknowledge(ToInner(key), groupName, messageIds, flags);
548548

549+
public StreamDeleteResult StreamAcknowledgeAndDelete(RedisKey key, RedisValue groupName, StreamDeleteMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None) =>
550+
Inner.StreamAcknowledgeAndDelete(ToInner(key), groupName, mode, messageId, flags);
551+
552+
public StreamDeleteResult[] StreamAcknowledgeAndDelete(RedisKey key, RedisValue groupName, StreamDeleteMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) =>
553+
Inner.StreamAcknowledgeAndDelete(ToInner(key), groupName, mode, messageIds, flags);
554+
549555
public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) =>
550556
Inner.StreamAdd(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, flags);
551557

@@ -624,8 +630,8 @@ public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValu
624630
public long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) =>
625631
Inner.StreamTrim(ToInner(key), maxLength, useApproximateMaxLength, flags);
626632

627-
public long StreamTrimByMinId(RedisKey key, RedisValue minId, bool useApproximateMaxLength = false, int? limit = null, CommandFlags flags = CommandFlags.None) =>
628-
Inner.StreamTrimByMinId(ToInner(key), minId, useApproximateMaxLength, limit, flags);
633+
public long StreamTrimByMinId(RedisKey key, RedisValue minId, bool useApproximateMaxLength = false, int? limit = null, StreamDeleteMode mode = StreamDeleteMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
634+
Inner.StreamTrimByMinId(ToInner(key), minId, useApproximateMaxLength, limit, mode, flags);
629635

630636
public long StringAppend(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None) =>
631637
Inner.StringAppend(ToInner(key), value, flags);
Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,15 @@
11
#nullable enable
2-
StackExchange.Redis.IDatabase.StreamTrimByMinId(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue minId, bool useApproximateMaxLength = false, int? limit = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long
3-
StackExchange.Redis.IDatabaseAsync.StreamTrimByMinIdAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue minId, bool useApproximateMaxLength = false, int? limit = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<long>!
2+
StackExchange.Redis.IDatabase.StreamAcknowledgeAndDelete(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.StreamDeleteMode mode, StackExchange.Redis.RedisValue messageId, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamDeleteResult
3+
StackExchange.Redis.IDatabase.StreamAcknowledgeAndDelete(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.StreamDeleteMode mode, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamDeleteResult[]!
4+
StackExchange.Redis.IDatabase.StreamTrimByMinId(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue minId, bool useApproximateMaxLength = false, int? limit = null, StackExchange.Redis.StreamDeleteMode mode = StackExchange.Redis.StreamDeleteMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long
5+
StackExchange.Redis.IDatabaseAsync.StreamAcknowledgeAndDeleteAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.StreamDeleteMode mode, StackExchange.Redis.RedisValue messageId, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamDeleteResult>!
6+
StackExchange.Redis.IDatabaseAsync.StreamAcknowledgeAndDeleteAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.StreamDeleteMode mode, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamDeleteResult[]!>!
7+
StackExchange.Redis.IDatabaseAsync.StreamTrimByMinIdAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue minId, bool useApproximateMaxLength = false, int? limit = null, StackExchange.Redis.StreamDeleteMode mode = StackExchange.Redis.StreamDeleteMode.KeepReferences, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<long>!
8+
StackExchange.Redis.StreamDeleteMode
9+
StackExchange.Redis.StreamDeleteMode.Acknowledged = 2 -> StackExchange.Redis.StreamDeleteMode
10+
StackExchange.Redis.StreamDeleteMode.DeleteReferences = 1 -> StackExchange.Redis.StreamDeleteMode
11+
StackExchange.Redis.StreamDeleteMode.KeepReferences = 0 -> StackExchange.Redis.StreamDeleteMode
12+
StackExchange.Redis.StreamDeleteResult
13+
StackExchange.Redis.StreamDeleteResult.Deleted = 1 -> StackExchange.Redis.StreamDeleteResult
14+
StackExchange.Redis.StreamDeleteResult.NotDeleted = 2 -> StackExchange.Redis.StreamDeleteResult
15+
StackExchange.Redis.StreamDeleteResult.NotFound = -1 -> StackExchange.Redis.StreamDeleteResult

0 commit comments

Comments
 (0)