diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableScan.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableScan.java index ed73e01841..154dcc0ea6 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableScan.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableScan.java @@ -19,6 +19,7 @@ import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.BiFunction; import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.plugins.RxJavaPlugins; public final class FlowableScan extends AbstractFlowableWithUpstream { final BiFunction accumulator; @@ -39,6 +40,8 @@ static final class ScanSubscriber implements Subscriber, Subscription { Subscription s; T value; + + boolean done; ScanSubscriber(Subscriber actual, BiFunction accumulator) { this.actual = actual; @@ -55,6 +58,9 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T t) { + if (done) { + return; + } final Subscriber a = actual; T v = value; if (v == null) { @@ -68,7 +74,7 @@ public void onNext(T t) { } catch (Throwable e) { Exceptions.throwIfFatal(e); s.cancel(); - a.onError(e); + onError(e); return; } @@ -79,11 +85,20 @@ public void onNext(T t) { @Override public void onError(Throwable t) { + if (done) { + RxJavaPlugins.onError(t); + return; + } + done = true; actual.onError(t); } @Override public void onComplete() { + if (done) { + return; + } + done = true; actual.onComplete(); } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableScanSeed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableScanSeed.java index 9bf34b5f0f..bd7b413e17 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableScanSeed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableScanSeed.java @@ -21,6 +21,7 @@ import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.subscribers.SinglePostCompleteSubscriber; import io.reactivex.internal.subscriptions.EmptySubscription; +import io.reactivex.plugins.RxJavaPlugins; public final class FlowableScanSeed extends AbstractFlowableWithUpstream { final BiFunction accumulator; @@ -87,6 +88,7 @@ public void onNext(T t) { @Override public void onError(Throwable t) { if (done) { + RxJavaPlugins.onError(t); return; } done = true; diff --git a/src/test/java/io/reactivex/flowable/FlowableScanTests.java b/src/test/java/io/reactivex/flowable/FlowableScanTests.java index cbf9b4be4e..4f0556340d 100644 --- a/src/test/java/io/reactivex/flowable/FlowableScanTests.java +++ b/src/test/java/io/reactivex/flowable/FlowableScanTests.java @@ -15,9 +15,13 @@ import static org.junit.Assert.assertEquals; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.concurrent.Callable; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Assert; import org.junit.Test; @@ -25,9 +29,11 @@ import io.reactivex.Flowable; import io.reactivex.flowable.FlowableEventStream.Event; import io.reactivex.functions.*; +import io.reactivex.plugins.RxJavaPlugins; public class FlowableScanTests { + @Test public void testUnsubscribeScan() { @@ -49,38 +55,120 @@ public void accept(HashMap v) { } @Test - public void testFlowableScanSeedDoesNotEmitErrorTwiceIfScanFunctionThrows() { + public void testScanWithSeedDoesNotEmitErrorTwiceIfScanFunctionThrows() { + final List list = new CopyOnWriteArrayList(); + Consumer errorConsumer = new Consumer() { + @Override + public void accept(Throwable t) throws Exception { + list.add(t); + }}; + try { + RxJavaPlugins.setErrorHandler(errorConsumer); + final RuntimeException e = new RuntimeException(); + final RuntimeException e2 = new RuntimeException(); + Burst.items(1).error(e2) + .scan(0, throwingBiFunction(e)) + .test() + .assertNoValues() + .assertError(e); + assertEquals(Arrays.asList(e2), list); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void testScanWithSeedDoesNotEmitTerminalEventTwiceIfScanFunctionThrows() { final RuntimeException e = new RuntimeException(); - Burst.item(1).error(e).scan(0, new BiFunction() { + Burst.item(1).create() + .scan(0, throwingBiFunction(e)) + .test() + .assertNoValues() + .assertError(e); + } + + @Test + public void testScanWithSeedDoesNotProcessOnNextAfterTerminalEventIfScanFunctionThrows() { + final RuntimeException e = new RuntimeException(); + final AtomicInteger count = new AtomicInteger(); + Burst.items(1, 2).create().scan(0, new BiFunction() { @Override public Integer apply(Integer n1, Integer n2) throws Exception { + count.incrementAndGet(); throw e; }}) .test() .assertNoValues() .assertError(e); + assertEquals(1, count.get()); + } + + @Test + public void testScanWithSeedCompletesNormally() { + Flowable.just(1,2,3).scan(0, SUM) + .test() + .assertValues(0, 1, 3, 6) + .assertComplete(); } @Test - public void testFlowableScanSeedDoesNotEmitTerminalEventTwiceIfScanFunctionThrows() { + public void testScanWithSeedWhenScanSeedProviderThrows() { final RuntimeException e = new RuntimeException(); - Burst.item(1).create().scan(0, new BiFunction() { + Flowable.just(1,2,3).scanWith(throwingCallable(e), + SUM) + .test() + .assertError(e) + .assertNoValues(); + } + @Test + public void testScanNoSeed() { + Flowable.just(1, 2, 3) + .scan(SUM) + .test() + .assertValues(1, 3, 6) + .assertComplete(); + } + + @Test + public void testScanNoSeedDoesNotEmitErrorTwiceIfScanFunctionThrows() { + final List list = new CopyOnWriteArrayList(); + Consumer errorConsumer = new Consumer() { @Override - public Integer apply(Integer n1, Integer n2) throws Exception { - throw e; - }}) + public void accept(Throwable t) throws Exception { + list.add(t); + }}; + try { + RxJavaPlugins.setErrorHandler(errorConsumer); + final RuntimeException e = new RuntimeException(); + final RuntimeException e2 = new RuntimeException(); + Burst.items(1, 2).error(e2) + .scan(throwingBiFunction(e)) + .test() + .assertValue(1) + .assertError(e); + assertEquals(Arrays.asList(e2), list); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void testScanNoSeedDoesNotEmitTerminalEventTwiceIfScanFunctionThrows() { + final RuntimeException e = new RuntimeException(); + Burst.items(1, 2).create() + .scan(throwingBiFunction(e)) .test() - .assertNoValues() + .assertValue(1) .assertError(e); } @Test - public void testFlowableScanSeedDoesNotProcessOnNextAfterTerminalEventIfScanFunctionThrows() { + public void testScanNoSeedDoesNotProcessOnNextAfterTerminalEventIfScanFunctionThrows() { final RuntimeException e = new RuntimeException(); final AtomicInteger count = new AtomicInteger(); - Burst.items(1, 2).create().scan(0, new BiFunction() { + Burst.items(1, 2, 3).create().scan(new BiFunction() { @Override public Integer apply(Integer n1, Integer n2) throws Exception { @@ -88,42 +176,34 @@ public Integer apply(Integer n1, Integer n2) throws Exception { throw e; }}) .test() - .assertNoValues() + .assertValue(1) .assertError(e); assertEquals(1, count.get()); } - @Test - public void testFlowableScanSeedCompletesNormally() { - Flowable.just(1,2,3).scan(0, new BiFunction() { - + private static BiFunction throwingBiFunction(final RuntimeException e) { + return new BiFunction() { @Override - public Integer apply(Integer t1, Integer t2) throws Exception { - return t1 + t2; - }}) - .test() - .assertValues(0, 1, 3, 6) - .assertComplete(); + public Integer apply(Integer n1, Integer n2) throws Exception { + throw e; + } + }; } + + private static final BiFunction SUM = new BiFunction() { + + @Override + public Integer apply(Integer t1, Integer t2) throws Exception { + return t1 + t2; + } + }; - @Test - public void testFlowableScanSeedWhenScanSeedProviderThrows() { - final RuntimeException e = new RuntimeException(); - Flowable.just(1,2,3).scanWith(new Callable() { + private static Callable throwingCallable(final RuntimeException e) { + return new Callable() { @Override public Integer call() throws Exception { throw e; } - }, - new BiFunction() { - - @Override - public Integer apply(Integer t1, Integer t2) throws Exception { - return t1 + t2; - } - }) - .test() - .assertError(e) - .assertNoValues(); + }; } } \ No newline at end of file