Skip to content

Commit 03f9490

Browse files
vkuttyp and Copilot committed
Wire QueryJsonStreamAsync into Connection, Command, and Pool
- MsSqlConnection.QueryJsonStreamAsync(): uses QueryStreamAsync row-by-row streaming, extracts the JSON column text, and feeds the fragments to JsonChunkAssembler.AssembleJsonObjectsAsync(), which uses Utf8JsonReader + JsonReaderState to detect object boundaries across SQL Server's arbitrary chunk splits (~2033-char rows) and yields one JsonElement per array element.
- Refactored JsonChunkAssembler to move all Utf8JsonReader usage into a synchronous ProcessBuffer() helper (a ref struct can't cross yield/await), so the async iterator only awaits chunks and yields collected elements.
- MsSqlCommand.QueryJsonStreamAsync(): thin pass-through with CommandTimeout.
- MsSqlConnectionPool.QueryJsonStreamAsync(): acquire/stream/release pattern.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 8d7723a commit 03f9490

4 files changed

Lines changed: 322 additions & 0 deletions

File tree

Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
using System.Runtime.CompilerServices;
2+
using System.Text;
3+
using System.Text.Json;
4+
5+
namespace CosmoSQLClient.MsSql;
6+
7+
/// <summary>
/// Assembles fragmented JSON chunks (as produced by SQL Server's <c>FOR JSON</c> queries)
/// into complete, independently-parsed <see cref="JsonElement"/> objects.
/// </summary>
/// <remarks>
/// <para>
/// SQL Server's <c>FOR JSON PATH</c> / <c>FOR JSON AUTO</c> does not respect JSON value
/// boundaries when splitting output across rows — a single row may contain half a property
/// name, and the next row may start mid-value. Naively streaming row-by-row therefore
/// yields unusable partial JSON.
/// </para>
/// <para>
/// This class solves that by feeding chunks through .NET's <see cref="Utf8JsonReader"/>
/// with <see cref="JsonReaderState"/> persistence. The reader correctly handles escaped
/// strings, nested objects and arrays, and resumes from exactly the right position when
/// more data arrives — so <c>{</c> characters inside string values are never misread as
/// object boundaries.
/// </para>
/// <para>
/// For a <c>FOR JSON</c> result like <c>[{"Id":1},{"Id":2},…]</c>, each object or array that
/// is a direct child of the outer array is yielded as a separate <see cref="JsonElement"/>
/// once its closing bracket has been received, without waiting for the entire array to
/// arrive. Scalar values placed directly inside the outer array are skipped.
/// </para>
/// </remarks>
public static class JsonChunkAssembler
{
    /// <summary>Initial capacity, in bytes, of the per-stream reassembly buffer.</summary>
    private const int InitialBufferSize = 32768;

    /// <summary>Upper bound used when doubling the buffer, to stay below the runtime array limit.</summary>
    private const int MaxBufferSize = 0x7FFFFFC7;

    // ── Public API ────────────────────────────────────────────────────────────

    /// <summary>
    /// Stream individual JSON objects from a sequence of arbitrary-length string chunks.
    /// Designed for SQL Server <c>FOR JSON</c> output but works with any JSON array of objects.
    /// </summary>
    /// <param name="chunks">
    /// Raw JSON fragments — e.g. each row's text from a <c>FOR JSON</c> result set.
    /// Null or empty fragments are ignored.
    /// </param>
    /// <param name="ct">Optional cancellation token.</param>
    /// <returns>
    /// One <see cref="JsonElement"/> per completed array element, in document order. Every
    /// element completed by a chunk is yielded before the next chunk is awaited. Only the
    /// bytes of the element currently in progress are retained between chunks. If the input
    /// ends in the middle of an element, that incomplete element is not yielded.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="chunks"/> is <c>null</c> (raised when enumeration starts).
    /// </exception>
    /// <exception cref="JsonException">The concatenated chunks are not valid JSON.</exception>
    public static async IAsyncEnumerable<JsonElement> AssembleJsonObjectsAsync(
        IAsyncEnumerable<string> chunks,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        if (chunks is null)
        {
            throw new ArgumentNullException(nameof(chunks));
        }

        var assembler = new FragmentAssembler();
        var completed = new List<JsonElement>();

        await foreach (var chunk in chunks.WithCancellation(ct).ConfigureAwait(false))
        {
            if (string.IsNullOrEmpty(chunk))
            {
                continue;
            }

            // All Utf8JsonReader work happens inside Append (synchronously), because the
            // reader is a ref struct and cannot live across yield/await.
            completed.Clear();
            assembler.Append(chunk, completed);

            foreach (var element in completed)
            {
                yield return element;
            }
        }
    }

    /// <summary>
    /// Concatenate all chunks into a single JSON string.
    /// Suitable when the total JSON is small enough to fit in memory.
    /// </summary>
    /// <param name="chunks">Raw JSON fragments, in order.</param>
    /// <param name="ct">Optional cancellation token.</param>
    /// <returns>The fragments joined in arrival order, with nothing inserted between them.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="chunks"/> is <c>null</c>.</exception>
    public static async Task<string> BufferJsonAsync(
        IAsyncEnumerable<string> chunks,
        CancellationToken ct = default)
    {
        if (chunks is null)
        {
            throw new ArgumentNullException(nameof(chunks));
        }

        var json = new StringBuilder();
        await foreach (var chunk in chunks.WithCancellation(ct).ConfigureAwait(false))
        {
            json.Append(chunk);
        }

        return json.ToString();
    }

    // ── Synchronous processing (no async/yield — safe for ref struct) ─────────

    /// <summary>
    /// Holds the reassembly buffer and the resumable reader state for one JSON stream.
    /// All offsets are relative to the start of <see cref="_buffer"/> and are rebased
    /// whenever the buffer is compacted.
    /// </summary>
    private sealed class FragmentAssembler
    {
        // Stateful encoder: a surrogate pair split across two chunks is carried over
        // instead of being encoded as two replacement characters.
        private readonly Encoder _encoder = Encoding.UTF8.GetEncoder();

        private byte[] _buffer = new byte[InitialBufferSize];
        private int _length;              // number of valid bytes in _buffer
        private int _parsedTo;            // offset of the first byte the reader has not consumed
        private int _elementStart = -1;   // offset of the in-progress element's opening byte, or -1
        private int _depth;               // nesting depth (1 = directly inside the outer array)
        private bool _inArray;            // true while inside the outer array
        private JsonReaderState _state;   // reader state carried from one chunk to the next

        /// <summary>
        /// Appends one fragment, parses as far as the available bytes allow, and adds every
        /// element completed by this fragment to <paramref name="completed"/>.
        /// </summary>
        public void Append(string chunk, List<JsonElement> completed)
        {
            AppendUtf8(chunk);
            Parse(completed);
            Compact();
        }

        /// <summary>Encodes <paramref name="chunk"/> as UTF-8 onto the end of the buffer.</summary>
        private void AppendUtf8(string chunk)
        {
            var chars = chunk.AsSpan();

            // Exact count, including any high surrogate held over from the previous chunk.
            int byteCount = _encoder.GetByteCount(chars, flush: false);
            EnsureCapacity(checked(_length + byteCount));

            _length += _encoder.GetBytes(chars, _buffer.AsSpan(_length), flush: false);
        }

        /// <summary>Grows the buffer (at least doubling it) so it can hold <paramref name="required"/> bytes.</summary>
        private void EnsureCapacity(int required)
        {
            if (required <= _buffer.Length)
            {
                return;
            }

            long doubled = Math.Min((long)_buffer.Length * 2, MaxBufferSize);
            Array.Resize(ref _buffer, (int)Math.Max(required, doubled));
        }

        /// <summary>
        /// Runs the reader over the not-yet-consumed bytes until it needs more input,
        /// emitting every element whose closing bracket is reached.
        /// </summary>
        private void Parse(List<JsonElement> completed)
        {
            int sliceStart = _parsedTo;
            var pending = new ReadOnlySpan<byte>(_buffer, sliceStart, _length - sliceStart);
            if (pending.IsEmpty)
            {
                return;
            }

            var reader = new Utf8JsonReader(pending, isFinalBlock: false, _state);

            // Read() returns false when the remaining bytes end inside a token; those bytes
            // stay in the buffer and are re-read once the next chunk has been appended.
            while (reader.Read())
            {
                var token = reader.TokenType;

                if (!_inArray)
                {
                    // Nothing is emitted until an array opens at depth 0.
                    if (token == JsonTokenType.StartArray && _depth == 0)
                    {
                        _inArray = true;
                        _depth = 1;
                    }

                    continue;
                }

                if (token is JsonTokenType.StartObject or JsonTokenType.StartArray)
                {
                    if (_depth == 1)
                    {
                        // TokenStartIndex is relative to the span handed to this reader.
                        _elementStart = sliceStart + (int)reader.TokenStartIndex;
                    }

                    _depth++;
                }
                else if (token is JsonTokenType.EndObject or JsonTokenType.EndArray)
                {
                    _depth--;

                    if (_depth == 1 && _elementStart >= 0)
                    {
                        int elementEnd = sliceStart + (int)reader.BytesConsumed;
                        var elementBytes = new ReadOnlyMemory<byte>(
                            _buffer, _elementStart, elementEnd - _elementStart);

                        // Clone() copies the data, so the element stays valid after the
                        // buffer is compacted or reused.
                        using (var document = JsonDocument.Parse(elementBytes))
                        {
                            completed.Add(document.RootElement.Clone());
                        }

                        _elementStart = -1;
                    }
                    else if (_depth == 0)
                    {
                        _inArray = false;
                    }
                }
            }

            _parsedTo = sliceStart + (int)reader.BytesConsumed;
            _state = reader.CurrentState;
        }

        /// <summary>
        /// Discards bytes that are no longer needed and rebases the offsets. While an element
        /// is in progress its bytes must be kept from its opening bracket onwards, so they can
        /// be sliced out once the closing bracket arrives.
        /// </summary>
        private void Compact()
        {
            int keepFrom = _elementStart >= 0 ? _elementStart : _parsedTo;
            if (keepFrom == 0)
            {
                return;
            }

            int remaining = _length - keepFrom;
            if (remaining > 0)
            {
                Buffer.BlockCopy(_buffer, keepFrom, _buffer, 0, remaining);
            }

            _length = remaining;
            _parsedTo -= keepFrom;
            if (_elementStart >= 0)
            {
                _elementStart = 0;
            }
        }
    }
}
219+
220+
221+
/// <summary>
222+
/// Assembles fragmented JSON chunks (as produced by SQL Server's <c>FOR JSON</c> queries)
223+
/// into complete, independently-parsed <see cref="JsonElement"/> objects.
224+
/// </summary>
225+
/// <remarks>
226+
/// <para>
227+
/// SQL Server's <c>FOR JSON PATH</c> / <c>FOR JSON AUTO</c> does not respect JSON value
228+
/// boundaries when splitting output across rows — a single row may contain half a property
229+
/// name, and the next row may start mid-value. Naively streaming row-by-row therefore
230+
/// yields unusable partial JSON.
231+
/// </para>
232+
/// <para>
233+
/// This class solves that by feeding chunks through .NET's <see cref="Utf8JsonReader"/>
234+
/// with <see cref="JsonReaderState"/> persistence. The reader correctly handles escaped
235+
/// strings, nested objects and arrays, and resumes from exactly the right position when
236+
/// more data arrives — so <c>{</c> characters inside string values are never misread as
237+
/// object boundaries.
238+
/// </para>
239+
/// <para>
240+
/// For a <c>FOR JSON</c> result like <c>[{"Id":1},{"Id":2},…]</c>, each top-level array
241+
/// element is yielded as a separate <see cref="JsonElement"/> the moment the closing <c>}</c>
242+
/// of that element is received, without waiting for the entire array to arrive.
243+
/// </para>

src/SqlDotnetty.MsSql/MsSqlCommand.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,15 @@ public async Task<SqlDataSet> ExecuteDataSetAsync(CancellationToken ct = default
117117
/// Process(row);
118118
/// </code>
119119
/// </example>
120+
public IAsyncEnumerable<SqlRow> QueryStreamAsync(CancellationToken ct = default)
    => Connection.QueryStreamAsync(CommandText, Parameters.AsReadOnly(), ct);

/// <summary>
/// Execute <see cref="CommandText"/> as a <c>FOR JSON</c> query and stream one
/// <see cref="System.Text.Json.JsonElement"/> per array element without buffering
/// the entire JSON result in memory.
/// </summary>
/// <remarks>
/// Forwards <see cref="CommandText"/> and a read-only view of the command's parameters
/// to the connection's <c>QueryJsonStreamAsync</c>.
/// </remarks>
/// <param name="jsonColumnIndex">
/// Zero-based index of the column that holds the JSON fragment (default: 0).
/// </param>
/// <param name="ct">Optional cancellation token, passed through to the connection.</param>
/// <returns>The asynchronous element stream produced by the connection.</returns>
public IAsyncEnumerable<System.Text.Json.JsonElement> QueryJsonStreamAsync(
    int jsonColumnIndex = 0, CancellationToken ct = default)
    => Connection.QueryJsonStreamAsync(CommandText, Parameters.AsReadOnly(), jsonColumnIndex, ct);
122131

src/SqlDotnetty.MsSql/MsSqlConnection.cs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,54 @@ public Task CommitAsync(CancellationToken ct = default)
383383
public Task RollbackAsync(CancellationToken ct = default)
384384
=> ExecuteRawSqlAsync("ROLLBACK TRANSACTION", ct);
385385

386+
// ── FOR JSON streaming ────────────────────────────────────────────────────
387+
388+
/// <summary>
/// Runs a <c>FOR JSON</c> query and streams its result as individual
/// <see cref="System.Text.Json.JsonElement"/> values, one per array element, instead of
/// materialising the whole JSON document first.
/// </summary>
/// <remarks>
/// SQL Server returns <c>FOR JSON</c> output as rows of ~2033 characters each, and the
/// row breaks do NOT line up with JSON object boundaries. The rows are streamed via
/// <c>QueryStreamAsync</c>, the text of the selected column is taken from each row, and the
/// resulting fragments are handed to <c>JsonChunkAssembler.AssembleJsonObjectsAsync</c>,
/// which locates the element boundaries across fragments.
/// </remarks>
/// <param name="sql">A SQL query ending with <c>FOR JSON PATH</c> or <c>FOR JSON AUTO</c>.</param>
/// <param name="parameters">Optional query parameters.</param>
/// <param name="jsonColumnIndex">
/// Zero-based index of the column that holds the JSON fragment (default: 0).
/// </param>
/// <param name="ct">Optional cancellation token, passed to every stage of the pipeline.</param>
/// <example>
/// <code>
/// await foreach (var element in conn.QueryJsonStreamAsync(
///     "SELECT Id, Name, Price FROM Products FOR JSON PATH"))
/// {
///     var id   = element.GetProperty("Id").GetInt32();
///     var name = element.GetProperty("Name").GetString();
/// }
/// </code>
/// </example>
public IAsyncEnumerable<System.Text.Json.JsonElement> QueryJsonStreamAsync(
    string sql,
    IReadOnlyList<SqlParameter>? parameters = null,
    int jsonColumnIndex = 0,
    CancellationToken ct = default)
    => JsonChunkAssembler.AssembleJsonObjectsAsync(
        ExtractColumnChunks(QueryStreamAsync(sql, parameters, ct), jsonColumnIndex, ct),
        ct);
424+
425+
/// <summary>
/// Projects each streamed row to the text of a single column, substituting an empty
/// string when that column's <c>AsString()</c> returns <c>null</c>.
/// </summary>
/// <param name="rows">The row stream to read from.</param>
/// <param name="columnIndex">Index of the column to read from every row.</param>
/// <param name="ct">Cancellation token applied to the row enumeration.</param>
/// <returns>One string per input row, in row order.</returns>
private static async IAsyncEnumerable<string> ExtractColumnChunks(
    IAsyncEnumerable<SqlRow> rows,
    int columnIndex,
    [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken ct)
{
    // ConfigureAwait(false): library code, same convention as the pool's streaming methods.
    await foreach (var row in rows.WithCancellation(ct).ConfigureAwait(false))
    {
        yield return row[columnIndex].AsString() ?? string.Empty;
    }
}
433+
386434
public async Task CloseAsync()
387435
{
388436
_isOpen = false;

src/SqlDotnetty.MsSql/MsSqlConnectionPool.cs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,28 @@ public async IAsyncEnumerable<SqlRow> QueryStreamAsync(
258258
finally { await ReleaseAsync(conn).ConfigureAwait(false); }
259259
}
260260

261+
/// <summary>
/// Runs a <c>FOR JSON</c> query on a pooled connection and streams one
/// <see cref="System.Text.Json.JsonElement"/> per array element, without buffering the
/// entire JSON result.
/// </summary>
/// <remarks>
/// The connection is acquired before the first element is produced and is released in a
/// <c>finally</c> block, so it goes back to the pool whether enumeration finishes, fails,
/// or is abandoned and disposed early.
/// </remarks>
/// <param name="sql">The <c>FOR JSON</c> query text.</param>
/// <param name="parameters">Optional query parameters.</param>
/// <param name="jsonColumnIndex">
/// Zero-based index of the column that holds the JSON fragment (default: 0).
/// </param>
/// <param name="ct">Cancellation token for acquiring the connection and for streaming.</param>
public async IAsyncEnumerable<System.Text.Json.JsonElement> QueryJsonStreamAsync(
    string sql,
    IReadOnlyList<SqlParameter>? parameters = null,
    int jsonColumnIndex = 0,
    [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken ct = default)
{
    var pooled = await AcquireAsync(ct).ConfigureAwait(false);
    try
    {
        var elements = pooled
            .QueryJsonStreamAsync(sql, parameters, jsonColumnIndex, ct)
            .ConfigureAwait(false);

        await foreach (var element in elements)
        {
            yield return element;
        }
    }
    finally
    {
        await ReleaseAsync(pooled).ConfigureAwait(false);
    }
}
282+
261283
public async Task BeginTransactionAsync(CancellationToken ct = default)
262284
{
263285
var conn = await AcquireAsync(ct).ConfigureAwait(false);

0 commit comments

Comments
 (0)