@@ -53,9 +53,9 @@ public abstract class AbstractGroupNode<InTuple_ extends Tuple, OutTuple_ extend
5353 private final boolean useAssertingGroupKey ;
5454
5555 protected AbstractGroupNode (int groupStoreIndex , Function <InTuple_ , GroupKey_ > groupKeyFunction ,
56- Supplier <ResultContainer_ > supplier ,
57- Function <ResultContainer_ , Result_ > finisher ,
58- TupleLifecycle <OutTuple_ > nextNodesTupleLifecycle , EnvironmentMode environmentMode ) {
56+ Supplier <ResultContainer_ > supplier ,
57+ Function <ResultContainer_ , Result_ > finisher ,
58+ TupleLifecycle <OutTuple_ > nextNodesTupleLifecycle , EnvironmentMode environmentMode ) {
5959 this .groupStoreIndex = groupStoreIndex ;
6060 this .groupKeyFunction = groupKeyFunction ;
6161 this .supplier = supplier ;
@@ -80,8 +80,8 @@ protected AbstractGroupNode(int groupStoreIndex, Function<InTuple_, GroupKey_> g
8080 }
8181
8282 protected AbstractGroupNode (int groupStoreIndex ,
83- Function <InTuple_ , GroupKey_ > groupKeyFunction , TupleLifecycle <OutTuple_ > nextNodesTupleLifecycle ,
84- EnvironmentMode environmentMode ) {
83+ Function <InTuple_ , GroupKey_ > groupKeyFunction , TupleLifecycle <OutTuple_ > nextNodesTupleLifecycle ,
84+ EnvironmentMode environmentMode ) {
8585 this (groupStoreIndex ,
8686 groupKeyFunction , null , null , nextNodesTupleLifecycle ,
8787 environmentMode );
@@ -95,33 +95,35 @@ public StreamKind getStreamKind() {
9595 @ Override
9696 public final void insert (InTuple_ tuple ) {
9797 if (tuple .getStore (groupStoreIndex ) != null ) {
98- throw new IllegalStateException ("Impossible state: the input for the tuple (" + tuple
99- + ") was already added in the tupleStore." );
98+ throw new IllegalStateException (
99+ "Impossible state: the input for the tuple (%s) was already added in the tupleStore."
100+ .formatted (tuple ));
100101 }
101102 var userSuppliedKey = hasMultipleGroups ? groupKeyFunction .apply (tuple ) : null ;
102103 createTuple (tuple , userSuppliedKey );
103104 }
104105
105106 private void createTuple (InTuple_ tuple , GroupKey_ userSuppliedKey ) {
106- var newGroup = getOrCreateGroup (userSuppliedKey );
107- var outTuple = accumulate (tuple , newGroup );
107+ var group = getOrCreateGroup (userSuppliedKey );
108+ var needsPropagation = group .parentCount == 1 ; // Fresh outTuple.
109+ if (hasCollector ) {
110+ needsPropagation = groupInsert (group .getResultContainer (), tuple ) || needsPropagation ;
111+ }
112+ tuple .setStore (groupStoreIndex , group );
113+ if (!needsPropagation ) {
114+ return ;
115+ }
116+ var outTuple = group .getTuple ();
108117 switch (outTuple .getState ()) {
109118 case CREATING , UPDATING -> {
110119 // Already in the correct state.
111120 }
112- case OK , DYING -> propagationQueue .update (newGroup );
113- case ABORTING -> propagationQueue .insert (newGroup );
114- default -> throw new IllegalStateException ("Impossible state: The group (" + newGroup + ") in node (" + this
115- + ") is in an unexpected state (" + outTuple .getState () + ")." );
116- }
117- }
118-
119- private OutTuple_ accumulate (InTuple_ tuple , Group <OutTuple_ , ResultContainer_ > group ) {
120- if (hasCollector ) {
121- groupInsert (group .getResultContainer (), tuple );
121+ case OK , DYING -> propagationQueue .update (group );
122+ case ABORTING -> propagationQueue .insert (group );
123+ default -> throw new IllegalStateException (
124+ "Impossible state: The group (%s) in node (%s) is in an unexpected state (%s)."
125+ .formatted (group , this , outTuple .getState ()));
122126 }
123- tuple .setStore (groupStoreIndex , group );
124- return group .getTuple ();
125127 }
126128
127129 private Group <OutTuple_ , ResultContainer_ > getOrCreateGroup (GroupKey_ userSuppliedKey ) {
@@ -150,16 +152,17 @@ private Group<OutTuple_, ResultContainer_> createGroupWithGroupKey(Object groupM
150152 var userSuppliedKey = extractUserSuppliedKey (groupMapKey );
151153 var outTuple = createOutTuple (userSuppliedKey );
152154 var group = hasCollector ? Group .create (groupMapKey , supplier .get (), outTuple )
153- : Group .<OutTuple_ , ResultContainer_ > createWithoutAccumulate (groupMapKey , outTuple );
155+ : Group .<OutTuple_ , ResultContainer_ >createWithoutAccumulate (groupMapKey , outTuple );
154156 propagationQueue .insert (group );
155157 return group ;
156158 }
157159
158160 private Group <OutTuple_ , ResultContainer_ > createGroupWithoutGroupKey () {
159161 var outTuple = createOutTuple (null );
160162 if (!hasCollector ) {
161- throw new IllegalStateException ("Impossible state: The node (" + this + ") has no collector, "
162- + "but it is still trying to create a group without a group key." );
163+ throw new IllegalStateException (
164+ "Impossible state: The node (%s) has no collector, but it is still trying to create a group without a group key."
165+ .formatted (this ));
163166 }
164167 var group = Group .createWithoutGroupKey (supplier .get (), outTuple );
165168 propagationQueue .insert (group );
@@ -188,10 +191,12 @@ public final void update(InTuple_ tuple) {
188191 if (Objects .equals (oldUserSuppliedGroupKey , newUserSuppliedGroupKey )) {
189192 updateGroup (tuple , oldGroup );
190193 } else {
194+ var needsPropagation = false ;
191195 if (hasCollector ) {
192- groupRetract (oldGroup . getResultContainer (), tuple );
196+ needsPropagation = groupRetract (tuple );
193197 }
194- killTuple (oldGroup );
198+ var newParentCount = --oldGroup .parentCount ;
199+ killOutTuple (oldGroup , newParentCount == 0 , needsPropagation );
195200 createTuple (tuple , newUserSuppliedGroupKey );
196201 }
197202 }
@@ -204,28 +209,38 @@ private void updateGroup(InTuple_ tuple, Group<OutTuple_, ResultContainer_> oldG
204209 return ;
205210 }
206211 }
212+ // TODO if it has no collectors, maybe skip the propagation?
207213 var outTuple = oldGroup .getTuple ();
208214 switch (outTuple .getState ()) {
209215 case CREATING , UPDATING -> {
210216 // Already in the correct state.
211217 }
212218 case OK -> propagationQueue .update (oldGroup );
213- default ->
214- throw new IllegalStateException ( "Impossible state: The group (%s) in node (%s) is in an unexpected state (%s)."
215- .formatted (oldGroup , this , outTuple .getState ()));
219+ default -> throw new IllegalStateException (
220+ "Impossible state: The group (%s) in node (%s) is in an unexpected state (%s)."
221+ .formatted (oldGroup , this , outTuple .getState ()));
216222 }
217223 }
218224
219- private void killTuple (Group <OutTuple_ , ResultContainer_ > group ) {
220- var newParentCount = --group .parentCount ;
221- var killGroup = (newParentCount == 0 );
225+ /**
226+ *
227+ * @param group the group which created the outTuple
228+ * @param killGroup true if the group should be removed from downstream nodes
229+ * @param propagateUpdate may be true while killGroup is also true;
230+ * propagating updates is irrelevant if the group is already being killed
231+ */
232+ private void killOutTuple (Group <OutTuple_ , ResultContainer_ > group , boolean killGroup , boolean propagateUpdate ) {
233+ if (!killGroup && !propagateUpdate ) { // Nothing has changed.
234+ return ;
235+ }
222236 if (killGroup ) {
223237 var groupKey = hasMultipleGroups ? group .getGroupKey () : null ;
224238 var oldGroup = removeGroup (groupKey );
225239 if (oldGroup == null ) {
226- throw new IllegalStateException ("Impossible state: the group for the groupKey ("
227- + groupKey + ") doesn't exist in the groupMap.\n " +
228- "Maybe groupKey hashcode changed while it shouldn't have?" );
240+ throw new IllegalStateException ("""
241+ Impossible state: the group for the groupKey (%s) doesn't exist in the groupMap.
242+ Maybe groupKey hashcode changed while it shouldn't have?"""
243+ .formatted (groupKey ));
229244 }
230245 }
231246 var outTuple = group .getTuple ();
@@ -247,8 +262,9 @@ private void killTuple(Group<OutTuple_, ResultContainer_> group) {
247262 propagationQueue .update (group );
248263 }
249264 }
250- default -> throw new IllegalStateException ("Impossible state: The group (" + group + ") in node (" + this
251- + ") is in an unexpected state (" + outTuple .getState () + ")." );
265+ default -> throw new IllegalStateException (
266+ "Impossible state: The group (%s) in node (%s) is in an unexpected state (%s)."
267+ .formatted (group , this , outTuple .getState ()));
252268 }
253269 }
254270
@@ -269,21 +285,23 @@ public final void retract(InTuple_ tuple) {
269285 // No fail fast if null because we don't track which tuples made it through the filter predicate(s)
270286 return ;
271287 }
288+ var needsPropagation = false ;
272289 if (hasCollector ) {
273- groupRetract (group . getResultContainer (), tuple );
290+ needsPropagation = groupRetract (tuple );
274291 }
275- killTuple (group );
292+ var newParentCount = --group .parentCount ;
293+ killOutTuple (group , newParentCount == 0 , needsPropagation );
276294 }
277295
278- protected abstract void groupInsert (ResultContainer_ resultContainer , InTuple_ tuple );
296+ protected abstract boolean groupInsert (ResultContainer_ resultContainer , InTuple_ tuple );
279297
280298 protected boolean groupUpdate (ResultContainer_ resultContainer , InTuple_ tuple ) {
281- groupRetract (resultContainer , tuple );
282- groupInsert (resultContainer , tuple );
283- return true ;
299+ var retractNeedsPropagation = groupRetract (tuple );
300+ var insertNeedsPropagation = groupInsert (resultContainer , tuple );
301+ return retractNeedsPropagation || insertNeedsPropagation ;
284302 }
285303
286- protected abstract void groupRetract (ResultContainer_ resultContainer , InTuple_ tuple );
304+ protected abstract boolean groupRetract (InTuple_ tuple );
287305
288306 @ Override
289307 public Propagator getPropagator () {
0 commit comments