Skip to content
This repository was archived by the owner on Aug 17, 2020. It is now read-only.

Commit 3572936

Browse files
committed
Merge pull request #97 from square/jw/slow-scheduler
Guard against a slow scheduler with backpressure strategy.
2 parents 815b3ac + e6c781a commit 3572936

4 files changed

Lines changed: 55 additions & 12 deletions

File tree

sqlbrite/src/androidTest/java/com/squareup/sqlbrite/BriteContentResolverTest.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.List;
2828
import java.util.Map;
2929
import rx.Subscription;
30+
import rx.internal.util.RxRingBuffer;
3031
import rx.subscriptions.Subscriptions;
3132

3233
import static com.google.common.truth.Truth.assertThat;
@@ -124,7 +125,7 @@ public void testUnsubscribeDoesNotTrigger() {
124125
assertThat(logs).isEmpty();
125126
}
126127

127-
public void testBackpressureSupported() {
128+
public void testBackpressureSupportedWhenConsumerSlow() {
128129
contentResolver.insert(TABLE, values("key1", "val1"));
129130
o.doRequest(2);
130131

@@ -170,6 +171,26 @@ public void testBackpressureSupported() {
170171
o.assertNoMoreEvents();
171172
}
172173

174+
public void testBackpressureSupportedWhenSchedulerSlow() {
175+
subscription = db.createQuery(TABLE, null, null, null, null, false).subscribe(o);
176+
o.assertCursor().isExhausted();
177+
178+
// Switch the scheduler to queue actions.
179+
scheduler.runTasksImmediately(false);
180+
181+
// Shotgun twice as many insertions as the scheduler queue can handle.
182+
for (int i = 0; i < RxRingBuffer.SIZE * 2; i++) {
183+
contentResolver.insert(TABLE, values("key" + i, "val" + i));
184+
}
185+
186+
scheduler.triggerActions();
187+
188+
// Assert we got all the events from the queue plus the one buffered from backpressure.
189+
for (int i = 0; i < RxRingBuffer.SIZE + 1; i++) {
190+
o.assertCursor(); // Ignore contents, just assert we got notified.
191+
}
192+
}
193+
173194
public void testInitialValueAndTriggerUsesScheduler() {
174195
scheduler.runTasksImmediately(false);
175196

sqlbrite/src/androidTest/java/com/squareup/sqlbrite/BriteDatabaseTest.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import rx.Observable;
3838
import rx.Subscription;
3939
import rx.functions.Action1;
40+
import rx.internal.util.RxRingBuffer;
4041

4142
import static android.database.sqlite.SQLiteDatabase.CONFLICT_IGNORE;
4243
import static com.google.common.truth.Truth.assertThat;
@@ -744,7 +745,7 @@ public final class BriteDatabaseTest {
744745
o.assertNoMoreEvents();
745746
}
746747

747-
@Test public void backpressureSupported() {
748+
@Test public void backpressureSupportedWhenConsumerSlow() {
748749
o.doRequest(2);
749750

750751
db.createQuery(TABLE_EMPLOYEE, SELECT_EMPLOYEES).subscribe(o);
@@ -789,6 +790,30 @@ public final class BriteDatabaseTest {
789790
o.assertNoMoreEvents();
790791
}
791792

793+
@Test public void backpressureSupportedWhenSchedulerSlow() {
794+
db.createQuery(TABLE_EMPLOYEE, SELECT_EMPLOYEES).subscribe(o);
795+
o.assertCursor()
796+
.hasRow("alice", "Alice Allison")
797+
.hasRow("bob", "Bob Bobberson")
798+
.hasRow("eve", "Eve Evenson")
799+
.isExhausted();
800+
801+
// Switch the scheduler to queue actions.
802+
scheduler.runTasksImmediately(false);
803+
804+
// Shotgun twice as many insertions as the scheduler queue can handle.
805+
for (int i = 0; i < RxRingBuffer.SIZE * 2; i++) {
806+
db.insert(TABLE_EMPLOYEE, employee("user" + i, "name" + i));
807+
}
808+
809+
scheduler.triggerActions();
810+
811+
// Assert we got all the events from the queue plus the one buffered from backpressure.
812+
for (int i = 0; i < RxRingBuffer.SIZE + 1; i++) {
813+
o.assertCursor(); // Ignore contents, just assert we got notified.
814+
}
815+
}
816+
792817
@Test public void badQueryThrows() {
793818
try {
794819
db.query("SELECT * FROM missing");

sqlbrite/src/main/java/com/squareup/sqlbrite/BriteContentResolver.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,10 @@ public QueryObservable createQuery(@NonNull final Uri uri, @Nullable final Strin
120120
}
121121
};
122122
Observable<Query> queryObservable = Observable.create(subscribe) //
123+
.onBackpressureLatest() // Guard against uncontrollable frequency of upstream emissions.
123124
.startWith(query) //
124125
.observeOn(scheduler) //
125-
.onBackpressureLatest();
126+
.onBackpressureLatest(); // Guard against uncontrollable frequency of scheduler executions.
126127
return new QueryObservable(queryObservable);
127128
}
128129

sqlbrite/src/main/java/com/squareup/sqlbrite/BriteDatabase.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,6 @@
5353
* the result of a query. Create using a {@link SqlBrite} instance.
5454
*/
5555
public final class BriteDatabase implements Closeable {
56-
private static final Set<String> INITIAL = Collections.emptySet();
57-
5856
private final SQLiteOpenHelper helper;
5957
private final SqlBrite.Logger logger;
6058

@@ -247,7 +245,7 @@ public QueryObservable createQuery(@NonNull final String table, @NonNull String
247245
@NonNull String... args) {
248246
Func1<Set<String>, Boolean> tableFilter = new Func1<Set<String>, Boolean>() {
249247
@Override public Boolean call(Set<String> triggers) {
250-
return triggers == INITIAL || triggers.contains(table);
248+
return triggers.contains(table);
251249
}
252250

253251
@Override public String toString() {
@@ -268,9 +266,6 @@ public QueryObservable createQuery(@NonNull final Iterable<String> tables, @NonN
268266
@NonNull String... args) {
269267
Func1<Set<String>, Boolean> tableFilter = new Func1<Set<String>, Boolean>() {
270268
@Override public Boolean call(Set<String> triggers) {
271-
if (triggers == INITIAL) {
272-
return true;
273-
}
274269
for (String table : tables) {
275270
if (triggers.contains(table)) {
276271
return true;
@@ -318,15 +313,16 @@ private QueryObservable createQuery(final Func1<Set<String>, Boolean> tableFilte
318313
};
319314

320315
Observable<Query> queryObservable = triggers //
321-
.startWith(INITIAL) // Immediately trigger the query for initial value.
322-
.observeOn(scheduler) //
323316
.filter(tableFilter) // Only trigger on tables we care about.
324317
.map(new Func1<Set<String>, Query>() {
325318
@Override public Query call(Set<String> trigger) {
326319
return query;
327320
}
328321
}) //
329-
.onBackpressureLatest() //
322+
.onBackpressureLatest() // Guard against uncontrollable frequency of upstream emissions.
323+
.startWith(query) //
324+
.observeOn(scheduler) //
325+
.onBackpressureLatest() // Guard against uncontrollable frequency of scheduler executions.
330326
.doOnSubscribe(new Action0() {
331327
@Override public void call() {
332328
if (transactions.get() != null) {

0 commit comments

Comments
 (0)