diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index 5ad99eea94..fbb5bd0dca 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -2701,6 +2701,32 @@ public final Single flatMapSingle(final Function(this, mapper)); } + /** + * Returns a {@link Maybe} based on applying a specified function to the item emitted by the + * source {@link Maybe}, where that function returns a {@link Single}. + * When this Maybe just completes the resulting {@code Maybe} completes as well. + *

+ * + *

+ *
Scheduler:
+ *
{@code flatMapSingleElement} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the result value type + * @param mapper + * a function that, when applied to the item emitted by the source Maybe, returns a + * Single + * @return the new Maybe instance + * @see ReactiveX operators documentation: FlatMap + * @since 2.0.2 - experimental + */ + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Maybe flatMapSingleElement(final Function> mapper) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + return RxJavaPlugins.onAssembly(new MaybeFlatMapSingleElement(this, mapper)); + } + /** * Returns a {@link Completable} that completes based on applying a specified function to the item emitted by the * source {@link Maybe}, where that function returns a {@link Completable}. diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapSingleElement.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapSingleElement.java new file mode 100644 index 0000000000..d39fde2d73 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapSingleElement.java @@ -0,0 +1,134 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.*; +import io.reactivex.annotations.Experimental; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; + +/** + * Maps the success value of the source MaybeSource into a Single. + * @param the input value type + * @param the result value type + * + * @since 2.0.2 - experimental + */ +@Experimental +public final class MaybeFlatMapSingleElement extends Maybe { + + final MaybeSource source; + + final Function> mapper; + + public MaybeFlatMapSingleElement(MaybeSource source, Function> mapper) { + this.source = source; + this.mapper = mapper; + } + + @Override + protected void subscribeActual(MaybeObserver actual) { + source.subscribe(new FlatMapMaybeObserver(actual, mapper)); + } + + static final class FlatMapMaybeObserver + extends AtomicReference + implements MaybeObserver, Disposable { + + private static final long serialVersionUID = 4827726964688405508L; + + final MaybeObserver actual; + + final Function> mapper; + + FlatMapMaybeObserver(MaybeObserver actual, Function> mapper) { + this.actual = actual; + this.mapper = mapper; + } + + @Override + public void dispose() { + DisposableHelper.dispose(this); + } + + @Override + public boolean isDisposed() { + return DisposableHelper.isDisposed(get()); + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.setOnce(this, d)) { + actual.onSubscribe(this); + } + } + + @Override + public void onSuccess(T value) { + SingleSource ss; + + try { + ss = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper returned a null SingleSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + onError(ex); + return; + } + + ss.subscribe(new FlatMapSingleObserver(this, actual)); + } + + @Override + public void onError(Throwable e) { + actual.onError(e); + } + + @Override + public void onComplete() { + actual.onComplete(); + } + } + + static final class FlatMapSingleObserver implements SingleObserver { + + final AtomicReference parent; + + final MaybeObserver actual; + + FlatMapSingleObserver(AtomicReference parent, MaybeObserver actual) { + this.parent = parent; + this.actual = actual; + } + + @Override + public void onSubscribe(final Disposable d) { + DisposableHelper.replace(parent, d); + } + + @Override + public void onSuccess(final R value) { + actual.onSuccess(value); + } + + @Override + public void onError(final Throwable e) { + actual.onError(e); + } + } +} diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapSingleElementTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapSingleElementTest.java new file mode 100644 index 0000000000..f4318c52ca --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapSingleElementTest.java @@ -0,0 +1,147 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Function; + +public class MaybeFlatMapSingleElementTest { + @Test(expected = NullPointerException.class) + public void flatMapSingleElementNull() { + Maybe.just(1) + .flatMapSingleElement(null); + } + + @Test + public void flatMapSingleElementValue() { + Maybe.just(1).flatMapSingleElement(new Function>() { + @Override public SingleSource apply(final Integer integer) throws Exception { + if (integer == 1) { + return Single.just(2); + } + + return Single.just(1); + } + }) + .test() + .assertResult(2); + } + + @Test + public void flatMapSingleElementValueDifferentType() { + Maybe.just(1).flatMapSingleElement(new Function>() { + @Override public SingleSource apply(final Integer integer) throws Exception { + if (integer == 1) { + return Single.just("2"); + } + + return Single.just("1"); + } + }) + .test() + .assertResult("2"); + } + + @Test + public void flatMapSingleElementValueNull() { + Maybe.just(1).flatMapSingleElement(new Function>() { + @Override public SingleSource apply(final Integer integer) throws Exception { + return null; + } + }) + .test() + .assertNoValues() + .assertError(NullPointerException.class) + .assertErrorMessage("The mapper returned a null SingleSource"); + } + + @Test + public void flatMapSingleElementValueErrorThrown() { + Maybe.just(1).flatMapSingleElement(new Function>() { + @Override public SingleSource apply(final Integer integer) throws Exception { + throw new RuntimeException("something went terribly wrong!"); + } + }) + .test() + .assertNoValues() + .assertError(RuntimeException.class) + .assertErrorMessage("something went terribly wrong!"); + } + + @Test + public void flatMapSingleElementError() { + RuntimeException exception = new RuntimeException("test"); + + Maybe.error(exception).flatMapSingleElement(new Function>() { + @Override public SingleSource apply(final Object integer) throws Exception { + return Single.just(new Object()); + } + }) + .test() + .assertError(exception); + } + + @Test + public void flatMapSingleElementEmpty() { + Maybe.empty().flatMapSingleElement(new Function>() { + @Override public SingleSource apply(final Integer integer) throws Exception { + return Single.just(2); + } + }) + .test() + .assertNoValues() + .assertResult(); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(Maybe.just(1).flatMapSingleElement(new Function>() { + @Override + public SingleSource apply(final Integer integer) throws Exception { + return Single.just(2); + } + })); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeMaybe(new Function, Maybe>() { + @Override + public Maybe apply(Maybe m) throws Exception { + return m.flatMapSingleElement(new Function>() { + @Override + public SingleSource apply(final Integer integer) throws Exception { + return Single.just(2); + } + }); + } + }); + } + + @Test + public void singleErrors() { + Maybe.just(1) + .flatMapSingleElement(new Function>() { + @Override + public SingleSource apply(final Integer integer) throws Exception { + return Single.error(new TestException()); + } + }) + .test() + .assertFailure(TestException.class); + } +}