Skip to content

Commit 2baba33

Browse files
vkuttyp and Copilot committed
Fix JsonChunkAssembler bugs; add streaming JSON tests & benchmarks
Bug 1 — only one element returned per chunk: ProcessBuffer set needBreak = true after finding one element, which made it stop immediately. The flag has been removed; the reader now processes ALL complete elements in a single pass without exiting early.

Bug 2 — JsonReaderException on wide rows (FOR JSON splits mid-string): the outer loop compacted the buffer even while elemStart >= 0 (mid-element), which shifted the rawBuf bytes and left elemStart pointing into the middle of a string instead of at the element's opening '{'. Fixed: CompactBuffer is now called only when consumedTo > 0, and the absolute positions (parsedTo, elemStart) are adjusted by the compaction offset. Also added an inner do-while loop so that one chunk's worth of data is fully drained before awaiting the next chunk (the buffer may contain many complete elements).

Integration tests (MsSqlIntegrationTests.cs):
- QueryJsonStreamAsync_SmallDataset_YieldsCorrectCount — 20 rows, all properties validated
- QueryJsonStreamAsync_LargeDataset_YieldsAllRows — 5 000 rows; first-element latency logged
- QueryJsonStreamAsync_VeryLargeRows_HandlesChunkBoundaries — 50 rows × 2500-char Description, forcing SQL Server to split a single JSON object across multiple FOR JSON rows

Benchmark (MsSqlBenchmarks.cs):
- CosmoSQL Warm FOR JSON streamed (object-by-object)
- CosmoSQL Warm FOR JSON buffered (full array)
- ADO.NET Warm FOR JSON buffered (full array)

The benchmark query is controlled by the BENCH_JSON_QUERY environment variable (default: SELECT TOP 10000 … FOR JSON PATH).

All 10/10 integration tests pass; 5 000-row stream: first element in 5.3 ms, all elements in 17.6 ms.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 03f9490 commit 2baba33

3 files changed

Lines changed: 234 additions & 87 deletions

File tree

Benchmarks/CosmoSQLBenchmarks/MsSqlBenchmarks.cs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,49 @@ public async Task Ado_Warm_ToJson()
184184
.ToList());
185185
}
186186

187+
// ── FOR JSON streaming vs buffered ──────────────────────────────────────────
188+
189+
private static readonly string JsonQuery =
190+
Env("BENCH_JSON_QUERY",
191+
"SELECT TOP 10000 AccountNo, AccountName FROM Accounts FOR JSON PATH");
192+
193+
[Benchmark(Description = "CosmoSQL Warm FOR JSON streamed (object-by-object)")]
194+
public async Task Cosmo_Warm_ForJson_Streamed()
195+
{
196+
// Process each JSON object as it arrives — never holds the full array in RAM.
197+
int count = 0;
198+
await foreach (var elem in _cosmoConn!.QueryJsonStreamAsync(JsonQuery))
199+
count++;
200+
_ = count;
201+
}
202+
203+
[Benchmark(Description = "CosmoSQL Warm FOR JSON buffered (full array)")]
204+
public async Task Cosmo_Warm_ForJson_Buffered()
205+
{
206+
// Traditional approach: collect all FOR JSON rows, concat, then parse.
207+
var rows = await _cosmoConn!.QueryAsync(JsonQuery);
208+
var sb = new System.Text.StringBuilder();
209+
foreach (var row in rows)
210+
sb.Append(row[0].AsString());
211+
using var doc = System.Text.Json.JsonDocument.Parse(sb.ToString());
212+
var root = doc.RootElement;
213+
_ = root.GetArrayLength();
214+
}
215+
216+
[Benchmark(Description = "ADO.NET Warm FOR JSON buffered (full array)")]
217+
public async Task Ado_Warm_ForJson_Buffered()
218+
{
219+
// ADO.NET has no streaming FOR JSON — must buffer everything.
220+
var sb = new System.Text.StringBuilder();
221+
await using var cmd = new SqlCommand(JsonQuery, _adoConn);
222+
await using var rdr = await cmd.ExecuteReaderAsync();
223+
while (await rdr.ReadAsync())
224+
sb.Append(rdr.GetString(0));
225+
using var doc = System.Text.Json.JsonDocument.Parse(sb.ToString());
226+
var root = doc.RootElement;
227+
_ = root.GetArrayLength();
228+
}
229+
187230
// ── helpers ─────────────────────────────────────────────────────────────────
188231

189232
private static string Env(string key, string fallback)

src/SqlDotnetty.MsSql/JsonChunkAssembler.cs

Lines changed: 87 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -48,45 +48,66 @@ public static async IAsyncEnumerable<JsonElement> AssembleJsonObjectsAsync(
4848
IAsyncEnumerable<string> chunks,
4949
[EnumeratorCancellation] CancellationToken ct = default)
5050
{
51-
// Growing byte buffer for all received chunk bytes.
5251
using var buf = new MemoryStream(32768);
5352

54-
var state = default(JsonReaderState);
55-
long parsedTo = 0; // absolute byte offset consumed so far
56-
int depth = 0; // JSON nesting depth (1 = inside the top-level array)
57-
long elemStart = -1; // absolute offset of the current element's opening byte
58-
bool inArray = false;
53+
var state = default(JsonReaderState);
54+
long parsedTo = 0; // absolute position in rawBuf where reader stopped
55+
int depth = 0;
56+
long elemStart = -1; // absolute position of current in-progress element's '{', or -1
57+
bool inArray = false;
5958

6059
await foreach (var chunk in chunks.WithCancellation(ct))
6160
{
6261
if (string.IsNullOrEmpty(chunk)) continue;
6362

64-
// Append chunk bytes.
63+
// Append chunk bytes to the rolling buffer.
6564
buf.Position = buf.Length;
6665
var chunkBytes = Encoding.UTF8.GetBytes(chunk);
6766
buf.Write(chunkBytes, 0, chunkBytes.Length);
6867

69-
// Process the buffer SYNCHRONOUSLY (Utf8JsonReader is a ref struct and
70-
// cannot cross yield/await boundaries — so we collect results then yield).
71-
var result = ProcessBuffer(
72-
buf.GetBuffer(), (int)parsedTo, (int)buf.Length, state,
73-
depth, elemStart, inArray);
74-
75-
// Update mutable state from synchronous result.
76-
state = result.State;
77-
depth = result.Depth;
78-
inArray = result.InArray;
79-
elemStart = result.ElemStart;
80-
81-
// Compact: if elements were yielded, discard consumed prefix bytes.
82-
if (result.ConsumedAbsolute > parsedTo)
83-
CompactBuffer(buf, result.ConsumedAbsolute, out parsedTo);
84-
else
85-
parsedTo += result.BytesConsumedFromOffset;
86-
87-
// Yield complete elements OUTSIDE the Utf8JsonReader scope.
88-
foreach (var elem in result.Elements)
89-
yield return elem;
68+
// Inner loop: re-run ProcessBuffer until a pass makes no progress, so the
69+
// current buffer is fully drained before the next chunk is awaited. ProcessBuffer
70+
// collects every complete element per call, so this usually takes 1-2 passes.
71+
bool madeProgress;
72+
do
73+
{
74+
madeProgress = false;
75+
76+
// ProcessBuffer is synchronous (Utf8JsonReader is a ref struct and cannot
77+
// cross yield/await boundaries).
78+
var result = ProcessBuffer(
79+
buf.GetBuffer(), (int)parsedTo, (int)buf.Length,
80+
state, depth, elemStart, inArray);
81+
82+
state = result.State;
83+
depth = result.Depth;
84+
inArray = result.InArray;
85+
86+
long newParsedTo = parsedTo + result.BytesConsumedFromOffset;
87+
long consumedTo = result.ConsumedTo; // absolute; safe to discard
88+
long newElemStart = result.ElemStart; // absolute, or -1
89+
90+
if (result.BytesConsumedFromOffset > 0 || result.Elements.Count > 0)
91+
madeProgress = true;
92+
93+
// Compact: discard bytes [0..consumedTo). This is always safe because
94+
// consumedTo only advances after a complete element (elemStart resets
95+
// to -1 at that point), so newElemStart is either -1 or >= consumedTo.
96+
if (consumedTo > 0)
97+
{
98+
CompactBuffer(buf, consumedTo);
99+
newParsedTo -= consumedTo;
100+
if (newElemStart >= 0) newElemStart -= consumedTo;
101+
}
102+
103+
parsedTo = newParsedTo;
104+
elemStart = newElemStart;
105+
106+
// Yield complete elements OUTSIDE the Utf8JsonReader scope.
107+
foreach (var elem in result.Elements)
108+
yield return elem;
109+
110+
} while (madeProgress);
90111
}
91112
}
92113

@@ -104,26 +125,35 @@ public static async Task<string> BufferJsonAsync(
104125
return sb.ToString();
105126
}
106127

107-
// ── Synchronous processing (no async/yield — safe for ref struct) ─────────
128+
// ── Synchronous inner processor (no async/yield — safe for ref struct) ───
108129

109130
private readonly ref struct BufferResult
110131
{
111-
public IReadOnlyList<JsonElement> Elements { get; init; }
112-
public JsonReaderState State { get; init; }
113-
public int Depth { get; init; }
114-
public long ElemStart { get; init; }
115-
public bool InArray { get; init; }
116-
/// <summary>Bytes consumed from the start of the slice passed in.</summary>
132+
public IReadOnlyList<JsonElement> Elements { get; init; }
133+
public JsonReaderState State { get; init; }
134+
public int Depth { get; init; }
135+
public long ElemStart { get; init; }
136+
public bool InArray { get; init; }
137+
/// <summary>Bytes the reader consumed from parsedToInt (may be 0 if all data is partial).</summary>
117138
public long BytesConsumedFromOffset { get; init; }
118-
/// <summary>Absolute offset to compact to (= original parsedTo + BytesConsumedFromOffset).</summary>
119-
public long ConsumedAbsolute { get; init; }
139+
/// <summary>
140+
/// Absolute position through which bytes are safe to discard (= after the last
141+
/// complete element extracted this call, or 0 if no element was completed).
142+
/// </summary>
143+
public long ConsumedTo { get; init; }
120144
}
121145

146+
/// <summary>
147+
/// Parse as many complete top-level JSON array elements as possible from
148+
/// <c>rawBuf[parsedToInt..bufLength]</c> using <see cref="Utf8JsonReader"/> in
149+
/// incremental mode. Never breaks early — processes ALL complete elements.
150+
/// </summary>
122151
private static BufferResult ProcessBuffer(
123152
byte[] rawBuf, int parsedToInt, int bufLength,
124153
JsonReaderState state, int depth, long elemStart, bool inArray)
125154
{
126-
var elements = new List<JsonElement>();
155+
var elements = new List<JsonElement>();
156+
long consumedTo = 0; // absolute; nothing safe to discard yet
127157

128158
var available = bufLength - parsedToInt;
129159
if (available <= 0)
@@ -132,20 +162,18 @@ private static BufferResult ProcessBuffer(
132162
{
133163
Elements = elements, State = state, Depth = depth,
134164
ElemStart = elemStart, InArray = inArray,
135-
BytesConsumedFromOffset = 0, ConsumedAbsolute = parsedToInt,
165+
BytesConsumedFromOffset = 0, ConsumedTo = 0,
136166
};
137167
}
138168

139169
var span = new ReadOnlySpan<byte>(rawBuf, parsedToInt, available);
140170
var reader = new Utf8JsonReader(span, isFinalBlock: false, state);
141171

142-
long consumedAbsolute = parsedToInt;
143-
bool needBreak = false;
144-
145-
while (!needBreak && reader.Read())
172+
while (reader.Read())
146173
{
147174
var tt = reader.TokenType;
148175

176+
// Detect the outer array wrapper (first token of a FOR JSON result).
149177
if (tt == JsonTokenType.StartArray && !inArray && depth == 0)
150178
{
151179
inArray = true;
@@ -157,6 +185,7 @@ private static BufferResult ProcessBuffer(
157185

158186
if (tt is JsonTokenType.StartObject or JsonTokenType.StartArray)
159187
{
188+
// Mark where a new top-level element begins (only at array depth 1).
160189
if (depth == 1)
161190
elemStart = parsedToInt + reader.TokenStartIndex;
162191
depth++;
@@ -167,20 +196,18 @@ private static BufferResult ProcessBuffer(
167196

168197
if (depth == 1 && elemStart >= 0)
169198
{
170-
// Complete element — extract and parse bytes.
171-
long end = parsedToInt + reader.BytesConsumed;
172-
int len = (int)(end - elemStart);
199+
// A complete top-level element just finished.
200+
long end = parsedToInt + reader.BytesConsumed;
201+
int len = (int)(end - elemStart);
173202
var elemBytes = new byte[len];
174203
Array.Copy(rawBuf, (int)elemStart, elemBytes, 0, len);
175204

176205
using var doc = JsonDocument.Parse(elemBytes);
177206
elements.Add(doc.RootElement.Clone());
178207

179-
elemStart = -1;
180-
consumedAbsolute = end;
181-
182-
// Signal that the outer loop should compact and break after this.
183-
needBreak = true;
208+
elemStart = -1;
209+
consumedTo = end; // bytes up to here are now safe to discard
210+
// ── DO NOT break — continue to collect further elements ──
184211
}
185212
else if (depth == 0)
186213
{
@@ -189,55 +216,28 @@ private static BufferResult ProcessBuffer(
189216
}
190217
}
191218

192-
long bytesConsumed = parsedToInt + reader.BytesConsumed - parsedToInt;
193-
194219
return new BufferResult
195220
{
196-
Elements = elements,
197-
State = reader.CurrentState,
198-
Depth = depth,
199-
ElemStart = elemStart,
200-
InArray = inArray,
221+
Elements = elements,
222+
State = reader.CurrentState,
223+
Depth = depth,
224+
ElemStart = elemStart,
225+
InArray = inArray,
201226
BytesConsumedFromOffset = reader.BytesConsumed,
202-
ConsumedAbsolute = needBreak ? consumedAbsolute : parsedToInt + reader.BytesConsumed,
227+
ConsumedTo = consumedTo,
203228
};
204229
}
205230

206231
/// <summary>
207-
/// Discard fully-consumed prefix bytes from <paramref name="buf"/> to cap memory usage.
232+
/// Discard the first <paramref name="consumedAbsolute"/> bytes from the buffer by
233+
/// block-copying the remaining bytes to the front. Keeps memory usage bounded.
208234
/// </summary>
209-
private static void CompactBuffer(MemoryStream buf, long consumedAbsolute, out long newParsedTo)
235+
private static void CompactBuffer(MemoryStream buf, long consumedAbsolute)
210236
{
211237
long remaining = buf.Length - consumedAbsolute;
212238
var rawBuf = buf.GetBuffer();
213239
if (remaining > 0)
214240
Buffer.BlockCopy(rawBuf, (int)consumedAbsolute, rawBuf, 0, (int)remaining);
215241
buf.SetLength(remaining);
216-
newParsedTo = 0;
217242
}
218243
}
219-
220-
221-
/// <summary>
222-
/// Assembles fragmented JSON chunks (as produced by SQL Server's <c>FOR JSON</c> queries)
223-
/// into complete, independently-parsed <see cref="JsonElement"/> objects.
224-
/// </summary>
225-
/// <remarks>
226-
/// <para>
227-
/// SQL Server's <c>FOR JSON PATH</c> / <c>FOR JSON AUTO</c> does not respect JSON value
228-
/// boundaries when splitting output across rows — a single row may contain half a property
229-
/// name, and the next row may start mid-value. Naively streaming row-by-row therefore
230-
/// yields unusable partial JSON.
231-
/// </para>
232-
/// <para>
233-
/// This class solves that by feeding chunks through .NET's <see cref="Utf8JsonReader"/>
234-
/// with <see cref="JsonReaderState"/> persistence. The reader correctly handles escaped
235-
/// strings, nested objects and arrays, and resumes from exactly the right position when
236-
/// more data arrives — so <c>{</c> characters inside string values are never misread as
237-
/// object boundaries.
238-
/// </para>
239-
/// <para>
240-
/// For a <c>FOR JSON</c> result like <c>[{"Id":1},{"Id":2},…]</c>, each top-level array
241-
/// element is yielded as a separate <see cref="JsonElement"/> the moment the closing <c>}</c>
242-
/// of that element is received, without waiting for the entire array to arrive.
243-
/// </para>

0 commit comments

Comments
 (0)