Skip to content

Commit ebc5a87

Browse files
authored
Cosmos: Modernize JSON serialization - Update pipeline (#38024)
Refactors DocumentSource to serialize to stream instead of DOM. Leverages JsonValueReaderWriter for serialization. For backwards compatibility: * Adds custom JsonValueReaderWriter's where found to be needed. * Use UnsafeRelaxedJsonEscaping (encoding special (html) characters) Partly removes __jObject, but not fully so the binding and query pipelines still work. Will be fully removed in next PR. Adds some todo's aswell, these will be picked up in following pr's if they have #34567 linked. Obsoletes ContentResponseOnWriteEnabled Simplifies CosmosTransactionalBatchTest and updates values for new (unindented) serialization Part of: #34567
1 parent 3717f8f commit ebc5a87

33 files changed

Lines changed: 901 additions & 914 deletions

src/EFCore.Cosmos/Infrastructure/CosmosDbContextOptionsBuilder.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,13 +205,14 @@ public virtual CosmosDbContextOptionsBuilder MaxRequestsPerTcpConnection(int req
205205
/// This reduces networking and CPU load by not sending the resource back over the network and serializing it on the client.
206206
/// </summary>
207207
/// <remarks>
208+
/// The EntityFrameworkCore default is <see langword="false" /> since 11.0.
208209
/// See <see href="https://aka.ms/efcore-docs-dbcontext-options">Using DbContextOptions</see>, and
209210
/// <see href="https://aka.ms/efcore-docs-cosmos">Accessing Azure Cosmos DB with EF Core</see> for more information and examples.
210211
/// </remarks>
211212
/// <param name="enabled"><see langword="false" /> to have null resource</param>
213+
[Obsolete("Enabling ContentResponseOnWrite currently has no benefit for EF Core.")]
212214
public virtual CosmosDbContextOptionsBuilder ContentResponseOnWriteEnabled(bool enabled = true)
213-
=> WithOption(e => e.ContentResponseOnWriteEnabled(Check.NotNull(enabled)));
214-
215+
=> WithOption(e => e.ContentResponseOnWriteEnabled(enabled));
215216

216217
/// <summary>
217218
/// Sets the <see cref="Cosmos.Infrastructure.SessionTokenManagementMode"/> to use.

src/EFCore.Cosmos/Infrastructure/Internal/CosmosDbOptionExtension.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ protected CosmosOptionsExtension(CosmosOptionsExtension copyFrom)
7474
_gatewayModeMaxConnectionLimit = copyFrom._gatewayModeMaxConnectionLimit;
7575
_maxTcpConnectionsPerEndpoint = copyFrom._maxTcpConnectionsPerEndpoint;
7676
_maxRequestsPerTcpConnection = copyFrom._maxRequestsPerTcpConnection;
77+
_enableContentResponseOnWrite = copyFrom._enableContentResponseOnWrite;
7778
_httpClientFactory = copyFrom._httpClientFactory;
7879
_sessionTokenManagementMode = copyFrom._sessionTokenManagementMode;
7980
_enableBulkExecution = copyFrom._enableBulkExecution;

src/EFCore.Cosmos/Infrastructure/Internal/CosmosSingletonOptions.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ public class CosmosSingletonOptions : ICosmosSingletonOptions
141141
/// any release. You should only use it directly in your code with extreme caution and knowing that
142142
/// doing so can result in application failures when updating to a new Entity Framework Core release.
143143
/// </summary>
144-
public virtual bool? EnableContentResponseOnWrite { get; }
144+
public virtual bool? EnableContentResponseOnWrite { get; private set; }
145145

146146
/// <summary>
147147
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
@@ -185,6 +185,7 @@ public virtual void Initialize(IDbContextOptions options)
185185
GatewayModeMaxConnectionLimit = cosmosOptions.GatewayModeMaxConnectionLimit;
186186
MaxTcpConnectionsPerEndpoint = cosmosOptions.MaxTcpConnectionsPerEndpoint;
187187
MaxRequestsPerTcpConnection = cosmosOptions.MaxRequestsPerTcpConnection;
188+
EnableContentResponseOnWrite = cosmosOptions.EnableContentResponseOnWrite;
188189
HttpClientFactory = cosmosOptions.HttpClientFactory;
189190
EnableBulkExecution = cosmosOptions.EnableBulkExecution;
190191
}
@@ -216,6 +217,7 @@ public virtual void Validate(IDbContextOptions options)
216217
|| GatewayModeMaxConnectionLimit != cosmosOptions.GatewayModeMaxConnectionLimit
217218
|| MaxTcpConnectionsPerEndpoint != cosmosOptions.MaxTcpConnectionsPerEndpoint
218219
|| MaxRequestsPerTcpConnection != cosmosOptions.MaxRequestsPerTcpConnection
220+
|| EnableContentResponseOnWrite != cosmosOptions.EnableContentResponseOnWrite
219221
|| HttpClientFactory != cosmosOptions.HttpClientFactory
220222
|| EnableBulkExecution != cosmosOptions.EnableBulkExecution
221223
))

src/EFCore.Cosmos/Query/Internal/CosmosSerializationUtilities.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ namespace Microsoft.EntityFrameworkCore.Cosmos.Query.Internal;
1717
/// <remarks>
1818
/// Inspired by RelationalJsonUtilities.
1919
/// </remarks>
20-
public static class CosmosSerializationUtilities
20+
public static class CosmosSerializationUtilities // @TODO: Can this be removed? Use document source instead? #34567
2121
{
2222
/// <summary>
2323
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to

src/EFCore.Cosmos/Storage/Internal/ByteArrayConverter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public override object ReadJson(
5656
{
5757
if (reader.TokenType != JsonToken.StartArray)
5858
{
59-
throw new Exception(reader.TokenType.ToString());
59+
throw new InvalidOperationException(CoreStrings.JsonReaderInvalidTokenType(reader.TokenType));
6060
}
6161

6262
var byteList = new List<byte>();

src/EFCore.Cosmos/Storage/Internal/CosmosClientWrapper.cs

Lines changed: 34 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
using System.Collections.ObjectModel;
66
using System.Net;
77
using System.Runtime.CompilerServices;
8-
using System.Text;
8+
using System.Runtime.InteropServices;
99
using Microsoft.Azure.Cosmos.Scripts;
1010
using Microsoft.EntityFrameworkCore.Cosmos.Diagnostics.Internal;
1111
using Microsoft.EntityFrameworkCore.Cosmos.Infrastructure.Internal;
@@ -47,8 +47,6 @@ public class CosmosClientWrapper : ICosmosClientWrapper
4747
private readonly string _databaseId;
4848
private readonly IExecutionStrategy _executionStrategy;
4949
private readonly IDiagnosticsLogger<DbLoggerCategory.Database.Command> _commandLogger;
50-
private readonly IDiagnosticsLogger<DbLoggerCategory.Database> _databaseLogger;
51-
private readonly bool? _enableContentResponseOnWrite;
5250

5351
static CosmosClientWrapper()
5452
{
@@ -68,34 +66,14 @@ public CosmosClientWrapper(
6866
ISingletonCosmosClientWrapper singletonWrapper,
6967
IDbContextOptions dbContextOptions,
7068
IExecutionStrategy executionStrategy,
71-
IDiagnosticsLogger<DbLoggerCategory.Database.Command> commandLogger,
72-
IDiagnosticsLogger<DbLoggerCategory.Database> databaseLogger)
69+
IDiagnosticsLogger<DbLoggerCategory.Database.Command> commandLogger)
7370
{
7471
var options = dbContextOptions.FindExtension<CosmosOptionsExtension>();
7572

7673
_singletonWrapper = singletonWrapper;
7774
_databaseId = options!.DatabaseName;
7875
_executionStrategy = executionStrategy;
7976
_commandLogger = commandLogger;
80-
_databaseLogger = databaseLogger;
81-
_enableContentResponseOnWrite = options.EnableContentResponseOnWrite;
82-
}
83-
84-
/// <summary>
85-
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
86-
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
87-
/// any release. You should only use it directly in your code with extreme caution and knowing that
88-
/// doing so can result in application failures when updating to a new Entity Framework Core release.
89-
/// </summary>
90-
public static Stream Serialize(JToken document)
91-
{
92-
var stream = new MemoryStream();
93-
using var writer = new StreamWriter(stream, new UTF8Encoding(), bufferSize: 1024, leaveOpen: true);
94-
95-
using var jsonWriter = new JsonTextWriter(writer);
96-
CosmosClientWrapper.Serializer.Serialize(jsonWriter, document);
97-
jsonWriter.Flush();
98-
return stream;
9977
}
10078

10179
/// <summary>
@@ -335,25 +313,25 @@ private static string GetPathFromRoot(IReadOnlyEntityType entityType)
335313
/// </summary>
336314
public virtual Task<bool> CreateItemAsync(
337315
string containerId,
338-
JToken document,
316+
string documentId,
317+
ReadOnlyMemory<byte> document,
339318
IUpdateEntry updateEntry,
340319
ISessionTokenStorage sessionTokenStorage,
341320
CancellationToken cancellationToken = default)
342-
=> _executionStrategy.ExecuteAsync((containerId, document, updateEntry, sessionTokenStorage, this), CreateItemOnceAsync, null, cancellationToken);
321+
=> _executionStrategy.ExecuteAsync((containerId, documentId, document, updateEntry, sessionTokenStorage, this), CreateItemOnceAsync, null, cancellationToken);
343322

344323
private static async Task<bool> CreateItemOnceAsync(
345324
DbContext _,
346-
(string ContainerId, JToken Document, IUpdateEntry Entry, ISessionTokenStorage SessionTokenStorage, CosmosClientWrapper Wrapper) parameters,
325+
(string ContainerId, string DocumentId, ReadOnlyMemory<byte> Document, IUpdateEntry Entry, ISessionTokenStorage SessionTokenStorage, CosmosClientWrapper Wrapper) parameters,
347326
CancellationToken cancellationToken = default)
348327
{
349-
using var stream = Serialize(parameters.Document);
350-
351328
var containerId = parameters.ContainerId;
329+
var documentId = parameters.DocumentId;
352330
var entry = parameters.Entry;
353331
var wrapper = parameters.Wrapper;
354332
var sessionTokenStorage = parameters.SessionTokenStorage;
355333
var container = wrapper.Client.GetDatabase(wrapper._databaseId).GetContainer(parameters.ContainerId);
356-
var itemRequestOptions = CreateItemRequestOptions(entry, wrapper._enableContentResponseOnWrite, sessionTokenStorage.GetSessionToken(containerId));
334+
var itemRequestOptions = CreateItemRequestOptions(entry, sessionTokenStorage.GetSessionToken(containerId));
357335
var partitionKeyValue = ExtractPartitionKeyValue(entry);
358336
var preTriggers = GetTriggers(entry, TriggerType.Pre, TriggerOperation.Create);
359337
var postTriggers = GetTriggers(entry, TriggerType.Post, TriggerOperation.Create);
@@ -370,6 +348,12 @@ private static async Task<bool> CreateItemOnceAsync(
370348
}
371349
}
372350

351+
if (!MemoryMarshal.TryGetArray(parameters.Document, out var segment) || segment.Array == null)
352+
{
353+
throw new UnreachableException("ReadOnlyMemory should have an underlying array.");
354+
}
355+
356+
using var stream = new MemoryStream(segment.Array, segment.Offset, segment.Count);
373357
using var response = await container.CreateItemStreamAsync(
374358
stream,
375359
partitionKeyValue,
@@ -381,7 +365,7 @@ private static async Task<bool> CreateItemOnceAsync(
381365
response.Diagnostics.GetClientElapsedTime(),
382366
response.Headers.RequestCharge,
383367
response.Headers.ActivityId,
384-
parameters.Document["id"]!.ToString(),
368+
documentId,
385369
containerId,
386370
partitionKeyValue);
387371

@@ -399,7 +383,7 @@ private static async Task<bool> CreateItemOnceAsync(
399383
public virtual Task<bool> ReplaceItemAsync(
400384
string collectionId,
401385
string documentId,
402-
JObject document,
386+
ReadOnlyMemory<byte> document,
403387
IUpdateEntry updateEntry,
404388
ISessionTokenStorage sessionTokenStorage,
405389
CancellationToken cancellationToken = default)
@@ -408,17 +392,15 @@ public virtual Task<bool> ReplaceItemAsync(
408392

409393
private static async Task<bool> ReplaceItemOnceAsync(
410394
DbContext _,
411-
(string ContainerId, string ResourceId, JObject Document, IUpdateEntry Entry, ISessionTokenStorage SessionTokenStorage, CosmosClientWrapper Wrapper) parameters,
395+
(string ContainerId, string ResourceId, ReadOnlyMemory<byte> Document, IUpdateEntry Entry, ISessionTokenStorage SessionTokenStorage, CosmosClientWrapper Wrapper) parameters,
412396
CancellationToken cancellationToken = default)
413397
{
414-
using var stream = Serialize(parameters.Document);
415-
416398
var containerId = parameters.ContainerId;
417399
var entry = parameters.Entry;
418400
var wrapper = parameters.Wrapper;
419401
var sessionTokenStorage = parameters.SessionTokenStorage;
420402
var container = wrapper.Client.GetDatabase(wrapper._databaseId).GetContainer(parameters.ContainerId);
421-
var itemRequestOptions = CreateItemRequestOptions(entry, wrapper._enableContentResponseOnWrite, sessionTokenStorage.GetSessionToken(containerId));
403+
var itemRequestOptions = CreateItemRequestOptions(entry, sessionTokenStorage.GetSessionToken(containerId));
422404
var partitionKeyValue = ExtractPartitionKeyValue(entry);
423405
var preTriggers = GetTriggers(entry, TriggerType.Pre, TriggerOperation.Replace);
424406
var postTriggers = GetTriggers(entry, TriggerType.Post, TriggerOperation.Replace);
@@ -435,6 +417,12 @@ private static async Task<bool> ReplaceItemOnceAsync(
435417
}
436418
}
437419

420+
if (!MemoryMarshal.TryGetArray(parameters.Document, out var segment) || segment.Array == null)
421+
{
422+
throw new UnreachableException("ReadOnlyMemory should have an underlying array.");
423+
}
424+
425+
using var stream = new MemoryStream(segment.Array, segment.Offset, segment.Count);
438426
using var response = await container.ReplaceItemStreamAsync(
439427
stream,
440428
parameters.ResourceId,
@@ -481,7 +469,7 @@ private static async Task<bool> DeleteItemOnceAsync(
481469
var sessionTokenStorage = parameters.SessionTokenStorage;
482470
var items = wrapper.Client.GetDatabase(wrapper._databaseId).GetContainer(parameters.ContainerId);
483471

484-
var itemRequestOptions = CreateItemRequestOptions(entry, wrapper._enableContentResponseOnWrite, sessionTokenStorage.GetSessionToken(containerId));
472+
var itemRequestOptions = CreateItemRequestOptions(entry, sessionTokenStorage.GetSessionToken(containerId));
485473
var partitionKeyValue = ExtractPartitionKeyValue(entry);
486474
var preTriggers = GetTriggers(entry, TriggerType.Pre, TriggerOperation.Delete);
487475
var postTriggers = GetTriggers(entry, TriggerType.Post, TriggerOperation.Delete);
@@ -539,7 +527,7 @@ public virtual ICosmosTransactionalBatchWrapper CreateTransactionalBatch(string
539527

540528
var batch = container.CreateTransactionalBatch(partitionKeyValue);
541529

542-
return new CosmosTransactionalBatchWrapper(batch, containerId, partitionKeyValue, checkSize, _enableContentResponseOnWrite);
530+
return new CosmosTransactionalBatchWrapper(batch, containerId, partitionKeyValue, checkSize);
543531
}
544532

545533
/// <summary>
@@ -578,9 +566,9 @@ private static async Task<CosmosTransactionalBatchResult> ExecuteTransactionalBa
578566
return ProcessBatchResponse(batch.CollectionId, response, batch.Entries, sessionTokenStorage);
579567
}
580568

581-
private static ItemRequestOptions CreateItemRequestOptions(IUpdateEntry entry, bool? enableContentResponseOnWrite, string? sessionToken)
569+
private static ItemRequestOptions CreateItemRequestOptions(IUpdateEntry entry, string? sessionToken)
582570
{
583-
var helper = RequestOptionsHelper.Create(entry, enableContentResponseOnWrite);
571+
var helper = RequestOptionsHelper.Create(entry);
584572

585573
var itemRequestOptions = new ItemRequestOptions
586574
{
@@ -590,7 +578,6 @@ private static ItemRequestOptions CreateItemRequestOptions(IUpdateEntry entry, b
590578
if (helper != null)
591579
{
592580
itemRequestOptions.IfMatchEtag = helper.IfMatchEtag;
593-
itemRequestOptions.EnableContentResponseOnWrite = helper.EnableContentResponseOnWrite;
594581
}
595582

596583
return itemRequestOptions;
@@ -681,31 +668,23 @@ private static CosmosTransactionalBatchResult ProcessBatchResponse(string contai
681668
var entry = entries[i];
682669
var item = response[i];
683670

684-
ProcessWriteResponse(entry.Entry, (string)item.ETag, (Stream)item.ResourceStream);
671+
ProcessWriteResponse(entry.Entry, item.ETag, item.ResourceStream);
685672
}
686673

687674
return CosmosTransactionalBatchResult.Success;
688675
}
689676

690677
private static void ProcessWriteResponse(IUpdateEntry entry, string eTag, Stream? content)
691678
{
692-
var etagProperty = entry.EntityType.GetETagProperty();
693-
if (etagProperty != null && entry.EntityState != EntityState.Deleted)
679+
if (entry.EntityState == EntityState.Deleted)
694680
{
695-
entry.SetStoreGeneratedValue(etagProperty, eTag);
681+
return;
696682
}
697683

698-
var jObjectProperty = entry.EntityType.FindProperty(CosmosPartitionKeyInPrimaryKeyConvention.JObjectPropertyName);
699-
if (jObjectProperty is { ValueGenerated: ValueGenerated.OnAddOrUpdate }
700-
&& content != null)
684+
var etagProperty = entry.EntityType.GetETagProperty();
685+
if (etagProperty != null)
701686
{
702-
using var responseStream = content;
703-
using var reader = new StreamReader(responseStream);
704-
using var jsonReader = new JsonTextReader(reader);
705-
706-
var createdDocument = Serializer.Deserialize<JObject>(jsonReader);
707-
708-
entry.SetStoreGeneratedValue(jObjectProperty, createdDocument);
687+
entry.SetStoreGeneratedValue(etagProperty, eTag);
709688
}
710689
}
711690

0 commit comments

Comments
 (0)