diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java index b63cdbc30b..ffa77e5360 100644 --- a/src/main/java/io/reactivex/Completable.java +++ b/src/main/java/io/reactivex/Completable.java @@ -1702,6 +1702,10 @@ public final Disposable subscribe(final Action onComplete, final Consumer + * If the Completable emits an error, it is wrapped into an + * {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException} + * and routed to the RxJavaPlugins.onError handler. + *

* If this Completable emits an error, it is sent to RxJavaPlugins.onError and gets swallowed. *

*
Scheduler:
diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 34067ffe55..5d7c336005 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -5559,6 +5559,11 @@ public final void blockingSubscribe() { /** * Subscribes to the source and calls the given callbacks on the current thread. + *

+ * If the Flowable emits an error, it is wrapped into an + * {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException} + * and routed to the RxJavaPlugins.onError handler. + *

*

*
Backpressure:
*
The operator consumes the source {@code Flowable} in an unbounded manner @@ -5572,7 +5577,7 @@ public final void blockingSubscribe() { @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) @SchedulerSupport(SchedulerSupport.NONE) public final void blockingSubscribe(Consumer onNext) { - FlowableBlockingSubscribe.subscribe(this, onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION); + FlowableBlockingSubscribe.subscribe(this, onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION); } /** @@ -8983,6 +8988,10 @@ public final Disposable forEach(Consumer onNext) { /** * Subscribes to the {@link Publisher} and receives notifications for each element until the * onNext Predicate returns false. + *

+ * If the Flowable emits an error, it is wrapped into an + * {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException} + * and routed to the RxJavaPlugins.onError handler. *

*
Backpressure:
*
The operator consumes the source {@code Publisher} in an unbounded manner (i.e., no @@ -9003,7 +9012,7 @@ public final Disposable forEach(Consumer onNext) { @BackpressureSupport(BackpressureKind.NONE) @SchedulerSupport(SchedulerSupport.NONE) public final Disposable forEachWhile(Predicate onNext) { - return forEachWhile(onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION); + return forEachWhile(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION); } /** @@ -12713,7 +12722,9 @@ public final Flowable strict() { /** * Subscribes to a Publisher and ignores {@code onNext} and {@code onComplete} emissions. *

- * If the Flowable emits an error, it is routed to the RxJavaPlugins.onError handler. + * If the Flowable emits an error, it is wrapped into an + * {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException} + * and routed to the RxJavaPlugins.onError handler. *

*
Backpressure:
*
The operator consumes the source {@code Publisher} in an unbounded manner (i.e., no @@ -12729,14 +12740,16 @@ public final Flowable strict() { @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe() { - return subscribe(Functions.emptyConsumer(), Functions.ERROR_CONSUMER, + return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE); } /** * Subscribes to a Publisher and provides a callback to handle the items it emits. *

- * If the Flowable emits an error, it is routed to the RxJavaPlugins.onError handler. + * If the Flowable emits an error, it is wrapped into an + * {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException} + * and routed to the RxJavaPlugins.onError handler. *

*
Backpressure:
*
The operator consumes the source {@code Publisher} in an unbounded manner (i.e., no @@ -12757,7 +12770,7 @@ public final Disposable subscribe() { @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe(Consumer onNext) { - return subscribe(onNext, Functions.ERROR_CONSUMER, + return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE); } diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index 72948d969e..7fef4bf868 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -3592,7 +3592,9 @@ public final Maybe retryWhen( /** * Subscribes to a Maybe and ignores {@code onSuccess} and {@code onComplete} emissions. *

- * If the Maybe emits an error, it is routed to the RxJavaPlugins.onError handler. + * If the Maybe emits an error, it is wrapped into an + * {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException} + * and routed to the RxJavaPlugins.onError handler. *

*
Scheduler:
*
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
@@ -3604,13 +3606,15 @@ public final Maybe retryWhen( */ @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe() { - return subscribe(Functions.emptyConsumer(), Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION); + return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION); } /** * Subscribes to a Maybe and provides a callback to handle the items it emits. *

- * If the Maybe emits an error, it is routed to the RxJavaPlugins.onError handler. + * If the Maybe emits an error, it is wrapped into an + * {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException} + * and routed to the RxJavaPlugins.onError handler. *

*
Scheduler:
*
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
@@ -3627,7 +3631,7 @@ public final Disposable subscribe() { @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe(Consumer onSuccess) { - return subscribe(onSuccess, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION); + return subscribe(onSuccess, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION); } /** diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 04518e176e..059612d444 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -4917,6 +4917,10 @@ public final void blockingSubscribe() { /** * Subscribes to the source and calls the given callbacks on the current thread. + *

+ * If the Observable emits an error, it is wrapped into an + * {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException} + * and routed to the RxJavaPlugins.onError handler. *

*
Scheduler:
*
{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.
@@ -4926,7 +4930,7 @@ public final void blockingSubscribe() { */ @SchedulerSupport(SchedulerSupport.NONE) public final void blockingSubscribe(Consumer onNext) { - ObservableBlockingSubscribe.subscribe(this, onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION); + ObservableBlockingSubscribe.subscribe(this, onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION); } /** @@ -6761,7 +6765,7 @@ public final Observable doFinally(Action onFinally) { * The action is shared between subscriptions and thus may be called concurrently from multiple * threads; the action must be thread safe. *

- * If the action throws a runtime exception, that exception is rethrown by the {@code unsubscribe()} call, + * If the action throws a runtime exception, that exception is rethrown by the {@code dispose()} call, * sometimes as a {@code CompositeException} if there were multiple exceptions along the way. *

* Note that terminal events trigger the action unless the {@code ObservableSource} is subscribed to via {@code unsafeSubscribe()}. @@ -7785,6 +7789,10 @@ public final Disposable forEach(Consumer onNext) { /** * Subscribes to the {@link ObservableSource} and receives notifications for each element until the * onNext Predicate returns false. + *

+ * If the Observable emits an error, it is wrapped into an + * {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException} + * and routed to the RxJavaPlugins.onError handler. *

*
Scheduler:
*
{@code forEachWhile} does not operate by default on a particular {@link Scheduler}.
@@ -7801,7 +7809,7 @@ public final Disposable forEach(Consumer onNext) { @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Disposable forEachWhile(Predicate onNext) { - return forEachWhile(onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION); + return forEachWhile(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION); } /** @@ -10536,7 +10544,9 @@ public final Observable startWithArray(T... items) { /** * Subscribes to an ObservableSource and ignores {@code onNext} and {@code onComplete} emissions. *

- * If the Observable emits an error, it is routed to the RxJavaPlugins.onError handler. + * If the Observable emits an error, it is wrapped into an + * {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException} + * and routed to the RxJavaPlugins.onError handler. *

*
Scheduler:
*
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
@@ -10548,13 +10558,15 @@ public final Observable startWithArray(T... items) { */ @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe() { - return subscribe(Functions.emptyConsumer(), Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION, Functions.emptyConsumer()); + return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer()); } /** * Subscribes to an ObservableSource and provides a callback to handle the items it emits. *

- * If the Observable emits an error, it is routed to the RxJavaPlugins.onError handler. + * If the Observable emits an error, it is wrapped into an + * {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException} + * and routed to the RxJavaPlugins.onError handler. *

*
Scheduler:
*
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
@@ -10571,7 +10583,7 @@ public final Disposable subscribe() { @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe(Consumer onNext) { - return subscribe(onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION, Functions.emptyConsumer()); + return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer()); } /** diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index 95e3d5b295..d8d8ce0406 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -2553,6 +2553,10 @@ public final Single retryWhen(Function, ? extends /** * Subscribes to a Single but ignore its emission or notification. + *

+ * If the Single emits an error, it is wrapped into an + * {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException} + * and routed to the RxJavaPlugins.onError handler. *

*
Scheduler:
*
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
@@ -2563,7 +2567,7 @@ public final Single retryWhen(Function, ? extends */ @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe() { - return subscribe(Functions.emptyConsumer(), Functions.ERROR_CONSUMER); + return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING); } /** @@ -2594,6 +2598,10 @@ public final Disposable subscribe(final BiConsumer /** * Subscribes to a Single and provides a callback to handle the item it emits. + *

+ * If the Single emits an error, it is wrapped into an + * {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException} + * and routed to the RxJavaPlugins.onError handler. *

*
Scheduler:
*
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
@@ -2609,7 +2617,7 @@ public final Disposable subscribe(final BiConsumer @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe(Consumer onSuccess) { - return subscribe(onSuccess, Functions.ERROR_CONSUMER); + return subscribe(onSuccess, Functions.ON_ERROR_MISSING); } /** diff --git a/src/main/java/io/reactivex/exceptions/OnErrorNotImplementedException.java b/src/main/java/io/reactivex/exceptions/OnErrorNotImplementedException.java new file mode 100644 index 0000000000..dc61f2eea8 --- /dev/null +++ b/src/main/java/io/reactivex/exceptions/OnErrorNotImplementedException.java @@ -0,0 +1,53 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.exceptions; + +import io.reactivex.annotations.Experimental; + +/** + * Represents an exception used to signal to the {@code RxJavaPlugins.onError()} that a + * callback-based subscribe() method on a base reactive type didn't specify + * an onError handler. + * @since 2.0.6 - experimental + */ +@Experimental +public final class OnErrorNotImplementedException extends RuntimeException { + + private static final long serialVersionUID = -6298857009889503852L; + + /** + * Customizes the {@code Throwable} with a custom message and wraps it before it + * is signalled to the {@code RxJavaPlugins.onError()} handler as {@code OnErrorNotImplementedException}. + * + * @param message + * the message to assign to the {@code Throwable} to signal + * @param e + * the {@code Throwable} to signal; if null, a NullPointerException is constructed + */ + public OnErrorNotImplementedException(String message, Throwable e) { + super(message, e != null ? e : new NullPointerException()); + } + + /** + * Wraps the {@code Throwable} before it + * is signalled to the {@code RxJavaPlugins.onError()} + * handler as {@code OnErrorNotImplementedException}. + * + * @param e + * the {@code Throwable} to signal; if null, a NullPointerException is constructed + */ + public OnErrorNotImplementedException(Throwable e) { + super(e != null ? e.getMessage() : null, e != null ? e : new NullPointerException()); + } +} \ No newline at end of file diff --git a/src/main/java/io/reactivex/internal/functions/Functions.java b/src/main/java/io/reactivex/internal/functions/Functions.java index 6d27e7b29b..5b4b97f759 100644 --- a/src/main/java/io/reactivex/internal/functions/Functions.java +++ b/src/main/java/io/reactivex/internal/functions/Functions.java @@ -18,6 +18,7 @@ import org.reactivestreams.Subscription; import io.reactivex.*; +import io.reactivex.exceptions.OnErrorNotImplementedException; import io.reactivex.functions.*; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.Timed; @@ -218,6 +219,17 @@ public void accept(Throwable error) { } }; + /** + * Wraps the consumed Throwable into an OnErrorNotImplementedException and + * signals it to the plugin error handler. + */ + public static final Consumer ON_ERROR_MISSING = new Consumer() { + @Override + public void accept(Throwable error) { + RxJavaPlugins.onError(new OnErrorNotImplementedException(error)); + } + }; + public static final LongConsumer EMPTY_LONG_CONSUMER = new LongConsumer() { @Override public void accept(long v) { } diff --git a/src/main/java/io/reactivex/internal/observers/CallbackCompletableObserver.java b/src/main/java/io/reactivex/internal/observers/CallbackCompletableObserver.java index 3fc0591b93..d64e705e0e 100644 --- a/src/main/java/io/reactivex/internal/observers/CallbackCompletableObserver.java +++ b/src/main/java/io/reactivex/internal/observers/CallbackCompletableObserver.java @@ -17,9 +17,9 @@ import io.reactivex.CompletableObserver; import io.reactivex.disposables.Disposable; -import io.reactivex.exceptions.Exceptions; +import io.reactivex.exceptions.*; import io.reactivex.functions.*; -import io.reactivex.internal.disposables.*; +import io.reactivex.internal.disposables.DisposableHelper; import io.reactivex.plugins.RxJavaPlugins; public final class CallbackCompletableObserver @@ -43,7 +43,7 @@ public CallbackCompletableObserver(Consumer onError, Action o @Override public void accept(Throwable e) { - RxJavaPlugins.onError(e); + RxJavaPlugins.onError(new OnErrorNotImplementedException(e)); } @Override diff --git a/src/main/java/io/reactivex/internal/observers/EmptyCompletableObserver.java b/src/main/java/io/reactivex/internal/observers/EmptyCompletableObserver.java index 72b17a8ac4..64e295566c 100644 --- a/src/main/java/io/reactivex/internal/observers/EmptyCompletableObserver.java +++ b/src/main/java/io/reactivex/internal/observers/EmptyCompletableObserver.java @@ -17,7 +17,8 @@ import io.reactivex.CompletableObserver; import io.reactivex.disposables.Disposable; -import io.reactivex.internal.disposables.*; +import io.reactivex.exceptions.OnErrorNotImplementedException; +import io.reactivex.internal.disposables.DisposableHelper; import io.reactivex.plugins.RxJavaPlugins; public final class EmptyCompletableObserver @@ -46,7 +47,7 @@ public void onComplete() { @Override public void onError(Throwable e) { lazySet(DisposableHelper.DISPOSED); - RxJavaPlugins.onError(e); + RxJavaPlugins.onError(new OnErrorNotImplementedException(e)); } @Override diff --git a/src/test/java/io/reactivex/exceptions/OnErrorNotImplementedExceptionTest.java b/src/test/java/io/reactivex/exceptions/OnErrorNotImplementedExceptionTest.java new file mode 100644 index 0000000000..1e6b97fc8c --- /dev/null +++ b/src/test/java/io/reactivex/exceptions/OnErrorNotImplementedExceptionTest.java @@ -0,0 +1,130 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.exceptions; + +import static org.junit.Assert.*; + +import java.util.List; + +import org.junit.*; + +import io.reactivex.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.plugins.RxJavaPlugins; + +public class OnErrorNotImplementedExceptionTest { + + List errors; + + @Before + public void before() { + errors = TestHelper.trackPluginErrors(); + } + + @After + public void after() { + RxJavaPlugins.reset(); + + assertFalse("" + errors, errors.isEmpty()); + TestHelper.assertError(errors, 0, OnErrorNotImplementedException.class); + Throwable c = errors.get(0).getCause(); + assertTrue("" + c, c instanceof TestException); + } + + @Test + public void flowableSubscribe0() { + Flowable.error(new TestException()) + .subscribe(); + } + + @Test + public void flowableSubscribe1() { + Flowable.error(new TestException()) + .subscribe(Functions.emptyConsumer()); + } + + @Test + public void flowableForEachWhile() { + Flowable.error(new TestException()) + .forEachWhile(Functions.alwaysTrue()); + } + + @Test + public void flowableBlockingSubscribe1() { + Flowable.error(new TestException()) + .blockingSubscribe(Functions.emptyConsumer()); + } + + @Test + public void observableSubscribe0() { + Observable.error(new TestException()) + .subscribe(); + } + + @Test + public void observableSubscribe1() { + Observable.error(new TestException()) + .subscribe(Functions.emptyConsumer()); + } + + @Test + public void observableForEachWhile() { + Observable.error(new TestException()) + .forEachWhile(Functions.alwaysTrue()); + } + + @Test + public void observableBlockingSubscribe1() { + Observable.error(new TestException()) + .blockingSubscribe(Functions.emptyConsumer()); + } + + @Test + public void singleSubscribe0() { + Single.error(new TestException()) + .subscribe(); + } + + @Test + public void singleSubscribe1() { + Single.error(new TestException()) + .subscribe(Functions.emptyConsumer()); + } + + + @Test + public void maybeSubscribe0() { + Maybe.error(new TestException()) + .subscribe(); + } + + @Test + public void maybeSubscribe1() { + Maybe.error(new TestException()) + .subscribe(Functions.emptyConsumer()); + } + + @Test + public void completableSubscribe0() { + Completable.error(new TestException()) + .subscribe(); + } + + @Test + public void completableSubscribe1() { + Completable.error(new TestException()) + .subscribe(Functions.EMPTY_ACTION); + } + +} diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableForEachTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableForEachTest.java index 72fd72af79..745c34b5cb 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableForEachTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableForEachTest.java @@ -111,7 +111,9 @@ public boolean test(Integer v) throws Exception { } }); - TestHelper.assertError(errors, 0, TestException.class); + TestHelper.assertError(errors, 0, OnErrorNotImplementedException.class); + Throwable c = errors.get(0).getCause(); + assertTrue("" + c, c instanceof TestException); } finally { RxJavaPlugins.reset(); } diff --git a/src/test/java/io/reactivex/maybe/MaybeTest.java b/src/test/java/io/reactivex/maybe/MaybeTest.java index 5f2809e76a..cba1ba4dce 100644 --- a/src/test/java/io/reactivex/maybe/MaybeTest.java +++ b/src/test/java/io/reactivex/maybe/MaybeTest.java @@ -2214,7 +2214,9 @@ public void subscribeZeroError() { assertTrue(Maybe.error(new TestException()) .subscribe().isDisposed()); - TestHelper.assertError(errors, 0, TestException.class); + TestHelper.assertError(errors, 0, OnErrorNotImplementedException.class); + Throwable c = errors.get(0).getCause(); + assertTrue("" + c, c instanceof TestException); } finally { RxJavaPlugins.reset(); }