From 16d85a2a6bce8d8ace5c307df87dafba43188941 Mon Sep 17 00:00:00 2001 From: stidsborg Date: Sun, 26 Apr 2026 10:36:24 +0200 Subject: [PATCH 1/2] Remove DistributedSemaphores functionality --- .../RFunctionTests/SemaphoreTests.cs | 24 -- .../InMemoryTests/SemaphoreStoreTests.cs | 25 -- .../FunctionTests/SemaphoreTests.cs | 239 ------------------ .../TestTemplates/SemaphoreStoreTests.cs | 118 --------- .../WatchDogsTests/CrashableFunctionStore.cs | 1 - .../Invocation/InvocationHelper.cs | 16 -- .../CoreRuntime/Invocation/Invoker.cs | 5 +- .../CoreRuntime/Invocation/Workflow.cs | 6 +- .../Domain/ControlPanel.cs | 56 ++-- .../Domain/ControlPanelFactory.cs | 5 +- .../Domain/DistributedSemaphore.cs | 75 ------ .../Domain/ExistingSemaphores.cs | 56 ---- .../Domain/SemaphoreIdAndStatus.cs | 3 - .../Domain/SemaphoreStatus.cs | 9 - .../Domain/Synchronization.cs | 16 -- .../Storage/IFunctionStore.cs | 1 - .../Storage/ISemaphoreStore.cs | 11 - .../Storage/InMemoryFunctionStore.cs | 1 - .../Storage/InMemorySemaphoreStore.cs | 56 ---- .../BankTransfer/Locking/Example.cs | 34 --- .../BankTransfer/Locking/TransferFunds.cs | 35 --- .../Subscription/SubscriptionSaga.cs | 47 ---- .../Utils/CrashableFunctionStore.cs | 1 - .../RFunctionTests/SemaphoreTests.cs | 24 -- .../SemaphoreStoreTests.cs | 24 -- .../MariaDbFunctionStore.cs | 6 - .../MariaDbSemaphoreStore.cs | 169 ------------- .../RFunctionTests/SemaphoreTests.cs | 24 -- .../SemaphoreStoreTests.cs | 25 -- .../PostgreSqlFunctionStore.cs | 7 +- .../PostgreSqlSemaphoreStore.cs | 193 -------------- .../SemaphoreStoreTests.cs | 25 -- .../SemaphoreTests.cs | 24 -- .../SqlServerFunctionStore.cs | 5 - .../SqlServerSemaphoreStore.cs | 155 ------------ 35 files changed, 31 insertions(+), 1490 deletions(-) delete mode 100644 Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/SemaphoreTests.cs delete mode 100644 Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/SemaphoreStoreTests.cs delete mode 100644 Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/SemaphoreTests.cs delete mode 100644 Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/SemaphoreStoreTests.cs delete mode 100644 Core/Cleipnir.ResilientFunctions/Domain/DistributedSemaphore.cs delete mode 100644 Core/Cleipnir.ResilientFunctions/Domain/ExistingSemaphores.cs delete mode 100644 Core/Cleipnir.ResilientFunctions/Domain/SemaphoreIdAndStatus.cs delete mode 100644 Core/Cleipnir.ResilientFunctions/Domain/SemaphoreStatus.cs delete mode 100644 Core/Cleipnir.ResilientFunctions/Domain/Synchronization.cs delete mode 100644 Core/Cleipnir.ResilientFunctions/Storage/ISemaphoreStore.cs delete mode 100644 Core/Cleipnir.ResilientFunctions/Storage/InMemorySemaphoreStore.cs delete mode 100644 Samples/Sample.ConsoleApp/BankTransfer/Locking/Example.cs delete mode 100644 Samples/Sample.ConsoleApp/BankTransfer/Locking/TransferFunds.cs delete mode 100644 Samples/Sample.ConsoleApp/Subscription/SubscriptionSaga.cs delete mode 100644 Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/RFunctionTests/SemaphoreTests.cs delete mode 100644 Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/SemaphoreStoreTests.cs delete mode 100644 Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbSemaphoreStore.cs delete mode 100644 Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/RFunctionTests/SemaphoreTests.cs delete mode 100644 Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/SemaphoreStoreTests.cs delete mode 100644 Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlSemaphoreStore.cs delete mode 100644 Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/SemaphoreStoreTests.cs delete mode 100644 Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/SemaphoreTests.cs delete mode 100644 Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerSemaphoreStore.cs diff --git a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/SemaphoreTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/SemaphoreTests.cs deleted file mode 100644 index 1aad17869..000000000 --- a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/SemaphoreTests.cs +++ /dev/null @@ -1,24 +0,0 @@ -using System.Threading.Tasks; -using Microsoft.VisualStudio.TestTools.UnitTesting; - -namespace Cleipnir.ResilientFunctions.Tests.InMemoryTests.RFunctionTests; - -[TestClass] -public class SemaphoreTests : TestTemplates.FunctionTests.SemaphoreTests -{ - [TestMethod] - public override Task SunshineTest() - => SunshineTest(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task WaitingFlowIsInterruptedAfterSemaphoreBecomesFree() - => WaitingFlowIsInterruptedAfterSemaphoreBecomesFree(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task SemaphoreAllowsTwoFlowsToContinueAtTheSameTime() - => SemaphoreAllowsTwoFlowsToContinueAtTheSameTime(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task ExistingSemaphoreCanBeForceReleased() - => ExistingSemaphoreCanBeForceReleased(FunctionStoreFactory.Create()); -} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/SemaphoreStoreTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/SemaphoreStoreTests.cs deleted file mode 100644 index 40ea81263..000000000 --- a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/SemaphoreStoreTests.cs +++ /dev/null @@ -1,25 +0,0 @@ -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.Helpers; -using Microsoft.VisualStudio.TestTools.UnitTesting; - -namespace Cleipnir.ResilientFunctions.Tests.InMemoryTests; - -[TestClass] -public class SemaphoreStoreTests : Cleipnir.ResilientFunctions.Tests.TestTemplates.SemaphoreStoreTests -{ - [TestMethod] - public override Task SunshineScenarioTest() - => SunshineScenarioTest(FunctionStoreFactory.Create().SelectAsync(s => s.SemaphoreStore)); - - [TestMethod] - public override Task ReleasingSemaphoreTwiceSucceeds() - => ReleasingSemaphoreTwiceSucceeds(FunctionStoreFactory.Create().SelectAsync(s => s.SemaphoreStore)); - - [TestMethod] - public override Task AcquiringTheSameSemaphoreTwiceIsIdempotent() - => AcquiringTheSameSemaphoreTwiceIsIdempotent(FunctionStoreFactory.Create().SelectAsync(s => s.SemaphoreStore)); - - [TestMethod] - public override Task SemaphoreIsAcquiredDespitePositionBeingMoreThanSemaphoreCount() - => SemaphoreIsAcquiredDespitePositionBeingMoreThanSemaphoreCount(FunctionStoreFactory.Create().SelectAsync(s => s.SemaphoreStore)); -} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/SemaphoreTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/SemaphoreTests.cs deleted file mode 100644 index 0ac12fc10..000000000 --- a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/SemaphoreTests.cs +++ /dev/null @@ -1,239 +0,0 @@ -using System; -using System.Linq; -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.CoreRuntime.Invocation; -using Cleipnir.ResilientFunctions.Domain; -using Cleipnir.ResilientFunctions.Domain.Exceptions.Commands; -using Cleipnir.ResilientFunctions.Helpers; -using Cleipnir.ResilientFunctions.Storage; -using Cleipnir.ResilientFunctions.Tests.Utils; -using Shouldly; - -namespace Cleipnir.ResilientFunctions.Tests.TestTemplates.FunctionTests; - -public abstract class SemaphoreTests -{ - public abstract Task SunshineTest(); - public async Task SunshineTest(Task storeTask) - { - var store = await storeTask; - - using var functionsRegistry = new FunctionsRegistry(store); - var flowId = TestFlowId.Create(); - var (flowType, flowInstance) = flowId; - - var checkSemaphoreFlag = new SyncedFlag(); - var continueFlowFlag = new SyncedFlag(); - - var rAction = functionsRegistry.RegisterAction( - flowType, - async Task(string param, Workflow workflow) => - { - var @lock = await workflow.Synchronization.AcquireLock("SomeGroup", "SomeInstance"); - - checkSemaphoreFlag.Raise(); - await continueFlowFlag.WaitForRaised(); - - await @lock.DisposeAsync(); - }); - - var scheduledFlow = await rAction.Schedule(flowInstance, "hello"); - - await checkSemaphoreFlag.WaitForRaised(); - - var storedId = rAction.MapToStoredId(flowId.Instance); - var queued = await store.SemaphoreStore.GetQueued("SomeGroup", "SomeInstance", count: 10); - queued.Count.ShouldBe(1); - var queuedStoredId = queued.Single(); - queuedStoredId.ShouldBe(storedId); - - var controlPanel = await rAction.ControlPanel(flowInstance).ShouldNotBeNullAsync(); - var existingSemaphores = await controlPanel.Semaphores.GetAll(); - existingSemaphores.Count.ShouldBe(1); - var existingSemaphore = existingSemaphores.Single(); - existingSemaphore.Group.ShouldBe("SomeGroup"); - existingSemaphore.Instance.ShouldBe("SomeInstance"); - - continueFlowFlag.Raise(); - - await scheduledFlow.Completion(); - queued = await store.SemaphoreStore.GetQueued("SomeGroup", "SomeInstance", count: 10); - queued.ShouldBeEmpty(); - } - - public abstract Task WaitingFlowIsInterruptedAfterSemaphoreBecomesFree(); - public async Task WaitingFlowIsInterruptedAfterSemaphoreBecomesFree(Task storeTask) - { - var store = await storeTask; - - using var functionsRegistry = new FunctionsRegistry(store); - var flowId1 = TestFlowId.Create(); - var flowId2 = TestFlowId.Create(); - - var checkSemaphoreFlag = new SyncedFlag(); - var continueFlowFlag = new SyncedFlag(); - - var firstFlow = functionsRegistry.RegisterAction( - flowId1.Type, - async Task(string param, Workflow workflow) => - { - var @lock = await workflow.Synchronization.AcquireLock("SomeGroup", "SomeInstance"); - - checkSemaphoreFlag.Raise(); - await continueFlowFlag.WaitForRaised(); - - await @lock.DisposeAsync(); - }); - - var secondFlow = functionsRegistry.RegisterAction( - flowId2.Type, - async Task(string param, Workflow workflow) => - { - var @lock = await workflow.Synchronization.AcquireLock("SomeGroup", "SomeInstance"); - - checkSemaphoreFlag.Raise(); - await continueFlowFlag.WaitForRaised(); - - await @lock.DisposeAsync(); - }); - - - await firstFlow.Schedule(flowId1.Instance, "hello"); - await checkSemaphoreFlag.WaitForRaised(); - await secondFlow.Schedule(flowId2.Instance, "hello"); - var controlPanelFlow2 = await secondFlow.ControlPanel(flowId2.Instance).ShouldNotBeNullAsync(); - - await BusyWait.Until(async () => - { - await controlPanelFlow2.Refresh(); - return controlPanelFlow2.Status == Status.Suspended; - }); - - continueFlowFlag.Raise(); - - await BusyWait.Until(async () => - { - await controlPanelFlow2.Refresh(); - return controlPanelFlow2.Status == Status.Succeeded; - }); - } - - public abstract Task SemaphoreAllowsTwoFlowsToContinueAtTheSameTime(); - public async Task SemaphoreAllowsTwoFlowsToContinueAtTheSameTime(Task storeTask) - { - var store = await storeTask; - - using var functionsRegistry = new FunctionsRegistry(store); - var flowId1 = TestFlowId.Create(); - var flowId2 = TestFlowId.Create(); - var flowId3 = TestFlowId.Create(); - - var firstFlowAcquiredSemaphore = new SyncedFlag(); - var secondFlowAcquiredSemaphore = new SyncedFlag(); - var continueFlowFlag = new SyncedFlag(); - - var firstFlow = functionsRegistry.RegisterAction( - flowId1.Type, - async Task(string param, Workflow workflow) => - { - var @lock = await workflow.Synchronization.AcquireSemaphore("SomeGroup", "SomeInstance", maximumCount: 2); - - firstFlowAcquiredSemaphore.Raise(); - await continueFlowFlag.WaitForRaised(); - - await @lock.DisposeAsync(); - }); - - var secondFlow = functionsRegistry.RegisterAction( - flowId2.Type, - async Task(string param, Workflow workflow) => - { - var @lock = await workflow.Synchronization.AcquireSemaphore("SomeGroup", "SomeInstance", maximumCount: 2); - - secondFlowAcquiredSemaphore.Raise(); - await continueFlowFlag.WaitForRaised(); - - await @lock.DisposeAsync(); - }); - - var thirdFlow = functionsRegistry.RegisterAction( - flowId3.Type, - async Task(string param, Workflow workflow) => - { - var @lock = await workflow.Synchronization.AcquireSemaphore("SomeGroup", "SomeInstance", maximumCount: 2); - await @lock.DisposeAsync(); - }); - - var firstFlowTask = Task.Run(() => firstFlow.Run(flowId1.Instance, "hello")); - var secondFlowTask = Task.Run(() => secondFlow.Run(flowId2.Instance, "hello")); - - try - { - await firstFlowAcquiredSemaphore.WaitForRaised(); - await secondFlowAcquiredSemaphore.WaitForRaised(); - } - catch (Exception) - { - if (firstFlowTask.IsFaulted) - await firstFlowTask; - if (secondFlowTask.IsFaulted) - await secondFlowTask; - - throw; - } - - var scheduled3 = await thirdFlow.Schedule(flowId3.Instance, "hello"); - - var thirdFlowControlPanel = await thirdFlow.ControlPanel(flowId3.Instance).ShouldNotBeNullAsync(); - await thirdFlowControlPanel.BusyWaitUntil(c => c.Status == Status.Suspended); - - var storedId1 = firstFlow.MapToStoredId(flowId1.Instance); - var storedId2 = secondFlow.MapToStoredId(flowId2.Instance); - var storedId3 = thirdFlow.MapToStoredId(flowId3.Instance); - var queued = await store.SemaphoreStore.GetQueued("SomeGroup", "SomeInstance", count: 10); - queued.Count.ShouldBe(3); - queued.Any(s => s == storedId1).ShouldBeTrue(); - queued.Any(s => s == storedId2).ShouldBeTrue(); - queued.Any(s => s == storedId3).ShouldBeTrue(); - - continueFlowFlag.Raise(); - - await scheduled3.Completion(); - - queued = await store.SemaphoreStore.GetQueued("SomeGroup", "SomeInstance", count: 10); - queued.Count.ShouldBe(0); - } - - public abstract Task ExistingSemaphoreCanBeForceReleased(); - public async Task ExistingSemaphoreCanBeForceReleased(Task storeTask) - { - var store = await storeTask; - - using var functionsRegistry = new FunctionsRegistry(store); - var flowId = TestFlowId.Create(); - var (flowType, flowInstance) = flowId; - - var rAction = functionsRegistry.RegisterAction( - flowType, - async Task(string param, Workflow workflow) => - { - await workflow.Synchronization.AcquireLock("SomeGroup", "SomeInstance"); - throw new SuspendInvocationException(); - }); - - var scheduledFlow = await rAction.Schedule(flowInstance, "hello"); - var controlPanel = await rAction.ControlPanel(flowInstance).ShouldNotBeNullAsync(); - await controlPanel.BusyWaitUntil(c => c.Status == Status.Suspended); - - var existingSemaphores = await controlPanel.Semaphores.GetAll(); - existingSemaphores.Count.ShouldBe(1); - var existingSemaphore = existingSemaphores.Single(); - existingSemaphore.Group.ShouldBe("SomeGroup"); - existingSemaphore.Instance.ShouldBe("SomeInstance"); - - await controlPanel.Semaphores.ForceRelease(existingSemaphore, maximumCount: 1); - - var queued = await store.SemaphoreStore.GetQueued("SomeGroup", "SomeInstance", count: 10); - queued.ShouldBeEmpty(); - } -} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/SemaphoreStoreTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/SemaphoreStoreTests.cs deleted file mode 100644 index e948cd6a6..000000000 --- a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/SemaphoreStoreTests.cs +++ /dev/null @@ -1,118 +0,0 @@ -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.Storage; -using Cleipnir.ResilientFunctions.Tests.Utils; -using Shouldly; - -namespace Cleipnir.ResilientFunctions.Tests.TestTemplates; - -public abstract class SemaphoreStoreTests -{ - public abstract Task SunshineScenarioTest(); - protected async Task SunshineScenarioTest(Task storeTask) - { - var store = await storeTask; - var id1 = TestStoredId.Create(); - var id2 = TestStoredId.Create(); - var id3 = TestStoredId.Create(); - var id4 = TestStoredId.Create(); - - await store - .Acquire("group", "instance", id1, maximumCount: 2) - .ShouldBeTrueAsync(); - - var takenIds = await store.GetQueued("group", "instance", count: 2); - takenIds.Count.ShouldBe(1); - takenIds[0].ShouldBe(id1); - - await store - .Acquire("group", "instance", id2, maximumCount: 2) - .ShouldBeTrueAsync(); - takenIds = await store.GetQueued("group", "instance", count: 2); - takenIds.Count.ShouldBe(2); - takenIds[0].ShouldBe(id1); - takenIds[1].ShouldBe(id2); - - await store - .Acquire("group", "instance", id3, maximumCount: 2) - .ShouldBeFalseAsync(); - takenIds = await store.GetQueued("group", "instance", count: 2); - takenIds.Count.ShouldBe(2); - takenIds[0].ShouldBe(id1); - takenIds[1].ShouldBe(id2); - - await store - .Acquire("group", "instance", id4, maximumCount: 2) - .ShouldBeFalseAsync(); - takenIds = await store.GetQueued("group", "instance", count: 2); - takenIds.Count.ShouldBe(2); - takenIds[0].ShouldBe(id1); - takenIds[1].ShouldBe(id2); - - takenIds = await store.Release("group", "instance", id2, maximumCount: 2); - takenIds.Count.ShouldBe(2); - takenIds[0].ShouldBe(id1); - takenIds[1].ShouldBe(id3); - - takenIds = await store.Release("group", "instance", id1, maximumCount: 2); - takenIds.Count.ShouldBe(2); - takenIds[0].ShouldBe(id3); - takenIds[1].ShouldBe(id4); - - takenIds = await store.Release("group", "instance", id3, maximumCount: 2); - takenIds.Count.ShouldBe(1); - takenIds[0].ShouldBe(id4); - - takenIds = await store.Release("group", "instance", id4, maximumCount: 2); - takenIds.Count.ShouldBe(0); - } - - public abstract Task ReleasingSemaphoreTwiceSucceeds(); - protected async Task ReleasingSemaphoreTwiceSucceeds(Task storeTask) - { - var store = await storeTask; - var id1 = TestStoredId.Create(); - - await store.Acquire("group", "instance", id1, maximumCount: 2); - - var takenIds = await store.Release("group", "instanc", id1, maximumCount: 2); - takenIds.Count.ShouldBe(0); - - takenIds = await store.Release("group", "instanc", id1, maximumCount: 2); - takenIds.Count.ShouldBe(0); - } - - public abstract Task AcquiringTheSameSemaphoreTwiceIsIdempotent(); - protected async Task AcquiringTheSameSemaphoreTwiceIsIdempotent(Task storeTask) - { - var store = await storeTask; - var id1 = TestStoredId.Create(); - - await store.Acquire("group", "instance", id1, maximumCount: 2).ShouldBeTrueAsync(); - await store.Acquire("group", "instance", id1, maximumCount: 2).ShouldBeTrueAsync(); - - var takenIds = await store.GetQueued("group", "instance", count: 2); - takenIds.Count.ShouldBe(1); - takenIds[0].ShouldBe(id1); - - takenIds = await store.Release("group", "instanc", id1, maximumCount: 2); - takenIds.Count.ShouldBe(0); - } - - public abstract Task SemaphoreIsAcquiredDespitePositionBeingMoreThanSemaphoreCount(); - protected async Task SemaphoreIsAcquiredDespitePositionBeingMoreThanSemaphoreCount(Task storeTask) - { - var store = await storeTask; - var id1 = TestStoredId.Create(); - var id2 = TestStoredId.Create(); - var id3 = TestStoredId.Create(); - - - await store.Acquire("group", "instance", id1, maximumCount: 2).ShouldBeTrueAsync(); - await store.Acquire("group", "instance", id2, maximumCount: 2).ShouldBeTrueAsync(); - - await store.Release("group", "instance", id1, maximumCount: 2); - await store.Acquire("group", "instance", id3, maximumCount: 2).ShouldBeTrueAsync(); - - - } -} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs index 8bb71cd5e..c99e72ba8 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs @@ -23,7 +23,6 @@ public class CrashableFunctionStore : IFunctionStore public IEffectsStore EffectsStore => _crashableEffectStore; public ICorrelationStore CorrelationStore => _crashed ? throw new TimeoutException() : _inner.CorrelationStore; public Utilities Utilities => _crashed ? throw new TimeoutException() : _inner.Utilities; - public ISemaphoreStore SemaphoreStore => _crashed ? throw new TimeoutException() : _inner.SemaphoreStore; public IReplicaStore ReplicaStore => _crashed ? throw new TimeoutException() : _inner.ReplicaStore; public CrashableFunctionStore(IFunctionStore inner) diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs index ef0a08026..3398c3ff5 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs @@ -321,14 +321,6 @@ public async Task SaveControlPanelChanges( public async Task Delete(StoredId storedId) => await _functionStore.DeleteFunction(storedId); - public async Task Interrupt(IReadOnlyList storedIds) - { - if (storedIds.Count == 0) - return; - - await _functionStore.Interrupt(storedIds); - } - public async Task?> GetFunction(StoredId storedId, FlowId flowId) { var sf = await _functionStore.GetFunction(storedId); @@ -424,14 +416,6 @@ public async Task CreateExistingEffects(FlowId flowId) return new ExistingEffects(storedId, flowId, _functionStore.EffectsStore, Serializer, storedEffects); } public ExistingMessages CreateExistingMessages(FlowId flowId) => new(MapToStoredId(flowId), _functionStore.MessageStore, Serializer); - public async Task CreateExistingSemaphores(FlowId flowId) - { - var existingEffects = await CreateExistingEffects(flowId); - return new ExistingSemaphores(MapToStoredId(flowId), _functionStore, existingEffects); - } - - public DistributedSemaphores CreateSemaphores(StoredId storedId, Effect effect, FlowsManager flowsManager) - => new(effect, _functionStore.SemaphoreStore, storedId, Interrupt, flowsManager); public QueueManager CreateQueueManager(FlowId flowId, StoredId storedId, Effect effect, FlowState flowState, FlowTimeouts timeouts, UnhandledExceptionHandler unhandledExceptionHandler) => new(flowId, storedId, _functionStore.MessageStore, Serializer, effect, flowState, unhandledExceptionHandler, timeouts, UtcNow, _settings); diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs index 10dc56060..896d9c5ef 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs @@ -284,13 +284,12 @@ await _invocationHelper.PersistFunctionInStore( ); var correlations = _invocationHelper.CreateCorrelations(flowId); - var semaphores = _invocationHelper.CreateSemaphores(storedId, effect, _flowsManager); var flowState = _flowsManager.CreateFlow(storedId, flowTimeouts); var queueManager = _invocationHelper.CreateQueueManager(flowId, storedId, effect, flowState, flowTimeouts, _unhandledExceptionHandler); disposables.Add(queueManager); var messageWriter = _invocationHelper.CreateMessageWriter(storedId); - var workflow = new Workflow(flowId, storedId, effect, _utilities, correlations, semaphores, queueManager, _invocationHelper.UtcNow, messageWriter, _flowsManager); + var workflow = new Workflow(flowId, storedId, effect, _utilities, correlations, queueManager, _invocationHelper.UtcNow, messageWriter, _flowsManager); return new PreparedInvocation( persisted, @@ -338,7 +337,6 @@ private async Task PrepareForReInvocation(StoredId storedI var effect = _invocationHelper.CreateEffect(storedId, flowId, effects, flowTimeouts, storageSession, _flowsManager); var correlations = _invocationHelper.CreateCorrelations(flowId); - var semaphores = _invocationHelper.CreateSemaphores(storedId, effect, _flowsManager); var flowState = _flowsManager.CreateFlow(storedId, flowTimeouts); var queueManager = _invocationHelper.CreateQueueManager(flowId, storedId, effect, flowState, flowTimeouts, _unhandledExceptionHandler); disposables.Add(queueManager); @@ -350,7 +348,6 @@ private async Task PrepareForReInvocation(StoredId storedI effect, _utilities, correlations, - semaphores, queueManager, _invocationHelper.UtcNow, messageWriter, diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs index 18b0c5446..6f87f7662 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs @@ -15,22 +15,20 @@ public class Workflow public Effect Effect { get; } public Utilities Utilities { get; } public Correlations Correlations { get; } - public Synchronization Synchronization { get; } - + private QueueManager _queueManager; private FlowsManager _flowsManager; private readonly UtcNow _utcNow; private MessageWriter MessageWriter { get; } - public Workflow(FlowId flowId, StoredId storedId, Effect effect, Utilities utilities, Correlations correlations, DistributedSemaphores semaphores, QueueManager queueManager, UtcNow utcNow, MessageWriter messageWriter, FlowsManager flowsManager) + public Workflow(FlowId flowId, StoredId storedId, Effect effect, Utilities utilities, Correlations correlations, QueueManager queueManager, UtcNow utcNow, MessageWriter messageWriter, FlowsManager flowsManager) { FlowId = flowId; StoredId = storedId; Utilities = utilities; Effect = effect; Correlations = correlations; - Synchronization = new Synchronization(semaphores); _queueManager = queueManager; _utcNow = utcNow; MessageWriter = messageWriter; diff --git a/Core/Cleipnir.ResilientFunctions/Domain/ControlPanel.cs b/Core/Cleipnir.ResilientFunctions/Domain/ControlPanel.cs index 1637cfe0a..d77d58abf 100644 --- a/Core/Cleipnir.ResilientFunctions/Domain/ControlPanel.cs +++ b/Core/Cleipnir.ResilientFunctions/Domain/ControlPanel.cs @@ -12,21 +12,21 @@ namespace Cleipnir.ResilientFunctions.Domain; public class ControlPanel : BaseControlPanel { internal ControlPanel( - Invoker invoker, - InvocationHelper invocationHelper, + Invoker invoker, + InvocationHelper invocationHelper, FlowId flowId, StoredId storedId, ReplicaId? ownerReplica, - Status status, long expires, + Status status, long expires, ExistingEffects effects, - ExistingMessages messages, ExistingSemaphores semaphores, + ExistingMessages messages, Correlations correlations, FatalWorkflowException? fatalWorkflowException, UtcNow utcNow ) : base( - invoker, invocationHelper, + invoker, invocationHelper, flowId, storedId, ownerReplica, status, expires, innerParam: Unit.Instance, innerResult: Unit.Instance, effects, - messages, semaphores, correlations, fatalWorkflowException, + messages, correlations, fatalWorkflowException, utcNow ) { } @@ -56,21 +56,21 @@ public async Task BusyWaitUntil(Func predicate, TimeSpan? ma public class ControlPanel : BaseControlPanel where TParam : notnull { internal ControlPanel( - Invoker invoker, - InvocationHelper invocationHelper, + Invoker invoker, + InvocationHelper invocationHelper, FlowId flowId, StoredId storedId, ReplicaId? ownerReplica, - Status status, long expires, TParam innerParam, + Status status, long expires, TParam innerParam, ExistingEffects effects, - ExistingMessages messages, ExistingSemaphores semaphores, - Correlations correlations, + ExistingMessages messages, + Correlations correlations, FatalWorkflowException? fatalWorkflowException, UtcNow utcNow ) : base( - invoker, invocationHelper, + invoker, invocationHelper, flowId, storedId, ownerReplica, status, expires, innerParam, innerResult: Unit.Instance, effects, - messages, semaphores, correlations, fatalWorkflowException, + messages, correlations, fatalWorkflowException, utcNow ) { } @@ -106,19 +106,19 @@ public async Task BusyWaitUntil(Func, bool> predicate, Time public class ControlPanel : BaseControlPanel where TParam : notnull { internal ControlPanel( - Invoker invoker, - InvocationHelper invocationHelper, + Invoker invoker, + InvocationHelper invocationHelper, FlowId flowId, StoredId storedId, ReplicaId? ownerReplica, Status status, - long expires, TParam innerParam, - TReturn? innerResult, - ExistingEffects effects, ExistingMessages messages, ExistingSemaphores semaphores, + long expires, TParam innerParam, + TReturn? innerResult, + ExistingEffects effects, ExistingMessages messages, Correlations correlations, FatalWorkflowException? fatalWorkflowException, UtcNow utcNow ) : base( invoker, invocationHelper, flowId, storedId, ownerReplica, status, expires, innerParam, innerResult, effects, messages, - semaphores, correlations, fatalWorkflowException, + correlations, fatalWorkflowException, utcNow ) { } @@ -159,18 +159,17 @@ public abstract class BaseControlPanel private bool _innerParamChanged; internal BaseControlPanel( - Invoker invoker, + Invoker invoker, InvocationHelper invocationHelper, - FlowId flowId, + FlowId flowId, StoredId storedId, ReplicaId? ownerReplica, - Status status, + Status status, long expires, - TParam innerParam, + TParam innerParam, TReturn? innerResult, ExistingEffects effects, ExistingMessages messages, - ExistingSemaphores semaphores, Correlations correlations, FatalWorkflowException? fatalWorkflowException, UtcNow utcNow) @@ -183,13 +182,12 @@ internal BaseControlPanel( Status = status; _innerParam = innerParam; InnerResult = innerResult; - PostponedUntil = Status == Status.Postponed ? + PostponedUntil = Status == Status.Postponed ? (expires == long.MaxValue - ? DateTime.MaxValue + ? DateTime.MaxValue : new DateTime(expires, DateTimeKind.Utc)) : null; Effects = effects; Messages = messages; - Semaphores = semaphores; Correlations = correlations; FatalWorkflowException = fatalWorkflowException; UtcNow = utcNow; @@ -204,8 +202,7 @@ internal BaseControlPanel( public ExistingMessages Messages { get; private set; } public ExistingEffects Effects { get; private set; } - - public ExistingSemaphores Semaphores { get; private set; } + public Correlations Correlations { get; private set; } private TParam _innerParam; @@ -344,7 +341,6 @@ public async Task Refresh() Effects = await _invocationHelper.CreateExistingEffects(FlowId); Messages = _invocationHelper.CreateExistingMessages(FlowId); Correlations = _invocationHelper.CreateCorrelations(FlowId); - Semaphores = await _invocationHelper.CreateExistingSemaphores(FlowId); _innerParamChanged = false; } diff --git a/Core/Cleipnir.ResilientFunctions/Domain/ControlPanelFactory.cs b/Core/Cleipnir.ResilientFunctions/Domain/ControlPanelFactory.cs index 9a1e6092e..7b55219a4 100644 --- a/Core/Cleipnir.ResilientFunctions/Domain/ControlPanelFactory.cs +++ b/Core/Cleipnir.ResilientFunctions/Domain/ControlPanelFactory.cs @@ -42,7 +42,6 @@ internal ControlPanelFactory(FlowType flowType, StoredType storedType, Invoker where TParam : notnull +public class ControlPanelFactory where TParam : notnull { private readonly FlowType _flowType; private readonly StoredType _storedType; @@ -88,7 +87,6 @@ internal ControlPanelFactory(FlowType flowType, StoredType storedType, Invoker, Task> interrupt, FlowsManager flowsManager) -{ - public DistributedSemaphore Create(string group, string instance, int maximumCount) - => new(maximumCount, group, instance, effect, semaphoreStore, storedId, interrupt, flowsManager); - - public DistributedSemaphore CreateLock(string group, string instance) - => new(maximumCount: 1, group, instance, effect, semaphoreStore, storedId, interrupt, flowsManager); -} - -public class DistributedSemaphore(int maximumCount, string group, string instance, Effect effect, ISemaphoreStore store, StoredId storedId, Func, Task> interrupt, FlowsManager flowsManager) -{ - private EffectId? _effectId; - public async Task Acquire(TimeSpan? maxWait = null) - { - maxWait ??= TimeSpan.Zero; - if (maxWait < TimeSpan.Zero) - throw new ArgumentOutOfRangeException(nameof(maxWait), maxWait, "MaxWait must be non negative"); - - _effectId = effect.CreateNextImplicitId(); - var statusIdAndStatus = effect.TryGet(_effectId, out var storedStatus) - ? storedStatus! - : new SemaphoreIdAndStatus(group, instance, SemaphoreStatus.Created); - var status = statusIdAndStatus.Status; - - var gotLock = true; - if (status is SemaphoreStatus.Created or SemaphoreStatus.Waiting) - gotLock = await store.Acquire(group, instance, storedId, maximumCount); - else if (status == SemaphoreStatus.Released) - return new Lock(() => Task.CompletedTask); - - if (!gotLock && maxWait > TimeSpan.Zero) - { - var stopWatch = Stopwatch.StartNew(); - while (!gotLock && stopWatch.Elapsed < maxWait) - { - await Task.Delay(250); - var lockQueue = await store.GetQueued(group, instance, maximumCount); - gotLock = lockQueue.Any(id => id == storedId); - } - } - - if (!gotLock) - { - await effect.Upsert(_effectId, statusIdAndStatus with { Status = SemaphoreStatus.Waiting }, alias: null, flush: true); - await flowsManager.Suspend(storedId); - } - - await effect.Upsert(_effectId, statusIdAndStatus with { Status = SemaphoreStatus.Acquired }, alias: null, flush: true); - return new Lock(Release); - } - - private async Task Release() - { - var lockQueue = await store.Release(group, instance, storedId, maximumCount); - await interrupt(lockQueue); - - await effect.Upsert(_effectId!, new SemaphoreIdAndStatus(group, instance, SemaphoreStatus.Released), alias: null, flush: true); - } - - public class Lock(Func releaseFunc) : IAsyncDisposable - { - public ValueTask DisposeAsync() => new(releaseFunc()); - public Task Release() => DisposeAsync().AsTask(); - } -} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Domain/ExistingSemaphores.cs b/Core/Cleipnir.ResilientFunctions/Domain/ExistingSemaphores.cs deleted file mode 100644 index c579c895f..000000000 --- a/Core/Cleipnir.ResilientFunctions/Domain/ExistingSemaphores.cs +++ /dev/null @@ -1,56 +0,0 @@ -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.Storage; - -namespace Cleipnir.ResilientFunctions.Domain; - -public record ExistingSemaphore(string Group, string Instance); - -public class ExistingSemaphores(StoredId storedId, IFunctionStore store, ExistingEffects effect) -{ - public async Task> GetAll() - => (await GetExistingSemaphoreAndEffectIds()).Select(e => e.ExistingSemaphore).ToList(); - - public async Task ForceRelease(ExistingSemaphore existingSemaphore, int maximumCount) - { - var effectId = (await GetExistingSemaphoreAndEffectIds()) - .FirstOrDefault(e => e.ExistingSemaphore == existingSemaphore) - ?.EffectId; - - if (effectId != null) - await effect.Remove(effectId.Value); - - var enqueued = await store.SemaphoreStore.Release(existingSemaphore.Group, existingSemaphore.Instance, storedId, maximumCount); - if (enqueued.Any()) - await store.Interrupt(enqueued); - } - - private record ExistingSemaphoreAndEffectId(ExistingSemaphore ExistingSemaphore, int EffectId); - private async Task> GetExistingSemaphoreAndEffectIds() - { - var effectIds = await effect.AllIds; - - var existingSemaphoreAndEffectIds = new List(); - foreach (var effectId in effectIds) - { - try - { - var semaphoreData = await effect.GetValue(effectId); - if (semaphoreData != null) - { - var (group, instance, _) = semaphoreData; - var existingSemaphore = new ExistingSemaphore(group, instance); - var existingSemaphoreAndEffectId = new ExistingSemaphoreAndEffectId(existingSemaphore, effectId.Id); - existingSemaphoreAndEffectIds.Add(existingSemaphoreAndEffectId); - } - } - catch - { - // Effect is not a semaphore, skip it - } - } - - return existingSemaphoreAndEffectIds; - } -} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Domain/SemaphoreIdAndStatus.cs b/Core/Cleipnir.ResilientFunctions/Domain/SemaphoreIdAndStatus.cs deleted file mode 100644 index 73d082178..000000000 --- a/Core/Cleipnir.ResilientFunctions/Domain/SemaphoreIdAndStatus.cs +++ /dev/null @@ -1,3 +0,0 @@ -namespace Cleipnir.ResilientFunctions.Domain; - -public record SemaphoreIdAndStatus(string Group, string Instance, SemaphoreStatus Status); \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Domain/SemaphoreStatus.cs b/Core/Cleipnir.ResilientFunctions/Domain/SemaphoreStatus.cs deleted file mode 100644 index 6e3a5f517..000000000 --- a/Core/Cleipnir.ResilientFunctions/Domain/SemaphoreStatus.cs +++ /dev/null @@ -1,9 +0,0 @@ -namespace Cleipnir.ResilientFunctions.Domain; - -public enum SemaphoreStatus -{ - Created = 0, - Waiting = 1, - Acquired = 2, - Released = 3 -} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Domain/Synchronization.cs b/Core/Cleipnir.ResilientFunctions/Domain/Synchronization.cs deleted file mode 100644 index aa4c65c00..000000000 --- a/Core/Cleipnir.ResilientFunctions/Domain/Synchronization.cs +++ /dev/null @@ -1,16 +0,0 @@ -using System; -using System.Threading.Tasks; - -namespace Cleipnir.ResilientFunctions.Domain; - -public class Synchronization(DistributedSemaphores semaphores) -{ - public Task AcquireLock(string group, string instance, TimeSpan? maxWait = null) - => AcquireSemaphore(group, instance, maximumCount: 1, maxWait); - - public Task AcquireSemaphore(string group, string instance, int maximumCount, TimeSpan? maxWait = null) - { - var distributedSemaphore = semaphores.Create(group, instance, maximumCount); - return distributedSemaphore.Acquire(maxWait); - } -} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs b/Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs index f4204dd44..4c1f59bc5 100644 --- a/Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs +++ b/Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs @@ -14,7 +14,6 @@ public interface IFunctionStore public IEffectsStore EffectsStore { get; } public ICorrelationStore CorrelationStore { get; } public Utilities Utilities { get; } - public ISemaphoreStore SemaphoreStore { get; } public IReplicaStore ReplicaStore { get; } public Task Initialize(); diff --git a/Core/Cleipnir.ResilientFunctions/Storage/ISemaphoreStore.cs b/Core/Cleipnir.ResilientFunctions/Storage/ISemaphoreStore.cs deleted file mode 100644 index 21dd305c8..000000000 --- a/Core/Cleipnir.ResilientFunctions/Storage/ISemaphoreStore.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System.Collections.Generic; -using System.Threading.Tasks; - -namespace Cleipnir.ResilientFunctions.Storage; - -public interface ISemaphoreStore -{ - Task Acquire(string group, string instance, StoredId storedId, int maximumCount); - Task> Release(string group, string instance, StoredId storedId, int maximumCount); - Task> GetQueued(string group, string instance, int count); -} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs index 7996a71d7..9f2dd5f36 100644 --- a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs +++ b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs @@ -26,7 +26,6 @@ public class InMemoryFunctionStore : IFunctionStore, IMessageStore private readonly InMemoryCorrelationStore _correlationStore = new(); public ICorrelationStore CorrelationStore => _correlationStore; public Utilities Utilities { get; } - public ISemaphoreStore SemaphoreStore { get; } = new InMemorySemaphoreStore(); public IReplicaStore ReplicaStore { get; } = new InMemoryReplicaStore(); public Task Initialize() => Task.CompletedTask; diff --git a/Core/Cleipnir.ResilientFunctions/Storage/InMemorySemaphoreStore.cs b/Core/Cleipnir.ResilientFunctions/Storage/InMemorySemaphoreStore.cs deleted file mode 100644 index 306e60b12..000000000 --- a/Core/Cleipnir.ResilientFunctions/Storage/InMemorySemaphoreStore.cs +++ /dev/null @@ -1,56 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.Helpers; - -namespace Cleipnir.ResilientFunctions.Storage; - -public class InMemorySemaphoreStore : ISemaphoreStore -{ - private readonly Dictionary, List> _semaphores = new(); - private readonly Lock _lock = new(); - - public Task Acquire(string group, string instance, StoredId storedId, int maximumCount) - { - lock (_lock) - { - var key = Tuple.Create(group, instance); - if (!_semaphores.ContainsKey(key)) - _semaphores[key] = new(); - - var queue = _semaphores[key]; - for (var i = 0; i < queue.Count; i++) - if (queue[i] == storedId) - return (i < maximumCount).ToTask(); - - queue.Add(storedId); - return (queue.Count <= maximumCount).ToTask(); - } - } - - public Task> Release(string group, string instance, StoredId storedId, int maximumCount) - { - lock (_lock) - { - var key = Tuple.Create(group, instance); - if (!_semaphores.ContainsKey(key)) - _semaphores[key] = new(); - - _semaphores[key] = _semaphores[key].Where(id => id != storedId).ToList(); - - return _semaphores[key].Take(maximumCount).ToList().CastTo>().ToTask(); - } - } - - public Task> GetQueued(string group, string instance, int count) - { - lock (_lock) - return _semaphores[Tuple.Create(group, instance)] - .Take(count) - .ToList() - .CastTo>() - .ToTask(); - } -} \ No newline at end of file diff --git a/Samples/Sample.ConsoleApp/BankTransfer/Locking/Example.cs b/Samples/Sample.ConsoleApp/BankTransfer/Locking/Example.cs deleted file mode 100644 index 6302ce539..000000000 --- a/Samples/Sample.ConsoleApp/BankTransfer/Locking/Example.cs +++ /dev/null @@ -1,34 +0,0 @@ -using System; -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions; -using Cleipnir.ResilientFunctions.Storage; -using ConsoleApp.BankTransfer.Versioning; - -namespace ConsoleApp.BankTransfer.Locking; - -public static class Example -{ - public static async Task Perform() - { - var functionsRegistry = new FunctionsRegistry(new InMemoryFunctionStore()); - - var actionRegistration = functionsRegistry - .RegisterAction( - flowType: nameof(TransferFunds), - TransferFunds.Perform - ); - - var transfer = new Transfer( - TransferId: Guid.NewGuid(), - FromAccount: "FAccount", - FromAccountTransactionId: Guid.NewGuid(), - ToAccount: "TAccount", - ToAccountTransactionId: Guid.NewGuid(), - Amount: 100 - ); - await actionRegistration.Run( - transfer.TransferId.ToString(), - transfer - ); - } -} \ No newline at end of file diff --git a/Samples/Sample.ConsoleApp/BankTransfer/Locking/TransferFunds.cs b/Samples/Sample.ConsoleApp/BankTransfer/Locking/TransferFunds.cs deleted file mode 100644 index d20b2b6f7..000000000 --- a/Samples/Sample.ConsoleApp/BankTransfer/Locking/TransferFunds.cs +++ /dev/null @@ -1,35 +0,0 @@ -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.CoreRuntime.Invocation; -using ConsoleApp.BankTransfer.Versioning; - -namespace ConsoleApp.BankTransfer.Locking; - -public static class TransferFunds -{ - private static IBankCentralClient BankCentralClient { get; } = new BankCentralClient(); - - public static async Task Perform(Transfer transfer, Workflow workflow) - { - await using var fromAccountLock = await workflow.Synchronization.AcquireLock("Account", transfer.FromAccount); - await using var toAccountLock = await workflow.Synchronization.AcquireLock("Account", transfer.ToAccount); - - var deductTask = workflow.Effect.Capture( - () => BankCentralClient - .PostTransaction( - transfer.FromAccountTransactionId, - transfer.FromAccount, - -transfer.Amount - ) - ); - - var addTask = workflow.Effect.Capture( - () => BankCentralClient.PostTransaction( - transfer.ToAccountTransactionId, - transfer.ToAccount, - transfer.Amount - ) - ); - - await Task.WhenAll(deductTask, addTask); - } -} \ No newline at end of file diff --git a/Samples/Sample.ConsoleApp/Subscription/SubscriptionSaga.cs b/Samples/Sample.ConsoleApp/Subscription/SubscriptionSaga.cs deleted file mode 100644 index c871c0c88..000000000 --- a/Samples/Sample.ConsoleApp/Subscription/SubscriptionSaga.cs +++ /dev/null @@ -1,47 +0,0 @@ -using System; -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.CoreRuntime.Invocation; -using Cleipnir.ResilientFunctions.Domain; -using Cleipnir.ResilientFunctions.Helpers; - -namespace ConsoleApp.Subscription; - -public class SubscriptionSaga -{ - public async Task> UpdateSubscription(SubscriptionChange subscriptionChange, Workflow workflow) - { - await using var monitor = await workflow.Synchronization.AcquireLock(nameof(UpdateSubscription), subscriptionChange.SubscriptionId); - var (subscriptionId, startSubscription) = subscriptionChange; - - if (startSubscription) - await StartSubscription(subscriptionId); - else - await StopSubscription(subscriptionId); - - await StoreSubscriptionStatusLocally(startSubscription); - - return Succeed.WithUnit; - } - - private async Task StartSubscription(string subscriptionId) - { - Console.WriteLine("Starting subscription"); - await Task.Delay(1_000); - Console.WriteLine("Start completed"); - } - - private async Task StopSubscription(string subscriptionId) - { - Console.WriteLine("Stopping subscription"); - await Task.Delay(1_000); - Console.WriteLine("Stop completed"); - } - - private Task StoreSubscriptionStatusLocally(bool subscriptionStatus) - { - Console.WriteLine($"Storing '{subscriptionStatus}' for subscription"); - return Task.CompletedTask; - } - - public record SubscriptionChange(string SubscriptionId, bool StartSubscription); -} \ No newline at end of file diff --git a/Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs b/Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs index 355ba323c..49508c074 100644 --- a/Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs +++ b/Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs @@ -19,7 +19,6 @@ public class CrashableFunctionStore : IFunctionStore public IEffectsStore EffectsStore => _inner.EffectsStore; public ICorrelationStore CorrelationStore => _inner.CorrelationStore; public Utilities Utilities => _inner.Utilities; - public ISemaphoreStore SemaphoreStore => _inner.SemaphoreStore; public IReplicaStore ReplicaStore => _inner.ReplicaStore; public CrashableFunctionStore(IFunctionStore inner) => _inner = inner; diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/RFunctionTests/SemaphoreTests.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/RFunctionTests/SemaphoreTests.cs deleted file mode 100644 index 4ff54fb5b..000000000 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/RFunctionTests/SemaphoreTests.cs +++ /dev/null @@ -1,24 +0,0 @@ -using Cleipnir.ResilientFunctions.MariaDb.Tests; -using Microsoft.VisualStudio.TestTools.UnitTesting; - -namespace Cleipnir.ResilientFunctions.MariaDB.Tests.RFunctionTests; - -[TestClass] -public class SemaphoreTests : ResilientFunctions.Tests.TestTemplates.FunctionTests.SemaphoreTests -{ - [TestMethod] - public override Task SunshineTest() - => SunshineTest(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task WaitingFlowIsInterruptedAfterSemaphoreBecomesFree() - => WaitingFlowIsInterruptedAfterSemaphoreBecomesFree(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task ExistingSemaphoreCanBeForceReleased() - => ExistingSemaphoreCanBeForceReleased(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task SemaphoreAllowsTwoFlowsToContinueAtTheSameTime() - => SemaphoreAllowsTwoFlowsToContinueAtTheSameTime(FunctionStoreFactory.Create()); -} \ No newline at end of file diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/SemaphoreStoreTests.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/SemaphoreStoreTests.cs deleted file mode 100644 index 3816ef10d..000000000 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/SemaphoreStoreTests.cs +++ /dev/null @@ -1,24 +0,0 @@ -using Cleipnir.ResilientFunctions.Helpers; -using Microsoft.VisualStudio.TestTools.UnitTesting; - -namespace Cleipnir.ResilientFunctions.MariaDb.Tests; - -[TestClass] -public class SemaphoreStoreTests : Cleipnir.ResilientFunctions.Tests.TestTemplates.SemaphoreStoreTests -{ - [TestMethod] - public override Task SunshineScenarioTest() - => SunshineScenarioTest(FunctionStoreFactory.Create().SelectAsync(s => s.SemaphoreStore)); - - [TestMethod] - public override Task ReleasingSemaphoreTwiceSucceeds() - => ReleasingSemaphoreTwiceSucceeds(FunctionStoreFactory.Create().SelectAsync(s => s.SemaphoreStore)); - - [TestMethod] - public override Task AcquiringTheSameSemaphoreTwiceIsIdempotent() - => AcquiringTheSameSemaphoreTwiceIsIdempotent(FunctionStoreFactory.Create().SelectAsync(s => s.SemaphoreStore)); - - [TestMethod] - public override Task SemaphoreIsAcquiredDespitePositionBeingMoreThanSemaphoreCount() - => SemaphoreIsAcquiredDespitePositionBeingMoreThanSemaphoreCount(FunctionStoreFactory.Create().SelectAsync(s => s.SemaphoreStore)); -} \ No newline at end of file diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs index 5da491829..a2f48d3f2 100644 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs +++ b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs @@ -29,9 +29,6 @@ public class MariaDbFunctionStore : IFunctionStore private readonly MariaDbCorrelationStore _correlationStore; public ICorrelationStore CorrelationStore => _correlationStore; - - private readonly MariaDbSemaphoreStore _semaphoreStore; - public ISemaphoreStore SemaphoreStore => _semaphoreStore; private readonly MariaDbReplicaStore _replicaStore; public IReplicaStore ReplicaStore => _replicaStore; @@ -52,7 +49,6 @@ public MariaDbFunctionStore(string connectionString, string tablePrefix = "") _messageStore = new MariaDbMessageStore(connectionString, _sqlGenerator, tablePrefix); _effectsStore = new MariaDbEffectsStore(connectionString, tablePrefix); _correlationStore = new MariaDbCorrelationStore(connectionString, tablePrefix); - _semaphoreStore = new MariaDbSemaphoreStore(connectionString, tablePrefix); _mariaDbUnderlyingRegister = new MariaDbUnderlyingRegister(connectionString, tablePrefix); _typeStore = new MariaDbTypeStore(connectionString, tablePrefix); _replicaStore = new MariaDbReplicaStore(connectionString, tablePrefix); @@ -70,7 +66,6 @@ public async Task Initialize() await MessageStore.Initialize(); await EffectsStore.Initialize(); await CorrelationStore.Initialize(); - await _semaphoreStore.Initialize(); await _typeStore.Initialize(); await _replicaStore.Initialize(); await using var conn = await CreateOpenConnection(_connectionString); @@ -103,7 +98,6 @@ public async Task TruncateTables() await _mariaDbUnderlyingRegister.TruncateTable(); await _effectsStore.Truncate(); await _correlationStore.Truncate(); - await _semaphoreStore.Truncate(); await _typeStore.Truncate(); await _replicaStore.Truncate(); diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbSemaphoreStore.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbSemaphoreStore.cs deleted file mode 100644 index 4b05fda23..000000000 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbSemaphoreStore.cs +++ /dev/null @@ -1,169 +0,0 @@ -using Cleipnir.ResilientFunctions.Storage; -using MySqlConnector; - -namespace Cleipnir.ResilientFunctions.MariaDb; - -public class MariaDbSemaphoreStore(string connectionString, string tablePrefix = "") : ISemaphoreStore -{ - private string? _initializeSql; - public async Task Initialize() - { - await using var conn = await CreateConnection(); - _initializeSql ??= @$" - CREATE TABLE IF NOT EXISTS {tablePrefix}_semaphores ( - type VARCHAR(150) NOT NULL, - instance VARCHAR(150) NOT NULL, - position INT NOT NULL, - owner TEXT NOT NULL, - PRIMARY KEY (type, instance, position) - );"; - - var command = new MySqlCommand(_initializeSql, conn); - await command.ExecuteNonQueryAsync(); - } - - private string? _truncateSql; - public async Task Truncate() - { - await using var conn = await CreateConnection(); - _truncateSql ??= $"TRUNCATE TABLE {tablePrefix}_semaphores;"; - var command = new MySqlCommand(_truncateSql, conn); - await command.ExecuteNonQueryAsync(); - } - - public async Task Acquire(string group, string instance, StoredId storedId, int maximumCount) - => await Acquire(group, instance, storedId, maximumCount, depth: 0); - - private string? _takeSql; - private async Task Acquire(string group, string instance, StoredId storedId, int maximumCount, int depth) - { - _takeSql ??= @$" - IF (NOT EXISTS (SELECT 1 FROM {tablePrefix}_semaphores WHERE type = ? AND instance = ? AND owner = ?)) - THEN - INSERT INTO {tablePrefix}_semaphores - (type, instance, position, owner) - SELECT ?, ?, (COALESCE(MAX(position), -1) + 1), ? - FROM {tablePrefix}_semaphores; - END IF; - - SELECT owner FROM {tablePrefix}_semaphores WHERE type = ? AND instance = ? ORDER BY position;"; - - await using var conn = await CreateConnection(); - try - { - var command = new MySqlCommand(_takeSql, conn) - { - Parameters = - { - new() { Value = group }, - new() { Value = instance }, - new() { Value = storedId.Serialize() }, - - new() { Value = group }, - new() { Value = instance }, - new() { Value = storedId.Serialize() }, - - new() { Value = group }, - new() { Value = instance }, - new() { Value = storedId.Serialize() } - } - }; - - await using var reader = await command.ExecuteReaderAsync(); - var i = 0; - while (await reader.ReadAsync()) - { - var owner = StoredId.Deserialize(reader.GetString(0)); - if (owner == storedId) - return i < maximumCount; - - i++; - } - - return false; - } - catch (MySqlException e) when (e.Number == 1213) //deadlock found when trying to get lock; try restarting transaction - { - // ReSharper disable once DisposeOnUsingVariable - await conn.DisposeAsync(); //eagerly free taken connection - if (depth == 10) throw; - - await Task.Delay(Random.Shared.Next(10, 250)); - return await Acquire(group, instance, storedId, maximumCount, depth + 1); - } - } - - private string? _releaseSql; - public async Task> Release(string group, string instance, StoredId storedId, int maximumCount) - { - await using var conn = await CreateConnection(); - - _releaseSql ??= @$" - DELETE FROM {tablePrefix}_semaphores - WHERE type = ? AND instance = ? AND owner = ?; - - SELECT owner - FROM {tablePrefix}_semaphores - WHERE type = ? AND instance = ? - ORDER BY position - LIMIT ?;"; - - await using var command = new MySqlCommand(_releaseSql, conn) - { - Parameters = - { - new() { Value = group }, - new() { Value = instance }, - new() { Value = storedId.Serialize() }, - - new() { Value = group }, - new() { Value = instance }, - new() { Value = maximumCount }, - } - }; - - await using var reader = await command.ExecuteReaderAsync(); - var ids = new List(); - while (await reader.ReadAsync()) - ids.Add(StoredId.Deserialize(reader.GetString(0))); - - return ids; - } - - private string? _getQueuedSql; - public async Task> GetQueued(string group, string instance, int count) - { - await using var conn = await CreateConnection(); - - _getQueuedSql ??= @$" - SELECT owner - FROM {tablePrefix}_semaphores - WHERE type = ? AND instance = ? - ORDER BY position - LIMIT ?; - "; - var command = new MySqlCommand(_getQueuedSql, conn) - { - Parameters = - { - new() { Value = group }, - new() { Value = instance }, - new() { Value = count }, - } - }; - - await using var reader = await command.ExecuteReaderAsync(); - var ids = new List(); - while (await reader.ReadAsync()) - ids.Add(StoredId.Deserialize(reader.GetString(0))); - - return ids; - } - - private async Task CreateConnection() - { - var conn = new MySqlConnection(connectionString); - await conn.OpenAsync(); - return conn; - } -} \ No newline at end of file diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/RFunctionTests/SemaphoreTests.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/RFunctionTests/SemaphoreTests.cs deleted file mode 100644 index 829470e7e..000000000 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/RFunctionTests/SemaphoreTests.cs +++ /dev/null @@ -1,24 +0,0 @@ -using System.Threading.Tasks; -using Microsoft.VisualStudio.TestTools.UnitTesting; - -namespace Cleipnir.ResilientFunctions.PostgreSQL.Tests.RFunctionTests; - -[TestClass] -public class SemaphoreTests : ResilientFunctions.Tests.TestTemplates.FunctionTests.SemaphoreTests -{ - [TestMethod] - public override Task SunshineTest() - => SunshineTest(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task WaitingFlowIsInterruptedAfterSemaphoreBecomesFree() - => WaitingFlowIsInterruptedAfterSemaphoreBecomesFree(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task ExistingSemaphoreCanBeForceReleased() - => ExistingSemaphoreCanBeForceReleased(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task SemaphoreAllowsTwoFlowsToContinueAtTheSameTime() - => SemaphoreAllowsTwoFlowsToContinueAtTheSameTime(FunctionStoreFactory.Create()); -} \ No newline at end of file diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/SemaphoreStoreTests.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/SemaphoreStoreTests.cs deleted file mode 100644 index 4af691b86..000000000 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/SemaphoreStoreTests.cs +++ /dev/null @@ -1,25 +0,0 @@ -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.Helpers; -using Microsoft.VisualStudio.TestTools.UnitTesting; - -namespace Cleipnir.ResilientFunctions.PostgreSQL.Tests; - -[TestClass] -public class SemaphoreStoreTests : Cleipnir.ResilientFunctions.Tests.TestTemplates.SemaphoreStoreTests -{ - [TestMethod] - public override Task SunshineScenarioTest() - => SunshineScenarioTest(FunctionStoreFactory.Create().SelectAsync(s => s.SemaphoreStore)); - - [TestMethod] - public override Task ReleasingSemaphoreTwiceSucceeds() - => ReleasingSemaphoreTwiceSucceeds(FunctionStoreFactory.Create().SelectAsync(s => s.SemaphoreStore)); - - [TestMethod] - public override Task AcquiringTheSameSemaphoreTwiceIsIdempotent() - => AcquiringTheSameSemaphoreTwiceIsIdempotent(FunctionStoreFactory.Create().SelectAsync(s => s.SemaphoreStore)); - - [TestMethod] - public override Task SemaphoreIsAcquiredDespitePositionBeingMoreThanSemaphoreCount() - => SemaphoreIsAcquiredDespitePositionBeingMoreThanSemaphoreCount(FunctionStoreFactory.Create().SelectAsync(s => s.SemaphoreStore)); -} \ No newline at end of file diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs index 3b7fcdd4d..6149fcd48 100644 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs @@ -29,9 +29,7 @@ public class PostgreSqlFunctionStore : IFunctionStore private readonly ICorrelationStore _correlationStore; public ICorrelationStore CorrelationStore => _correlationStore; - - private readonly PostgreSqlSemaphoreStore _semaphoreStore; - public ISemaphoreStore SemaphoreStore => _semaphoreStore; + private readonly PostgreSqlDbReplicaStore _replicaStore; public IReplicaStore ReplicaStore => _replicaStore; @@ -49,7 +47,6 @@ public PostgreSqlFunctionStore(string connectionString, string tablePrefix = "") _messageStore = new PostgreSqlMessageStore(connectionString, _sqlGenerator, _tableName); _effectsStore = new PostgreSqlEffectsStore(connectionString, _tableName); _correlationStore = new PostgreSqlCorrelationStore(connectionString, _tableName); - _semaphoreStore = new PostgreSqlSemaphoreStore(connectionString, _tableName); _typeStore = new PostgreSqlTypeStore(connectionString, _tableName); _postgresSqlUnderlyingRegister = new PostgresSqlUnderlyingRegister(connectionString, _tableName); _replicaStore = new PostgreSqlDbReplicaStore(connectionString, _tableName); @@ -73,7 +70,6 @@ public async Task Initialize() await _messageStore.Initialize(); await _effectsStore.Initialize(); await _correlationStore.Initialize(); - await _semaphoreStore.Initialize(); await _typeStore.Initialize(); await _replicaStore.Initialize(); await using var conn = await CreateConnection(); @@ -112,7 +108,6 @@ public async Task TruncateTables() await _effectsStore.Truncate(); await _correlationStore.Truncate(); await _typeStore.Truncate(); - await _semaphoreStore.Truncate(); await _replicaStore.Truncate(); await using var conn = await CreateConnection(); diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlSemaphoreStore.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlSemaphoreStore.cs deleted file mode 100644 index a976daad1..000000000 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlSemaphoreStore.cs +++ /dev/null @@ -1,193 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.Storage; -using Npgsql; - -namespace Cleipnir.ResilientFunctions.PostgreSQL; - -public class PostgreSqlSemaphoreStore(string connectionString, string tablePrefix = "") : ISemaphoreStore -{ - private string? _initializeSql; - public async Task Initialize() - { - await using var conn = await CreateConnection(); - _initializeSql ??= @$" - CREATE TABLE IF NOT EXISTS {tablePrefix}_semaphores ( - type VARCHAR(150) NOT NULL, - instance VARCHAR(150) NOT NULL, - position INT NOT NULL, - owner TEXT NOT NULL, - PRIMARY KEY (type, instance, position) - );"; - - var command = new NpgsqlCommand(_initializeSql, conn); - await command.ExecuteNonQueryAsync(); - } - - private string? _truncateSql; - public async Task Truncate() - { - await using var conn = await CreateConnection(); - _truncateSql ??= $"TRUNCATE TABLE {tablePrefix}_semaphores;"; - var command = new NpgsqlCommand(_truncateSql, conn); - await command.ExecuteNonQueryAsync(); - } - - public async Task Acquire(string group, string instance, StoredId storedId, int maximumCount) - => await Acquire(group, instance, storedId, maximumCount, depth: 0); - - private string? _takeSql; - private string? _readAfterTakeSql; - private async Task Acquire(string group, string instance, StoredId storedId, int maximumCount, int depth) - { - _takeSql ??= @$" - INSERT INTO {tablePrefix}_semaphores - SELECT - $1, - $2, - (SELECT COALESCE(MAX(position), -1) + 1 FROM {tablePrefix}_semaphores WHERE type = $1 AND instance = $2), - $3 - WHERE NOT EXISTS (SELECT 1 FROM {tablePrefix}_semaphores WHERE type = $1 AND instance = $2 AND owner = $3);"; - - _readAfterTakeSql ??= $"SELECT owner FROM {tablePrefix}_semaphores WHERE type = $1 AND instance = $2 ORDER BY position;"; - - await using var conn = await CreateConnection(); - try - { - await using var batch = new NpgsqlBatch(conn) - { - BatchCommands = - { - new NpgsqlBatchCommand(_takeSql) - { - Parameters = - { - new() { Value = group }, - new() { Value = instance }, - new() { Value = storedId.Serialize() } - }, - }, - new NpgsqlBatchCommand(_readAfterTakeSql) { - Parameters = - { - new() { Value = group }, - new() { Value = instance }, - new() { Value = storedId.Serialize() } - }, - }, - } - }; - - await using var reader = await batch.ExecuteReaderAsync(); - var i = 0; - while (await reader.ReadAsync()) - { - var owner = StoredId.Deserialize(reader.GetString(0)); - if (owner == storedId) - return i < maximumCount; - - i++; - } - - return false; - } - catch (PostgresException e) when (e.SqlState == "23505") - { - // ReSharper disable once DisposeOnUsingVariable - await conn.DisposeAsync(); //eagerly free taken connection - if (depth == 10) throw; - - await Task.Delay(Random.Shared.Next(10, 250)); - return await Acquire(group, instance, storedId, maximumCount, depth + 1); - } - } - - private string? _releaseSql; - private string? _releaseGetQueuedSql; - public async Task> Release(string group, string instance, StoredId storedId, int maximumCount) - { - await using var conn = await CreateConnection(); - await using var batch = new NpgsqlBatch(conn); - - { - _releaseSql ??= @$" - DELETE FROM {tablePrefix}_semaphores - WHERE type = $1 AND instance = $2 AND owner = $3;"; - var command = new NpgsqlBatchCommand(_releaseSql) - { - Parameters = - { - new() { Value = group }, - new() { Value = instance }, - new() { Value = storedId.Serialize() } - } - }; - batch.BatchCommands.Add(command); - } - - { - _releaseGetQueuedSql ??= @$" - SELECT owner - FROM {tablePrefix}_semaphores - WHERE type = $1 AND instance = $2 - ORDER BY position - LIMIT $3; - "; - - var command = new NpgsqlBatchCommand(_releaseGetQueuedSql) - { - Parameters = - { - new() { Value = group }, - new() { Value = instance }, - new() { Value = maximumCount } - } - }; - batch.BatchCommands.Add(command); - } - - await using var reader = await batch.ExecuteReaderAsync(); - var ids = new List(); - while (await reader.ReadAsync()) - ids.Add(StoredId.Deserialize(reader.GetString(0))); - - return ids; - } - - private string? _getQueued; - public async Task> GetQueued(string group, string instance, int count) - { - await using var conn = await CreateConnection(); - _getQueued ??= @$" - SELECT owner - FROM {tablePrefix}_semaphores - WHERE type = $1 AND instance = $2 - ORDER BY position - LIMIT $3; - "; - var command = new NpgsqlCommand(_getQueued, conn) - { - Parameters = - { - new() { Value = group }, - new() { Value = instance }, - new() { Value = count } - } - }; - - await using var reader = await command.ExecuteReaderAsync(); - var ids = new List(); - while (await reader.ReadAsync()) - ids.Add(StoredId.Deserialize(reader.GetString(0))); - - return ids; - } - - private async Task CreateConnection() - { - var conn = new NpgsqlConnection(connectionString); - await conn.OpenAsync(); - return conn; - } -} \ No newline at end of file diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/SemaphoreStoreTests.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/SemaphoreStoreTests.cs deleted file mode 100644 index 3bde5cf37..000000000 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/SemaphoreStoreTests.cs +++ /dev/null @@ -1,25 +0,0 @@ -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.Helpers; -using Microsoft.VisualStudio.TestTools.UnitTesting; - -namespace Cleipnir.ResilientFunctions.SqlServer.Tests; - -[TestClass] -public class SemaphoreStoreTests : Cleipnir.ResilientFunctions.Tests.TestTemplates.SemaphoreStoreTests -{ - [TestMethod] - public override Task SunshineScenarioTest() - => SunshineScenarioTest(FunctionStoreFactory.Create().SelectAsync(s => s.SemaphoreStore)); - - [TestMethod] - public override Task ReleasingSemaphoreTwiceSucceeds() - => ReleasingSemaphoreTwiceSucceeds(FunctionStoreFactory.Create().SelectAsync(s => s.SemaphoreStore)); - - [TestMethod] - public override Task AcquiringTheSameSemaphoreTwiceIsIdempotent() - => AcquiringTheSameSemaphoreTwiceIsIdempotent(FunctionStoreFactory.Create().SelectAsync(s => s.SemaphoreStore)); - - [TestMethod] - public override Task SemaphoreIsAcquiredDespitePositionBeingMoreThanSemaphoreCount() - => SemaphoreIsAcquiredDespitePositionBeingMoreThanSemaphoreCount(FunctionStoreFactory.Create().SelectAsync(s => s.SemaphoreStore)); -} \ No newline at end of file diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/SemaphoreTests.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/SemaphoreTests.cs deleted file mode 100644 index 0a3fe7f79..000000000 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/SemaphoreTests.cs +++ /dev/null @@ -1,24 +0,0 @@ -using System.Threading.Tasks; -using Microsoft.VisualStudio.TestTools.UnitTesting; - -namespace Cleipnir.ResilientFunctions.SqlServer.Tests; - -[TestClass] -public class SemaphoreTests : ResilientFunctions.Tests.TestTemplates.FunctionTests.SemaphoreTests -{ - [TestMethod] - public override Task SunshineTest() - => SunshineTest(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task WaitingFlowIsInterruptedAfterSemaphoreBecomesFree() - => WaitingFlowIsInterruptedAfterSemaphoreBecomesFree(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task ExistingSemaphoreCanBeForceReleased() - => ExistingSemaphoreCanBeForceReleased(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task SemaphoreAllowsTwoFlowsToContinueAtTheSameTime() - => SemaphoreAllowsTwoFlowsToContinueAtTheSameTime(FunctionStoreFactory.Create()); -} \ No newline at end of file diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs index 801704469..2c817b6d4 100644 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs @@ -33,8 +33,6 @@ public class SqlServerFunctionStore : IFunctionStore public ITypeStore TypeStore => _typeStore; public IMessageStore MessageStore => _messageStore; public Utilities Utilities { get; } - private readonly SqlServerSemaphoreStore _semaphoreStore; - public ISemaphoreStore SemaphoreStore => _semaphoreStore; private readonly SqlServerReplicaStore _replicaStore; public IReplicaStore ReplicaStore => _replicaStore; @@ -53,7 +51,6 @@ public SqlServerFunctionStore(string connectionString, string tablePrefix = "") _underlyingRegister = new SqlServerUnderlyingRegister(connectionString, _tableName); _effectsStore = new SqlServerEffectsStore(connectionString, _tableName); _correlationStore = new SqlServerCorrelationsStore(connectionString, _tableName); - _semaphoreStore = new SqlServerSemaphoreStore(connectionString, _tableName); _typeStore = new SqlServerTypeStore(connectionString, _tableName); _replicaStore = new SqlServerReplicaStore(connectionString, _tableName); Utilities = new Utilities(_underlyingRegister); @@ -80,7 +77,6 @@ public async Task Initialize() await _effectsStore.Initialize(); await _correlationStore.Initialize(); await _typeStore.Initialize(); - await _semaphoreStore.Initialize(); await _replicaStore.Initialize(); await using var conn = await _connFunc(); _initializeSql ??= @$" @@ -131,7 +127,6 @@ public async Task TruncateTables() await _effectsStore.Truncate(); await _correlationStore.Truncate(); await _typeStore.Truncate(); - await _semaphoreStore.Truncate(); await _replicaStore.Truncate(); await using var conn = await _connFunc(); diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerSemaphoreStore.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerSemaphoreStore.cs deleted file mode 100644 index f211deb64..000000000 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerSemaphoreStore.cs +++ /dev/null @@ -1,155 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.Storage; -using Microsoft.Data.SqlClient; - -namespace Cleipnir.ResilientFunctions.SqlServer; - -public class SqlServerSemaphoreStore(string connectionString, string tablePrefix = "") : ISemaphoreStore -{ - private string? _initializeSql; - public async Task Initialize() - { - await using var conn = await CreateConnection(); - - _initializeSql ??= @$" - CREATE TABLE {tablePrefix}_Semaphores ( - Type NVARCHAR(150), - Instance NVARCHAR(150), - Position INT NOT NULL, - Owner NVARCHAR(MAX), - PRIMARY KEY (Type, Instance, Position) - );"; - var command = new SqlCommand(_initializeSql, conn); - try - { - await command.ExecuteNonQueryAsync(); - } catch (SqlException exception) when (exception.Number == 2714) {} - } - - private string? _truncateSql; - public async Task Truncate() - { - await using var conn = await CreateConnection(); - _truncateSql ??= $"TRUNCATE TABLE {tablePrefix}_Semaphores;"; - var command = new SqlCommand(_truncateSql, conn); - await command.ExecuteNonQueryAsync(); - } - - public async Task Acquire(string group, string instance, StoredId storedId, int maximumCount) - => await Acquire(group, instance, storedId, maximumCount, depth: 0); - - private string? _acquireSql; - private async Task Acquire(string group, string instance, StoredId storedId, int maximumCount, int depth) - { - await using var conn = await CreateConnection(); - - _acquireSql ??= @$" - INSERT INTO {tablePrefix}_Semaphores - SELECT @Type, - @Instance, - (SELECT COALESCE(MAX(Position), -1) + 1 - FROM {tablePrefix}_Semaphores - WHERE Type = @Type AND Instance = @Instance), - @Owner - WHERE NOT EXISTS ( - SELECT 1 - FROM {tablePrefix}_Semaphores - WHERE Type = @Type AND Instance = @Instance AND Owner = @Owner - ); - - SELECT Owner FROM {tablePrefix}_Semaphores WHERE Type = @Type AND Instance = @Instance ORDER BY Position"; - - try - { - var command = new SqlCommand(_acquireSql, conn); - command.Parameters.AddWithValue("@Type", group); - command.Parameters.AddWithValue("@Instance", instance); - command.Parameters.AddWithValue("@Owner", storedId.Serialize()); - - await using var reader = await command.ExecuteReaderAsync(); - //var hasNextResult = await reader.NextResultAsync(); - - var i = 0; - while (await reader.ReadAsync()) - { - var owner = StoredId.Deserialize(reader.GetString(0)); - if (owner == storedId) - return i < maximumCount; - - i++; - } - - return false; - } - catch (SqlException e) - { - if (depth == 10 || (e.Number != SqlError.DEADLOCK_VICTIM && e.Number != SqlError.UNIQUENESS_VIOLATION)) - throw; - - // ReSharper disable once DisposeOnUsingVariable - await conn.DisposeAsync(); - await Task.Delay(Random.Shared.Next(50, 250)); - return await Acquire(group, instance, storedId, maximumCount, depth + 1); - } - } - - private string? _releaseSql; - public async Task> Release(string group, string instance, StoredId storedId, int maximumCount) - { - await using var conn = await CreateConnection(); - - _releaseSql ??= @$" - DELETE FROM {tablePrefix}_Semaphores - WHERE Type = @Type AND Instance = @Instance AND Owner = @Owner; - - SELECT TOP(@Limit) Owner - FROM {tablePrefix}_Semaphores - WHERE Type = @Type AND Instance = @Instance - ORDER BY Position;"; - - var command = new SqlCommand(_releaseSql, conn); - command.Parameters.AddWithValue("@Type", group); - command.Parameters.AddWithValue("@Instance", instance); - command.Parameters.AddWithValue("@Owner", storedId.Serialize()); - command.Parameters.AddWithValue("@Limit", maximumCount); - - await using var reader = await command.ExecuteReaderAsync(); - var ids = new List(); - while (await reader.ReadAsync()) - ids.Add(StoredId.Deserialize(reader.GetString(0))); - - return ids; - } - - private string? _queuedSql; - public async Task> GetQueued(string group, string instance, int count) - { - await using var conn = await CreateConnection(); - _queuedSql ??= @$" - SELECT TOP(@Limit) Owner - FROM {tablePrefix}_Semaphores - WHERE Type = @Type AND Instance = @Instance - ORDER BY Position; - "; - var command = new SqlCommand(_queuedSql, conn); - command.Parameters.AddWithValue("@Type", group); - command.Parameters.AddWithValue("@Instance", instance); - command.Parameters.AddWithValue("@Limit", count); - - await using var reader = await command.ExecuteReaderAsync(); - var ids = new List(); - while (await reader.ReadAsync()) - ids.Add(StoredId.Deserialize(reader.GetString(0))); - - return ids; - } - - private async Task CreateConnection() - { - var conn = new SqlConnection(connectionString); - await conn.OpenAsync(); - return conn; - } -} \ No newline at end of file From 5629e43597fa1b642a7baa74228364fd716803c0 Mon Sep 17 00:00:00 2001 From: stidsborg Date: Sun, 26 Apr 2026 10:43:00 +0200 Subject: [PATCH 2/2] Remove dead FlowsManager.Suspend and Workflow flowsManager wiring --- .../Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs | 6 ------ .../CoreRuntime/Invocation/Invoker.cs | 5 ++--- .../CoreRuntime/Invocation/Workflow.cs | 4 +--- 3 files changed, 3 insertions(+), 12 deletions(-) diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs index 9cb3482b6..500f95821 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs @@ -1,9 +1,7 @@ using System; using System.Collections.Generic; -using System.Diagnostics.CodeAnalysis; using System.Threading; using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.Domain.Exceptions.Commands; using Cleipnir.ResilientFunctions.Storage; namespace Cleipnir.ResilientFunctions.CoreRuntime; @@ -79,8 +77,4 @@ public void CompleteThread(StoredId id) if (_dict.TryGetValue(id, out var flowState)) flowState.SubflowCompleted(); } - - [DoesNotReturn] - public async Task Suspend(StoredId id) => throw new SuspendInvocationException(); - } \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs index 896d9c5ef..0ae51144f 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs @@ -289,7 +289,7 @@ await _invocationHelper.PersistFunctionInStore( var queueManager = _invocationHelper.CreateQueueManager(flowId, storedId, effect, flowState, flowTimeouts, _unhandledExceptionHandler); disposables.Add(queueManager); var messageWriter = _invocationHelper.CreateMessageWriter(storedId); - var workflow = new Workflow(flowId, storedId, effect, _utilities, correlations, queueManager, _invocationHelper.UtcNow, messageWriter, _flowsManager); + var workflow = new Workflow(flowId, storedId, effect, _utilities, correlations, queueManager, _invocationHelper.UtcNow, messageWriter); return new PreparedInvocation( persisted, @@ -350,8 +350,7 @@ private async Task PrepareForReInvocation(StoredId storedI correlations, queueManager, _invocationHelper.UtcNow, - messageWriter, - _flowsManager + messageWriter ); return new PreparedReInvocation( diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs index 6f87f7662..265caec71 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs @@ -17,12 +17,11 @@ public class Workflow public Correlations Correlations { get; } private QueueManager _queueManager; - private FlowsManager _flowsManager; private readonly UtcNow _utcNow; private MessageWriter MessageWriter { get; } - public Workflow(FlowId flowId, StoredId storedId, Effect effect, Utilities utilities, Correlations correlations, QueueManager queueManager, UtcNow utcNow, MessageWriter messageWriter, FlowsManager flowsManager) + public Workflow(FlowId flowId, StoredId storedId, Effect effect, Utilities utilities, Correlations correlations, QueueManager queueManager, UtcNow utcNow, MessageWriter messageWriter) { FlowId = flowId; StoredId = storedId; @@ -32,7 +31,6 @@ public Workflow(FlowId flowId, StoredId storedId, Effect effect, Utilities utili _queueManager = queueManager; _utcNow = utcNow; MessageWriter = messageWriter; - _flowsManager = flowsManager; } public async Task RegisterCorrelation(string correlation) => await Correlations.Register(correlation);