Skip to content

Commit 47de416

Browse files
committed
NCBC-4180: Proper staging of expiry on mutations
Motivation ========== Transactions doc wants the expiry to be staged with the absolute unix time in all cases. Then, when unstaged, it is applied. This means the expiry "starts" when the operation is _called_, not when the transaction commits. Modification ============ Updated what we stage accordingly. Added helpers to TimeSpanExtensions to help with this. Similarly when unstaging (whether in the transaction or in the cleaner), we then apply it accordingly. Results ======= Fit tests which test this pass. Change-Id: Ie701dab4031e422129c8f2d10a824c685a461fd8 Reviewed-on: https://review.couchbase.org/c/couchbase-net-client/+/242300 Tested-by: Build Bot <build@couchbase.com> Reviewed-by: Jeffry Morris <jeffrymorris@gmail.com>
1 parent 8d80a09 commit 47de416

8 files changed

Lines changed: 171 additions & 28 deletions

File tree

src/Couchbase/Client/Transactions/AttemptContext.cs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
using Couchbase.Client.Transactions.Support;
3838
using Couchbase.Core.IO.Operations;
3939
using Couchbase.Core.IO.Transcoders;
40+
using Couchbase.Utils;
4041
using Microsoft.Extensions.Logging;
4142
using Newtonsoft.Json.Linq;
4243
using static Couchbase.Client.Transactions.Error.ErrorBuilder;
@@ -670,6 +671,7 @@ private async Task SetAtrPendingIfFirstMutation(IRequestSpan? parentSpan)
670671
private async Task<TransactionGetResult> CreateStagedReplace(TransactionGetResult doc, object content,
671672
string opId, bool accessDeleted, IRequestSpan? parentSpan, ITypeTranscoder? transcoder, TimeSpan? expiry)
672673
{
674+
DateTimeOffset? absoluteExpiry = expiry?.ToEpochTtl();
673675
using var traceSpan = TraceSpan(parent: parentSpan);
674676
_ = _atr ?? throw new ArgumentNullException(nameof(_atr), "ATR should have already been initialized");
675677
try
@@ -680,7 +682,7 @@ private async Task<TransactionGetResult> CreateStagedReplace(TransactionGetResul
680682
var contentWrapper = new JObjectContentWrapper(content, transcoder);
681683
bool isTombstone = doc.Cas == 0;
682684
(var updatedCas, var mutationToken) =
683-
await _docs.MutateStagedReplace(doc, contentWrapper, opId, _atr, accessDeleted, expiry).CAF();
685+
await _docs.MutateStagedReplace(doc, contentWrapper, opId, _atr, accessDeleted, absoluteExpiry).CAF();
684686
Logger.LogDebug(
685687
"{method} for {redactedId}, attemptId={attemptId}, preCase={preCas}, postCas={postCas}, accessDeleted={accessDeleted}",
686688
nameof(CreateStagedReplace), Redactor.UserData(doc.Id), AttemptId, doc.Cas, updatedCas,
@@ -699,13 +701,13 @@ private async Task<TransactionGetResult> CreateStagedReplace(TransactionGetResul
699701
{
700702
// If doc is already in stagedMutations as an INSERT or INSERT_SHADOW, then remove that, and add this op as a new INSERT or INSERT_SHADOW(depending on what was replaced).
701703
_stagedMutations.Add(new StagedMutation(doc, content, contentWrapper.Flags, StagedMutationType.Insert,
702-
mutationToken, expiry));
704+
mutationToken, absoluteExpiry));
703705
}
704706
else
705707
{
706708
// If doc is already in stagedMutations as a REPLACE, then overwrite it.
707709
_stagedMutations.Add(
708-
new StagedMutation(doc, content, contentWrapper.Flags, StagedMutationType.Replace, mutationToken, expiry));
710+
new StagedMutation(doc, content, contentWrapper.Flags, StagedMutationType.Replace, mutationToken, absoluteExpiry));
709711
}
710712

711713
return TransactionGetResult.FromInsert(
@@ -852,6 +854,7 @@ private async Task<TransactionGetResult> CreateStagedInsert(ICouchbaseCollection
852854
object content, string opId, ulong? cas = null, IRequestSpan? parentSpan = null,
853855
ITypeTranscoder? transcoder = null, TimeSpan? expiry = null)
854856
{
857+
DateTimeOffset? absoluteExpiry = expiry?.ToEpochTtl();
855858
using var traceSpan = TraceSpan(parent: parentSpan);
856859
try
857860
{
@@ -871,7 +874,7 @@ private async Task<TransactionGetResult> CreateStagedInsert(ICouchbaseCollection
871874
if (byteContent == null)
872875
throw new InvalidArgumentException("couldn't convert content to byte[]");
873876
(var updatedCas, var mutationToken) =
874-
await _docs.MutateStagedInsert(collection, id, contentWrapper, opId, _atr!, cas, expiry).CAF();
877+
await _docs.MutateStagedInsert(collection, id, contentWrapper, opId, _atr!, cas, absoluteExpiry).CAF();
875878
Logger.LogDebug(
876879
"{method} for {redactedId}, attemptId={attemptId}, preCas={preCas}, postCas={postCas}, opId={opId}",
877880
nameof(CreateStagedInsert), Redactor.UserData(id), AttemptId, cas, updatedCas, opId);
@@ -893,7 +896,7 @@ private async Task<TransactionGetResult> CreateStagedInsert(ICouchbaseCollection
893896
await _testHooks.AfterStagedInsertComplete(this, id).CAF();
894897

895898
var stagedMutation = new StagedMutation(getResult, byteContent, contentWrapper.Flags, StagedMutationType.Insert,
896-
mutationToken, expiry);
899+
mutationToken, absoluteExpiry);
897900
_stagedMutations.Add(stagedMutation);
898901

899902
return (RepeatAction.NoRepeat, getResult);
@@ -949,7 +952,7 @@ await ForwardCompatibility.Check(this,
949952
docWithMeta.GetPostTransactionResult();
950953
var stagedMutation =
951954
new StagedMutation(docAlreadyExistsResult,
952-
content, docAlreadyExistsResult.Flags, StagedMutationType.Insert, null, expiry);
955+
content, docAlreadyExistsResult.Flags, StagedMutationType.Insert, null, absoluteExpiry);
953956
_stagedMutations.Add(stagedMutation);
954957
return (RepeatAction.NoRepeat,
955958
RepeatAction.NoRepeat);

src/Couchbase/Client/Transactions/Cleanup/Cleaner.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
using Couchbase.Client.Transactions.LogUtil;
1414
using Couchbase.Client.Transactions.Support;
1515
using Couchbase.Core.IO.Operations;
16+
using Couchbase.Utils;
1617
using Couchbase.Core.IO.Transcoders;
1718
using Microsoft.Extensions.Logging;
1819

@@ -234,7 +235,7 @@ await coll.InsertAsync(op.Id, content,
234235
{
235236
options.Durability(durabilityLevel)
236237
.Transcoder(op.StagedContent?.Transcoder);
237-
if (op.Expiry.HasValue) options.Expiry(op.Expiry.Value);
238+
if (op.Expiry.HasValue) options.Expiry(op.Expiry.Value.RemainingTtl());
238239
}).CAF();
239240
}
240241
else
@@ -251,7 +252,7 @@ await coll.MutateInAsync(op.Id, specs =>
251252
.PreserveTtl(coll.Scope.Bucket.SupportsCollections);
252253
if (op.Expiry.HasValue)
253254
{
254-
opts.Expiry(op.Expiry.Value);
255+
opts.Expiry(op.Expiry.Value.RemainingTtl());
255256
opts.PreserveTtl(false);
256257
}
257258
}).CAF();

src/Couchbase/Client/Transactions/Components/StagedMutation.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
#nullable enable
1+
#nullable enable
22
using System;
33
using System.Globalization;
44
using Couchbase.Core;
@@ -18,9 +18,9 @@ internal class StagedMutation
1818

1919
public Flags? Flags { get; }
2020

21-
public TimeSpan? Expiry { get; }
21+
public DateTimeOffset? Expiry { get; }
2222

23-
public StagedMutation(TransactionGetResult doc, object? content, Flags? flags, StagedMutationType type, MutationToken? mutationToken = null, TimeSpan? expiry = null)
23+
public StagedMutation(TransactionGetResult doc, object? content, Flags? flags, StagedMutationType type, MutationToken? mutationToken = null, DateTimeOffset? expiry = null)
2424
{
2525
Doc = doc;
2626
Content = content;

src/Couchbase/Client/Transactions/DataAccess/DocumentRepository.cs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
using Couchbase.Core.Configuration.Server;
1616
using Couchbase.Core.IO.Operations;
1717
using Couchbase.KeyValue.ZoneAware;
18+
using Couchbase.Utils;
1819
using Newtonsoft.Json;
1920
using Newtonsoft.Json.Linq;
2021
using JsonSerializer = Newtonsoft.Json.JsonSerializer;
@@ -47,7 +48,7 @@ public DocumentRepository(TransactionContext overallContext, TimeSpan? keyValueT
4748
_userDataTranscoder = new JsonTranscoder(userDataSerializer);
4849
}
4950

50-
public async Task<(ulong updatedCas, MutationToken mutationToken)> MutateStagedInsert(ICouchbaseCollection collection, string docId, IContentAsWrapper content, string opId, IAtrRepository atr, ulong? cas = null, TimeSpan? expiry = null)
51+
public async Task<(ulong updatedCas, MutationToken mutationToken)> MutateStagedInsert(ICouchbaseCollection collection, string docId, IContentAsWrapper content, string opId, IAtrRepository atr, ulong? cas = null, DateTimeOffset? expiry = null)
5152
{
5253
List<MutateInSpec> specs = CreateMutationSpecs(atr, "insert", content, opId, expiry: expiry);
5354
var opts = GetMutateInOptions(StoreSemantics.Insert, collection)
@@ -77,7 +78,7 @@ public DocumentRepository(TransactionContext overallContext, TimeSpan? keyValueT
7778
return (mutateResult.Cas, mutateResult.MutationToken);
7879
}
7980

80-
public async Task<(ulong updatedCas, MutationToken mutationToken)> MutateStagedReplace(TransactionGetResult doc, IContentAsWrapper content, string opId, IAtrRepository atr, bool accessDeleted, TimeSpan? expiry = null)
81+
public async Task<(ulong updatedCas, MutationToken mutationToken)> MutateStagedReplace(TransactionGetResult doc, IContentAsWrapper content, string opId, IAtrRepository atr, bool accessDeleted, DateTimeOffset? expiry = null)
8182
{
8283
if (doc.Cas == 0)
8384
{
@@ -141,7 +142,7 @@ public bool SupportsReplaceBodyWithXattr(ICouchbaseCollection collection)
141142
return false;
142143
}
143144

144-
public async Task<(ulong updatedCas, MutationToken? mutationToken)> UnstageInsertOrReplace(ICouchbaseCollection collection, string docId, ulong cas, object finalDoc, bool insertMode, Flags flags, TimeSpan? expiry = null)
145+
public async Task<(ulong updatedCas, MutationToken? mutationToken)> UnstageInsertOrReplace(ICouchbaseCollection collection, string docId, ulong cas, object finalDoc, bool insertMode, Flags flags, DateTimeOffset? expiry = null)
145146
{
146147
bool isBinary = flags.DataFormat == DataFormat.Binary;
147148
if (SupportsReplaceBodyWithXattr(collection))
@@ -159,7 +160,7 @@ public bool SupportsReplaceBodyWithXattr(ICouchbaseCollection collection)
159160

160161
if (expiry.HasValue)
161162
{
162-
opts.Expiry(expiry.Value);
163+
opts.Expiry(expiry.Value.RemainingTtl());
163164
opts.PreserveTtl(false);
164165
}
165166
opts.Transcoder(isBinary
@@ -187,7 +188,7 @@ public bool SupportsReplaceBodyWithXattr(ICouchbaseCollection collection)
187188
: _userDataTranscoder);
188189
if (expiry.HasValue)
189190
{
190-
opts.Expiry(expiry.Value);
191+
opts.Expiry(expiry.Value.RemainingTtl());
191192
}
192193
var mutateResult = await collection.InsertAsync(docId, finalDoc, opts).CAF();
193194
return (mutateResult.Cas, mutateResult.MutationToken);
@@ -201,7 +202,7 @@ public bool SupportsReplaceBodyWithXattr(ICouchbaseCollection collection)
201202
: _userDataTranscoder);
202203
if (expiry.HasValue)
203204
{
204-
opts.Expiry(expiry.Value);
205+
opts.Expiry(expiry.Value.RemainingTtl());
205206
opts.PreserveTtl(false);
206207
}
207208
var mutateResult = await collection.MutateInAsync(docId, specs =>
@@ -337,7 +338,7 @@ private MutateInOptions GetMutateInOptions(StoreSemantics storeSemantics, ICouch
337338
.StoreSemantics(storeSemantics)
338339
.PreserveTtl(collection.Scope.Bucket.SupportsCollections);
339340

340-
private List<MutateInSpec> CreateMutationSpecs(IAtrRepository atr, string opType, IContentAsWrapper content, string opId, DocumentMetadata? dm = null, TimeSpan? expiry = null)
341+
private List<MutateInSpec> CreateMutationSpecs(IAtrRepository atr, string opType, IContentAsWrapper content, string opId, DocumentMetadata? dm = null, DateTimeOffset? expiry = null)
341342
{
342343
var specs = new List<MutateInSpec>
343344
{
@@ -355,7 +356,7 @@ private List<MutateInSpec> CreateMutationSpecs(IAtrRepository atr, string opType
355356
};
356357
if (expiry.HasValue)
357358
{
358-
specs.Add(MutateInSpec.Upsert(TransactionFields.DocExpiry, expiry.Value.TotalMilliseconds,
359+
specs.Add(MutateInSpec.Upsert(TransactionFields.DocExpiry, expiry.Value.ToUnixTimeSeconds(),
359360
true, true));
360361
}
361362
switch (opType)

src/Couchbase/Client/Transactions/DataAccess/IDocumentRepository.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
#nullable enable
1+
#nullable enable
22
using System;
33
using System.Threading.Tasks;
44
using Couchbase.Core;
@@ -12,11 +12,11 @@ namespace Couchbase.Client.Transactions.DataAccess
1212
{
1313
internal interface IDocumentRepository
1414
{
15-
Task<(ulong updatedCas, MutationToken mutationToken)> MutateStagedInsert(ICouchbaseCollection collection, string docId, IContentAsWrapper content, string opId, IAtrRepository atr, ulong? cas = null, TimeSpan? expiry = null);
16-
Task<(ulong updatedCas, MutationToken mutationToken)> MutateStagedReplace(TransactionGetResult doc, IContentAsWrapper content, string opId, IAtrRepository atr, bool accessDeleted, TimeSpan? expiry = null);
15+
Task<(ulong updatedCas, MutationToken mutationToken)> MutateStagedInsert(ICouchbaseCollection collection, string docId, IContentAsWrapper content, string opId, IAtrRepository atr, ulong? cas = null, DateTimeOffset? expiry = null);
16+
Task<(ulong updatedCas, MutationToken mutationToken)> MutateStagedReplace(TransactionGetResult doc, IContentAsWrapper content, string opId, IAtrRepository atr, bool accessDeleted, DateTimeOffset? expiry = null);
1717
Task<(ulong updatedCas, MutationToken mutationToken)> MutateStagedRemove(TransactionGetResult doc, IAtrRepository atr);
1818
Task<(ulong updatedCas, MutationToken mutationToken)> RemoveStagedInsert(TransactionGetResult doc);
19-
Task<(ulong updatedCas, MutationToken? mutationToken)> UnstageInsertOrReplace(ICouchbaseCollection collection, string docId, ulong cas, object finalDoc, bool insertMode, Flags flags, TimeSpan? expiry);
19+
Task<(ulong updatedCas, MutationToken? mutationToken)> UnstageInsertOrReplace(ICouchbaseCollection collection, string docId, ulong cas, object finalDoc, bool insertMode, Flags flags, DateTimeOffset? expiry);
2020
Task UnstageRemove(ICouchbaseCollection collection, string docId, ulong cas = 0);
2121

2222
Task ClearTransactionMetadata(ICouchbaseCollection collection, string docId, ulong cas, bool isDeleted);

src/Couchbase/Client/Transactions/DataModel/DocumentLookupResult.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
#nullable enable
1+
#nullable enable
22
using System;
33
using Couchbase.KeyValue;
44
using Couchbase.Client.Transactions.Components;
@@ -46,12 +46,12 @@ internal DocumentLookupResult(
4646

4747
internal ICouchbaseCollection DocumentCollection { get; }
4848

49-
internal TimeSpan? Expiry {
49+
internal DateTimeOffset? Expiry {
5050
get
5151
{
5252
var txn = LookupInResult.ContentAs<JObject>(0);
5353
var intValue = txn?["aux"]?["docexpiry"]?.Value<long?>();
54-
return intValue.HasValue ? TimeSpan.FromMilliseconds(intValue.Value) : null;
54+
return intValue.HasValue ? DateTimeOffset.FromUnixTimeSeconds(intValue.Value) : null;
5555
}
5656
}
5757

src/Couchbase/Utils/TimeSpanExtensions.cs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,33 @@ public static uint ToTtl(this TimeSpan duration)
5050
}
5151
}
5252

53+
/// <summary>
54+
/// Converts a relative <see cref="TimeSpan"/> duration to an absolute <see cref="DateTimeOffset"/>
55+
/// representing when the TTL should expire. Used by transactions to capture the expiry time
56+
/// at staging, so the TTL clock starts from staging rather than unstaging.
57+
/// </summary>
58+
/// <param name="duration">The relative TTL duration.</param>
59+
/// <returns>An absolute <see cref="DateTimeOffset"/> representing the expiry time.</returns>
60+
public static DateTimeOffset ToEpochTtl(this TimeSpan duration)
61+
{
62+
return DateTimeOffset.UtcNow + duration;
63+
}
64+
65+
/// <summary>
66+
/// Converts an absolute expiry <see cref="DateTimeOffset"/> back to the remaining
67+
/// <see cref="TimeSpan"/> relative to now. Returns a minimum of 1 second to avoid
68+
/// passing 0 to the server, which would mean "never expire".
69+
/// </summary>
70+
/// <param name="absoluteExpiry">The absolute expiry time.</param>
71+
/// <returns>The remaining TTL as a <see cref="TimeSpan"/>, minimum 1 second.</returns>
72+
public static TimeSpan RemainingTtl(this DateTimeOffset absoluteExpiry)
73+
{
74+
var remaining = absoluteExpiry - DateTimeOffset.UtcNow;
75+
return remaining > TimeSpan.FromSeconds(1)
76+
? remaining
77+
: TimeSpan.FromSeconds(1);
78+
}
79+
5380
/// <summary>
5481
/// Converts a duration expressed as milliseconds to a unix-based TTL.
5582
/// </summary>

0 commit comments

Comments
 (0)