
MongoTrackingToken.lowerBound() incompatible with ReplayToken.wasProcessedBeforeReset() — replays broken #498

@martijnvanderwoud


Summary

After upgrading from Axon Framework 4.7.3 + axon-mongo 4.7.0 to Axon Framework 4.13.0 + axon-mongo 4.12.0, event processor replays are completely broken. ReplayToken.wasProcessedBeforeReset() returns false for events dated years before the reset token, causing ReplayToken.advancedTo() to incorrectly treat replayed events as "new events not seen before" instead of "events still well behind the reset position."

This is related to #9, which reported that MongoTrackingToken's trackedEvents grow unboundedly during replay. The current issue has a much more severe consequence: replays don't work at all, because the replay/non-replay classification is wrong for virtually every replayed event.

Root cause

ReplayToken.wasProcessedBeforeReset() was introduced in Axon Framework as a refactoring of the replay detection logic. It relies on lowerBound() + samePositionAs():

private static boolean wasProcessedBeforeReset(TrackingToken tokenAtReset, TrackingToken newToken) {
    TrackingToken resetLowerBound = WrappedToken.unwrapLowerBound(tokenAtReset);
    TrackingToken newTokenLowerBound = WrappedToken.unwrapLowerBound(newToken);
    TrackingToken resetTokenLowerNewToken = resetLowerBound.lowerBound(newTokenLowerBound);
    return resetTokenLowerNewToken.samePositionAs(newTokenLowerBound);
}

This works for GapAwareTrackingToken because its lowerBound() produces a token with a meaningful index, and its samePositionAs() compares indices.
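A minimal sketch of why this check behaves well for index-based tokens, assuming a simplified token whose position is a single long index (gaps ignored). IndexToken and ReplayCheckDemo are hypothetical names, not Axon classes. The wasProcessedBeforeReset() below drops the WrappedToken unwrapping and keeps only the lowerBound() + samePositionAs() shape quoted above.

```java
/** Hypothetical stand-in for an index-based token such as GapAwareTrackingToken (gaps omitted). */
final class IndexToken {
    final long index;

    IndexToken(long index) {
        this.index = index;
    }

    // The lower bound of two index-based positions is simply the smaller index.
    IndexToken lowerBound(IndexToken other) {
        return new IndexToken(Math.min(index, other.index));
    }

    // Two index-based tokens are at the same position when their indices match.
    boolean samePositionAs(IndexToken other) {
        return index == other.index;
    }
}

public class ReplayCheckDemo {
    // Same shape as ReplayToken.wasProcessedBeforeReset(), minus the WrappedToken unwrapping.
    static boolean wasProcessedBeforeReset(IndexToken tokenAtReset, IndexToken newToken) {
        IndexToken lower = tokenAtReset.lowerBound(newToken);
        return lower.samePositionAs(newToken);
    }

    public static void main(String[] args) {
        IndexToken atReset = new IndexToken(1_000);
        System.out.println(wasProcessedBeforeReset(atReset, new IndexToken(42)));    // true: behind the reset point
        System.out.println(wasProcessedBeforeReset(atReset, new IndexToken(1_001))); // false: beyond the reset point
    }
}
```

Because the lower bound of two indices is always one of the two inputs, an older token compares equal to the lower bound and is classified as already processed.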

MongoTrackingToken.lowerBound() computes the intersection of tracked event IDs:

public TrackingToken lowerBound(TrackingToken other) {
    MongoTrackingToken otherToken = (MongoTrackingToken) other;
    Map<String, Long> intersection = new HashMap<>(this.trackedEvents);
    trackedEvents.keySet().forEach(k -> {
        if (!otherToken.trackedEvents.containsKey(k)) {
            intersection.remove(k);
        }
    });
    return new MongoTrackingToken(min(timestamp, otherToken.timestamp), intersection);
}

During replay, the token at reset (recent) and the replayed token (old) have completely disjoint event IDs — they are from different time periods and the old events have long been trimmed from the reset token's lookback window. The intersection is empty.

The resulting degenerate MongoTrackingToken{timestamp=<min>, trackedEvents={}} then fails the default samePositionAs() check, which requires covers(other) && other.covers(this). The first half is already false: with an empty trackedEvents map, covers() falls back to an oldest-event threshold of 0L, so none of newToken's events is either tracked by the lower bound or older than that threshold.
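The failure can be reproduced outside Axon with a simplified model. IntersectionMongoToken and MongoReplayRepro are hypothetical names; lowerBound() mirrors the intersection logic quoted above, while covers() and samePositionAs() are modeled on the behaviour described in this issue (mutual coverage, oldest-event fallback of 0L) rather than copied from axon-mongo. The event IDs are shortened placeholders for the truncated IDs in the debug output.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified model of MongoTrackingToken: a timestamp plus event ID -> event timestamp. */
final class IntersectionMongoToken {
    final long timestamp;
    final Map<String, Long> trackedEvents;

    IntersectionMongoToken(long timestamp, Map<String, Long> trackedEvents) {
        this.timestamp = timestamp;
        this.trackedEvents = trackedEvents;
    }

    // Mirrors the current lowerBound(): keep only event IDs present in both tokens.
    IntersectionMongoToken lowerBound(IntersectionMongoToken other) {
        Map<String, Long> intersection = new HashMap<>(trackedEvents);
        intersection.keySet().retainAll(other.trackedEvents.keySet());
        return new IntersectionMongoToken(Math.min(timestamp, other.timestamp), intersection);
    }

    // Modeled on the covers() behaviour described above: every event of `other` must be tracked
    // here or be older than our oldest tracked event (0L when nothing is tracked).
    boolean covers(IntersectionMongoToken other) {
        long oldest = trackedEvents.values().stream().min(Comparator.naturalOrder()).orElse(0L);
        return other.timestamp <= timestamp
                && other.trackedEvents.entrySet().stream()
                        .allMatch(e -> trackedEvents.containsKey(e.getKey()) || e.getValue() < oldest);
    }

    // Default samePositionAs(): mutual coverage.
    boolean samePositionAs(IntersectionMongoToken other) {
        return covers(other) && other.covers(this);
    }
}

public class MongoReplayRepro {
    static boolean wasProcessedBeforeReset(IntersectionMongoToken atReset, IntersectionMongoToken newToken) {
        return atReset.lowerBound(newToken).samePositionAs(newToken);
    }

    public static void main(String[] args) {
        // Values from the debug session; event IDs shortened to placeholders.
        IntersectionMongoToken atReset = new IntersectionMongoToken(1776948514695L,
                Map.of("a812a642", 1776948514695L));
        IntersectionMongoToken newToken = new IntersectionMongoToken(1524402340572L,
                Map.of("ef664838", 1524402340176L, "78bae6b9", 1524402340572L));

        System.out.println(atReset.lowerBound(newToken).trackedEvents.isEmpty()); // true: disjoint IDs
        System.out.println(wasProcessedBeforeReset(atReset, newToken));           // false, despite the older timestamp
    }
}
```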

Observed values during debug session

| Variable | Value |
|---|---|
| tokenAtReset | MongoTrackingToken{timestamp=1776948514695, trackedEvents={a812a642-…=1776948514695}} |
| newToken | MongoTrackingToken{timestamp=1524402340572, trackedEvents={ef664838-…=1524402340176, 78bae6b9-…=1524402340572}} |
| resetLowerBound | same as tokenAtReset |
| newTokenLowerBound | same as newToken |
| resetTokenLowerNewToken | MongoTrackingToken{timestamp=1524402340572, trackedEvents={}} ← empty! |
| Result | false (should be true: newToken is about 8 years older than tokenAtReset) |
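A quick check of the "about 8 years" figure, assuming both token timestamps are epoch milliseconds in UTC. TokenAgeCheck is a hypothetical helper name; it only uses java.time.

```java
import java.time.Duration;
import java.time.Instant;

public class TokenAgeCheck {
    public static void main(String[] args) {
        // Timestamps copied from the debug session (assumed to be epoch milliseconds, UTC).
        Instant atReset = Instant.ofEpochMilli(1776948514695L);
        Instant replayed = Instant.ofEpochMilli(1524402340572L);

        Duration gap = Duration.between(replayed, atReset);
        System.out.println(atReset);      // 2026-04-23T12:48:34.695Z
        System.out.println(replayed);     // 2018-04-22T13:05:40.572Z
        System.out.println(gap.toDays()); // 2922, roughly 8 years
    }
}
```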

Versions

  • Before (working): Axon Framework 4.7.3, axon-mongo 4.7.0
  • After (broken): Axon Framework 4.13.0, axon-mongo 4.12.0

Suggested fix

MongoTrackingToken.lowerBound() should apply the same reasoning that covers() already uses: an event older than a token's oldest tracked event has been trimmed from that token's lookback window and was therefore definitely processed. Instead of a pure event ID intersection, the lower bound should include every event from either token that the other token has necessarily processed, either because the event ID is present in both tokens or because the event predates the other token's lookback window:

@Override
public TrackingToken lowerBound(TrackingToken other) {
    Assert.isTrue(other instanceof MongoTrackingToken, () -> "Incompatible token type provided.");
    MongoTrackingToken otherToken = (MongoTrackingToken) other;

    long minTimestamp = min(timestamp, otherToken.timestamp);

    long thisOldest = this.trackedEvents.values().stream()
            .min(Comparator.naturalOrder()).orElse(minTimestamp);
    long otherOldest = otherToken.trackedEvents.values().stream()
            .min(Comparator.naturalOrder()).orElse(minTimestamp);

    Map<String, Long> events = new HashMap<>();

    this.trackedEvents.forEach((k, v) -> {
        if (otherToken.trackedEvents.containsKey(k) || v < otherOldest) {
            events.put(k, v);
        }
    });
    otherToken.trackedEvents.forEach((k, v) -> {
        if (this.trackedEvents.containsKey(k) || v < thisOldest) {
            events.putIfAbsent(k, v);
        }
    });

    return new MongoTrackingToken(minTimestamp, events);
}

This preserves the out-of-order delivery guarantee: events within the lookback window that only one token has seen are still excluded from the lower bound, so samePositionAs correctly returns false for genuinely unprocessed events near the boundary.
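To sanity-check the suggested fix, the sketch below ports the proposed lowerBound() into a simplified model. PatchedMongoToken and PatchedLowerBoundDemo are hypothetical names, and covers()/samePositionAs() are modeled on the behaviour described in this issue rather than copied from axon-mongo. With the observed values the replayed token is classified as processed; in a made-up boundary case, an in-window event that the reset token never saw is still classified as new.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical simplified model of MongoTrackingToken using the suggested lowerBound(). */
final class PatchedMongoToken {
    final long timestamp;
    final Map<String, Long> trackedEvents;

    PatchedMongoToken(long timestamp, Map<String, Long> trackedEvents) {
        this.timestamp = timestamp;
        this.trackedEvents = trackedEvents;
    }

    private static long oldest(Map<String, Long> events, long fallback) {
        return events.values().stream().min(Comparator.naturalOrder()).orElse(fallback);
    }

    // Suggested fix: keep shared IDs plus events that predate the other token's lookback window.
    PatchedMongoToken lowerBound(PatchedMongoToken other) {
        long minTimestamp = Math.min(timestamp, other.timestamp);
        long thisOldest = oldest(trackedEvents, minTimestamp);
        long otherOldest = oldest(other.trackedEvents, minTimestamp);
        Map<String, Long> events = new HashMap<>();
        trackedEvents.forEach((k, v) -> {
            if (other.trackedEvents.containsKey(k) || v < otherOldest) events.put(k, v);
        });
        other.trackedEvents.forEach((k, v) -> {
            if (trackedEvents.containsKey(k) || v < thisOldest) events.putIfAbsent(k, v);
        });
        return new PatchedMongoToken(minTimestamp, events);
    }

    // Modeled on the covers() behaviour described in this issue (oldest-event fallback of 0L).
    boolean covers(PatchedMongoToken other) {
        long oldest = oldest(trackedEvents, 0L);
        return other.timestamp <= timestamp
                && other.trackedEvents.entrySet().stream()
                        .allMatch(e -> trackedEvents.containsKey(e.getKey()) || e.getValue() < oldest);
    }

    boolean samePositionAs(PatchedMongoToken other) {
        return covers(other) && other.covers(this);
    }

    static boolean wasProcessedBeforeReset(PatchedMongoToken atReset, PatchedMongoToken newToken) {
        return atReset.lowerBound(newToken).samePositionAs(newToken);
    }
}

public class PatchedLowerBoundDemo {
    public static void main(String[] args) {
        // Observed values from the debug session (event IDs shortened to placeholders).
        PatchedMongoToken atReset = new PatchedMongoToken(1776948514695L, Map.of("a812a642", 1776948514695L));
        PatchedMongoToken oldToken = new PatchedMongoToken(1524402340572L,
                Map.of("ef664838", 1524402340176L, "78bae6b9", 1524402340572L));
        System.out.println(PatchedMongoToken.wasProcessedBeforeReset(atReset, oldToken)); // true

        // Made-up boundary case: "late" lies inside the reset token's window but was never seen by it.
        PatchedMongoToken resetWithWindow = new PatchedMongoToken(2_000L, Map.of("a", 1_000L, "b", 2_000L));
        PatchedMongoToken lateToken = new PatchedMongoToken(1_900L, Map.of("a", 1_000L, "late", 1_500L));
        System.out.println(PatchedMongoToken.wasProcessedBeforeReset(resetWithWindow, lateToken)); // false
    }
}
```

In the boundary case the lower bound keeps only the shared event "a", so covers() rejects "late" (1500 is not older than the oldest tracked event, 1000) and the event is treated as not yet processed.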

Metadata


Assignees

No one assigned

    Type

    No fields configured for Bug.

    Projects

    No projects

    Relationships

    None yet

    Development

    No branches or pull requests
