-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathOttoEventProcessor.java
More file actions
executable file
·309 lines (285 loc) · 11 KB
/
OttoEventProcessor.java
File metadata and controls
executable file
·309 lines (285 loc) · 11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
/*
* Copyright (C) 2016 Sysdata Digital, S.r.l.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.baseandroid.events.otto;
import com.baseandroid.events.Event;
import com.baseandroid.events.EventProcessor;
import com.squareup.otto.Bus;
import com.squareup.otto.DeadEvent;
import com.squareup.otto.Subscribe;
import com.squareup.otto.ThreadEnforcer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import rx.android.schedulers.AndroidSchedulers;
import rx.schedulers.Schedulers;
/**
* Class managing the events used throughout the application.
* <p>
* This class defines two {@link Bus}es:
* <ul>
* <li>{@link #BUS}: a Bus working on a thread separated by the Android Main Thread</li>
* <li>{@link #UI_BUS}: a Bus working on the Android Main Thread</li>
* </ul>
* <br>
* <p>
* Events are stored in queues depending on their {@link Event.Type} and these queues are checked periodically in a specific order:
* <ol>
* <li><code>UI</code></li>
* <li><code>NETWORK</code></li>
* <li><code>DATA</code></li>
* <li><code>GENERIC</code></li>
* <li><code>CONTEXT</code></li>
* </ol>
* <br>
* <p>
* If an Event is not caught by anybody, it will eventually be caught by the {@link DeadEventManager},
* which will do nothing more than log that an event has not been managed.
* </p>
*
* @author Stefano Ciarcia'
* created on 22/07/2015.
*/
public final class OttoEventProcessor implements EventProcessor {
/**
* Bus working on the Android Main Thread
*/
public static final Bus UI_BUS = new MainThreadBus();
private static final Logger LOGGER = LoggerFactory.getLogger(OttoEventProcessor.class);
/**
* Defines the time interval (in milliseconds) used to poll the event queues
*/
private static final long EVENT_CONSUMPTION_INTERVAL = 10; // 10ms
/**
* Bus working on a thread separated by the Android Main Thread
*/
private static final Bus BUS = new Bus(ThreadEnforcer.ANY);
/**
* flag used to determine whether this EventDispatcher has been initialised
*/
private static boolean mInitialised = false;
/**
* Synchronized Queues of <code>NETWORK</code> events
*/
private static List<Object> mNetworkEvents;
/**
* Synchronized Queues of <code>DATA</code> events
*/
private static List<Object> mDataEvents;
/**
* Synchronized Queues of <code>GENERIC</code> events
*/
private static List<Object> mGenericEvents;
/**
* Synchronized Queues of <code>UI</code> events
*/
private static List<Object> mUIEvents;
/**
* Synchronized Queues of <code>CONTEXT</code> events
*/
private static List<Object> mContextEvents;
DeadEventManager mDeadEventManager;
private OttoEventProcessor() {
// No instances.
}
public static EventProcessor newInstance() {
return new OttoEventProcessor();
}
/**
* This method starts some kind of polling of the events queues.
* <p>
* Once every 10 ms (or whatever is specified by the {@link #EVENT_CONSUMPTION_INTERVAL} field in this very class)
* the queues will be checked for the presence of an event of some sort.
* </p>
* <p>
* The UI events will be managed on their own, while the other events will be checked in the following order:
* <ol>
* <li>Network Events</li>
* <li>Data Events</li>
* <li>Generic Events</li>
* <li>Context Events</li>
* </ol>
* Moreover, since the Events are ordered by Priority, the more urgent events of each queue will be processed in turn
* <p>
* If an event is present, it will be posted on the related bus to be processed.
*/
private static void startEventsConsumption() {
LOGGER.info("starting UI events consumption processors");
/*
* This Observable (Observable.interval) cycles every EVENT_CONSUMPTION_INTERVAL millis
* we observe results on a new Thread and remove items from non-UI queues
*/
rx.Observable.interval(EVENT_CONSUMPTION_INTERVAL, TimeUnit.MILLISECONDS)
.onBackpressureBuffer()
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(aLong1 -> {
if (!mNetworkEvents.isEmpty()) {
Object ev = mNetworkEvents.remove(0);
logEvent(ev, false);
BUS.post(ev);
} else if (!mDataEvents.isEmpty()) {
Object ev = mDataEvents.remove(0);
logEvent(ev, false);
BUS.post(ev);
} else if (!mGenericEvents.isEmpty()) {
Object ev = mGenericEvents.remove(0);
logEvent(ev, false);
BUS.post(ev);
} else if (!mContextEvents.isEmpty()) {
Object ev = mContextEvents.remove(0);
logEvent(ev, false);
BUS.post(ev);
}
});
/*
* This Observable (Observable.interval) cycles every EVENT_CONSUMPTION_INTERVAL millis
* we observe results on the UI Thread and remove items UI queues
*/
rx.Observable.interval(EVENT_CONSUMPTION_INTERVAL, TimeUnit.MILLISECONDS)
.onBackpressureBuffer()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())//UI Events must be posted on the Main Thread
.subscribe(aLong -> {
if (!mUIEvents.isEmpty()) {
Object ev = mUIEvents.remove(0);
logEvent(ev, true);
UI_BUS.post(ev);
}
});
}
/**
* Convenience method used for debugging purposes. It will log what kind of event is being posted on what Bus.
*
* @param ev The event being posted
* @param uiEvent whether it's a UI event or not.
*/
private static void logEvent(Object ev, boolean uiEvent) {
String bus;
if (uiEvent) {
bus = "UI_BUS";
} else {
bus = "BUS";
}
LOGGER.info("posting " + ev.getClass().getSimpleName() + " of type " + ev.getClass().getAnnotation(Event.class).type() + " on " + bus);
}
/**
* Registers a given Object on both Buses
*
* @param o Object to register on the Buses
*/
public void onRegister(Object o) {
BUS.register(o);
UI_BUS.register(o);
}
/**
* Unregisters a given Object from both Buses
*
* @param o Object to unregister from the Buses
*/
public void onUnregister(Object o) {
BUS.unregister(o);
UI_BUS.unregister(o);
}
/**
* After checking whether the object being posted is annotated with the {@link Event} Annotation,
* it will be placed in the corresponding queue and such queue will be then sorted by {@link Event.Priority}.
* <p>
* <b>NOTE: if an object being posted has not been annotated with the {@link Event} Annotation it will be disregarded!!!</b>
* </p>
*
* @param o the Object we want to post as an event
*/
public void onPost(Object o) {
LOGGER.info("received new object to post: " + o.getClass().getSimpleName());
if (!mInitialised) {
//init events lists implicetely synchronized
mNetworkEvents = Collections.synchronizedList(new ArrayList<>());
mGenericEvents = Collections.synchronizedList(new ArrayList<>());
mDataEvents = Collections.synchronizedList(new ArrayList<>());
mUIEvents = Collections.synchronizedList(new ArrayList<>());
mContextEvents = Collections.synchronizedList(new ArrayList<>());
//init dead events manager
if(mDeadEventManager == null) {
mDeadEventManager = new DeadEventManager();
} else {
BUS.unregister(mDeadEventManager);
UI_BUS.unregister(mDeadEventManager);
}
BUS.register(mDeadEventManager);
UI_BUS.register(mDeadEventManager);
//mark us as initialised
mInitialised = true;
startEventsConsumption();
}
//check if it's an event we recognise
if (o != null && o.getClass().isAnnotationPresent(Event.class)) {
//put it in the right list and sort the list
Event.Type t = o.getClass().getAnnotation(Event.class).type();
LOGGER.info("object " + o.getClass().getSimpleName() + " is an event of type " + t);
switch (t) {
default:
case GENERIC:
mGenericEvents.add(o);
Collections.sort(mGenericEvents, Event.COMPARATOR);
break;
case DATA:
mDataEvents.add(o);
Collections.sort(mDataEvents, Event.COMPARATOR);
break;
case NETWORK:
mNetworkEvents.add(o);
Collections.sort(mNetworkEvents, Event.COMPARATOR);
break;
case UI:
mUIEvents.add(o);
Collections.sort(mUIEvents, Event.COMPARATOR);
break;
case CONTEXT:
mContextEvents.add(o);
Collections.sort(mContextEvents, Event.COMPARATOR);
break;
}
} else if (o != null) {
DeadEvent deadEvent = new DeadEvent(BUS, o);
BUS.post(deadEvent);
}
}
@Override
public String onSavePoint(Object object) {
return null;
}
@Override
public void onLoadPoint(Object object, String key) {
}
/**
* If an event is posted but nobody is {@link Subscribe}d to it, it will become a "dead" event.
* This class is here to catch and log such occurrences, since this should never happen. This is very useful to debug possible leaks.
*/
private static class DeadEventManager {
/**
* Catches every {@link DeadEvent} not being consumed by nobody.
*
* @param de the event that nobody caught
*/
@Subscribe
public void onConsumeDeadEvent(DeadEvent de) {
LOGGER.warn("received DeadEvent of type " + de.event.getClass().getSimpleName());
}
}
}