Skip to content

Commit 9ec62f3

Browse files
sophiatevSophia Tevosyan
andauthored
Adding new logic for extended sessions to enable the DTS use-case (#1333)
* first commit? * marked the return value as nullable --------- Co-authored-by: Sophia Tevosyan <stevosyan@microsoft.com>
1 parent 1107cd1 commit 9ec62f3

4 files changed

Lines changed: 16 additions & 6 deletions

File tree

src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ namespace DurableTask.AzureStorage.Messaging
1919
using System.Linq;
2020
using System.Threading;
2121
using System.Threading.Tasks;
22-
using Azure;
2322
using DurableTask.Core;
2423
using DurableTask.Core.History;
2524
using Newtonsoft.Json;
@@ -226,5 +225,11 @@ bool IsNonexistantInstance()
226225
{
227226
return this.RuntimeState.Events.Count == 0 || this.RuntimeState.ExecutionStartedEvent == null;
228227
}
228+
229+
public Task EndSessionAsync()
230+
{
231+
// No-op
232+
return Task.CompletedTask;
233+
}
229234
}
230235
}

src/DurableTask.Core/IOrchestrationSession.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
// limitations under the License.
1212
// ----------------------------------------------------------------------------------
1313

14+
#nullable enable
1415
namespace DurableTask.Core
1516
{
1617
using System.Collections.Generic;
@@ -29,6 +30,12 @@ public interface IOrchestrationSession
2930
/// or until an internal wait period has expired. In either case, <c>null</c> can be returned
3031
/// and the dispatcher will shut down the session.
3132
/// </remarks>
32-
Task<IList<TaskMessage>> FetchNewOrchestrationMessagesAsync(TaskOrchestrationWorkItem workItem);
33+
Task<IList<TaskMessage>?> FetchNewOrchestrationMessagesAsync(TaskOrchestrationWorkItem workItem);
34+
35+
/// <summary>
36+
/// Ends the session.
37+
/// </summary>
38+
/// <returns>A task that completes when the session has been ended.</returns>
39+
Task EndSessionAsync();
3340
}
3441
}

src/DurableTask.Core/TaskEntityDispatcher.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem)
193193
if (concurrencyLockAcquired)
194194
{
195195
this.concurrentSessionLock.Release();
196+
await workItem.Session.EndSessionAsync();
196197
}
197198
}
198199
}

src/DurableTask.Core/TaskOrchestrationDispatcher.cs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,7 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem)
295295
"OnProcessWorkItemSession-Release",
296296
$"Releasing extended session after {processCount} batch(es).");
297297
this.concurrentSessionLock.Release();
298+
await workItem.Session.EndSessionAsync();
298299
}
299300
}
300301
}
@@ -552,10 +553,6 @@ protected async Task<bool> OnProcessWorkItemAsync(TaskOrchestrationWorkItem work
552553
orchestratorMessages.AddRange(subOrchestrationRewindMessages);
553554
workItem.OrchestrationRuntimeState = newRuntimeState;
554555
runtimeState = newRuntimeState;
555-
// Setting this to true here will end an extended session if it is in progress.
556-
// We don't want to save the state across executions, since we essentially manually modify
557-
// the orchestration history here and so that stored by the extended session is incorrect.
558-
isRewinding = true;
559556
break;
560557
default:
561558
throw TraceHelper.TraceExceptionInstance(

0 commit comments

Comments
 (0)