Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -2701,6 +2701,32 @@ public final <R> Single<R> flatMapSingle(final Function<? super T, ? extends Sin
return RxJavaPlugins.onAssembly(new MaybeFlatMapSingle<T, R>(this, mapper));
}

/**
* Returns a {@link Maybe} based on applying a specified function to the item emitted by the
* source {@link Maybe}, where that function returns a {@link Single}.
* When this Maybe just completes the resulting {@code Maybe} completes as well.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.flatMapSingle.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapSingleElement} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the result value type
* @param mapper
* a function that, when applied to the item emitted by the source Maybe, returns a
* Single
* @return the new Maybe instance
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @since 2.0.2 - experimental
*/
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public final <R> Maybe<R> flatMapSingleElement(final Function<? super T, ? extends SingleSource<? extends R>> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new MaybeFlatMapSingleElement<T, R>(this, mapper));
}

/**
* Returns a {@link Completable} that completes based on applying a specified function to the item emitted by the
* source {@link Maybe}, where that function returns a {@link Completable}.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.maybe;

import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.*;
import io.reactivex.annotations.Experimental;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Function;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.functions.ObjectHelper;

/**
* Maps the success value of the source MaybeSource into a Single.
* @param <T> the input value type
* @param <R> the result value type
*
* @since 2.0.2 - experimental
*/
@Experimental
public final class MaybeFlatMapSingleElement<T, R> extends Maybe<R> {

final MaybeSource<T> source;

final Function<? super T, ? extends SingleSource<? extends R>> mapper;

public MaybeFlatMapSingleElement(MaybeSource<T> source, Function<? super T, ? extends SingleSource<? extends R>> mapper) {
this.source = source;
this.mapper = mapper;
}

@Override
protected void subscribeActual(MaybeObserver<? super R> actual) {
source.subscribe(new FlatMapMaybeObserver<T, R>(actual, mapper));
}

static final class FlatMapMaybeObserver<T, R>
extends AtomicReference<Disposable>
implements MaybeObserver<T>, Disposable {

private static final long serialVersionUID = 4827726964688405508L;

final MaybeObserver<? super R> actual;

final Function<? super T, ? extends SingleSource<? extends R>> mapper;

FlatMapMaybeObserver(MaybeObserver<? super R> actual, Function<? super T, ? extends SingleSource<? extends R>> mapper) {
this.actual = actual;
this.mapper = mapper;
}

@Override
public void dispose() {
DisposableHelper.dispose(this);
}

@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}

@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(this, d)) {
actual.onSubscribe(this);
}
}

@Override
public void onSuccess(T value) {
SingleSource<? extends R> ss;

try {
ss = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper returned a null SingleSource");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
onError(ex);
return;
}

ss.subscribe(new FlatMapSingleObserver<R>(this, actual));
}

@Override
public void onError(Throwable e) {
actual.onError(e);
}

@Override
public void onComplete() {
actual.onComplete();
}
}

static final class FlatMapSingleObserver<R> implements SingleObserver<R> {

final AtomicReference<Disposable> parent;

final MaybeObserver<? super R> actual;

FlatMapSingleObserver(AtomicReference<Disposable> parent, MaybeObserver<? super R> actual) {
this.parent = parent;
this.actual = actual;
}

@Override
public void onSubscribe(final Disposable d) {
DisposableHelper.replace(parent, d);
}

@Override
public void onSuccess(final R value) {
actual.onSuccess(value);
}

@Override
public void onError(final Throwable e) {
actual.onError(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.maybe;

import org.junit.Test;

import io.reactivex.*;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.Function;

public class MaybeFlatMapSingleElementTest {
@Test(expected = NullPointerException.class)
public void flatMapSingleElementNull() {
Maybe.just(1)
.flatMapSingleElement(null);
}

@Test
public void flatMapSingleElementValue() {
Maybe.just(1).flatMapSingleElement(new Function<Integer, SingleSource<Integer>>() {
@Override public SingleSource<Integer> apply(final Integer integer) throws Exception {
if (integer == 1) {
return Single.just(2);
}

return Single.just(1);
}
})
.test()
.assertResult(2);
}

@Test
public void flatMapSingleElementValueDifferentType() {
Maybe.just(1).flatMapSingleElement(new Function<Integer, SingleSource<String>>() {
@Override public SingleSource<String> apply(final Integer integer) throws Exception {
if (integer == 1) {
return Single.just("2");
}

return Single.just("1");
}
})
.test()
.assertResult("2");
}

@Test
public void flatMapSingleElementValueNull() {
Maybe.just(1).flatMapSingleElement(new Function<Integer, SingleSource<Integer>>() {
@Override public SingleSource<Integer> apply(final Integer integer) throws Exception {
return null;
}
})
.test()
.assertNoValues()
.assertError(NullPointerException.class)
.assertErrorMessage("The mapper returned a null SingleSource");
}

@Test
public void flatMapSingleElementValueErrorThrown() {
Maybe.just(1).flatMapSingleElement(new Function<Integer, SingleSource<Integer>>() {
@Override public SingleSource<Integer> apply(final Integer integer) throws Exception {
throw new RuntimeException("something went terribly wrong!");
}
})
.test()
.assertNoValues()
.assertError(RuntimeException.class)
.assertErrorMessage("something went terribly wrong!");
}

@Test
public void flatMapSingleElementError() {
RuntimeException exception = new RuntimeException("test");

Maybe.error(exception).flatMapSingleElement(new Function<Object, SingleSource<Object>>() {
@Override public SingleSource<Object> apply(final Object integer) throws Exception {
return Single.just(new Object());
}
})
.test()
.assertError(exception);
}

@Test
public void flatMapSingleElementEmpty() {
Maybe.<Integer>empty().flatMapSingleElement(new Function<Integer, SingleSource<Integer>>() {
@Override public SingleSource<Integer> apply(final Integer integer) throws Exception {
return Single.just(2);
}
})
.test()
.assertNoValues()
.assertResult();
}

@Test
public void dispose() {
TestHelper.checkDisposed(Maybe.just(1).flatMapSingleElement(new Function<Integer, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(final Integer integer) throws Exception {
return Single.just(2);
}
}));
}

@Test
public void doubleOnSubscribe() {
TestHelper.checkDoubleOnSubscribeMaybe(new Function<Maybe<Integer>, Maybe<Integer>>() {
@Override
public Maybe<Integer> apply(Maybe<Integer> m) throws Exception {
return m.flatMapSingleElement(new Function<Integer, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(final Integer integer) throws Exception {
return Single.just(2);
}
});
}
});
}

@Test
public void singleErrors() {
Maybe.just(1)
.flatMapSingleElement(new Function<Integer, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(final Integer integer) throws Exception {
return Single.error(new TestException());
}
})
.test()
.assertFailure(TestException.class);
}
}