Skip to content

Commit 3445554

Browse files
authored
Cosmos: Track session tokens for Pre-Condition, Conflict and document NotFound failures (#37941)
Closes #37942
1 parent 0842c55 commit 3445554

6 files changed

Lines changed: 249 additions & 77 deletions

File tree

src/EFCore.Cosmos/Storage/Internal/CosmosClientWrapper.cs

Lines changed: 58 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ public class CosmosClientWrapper : ICosmosClientWrapper
4141
/// </summary>
4242
public static readonly string DefaultPartitionKey = "__partitionKey";
4343

44+
private const string SubStatusCodeHeaderName = "x-ms-substatus";
45+
4446
private readonly ISingletonCosmosClientWrapper _singletonWrapper;
4547
private readonly string _databaseId;
4648
private readonly IExecutionStrategy _executionStrategy;
@@ -383,7 +385,7 @@ private static async Task<bool> CreateItemOnceAsync(
383385
containerId,
384386
partitionKeyValue);
385387

386-
ProcessResponse(containerId, response, entry, sessionTokenStorage);
388+
ProcessWriteResponse(containerId, response, entry, sessionTokenStorage);
387389

388390
return response.StatusCode == HttpStatusCode.Created;
389391
}
@@ -449,7 +451,7 @@ private static async Task<bool> ReplaceItemOnceAsync(
449451
containerId,
450452
partitionKeyValue);
451453

452-
ProcessResponse(containerId, response, entry, sessionTokenStorage);
454+
ProcessWriteResponse(containerId, response, entry, sessionTokenStorage);
453455

454456
return response.StatusCode == HttpStatusCode.OK;
455457
}
@@ -511,7 +513,7 @@ private static async Task<bool> DeleteItemOnceAsync(
511513
containerId,
512514
partitionKeyValue);
513515

514-
ProcessResponse(containerId, response, entry, sessionTokenStorage);
516+
ProcessWriteResponse(containerId, response, entry, sessionTokenStorage);
515517

516518
return response.StatusCode == HttpStatusCode.NoContent;
517519
}
@@ -573,22 +575,7 @@ private static async Task<CosmosTransactionalBatchResult> ExecuteTransactionalBa
573575
batch.PartitionKeyValue,
574576
"[ \"" + string.Join("\", \"", batch.Entries.Select(x => x.Id)) + "\" ]");
575577

576-
if (!response.IsSuccessStatusCode)
577-
{
578-
var errorCode = response.StatusCode;
579-
var errorEntries = response
580-
.Select((opResult, index) => (opResult, index))
581-
.Where(r => r.opResult.StatusCode == errorCode)
582-
.Select(r => batch.Entries[r.index].Entry)
583-
.ToList();
584-
585-
var exception = new CosmosException(response.ErrorMessage, errorCode, 0, response.ActivityId, response.RequestCharge);
586-
return new CosmosTransactionalBatchResult(errorEntries, exception);
587-
}
588-
589-
ProcessResponse(batch.CollectionId, response, batch.Entries, sessionTokenStorage);
590-
591-
return CosmosTransactionalBatchResult.Success;
578+
return ProcessBatchResponse(batch.CollectionId, response, batch.Entries, sessionTokenStorage);
592579
}
593580

594581
private static ItemRequestOptions CreateItemRequestOptions(IUpdateEntry entry, bool? enableContentResponseOnWrite, string? sessionToken)
@@ -642,35 +629,65 @@ private static PartitionKey ExtractPartitionKeyValue(IUpdateEntry entry)
642629
return builder.Build();
643630
}
644631

645-
private static void ProcessResponse(string containerId, ResponseMessage response, IUpdateEntry entry, ISessionTokenStorage sessionTokenStorage)
632+
private static void ProcessWriteResponse(string containerId, ResponseMessage response, IUpdateEntry entry, ISessionTokenStorage sessionTokenStorage)
646633
{
647-
response.EnsureSuccessStatusCode();
648-
649-
if (!string.IsNullOrWhiteSpace(response.Headers.Session))
634+
try
650635
{
651-
sessionTokenStorage.TrackSessionToken(containerId, response.Headers.Session);
636+
response.EnsureSuccessStatusCode();
652637
}
638+
catch (CosmosException)
639+
{
640+
TryTrackSessionTokenFromFailure(containerId, response.StatusCode, response.Headers, sessionTokenStorage);
641+
throw;
642+
}
643+
644+
sessionTokenStorage.TrackSessionToken(containerId, response.Headers.Session);
653645

654-
ProcessResponse(entry, response.Headers.ETag, response.Content);
646+
ProcessWriteResponse(entry, response.Headers.ETag, response.Content);
655647
}
656648

657-
private static void ProcessResponse(string containerId, TransactionalBatchResponse batchResponse, IReadOnlyList<CosmosTransactionalBatchEntry> entries, ISessionTokenStorage sessionTokenStorage)
649+
private static void TryTrackSessionTokenFromFailure(string containerId, HttpStatusCode statusCode, Headers headers, ISessionTokenStorage sessionTokenStorage)
658650
{
659-
if (!string.IsNullOrWhiteSpace(batchResponse.Headers.Session))
651+
// Some failures indicate document changes on the server that should be reflected in the session token to avoid subsequent stale reads.
652+
const string readSessionNotAvailableSubStatusCode = "1002";
653+
if (statusCode == HttpStatusCode.Conflict || statusCode == HttpStatusCode.PreconditionFailed ||
654+
(statusCode == HttpStatusCode.NotFound && (!headers.TryGetValue(SubStatusCodeHeaderName, out var subStatusCode) || subStatusCode != readSessionNotAvailableSubStatusCode)))
660655
{
661-
sessionTokenStorage.TrackSessionToken(containerId, batchResponse.Headers.Session);
656+
sessionTokenStorage.TrackSessionToken(containerId, headers.Session);
662657
}
658+
}
663659

664-
for (var i = 0; i < batchResponse.Count; i++)
660+
private static CosmosTransactionalBatchResult ProcessBatchResponse(string containerId, TransactionalBatchResponse response, IReadOnlyList<CosmosTransactionalBatchEntry> entries, ISessionTokenStorage sessionTokenStorage)
661+
{
662+
if (!response.IsSuccessStatusCode)
663+
{
664+
TryTrackSessionTokenFromFailure(containerId, response.StatusCode, response.Headers, sessionTokenStorage);
665+
666+
var errorCode = response.StatusCode;
667+
var errorEntries = response
668+
.Select((opResult, index) => (opResult, index))
669+
.Where(r => r.opResult.StatusCode == errorCode)
670+
.Select(r => entries[r.index].Entry)
671+
.ToList();
672+
673+
var exception = new CosmosException(response.ErrorMessage, errorCode, 0, response.ActivityId, response.RequestCharge);
674+
return new CosmosTransactionalBatchResult(errorEntries, exception);
675+
}
676+
677+
sessionTokenStorage.TrackSessionToken(containerId, response.Headers.Session);
678+
679+
for (var i = 0; i < response.Count; i++)
665680
{
666681
var entry = entries[i];
667-
var response = batchResponse[i];
682+
var item = response[i];
668683

669-
ProcessResponse(entry.Entry, response.ETag, response.ResourceStream);
684+
ProcessWriteResponse(entry.Entry, (string)item.ETag, (Stream)item.ResourceStream);
670685
}
686+
687+
return CosmosTransactionalBatchResult.Success;
671688
}
672689

673-
private static void ProcessResponse(IUpdateEntry entry, string eTag, Stream? content)
690+
private static void ProcessWriteResponse(IUpdateEntry entry, string eTag, Stream? content)
674691
{
675692
var etagProperty = entry.EntityType.GetETagProperty();
676693
if (etagProperty != null && entry.EntityState != EntityState.Deleted)
@@ -739,7 +756,7 @@ public virtual IAsyncEnumerable<JToken> ExecuteSqlQueryAsync(
739756
containerId,
740757
partitionKeyValue);
741758

742-
return JObjectFromReadItemResponseMessage(response);
759+
return JObjectFromReadItemResponseMessage(containerId, response, sessionTokenStorage);
743760
}
744761

745762
private static async Task<ResponseMessage> CreateSingleItemQueryAsync(
@@ -758,28 +775,27 @@ private static async Task<ResponseMessage> CreateSingleItemQueryAsync(
758775
itemRequestOptions,
759776
cancellationToken: cancellationToken).ConfigureAwait(false);
760777

761-
if (!string.IsNullOrWhiteSpace(response.Headers.Session))
762-
{
763-
sessionTokenStorage.TrackSessionToken(containerId, response.Headers.Session);
764-
}
765-
766778
return response;
767779
}
768780

769-
private static JObject? JObjectFromReadItemResponseMessage(ResponseMessage responseMessage)
781+
private static JObject? JObjectFromReadItemResponseMessage(string containerId, ResponseMessage responseMessage, ISessionTokenStorage sessionTokenStorage)
770782
{
771783
if (responseMessage.StatusCode == HttpStatusCode.NotFound)
772784
{
773-
const string subStatusCodeHeaderName = "x-ms-substatus";
774785
// We get no sub-status code if document not found, other not found errors (like session or container) have a sub status code
775-
if (!responseMessage.Headers.TryGetValue(subStatusCodeHeaderName, out var subStatusCode) || string.IsNullOrWhiteSpace(subStatusCode) || subStatusCode == "0")
786+
if (!responseMessage.Headers.TryGetValue(SubStatusCodeHeaderName, out var subStatusCode) || string.IsNullOrWhiteSpace(subStatusCode) || subStatusCode == "0")
776787
{
788+
// Track session token to ensure subsequent requests will not read stale data where the document might still exist.
789+
sessionTokenStorage.TrackSessionToken(containerId, responseMessage.Headers.Session);
790+
777791
return null;
778792
}
779793
}
780794

781795
responseMessage.EnsureSuccessStatusCode();
782796

797+
sessionTokenStorage.TrackSessionToken(containerId, responseMessage.Headers.Session);
798+
783799
var responseStream = responseMessage.Content;
784800
using var reader = new StreamReader(responseStream);
785801
using var jsonReader = new JsonTextReader(reader);
@@ -1066,10 +1082,7 @@ public CosmosFeedIteratorWrapper(FeedIterator inner, string containerName, ISess
10661082
public override async Task<ResponseMessage> ReadNextAsync(CancellationToken cancellationToken = default)
10671083
{
10681084
var response = await _inner.ReadNextAsync(cancellationToken).ConfigureAwait(false);
1069-
if (!string.IsNullOrWhiteSpace(response.Headers.Session))
1070-
{
1071-
_sessionTokenStorage.TrackSessionToken(_containerName, response.Headers.Session);
1072-
}
1085+
_sessionTokenStorage.TrackSessionToken(_containerName, response.Headers.Session);
10731086
return response;
10741087
}
10751088
}

src/EFCore.Cosmos/Storage/Internal/ISessionTokenStorage.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public interface ISessionTokenStorage
6666
/// any release. You should only use it directly in your code with extreme caution and knowing that
6767
/// doing so can result in application failures when updating to a new Entity Framework Core release.
6868
/// </summary>
69-
public void TrackSessionToken(string containerName, string sessionToken);
69+
public void TrackSessionToken(string containerName, string? sessionToken);
7070

7171
/// <summary>
7272
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to

src/EFCore.Cosmos/Storage/Internal/SessionTokenStorage.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -181,12 +181,11 @@ public virtual void SetDefaultContainerSessionToken(string? sessionToken)
181181
/// any release. You should only use it directly in your code with extreme caution and knowing that
182182
/// doing so can result in application failures when updating to a new Entity Framework Core release.
183183
/// </summary>
184-
public virtual void TrackSessionToken(string containerName, string sessionToken)
184+
public virtual void TrackSessionToken(string containerName, string? sessionToken)
185185
{
186186
ArgumentNullException.ThrowIfNullOrWhiteSpace(containerName, nameof(containerName));
187-
ArgumentNullException.ThrowIfNullOrWhiteSpace(sessionToken, nameof(sessionToken));
188187

189-
if (_mode == SessionTokenManagementMode.FullyAutomatic)
188+
if (_mode == SessionTokenManagementMode.FullyAutomatic || string.IsNullOrWhiteSpace(sessionToken))
190189
{
191190
return;
192191
}

0 commit comments

Comments
 (0)