Skip to content

Commit bcefce9

Browse files
vkuttyp and claude committed
release: 1.9.54 — real protocol streaming for Postgres + MySQL
Both QueryStreamAsync implementations were previously fake:

    var rows = await QueryAsync(...).ConfigureAwait(false);
    foreach (var row in rows) yield return row;

So callers got identical buffer-everything-then-yield behaviour — no matter how many MB the result set was, they waited for the full reply before the first row surfaced.

This release replaces both with real per-network-frame streaming:

Postgres (PostgresConnection.cs)
- drives the PipeReader token loop directly
- parses RowDescription / DataRow / ReadyForQuery / ErrorResponse in place, yielding all DataRows from each ReadAsync result before requesting more bytes from the socket.

MySQL (MySqlConnection.cs)
- implements the COM_QUERY response state machine inline (col-count → col-defs → EOF → rows → EOF/OK)
- same per-frame yield discipline as the Postgres path.

End-to-end effect: a multi-megabyte SELECT on a slow link now surfaces rows progressively instead of waiting for the full reply, so consumers that NDJSON-stream to a UI (CosmoSqlStudio, CosmoStudioJs) see live ticking instead of one big hand-off at the end.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 991179a commit bcefce9

3 files changed

Lines changed: 250 additions & 9 deletions

File tree

Directory.Build.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,6 @@
77
<Authors>vkuttyp</Authors>
88
<PackageLicenseExpression>MIT</PackageLicenseExpression>
99
<RepositoryUrl>https://github.com/vkuttyp/CosmoSQLClient-Dotnet</RepositoryUrl>
10-
<Version>1.9.53</Version>
10+
<Version>1.9.54</Version>
1111
</PropertyGroup>
1212
</Project>

src/CosmoSQLClient.MySql/MySqlConnection.cs

Lines changed: 139 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -453,12 +453,146 @@ public async Task<IReadOnlyList<SqlRow>> QueryAsync(string sql, IReadOnlyList<Sq
453453
return result;
454454
}
455455

456-
public async IAsyncEnumerable<SqlRow> QueryStreamAsync(string sql, IReadOnlyList<SqlParameter>? parameters = null, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken ct = default)
456+
/// <summary>
/// Streams the rows of a text-protocol (COM_QUERY) result set as they arrive on the wire.
/// Every complete row packet found in one <c>ReadAsync</c> buffer is decoded and yielded
/// before more bytes are requested, so a large result set on a slow link surfaces rows
/// progressively instead of being buffered in full first.
/// </summary>
/// <param name="sql">SQL text sent in a single COM_QUERY packet.</param>
/// <param name="parameters">Must be <c>null</c> or empty; parameterized queries are not supported on this path.</param>
/// <param name="ct">Token observed while waiting for the connection lock, sending, and reading.</param>
/// <returns>An async sequence of decoded rows, in server order.</returns>
/// <exception cref="NotSupportedException"><paramref name="parameters"/> contains at least one entry.</exception>
/// <exception cref="SqlException">The server answered with an ERR packet. Rows decoded before the error are yielded first.</exception>
/// <exception cref="System.IO.EndOfStreamException">The connection was closed before the result set was terminated.</exception>
/// <remarks>
/// NOTE(review): if the caller stops enumerating early (or cancels), the remainder of the
/// response is left unread on the connection while the lock is released — the connection
/// should presumably be drained or discarded in that case; confirm how callers handle it.
/// NOTE(review): payloads of exactly 0xFFFFFF bytes (multi-packet continuation) are not
/// reassembled here.
/// </remarks>
public async IAsyncEnumerable<SqlRow> QueryStreamAsync(
    string sql,
    IReadOnlyList<SqlParameter>? parameters = null,
    [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken ct = default)
{
    if (parameters is { Count: > 0 })
        throw new NotSupportedException("Parameterized MySQL query requires Binary Protocol integration rewrite.");

    await _lock.WaitAsync(ct).ConfigureAwait(false);
    try
    {
        await SendPacketAsync(MyQueryMessage.Build(sql), 0, ct).ConfigureAwait(false);

        // State machine for the COM_QUERY response shape:
        //   1) header packet — column count (lenenc int) | OK (0x00) | ERR (0xFF)
        //   2) N column-definition packets
        //   3) EOF (0xFE, len < 9) — columns done
        //   4) row packets …
        //   5) EOF (0xFE, len < 9) — rows done
        int expectedCols = 0;            // 0 doubles as "header packet not seen yet"
        int readCols = 0;
        List<MyColumnDef>? colDefs = null;
        IReadOnlyList<SqlColumn>? columns = null;   // non-null once the column EOF has been seen
        Dictionary<string, int>? colIndex = null;
        var pendingRows = new List<SqlRow>(64);
        SqlException? errorToThrow = null;
        byte[]? lenScratch = null;       // reused when the 3 length bytes straddle segments
        bool done = false;

        while (!done)
        {
            var result = await _reader!.ReadAsync(ct).ConfigureAwait(false);
            var buffer = result.Buffer;

            while (buffer.Length >= 4)
            {
                // Packet header: 3-byte little-endian payload length, then one byte that is
                // skipped here (the sequence id in the MySQL protocol).
                int len;
                var lenSeq = buffer.Slice(0, 3);
                if (lenSeq.IsSingleSegment)
                {
                    var span = lenSeq.First.Span;
                    len = span[0] | (span[1] << 8) | (span[2] << 16);
                }
                else
                {
                    lenScratch ??= new byte[3];
                    lenSeq.CopyTo(lenScratch);
                    len = lenScratch[0] | (lenScratch[1] << 8) | (lenScratch[2] << 16);
                }

                // Incomplete packet — leave it in the pipe and wait for more bytes.
                if (buffer.Length < len + 4) break;

                var payload = buffer.Slice(4, len);
                buffer = buffer.Slice(len + 4);

                var peek = new SequenceReader<byte>(payload);
                if (!peek.TryRead(out byte header))
                    continue; // zero-length payload: nothing to interpret

                if (header == 0xFF)
                {
                    errorToThrow = SqlException.Query(MyDecoder.DecodeError(ref peek).Message);
                    done = true;
                    break;
                }

                if (columns == null)
                {
                    if (expectedCols == 0)
                    {
                        // OK with no result set (non-SELECT statement): nothing to stream.
                        if (header == 0x00) { done = true; break; }

                        var countReader = new SequenceReader<byte>(payload);
                        expectedCols = (int)MyDecoder.ReadLenEncInt(ref countReader);
                        colDefs = new List<MyColumnDef>(expectedCols);
                        continue;
                    }

                    if (readCols < expectedCols)
                    {
                        colDefs!.Add(MyDecoder.DecodeColumnMeta(payload));
                        readCols++;
                        continue;
                    }

                    // EOF terminator after column defs: freeze the column metadata.
                    if (header == 0xFE && payload.Length < 9)
                    {
                        var cols = new SqlColumn[colDefs!.Count];
                        for (int i = 0; i < cols.Length; i++)
                            cols[i] = new SqlColumn(colDefs[i].Name, colDefs[i].Type.ToString());
                        columns = cols;

                        // First occurrence wins when the result set has duplicate column names.
                        colIndex = new Dictionary<string, int>(cols.Length, StringComparer.OrdinalIgnoreCase);
                        for (int i = 0; i < cols.Length; i++)
                            colIndex.TryAdd(cols[i].Name, i);
                        continue;
                    }

                    // Anything else before the column EOF is ignored.
                    continue;
                }

                // Rows phase. Only an EOF packet (0xFE, len < 9) ends the result set.
                // A 0x00 first byte is NOT a terminator here: a text row whose first column
                // is an empty string starts with 0x00 and may well be shorter than 9 bytes
                // (e.g. SELECT ''), and must be decoded as a row.
                if (header == 0xFE && payload.Length < 9)
                {
                    done = true;
                    break;
                }

                pendingRows.Add(MyDecoder.DecodeRow(payload, columns!, colIndex));
            }

            _reader!.AdvanceTo(buffer.Start, result.Buffer.End);

            // Yield everything collected from this read BEFORE the next ReadAsync, so the
            // consumer sees rows as the wire delivers them.
            if (pendingRows.Count > 0)
            {
                foreach (var row in pendingRows) yield return row;
                pendingRows.Clear();
            }

            if (result.IsCompleted) break;
        }

        if (errorToThrow is not null) throw errorToThrow;

        // The pipe completed without a terminator: the result set is truncated. Surface
        // that instead of pretending the stream ended normally.
        if (!done)
            throw new System.IO.EndOfStreamException("MySQL connection closed before the result set was complete.");
    }
    finally
    {
        _lock.Release();
    }
}
463597

464598
private struct SqlExecuteBuilder : IMySqlTokenHandler

src/CosmoSQLClient.Postgres/PostgresConnection.cs

Lines changed: 110 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -462,10 +462,117 @@ private static string MapPlaceholders(string sql, IReadOnlyList<SqlParameter> pa
462462
return result;
463463
}
464464

465-
public async IAsyncEnumerable<SqlRow> QueryStreamAsync(string sql, IReadOnlyList<SqlParameter>? parameters = null, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken ct = default)
465+
/// <summary>
/// Streams the rows of a query as they arrive on the wire. Every complete <c>DataRow</c>
/// message found in one <c>ReadAsync</c> buffer is decoded and yielded before more bytes
/// are requested, so a large result set on a slow link surfaces rows progressively
/// instead of being buffered in full first.
/// </summary>
/// <param name="sql">SQL text. With parameters it is first rewritten by <c>MapPlaceholders</c>.</param>
/// <param name="parameters">
/// When non-empty the query is sent as Parse/Bind/Describe/Execute/Sync; otherwise as a
/// single Query message.
/// </param>
/// <param name="ct">Token observed while waiting for the connection lock, flushing, and reading.</param>
/// <returns>An async sequence of decoded rows, in server order.</returns>
/// <exception cref="SqlException">
/// The server sent an <c>ErrorResponse</c>. Rows decoded before the error are yielded first,
/// and the response is read through <c>ReadyForQuery</c> before the exception is thrown.
/// </exception>
/// <exception cref="System.IO.InvalidDataException">A frame declared a length smaller than 4.</exception>
/// <exception cref="System.IO.EndOfStreamException">The connection was closed before <c>ReadyForQuery</c> arrived.</exception>
/// <remarks>
/// NOTE(review): if the caller stops enumerating early (or cancels), the remainder of the
/// response is left unread on the connection while the lock is released — the connection
/// should presumably be drained or discarded in that case; confirm how callers handle it.
/// </remarks>
public async IAsyncEnumerable<SqlRow> QueryStreamAsync(
    string sql,
    IReadOnlyList<SqlParameter>? parameters = null,
    [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken ct = default)
{
    await _lock.WaitAsync(ct).ConfigureAwait(false);
    try
    {
        // Send the query.
        if (parameters is { Count: > 0 })
        {
            sql = MapPlaceholders(sql, parameters);
            _writer!.Write(PgParseMessage.Build("", sql, new int[parameters.Count])); // 0 OIDs = infer
            _writer!.Write(PgBindMessage.Build("", "", parameters));
            _writer!.Write(PgDescribeMessage.Build('P', ""));
            _writer!.Write(PgExecuteMessage.Build(""));
            _writer!.Write(PgSyncMessage.Build());
            await _writer!.FlushAsync(ct).ConfigureAwait(false);
        }
        else
        {
            _writer!.Write(PgQueryMessage.Build(sql));
            await _writer!.FlushAsync(ct).ConfigureAwait(false);
        }

        IReadOnlyList<SqlColumn>? columns = null;
        Dictionary<string, int>? colIndex = null;
        var pendingRows = new List<SqlRow>(64);
        SqlException? errorToThrow = null;
        byte[]? headerScratch = null;    // reused when the 5 header bytes straddle segments
        bool done = false;

        while (!done)
        {
            var result = await _reader!.ReadAsync(ct).ConfigureAwait(false);
            var buffer = result.Buffer;

            while (buffer.Length >= 5)
            {
                // Frame header: 1-byte message type + 4-byte big-endian length. The length
                // counts itself but not the type byte. Read all five bytes through one
                // slice so an empty leading segment cannot trip the type-byte read.
                byte type;
                int len;
                var headerSeq = buffer.Slice(0, 5);
                if (headerSeq.IsSingleSegment)
                {
                    var span = headerSeq.First.Span;
                    type = span[0];
                    len = (span[1] << 24) | (span[2] << 16) | (span[3] << 8) | span[4];
                }
                else
                {
                    headerScratch ??= new byte[5];
                    headerSeq.CopyTo(headerScratch);
                    type = headerScratch[0];
                    len = (headerScratch[1] << 24) | (headerScratch[2] << 16) | (headerScratch[3] << 8) | headerScratch[4];
                }

                // The length field includes its own 4 bytes, so anything smaller is corrupt.
                if (len < 4)
                    throw new System.IO.InvalidDataException("Invalid PostgreSQL message length: " + len + ".");

                // Incomplete frame — leave it in the pipe and wait for more bytes.
                // Compared as long so a huge declared length cannot overflow.
                if (buffer.Length < (long)len + 1) break;

                var payload = buffer.Slice(5, len - 4);
                buffer = buffer.Slice((long)len + 1);

                if (type == (byte)PgBackendType.ReadyForQuery)
                {
                    done = true;
                    break;
                }
                if (type == (byte)PgBackendType.ErrorResponse)
                {
                    // Remember the first error but keep reading: the backend still sends
                    // ReadyForQuery after an error, and leaving that frame in the pipe
                    // would make the next command on this connection read a stale reply.
                    errorToThrow ??= SqlException.Query(PgDecoder.ParseErrorResponse(payload));
                    continue;
                }
                if (type == (byte)PgBackendType.RowDescription)
                {
                    columns = PgDecoder.ParseRowDescription(payload);

                    // First occurrence wins when the result set has duplicate column names.
                    colIndex = new Dictionary<string, int>(columns.Count, StringComparer.OrdinalIgnoreCase);
                    for (int i = 0; i < columns.Count; i++)
                        colIndex.TryAdd(columns[i].Name, i);
                }
                else if (type == (byte)PgBackendType.DataRow && columns != null)
                {
                    pendingRows.Add(PgDecoder.ParseDataRow(payload, columns, colIndex));
                }
                // Other message types (ParseComplete, BindComplete, ParameterDescription,
                // NoData, CommandComplete, NoticeResponse, …) are ignored.
            }

            _reader.AdvanceTo(buffer.Start, result.Buffer.End);

            // Yield everything collected from this read BEFORE the next ReadAsync, so the
            // consumer sees rows as the wire delivers them.
            if (pendingRows.Count > 0)
            {
                foreach (var row in pendingRows) yield return row;
                pendingRows.Clear();
            }

            if (result.IsCompleted) break;
        }

        if (errorToThrow is not null) throw errorToThrow;

        // The pipe completed without ReadyForQuery: the response is truncated. Surface
        // that instead of pretending the stream ended normally.
        if (!done)
            throw new System.IO.EndOfStreamException("PostgreSQL connection closed before the query response was complete.");
    }
    finally
    {
        _lock.Release();
    }
}
470577

471578
private struct SqlExecuteBuilder : IPostgresTokenHandler

0 commit comments

Comments
 (0)