From 5ea6bf98696523a2822e359349909fa74d5a2236 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Sat, 18 Feb 2017 23:39:33 +0100 Subject: [PATCH 1/2] 2.x: add ParallelFlowable.sequentialDelayError --- .../parallel/ParallelFromPublisher.java | 76 +++--- .../operators/parallel/ParallelJoin.java | 239 ++++++++++++++++-- .../reactivex/parallel/ParallelFlowable.java | 55 +++- .../parallel/ParallelFlowableTest.java | 5 +- .../reactivex/parallel/ParallelJoinTest.java | 208 ++++++++++++++- .../reactivex/parallel/ParallelPeekTest.java | 4 +- 6 files changed, 531 insertions(+), 56 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelFromPublisher.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelFromPublisher.java index de0289843e..5abd0f7595 100644 --- a/src/main/java/io/reactivex/internal/operators/parallel/ParallelFromPublisher.java +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelFromPublisher.java @@ -101,8 +101,10 @@ static final class ParallelDispatcher this.subscribers = subscribers; this.prefetch = prefetch; this.limit = prefetch - (prefetch >> 2); - this.requests = new AtomicLongArray(subscribers.length); - this.emissions = new long[subscribers.length]; + int m = subscribers.length; + this.requests = new AtomicLongArray(m + 1); + this.requests.lazySet(m, m); + this.emissions = new long[m]; } @Override @@ -145,42 +147,58 @@ public void onSubscribe(Subscription s) { } void setupSubscribers() { - final int m = subscribers.length; + Subscriber[] subs = subscribers; + final int m = subs.length; for (int i = 0; i < m; i++) { if (cancelled) { return; } - final int j = i; subscriberCount.lazySet(i + 1); - subscribers[i].onSubscribe(new Subscription() { - @Override - public void request(long n) { - if (SubscriptionHelper.validate(n)) { - AtomicLongArray ra = requests; - for (;;) { - long r = ra.get(j); - if (r == Long.MAX_VALUE) { - return; - } - long u = BackpressureHelper.addCap(r, n); - if (ra.compareAndSet(j, r, u)) { - break; - } - } - if (subscriberCount.get() == m) { - drain(); - } + subs[i].onSubscribe(new RailSubscription(i, m)); + } + } + + final class RailSubscription extends AtomicBoolean implements Subscription { + + private static final long serialVersionUID = 7289979168658050255L; + + final int j; + + final int m; + + RailSubscription(int j, int m) { + this.j = j; + this.m = m; + } + + @Override + public void request(long n) { + if (SubscriptionHelper.validate(n)) { + AtomicLongArray ra = requests; + for (;;) { + long r = ra.get(j); + if (r == Long.MAX_VALUE) { + return; + } + long u = BackpressureHelper.addCap(r, n); + if (ra.compareAndSet(j, r, u)) { + break; } } - - @Override - public void cancel() { - ParallelDispatcher.this.cancel(); + if (subscriberCount.get() == m) { + drain(); } - }); + } + } + + @Override + public void cancel() { + if (compareAndSet(false, true)) { + ParallelDispatcher.this.cancel(m); + } } } @@ -209,8 +227,8 @@ public void onComplete() { drain(); } - void cancel() { - if (!cancelled) { + void cancel(int m) { + if (requests.decrementAndGet(m) == 0) { cancelled = true; this.s.cancel(); diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelJoin.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelJoin.java index 7f44e8c5ae..3d414fba39 100644 --- a/src/main/java/io/reactivex/internal/operators/parallel/ParallelJoin.java +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelJoin.java @@ -22,7 +22,7 @@ import io.reactivex.internal.fuseable.*; import io.reactivex.internal.queue.SpscArrayQueue; import io.reactivex.internal.subscriptions.SubscriptionHelper; -import io.reactivex.internal.util.BackpressureHelper; +import io.reactivex.internal.util.*; import io.reactivex.parallel.ParallelFlowable; import io.reactivex.plugins.RxJavaPlugins; @@ -38,20 +38,27 @@ public final class ParallelJoin extends Flowable { final int prefetch; - public ParallelJoin(ParallelFlowable source, int prefetch) { + final boolean delayErrors; + + public ParallelJoin(ParallelFlowable source, int prefetch, boolean delayErrors) { this.source = source; this.prefetch = prefetch; + this.delayErrors = delayErrors; } @Override protected void subscribeActual(Subscriber s) { - JoinSubscription parent = new JoinSubscription(s, source.parallelism(), prefetch); + JoinSubscriptionBase parent; + if (delayErrors) { + parent = new JoinSubscriptionDelayError(s, source.parallelism(), prefetch); + } else { + parent = new JoinSubscription(s, source.parallelism(), prefetch); + } s.onSubscribe(parent); source.subscribe(parent.subscribers); } - static final class JoinSubscription - extends AtomicInteger + abstract static class JoinSubscriptionBase extends AtomicInteger implements Subscription { private static final long serialVersionUID = 3100232009247827843L; @@ -60,7 +67,7 @@ static final class JoinSubscription final JoinInnerSubscriber[] subscribers; - final AtomicReference error = new AtomicReference(); + final AtomicThrowable errors = new AtomicThrowable(); final AtomicLong requested = new AtomicLong(); @@ -68,7 +75,7 @@ static final class JoinSubscription final AtomicInteger done = new AtomicInteger(); - JoinSubscription(Subscriber actual, int n, int prefetch) { + JoinSubscriptionBase(Subscriber actual, int n, int prefetch) { this.actual = actual; @SuppressWarnings("unchecked") JoinInnerSubscriber[] a = new JoinInnerSubscriber[n]; @@ -114,7 +121,25 @@ void cleanup() { } } - void onNext(JoinInnerSubscriber inner, T value) { + abstract void onNext(JoinInnerSubscriber inner, T value); + + abstract void onError(Throwable e); + + abstract void onComplete(); + + abstract void drain(); + } + + static final class JoinSubscription extends JoinSubscriptionBase { + + private static final long serialVersionUID = 6312374661811000451L; + + JoinSubscription(Subscriber actual, int n, int prefetch) { + super(actual, n, prefetch); + } + + @Override + public void onNext(JoinInnerSubscriber inner, T value) { if (get() == 0 && compareAndSet(0, 1)) { if (requested.get() != 0) { actual.onNext(value); @@ -128,7 +153,7 @@ void onNext(JoinInnerSubscriber inner, T value) { if (!q.offer(value)) { cancelAll(); Throwable mbe = new MissingBackpressureException("Queue full?!"); - if (error.compareAndSet(null, mbe)) { + if (errors.compareAndSet(null, mbe)) { actual.onError(mbe); } else { RxJavaPlugins.onError(mbe); @@ -156,18 +181,20 @@ void onNext(JoinInnerSubscriber inner, T value) { drainLoop(); } - void onError(Throwable e) { - if (error.compareAndSet(null, e)) { + @Override + public void onError(Throwable e) { + if (errors.compareAndSet(null, e)) { cancelAll(); drain(); } else { - if (e != error.get()) { + if (e != errors.get()) { RxJavaPlugins.onError(e); } } } - void onComplete() { + @Override + public void onComplete() { done.decrementAndGet(); drain(); } @@ -199,7 +226,7 @@ void drainLoop() { return; } - Throwable ex = error.get(); + Throwable ex = errors.get(); if (ex != null) { cleanup(); a.onError(ex); @@ -244,7 +271,7 @@ void drainLoop() { return; } - Throwable ex = error.get(); + Throwable ex = errors.get(); if (ex != null) { cleanup(); a.onError(ex); @@ -288,14 +315,184 @@ void drainLoop() { } } + static final class JoinSubscriptionDelayError extends JoinSubscriptionBase { + + private static final long serialVersionUID = -5737965195918321883L; + + JoinSubscriptionDelayError(Subscriber actual, int n, int prefetch) { + super(actual, n, prefetch); + } + + void onNext(JoinInnerSubscriber inner, T value) { + if (get() == 0 && compareAndSet(0, 1)) { + if (requested.get() != 0) { + actual.onNext(value); + if (requested.get() != Long.MAX_VALUE) { + requested.decrementAndGet(); + } + inner.request(1); + } else { + SimplePlainQueue q = inner.getQueue(); + + if (!q.offer(value)) { + inner.cancel(); + errors.addThrowable(new MissingBackpressureException("Queue full?!")); + done.decrementAndGet(); + drainLoop(); + return; + } + } + if (decrementAndGet() == 0) { + return; + } + } else { + SimplePlainQueue q = inner.getQueue(); + + if (!q.offer(value)) { + if (inner.cancel()) { + errors.addThrowable(new MissingBackpressureException("Queue full?!")); + done.decrementAndGet(); + } + } + + if (getAndIncrement() != 0) { + return; + } + } + + drainLoop(); + } + + void onError(Throwable e) { + errors.addThrowable(e); + done.decrementAndGet(); + drain(); + } + + void onComplete() { + done.decrementAndGet(); + drain(); + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } + + drainLoop(); + } + + void drainLoop() { + int missed = 1; + + JoinInnerSubscriber[] s = this.subscribers; + int n = s.length; + Subscriber a = this.actual; + + for (;;) { + + long r = requested.get(); + long e = 0; + + middle: + while (e != r) { + if (cancelled) { + cleanup(); + return; + } + + boolean d = done.get() == 0; + + boolean empty = true; + + for (int i = 0; i < n; i++) { + JoinInnerSubscriber inner = s[i]; + + SimplePlainQueue q = inner.queue; + if (q != null) { + T v = q.poll(); + + if (v != null) { + empty = false; + a.onNext(v); + inner.requestOne(); + if (++e == r) { + break middle; + } + } + } + } + + if (d && empty) { + Throwable ex = errors.get(); + if (ex != null) { + a.onError(errors.terminate()); + } else { + a.onComplete(); + } + return; + } + + if (empty) { + break; + } + } + + if (e == r) { + if (cancelled) { + cleanup(); + return; + } + + boolean d = done.get() == 0; + + boolean empty = true; + + for (int i = 0; i < n; i++) { + JoinInnerSubscriber inner = s[i]; + + SimpleQueue q = inner.queue; + if (q != null && !q.isEmpty()) { + empty = false; + break; + } + } + + if (d && empty) { + Throwable ex = errors.get(); + if (ex != null) { + a.onError(errors.terminate()); + } else { + a.onComplete(); + } + return; + } + } + + if (e != 0 && r != Long.MAX_VALUE) { + requested.addAndGet(-e); + } + + int w = get(); + if (w == missed) { + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + } else { + missed = w; + } + } + } + } + static final class JoinInnerSubscriber extends AtomicReference implements FlowableSubscriber { - private static final long serialVersionUID = 8410034718427740355L; - final JoinSubscription parent; + final JoinSubscriptionBase parent; final int prefetch; @@ -305,9 +502,7 @@ static final class JoinInnerSubscriber volatile SimplePlainQueue queue; - volatile boolean done; - - JoinInnerSubscriber(JoinSubscription parent, int prefetch) { + JoinInnerSubscriber(JoinSubscriptionBase parent, int prefetch) { this.parent = parent; this.prefetch = prefetch ; this.limit = prefetch - (prefetch >> 2); @@ -355,8 +550,8 @@ public void request(long n) { } } - public void cancel() { - SubscriptionHelper.cancel(this); + public boolean cancel() { + return SubscriptionHelper.cancel(this); } SimplePlainQueue getQueue() { diff --git a/src/main/java/io/reactivex/parallel/ParallelFlowable.java b/src/main/java/io/reactivex/parallel/ParallelFlowable.java index 8622922dd5..f87a0a35f7 100644 --- a/src/main/java/io/reactivex/parallel/ParallelFlowable.java +++ b/src/main/java/io/reactivex/parallel/ParallelFlowable.java @@ -247,6 +247,7 @@ public final ParallelFlowable reduce(Callable initialSupplier, BiFunct * * @return the new Flowable instance * @see ParallelFlowable#sequential(int) + * @see ParallelFlowable#sequentialDelayError() */ @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) @@ -269,13 +270,65 @@ public final Flowable sequential() { * @param prefetch the prefetch amount to use for each rail * @return the new Flowable instance * @see ParallelFlowable#sequential() + * @see ParallelFlowable#sequentialDelayError(int) */ @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) @CheckReturnValue public final Flowable sequential(int prefetch) { ObjectHelper.verifyPositive(prefetch, "prefetch"); - return RxJavaPlugins.onAssembly(new ParallelJoin(this, prefetch)); + return RxJavaPlugins.onAssembly(new ParallelJoin(this, prefetch, false)); + } + + /** + * Merges the values from each 'rail' in a round-robin or same-order fashion and + * exposes it as a regular Flowable sequence, running with a default prefetch value + * for the rails and delaying errors from all rails till all terminate. + *

+ * This operator uses the default prefetch size returned by {@code Flowable.bufferSize()}. + * + *

+ *
Backpressure:
+ *
The operator honors backpressure.
+ *
Scheduler:
+ *
{@code sequential} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @return the new Flowable instance + * @see ParallelFlowable#sequentialDelayError(int) + * @see ParallelFlowable#sequential() + * @since 2.0.7 - experimental + */ + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @CheckReturnValue + @Experimental + public final Flowable sequentialDelayError() { + return sequentialDelayError(Flowable.bufferSize()); + } + + /** + * Merges the values from each 'rail' in a round-robin or same-order fashion and + * exposes it as a regular Publisher sequence, running with a give prefetch value + * for the rails and delaying errors from all rails till all terminate. + * + *
+ *
Backpressure:
+ *
The operator honors backpressure.
+ *
Scheduler:
+ *
{@code sequential} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param prefetch the prefetch amount to use for each rail + * @return the new Flowable instance + * @see ParallelFlowable#sequential() + * @see ParallelFlowable#sequentialDelayError() + * @since 2.0.7 - experimental + */ + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @CheckReturnValue + public final Flowable sequentialDelayError(int prefetch) { + ObjectHelper.verifyPositive(prefetch, "prefetch"); + return RxJavaPlugins.onAssembly(new ParallelJoin(this, prefetch, true)); } /** diff --git a/src/test/java/io/reactivex/parallel/ParallelFlowableTest.java b/src/test/java/io/reactivex/parallel/ParallelFlowableTest.java index 12b903fda2..c46d6a07c5 100644 --- a/src/test/java/io/reactivex/parallel/ParallelFlowableTest.java +++ b/src/test/java/io/reactivex/parallel/ParallelFlowableTest.java @@ -803,7 +803,7 @@ public boolean test(Integer v) throws Exception { @Test public void filterThrows() throws Exception { final boolean[] cancelled = { false }; - Flowable.range(1, 20) + Flowable.range(1, 20).concatWith(Flowable.never()) .doOnCancel(new Action() { @Override public void run() throws Exception { @@ -912,6 +912,9 @@ public void errorNotRepeating() throws Exception { Thread.sleep(300); + for (Throwable ex : errors) { + ex.printStackTrace(); + } assertTrue(errors.toString(), errors.isEmpty()); } finally { RxJavaPlugins.reset(); diff --git a/src/test/java/io/reactivex/parallel/ParallelJoinTest.java b/src/test/java/io/reactivex/parallel/ParallelJoinTest.java index 9cff824b63..3854641d8f 100644 --- a/src/test/java/io/reactivex/parallel/ParallelJoinTest.java +++ b/src/test/java/io/reactivex/parallel/ParallelJoinTest.java @@ -13,11 +13,14 @@ package io.reactivex.parallel; +import java.util.List; + import org.junit.Test; import org.reactivestreams.Subscriber; -import io.reactivex.Flowable; -import io.reactivex.exceptions.MissingBackpressureException; +import io.reactivex.*; +import io.reactivex.exceptions.*; +import io.reactivex.functions.Function; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.subscribers.TestSubscriber; @@ -85,4 +88,205 @@ public void emptyBackpressured() { .test(0) .assertResult(); } + + @Test + public void overflowFastpathDelayError() { + new ParallelFlowable() { + @Override + public void subscribe(Subscriber[] subscribers) { + subscribers[0].onSubscribe(new BooleanSubscription()); + subscribers[0].onNext(1); + subscribers[0].onNext(2); + } + + @Override + public int parallelism() { + return 1; + } + } + .sequentialDelayError(1) + .test(0) + .requestMore(1) + .assertFailure(MissingBackpressureException.class, 1); + } + + @Test + public void overflowSlowpathDelayError() { + @SuppressWarnings("unchecked") + final Subscriber[] subs = new Subscriber[1]; + + TestSubscriber ts = new TestSubscriber(1) { + @Override + public void onNext(Integer t) { + super.onNext(t); + if (t == 1) { + subs[0].onNext(2); + subs[0].onNext(3); + } + } + }; + + new ParallelFlowable() { + @Override + public void subscribe(Subscriber[] subscribers) { + subs[0] = subscribers[0]; + subscribers[0].onSubscribe(new BooleanSubscription()); + subscribers[0].onNext(1); + } + + @Override + public int parallelism() { + return 1; + } + } + .sequentialDelayError(1) + .subscribe(ts); + + ts.request(1); + + ts.assertFailure(MissingBackpressureException.class, 1, 2); + } + + @Test + public void emptyBackpressuredDelayError() { + Flowable.empty() + .parallel() + .sequentialDelayError() + .test(0) + .assertResult(); + } + + @Test + public void delayError() { + TestSubscriber flow = Flowable.range(1, 2) + .parallel(2) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + throw new TestException(); + } + }) + .sequentialDelayError() + .test() + .assertFailure(CompositeException.class); + + List error = TestHelper.errorList(flow); + TestHelper.assertError(error, 0, TestException.class); + TestHelper.assertError(error, 1, TestException.class); + } + + @Test + public void normalDelayError() { + Flowable.just(1) + .parallel(1) + .sequentialDelayError(1) + .test() + .assertResult(1); + } + + @Test + public void rangeDelayError() { + Flowable.range(1, 2) + .parallel(1) + .sequentialDelayError(1) + .take(1) + .test() + .assertResult(1); + } + + @Test + public void rangeDelayErrorBackpressure() { + Flowable.range(1, 3) + .parallel(1) + .sequentialDelayError(1) + .take(2) + .rebatchRequests(1) + .test() + .assertResult(1, 2); + } + + @Test + public void rangeDelayErrorBackpressure2() { + Flowable.range(1, 3) + .parallel(1) + .sequentialDelayError(1) + .rebatchRequests(1) + .test() + .assertResult(1, 2, 3); + } + + @Test + public void delayErrorCancelBackpressured() { + TestSubscriber ts = Flowable.range(1, 3) + .parallel(1) + .sequentialDelayError(1) + .test(0); + + ts + .cancel(); + + ts.assertEmpty(); + } + + @Test + public void delayErrorCancelBackpressured2() { + TestSubscriber ts = Flowable.empty() + .parallel(1) + .sequentialDelayError(1) + .test(0); + + ts.assertResult(); + } + + @Test + public void consumerCancelsAfterOne() { + TestSubscriber ts = new TestSubscriber(1) { + @Override + public void onNext(Integer t) { + super.onNext(t); + cancel(); + onComplete(); + } + }; + + Flowable.range(1, 3) + .parallel(1) + .sequential() + .subscribe(ts); + + ts.assertResult(1); + } + + @Test + public void delayErrorConsumerCancelsAfterOne() { + TestSubscriber ts = new TestSubscriber(1) { + @Override + public void onNext(Integer t) { + super.onNext(t); + cancel(); + onComplete(); + } + }; + + Flowable.range(1, 3) + .parallel(1) + .sequentialDelayError() + .subscribe(ts); + + ts.assertResult(1); + } + + @Test + public void delayErrorDrainTrigger() { + Flowable.range(1, 3) + .parallel(1) + .sequentialDelayError() + .test(0) + .requestMore(1) + .assertValues(1) + .requestMore(1) + .assertValues(1, 2) + .requestMore(1) + .assertResult(1, 2, 3); + } } diff --git a/src/test/java/io/reactivex/parallel/ParallelPeekTest.java b/src/test/java/io/reactivex/parallel/ParallelPeekTest.java index 2eca0fe420..deb5c2a6ba 100644 --- a/src/test/java/io/reactivex/parallel/ParallelPeekTest.java +++ b/src/test/java/io/reactivex/parallel/ParallelPeekTest.java @@ -185,7 +185,9 @@ public void run() throws Exception { assertFalse(errors.isEmpty()); for (Throwable ex : errors) { - assertTrue(ex.toString(), ex.getCause() instanceof TestException); + Throwable exc = ex.getCause(); + assertTrue(ex.toString(), exc instanceof TestException + || exc instanceof IOException); } } finally { RxJavaPlugins.reset(); From 97b29c3536cefbb2f3da67bb38e6a1424cdd15f0 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Sun, 19 Feb 2017 00:16:13 +0100 Subject: [PATCH 2/2] Fix javadoc, make sure failed rails are ignored. --- .../parallel/ParallelFromPublisher.java | 18 +++++----- .../reactivex/parallel/ParallelFlowable.java | 4 +-- .../reactivex/parallel/ParallelJoinTest.java | 36 +++++++++++++++++++ 3 files changed, 46 insertions(+), 12 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelFromPublisher.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelFromPublisher.java index 5abd0f7595..2053570845 100644 --- a/src/main/java/io/reactivex/internal/operators/parallel/ParallelFromPublisher.java +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelFromPublisher.java @@ -102,8 +102,8 @@ static final class ParallelDispatcher this.prefetch = prefetch; this.limit = prefetch - (prefetch >> 2); int m = subscribers.length; - this.requests = new AtomicLongArray(m + 1); - this.requests.lazySet(m, m); + this.requests = new AtomicLongArray(m + m + 1); + this.requests.lazySet(m + m, m); this.emissions = new long[m]; } @@ -161,9 +161,7 @@ void setupSubscribers() { } } - final class RailSubscription extends AtomicBoolean implements Subscription { - - private static final long serialVersionUID = 7289979168658050255L; + final class RailSubscription implements Subscription { final int j; @@ -196,8 +194,8 @@ public void request(long n) { @Override public void cancel() { - if (compareAndSet(false, true)) { - ParallelDispatcher.this.cancel(m); + if (requests.compareAndSet(m + j, 0L, 1L)) { + ParallelDispatcher.this.cancel(m + m); } } } @@ -228,7 +226,7 @@ public void onComplete() { } void cancel(int m) { - if (requests.decrementAndGet(m) == 0) { + if (requests.decrementAndGet(m) == 0L) { cancelled = true; this.s.cancel(); @@ -286,7 +284,7 @@ void drainAsync() { long ridx = r.get(idx); long eidx = e[idx]; - if (ridx != eidx) { + if (ridx != eidx && r.get(n + idx) == 0) { T v; @@ -374,7 +372,7 @@ void drainSync() { long ridx = r.get(idx); long eidx = e[idx]; - if (ridx != eidx) { + if (ridx != eidx && r.get(n + idx) == 0) { T v; diff --git a/src/main/java/io/reactivex/parallel/ParallelFlowable.java b/src/main/java/io/reactivex/parallel/ParallelFlowable.java index f87a0a35f7..b2eab028c1 100644 --- a/src/main/java/io/reactivex/parallel/ParallelFlowable.java +++ b/src/main/java/io/reactivex/parallel/ParallelFlowable.java @@ -291,7 +291,7 @@ public final Flowable sequential(int prefetch) { *
Backpressure:
*
The operator honors backpressure.
*
Scheduler:
- *
{@code sequential} does not operate by default on a particular {@link Scheduler}.
+ *
{@code sequentialDelayError} does not operate by default on a particular {@link Scheduler}.
* * @return the new Flowable instance * @see ParallelFlowable#sequentialDelayError(int) @@ -315,7 +315,7 @@ public final Flowable sequentialDelayError() { *
Backpressure:
*
The operator honors backpressure.
*
Scheduler:
- *
{@code sequential} does not operate by default on a particular {@link Scheduler}.
+ *
{@code sequentialDelayError} does not operate by default on a particular {@link Scheduler}.
* * @param prefetch the prefetch amount to use for each rail * @return the new Flowable instance diff --git a/src/test/java/io/reactivex/parallel/ParallelJoinTest.java b/src/test/java/io/reactivex/parallel/ParallelJoinTest.java index 3854641d8f..80ffade276 100644 --- a/src/test/java/io/reactivex/parallel/ParallelJoinTest.java +++ b/src/test/java/io/reactivex/parallel/ParallelJoinTest.java @@ -289,4 +289,40 @@ public void delayErrorDrainTrigger() { .requestMore(1) .assertResult(1, 2, 3); } + + @Test + public void failedRailIsIgnored() { + Flowable.range(1, 4) + .parallel(2) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + if (v == 1) { + throw new TestException(); + } + return v; + } + }) + .sequentialDelayError() + .test() + .assertFailure(TestException.class, 2, 3, 4); + } + + @Test + public void failedRailIsIgnoredHidden() { + Flowable.range(1, 4).hide() + .parallel(2) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + if (v == 1) { + throw new TestException(); + } + return v; + } + }) + .sequentialDelayError() + .test() + .assertFailure(TestException.class, 2, 3, 4); + } }