diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublishMulticast.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublishMulticast.java index ebf87f0b45..928c0ffbd7 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublishMulticast.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublishMulticast.java @@ -137,6 +137,8 @@ static final class MulticastProcessor extends Flowable implements Subscrib final AtomicReference[]> subscribers; final int prefetch; + + final int limit; final boolean delayError; @@ -148,10 +150,13 @@ static final class MulticastProcessor extends Flowable implements Subscrib volatile boolean done; Throwable error; + + int consumed; @SuppressWarnings("unchecked") MulticastProcessor(int prefetch, boolean delayError) { this.prefetch = prefetch; + this.limit = prefetch - (prefetch >> 2); // request after 75% consumption this.delayError = delayError; this.wip = new AtomicInteger(); this.s = new AtomicReference(); @@ -314,7 +319,11 @@ void drain() { int missed = 1; SimpleQueue q = queue; - + + int upstreamConsumed = consumed; + int localLimit = limit; + boolean canRequest = sourceMode != QueueSubscription.SYNC; + for (;;) { MulticastSubscription[] array = subscribers.get(); @@ -383,6 +392,11 @@ void drain() { } e++; + + if (canRequest && ++upstreamConsumed == localLimit) { + upstreamConsumed = 0; + s.get().request(localLimit); + } } if (e == r) { @@ -417,6 +431,7 @@ void drain() { } } + consumed = upstreamConsumed; missed = wip.addAndGet(-missed); if (missed == 0) { break; @@ -472,8 +487,10 @@ public void request(long n) { @Override public void cancel() { - getAndSet(Long.MIN_VALUE); - parent.remove(this); + if (getAndSet(Long.MIN_VALUE) != Long.MIN_VALUE) { + parent.remove(this); + parent.drain(); // unblock the others + } } public boolean isCancelled() { diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishFunctionTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishFunctionTest.java index 29ad8ca192..b404f3ddc8 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishFunctionTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishFunctionTest.java @@ -26,7 +26,7 @@ import io.reactivex.*; import io.reactivex.exceptions.*; -import io.reactivex.functions.Function; +import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.processors.PublishProcessor; @@ -408,7 +408,7 @@ public Publisher apply(Flowable f) throws Exception { for (int i = 0; i < 500; i++) { source.test() - .awaitDone(5, TimeUnit.MILLISECONDS) + .awaitDone(5, TimeUnit.SECONDS) .assertResult(1); } } @@ -420,7 +420,7 @@ public void inputOutputSubscribeRace2() { for (int i = 0; i < 500; i++) { source.test() - .awaitDone(5, TimeUnit.MILLISECONDS) + .awaitDone(5, TimeUnit.SECONDS) .assertResult(1); } } @@ -459,4 +459,86 @@ public void run() { ts1.assertResult(1); } } + + @Test + public void longFlow() { + Flowable.range(1, 1000000) + .publish(new Function, Publisher>() { + @SuppressWarnings("unchecked") + @Override + public Publisher apply(Flowable v) throws Exception { + return Flowable.mergeArray( + v.filter(new Predicate() { + @Override + public boolean test(Integer w) throws Exception { + return w % 2 == 0; + } + }), + v.filter(new Predicate() { + @Override + public boolean test(Integer w) throws Exception { + return w % 2 != 0; + } + })); + } + }) + .takeLast(1) + .test() + .assertResult(1000000); + } + + @Test + public void longFlow2() { + Flowable.range(1, 100000) + .publish(new Function, Publisher>() { + @SuppressWarnings("unchecked") + @Override + public Publisher apply(Flowable v) throws Exception { + return Flowable.mergeArray( + v.filter(new Predicate() { + @Override + public boolean test(Integer w) throws Exception { + return w % 2 == 0; + } + }), + v.filter(new Predicate() { + @Override + public boolean test(Integer w) throws Exception { + return w % 2 != 0; + } + })); + } + }) + .test() + .assertValueCount(100000) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void longFlowHidden() { + Flowable.range(1, 1000000).hide() + .publish(new Function, Publisher>() { + @SuppressWarnings("unchecked") + @Override + public Publisher apply(Flowable v) throws Exception { + return Flowable.mergeArray( + v.filter(new Predicate() { + @Override + public boolean test(Integer w) throws Exception { + return w % 2 == 0; + } + }), + v.filter(new Predicate() { + @Override + public boolean test(Integer w) throws Exception { + return w % 2 != 0; + } + })); + } + }) + .takeLast(1) + .test() + .assertResult(1000000); + } }