From d4af08ca4940b407ede7686ea5f765da62a6876c Mon Sep 17 00:00:00 2001 From: stidsborg Date: Mon, 25 May 2026 09:38:49 +0200 Subject: [PATCH 1/2] PostgreSQL: bulk AppendMessages in a single round-trip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move the per-flow AppendMessages SQL into SqlGenerator and have the bulk overload group by StoredId, build one append command per flow, and execute together with the interrupt in a single NpgsqlBatch. This drops the separate GetMaxPositions round-trip — positions are now computed server-side via COALESCE(MAX(position), -1) + random base + unnest(...) WITH ORDINALITY, so the SQL string is constant regardless of batch size and the prepared statement cache is reused across calls. --- .../PostgreSqlMessageStore.cs | 62 ++++--------------- .../SqlGenerator.cs | 38 ++++++++++++ 2 files changed, 50 insertions(+), 50 deletions(-) diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs index 79a4ffbab..438c72537 100644 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs @@ -1,5 +1,4 @@ -using System; -using System.Collections.Generic; +using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using Cleipnir.ResilientFunctions.Helpers; @@ -60,64 +59,27 @@ public async Task TruncateTable() public async Task AppendMessage(StoredId storedId, StoredMessage storedMessage) => await messageBatcher.Handle(storedId, [storedMessage]); - private string? _appendMessagesSql; - private async Task AppendMessages(StoredId storedId, IReadOnlyList messages) + private Task AppendMessages(StoredId storedId, IReadOnlyList messages) + => AppendMessages(messages.Select(m => new StoredIdAndMessage(storedId, m)).ToList()); + + public async Task AppendMessages(IReadOnlyList messages) { if (messages.Count == 0) return; - var randomOffset = Random.Shared.Next(); - - _appendMessagesSql ??= @$" - WITH max_pos AS ( - SELECT COALESCE(MAX(position), -1) + 2147483647 + $2 AS pos - FROM {tablePrefix}_messages - WHERE id = $1 - ) - INSERT INTO {tablePrefix}_messages (id, position, content) - SELECT $1, (SELECT pos FROM max_pos) + ord, content - FROM unnest($3::bytea[]) WITH ORDINALITY AS t(content, ord);"; - - var contents = new byte[messages.Count][]; - for (var i = 0; i < messages.Count; i++) - { - var (messageContent, messageType, _, idempotencyKey, sender, receiver) = messages[i]; - contents[i] = BinaryPacker.Pack(messageContent, messageType, idempotencyKey?.ToUtf8Bytes(), sender?.ToUtf8Bytes(), receiver?.ToUtf8Bytes()); - } - - var appendBatchCommand = new NpgsqlBatchCommand(_appendMessagesSql); - appendBatchCommand.Parameters.Add(new() { Value = storedId.AsGuid }); - appendBatchCommand.Parameters.Add(new() { Value = (long)randomOffset }); - appendBatchCommand.Parameters.Add(new() { Value = contents }); - - var interruptCommand = sqlGenerator.Interrupt([storedId]); + var commands = messages + .GroupBy(m => m.StoredId) + .Select(g => sqlGenerator.AppendMessages(g.Key, g.Select(m => m.StoredMessage))) + .Append(sqlGenerator.Interrupt(messages.Select(m => m.StoredId).Distinct())); await using var conn = await CreateConnection(); - await using var batch = new NpgsqlBatch(conn); - batch.BatchCommands.Add(appendBatchCommand); - batch.BatchCommands.Add(interruptCommand.ToNpgsqlBatchCommand()); + await using var batch = commands + .ToNpgsqlBatch() + .WithConnection(conn); await batch.ExecuteNonQueryAsync(); } - public async Task AppendMessages(IReadOnlyList messages) - { - var maxPositions = await GetMaxPositions( - storedIds: messages.Select(msg => msg.StoredId).Distinct().ToList() - ); - - var messageWithPositions = messages - .Select(msg => - new StoredIdAndMessageWithPosition( - msg.StoredId, - msg.StoredMessage, - ++maxPositions[msg.StoredId] - ) - ).ToList(); - - await AppendMessages(messageWithPositions); - } - public async Task AppendMessages(IReadOnlyList messages) { if (messages.Count == 0) diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs index 50b2223d3..484284116 100644 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs @@ -452,6 +452,44 @@ 10 owner return null; } + private string? _appendMessagesSql; + public StoreCommand AppendMessages(StoredId storedId, IEnumerable messages) + { + // Computes per-message positions server-side so the SQL string is constant regardless of batch size. + // max_pos: base = COALESCE(MAX(position), -1) + 2147483647 + $2 (random) — placed well above any + // existing position and randomized per call so concurrent appenders to the same flow rarely collide. + // unnest($3::bytea[]) WITH ORDINALITY expands the byte[][] parameter into rows with a 1-based offset, + // so each inserted row gets a unique position = base + pos_offset, in caller order. + _appendMessagesSql ??= @$" + WITH max_pos AS ( + SELECT COALESCE(MAX(position), -1) + 2147483647 + $2 AS pos + FROM {tablePrefix}_messages + WHERE id = $1 + ) + INSERT INTO {tablePrefix}_messages (id, position, content) + SELECT $1, (SELECT pos FROM max_pos) + pos_offset, content + FROM unnest($3::bytea[]) WITH ORDINALITY AS t(content, pos_offset);"; + + var contents = messages + .Select(m => BinaryPacker.Pack( + m.MessageContent, + m.MessageType, + m.IdempotencyKey?.ToUtf8Bytes(), + m.Sender?.ToUtf8Bytes(), + m.Receiver?.ToUtf8Bytes() + )) + .ToArray(); + + return StoreCommand.Create( + _appendMessagesSql, + values: [ + storedId.AsGuid, + (long)Random.Shared.Next(), + contents + ] + ); + } + public StoreCommand AppendMessages(IReadOnlyList messages) { var sql = @$" From 7b14d23464fa0b0379a1e92aaf28b6956a0001a9 Mon Sep 17 00:00:00 2001 From: stidsborg Date: Mon, 25 May 2026 10:22:50 +0200 Subject: [PATCH 2/2] EffectLoopingWorks: raise BusyWait timeouts to 30s The default 10s threshold occasionally trips on the SqlServer CI runner under 12-way parallel test execution. Bumping to 30s gives the workflow restart + replay cycle more headroom without changing test semantics. --- .../TestTemplates/FunctionTests/EffectTests.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs index d9eeff98a..9009b9851 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs @@ -1021,8 +1021,8 @@ await elms.CaptureEach( for (var i = 0; i < 6; i++) { - await BusyWait.Until(() => flag.Value == i); - await cp.BusyWaitUntil(c => c.Status == Status.Suspended); + await BusyWait.Until(() => flag.Value == i, maxWait: TimeSpan.FromSeconds(30)); + await cp.BusyWaitUntil(c => c.Status == Status.Suspended, maxWait: TimeSpan.FromSeconds(30)); var storedEffects = await effectStore.GetEffectResults(registration.MapToStoredId(id.Instance)); storedEffects.Any(e => e.Alias == "Before").ShouldBeTrue(); storedEffects.Any(e => e.Alias == "Loop").ShouldBeTrue(); @@ -1032,7 +1032,7 @@ await elms.CaptureEach( await messageWriter.AppendMessage(i.ToString()); } - await cp.BusyWaitUntil(c => c.Status == Status.Succeeded); + await cp.BusyWaitUntil(c => c.Status == Status.Succeeded, maxWait: TimeSpan.FromSeconds(30)); iterations.SequenceEqual([0, 1, 2, 3, 4, 5]).ShouldBeTrue(); }