From 17ecd213600e8f78459389cbb1ef38c2083fea42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Thu, 20 Oct 2016 21:40:13 +0200 Subject: [PATCH 1/2] 2.x: coverage, fixes, cleanup 10/20-2 --- .../internal/fuseable/SimplePlainQueue.java | 25 ++ .../operators/flowable/FlowableFlatMap.java | 198 +++------ .../operators/flowable/FlowableGenerate.java | 37 +- .../operators/flowable/FlowableObserveOn.java | 99 +++-- .../FlowableWindowBoundarySelector.java | 30 +- .../observable/ObservableFlatMap.java | 45 +- .../observable/ObservableGenerate.java | 6 +- .../internal/queue/MpscLinkedQueue.java | 4 +- .../internal/queue/SpscArrayQueue.java | 4 +- .../internal/queue/SpscLinkedArrayQueue.java | 4 +- .../subscribers/QueueDrainSubscriber.java | 8 +- .../io/reactivex/flowable/FlowableTests.java | 8 +- .../flowable/FlowableDoOnLifecycleTest.java | 135 ++++++ .../flowable/FlowableFlatMapTest.java | 46 ++- .../flowable/FlowableFromObservableTest.java | 34 ++ .../flowable/FlowableGenerateTest.java | 112 ++++- .../flowable/FlowableObserveOnTest.java | 386 +++++++++++++++++- .../flowable/FlowableRangeLongTest.java | 1 - ...lowableWindowWithStartEndFlowableTest.java | 9 + .../observable/ObservableFlatMapTest.java | 45 +- .../observable/ObservableGenerateTest.java | 21 + .../reactivex/observable/ObservableTest.java | 9 +- 22 files changed, 976 insertions(+), 290 deletions(-) create mode 100644 src/main/java/io/reactivex/internal/fuseable/SimplePlainQueue.java create mode 100644 src/test/java/io/reactivex/internal/operators/flowable/FlowableDoOnLifecycleTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/flowable/FlowableFromObservableTest.java diff --git a/src/main/java/io/reactivex/internal/fuseable/SimplePlainQueue.java b/src/main/java/io/reactivex/internal/fuseable/SimplePlainQueue.java new file mode 100644 index 0000000000..9c04a50186 --- /dev/null +++ b/src/main/java/io/reactivex/internal/fuseable/SimplePlainQueue.java @@ -0,0 +1,25 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.fuseable; + +/** + * Override of the SimpleQueue interface with no throws Exception on poll. + * + * @param the value type to enqueue and dequeue, not null + */ +public interface SimplePlainQueue extends SimpleQueue { + + @Override + T poll(); +} diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java index 2ff03470bb..781685edc0 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java @@ -13,10 +13,6 @@ package io.reactivex.internal.operators.flowable; -import io.reactivex.plugins.RxJavaPlugins; - -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.atomic.*; @@ -25,10 +21,12 @@ import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.*; import io.reactivex.functions.Function; +import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.fuseable.*; import io.reactivex.internal.queue.*; import io.reactivex.internal.subscriptions.SubscriptionHelper; -import io.reactivex.internal.util.BackpressureHelper; +import io.reactivex.internal.util.*; +import io.reactivex.plugins.RxJavaPlugins; public final class FlowableFlatMap extends AbstractFlowableWithUpstream { final Function> mapper; @@ -64,13 +62,11 @@ static final class MergeSubscriber extends AtomicInteger implements Subscr final int maxConcurrency; final int bufferSize; - volatile SimpleQueue queue; + volatile SimplePlainQueue queue; volatile boolean done; - final AtomicReference> errors = new AtomicReference>(); - - static final SimpleQueue ERRORS_CLOSED = new RejectingQueue(); + final AtomicThrowable errs = new AtomicThrowable(); volatile boolean cancelled; @@ -126,7 +122,7 @@ public void onNext(T t) { } Publisher p; try { - p = mapper.apply(t); + p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null Publisher"); } catch (Throwable e) { Exceptions.throwIfFatal(e); s.cancel(); @@ -140,7 +136,7 @@ public void onNext(T t) { u = ((Callable)p).call(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); - getErrorQueue().offer(ex); + errs.addThrowable(ex); drain(); return; } @@ -210,7 +206,7 @@ void removeInner(InnerSubscriber inner) { } SimpleQueue getMainQueue() { - SimpleQueue q = queue; + SimplePlainQueue q = queue; if (q == null) { if (maxConcurrency == Integer.MAX_VALUE) { q = new SpscLinkedArrayQueue(bufferSize); @@ -316,9 +312,12 @@ public void onError(Throwable t) { RxJavaPlugins.onError(t); return; } - getErrorQueue().offer(t); - done = true; - drain(); + if (errs.addThrowable(t)) { + done = true; + drain(); + } else { + RxJavaPlugins.onError(t); + } } @Override @@ -343,9 +342,13 @@ public void request(long n) { public void cancel() { if (!cancelled) { cancelled = true; + s.cancel(); + disposeAll(); if (getAndIncrement() == 0) { - s.cancel(); - disposeAll(); + SimpleQueue q = queue; + if (q != null) { + q.clear(); + } } } } @@ -363,7 +366,7 @@ void drainLoop() { if (checkTerminate()) { return; } - SimpleQueue svq = queue; + SimplePlainQueue svq = queue; long r = requested.get(); boolean unbounded = r == Long.MAX_VALUE; @@ -375,12 +378,8 @@ void drainLoop() { long scalarEmission = 0; U o = null; while (r != 0L) { - try { - o = svq.poll(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - getErrorQueue().offer(ex); - } + o = svq.poll(); + if (checkTerminate()) { return; } @@ -413,11 +412,11 @@ void drainLoop() { int n = inner.length; if (d && (svq == null || svq.isEmpty()) && n == 0) { - SimpleQueue e = errors.get(); - if (e == null || e.isEmpty()) { + Throwable ex = errs.terminate(); + if (ex == null) { child.onComplete(); } else { - reportError(e); + child.onError(ex); } return; } @@ -447,6 +446,7 @@ void drainLoop() { } int j = index; + sourceLoop: for (int i = 0; i < n; i++) { if (checkTerminate()) { return; @@ -456,26 +456,29 @@ void drainLoop() { U o = null; for (;;) { + if (checkTerminate()) { + return; + } + SimpleQueue q = is.queue; + if (q == null) { + break; + } long produced = 0; while (r != 0L) { - if (checkTerminate()) { - return; - } - SimpleQueue q = is.queue; - if (q == null) { - break; - } try { o = q.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); - - s.cancel(); - disposeAll(); - - child.onError(ex); - return; + is.dispose(); + errs.addThrowable(ex); + if (checkTerminate()) { + return; + } + removeInner(is); + innerCompleted = true; + i++; + continue sourceLoop; } if (o == null) { break; @@ -483,6 +486,10 @@ void drainLoop() { child.onNext(o); + if (checkTerminate()) { + return; + } + r--; produced++; } @@ -536,86 +543,31 @@ void drainLoop() { boolean checkTerminate() { if (cancelled) { - s.cancel(); - disposeAll(); + SimpleQueue q = queue; + if (q != null) { + q.clear(); + } return true; } - SimpleQueue e = errors.get(); - if (!delayErrors && (e != null && !e.isEmpty())) { - try { - reportError(e); - } finally { - disposeAll(); - } + if (!delayErrors && errs.get() != null) { + actual.onError(errs.terminate()); return true; } return false; } - void reportError(SimpleQueue q) { - List composite = null; - Throwable ex = null; - - for (;;) { - Throwable t; - try { - t = q.poll(); - } catch (Throwable exc) { - Exceptions.throwIfFatal(exc); - if (ex == null) { - ex = exc; - } else { - if (composite == null) { - composite = new ArrayList(); - composite.add(ex); - } - composite.add(exc); - } - break; - } - - if (t == null) { - break; - } - if (ex == null) { - ex = t; - } else { - if (composite == null) { - composite = new ArrayList(); - composite.add(ex); - } - composite.add(t); - } - } - if (composite != null) { - actual.onError(new CompositeException(composite)); - } else { - actual.onError(ex); - } - } - void disposeAll() { InnerSubscriber[] a = subscribers.get(); if (a != CANCELLED) { a = subscribers.getAndSet(CANCELLED); if (a != CANCELLED) { - errors.getAndSet(ERRORS_CLOSED); for (InnerSubscriber inner : a) { inner.dispose(); } - } - } - } - - SimpleQueue getErrorQueue() { - for (;;) { - SimpleQueue q = errors.get(); - if (q != null) { - return q; - } - q = new MpscLinkedQueue(); - if (errors.compareAndSet(null, q)) { - return q; + Throwable ex = errs.terminate(); + if (ex != null && ex != ExceptionHelper.TERMINATED) { + RxJavaPlugins.onError(ex); + } } } } @@ -676,9 +628,12 @@ public void onNext(U t) { } @Override public void onError(Throwable t) { - parent.getErrorQueue().offer(t); - done = true; - parent.drain(); + if (parent.errs.addThrowable(t)) { + done = true; + parent.drain(); + } else { + RxJavaPlugins.onError(t); + } } @Override public void onComplete() { @@ -708,31 +663,4 @@ public boolean isDisposed() { return get() == SubscriptionHelper.CANCELLED; } } - - static final class RejectingQueue implements SimpleQueue { - @Override - public boolean offer(T e) { - return false; - } - - @Override - public boolean offer(T v1, T v2) { - return false; - } - - @Override - public T poll() { - return null; - } - - @Override - public void clear() { - - } - - @Override - public boolean isEmpty() { - return true; - } - } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableGenerate.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableGenerate.java index b8da2c0cd9..84c82125d5 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableGenerate.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableGenerate.java @@ -93,14 +93,7 @@ public void request(long n) { final BiFunction, S> f = generator; for (;;) { - if (cancelled) { - dispose(s); - return; - } - - boolean unbounded = n == Long.MAX_VALUE; // NOPMD - - while (n != 0L) { + while (e != n) { if (cancelled) { dispose(s); @@ -122,30 +115,16 @@ public void request(long n) { return; } - n--; - e--; + e++; } - if (!unbounded) { - n = get(); - if (n == Long.MAX_VALUE) { - continue; - } - n += e; - if (n != 0L) { - continue; // keep draining and delay the addAndGet as much as possible + n = get(); + if (e == n) { + n = addAndGet(-e); + if (n == 0L) { + break; } - } - if (e != 0L) { - if (!unbounded) { - state = s; // save state in case we run out of requests - n = addAndGet(e); - e = 0L; - } - } - - if (n == 0L) { - break; + e = 0L; } } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableObserveOn.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableObserveOn.java index c06222ad47..a5d118e8c4 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableObserveOn.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableObserveOn.java @@ -49,10 +49,10 @@ public void subscribeActual(Subscriber s) { Worker worker = scheduler.createWorker(); if (s instanceof ConditionalSubscriber) { - source.subscribe(new PublisherObserveOnConditionalSubscriber( + source.subscribe(new ObserveOnConditionalSubscriber( (ConditionalSubscriber) s, worker, delayError, prefetch)); } else { - source.subscribe(new PublisherObserveOnSubscriber(s, worker, delayError, prefetch)); + source.subscribe(new ObserveOnSubscriber(s, worker, delayError, prefetch)); } } @@ -192,21 +192,24 @@ final boolean checkTerminated(boolean d, boolean empty, Subscriber a) { if (empty) { Throwable e = error; if (e != null) { - doError(a, e); + a.onError(e); } else { - doComplete(a); + a.onComplete(); } + worker.dispose(); return true; } } else { Throwable e = error; if (e != null) { clear(); - doError(a, e); + a.onError(e); + worker.dispose(); return true; } else if (empty) { - doComplete(a); + a.onComplete(); + worker.dispose(); return true; } } @@ -215,22 +218,6 @@ final boolean checkTerminated(boolean d, boolean empty, Subscriber a) { return false; } - final void doComplete(Subscriber a) { - try { - a.onComplete(); - } finally { - worker.dispose(); - } - } - - final void doError(Subscriber a, Throwable e) { - try { - a.onError(e); - } finally { - worker.dispose(); - } - } - @Override public final int requestFusion(int requestedMode) { if ((requestedMode & ASYNC) != 0) { @@ -251,14 +238,14 @@ public final boolean isEmpty() { } } - static final class PublisherObserveOnSubscriber extends BaseObserveOnSubscriber + static final class ObserveOnSubscriber extends BaseObserveOnSubscriber implements Subscriber { private static final long serialVersionUID = -4547113800637756442L; final Subscriber actual; - PublisherObserveOnSubscriber( + ObserveOnSubscriber( Subscriber actual, Worker worker, boolean delayError, @@ -326,7 +313,9 @@ void runSync() { v = q.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); - doError(a, ex); + s.cancel(); + a.onError(ex); + worker.dispose(); return; } @@ -334,7 +323,8 @@ void runSync() { return; } if (v == null) { - doComplete(a); + a.onComplete(); + worker.dispose(); return; } @@ -343,15 +333,14 @@ void runSync() { e++; } - if (e == r) { - if (cancelled) { - return; - } + if (cancelled) { + return; + } - if (q.isEmpty()) { - doComplete(a); - return; - } + if (q.isEmpty()) { + a.onComplete(); + worker.dispose(); + return; } int w = get(); @@ -392,7 +381,8 @@ void runAsync() { s.cancel(); q.clear(); - doError(a, ex); + a.onError(ex); + worker.dispose(); return; } @@ -452,10 +442,11 @@ void runBackfused() { if (d) { Throwable e = error; if (e != null) { - doError(actual, e); + actual.onError(e); } else { - doComplete(actual); + actual.onComplete(); } + worker.dispose(); return; } @@ -483,7 +474,7 @@ public T poll() throws Exception { } - static final class PublisherObserveOnConditionalSubscriber + static final class ObserveOnConditionalSubscriber extends BaseObserveOnSubscriber { private static final long serialVersionUID = 644624475404284533L; @@ -492,7 +483,7 @@ static final class PublisherObserveOnConditionalSubscriber long consumed; - PublisherObserveOnConditionalSubscriber( + ObserveOnConditionalSubscriber( ConditionalSubscriber actual, Worker worker, boolean delayError, @@ -559,7 +550,9 @@ void runSync() { v = q.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); - doError(a, ex); + s.cancel(); + a.onError(ex); + worker.dispose(); return; } @@ -567,7 +560,8 @@ void runSync() { return; } if (v == null) { - doComplete(a); + a.onComplete(); + worker.dispose(); return; } @@ -576,15 +570,14 @@ void runSync() { } } - if (e == r) { - if (cancelled) { - return; - } + if (cancelled) { + return; + } - if (q.isEmpty()) { - doComplete(a); - return; - } + if (q.isEmpty()) { + a.onComplete(); + worker.dispose(); + return; } int w = get(); @@ -625,7 +618,8 @@ void runAsync() { s.cancel(); q.clear(); - doError(a, ex); + a.onError(ex); + worker.dispose(); return; } boolean empty = v == null; @@ -686,10 +680,11 @@ void runBackfused() { if (d) { Throwable e = error; if (e != null) { - doError(actual, e); + actual.onError(e); } else { - doComplete(actual); + actual.onComplete(); } + worker.dispose(); return; } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySelector.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySelector.java index 12f8bd0d1e..1da7422ad0 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySelector.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySelector.java @@ -20,11 +20,11 @@ import io.reactivex.Flowable; import io.reactivex.disposables.*; -import io.reactivex.exceptions.*; +import io.reactivex.exceptions.MissingBackpressureException; import io.reactivex.functions.Function; import io.reactivex.internal.disposables.DisposableHelper; import io.reactivex.internal.functions.ObjectHelper; -import io.reactivex.internal.fuseable.SimpleQueue; +import io.reactivex.internal.fuseable.SimplePlainQueue; import io.reactivex.internal.queue.MpscLinkedQueue; import io.reactivex.internal.subscribers.QueueDrainSubscriber; import io.reactivex.internal.subscriptions.SubscriptionHelper; @@ -162,17 +162,6 @@ public void onComplete() { actual.onComplete(); } - - - void complete() { - if (windows.decrementAndGet() == 0) { - s.cancel(); - resources.dispose(); - } - - actual.onComplete(); - } - void error(Throwable t) { s.cancel(); resources.dispose(); @@ -197,7 +186,7 @@ void dispose() { } void drainLoop() { - final SimpleQueue q = queue; + final SimplePlainQueue q = queue; final Subscriber> a = actual; final List> ws = this.ws; int missed = 1; @@ -206,18 +195,7 @@ void drainLoop() { for (;;) { boolean d = done; - Object o; - - try { - o = q.poll(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - dispose(); - for (UnicastProcessor w : ws) { - w.onError(ex); - } - return; - } + Object o = q.poll(); boolean empty = o == null; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java index 05a8532db3..973c520bd4 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java @@ -65,7 +65,7 @@ static final class MergeObserver extends AtomicInteger implements Disposab final int maxConcurrency; final int bufferSize; - volatile SimpleQueue queue; + volatile SimplePlainQueue queue; volatile boolean done; @@ -235,7 +235,7 @@ void tryEmitScalar(Callable value) { return; } } else { - SimpleQueue q = queue; + SimplePlainQueue q = queue; if (q == null) { if (maxConcurrency == Integer.MAX_VALUE) { q = new SpscLinkedArrayQueue(bufferSize); @@ -330,7 +330,7 @@ void drainLoop() { if (checkTerminate()) { return; } - SimpleQueue svq = queue; + SimplePlainQueue svq = queue; if (svq != null) { for (;;) { @@ -339,13 +339,9 @@ void drainLoop() { if (checkTerminate()) { return; } - try { - o = svq.poll(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - errors.addThrowable(ex); - continue; - } + + o = svq.poll(); + if (o == null) { break; } @@ -398,6 +394,7 @@ void drainLoop() { } int j = index; + sourceLoop: for (int i = 0; i < n; i++) { if (checkTerminate()) { return; @@ -407,26 +404,37 @@ void drainLoop() { U o = null; for (;;) { + if (checkTerminate()) { + return; + } + SimpleQueue q = is.queue; + if (q == null) { + break; + } for (;;) { - if (checkTerminate()) { - return; - } - SimpleQueue q = is.queue; - if (q == null) { - break; - } try { o = q.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); + is.dispose(); errors.addThrowable(ex); - continue; + if (checkTerminate()) { + return; + } + removeInner(is); + innerCompleted = true; + i++; + continue sourceLoop; } if (o == null) { break; } child.onNext(o); + + if (checkTerminate()) { + return; + } } if (o == null) { break; @@ -478,6 +486,7 @@ boolean checkTerminate() { } Throwable e = errors.get(); if (!delayErrors && (e != null)) { + disposeAll(); actual.onError(errors.terminate()); return true; } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableGenerate.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableGenerate.java index f9fc5c8313..f5bcc6516d 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableGenerate.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableGenerate.java @@ -13,7 +13,6 @@ package io.reactivex.internal.operators.observable; -import io.reactivex.internal.functions.ObjectHelper; import java.util.concurrent.Callable; import io.reactivex.*; @@ -143,8 +142,11 @@ public void onNext(T t) { @Override public void onError(Throwable t) { + if (t == null) { + t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); + } terminate = true; - actual.onError(ObjectHelper.requireNonNull(t, "onError called with null. Null values are generally not allowed in 2.x operators and sources.")); + actual.onError(t); } @Override diff --git a/src/main/java/io/reactivex/internal/queue/MpscLinkedQueue.java b/src/main/java/io/reactivex/internal/queue/MpscLinkedQueue.java index 2bc5e3563c..a8074093a0 100644 --- a/src/main/java/io/reactivex/internal/queue/MpscLinkedQueue.java +++ b/src/main/java/io/reactivex/internal/queue/MpscLinkedQueue.java @@ -20,13 +20,13 @@ import java.util.concurrent.atomic.AtomicReference; -import io.reactivex.internal.fuseable.SimpleQueue; +import io.reactivex.internal.fuseable.SimplePlainQueue; /** * A multi-producer single consumer unbounded queue. * @param the contained value type */ -public final class MpscLinkedQueue implements SimpleQueue { +public final class MpscLinkedQueue implements SimplePlainQueue { private final AtomicReference> producerNode; private final AtomicReference> consumerNode; diff --git a/src/main/java/io/reactivex/internal/queue/SpscArrayQueue.java b/src/main/java/io/reactivex/internal/queue/SpscArrayQueue.java index c78b6451da..7cd6030e70 100644 --- a/src/main/java/io/reactivex/internal/queue/SpscArrayQueue.java +++ b/src/main/java/io/reactivex/internal/queue/SpscArrayQueue.java @@ -20,7 +20,7 @@ import java.util.concurrent.atomic.*; -import io.reactivex.internal.fuseable.SimpleQueue; +import io.reactivex.internal.fuseable.SimplePlainQueue; import io.reactivex.internal.util.Pow2; /** @@ -37,7 +37,7 @@ * * @param */ -public final class SpscArrayQueue extends AtomicReferenceArray implements SimpleQueue { +public final class SpscArrayQueue extends AtomicReferenceArray implements SimplePlainQueue { private static final long serialVersionUID = -1296597691183856449L; private static final Integer MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.spsc.max.lookahead.step", 4096); final int mask; diff --git a/src/main/java/io/reactivex/internal/queue/SpscLinkedArrayQueue.java b/src/main/java/io/reactivex/internal/queue/SpscLinkedArrayQueue.java index 3c03ed57c4..a521fd580b 100644 --- a/src/main/java/io/reactivex/internal/queue/SpscLinkedArrayQueue.java +++ b/src/main/java/io/reactivex/internal/queue/SpscLinkedArrayQueue.java @@ -20,7 +20,7 @@ import java.util.concurrent.atomic.*; -import io.reactivex.internal.fuseable.SimpleQueue; +import io.reactivex.internal.fuseable.SimplePlainQueue; import io.reactivex.internal.util.Pow2; /** @@ -28,7 +28,7 @@ * than the producer. * @param the contained value type */ -public final class SpscLinkedArrayQueue implements SimpleQueue { +public final class SpscLinkedArrayQueue implements SimplePlainQueue { static final int MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.spsc.max.lookahead.step", 4096); final AtomicLong producerIndex = new AtomicLong(); diff --git a/src/main/java/io/reactivex/internal/subscribers/QueueDrainSubscriber.java b/src/main/java/io/reactivex/internal/subscribers/QueueDrainSubscriber.java index 54efd91261..d7dbc6bd04 100644 --- a/src/main/java/io/reactivex/internal/subscribers/QueueDrainSubscriber.java +++ b/src/main/java/io/reactivex/internal/subscribers/QueueDrainSubscriber.java @@ -19,7 +19,7 @@ import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.MissingBackpressureException; -import io.reactivex.internal.fuseable.SimpleQueue; +import io.reactivex.internal.fuseable.*; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.internal.util.*; @@ -33,14 +33,14 @@ */ public abstract class QueueDrainSubscriber extends QueueDrainSubscriberPad4 implements Subscriber, QueueDrain { protected final Subscriber actual; - protected final SimpleQueue queue; + protected final SimplePlainQueue queue; protected volatile boolean cancelled; protected volatile boolean done; protected Throwable error; - public QueueDrainSubscriber(Subscriber actual, SimpleQueue queue) { + public QueueDrainSubscriber(Subscriber actual, SimplePlainQueue queue) { this.actual = actual; this.queue = queue; } @@ -66,7 +66,7 @@ public final boolean fastEnter() { protected final void fastPathEmitMax(U value, boolean delayError, Disposable dispose) { final Subscriber s = actual; - final SimpleQueue q = queue; + final SimplePlainQueue q = queue; if (wip.get() == 0 && wip.compareAndSet(0, 1)) { long r = requested.get(); diff --git a/src/test/java/io/reactivex/flowable/FlowableTests.java b/src/test/java/io/reactivex/flowable/FlowableTests.java index dc816e2ec3..49707b71aa 100644 --- a/src/test/java/io/reactivex/flowable/FlowableTests.java +++ b/src/test/java/io/reactivex/flowable/FlowableTests.java @@ -281,12 +281,8 @@ public Integer apply(Integer t1, Integer t2) { } }) .toFlowable() - .blockingForEach(new Consumer() { - @Override - public void accept(Integer t1) { - // do nothing ... we expect an exception instead - } - }); + .test() + .assertResult(); } /** diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDoOnLifecycleTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDoOnLifecycleTest.java new file mode 100644 index 0000000000..e44d54d541 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDoOnLifecycleTest.java @@ -0,0 +1,135 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.flowable; + +import java.util.List; + +import static org.junit.Assert.*; +import org.junit.Test; +import org.reactivestreams.*; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.plugins.RxJavaPlugins; + +public class FlowableDoOnLifecycleTest { + + @Test + public void onSubscribeCrashed() { + Flowable.just(1) + .doOnLifecycle(new Consumer() { + @Override + public void accept(Subscription s) throws Exception { + throw new TestException(); + } + }, Functions.EMPTY_LONG_CONSUMER, Functions.EMPTY_ACTION) + .test() + .assertFailure(TestException.class); + } + + @Test + public void doubleOnSubscribe() { + final int[] calls = { 0, 0 }; + + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Publisher>() { + @Override + public Publisher apply(Flowable o) throws Exception { + return o + .doOnLifecycle(new Consumer() { + @Override + public void accept(Subscription s) throws Exception { + calls[0]++; + } + }, Functions.EMPTY_LONG_CONSUMER, new Action() { + @Override + public void run() throws Exception { + calls[1]++; + } + }); + } + }); + + assertEquals(2, calls[0]); + assertEquals(0, calls[1]); + } + + @Test + public void dispose() { + final int[] calls = { 0, 0 }; + + TestHelper.checkDisposed(Flowable.just(1) + .doOnLifecycle(new Consumer() { + @Override + public void accept(Subscription s) throws Exception { + calls[0]++; + } + }, Functions.EMPTY_LONG_CONSUMER, new Action() { + @Override + public void run() throws Exception { + calls[1]++; + } + }) + ); + + assertEquals(1, calls[0]); + assertEquals(2, calls[1]); + } + + @Test + public void requestCrashed() { + List errors = TestHelper.trackPluginErrors(); + try { + Flowable.just(1) + .doOnLifecycle(Functions.emptyConsumer(), + new LongConsumer() { + @Override + public void accept(long v) throws Exception { + throw new TestException(); + } + }, + Functions.EMPTY_ACTION) + .test() + .assertResult(1); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void cancelCrashed() { + List errors = TestHelper.trackPluginErrors(); + try { + Flowable.just(1) + .doOnLifecycle(Functions.emptyConsumer(), + Functions.EMPTY_LONG_CONSUMER, + new Action() { + @Override + public void run() throws Exception { + throw new TestException(); + } + }) + .take(1) + .test() + .assertResult(1); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } +} diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java index 400a9b9259..e2ff2ffcb2 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java @@ -24,8 +24,7 @@ import org.reactivestreams.*; import io.reactivex.*; -import io.reactivex.Flowable; -import io.reactivex.exceptions.TestException; +import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.processors.PublishProcessor; import io.reactivex.schedulers.Schedulers; @@ -847,4 +846,47 @@ public void run() { } } + + @Test + public void fusedInnerThrows() { + Flowable.just(1).hide() + .flatMap(new Function>() { + @Override + public Flowable apply(Integer v) throws Exception { + return Flowable.range(1, 2).map(new Function() { + @Override + public Object apply(Integer w) throws Exception { + throw new TestException(); + } + }); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void fusedInnerThrows2() { + TestSubscriber to = Flowable.range(1, 2).hide() + .flatMap(new Function>() { + @Override + public Flowable apply(Integer v) throws Exception { + return Flowable.range(1, 2).map(new Function() { + @Override + public Integer apply(Integer w) throws Exception { + throw new TestException(); + } + }); + } + }, true) + .test() + .assertFailure(CompositeException.class); + + List errors = TestHelper.errorList(to); + + TestHelper.assertError(errors, 0, TestException.class); + + TestHelper.assertError(errors, 1, TestException.class); + } + } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFromObservableTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFromObservableTest.java new file mode 100644 index 0000000000..098f41247d --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFromObservableTest.java @@ -0,0 +1,34 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.flowable; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; + +public class FlowableFromObservableTest { + @Test + public void dispose() { + TestHelper.checkDisposed(Observable.just(1).toFlowable(BackpressureStrategy.NONE)); + } + + @Test + public void error() { + Observable.error(new TestException()) + .toFlowable(BackpressureStrategy.NONE) + .test() + .assertFailure(TestException.class); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableGenerateTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableGenerateTest.java index e306b95e6e..6166c8f38d 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableGenerateTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableGenerateTest.java @@ -13,16 +13,19 @@ package io.reactivex.internal.operators.flowable; +import static org.junit.Assert.assertEquals; + import java.util.List; import java.util.concurrent.Callable; -import org.junit.*; +import org.junit.Test; import io.reactivex.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.subscribers.TestSubscriber; public class FlowableGenerateTest { @@ -126,4 +129,111 @@ public void accept(Object s, Emitter e) throws Exception { } }, Functions.emptyConsumer())); } + + @Test + public void nullError() { + final int[] call = { 0 }; + Flowable.generate(Functions.justCallable(1), + new BiConsumer>() { + @Override + public void accept(Integer s, Emitter e) throws Exception { + try { + e.onError(null); + } catch (NullPointerException ex) { + call[0]++; + } + } + }, Functions.emptyConsumer()) + .test() + .assertFailure(NullPointerException.class); + + assertEquals(0, call[0]); + } + + @Test + public void badRequest() { + TestHelper.assertBadRequestReported(Flowable.generate(new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }, new BiConsumer>() { + @Override + public void accept(Object s, Emitter e) throws Exception { + e.onComplete(); + } + }, Functions.emptyConsumer())); + } + + @Test + public void rebatchAndTake() { + Flowable.generate(new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }, new BiConsumer>() { + @Override + public void accept(Object s, Emitter e) throws Exception { + e.onNext(1); + } + }, Functions.emptyConsumer()) + .rebatchRequests(1) + .take(5) + .test() + .assertResult(1, 1, 1, 1, 1); + } + + @Test + public void backpressure() { + Flowable.generate(new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }, new BiConsumer>() { + @Override + public void accept(Object s, Emitter e) throws Exception { + e.onNext(1); + } + }, Functions.emptyConsumer()) + .rebatchRequests(1) + .test(5) + .assertSubscribed() + .assertValues(1, 1, 1, 1, 1) + .assertNoErrors() + .assertNotComplete(); + } + + @Test + public void requestRace() { + Flowable source = Flowable.generate(new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }, new BiConsumer>() { + @Override + public void accept(Object s, Emitter e) throws Exception { + e.onNext(1); + } + }, Functions.emptyConsumer()); + + for (int i = 0; i < 500; i++) { + final TestSubscriber ts = source.test(0L); + + Runnable r = new Runnable() { + @Override + public void run() { + for (int j = 0; j < 500; j++) { + ts.request(1); + } + } + }; + + TestHelper.race(r, r); + + ts.assertValueCount(1000); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableObserveOnTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableObserveOnTest.java index 2f5f1ec700..d9c814ace8 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableObserveOnTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableObserveOnTest.java @@ -1349,6 +1349,8 @@ protected void subscribeActual(Subscriber observer) { @SuppressWarnings("unchecked") BaseObserveOnSubscriber oo = (BaseObserveOnSubscriber)observer; + oo.sourceMode = QueueFuseable.SYNC; + oo.requested.lazySet(1); oo.queue = new SimpleQueue() { @Override @@ -1382,7 +1384,159 @@ public void clear() { } } .observeOn(Schedulers.single()) - .test() + .test(0L) + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class); + } + + @Test + public void conditionalNonFusedPollThrows() { + new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + + @SuppressWarnings("unchecked") + BaseObserveOnSubscriber oo = (BaseObserveOnSubscriber)observer; + + oo.sourceMode = QueueFuseable.SYNC; + oo.requested.lazySet(1); + oo.queue = new SimpleQueue() { + + @Override + public boolean offer(Integer value) { + return false; + } + + @Override + public boolean offer(Integer v1, Integer v2) { + return false; + } + + @Override + public Integer poll() throws Exception { + throw new TestException(); + } + + @Override + public boolean isEmpty() { + return false; + } + + @Override + public void clear() { + } + }; + + oo.clear(); + + oo.trySchedule(); + } + } + .observeOn(Schedulers.single()) + .filter(Functions.alwaysTrue()) + .test(0L) + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class); + } + + @Test + public void asycFusedPollThrows() { + new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + + @SuppressWarnings("unchecked") + BaseObserveOnSubscriber oo = (BaseObserveOnSubscriber)observer; + + oo.sourceMode = QueueFuseable.ASYNC; + oo.requested.lazySet(1); + oo.queue = new SimpleQueue() { + + @Override + public boolean offer(Integer value) { + return false; + } + + @Override + public boolean offer(Integer v1, Integer v2) { + return false; + } + + @Override + public Integer poll() throws Exception { + throw new TestException(); + } + + @Override + public boolean isEmpty() { + return false; + } + + @Override + public void clear() { + } + }; + + oo.clear(); + + oo.trySchedule(); + } + } + .observeOn(Schedulers.single()) + .test(0L) + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class); + } + + @Test + public void conditionalAsyncFusedPollThrows() { + new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + + @SuppressWarnings("unchecked") + BaseObserveOnSubscriber oo = (BaseObserveOnSubscriber)observer; + + oo.sourceMode = QueueFuseable.ASYNC; + oo.requested.lazySet(1); + oo.queue = new SimpleQueue() { + + @Override + public boolean offer(Integer value) { + return false; + } + + @Override + public boolean offer(Integer v1, Integer v2) { + return false; + } + + @Override + public Integer poll() throws Exception { + throw new TestException(); + } + + @Override + public boolean isEmpty() { + return false; + } + + @Override + public void clear() { + } + }; + + oo.clear(); + + oo.trySchedule(); + } + } + .observeOn(Schedulers.single()) + .filter(Functions.alwaysTrue()) + .test(0L) .awaitDone(5, TimeUnit.SECONDS) .assertFailure(TestException.class); } @@ -1394,4 +1548,234 @@ public void trampolineScheduler() { .test() .assertResult(1); } + + @Test + public void conditionalNormal() { + Flowable.range(1, 1000).hide() + .observeOn(Schedulers.single()) + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return v % 2 == 0; + } + }) + .take(250) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(250) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void syncFusedCancelAfterRequest() { + final TestSubscriber ts = new TestSubscriber(2L) { + @Override + public void onNext(Integer t) { + super.onNext(t); + if (t == 2) { + cancel(); + onComplete(); + } + } + }; + + Flowable.range(1, 3) + .observeOn(Schedulers.single()) + .subscribe(ts); + + ts + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2); + } + + @Test + public void syncFusedCancelAfterRequest2() { + final TestSubscriber ts = new TestSubscriber(2L); + + Flowable.range(1, 2) + .observeOn(Schedulers.single()) + .subscribe(ts); + + ts + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2); + } + + @Test + public void syncFusedCancelAfterRequestConditional() { + final TestSubscriber ts = new TestSubscriber(2L) { + @Override + public void onNext(Integer t) { + super.onNext(t); + if (t == 2) { + cancel(); + onComplete(); + } + } + }; + + Flowable.range(1, 3) + .observeOn(Schedulers.single()) + .filter(Functions.alwaysTrue()) + .subscribe(ts); + + ts + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2); + } + + @Test + public void syncFusedCancelAfterRequestConditional2() { + final TestSubscriber ts = new TestSubscriber(2L); + + Flowable.range(1, 2) + .observeOn(Schedulers.single()) + .filter(Functions.alwaysTrue()) + .subscribe(ts); + + ts + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2); + } + + @Test + public void nonFusedCancelAfterRequestConditional2() { + final TestSubscriber ts = new TestSubscriber(2L); + + Flowable.range(1, 2).hide() + .observeOn(Schedulers.single()) + .filter(Functions.alwaysTrue()) + .subscribe(ts); + + ts + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2); + } + + @Test + public void doubleObserveOn() { + Flowable.just(1).hide() + .observeOn(Schedulers.computation()) + .observeOn(Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1); + } + + @Test + public void doubleObserveOnError() { + Flowable.error(new TestException()) + .observeOn(Schedulers.computation()) + .observeOn(Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class); + } + + @Test + public void doubleObserveOnConditional() { + Flowable.just(1).hide() + .observeOn(Schedulers.computation()) + .distinct() + .observeOn(Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1); + } + + @Test + public void doubleObserveOnErrorConditional() { + Flowable.error(new TestException()) + .observeOn(Schedulers.computation()) + .distinct() + .observeOn(Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class); + } + + @Test + public void request1Conditional() { + Flowable.range(1, 10).hide() + .observeOn(ImmediateThinScheduler.INSTANCE) + .filter(Functions.alwaysTrue()) + .test(1L) + .assertValue(1); + } + + @Test + public void backFusedConditional() { + TestSubscriber ts = SubscriberFusion.newTest(QueueSubscription.ANY); + + Flowable.range(1, 100).hide() + .observeOn(ImmediateThinScheduler.INSTANCE) + .filter(Functions.alwaysTrue()) + .subscribe(ts); + + SubscriberFusion.assertFusion(ts, QueueSubscription.ASYNC) + .assertValueCount(100) + .assertComplete() + .assertNoErrors(); + } + + @Test + public void backFusedErrorConditional() { + TestSubscriber ts = SubscriberFusion.newTest(QueueSubscription.ANY); + + Flowable.error(new TestException()) + .observeOn(ImmediateThinScheduler.INSTANCE) + .filter(Functions.alwaysTrue()) + .subscribe(ts); + + SubscriberFusion.assertFusion(ts, QueueSubscription.ASYNC) + .assertFailure(TestException.class); + } + + @Test + public void backFusedCancelConditional() { + for (int i = 0; i < 500; i++) { + final TestSubscriber ts = SubscriberFusion.newTest(QueueSubscription.ANY); + + final TestScheduler scheduler = new TestScheduler(); + + Flowable.just(1).hide() + .observeOn(scheduler) + .filter(Functions.alwaysTrue()) + .subscribe(ts); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + scheduler.triggerActions(); + } + }; + + TestHelper.race(r1, r2); + + SubscriberFusion.assertFusion(ts, QueueSubscription.ASYNC); + + if (ts.valueCount() != 0) { + ts.assertResult(1); + } + } + } + + @Test + public void syncFusedRequestOneByOneConditional() { + Flowable.range(1, 5) + .observeOn(ImmediateThinScheduler.INSTANCE) + .filter(Functions.alwaysTrue()) + .rebatchRequests(1) + .test() + .assertResult(1, 2, 3, 4, 5); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRangeLongTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRangeLongTest.java index 3a423246bd..25835b0676 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRangeLongTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRangeLongTest.java @@ -378,7 +378,6 @@ public void conditionalSlowPathRebatch() { @Test public void slowPathRebatch() { Flowable.rangeLong(1L, 5L) - .filter(Functions.alwaysTrue()) .rebatchRequests(1) .test() .assertResult(1L, 2L, 3L, 4L, 5L); diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithStartEndFlowableTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithStartEndFlowableTest.java index c5d98e0b42..48c95dadd1 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithStartEndFlowableTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithStartEndFlowableTest.java @@ -389,4 +389,13 @@ public Flowable apply(Integer v) throws Exception { assertFalse("Start has observers!", start.hasSubscribers()); assertFalse("End has observers!", end.hasSubscribers()); } + + @Test + public void mainError() { + Flowable.error(new TestException()) + .window(Flowable.never(), Functions.justFunction(Flowable.just(1))) + .flatMap(Functions.>identity()) + .test() + .assertFailure(TestException.class); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java index 5d39b0c6ed..4699de03dc 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java @@ -14,6 +14,7 @@ package io.reactivex.internal.operators.observable; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; import java.util.*; @@ -26,7 +27,7 @@ import io.reactivex.Observable; import io.reactivex.Observer; import io.reactivex.disposables.Disposable; -import io.reactivex.exceptions.TestException; +import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.observers.TestObserver; import io.reactivex.schedulers.Schedulers; @@ -713,4 +714,46 @@ public void run() { TestHelper.race(r1, r2); } } + + @Test + public void fusedInnerThrows() { + Observable.just(1).hide() + .flatMap(new Function>() { + @Override + public ObservableSource apply(Integer v) throws Exception { + return Observable.range(1, 2).map(new Function() { + @Override + public Object apply(Integer w) throws Exception { + throw new TestException(); + } + }); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void fusedInnerThrows2() { + TestObserver to = Observable.range(1, 2).hide() + .flatMap(new Function>() { + @Override + public ObservableSource apply(Integer v) throws Exception { + return Observable.range(1, 2).map(new Function() { + @Override + public Integer apply(Integer w) throws Exception { + throw new TestException(); + } + }); + } + }, true) + .test() + .assertFailure(CompositeException.class); + + List errors = TestHelper.errorList(to); + + TestHelper.assertError(errors, 0, TestException.class); + + TestHelper.assertError(errors, 1, TestException.class); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableGenerateTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableGenerateTest.java index a76b0b7b8e..7235eb3f13 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableGenerateTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableGenerateTest.java @@ -16,6 +16,7 @@ import java.util.List; import java.util.concurrent.Callable; +import static org.junit.Assert.*; import org.junit.Test; import io.reactivex.*; @@ -126,4 +127,24 @@ public void accept(Object s, Emitter e) throws Exception { } }, Functions.emptyConsumer())); } + + @Test + public void nullError() { + final int[] call = { 0 }; + Observable.generate(Functions.justCallable(1), + new BiConsumer>() { + @Override + public void accept(Integer s, Emitter e) throws Exception { + try { + e.onError(null); + } catch (NullPointerException ex) { + call[0]++; + } + } + }, Functions.emptyConsumer()) + .test() + .assertFailure(NullPointerException.class); + + assertEquals(0, call[0]); + } } diff --git a/src/test/java/io/reactivex/observable/ObservableTest.java b/src/test/java/io/reactivex/observable/ObservableTest.java index 5fe89ad640..b92659d649 100644 --- a/src/test/java/io/reactivex/observable/ObservableTest.java +++ b/src/test/java/io/reactivex/observable/ObservableTest.java @@ -282,12 +282,9 @@ public Integer apply(Integer t1, Integer t2) { return t1 + t2; } }) - .subscribe(new Consumer() { - @Override - public void accept(Integer t1) { - // do nothing ... we expect an exception instead - } - }); + .toObservable() + .test() + .assertResult(); } /** From b44ac0f99b4c01c9311fecbbc793198d16c2532b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Thu, 20 Oct 2016 22:19:26 +0200 Subject: [PATCH 2/2] Fix Generate not saving the state --- .../reactivex/internal/operators/flowable/FlowableGenerate.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableGenerate.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableGenerate.java index 84c82125d5..0520670a93 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableGenerate.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableGenerate.java @@ -120,6 +120,7 @@ public void request(long n) { n = get(); if (e == n) { + state = s; n = addAndGet(-e); if (n == 0L) { break;