Skip to content

Commit b34b72b

Browse files
authored
PostgreSQL: bulk AppendMessages in a single round-trip (#152)
* PostgreSQL: bulk AppendMessages in a single round-trip Move the per-flow AppendMessages SQL into SqlGenerator and have the bulk overload group by StoredId, build one append command per flow, and execute together with the interrupt in a single NpgsqlBatch. This drops the separate GetMaxPositions round-trip — positions are now computed server-side via COALESCE(MAX(position), -1) + random base + unnest(...) WITH ORDINALITY, so the SQL string is constant regardless of batch size and the prepared statement cache is reused across calls. * EffectLoopingWorks: raise BusyWait timeouts to 30s The default 10s threshold occasionally trips on the SqlServer CI runner under 12-way parallel test execution. Bumping to 30s gives the workflow restart + replay cycle more headroom without changing test semantics.
1 parent 0c8e1f6 commit b34b72b

3 files changed

Lines changed: 53 additions & 53 deletions

File tree

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1021,8 +1021,8 @@ await elms.CaptureEach(
10211021

10221022
for (var i = 0; i < 6; i++)
10231023
{
1024-
await BusyWait.Until(() => flag.Value == i);
1025-
await cp.BusyWaitUntil(c => c.Status == Status.Suspended);
1024+
await BusyWait.Until(() => flag.Value == i, maxWait: TimeSpan.FromSeconds(30));
1025+
await cp.BusyWaitUntil(c => c.Status == Status.Suspended, maxWait: TimeSpan.FromSeconds(30));
10261026
var storedEffects = await effectStore.GetEffectResults(registration.MapToStoredId(id.Instance));
10271027
storedEffects.Any(e => e.Alias == "Before").ShouldBeTrue();
10281028
storedEffects.Any(e => e.Alias == "Loop").ShouldBeTrue();
@@ -1032,7 +1032,7 @@ await elms.CaptureEach(
10321032
await messageWriter.AppendMessage(i.ToString());
10331033
}
10341034

1035-
await cp.BusyWaitUntil(c => c.Status == Status.Succeeded);
1035+
await cp.BusyWaitUntil(c => c.Status == Status.Succeeded, maxWait: TimeSpan.FromSeconds(30));
10361036

10371037
iterations.SequenceEqual([0, 1, 2, 3, 4, 5]).ShouldBeTrue();
10381038
}

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs

Lines changed: 12 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
using System;
2-
using System.Collections.Generic;
1+
using System.Collections.Generic;
32
using System.Linq;
43
using System.Threading.Tasks;
54
using Cleipnir.ResilientFunctions.Helpers;
@@ -60,64 +59,27 @@ public async Task TruncateTable()
6059
public async Task AppendMessage(StoredId storedId, StoredMessage storedMessage)
6160
=> await messageBatcher.Handle(storedId, [storedMessage]);
6261

63-
private string? _appendMessagesSql;
64-
private async Task AppendMessages(StoredId storedId, IReadOnlyList<StoredMessage> messages)
62+
private Task AppendMessages(StoredId storedId, IReadOnlyList<StoredMessage> messages)
63+
=> AppendMessages(messages.Select(m => new StoredIdAndMessage(storedId, m)).ToList());
64+
65+
public async Task AppendMessages(IReadOnlyList<StoredIdAndMessage> messages)
6566
{
6667
if (messages.Count == 0)
6768
return;
6869

69-
var randomOffset = Random.Shared.Next();
70-
71-
_appendMessagesSql ??= @$"
72-
WITH max_pos AS (
73-
SELECT COALESCE(MAX(position), -1) + 2147483647 + $2 AS pos
74-
FROM {tablePrefix}_messages
75-
WHERE id = $1
76-
)
77-
INSERT INTO {tablePrefix}_messages (id, position, content)
78-
SELECT $1, (SELECT pos FROM max_pos) + ord, content
79-
FROM unnest($3::bytea[]) WITH ORDINALITY AS t(content, ord);";
80-
81-
var contents = new byte[messages.Count][];
82-
for (var i = 0; i < messages.Count; i++)
83-
{
84-
var (messageContent, messageType, _, idempotencyKey, sender, receiver) = messages[i];
85-
contents[i] = BinaryPacker.Pack(messageContent, messageType, idempotencyKey?.ToUtf8Bytes(), sender?.ToUtf8Bytes(), receiver?.ToUtf8Bytes());
86-
}
87-
88-
var appendBatchCommand = new NpgsqlBatchCommand(_appendMessagesSql);
89-
appendBatchCommand.Parameters.Add(new() { Value = storedId.AsGuid });
90-
appendBatchCommand.Parameters.Add(new() { Value = (long)randomOffset });
91-
appendBatchCommand.Parameters.Add(new() { Value = contents });
92-
93-
var interruptCommand = sqlGenerator.Interrupt([storedId]);
70+
var commands = messages
71+
.GroupBy(m => m.StoredId)
72+
.Select(g => sqlGenerator.AppendMessages(g.Key, g.Select(m => m.StoredMessage)))
73+
.Append(sqlGenerator.Interrupt(messages.Select(m => m.StoredId).Distinct()));
9474

9575
await using var conn = await CreateConnection();
96-
await using var batch = new NpgsqlBatch(conn);
97-
batch.BatchCommands.Add(appendBatchCommand);
98-
batch.BatchCommands.Add(interruptCommand.ToNpgsqlBatchCommand());
76+
await using var batch = commands
77+
.ToNpgsqlBatch()
78+
.WithConnection(conn);
9979

10080
await batch.ExecuteNonQueryAsync();
10181
}
10282

103-
public async Task AppendMessages(IReadOnlyList<StoredIdAndMessage> messages)
104-
{
105-
var maxPositions = await GetMaxPositions(
106-
storedIds: messages.Select(msg => msg.StoredId).Distinct().ToList()
107-
);
108-
109-
var messageWithPositions = messages
110-
.Select(msg =>
111-
new StoredIdAndMessageWithPosition(
112-
msg.StoredId,
113-
msg.StoredMessage,
114-
++maxPositions[msg.StoredId]
115-
)
116-
).ToList();
117-
118-
await AppendMessages(messageWithPositions);
119-
}
120-
12183
public async Task AppendMessages(IReadOnlyList<StoredIdAndMessageWithPosition> messages)
12284
{
12385
if (messages.Count == 0)

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/SqlGenerator.cs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,44 @@ 10 owner
452452
return null;
453453
}
454454

455+
private string? _appendMessagesSql;
456+
public StoreCommand AppendMessages(StoredId storedId, IEnumerable<StoredMessage> messages)
457+
{
458+
// Computes per-message positions server-side so the SQL string is constant regardless of batch size.
459+
// max_pos: base = COALESCE(MAX(position), -1) + 2147483647 + $2 (random) — placed well above any
460+
// existing position and randomized per call so concurrent appenders to the same flow rarely collide.
461+
// unnest($3::bytea[]) WITH ORDINALITY expands the byte[][] parameter into rows with a 1-based offset,
462+
// so each inserted row gets a unique position = base + pos_offset, in caller order.
463+
_appendMessagesSql ??= @$"
464+
WITH max_pos AS (
465+
SELECT COALESCE(MAX(position), -1) + 2147483647 + $2 AS pos
466+
FROM {tablePrefix}_messages
467+
WHERE id = $1
468+
)
469+
INSERT INTO {tablePrefix}_messages (id, position, content)
470+
SELECT $1, (SELECT pos FROM max_pos) + pos_offset, content
471+
FROM unnest($3::bytea[]) WITH ORDINALITY AS t(content, pos_offset);";
472+
473+
var contents = messages
474+
.Select(m => BinaryPacker.Pack(
475+
m.MessageContent,
476+
m.MessageType,
477+
m.IdempotencyKey?.ToUtf8Bytes(),
478+
m.Sender?.ToUtf8Bytes(),
479+
m.Receiver?.ToUtf8Bytes()
480+
))
481+
.ToArray();
482+
483+
return StoreCommand.Create(
484+
_appendMessagesSql,
485+
values: [
486+
storedId.AsGuid,
487+
(long)Random.Shared.Next(),
488+
contents
489+
]
490+
);
491+
}
492+
455493
public StoreCommand AppendMessages(IReadOnlyList<StoredIdAndMessageWithPosition> messages)
456494
{
457495
var sql = @$"

0 commit comments

Comments
 (0)