diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 2ac2fb7b5d..896150e8d1 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -3373,6 +3373,21 @@ public Observable> window(Observable the window element type (ignored) + * @param boundary the Observable sequence whose emitted item is used for closing + * and opening windows + * @return an Observable which emits non-overlapping windows of items it collects from the + * source observable where the boundary of each window is determined by the items + * emitted from the boundary observable + */ + public Observable> window(Observable boundary) { + return create(OperationWindow.window(this, boundary)); + } + /** * Creates an Observable that emits windows of items it collects from the * source Observable. The resulting Observable emits connected, diff --git a/rxjava-core/src/main/java/rx/operators/OperationWindow.java b/rxjava-core/src/main/java/rx/operators/OperationWindow.java index ef867898ed..c1026241a2 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationWindow.java +++ b/rxjava-core/src/main/java/rx/operators/OperationWindow.java @@ -23,6 +23,10 @@ import rx.Scheduler; import rx.Subscription; import rx.schedulers.Schedulers; +import rx.subjects.PublishSubject; +import rx.subjects.Subject; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.Subscriptions; import rx.util.functions.Func0; import rx.util.functions.Func1; @@ -354,4 +358,144 @@ public Observable getContents() { return Observable.from(contents); } } + /** + * Emits windows of values of the source Observable where the window boundary is + * determined by the items of the boundary Observable. + */ + public static OnSubscribeFunc> window(Observable source, Observable boundary) { + return new WindowViaObservable(source, boundary); + } + /** + * Create non-overlapping windows from the source values by using another observable's + * values as to when to replace a window. + */ + private static final class WindowViaObservable implements OnSubscribeFunc> { + final Observable source; + final Observable boundary; + + public WindowViaObservable(Observable source, Observable boundary) { + this.source = source; + this.boundary = boundary; + } + + @Override + public Subscription onSubscribe(Observer> t1) { + CompositeSubscription csub = new CompositeSubscription(); + + final SourceObserver so = new SourceObserver(t1, csub); + try { + t1.onNext(so.subject); + } catch (Throwable t) { + t1.onError(t); + return Subscriptions.empty(); + } + csub.add(source.subscribe(so)); + + if (!csub.isUnsubscribed()) { + csub.add(boundary.subscribe(new BoundaryObserver(so))); + } + + return csub; + } + /** + * Observe the source and emit the values into the current window. + */ + private static final class SourceObserver implements Observer { + final Observer> observer; + final Subscription cancel; + final Object guard; + Subject subject; + + public SourceObserver(Observer> observer, Subscription cancel) { + this.observer = observer; + this.cancel = cancel; + this.guard = new Object(); + this.subject = create(); + } + + Subject create() { + return PublishSubject.create(); + } + @Override + public void onNext(T args) { + synchronized (guard) { + if (subject == null) { + return; + } + subject.onNext(args); + } + } + + @Override + public void onError(Throwable e) { + synchronized (guard) { + if (subject == null) { + return; + } + Subject s = subject; + subject = null; + + s.onError(e); + observer.onError(e); + } + cancel.unsubscribe(); + } + + @Override + public void onCompleted() { + synchronized (guard) { + if (subject == null) { + return; + } + Subject s = subject; + subject = null; + + s.onCompleted(); + observer.onCompleted(); + } + cancel.unsubscribe(); + } + public void replace() { + try { + synchronized (guard) { + if (subject == null) { + return; + } + Subject s = subject; + s.onCompleted(); + + subject = create(); + observer.onNext(subject); + } + } catch (Throwable t) { + onError(t); + } + } + } + /** + * Observe the boundary and replace the window on each item. + */ + private static final class BoundaryObserver implements Observer { + final SourceObserver so; + + public BoundaryObserver(SourceObserver so) { + this.so = so; + } + + @Override + public void onNext(U args) { + so.replace(); + } + + @Override + public void onError(Throwable e) { + so.onError(e); + } + + @Override + public void onCompleted() { + so.onCompleted(); + } + } + } } diff --git a/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java b/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java index ebe26dacba..4886ed1661 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java @@ -24,12 +24,14 @@ import org.junit.Before; import org.junit.Test; +import static org.mockito.Mockito.*; import rx.Observable; import rx.Observer; import rx.Subscription; import rx.schedulers.Schedulers; import rx.schedulers.TestScheduler; +import rx.subjects.PublishSubject; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; import rx.util.functions.Action1; @@ -321,4 +323,223 @@ public void onNext(String args) { } }; } + @Test + public void testWindowViaObservableNormal1() { + PublishSubject source = PublishSubject.create(); + PublishSubject boundary = PublishSubject.create(); + + @SuppressWarnings("unchecked") + final Observer o = mock(Observer.class); + + final List> values = new ArrayList>(); + + Observer> wo = new Observer>() { + @Override + public void onNext(Observable args) { + @SuppressWarnings("unchecked") + final Observer mo = mock(Observer.class); + values.add(mo); + + args.subscribe(mo); + } + + @Override + public void onError(Throwable e) { + o.onError(e); + } + + @Override + public void onCompleted() { + o.onCompleted(); + } + }; + + source.window(boundary).subscribe(wo); + + int n = 30; + for (int i = 0; i < n; i++) { + source.onNext(i); + if (i % 3 == 2 && i < n - 1) { + boundary.onNext(i / 3); + } + } + source.onCompleted(); + + assertEquals(n / 3, values.size()); + + int j = 0; + for (Observer mo : values) { + for (int i = 0; i < 3; i++) { + verify(mo).onNext(j + i); + } + verify(mo).onCompleted(); + verify(mo, never()).onError(any(Throwable.class)); + j += 3; + } + + verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + } + + @Test + public void testWindowViaObservableBoundaryCompletes() { + PublishSubject source = PublishSubject.create(); + PublishSubject boundary = PublishSubject.create(); + + @SuppressWarnings("unchecked") + final Observer o = mock(Observer.class); + + final List> values = new ArrayList>(); + + Observer> wo = new Observer>() { + @Override + public void onNext(Observable args) { + @SuppressWarnings("unchecked") + final Observer mo = mock(Observer.class); + values.add(mo); + + args.subscribe(mo); + } + + @Override + public void onError(Throwable e) { + o.onError(e); + } + + @Override + public void onCompleted() { + o.onCompleted(); + } + }; + + source.window(boundary).subscribe(wo); + + int n = 30; + for (int i = 0; i < n; i++) { + source.onNext(i); + if (i % 3 == 2 && i < n - 1) { + boundary.onNext(i / 3); + } + } + boundary.onCompleted(); + + assertEquals(n / 3, values.size()); + + int j = 0; + for (Observer mo : values) { + for (int i = 0; i < 3; i++) { + verify(mo).onNext(j + i); + } + verify(mo).onCompleted(); + verify(mo, never()).onError(any(Throwable.class)); + j += 3; + } + + verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + } + + @Test + public void testWindowViaObservableBoundaryThrows() { + PublishSubject source = PublishSubject.create(); + PublishSubject boundary = PublishSubject.create(); + + @SuppressWarnings("unchecked") + final Observer o = mock(Observer.class); + + final List> values = new ArrayList>(); + + Observer> wo = new Observer>() { + @Override + public void onNext(Observable args) { + @SuppressWarnings("unchecked") + final Observer mo = mock(Observer.class); + values.add(mo); + + args.subscribe(mo); + } + + @Override + public void onError(Throwable e) { + o.onError(e); + } + + @Override + public void onCompleted() { + o.onCompleted(); + } + }; + + source.window(boundary).subscribe(wo); + + source.onNext(0); + source.onNext(1); + source.onNext(2); + + boundary.onError(new OperationReduceTest.CustomException()); + + assertEquals(1, values.size()); + + Observer mo = values.get(0); + + verify(mo).onNext(0); + verify(mo).onNext(1); + verify(mo).onNext(2); + verify(mo).onError(any(OperationReduceTest.CustomException.class)); + + verify(o, never()).onCompleted(); + verify(o).onError(any(OperationReduceTest.CustomException.class)); + } + + @Test + public void testWindowViaObservableourceThrows() { + PublishSubject source = PublishSubject.create(); + PublishSubject boundary = PublishSubject.create(); + + @SuppressWarnings("unchecked") + final Observer o = mock(Observer.class); + + final List> values = new ArrayList>(); + + Observer> wo = new Observer>() { + @Override + public void onNext(Observable args) { + @SuppressWarnings("unchecked") + final Observer mo = mock(Observer.class); + values.add(mo); + + args.subscribe(mo); + } + + @Override + public void onError(Throwable e) { + o.onError(e); + } + + @Override + public void onCompleted() { + o.onCompleted(); + } + }; + + source.window(boundary).subscribe(wo); + + source.onNext(0); + source.onNext(1); + source.onNext(2); + + source.onError(new OperationReduceTest.CustomException()); + + assertEquals(1, values.size()); + + Observer mo = values.get(0); + + verify(mo).onNext(0); + verify(mo).onNext(1); + verify(mo).onNext(2); + verify(mo).onError(any(OperationReduceTest.CustomException.class)); + + verify(o, never()).onCompleted(); + verify(o).onError(any(OperationReduceTest.CustomException.class)); + } }