diff --git a/src/main/java/rx/internal/operators/OnSubscribeReduce.java b/src/main/java/rx/internal/operators/OnSubscribeReduce.java index ef910e88ac..5136aa376e 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeReduce.java +++ b/src/main/java/rx/internal/operators/OnSubscribeReduce.java @@ -22,6 +22,7 @@ import rx.Observable.OnSubscribe; import rx.exceptions.Exceptions; import rx.functions.Func2; +import rx.plugins.RxJavaHooks; public final class OnSubscribeReduce implements OnSubscribe { @@ -57,6 +58,8 @@ static final class ReduceSubscriber extends Subscriber { static final Object EMPTY = new Object(); + boolean done; + @SuppressWarnings("unchecked") public ReduceSubscriber(Subscriber actual, Func2 reducer) { this.actual = actual; @@ -68,6 +71,9 @@ public ReduceSubscriber(Subscriber actual, Func2 reducer) { @SuppressWarnings("unchecked") @Override public void onNext(T t) { + if (done) { + return; + } Object o = value; if (o == EMPTY) { value = t; @@ -77,19 +83,28 @@ public void onNext(T t) { } catch (Throwable ex) { Exceptions.throwIfFatal(ex); unsubscribe(); - actual.onError(ex); + onError(ex); } } } @Override public void onError(Throwable e) { - actual.onError(e); + if (!done) { + done = true; + actual.onError(e); + } else { + RxJavaHooks.onError(e); + } } @SuppressWarnings("unchecked") @Override public void onCompleted() { + if (done) { + return; + } + done = true; Object o = value; if (o != EMPTY) { actual.onNext((T)o); diff --git a/src/test/java/rx/internal/operators/OnSubscribeReduceTest.java b/src/test/java/rx/internal/operators/OnSubscribeReduceTest.java new file mode 100644 index 0000000000..b09f92c158 --- /dev/null +++ b/src/test/java/rx/internal/operators/OnSubscribeReduceTest.java @@ -0,0 +1,277 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.internal.operators; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.Arrays; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.CopyOnWriteArrayList; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import rx.*; +import rx.Observable.OnSubscribe; +import rx.Observer; +import rx.exceptions.TestException; +import rx.functions.Action1; +import rx.functions.Func1; +import rx.functions.Func2; +import rx.internal.util.UtilityFunctions; +import rx.observers.TestSubscriber; +import rx.plugins.RxJavaHooks; + +public class OnSubscribeReduceTest { + @Mock + Observer observer; + + @Before + public void before() { + MockitoAnnotations.initMocks(this); + } + + Func2 sum = new Func2() { + @Override + public Integer call(Integer t1, Integer t2) { + return t1 + t2; + } + }; + + @Test + public void testAggregateAsIntSum() { + + Observable result = Observable.just(1, 2, 3, 4, 5).reduce(0, sum).map(UtilityFunctions. identity()); + + result.subscribe(observer); + + verify(observer).onNext(1 + 2 + 3 + 4 + 5); + verify(observer).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + } + + @Test + public void testAggregateAsIntSumSourceThrows() { + Observable result = Observable.concat(Observable.just(1, 2, 3, 4, 5), + Observable. error(new TestException())) + .reduce(0, sum).map(UtilityFunctions. identity()); + + result.subscribe(observer); + + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + verify(observer, times(1)).onError(any(TestException.class)); + } + + @Test + public void testAggregateAsIntSumAccumulatorThrows() { + Func2 sumErr = new Func2() { + @Override + public Integer call(Integer t1, Integer t2) { + throw new TestException(); + } + }; + + Observable result = Observable.just(1, 2, 3, 4, 5) + .reduce(0, sumErr).map(UtilityFunctions. identity()); + + result.subscribe(observer); + + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + verify(observer, times(1)).onError(any(TestException.class)); + } + + @Test + public void testAggregateAsIntSumResultSelectorThrows() { + + Func1 error = new Func1() { + + @Override + public Integer call(Integer t1) { + throw new TestException(); + } + }; + + Observable result = Observable.just(1, 2, 3, 4, 5) + .reduce(0, sum).map(error); + + result.subscribe(observer); + + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + verify(observer, times(1)).onError(any(TestException.class)); + } + + @Test + public void testBackpressureWithNoInitialValue() throws InterruptedException { + Observable source = Observable.just(1, 2, 3, 4, 5, 6); + Observable reduced = source.reduce(sum); + + Integer r = reduced.toBlocking().first(); + assertEquals(21, r.intValue()); + } + + @Test + public void testBackpressureWithInitialValue() throws InterruptedException { + Observable source = Observable.just(1, 2, 3, 4, 5, 6); + Observable reduced = source.reduce(0, sum); + + Integer r = reduced.toBlocking().first(); + assertEquals(21, r.intValue()); + } + + @Test + public void testNoInitialValueDoesNotEmitMultipleTerminalEvents() { + TestSubscriber ts = TestSubscriber.create(); + Observable.create(new OnSubscribe() { + + @Override + public void call(final Subscriber sub) { + sub.setProducer(new Producer() { + + @Override + public void request(long n) { + if (n > 0) { + sub.onNext(1); + sub.onNext(2); + sub.onCompleted(); + } + } + }); + } + }) + .reduce(new Func2() { + + @Override + public Integer call(Integer a, Integer b) { + throw new RuntimeException("boo"); + }}) + .unsafeSubscribe(ts); + ts.assertError(RuntimeException.class); + ts.assertNotCompleted(); + } + + @Test + public void testNoInitialValueUpstreamEmitsMoreOnNextDespiteUnsubscription() { + TestSubscriber ts = TestSubscriber.create(); + Observable.create(new OnSubscribe() { + + @Override + public void call(final Subscriber sub) { + sub.setProducer(new Producer() { + + @Override + public void request(long n) { + if (n > 2) { + sub.onNext(1); + sub.onNext(2); + sub.onNext(3); + sub.onCompleted(); + } + } + }); + } + }) + .reduce(new Func2() { + boolean once = true; + + @Override + public Integer call(Integer a, Integer b) { + if (once) { + throw new RuntimeException("boo"); + } else { + once = false; + return a + b; + } + }}) + .unsafeSubscribe(ts); + ts.assertNoValues(); + ts.assertError(RuntimeException.class); + ts.assertNotCompleted(); + } + + @Test + public void testNoInitialValueDoesNotEmitMultipleErrorEventsAndReportsSecondErrorToHooks() { + try { + final List list = new CopyOnWriteArrayList(); + RxJavaHooks.setOnError(new Action1() { + + @Override + public void call(Throwable t) { + list.add(t); + } + }); + TestSubscriber ts = TestSubscriber.create(); + final RuntimeException e1 = new RuntimeException("e1"); + final Throwable e2 = new RuntimeException("e2"); + Observable.create(new OnSubscribe() { + + @Override + public void call(final Subscriber sub) { + sub.setProducer(new Producer() { + + @Override + public void request(long n) { + if (n > 1) { + sub.onNext(1); + sub.onNext(2); + sub.onError(e2); + } + } + }); + } + }) + .reduce(new Func2() { + + @Override + public Integer call(Integer a, Integer b) { + throw e1; + }}) + .unsafeSubscribe(ts); + ts.assertNotCompleted(); + System.out.println(ts.getOnErrorEvents()); + assertEquals(Arrays.asList(e1), ts.getOnErrorEvents()); + assertEquals(Arrays.asList(e2), list); + } finally { + RxJavaHooks.setOnError(null); + } + } + + + @Test + public void testNoInitialValueEmitsNoSuchElementExceptionIfEmptyStream() { + TestSubscriber ts = TestSubscriber.create(); + Observable.empty().reduce(new Func2() { + + @Override + public Integer call(Integer a, Integer b) { + return a + b; + } + }).subscribe(ts); + ts.assertError(NoSuchElementException.class); + } + + +} diff --git a/src/test/java/rx/internal/operators/OperatorReduceTest.java b/src/test/java/rx/internal/operators/OperatorReduceTest.java deleted file mode 100644 index c550c835ea..0000000000 --- a/src/test/java/rx/internal/operators/OperatorReduceTest.java +++ /dev/null @@ -1,138 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package rx.internal.operators; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -import rx.Observable; -import rx.Observer; -import rx.exceptions.TestException; -import rx.functions.Func1; -import rx.functions.Func2; -import rx.internal.util.UtilityFunctions; - -public class OperatorReduceTest { - @Mock - Observer observer; - - @Before - public void before() { - MockitoAnnotations.initMocks(this); - } - - Func2 sum = new Func2() { - @Override - public Integer call(Integer t1, Integer t2) { - return t1 + t2; - } - }; - - @Test - public void testAggregateAsIntSum() { - - Observable result = Observable.just(1, 2, 3, 4, 5).reduce(0, sum).map(UtilityFunctions. identity()); - - result.subscribe(observer); - - verify(observer).onNext(1 + 2 + 3 + 4 + 5); - verify(observer).onCompleted(); - verify(observer, never()).onError(any(Throwable.class)); - } - - @Test - public void testAggregateAsIntSumSourceThrows() { - Observable result = Observable.concat(Observable.just(1, 2, 3, 4, 5), - Observable. error(new TestException())) - .reduce(0, sum).map(UtilityFunctions. identity()); - - result.subscribe(observer); - - verify(observer, never()).onNext(any()); - verify(observer, never()).onCompleted(); - verify(observer, times(1)).onError(any(TestException.class)); - } - - @Test - public void testAggregateAsIntSumAccumulatorThrows() { - Func2 sumErr = new Func2() { - @Override - public Integer call(Integer t1, Integer t2) { - throw new TestException(); - } - }; - - Observable result = Observable.just(1, 2, 3, 4, 5) - .reduce(0, sumErr).map(UtilityFunctions. identity()); - - result.subscribe(observer); - - verify(observer, never()).onNext(any()); - verify(observer, never()).onCompleted(); - verify(observer, times(1)).onError(any(TestException.class)); - } - - @Test - public void testAggregateAsIntSumResultSelectorThrows() { - - Func1 error = new Func1() { - - @Override - public Integer call(Integer t1) { - throw new TestException(); - } - }; - - Observable result = Observable.just(1, 2, 3, 4, 5) - .reduce(0, sum).map(error); - - result.subscribe(observer); - - verify(observer, never()).onNext(any()); - verify(observer, never()).onCompleted(); - verify(observer, times(1)).onError(any(TestException.class)); - } - - @Test - public void testBackpressureWithNoInitialValue() throws InterruptedException { - Observable source = Observable.just(1, 2, 3, 4, 5, 6); - Observable reduced = source.reduce(sum); - - Integer r = reduced.toBlocking().first(); - assertEquals(21, r.intValue()); - } - - @Test - public void testBackpressureWithInitialValue() throws InterruptedException { - Observable source = Observable.just(1, 2, 3, 4, 5, 6); - Observable reduced = source.reduce(0, sum); - - Integer r = reduced.toBlocking().first(); - assertEquals(21, r.intValue()); - } - - - -}