Skip to content
This repository was archived by the owner on Aug 17, 2020. It is now read-only.

Commit e6c781a

Browse files
committed
Guard against a slow scheduler with backpressure strategy.
Schedulers apply backpressure upstream which previously behaved as an unbounded stream. When the scheduler worker couldn't keep up an MBPE was thrown. We already use the 'latest' strategy for slowing 'Query' emissions after the scheduler so we can apply the same thing before it. This comes with a subtle change in that transactions will now synchronously run every filter Func before continuing. Previously the filters ran on the scheduler and did not block the thread performing the transaction. This means that the more queries that are listening at once, the slower every transaction's completing step will be. I don't expect this to have any meaningful impact since it's very fast and happening on a background thread already.
1 parent 815b3ac commit e6c781a

4 files changed

Lines changed: 55 additions & 12 deletions

File tree

sqlbrite/src/androidTest/java/com/squareup/sqlbrite/BriteContentResolverTest.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.List;
2828
import java.util.Map;
2929
import rx.Subscription;
30+
import rx.internal.util.RxRingBuffer;
3031
import rx.subscriptions.Subscriptions;
3132

3233
import static com.google.common.truth.Truth.assertThat;
@@ -124,7 +125,7 @@ public void testUnsubscribeDoesNotTrigger() {
124125
assertThat(logs).isEmpty();
125126
}
126127

127-
public void testBackpressureSupported() {
128+
public void testBackpressureSupportedWhenConsumerSlow() {
128129
contentResolver.insert(TABLE, values("key1", "val1"));
129130
o.doRequest(2);
130131

@@ -170,6 +171,26 @@ public void testBackpressureSupported() {
170171
o.assertNoMoreEvents();
171172
}
172173

174+
public void testBackpressureSupportedWhenSchedulerSlow() {
175+
subscription = db.createQuery(TABLE, null, null, null, null, false).subscribe(o);
176+
o.assertCursor().isExhausted();
177+
178+
// Switch the scheduler to queue actions.
179+
scheduler.runTasksImmediately(false);
180+
181+
// Shotgun twice as many insertions as the scheduler queue can handle.
182+
for (int i = 0; i < RxRingBuffer.SIZE * 2; i++) {
183+
contentResolver.insert(TABLE, values("key" + i, "val" + i));
184+
}
185+
186+
scheduler.triggerActions();
187+
188+
// Assert we got all the events from the queue plus the one buffered from backpressure.
189+
for (int i = 0; i < RxRingBuffer.SIZE + 1; i++) {
190+
o.assertCursor(); // Ignore contents, just assert we got notified.
191+
}
192+
}
193+
173194
public void testInitialValueAndTriggerUsesScheduler() {
174195
scheduler.runTasksImmediately(false);
175196

sqlbrite/src/androidTest/java/com/squareup/sqlbrite/BriteDatabaseTest.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import rx.Observable;
3838
import rx.Subscription;
3939
import rx.functions.Action1;
40+
import rx.internal.util.RxRingBuffer;
4041

4142
import static android.database.sqlite.SQLiteDatabase.CONFLICT_IGNORE;
4243
import static com.google.common.truth.Truth.assertThat;
@@ -744,7 +745,7 @@ public final class BriteDatabaseTest {
744745
o.assertNoMoreEvents();
745746
}
746747

747-
@Test public void backpressureSupported() {
748+
@Test public void backpressureSupportedWhenConsumerSlow() {
748749
o.doRequest(2);
749750

750751
db.createQuery(TABLE_EMPLOYEE, SELECT_EMPLOYEES).subscribe(o);
@@ -789,6 +790,30 @@ public final class BriteDatabaseTest {
789790
o.assertNoMoreEvents();
790791
}
791792

793+
@Test public void backpressureSupportedWhenSchedulerSlow() {
794+
db.createQuery(TABLE_EMPLOYEE, SELECT_EMPLOYEES).subscribe(o);
795+
o.assertCursor()
796+
.hasRow("alice", "Alice Allison")
797+
.hasRow("bob", "Bob Bobberson")
798+
.hasRow("eve", "Eve Evenson")
799+
.isExhausted();
800+
801+
// Switch the scheduler to queue actions.
802+
scheduler.runTasksImmediately(false);
803+
804+
// Shotgun twice as many insertions as the scheduler queue can handle.
805+
for (int i = 0; i < RxRingBuffer.SIZE * 2; i++) {
806+
db.insert(TABLE_EMPLOYEE, employee("user" + i, "name" + i));
807+
}
808+
809+
scheduler.triggerActions();
810+
811+
// Assert we got all the events from the queue plus the one buffered from backpressure.
812+
for (int i = 0; i < RxRingBuffer.SIZE + 1; i++) {
813+
o.assertCursor(); // Ignore contents, just assert we got notified.
814+
}
815+
}
816+
792817
@Test public void badQueryThrows() {
793818
try {
794819
db.query("SELECT * FROM missing");

sqlbrite/src/main/java/com/squareup/sqlbrite/BriteContentResolver.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,10 @@ public QueryObservable createQuery(@NonNull final Uri uri, @Nullable final Strin
120120
}
121121
};
122122
Observable<Query> queryObservable = Observable.create(subscribe) //
123+
.onBackpressureLatest() // Guard against uncontrollable frequency of upstream emissions.
123124
.startWith(query) //
124125
.observeOn(scheduler) //
125-
.onBackpressureLatest();
126+
.onBackpressureLatest(); // Guard against uncontrollable frequency of scheduler executions.
126127
return new QueryObservable(queryObservable);
127128
}
128129

sqlbrite/src/main/java/com/squareup/sqlbrite/BriteDatabase.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,6 @@
5353
* the result of a query. Create using a {@link SqlBrite} instance.
5454
*/
5555
public final class BriteDatabase implements Closeable {
56-
private static final Set<String> INITIAL = Collections.emptySet();
57-
5856
private final SQLiteOpenHelper helper;
5957
private final SqlBrite.Logger logger;
6058

@@ -247,7 +245,7 @@ public QueryObservable createQuery(@NonNull final String table, @NonNull String
247245
@NonNull String... args) {
248246
Func1<Set<String>, Boolean> tableFilter = new Func1<Set<String>, Boolean>() {
249247
@Override public Boolean call(Set<String> triggers) {
250-
return triggers == INITIAL || triggers.contains(table);
248+
return triggers.contains(table);
251249
}
252250

253251
@Override public String toString() {
@@ -268,9 +266,6 @@ public QueryObservable createQuery(@NonNull final Iterable<String> tables, @NonN
268266
@NonNull String... args) {
269267
Func1<Set<String>, Boolean> tableFilter = new Func1<Set<String>, Boolean>() {
270268
@Override public Boolean call(Set<String> triggers) {
271-
if (triggers == INITIAL) {
272-
return true;
273-
}
274269
for (String table : tables) {
275270
if (triggers.contains(table)) {
276271
return true;
@@ -318,15 +313,16 @@ private QueryObservable createQuery(final Func1<Set<String>, Boolean> tableFilte
318313
};
319314

320315
Observable<Query> queryObservable = triggers //
321-
.startWith(INITIAL) // Immediately trigger the query for initial value.
322-
.observeOn(scheduler) //
323316
.filter(tableFilter) // Only trigger on tables we care about.
324317
.map(new Func1<Set<String>, Query>() {
325318
@Override public Query call(Set<String> trigger) {
326319
return query;
327320
}
328321
}) //
329-
.onBackpressureLatest() //
322+
.onBackpressureLatest() // Guard against uncontrollable frequency of upstream emissions.
323+
.startWith(query) //
324+
.observeOn(scheduler) //
325+
.onBackpressureLatest() // Guard against uncontrollable frequency of scheduler executions.
330326
.doOnSubscribe(new Action0() {
331327
@Override public void call() {
332328
if (transactions.get() != null) {

0 commit comments

Comments
 (0)