diff --git a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java
index c167ef17d1..3fb872b875 100644
--- a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java
+++ b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java
@@ -25,11 +25,15 @@
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observer;
-import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
-import rx.util.functions.Action1;
+import rx.subjects.SubjectSubscriptionManager.*;
+import rx.util.functions.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
/**
- * Subject that retains all events and will replay them to an {@link Observer} that subscribes.
+ * Subject that retains events (unlimited or with given replay capacity) and will replay them to an {@link Observer} that subscribes.
*
*
*
@@ -52,13 +56,24 @@
* @param
*/
public final class ReplaySubject extends Subject {
+ private static final Integer ReplaySubjectUnlimitedCapacity = Integer.MAX_VALUE;
+
+ /**
+ * @param
+ * @return a new replay subject with the unlimited capacity.
+ */
public static ReplaySubject create() {
- return create(16);
+ return create(ReplaySubjectUnlimitedCapacity);
}
- public static ReplaySubject create(int initialCapacity) {
+ /**
+ * @param capacity Maximum element count of the replay buffer
+ * @param
+ * @return a new replay subject with the given capacity.
+ */
+ public static ReplaySubject create(int capacity) {
final SubjectSubscriptionManager subscriptionManager = new SubjectSubscriptionManager();
- final ReplayState state = new ReplayState(initialCapacity);
+ final ReplayState state = new ReplayState(capacity);
OnSubscribe onSubscribe = subscriptionManager.getOnSubscribeFunc(
/**
@@ -71,11 +86,12 @@ public static ReplaySubject create(int initialCapacity) {
new Action1>() {
@Override
- public void call(SubjectObserver super T> o) {
+ public void call(SubjectSubscriptionManager.SubjectObserver super T> o) {
// replay history for this observer using the subscribing thread
int lastIndex = replayObserverFromIndex(state.history, 0, o);
// now that it is caught up add to observers
+ o.caughtUp = true;
state.replayState.put(o, lastIndex);
}
},
@@ -99,9 +115,8 @@ private static class ReplayState {
final History history;
// each Observer is tracked here for what events they have received
final ConcurrentHashMap, Integer> replayState;
-
- public ReplayState(int initialCapacity) {
- history = new History(initialCapacity);
+ public ReplayState(int capacity) {
+ history = new History(capacity);
replayState = new ConcurrentHashMap, Integer>();
}
}
@@ -216,23 +231,34 @@ private static class History {
private final AtomicInteger index;
private final ArrayList list;
private final AtomicReference> terminalValue;
-
- public History(int initialCapacity) {
+ private final int capacity;
+ public History(int capacity) {
+ this.capacity = capacity;
index = new AtomicInteger(0);
- list = new ArrayList(initialCapacity);
+ list = this.capacity == ReplaySubjectUnlimitedCapacity ? new ArrayList(16) : new ArrayList(capacity);
terminalValue = new AtomicReference>();
}
public boolean next(T n) {
if (terminalValue.get() == null) {
list.add(n);
- index.getAndIncrement();
+ trim();
return true;
} else {
return false;
}
}
+ private void trim() {
+ if (this.capacity != ReplaySubjectUnlimitedCapacity && list.size() > this.capacity ) {
+ while(list.size() > this.capacity) {
+ list.remove(0);
+ }
+ }
+
+ index.set(list.size());
+ }
+
public void complete(Notification n) {
terminalValue.set(n);
}
diff --git a/rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java b/rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java
index 2d50027dad..82634909b1 100644
--- a/rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java
+++ b/rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java
@@ -15,25 +15,26 @@
*/
package rx.observables;
-import static org.junit.Assert.*;
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
import org.junit.Assert;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
-
import rx.Observable;
import rx.Observer;
import rx.Subscription;
+import rx.subjects.BehaviorSubject;
+import rx.subjects.ReplaySubject;
import rx.subscriptions.BooleanSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action1;
import rx.util.functions.Func1;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
public class BlockingObservableTest {
@Mock
@@ -382,6 +383,26 @@ public Boolean call(String args) {
assertEquals("default", first);
}
+ @Test
+ public void testBehaviorSubject() {
+ BehaviorSubject subject = BehaviorSubject.create("default");
+ subject.onNext("one");
+ BlockingObservable observable = subject.toBlockingObservable();
+ String last = observable.first();
+ assertEquals("one", last);
+ }
+
+ @Test
+ public void testReplaySubjectWithCapacity() {
+ ReplaySubject subject = ReplaySubject.create(1);
+ subject.onNext("one");
+ subject.onNext("two");
+ subject.onNext("three");
+
+ String last = subject.asObservable().toBlockingObservable().first();
+ assertEquals("three", last);
+ }
+
private static class TestException extends RuntimeException {
private static final long serialVersionUID = 1L;
}
diff --git a/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java b/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java
index 87cfdb4b8d..adea522e96 100644
--- a/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java
+++ b/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java
@@ -1,12 +1,12 @@
/**
* Copyright 2014 Netflix, Inc.
- *
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,22 +15,20 @@
*/
package rx.subjects;
-import static org.junit.Assert.*;
-import static org.mockito.Matchers.*;
-import static org.mockito.Mockito.*;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicReference;
-
+import static org.junit.Assert.assertEquals;
import org.junit.Test;
import org.mockito.InOrder;
+import static org.mockito.Matchers.any;
import org.mockito.Mockito;
-
+import static org.mockito.Mockito.*;
import rx.Observer;
import rx.Subscription;
import rx.observers.TestObserver;
import rx.schedulers.Schedulers;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
public class ReplaySubjectTest {
private final Throwable testException = new Throwable();
@@ -63,14 +61,10 @@ public void testCompleted() {
@Test
public void testCompletedStopsEmittingData() {
ReplaySubject channel = ReplaySubject.create();
- @SuppressWarnings("unchecked")
- Observer