Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 40 additions & 14 deletions rxjava-core/src/main/java/rx/subjects/ReplaySubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
import rx.util.functions.Action1;
import rx.subjects.SubjectSubscriptionManager.*;
import rx.util.functions.*;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

/**
* Subject that retains all events and will replay them to an {@link Observer} that subscribes.
* Subject that retains events (unlimited or with given replay capacity) and will replay them to an {@link Observer} that subscribes.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/S.ReplaySubject.png">
* <p>
Expand All @@ -52,13 +56,24 @@
* @param <T>
*/
public final class ReplaySubject<T> extends Subject<T, T> {
private static final Integer ReplaySubjectUnlimitedCapacity = Integer.MAX_VALUE;

/**
* @param <T>
* @return a new replay subject with the unlimited capacity.
*/
public static <T> ReplaySubject<T> create() {
return create(16);
return create(ReplaySubjectUnlimitedCapacity);
}

public static <T> ReplaySubject<T> create(int initialCapacity) {
/**
* @param capacity Maximum element count of the replay buffer
* @param <T>
* @return a new replay subject with the given capacity.
*/
public static <T> ReplaySubject<T> create(int capacity) {
final SubjectSubscriptionManager<T> subscriptionManager = new SubjectSubscriptionManager<T>();
final ReplayState<T> state = new ReplayState<T>(initialCapacity);
final ReplayState<T> state = new ReplayState<T>(capacity);

OnSubscribe<T> onSubscribe = subscriptionManager.getOnSubscribeFunc(
/**
Expand All @@ -71,11 +86,12 @@ public static <T> ReplaySubject<T> create(int initialCapacity) {
new Action1<SubjectObserver<? super T>>() {

@Override
public void call(SubjectObserver<? super T> o) {
public void call(SubjectSubscriptionManager.SubjectObserver<? super T> o) {
// replay history for this observer using the subscribing thread
int lastIndex = replayObserverFromIndex(state.history, 0, o);

// now that it is caught up add to observers
o.caughtUp = true;
state.replayState.put(o, lastIndex);
}
},
Expand All @@ -99,9 +115,8 @@ private static class ReplayState<T> {
final History<T> history;
// each Observer is tracked here for what events they have received
final ConcurrentHashMap<Observer<? super T>, Integer> replayState;

public ReplayState(int initialCapacity) {
history = new History<T>(initialCapacity);
public ReplayState(int capacity) {
history = new History<T>(capacity);
replayState = new ConcurrentHashMap<Observer<? super T>, Integer>();
}
}
Expand Down Expand Up @@ -216,23 +231,34 @@ private static class History<T> {
private final AtomicInteger index;
private final ArrayList<T> list;
private final AtomicReference<Notification<T>> terminalValue;

public History(int initialCapacity) {
private final int capacity;
public History(int capacity) {
this.capacity = capacity;
index = new AtomicInteger(0);
list = new ArrayList<T>(initialCapacity);
list = this.capacity == ReplaySubjectUnlimitedCapacity ? new ArrayList<T>(16) : new ArrayList<T>(capacity);
terminalValue = new AtomicReference<Notification<T>>();
}

public boolean next(T n) {
if (terminalValue.get() == null) {
list.add(n);
index.getAndIncrement();
trim();
return true;
} else {
return false;
}
}

private void trim() {
if (this.capacity != ReplaySubjectUnlimitedCapacity && list.size() > this.capacity ) {
while(list.size() > this.capacity) {
list.remove(0);
}
}

index.set(list.size());
}

public void complete(Notification<T> n) {
terminalValue.set(n);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,26 @@
*/
package rx.observables;

import static org.junit.Assert.*;

import java.util.Iterator;
import java.util.NoSuchElementException;

import org.junit.Assert;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subjects.BehaviorSubject;
import rx.subjects.ReplaySubject;
import rx.subscriptions.BooleanSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action1;
import rx.util.functions.Func1;

import java.util.Iterator;
import java.util.NoSuchElementException;

public class BlockingObservableTest {

@Mock
Expand Down Expand Up @@ -382,6 +383,26 @@ public Boolean call(String args) {
assertEquals("default", first);
}

@Test
public void testBehaviorSubject() {
BehaviorSubject<String> subject = BehaviorSubject.create("default");
subject.onNext("one");
BlockingObservable<String> observable = subject.toBlockingObservable();
String last = observable.first();
assertEquals("one", last);
}

@Test
public void testReplaySubjectWithCapacity() {
ReplaySubject<String> subject = ReplaySubject.create(1);
subject.onNext("one");
subject.onNext("two");
subject.onNext("three");

String last = subject.asObservable().toBlockingObservable().first();
assertEquals("three", last);
}

private static class TestException extends RuntimeException {
private static final long serialVersionUID = 1L;
}
Expand Down
92 changes: 71 additions & 21 deletions rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright 2014 Netflix, Inc.
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -15,22 +15,20 @@
*/
package rx.subjects;

import static org.junit.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertEquals;
import org.junit.Test;
import org.mockito.InOrder;
import static org.mockito.Matchers.any;
import org.mockito.Mockito;

import static org.mockito.Mockito.*;
import rx.Observer;
import rx.Subscription;
import rx.observers.TestObserver;
import rx.schedulers.Schedulers;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class ReplaySubjectTest {

private final Throwable testException = new Throwable();
Expand Down Expand Up @@ -63,14 +61,10 @@ public void testCompleted() {
@Test
public void testCompletedStopsEmittingData() {
ReplaySubject<Integer> channel = ReplaySubject.create();
@SuppressWarnings("unchecked")
Observer<Object> observerA = mock(Observer.class);
@SuppressWarnings("unchecked")
Observer<Object> observerB = mock(Observer.class);
@SuppressWarnings("unchecked")
Observer<Object> observerC = mock(Observer.class);
@SuppressWarnings("unchecked")
Observer<Object> observerD = mock(Observer.class);
@SuppressWarnings("unchecked") Observer<Object> observerA = mock(Observer.class);
@SuppressWarnings("unchecked") Observer<Object> observerB = mock(Observer.class);
@SuppressWarnings("unchecked") Observer<Object> observerC = mock(Observer.class);
@SuppressWarnings("unchecked") Observer<Object> observerD = mock(Observer.class);

Subscription a = channel.toObservable().subscribe(new TestObserver<Object>(observerA));
Subscription b = channel.toObservable().subscribe(new TestObserver<Object>(observerB));
Expand Down Expand Up @@ -132,8 +126,7 @@ public void testCompletedStopsEmittingData() {
public void testCompletedAfterError() {
ReplaySubject<String> subject = ReplaySubject.create();

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
@SuppressWarnings("unchecked") Observer<String> observer = mock(Observer.class);

subject.onNext("one");
subject.onError(testException);
Expand All @@ -147,6 +140,63 @@ public void testCompletedAfterError() {
verifyNoMoreInteractions(observer);
}

@Test
public void testCapacity() {
{
ReplaySubject<String> subject = ReplaySubject.create(1);
@SuppressWarnings("unchecked") Observer<String> aObserver = mock(Observer.class);
subject.toObservable().subscribe(new TestObserver<String>(aObserver));

subject.onNext("one");
subject.onNext("two");
subject.onNext("three");
subject.onCompleted();

assertCompletedObserver(aObserver);

Observer<String> anotherObserver = mock(Observer.class);
subject.toObservable().subscribe(new TestObserver<String>(anotherObserver));//.toObservable().subscribe(anotherObserver);

verify(anotherObserver, times(0)).onNext("one");
verify(anotherObserver, times(0)).onNext("two");
verify(anotherObserver, times(1)).onNext("three");
verify(anotherObserver, times(1)).onCompleted();
verifyNoMoreInteractions(anotherObserver);
}
{
ReplaySubject<String> subject = ReplaySubject.create(1);
@SuppressWarnings("unchecked") Observer<String> aObserver = mock(Observer.class);
subject.onNext("one");

subject.toObservable().distinctUntilChanged().subscribe(new TestObserver<String>(aObserver));

subject.onNext("two");
subject.onNext("one");
subject.onNext("one");
verify(aObserver, times(2)).onNext("one");
verify(aObserver, times(1)).onNext("two");
}
}

@Test
public void testThatObserverReceivesLatestAndThenSubsequentEvents() {
ReplaySubject<String> subject = ReplaySubject.create(1);

subject.onNext("one");

@SuppressWarnings("unchecked") Observer<String> aObserver = mock(Observer.class);
subject.toObservable().subscribe(new TestObserver<String>(aObserver));
subject.onNext("two");
subject.onNext("three");

verify(aObserver, Mockito.never()).onNext("default");
verify(aObserver, times(1)).onNext("one");
verify(aObserver, times(1)).onNext("two");
verify(aObserver, times(1)).onNext("three");
verify(aObserver, Mockito.never()).onError(testException);
verify(aObserver, Mockito.never()).onCompleted();
}

private void assertCompletedObserver(Observer<String> observer) {
InOrder inOrder = inOrder(observer);

Expand Down