Skip to content

Commit 888d0a1

Browse files
committed
XADD KEEPREF|DELREF|ACKED
1 parent efe65ab commit 888d0a1

8 files changed

Lines changed: 150 additions & 41 deletions

File tree

src/StackExchange.Redis/Interfaces/IDatabase.cs

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2482,7 +2482,7 @@ IEnumerable<SortedSetEntry> SortedSetScan(
24822482
/// <param name="flags">The flags to use for this operation.</param>
24832483
/// <returns>The ID of the newly created message.</returns>
24842484
/// <remarks><seealso href="https://redis.io/commands/xadd"/></remarks>
2485-
RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None);
2485+
RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags);
24862486

24872487
/// <summary>
24882488
/// Adds an entry using the specified values to the given stream key.
@@ -2497,7 +2497,46 @@ IEnumerable<SortedSetEntry> SortedSetScan(
24972497
/// <param name="flags">The flags to use for this operation.</param>
24982498
/// <returns>The ID of the newly created message.</returns>
24992499
/// <remarks><seealso href="https://redis.io/commands/xadd"/></remarks>
2500-
RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None);
2500+
RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags);
2501+
2502+
/// <summary>
2503+
/// Adds an entry using the specified values to the given stream key.
2504+
/// If key does not exist, a new key holding a stream is created.
2505+
/// The command returns the ID of the newly created stream entry.
2506+
/// </summary>
2507+
/// <param name="key">The key of the stream.</param>
2508+
/// <param name="streamField">The field name for the stream entry.</param>
2509+
/// <param name="streamValue">The value to set in the stream entry.</param>
2510+
/// <param name="messageId">The ID to assign to the stream entry, defaults to an auto-generated ID ("*").</param>
2511+
/// <param name="maxLength">The maximum length of the stream.</param>
2512+
/// <param name="useApproximateMaxLength">If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages.</param>
2513+
/// <param name="limit">Specifies the maximal count of entries that will be evicted.</param>
2514+
/// <param name="deleteMode">Determines how stream trimming should be performed.</param>
2515+
/// <param name="flags">The flags to use for this operation.</param>
2516+
/// <returns>The ID of the newly created message.</returns>
2517+
/// <remarks><seealso href="https://redis.io/commands/xadd"/></remarks>
2518+
#pragma warning disable RS0026 // different shape
2519+
RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamDeleteMode deleteMode = StreamDeleteMode.KeepReferences, CommandFlags flags = CommandFlags.None);
2520+
#pragma warning restore RS0026
2521+
2522+
/// <summary>
2523+
/// Adds an entry using the specified values to the given stream key.
2524+
/// If key does not exist, a new key holding a stream is created.
2525+
/// The command returns the ID of the newly created stream entry.
2526+
/// </summary>
2527+
/// <param name="key">The key of the stream.</param>
2528+
/// <param name="streamPairs">The fields and their associated values to set in the stream entry.</param>
2529+
/// <param name="messageId">The ID to assign to the stream entry, defaults to an auto-generated ID ("*").</param>
2530+
/// <param name="maxLength">The maximum length of the stream.</param>
2531+
/// <param name="useApproximateMaxLength">If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages.</param>
2532+
/// <param name="limit">Specifies the maximal count of entries that will be evicted.</param>
2533+
/// <param name="deleteMode">Determines how stream trimming should be performed.</param>
2534+
/// <param name="flags">The flags to use for this operation.</param>
2535+
/// <returns>The ID of the newly created message.</returns>
2536+
/// <remarks><seealso href="https://redis.io/commands/xadd"/></remarks>
2537+
#pragma warning disable RS0026 // different shape
2538+
RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamDeleteMode deleteMode = StreamDeleteMode.KeepReferences, CommandFlags flags = CommandFlags.None);
2539+
#pragma warning restore RS0026
25012540

25022541
/// <summary>
25032542
/// Change ownership of messages consumed, but not yet acknowledged, by a different consumer.

src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -594,19 +594,27 @@ IAsyncEnumerable<SortedSetEntry> SortedSetScanAsync(
594594
/// <inheritdoc cref="IDatabase.StreamAcknowledge(RedisKey, RedisValue, RedisValue[], CommandFlags)"/>
595595
Task<long> StreamAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
596596

597-
/// <inheritdoc cref="IDatabase.StreamAcknowledgeAndDelete(RedisKey, RedisValue, StreamDeleteMode, RedisValue, CommandFlags)"/>
598597
#pragma warning disable RS0026 // similar overloads
598+
/// <inheritdoc cref="IDatabase.StreamAcknowledgeAndDelete(RedisKey, RedisValue, StreamDeleteMode, RedisValue, CommandFlags)"/>
599599
Task<StreamDeleteResult> StreamAcknowledgeAndDeleteAsync(RedisKey key, RedisValue groupName, StreamDeleteMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None);
600600

601601
/// <inheritdoc cref="IDatabase.StreamAcknowledgeAndDelete(RedisKey, RedisValue, StreamDeleteMode, RedisValue[], CommandFlags)"/>
602602
Task<StreamDeleteResult[]> StreamAcknowledgeAndDeleteAsync(RedisKey key, RedisValue groupName, StreamDeleteMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
603603
#pragma warning restore RS0026
604604

605605
/// <inheritdoc cref="IDatabase.StreamAdd(RedisKey, RedisValue, RedisValue, RedisValue?, int?, bool, CommandFlags)"/>
606-
Task<RedisValue> StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None);
606+
Task<RedisValue> StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags);
607607

608608
/// <inheritdoc cref="IDatabase.StreamAdd(RedisKey, NameValueEntry[], RedisValue?, int?, bool, CommandFlags)"/>
609-
Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None);
609+
Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags);
610+
611+
#pragma warning disable RS0026 // similar overloads
612+
/// <inheritdoc cref="IDatabase.StreamAdd(RedisKey, RedisValue, RedisValue, RedisValue?, long?, bool, long?, StreamDeleteMode, CommandFlags)"/>
613+
Task<RedisValue> StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamDeleteMode deleteMode = StreamDeleteMode.KeepReferences, CommandFlags flags = CommandFlags.None);
614+
615+
/// <inheritdoc cref="IDatabase.StreamAdd(RedisKey, NameValueEntry[], RedisValue?, long?, bool, long?, StreamDeleteMode, CommandFlags)"/>
616+
Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamDeleteMode deleteMode = StreamDeleteMode.KeepReferences, CommandFlags flags = CommandFlags.None);
617+
#pragma warning restore RS0026
610618

611619
/// <inheritdoc cref="IDatabase.StreamAutoClaim(RedisKey, RedisValue, RedisValue, long, RedisValue, int?, CommandFlags)"/>
612620
Task<StreamAutoClaimResult> StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None);

src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -570,12 +570,18 @@ public Task<StreamDeleteResult> StreamAcknowledgeAndDeleteAsync(RedisKey key, Re
570570
public Task<StreamDeleteResult[]> StreamAcknowledgeAndDeleteAsync(RedisKey key, RedisValue groupName, StreamDeleteMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) =>
571571
Inner.StreamAcknowledgeAndDeleteAsync(ToInner(key), groupName, mode, messageIds, flags);
572572

573-
public Task<RedisValue> StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) =>
573+
public Task<RedisValue> StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags) =>
574574
Inner.StreamAddAsync(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, flags);
575575

576-
public Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) =>
576+
public Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags) =>
577577
Inner.StreamAddAsync(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, flags);
578578

579+
public Task<RedisValue> StreamAddAsync(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamDeleteMode mode = StreamDeleteMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
580+
Inner.StreamAddAsync(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, limit, mode, flags);
581+
582+
public Task<RedisValue> StreamAddAsync(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamDeleteMode mode = StreamDeleteMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
583+
Inner.StreamAddAsync(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, limit, mode, flags);
584+
579585
public Task<StreamAutoClaimResult> StreamAutoClaimAsync(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) =>
580586
Inner.StreamAutoClaimAsync(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, flags);
581587

src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -552,12 +552,18 @@ public StreamDeleteResult StreamAcknowledgeAndDelete(RedisKey key, RedisValue gr
552552
public StreamDeleteResult[] StreamAcknowledgeAndDelete(RedisKey key, RedisValue groupName, StreamDeleteMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None) =>
553553
Inner.StreamAcknowledgeAndDelete(ToInner(key), groupName, mode, messageIds, flags);
554554

555-
public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) =>
555+
public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags) =>
556556
Inner.StreamAdd(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, flags);
557557

558-
public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) =>
558+
public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags) =>
559559
Inner.StreamAdd(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, flags);
560560

561+
public RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamDeleteMode mode = StreamDeleteMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
562+
Inner.StreamAdd(ToInner(key), streamField, streamValue, messageId, maxLength, useApproximateMaxLength, limit, mode, flags);
563+
564+
public RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamDeleteMode mode = StreamDeleteMode.KeepReferences, CommandFlags flags = CommandFlags.None) =>
565+
Inner.StreamAdd(ToInner(key), streamPairs, messageId, maxLength, useApproximateMaxLength, limit, mode, flags);
566+
561567
public StreamAutoClaimResult StreamAutoClaim(RedisKey key, RedisValue consumerGroup, RedisValue claimingConsumer, long minIdleTimeInMs, RedisValue startAtId, int? count = null, CommandFlags flags = CommandFlags.None) =>
562568
Inner.StreamAutoClaim(ToInner(key), consumerGroup, claimingConsumer, minIdleTimeInMs, startAtId, count, flags);
563569

src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -717,8 +717,6 @@ StackExchange.Redis.IDatabase.SortedSetUpdate(StackExchange.Redis.RedisKey key,
717717
StackExchange.Redis.IDatabase.SortedSetUpdate(StackExchange.Redis.RedisKey key, StackExchange.Redis.SortedSetEntry[]! values, StackExchange.Redis.SortedSetWhen when = StackExchange.Redis.SortedSetWhen.Always, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long
718718
StackExchange.Redis.IDatabase.StreamAcknowledge(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue messageId, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long
719719
StackExchange.Redis.IDatabase.StreamAcknowledge(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long
720-
StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue
721-
StackExchange.Redis.IDatabase.StreamAdd(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisValue
722720
StackExchange.Redis.IDatabase.StreamAutoClaim(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue startAtId, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamAutoClaimResult
723721
StackExchange.Redis.IDatabase.StreamAutoClaimIdsOnly(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue startAtId, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamAutoClaimIdsOnlyResult
724722
StackExchange.Redis.IDatabase.StreamClaim(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamEntry[]!
@@ -953,8 +951,6 @@ StackExchange.Redis.IDatabaseAsync.SortedSetUpdateAsync(StackExchange.Redis.Redi
953951
StackExchange.Redis.IDatabaseAsync.SortedSetUpdateAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.SortedSetEntry[]! values, StackExchange.Redis.SortedSetWhen when = StackExchange.Redis.SortedSetWhen.Always, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<long>!
954952
StackExchange.Redis.IDatabaseAsync.StreamAcknowledgeAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue messageId, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<long>!
955953
StackExchange.Redis.IDatabaseAsync.StreamAcknowledgeAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<long>!
956-
StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.NameValueEntry[]! streamPairs, StackExchange.Redis.RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.RedisValue>!
957-
StackExchange.Redis.IDatabaseAsync.StreamAddAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue streamField, StackExchange.Redis.RedisValue streamValue, StackExchange.Redis.RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.RedisValue>!
958954
StackExchange.Redis.IDatabaseAsync.StreamAutoClaimAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue startAtId, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamAutoClaimResult>!
959955
StackExchange.Redis.IDatabaseAsync.StreamAutoClaimIdsOnlyAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue startAtId, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamAutoClaimIdsOnlyResult>!
960956
StackExchange.Redis.IDatabaseAsync.StreamClaimAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue consumerGroup, StackExchange.Redis.RedisValue claimingConsumer, long minIdleTimeInMs, StackExchange.Redis.RedisValue[]! messageIds, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamEntry[]!>!

0 commit comments

Comments
 (0)