Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,20 @@ final class AutoDisposingObserverImpl<T> implements AutoDisposingObserver<T> {
@Override public void onSuccess(Object o) {
callMainSubscribeIfNecessary(d);
AutoDisposingObserverImpl.this.dispose();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be changed to AutoDisposableHelper.dispose(mainDisposable)

lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED);
}

@Override public void onError(Throwable e) {
callMainSubscribeIfNecessary(d);
AutoDisposingObserverImpl.this.onError(e);
lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED);
mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
}

@Override public void onComplete() {
callMainSubscribeIfNecessary(d);
Copy link
Author

@duanbo1983 duanbo1983 Nov 16, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When subscribing to a completed ScopeProvider, callMainSubscribeIfNecessary() might be called here before the main source is subscribed, which triggers the DoubleSubscriptionsException.

lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED);
mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
// Noop - we're unbound now
}
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,22 @@

package com.uber.autodispose;

import com.google.common.truth.BooleanSubject;
import com.uber.autodispose.observers.AutoDisposingObserver;
import com.uber.autodispose.test.RecordingObserver;

import org.junit.After;
import org.junit.Test;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.ProtocolViolationException;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.Cancellable;
import io.reactivex.functions.Consumer;
Expand All @@ -32,10 +41,6 @@
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.MaybeSubject;
import io.reactivex.subjects.PublishSubject;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Test;

import static com.google.common.truth.Truth.assertThat;

Expand All @@ -49,6 +54,7 @@ public class AutoDisposeObserverTest {

@After public void resetPlugins() {
AutoDisposePlugins.reset();
RxJavaPlugins.reset();
}

@Test public void autoDispose_withMaybe_normal() {
Expand Down Expand Up @@ -171,6 +177,24 @@ public class AutoDisposeObserverTest {
assertThat(lifecycle.hasObservers()).isFalse();
}

@Test public void autoDispose_withScopeProviderCompleted_shouldNotReportDoubleSubscriptions() {
RxJavaPlugins.setErrorHandler(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
assertThat(throwable instanceof ProtocolViolationException).isFalse();
Copy link
Author

@duanbo1983 duanbo1983 Nov 16, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test case fails in master as:

Exception in thread "main" io.reactivex.exceptions.ProtocolViolationException: It is not allowed to subscribe with a(n) com.uber.autodispose.AutoDisposingObserverImpl multiple times. Please create a fresh instance of com.uber.autodispose.AutoDisposingObserverImpl and subscribe that to the target source instead.
	at com.uber.autodispose.AutoDisposeEndConsumerHelper.reportDoubleSubscription(AutoDisposeEndConsumerHelper.java:110)
	at com.uber.autodispose.AutoDisposeEndConsumerHelper.setOnce(AutoDisposeEndConsumerHelper.java:56)
	at com.uber.autodispose.AutoDisposingObserverImpl.onSubscribe(AutoDisposingObserverImpl.java:67)
	at io.reactivex.subjects.PublishSubject.subscribeActual(PublishSubject.java:85)
	at io.reactivex.Observable.subscribe(Observable.java:10842)
	at com.uber.autodispose.ObservableScoper$AutoDisposeObservable.subscribeActual(ObservableScoper.java:113)
	at io.reactivex.Observable.subscribe(Observable.java:10842)
	at com.uber.autodispose.ObservableScoper$1.subscribe(ObservableScoper.java:94)
	at com.uber.autodispose.AutoDisposeObserverTest.autoDispose_withScopeProviderCompleted_shouldNotReportDoubleSubscriptions(AutoDisposeObserverTest.java:192)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite understand this test, as the ProtocolViolationException isn't obvious. Some sort of comment and asserting what you do expect should happen here

It also won't fail if this callback isn't hit, let's make it explicit somehow that we expect this callback to be hit

}
});
TestObserver<Integer> o = new TestObserver<>();
PublishSubject<Integer> source = PublishSubject.create();
MaybeSubject<Integer> scope = MaybeSubject.create();
scope.onComplete();
ScopeProvider scopeProvider = TestUtil.makeProvider(scope);
source.to(AutoDispose.with(scopeProvider).<Integer>forObservable())
.subscribe(o);
o.assertNoValues();
o.assertNoErrors();
}

@Test public void autoDispose_withProvider_withoutStartingLifecycle_shouldFail() {
BehaviorSubject<Integer> lifecycle = BehaviorSubject.create();
RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER);
Expand Down