diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index d2f187bef3..3f31491b08 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -5507,9 +5507,9 @@ public final Observable distinctUntilChanged(Func2 doOnCompleted(final Action0 onCompleted) { Action1 onNext = Actions.empty(); Action1 onError = Actions.empty(); - Observer observer = new ActionSubscriber(onNext, onError, onCompleted); + Observer observer = new ActionObserver(onNext, onError, onCompleted); - return lift(new OperatorDoOnEach(observer)); + return create(new OnSubscribeDoOnEach(this, observer)); } /** @@ -5531,8 +5531,7 @@ public final Observable doOnCompleted(final Action0 onCompleted) { */ public final Observable doOnEach(final Action1> onNotification) { Observer observer = new ActionNotificationObserver(onNotification); - - return lift(new OperatorDoOnEach(observer)); + return create(new OnSubscribeDoOnEach(this, observer)); } /** @@ -5559,7 +5558,7 @@ public final Observable doOnEach(final Action1> onNot * @see ReactiveX operators documentation: Do */ public final Observable doOnEach(Observer observer) { - return lift(new OperatorDoOnEach(observer)); + return create(new OnSubscribeDoOnEach(this, observer)); } /** @@ -5585,9 +5584,9 @@ public final Observable doOnEach(Observer observer) { public final Observable doOnError(final Action1 onError) { Action1 onNext = Actions.empty(); Action0 onCompleted = Actions.empty(); - Observer observer = new ActionSubscriber(onNext, onError, onCompleted); + Observer observer = new ActionObserver(onNext, onError, onCompleted); - return lift(new OperatorDoOnEach(observer)); + return create(new OnSubscribeDoOnEach(this, observer)); } /** @@ -5610,9 +5609,9 @@ public final Observable doOnError(final Action1 onError) { public final Observable doOnNext(final Action1 onNext) { Action1 onError = Actions.empty(); Action0 onCompleted = Actions.empty(); - Observer observer = new ActionSubscriber(onNext, onError, onCompleted); + Observer observer = new ActionObserver(onNext, onError, onCompleted); - return lift(new OperatorDoOnEach(observer)); + return create(new OnSubscribeDoOnEach(this, observer)); } /** @@ -5693,9 +5692,9 @@ public final Observable doOnTerminate(final Action0 onTerminate) { Action1 onNext = Actions.empty(); Action1 onError = Actions.toAction1(onTerminate); - Observer observer = new ActionSubscriber(onNext, onError, onTerminate); + Observer observer = new ActionObserver(onNext, onError, onTerminate); - return lift(new OperatorDoOnEach(observer)); + return create(new OnSubscribeDoOnEach(this, observer)); } /** diff --git a/src/main/java/rx/Single.java b/src/main/java/rx/Single.java index c1a89043a6..198dba876c 100644 --- a/src/main/java/rx/Single.java +++ b/src/main/java/rx/Single.java @@ -2413,7 +2413,7 @@ public void onNext(T t) { } }; - return lift(new OperatorDoOnEach(observer)); + return Observable.create(new OnSubscribeDoOnEach(this.toObservable(), observer)).toSingle(); } /** @@ -2449,7 +2449,7 @@ public void onNext(T t) { } }; - return lift(new OperatorDoOnEach(observer)); + return Observable.create(new OnSubscribeDoOnEach(this.toObservable(), observer)).toSingle(); } /** diff --git a/src/main/java/rx/internal/operators/OnSubscribeDoOnEach.java b/src/main/java/rx/internal/operators/OnSubscribeDoOnEach.java new file mode 100644 index 0000000000..34694ed311 --- /dev/null +++ b/src/main/java/rx/internal/operators/OnSubscribeDoOnEach.java @@ -0,0 +1,104 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.operators; + +import java.util.Arrays; + +import rx.*; +import rx.Observable.OnSubscribe; +import rx.exceptions.*; +import rx.plugins.RxJavaHooks; + +/** + * Calls specified actions for each notification. + * + * @param the value type + */ +public class OnSubscribeDoOnEach implements OnSubscribe { + private final Observer doOnEachObserver; + private final Observable source; + + public OnSubscribeDoOnEach(Observable source, Observer doOnEachObserver) { + this.source = source; + this.doOnEachObserver = doOnEachObserver; + } + + @Override + public void call(final Subscriber subscriber) { + source.unsafeSubscribe(new DoOnEachSubscriber(subscriber, doOnEachObserver)); + } + + private static final class DoOnEachSubscriber extends Subscriber { + + private final Subscriber subscriber; + private final Observer doOnEachObserver; + + private boolean done; + + DoOnEachSubscriber(Subscriber subscriber, Observer doOnEachObserver) { + super(subscriber); + this.subscriber = subscriber; + this.doOnEachObserver = doOnEachObserver; + } + + @Override + public void onCompleted() { + if (done) { + return; + } + try { + doOnEachObserver.onCompleted(); + } catch (Throwable e) { + Exceptions.throwOrReport(e, this); + return; + } + // Set `done` here so that the error in `doOnEachObserver.onCompleted()` can be noticed by observer + done = true; + subscriber.onCompleted(); + } + + @Override + public void onError(Throwable e) { + if (done) { + RxJavaHooks.onError(e); + return; + } + done = true; + try { + doOnEachObserver.onError(e); + } catch (Throwable e2) { + Exceptions.throwIfFatal(e2); + subscriber.onError(new CompositeException(Arrays.asList(e, e2))); + return; + } + subscriber.onError(e); + } + + @Override + public void onNext(T value) { + if (done) { + return; + } + try { + doOnEachObserver.onNext(value); + } catch (Throwable e) { + Exceptions.throwOrReport(e, this, value); + return; + } + subscriber.onNext(value); + } + } +} \ No newline at end of file diff --git a/src/main/java/rx/internal/operators/OperatorDoOnEach.java b/src/main/java/rx/internal/operators/OperatorDoOnEach.java deleted file mode 100644 index 0e3034a425..0000000000 --- a/src/main/java/rx/internal/operators/OperatorDoOnEach.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.internal.operators; - -import java.util.Arrays; - -import rx.*; -import rx.Observable.Operator; -import rx.exceptions.*; - -/** - * Converts the elements of an observable sequence to the specified type. - * @param the value type - */ -public class OperatorDoOnEach implements Operator { - final Observer doOnEachObserver; - - public OperatorDoOnEach(Observer doOnEachObserver) { - this.doOnEachObserver = doOnEachObserver; - } - - @Override - public Subscriber call(final Subscriber observer) { - return new Subscriber(observer) { - - private boolean done; - - @Override - public void onCompleted() { - if (done) { - return; - } - try { - doOnEachObserver.onCompleted(); - } catch (Throwable e) { - Exceptions.throwOrReport(e, this); - return; - } - // Set `done` here so that the error in `doOnEachObserver.onCompleted()` can be noticed by observer - done = true; - observer.onCompleted(); - } - - @Override - public void onError(Throwable e) { - // need to throwIfFatal since we swallow errors after terminated - Exceptions.throwIfFatal(e); - if (done) { - return; - } - done = true; - try { - doOnEachObserver.onError(e); - } catch (Throwable e2) { - Exceptions.throwIfFatal(e2); - observer.onError(new CompositeException(Arrays.asList(e, e2))); - return; - } - observer.onError(e); - } - - @Override - public void onNext(T value) { - if (done) { - return; - } - try { - doOnEachObserver.onNext(value); - } catch (Throwable e) { - Exceptions.throwOrReport(e, this, value); - return; - } - observer.onNext(value); - } - }; - } -} \ No newline at end of file diff --git a/src/main/java/rx/internal/util/ActionObserver.java b/src/main/java/rx/internal/util/ActionObserver.java new file mode 100644 index 0000000000..c7a17791bd --- /dev/null +++ b/src/main/java/rx/internal/util/ActionObserver.java @@ -0,0 +1,51 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.util; + +import rx.Observer; +import rx.functions.*; + +/** + * An Observer that forwards the onXXX method calls to callbacks. + * @param the value type + */ +public final class ActionObserver implements Observer { + + final Action1 onNext; + final Action1 onError; + final Action0 onCompleted; + + public ActionObserver(Action1 onNext, Action1 onError, Action0 onCompleted) { + this.onNext = onNext; + this.onError = onError; + this.onCompleted = onCompleted; + } + + @Override + public void onNext(T t) { + onNext.call(t); + } + + @Override + public void onError(Throwable e) { + onError.call(e); + } + + @Override + public void onCompleted() { + onCompleted.call(); + } +} \ No newline at end of file diff --git a/src/test/java/rx/internal/operators/OperatorDoOnEachTest.java b/src/test/java/rx/internal/operators/OnSubscribeDoOnEachTest.java similarity index 62% rename from src/test/java/rx/internal/operators/OperatorDoOnEachTest.java rename to src/test/java/rx/internal/operators/OnSubscribeDoOnEachTest.java index 3c4cf9f9bb..f9997c3827 100644 --- a/src/test/java/rx/internal/operators/OperatorDoOnEachTest.java +++ b/src/test/java/rx/internal/operators/OnSubscribeDoOnEachTest.java @@ -19,18 +19,23 @@ import static org.mockito.Matchers.any; import static org.mockito.Mockito.*; +import java.util.Arrays; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.junit.*; import org.mockito.*; import rx.*; +import rx.Observable.OnSubscribe; import rx.exceptions.*; import rx.functions.*; import rx.observers.TestSubscriber; +import rx.plugins.RxJavaHooks; -public class OperatorDoOnEachTest { +public class OnSubscribeDoOnEachTest { @Mock Observer subscribedObserver; @@ -219,4 +224,146 @@ public void call(Throwable e) { assertTrue(exceptions.get(0) instanceof TestException); assertTrue(exceptions.get(1) instanceof TestException); } + + @Test + public void testIfOnNextActionFailsEmitsErrorAndDoesNotFollowWithCompleted() { + TestSubscriber ts = TestSubscriber.create(); + final RuntimeException e1 = new RuntimeException(); + Observable.create(new OnSubscribe() { + + @Override + public void call(final Subscriber subscriber) { + subscriber.setProducer(new Producer() { + + @Override + public void request(long n) { + if (n > 0) { + subscriber.onNext(1); + subscriber.onCompleted(); + } + }}); + }}) + .doOnNext(new Action1() { + + @Override + public void call(Integer t) { + throw e1; + }}) + .unsafeSubscribe(ts); + ts.assertNoValues(); + ts.assertError(e1); + ts.assertNotCompleted(); + } + + @Test + public void testIfOnNextActionFailsEmitsErrorAndDoesNotFollowWithOnNext() { + TestSubscriber ts = TestSubscriber.create(); + final RuntimeException e1 = new RuntimeException(); + Observable.create(new OnSubscribe() { + + @Override + public void call(final Subscriber subscriber) { + subscriber.setProducer(new Producer() { + + @Override + public void request(long n) { + if (n > 2) { + subscriber.onNext(1); + subscriber.onNext(2); + } + }}); + }}) + .doOnNext(new Action1() { + + @Override + public void call(Integer t) { + throw e1; + }}) + .unsafeSubscribe(ts); + ts.assertNoValues(); + assertEquals(1, ts.getOnErrorEvents().size()); + ts.assertNotCompleted(); + } + + @Test + public void testIfOnNextActionFailsEmitsErrorAndReportsMoreErrorsToRxJavaHooksNotDownstream() { + try { + final List list= new CopyOnWriteArrayList(); + RxJavaHooks.setOnError(new Action1() { + + @Override + public void call(Throwable e) { + list.add(e); + }}); + TestSubscriber ts = TestSubscriber.create(); + final RuntimeException e1 = new RuntimeException(); + final RuntimeException e2 = new RuntimeException(); + Observable.create(new OnSubscribe() { + + @Override + public void call(final Subscriber subscriber) { + subscriber.setProducer(new Producer() { + + @Override + public void request(long n) { + if (n > 2) { + subscriber.onNext(1); + subscriber.onError(e2); + } + } + }); + } + }).doOnNext(new Action1() { + + @Override + public void call(Integer t) { + throw e1; + } + }).unsafeSubscribe(ts); + ts.assertNoValues(); + assertEquals(1, ts.getOnErrorEvents().size()); + ts.assertNotCompleted(); + assertEquals(Arrays.asList(e2), list); + } finally { + RxJavaHooks.reset(); + } + } + + @Test + public void testIfCompleteActionFailsEmitsError() { + TestSubscriber ts = TestSubscriber.create(); + final RuntimeException e1 = new RuntimeException(); + Observable.empty() + .doOnCompleted(new Action0() { + + @Override + public void call() { + throw e1; + }}) + .unsafeSubscribe(ts); + ts.assertNoValues(); + ts.assertError(e1); + ts.assertNotCompleted(); + } + + @Test + public void testUnsubscribe() { + TestSubscriber ts = TestSubscriber.create(0); + final AtomicBoolean unsub = new AtomicBoolean(); + Observable.just(1,2,3,4) + .doOnUnsubscribe(new Action0() { + + @Override + public void call() { + unsub.set(true); + }}) + .doOnNext(Actions.empty()) + .subscribe(ts); + ts.requestMore(1); + ts.unsubscribe(); + ts.assertNotCompleted(); + ts.assertValueCount(1); + assertTrue(unsub.get()); + } + } \ No newline at end of file diff --git a/src/test/java/rx/internal/operators/OnSubscribeReduceTest.java b/src/test/java/rx/internal/operators/OnSubscribeReduceTest.java index 7b0f55f6f3..eddc465189 100644 --- a/src/test/java/rx/internal/operators/OnSubscribeReduceTest.java +++ b/src/test/java/rx/internal/operators/OnSubscribeReduceTest.java @@ -273,5 +273,4 @@ public Integer call(Integer a, Integer b) { ts.assertError(NoSuchElementException.class); } - }