From 739aad88b525e5bc64799ae013472781dc2007a6 Mon Sep 17 00:00:00 2001 From: Niklas Baudy Date: Tue, 27 Sep 2016 16:10:22 +0200 Subject: [PATCH] 2.x: Add Single.flatMapMaybe --- src/main/java/io/reactivex/Single.java | 30 +++- .../operators/single/SingleFlatMapMaybe.java | 128 ++++++++++++++++++ .../single/SingleFlatMapMaybeTest.java | 97 +++++++++++++ 3 files changed, 251 insertions(+), 4 deletions(-) create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleFlatMapMaybe.java create mode 100644 src/test/java/io/reactivex/internal/operators/single/SingleFlatMapMaybeTest.java diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index 3fa7864d77..c20c155103 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -1817,7 +1817,7 @@ public final Maybe filter(Predicate predicate) { * @param the result value type * @param mapper * a function that, when applied to the item emitted by the source Single, returns a SingleSource - * @return the Single returned from {@code func} when applied to the item emitted by the source Single + * @return the Single returned from {@code mapper} when applied to the item emitted by the source Single * @see ReactiveX operators documentation: FlatMap */ @SchedulerSupport(SchedulerSupport.NONE) @@ -1826,6 +1826,28 @@ public final Single flatMap(Function(this, mapper)); } + /** + * Returns a Maybe that is based on applying a specified function to the item emitted by the source Single, + * where that function returns a MaybeSource. + *

+ * + *

+ *
Scheduler:
+ *
{@code flatMapMaybe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the result value type + * @param mapper + * a function that, when applied to the item emitted by the source Single, returns a MaybeSource + * @return the Maybe returned from {@code mapper} when applied to the item emitted by the source Single + * @see ReactiveX operators documentation: FlatMap + */ + @SchedulerSupport(SchedulerSupport.NONE) + public final Maybe flatMapMaybe(final Function> mapper) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + return RxJavaPlugins.onAssembly(new SingleFlatMapMaybe(this, mapper)); + } + /** * Returns a Flowable that emits items based on applying a specified function to the item emitted by the * source Single, where that function returns a Publisher. @@ -1853,7 +1875,7 @@ public final Flowable flatMapPublisher(Function * @@ -1864,8 +1886,8 @@ public final Flowable flatMapPublisher(Function the result value type * @param mapper - * a function that, when applied to the item emitted by the source Single, returns a SingleSource - * @return the Single returned from {@code func} when applied to the item emitted by the source Single + * a function that, when applied to the item emitted by the source Single, returns an ObservableSource + * @return the Observable returned from {@code func} when applied to the item emitted by the source Single * @see ReactiveX operators documentation: FlatMap */ @SchedulerSupport(SchedulerSupport.NONE) diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleFlatMapMaybe.java b/src/main/java/io/reactivex/internal/operators/single/SingleFlatMapMaybe.java new file mode 100644 index 0000000000..6cc1c54371 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleFlatMapMaybe.java @@ -0,0 +1,128 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import io.reactivex.Maybe; +import io.reactivex.MaybeObserver; +import io.reactivex.MaybeSource; +import io.reactivex.SingleObserver; +import io.reactivex.SingleSource; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import java.util.concurrent.atomic.AtomicReference; + +public final class SingleFlatMapMaybe extends Maybe { + + final SingleSource source; + + final Function> mapper; + + public SingleFlatMapMaybe(SingleSource source, Function> mapper) { + this.mapper = mapper; + this.source = source; + } + + @Override + protected void subscribeActual(MaybeObserver actual) { + source.subscribe(new FlatMapSingleObserver(actual, mapper)); + } + + static final class FlatMapSingleObserver + extends AtomicReference + implements SingleObserver, Disposable { + + private static final long serialVersionUID = -5843758257109742742L; + + final MaybeObserver actual; + + final Function> mapper; + + FlatMapSingleObserver(MaybeObserver actual, Function> mapper) { + this.actual = actual; + this.mapper = mapper; + } + + @Override + public void dispose() { + DisposableHelper.dispose(this); + } + + @Override + public boolean isDisposed() { + return DisposableHelper.isDisposed(get()); + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.setOnce(this, d)) { + actual.onSubscribe(this); + } + } + + @Override + public void onSuccess(T value) { + MaybeSource ms; + + try { + ms = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper returned a null MaybeSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + onError(ex); + return; + } + + ms.subscribe(new FlatMapMaybeObserver(this, actual)); + } + + @Override + public void onError(Throwable e) { + actual.onError(e); + } + } + + static final class FlatMapMaybeObserver implements MaybeObserver { + + final AtomicReference parent; + + final MaybeObserver actual; + + FlatMapMaybeObserver(AtomicReference parent, MaybeObserver actual) { + this.parent = parent; + this.actual = actual; + } + + @Override + public void onSubscribe(final Disposable d) { + DisposableHelper.replace(parent, d); + } + + @Override + public void onSuccess(final R value) { + actual.onSuccess(value); + } + + @Override + public void onError(final Throwable e) { + actual.onError(e); + } + + @Override + public void onComplete() { + actual.onComplete(); + } + } +} diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapMaybeTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapMaybeTest.java new file mode 100644 index 0000000000..4613d81968 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapMaybeTest.java @@ -0,0 +1,97 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import io.reactivex.Maybe; +import io.reactivex.MaybeSource; +import io.reactivex.Single; +import io.reactivex.functions.Function; +import org.junit.Test; + +public class SingleFlatMapMaybeTest { + @Test(expected = NullPointerException.class) + public void flatMapMaybeNull() { + Single.just(1) + .flatMapMaybe(null); + } + + @Test + public void flatMapMaybeValue() { + Single.just(1).flatMapMaybe(new Function>() { + @Override public MaybeSource apply(final Integer integer) throws Exception { + if (integer == 1) { + return Maybe.just(2); + } + + return Maybe.just(1); + } + }) + .test() + .assertResult(2); + } + + @Test + public void flatMapMaybeValueDifferentType() { + Single.just(1).flatMapMaybe(new Function>() { + @Override public MaybeSource apply(final Integer integer) throws Exception { + if (integer == 1) { + return Maybe.just("2"); + } + + return Maybe.just("1"); + } + }) + .test() + .assertResult("2"); + } + + @Test + public void flatMapMaybeValueNull() { + Single.just(1).flatMapMaybe(new Function>() { + @Override public MaybeSource apply(final Integer integer) throws Exception { + return null; + } + }) + .test() + .assertNoValues() + .assertError(NullPointerException.class) + .assertErrorMessage("The mapper returned a null MaybeSource"); + } + + @Test + public void flatMapMaybeValueErrorThrown() { + Single.just(1).flatMapMaybe(new Function>() { + @Override public MaybeSource apply(final Integer integer) throws Exception { + throw new RuntimeException("something went terribly wrong!"); + } + }) + .test() + .assertNoValues() + .assertError(RuntimeException.class) + .assertErrorMessage("something went terribly wrong!"); + } + + @Test + public void flatMapMaybeError() { + RuntimeException exception = new RuntimeException("test"); + + Single.error(exception).flatMapMaybe(new Function>() { + @Override public MaybeSource apply(final Object integer) throws Exception { + return Maybe.just(new Object()); + } + }) + .test() + .assertError(exception); + } +}