diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 7a2d91a2af..73dd40eb67 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -6230,7 +6230,7 @@ public final Observable onErrorResumeNext(final Func1ReactiveX operators documentation: Catch */ public final Observable onErrorResumeNext(final Observable resumeSequence) { - return lift(new OperatorOnErrorResumeNextViaObservable(resumeSequence)); + return lift(OperatorOnErrorResumeNextViaFunction.withOther(resumeSequence)); } /** @@ -6260,7 +6260,7 @@ public final Observable onErrorResumeNext(final Observable resum * @see ReactiveX operators documentation: Catch */ public final Observable onErrorReturn(Func1 resumeFunction) { - return lift(new OperatorOnErrorReturn(resumeFunction)); + return lift(OperatorOnErrorResumeNextViaFunction.withSingle(resumeFunction)); } /** @@ -6296,7 +6296,7 @@ public final Observable onErrorReturn(Func1 resumeFun * @see ReactiveX operators documentation: Catch */ public final Observable onExceptionResumeNext(final Observable resumeSequence) { - return lift(new OperatorOnExceptionResumeNextViaObservable(resumeSequence)); + return lift(OperatorOnErrorResumeNextViaFunction.withException(resumeSequence)); } /** diff --git a/src/main/java/rx/Single.java b/src/main/java/rx/Single.java index a768779a4d..dfab031332 100644 --- a/src/main/java/rx/Single.java +++ b/src/main/java/rx/Single.java @@ -1411,7 +1411,7 @@ public final Single observeOn(Scheduler scheduler) { * @see ReactiveX operators documentation: Catch */ public final Single onErrorReturn(Func1 resumeFunction) { - return lift(new OperatorOnErrorReturn(resumeFunction)); + return lift(OperatorOnErrorResumeNextViaFunction.withSingle(resumeFunction)); } /** diff --git a/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java b/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java index b12c10d391..48a03ea30b 100644 --- a/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java +++ b/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java @@ -45,6 +45,36 @@ public final class OperatorOnErrorResumeNextViaFunction implements Operator> resumeFunction; + public static OperatorOnErrorResumeNextViaFunction withSingle(final Func1 resumeFunction) { + return new OperatorOnErrorResumeNextViaFunction(new Func1>() { + @Override + public Observable call(Throwable t) { + return Observable.just(resumeFunction.call(t)); + } + }); + } + + public static OperatorOnErrorResumeNextViaFunction withOther(final Observable other) { + return new OperatorOnErrorResumeNextViaFunction(new Func1>() { + @Override + public Observable call(Throwable t) { + return other; + } + }); + } + + public static OperatorOnErrorResumeNextViaFunction withException(final Observable other) { + return new OperatorOnErrorResumeNextViaFunction(new Func1>() { + @Override + public Observable call(Throwable t) { + if (t instanceof Exception) { + return other; + } + return Observable.error(t); + } + }); + } + public OperatorOnErrorResumeNextViaFunction(Func1> f) { this.resumeFunction = f; } @@ -52,10 +82,14 @@ public OperatorOnErrorResumeNextViaFunction(Func1 call(final Subscriber child) { final ProducerArbiter pa = new ProducerArbiter(); + final SerialSubscription ssub = new SerialSubscription(); + Subscriber parent = new Subscriber() { - private boolean done = false; + private boolean done; + + long produced; @Override public void onCompleted() { @@ -70,12 +104,13 @@ public void onCompleted() { public void onError(Throwable e) { if (done) { Exceptions.throwIfFatal(e); + RxJavaPlugins.getInstance().getErrorHandler().handleError(e); return; } done = true; try { - RxJavaPlugins.getInstance().getErrorHandler().handleError(e); unsubscribe(); + Subscriber next = new Subscriber() { @Override public void onNext(T t) { @@ -96,7 +131,13 @@ public void setProducer(Producer producer) { }; ssub.set(next); + long p = produced; + if (p != 0L) { + pa.produced(p); + } + Observable resume = resumeFunction.call(e); + resume.unsafeSubscribe(next); } catch (Throwable e2) { Exceptions.throwOrReport(e2, child); @@ -108,6 +149,7 @@ public void onNext(T t) { if (done) { return; } + produced++; child.onNext(t); } @@ -117,9 +159,11 @@ public void setProducer(final Producer producer) { } }; - child.add(ssub); ssub.set(parent); + + child.add(ssub); child.setProducer(pa); + return parent; } diff --git a/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaObservable.java b/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaObservable.java deleted file mode 100644 index 3e8afcea00..0000000000 --- a/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaObservable.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package rx.internal.operators; - -import rx.Observable; -import rx.Producer; -import rx.Observable.Operator; -import rx.Subscriber; -import rx.exceptions.Exceptions; -import rx.plugins.RxJavaPlugins; - -/** - * Instruct an Observable to pass control to another Observable rather than invoking - * onError if it encounters an error. - *

- * - *

- * By default, when an Observable encounters an error that prevents it from emitting the expected item to its - * Observer, the Observable invokes its Observer's {@code onError} method, and then quits without invoking any - * more of its Observer's methods. The {@code onErrorResumeNext} operation changes this behavior. If you pass - * an Observable ({@code resumeSequence}) to {@code onErrorResumeNext}, if the source Observable encounters an - * error, instead of invoking its Observer's {@code onError} method, it will instead relinquish control to this - * new Observable, which will invoke the Observer's {@code onNext} method if it is able to do so. In such a - * case, because no Observable necessarily invokes {@code onError}, the Observer may never know that an error - * happened. - *

- * You can use this to prevent errors from propagating or to supply fallback data should errors be - * encountered. - * - * @param the value type - */ -public final class OperatorOnErrorResumeNextViaObservable implements Operator { - final Observable resumeSequence; - - public OperatorOnErrorResumeNextViaObservable(Observable resumeSequence) { - this.resumeSequence = resumeSequence; - } - - @Override - public Subscriber call(final Subscriber child) { - // shared subscription won't work here - Subscriber s = new Subscriber() { - - private boolean done = false; - - @Override - public void onNext(T t) { - if (done) { - return; - } - child.onNext(t); - } - - @Override - public void onError(Throwable e) { - if (done) { - Exceptions.throwIfFatal(e); - return; - } - done = true; - RxJavaPlugins.getInstance().getErrorHandler().handleError(e); - unsubscribe(); - resumeSequence.unsafeSubscribe(child); - } - - @Override - public void onCompleted() { - if (done) { - return; - } - done = true; - child.onCompleted(); - } - - @Override - public void setProducer(final Producer producer) { - child.setProducer(new Producer() { - @Override - public void request(long n) { - producer.request(n); - } - }); - } - - }; - child.add(s); - - return s; - } - -} diff --git a/src/main/java/rx/internal/operators/OperatorOnErrorReturn.java b/src/main/java/rx/internal/operators/OperatorOnErrorReturn.java deleted file mode 100644 index 3830f591fd..0000000000 --- a/src/main/java/rx/internal/operators/OperatorOnErrorReturn.java +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package rx.internal.operators; - -import java.util.Arrays; - -import rx.Observable.Operator; -import rx.Producer; -import rx.Subscriber; -import rx.exceptions.CompositeException; -import rx.exceptions.Exceptions; -import rx.functions.Func1; -import rx.plugins.RxJavaPlugins; - -/** - * Instruct an Observable to emit a particular item to its Observer's onNext method - * rather than invoking onError if it encounters an error. - *

- * - *

- * By default, when an Observable encounters an error that prevents it from emitting the expected - * item to its Observer, the Observable invokes its Observer's onError method, and then - * quits without invoking any more of its Observer's methods. The onErrorReturn operation changes - * this behavior. If you pass a function (resumeFunction) to onErrorReturn, if the original - * Observable encounters an error, instead of invoking its Observer's onError method, - * it will instead pass the return value of resumeFunction to the Observer's onNext - * method. - *

- * You can use this to prevent errors from propagating or to supply fallback data should errors be - * encountered. - * - * @param the value type - */ -public final class OperatorOnErrorReturn implements Operator { - final Func1 resultFunction; - - public OperatorOnErrorReturn(Func1 resultFunction) { - this.resultFunction = resultFunction; - } - - @Override - public Subscriber call(final Subscriber child) { - Subscriber parent = new Subscriber() { - - private boolean done = false; - - @Override - public void onNext(T t) { - if (done) { - return; - } - child.onNext(t); - } - - @Override - public void onError(Throwable e) { - if (done) { - Exceptions.throwIfFatal(e); - return; - } - done = true; - try { - RxJavaPlugins.getInstance().getErrorHandler().handleError(e); - unsubscribe(); - T result = resultFunction.call(e); - child.onNext(result); - } catch (Throwable x) { - Exceptions.throwIfFatal(x); - child.onError(new CompositeException(Arrays.asList(e, x))); - return; - } - child.onCompleted(); - } - - @Override - public void onCompleted() { - if (done) { - return; - } - done = true; - child.onCompleted(); - } - - @Override - public void setProducer(final Producer producer) { - child.setProducer(new Producer() { - @Override - public void request(long n) { - producer.request(n); - } - }); - } - - }; - child.add(parent); - return parent; - } -} diff --git a/src/main/java/rx/internal/operators/OperatorOnExceptionResumeNextViaObservable.java b/src/main/java/rx/internal/operators/OperatorOnExceptionResumeNextViaObservable.java deleted file mode 100644 index be76097443..0000000000 --- a/src/main/java/rx/internal/operators/OperatorOnExceptionResumeNextViaObservable.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package rx.internal.operators; - -import rx.Observable; -import rx.Producer; -import rx.Observable.Operator; -import rx.Subscriber; -import rx.exceptions.Exceptions; -import rx.plugins.RxJavaPlugins; - -/** - * Instruct an Observable to pass control to another Observable rather than invoking - * onError if it encounters an error of type {@link java.lang.Exception}. - *

- * This differs from {@link Observable#onErrorResumeNext} in that this one does not handle - * {@link java.lang.Throwable} or {@link java.lang.Error} but lets those continue through. - *

- * - *

- * By default, when an Observable encounters an error that prevents it from emitting the expected - * item to its Observer, the Observable invokes its Observer's onError method, and - * then quits without invoking any more of its Observer's methods. The onErrorResumeNext operation - * changes this behavior. If you pass an Observable (resumeSequence) to onErrorResumeNext, if the - * source Observable encounters an error, instead of invoking its Observer's onError - * method, it will instead relinquish control to this new Observable, which will invoke the - * Observer's onNext method if it is able to do so. In such a case, because no - * Observable necessarily invokes onError, the Observer may never know that an error - * happened. - *

- * You can use this to prevent errors from propagating or to supply fallback data should errors be - * encountered. - * - * @param the value type - */ -public final class OperatorOnExceptionResumeNextViaObservable implements Operator { - final Observable resumeSequence; - - public OperatorOnExceptionResumeNextViaObservable(Observable resumeSequence) { - this.resumeSequence = resumeSequence; - } - - @Override - public Subscriber call(final Subscriber child) { - // needs to independently unsubscribe so child can continue with the resume - Subscriber s = new Subscriber() { - - private boolean done = false; - - @Override - public void onNext(T t) { - if (done) { - return; - } - child.onNext(t); - } - - @Override - public void onError(Throwable e) { - if (done) { - Exceptions.throwIfFatal(e); - return; - } - done = true; - if (e instanceof Exception) { - RxJavaPlugins.getInstance().getErrorHandler().handleError(e); - unsubscribe(); - resumeSequence.unsafeSubscribe(child); - } else { - child.onError(e); - } - } - - @Override - public void onCompleted() { - if (done) { - return; - } - done = true; - child.onCompleted(); - } - - @Override - public void setProducer(final Producer producer) { - child.setProducer(new Producer() { - @Override - public void request(long n) { - producer.request(n); - } - }); - } - - }; - child.add(s); - - return s; - } - - -} diff --git a/src/test/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunctionTest.java b/src/test/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunctionTest.java index 1aab90867d..a7cee6966f 100644 --- a/src/test/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunctionTest.java +++ b/src/test/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunctionTest.java @@ -30,12 +30,14 @@ import rx.Observable; import rx.Observable.Operator; +import rx.exceptions.TestException; import rx.Observer; import rx.Subscriber; import rx.Subscription; import rx.functions.Func1; import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; +import rx.subjects.PublishSubject; public class OperatorOnErrorResumeNextViaFunctionTest { @@ -344,4 +346,35 @@ public Integer call(Integer t1) { ts.awaitTerminalEvent(); ts.assertNoErrors(); } + + @Test + public void normalBackpressure() { + TestSubscriber ts = TestSubscriber.create(0); + + PublishSubject ps = PublishSubject.create(); + + ps.onErrorResumeNext(new Func1>() { + @Override + public Observable call(Throwable v) { + return Observable.range(3, 2); + } + }).subscribe(ts); + + ts.requestMore(2); + + ps.onNext(1); + ps.onNext(2); + ps.onError(new TestException("Forced failure")); + + ts.assertValues(1, 2); + ts.assertNoErrors(); + ts.assertNotCompleted(); + + ts.requestMore(2); + + ts.assertValues(1, 2, 3, 4); + ts.assertNoErrors(); + ts.assertCompleted(); + } + } diff --git a/src/test/java/rx/internal/operators/OperatorOnErrorResumeNextViaObservableTest.java b/src/test/java/rx/internal/operators/OperatorOnErrorResumeNextViaObservableTest.java index 586c2b689d..d67e1d3814 100644 --- a/src/test/java/rx/internal/operators/OperatorOnErrorResumeNextViaObservableTest.java +++ b/src/test/java/rx/internal/operators/OperatorOnErrorResumeNextViaObservableTest.java @@ -26,12 +26,14 @@ import rx.Observable; import rx.Observable.OnSubscribe; +import rx.exceptions.TestException; import rx.Observer; import rx.Subscriber; import rx.Subscription; import rx.functions.Func1; import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; +import rx.subjects.PublishSubject; public class OperatorOnErrorResumeNextViaObservableTest { @@ -221,4 +223,30 @@ public Integer call(Integer t1) { ts.awaitTerminalEvent(); ts.assertNoErrors(); } + + @Test + public void normalBackpressure() { + TestSubscriber ts = TestSubscriber.create(0); + + PublishSubject ps = PublishSubject.create(); + + ps.onErrorResumeNext(Observable.range(3, 2)).subscribe(ts); + + ts.requestMore(2); + + ps.onNext(1); + ps.onNext(2); + ps.onError(new TestException("Forced failure")); + + ts.assertValues(1, 2); + ts.assertNoErrors(); + ts.assertNotCompleted(); + + ts.requestMore(2); + + ts.assertValues(1, 2, 3, 4); + ts.assertNoErrors(); + ts.assertCompleted(); + } + } diff --git a/src/test/java/rx/internal/operators/OperatorOnErrorReturnTest.java b/src/test/java/rx/internal/operators/OperatorOnErrorReturnTest.java index f74d5d93f4..4124d8d344 100644 --- a/src/test/java/rx/internal/operators/OperatorOnErrorReturnTest.java +++ b/src/test/java/rx/internal/operators/OperatorOnErrorReturnTest.java @@ -30,9 +30,11 @@ import rx.Observable; import rx.Observer; import rx.Subscriber; +import rx.exceptions.TestException; import rx.functions.Func1; import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; +import rx.subjects.PublishSubject; public class OperatorOnErrorReturnTest { @@ -217,6 +219,33 @@ public void run() { } } - - + @Test + public void normalBackpressure() { + TestSubscriber ts = TestSubscriber.create(0); + + PublishSubject ps = PublishSubject.create(); + + ps.onErrorReturn(new Func1() { + @Override + public Integer call(Throwable e) { + return 3; + } + }).subscribe(ts); + + ts.requestMore(2); + + ps.onNext(1); + ps.onNext(2); + ps.onError(new TestException("Forced failure")); + + ts.assertValues(1, 2); + ts.assertNoErrors(); + ts.assertNotCompleted(); + + ts.requestMore(2); + + ts.assertValues(1, 2, 3); + ts.assertNoErrors(); + ts.assertCompleted(); + } } diff --git a/src/test/java/rx/internal/operators/OperatorOnExceptionResumeNextViaObservableTest.java b/src/test/java/rx/internal/operators/OperatorOnExceptionResumeNextViaObservableTest.java index b447a7ab23..2ac3e6eadb 100644 --- a/src/test/java/rx/internal/operators/OperatorOnExceptionResumeNextViaObservableTest.java +++ b/src/test/java/rx/internal/operators/OperatorOnExceptionResumeNextViaObservableTest.java @@ -17,21 +17,17 @@ import static org.junit.Assert.fail; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.*; import org.junit.Test; import org.mockito.Mockito; -import rx.Observable; -import rx.Observer; -import rx.Subscriber; +import rx.*; +import rx.exceptions.TestException; import rx.functions.Func1; import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; +import rx.subjects.PublishSubject; public class OperatorOnExceptionResumeNextViaObservableTest { @@ -265,4 +261,29 @@ else if ("THROWABLE".equals(s)) System.out.println("done starting TestObservable thread"); } } + + @Test + public void normalBackpressure() { + TestSubscriber ts = TestSubscriber.create(0); + + PublishSubject ps = PublishSubject.create(); + + ps.onExceptionResumeNext(Observable.range(3, 2)).subscribe(ts); + + ts.requestMore(2); + + ps.onNext(1); + ps.onNext(2); + ps.onError(new TestException("Forced failure")); + + ts.assertValues(1, 2); + ts.assertNoErrors(); + ts.assertNotCompleted(); + + ts.requestMore(2); + + ts.assertValues(1, 2, 3, 4); + ts.assertNoErrors(); + ts.assertCompleted(); + } }