Enhancement Description
MessageStream<M extends Message> constrains its element type to subtypes of Message. This is an artificial restriction — the stream's entire implementation (filtering, mapping, concatenating, reducing) is generic and makes no use of any Message-specific method on M. The bound was declared but was never earned.
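Removing the bound, making it MessageStream<M>, turns the abstraction into a true general-purpose async sequential stream, comparable to java.util.stream.Stream<T> or Project Reactor's Flux<T>. Existing code that names its type argument is unaffected: every use of MessageStream<EventMessage> or any other Message subtype remains valid once the upper bound is dropped. In that sense, removing an upper bound is source-compatible. The one case to check is code that relies on the bound implicitly, such as calling Message methods on the element of a MessageStream<?>, because the wildcard would then resolve to Object rather than Message.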
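The compatibility argument can be checked with a small sketch. Everything below is a hypothetical stand-in (Message, EventMessage, Bounded, Unbounded and BoundRemovalSketch are illustrative names, not the framework's types); it only shows how a declaration with and without the bound behaves at the use site.

```java
// Hypothetical stand-ins for illustration only, not the framework's types.
interface Message {
    Object payload();
}

record EventMessage(Object payload) implements Message {}

// Shape of the declaration today: the element type must be a Message.
record Bounded<M extends Message>(M element) {}

// Shape after the change: the bound is gone.
record Unbounded<M>(M element) {}

final class BoundRemovalSketch {

    // An explicitly parameterised use reads the same against both declarations.
    static Object payloadBefore(Bounded<EventMessage> holder) {
        return holder.element().payload();
    }

    static Object payloadAfter(Unbounded<EventMessage> holder) {
        return holder.element().payload();
    }

    // Only the unbounded form accepts a non-Message element type.
    static int lengthOf(Unbounded<String> holder) {
        return holder.element().length();
    }

    // With a wildcard, the declared bound decides the static type of the element:
    // Message for the bounded form, Object for the unbounded one.
    static Message wildcardBefore(Bounded<?> holder) {
        return holder.element();
    }

    static Object wildcardAfter(Unbounded<?> holder) {
        return holder.element();
    }
}
```

The two wildcard methods mark the one place where call sites may need an explicit type argument after the change.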
The only rough edge is Entry.message(), whose name implies M is a Message. This can be resolved by introducing Entry.value() as the canonical name and deprecating message() for a release cycle, giving callers time to migrate.
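One way the rename could look is sketched below. It assumes a simplified Entry with a single accessor; the real MessageStream.Entry also carries a Context, and SimpleEntry is a hypothetical implementation used only for this illustration.

```java
// Hypothetical, simplified stand-in for MessageStream.Entry.
interface Entry<M> {

    /** Canonical accessor: makes no claim that M is a Message. */
    M value();

    /** Legacy accessor, kept for a release cycle so existing callers keep compiling. */
    @Deprecated
    default M message() {
        return value();
    }
}

/** Minimal immutable implementation, assumed for this sketch only. */
record SimpleEntry<M>(M value) implements Entry<M> {}
```

Because message() delegates to value(), implementations only need to provide value(), and callers of the old name see a deprecation warning rather than a compile error.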
Current Behaviour
MessageStream, MessageStream.Entry, MessageStream.Single, and MessageStream.Empty all declare M extends Message. Every derived class in the implementation follows suit. As a result, any mapped or reduced type must itself be a Message subtype:
// map() forces the return type to be a Message subtype
default <RM extends Message> MessageStream<RM> map(Function<Entry<M>, Entry<RM>> mapper)
This forces callers that want to work with non-Message types to work around the constraint.
Wanted Behaviour
MessageStream<M> with no upper bound. The map(), filter(), reduce(), and concatWith() operations become fully general. Callers can express the semantic structure of a stream using ordinary Java types.
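To make the unbounded signatures concrete, here is a minimal list-backed sketch. SketchStream is a hypothetical name; it works on elements directly rather than on Entry, and it is eager, so it illustrates only the type signatures and not the real stream's asynchronous behaviour.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical, list-backed stand-in for MessageStream with no bound on M.
final class SketchStream<M> {

    private final List<M> elements;

    private SketchStream(List<M> elements) {
        this.elements = List.copyOf(elements);
    }

    @SafeVarargs
    static <M> SketchStream<M> of(M... elements) {
        return new SketchStream<>(List.of(elements));
    }

    // No "R extends Message" bound: any result type is allowed.
    <R> SketchStream<R> map(Function<? super M, ? extends R> mapper) {
        List<R> mapped = new ArrayList<>();
        for (M element : elements) {
            mapped.add(mapper.apply(element));
        }
        return new SketchStream<>(mapped);
    }

    SketchStream<M> filter(Predicate<? super M> predicate) {
        List<M> kept = new ArrayList<>();
        for (M element : elements) {
            if (predicate.test(element)) {
                kept.add(element);
            }
        }
        return new SketchStream<>(kept);
    }

    SketchStream<M> concatWith(SketchStream<? extends M> other) {
        List<M> joined = new ArrayList<>(elements);
        joined.addAll(other.elements);
        return new SketchStream<>(joined);
    }

    // Folds all elements into a single result, returned as an already-completed future.
    <R> CompletableFuture<R> reduce(R identity, BiFunction<R, ? super M, R> accumulator) {
        R result = identity;
        for (M element : elements) {
            result = accumulator.apply(result, element);
        }
        return CompletableFuture.completedFuture(result);
    }
}
```

With these signatures a chain such as SketchStream.of("a", "bb").map(String::length).reduce(0, Integer::sum) type-checks without any Message wrapper.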
Example 1 — Event sourcing with a sealed message hierarchy
Today: the framework distinguishes snapshot entries from regular event entries by making SnapshotEventMessage a subtype of EventMessage and pattern-matching on the message type inside reduce():
BiFunction<E, MessageStream.Entry<? extends EventMessage>, E> accumulator =
(entity, entry) -> switch (entry.message()) {
case SnapshotEventMessage sem -> restoreFromSnapshot(sem.payload());
case EventMessage em -> evolver.evolve(entity, em);
};
With this change: define a sealed type that captures the full set of possible stream elements. The stream can be typed directly, and reduce() processes it with a complete, compiler-verified switch:
sealed interface SourcingMessage {
record Snapshot(SnapshotPayload snapshot) implements SourcingMessage {}
record Event(EventMessage event) implements SourcingMessage {}
record Marker(ConsistencyMarker marker) implements SourcingMessage {}
}
MessageStream<SourcingMessage> stream = ...;
stream.reduce(null, (entity, entry) -> switch (entry.value()) {
case SourcingMessage.Snapshot(var s) -> s.entity();
case SourcingMessage.Event(var e) -> evolver.evolve(entity, e);
case SourcingMessage.Marker(var m) -> { recordMarker(m); yield entity; }
});
The compiler enforces exhaustiveness. Adding a new SourcingMessage variant produces a compile error at every switch site, not a silent runtime miss.
Example 2 — ConsistencyMarker via Context vs. a typed stream
Today: ConsistencyMarker cannot be an element of the event stream, so it is communicated as a side-channel resource attached to Entry's Context:
// Reading side — callers must know to look in the context
stream.filter(entry -> entry.getResource(ConsistencyMarker.RESOURCE_KEY) == null)
.reduce(...);
With this change: ConsistencyMarker can be a proper element in the stream alongside events. The filter above becomes unnecessary; the switch in reduce() handles it explicitly and exhaustively.
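A sketch of what that could look like follows. All types are hypothetical stand-ins (the real EventMessage and ConsistencyMarker carry more than one field), and a plain List is folded in place of the stream; the same switch would sit inside reduce() on a typed stream.

```java
import java.util.List;

// Hypothetical stand-ins for illustration only.
record EventMessage(String payload) {}

record ConsistencyMarker(long position) {}

sealed interface SourcingMessage {
    record Event(EventMessage event) implements SourcingMessage {}
    record Marker(ConsistencyMarker marker) implements SourcingMessage {}
}

// State accumulated while folding: the number of events applied and the last marker seen.
record Replay(int eventsApplied, ConsistencyMarker lastMarker) {}

final class MarkerInStreamSketch {

    // Folds the elements in order, starting from an empty state.
    static Replay replay(List<SourcingMessage> elements) {
        Replay state = new Replay(0, null);
        for (SourcingMessage element : elements) {
            state = step(state, element);
        }
        return state;
    }

    // Exhaustive switch with no default branch: a new variant fails to compile here.
    static Replay step(Replay state, SourcingMessage element) {
        return switch (element) {
            case SourcingMessage.Event e -> new Replay(state.eventsApplied() + 1, state.lastMarker());
            case SourcingMessage.Marker m -> new Replay(state.eventsApplied(), m.marker());
        };
    }
}
```

The marker arrives as an ordinary element, so the consumer needs neither a filter nor a lookup in the Context.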
Example 3 — User-space: mapping events to domain objects
Today: a user who wants to map a stream of EventMessage to their own domain type must either drain the stream into a List and switch to a Java Stream, or pull in Project Reactor:
// Option A — drain to list, lose async processing
List<OrderEvent> events = stream
.reduce(new ArrayList<OrderEvent>(), (list, entry) -> { list.add(parse(entry.message())); return list; })
.join();
events.stream().map(this::handle)...;
// Option B — convert to Flux (requires Project Reactor dependency)
FluxUtils.toFlux(stream)
.map(entry -> parse(entry.message()))
.subscribe(...);
With this change: the mapping stays inside MessageStream and remains lazy and asynchronous:
MessageStream<OrderEvent> domainStream = rawStream.map(entry -> entry.map(this::parse));
domainStream.reduce(initialState, (state, entry) -> handler.handle(state, entry.value()));
No extra dependency. No eager materialisation. The stream's async, sequential-processing guarantees are preserved throughout.
Example 4 — User-space: accumulating into a custom result type
Today: the result type of reduce() is unconstrained, but building up to it requires going through Entry<M extends Message> the whole way:
record Summary(int count, Money total) {}
// Works, but entry.message() is always accessed through the Message API
stream.reduce(new Summary(0, Money.ZERO), (summary, entry) -> {
OrderPlaced event = (OrderPlaced) entry.message().payload(); // runtime downcast needed on every entry
return new Summary(summary.count() + 1, summary.total().add(event.amount()));
});
With this change: map once to a typed value, then reduce cleanly without casts:
MessageStream<OrderPlaced> typed = rawStream.mapMessage(msg -> (OrderPlaced) msg.payload());
typed.reduce(new Summary(0, Money.ZERO), (summary, entry) -> {
OrderPlaced event = entry.value(); // no cast
return new Summary(summary.count() + 1, summary.total().add(event.amount()));
});
Possible Workarounds
Users currently resort to one or more of the following:
- Abuse Context: attach non-Message data as a typed resource on Entry's Context and read it back at the consumer. Used internally in the framework for ConsistencyMarker.
- Create artificial Message subtypes: introduce a marker interface or class hierarchy purely to satisfy the extends Message bound, with no domain meaning.
- Drain to a List: materialise the stream eagerly into a collection, then process with a standard Java Stream. Loses all async and lazy-evaluation benefits.
- Convert to Flux: use FluxUtils to bridge to Project Reactor. Introduces a mandatory dependency on Reactor for what should be a plain Java operation.