Remove Message upper bound from MessageStream to support arbitrary element types #4539

@hjohn

Enhancement Description

MessageStream<M extends Message> constrains its element type to subtypes of Message. This is an artificial restriction: the stream's entire implementation (filtering, mapping, concatenating, reducing) is generic and never calls a Message-specific method on M. The bound is declared, but nothing in the implementation depends on it.

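Removing the bound (making it MessageStream<M>) turns the abstraction into a true general-purpose async sequential stream, comparable to java.util.stream.Stream<T> or Project Reactor's Flux<T>. Existing call sites are unaffected: every use of MessageStream<EventMessage> or any other Message subtype remains valid once the upper bound is dropped. Removing an upper bound is source-compatible for such uses. Two caveats apply: code that relies on the bound implicitly, for example by reading a Message out of a MessageStream<?>, needs adjusting, and the change is not binary-compatible, because the erasure of M changes from Message to Object.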

The only rough edge is Entry.message(), whose name implies M is a Message. This can be resolved by introducing Entry.value() as the canonical name and deprecating message() for a release cycle, giving callers time to migrate.
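The rename could look roughly like the sketch below. EntrySketch and SimpleEntrySketch are simplified, hypothetical stand-ins for MessageStream.Entry and its implementation, not the framework's actual types; the only point is that a deprecated default message() can delegate to value() so existing callers keep compiling during the migration window.

```java
// Simplified, hypothetical stand-in for MessageStream.Entry (not the framework's interface).
interface EntrySketch<M> {

    /** Canonical accessor for the element carried by this entry. */
    M value();

    /**
     * Legacy accessor, kept for one release cycle so existing callers still compile.
     *
     * @deprecated use {@link #value()} instead
     */
    @Deprecated
    default M message() {
        return value();
    }
}

// Minimal implementation: the record's generated accessor implements value().
record SimpleEntrySketch<M>(M value) implements EntrySketch<M> {}
```

Callers of message() would get a deprecation warning rather than a compile error, and implementations only need to provide value().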

Current Behaviour

MessageStream, MessageStream.Entry, MessageStream.Single, and MessageStream.Empty all declare M extends Message. Every derived class in the implementation follows suit. As a result, any mapped or reduced type must itself be a Message subtype:

// map() forces the return type to be a Message subtype
default <RM extends Message> MessageStream<RM> map(Function<Entry<M>, Entry<RM>> mapper)

This forces callers that want to work with non-Message types to work around the constraint.

Wanted Behaviour

MessageStream<M> with no upper bound. The map(), filter(), reduce(), and concatWith() operations become fully general. Callers can express the semantic structure of a stream using ordinary Java types.
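To make the signatures concrete, here is a minimal sketch of what unbounded operations might look like. SketchStream is a hypothetical, eager, list-backed stand-in written for this issue; the real MessageStream is lazy and asynchronous and works on Entry objects, so only the shape of the type parameters is meant to carry over.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical, eager, list-backed stand-in for an unbounded MessageStream<M>.
final class SketchStream<M> {
    private final List<M> elements;

    private SketchStream(List<M> elements) {
        this.elements = elements;
    }

    @SafeVarargs
    static <M> SketchStream<M> of(M... elements) {
        return new SketchStream<>(new ArrayList<>(List.of(elements)));
    }

    // No "R extends Message": the mapped type is unconstrained.
    <R> SketchStream<R> map(Function<? super M, ? extends R> mapper) {
        List<R> mapped = new ArrayList<>();
        for (M element : elements) {
            mapped.add(mapper.apply(element));
        }
        return new SketchStream<>(mapped);
    }

    SketchStream<M> filter(Predicate<? super M> predicate) {
        List<M> kept = new ArrayList<>();
        for (M element : elements) {
            if (predicate.test(element)) {
                kept.add(element);
            }
        }
        return new SketchStream<>(kept);
    }

    SketchStream<M> concatWith(SketchStream<? extends M> other) {
        List<M> all = new ArrayList<>(elements);
        all.addAll(other.elements);
        return new SketchStream<>(all);
    }

    // Completes immediately here; the real reduce() would complete asynchronously.
    <R> CompletableFuture<R> reduce(R identity, BiFunction<R, ? super M, R> accumulator) {
        R result = identity;
        for (M element : elements) {
            result = accumulator.apply(result, element);
        }
        return CompletableFuture.completedFuture(result);
    }
}
```

With these signatures, a chain such as SketchStream.of("a", "bb", "ccc").map(String::length).filter(n -> n > 1).reduce(0, Integer::sum) type-checks even though neither String nor Integer is a Message.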


Example 1 — Event sourcing with a sealed message hierarchy

Today: the framework distinguishes snapshot entries from regular event entries by making SnapshotEventMessage a subtype of EventMessage and pattern-matching on the message type inside reduce():

BiFunction<E, MessageStream.Entry<? extends EventMessage>, E> accumulator =
    (entity, entry) -> switch (entry.message()) {
        case SnapshotEventMessage sem -> restoreFromSnapshot(sem.payload());
        case EventMessage em -> evolver.evolve(entity, em);
    };

With this change: define a sealed type that captures the full set of possible stream elements. The stream can be typed directly, and reduce() processes it with a complete, compiler-verified switch:

sealed interface SourcingMessage {
    record Snapshot(SnapshotPayload snapshot) implements SourcingMessage {}
    record Event(EventMessage event)          implements SourcingMessage {}
    record Marker(ConsistencyMarker marker)  implements SourcingMessage {}
}

MessageStream<SourcingMessage> stream = ...;

stream.reduce(null, (entity, entry) -> switch (entry.value()) {
    case SourcingMessage.Snapshot(var s) -> s.entity(); // s is the SnapshotPayload component
    case SourcingMessage.Event(var e)    -> evolver.evolve(entity, e);
    case SourcingMessage.Marker(var m)   -> { recordMarker(m); yield entity; }
});

The compiler enforces exhaustiveness. Adding a new SourcingMessage variant produces a compile error at every switch site, not a silent runtime miss.


Example 2 — ConsistencyMarker via Context vs. a typed stream

Today: ConsistencyMarker cannot be an element of the event stream, so it is communicated as a side-channel resource attached to Entry's Context:

// Reading side — callers must know to look in the context
stream.filter(entry -> entry.getResource(ConsistencyMarker.RESOURCE_KEY) == null)
      .reduce(...);

With this change: ConsistencyMarker can be a proper element in the stream alongside events. The filter above becomes unnecessary; the switch in reduce() handles it explicitly and exhaustively.
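As a rough illustration of the typed alternative, the sketch below folds a sequence in which markers are ordinary elements. SourcedElement, SourcingState, the String payload, and the long marker position are all hypothetical simplifications chosen to keep the example self-contained; they are not framework types. It assumes Java 21 for pattern matching in switch.

```java
// Hypothetical element type: either an event payload or a marker position.
sealed interface SourcedElement {
    record Event(String payload) implements SourcedElement {}
    record Marker(long position) implements SourcedElement {}
}

// Hypothetical fold result: number of applied events plus the last marker seen.
record SourcingState(int appliedEvents, long lastMarker) {

    static SourcingState initial() {
        return new SourcingState(0, -1L);
    }

    // Accumulator in the shape a reduce() call would take. The switch is
    // exhaustive over the sealed type, so a new variant fails to compile here.
    SourcingState apply(SourcedElement element) {
        return switch (element) {
            case SourcedElement.Event e -> new SourcingState(appliedEvents + 1, lastMarker);
            case SourcedElement.Marker m -> new SourcingState(appliedEvents, m.position());
        };
    }

    // Plain loop standing in for stream.reduce(initial(), ...).
    static SourcingState fold(SourcedElement... elements) {
        SourcingState state = initial();
        for (SourcedElement element : elements) {
            state = state.apply(element);
        }
        return state;
    }
}
```

The consumer no longer needs to know about a resource key: the marker arrives through the same channel as the events and cannot be overlooked without a compile error.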


Example 3 — User-space: mapping events to domain objects

Today: a user who wants to map a stream of EventMessage to their own domain type must either drain the stream into a List and switch to a Java Stream, or pull in Project Reactor:

// Option A — drain to list, lose async processing
List<OrderEvent> events = stream
    .reduce(new ArrayList<OrderEvent>(), (list, entry) -> { list.add(parse(entry.message())); return list; })
    .join();
events.stream().map(this::handle)...;

// Option B — convert to Flux (requires Project Reactor dependency)
FluxUtils.toFlux(stream)
         .map(entry -> parse(entry.message()))
         .subscribe(...);

With this change: the mapping stays inside MessageStream and remains lazy and asynchronous:

MessageStream<OrderEvent> domainStream = rawStream.map(entry -> entry.map(this::parse));

domainStream.reduce(initialState, (state, entry) -> handler.handle(state, entry.value()));

No extra dependency. No eager materialisation. The stream's async, sequential-processing guarantees are preserved throughout.


Example 4 — User-space: accumulating into a custom result type

Today: the result type of reduce() is unconstrained, but building up to it requires going through Entry<M extends Message> the whole way:

record Summary(int count, Money total) {}

// Works, but entry.message() is always accessed through the Message API
stream.reduce(new Summary(0, Money.ZERO), (summary, entry) -> {
    OrderPlaced event = (OrderPlaced) entry.message().payload(); // explicit downcast needed in the accumulator
    return new Summary(summary.count() + 1, summary.total().add(event.amount()));
});

With this change: map once to a typed value, so the cast happens in a single place, then reduce without casts in the accumulator:

MessageStream<OrderPlaced> typed = rawStream.mapMessage(msg -> (OrderPlaced) msg.payload());

typed.reduce(new Summary(0, Money.ZERO), (summary, entry) -> {
    OrderPlaced event = entry.value(); // no cast
    return new Summary(summary.count() + 1, summary.total().add(event.amount()));
});

Possible Workarounds

Users currently resort to one or more of the following:

  • Abuse Context — attach non-Message data as a typed resource on Entry's Context and read it back at the consumer. Used internally in the framework for ConsistencyMarker.
  • Create artificial Message subtypes — introduce a marker interface or class hierarchy purely to satisfy the extends Message bound, with no domain meaning.
  • Drain to a List — materialise the stream eagerly into a collection, then process with a standard Java Stream. Loses all async and lazy-evaluation benefits.
  • Convert to Flux — use FluxUtils to bridge to Project Reactor. Introduces a mandatory dependency on Reactor for what should be a plain Java operation.
