diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFrom.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFrom.java index e410733888..59872e836e 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFrom.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFrom.java @@ -13,14 +13,14 @@ package io.reactivex.internal.operators.flowable; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.*; import org.reactivestreams.*; import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.BiFunction; -import io.reactivex.internal.subscriptions.*; -import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.subscribers.SerializedSubscriber; public final class FlowableWithLatestFrom extends AbstractFlowableWithUpstream { @@ -37,6 +37,8 @@ protected void subscribeActual(Subscriber s) { final SerializedSubscriber serial = new SerializedSubscriber(s); final WithLatestFromSubscriber wlf = new WithLatestFromSubscriber(serial, combiner); + serial.onSubscribe(wlf); + other.subscribe(new Subscriber() { @Override public void onSubscribe(Subscription s) { @@ -73,6 +75,8 @@ static final class WithLatestFromSubscriber extends AtomicReference final AtomicReference s = new AtomicReference(); + final AtomicLong requested = new AtomicLong(); + final AtomicReference other = new AtomicReference(); WithLatestFromSubscriber(Subscriber actual, BiFunction combiner) { @@ -81,9 +85,7 @@ static final class WithLatestFromSubscriber extends AtomicReference } @Override public void onSubscribe(Subscription s) { - if (SubscriptionHelper.setOnce(this.s, s)) { - actual.onSubscribe(this); - } + SubscriptionHelper.deferredSetOnce(this.s, requested, s); } @Override @@ -92,7 +94,7 @@ public void onNext(T t) { if (u != null) { R r; try { - r = combiner.apply(t, u); + r = ObjectHelper.requireNonNull(combiner.apply(t, u), "The combiner returned a null value"); } catch (Throwable e) { Exceptions.throwIfFatal(e); cancel(); @@ -117,12 +119,12 @@ public void onComplete() { @Override public void request(long n) { - s.get().request(n); + SubscriptionHelper.deferredRequest(s, requested, n); } @Override public void cancel() { - s.get().cancel(); + SubscriptionHelper.cancel(s); SubscriptionHelper.cancel(other); } @@ -131,16 +133,8 @@ public boolean setOther(Subscription o) { } public void otherError(Throwable e) { - if (s.compareAndSet(null, SubscriptionHelper.CANCELLED)) { - EmptySubscription.error(e, actual); - } else { - if (s.get() != SubscriptionHelper.CANCELLED) { - cancel(); - actual.onError(e); - } else { - RxJavaPlugins.onError(e); - } - } + SubscriptionHelper.cancel(s); + actual.onError(e); } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFrom.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFrom.java index cf36a60641..3f79625910 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFrom.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFrom.java @@ -20,6 +20,7 @@ import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.BiFunction; import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.observers.SerializedObserver; public final class ObservableWithLatestFrom extends AbstractObservableWithUpstream { @@ -37,7 +38,7 @@ public void subscribeActual(Observer t) { final SerializedObserver serial = new SerializedObserver(t); final WithLatestFromObserver wlf = new WithLatestFromObserver(serial, combiner); - t.onSubscribe(wlf); + serial.onSubscribe(wlf); other.subscribe(new Observer() { @Override @@ -91,7 +92,7 @@ public void onNext(T t) { if (u != null) { R r; try { - r = combiner.apply(t, u); + r = ObjectHelper.requireNonNull(combiner.apply(t, u), "The combiner returned a null value"); } catch (Throwable e) { Exceptions.throwIfFatal(e); dispose(); diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromTest.java index 9b06cfb156..026f4b863e 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromTest.java @@ -681,4 +681,31 @@ public Integer apply(Integer a, Integer b) throws Exception { RxJavaPlugins.reset(); } } + + @Test + public void combineToNull1() { + Flowable.just(1) + .withLatestFrom(Flowable.just(2), new BiFunction() { + @Override + public Object apply(Integer a, Integer b) throws Exception { + return null; + } + }) + .test() + .assertFailure(NullPointerException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void combineToNull2() { + Flowable.just(1) + .withLatestFrom(Arrays.asList(Flowable.just(2), Flowable.just(3)), new Function() { + @Override + public Object apply(Object[] o) throws Exception { + return null; + } + }) + .test() + .assertFailure(NullPointerException.class); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableWithLatestFromTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableWithLatestFromTest.java index b4f19c95d7..dae8cfb615 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableWithLatestFromTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableWithLatestFromTest.java @@ -21,9 +21,9 @@ import org.junit.*; import org.mockito.InOrder; +import io.reactivex.*; import io.reactivex.Observable; import io.reactivex.Observer; -import io.reactivex.TestHelper; import io.reactivex.disposables.Disposables; import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; @@ -620,4 +620,31 @@ public Object apply(Integer a, Integer b, Integer c) throws Exception { RxJavaPlugins.reset(); } } + + @Test + public void combineToNull1() { + Observable.just(1) + .withLatestFrom(Observable.just(2), new BiFunction() { + @Override + public Object apply(Integer a, Integer b) throws Exception { + return null; + } + }) + .test() + .assertFailure(NullPointerException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void combineToNull2() { + Observable.just(1) + .withLatestFrom(Arrays.asList(Observable.just(2), Observable.just(3)), new Function() { + @Override + public Object apply(Object[] o) throws Exception { + return null; + } + }) + .test() + .assertFailure(NullPointerException.class); + } } \ No newline at end of file