Skip to content

Commit ef053d8

Browse files
authored
Merge pull request #14 from zooper-lib/feature/retries
Retries
2 parents dfaad6c + 18d6355 commit ef053d8

13 files changed

Lines changed: 1192 additions & 19 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Added
11+
12+
- **Concurrency retry on `saveChangesAsync`**: `ContinuumSession.saveChangesAsync` now accepts an optional `maxRetries` parameter (default `0`, fully backward-compatible). When a `ConcurrencyException` is detected and retries remain, the session automatically reloads conflicting streams from the store, reconstructs fresh aggregates, re-applies pending events on top of the latest state, and retries the save. This prevents silent event loss when multiple workflows modify the same aggregate concurrently.
13+
1014
## [4.1.0] - 2026-01-22
1115

1216
### Added

packages/continuum/analysis_options.yaml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,6 @@ linter:
3535

3636
# **Error Prevention**
3737

38-
# Enforce non-nullable types where possible.
39-
always_require_non_null_named_parameters: true
40-
4138
# **Documentation**
4239
# Require documentation for public members.
4340
public_member_api_docs: false

packages/continuum/lib/src/persistence/session.dart

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,20 @@ abstract interface class ContinuumSession {
4646
/// Uses optimistic concurrency control based on the versions
4747
/// observed when streams were loaded.
4848
///
49-
/// Throws [ConcurrencyException] if a version conflict is detected.
50-
Future<void> saveChangesAsync();
49+
/// When [maxRetries] is greater than zero and a [ConcurrencyException]
50+
/// is detected, the session automatically reloads conflicting streams
51+
/// from the store, reconstructs fresh aggregates, re-applies the
52+
/// pending events on top of the latest state, and retries the save.
53+
/// This is repeated up to [maxRetries] times.
54+
///
55+
/// After a successful retry the in-session aggregate references are
56+
/// replaced with the newly reconstructed instances. Callers holding
57+
/// references obtained before [saveChangesAsync] should re-read the
58+
/// aggregate via [loadAsync] if they need the latest state.
59+
///
60+
/// Throws [ConcurrencyException] if a version conflict is detected
61+
/// and retries are exhausted (or [maxRetries] is zero).
62+
Future<void> saveChangesAsync({int maxRetries = 1});
5163

5264
/// Discards pending events for a specific stream.
5365
///

packages/continuum/lib/src/persistence/session_impl.dart

Lines changed: 159 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import '../events/continuum_event.dart';
2+
import '../exceptions/concurrency_exception.dart';
23
import '../exceptions/invalid_creation_event_exception.dart';
34
import '../exceptions/stream_not_found_exception.dart';
45
import '../exceptions/unsupported_event_exception.dart';
@@ -223,7 +224,33 @@ final class SessionImpl implements ContinuumSession {
223224
}
224225

225226
@override
226-
Future<void> saveChangesAsync() async {
227+
Future<void> saveChangesAsync({int maxRetries = 1}) async {
228+
// Attempt to persist, retrying on concurrency conflicts up to the
229+
// configured limit. Each retry reloads conflicting streams and
230+
// re-applies pending events on top of the latest persisted state.
231+
for (var attempt = 0; attempt <= maxRetries; attempt++) {
232+
try {
233+
await _persistPendingEventsAsync();
234+
return;
235+
} on ConcurrencyException {
236+
final isLastAttempt = attempt == maxRetries;
237+
if (isLastAttempt) {
238+
// All retries exhausted — propagate the conflict to the caller.
239+
rethrow;
240+
}
241+
242+
// Reload every stream that still has pending events so the next
243+
// attempt uses the latest persisted versions.
244+
await _reloadStreamsWithPendingEventsAsync();
245+
}
246+
}
247+
}
248+
249+
/// Core persistence logic extracted so [saveChangesAsync] can retry it.
250+
///
251+
/// Serialises pending events, writes them to the store (atomically when
252+
/// possible), runs inline projections, and updates internal stream state.
253+
Future<void> _persistPendingEventsAsync() async {
227254
final pendingEntries = <MapEntry<StreamId, _StreamState>>[];
228255
for (final entry in _streams.entries) {
229256
if (entry.value.pendingEvents.isNotEmpty) {
@@ -331,6 +358,137 @@ final class SessionImpl implements ContinuumSession {
331358
}
332359
}
333360

361+
/// Reloads all streams that still carry pending events.
362+
///
363+
/// For each such stream the method fetches the latest persisted events,
364+
/// reconstructs a fresh aggregate, re-applies the pending events on top,
365+
/// and replaces the internal [_StreamState] so the next save attempt
366+
/// uses the correct [ExpectedVersion].
367+
///
368+
/// New streams (those created via [startStream] with `loadedVersion == -1`)
369+
/// are skipped because their concurrency conflict is a duplicate-stream
370+
/// error, not a stale-version error, and reloading would not help.
371+
Future<void> _reloadStreamsWithPendingEventsAsync() async {
372+
final streamIdsToReload = <StreamId>[];
373+
374+
for (final entry in _streams.entries) {
375+
final hasPendingEvents = entry.value.pendingEvents.isNotEmpty;
376+
final isExistingStream = entry.value.loadedVersion != -1;
377+
378+
// Only existing streams benefit from a reload. New streams that
379+
// conflict have a duplicate-stream problem, not a stale-version one.
380+
if (hasPendingEvents && isExistingStream) {
381+
streamIdsToReload.add(entry.key);
382+
}
383+
}
384+
385+
for (final streamId in streamIdsToReload) {
386+
final currentState = _streams[streamId]!;
387+
388+
// Snapshot the events we still need to persist.
389+
final pendingEvents = List<ContinuumEvent>.of(currentState.pendingEvents);
390+
391+
// Fetch the latest persisted events from the store.
392+
final storedEvents = await _eventStore.loadStreamAsync(streamId);
393+
394+
if (storedEvents.isEmpty) {
395+
throw StreamNotFoundException(streamId: streamId);
396+
}
397+
398+
// Reconstruct the aggregate from all persisted events (including any
399+
// that were written by competing sessions since our last load).
400+
final freshAggregate = _reconstructAggregateByRuntimeType(
401+
storedEvents,
402+
currentState.aggregateType,
403+
);
404+
final latestVersion = storedEvents.last.version;
405+
406+
// Re-apply our pending events on top of the freshly rebuilt state.
407+
for (final event in pendingEvents) {
408+
_applyEventByRuntimeType(freshAggregate, event, currentState.aggregateType);
409+
}
410+
411+
// Replace the stream state so the next persist attempt carries the
412+
// updated loadedVersion and the correctly-mutated aggregate.
413+
_streams[streamId] = _StreamState(
414+
aggregate: freshAggregate,
415+
aggregateType: currentState.aggregateType,
416+
loadedVersion: latestVersion,
417+
pendingEvents: pendingEvents,
418+
);
419+
}
420+
}
421+
422+
/// Reconstructs an aggregate using its runtime [Type] instead of a
423+
/// generic type parameter.
424+
///
425+
/// This is used during concurrency retries where the concrete generic
426+
/// type is no longer available — only the [Type] captured at load time.
427+
Object _reconstructAggregateByRuntimeType(
428+
List<StoredEvent> events,
429+
Type aggregateType,
430+
) {
431+
// Deserialize the creation event (first event in the stream).
432+
final creationStored = events.first;
433+
final creationEvent = _serializer.deserialize(
434+
eventType: creationStored.eventType,
435+
data: creationStored.data,
436+
storedMetadata: creationStored.metadata,
437+
);
438+
439+
// Look up the factory using the runtime type.
440+
final factory = _aggregateFactories.getFactory<Object>(
441+
aggregateType,
442+
creationEvent.runtimeType,
443+
);
444+
445+
if (factory == null) {
446+
throw InvalidCreationEventException(
447+
eventType: creationEvent.runtimeType,
448+
aggregateType: aggregateType,
449+
);
450+
}
451+
452+
final aggregate = factory(creationEvent);
453+
454+
// Apply remaining mutation events.
455+
for (var i = 1; i < events.length; i++) {
456+
final storedEvent = events[i];
457+
final domainEvent = _serializer.deserialize(
458+
eventType: storedEvent.eventType,
459+
data: storedEvent.data,
460+
storedMetadata: storedEvent.metadata,
461+
);
462+
_applyEventByRuntimeType(aggregate, domainEvent, aggregateType);
463+
}
464+
465+
return aggregate;
466+
}
467+
468+
/// Applies a single event to an aggregate using the runtime [Type].
469+
///
470+
/// Counterpart to [_applyEvent] for use in retry paths where the
471+
/// generic type parameter is unavailable.
472+
void _applyEventByRuntimeType(
473+
Object aggregate,
474+
ContinuumEvent event,
475+
Type aggregateType,
476+
) {
477+
final applier = _eventAppliers.getApplier<Object>(
478+
aggregateType,
479+
event.runtimeType,
480+
);
481+
482+
if (applier == null) {
483+
throw UnsupportedEventException(
484+
eventType: event.runtimeType,
485+
aggregateType: aggregateType,
486+
);
487+
}
488+
489+
applier(aggregate, event);
490+
}
491+
334492
@override
335493
void discardStream(StreamId streamId) {
336494
final state = _streams[streamId];

0 commit comments

Comments
 (0)