diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java index 2465bea27b..4ae993775b 100644 --- a/src/main/java/io/reactivex/Completable.java +++ b/src/main/java/io/reactivex/Completable.java @@ -26,6 +26,7 @@ import io.reactivex.internal.operators.completable.*; import io.reactivex.internal.operators.flowable.FlowableDelaySubscriptionOther; import io.reactivex.internal.operators.maybe.MaybeFromCompletable; +import io.reactivex.internal.operators.maybe.MaybeDelayWithCompletable; import io.reactivex.internal.operators.observable.ObservableDelaySubscriptionOther; import io.reactivex.internal.operators.single.SingleDelayWithCompletable; import io.reactivex.internal.util.ExceptionHelper; @@ -799,6 +800,26 @@ public final Single andThen(SingleSource next) { return RxJavaPlugins.onAssembly(new SingleDelayWithCompletable(next, this)); } + /** + * Returns a {@link Maybe} which will subscribe to this Completable and once that is completed then + * will subscribe to the {@code next} MaybeSource. An error event from this Completable will be + * propagated to the downstream subscriber and will result in skipping the subscription of the + * Maybe. + *
+ *
Scheduler:
+ *
{@code andThen} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the value type of the next MaybeSource + * @param next the Maybe to subscribe after this Completable is completed, not null + * @return Maybe that composes this Completable and next + */ + @SchedulerSupport(SchedulerSupport.NONE) + public final Maybe andThen(MaybeSource next) { + ObjectHelper.requireNonNull(next, "next is null"); + return RxJavaPlugins.onAssembly(new MaybeDelayWithCompletable(next, this)); + } + /** * Returns a Completable that first runs this Completable * and then the other completable. diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeDelayWithCompletable.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDelayWithCompletable.java new file mode 100644 index 0000000000..80cc10805e --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDelayWithCompletable.java @@ -0,0 +1,115 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import io.reactivex.CompletableObserver; +import io.reactivex.CompletableSource; +import io.reactivex.Maybe; +import io.reactivex.MaybeObserver; +import io.reactivex.MaybeSource; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.DisposableHelper; +import java.util.concurrent.atomic.AtomicReference; + +public final class MaybeDelayWithCompletable extends Maybe { + + final MaybeSource source; + + final CompletableSource other; + + public MaybeDelayWithCompletable(MaybeSource source, CompletableSource other) { + this.source = source; + this.other = other; + } + + @Override + protected void subscribeActual(MaybeObserver subscriber) { + other.subscribe(new OtherObserver(subscriber, source)); + } + + static final class OtherObserver + extends AtomicReference + implements CompletableObserver, Disposable { + private static final long serialVersionUID = 703409937383992161L; + + final MaybeObserver actual; + + final MaybeSource source; + + OtherObserver(MaybeObserver actual, MaybeSource source) { + this.actual = actual; + this.source = source; + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.setOnce(this, d)) { + + actual.onSubscribe(this); + } + } + + @Override + public void onError(Throwable e) { + actual.onError(e); + } + + @Override + public void onComplete() { + source.subscribe(new DelayWithMainObserver(this, actual)); + } + + @Override + public void dispose() { + DisposableHelper.dispose(this); + } + + @Override + public boolean isDisposed() { + return DisposableHelper.isDisposed(get()); + } + } + + static final class DelayWithMainObserver implements MaybeObserver { + + final AtomicReference parent; + + final MaybeObserver actual; + + DelayWithMainObserver(AtomicReference parent, MaybeObserver actual) { + this.parent = parent; + this.actual = actual; + } + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.replace(parent, d); + } + + @Override + public void onSuccess(T value) { + actual.onSuccess(value); + } + + @Override + public void onError(Throwable e) { + actual.onError(e); + } + + @Override + public void onComplete() { + actual.onComplete(); + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleDelayWithCompletable.java b/src/main/java/io/reactivex/internal/operators/single/SingleDelayWithCompletable.java index 855a67aa53..538580c510 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleDelayWithCompletable.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleDelayWithCompletable.java @@ -53,7 +53,7 @@ static final class OtherObserver @Override public void onSubscribe(Disposable d) { - if (DisposableHelper.set(this, d)) { + if (DisposableHelper.setOnce(this, d)) { actual.onSubscribe(this); } diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableAndThenTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableAndThenTest.java new file mode 100644 index 0000000000..8e6786d0a5 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableAndThenTest.java @@ -0,0 +1,66 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.completable; + +import io.reactivex.Completable; +import io.reactivex.Maybe; +import org.junit.Test; + +public class CompletableAndThenTest { + @Test(expected = NullPointerException.class) + public void andThenMaybeNull() { + Completable.complete() + .andThen((Maybe) null); + } + + @Test + public void andThenMaybeCompleteValue() { + Completable.complete() + .andThen(Maybe.just(1)) + .test() + .assertResult(1); + } + + @Test + public void andThenMaybeCompleteError() { + Completable.complete() + .andThen(Maybe.error(new RuntimeException("test"))) + .test() + .assertNotComplete() + .assertNoValues() + .assertError(RuntimeException.class) + .assertErrorMessage("test"); + } + + @Test + public void andThenMaybeCompleteEmpty() { + Completable.complete() + .andThen(Maybe.empty()) + .test() + .assertNoValues() + .assertNoErrors() + .assertComplete(); + } + + @Test + public void andThenMaybeError() { + Completable.error(new RuntimeException("bla")) + .andThen(Maybe.empty()) + .test() + .assertNotComplete() + .assertNoValues() + .assertError(RuntimeException.class) + .assertErrorMessage("bla"); + } +}