diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 2ac2fb7b5d..ba777759cb 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -2167,6 +2167,41 @@ public static Observable timer(long initialDelay, long period, TimeUnit un return create(new OperationTimer.TimerPeriodically(initialDelay, period, unit, scheduler)); } + /** + * Create an Observable which delays the events via another Observable on a per item-basis. + *

+ * Note: onError event is immediately propagated. + * + * @param the item delay value type (ignored) + * @param itemDelay function that returns an Observable for each source item which is + * then used for delaying that particular item until the Observable + * fires its first onNext event. + * @return an Observable which delays the events via another Observable on a per item-basis. + */ + public Observable delay(Func1> itemDelay) { + return create(OperationDelay.delay(this, itemDelay)); + } + /** + * Create an Observable which delays the subscription and events via another Observables on a per item-basis. + *

+ * Note: onError event is immediately propagated. + * + * @param the subscription delay value type (ignored) + * @param the item delay value type (ignored) + * @param subscriptionDelay function that returns an Observable which will trigger + * the subscription to the source observable once it fires an + * onNext event. + * @param itemDelay function that returns an Observable for each source item which is + * then used for delaying that particular item until the Observable + * fires its first onNext event. + * @return an Observable which delays the events via another Observable on a per item-basis. + */ + public Observable delay( + Func0> subscriptionDelay, + Func1> itemDelay) { + return create(OperationDelay.delay(this, subscriptionDelay, itemDelay)); + } + /** * Returns an Observable that emits the items emitted by the source * Observable shifted forward in time by a specified delay. Error diff --git a/rxjava-core/src/main/java/rx/operators/OperationDelay.java b/rxjava-core/src/main/java/rx/operators/OperationDelay.java index 4594adda76..e553d09b60 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationDelay.java +++ b/rxjava-core/src/main/java/rx/operators/OperationDelay.java @@ -23,8 +23,11 @@ import rx.Scheduler; import rx.Subscription; import rx.observables.ConnectableObservable; +import rx.subscriptions.CompositeSubscription; import rx.subscriptions.SerialSubscription; +import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; +import rx.util.functions.Func0; import rx.util.functions.Func1; public final class OperationDelay { @@ -82,4 +85,213 @@ public void call() { return ssub; } } + /** + * Delay the emission of the source items by a per-item observable that fires its first element. + */ + public static OnSubscribeFunc delay(Observable source, + Func1> itemDelay) { + return new DelayViaObservable(source, null, itemDelay); + } + /** + * Delay the subscription and emission of the source items by a per-item observable that fires its first element. + */ + public static OnSubscribeFunc delay(Observable source, + Func0> subscriptionDelay, + Func1> itemDelay) { + return new DelayViaObservable(source, subscriptionDelay, itemDelay); + } + /** + * Delay the emission of the source items by a per-item observable that fires its first element. + */ + private static final class DelayViaObservable implements OnSubscribeFunc { + final Observable source; + final Func0> subscriptionDelay; + final Func1> itemDelay; + + public DelayViaObservable(Observable source, + Func0> subscriptionDelay, + Func1> itemDelay) { + this.source = source; + this.subscriptionDelay = subscriptionDelay; + this.itemDelay = itemDelay; + } + + @Override + public Subscription onSubscribe(Observer t1) { + CompositeSubscription csub = new CompositeSubscription(); + + SerialSubscription sosub = new SerialSubscription(); + csub.add(sosub); + SourceObserver so = new SourceObserver(t1, itemDelay, csub, sosub); + if (subscriptionDelay == null) { + sosub.set(source.subscribe(so)); + } else { + Observable subscriptionSource; + try { + subscriptionSource = subscriptionDelay.call(); + } catch (Throwable t) { + t1.onError(t); + return Subscriptions.empty(); + } + SerialSubscription ssub = new SerialSubscription(); + csub.add(ssub); + ssub.set(subscriptionSource.subscribe(new SubscribeDelay(source, so, csub, ssub))); + } + + return csub; + } + /** Subscribe delay observer. */ + private static final class SubscribeDelay implements Observer { + final Observable source; + final SourceObserver so; + final CompositeSubscription csub; + final Subscription self; + /** Prevent any onError once the first item was delivered. */ + boolean subscribed; + + public SubscribeDelay( + Observable source, + SourceObserver so, + CompositeSubscription csub, Subscription self) { + this.source = source; + this.so = so; + this.csub = csub; + this.self = self; + } + + @Override + public void onNext(U args) { + onCompleted(); + } + + @Override + public void onError(Throwable e) { + if (!subscribed) { + so.observer.onError(e); + csub.unsubscribe(); + } + } + + @Override + public void onCompleted() { + subscribed = true; + csub.remove(self); + so.self.set(source.subscribe(so)); + } + } + /** The source observer. */ + private static final class SourceObserver implements Observer { + final Observer observer; + final Func1> itemDelay; + final CompositeSubscription csub; + final SerialSubscription self; + /** Guard to avoid overlapping events from the various sources. */ + final Object guard; + boolean done; + int wip; + + public SourceObserver(Observer observer, + Func1> itemDelay, + CompositeSubscription csub, + SerialSubscription self) { + this.observer = observer; + this.itemDelay = itemDelay; + this.csub = csub; + this.guard = new Object(); + this.self = self; + } + + @Override + public void onNext(T args) { + Observable delayer; + try { + delayer = itemDelay.call(args); + } catch (Throwable t) { + onError(t); + return; + } + + synchronized (guard) { + wip++; + } + + SerialSubscription ssub = new SerialSubscription(); + csub.add(ssub); + ssub.set(delayer.subscribe(new DelayObserver(args, this, ssub))); + } + + @Override + public void onError(Throwable e) { + synchronized (guard) { + observer.onError(e); + } + csub.unsubscribe(); + } + + @Override + public void onCompleted() { + boolean b; + synchronized (guard) { + done = true; + b = checkDone(); + } + if (b) { + csub.unsubscribe(); + } else { + self.unsubscribe(); + } + } + + void emit(T value, Subscription token) { + boolean b; + synchronized (guard) { + observer.onNext(value); + wip--; + b = checkDone(); + } + if (b) { + csub.unsubscribe(); + } else { + csub.remove(token); + } + } + boolean checkDone() { + if (done && wip == 0) { + observer.onCompleted(); + return true; + } + return false; + } + } + /** + * Delay observer. + */ + private static final class DelayObserver implements Observer { + final T value; + final SourceObserver parent; + final Subscription token; + + public DelayObserver(T value, SourceObserver parent, Subscription token) { + this.value = value; + this.parent = parent; + this.token = token; + } + + @Override + public void onNext(U args) { + parent.emit(value, token); + } + + @Override + public void onError(Throwable e) { + parent.onError(e); + } + + @Override + public void onCompleted() { + parent.emit(value, token); + } + + } + } } diff --git a/rxjava-core/src/test/java/rx/operators/OperationDelayTest.java b/rxjava-core/src/test/java/rx/operators/OperationDelayTest.java index 10fdd459bd..81728753c6 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationDelayTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationDelayTest.java @@ -15,6 +15,8 @@ */ package rx.operators; +import java.util.ArrayList; +import java.util.List; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.inOrder; @@ -36,6 +38,8 @@ import rx.Observer; import rx.Subscription; import rx.schedulers.TestScheduler; +import rx.subjects.PublishSubject; +import rx.util.functions.Func0; import rx.util.functions.Func1; public class OperationDelayTest { @@ -235,4 +239,308 @@ public void testDelaySubscriptionCancelBeforeTime() { verify(o, never()).onCompleted(); verify(o, never()).onError(any(Throwable.class)); } + + @Test + public void testDelayWithObservableNormal1() { + PublishSubject source = PublishSubject.create(); + final List> delays = new ArrayList>(); + final int n = 10; + for (int i = 0; i < n; i++) { + PublishSubject delay = PublishSubject.create(); + delays.add(delay); + } + + Func1> delayFunc = new Func1>() { + @Override + public Observable call(Integer t1) { + return delays.get(t1); + } + }; + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + source.delay(delayFunc).subscribe(o); + + + for (int i = 0; i < n; i++) { + source.onNext(i); + delays.get(i).onNext(i); + inOrder.verify(o).onNext(i); + } + source.onCompleted(); + + inOrder.verify(o).onCompleted(); + inOrder.verifyNoMoreInteractions(); + + verify(o, never()).onError(any(Throwable.class)); + } + + @Test + public void testDelayWithObservableSingleSend1() { + PublishSubject source = PublishSubject.create(); + final PublishSubject delay = PublishSubject.create(); + + Func1> delayFunc = new Func1>() { + + @Override + public Observable call(Integer t1) { + return delay; + } + }; + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + source.delay(delayFunc).subscribe(o); + + source.onNext(1); + delay.onNext(1); + delay.onNext(2); + + inOrder.verify(o).onNext(1); + inOrder.verifyNoMoreInteractions(); + verify(o, never()).onError(any(Throwable.class)); + } + @Test + public void testDelayWithObservableSourceThrows() { + PublishSubject source = PublishSubject.create(); + final PublishSubject delay = PublishSubject.create(); + + Func1> delayFunc = new Func1>() { + + @Override + public Observable call(Integer t1) { + return delay; + } + }; + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + source.delay(delayFunc).subscribe(o); + source.onNext(1); + source.onError(new OperationReduceTest.CustomException()); + delay.onNext(1); + + inOrder.verify(o).onError(any(OperationReduceTest.CustomException.class)); + inOrder.verifyNoMoreInteractions(); + verify(o, never()).onNext(any()); + verify(o, never()).onCompleted(); + } + @Test + public void testDelayWithObservableDelayFunctionThrows() { + PublishSubject source = PublishSubject.create(); + + Func1> delayFunc = new Func1>() { + + @Override + public Observable call(Integer t1) { + throw new OperationReduceTest.CustomException(); + } + }; + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + source.delay(delayFunc).subscribe(o); + source.onNext(1); + + inOrder.verify(o).onError(any(OperationReduceTest.CustomException.class)); + inOrder.verifyNoMoreInteractions(); + verify(o, never()).onNext(any()); + verify(o, never()).onCompleted(); + } + + @Test + public void testDelayWithObservableDelayThrows() { + PublishSubject source = PublishSubject.create(); + final PublishSubject delay = PublishSubject.create(); + + Func1> delayFunc = new Func1>() { + + @Override + public Observable call(Integer t1) { + return delay; + } + }; + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + source.delay(delayFunc).subscribe(o); + source.onNext(1); + delay.onError(new OperationReduceTest.CustomException()); + + inOrder.verify(o).onError(any(OperationReduceTest.CustomException.class)); + inOrder.verifyNoMoreInteractions(); + verify(o, never()).onNext(any()); + verify(o, never()).onCompleted(); + } + @Test + public void testDelayWithObservableSubscriptionNormal() { + PublishSubject source = PublishSubject.create(); + final PublishSubject delay = PublishSubject.create(); + Func0> subFunc = new Func0>() { + @Override + public Observable call() { + return delay; + } + }; + Func1> delayFunc = new Func1>() { + + @Override + public Observable call(Integer t1) { + return delay; + } + }; + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + source.delay(subFunc, delayFunc).subscribe(o); + + source.onNext(1); + delay.onNext(1); + + source.onNext(2); + delay.onNext(2); + + inOrder.verify(o).onNext(2); + inOrder.verifyNoMoreInteractions(); + verify(o, never()).onError(any(Throwable.class)); + verify(o, never()).onCompleted(); + } + @Test + public void testDelayWithObservableSubscriptionFunctionThrows() { + PublishSubject source = PublishSubject.create(); + final PublishSubject delay = PublishSubject.create(); + Func0> subFunc = new Func0>() { + @Override + public Observable call() { + throw new OperationReduceTest.CustomException(); + } + }; + Func1> delayFunc = new Func1>() { + + @Override + public Observable call(Integer t1) { + return delay; + } + }; + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + source.delay(subFunc, delayFunc).subscribe(o); + + source.onNext(1); + delay.onNext(1); + + source.onNext(2); + + inOrder.verify(o).onError(any(OperationReduceTest.CustomException.class)); + inOrder.verifyNoMoreInteractions(); + verify(o, never()).onNext(any()); + verify(o, never()).onCompleted(); + } + @Test + public void testDelayWithObservableSubscriptionThrows() { + PublishSubject source = PublishSubject.create(); + final PublishSubject delay = PublishSubject.create(); + Func0> subFunc = new Func0>() { + @Override + public Observable call() { + return delay; + } + }; + Func1> delayFunc = new Func1>() { + + @Override + public Observable call(Integer t1) { + return delay; + } + }; + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + source.delay(subFunc, delayFunc).subscribe(o); + + source.onNext(1); + delay.onError(new OperationReduceTest.CustomException()); + + source.onNext(2); + + inOrder.verify(o).onError(any(OperationReduceTest.CustomException.class)); + inOrder.verifyNoMoreInteractions(); + verify(o, never()).onNext(any()); + verify(o, never()).onCompleted(); + } + @Test + public void testDelayWithObservableEmptyDelayer() { + PublishSubject source = PublishSubject.create(); + + Func1> delayFunc = new Func1>() { + + @Override + public Observable call(Integer t1) { + return Observable.empty(); + } + }; + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + source.delay(delayFunc).subscribe(o); + + source.onNext(1); + source.onCompleted(); + + inOrder.verify(o).onNext(1); + inOrder.verify(o).onCompleted(); + inOrder.verifyNoMoreInteractions(); + verify(o, never()).onError(any(Throwable.class)); + } + + @Test + public void testDelayWithObservableSubscriptionRunCompletion() { + PublishSubject source = PublishSubject.create(); + final PublishSubject sdelay = PublishSubject.create(); + final PublishSubject delay = PublishSubject.create(); + Func0> subFunc = new Func0>() { + @Override + public Observable call() { + return sdelay; + } + }; + Func1> delayFunc = new Func1>() { + + @Override + public Observable call(Integer t1) { + return delay; + } + }; + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + source.delay(subFunc, delayFunc).subscribe(o); + + source.onNext(1); + sdelay.onCompleted(); + + source.onNext(2); + delay.onNext(2); + + inOrder.verify(o).onNext(2); + inOrder.verifyNoMoreInteractions(); + verify(o, never()).onError(any(Throwable.class)); + verify(o, never()).onCompleted(); + } }