Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 82 additions & 20 deletions src/main/java/io/reactivex/processors/UnicastProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.reactivex.annotations.CheckReturnValue;
import java.util.concurrent.atomic.*;

import io.reactivex.annotations.Experimental;
import io.reactivex.annotations.Nullable;
import org.reactivestreams.*;

Expand Down Expand Up @@ -49,7 +50,10 @@ public final class UnicastProcessor<T> extends FlowableProcessor<T> {

final AtomicReference<Runnable> onTerminate;

final boolean delayError;

volatile boolean done;

Throwable error;

final AtomicReference<Subscriber<? super T>> actual;
Expand Down Expand Up @@ -85,6 +89,19 @@ public static <T> UnicastProcessor<T> create(int capacityHint) {
return new UnicastProcessor<T>(capacityHint);
}

/**
* Creates an UnicastProcessor with default internal buffer capacity hint and delay error flag.
* @param <T> the value type
* @param delayError deliver pending onNext events before onError
* @return an UnicastProcessor instance
* @since 2.0.8 - experimental
*/
@CheckReturnValue
@Experimental
public static <T> UnicastProcessor<T> create(boolean delayError) {
return new UnicastProcessor<T>(bufferSize(), null, delayError);
}

/**
* Creates an UnicastProcessor with the given internal buffer capacity hint and a callback for
* the case when the single Subscriber cancels its subscription.
Expand All @@ -99,21 +116,38 @@ public static <T> UnicastProcessor<T> create(int capacityHint) {
*/
@CheckReturnValue
public static <T> UnicastProcessor<T> create(int capacityHint, Runnable onCancelled) {
ObjectHelper.requireNonNull(onCancelled, "onTerminate");
return new UnicastProcessor<T>(capacityHint, onCancelled);
}

/**
* Creates an UnicastProcessor with the given internal buffer capacity hint, delay error flag and a callback for
* the case when the single Subscriber cancels its subscription.
*
* <p>The callback, if not null, is called exactly once and
* non-overlapped with any active replay.
*
* @param <T> the value type
* @param capacityHint the hint to size the internal unbounded buffer
* @param onCancelled the non null callback
* @param delayError deliver pending onNext events before onError
* @return an UnicastProcessor instance
* @since 2.0.8 - experimental
*/
@CheckReturnValue
@Experimental
public static <T> UnicastProcessor<T> create(int capacityHint, Runnable onCancelled, boolean delayError) {
ObjectHelper.requireNonNull(onCancelled, "onTerminate");
return new UnicastProcessor<T>(capacityHint, onCancelled, delayError);
}

/**
* Creates an UnicastProcessor with the given capacity hint.
* @param capacityHint the capacity hint for the internal, unbounded queue
* @since 2.0
*/
UnicastProcessor(int capacityHint) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this one really necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you elaborate a bit?

this.queue = new SpscLinkedArrayQueue<T>(ObjectHelper.verifyPositive(capacityHint, "capacityHint"));
this.onTerminate = new AtomicReference<Runnable>();
this.actual = new AtomicReference<Subscriber<? super T>>();
this.once = new AtomicBoolean();
this.wip = new UnicastQueueSubscription();
this.requested = new AtomicLong();
this(capacityHint,null, true);
}

/**
Expand All @@ -124,8 +158,21 @@ public static <T> UnicastProcessor<T> create(int capacityHint, Runnable onCancel
* @since 2.0
*/
UnicastProcessor(int capacityHint, Runnable onTerminate) {
this(capacityHint, onTerminate, true);
}

/**
* Creates an UnicastProcessor with the given capacity hint and callback
* for when the Processor is terminated normally or its single Subscriber cancels.
* @param capacityHint the capacity hint for the internal, unbounded queue
* @param onTerminate the callback to run when the Processor is terminated or cancelled, null not allowed
* @param delayError deliver pending onNext events before onError
* @since 2.0.8 - experimental
*/
UnicastProcessor(int capacityHint, Runnable onTerminate, boolean delayError) {
this.queue = new SpscLinkedArrayQueue<T>(ObjectHelper.verifyPositive(capacityHint, "capacityHint"));
this.onTerminate = new AtomicReference<Runnable>(ObjectHelper.requireNonNull(onTerminate, "onTerminate"));
this.onTerminate = new AtomicReference<Runnable>(onTerminate);
this.delayError = delayError;
this.actual = new AtomicReference<Subscriber<? super T>>();
this.once = new AtomicBoolean();
this.wip = new UnicastQueueSubscription();
Expand All @@ -143,7 +190,7 @@ void drainRegular(Subscriber<? super T> a) {
int missed = 1;

final SpscLinkedArrayQueue<T> q = queue;

final boolean failFast = !delayError;
for (;;) {

long r = requested.get();
Expand All @@ -155,7 +202,7 @@ void drainRegular(Subscriber<? super T> a) {
T t = q.poll();
boolean empty = t == null;

if (checkTerminated(d, empty, a, q)) {
if (checkTerminated(failFast, d, empty, a, q)) {
return;
}

Expand All @@ -168,7 +215,7 @@ void drainRegular(Subscriber<? super T> a) {
e++;
}

if (r == e && checkTerminated(done, q.isEmpty(), a, q)) {
if (r == e && checkTerminated(failFast, done, q.isEmpty(), a, q)) {
return;
}

Expand All @@ -187,7 +234,7 @@ void drainFused(Subscriber<? super T> a) {
int missed = 1;

final SpscLinkedArrayQueue<T> q = queue;

final boolean failFast = !delayError;
for (;;) {

if (cancelled) {
Expand All @@ -198,6 +245,12 @@ void drainFused(Subscriber<? super T> a) {

boolean d = done;

if (failFast && d && error != null) {
q.clear();
actual.lazySet(null);
a.onError(error);
return;
}
a.onNext(null);

if (d) {
Expand Down Expand Up @@ -246,21 +299,30 @@ void drain() {
}
}

boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T> a, SpscLinkedArrayQueue<T> q) {
boolean checkTerminated(boolean failFast, boolean d, boolean empty, Subscriber<? super T> a, SpscLinkedArrayQueue<T> q) {
if (cancelled) {
q.clear();
actual.lazySet(null);
return true;
}
if (d && empty) {
Throwable e = error;
actual.lazySet(null);
if (e != null) {
a.onError(e);
} else {
a.onComplete();

if (d) {
if (failFast && error != null) {
q.clear();
actual.lazySet(null);
a.onError(error);
return true;
}
if (empty) {
Throwable e = error;
actual.lazySet(null);
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
return true;
}
return true;
}

return false;
Expand Down
48 changes: 48 additions & 0 deletions src/test/java/io/reactivex/processors/UnicastProcessorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,54 @@ public void fusionOfflie() {
.assertResult(1);
}

@Test
public void failFast() {
UnicastProcessor<Integer> ap = UnicastProcessor.create(false);
ap.onNext(1);
ap.onError(new RuntimeException());

TestSubscriber<Integer> ts = TestSubscriber.create();

ap.subscribe(ts);

ts
.assertValueCount(0)
.assertError(RuntimeException.class);
}

@Test
public void failFastFusionOffline() {
UnicastProcessor<Integer> ap = UnicastProcessor.create(false);
ap.onNext(1);
ap.onError(new RuntimeException());

TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueSubscription.ANY);

ap.subscribe(ts);
ts
.assertValueCount(0)
.assertError(RuntimeException.class);
}

@Test
public void threeArgsFactory() {
Runnable noop = new Runnable() {
@Override
public void run() {
}
};
UnicastProcessor<Integer> ap = UnicastProcessor.create(16, noop,false);
ap.onNext(1);
ap.onError(new RuntimeException());

TestSubscriber<Integer> ts = TestSubscriber.create();

ap.subscribe(ts);
ts
.assertValueCount(0)
.assertError(RuntimeException.class);
}

@Test
public void onTerminateCalledWhenOnError() {
final AtomicBoolean didRunOnTerminate = new AtomicBoolean();
Expand Down