1717
1818import static org .junit .Assert .assertEquals ;
1919import static org .junit .Assert .assertFalse ;
20+ import static org .junit .Assert .assertSame ;
2021import static org .junit .Assert .assertTrue ;
2122import static org .junit .Assert .fail ;
2223import static org .mockito .Matchers .any ;
@@ -274,10 +275,18 @@ public void runConcurrencyTest() {
274275 }
275276 }
276277
278+ /**
279+ * Test that a notification does not get delayed in the queue waiting for the next event to push it through.
280+ *
281+ * @throws InterruptedException
282+ */
277283 @ Test
278- public void testNotificationDelay () {
284+ public void testNotificationDelay () throws InterruptedException {
279285 ExecutorService tp = Executors .newFixedThreadPool (2 );
280286
287+ final CountDownLatch onNextCount = new CountDownLatch (1 );
288+ final CountDownLatch latch = new CountDownLatch (1 );
289+
281290 TestSubscriber <String > to = new TestSubscriber <String >(new Observer <String >() {
282291
283292 @ Override
@@ -292,12 +301,12 @@ public void onError(Throwable e) {
292301
293302 @ Override
294303 public void onNext (String t ) {
295- // force it to take time when delivering
296- // so the second thread will asynchronously enqueue
304+ // know when the first thread gets in
305+ onNextCount .countDown ();
306+ // force it to take time when delivering so the second one is enqueued
297307 try {
298- Thread . sleep ( 50 );
308+ latch . await ( );
299309 } catch (InterruptedException e ) {
300- e .printStackTrace ();
301310 }
302311 }
303312
@@ -307,10 +316,23 @@ public void onNext(String t) {
307316 Future <?> f1 = tp .submit (new OnNextThread (o , 1 ));
308317 Future <?> f2 = tp .submit (new OnNextThread (o , 1 ));
309318
319+ onNextCount .await ();
320+
321+ Thread t1 = to .getLastSeenThread ();
322+ System .out .println ("first onNext on thread: " + t1 );
323+
324+ latch .countDown ();
325+
310326 waitOnThreads (f1 , f2 );
311327 // not completed yet
312328
313329 assertEquals (2 , to .getOnNextEvents ().size ());
330+
331+ Thread t2 = to .getLastSeenThread ();
332+ System .out .println ("second onNext on thread: " + t2 );
333+
334+ assertSame (t1 , t2 );
335+
314336 System .out .println (to .getOnNextEvents ());
315337 o .onCompleted ();
316338 System .out .println (to .getOnNextEvents ());
0 commit comments