From fb9d28c929c2bf59a274e04dc3797d36094ba798 Mon Sep 17 00:00:00 2001 From: "m.ostroverkhov" Date: Sat, 18 Mar 2017 22:09:36 +0200 Subject: [PATCH 1/4] [2.x] Unicast subject fail fast support --- .../io/reactivex/subjects/UnicastSubject.java | 143 ++++++++++++++---- .../subjects/UnicastSubjectTest.java | 26 ++++ 2 files changed, 142 insertions(+), 27 deletions(-) diff --git a/src/main/java/io/reactivex/subjects/UnicastSubject.java b/src/main/java/io/reactivex/subjects/UnicastSubject.java index 4a1b8815e9..96800713a7 100644 --- a/src/main/java/io/reactivex/subjects/UnicastSubject.java +++ b/src/main/java/io/reactivex/subjects/UnicastSubject.java @@ -53,6 +53,9 @@ public final class UnicastSubject extends Subject { /** The optional callback when the Subject gets cancelled or terminates. */ final AtomicReference onTerminate; + /** deliver onNext events before error event */ + final boolean delayError; + /** Indicates the single observer has cancelled. */ volatile boolean disposed; @@ -102,37 +105,96 @@ public static UnicastSubject create(int capacityHint) { * * @param the value type * @param capacityHint the hint to size the internal unbounded buffer - * @param onCancelled the non null callback + * @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed * @return an UnicastSubject instance */ @CheckReturnValue - public static UnicastSubject create(int capacityHint, Runnable onCancelled) { - return new UnicastSubject(capacityHint, onCancelled); + public static UnicastSubject create(int capacityHint, Runnable onTerminate) { + return new UnicastSubject(capacityHint, onTerminate, true); } /** - * Creates an UnicastSubject with the given capacity hint. + * Creates an UnicastSubject with the given internal buffer capacity hint, delay error flag and + * a callback for the case when the single Subscriber cancels its subscription. + * + *

The callback, if not null, is called exactly once and + * non-overlapped with any active replay. + * + * @param the value type + * @param capacityHint the hint to size the internal unbounded buffer + * @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed + * @param delayError deliver pending onNext events before onError + * @return an UnicastSubject instance + */ + @CheckReturnValue + public static UnicastSubject create(int capacityHint, Runnable onTerminate, boolean delayError) { + return new UnicastSubject(capacityHint, onTerminate, delayError); + } + + /** + * Creates an UnicastSubject with an internal buffer capacity hint 16 and given delay error flag + * + *

The callback, if not null, is called exactly once and + * non-overlapped with any active replay. + * + * @param the value type + * @param delayError deliver pending onNext events before onError + * @return an UnicastSubject instance + */ + @CheckReturnValue + public static UnicastSubject create(boolean delayError) { + return new UnicastSubject(bufferSize(), delayError); + } + + + /** + * Creates an UnicastSubject with the given capacity hint and delay error flag. * @param capacityHint the capacity hint for the internal, unbounded queue + * @param delayError deliver pending onNext events before onError * @since 2.0 */ - UnicastSubject(int capacityHint) { + UnicastSubject(int capacityHint, boolean delayError) { this.queue = new SpscLinkedArrayQueue(ObjectHelper.verifyPositive(capacityHint, "capacityHint")); this.onTerminate = new AtomicReference(); + this.delayError = delayError; this.actual = new AtomicReference>(); this.once = new AtomicBoolean(); this.wip = new UnicastQueueDisposable(); } /** - * Creates an UnicastProcessor with the given capacity hint and callback - * for when the Processor is terminated normally or its single Subscriber cancels. + * Creates an UnicastSubject with the given capacity hint and callback + * for when the Subject is terminated normally or its single Subscriber cancels. * @param capacityHint the capacity hint for the internal, unbounded queue - * @param onTerminate the callback to run when the Processor is terminated or cancelled, null not allowed + * @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed * @since 2.0 - */ + * + * */ UnicastSubject(int capacityHint, Runnable onTerminate) { + this(capacityHint, onTerminate, true); + } + + /** + * Creates an UnicastSubject with the given capacity hint. + * @param capacityHint the capacity hint for the internal, unbounded queue + * @since 2.0 + */ + UnicastSubject(int capacityHint) { + this(capacityHint, true); + } + + /** + * Creates an UnicastSubject with the given capacity hint, delay error flag and callback + * for when the Subject is terminated normally or its single Subscriber cancels. + * @param capacityHint the capacity hint for the internal, unbounded queue + * @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed + * @param delayError deliver pending onNext events before onError + * @since 2.0 + */ + UnicastSubject(int capacityHint, Runnable onTerminate, boolean delayError) { this.queue = new SpscLinkedArrayQueue(ObjectHelper.verifyPositive(capacityHint, "capacityHint")); this.onTerminate = new AtomicReference(ObjectHelper.requireNonNull(onTerminate, "onTerminate")); + this.delayError = delayError; this.actual = new AtomicReference>(); this.once = new AtomicBoolean(); this.wip = new UnicastQueueDisposable(); @@ -212,6 +274,8 @@ public void onComplete() { void drainNormal(Observer a) { int missed = 1; SimpleQueue q = queue; + boolean failFast = !this.delayError; + boolean canBeError = true; for (;;) { for (;;) { @@ -221,19 +285,23 @@ void drainNormal(Observer a) { return; } - boolean d = done; + boolean d = this.done; T v = queue.poll(); boolean empty = v == null; - if (d && empty) { - actual.lazySet(null); - Throwable ex = error; - if (ex != null) { - a.onError(ex); - } else { - a.onComplete(); + if (d) { + if (failFast && canBeError) { + if (failedFast(q, a)) { + return; + } else { + canBeError = false; + } + } + + if (empty) { + errorOrComplete(a); + return; } - return; } if (empty) { @@ -254,6 +322,7 @@ void drainFused(Observer a) { int missed = 1; final SpscLinkedArrayQueue q = queue; + final boolean failFast = !delayError; for (;;) { @@ -262,20 +331,18 @@ void drainFused(Observer a) { q.clear(); return; } - boolean d = done; + if (failFast && d) { + if (failedFast(q, a)) { + return; + } + } + a.onNext(null); if (d) { - actual.lazySet(null); - - Throwable ex = error; - if (ex != null) { - a.onError(ex); - } else { - a.onComplete(); - } + errorOrComplete(a); return; } @@ -286,6 +353,28 @@ void drainFused(Observer a) { } } + void errorOrComplete(Observer a) { + actual.lazySet(null); + Throwable ex = error; + if (ex != null) { + a.onError(ex); + } else { + a.onComplete(); + } + } + + boolean failedFast(final SimpleQueue q, Observer a) { + Throwable ex = error; + if (ex != null) { + actual.lazySet(null); + q.clear(); + a.onError(ex); + return true; + } else { + return false; + } + } + void drain() { if (wip.getAndIncrement() != 0) { return; diff --git a/src/test/java/io/reactivex/subjects/UnicastSubjectTest.java b/src/test/java/io/reactivex/subjects/UnicastSubjectTest.java index 7cf8f8c374..d11a89abb7 100644 --- a/src/test/java/io/reactivex/subjects/UnicastSubjectTest.java +++ b/src/test/java/io/reactivex/subjects/UnicastSubjectTest.java @@ -69,6 +69,32 @@ public void fusionOfflie() { .assertResult(1); } + @Test + public void failFast() { + UnicastSubject ap = UnicastSubject.create(false); + ap.onNext(1); + ap.onError(new RuntimeException()); + TestObserver ts = TestObserver.create(); + ap.subscribe(ts); + + ts + .assertValueCount(0) + .assertError(RuntimeException.class); + } + + @Test + public void fusionOfflineFailFast() { + UnicastSubject ap = UnicastSubject.create(false); + ap.onNext(1); + ap.onError(new RuntimeException()); + TestObserver ts = ObserverFusion.newTest(QueueDisposable.ANY); + ap.subscribe(ts); + + ts + .assertValueCount(0) + .assertError(RuntimeException.class); + } + @Test public void onTerminateCalledWhenOnError() { final AtomicBoolean didRunOnTerminate = new AtomicBoolean(); From ae5c8f6c86dbe5d2ba35ba5fec3689e61935ee66 Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Thu, 23 Mar 2017 17:35:50 +0200 Subject: [PATCH 2/4] follow up: mark new factory methods experimental, remove excessive constructor, fix typos --- .../io/reactivex/subjects/UnicastSubject.java | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/src/main/java/io/reactivex/subjects/UnicastSubject.java b/src/main/java/io/reactivex/subjects/UnicastSubject.java index 96800713a7..88be51495b 100644 --- a/src/main/java/io/reactivex/subjects/UnicastSubject.java +++ b/src/main/java/io/reactivex/subjects/UnicastSubject.java @@ -13,6 +13,7 @@ package io.reactivex.subjects; +import io.reactivex.annotations.Experimental; import io.reactivex.annotations.Nullable; import io.reactivex.plugins.RxJavaPlugins; import java.util.concurrent.atomic.*; @@ -82,7 +83,7 @@ public final class UnicastSubject extends Subject { */ @CheckReturnValue public static UnicastSubject create() { - return new UnicastSubject(bufferSize()); + return new UnicastSubject(bufferSize(), true); } /** @@ -93,7 +94,7 @@ public static UnicastSubject create() { */ @CheckReturnValue public static UnicastSubject create(int capacityHint) { - return new UnicastSubject(capacityHint); + return new UnicastSubject(capacityHint, true); } /** @@ -127,12 +128,13 @@ public static UnicastSubject create(int capacityHint, Runnable onTerminat * @return an UnicastSubject instance */ @CheckReturnValue + @Experimental public static UnicastSubject create(int capacityHint, Runnable onTerminate, boolean delayError) { return new UnicastSubject(capacityHint, onTerminate, delayError); } /** - * Creates an UnicastSubject with an internal buffer capacity hint 16 and given delay error flag + * Creates an UnicastSubject with an internal buffer capacity hint 16 and given delay error flag. * *

The callback, if not null, is called exactly once and * non-overlapped with any active replay. @@ -142,6 +144,7 @@ public static UnicastSubject create(int capacityHint, Runnable onTerminat * @return an UnicastSubject instance */ @CheckReturnValue + @Experimental public static UnicastSubject create(boolean delayError) { return new UnicastSubject(bufferSize(), delayError); } @@ -174,15 +177,6 @@ public static UnicastSubject create(boolean delayError) { this(capacityHint, onTerminate, true); } - /** - * Creates an UnicastSubject with the given capacity hint. - * @param capacityHint the capacity hint for the internal, unbounded queue - * @since 2.0 - */ - UnicastSubject(int capacityHint) { - this(capacityHint, true); - } - /** * Creates an UnicastSubject with the given capacity hint, delay error flag and callback * for when the Subject is terminated normally or its single Subscriber cancels. From 6c53e63bd6e25d016863182cd4d993531382e015 Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Thu, 23 Mar 2017 18:22:54 +0200 Subject: [PATCH 3/4] follow up: test coverage --- .../subjects/UnicastSubjectTest.java | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/src/test/java/io/reactivex/subjects/UnicastSubjectTest.java b/src/test/java/io/reactivex/subjects/UnicastSubjectTest.java index d11a89abb7..1788638967 100644 --- a/src/test/java/io/reactivex/subjects/UnicastSubjectTest.java +++ b/src/test/java/io/reactivex/subjects/UnicastSubjectTest.java @@ -27,6 +27,7 @@ import io.reactivex.observers.*; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.Schedulers; +import static org.mockito.Mockito.mock; public class UnicastSubjectTest { @@ -82,6 +83,34 @@ public void failFast() { .assertError(RuntimeException.class); } + @Test + public void threeArgsFactoryFailFast() { + Runnable noop = mock(Runnable.class); + UnicastSubject ap = UnicastSubject.create(16, noop, false); + ap.onNext(1); + ap.onError(new RuntimeException()); + TestObserver ts = TestObserver.create(); + ap.subscribe(ts); + + ts + .assertValueCount(0) + .assertError(RuntimeException.class); + } + + @Test + public void threeArgsFactoryDelayError() { + Runnable noop = mock(Runnable.class); + UnicastSubject ap = UnicastSubject.create(16, noop, true); + ap.onNext(1); + ap.onError(new RuntimeException()); + TestObserver ts = TestObserver.create(); + ap.subscribe(ts); + + ts + .assertValueCount(1) + .assertError(RuntimeException.class); + } + @Test public void fusionOfflineFailFast() { UnicastSubject ap = UnicastSubject.create(false); @@ -95,6 +124,36 @@ public void fusionOfflineFailFast() { .assertError(RuntimeException.class); } + @Test + public void fusionOfflineFailFastMultipleEvents() { + UnicastSubject ap = UnicastSubject.create(false); + ap.onNext(1); + ap.onNext(2); + ap.onNext(3); + ap.onComplete(); + TestObserver ts = ObserverFusion.newTest(QueueDisposable.ANY); + ap.subscribe(ts); + + ts + .assertValueCount(3) + .assertComplete(); + } + + @Test + public void failFastMultipleEvents() { + UnicastSubject ap = UnicastSubject.create(false); + ap.onNext(1); + ap.onNext(2); + ap.onNext(3); + ap.onComplete(); + TestObserver ts = TestObserver.create(); + ap.subscribe(ts); + + ts + .assertValueCount(3) + .assertComplete(); + } + @Test public void onTerminateCalledWhenOnError() { final AtomicBoolean didRunOnTerminate = new AtomicBoolean(); From efcd9a9c172248192a5b14a068d8b0c994061deb Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Thu, 23 Mar 2017 18:30:49 +0200 Subject: [PATCH 4/4] follow up: add @since for new methods --- src/main/java/io/reactivex/subjects/UnicastSubject.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/reactivex/subjects/UnicastSubject.java b/src/main/java/io/reactivex/subjects/UnicastSubject.java index 88be51495b..3c6f482ca4 100644 --- a/src/main/java/io/reactivex/subjects/UnicastSubject.java +++ b/src/main/java/io/reactivex/subjects/UnicastSubject.java @@ -126,6 +126,7 @@ public static UnicastSubject create(int capacityHint, Runnable onTerminat * @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed * @param delayError deliver pending onNext events before onError * @return an UnicastSubject instance + * @since 2.0.8 - experimental */ @CheckReturnValue @Experimental @@ -142,6 +143,7 @@ public static UnicastSubject create(int capacityHint, Runnable onTerminat * @param the value type * @param delayError deliver pending onNext events before onError * @return an UnicastSubject instance + * @since 2.0.8 - experimental */ @CheckReturnValue @Experimental @@ -154,7 +156,7 @@ public static UnicastSubject create(boolean delayError) { * Creates an UnicastSubject with the given capacity hint and delay error flag. * @param capacityHint the capacity hint for the internal, unbounded queue * @param delayError deliver pending onNext events before onError - * @since 2.0 + * @since 2.0.8 - experimental */ UnicastSubject(int capacityHint, boolean delayError) { this.queue = new SpscLinkedArrayQueue(ObjectHelper.verifyPositive(capacityHint, "capacityHint")); @@ -183,7 +185,7 @@ public static UnicastSubject create(boolean delayError) { * @param capacityHint the capacity hint for the internal, unbounded queue * @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed * @param delayError deliver pending onNext events before onError - * @since 2.0 + * @since 2.0.8 - experimental */ UnicastSubject(int capacityHint, Runnable onTerminate, boolean delayError) { this.queue = new SpscLinkedArrayQueue(ObjectHelper.verifyPositive(capacityHint, "capacityHint"));