Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 52 additions & 10 deletions src/main/java/rx/subjects/UnicastSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public final class UnicastSubject<T> extends Subject<T, T> {
public static <T> UnicastSubject<T> create() {
return create(16);
}

/**
* Constructs an empty UnicastSubject instance with a capacity hint.
* <p>The capacity hint determines the internal queue's island size: the larger
Expand All @@ -59,7 +60,18 @@ public static <T> UnicastSubject<T> create() {
* @return the created BufferUntilSubscriber instance
*/
public static <T> UnicastSubject<T> create(int capacityHint) {
State<T> state = new State<T>(capacityHint, null);
State<T> state = new State<T>(capacityHint, false, null);
return new UnicastSubject<T>(state);
}

/**
* Constructs an empty UnicastSubject instance with the default capacity hint of 16 elements.
*
* @param delayError deliver pending next events before error.
* @return the created UnicastSubject instance
*/
public static <T> UnicastSubject<T> create(boolean delayError) {
State<T> state = new State<T>(16, delayError, null);
return new UnicastSubject<T>(state);
}

Expand All @@ -78,7 +90,28 @@ public static <T> UnicastSubject<T> create(int capacityHint) {
* @return the created BufferUntilSubscriber instance
*/
public static <T> UnicastSubject<T> create(int capacityHint, Action0 onTerminated) {
State<T> state = new State<T>(capacityHint, onTerminated);
State<T> state = new State<T>(capacityHint, false, onTerminated);
return new UnicastSubject<T>(state);
}

/**
* Constructs an empty UnicastSubject instance with a capacity hint, delay error
* flag and Action0 instance to call if the subject reaches its terminal state
* or the single Subscriber unsubscribes mid-sequence.
* <p>The capacity hint determines the internal queue's island size: the larger
* it is the less frequent allocation will happen if there is no subscriber
* or the subscriber hasn't caught up.
* @param <T> the input and output value type
* @param capacityHint the capacity hint for the internal queue
* @param onTerminated the optional callback to call when subject reaches its terminal state
* or the single Subscriber unsubscribes mid-sequence. It will be called
* at most once.
* @param delayError flag indicating whether to deliver pending next events before error.
* @return the created BufferUntilSubscriber instance
*/
public static <T> UnicastSubject<T> create(int capacityHint,
Action0 onTerminated, boolean delayError) {
State<T> state = new State<T>(capacityHint, delayError, onTerminated);
return new UnicastSubject<T>(state);
}

Expand Down Expand Up @@ -119,6 +152,8 @@ static final class State<T> extends AtomicLong implements Producer, Observer<T>,
final AtomicReference<Subscriber<? super T>> subscriber;
/** The queue holding values until the subscriber arrives and catches up. */
final Queue<Object> queue;
/** Deliver pending next events before error. */
final boolean delayError;
/** Atomically set to true on terminal condition. */
final AtomicReference<Action0> terminateOnce;
/** In case the source emitted an error. */
Expand All @@ -137,10 +172,12 @@ static final class State<T> extends AtomicLong implements Producer, Observer<T>,
* reduce allocation frequency
* @param onTerminated the action to call when the subject reaches its terminal state or
* the single subscriber unsubscribes.
* @param delayError deliver pending next events before error.
*/
public State(int capacityHint, Action0 onTerminated) {
public State(int capacityHint, boolean delayError, Action0 onTerminated) {
this.subscriber = new AtomicReference<Subscriber<? super T>>();
this.terminateOnce = onTerminated != null ? new AtomicReference<Action0>(onTerminated) : null;
this.delayError = delayError;

Queue<Object> q;
if (capacityHint > 1) {
Expand Down Expand Up @@ -266,14 +303,14 @@ void replay() {
emitting = true;
}
Queue<Object> q = queue;
boolean delayError = this.delayError;
for (;;) {
Subscriber<? super T> s = subscriber.get();
boolean unlimited = false;
if (s != null) {
boolean d = done;
boolean empty = q.isEmpty();

if (checkTerminated(d, empty, s)) {
if (checkTerminated(d, empty, delayError, s)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should have a local delayError field for this, otherwise you are re-reading the flag on each emission which can cause unnecessary false sharing.

Queue<Object> q = queue;
boolean delayError = this.delayError;
// ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return;
}
long r = get();
Expand All @@ -284,7 +321,7 @@ void replay() {
d = done;
Object v = q.poll();
empty = v == null;
if (checkTerminated(d, empty, s)) {
if (checkTerminated(d, empty, delayError, s)) {
return;
}
if (empty) {
Expand Down Expand Up @@ -348,23 +385,28 @@ public boolean isUnsubscribed() {
* an error happened or the source terminated and the queue is empty
* @param done indicates the source has called onCompleted
* @param empty indicates if there are no more source values in the queue
* @param delayError indicates whether to deliver pending next events before error
* @param s the target Subscriber to emit events to
* @return true if this Subject reached a terminal state and the drain loop should quit
*/
boolean checkTerminated(boolean done, boolean empty, Subscriber<? super T> s) {
boolean checkTerminated(boolean done, boolean empty, boolean delayError, Subscriber<? super T> s) {
if (s.isUnsubscribed()) {
queue.clear();
return true;
}
if (done) {
Throwable e = error;
if (e != null) {
if (e != null && !delayError) {
queue.clear();
s.onError(e);
return true;
} else
}
if (empty) {
s.onCompleted();
if (e != null) {
s.onError(e);
} else {
s.onCompleted();
}
return true;
}
}
Expand Down
101 changes: 101 additions & 0 deletions src/test/java/rx/subjects/UnicastSubjectTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package rx.subjects;

import org.junit.Test;
import rx.functions.Action0;
import rx.observers.TestSubscriber;

public class UnicastSubjectTest {

@Test
public void testOneArgFactoryDelayError() throws Exception {
TestSubscriber<Long> subscriber = TestSubscriber.<Long>create();
UnicastSubject<Long> s = UnicastSubject.create(true);
s.onNext(1L);
s.onNext(2L);
s.onError(new RuntimeException());
s.subscribe(subscriber);
subscriber.assertValueCount(2);
subscriber.assertError(RuntimeException.class);
}

@Test
public void testOneArgFactoryNoDelayError() throws Exception {
TestSubscriber<Long> subscriber = TestSubscriber.<Long>create();
UnicastSubject<Long> s = UnicastSubject.create(false);
s.onNext(1L);
s.onNext(2L);
s.onError(new RuntimeException());
s.subscribe(subscriber);
subscriber.assertValueCount(0);
subscriber.assertError(RuntimeException.class);
}

@Test
public void testThreeArgsFactoryDelayError() throws Exception {
TestSubscriber<Long> subscriber = TestSubscriber.<Long>create();
UnicastSubject<Long> s = UnicastSubject.create(16, new NoopAction0(), true);
s.onNext(1L);
s.onNext(2L);
s.onError(new RuntimeException());
s.subscribe(subscriber);
subscriber.assertValueCount(2);
subscriber.assertError(RuntimeException.class);
}

@Test
public void testThreeArgsFactoryNoDelayError() throws Exception {
TestSubscriber<Long> subscriber = TestSubscriber.<Long>create();
UnicastSubject<Long> s = UnicastSubject.create(16, new NoopAction0(), false);
s.onNext(1L);
s.onNext(2L);
s.onError(new RuntimeException());
s.subscribe(subscriber);
subscriber.assertValueCount(0);
subscriber.assertError(RuntimeException.class);
}

@Test
public void testZeroArgsFactory() throws Exception {
TestSubscriber<Long> subscriber = TestSubscriber.<Long>create();
UnicastSubject<Long> s = UnicastSubject.create();
s.onNext(1L);
s.onNext(2L);
s.onError(new RuntimeException());
s.subscribe(subscriber);
subscriber.assertValueCount(0);
subscriber.assertError(RuntimeException.class);
}

@Test
public void testOneArgFactory() throws Exception {
TestSubscriber<Long> subscriber = TestSubscriber.<Long>create();
UnicastSubject<Long> s = UnicastSubject.create(16);
s.onNext(1L);
s.onNext(2L);
s.onError(new RuntimeException());
s.subscribe(subscriber);
subscriber.assertValueCount(0);
subscriber.assertError(RuntimeException.class);
}

@Test
public void testTwoArgsFactory() throws Exception {
TestSubscriber<Long> subscriber = TestSubscriber.<Long>create();
UnicastSubject<Long> s = UnicastSubject.create(16, new NoopAction0());
s.onNext(1L);
s.onNext(2L);
s.onError(new RuntimeException());
s.subscribe(subscriber);
subscriber.assertValueCount(0);
subscriber.assertError(RuntimeException.class);
}



private static final class NoopAction0 implements Action0 {

@Override
public void call() {
}
}
}