Skip to content

Commit 52890a8

Browse files
berndverstBernd Verst
andauthored
Fix WorkItemDispatcher.DispatchAsync silent termination due to unhandled exceptions (#1322)
* Fix WorkItemDispatcher.DispatchAsync silent termination due to unhandled exceptions Wrap the DispatchAsync loop body in an outer try/catch to prevent the dispatch loop from silently dying when exceptions occur outside the inner fetch try/catch block. Add ContinueWith to Task.Run in StartAsync as a last-resort safety net for fatal exceptions. Add new DispatcherLoopFailed structured log event (EventId 30) at Error level with full exception details for telemetry visibility. Add comprehensive unit tests for the dispatch loop resilience. Fixes #1320 --------- Co-authored-by: Bernd Verst <beverst@microsoft.com>
1 parent 5a445c6 commit 52890a8

6 files changed

Lines changed: 641 additions & 97 deletions

File tree

src/DurableTask.Core/Logging/EventIds.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ static class EventIds
3131
public const int ProcessWorkItemStarting = 27;
3232
public const int ProcessWorkItemCompleted = 28;
3333
public const int ProcessWorkItemFailed = 29;
34+
public const int DispatcherLoopFailed = 30;
3435

3536
public const int SchedulingOrchestration = 40;
3637
public const int RaisingEvent = 41;

src/DurableTask.Core/Logging/LogEvents.cs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,37 @@ void IEventSourceEvent.WriteEventSource() =>
158158
StructuredEventSource.Log.DispatcherStopped(this.Dispatcher, Utils.AppName, Utils.PackageVersion);
159159
}
160160

161+
internal class DispatcherLoopFailed : StructuredLogEvent, IEventSourceEvent
162+
{
163+
public DispatcherLoopFailed(WorkItemDispatcherContext context, Exception exception)
164+
{
165+
this.Dispatcher = context.GetDisplayName();
166+
this.Details = exception.ToString();
167+
}
168+
169+
[StructuredLogField]
170+
public string Dispatcher { get; }
171+
172+
[StructuredLogField]
173+
public string Details { get; }
174+
175+
public override EventId EventId => new EventId(
176+
EventIds.DispatcherLoopFailed,
177+
nameof(EventIds.DispatcherLoopFailed));
178+
179+
public override LogLevel Level => LogLevel.Error;
180+
181+
protected override string CreateLogMessage() =>
182+
$"{this.Dispatcher}: Unhandled exception in dispatch loop. Will retry after backoff. Details: {this.Details}";
183+
184+
void IEventSourceEvent.WriteEventSource() =>
185+
StructuredEventSource.Log.DispatcherLoopFailed(
186+
this.Dispatcher,
187+
this.Details,
188+
Utils.AppName,
189+
Utils.PackageVersion);
190+
}
191+
161192
internal class DispatchersStopping : StructuredLogEvent, IEventSourceEvent
162193
{
163194
public DispatchersStopping(string name, string id, int concurrentWorkItemCount, int activeFetchers)

src/DurableTask.Core/Logging/LogHelper.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,21 @@ internal void DispatcherStopped(WorkItemDispatcherContext context)
107107
}
108108
}
109109

110+
/// <summary>
111+
/// Logs that a work item dispatch loop encountered an unhandled exception.
112+
/// </summary>
113+
/// <param name="context">The context of the dispatcher that failed.</param>
114+
/// <param name="exception">The unhandled exception.</param>
115+
internal void DispatcherLoopFailed(WorkItemDispatcherContext context, Exception exception)
116+
{
117+
if (this.IsStructuredLoggingEnabled)
118+
{
119+
this.WriteStructuredLog(
120+
new LogEvents.DispatcherLoopFailed(context, exception),
121+
exception);
122+
}
123+
}
124+
110125
/// <summary>
111126
/// Logs that the work item dispatcher is watching for individual dispatch loops to finish stopping.
112127
/// </summary>

src/DurableTask.Core/Logging/StructuredEventSource.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,15 @@ internal void DispatcherStopped(string Dispatcher, string AppName, string Extens
102102
}
103103
}
104104

105+
[Event(EventIds.DispatcherLoopFailed, Level = EventLevel.Error, Version = 1)]
106+
internal void DispatcherLoopFailed(string Dispatcher, string Details, string AppName, string ExtensionVersion)
107+
{
108+
if (this.IsEnabled(EventLevel.Error))
109+
{
110+
this.WriteEvent(EventIds.DispatcherLoopFailed, Dispatcher, Details, AppName, ExtensionVersion);
111+
}
112+
}
113+
105114
[Event(EventIds.DispatchersStopping, Level = EventLevel.Verbose, Version = 1)]
106115
internal void DispatchersStopping(
107116
string Dispatcher,

src/DurableTask.Core/WorkItemDispatcher.cs

Lines changed: 143 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,14 @@ public async Task StartAsync()
142142

143143
// We just want this to Run we intentionally don't wait
144144
#pragma warning disable 4014
145-
Task.Run(() => this.DispatchAsync(context));
145+
Task.Run(() => this.DispatchAsync(context)).ContinueWith(t =>
146+
{
147+
TraceHelper.TraceException(
148+
TraceEventType.Critical,
149+
"WorkItemDispatcherDispatch-FatalTermination",
150+
t.Exception,
151+
$"Dispatch loop for '{this.name}' terminated fatally!");
152+
}, TaskContinuationOptions.OnlyOnFaulted);
146153
#pragma warning restore 4014
147154
}
148155
}
@@ -224,128 +231,167 @@ async Task DispatchAsync(WorkItemDispatcherContext context)
224231
bool logThrottle = true;
225232
while (this.isStarted)
226233
{
227-
if (!await this.concurrencyLock.WaitAsync(TimeSpan.FromSeconds(5)))
234+
var semaphoreAcquired = false;
235+
var scheduledWorkItem = false;
236+
try
228237
{
229-
if (logThrottle)
238+
if (!await this.concurrencyLock.WaitAsync(TimeSpan.FromSeconds(5)))
230239
{
231-
// This can happen frequently under heavy load.
232-
// To avoid log spam, we log just once until we can proceed.
233-
this.LogHelper.FetchingThrottled(
234-
context,
235-
this.concurrentWorkItemCount,
236-
this.MaxConcurrentWorkItems);
237-
TraceHelper.Trace(
238-
TraceEventType.Warning,
239-
"WorkItemDispatcherDispatch-MaxOperations",
240-
this.GetFormattedLog(dispatcherId, $"Max concurrent operations ({this.concurrentWorkItemCount}) are already in progress. Still waiting for next accept."));
241-
242-
logThrottle = false;
240+
if (logThrottle)
241+
{
242+
// This can happen frequently under heavy load.
243+
// To avoid log spam, we log just once until we can proceed.
244+
this.LogHelper.FetchingThrottled(
245+
context,
246+
this.concurrentWorkItemCount,
247+
this.MaxConcurrentWorkItems);
248+
TraceHelper.Trace(
249+
TraceEventType.Warning,
250+
"WorkItemDispatcherDispatch-MaxOperations",
251+
this.GetFormattedLog(dispatcherId, $"Max concurrent operations ({this.concurrentWorkItemCount}) are already in progress. Still waiting for next accept."));
252+
253+
logThrottle = false;
254+
}
255+
256+
continue;
243257
}
244258

245-
continue;
246-
}
259+
semaphoreAcquired = true;
260+
logThrottle = true;
247261

248-
logThrottle = true;
262+
var delaySecs = 0;
263+
T workItem = default(T);
264+
try
265+
{
266+
Interlocked.Increment(ref this.activeFetchers);
267+
this.LogHelper.FetchWorkItemStarting(context, DefaultReceiveTimeout, this.concurrentWorkItemCount, this.MaxConcurrentWorkItems);
268+
TraceHelper.Trace(
269+
TraceEventType.Verbose,
270+
"WorkItemDispatcherDispatch-StartFetch",
271+
this.GetFormattedLog(dispatcherId, $"Starting fetch with timeout of {DefaultReceiveTimeout} ({this.concurrentWorkItemCount}/{this.MaxConcurrentWorkItems} max)"));
249272

250-
var delaySecs = 0;
251-
T workItem = default(T);
252-
try
253-
{
254-
Interlocked.Increment(ref this.activeFetchers);
255-
this.LogHelper.FetchWorkItemStarting(context, DefaultReceiveTimeout, this.concurrentWorkItemCount, this.MaxConcurrentWorkItems);
256-
TraceHelper.Trace(
257-
TraceEventType.Verbose,
258-
"WorkItemDispatcherDispatch-StartFetch",
259-
this.GetFormattedLog(dispatcherId, $"Starting fetch with timeout of {DefaultReceiveTimeout} ({this.concurrentWorkItemCount}/{this.MaxConcurrentWorkItems} max)"));
273+
Stopwatch timer = Stopwatch.StartNew();
274+
workItem = await this.FetchWorkItem(DefaultReceiveTimeout, this.shutdownCancellationTokenSource.Token);
260275

261-
Stopwatch timer = Stopwatch.StartNew();
262-
workItem = await this.FetchWorkItem(DefaultReceiveTimeout, this.shutdownCancellationTokenSource.Token);
276+
if (!IsNull(workItem))
277+
{
278+
string workItemId = this.workItemIdentifier(workItem);
279+
this.LogHelper.FetchWorkItemCompleted(
280+
context,
281+
workItemId,
282+
timer.Elapsed,
283+
this.concurrentWorkItemCount,
284+
this.MaxConcurrentWorkItems);
285+
}
263286

264-
if (!IsNull(workItem))
287+
TraceHelper.Trace(
288+
TraceEventType.Verbose,
289+
"WorkItemDispatcherDispatch-EndFetch",
290+
this.GetFormattedLog(dispatcherId, $"After fetch ({timer.ElapsedMilliseconds} ms) ({this.concurrentWorkItemCount}/{this.MaxConcurrentWorkItems} max)"));
291+
}
292+
catch (TimeoutException)
265293
{
266-
string workItemId = this.workItemIdentifier(workItem);
267-
this.LogHelper.FetchWorkItemCompleted(
268-
context,
269-
workItemId,
270-
timer.Elapsed,
271-
this.concurrentWorkItemCount,
272-
this.MaxConcurrentWorkItems);
294+
delaySecs = 0;
273295
}
274-
275-
TraceHelper.Trace(
276-
TraceEventType.Verbose,
277-
"WorkItemDispatcherDispatch-EndFetch",
278-
this.GetFormattedLog(dispatcherId, $"After fetch ({timer.ElapsedMilliseconds} ms) ({this.concurrentWorkItemCount}/{this.MaxConcurrentWorkItems} max)"));
279-
}
280-
catch (TimeoutException)
281-
{
282-
delaySecs = 0;
283-
}
284-
catch (TaskCanceledException exception)
285-
{
286-
TraceHelper.Trace(
287-
TraceEventType.Information,
288-
"WorkItemDispatcherDispatch-TaskCanceledException",
289-
this.GetFormattedLog(dispatcherId, $"TaskCanceledException while fetching workItem, should be harmless: {exception.Message}"));
290-
delaySecs = this.GetDelayInSecondsAfterOnFetchException(exception);
291-
}
292-
catch (Exception exception)
293-
{
294-
if (!this.isStarted)
296+
catch (TaskCanceledException exception)
295297
{
296298
TraceHelper.Trace(
297-
TraceEventType.Information,
298-
"WorkItemDispatcherDispatch-HarmlessException",
299-
this.GetFormattedLog(dispatcherId, $"Harmless exception while fetching workItem after Stop(): {exception.Message}"));
299+
TraceEventType.Information,
300+
"WorkItemDispatcherDispatch-TaskCanceledException",
301+
this.GetFormattedLog(dispatcherId, $"TaskCanceledException while fetching workItem, should be harmless: {exception.Message}"));
302+
delaySecs = this.GetDelayInSecondsAfterOnFetchException(exception);
300303
}
301-
else
304+
catch (Exception exception)
302305
{
303-
this.LogHelper.FetchWorkItemFailure(context, exception);
304-
// TODO : dump full node context here
305-
TraceHelper.TraceException(
306-
TraceEventType.Warning,
307-
"WorkItemDispatcherDispatch-Exception",
308-
exception,
309-
this.GetFormattedLog(dispatcherId, $"Exception while fetching workItem: {exception.Message}"));
310-
delaySecs = this.GetDelayInSecondsAfterOnFetchException(exception);
306+
if (!this.isStarted)
307+
{
308+
TraceHelper.Trace(
309+
TraceEventType.Information,
310+
"WorkItemDispatcherDispatch-HarmlessException",
311+
this.GetFormattedLog(dispatcherId, $"Harmless exception while fetching workItem after Stop(): {exception.Message}"));
312+
}
313+
else
314+
{
315+
this.LogHelper.FetchWorkItemFailure(context, exception);
316+
// TODO : dump full node context here
317+
TraceHelper.TraceException(
318+
TraceEventType.Warning,
319+
"WorkItemDispatcherDispatch-Exception",
320+
exception,
321+
this.GetFormattedLog(dispatcherId, $"Exception while fetching workItem: {exception.Message}"));
322+
delaySecs = this.GetDelayInSecondsAfterOnFetchException(exception);
323+
}
324+
}
325+
finally
326+
{
327+
Interlocked.Decrement(ref this.activeFetchers);
311328
}
312-
}
313-
finally
314-
{
315-
Interlocked.Decrement(ref this.activeFetchers);
316-
}
317329

318-
var scheduledWorkItem = false;
319-
if (!IsNull(workItem))
320-
{
321-
if (!this.isStarted)
330+
if (!IsNull(workItem))
322331
{
323-
if (this.SafeReleaseWorkItem != null)
332+
if (!this.isStarted)
324333
{
325-
await this.SafeReleaseWorkItem(workItem);
334+
if (this.SafeReleaseWorkItem != null)
335+
{
336+
await this.SafeReleaseWorkItem(workItem);
337+
}
338+
}
339+
else
340+
{
341+
Interlocked.Increment(ref this.concurrentWorkItemCount);
342+
// We just want this to Run we intentionally don't wait
343+
#pragma warning disable 4014
344+
Task.Run(() => this.ProcessWorkItemAsync(context, workItem));
345+
#pragma warning restore 4014
346+
347+
scheduledWorkItem = true;
326348
}
327349
}
328-
else
350+
351+
delaySecs = Math.Max(this.delayOverrideSecs, delaySecs);
352+
if (delaySecs > 0)
329353
{
330-
Interlocked.Increment(ref this.concurrentWorkItemCount);
331-
// We just want this to Run we intentionally don't wait
332-
#pragma warning disable 4014
333-
Task.Run(() => this.ProcessWorkItemAsync(context, workItem));
334-
#pragma warning restore 4014
354+
await Task.Delay(TimeSpan.FromSeconds(delaySecs));
355+
}
335356

336-
scheduledWorkItem = true;
357+
if (!scheduledWorkItem)
358+
{
359+
this.concurrencyLock.Release();
337360
}
338361
}
339-
340-
delaySecs = Math.Max(this.delayOverrideSecs, delaySecs);
341-
if (delaySecs > 0)
362+
catch (Exception exception) when (!Utils.IsFatal(exception))
342363
{
343-
await Task.Delay(TimeSpan.FromSeconds(delaySecs));
344-
}
364+
// Catch-all for any unhandled exception in the dispatch loop body.
365+
// Without this, the dispatch loop would silently terminate because
366+
// DispatchAsync runs as a fire-and-forget Task.Run.
367+
this.LogHelper.DispatcherLoopFailed(context, exception);
368+
TraceHelper.TraceException(
369+
TraceEventType.Error,
370+
"WorkItemDispatcherDispatch-UnhandledException",
371+
exception,
372+
this.GetFormattedLog(dispatcherId,
373+
$"Unhandled exception in dispatch loop. Will retry after backoff."));
374+
375+
// Release the semaphore if we acquired it but never handed it off
376+
// to ProcessWorkItemAsync, to avoid permanently reducing concurrency.
377+
if (semaphoreAcquired && !scheduledWorkItem)
378+
{
379+
try { this.concurrencyLock.Release(); } catch { /* best effort */ }
380+
}
345381

346-
if (!scheduledWorkItem)
347-
{
348-
this.concurrencyLock.Release();
382+
try
383+
{
384+
await Task.Delay(TimeSpan.FromSeconds(BackOffIntervalOnInvalidOperationSecs), this.shutdownCancellationTokenSource.Token);
385+
}
386+
catch (OperationCanceledException)
387+
{
388+
// Shutdown requested during backoff; exit promptly.
389+
}
390+
catch (ObjectDisposedException)
391+
{
392+
// CancellationTokenSource was disposed (e.g., Dispose called
393+
// shortly after StopAsync); treat as shutdown.
394+
}
349395
}
350396
}
351397

0 commit comments

Comments
 (0)