diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index 5ea6b627e4..d22f8ce95f 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -15,16 +15,7 @@ */ package rx.operators; -import static org.junit.Assert.*; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; -import static rx.operators.AbstractOperation.UnitTest.*; - -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - import org.junit.Test; - import rx.Observable; import rx.Observer; import rx.Subscription; @@ -32,6 +23,17 @@ import rx.util.AtomicObservableSubscription; import rx.util.functions.Func1; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static rx.operators.AbstractOperation.UnitTest.assertTrustedObservable; /** * Returns a specified number of contiguous values from the start of an observable sequence. */ diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java index f45efabc92..8b384528b1 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java @@ -26,6 +26,7 @@ import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.subjects.PublishSubject; import rx.subscriptions.Subscriptions; import rx.util.AtomicObservableSubscription; import rx.util.functions.Func1; @@ -162,8 +163,8 @@ public Boolean call(Integer input) @Test public void testTakeWhileOnSubject1() { - Subject s = Subject.create(); - Observable w = (Observable)s; + PublishSubject s = PublishSubject.create(); + Observable w = s; Observable take = Observable.create(takeWhile(w, new Func1() { @Override diff --git a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java new file mode 100644 index 0000000000..ab317815da --- /dev/null +++ b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java @@ -0,0 +1,307 @@ +package rx.subjects; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import junit.framework.Assert; + +import org.junit.Test; + +import org.mockito.Mockito; +import rx.Notification; +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.testing.UnsubscribeTester; +import rx.util.AtomicObservableSubscription; +import rx.util.SynchronizedObserver; +import rx.util.functions.Action1; +import rx.util.functions.Func0; +import rx.util.functions.Func1; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class PublishSubject extends Subject { + public static PublishSubject create() { + final ConcurrentHashMap> observers = new ConcurrentHashMap>(); + + Func1, Subscription> onSubscribe = new Func1, Subscription>() { + @Override + public Subscription call(Observer observer) { + final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + + subscription.wrap(new Subscription() { + @Override + public void unsubscribe() { + // on unsubscribe remove it from the map of outbound observers to notify + observers.remove(subscription); + } + }); + + // on subscribe add it to the map of outbound observers to notify + observers.put(subscription, new SynchronizedObserver(observer, subscription)); + return subscription; + } + }; + + return new PublishSubject(onSubscribe, observers); + } + + private final ConcurrentHashMap> observers; + + protected PublishSubject(Func1, Subscription> onSubscribe, ConcurrentHashMap> observers) { + super(onSubscribe); + this.observers = observers; + } + + @Override + public void onCompleted() { + for (Observer observer : observers.values()) { + observer.onCompleted(); + } + } + + @Override + public void onError(Exception e) { + for (Observer observer : observers.values()) { + observer.onError(e); + } + } + + @Override + public void onNext(T args) { + for (Observer observer : observers.values()) { + observer.onNext(args); + } + } + + public static class UnitTest { + @Test + public void test() { + PublishSubject publishSubject = PublishSubject.create(); + final AtomicReference>> actualRef = new AtomicReference>>(); + + Observable>> wNotificationsList = publishSubject.materialize().toList(); + wNotificationsList.subscribe(new Action1>>() { + @Override + public void call(List> actual) { + actualRef.set(actual); + } + }); + + Subscription sub = Observable.create(new Func1, Subscription>() { + @Override + public Subscription call(final Observer observer) { + final AtomicBoolean stop = new AtomicBoolean(false); + new Thread() { + @Override + public void run() { + int i = 1; + while (!stop.get()) { + observer.onNext(i++); + } + observer.onCompleted(); + } + }.start(); + return new Subscription() { + @Override + public void unsubscribe() { + stop.set(true); + } + }; + } + }).subscribe(publishSubject); + // the publishSubject has received an onComplete from the first subscribe because + // it is synchronous and the next subscribe won't do anything. + Observable.toObservable(-1, -2, -3).subscribe(publishSubject); + + List> expected = new ArrayList>(); + expected.add(new Notification(-1)); + expected.add(new Notification(-2)); + expected.add(new Notification(-3)); + expected.add(new Notification()); + Assert.assertTrue(actualRef.get().containsAll(expected)); + + sub.unsubscribe(); + } + + private final Exception testException = new Exception(); + + @Test + public void testCompleted() { + PublishSubject subject = PublishSubject.create(); + + Observer aObserver = mock(Observer.class); + subject.subscribe(aObserver); + + subject.onNext("one"); + subject.onNext("two"); + subject.onNext("three"); + subject.onCompleted(); + + Observer anotherObserver = mock(Observer.class); + subject.subscribe(anotherObserver); + + subject.onNext("four"); + subject.onCompleted(); + subject.onError(new Exception()); + + assertCompletedObserver(aObserver); +// todo bug? assertNeverObserver(anotherObserver); + } + + private void assertCompletedObserver(Observer aObserver) + { + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, times(1)).onNext("two"); + verify(aObserver, times(1)).onNext("three"); + verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, times(1)).onCompleted(); + } + + private void assertNeverObserver(Observer aObserver) + { + verify(aObserver, Mockito.never()).onNext(any(String.class)); + verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, Mockito.never()).onCompleted(); + } + + @Test + public void testError() { + PublishSubject subject = PublishSubject.create(); + + Observer aObserver = mock(Observer.class); + subject.subscribe(aObserver); + + subject.onNext("one"); + subject.onNext("two"); + subject.onNext("three"); + subject.onError(testException); + + Observer anotherObserver = mock(Observer.class); + subject.subscribe(anotherObserver); + + subject.onNext("four"); + subject.onError(new Exception()); + subject.onCompleted(); + + assertErrorObserver(aObserver); +// todo bug? assertNeverObserver(anotherObserver); + } + + private void assertErrorObserver(Observer aObserver) + { + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, times(1)).onNext("two"); + verify(aObserver, times(1)).onNext("three"); + verify(aObserver, times(1)).onError(testException); + verify(aObserver, Mockito.never()).onCompleted(); + } + + + @Test + public void testSubscribeMidSequence() { + PublishSubject subject = PublishSubject.create(); + + Observer aObserver = mock(Observer.class); + subject.subscribe(aObserver); + + subject.onNext("one"); + subject.onNext("two"); + + assertObservedUntilTwo(aObserver); + + Observer anotherObserver = mock(Observer.class); + subject.subscribe(anotherObserver); + + subject.onNext("three"); + subject.onCompleted(); + + assertCompletedObserver(aObserver); + assertCompletedStartingWithThreeObserver(anotherObserver); + } + + + private void assertCompletedStartingWithThreeObserver(Observer aObserver) + { + verify(aObserver, Mockito.never()).onNext("one"); + verify(aObserver, Mockito.never()).onNext("two"); + verify(aObserver, times(1)).onNext("three"); + verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testUnsubscribeFirstObserver() { + PublishSubject subject = PublishSubject.create(); + + Observer aObserver = mock(Observer.class); + Subscription subscription = subject.subscribe(aObserver); + + subject.onNext("one"); + subject.onNext("two"); + + subscription.unsubscribe(); + assertObservedUntilTwo(aObserver); + + Observer anotherObserver = mock(Observer.class); + subject.subscribe(anotherObserver); + + subject.onNext("three"); + subject.onCompleted(); + + assertObservedUntilTwo(aObserver); + assertCompletedStartingWithThreeObserver(anotherObserver); + } + + private void assertObservedUntilTwo(Observer aObserver) + { + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, times(1)).onNext("two"); + verify(aObserver, Mockito.never()).onNext("three"); + verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, Mockito.never()).onCompleted(); + } + + @Test + public void testUnsubscribe() + { + UnsubscribeTester.test(new Func0>() + { + @Override + public PublishSubject call() + { + return PublishSubject.create(); + } + }, new Action1>() + { + @Override + public void call(PublishSubject PublishSubject) + { + PublishSubject.onCompleted(); + } + }, new Action1>() + { + @Override + public void call(PublishSubject PublishSubject) + { + PublishSubject.onError(new Exception()); + } + }, new Action1>() + { + @Override + public void call(PublishSubject PublishSubject) + { + PublishSubject.onNext("one"); + } + } + ); + } + } +} diff --git a/rxjava-core/src/main/java/rx/subjects/RepeatSubject.java b/rxjava-core/src/main/java/rx/subjects/RepeatSubject.java new file mode 100644 index 0000000000..7ab3a9e2c7 --- /dev/null +++ b/rxjava-core/src/main/java/rx/subjects/RepeatSubject.java @@ -0,0 +1,306 @@ +package rx.subjects; + +import org.junit.Test; +import org.mockito.Mockito; +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.testing.UnsubscribeTester; +import rx.util.functions.Action1; +import rx.util.functions.Func0; +import rx.util.functions.Func1; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public final class RepeatSubject extends Subject +{ + + private boolean isDone = false; + private Exception exception = null; + private final Map> subscriptions = new HashMap>(); + private final List history = Collections.synchronizedList(new ArrayList()); + + public static RepeatSubject create() { + return new RepeatSubject(new DelegateSubscriptionFunc()); + } + + private RepeatSubject(DelegateSubscriptionFunc onSubscribe) { + super(onSubscribe); + onSubscribe.wrap(new SubscriptionFunc()); + } + private static final class DelegateSubscriptionFunc implements Func1,Subscription> + { + private Func1, Subscription> delegate = null; + + public void wrap(Func1, Subscription> delegate) + { + if (this.delegate != null) { + throw new UnsupportedOperationException("delegate already set"); + } + this.delegate = delegate; + } + + @Override + public Subscription call(Observer observer) + { + return delegate.call(observer); + } + } + + private class SubscriptionFunc implements Func1, Subscription> + { + @Override + public Subscription call(Observer observer) { + int item = 0; + Subscription subscription; + + for (;;) { + while (item < history.size()) { + observer.onNext(history.get(item++)); + } + + synchronized (subscriptions) { + if (item < history.size()) { + continue; + } + + if (exception != null) { + observer.onError(exception); + return Subscriptions.empty(); + } + if (isDone) { + observer.onCompleted(); + return Subscriptions.empty(); + } + + subscription = new RepeatSubjectSubscription(); + subscriptions.put(subscription, observer); + break; + } + } + + return subscription; + } + } + + private class RepeatSubjectSubscription implements Subscription + { + @Override + public void unsubscribe() + { + synchronized (subscriptions) { + subscriptions.remove(this); + } + } + } + + @Override + public void onCompleted() + { + synchronized (subscriptions) { + isDone = true; + for (Observer observer : new ArrayList>(subscriptions.values())) { + observer.onCompleted(); + } + subscriptions.clear(); + } + } + + @Override + public void onError(Exception e) + { + synchronized (subscriptions) { + if (isDone) { + return; + } + isDone = true; + exception = e; + for (Observer observer : new ArrayList>(subscriptions.values())) { + observer.onError(e); + } + subscriptions.clear(); + } + } + + @Override + public void onNext(T args) + { + synchronized (subscriptions) { + history.add(args); + for (Observer observer : new ArrayList>(subscriptions.values())) { + observer.onNext(args); + } + } + } + + public static class UnitTest { + + private final Exception testException = new Exception(); + + @Test + public void testCompleted() { + RepeatSubject subject = RepeatSubject.create(); + + Observer aObserver = mock(Observer.class); + subject.subscribe(aObserver); + + subject.onNext("one"); + subject.onNext("two"); + subject.onNext("three"); + subject.onCompleted(); + + subject.onNext("four"); + subject.onCompleted(); + subject.onError(new Exception()); + + assertCompletedObserver(aObserver); + + aObserver = mock(Observer.class); + subject.subscribe(aObserver); + assertCompletedObserver(aObserver); + } + + private void assertCompletedObserver(Observer aObserver) + { + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, times(1)).onNext("two"); + verify(aObserver, times(1)).onNext("three"); + verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testError() { + RepeatSubject subject = RepeatSubject.create(); + + Observer aObserver = mock(Observer.class); + subject.subscribe(aObserver); + + subject.onNext("one"); + subject.onNext("two"); + subject.onNext("three"); + subject.onError(testException); + + subject.onNext("four"); + subject.onError(new Exception()); + subject.onCompleted(); + + assertErrorObserver(aObserver); + + aObserver = mock(Observer.class); + subject.subscribe(aObserver); + assertErrorObserver(aObserver); + } + + private void assertErrorObserver(Observer aObserver) + { + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, times(1)).onNext("two"); + verify(aObserver, times(1)).onNext("three"); + verify(aObserver, times(1)).onError(testException); + verify(aObserver, Mockito.never()).onCompleted(); + } + + + @Test + public void testSubscribeMidSequence() { + RepeatSubject subject = RepeatSubject.create(); + + Observer aObserver = mock(Observer.class); + subject.subscribe(aObserver); + + subject.onNext("one"); + subject.onNext("two"); + + assertObservedUntilTwo(aObserver); + + Observer anotherObserver = mock(Observer.class); + subject.subscribe(anotherObserver); + assertObservedUntilTwo(anotherObserver); + + subject.onNext("three"); + subject.onCompleted(); + + assertCompletedObserver(aObserver); + assertCompletedObserver(anotherObserver); + } + + @Test + public void testUnsubscribeFirstObserver() { + RepeatSubject subject = RepeatSubject.create(); + + Observer aObserver = mock(Observer.class); + Subscription subscription = subject.subscribe(aObserver); + + subject.onNext("one"); + subject.onNext("two"); + + subscription.unsubscribe(); + assertObservedUntilTwo(aObserver); + + Observer anotherObserver = mock(Observer.class); + subject.subscribe(anotherObserver); + assertObservedUntilTwo(anotherObserver); + + subject.onNext("three"); + subject.onCompleted(); + + assertObservedUntilTwo(aObserver); + assertCompletedObserver(anotherObserver); + } + + private void assertObservedUntilTwo(Observer aObserver) + { + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, times(1)).onNext("two"); + verify(aObserver, Mockito.never()).onNext("three"); + verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, Mockito.never()).onCompleted(); + } + + @Test + public void testUnsubscribe() + { + UnsubscribeTester.test(new Func0>() + { + @Override + public RepeatSubject call() + { + return RepeatSubject.create(); + } + }, new Action1>() + { + @Override + public void call(RepeatSubject repeatSubject) + { + repeatSubject.onCompleted(); + } + }, new Action1>() + { + @Override + public void call(RepeatSubject repeatSubject) + { + repeatSubject.onError(new Exception()); + } + }, new Action1>() + { + @Override + public void call(RepeatSubject repeatSubject) + { + repeatSubject.onNext("one"); + } + } + ); + } + } +} diff --git a/rxjava-core/src/main/java/rx/subjects/Subject.java b/rxjava-core/src/main/java/rx/subjects/Subject.java index c9279adb38..ca9242d471 100644 --- a/rxjava-core/src/main/java/rx/subjects/Subject.java +++ b/rxjava-core/src/main/java/rx/subjects/Subject.java @@ -1,126 +1,18 @@ package rx.subjects; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import junit.framework.Assert; - -import org.junit.Test; - -import rx.Notification; import rx.Observable; import rx.Observer; import rx.Subscription; -import rx.util.AtomicObservableSubscription; -import rx.util.SynchronizedObserver; -import rx.util.functions.Action1; import rx.util.functions.Func1; -public class Subject extends Observable implements Observer { - public static Subject create() { - final ConcurrentHashMap> observers = new ConcurrentHashMap>(); - - Func1, Subscription> onSubscribe = new Func1, Subscription>() { - @Override - public Subscription call(Observer observer) { - final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); - - subscription.wrap(new Subscription() { - @Override - public void unsubscribe() { - // on unsubscribe remove it from the map of outbound observers to notify - observers.remove(subscription); - } - }); - - // on subscribe add it to the map of outbound observers to notify - observers.put(subscription, new SynchronizedObserver(observer, subscription)); - return subscription; - } - }; - - return new Subject(onSubscribe, observers); +public abstract class Subject extends Observable implements Observer { + protected Subject() + { + super(); } - private final ConcurrentHashMap> observers; - - protected Subject(Func1, Subscription> onSubscribe, ConcurrentHashMap> observers) { + protected Subject(Func1, Subscription> onSubscribe) + { super(onSubscribe); - this.observers = observers; - } - - @Override - public void onCompleted() { - for (Observer observer : observers.values()) { - observer.onCompleted(); - } - } - - @Override - public void onError(Exception e) { - for (Observer observer : observers.values()) { - observer.onError(e); - } - } - - @Override - public void onNext(T args) { - for (Observer observer : observers.values()) { - observer.onNext(args); - } - } - - public static class UnitTest { - @Test - public void test() { - Subject subject = Subject.create(); - final AtomicReference>> actualRef = new AtomicReference>>(); - - Observable>> wNotificationsList = subject.materialize().toList(); - wNotificationsList.subscribe(new Action1>>() { - @Override - public void call(List> actual) { - actualRef.set(actual); - } - }); - - Subscription sub = Observable.create(new Func1, Subscription>() { - @Override - public Subscription call(final Observer observer) { - final AtomicBoolean stop = new AtomicBoolean(false); - new Thread() { - @Override - public void run() { - int i = 1; - while (!stop.get()) { - observer.onNext(i++); - } - observer.onCompleted(); - } - }.start(); - return new Subscription() { - @Override - public void unsubscribe() { - stop.set(true); - } - }; - } - }).subscribe(subject); - // the subject has received an onComplete from the first subscribe because - // it is synchronous and the next subscribe won't do anything. - Observable.toObservable(-1, -2, -3).subscribe(subject); - - List> expected = new ArrayList>(); - expected.add(new Notification(-1)); - expected.add(new Notification(-2)); - expected.add(new Notification(-3)); - expected.add(new Notification()); - Assert.assertTrue(actualRef.get().containsAll(expected)); - - sub.unsubscribe(); - } } } diff --git a/rxjava-core/src/main/java/rx/testing/UnsubscribeTester.java b/rxjava-core/src/main/java/rx/testing/UnsubscribeTester.java new file mode 100644 index 0000000000..701e7025cf --- /dev/null +++ b/rxjava-core/src/main/java/rx/testing/UnsubscribeTester.java @@ -0,0 +1,157 @@ +package rx.testing; + +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.util.functions.Action1; +import rx.util.functions.Func0; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class UnsubscribeTester +{ + private boolean isDone = false; + private Subscription subscription; + + public UnsubscribeTester() {} + + /** + * Tests the unsubscription semantics of an observable. + * + * @param provider Function that when called provides an instance of the observable being tested + * @param generateOnCompleted Causes an observer generated by @param provider to generate an onCompleted event. Null to not test onCompleted. + * @param generateOnError Causes an observer generated by @param provider to generate an onError event. Null to not test onError. + * @param generateOnNext Causes an observer generated by @param provider to generate an onNext event. Null to not test onNext. + * @param The type of object passed by the Observable + */ + public static > void test(Func0 provider, Action1 generateOnCompleted, Action1 generateOnError, Action1 generateOnNext) + { + if (generateOnCompleted != null) { + O observable = provider.call(); + UnsubscribeTester tester1 = createOnCompleted(observable); + UnsubscribeTester tester2 = createOnCompleted(observable); + generateOnCompleted.call(observable); + tester1.assertPassed(); + tester2.assertPassed(); + } + if (generateOnError != null) { + O observable = provider.call(); + UnsubscribeTester tester1 = createOnError(observable); + UnsubscribeTester tester2 = createOnError(observable); + generateOnError.call(observable); + tester1.assertPassed(); + tester2.assertPassed(); + } + if (generateOnNext != null) { + O observable = provider.call(); + UnsubscribeTester tester1 = createOnNext(observable); + UnsubscribeTester tester2 = createOnNext(observable); + generateOnNext.call(observable); + tester1.assertPassed(); + tester2.assertPassed(); + } + } + + private static UnsubscribeTester createOnCompleted(Observable observable) + { + final UnsubscribeTester test = new UnsubscribeTester(); + test.setSubscription(observable.subscribe(new Observer() + { + @Override + public void onCompleted() + { + test.doUnsubscribe("onCompleted"); + } + + @Override + public void onError(Exception e) + { + test.gotEvent("onError"); + } + + @Override + public void onNext(T args) + { + test.gotEvent("onNext"); + } + })); + return test; + } + + private static UnsubscribeTester createOnError(Observable observable) + { + final UnsubscribeTester test = new UnsubscribeTester(); + test.setSubscription(observable.subscribe(new Observer() + { + @Override + public void onCompleted() + { + test.gotEvent("onCompleted"); + } + + @Override + public void onError(Exception e) + { + test.doUnsubscribe("onError"); + } + + @Override + public void onNext(T args) + { + test.gotEvent("onNext"); + } + })); + return test; + } + + private static UnsubscribeTester createOnNext(Observable observable) + { + final UnsubscribeTester test = new UnsubscribeTester(); + test.setSubscription(observable.subscribe(new Observer() + { + @Override + public void onCompleted() + { + test.gotEvent("onCompleted"); + } + + @Override + public void onError(Exception e) + { + test.gotEvent("onError"); + } + + @Override + public void onNext(T args) + { + test.doUnsubscribe("onNext"); + } + })); + return test; + } + + private void setSubscription(Subscription subscription) + { + this.subscription = subscription; + } + + private void gotEvent(String event) + { + assertFalse("received " + event + " after unsubscribe", isDone); + } + + private void doUnsubscribe(String event) + { + gotEvent(event); + if (subscription != null) { + isDone = true; + subscription.unsubscribe(); + } + } + + private void assertPassed() + { + assertTrue("expected notification was received", isDone); + } +}