-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathRxFirebase.java
More file actions
26 lines (25 loc) · 1.14 KB
/
RxFirebase.java
File metadata and controls
26 lines (25 loc) · 1.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
 * Exposes the value events of a Firebase {@link Query} as an {@link Observable}.
 *
 * <p>When the emitter callback runs, a {@link ValueEventListener} is attached to
 * {@code query}. Each {@code onDataChange} snapshot is forwarded through {@code onNext}.
 * An {@code onCancelled} callback is signalled by emitting {@code null} through
 * {@code onNext}; the {@link DatabaseError} is not passed on, and neither {@code onError}
 * nor {@code onCompleted} is called from this method.
 *
 * <p>The emitter's cancellation hook removes the listener from {@code query}. Emissions are
 * buffered ({@code Emitter.BackpressureMode.BUFFER}) and observed on
 * {@code Schedulers.computation()}.
 *
 * <p>NOTE(review): {@code @Nullable} is attached to the method even though it always returns
 * an observable; presumably it is meant to warn that emitted items may be {@code null} —
 * confirm with callers.
 *
 * @param query the query to attach the value listener to
 * @return an observable of snapshots, with {@code null} emitted when the listener is cancelled
 */
@Nullable
public static Observable<DataSnapshot> observe(final Query query) {
    final Action1<Emitter<DataSnapshot>> listenerBridge = new Action1<Emitter<DataSnapshot>>() {
        @Override
        public void call(final Emitter<DataSnapshot> emitter) {
            final ValueEventListener forwarder = new ValueEventListener() {
                @Override
                public void onDataChange(DataSnapshot dataSnapshot) {
                    emitter.onNext(dataSnapshot);
                }

                @Override
                public void onCancelled(DatabaseError databaseError) {
                    // Cancellation is reported as a null item rather than as an error.
                    emitter.onNext(null);
                }
            };

            // Keep the reference handed back by the query; that is what gets removed later.
            final ValueEventListener registered = query.addValueEventListener(forwarder);

            final Cancellable detachListener = new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    query.removeEventListener(registered);
                }
            };
            emitter.setCancellation(detachListener);
        }
    };

    final Observable<DataSnapshot> snapshots =
            Observable.create(listenerBridge, Emitter.BackpressureMode.BUFFER);
    return snapshots.observeOn(Schedulers.computation());
}