Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1021,8 +1021,8 @@ await elms.CaptureEach(

for (var i = 0; i < 6; i++)
{
await BusyWait.Until(() => flag.Value == i);
await cp.BusyWaitUntil(c => c.Status == Status.Suspended);
await BusyWait.Until(() => flag.Value == i, maxWait: TimeSpan.FromSeconds(30));
await cp.BusyWaitUntil(c => c.Status == Status.Suspended, maxWait: TimeSpan.FromSeconds(30));
var storedEffects = await effectStore.GetEffectResults(registration.MapToStoredId(id.Instance));
storedEffects.Any(e => e.Alias == "Before").ShouldBeTrue();
storedEffects.Any(e => e.Alias == "Loop").ShouldBeTrue();
Expand All @@ -1032,7 +1032,7 @@ await elms.CaptureEach(
await messageWriter.AppendMessage(i.ToString());
}

await cp.BusyWaitUntil(c => c.Status == Status.Succeeded);
await cp.BusyWaitUntil(c => c.Status == Status.Succeeded, maxWait: TimeSpan.FromSeconds(30));

iterations.SequenceEqual([0, 1, 2, 3, 4, 5]).ShouldBeTrue();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Collections.Generic;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Cleipnir.ResilientFunctions.Helpers;
Expand Down Expand Up @@ -60,64 +59,27 @@ public async Task TruncateTable()
public async Task AppendMessage(StoredId storedId, StoredMessage storedMessage)
=> await messageBatcher.Handle(storedId, [storedMessage]);

private string? _appendMessagesSql;
private async Task AppendMessages(StoredId storedId, IReadOnlyList<StoredMessage> messages)
private Task AppendMessages(StoredId storedId, IReadOnlyList<StoredMessage> messages)
=> AppendMessages(messages.Select(m => new StoredIdAndMessage(storedId, m)).ToList());

public async Task AppendMessages(IReadOnlyList<StoredIdAndMessage> messages)
{
if (messages.Count == 0)
return;

var randomOffset = Random.Shared.Next();

_appendMessagesSql ??= @$"
WITH max_pos AS (
SELECT COALESCE(MAX(position), -1) + 2147483647 + $2 AS pos
FROM {tablePrefix}_messages
WHERE id = $1
)
INSERT INTO {tablePrefix}_messages (id, position, content)
SELECT $1, (SELECT pos FROM max_pos) + ord, content
FROM unnest($3::bytea[]) WITH ORDINALITY AS t(content, ord);";

var contents = new byte[messages.Count][];
for (var i = 0; i < messages.Count; i++)
{
var (messageContent, messageType, _, idempotencyKey, sender, receiver) = messages[i];
contents[i] = BinaryPacker.Pack(messageContent, messageType, idempotencyKey?.ToUtf8Bytes(), sender?.ToUtf8Bytes(), receiver?.ToUtf8Bytes());
}

var appendBatchCommand = new NpgsqlBatchCommand(_appendMessagesSql);
appendBatchCommand.Parameters.Add(new() { Value = storedId.AsGuid });
appendBatchCommand.Parameters.Add(new() { Value = (long)randomOffset });
appendBatchCommand.Parameters.Add(new() { Value = contents });

var interruptCommand = sqlGenerator.Interrupt([storedId]);
var commands = messages
.GroupBy(m => m.StoredId)
.Select(g => sqlGenerator.AppendMessages(g.Key, g.Select(m => m.StoredMessage)))
.Append(sqlGenerator.Interrupt(messages.Select(m => m.StoredId).Distinct()));

await using var conn = await CreateConnection();
await using var batch = new NpgsqlBatch(conn);
batch.BatchCommands.Add(appendBatchCommand);
batch.BatchCommands.Add(interruptCommand.ToNpgsqlBatchCommand());
await using var batch = commands
.ToNpgsqlBatch()
.WithConnection(conn);

await batch.ExecuteNonQueryAsync();
}

public async Task AppendMessages(IReadOnlyList<StoredIdAndMessage> messages)
{
var maxPositions = await GetMaxPositions(
storedIds: messages.Select(msg => msg.StoredId).Distinct().ToList()
);

var messageWithPositions = messages
.Select(msg =>
new StoredIdAndMessageWithPosition(
msg.StoredId,
msg.StoredMessage,
++maxPositions[msg.StoredId]
)
).ToList();

await AppendMessages(messageWithPositions);
}

public async Task AppendMessages(IReadOnlyList<StoredIdAndMessageWithPosition> messages)
{
if (messages.Count == 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,44 @@ 10 owner
return null;
}

private string? _appendMessagesSql;
public StoreCommand AppendMessages(StoredId storedId, IEnumerable<StoredMessage> messages)
{
// Computes per-message positions server-side so the SQL string is constant regardless of batch size.
// max_pos: base = COALESCE(MAX(position), -1) + 2147483647 + $2 (random) — placed well above any
// existing position and randomized per call so concurrent appenders to the same flow rarely collide.
// unnest($3::bytea[]) WITH ORDINALITY expands the byte[][] parameter into rows with a 1-based offset,
// so each inserted row gets a unique position = base + pos_offset, in caller order.
_appendMessagesSql ??= @$"
WITH max_pos AS (
SELECT COALESCE(MAX(position), -1) + 2147483647 + $2 AS pos
FROM {tablePrefix}_messages
WHERE id = $1
)
INSERT INTO {tablePrefix}_messages (id, position, content)
SELECT $1, (SELECT pos FROM max_pos) + pos_offset, content
FROM unnest($3::bytea[]) WITH ORDINALITY AS t(content, pos_offset);";

var contents = messages
.Select(m => BinaryPacker.Pack(
m.MessageContent,
m.MessageType,
m.IdempotencyKey?.ToUtf8Bytes(),
m.Sender?.ToUtf8Bytes(),
m.Receiver?.ToUtf8Bytes()
))
.ToArray();

return StoreCommand.Create(
_appendMessagesSql,
values: [
storedId.AsGuid,
(long)Random.Shared.Next(),
contents
]
);
}

public StoreCommand AppendMessages(IReadOnlyList<StoredIdAndMessageWithPosition> messages)
{
var sql = @$"
Expand Down