@@ -242,8 +242,10 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem)
242242 // If the provider provided work items, execute them.
243243 if ( workItem . NewMessages ? . Count > 0 )
244244 {
245- // As long as we are in this execution loop, the extended session is maintained.
246- workItem . IsExtendedSession = true ;
245+ concurrencyLockAcquired = this . concurrentSessionLock . Acquire ( ) ;
246+ workItem . IsExtendedSession = concurrencyLockAcquired ;
247+ // Regardless of whether or not we acquired the concurrent session lock, we will make sure to execute this work item.
248+ // If we failed to acquire it, we will end the extended session after this execution.
247249 bool isCompletedOrInterrupted = await this . OnProcessWorkItemAsync ( workItem ) ;
248250 if ( isCompletedOrInterrupted )
249251 {
@@ -253,15 +255,11 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem)
253255 processCount ++ ;
254256 }
255257
256- // Fetches beyond the first require getting an extended session lock, used to prevent starvation.
258+ // If we failed to acquire the concurrent session lock, we will end the extended session after the execution of the first work item
257259 if ( processCount > 0 && ! concurrencyLockAcquired )
258260 {
259- concurrencyLockAcquired = this . concurrentSessionLock . Acquire ( ) ;
260- if ( ! concurrencyLockAcquired )
261- {
262- TraceHelper . Trace ( TraceEventType . Verbose , "OnProcessWorkItemSession-MaxOperations" , "Failed to acquire concurrent session lock." ) ;
263- break ;
264- }
261+ TraceHelper . Trace ( TraceEventType . Verbose , "OnProcessWorkItemSession-MaxOperations" , "Failed to acquire concurrent session lock." ) ;
262+ break ;
265263 }
266264
267265 TraceHelper . Trace ( TraceEventType . Verbose , "OnProcessWorkItemSession-StartFetch" , "Starting fetch of existing session." ) ;
0 commit comments