diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFrom.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFrom.java index 6782e4282e..e51c6435e5 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFrom.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFrom.java @@ -21,6 +21,7 @@ import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.BiFunction; import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.fuseable.ConditionalSubscriber; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.subscribers.SerializedSubscriber; @@ -45,7 +46,8 @@ protected void subscribeActual(Subscriber s) { source.subscribe(wlf); } - static final class WithLatestFromSubscriber extends AtomicReference implements FlowableSubscriber, Subscription { + static final class WithLatestFromSubscriber extends AtomicReference + implements ConditionalSubscriber, Subscription { private static final long serialVersionUID = -312246233408980075L; @@ -69,6 +71,13 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T t) { + if (!tryOnNext(t)) { + s.get().request(1); + } + } + + @Override + public boolean tryOnNext(T t) { U u = get(); if (u != null) { R r; @@ -78,11 +87,12 @@ public void onNext(T t) { Exceptions.throwIfFatal(e); cancel(); actual.onError(e); - return; + return false; } actual.onNext(r); - } else{ - request(1); + return true; + } else { + return false; } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java index 2108dadf1a..874bf92ccf 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java @@ -19,10 +19,10 @@ import io.reactivex.*; import io.reactivex.annotations.*; -import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Function; import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.fuseable.ConditionalSubscriber; import io.reactivex.internal.subscriptions.*; import io.reactivex.internal.util.*; import io.reactivex.plugins.RxJavaPlugins; @@ -95,7 +95,7 @@ protected void subscribeActual(Subscriber s) { static final class WithLatestFromSubscriber extends AtomicInteger - implements FlowableSubscriber, Subscription { + implements ConditionalSubscriber, Subscription { private static final long serialVersionUID = 1577321883966341961L; @@ -133,7 +133,7 @@ void subscribe(Publisher[] others, int n) { WithLatestInnerSubscriber[] subscribers = this.subscribers; AtomicReference s = this.s; for (int i = 0; i < n; i++) { - if (SubscriptionHelper.isCancelled(s.get()) || done) { + if (SubscriptionHelper.isCancelled(s.get())) { return; } others[i].subscribe(subscribers[i]); @@ -147,8 +147,15 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T t) { + if (!tryOnNext(t) && !done) { + s.get().request(1); + } + } + + @Override + public boolean tryOnNext(T t) { if (done) { - return; + return false; } AtomicReferenceArray ara = values; int n = ara.length(); @@ -159,8 +166,7 @@ public void onNext(T t) { Object o = ara.get(i); if (o == null) { // somebody hasn't signalled yet, skip this T - s.get().request(1); - return; + return false; } objects[i + 1] = o; } @@ -173,10 +179,11 @@ public void onNext(T t) { Exceptions.throwIfFatal(ex); cancel(); onError(ex); - return; + return false; } HalfSerializer.onNext(actual, v, this, error); + return true; } @Override @@ -207,7 +214,7 @@ public void request(long n) { @Override public void cancel() { SubscriptionHelper.cancel(s); - for (Disposable s : subscribers) { + for (WithLatestInnerSubscriber s : subscribers) { s.dispose(); } } @@ -226,6 +233,7 @@ void innerError(int index, Throwable t) { void innerComplete(int index, boolean nonEmpty) { if (!nonEmpty) { done = true; + SubscriptionHelper.cancel(s); cancelAllBut(index); HalfSerializer.onComplete(actual, this, error); } @@ -243,7 +251,7 @@ void cancelAllBut(int index) { static final class WithLatestInnerSubscriber extends AtomicReference - implements FlowableSubscriber, Disposable { + implements FlowableSubscriber { private static final long serialVersionUID = 3256684027868224024L; @@ -283,13 +291,7 @@ public void onComplete() { parent.innerComplete(index, hasValue); } - @Override - public boolean isDisposed() { - return SubscriptionHelper.isCancelled(get()); - } - - @Override - public void dispose() { + void dispose() { SubscriptionHelper.cancel(this); } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromTest.java index c3dc8bbaa4..01ca2f1edf 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromTest.java @@ -726,7 +726,7 @@ public void zeroOtherCombinerReturnsNull() { } @Test - public void testSingleRequestNotForgottenWhenNoData() { + public void singleRequestNotForgottenWhenNoData() { PublishProcessor source = PublishProcessor.create(); PublishProcessor other = PublishProcessor.create(); @@ -750,4 +750,116 @@ public void testSingleRequestNotForgottenWhenNoData() { ts.assertValue((2 << 8) + 1); } + + @Test + public void coldSourceConsumedWithoutOther() { + Flowable.range(1, 10).withLatestFrom(Flowable.never(), + new BiFunction() { + @Override + public Object apply(Integer a, Object b) throws Exception { + return a; + } + }) + .test(1) + .assertResult(); + } + + @Test + public void coldSourceConsumedWithoutManyOthers() { + Flowable.range(1, 10).withLatestFrom(Flowable.never(), Flowable.never(), Flowable.never(), + new Function4() { + @Override + public Object apply(Integer a, Object b, Object c, Object d) throws Exception { + return a; + } + }) + .test(1) + .assertResult(); + } + + @Test + public void otherOnSubscribeRace() { + for (int i = 0; i < 1000; i++) { + final PublishProcessor pp0 = PublishProcessor.create(); + final PublishProcessor pp1 = PublishProcessor.create(); + final PublishProcessor pp2 = PublishProcessor.create(); + final PublishProcessor pp3 = PublishProcessor.create(); + + final Flowable source = pp0.withLatestFrom(pp1, pp2, pp3, new Function4() { + @Override + public Object apply(Object a, Integer b, Integer c, Integer d) + throws Exception { + return a; + } + }); + + final TestSubscriber ts = new TestSubscriber(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + source.subscribe(ts); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + TestHelper.race(r1, r2); + + ts.assertEmpty(); + + assertFalse(pp0.hasSubscribers()); + assertFalse(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + assertFalse(pp3.hasSubscribers()); + } + } + + @Test + public void otherCompleteRace() { + for (int i = 0; i < 1000; i++) { + final PublishProcessor pp0 = PublishProcessor.create(); + final PublishProcessor pp1 = PublishProcessor.create(); + final PublishProcessor pp2 = PublishProcessor.create(); + final PublishProcessor pp3 = PublishProcessor.create(); + + final Flowable source = pp0.withLatestFrom(pp1, pp2, pp3, new Function4() { + @Override + public Object apply(Object a, Integer b, Integer c, Integer d) + throws Exception { + return a; + } + }); + + final TestSubscriber ts = new TestSubscriber(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + source.subscribe(ts); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + pp1.onComplete(); + } + }; + + TestHelper.race(r1, r2); + + ts.assertResult(); + + assertFalse(pp0.hasSubscribers()); + assertFalse(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + assertFalse(pp3.hasSubscribers()); + } + } }