Skip to content

Commit eb6bfc7

Browse files
authored
QueueManager: extract fetch-and-deliver pipeline into FetchedMessages (#146)
1 parent da8c67d commit eb6bfc7

2 files changed

Lines changed: 301 additions & 239 deletions

File tree

Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using Cleipnir.ResilientFunctions.CoreRuntime;
7+
using Cleipnir.ResilientFunctions.CoreRuntime.Serialization;
8+
using Cleipnir.ResilientFunctions.Domain;
9+
using Cleipnir.ResilientFunctions.Messaging;
10+
using Cleipnir.ResilientFunctions.Storage;
11+
12+
namespace Cleipnir.ResilientFunctions.Queuing;
13+
14+
internal class FetchedMessages
15+
{
16+
private const int ReservedIdPrefix = -1;
17+
private static readonly EffectId PendingDeletionsRoot = new([ReservedIdPrefix, 0]);
18+
private static EffectId PendingDeletion(int index) => new([ReservedIdPrefix, 0, index]);
19+
private static readonly EffectId IdempotencyKeysRoot = new([ReservedIdPrefix, -1]);
20+
21+
private readonly FlowId _flowId;
22+
private readonly StoredId _storedId;
23+
private readonly IMessageStore _messageStore;
24+
private readonly ISerializer _serializer;
25+
private readonly Effect _effect;
26+
private readonly FlowState _flowState;
27+
private readonly UnhandledExceptionHandler _unhandledExceptionHandler;
28+
private readonly FlowTimeouts _timeouts;
29+
private readonly SettingsWithDefaults _settings;
30+
private readonly IdempotencyKeys _idempotencyKeys;
31+
32+
private readonly SemaphoreSlim _semaphore = new(1);
33+
private readonly Lock _lock = new();
34+
private readonly List<MessageData> _toDeliver = new();
35+
private readonly HashSet<long> _fetchedPositions = new();
36+
private readonly List<Subscription> _subscriptions = new();
37+
private int _nextToRemoveIndex;
38+
private volatile Exception? _thrownException;
39+
40+
public Exception? ThrownException => _thrownException;
41+
42+
public FetchedMessages(
43+
FlowId flowId,
44+
StoredId storedId,
45+
IMessageStore messageStore,
46+
ISerializer serializer,
47+
Effect effect,
48+
FlowState flowState,
49+
UnhandledExceptionHandler unhandledExceptionHandler,
50+
FlowTimeouts timeouts,
51+
UtcNow utcNow,
52+
SettingsWithDefaults settings,
53+
int maxIdempotencyKeyCount,
54+
TimeSpan? maxIdempotencyKeyTtl)
55+
{
56+
_flowId = flowId;
57+
_storedId = storedId;
58+
_messageStore = messageStore;
59+
_serializer = serializer;
60+
_effect = effect;
61+
_flowState = flowState;
62+
_unhandledExceptionHandler = unhandledExceptionHandler;
63+
_timeouts = timeouts;
64+
_settings = settings;
65+
_idempotencyKeys = new IdempotencyKeys(IdempotencyKeysRoot, _effect, maxIdempotencyKeyCount, maxIdempotencyKeyTtl, utcNow);
66+
}
67+
68+
public async Task Initialize()
69+
{
70+
_idempotencyKeys.Initialize();
71+
72+
_nextToRemoveIndex = await _effect.CreateOrGet(PendingDeletionsRoot, 0, alias: null, flush: false);
73+
var children = _effect.GetChildren(PendingDeletionsRoot);
74+
var positions = new List<long>();
75+
foreach (var childId in children)
76+
{
77+
var position = _effect.Get<long>(childId);
78+
positions.Add(position);
79+
}
80+
81+
if (positions.Any())
82+
{
83+
await _messageStore.DeleteMessages(_storedId, positions);
84+
foreach (var childId in children)
85+
await _effect.Clear(childId, flush: false);
86+
}
87+
}
88+
89+
public async Task FetchOnce()
90+
{
91+
await _semaphore.WaitAsync();
92+
try
93+
{
94+
if (_thrownException != null)
95+
return;
96+
97+
List<long> skipPositions;
98+
lock (_lock)
99+
skipPositions = _fetchedPositions.ToList();
100+
101+
var messages = await _messageStore.GetMessages(_storedId, skipPositions);
102+
foreach (var (messageContent, messageType, position, idempotencyKey, sender, receiver) in messages)
103+
{
104+
try
105+
{
106+
var msg = _serializer.Deserialize(messageContent, _serializer.ResolveType(messageType)!);
107+
108+
if (idempotencyKey != null && !_idempotencyKeys.Add(idempotencyKey, position))
109+
{
110+
await _messageStore.DeleteMessages(_storedId, [position]);
111+
continue;
112+
}
113+
114+
var envelope = new Envelope(msg, receiver, sender);
115+
var messageData = new MessageData(
116+
envelope,
117+
position,
118+
messageContent,
119+
messageType,
120+
receiver,
121+
sender
122+
);
123+
lock (_lock)
124+
{
125+
_toDeliver.Add(messageData);
126+
_fetchedPositions.Add(position);
127+
}
128+
}
129+
catch (Exception e)
130+
{
131+
_unhandledExceptionHandler.Invoke(_flowId.Type, e);
132+
_thrownException = e;
133+
FailAllSubscriptions(e);
134+
return;
135+
}
136+
}
137+
}
138+
finally
139+
{
140+
TryDispatch();
141+
_semaphore.Release();
142+
}
143+
}
144+
145+
public async Task<MatchResult?> WaitForMessageOrTimeout(EffectId timeoutId, MessagePredicate predicate, DateTime? timeout)
146+
{
147+
var subscription = new Subscription(predicate, timeout);
148+
149+
lock (_lock)
150+
_subscriptions.Add(subscription);
151+
152+
TryDispatch();
153+
154+
var waitTask = timeout != null
155+
? _timeouts.AddTimeout(timeoutId, timeout.Value)
156+
: Task.Delay(_settings.MessagesDefaultMaxWaitForCompletion);
157+
158+
_flowState.SubflowWaiting();
159+
await Task.WhenAny(subscription.Tcs.Task, waitTask);
160+
var success = _flowState.ResumeSubflow();
161+
if (!success)
162+
await new TaskCompletionSource().Task;
163+
164+
lock (_lock)
165+
{
166+
var stillRegistered = _subscriptions.Remove(subscription);
167+
if (stillRegistered)
168+
subscription.Tcs.TrySetResult(null);
169+
}
170+
171+
var result = await subscription.Tcs.Task;
172+
if (result != null)
173+
_timeouts.RemoveTimeout(timeoutId);
174+
175+
return result;
176+
}
177+
178+
public async Task AfterFlush()
179+
{
180+
await _semaphore.WaitAsync();
181+
try
182+
{
183+
var children = _effect.GetChildren(PendingDeletionsRoot);
184+
var nonDirtyChildren = new List<EffectId>();
185+
foreach (var childId in children)
186+
if (!_effect.IsDirty(childId))
187+
nonDirtyChildren.Add(childId);
188+
189+
if (nonDirtyChildren.Any())
190+
{
191+
var positions = new List<long>();
192+
foreach (var nonDirtyChild in nonDirtyChildren)
193+
{
194+
var position = _effect.Get<long>(nonDirtyChild);
195+
positions.Add(position);
196+
}
197+
198+
await _messageStore.DeleteMessages(_storedId, positions);
199+
foreach (var nonDirtyChild in nonDirtyChildren)
200+
await _effect.Clear(nonDirtyChild, flush: false);
201+
202+
lock (_lock)
203+
foreach (var position in positions)
204+
_fetchedPositions.Remove(position);
205+
}
206+
}
207+
catch (Exception exception)
208+
{
209+
_unhandledExceptionHandler.Invoke(_flowId.Type, exception);
210+
}
211+
finally
212+
{
213+
_semaphore.Release();
214+
}
215+
}
216+
217+
private void TryDispatch()
218+
{
219+
lock (_lock)
220+
for (var subscriptionIndex = 0; subscriptionIndex < _subscriptions.Count; subscriptionIndex++)
221+
{
222+
var subscription = _subscriptions[subscriptionIndex];
223+
for (var matchIndex = 0; matchIndex < _toDeliver.Count; matchIndex++)
224+
if (subscription.Predicate(_toDeliver[matchIndex].Envelope))
225+
{
226+
var msg = _toDeliver[matchIndex];
227+
_toDeliver.RemoveAt(matchIndex);
228+
var positionToRemoveIndex = _nextToRemoveIndex++;
229+
var toRemoveId = PendingDeletion(positionToRemoveIndex);
230+
_effect.FlushlessUpsert(PendingDeletionsRoot, _nextToRemoveIndex, alias: null);
231+
_subscriptions.RemoveAt(subscriptionIndex);
232+
subscription.Tcs.TrySetResult(new MatchResult(msg, toRemoveId));
233+
TryDispatch();
234+
return;
235+
}
236+
}
237+
}
238+
239+
private void FailAllSubscriptions(Exception exception)
240+
{
241+
lock (_lock)
242+
{
243+
foreach (var sub in _subscriptions)
244+
sub.Tcs.TrySetException(exception);
245+
_subscriptions.Clear();
246+
}
247+
}
248+
249+
public record MessageData(
250+
Envelope Envelope,
251+
long Position,
252+
byte[] MessageContentBytes,
253+
byte[] MessageTypeBytes,
254+
string? Receiver,
255+
string? Sender
256+
);
257+
258+
public record MatchResult(MessageData Message, EffectId ToRemoveId);
259+
260+
private record Subscription(MessagePredicate Predicate, DateTime? Timeout)
261+
{
262+
public TaskCompletionSource<MatchResult?> Tcs { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously);
263+
}
264+
}

0 commit comments

Comments
 (0)