From dc0889412cd9e83d99ee935b7cf3017b9b45dfb9 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 20 Mar 2017 15:46:06 +0100 Subject: [PATCH] 2.x: add resilient versions of parallel map(), filter() & doOnNext() --- .../parallel/ParallelDoOnNextTry.java | 294 +++++++++++++ .../operators/parallel/ParallelFilter.java | 2 +- .../operators/parallel/ParallelFilterTry.java | 278 +++++++++++++ .../operators/parallel/ParallelMapTry.java | 299 ++++++++++++++ .../parallel/ParallelFailureHandling.java | 46 +++ .../reactivex/parallel/ParallelFlowable.java | 119 ++++++ .../reactivex/ParamValidationCheckerTest.java | 3 + src/test/java/io/reactivex/TestHelper.java | 17 + .../parallel/ParallelDoOnNextTryTest.java | 388 ++++++++++++++++++ .../parallel/ParallelFilterTryTest.java | 377 +++++++++++++++++ .../parallel/ParallelMapTryTest.java | 351 ++++++++++++++++ 11 files changed, 2173 insertions(+), 1 deletion(-) create mode 100644 src/main/java/io/reactivex/internal/operators/parallel/ParallelDoOnNextTry.java create mode 100644 src/main/java/io/reactivex/internal/operators/parallel/ParallelFilterTry.java create mode 100644 src/main/java/io/reactivex/internal/operators/parallel/ParallelMapTry.java create mode 100644 src/main/java/io/reactivex/parallel/ParallelFailureHandling.java create mode 100644 src/test/java/io/reactivex/parallel/ParallelDoOnNextTryTest.java create mode 100644 src/test/java/io/reactivex/parallel/ParallelFilterTryTest.java create mode 100644 src/test/java/io/reactivex/parallel/ParallelMapTryTest.java diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelDoOnNextTry.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelDoOnNextTry.java new file mode 100644 index 0000000000..4a599b2435 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelDoOnNextTry.java @@ -0,0 +1,294 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.parallel; + +import org.reactivestreams.*; + +import io.reactivex.exceptions.*; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.fuseable.ConditionalSubscriber; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.parallel.*; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Calls a Consumer for each upstream value passing by + * and handles any failure with a handler function. + * + * @param the input value type + * @since 2.0.8 - experimental + */ +public final class ParallelDoOnNextTry extends ParallelFlowable { + + final ParallelFlowable source; + + final Consumer onNext; + + final BiFunction errorHandler; + + public ParallelDoOnNextTry(ParallelFlowable source, Consumer onNext, + BiFunction errorHandler) { + this.source = source; + this.onNext = onNext; + this.errorHandler = errorHandler; + } + + @Override + public void subscribe(Subscriber[] subscribers) { + if (!validate(subscribers)) { + return; + } + + int n = subscribers.length; + @SuppressWarnings("unchecked") + Subscriber[] parents = new Subscriber[n]; + + for (int i = 0; i < n; i++) { + Subscriber a = subscribers[i]; + if (a instanceof ConditionalSubscriber) { + parents[i] = new ParallelDoOnNextConditionalSubscriber((ConditionalSubscriber)a, onNext, errorHandler); + } else { + parents[i] = new ParallelDoOnNextSubscriber(a, onNext, errorHandler); + } + } + + source.subscribe(parents); + } + + @Override + public int parallelism() { + return source.parallelism(); + } + + static final class ParallelDoOnNextSubscriber implements ConditionalSubscriber, Subscription { + + final Subscriber actual; + + final Consumer onNext; + + final BiFunction errorHandler; + + Subscription s; + + boolean done; + + ParallelDoOnNextSubscriber(Subscriber actual, Consumer onNext, + BiFunction errorHandler) { + this.actual = actual; + this.onNext = onNext; + this.errorHandler = errorHandler; + } + + @Override + public void request(long n) { + s.request(n); + } + + @Override + public void cancel() { + s.cancel(); + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; + + actual.onSubscribe(this); + } + } + + @Override + public void onNext(T t) { + if (!tryOnNext(t)) { + s.request(1); + } + } + + @Override + public boolean tryOnNext(T t) { + if (done) { + return false; + } + long retries = 0; + + for (;;) { + try { + onNext.accept(t); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + + ParallelFailureHandling h; + + try { + h = ObjectHelper.requireNonNull(errorHandler.apply(++retries, ex), "The errorHandler returned a null item"); + } catch (Throwable exc) { + Exceptions.throwIfFatal(exc); + cancel(); + onError(new CompositeException(ex, exc)); + return false; + } + + switch (h) { + case RETRY: + continue; + case SKIP: + return false; + case STOP: + cancel(); + onComplete(); + return false; + default: + cancel(); + onError(ex); + return false; + } + } + + actual.onNext(t); + return true; + } + } + + @Override + public void onError(Throwable t) { + if (done) { + RxJavaPlugins.onError(t); + return; + } + done = true; + actual.onError(t); + } + + @Override + public void onComplete() { + if (done) { + return; + } + done = true; + actual.onComplete(); + } + + } + static final class ParallelDoOnNextConditionalSubscriber implements ConditionalSubscriber, Subscription { + + final ConditionalSubscriber actual; + + final Consumer onNext; + + final BiFunction errorHandler; + Subscription s; + + boolean done; + + ParallelDoOnNextConditionalSubscriber(ConditionalSubscriber actual, + Consumer onNext, + BiFunction errorHandler) { + this.actual = actual; + this.onNext = onNext; + this.errorHandler = errorHandler; + } + + @Override + public void request(long n) { + s.request(n); + } + + @Override + public void cancel() { + s.cancel(); + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; + + actual.onSubscribe(this); + } + } + + @Override + public void onNext(T t) { + if (!tryOnNext(t) && !done) { + s.request(1); + } + } + + @Override + public boolean tryOnNext(T t) { + if (done) { + return false; + } + long retries = 0; + + for (;;) { + try { + onNext.accept(t); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + + ParallelFailureHandling h; + + try { + h = ObjectHelper.requireNonNull(errorHandler.apply(++retries, ex), "The errorHandler returned a null item"); + } catch (Throwable exc) { + Exceptions.throwIfFatal(exc); + cancel(); + onError(new CompositeException(ex, exc)); + return false; + } + + switch (h) { + case RETRY: + continue; + case SKIP: + return false; + case STOP: + cancel(); + onComplete(); + return false; + default: + cancel(); + onError(ex); + return false; + } + } + + return actual.tryOnNext(t); + } + } + + @Override + public void onError(Throwable t) { + if (done) { + RxJavaPlugins.onError(t); + return; + } + done = true; + actual.onError(t); + } + + @Override + public void onComplete() { + if (done) { + return; + } + done = true; + actual.onComplete(); + } + + } +} diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelFilter.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelFilter.java index 78b945417c..5dab623306 100644 --- a/src/main/java/io/reactivex/internal/operators/parallel/ParallelFilter.java +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelFilter.java @@ -88,7 +88,7 @@ public final void cancel() { @Override public final void onNext(T t) { - if (!tryOnNext(t)) { + if (!tryOnNext(t) && !done) { s.request(1); } } diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelFilterTry.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelFilterTry.java new file mode 100644 index 0000000000..094c88a06e --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelFilterTry.java @@ -0,0 +1,278 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.parallel; + +import org.reactivestreams.*; + +import io.reactivex.exceptions.*; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.fuseable.ConditionalSubscriber; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.parallel.*; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Filters each 'rail' of the source ParallelFlowable with a predicate function. + * + * @param the input value type + */ +public final class ParallelFilterTry extends ParallelFlowable { + + final ParallelFlowable source; + + final Predicate predicate; + + final BiFunction errorHandler; + + public ParallelFilterTry(ParallelFlowable source, Predicate predicate, + BiFunction errorHandler) { + this.source = source; + this.predicate = predicate; + this.errorHandler = errorHandler; + } + + @Override + public void subscribe(Subscriber[] subscribers) { + if (!validate(subscribers)) { + return; + } + + int n = subscribers.length; + @SuppressWarnings("unchecked") + Subscriber[] parents = new Subscriber[n]; + + for (int i = 0; i < n; i++) { + Subscriber a = subscribers[i]; + if (a instanceof ConditionalSubscriber) { + parents[i] = new ParallelFilterConditionalSubscriber((ConditionalSubscriber)a, predicate, errorHandler); + } else { + parents[i] = new ParallelFilterSubscriber(a, predicate, errorHandler); + } + } + + source.subscribe(parents); + } + + @Override + public int parallelism() { + return source.parallelism(); + } + + abstract static class BaseFilterSubscriber implements ConditionalSubscriber, Subscription { + final Predicate predicate; + + final BiFunction errorHandler; + + Subscription s; + + boolean done; + + BaseFilterSubscriber(Predicate predicate, BiFunction errorHandler) { + this.predicate = predicate; + this.errorHandler = errorHandler; + } + + @Override + public final void request(long n) { + s.request(n); + } + + @Override + public final void cancel() { + s.cancel(); + } + + @Override + public final void onNext(T t) { + if (!tryOnNext(t) && !done) { + s.request(1); + } + } + } + + static final class ParallelFilterSubscriber extends BaseFilterSubscriber { + + final Subscriber actual; + + ParallelFilterSubscriber(Subscriber actual, Predicate predicate, BiFunction errorHandler) { + super(predicate, errorHandler); + this.actual = actual; + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; + + actual.onSubscribe(this); + } + } + + @Override + public boolean tryOnNext(T t) { + if (!done) { + long retries = 0L; + + for (;;) { + boolean b; + + try { + b = predicate.test(t); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + + ParallelFailureHandling h; + + try { + h = ObjectHelper.requireNonNull(errorHandler.apply(++retries, ex), "The errorHandler returned a null item"); + } catch (Throwable exc) { + Exceptions.throwIfFatal(exc); + cancel(); + onError(new CompositeException(ex, exc)); + return false; + } + + switch (h) { + case RETRY: + continue; + case SKIP: + return false; + case STOP: + cancel(); + onComplete(); + return false; + default: + cancel(); + onError(ex); + return false; + } + } + + if (b) { + actual.onNext(t); + return true; + } + return false; + } + } + return false; + } + + @Override + public void onError(Throwable t) { + if (done) { + RxJavaPlugins.onError(t); + return; + } + done = true; + actual.onError(t); + } + + @Override + public void onComplete() { + if (!done) { + done = true; + actual.onComplete(); + } + } + } + + static final class ParallelFilterConditionalSubscriber extends BaseFilterSubscriber { + + final ConditionalSubscriber actual; + + ParallelFilterConditionalSubscriber(ConditionalSubscriber actual, + Predicate predicate, + BiFunction errorHandler) { + super(predicate, errorHandler); + this.actual = actual; + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; + + actual.onSubscribe(this); + } + } + + @Override + public boolean tryOnNext(T t) { + if (!done) { + long retries = 0L; + + for (;;) { + boolean b; + + try { + b = predicate.test(t); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + + ParallelFailureHandling h; + + try { + h = ObjectHelper.requireNonNull(errorHandler.apply(++retries, ex), "The errorHandler returned a null item"); + } catch (Throwable exc) { + Exceptions.throwIfFatal(exc); + cancel(); + onError(new CompositeException(ex, exc)); + return false; + } + + switch (h) { + case RETRY: + continue; + case SKIP: + return false; + case STOP: + cancel(); + onComplete(); + return false; + default: + cancel(); + onError(ex); + return false; + } + } + + if (b) { + return actual.tryOnNext(t); + } + return false; + } + } + return false; + } + + @Override + public void onError(Throwable t) { + if (done) { + RxJavaPlugins.onError(t); + return; + } + done = true; + actual.onError(t); + } + + @Override + public void onComplete() { + if (!done) { + done = true; + actual.onComplete(); + } + } + }} diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelMapTry.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelMapTry.java new file mode 100644 index 0000000000..59c8b85fde --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelMapTry.java @@ -0,0 +1,299 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.parallel; + +import org.reactivestreams.*; + +import io.reactivex.exceptions.*; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.fuseable.ConditionalSubscriber; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.parallel.*; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Maps each 'rail' of the source ParallelFlowable with a mapper function + * and handle any failure based on a handler function. + * + * @param the input value type + * @param the output value type + * @since 2.0.8 - experimental + */ +public final class ParallelMapTry extends ParallelFlowable { + + final ParallelFlowable source; + + final Function mapper; + + final BiFunction errorHandler; + + public ParallelMapTry(ParallelFlowable source, Function mapper, + BiFunction errorHandler) { + this.source = source; + this.mapper = mapper; + this.errorHandler = errorHandler; + } + + @Override + public void subscribe(Subscriber[] subscribers) { + if (!validate(subscribers)) { + return; + } + + int n = subscribers.length; + @SuppressWarnings("unchecked") + Subscriber[] parents = new Subscriber[n]; + + for (int i = 0; i < n; i++) { + Subscriber a = subscribers[i]; + if (a instanceof ConditionalSubscriber) { + parents[i] = new ParallelMapTryConditionalSubscriber((ConditionalSubscriber)a, mapper, errorHandler); + } else { + parents[i] = new ParallelMapTrySubscriber(a, mapper, errorHandler); + } + } + + source.subscribe(parents); + } + + @Override + public int parallelism() { + return source.parallelism(); + } + + static final class ParallelMapTrySubscriber implements ConditionalSubscriber, Subscription { + + final Subscriber actual; + + final Function mapper; + + final BiFunction errorHandler; + + Subscription s; + + boolean done; + + ParallelMapTrySubscriber(Subscriber actual, Function mapper, + BiFunction errorHandler) { + this.actual = actual; + this.mapper = mapper; + this.errorHandler = errorHandler; + } + + @Override + public void request(long n) { + s.request(n); + } + + @Override + public void cancel() { + s.cancel(); + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; + + actual.onSubscribe(this); + } + } + + @Override + public void onNext(T t) { + if (!tryOnNext(t) && !done) { + s.request(1); + } + } + + @Override + public boolean tryOnNext(T t) { + if (done) { + return false; + } + long retries = 0; + + for (;;) { + R v; + + try { + v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null value"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + + ParallelFailureHandling h; + + try { + h = ObjectHelper.requireNonNull(errorHandler.apply(++retries, ex), "The errorHandler returned a null item"); + } catch (Throwable exc) { + Exceptions.throwIfFatal(exc); + cancel(); + onError(new CompositeException(ex, exc)); + return false; + } + + switch (h) { + case RETRY: + continue; + case SKIP: + return false; + case STOP: + cancel(); + onComplete(); + return false; + default: + cancel(); + onError(ex); + return false; + } + } + + actual.onNext(v); + return true; + } + } + + @Override + public void onError(Throwable t) { + if (done) { + RxJavaPlugins.onError(t); + return; + } + done = true; + actual.onError(t); + } + + @Override + public void onComplete() { + if (done) { + return; + } + done = true; + actual.onComplete(); + } + + } + static final class ParallelMapTryConditionalSubscriber implements ConditionalSubscriber, Subscription { + + final ConditionalSubscriber actual; + + final Function mapper; + + final BiFunction errorHandler; + Subscription s; + + boolean done; + + ParallelMapTryConditionalSubscriber(ConditionalSubscriber actual, + Function mapper, + BiFunction errorHandler) { + this.actual = actual; + this.mapper = mapper; + this.errorHandler = errorHandler; + } + + @Override + public void request(long n) { + s.request(n); + } + + @Override + public void cancel() { + s.cancel(); + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; + + actual.onSubscribe(this); + } + } + + @Override + public void onNext(T t) { + if (!tryOnNext(t) && !done) { + s.request(1); + } + } + + @Override + public boolean tryOnNext(T t) { + if (done) { + return false; + } + long retries = 0; + + for (;;) { + R v; + + try { + v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null value"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + + ParallelFailureHandling h; + + try { + h = ObjectHelper.requireNonNull(errorHandler.apply(++retries, ex), "The errorHandler returned a null item"); + } catch (Throwable exc) { + Exceptions.throwIfFatal(exc); + cancel(); + onError(new CompositeException(ex, exc)); + return false; + } + + switch (h) { + case RETRY: + continue; + case SKIP: + return false; + case STOP: + cancel(); + onComplete(); + return false; + default: + cancel(); + onError(ex); + return false; + } + } + + return actual.tryOnNext(v); + } + } + + @Override + public void onError(Throwable t) { + if (done) { + RxJavaPlugins.onError(t); + return; + } + done = true; + actual.onError(t); + } + + @Override + public void onComplete() { + if (done) { + return; + } + done = true; + actual.onComplete(); + } + + } +} diff --git a/src/main/java/io/reactivex/parallel/ParallelFailureHandling.java b/src/main/java/io/reactivex/parallel/ParallelFailureHandling.java new file mode 100644 index 0000000000..dd53aa622a --- /dev/null +++ b/src/main/java/io/reactivex/parallel/ParallelFailureHandling.java @@ -0,0 +1,46 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.parallel; + +import io.reactivex.annotations.Experimental; +import io.reactivex.functions.BiFunction; + +/** + * Enumerations for handling failure within a parallel operator. + * @since 2.0.8 - experimental + */ +@Experimental +public enum ParallelFailureHandling implements BiFunction { + /** + * The current rail is stopped and the error is dropped. + */ + STOP, + /** + * The current rail is stopped and the error is signalled. + */ + ERROR, + /** + * The current value and error is ignored and the rail resumes with the next item. + */ + SKIP, + /** + * Retry the current value. + */ + RETRY; + + @Override + public ParallelFailureHandling apply(Long t1, Throwable t2) { + return this; + } +} diff --git a/src/main/java/io/reactivex/parallel/ParallelFlowable.java b/src/main/java/io/reactivex/parallel/ParallelFlowable.java index f7484bc9d5..178cfe103b 100644 --- a/src/main/java/io/reactivex/parallel/ParallelFlowable.java +++ b/src/main/java/io/reactivex/parallel/ParallelFlowable.java @@ -135,6 +135,47 @@ public final ParallelFlowable map(@NonNull Function(this, mapper)); } + /** + * Maps the source values on each 'rail' to another value and + * handles errors based on the given {@link ParallelFailureHandling} enumeration value. + *

+ * Note that the same mapper function may be called from multiple threads concurrently. + * @param the output value type + * @param mapper the mapper function turning Ts into Us. + * @param errorHandler the enumeration that defines how to handle errors thrown + * from the mapper function + * @return the new ParallelFlowable instance + * @since 2.0.8 - experimental + */ + @CheckReturnValue + @Experimental + public final ParallelFlowable map(@NonNull Function mapper, @NonNull ParallelFailureHandling errorHandler) { + ObjectHelper.requireNonNull(mapper, "mapper"); + ObjectHelper.requireNonNull(errorHandler, "errorHandler is null"); + return RxJavaPlugins.onAssembly(new ParallelMapTry(this, mapper, errorHandler)); + } + + /** + * Maps the source values on each 'rail' to another value and + * handles errors based on the returned value by the handler function. + *

+ * Note that the same mapper function may be called from multiple threads concurrently. + * @param the output value type + * @param mapper the mapper function turning Ts into Us. + * @param errorHandler the function called with the current repeat count and + * failure Throwable and should return one of the {@link ParallelFailureHandling} + * enumeration values to indicate how to proceed. + * @return the new ParallelFlowable instance + * @since 2.0.8 - experimental + */ + @CheckReturnValue + @Experimental + public final ParallelFlowable map(@NonNull Function mapper, @NonNull BiFunction errorHandler) { + ObjectHelper.requireNonNull(mapper, "mapper"); + ObjectHelper.requireNonNull(errorHandler, "errorHandler is null"); + return RxJavaPlugins.onAssembly(new ParallelMapTry(this, mapper, errorHandler)); + } + /** * Filters the source values on each 'rail'. *

@@ -148,6 +189,46 @@ public final ParallelFlowable filter(@NonNull Predicate predicate) return RxJavaPlugins.onAssembly(new ParallelFilter(this, predicate)); } + /** + * Filters the source values on each 'rail' and + * handles errors based on the given {@link ParallelFailureHandling} enumeration value. + *

+ * Note that the same predicate may be called from multiple threads concurrently. + * @param predicate the function returning true to keep a value or false to drop a value + * @param errorHandler the enumeration that defines how to handle errors thrown + * from the predicate + * @return the new ParallelFlowable instance + * @since 2.0.8 - experimental + */ + @CheckReturnValue + @Experimental + public final ParallelFlowable filter(@NonNull Predicate predicate, @NonNull ParallelFailureHandling errorHandler) { + ObjectHelper.requireNonNull(predicate, "predicate"); + ObjectHelper.requireNonNull(errorHandler, "errorHandler is null"); + return RxJavaPlugins.onAssembly(new ParallelFilterTry(this, predicate, errorHandler)); + } + + + /** + * Filters the source values on each 'rail' and + * handles errors based on the returned value by the handler function. + *

+ * Note that the same predicate may be called from multiple threads concurrently. + * @param predicate the function returning true to keep a value or false to drop a value + * @param errorHandler the function called with the current repeat count and + * failure Throwable and should return one of the {@link ParallelFailureHandling} + * enumeration values to indicate how to proceed. + * @return the new ParallelFlowable instance + * @since 2.0.8 - experimental + */ + @CheckReturnValue + @Experimental + public final ParallelFlowable filter(@NonNull Predicate predicate, @NonNull BiFunction errorHandler) { + ObjectHelper.requireNonNull(predicate, "predicate"); + ObjectHelper.requireNonNull(errorHandler, "errorHandler is null"); + return RxJavaPlugins.onAssembly(new ParallelFilterTry(this, predicate, errorHandler)); + } + /** * Specifies where each 'rail' will observe its incoming values with * no work-stealing and default prefetch amount. @@ -422,6 +503,44 @@ public final ParallelFlowable doOnNext(@NonNull Consumer onNext) { )); } + + /** + * Call the specified consumer with the current element passing through any 'rail' and + * handles errors based on the given {@link ParallelFailureHandling} enumeration value. + * + * @param onNext the callback + * @param errorHandler the enumeration that defines how to handle errors thrown + * from the onNext consumer + * @return the new ParallelFlowable instance + * @since 2.0.8 - experimental + */ + @CheckReturnValue + @Experimental + public final ParallelFlowable doOnNext(@NonNull Consumer onNext, @NonNull ParallelFailureHandling errorHandler) { + ObjectHelper.requireNonNull(onNext, "onNext is null"); + ObjectHelper.requireNonNull(errorHandler, "errorHandler is null"); + return RxJavaPlugins.onAssembly(new ParallelDoOnNextTry(this, onNext, errorHandler)); + } + + /** + * Call the specified consumer with the current element passing through any 'rail' and + * handles errors based on the returned value by the handler function. + * + * @param onNext the callback + * @param errorHandler the function called with the current repeat count and + * failure Throwable and should return one of the {@link ParallelFailureHandling} + * enumeration values to indicate how to proceed. + * @return the new ParallelFlowable instance + * @since 2.0.8 - experimental + */ + @CheckReturnValue + @Experimental + public final ParallelFlowable doOnNext(@NonNull Consumer onNext, @NonNull BiFunction errorHandler) { + ObjectHelper.requireNonNull(onNext, "onNext is null"); + ObjectHelper.requireNonNull(errorHandler, "errorHandler is null"); + return RxJavaPlugins.onAssembly(new ParallelDoOnNextTry(this, onNext, errorHandler)); + } + /** * Call the specified consumer with the current element passing through any 'rail' * after it has been delivered to downstream within the rail. diff --git a/src/test/java/io/reactivex/ParamValidationCheckerTest.java b/src/test/java/io/reactivex/ParamValidationCheckerTest.java index d0993cd10a..3013dda85b 100644 --- a/src/test/java/io/reactivex/ParamValidationCheckerTest.java +++ b/src/test/java/io/reactivex/ParamValidationCheckerTest.java @@ -554,6 +554,9 @@ public void checkParallelFlowable() { defaultValues.put(ParallelFlowable.class, ParallelFlowable.from(Flowable.never())); defaultValues.put(Subscriber[].class, new Subscriber[] { new AllFunctionals() }); + + defaultValues.put(ParallelFailureHandling.class, ParallelFailureHandling.ERROR); + // ----------------------------------------------------------------------------------- defaultInstances = new HashMap, List>(); diff --git a/src/test/java/io/reactivex/TestHelper.java b/src/test/java/io/reactivex/TestHelper.java index 5a329b4163..eabe17035b 100644 --- a/src/test/java/io/reactivex/TestHelper.java +++ b/src/test/java/io/reactivex/TestHelper.java @@ -38,6 +38,7 @@ import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.internal.util.ExceptionHelper; import io.reactivex.observers.TestObserver; +import io.reactivex.parallel.ParallelFlowable; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; import io.reactivex.schedulers.Schedulers; @@ -2635,4 +2636,20 @@ protected void subscribeActual(Subscriber observer) { RxJavaPlugins.reset(); } } + + public static void checkInvalidParallelSubscribers(ParallelFlowable source) { + int n = source.parallelism(); + + @SuppressWarnings("unchecked") + TestSubscriber[] tss = new TestSubscriber[n + 1]; + for (int i = 0; i <= n; i++) { + tss[i] = new TestSubscriber().withTag("" + i); + } + + source.subscribe(tss); + + for (int i = 0; i <= n; i++) { + tss[i].assertFailure(IllegalArgumentException.class); + } + } } diff --git a/src/test/java/io/reactivex/parallel/ParallelDoOnNextTryTest.java b/src/test/java/io/reactivex/parallel/ParallelDoOnNextTryTest.java new file mode 100644 index 0000000000..3a40edc229 --- /dev/null +++ b/src/test/java/io/reactivex/parallel/ParallelDoOnNextTryTest.java @@ -0,0 +1,388 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.parallel; + +import static org.junit.Assert.assertEquals; + +import java.util.List; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.*; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.subscribers.TestSubscriber; + +public class ParallelDoOnNextTryTest implements Consumer { + + volatile int calls; + + @Override + public void accept(Object t) throws Exception { + calls++; + } + + @Test + public void doOnNextNoError() { + for (ParallelFailureHandling e : ParallelFailureHandling.values()) { + Flowable.just(1) + .parallel(1) + .doOnNext(this, e) + .sequential() + .test() + .assertResult(1); + + assertEquals(calls, 1); + calls = 0; + } + } + @Test + public void doOnNextErrorNoError() { + for (ParallelFailureHandling e : ParallelFailureHandling.values()) { + Flowable.error(new TestException()) + .parallel(1) + .doOnNext(this, e) + .sequential() + .test() + .assertFailure(TestException.class); + + assertEquals(calls, 0); + } + } + + @Test + public void doOnNextConditionalNoError() { + for (ParallelFailureHandling e : ParallelFailureHandling.values()) { + Flowable.just(1) + .parallel(1) + .doOnNext(this, e) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertResult(1); + + assertEquals(calls, 1); + calls = 0; + } + } + + @Test + public void doOnNextErrorConditionalNoError() { + for (ParallelFailureHandling e : ParallelFailureHandling.values()) { + Flowable.error(new TestException()) + .parallel(1) + .doOnNext(this, e) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertFailure(TestException.class); + + assertEquals(calls, 0); + } + } + + @Test + public void doOnNextFailWithError() { + Flowable.range(0, 2) + .parallel(1) + .doOnNext(new Consumer() { + @Override + public void accept(Integer v) throws Exception { + if (1 / v < 0) { + System.out.println("Should not happen!"); + } + } + }, ParallelFailureHandling.ERROR) + .sequential() + .test() + .assertFailure(ArithmeticException.class); + } + + @Test + public void doOnNextFailWithStop() { + Flowable.range(0, 2) + .parallel(1) + .doOnNext(new Consumer() { + @Override + public void accept(Integer v) throws Exception { + if (1 / v < 0) { + System.out.println("Should not happen!"); + } + } + }, ParallelFailureHandling.STOP) + .sequential() + .test() + .assertResult(); + } + + @Test + public void doOnNextFailWithRetry() { + Flowable.range(0, 2) + .parallel(1) + .doOnNext(new Consumer() { + int count; + @Override + public void accept(Integer v) throws Exception { + if (count++ == 1) { + return; + } + if (1 / v < 0) { + System.out.println("Should not happen!"); + } + } + }, ParallelFailureHandling.RETRY) + .sequential() + .test() + .assertResult(0, 1); + } + + @Test + public void doOnNextFailWithRetryLimited() { + Flowable.range(0, 2) + .parallel(1) + .doOnNext(new Consumer() { + @Override + public void accept(Integer v) throws Exception { + if (1 / v < 0) { + System.out.println("Should not happen!"); + } + } + }, new BiFunction() { + @Override + public ParallelFailureHandling apply(Long n, Throwable e) throws Exception { + return n < 5 ? ParallelFailureHandling.RETRY : ParallelFailureHandling.SKIP; + } + }) + .sequential() + .test() + .assertResult(1); + } + + @Test + public void doOnNextFailWithSkip() { + Flowable.range(0, 2) + .parallel(1) + .doOnNext(new Consumer() { + @Override + public void accept(Integer v) throws Exception { + if (1 / v < 0) { + System.out.println("Should not happen!"); + } + } + }, ParallelFailureHandling.SKIP) + .sequential() + .test() + .assertResult(1); + } + + @SuppressWarnings("unchecked") + @Test + public void doOnNextFailHandlerThrows() { + TestSubscriber ts = Flowable.range(0, 2) + .parallel(1) + .doOnNext(new Consumer() { + @Override + public void accept(Integer v) throws Exception { + if (1 / v < 0) { + System.out.println("Should not happen!"); + } + } + }, new BiFunction() { + @Override + public ParallelFailureHandling apply(Long n, Throwable e) throws Exception { + throw new TestException(); + } + }) + .sequential() + .test() + .assertFailure(CompositeException.class); + + TestHelper.assertCompositeExceptions(ts, ArithmeticException.class, TestException.class); + } + + @Test + public void doOnNextWrongParallelism() { + TestHelper.checkInvalidParallelSubscribers( + Flowable.just(1).parallel(1) + .doOnNext(Functions.emptyConsumer(), ParallelFailureHandling.ERROR) + ); + } + + @Test + public void filterInvalidSource() { + List errors = TestHelper.trackPluginErrors(); + try { + new ParallelInvalid() + .doOnNext(Functions.emptyConsumer(), ParallelFailureHandling.ERROR) + .sequential() + .test(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void doOnNextFailWithErrorConditional() { + Flowable.range(0, 2) + .parallel(1) + .doOnNext(new Consumer() { + @Override + public void accept(Integer v) throws Exception { + if (1 / v < 0) { + System.out.println("Should not happen!"); + } + } + }, ParallelFailureHandling.ERROR) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertFailure(ArithmeticException.class); + } + + @Test + public void doOnNextFailWithStopConditional() { + Flowable.range(0, 2) + .parallel(1) + .doOnNext(new Consumer() { + @Override + public void accept(Integer v) throws Exception { + if (1 / v < 0) { + System.out.println("Should not happen!"); + } + } + }, ParallelFailureHandling.STOP) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertResult(); + } + + @Test + public void doOnNextFailWithRetryConditional() { + Flowable.range(0, 2) + .parallel(1) + .doOnNext(new Consumer() { + int count; + @Override + public void accept(Integer v) throws Exception { + if (count++ == 1) { + return; + } + if (1 / v < 0) { + System.out.println("Should not happen!"); + } + } + }, ParallelFailureHandling.RETRY) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertResult(0, 1); + } + + @Test + public void doOnNextFailWithRetryLimitedConditional() { + Flowable.range(0, 2) + .parallel(1) + .doOnNext(new Consumer() { + @Override + public void accept(Integer v) throws Exception { + if (1 / v < 0) { + System.out.println("Should not happen!"); + } + } + }, new BiFunction() { + @Override + public ParallelFailureHandling apply(Long n, Throwable e) throws Exception { + return n < 5 ? ParallelFailureHandling.RETRY : ParallelFailureHandling.SKIP; + } + }) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertResult(1); + } + + @Test + public void doOnNextFailWithSkipConditional() { + Flowable.range(0, 2) + .parallel(1) + .doOnNext(new Consumer() { + @Override + public void accept(Integer v) throws Exception { + if (1 / v < 0) { + System.out.println("Should not happen!"); + } + } + }, ParallelFailureHandling.SKIP) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertResult(1); + } + + @SuppressWarnings("unchecked") + @Test + public void doOnNextFailHandlerThrowsConditional() { + TestSubscriber ts = Flowable.range(0, 2) + .parallel(1) + .doOnNext(new Consumer() { + @Override + public void accept(Integer v) throws Exception { + if (1 / v < 0) { + System.out.println("Should not happen!"); + } + } + }, new BiFunction() { + @Override + public ParallelFailureHandling apply(Long n, Throwable e) throws Exception { + throw new TestException(); + } + }) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertFailure(CompositeException.class); + + TestHelper.assertCompositeExceptions(ts, ArithmeticException.class, TestException.class); + } + + @Test + public void doOnNextWrongParallelismConditional() { + TestHelper.checkInvalidParallelSubscribers( + Flowable.just(1).parallel(1) + .doOnNext(Functions.emptyConsumer(), ParallelFailureHandling.ERROR) + .filter(Functions.alwaysTrue()) + ); + } + + @Test + public void filterInvalidSourceConditional() { + List errors = TestHelper.trackPluginErrors(); + try { + new ParallelInvalid() + .doOnNext(Functions.emptyConsumer(), ParallelFailureHandling.ERROR) + .filter(Functions.alwaysTrue()) + .sequential() + .test(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } +} diff --git a/src/test/java/io/reactivex/parallel/ParallelFilterTryTest.java b/src/test/java/io/reactivex/parallel/ParallelFilterTryTest.java new file mode 100644 index 0000000000..bb7d919ce0 --- /dev/null +++ b/src/test/java/io/reactivex/parallel/ParallelFilterTryTest.java @@ -0,0 +1,377 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.parallel; + +import java.util.List; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.*; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.subscribers.TestSubscriber; + +public class ParallelFilterTryTest implements Consumer { + + volatile int calls; + + @Override + public void accept(Object t) throws Exception { + calls++; + } + + @Test + public void filterNoError() { + for (ParallelFailureHandling e : ParallelFailureHandling.values()) { + Flowable.just(1) + .parallel(1) + .filter(Functions.alwaysTrue(), e) + .sequential() + .test() + .assertResult(1); + } + } + + @Test + public void filterFalse() { + for (ParallelFailureHandling e : ParallelFailureHandling.values()) { + Flowable.just(1) + .parallel(1) + .filter(Functions.alwaysFalse(), e) + .sequential() + .test() + .assertResult(); + } + } + + @Test + public void filterFalseConditional() { + for (ParallelFailureHandling e : ParallelFailureHandling.values()) { + Flowable.just(1) + .parallel(1) + .filter(Functions.alwaysFalse(), e) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertResult(); + } + } + + @Test + public void filterErrorNoError() { + for (ParallelFailureHandling e : ParallelFailureHandling.values()) { + Flowable.error(new TestException()) + .parallel(1) + .filter(Functions.alwaysTrue(), e) + .sequential() + .test() + .assertFailure(TestException.class); + } + } + + @Test + public void filterConditionalNoError() { + for (ParallelFailureHandling e : ParallelFailureHandling.values()) { + Flowable.just(1) + .parallel(1) + .filter(Functions.alwaysTrue(), e) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertResult(1); + } + } + @Test + public void filterErrorConditionalNoError() { + for (ParallelFailureHandling e : ParallelFailureHandling.values()) { + Flowable.error(new TestException()) + .parallel(1) + .filter(Functions.alwaysTrue(), e) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertFailure(TestException.class); + } + } + + @Test + public void filterFailWithError() { + Flowable.range(0, 2) + .parallel(1) + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return 1 / v > 0; + } + }, ParallelFailureHandling.ERROR) + .sequential() + .test() + .assertFailure(ArithmeticException.class); + } + + @Test + public void filterFailWithStop() { + Flowable.range(0, 2) + .parallel(1) + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return 1 / v > 0; + } + }, ParallelFailureHandling.STOP) + .sequential() + .test() + .assertResult(); + } + + @Test + public void filterFailWithRetry() { + Flowable.range(0, 2) + .parallel(1) + .filter(new Predicate() { + int count; + @Override + public boolean test(Integer v) throws Exception { + if (count++ == 1) { + return true; + } + return 1 / v > 0; + } + }, ParallelFailureHandling.RETRY) + .sequential() + .test() + .assertResult(0, 1); + } + + @Test + public void filterFailWithRetryLimited() { + Flowable.range(0, 2) + .parallel(1) + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return 1 / v > 0; + } + }, new BiFunction() { + @Override + public ParallelFailureHandling apply(Long n, Throwable e) throws Exception { + return n < 5 ? ParallelFailureHandling.RETRY : ParallelFailureHandling.SKIP; + } + }) + .sequential() + .test() + .assertResult(1); + } + + @Test + public void filterFailWithSkip() { + Flowable.range(0, 2) + .parallel(1) + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return 1 / v > 0; + } + }, ParallelFailureHandling.SKIP) + .sequential() + .test() + .assertResult(1); + } + + @SuppressWarnings("unchecked") + @Test + public void filterFailHandlerThrows() { + TestSubscriber ts = Flowable.range(0, 2) + .parallel(1) + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return 1 / v > 0; + } + }, new BiFunction() { + @Override + public ParallelFailureHandling apply(Long n, Throwable e) throws Exception { + throw new TestException(); + } + }) + .sequential() + .test() + .assertFailure(CompositeException.class); + + TestHelper.assertCompositeExceptions(ts, ArithmeticException.class, TestException.class); + } + + @Test + public void filterWrongParallelism() { + TestHelper.checkInvalidParallelSubscribers( + Flowable.just(1).parallel(1) + .filter(Functions.alwaysTrue(), ParallelFailureHandling.ERROR) + ); + } + + @Test + public void filterInvalidSource() { + List errors = TestHelper.trackPluginErrors(); + try { + new ParallelInvalid() + .filter(Functions.alwaysTrue(), ParallelFailureHandling.ERROR) + .sequential() + .test(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void filterFailWithErrorConditional() { + Flowable.range(0, 2) + .parallel(1) + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return 1 / v > 0; + } + }, ParallelFailureHandling.ERROR) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertFailure(ArithmeticException.class); + } + + @Test + public void filterFailWithStopConditional() { + Flowable.range(0, 2) + .parallel(1) + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return 1 / v > 0; + } + }, ParallelFailureHandling.STOP) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertResult(); + } + + @Test + public void filterFailWithRetryConditional() { + Flowable.range(0, 2) + .parallel(1) + .filter(new Predicate() { + int count; + @Override + public boolean test(Integer v) throws Exception { + if (count++ == 1) { + return true; + } + return 1 / v > 0; + } + }, ParallelFailureHandling.RETRY) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertResult(0, 1); + } + + @Test + public void filterFailWithRetryLimitedConditional() { + Flowable.range(0, 2) + .parallel(1) + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return 1 / v > 0; + } + }, new BiFunction() { + @Override + public ParallelFailureHandling apply(Long n, Throwable e) throws Exception { + return n < 5 ? ParallelFailureHandling.RETRY : ParallelFailureHandling.SKIP; + } + }) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertResult(1); + } + + @Test + public void filterFailWithSkipConditional() { + Flowable.range(0, 2) + .parallel(1) + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return 1 / v > 0; + } + }, ParallelFailureHandling.SKIP) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertResult(1); + } + + @SuppressWarnings("unchecked") + @Test + public void filterFailHandlerThrowsConditional() { + TestSubscriber ts = Flowable.range(0, 2) + .parallel(1) + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return 1 / v > 0; + } + }, new BiFunction() { + @Override + public ParallelFailureHandling apply(Long n, Throwable e) throws Exception { + throw new TestException(); + } + }) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertFailure(CompositeException.class); + + TestHelper.assertCompositeExceptions(ts, ArithmeticException.class, TestException.class); + } + + @Test + public void filterWrongParallelismConditional() { + TestHelper.checkInvalidParallelSubscribers( + Flowable.just(1).parallel(1) + .filter(Functions.alwaysTrue(), ParallelFailureHandling.ERROR) + .filter(Functions.alwaysTrue()) + ); + } + + @Test + public void filterInvalidSourceConditional() { + List errors = TestHelper.trackPluginErrors(); + try { + new ParallelInvalid() + .filter(Functions.alwaysTrue(), ParallelFailureHandling.ERROR) + .filter(Functions.alwaysTrue()) + .sequential() + .test(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } +} diff --git a/src/test/java/io/reactivex/parallel/ParallelMapTryTest.java b/src/test/java/io/reactivex/parallel/ParallelMapTryTest.java new file mode 100644 index 0000000000..261eea5e9e --- /dev/null +++ b/src/test/java/io/reactivex/parallel/ParallelMapTryTest.java @@ -0,0 +1,351 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.parallel; + +import java.util.List; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.*; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.subscribers.TestSubscriber; + +public class ParallelMapTryTest implements Consumer { + + volatile int calls; + + @Override + public void accept(Object t) throws Exception { + calls++; + } + + @Test + public void mapNoError() { + for (ParallelFailureHandling e : ParallelFailureHandling.values()) { + Flowable.just(1) + .parallel(1) + .map(Functions.identity(), e) + .sequential() + .test() + .assertResult(1); + } + } + @Test + public void mapErrorNoError() { + for (ParallelFailureHandling e : ParallelFailureHandling.values()) { + Flowable.error(new TestException()) + .parallel(1) + .map(Functions.identity(), e) + .sequential() + .test() + .assertFailure(TestException.class); + } + } + + @Test + public void mapConditionalNoError() { + for (ParallelFailureHandling e : ParallelFailureHandling.values()) { + Flowable.just(1) + .parallel(1) + .map(Functions.identity(), e) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertResult(1); + } + } + @Test + public void mapErrorConditionalNoError() { + for (ParallelFailureHandling e : ParallelFailureHandling.values()) { + Flowable.error(new TestException()) + .parallel(1) + .map(Functions.identity(), e) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertFailure(TestException.class); + } + } + + @Test + public void mapFailWithError() { + Flowable.range(0, 2) + .parallel(1) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + return 1 / v; + } + }, ParallelFailureHandling.ERROR) + .sequential() + .test() + .assertFailure(ArithmeticException.class); + } + + @Test + public void mapFailWithStop() { + Flowable.range(0, 2) + .parallel(1) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + return 1 / v; + } + }, ParallelFailureHandling.STOP) + .sequential() + .test() + .assertResult(); + } + + @Test + public void mapFailWithRetry() { + Flowable.range(0, 2) + .parallel(1) + .map(new Function() { + int count; + @Override + public Integer apply(Integer v) throws Exception { + if (count++ == 1) { + return -1; + } + return 1 / v; + } + }, ParallelFailureHandling.RETRY) + .sequential() + .test() + .assertResult(-1, 1); + } + + @Test + public void mapFailWithRetryLimited() { + Flowable.range(0, 2) + .parallel(1) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + return 1 / v; + } + }, new BiFunction() { + @Override + public ParallelFailureHandling apply(Long n, Throwable e) throws Exception { + return n < 5 ? ParallelFailureHandling.RETRY : ParallelFailureHandling.SKIP; + } + }) + .sequential() + .test() + .assertResult(1); + } + + @Test + public void mapFailWithSkip() { + Flowable.range(0, 2) + .parallel(1) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + return 1 / v; + } + }, ParallelFailureHandling.SKIP) + .sequential() + .test() + .assertResult(1); + } + + @SuppressWarnings("unchecked") + @Test + public void mapFailHandlerThrows() { + TestSubscriber ts = Flowable.range(0, 2) + .parallel(1) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + return 1 / v; + } + }, new BiFunction() { + @Override + public ParallelFailureHandling apply(Long n, Throwable e) throws Exception { + throw new TestException(); + } + }) + .sequential() + .test() + .assertFailure(CompositeException.class); + + TestHelper.assertCompositeExceptions(ts, ArithmeticException.class, TestException.class); + } + + @Test + public void mapWrongParallelism() { + TestHelper.checkInvalidParallelSubscribers( + Flowable.just(1).parallel(1) + .map(Functions.identity(), ParallelFailureHandling.ERROR) + ); + } + + @Test + public void mapInvalidSource() { + List errors = TestHelper.trackPluginErrors(); + try { + new ParallelInvalid() + .map(Functions.identity(), ParallelFailureHandling.ERROR) + .sequential() + .test(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void mapFailWithErrorConditional() { + Flowable.range(0, 2) + .parallel(1) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + return 1 / v; + } + }, ParallelFailureHandling.ERROR) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertFailure(ArithmeticException.class); + } + + @Test + public void mapFailWithStopConditional() { + Flowable.range(0, 2) + .parallel(1) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + return 1 / v; + } + }, ParallelFailureHandling.STOP) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertResult(); + } + + @Test + public void mapFailWithRetryConditional() { + Flowable.range(0, 2) + .parallel(1) + .map(new Function() { + int count; + @Override + public Integer apply(Integer v) throws Exception { + if (count++ == 1) { + return -1; + } + return 1 / v; + } + }, ParallelFailureHandling.RETRY) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertResult(-1, 1); + } + + @Test + public void mapFailWithRetryLimitedConditional() { + Flowable.range(0, 2) + .parallel(1) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + return 1 / v; + } + }, new BiFunction() { + @Override + public ParallelFailureHandling apply(Long n, Throwable e) throws Exception { + return n < 5 ? ParallelFailureHandling.RETRY : ParallelFailureHandling.SKIP; + } + }) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertResult(1); + } + + @Test + public void mapFailWithSkipConditional() { + Flowable.range(0, 2) + .parallel(1) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + return 1 / v; + } + }, ParallelFailureHandling.SKIP) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertResult(1); + } + + @SuppressWarnings("unchecked") + @Test + public void mapFailHandlerThrowsConditional() { + TestSubscriber ts = Flowable.range(0, 2) + .parallel(1) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + return 1 / v; + } + }, new BiFunction() { + @Override + public ParallelFailureHandling apply(Long n, Throwable e) throws Exception { + throw new TestException(); + } + }) + .filter(Functions.alwaysTrue()) + .sequential() + .test() + .assertFailure(CompositeException.class); + + TestHelper.assertCompositeExceptions(ts, ArithmeticException.class, TestException.class); + } + + @Test + public void mapWrongParallelismConditional() { + TestHelper.checkInvalidParallelSubscribers( + Flowable.just(1).parallel(1) + .map(Functions.identity(), ParallelFailureHandling.ERROR) + .filter(Functions.alwaysTrue()) + ); + } + + @Test + public void mapInvalidSourceConditional() { + List errors = TestHelper.trackPluginErrors(); + try { + new ParallelInvalid() + .map(Functions.identity(), ParallelFailureHandling.ERROR) + .filter(Functions.alwaysTrue()) + .sequential() + .test(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } +}