Skip to content

Commit 6161dfd

Browse files
committed
Added function store GetResults-method
1 parent db557a8 commit 6161dfd

12 files changed

Lines changed: 284 additions & 0 deletions

File tree

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/StoreTests.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,4 +245,16 @@ public override Task GetInterruptedFunctionsReturnsEmptyListWhenQueriedIdsDoNotE
245245
[TestMethod]
246246
public override Task GetInterruptedFunctionsOnlyReturnsMatchingInterruptedFunctions()
247247
=> GetInterruptedFunctionsOnlyReturnsMatchingInterruptedFunctions(FunctionStoreFactory.Create());
248+
249+
[TestMethod]
250+
public override Task GetResultsReturnsResultsForExistingFunctions()
251+
=> GetResultsReturnsResultsForExistingFunctions(FunctionStoreFactory.Create());
252+
253+
[TestMethod]
254+
public override Task GetResultsReturnsEmptyDictionaryForEmptyInput()
255+
=> GetResultsReturnsEmptyDictionaryForEmptyInput(FunctionStoreFactory.Create());
256+
257+
[TestMethod]
258+
public override Task GetResultsReturnsOnlyExistingFunctionResults()
259+
=> GetResultsReturnsOnlyExistingFunctionResults(FunctionStoreFactory.Create());
248260
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/StoreTests.cs

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2067,4 +2067,133 @@ await store.CreateFunction(
20672067
interruptedFunctions.Any(id => id == functionId1).ShouldBeFalse();
20682068
interruptedFunctions.Any(id => id == functionId3).ShouldBeFalse();
20692069
}
2070+
2071+
public abstract Task GetResultsReturnsResultsForExistingFunctions();
2072+
protected async Task GetResultsReturnsResultsForExistingFunctions(Task<IFunctionStore> storeTask)
2073+
{
2074+
var store = await storeTask;
2075+
var functionId1 = TestStoredId.Create();
2076+
var functionId2 = StoredId.Create(functionId1.Type, Guid.NewGuid().ToString());
2077+
var functionId3 = StoredId.Create(functionId1.Type, Guid.NewGuid().ToString());
2078+
2079+
var result1 = "result1".ToJson().ToUtf8Bytes();
2080+
var result2 = "result2".ToJson().ToUtf8Bytes();
2081+
2082+
// Create function 1 and succeed with result1
2083+
await store.CreateFunction(
2084+
functionId1,
2085+
"humanInstanceId1",
2086+
param: Test.SimpleStoredParameter,
2087+
leaseExpiration: DateTime.UtcNow.Ticks,
2088+
postponeUntil: null,
2089+
timestamp: DateTime.UtcNow.Ticks,
2090+
parent: null,
2091+
owner: ReplicaId.Empty
2092+
).ShouldNotBeNullAsync();
2093+
2094+
await store.SucceedFunction(
2095+
functionId1,
2096+
result: result1,
2097+
timestamp: DateTime.UtcNow.Ticks,
2098+
expectedReplica: ReplicaId.Empty,
2099+
effects: null,
2100+
messages: null,
2101+
storageSession: null
2102+
).ShouldBeTrueAsync();
2103+
2104+
// Create function 2 and succeed with result2
2105+
await store.CreateFunction(
2106+
functionId2,
2107+
"humanInstanceId2",
2108+
param: Test.SimpleStoredParameter,
2109+
leaseExpiration: DateTime.UtcNow.Ticks,
2110+
postponeUntil: null,
2111+
timestamp: DateTime.UtcNow.Ticks,
2112+
parent: null,
2113+
owner: ReplicaId.Empty
2114+
).ShouldNotBeNullAsync();
2115+
2116+
await store.SucceedFunction(
2117+
functionId2,
2118+
result: result2,
2119+
timestamp: DateTime.UtcNow.Ticks,
2120+
expectedReplica: ReplicaId.Empty,
2121+
effects: null,
2122+
messages: null,
2123+
storageSession: null
2124+
).ShouldBeTrueAsync();
2125+
2126+
// Create function 3 with no result (just created, not completed)
2127+
await store.CreateFunction(
2128+
functionId3,
2129+
"humanInstanceId3",
2130+
param: Test.SimpleStoredParameter,
2131+
leaseExpiration: DateTime.UtcNow.Ticks,
2132+
postponeUntil: null,
2133+
timestamp: DateTime.UtcNow.Ticks,
2134+
parent: null,
2135+
owner: null
2136+
).ShouldNotBeNullAsync();
2137+
2138+
// Get results for all three functions
2139+
var results = await store.GetResults([functionId1, functionId2, functionId3]);
2140+
2141+
// Verify results
2142+
results.Count.ShouldBe(3);
2143+
results[functionId1].ShouldBe(result1);
2144+
results[functionId2].ShouldBe(result2);
2145+
results[functionId3].ShouldBeNull();
2146+
}
2147+
2148+
public abstract Task GetResultsReturnsEmptyDictionaryForEmptyInput();
2149+
protected async Task GetResultsReturnsEmptyDictionaryForEmptyInput(Task<IFunctionStore> storeTask)
2150+
{
2151+
var store = await storeTask;
2152+
2153+
var results = await store.GetResults([]);
2154+
2155+
results.ShouldNotBeNull();
2156+
results.Count.ShouldBe(0);
2157+
}
2158+
2159+
public abstract Task GetResultsReturnsOnlyExistingFunctionResults();
2160+
protected async Task GetResultsReturnsOnlyExistingFunctionResults(Task<IFunctionStore> storeTask)
2161+
{
2162+
var store = await storeTask;
2163+
var existingFunctionId = TestStoredId.Create();
2164+
var nonExistentFunctionId = StoredId.Create(existingFunctionId.Type, Guid.NewGuid().ToString());
2165+
2166+
var result = "my result".ToJson().ToUtf8Bytes();
2167+
2168+
// Create and succeed one function
2169+
await store.CreateFunction(
2170+
existingFunctionId,
2171+
"humanInstanceId",
2172+
param: Test.SimpleStoredParameter,
2173+
leaseExpiration: DateTime.UtcNow.Ticks,
2174+
postponeUntil: null,
2175+
timestamp: DateTime.UtcNow.Ticks,
2176+
parent: null,
2177+
owner: ReplicaId.Empty
2178+
).ShouldNotBeNullAsync();
2179+
2180+
await store.SucceedFunction(
2181+
existingFunctionId,
2182+
result: result,
2183+
timestamp: DateTime.UtcNow.Ticks,
2184+
expectedReplica: ReplicaId.Empty,
2185+
effects: null,
2186+
messages: null,
2187+
storageSession: null
2188+
).ShouldBeTrueAsync();
2189+
2190+
// Query for both existing and non-existent function
2191+
var results = await store.GetResults([existingFunctionId, nonExistentFunctionId]);
2192+
2193+
// Should only return the existing function
2194+
results.Count.ShouldBe(1);
2195+
results.ContainsKey(existingFunctionId).ShouldBeTrue();
2196+
results[existingFunctionId].ShouldBe(result);
2197+
results.ContainsKey(nonExistentFunctionId).ShouldBeFalse();
2198+
}
20702199
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,11 @@ public Task<bool> DeleteFunction(StoredId storedId)
212212
? Task.FromException<bool>(new TimeoutException())
213213
: _inner.DeleteFunction(storedId);
214214

215+
public Task<IReadOnlyDictionary<StoredId, byte[]?>> GetResults(IEnumerable<StoredId> storedIds)
216+
=> _crashed
217+
? Task.FromException<IReadOnlyDictionary<StoredId, byte[]?>>(new TimeoutException())
218+
: _inner.GetResults(storedIds);
219+
215220
public IFunctionStore WithPrefix(string prefix) => _inner.WithPrefix(prefix);
216221
}
217222

Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,4 +112,5 @@ Task<bool> SuspendFunction(
112112
Task<bool> DeleteFunction(StoredId storedId);
113113

114114
IFunctionStore WithPrefix(string prefix);
115+
Task<IReadOnlyDictionary<StoredId, byte[]?>> GetResults(IEnumerable<StoredId> storedIds);
115116
}

Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,21 @@ public virtual Task<bool> DeleteFunction(StoredId storedId)
495495

496496
public IFunctionStore WithPrefix(string prefix) => new InMemoryFunctionStore();
497497

498+
public Task<IReadOnlyDictionary<StoredId, byte[]?>> GetResults(IEnumerable<StoredId> storedIds)
499+
{
500+
lock (_sync)
501+
{
502+
var results = new Dictionary<StoredId, byte[]?>();
503+
foreach (var storedId in storedIds)
504+
{
505+
if (_states.TryGetValue(storedId, out var state))
506+
results[storedId] = state.Result;
507+
}
508+
509+
return results.CastTo<IReadOnlyDictionary<StoredId, byte[]?>>().ToTask();
510+
}
511+
}
512+
498513
private class InnerState
499514
{
500515
public StoredId StoredId { get; init; } = null!;

Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,11 @@ public Task<bool> DeleteFunction(StoredId storedId)
190190
? Task.FromException<bool>(new TimeoutException())
191191
: _inner.DeleteFunction(storedId);
192192

193+
public Task<IReadOnlyDictionary<StoredId, byte[]?>> GetResults(IEnumerable<StoredId> storedIds)
194+
=> _crashed
195+
? Task.FromException<IReadOnlyDictionary<StoredId, byte[]?>>(new TimeoutException())
196+
: _inner.GetResults(storedIds);
197+
193198
public IFunctionStore WithPrefix(string prefix)
194199
=> _inner.WithPrefix(prefix);
195200
}

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/StoreTests.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,4 +234,16 @@ public override Task GetInterruptedFunctionsReturnsEmptyListWhenQueriedIdsDoNotE
234234
[TestMethod]
235235
public override Task GetInterruptedFunctionsOnlyReturnsMatchingInterruptedFunctions()
236236
=> GetInterruptedFunctionsOnlyReturnsMatchingInterruptedFunctions(FunctionStoreFactory.Create());
237+
238+
[TestMethod]
239+
public override Task GetResultsReturnsResultsForExistingFunctions()
240+
=> GetResultsReturnsResultsForExistingFunctions(FunctionStoreFactory.Create());
241+
242+
[TestMethod]
243+
public override Task GetResultsReturnsEmptyDictionaryForEmptyInput()
244+
=> GetResultsReturnsEmptyDictionaryForEmptyInput(FunctionStoreFactory.Create());
245+
246+
[TestMethod]
247+
public override Task GetResultsReturnsOnlyExistingFunctionResults()
248+
=> GetResultsReturnsOnlyExistingFunctionResults(FunctionStoreFactory.Create());
237249
}

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -752,6 +752,34 @@ public async Task<bool> DeleteFunction(StoredId storedId)
752752
public IFunctionStore WithPrefix(string prefix)
753753
=> new MariaDbFunctionStore(_connectionString, prefix);
754754

755+
public async Task<IReadOnlyDictionary<StoredId, byte[]?>> GetResults(IEnumerable<StoredId> storedIds)
756+
{
757+
var inSql = storedIds.Select(id => $"'{id.AsGuid:N}'").StringJoin(", ");
758+
if (inSql == "")
759+
return new Dictionary<StoredId, byte[]?>();
760+
761+
var sql = @$"
762+
SELECT id, result_json
763+
FROM {_tablePrefix}
764+
WHERE id IN ({inSql})";
765+
766+
await using var conn = await CreateOpenConnection(_connectionString);
767+
await using var command = new MySqlCommand(sql, conn);
768+
769+
await using var reader = await command.ExecuteReaderAsync();
770+
var results = new Dictionary<StoredId, byte[]?>();
771+
while (await reader.ReadAsync())
772+
{
773+
var guid = reader.GetString(0).ToGuid();
774+
var storedId = new StoredId(guid);
775+
var hasResult = !await reader.IsDBNullAsync(1);
776+
var result = hasResult ? (byte[])reader.GetValue(1) : null;
777+
results[storedId] = result;
778+
}
779+
780+
return results;
781+
}
782+
755783
private string? _deleteFunctionSql;
756784
private async Task<bool> DeleteStoredFunction(StoredId storedId)
757785
{

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/StoreTests.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,4 +237,16 @@ public override Task GetInterruptedFunctionsReturnsEmptyListWhenQueriedIdsDoNotE
237237
[TestMethod]
238238
public override Task GetInterruptedFunctionsOnlyReturnsMatchingInterruptedFunctions()
239239
=> GetInterruptedFunctionsOnlyReturnsMatchingInterruptedFunctions(FunctionStoreFactory.Create());
240+
241+
[TestMethod]
242+
public override Task GetResultsReturnsResultsForExistingFunctions()
243+
=> GetResultsReturnsResultsForExistingFunctions(FunctionStoreFactory.Create());
244+
245+
[TestMethod]
246+
public override Task GetResultsReturnsEmptyDictionaryForEmptyInput()
247+
=> GetResultsReturnsEmptyDictionaryForEmptyInput(FunctionStoreFactory.Create());
248+
249+
[TestMethod]
250+
public override Task GetResultsReturnsOnlyExistingFunctionResults()
251+
=> GetResultsReturnsOnlyExistingFunctionResults(FunctionStoreFactory.Create());
240252
}

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -721,6 +721,33 @@ public IFunctionStore WithPrefix(string prefix)
721721
prefix
722722
);
723723

724+
public async Task<IReadOnlyDictionary<StoredId, byte[]?>> GetResults(IEnumerable<StoredId> storedIds)
725+
{
726+
var idsClause = storedIds.Select(id => $"'{id.AsGuid}'").StringJoin(", ");
727+
if (idsClause == "")
728+
return new Dictionary<StoredId, byte[]?>();
729+
730+
var sql = @$"
731+
SELECT id, result_json
732+
FROM {_tableName}
733+
WHERE id IN ({idsClause})";
734+
735+
await using var conn = await CreateConnection();
736+
await using var command = new NpgsqlCommand(sql, conn);
737+
738+
await using var reader = await command.ExecuteReaderAsync();
739+
var results = new Dictionary<StoredId, byte[]?>();
740+
while (await reader.ReadAsync())
741+
{
742+
var storedId = new StoredId(reader.GetGuid(0));
743+
var hasResult = !await reader.IsDBNullAsync(1);
744+
var result = hasResult ? (byte[])reader.GetValue(1) : null;
745+
results[storedId] = result;
746+
}
747+
748+
return results;
749+
}
750+
724751
private string? _deleteFunctionSql;
725752
private async Task<bool> DeleteStoredFunction(StoredId storedId)
726753
{

0 commit comments

Comments
 (0)