Skip to content

Commit e1ea796

Browse files
vkuttypCopilot
andcommitted
feat: add QueryStreamAsync to ISqlDatabase + ToDataTableAsync / QueryDataTableAsync extensions
- Add QueryStreamAsync(sql, params, ct) to ISqlDatabase interface - Implement QueryStreamAsync on SqliteConnection (yields rows via DbDataReader) - Implement QueryStreamAsync on SqliteConnectionPool (acquire/yield/release) - Add ToDataTableAsync() extension on IAsyncEnumerable<SqlRow> — streams rows into System.Data.DataTable one at a time, never holding full result in memory - Add QueryDataTableAsync() shorthand on ISqlDatabase — single call for DataGridView binding without going through SqlDataTable first Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent b96b00e commit e1ea796

3 files changed

Lines changed: 120 additions & 0 deletions

File tree

src/SqlDotnetty.Core/ISqlDatabase.cs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ public interface ISqlDatabase : IAsyncDisposable
1212
Task<IReadOnlyList<SqlRow>> QueryAsync(string sql, IReadOnlyList<SqlParameter>? parameters = null, CancellationToken ct = default);
1313
Task<IReadOnlyList<T>> QueryAsync<T>(string sql, IReadOnlyList<SqlParameter>? parameters = null, CancellationToken ct = default) where T : new();
1414

15+
// ── Streaming (row by row — never buffers the full result set) ────────────
16+
IAsyncEnumerable<SqlRow> QueryStreamAsync(string sql, IReadOnlyList<SqlParameter>? parameters = null, CancellationToken ct = default);
17+
1518
// ── Execute (INSERT/UPDATE/DELETE) returns rows affected ──────────────────
1619
Task<int> ExecuteAsync(string sql, IReadOnlyList<SqlParameter>? parameters = null, CancellationToken ct = default);
1720

@@ -50,4 +53,81 @@ public static async Task<SqlValue> ScalarAsync(
5053
var rows = await db.QueryAsync(sql, parameters, ct).ConfigureAwait(false);
5154
return rows.Count > 0 && rows[0].ColumnCount > 0 ? rows[0][0] : SqlValue.Null_;
5255
}
56+
57+
/// <summary>
58+
/// Streams query results directly into a <see cref="System.Data.DataTable"/> one row
59+
/// at a time — equivalent to <c>DataTable.Load(IDataReader)</c> but fully async.
60+
/// Rows appear in the DataTable as they arrive from the server; the full result set
61+
/// is never held in memory simultaneously.
62+
/// </summary>
63+
/// <example>
64+
/// <code>
65+
/// // Bind a large table to DataGridView without buffering all rows first
66+
/// var dt = await conn.QueryStreamAsync("SELECT * FROM Employees")
67+
/// .ToDataTableAsync("Employees");
68+
/// dataGridView1.DataSource = dt;
69+
///
70+
/// // Or use the shorthand on ISqlDatabase directly:
71+
/// dataGridView1.DataSource = await conn.QueryDataTableAsync("SELECT * FROM Employees");
72+
/// </code>
73+
/// </example>
74+
public static async Task<System.Data.DataTable> ToDataTableAsync(
75+
this IAsyncEnumerable<SqlRow> rows,
76+
string tableName = "",
77+
CancellationToken ct = default)
78+
{
79+
var dt = new System.Data.DataTable(tableName);
80+
bool columnsAdded = false;
81+
82+
await foreach (var row in rows.WithCancellation(ct).ConfigureAwait(false))
83+
{
84+
// Build columns from the first row's metadata
85+
if (!columnsAdded)
86+
{
87+
foreach (var col in row.Columns)
88+
dt.Columns.Add(col.Name, typeof(object));
89+
columnsAdded = true;
90+
}
91+
92+
var dataRow = dt.NewRow();
93+
for (int i = 0; i < row.ColumnCount; i++)
94+
dataRow[i] = SqlValueToClr(row[i]);
95+
dt.Rows.Add(dataRow);
96+
}
97+
98+
return dt;
99+
}
100+
101+
/// <summary>
102+
/// Convenience shorthand — streams SQL results directly into a
103+
/// <see cref="System.Data.DataTable"/> ready for DataGridView binding.
104+
/// </summary>
105+
public static Task<System.Data.DataTable> QueryDataTableAsync(
106+
this ISqlDatabase db,
107+
string sql,
108+
IReadOnlyList<SqlParameter>? parameters = null,
109+
CancellationToken ct = default)
110+
=> db.QueryStreamAsync(sql, parameters, ct).ToDataTableAsync(tableName: "", ct);
111+
112+
public static Task<System.Data.DataTable> QueryDataTableAsync(
113+
this ISqlDatabase db, string sql, params SqlParameter[] parameters)
114+
=> db.QueryStreamAsync(sql, parameters).ToDataTableAsync();
115+
116+
private static object SqlValueToClr(SqlValue v) => v switch
117+
{
118+
SqlValue.Null => DBNull.Value,
119+
SqlValue.Bool b => b.Value,
120+
SqlValue.Int8 b => b.Value,
121+
SqlValue.Int16 s => s.Value,
122+
SqlValue.Int32 i => i.Value,
123+
SqlValue.Int64 l => l.Value,
124+
SqlValue.Float f => f.Value,
125+
SqlValue.Double d => d.Value,
126+
SqlValue.Decimal d => d.Value,
127+
SqlValue.Text t => t.Value,
128+
SqlValue.Bytes b => b.Value,
129+
SqlValue.Uuid u => u.Value,
130+
SqlValue.Date d => d.Value,
131+
_ => DBNull.Value,
132+
};
53133
}

src/SqlDotnetty.Sqlite/SqliteConnection.cs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,32 @@ public async Task<IReadOnlyList<T>> QueryAsync<T>(
105105
return rows.Select(r => decoder.Decode<T>(r)).ToList();
106106
}
107107

108+
public async IAsyncEnumerable<SqlRow> QueryStreamAsync(
109+
string sql,
110+
IReadOnlyList<SqlParameter>? parameters = null,
111+
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken ct = default)
112+
{
113+
await _lock.WaitAsync(ct).ConfigureAwait(false);
114+
try
115+
{
116+
using var cmd = CreateCommand(sql, parameters);
117+
using var reader = await cmd.ExecuteReaderAsync(ct).ConfigureAwait(false);
118+
119+
var sqlColumns = new List<SqlColumn>();
120+
for (int i = 0; i < reader.FieldCount; i++)
121+
sqlColumns.Add(new SqlColumn(reader.GetName(i), reader.GetDataTypeName(i)));
122+
123+
while (await reader.ReadAsync(ct).ConfigureAwait(false))
124+
{
125+
var values = new SqlValue[reader.FieldCount];
126+
for (int i = 0; i < reader.FieldCount; i++)
127+
values[i] = ConvertValue(reader, i);
128+
yield return new SqlRow(sqlColumns, values);
129+
}
130+
}
131+
finally { _lock.Release(); }
132+
}
133+
108134
public async Task<int> ExecuteAsync(
109135
string sql, IReadOnlyList<SqlParameter>? parameters = null, CancellationToken ct = default)
110136
{

src/SqlDotnetty.Sqlite/SqliteConnectionPool.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,20 @@ public async Task<IReadOnlyList<T>> QueryAsync<T>(
117117
finally { await ReleaseAsync(conn).ConfigureAwait(false); }
118118
}
119119

120+
public async IAsyncEnumerable<SqlRow> QueryStreamAsync(
121+
string sql,
122+
IReadOnlyList<SqlParameter>? parameters = null,
123+
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken ct = default)
124+
{
125+
var conn = await AcquireAsync(ct).ConfigureAwait(false);
126+
try
127+
{
128+
await foreach (var row in conn.QueryStreamAsync(sql, parameters, ct).ConfigureAwait(false))
129+
yield return row;
130+
}
131+
finally { await ReleaseAsync(conn).ConfigureAwait(false); }
132+
}
133+
120134
public async Task<int> ExecuteAsync(
121135
string sql, IReadOnlyList<SqlParameter>? parameters = null, CancellationToken ct = default)
122136
{

0 commit comments

Comments
 (0)