diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingCompletableObserverImpl.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingCompletableObserverImpl.java index 45336c8f4..00b956718 100755 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingCompletableObserverImpl.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingCompletableObserverImpl.java @@ -37,6 +37,10 @@ final class AutoDisposingCompletableObserverImpl implements AutoDisposingComplet this.delegate = delegate; } + @Override public CompletableObserver delegateObserver() { + return delegate; + } + @Override public void onSubscribe(final Disposable d) { if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, lifecycle.doOnEvent(new BiConsumer() { diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingMaybeObserverImpl.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingMaybeObserverImpl.java index de1c2b4ae..2fd67be6e 100755 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingMaybeObserverImpl.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingMaybeObserverImpl.java @@ -37,6 +37,10 @@ final class AutoDisposingMaybeObserverImpl implements AutoDisposingMaybeObser this.delegate = delegate; } + @Override public MaybeObserver delegateObserver() { + return delegate; + } + @Override public void onSubscribe(final Disposable d) { if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, lifecycle.doOnEvent(new BiConsumer() { diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingObserverImpl.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingObserverImpl.java index 2cd1eb9eb..031552ad0 100755 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingObserverImpl.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingObserverImpl.java @@ -37,6 +37,10 @@ final class AutoDisposingObserverImpl implements AutoDisposingObserver { this.delegate = delegate; } + @Override public Observer delegateObserver() { + return delegate; + } + @Override public void onSubscribe(final Disposable d) { if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, lifecycle.doOnEvent(new BiConsumer() { diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSingleObserverImpl.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSingleObserverImpl.java index 972861462..e72980843 100755 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSingleObserverImpl.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSingleObserverImpl.java @@ -37,6 +37,10 @@ final class AutoDisposingSingleObserverImpl implements AutoDisposingSingleObs this.delegate = delegate; } + @Override public SingleObserver delegateObserver() { + return delegate; + } + @Override public void onSubscribe(final Disposable d) { if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, lifecycle.doOnEvent(new BiConsumer() { diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSubscriberImpl.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSubscriberImpl.java index 1a0fa60bc..151b6d643 100755 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSubscriberImpl.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSubscriberImpl.java @@ -38,6 +38,10 @@ final class AutoDisposingSubscriberImpl implements AutoDisposingSubscriber this.delegate = delegate; } + @Override public Subscriber delegateSubscriber() { + return delegate; + } + @Override public void onSubscribe(final Subscription s) { if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, lifecycle.doOnEvent(new BiConsumer() { diff --git a/autodispose/src/main/java/com/uber/autodispose/observers/AutoDisposingCompletableObserver.java b/autodispose/src/main/java/com/uber/autodispose/observers/AutoDisposingCompletableObserver.java index b5442277c..a9b766ae6 100644 --- a/autodispose/src/main/java/com/uber/autodispose/observers/AutoDisposingCompletableObserver.java +++ b/autodispose/src/main/java/com/uber/autodispose/observers/AutoDisposingCompletableObserver.java @@ -17,10 +17,19 @@ package com.uber.autodispose.observers; import io.reactivex.CompletableObserver; +import io.reactivex.annotations.Experimental; import io.reactivex.disposables.Disposable; /** * A {@link Disposable} {@link CompletableObserver} that can automatically dispose itself. * Interface here for type safety but enforcement is left to the implementation. */ -public interface AutoDisposingCompletableObserver extends CompletableObserver, Disposable {} +public interface AutoDisposingCompletableObserver extends CompletableObserver, Disposable { + + /** + * @return The delegate {@link CompletableObserver} that is used under the hood forintrospection + * purposes. This will be updated once LambdaIntrospection is out of @Experimental in RxJava. + */ + @Experimental + CompletableObserver delegateObserver(); +} diff --git a/autodispose/src/main/java/com/uber/autodispose/observers/AutoDisposingMaybeObserver.java b/autodispose/src/main/java/com/uber/autodispose/observers/AutoDisposingMaybeObserver.java index a94372b98..d4db418ec 100644 --- a/autodispose/src/main/java/com/uber/autodispose/observers/AutoDisposingMaybeObserver.java +++ b/autodispose/src/main/java/com/uber/autodispose/observers/AutoDisposingMaybeObserver.java @@ -17,10 +17,19 @@ package com.uber.autodispose.observers; import io.reactivex.MaybeObserver; +import io.reactivex.annotations.Experimental; import io.reactivex.disposables.Disposable; /** * A {@link Disposable} {@link MaybeObserver} that can automatically dispose itself. * Interface here for type safety but enforcement is left to the implementation. */ -public interface AutoDisposingMaybeObserver extends MaybeObserver, Disposable {} +public interface AutoDisposingMaybeObserver extends MaybeObserver, Disposable { + + /** + * @return The delegate {@link MayberObserver} that is used under the hood for introspection + * purposes. This will be updated once LambdaIntrospection is out of @Experimental in RxJava. + */ + @Experimental + MaybeObserver delegateObserver(); +} diff --git a/autodispose/src/main/java/com/uber/autodispose/observers/AutoDisposingObserver.java b/autodispose/src/main/java/com/uber/autodispose/observers/AutoDisposingObserver.java index e955e6e11..b8aeda459 100644 --- a/autodispose/src/main/java/com/uber/autodispose/observers/AutoDisposingObserver.java +++ b/autodispose/src/main/java/com/uber/autodispose/observers/AutoDisposingObserver.java @@ -17,10 +17,19 @@ package com.uber.autodispose.observers; import io.reactivex.Observer; +import io.reactivex.annotations.Experimental; import io.reactivex.disposables.Disposable; /** * A {@link Disposable} {@link Observer} that can automatically dispose itself. * Interface here for type safety but enforcement is left to the implementation. */ -public interface AutoDisposingObserver extends Observer, Disposable {} +public interface AutoDisposingObserver extends Observer, Disposable { + + /** + * @return The delegate {@link Observer} that is used under the hood for introspection purpose. + * This will be updated once LambdaIntrospection is out of @Experimental in RxJava. + */ + @Experimental + Observer delegateObserver(); +} diff --git a/autodispose/src/main/java/com/uber/autodispose/observers/AutoDisposingSingleObserver.java b/autodispose/src/main/java/com/uber/autodispose/observers/AutoDisposingSingleObserver.java index f60553114..d013acb70 100644 --- a/autodispose/src/main/java/com/uber/autodispose/observers/AutoDisposingSingleObserver.java +++ b/autodispose/src/main/java/com/uber/autodispose/observers/AutoDisposingSingleObserver.java @@ -17,10 +17,19 @@ package com.uber.autodispose.observers; import io.reactivex.SingleObserver; +import io.reactivex.annotations.Experimental; import io.reactivex.disposables.Disposable; /** * A {@link Disposable} {@link SingleObserver} that can automatically dispose itself. * Interface here for type safety but enforcement is left to the implementation. */ -public interface AutoDisposingSingleObserver extends SingleObserver, Disposable {} +public interface AutoDisposingSingleObserver extends SingleObserver, Disposable { + + /** + * @return The delegate {@link SingleObserver} that is used under the hood for introspection + * purposes. This will be updated once LambdaIntrospection is out of @Experimental in RxJava. + */ + @Experimental + SingleObserver delegateObserver(); +} diff --git a/autodispose/src/main/java/com/uber/autodispose/observers/AutoDisposingSubscriber.java b/autodispose/src/main/java/com/uber/autodispose/observers/AutoDisposingSubscriber.java index 053e3209e..2740dc1ff 100644 --- a/autodispose/src/main/java/com/uber/autodispose/observers/AutoDisposingSubscriber.java +++ b/autodispose/src/main/java/com/uber/autodispose/observers/AutoDisposingSubscriber.java @@ -16,6 +16,8 @@ package com.uber.autodispose.observers; +import io.reactivex.FlowableSubscriber; +import io.reactivex.annotations.Experimental; import io.reactivex.disposables.Disposable; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -24,4 +26,13 @@ * A {@link Disposable} {@link Subscriber} that can automatically dispose itself. Interface here * for type safety but enforcement is left to the implementation. */ -public interface AutoDisposingSubscriber extends Subscriber, Subscription, Disposable {} +public interface AutoDisposingSubscriber + extends FlowableSubscriber, Subscription, Disposable { + + /** + * @return The delegate {@link Subscriber} that is used under the hood for introspection + * purposes. This will be updated once LambdaIntrospection is out of @Experimental in RxJava. + */ + @Experimental + Subscriber delegateSubscriber(); +} diff --git a/autodispose/src/test/java/com/uber/autodispose/AutoDisposeCompletableObserverTest.java b/autodispose/src/test/java/com/uber/autodispose/AutoDisposeCompletableObserverTest.java index fa24eee47..9eef4e9cc 100755 --- a/autodispose/src/test/java/com/uber/autodispose/AutoDisposeCompletableObserverTest.java +++ b/autodispose/src/test/java/com/uber/autodispose/AutoDisposeCompletableObserverTest.java @@ -17,17 +17,26 @@ package com.uber.autodispose; import com.uber.autodispose.test.RecordingObserver; +import com.uber.autodispose.observers.AutoDisposingCompletableObserver; + import io.reactivex.Completable; import io.reactivex.CompletableEmitter; +import io.reactivex.CompletableObserver; import io.reactivex.CompletableOnSubscribe; +import io.reactivex.Maybe; +import io.reactivex.functions.BiFunction; import io.reactivex.functions.Cancellable; import io.reactivex.functions.Consumer; import io.reactivex.functions.Predicate; import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.subjects.BehaviorSubject; import io.reactivex.subjects.CompletableSubject; import io.reactivex.subjects.MaybeSubject; + import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + import org.junit.After; import org.junit.Test; @@ -38,16 +47,19 @@ public class AutoDisposeCompletableObserverTest { private static final RecordingObserver.Logger LOGGER = new RecordingObserver.Logger() { - @Override public void log(String message) { + @Override + public void log(String message) { System.out.println(AutoDisposeCompletableObserverTest.class.getSimpleName() + ": " + message); } }; - @After public void resetPlugins() { + @After + public void resetPlugins() { AutoDisposePlugins.reset(); } - @Test public void autoDispose_withMaybe_normal() { + @Test + public void autoDispose_withMaybe_normal() { RecordingObserver o = new RecordingObserver<>(LOGGER); CompletableSubject source = CompletableSubject.create(); MaybeSubject lifecycle = MaybeSubject.create(); @@ -68,7 +80,8 @@ public class AutoDisposeCompletableObserverTest { assertThat(lifecycle.hasObservers()).isFalse(); } - @Test public void autoDispose_withMaybe_interrupted() { + @Test + public void autoDispose_withMaybe_interrupted() { RecordingObserver o = new RecordingObserver<>(LOGGER); CompletableSubject source = CompletableSubject.create(); MaybeSubject lifecycle = MaybeSubject.create(); @@ -89,7 +102,8 @@ public class AutoDisposeCompletableObserverTest { o.assertNoMoreEvents(); } - @Test public void autoDispose_withProvider_completion() { + @Test + public void autoDispose_withProvider_completion() { RecordingObserver o = new RecordingObserver<>(LOGGER); CompletableSubject source = CompletableSubject.create(); MaybeSubject scope = MaybeSubject.create(); @@ -109,7 +123,8 @@ public class AutoDisposeCompletableObserverTest { assertThat(scope.hasObservers()).isFalse(); } - @Test public void autoDispose_withProvider_interrupted() { + @Test + public void autoDispose_withProvider_interrupted() { RecordingObserver o = new RecordingObserver<>(LOGGER); CompletableSubject source = CompletableSubject.create(); MaybeSubject scope = MaybeSubject.create(); @@ -132,7 +147,8 @@ public class AutoDisposeCompletableObserverTest { o.assertNoMoreEvents(); } - @Test public void autoDispose_withLifecycleProvider_completion() { + @Test + public void autoDispose_withLifecycleProvider_completion() { RecordingObserver o = new RecordingObserver<>(LOGGER); CompletableSubject source = CompletableSubject.create(); BehaviorSubject lifecycle = BehaviorSubject.createDefault(0); @@ -157,7 +173,8 @@ public class AutoDisposeCompletableObserverTest { assertThat(lifecycle.hasObservers()).isFalse(); } - @Test public void autoDispose_withLifecycleProvider_interrupted() { + @Test + public void autoDispose_withLifecycleProvider_interrupted() { RecordingObserver o = new RecordingObserver<>(LOGGER); CompletableSubject source = CompletableSubject.create(); BehaviorSubject lifecycle = BehaviorSubject.createDefault(0); @@ -185,7 +202,8 @@ public class AutoDisposeCompletableObserverTest { o.assertNoMoreEvents(); } - @Test public void autoDispose_withLifecycleProvider_withoutStartingLifecycle_shouldFail() { + @Test + public void autoDispose_withLifecycleProvider_withoutStartingLifecycle_shouldFail() { BehaviorSubject lifecycle = BehaviorSubject.create(); RecordingObserver o = new RecordingObserver<>(LOGGER); LifecycleScopeProvider provider = makeLifecycleProvider(lifecycle); @@ -197,7 +215,8 @@ public class AutoDisposeCompletableObserverTest { assertThat(o.takeError()).isInstanceOf(LifecycleNotStartedException.class); } - @Test public void autoDispose_withLifecycleProvider_afterLifecycle_shouldFail() { + @Test + public void autoDispose_withLifecycleProvider_afterLifecycle_shouldFail() { BehaviorSubject lifecycle = BehaviorSubject.createDefault(0); lifecycle.onNext(1); lifecycle.onNext(2); @@ -212,9 +231,11 @@ public class AutoDisposeCompletableObserverTest { assertThat(o.takeError()).isInstanceOf(LifecycleEndedException.class); } - @Test public void autoDispose_withProviderAndNoOpPlugin_withoutStarting_shouldFailSilently() { + @Test + public void autoDispose_withProviderAndNoOpPlugin_withoutStarting_shouldFailSilently() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer() { - @Override public void accept(OutsideLifecycleException e) throws Exception { } + @Override + public void accept(OutsideLifecycleException e) throws Exception { } }); BehaviorSubject lifecycle = BehaviorSubject.create(); TestObserver o = new TestObserver<>(); @@ -229,9 +250,11 @@ public class AutoDisposeCompletableObserverTest { o.assertNoErrors(); } - @Test public void autoDispose_withProviderAndNoOpPlugin_afterEnding_shouldFailSilently() { + @Test + public void autoDispose_withProviderAndNoOpPlugin_afterEnding_shouldFailSilently() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer() { - @Override public void accept(OutsideLifecycleException e) throws Exception { + @Override + public void accept(OutsideLifecycleException e) throws Exception { // Noop } }); @@ -251,9 +274,11 @@ public class AutoDisposeCompletableObserverTest { o.assertNoErrors(); } - @Test public void autoDispose_withProviderAndPlugin_withoutStarting_shouldFailWithWrappedExp() { + @Test + public void autoDispose_withProviderAndPlugin_withoutStarting_shouldFailWithWrappedExp() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer() { - @Override public void accept(OutsideLifecycleException e) throws Exception { + @Override + public void accept(OutsideLifecycleException e) throws Exception { // Wrap in an IllegalStateException so we can verify this is the exception we see on the // other side throw new IllegalStateException(e); @@ -268,20 +293,59 @@ public class AutoDisposeCompletableObserverTest { o.assertNoValues(); o.assertError(new Predicate() { - @Override public boolean test(Throwable throwable) throws Exception { + @Override + public boolean test(Throwable throwable) throws Exception { return throwable instanceof IllegalStateException && throwable.getCause() instanceof OutsideLifecycleException; } }); } - @Test public void verifyCancellation() throws Exception { + @Test + public void verifyObserverDelegate() { + final AtomicReference atomicObserver = new AtomicReference<>(); + final AtomicReference atomicAutoDisposingObserver + = new AtomicReference<>(); + try { + RxJavaPlugins.setOnCompletableSubscribe(new BiFunction() { + @Override public CompletableObserver apply( + Completable source, + CompletableObserver observer) { + if (atomicObserver.get() == null) { + atomicObserver.set(observer); + } else if (atomicAutoDisposingObserver.get() == null) { + atomicAutoDisposingObserver.set(observer); + RxJavaPlugins.setOnObservableSubscribe(null); + } + return observer; + } + }); + Completable.complete().to(new CompletableScoper(Maybe.never())).subscribe(); + + assertThat(atomicAutoDisposingObserver.get()).isNotNull(); + assertThat(atomicAutoDisposingObserver.get()) + .isInstanceOf(AutoDisposingCompletableObserver.class); + assertThat(((AutoDisposingCompletableObserver) atomicAutoDisposingObserver.get()) + .delegateObserver()).isNotNull(); + assertThat(((AutoDisposingCompletableObserver) atomicAutoDisposingObserver.get()) + .delegateObserver()).isSameAs(atomicObserver.get()); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void verifyCancellation() throws Exception { final AtomicInteger i = new AtomicInteger(); //noinspection unchecked because Java Completable source = Completable.create(new CompletableOnSubscribe() { - @Override public void subscribe(CompletableEmitter e) throws Exception { + @Override + public void subscribe(CompletableEmitter e) throws Exception { e.setCancellable(new Cancellable() { - @Override public void cancel() throws Exception { + @Override + public void cancel() throws Exception { i.incrementAndGet(); } }); diff --git a/autodispose/src/test/java/com/uber/autodispose/AutoDisposeMaybeObserverTest.java b/autodispose/src/test/java/com/uber/autodispose/AutoDisposeMaybeObserverTest.java index f79e2fd8d..b9959b136 100755 --- a/autodispose/src/test/java/com/uber/autodispose/AutoDisposeMaybeObserverTest.java +++ b/autodispose/src/test/java/com/uber/autodispose/AutoDisposeMaybeObserverTest.java @@ -17,16 +17,24 @@ package com.uber.autodispose; import com.uber.autodispose.test.RecordingObserver; +import com.uber.autodispose.observers.AutoDisposingMaybeObserver; + import io.reactivex.Maybe; import io.reactivex.MaybeEmitter; +import io.reactivex.MaybeObserver; import io.reactivex.MaybeOnSubscribe; +import io.reactivex.functions.BiFunction; import io.reactivex.functions.Cancellable; import io.reactivex.functions.Consumer; import io.reactivex.functions.Predicate; import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.subjects.BehaviorSubject; import io.reactivex.subjects.MaybeSubject; + import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + import org.junit.After; import org.junit.Test; @@ -37,16 +45,19 @@ public class AutoDisposeMaybeObserverTest { private static final RecordingObserver.Logger LOGGER = new RecordingObserver.Logger() { - @Override public void log(String message) { + @Override + public void log(String message) { System.out.println(AutoDisposeMaybeObserverTest.class.getSimpleName() + ": " + message); } }; - @After public void resetPlugins() { + @After + public void resetPlugins() { AutoDisposePlugins.reset(); } - @Test public void autoDispose_withMaybe_normal() { + @Test + public void autoDispose_withMaybe_normal() { RecordingObserver o = new RecordingObserver<>(LOGGER); MaybeSubject source = MaybeSubject.create(); MaybeSubject lifecycle = MaybeSubject.create(); @@ -67,23 +78,27 @@ public class AutoDisposeMaybeObserverTest { assertThat(lifecycle.hasObservers()).isFalse(); } - @Test public void autoDispose_withSuperClassGenerics_compilesFine() { + @Test + public void autoDispose_withSuperClassGenerics_compilesFine() { Maybe.just(new BClass()) .to(new MaybeScoper(Maybe.never())) .subscribe(new Consumer() { - @Override public void accept(AClass aClass) throws Exception { + @Override + public void accept(AClass aClass) throws Exception { } }); } - @Test public void autoDispose_noGenericsOnEmpty_isFine() { + @Test + public void autoDispose_noGenericsOnEmpty_isFine() { Maybe.just(new BClass()) .to(new MaybeScoper<>(Maybe.never())) .subscribe(); } - @Test public void autoDispose_withMaybe_interrupted() { + @Test + public void autoDispose_withMaybe_interrupted() { RecordingObserver o = new RecordingObserver<>(LOGGER); MaybeSubject source = MaybeSubject.create(); MaybeSubject lifecycle = MaybeSubject.create(); @@ -91,7 +106,8 @@ public class AutoDisposeMaybeObserverTest { .subscribe(o); source.to(new MaybeScoper(lifecycle)) .subscribe(new Consumer() { - @Override public void accept(Integer integer) throws Exception { + @Override + public void accept(Integer integer) throws Exception { } }); @@ -110,7 +126,8 @@ public class AutoDisposeMaybeObserverTest { o.assertNoMoreEvents(); } - @Test public void autoDispose_withProvider_success() { + @Test + public void autoDispose_withProvider_success() { RecordingObserver o = new RecordingObserver<>(LOGGER); MaybeSubject source = MaybeSubject.create(); BehaviorSubject lifecycle = BehaviorSubject.createDefault(0); @@ -135,7 +152,8 @@ public class AutoDisposeMaybeObserverTest { assertThat(lifecycle.hasObservers()).isFalse(); } - @Test public void autoDispose_withProvider_completion() { + @Test + public void autoDispose_withProvider_completion() { RecordingObserver o = new RecordingObserver<>(LOGGER); MaybeSubject source = MaybeSubject.create(); MaybeSubject scope = MaybeSubject.create(); @@ -155,7 +173,8 @@ public class AutoDisposeMaybeObserverTest { assertThat(scope.hasObservers()).isFalse(); } - @Test public void autoDispose_withProvider_interrupted() { + @Test + public void autoDispose_withProvider_interrupted() { RecordingObserver o = new RecordingObserver<>(LOGGER); MaybeSubject source = MaybeSubject.create(); MaybeSubject scope = MaybeSubject.create(); @@ -178,7 +197,8 @@ public class AutoDisposeMaybeObserverTest { o.assertNoMoreEvents(); } - @Test public void autoDispose_withLifecycleProvider_completion() { + @Test + public void autoDispose_withLifecycleProvider_completion() { RecordingObserver o = new RecordingObserver<>(LOGGER); MaybeSubject source = MaybeSubject.create(); BehaviorSubject lifecycle = BehaviorSubject.createDefault(0); @@ -203,7 +223,8 @@ public class AutoDisposeMaybeObserverTest { assertThat(lifecycle.hasObservers()).isFalse(); } - @Test public void autoDispose_withLifecycleProvider_interrupted() { + @Test + public void autoDispose_withLifecycleProvider_interrupted() { RecordingObserver o = new RecordingObserver<>(LOGGER); MaybeSubject source = MaybeSubject.create(); BehaviorSubject lifecycle = BehaviorSubject.createDefault(0); @@ -231,7 +252,8 @@ public class AutoDisposeMaybeObserverTest { o.assertNoMoreEvents(); } - @Test public void autoDispose_withLifecycleProvider_withoutStartingLifecycle_shouldFail() { + @Test + public void autoDispose_withLifecycleProvider_withoutStartingLifecycle_shouldFail() { BehaviorSubject lifecycle = BehaviorSubject.create(); RecordingObserver o = new RecordingObserver<>(LOGGER); LifecycleScopeProvider provider = makeLifecycleProvider(lifecycle); @@ -243,7 +265,8 @@ public class AutoDisposeMaybeObserverTest { assertThat(o.takeError()).isInstanceOf(LifecycleNotStartedException.class); } - @Test public void autoDispose_withLifecycleProvider_afterLifecycle_shouldFail() { + @Test + public void autoDispose_withLifecycleProvider_afterLifecycle_shouldFail() { BehaviorSubject lifecycle = BehaviorSubject.createDefault(0); lifecycle.onNext(1); lifecycle.onNext(2); @@ -258,9 +281,11 @@ public class AutoDisposeMaybeObserverTest { assertThat(o.takeError()).isInstanceOf(LifecycleEndedException.class); } - @Test public void autoDispose_withProviderAndNoOpPlugin_withoutStarting_shouldFailSilently() { + @Test + public void autoDispose_withProviderAndNoOpPlugin_withoutStarting_shouldFailSilently() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer() { - @Override public void accept(OutsideLifecycleException e) throws Exception { } + @Override + public void accept(OutsideLifecycleException e) throws Exception { } }); BehaviorSubject lifecycle = BehaviorSubject.create(); TestObserver o = new TestObserver<>(); @@ -275,9 +300,11 @@ public class AutoDisposeMaybeObserverTest { o.assertNoErrors(); } - @Test public void autoDispose_withProviderAndNoOpPlugin_afterEnding_shouldFailSilently() { + @Test + public void autoDispose_withProviderAndNoOpPlugin_afterEnding_shouldFailSilently() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer() { - @Override public void accept(OutsideLifecycleException e) throws Exception { + @Override + public void accept(OutsideLifecycleException e) throws Exception { // Noop } }); @@ -297,9 +324,11 @@ public class AutoDisposeMaybeObserverTest { o.assertNoErrors(); } - @Test public void autoDispose_withProviderAndPlugin_withoutStarting_shouldFailWithWrappedExp() { + @Test + public void autoDispose_withProviderAndPlugin_withoutStarting_shouldFailWithWrappedExp() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer() { - @Override public void accept(OutsideLifecycleException e) throws Exception { + @Override + public void accept(OutsideLifecycleException e) throws Exception { // Wrap in an IllegalStateException so we can verify this is the exception we see on the // other side throw new IllegalStateException(e); @@ -314,20 +343,53 @@ public class AutoDisposeMaybeObserverTest { o.assertNoValues(); o.assertError(new Predicate() { - @Override public boolean test(Throwable throwable) throws Exception { + @Override + public boolean test(Throwable throwable) throws Exception { return throwable instanceof IllegalStateException && throwable.getCause() instanceof OutsideLifecycleException; } }); } - @Test public void verifyCancellation() throws Exception { + @Test + public void verifyObserverDelegate() { + final AtomicReference atomicObserver = new AtomicReference<>(); + final AtomicReference atomicAutoDisposingObserver = new AtomicReference<>(); + try { + RxJavaPlugins.setOnMaybeSubscribe(new BiFunction() { + @Override public MaybeObserver apply(Maybe source, MaybeObserver observer) { + if (atomicObserver.get() == null) { + atomicObserver.set(observer); + } else if (atomicAutoDisposingObserver.get() == null) { + atomicAutoDisposingObserver.set(observer); + RxJavaPlugins.setOnObservableSubscribe(null); + } + return observer; + } + }); + Maybe.just(1).to(new MaybeScoper(Maybe.never())).subscribe(); + + assertThat(atomicAutoDisposingObserver.get()).isNotNull(); + assertThat(atomicAutoDisposingObserver.get()).isInstanceOf(AutoDisposingMaybeObserver.class); + assertThat(((AutoDisposingMaybeObserver) atomicAutoDisposingObserver.get()) + .delegateObserver()).isNotNull(); + assertThat(((AutoDisposingMaybeObserver) atomicAutoDisposingObserver.get()) + .delegateObserver()).isSameAs(atomicObserver.get()); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void verifyCancellation() throws Exception { final AtomicInteger i = new AtomicInteger(); //noinspection unchecked because Java Maybe source = Maybe.create(new MaybeOnSubscribe() { - @Override public void subscribe(MaybeEmitter e) throws Exception { + @Override + public void subscribe(MaybeEmitter e) throws Exception { e.setCancellable(new Cancellable() { - @Override public void cancel() throws Exception { + @Override + public void cancel() throws Exception { i.incrementAndGet(); } }); diff --git a/autodispose/src/test/java/com/uber/autodispose/AutoDisposeObserverTest.java b/autodispose/src/test/java/com/uber/autodispose/AutoDisposeObserverTest.java index cdfeff6cc..2871b10f4 100755 --- a/autodispose/src/test/java/com/uber/autodispose/AutoDisposeObserverTest.java +++ b/autodispose/src/test/java/com/uber/autodispose/AutoDisposeObserverTest.java @@ -17,19 +17,24 @@ package com.uber.autodispose; import com.uber.autodispose.test.RecordingObserver; +import com.uber.autodispose.observers.AutoDisposingObserver; import io.reactivex.Maybe; import io.reactivex.Observable; import io.reactivex.ObservableEmitter; import io.reactivex.ObservableOnSubscribe; +import io.reactivex.Observer; import io.reactivex.disposables.Disposable; +import io.reactivex.functions.BiFunction; import io.reactivex.functions.Cancellable; import io.reactivex.functions.Consumer; import io.reactivex.functions.Predicate; import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.subjects.BehaviorSubject; import io.reactivex.subjects.MaybeSubject; import io.reactivex.subjects.PublishSubject; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.junit.After; import org.junit.Test; @@ -256,6 +261,34 @@ public class AutoDisposeObserverTest { }); } + @Test public void verifyObserverDelegate() { + final AtomicReference atomicObserver = new AtomicReference(); + final AtomicReference atomicAutoDisposingObserver = new AtomicReference(); + try { + RxJavaPlugins.setOnObservableSubscribe(new BiFunction() { + @Override public Observer apply(Observable source, Observer observer) { + if (atomicObserver.get() == null) { + atomicObserver.set(observer); + } else if (atomicAutoDisposingObserver.get() == null) { + atomicAutoDisposingObserver.set(observer); + RxJavaPlugins.setOnObservableSubscribe(null); + } + return observer; + } + }); + Observable.just(1).to(new ObservableScoper(Maybe.never())).subscribe(); + + assertThat(atomicAutoDisposingObserver.get()).isNotNull(); + assertThat(atomicAutoDisposingObserver.get()).isInstanceOf(AutoDisposingObserver.class); + assertThat(((AutoDisposingObserver) atomicAutoDisposingObserver.get()).delegateObserver()) + .isNotNull(); + assertThat(((AutoDisposingObserver) atomicAutoDisposingObserver.get()).delegateObserver()) + .isSameAs(atomicObserver.get()); + } finally { + RxJavaPlugins.reset(); + } + } + @Test public void verifyCancellation() throws Exception { final AtomicInteger i = new AtomicInteger(); //noinspection unchecked because Java diff --git a/autodispose/src/test/java/com/uber/autodispose/AutoDisposeSingleObserverTest.java b/autodispose/src/test/java/com/uber/autodispose/AutoDisposeSingleObserverTest.java index 78a677dab..7f137fb61 100755 --- a/autodispose/src/test/java/com/uber/autodispose/AutoDisposeSingleObserverTest.java +++ b/autodispose/src/test/java/com/uber/autodispose/AutoDisposeSingleObserverTest.java @@ -17,18 +17,26 @@ package com.uber.autodispose; import com.uber.autodispose.test.RecordingObserver; +import com.uber.autodispose.observers.AutoDisposingSingleObserver; + import io.reactivex.Maybe; import io.reactivex.Single; import io.reactivex.SingleEmitter; +import io.reactivex.SingleObserver; import io.reactivex.SingleOnSubscribe; +import io.reactivex.functions.BiFunction; import io.reactivex.functions.Cancellable; import io.reactivex.functions.Consumer; import io.reactivex.functions.Predicate; import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.subjects.BehaviorSubject; import io.reactivex.subjects.MaybeSubject; import io.reactivex.subjects.SingleSubject; + import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + import org.junit.After; import org.junit.Test; @@ -39,16 +47,19 @@ public class AutoDisposeSingleObserverTest { private static final RecordingObserver.Logger LOGGER = new RecordingObserver.Logger() { - @Override public void log(String message) { + @Override + public void log(String message) { System.out.println(AutoDisposeSingleObserverTest.class.getSimpleName() + ": " + message); } }; - @After public void resetPlugins() { + @After + public void resetPlugins() { AutoDisposePlugins.reset(); } - @Test public void autoDispose_withMaybe_normal() { + @Test + public void autoDispose_withMaybe_normal() { RecordingObserver o = new RecordingObserver<>(LOGGER); SingleSubject source = SingleSubject.create(); MaybeSubject lifecycle = MaybeSubject.create(); @@ -69,23 +80,27 @@ public class AutoDisposeSingleObserverTest { assertThat(lifecycle.hasObservers()).isFalse(); } - @Test public void autoDispose_withSuperClassGenerics_compilesFine() { + @Test + public void autoDispose_withSuperClassGenerics_compilesFine() { Single.just(new BClass()) .to(new SingleScoper(Maybe.never())) .subscribe(new Consumer() { - @Override public void accept(AClass aClass) throws Exception { + @Override + public void accept(AClass aClass) throws Exception { } }); } - @Test public void autoDispose_noGenericsOnEmpty_isFine() { + @Test + public void autoDispose_noGenericsOnEmpty_isFine() { Single.just(new BClass()) .to(new SingleScoper<>(Maybe.never())) .subscribe(); } - @Test public void autoDispose_withMaybe_interrupted() { + @Test + public void autoDispose_withMaybe_interrupted() { RecordingObserver o = new RecordingObserver<>(LOGGER); SingleSubject source = SingleSubject.create(); MaybeSubject lifecycle = MaybeSubject.create(); @@ -106,7 +121,8 @@ public class AutoDisposeSingleObserverTest { o.assertNoMoreEvents(); } - @Test public void autoDispose_withProvider() { + @Test + public void autoDispose_withProvider() { RecordingObserver o = new RecordingObserver<>(LOGGER); SingleSubject source = SingleSubject.create(); MaybeSubject scope = MaybeSubject.create(); @@ -127,7 +143,8 @@ public class AutoDisposeSingleObserverTest { assertThat(scope.hasObservers()).isFalse(); } - @Test public void autoDispose_withProvider_interrupted() { + @Test + public void autoDispose_withProvider_interrupted() { RecordingObserver o = new RecordingObserver<>(LOGGER); SingleSubject source = SingleSubject.create(); MaybeSubject scope = MaybeSubject.create(); @@ -149,7 +166,8 @@ public class AutoDisposeSingleObserverTest { o.assertNoMoreEvents(); } - @Test public void autoDispose_withLifecycleProvider() { + @Test + public void autoDispose_withLifecycleProvider() { RecordingObserver o = new RecordingObserver<>(LOGGER); SingleSubject source = SingleSubject.create(); BehaviorSubject lifecycle = BehaviorSubject.createDefault(0); @@ -175,7 +193,8 @@ public class AutoDisposeSingleObserverTest { assertThat(lifecycle.hasObservers()).isFalse(); } - @Test public void autoDispose_withLifecycleProvider_interrupted() { + @Test + public void autoDispose_withLifecycleProvider_interrupted() { RecordingObserver o = new RecordingObserver<>(LOGGER); SingleSubject source = SingleSubject.create(); BehaviorSubject lifecycle = BehaviorSubject.createDefault(0); @@ -202,7 +221,8 @@ public class AutoDisposeSingleObserverTest { o.assertNoMoreEvents(); } - @Test public void autoDispose_withProvider_withoutStartingLifecycle_shouldFail() { + @Test + public void autoDispose_withProvider_withoutStartingLifecycle_shouldFail() { BehaviorSubject lifecycle = BehaviorSubject.create(); RecordingObserver o = new RecordingObserver<>(LOGGER); LifecycleScopeProvider provider = makeLifecycleProvider(lifecycle); @@ -214,7 +234,8 @@ public class AutoDisposeSingleObserverTest { assertThat(o.takeError()).isInstanceOf(LifecycleNotStartedException.class); } - @Test public void autoDispose_withProvider_afterLifecycle_shouldFail() { + @Test + public void autoDispose_withProvider_afterLifecycle_shouldFail() { BehaviorSubject lifecycle = BehaviorSubject.createDefault(0); lifecycle.onNext(1); lifecycle.onNext(2); @@ -229,9 +250,11 @@ public class AutoDisposeSingleObserverTest { assertThat(o.takeError()).isInstanceOf(LifecycleEndedException.class); } - @Test public void autoDispose_withProviderAndNoOpPlugin_withoutStarting_shouldFailSilently() { + @Test + public void autoDispose_withProviderAndNoOpPlugin_withoutStarting_shouldFailSilently() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer() { - @Override public void accept(OutsideLifecycleException e) throws Exception { } + @Override + public void accept(OutsideLifecycleException e) throws Exception { } }); BehaviorSubject lifecycle = BehaviorSubject.create(); TestObserver o = new TestObserver<>(); @@ -246,9 +269,11 @@ public class AutoDisposeSingleObserverTest { o.assertNoErrors(); } - @Test public void autoDispose_withProviderAndNoOpPlugin_afterEnding_shouldFailSilently() { + @Test + public void autoDispose_withProviderAndNoOpPlugin_afterEnding_shouldFailSilently() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer() { - @Override public void accept(OutsideLifecycleException e) throws Exception { + @Override + public void accept(OutsideLifecycleException e) throws Exception { // Noop } }); @@ -268,9 +293,11 @@ public class AutoDisposeSingleObserverTest { o.assertNoErrors(); } - @Test public void autoDispose_withProviderAndPlugin_withoutStarting_shouldFailWithExp() { + @Test + public void autoDispose_withProviderAndPlugin_withoutStarting_shouldFailWithExp() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer() { - @Override public void accept(OutsideLifecycleException e) throws Exception { + @Override + public void accept(OutsideLifecycleException e) throws Exception { // Wrap in an IllegalStateException so we can verify this is the exception we see on the // other side throw new IllegalStateException(e); @@ -285,20 +312,53 @@ public class AutoDisposeSingleObserverTest { o.assertNoValues(); o.assertError(new Predicate() { - @Override public boolean test(Throwable throwable) throws Exception { + @Override + public boolean test(Throwable throwable) throws Exception { return throwable instanceof IllegalStateException && throwable.getCause() instanceof OutsideLifecycleException; } }); } - @Test public void verifyCancellation() throws Exception { + @Test + public void verifyObserverDelegate() { + final AtomicReference atomicObserver = new AtomicReference<>(); + final AtomicReference atomicAutoDisposingObserver = new AtomicReference<>(); + try { + RxJavaPlugins.setOnSingleSubscribe(new BiFunction() { + @Override public SingleObserver apply(Single source, SingleObserver observer) { + if (atomicObserver.get() == null) { + atomicObserver.set(observer); + } else if (atomicAutoDisposingObserver.get() == null) { + atomicAutoDisposingObserver.set(observer); + RxJavaPlugins.setOnObservableSubscribe(null); + } + return observer; + } + }); + Single.just(1).to(new SingleScoper(Maybe.never())).subscribe(); + + assertThat(atomicAutoDisposingObserver.get()).isNotNull(); + assertThat(atomicAutoDisposingObserver.get()).isInstanceOf(AutoDisposingSingleObserver.class); + assertThat(((AutoDisposingSingleObserver) atomicAutoDisposingObserver.get()) + .delegateObserver()).isNotNull(); + assertThat(((AutoDisposingSingleObserver) atomicAutoDisposingObserver.get()) + .delegateObserver()).isSameAs(atomicObserver.get()); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void verifyCancellation() throws Exception { final AtomicInteger i = new AtomicInteger(); //noinspection unchecked because Java Single source = Single.create(new SingleOnSubscribe() { - @Override public void subscribe(SingleEmitter e) throws Exception { + @Override + public void subscribe(SingleEmitter e) throws Exception { e.setCancellable(new Cancellable() { - @Override public void cancel() throws Exception { + @Override + public void cancel() throws Exception { i.incrementAndGet(); } }); diff --git a/autodispose/src/test/java/com/uber/autodispose/AutoDisposeSubscriberTest.java b/autodispose/src/test/java/com/uber/autodispose/AutoDisposeSubscriberTest.java index 45655244e..179ce0cbe 100755 --- a/autodispose/src/test/java/com/uber/autodispose/AutoDisposeSubscriberTest.java +++ b/autodispose/src/test/java/com/uber/autodispose/AutoDisposeSubscriberTest.java @@ -16,21 +16,28 @@ package com.uber.autodispose; +import com.uber.autodispose.observers.AutoDisposingSubscriber; + import io.reactivex.BackpressureStrategy; import io.reactivex.Flowable; import io.reactivex.FlowableEmitter; import io.reactivex.FlowableOnSubscribe; import io.reactivex.Maybe; import io.reactivex.disposables.Disposable; +import io.reactivex.functions.BiFunction; import io.reactivex.functions.Cancellable; import io.reactivex.functions.Consumer; import io.reactivex.functions.Predicate; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; import io.reactivex.subjects.BehaviorSubject; import io.reactivex.subjects.MaybeSubject; import io.reactivex.subscribers.TestSubscriber; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.reactivestreams.Subscriber; + import org.junit.After; import org.junit.Test; @@ -263,6 +270,36 @@ public class AutoDisposeSubscriberTest { }); } + @Test public void verifySubscriberDelegate() { + final AtomicReference atomicSubscriber = new AtomicReference<>(); + final AtomicReference atomicAutoDisposingSubscriber = new AtomicReference<>(); + try { + RxJavaPlugins.setOnFlowableSubscribe(new BiFunction() { + @Override public Subscriber apply(Flowable source, Subscriber subscriber) { + if (atomicSubscriber.get() == null) { + System.out.println(subscriber.getClass().toString()); + atomicSubscriber.set(subscriber); + } else if (atomicAutoDisposingSubscriber.get() == null) { + System.out.println(subscriber.getClass().toString()); + atomicAutoDisposingSubscriber.set(subscriber); + RxJavaPlugins.setOnFlowableSubscribe(null); + } + return subscriber; + } + }); + Flowable.just(1).to(new FlowableScoper(Maybe.never())).subscribe(); + + assertThat(atomicAutoDisposingSubscriber.get()).isNotNull(); + assertThat(atomicAutoDisposingSubscriber.get()).isInstanceOf(AutoDisposingSubscriber.class); + assertThat(((AutoDisposingSubscriber) atomicAutoDisposingSubscriber.get()) + .delegateSubscriber()).isNotNull(); + assertThat(((AutoDisposingSubscriber) atomicAutoDisposingSubscriber.get()) + .delegateSubscriber()).isSameAs(atomicSubscriber.get()); + } finally { + RxJavaPlugins.reset(); + } + } + @Test public void verifyCancellation() throws Exception { final AtomicInteger i = new AtomicInteger(); //noinspection unchecked because Java