diff --git a/src/test/java/io/reactivex/XFlatMapTest.java b/src/test/java/io/reactivex/XFlatMapTest.java index 6604700c00..028814e5ec 100644 --- a/src/test/java/io/reactivex/XFlatMapTest.java +++ b/src/test/java/io/reactivex/XFlatMapTest.java @@ -23,7 +23,7 @@ import io.reactivex.exceptions.TestException; import io.reactivex.functions.Function; -import io.reactivex.observers.TestObserver; +import io.reactivex.observers.*; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.Schedulers; import io.reactivex.subscribers.TestSubscriber; @@ -31,7 +31,7 @@ public class XFlatMapTest { @Rule - public Retry retry = new Retry(3, 1000, true); + public Retry retry = new Retry(5, 1000, true); static final int SLEEP_AFTER_CANCEL = 500; @@ -40,12 +40,23 @@ public class XFlatMapTest { void sleep() throws Exception { cb.await(); try { + long before = System.currentTimeMillis(); Thread.sleep(5000); + throw new IllegalStateException("Was not interrupted in time?! " + (System.currentTimeMillis() - before)); } catch (InterruptedException ex) { // ignored here } } + void beforeCancelSleep(BaseTestConsumer ts) throws Exception { + long before = System.currentTimeMillis(); + Thread.sleep(50); + if (System.currentTimeMillis() - before > 100) { + ts.dispose(); + throw new IllegalStateException("Overslept?" + (System.currentTimeMillis() - before)); + } + } + @Test public void flowableFlowable() throws Exception { List errors = TestHelper.trackPluginErrors(); @@ -63,7 +74,7 @@ public Publisher apply(Integer v) throws Exception { cb.await(); - Thread.sleep(50); + beforeCancelSleep(ts); ts.cancel(); @@ -94,7 +105,7 @@ public Single apply(Integer v) throws Exception { cb.await(); - Thread.sleep(50); + beforeCancelSleep(ts); ts.cancel(); @@ -125,7 +136,7 @@ public Maybe apply(Integer v) throws Exception { cb.await(); - Thread.sleep(50); + beforeCancelSleep(ts); ts.cancel(); @@ -156,7 +167,7 @@ public Completable apply(Integer v) throws Exception { cb.await(); - Thread.sleep(50); + beforeCancelSleep(ts); ts.cancel(); @@ -188,7 +199,7 @@ public Completable apply(Integer v) throws Exception { cb.await(); - Thread.sleep(50); + beforeCancelSleep(ts); ts.cancel(); @@ -219,7 +230,7 @@ public Observable apply(Integer v) throws Exception { cb.await(); - Thread.sleep(50); + beforeCancelSleep(ts); ts.cancel(); @@ -250,7 +261,7 @@ public Single apply(Integer v) throws Exception { cb.await(); - Thread.sleep(50); + beforeCancelSleep(ts); ts.cancel(); @@ -281,7 +292,7 @@ public Maybe apply(Integer v) throws Exception { cb.await(); - Thread.sleep(50); + beforeCancelSleep(ts); ts.cancel(); @@ -312,7 +323,7 @@ public Completable apply(Integer v) throws Exception { cb.await(); - Thread.sleep(50); + beforeCancelSleep(ts); ts.cancel(); @@ -344,7 +355,7 @@ public Completable apply(Integer v) throws Exception { cb.await(); - Thread.sleep(50); + beforeCancelSleep(ts); ts.cancel(); @@ -375,7 +386,7 @@ public Single apply(Integer v) throws Exception { cb.await(); - Thread.sleep(50); + beforeCancelSleep(ts); ts.cancel(); @@ -406,7 +417,7 @@ public Maybe apply(Integer v) throws Exception { cb.await(); - Thread.sleep(50); + beforeCancelSleep(ts); ts.cancel(); @@ -437,7 +448,7 @@ public Completable apply(Integer v) throws Exception { cb.await(); - Thread.sleep(50); + beforeCancelSleep(ts); ts.cancel(); @@ -469,7 +480,7 @@ public Completable apply(Integer v) throws Exception { cb.await(); - Thread.sleep(50); + beforeCancelSleep(ts); ts.cancel(); @@ -500,7 +511,7 @@ public Single apply(Integer v) throws Exception { cb.await(); - Thread.sleep(50); + beforeCancelSleep(ts); ts.cancel(); @@ -531,7 +542,7 @@ public Maybe apply(Integer v) throws Exception { cb.await(); - Thread.sleep(50); + beforeCancelSleep(ts); ts.cancel(); @@ -562,7 +573,7 @@ public Completable apply(Integer v) throws Exception { cb.await(); - Thread.sleep(50); + beforeCancelSleep(ts); ts.cancel(); @@ -594,7 +605,7 @@ public Completable apply(Integer v) throws Exception { cb.await(); - Thread.sleep(50); + beforeCancelSleep(ts); ts.cancel(); diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeMaxConcurrentTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeMaxConcurrentTest.java index 057f51dff2..202fe7fdc8 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeMaxConcurrentTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeMaxConcurrentTest.java @@ -201,7 +201,7 @@ public void testSimpleAsyncLoop() { } } } - @Test(timeout = 10000) + @Test(timeout = 30000) public void testSimpleAsync() { for (int i = 1; i < 50; i++) { TestObserver ts = new TestObserver(); @@ -221,13 +221,13 @@ public void testSimpleAsync() { assertEquals(expected, actual); } } - @Test(timeout = 10000) + @Test(timeout = 30000) public void testSimpleOneLessAsyncLoop() { for (int i = 0; i < 200; i++) { testSimpleOneLessAsync(); } } - @Test(timeout = 10000) + @Test(timeout = 30000) public void testSimpleOneLessAsync() { long t = System.currentTimeMillis(); for (int i = 2; i < 50; i++) {