From be8702d1c46d75c228d567df9a0726cf1fdc1dab Mon Sep 17 00:00:00 2001 From: Sha Date: Wed, 15 Jan 2014 15:53:04 +0800 Subject: [PATCH 1/4] modify replay subject to comply with original reactive extensions behaviour the capacity of ReplaySubject specifies maximum element count of the replay buffer code: https://rx.codeplex.com/SourceControl/latest#Rx.NET/Source/System.Reactive.Linq/Reactive/Subjects/ReplaySubject.cs test: https://rx.codeplex.com/SourceControl/latest#Rx.NET/Source/Tests.System.Reactive/Tests/Linq/Subjects/ReplaySubjectTest.cs --- .../main/java/rx/subjects/ReplaySubject.java | 53 ++++++++++++++----- .../java/rx/subjects/ReplaySubjectTest.java | 42 +++++++++++---- 2 files changed, 70 insertions(+), 25 deletions(-) diff --git a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java index 69ddb7470b..2e11b47a0d 100644 --- a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java +++ b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java @@ -15,19 +15,20 @@ */ package rx.subjects; +import rx.Notification; +import rx.Observer; +import rx.subjects.SubjectSubscriptionManager.SubjectObserver; +import rx.util.functions.Action1; + import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import rx.Notification; -import rx.Observer; -import rx.subjects.SubjectSubscriptionManager.SubjectObserver; -import rx.util.functions.Action1; - /** - * Subject that retains all events and will replay them to an {@link Observer} that subscribes. + * Subject that retains events (unlimited or with given replay capacity) and will replay them to an {@link Observer} that subscribes. *

* *

@@ -50,12 +51,24 @@ * @param */ public final class ReplaySubject extends Subject { + private static final Integer ReplaySubjectUnlimitedCapacity = Integer.MAX_VALUE; + + /** + * @param + * @return a new replay subject with the unlimited capacity. + */ public static ReplaySubject create() { - return create(16); + return create(ReplaySubjectUnlimitedCapacity); } - public static ReplaySubject create(int initialCapacity) { + + /** + * @param capacity Maximum element count of the replay buffer + * @param + * @return a new replay subject with the given capacity. + */ + public static ReplaySubject create(int capacity) { final SubjectSubscriptionManager subscriptionManager = new SubjectSubscriptionManager(); - final ReplayState state = new ReplayState(initialCapacity); + final ReplayState state = new ReplayState(capacity); OnSubscribeFunc onSubscribe = subscriptionManager.getOnSubscribeFunc( /** @@ -96,8 +109,8 @@ private static class ReplayState { final History history; // each Observer is tracked here for what events they have received final ConcurrentHashMap, Integer> replayState; - public ReplayState(int initialCapacity) { - history = new History(initialCapacity); + public ReplayState(int capacity) { + history = new History(capacity); replayState = new ConcurrentHashMap, Integer>(); } } @@ -206,21 +219,33 @@ private static class History { private final AtomicInteger index; private final ArrayList list; private final AtomicReference> terminalValue; - public History(int initialCapacity) { + private final int capacity; + public History(int capacity) { + this.capacity = capacity; index = new AtomicInteger(0); - list = new ArrayList(initialCapacity); + list = this.capacity == ReplaySubjectUnlimitedCapacity ? new ArrayList(16) : new ArrayList(capacity); terminalValue = new AtomicReference>(); } + public boolean next(T n) { if (terminalValue.get() == null) { list.add(n); - index.getAndIncrement(); + trim(); return true; } else { return false; } } + private void trim() { + if (this.capacity != ReplaySubjectUnlimitedCapacity && list.size() > this.capacity ) { + List toRemove = list.subList(0, list.size() - this.capacity); + list.removeAll(toRemove); + } + + index.set(list.size()); + } + public void complete(Notification n) { terminalValue.set(n); } diff --git a/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java b/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java index e853944820..9d04daccbd 100644 --- a/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java +++ b/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java @@ -16,19 +16,15 @@ package rx.subjects; import static org.junit.Assert.*; -import static org.mockito.Matchers.*; +import org.junit.*; +import org.mockito.*; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.*; +import rx.*; +import rx.schedulers.*; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicReference; - -import org.junit.Test; -import org.mockito.InOrder; -import org.mockito.Mockito; - -import rx.Observer; -import rx.Subscription; -import rx.schedulers.Schedulers; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; public class ReplaySubjectTest { @@ -146,6 +142,30 @@ public void testCompletedAfterError() { verifyNoMoreInteractions(aObserver); } + @Test + public void testCapacity() { + ReplaySubject subject = ReplaySubject.create(1); + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + subject.subscribe(aObserver); + + subject.onNext("one"); + subject.onNext("two"); + subject.onNext("three"); + subject.onCompleted(); + + assertCompletedObserver(aObserver); + + Observer anotherObserver = mock(Observer.class); + subject.subscribe(anotherObserver); + + verify(anotherObserver, times(0)).onNext("one"); + verify(anotherObserver, times(0)).onNext("two"); + verify(anotherObserver, times(1)).onNext("three"); + verify(anotherObserver, times(1)).onCompleted(); + verifyNoMoreInteractions(anotherObserver); + } + private void assertCompletedObserver(Observer aObserver) { InOrder inOrder = inOrder(aObserver); From 9bf2c44bc234c3dfffd18541b21ccad9c7a448ce Mon Sep 17 00:00:00 2001 From: Sha Date: Wed, 15 Jan 2014 16:21:44 +0800 Subject: [PATCH 2/4] fix java.util.ConcurrentModificationException --- rxjava-core/src/main/java/rx/subjects/ReplaySubject.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java index 6d031e87e2..602c985688 100644 --- a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java +++ b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java @@ -236,8 +236,9 @@ public boolean next(T n) { private void trim() { if (this.capacity != ReplaySubjectUnlimitedCapacity && list.size() > this.capacity ) { - List toRemove = list.subList(0, list.size() - this.capacity); - list.removeAll(toRemove); + while(list.size() > this.capacity) { + list.remove(0); + } } index.set(list.size()); From 8c6049f05e13084277410524f607036dbac68fa4 Mon Sep 17 00:00:00 2001 From: Sha Date: Mon, 27 Jan 2014 22:55:58 +0700 Subject: [PATCH 3/4] added two test cases --- .../observables/BlockingObservableTest.java | 33 +++++++++++++++---- 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java b/rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java index 2d50027dad..82634909b1 100644 --- a/rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java +++ b/rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java @@ -15,25 +15,26 @@ */ package rx.observables; -import static org.junit.Assert.*; - -import java.util.Iterator; -import java.util.NoSuchElementException; - import org.junit.Assert; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; - import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.subjects.BehaviorSubject; +import rx.subjects.ReplaySubject; import rx.subscriptions.BooleanSubscription; import rx.subscriptions.Subscriptions; import rx.util.functions.Action1; import rx.util.functions.Func1; +import java.util.Iterator; +import java.util.NoSuchElementException; + public class BlockingObservableTest { @Mock @@ -382,6 +383,26 @@ public Boolean call(String args) { assertEquals("default", first); } + @Test + public void testBehaviorSubject() { + BehaviorSubject subject = BehaviorSubject.create("default"); + subject.onNext("one"); + BlockingObservable observable = subject.toBlockingObservable(); + String last = observable.first(); + assertEquals("one", last); + } + + @Test + public void testReplaySubjectWithCapacity() { + ReplaySubject subject = ReplaySubject.create(1); + subject.onNext("one"); + subject.onNext("two"); + subject.onNext("three"); + + String last = subject.asObservable().toBlockingObservable().first(); + assertEquals("three", last); + } + private static class TestException extends RuntimeException { private static final long serialVersionUID = 1L; } From a3aef85f8306e45d0fd0044001ff7b2a609d94b2 Mon Sep 17 00:00:00 2001 From: Sha Date: Tue, 28 Jan 2014 13:32:50 +0700 Subject: [PATCH 4/4] bug fixes --- .../main/java/rx/subjects/ReplaySubject.java | 1 + .../java/rx/subjects/ReplaySubjectTest.java | 60 +++++++++++++++---- 2 files changed, 48 insertions(+), 13 deletions(-) diff --git a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java index 602c985688..702ec8189a 100644 --- a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java +++ b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java @@ -83,6 +83,7 @@ public void call(SubjectObserver o) { int lastIndex = replayObserverFromIndex(state.history, 0, o); // now that it is caught up add to observers + o.caughtUp = true; state.replayState.put(o, lastIndex); } }, diff --git a/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java b/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java index 25b815924b..a0eeaf3b13 100644 --- a/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java +++ b/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java @@ -144,26 +144,60 @@ public void testCompletedAfterError() { @Test public void testCapacity() { + { + ReplaySubject subject = ReplaySubject.create(1); + @SuppressWarnings("unchecked") Observer aObserver = mock(Observer.class); + subject.subscribe(aObserver); + + subject.onNext("one"); + subject.onNext("two"); + subject.onNext("three"); + subject.onCompleted(); + + assertCompletedObserver(aObserver); + + Observer anotherObserver = mock(Observer.class); + subject.subscribe(anotherObserver); + + verify(anotherObserver, times(0)).onNext("one"); + verify(anotherObserver, times(0)).onNext("two"); + verify(anotherObserver, times(1)).onNext("three"); + verify(anotherObserver, times(1)).onCompleted(); + verifyNoMoreInteractions(anotherObserver); + } + { + ReplaySubject subject = ReplaySubject.create(1); + @SuppressWarnings("unchecked") Observer aObserver = mock(Observer.class); + subject.onNext("one"); + + subject.asObservable().distinctUntilChanged().subscribe(aObserver); + + subject.onNext("two"); + subject.onNext("one"); + subject.onNext("one"); + verify(aObserver, times(2)).onNext("one"); + verify(aObserver, times(1)).onNext("two"); + } + } + + @Test + public void testThatObserverReceivesLatestAndThenSubsequentEvents() { ReplaySubject subject = ReplaySubject.create(1); + + subject.onNext("one"); + @SuppressWarnings("unchecked") Observer aObserver = mock(Observer.class); subject.subscribe(aObserver); - - subject.onNext("one"); subject.onNext("two"); subject.onNext("three"); - subject.onCompleted(); - assertCompletedObserver(aObserver); - - Observer anotherObserver = mock(Observer.class); - subject.subscribe(anotherObserver); - - verify(anotherObserver, times(0)).onNext("one"); - verify(anotherObserver, times(0)).onNext("two"); - verify(anotherObserver, times(1)).onNext("three"); - verify(anotherObserver, times(1)).onCompleted(); - verifyNoMoreInteractions(anotherObserver); + verify(aObserver, Mockito.never()).onNext("default"); + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, times(1)).onNext("two"); + verify(aObserver, times(1)).onNext("three"); + verify(aObserver, Mockito.never()).onError(testException); + verify(aObserver, Mockito.never()).onCompleted(); } private void assertCompletedObserver(Observer aObserver) {