diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index 6e7da40411..95e3d5b295 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -13,6 +13,7 @@ package io.reactivex; +import java.util.NoSuchElementException; import java.util.concurrent.*; import org.reactivestreams.Publisher; @@ -1035,8 +1036,9 @@ public static Single wrap(SingleSource source) { @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public static Single zip(final Iterable> sources, Function zipper) { + ObjectHelper.requireNonNull(zipper, "zipper is null"); ObjectHelper.requireNonNull(sources, "sources is null"); - return toSingle(Flowable.zipIterable(SingleInternalHelper.iterableToFlowable(sources), zipper, false, 1)); + return RxJavaPlugins.onAssembly(new SingleZipIterable(sources, zipper)); } /** @@ -1475,17 +1477,13 @@ public static Single zip( */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) - @SuppressWarnings({"rawtypes", "unchecked"}) public static Single zipArray(Function zipper, SingleSource... sources) { + ObjectHelper.requireNonNull(zipper, "zipper is null"); ObjectHelper.requireNonNull(sources, "sources is null"); - Publisher[] sourcePublishers = new Publisher[sources.length]; - int i = 0; - for (SingleSource s : sources) { - ObjectHelper.requireNonNull(s, "The " + i + "th source is null"); - sourcePublishers[i] = RxJavaPlugins.onAssembly(new SingleToFlowable(s)); - i++; + if (sources.length == 0) { + return error(new NoSuchElementException()); } - return toSingle(Flowable.zipArray(zipper, false, 1, sourcePublishers)); + return RxJavaPlugins.onAssembly(new SingleZipArray(sources, zipper)); } /** diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipArray.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipArray.java index 81de82bd1f..5c1c852ee7 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipArray.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipArray.java @@ -59,7 +59,13 @@ public R apply(T t) throws Exception { return; } - sources[i].subscribe(parent.observers[i]); + MaybeSource source = sources[i]; + + if (source == null) { + parent.innerError(new NullPointerException("One of the sources is null"), i); + return; + } + source.subscribe(parent.observers[i]); } } diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipIterable.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipIterable.java index 846c0d6a3e..5ee11925cc 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipIterable.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipIterable.java @@ -40,6 +40,10 @@ protected void subscribeActual(MaybeObserver observer) { try { for (MaybeSource source : sources) { + if (source == null) { + EmptyDisposable.error(new NullPointerException("One of the sources is null"), observer); + return; + } if (n == a.length) { a = Arrays.copyOf(a, n + (n >> 2)); } diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleMap.java b/src/main/java/io/reactivex/internal/operators/single/SingleMap.java index 71093ad848..cb3735c3ed 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleMap.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleMap.java @@ -30,30 +30,42 @@ public SingleMap(SingleSource source, Function t) { - source.subscribe(new SingleObserver() { - @Override - public void onSubscribe(Disposable d) { - t.onSubscribe(d); - } + source.subscribe(new MapSingleObserver(t, mapper)); + } - @Override - public void onSuccess(T value) { - R v; - try { - v = mapper.apply(value); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - onError(e); - return; - } - - t.onSuccess(v); - } + static final class MapSingleObserver implements SingleObserver { + + final SingleObserver t; + + final Function mapper; - @Override - public void onError(Throwable e) { - t.onError(e); + MapSingleObserver(SingleObserver t, Function mapper) { + this.t = t; + this.mapper = mapper; + } + + @Override + public void onSubscribe(Disposable d) { + t.onSubscribe(d); + } + + @Override + public void onSuccess(T value) { + R v; + try { + v = mapper.apply(value); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + onError(e); + return; } - }); + + t.onSuccess(v); + } + + @Override + public void onError(Throwable e) { + t.onError(e); + } } } diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleZipArray.java b/src/main/java/io/reactivex/internal/operators/single/SingleZipArray.java new file mode 100644 index 0000000000..2c02210637 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleZipArray.java @@ -0,0 +1,185 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import java.util.concurrent.atomic.*; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.plugins.RxJavaPlugins; + +public final class SingleZipArray extends Single { + + final SingleSource[] sources; + + final Function zipper; + + public SingleZipArray(SingleSource[] sources, Function zipper) { + this.sources = sources; + this.zipper = zipper; + } + + @Override + protected void subscribeActual(SingleObserver observer) { + SingleSource[] sources = this.sources; + int n = sources.length; + + + if (n == 1) { + sources[0].subscribe(new SingleMap.MapSingleObserver(observer, new Function() { + @Override + public R apply(T t) throws Exception { + return zipper.apply(new Object[] { t }); + } + })); + return; + } + + ZipCoordinator parent = new ZipCoordinator(observer, n, zipper); + + observer.onSubscribe(parent); + + for (int i = 0; i < n; i++) { + if (parent.isDisposed()) { + return; + } + + SingleSource source = sources[i]; + + if (source == null) { + parent.innerError(new NullPointerException("One of the sources is null"), i); + return; + } + + source.subscribe(parent.observers[i]); + } + } + + static final class ZipCoordinator extends AtomicInteger implements Disposable { + + + private static final long serialVersionUID = -5556924161382950569L; + + final SingleObserver actual; + + final Function zipper; + + final ZipSingleObserver[] observers; + + final Object[] values; + + @SuppressWarnings("unchecked") + ZipCoordinator(SingleObserver observer, int n, Function zipper) { + super(n); + this.actual = observer; + this.zipper = zipper; + ZipSingleObserver[] o = new ZipSingleObserver[n]; + for (int i = 0; i < n; i++) { + o[i] = new ZipSingleObserver(this, i); + } + this.observers = o; + this.values = new Object[n]; + } + + @Override + public boolean isDisposed() { + return get() <= 0; + } + + @Override + public void dispose() { + if (getAndSet(0) > 0) { + for (ZipSingleObserver d : observers) { + d.dispose(); + } + } + } + + void innerSuccess(T value, int index) { + values[index] = value; + if (decrementAndGet() == 0) { + R v; + + try { + v = ObjectHelper.requireNonNull(zipper.apply(values), "The zipper returned a null value"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + actual.onError(ex); + return; + } + + actual.onSuccess(v); + } + } + + void disposeExcept(int index) { + ZipSingleObserver[] observers = this.observers; + int n = observers.length; + for (int i = 0; i < index; i++) { + observers[i].dispose(); + } + for (int i = index + 1; i < n; i++) { + observers[i].dispose(); + } + } + + void innerError(Throwable ex, int index) { + if (getAndSet(0) > 0) { + disposeExcept(index); + actual.onError(ex); + } else { + RxJavaPlugins.onError(ex); + } + } + } + + static final class ZipSingleObserver + extends AtomicReference + implements SingleObserver { + + private static final long serialVersionUID = 3323743579927613702L; + + final ZipCoordinator parent; + + final int index; + + ZipSingleObserver(ZipCoordinator parent, int index) { + this.parent = parent; + this.index = index; + } + + public void dispose() { + DisposableHelper.dispose(this); + } + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.setOnce(this, d); + } + + @Override + public void onSuccess(T value) { + parent.innerSuccess(value, index); + } + + @Override + public void onError(Throwable e) { + parent.innerError(e, index); + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleZipIterable.java b/src/main/java/io/reactivex/internal/operators/single/SingleZipIterable.java new file mode 100644 index 0000000000..3ab5f7ec3f --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleZipIterable.java @@ -0,0 +1,86 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import java.util.*; + +import io.reactivex.*; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; +import io.reactivex.internal.disposables.EmptyDisposable; +import io.reactivex.internal.operators.single.SingleZipArray.ZipCoordinator; + +public final class SingleZipIterable extends Single { + + final Iterable> sources; + + final Function zipper; + + public SingleZipIterable(Iterable> sources, Function zipper) { + this.sources = sources; + this.zipper = zipper; + } + + @Override + protected void subscribeActual(SingleObserver observer) { + @SuppressWarnings("unchecked") + SingleSource[] a = new SingleSource[8]; + int n = 0; + + try { + for (SingleSource source : sources) { + if (source == null) { + EmptyDisposable.error(new NullPointerException("One of the sources is null"), observer); + return; + } + if (n == a.length) { + a = Arrays.copyOf(a, n + (n >> 2)); + } + a[n++] = source; + } + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + EmptyDisposable.error(ex, observer); + return; + } + + if (n == 0) { + EmptyDisposable.error(new NoSuchElementException(), observer); + return; + } + + if (n == 1) { + a[0].subscribe(new SingleMap.MapSingleObserver(observer, new Function() { + @Override + public R apply(T t) throws Exception { + return zipper.apply(new Object[] { t }); + } + })); + return; + } + + ZipCoordinator parent = new ZipCoordinator(observer, n, zipper); + + observer.onSubscribe(parent); + + for (int i = 0; i < n; i++) { + if (parent.isDisposed()) { + return; + } + + a[i].subscribe(parent.observers[i]); + } + } + +} diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeZipArrayTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeZipArrayTest.java index f48b2bd7e1..29c4c0cd67 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeZipArrayTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeZipArrayTest.java @@ -15,7 +15,7 @@ import static org.junit.Assert.*; -import java.util.List; +import java.util.*; import org.junit.Test; @@ -150,4 +150,15 @@ public void run() { } } } + @SuppressWarnings("unchecked") + @Test(expected = NullPointerException.class) + public void zipArrayOneIsNull() { + Maybe.zipArray(new Function() { + @Override + public Object apply(Object[] v) { + return 1; + } + }, Maybe.just(1), null) + .blockingGet(); + } } diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeZipIterableTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeZipIterableTest.java index ad5f746ea9..ca5b15501c 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeZipIterableTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeZipIterableTest.java @@ -188,4 +188,28 @@ public Maybe apply(Integer v) throws Exception { .test() .assertFailureAndMessage(TestException.class, "next()"); } + + @SuppressWarnings("unchecked") + @Test(expected = NullPointerException.class) + public void zipIterableOneIsNull() { + Maybe.zip(Arrays.asList(null, Maybe.just(1)), new Function() { + @Override + public Object apply(Object[] v) { + return 1; + } + }) + .blockingGet(); + } + + @SuppressWarnings("unchecked") + @Test(expected = NullPointerException.class) + public void zipIterableTwoIsNull() { + Maybe.zip(Arrays.asList(Maybe.just(1), null), new Function() { + @Override + public Object apply(Object[] v) { + return 1; + } + }) + .blockingGet(); + } } diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleZipArrayTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleZipArrayTest.java new file mode 100644 index 0000000000..fe3e7e7f77 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/single/SingleZipArrayTest.java @@ -0,0 +1,190 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import static org.junit.Assert.*; + +import java.util.*; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.*; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.processors.PublishProcessor; +import io.reactivex.schedulers.Schedulers; + +public class SingleZipArrayTest { + + final BiFunction addString = new BiFunction() { + @Override + public Object apply(Object a, Object b) throws Exception { + return "" + a + b; + } + }; + + + final Function3 addString3 = new Function3() { + @Override + public Object apply(Object a, Object b, Object c) throws Exception { + return "" + a + b + c; + } + }; + + @Test + public void firstError() { + Single.zip(Single.error(new TestException()), Single.just(1), addString) + .test() + .assertFailure(TestException.class); + } + + @Test + public void secondError() { + Single.zip(Single.just(1), Single.error(new TestException()), addString) + .test() + .assertFailure(TestException.class); + } + + @Test + public void dispose() { + PublishProcessor pp = PublishProcessor.create(); + + TestObserver to = Single.zip(pp.single(0), pp.single(0), addString) + .test(); + + assertTrue(pp.hasSubscribers()); + + to.cancel(); + + assertFalse(pp.hasSubscribers()); + } + + @Test + public void zipperThrows() { + Single.zip(Single.just(1), Single.just(2), new BiFunction() { + @Override + public Object apply(Integer a, Integer b) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void zipperReturnsNull() { + Single.zip(Single.just(1), Single.just(2), new BiFunction() { + @Override + public Object apply(Integer a, Integer b) throws Exception { + return null; + } + }) + .test() + .assertFailure(NullPointerException.class); + } + + @Test + public void middleError() { + PublishProcessor pp0 = PublishProcessor.create(); + PublishProcessor pp1 = PublishProcessor.create(); + + TestObserver to = Single.zip(pp0.single(0), pp1.single(0), pp0.single(0), addString3) + .test(); + + pp1.onError(new TestException()); + + assertFalse(pp0.hasSubscribers()); + + to.assertFailure(TestException.class); + } + + @Test + public void innerErrorRace() { + for (int i = 0; i < 500; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final PublishProcessor pp0 = PublishProcessor.create(); + final PublishProcessor pp1 = PublishProcessor.create(); + + final TestObserver to = Single.zip(pp0.single(0), pp1.single(0), addString) + .test(); + + final TestException ex = new TestException(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp0.onError(ex); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + pp1.onError(ex); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + + to.assertFailure(TestException.class); + + if (!errors.isEmpty()) { + TestHelper.assertError(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + @SuppressWarnings("unchecked") + @Test(expected = NullPointerException.class) + public void zipArrayOneIsNull() { + Single.zipArray(new Function() { + @Override + public Object apply(Object[] v) { + return 1; + } + }, Single.just(1), null) + .blockingGet(); + } + + @SuppressWarnings("unchecked") + @Test + public void emptyArray() { + Single.zipArray(new Function() { + @Override + public Object[] apply(Object[] a) throws Exception { + return a; + } + }, new SingleSource[0]) + .test() + .assertFailure(NoSuchElementException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void oneArray() { + Single.zipArray(new Function() { + @Override + public Object apply(Object[] a) throws Exception { + return (Integer)a[0] + 1; + } + }, Single.just(1)) + .test() + .assertResult(2); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleZipIterableTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleZipIterableTest.java new file mode 100644 index 0000000000..5b8f00d1b0 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/single/SingleZipIterableTest.java @@ -0,0 +1,239 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import static org.junit.Assert.*; + +import java.util.*; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Function; +import io.reactivex.internal.util.CrashingMappedIterable; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.processors.PublishProcessor; +import io.reactivex.schedulers.Schedulers; + +public class SingleZipIterableTest { + + final Function addString = new Function() { + @Override + public Object apply(Object[] a) throws Exception { + return Arrays.toString(a); + } + }; + + @SuppressWarnings("unchecked") + @Test + public void firstError() { + Single.zip(Arrays.asList(Single.error(new TestException()), Single.just(1)), addString) + .test() + .assertFailure(TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void secondError() { + Single.zip(Arrays.asList(Single.just(1), Single.error(new TestException())), addString) + .test() + .assertFailure(TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void dispose() { + PublishProcessor pp = PublishProcessor.create(); + + TestObserver to = Single.zip(Arrays.asList(pp.single(0), pp.single(0)), addString) + .test(); + + assertTrue(pp.hasSubscribers()); + + to.cancel(); + + assertFalse(pp.hasSubscribers()); + } + + @SuppressWarnings("unchecked") + @Test + public void zipperThrows() { + Single.zip(Arrays.asList(Single.just(1), Single.just(2)), new Function() { + @Override + public Object apply(Object[] b) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void zipperReturnsNull() { + Single.zip(Arrays.asList(Single.just(1), Single.just(2)), new Function() { + @Override + public Object apply(Object[] a) throws Exception { + return null; + } + }) + .test() + .assertFailure(NullPointerException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void middleError() { + PublishProcessor pp0 = PublishProcessor.create(); + PublishProcessor pp1 = PublishProcessor.create(); + + TestObserver to = Single.zip( + Arrays.asList(pp0.single(0), pp1.single(0), pp0.single(0)), addString) + .test(); + + pp1.onError(new TestException()); + + assertFalse(pp0.hasSubscribers()); + + to.assertFailure(TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void innerErrorRace() { + for (int i = 0; i < 500; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final PublishProcessor pp0 = PublishProcessor.create(); + final PublishProcessor pp1 = PublishProcessor.create(); + + final TestObserver to = Single.zip( + Arrays.asList(pp0.single(0), pp1.single(0)), addString) + .test(); + + final TestException ex = new TestException(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp0.onError(ex); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + pp1.onError(ex); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + + to.assertFailure(TestException.class); + + if (!errors.isEmpty()) { + TestHelper.assertError(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void iteratorThrows() { + Single.zip(new CrashingMappedIterable>(1, 100, 100, new Function>() { + @Override + public Single apply(Integer v) throws Exception { + return Single.just(v); + } + }), addString) + .test() + .assertFailureAndMessage(TestException.class, "iterator()"); + } + + @Test + public void hasNextThrows() { + Single.zip(new CrashingMappedIterable>(100, 20, 100, new Function>() { + @Override + public Single apply(Integer v) throws Exception { + return Single.just(v); + } + }), addString) + .test() + .assertFailureAndMessage(TestException.class, "hasNext()"); + } + + @Test + public void nextThrows() { + Single.zip(new CrashingMappedIterable>(100, 100, 5, new Function>() { + @Override + public Single apply(Integer v) throws Exception { + return Single.just(v); + } + }), addString) + .test() + .assertFailureAndMessage(TestException.class, "next()"); + } + + @SuppressWarnings("unchecked") + @Test(expected = NullPointerException.class) + public void zipIterableOneIsNull() { + Single.zip(Arrays.asList(null, Single.just(1)), new Function() { + @Override + public Object apply(Object[] v) { + return 1; + } + }) + .blockingGet(); + } + + @SuppressWarnings("unchecked") + @Test(expected = NullPointerException.class) + public void zipIterableTwoIsNull() { + Single.zip(Arrays.asList(Single.just(1), null), new Function() { + @Override + public Object apply(Object[] v) { + return 1; + } + }) + .blockingGet(); + } + + @Test + public void emptyIterable() { + Single.zip(Collections.>emptyList(), new Function() { + @Override + public Object[] apply(Object[] a) throws Exception { + return a; + } + }) + .test() + .assertFailure(NoSuchElementException.class); + } + + @Test + public void oneIterable() { + Single.zip(Collections.singleton(Single.just(1)), new Function() { + @Override + public Object apply(Object[] a) throws Exception { + return (Integer)a[0] + 1; + } + }) + .test() + .assertResult(2); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleZipTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleZipTest.java index 50aed05a70..e60e8f517e 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleZipTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleZipTest.java @@ -13,6 +13,10 @@ package io.reactivex.internal.operators.single; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.*; import org.junit.Test; import io.reactivex.Single; @@ -135,4 +139,50 @@ public Object apply(Integer a, Integer b, Integer c, Integer d, Integer e, Integ .assertResult("123456789"); } + @Test + public void noDisposeOnAllSuccess() { + final AtomicInteger counter = new AtomicInteger(); + + Single source = Single.just(1).doOnDispose(new Action() { + @Override + public void run() throws Exception { + counter.getAndIncrement(); + } + }); + + Single.zip(source, source, new BiFunction() { + @Override + public Integer apply(Integer a, Integer b) throws Exception { + return a + b; + } + }) + .test() + .assertResult(2); + + assertEquals(0, counter.get()); + } + + @SuppressWarnings("unchecked") + @Test + public void noDisposeOnAllSuccess2() { + final AtomicInteger counter = new AtomicInteger(); + + Single source = Single.just(1).doOnDispose(new Action() { + @Override + public void run() throws Exception { + counter.getAndIncrement(); + } + }); + + Single.zip(Arrays.asList(source, source), new Function() { + @Override + public Integer apply(Object[] o) throws Exception { + return (Integer)o[0] + (Integer)o[1]; + } + }) + .test() + .assertResult(2); + + assertEquals(0, counter.get()); + } } diff --git a/src/test/java/io/reactivex/single/SingleNullTests.java b/src/test/java/io/reactivex/single/SingleNullTests.java index 903907eb58..405054243c 100644 --- a/src/test/java/io/reactivex/single/SingleNullTests.java +++ b/src/test/java/io/reactivex/single/SingleNullTests.java @@ -530,6 +530,18 @@ public Object apply(Object[] v) { }, (Single[])null); } + @SuppressWarnings("unchecked") + @Test(expected = NullPointerException.class) + public void zipIterableTwoIsNull() { + Single.zip(Arrays.asList(just1, null), new Function() { + @Override + public Object apply(Object[] v) { + return 1; + } + }) + .blockingGet(); + } + @SuppressWarnings("unchecked") @Test(expected = NullPointerException.class) public void zipArrayOneIsNull() { @@ -538,7 +550,8 @@ public void zipArrayOneIsNull() { public Object apply(Object[] v) { return 1; } - }, just1, null); + }, just1, null) + .blockingGet(); } @SuppressWarnings("unchecked")