11package org .stt .command ;
22
3+ import net .engio .mbassy .bus .BusRuntime ;
4+ import net .engio .mbassy .bus .IMessagePublication ;
35import net .engio .mbassy .bus .MBassador ;
6+ import net .engio .mbassy .bus .common .PubSubSupport ;
47import org .stt .model .ItemDeleted ;
58import org .stt .model .ItemInserted ;
69import org .stt .model .ItemReplaced ;
710import org .stt .model .TimeTrackingItem ;
811import org .stt .persistence .ItemPersister ;
912import org .stt .query .Criteria ;
1013import org .stt .query .TimeTrackingItemQueries ;
14+ import org .stt .time .DateTimes ;
1115
1216import javax .inject .Inject ;
1317import javax .inject .Singleton ;
2327public class Activities implements CommandHandler {
2428 private ItemPersister persister ;
2529 private final TimeTrackingItemQueries queries ;
26- private Optional < MBassador < Object >> eventBus ;
30+ private final PubSubSupport < Object > publisher ;
2731
2832 @ Inject
2933 public Activities (ItemPersister persister ,
3034 TimeTrackingItemQueries queries ,
31- Optional <MBassador <Object >> eventBus ) {
32- this .eventBus = requireNonNull (eventBus );
35+ Optional <MBassador <Object >> publisher ) {
36+ this .publisher = requireNonNull (publisher ).map (PubSubSupport .class ::cast )
37+ .orElseGet (DoNotPublish ::new );
3338 this .persister = requireNonNull (persister );
3439 this .queries = requireNonNull (queries );
3540 }
@@ -46,10 +51,10 @@ public void addNewActivity(NewActivity command) {
4651 if (potentialItemToReplace .isPresent ()) {
4752 TimeTrackingItem itemToReplace = potentialItemToReplace .get ();
4853 persister .replace (itemToReplace , command .newItem );
49- eventBus . ifPresent ( eb -> eb . publish (new ItemReplaced (itemToReplace , command .newItem ) ));
54+ publisher . publish (new ItemReplaced (itemToReplace , command .newItem ));
5055 } else {
5156 persister .persist (command .newItem );
52- eventBus . ifPresent ( eb -> eb . publish (new ItemInserted (command .newItem ) ));
57+ publisher . publish (new ItemInserted (command .newItem ));
5358 }
5459 }
5560
@@ -74,15 +79,15 @@ public void endCurrentActivity(EndCurrentItem command) {
7479 .ifPresent (item -> {
7580 TimeTrackingItem derivedItem = item .withEnd (command .endAt );
7681 persister .replace (item , derivedItem );
77- eventBus . ifPresent ( eb -> eb . publish (new ItemReplaced (item , derivedItem ) ));
82+ publisher . publish (new ItemReplaced (item , derivedItem ));
7883 });
7984 }
8085
8186 @ Override
8287 public void removeActivity (RemoveActivity command ) {
8388 requireNonNull (command );
8489 persister .delete (command .itemToDelete );
85- eventBus . ifPresent ( eb -> eb . publish (new ItemDeleted (command .itemToDelete ) ));
90+ publisher . publish (new ItemDeleted (command .itemToDelete ));
8691 }
8792
8893 @ Override
@@ -96,11 +101,13 @@ public void removeActivityAndCloseGap(RemoveActivity command) {
96101 TimeTrackingItem replaceAllWith = next .get ().getEnd ()
97102 .map (previous .get ()::withEnd ).orElse (previous .get ().withPendingEnd ());
98103 persister .persist (replaceAllWith );
99- eventBus .ifPresent (eb -> eb .publish (new ItemInserted (replaceAllWith )));
100- } else if (previous .isPresent () && !command .itemToDelete .getEnd ().isPresent ()) {
104+ publisher .publish (new ItemInserted (replaceAllWith ));
105+ } else if (previous .isPresent ()
106+ && DateTimes .isOnSameDay (previous .get ().getStart (), command .itemToDelete .getStart ())
107+ && !command .itemToDelete .getEnd ().isPresent ()) {
101108 TimeTrackingItem replaceAllWith = previous .get ().withPendingEnd ();
102109 persister .persist (replaceAllWith );
103- eventBus . ifPresent ( eb -> eb . publish (new ItemInserted (replaceAllWith ) ));
110+ publisher . publish (new ItemInserted (replaceAllWith ));
104111 } else {
105112 removeActivity (command );
106113 }
@@ -118,7 +125,7 @@ public void resumeActivity(ResumeActivity command) {
118125 .withPendingEnd ()
119126 .withStart (command .beginningWith );
120127 persister .persist (resumedItem );
121- eventBus . ifPresent ( eb -> eb . publish (new ItemInserted (resumedItem ) ));
128+ publisher . publish (new ItemInserted (resumedItem ));
122129 }
123130
124131 @ Override
@@ -130,7 +137,7 @@ public void resumeLastActivity(ResumeLastActivity command) {
130137 lastTimeTrackingItem .ifPresent (timeTrackingItem -> {
131138 TimeTrackingItem resumedItem = timeTrackingItem .withPendingEnd ().withStart (command .resumeAt );
132139 persister .persist (resumedItem );
133- eventBus . ifPresent ( eb -> eb . publish (new ItemInserted (resumedItem ) ));
140+ publisher . publish (new ItemInserted (resumedItem ));
134141 });
135142 }
136143 }
@@ -140,8 +147,30 @@ public void bulkChangeActivity(Collection<TimeTrackingItem> itemsToChange, Strin
140147 requireNonNull (itemsToChange );
141148 requireNonNull (activity );
142149 Collection <ItemPersister .UpdatedItem > updatedItems = persister .updateActivitities (itemsToChange , activity );
143- eventBus . ifPresent ( eb -> updatedItems .stream ()
150+ updatedItems .stream ()
144151 .map (updatedItem -> new ItemReplaced (updatedItem .original , updatedItem .updated ))
145- .forEach (eb ::publish ));
152+ .forEach (publisher ::publish );
153+ }
154+
155+ private static class DoNotPublish implements PubSubSupport <Object > {
156+ @ Override
157+ public void subscribe (Object listener ) {
158+ throw new IllegalStateException ();
159+ }
160+
161+ @ Override
162+ public boolean unsubscribe (Object listener ) {
163+ throw new IllegalStateException ();
164+ }
165+
166+ @ Override
167+ public IMessagePublication publish (Object message ) {
168+ return null ;
169+ }
170+
171+ @ Override
172+ public BusRuntime getRuntime () {
173+ throw new IllegalStateException ();
174+ }
146175 }
147176}
0 commit comments