@@ -177,21 +177,28 @@ public static System.Data.DataRow ToDataRow(this SqlRow row)
177177 /// Optional callback invoked on the UI thread after each batch is flushed.
178178 /// Receives the total number of rows added so far. Use to update a label, progress bar, etc.
179179 /// </param>
180+ /// <param name="onError">
181+ /// Optional callback invoked on the UI thread if the producer encounters an error.
182+ /// Receives the exception. If not provided, the exception is re-thrown from the awaited task.
183+ /// </param>
180184 /// <param name="ct">Cancellation token.</param>
181185 /// <example>
182186 /// <code>
183187 /// var dt = new DataTable();
184188 /// dataGridView1.DataSource = dt;
185189 ///
186190 /// await conn.QueryStreamAsync("SELECT * FROM BigTable")
187- /// .FillDataTableAsync(dt, onProgress: n => label1.Text = $"{n} rows loaded");
191+ /// .FillDataTableAsync(dt,
192+ /// onProgress: n => label1.Text = $"{n} rows loaded",
193+ /// onError: ex => MessageBox.Show(ex.Message));
188194 /// </code>
189195 /// </example>
190196 public static async Task FillDataTableAsync (
191197 this IAsyncEnumerable < SqlRow > rows ,
192198 System . Data . DataTable table ,
193199 int batchSize = 500 ,
194200 Action < int > ? onProgress = null ,
201+ Action < Exception > ? onError = null ,
195202 CancellationToken ct = default )
196203 {
197204 // Bounded channel: producer (background) reads network; consumer (UI thread) flushes batches.
@@ -207,6 +214,7 @@ public static async Task FillDataTableAsync(
207214 // Producer: run network I/O on a thread-pool thread so the UI is never blocked.
208215 var producer = Task . Run ( async ( ) =>
209216 {
217+ Exception ? fault = null ;
210218 try
211219 {
212220 var batch = new List < SqlRow > ( batchSize ) ;
@@ -222,24 +230,39 @@ public static async Task FillDataTableAsync(
222230 if ( batch . Count > 0 )
223231 await channel . Writer . WriteAsync ( batch , ct ) . ConfigureAwait ( false ) ;
224232 }
233+ catch ( Exception ex ) when ( ex is not OperationCanceledException )
234+ {
235+ fault = ex ;
236+ }
225237 finally
226238 {
227- channel . Writer . Complete ( ) ;
239+ // Pass the exception into the channel so the consumer sees it immediately
240+ // instead of silently completing with no data.
241+ channel . Writer . Complete ( fault ) ;
228242 }
229243 } , ct ) ;
230244
231245 // Consumer: runs on the calling (UI) thread — no ConfigureAwait(false).
232246 // await Task.Yield() between batches lets the message pump process redraws/clicks.
233247 int totalRows = 0 ;
234- await foreach ( var batch in channel . Reader . ReadAllAsync ( ct ) )
248+ try
249+ {
250+ await foreach ( var batch in channel . Reader . ReadAllAsync ( ct ) )
251+ {
252+ FlushBatch ( table , batch ) ;
253+ totalRows += batch . Count ;
254+ onProgress ? . Invoke ( totalRows ) ;
255+ await Task . Yield ( ) ;
256+ }
257+ }
258+ catch ( Exception ex ) when ( onError != null )
235259 {
236- FlushBatch ( table , batch ) ;
237- totalRows += batch . Count ;
238- onProgress ? . Invoke ( totalRows ) ;
239- await Task . Yield ( ) ;
260+ // Surface the error on the UI thread via the callback instead of throwing.
261+ onError ( ex ) ;
262+ return ;
240263 }
241264
242- // Propagate any producer exception (e.g. network error) to the caller.
265+ // Propagate any producer exception to the caller when no onError handler is set .
243266 await producer ;
244267 }
245268
0 commit comments