diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java index ffa77e5360..a620028844 100644 --- a/src/main/java/io/reactivex/Completable.java +++ b/src/main/java/io/reactivex/Completable.java @@ -46,7 +46,8 @@ public abstract class Completable implements CompletableSource { *
Scheduler:
*
{@code ambArray} does not operate by default on a particular {@link Scheduler}.
* - * @param sources the array of source Completables + * @param sources the array of source Completables. A subscription to each source will + * occur in the same order as in this array. * @return the new Completable instance * @throws NullPointerException if sources is null */ @@ -71,7 +72,8 @@ public static Completable ambArray(final CompletableSource... sources) { *
Scheduler:
*
{@code amb} does not operate by default on a particular {@link Scheduler}.
* - * @param sources the array of source Completables + * @param sources the array of source Completables. A subscription to each source will + * occur in the same order as in this Iterable. * @return the new Completable instance * @throws NullPointerException if sources is null */ @@ -776,7 +778,8 @@ public static Completable wrap(CompletableSource source) { *
Scheduler:
*
{@code ambWith} does not operate by default on a particular {@link Scheduler}.
* - * @param other the other Completable, not null + * @param other the other Completable, not null. A subscription to this provided source will occur after subscribing + * to the current source. * @return the new Completable instance * @throws NullPointerException if other is null */ diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 5d7c336005..63648ac9aa 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -78,7 +78,8 @@ public abstract class Flowable implements Publisher { * * @param the common element type * @param sources - * an Iterable of Publishers sources competing to react first + * an Iterable of Publishers sources competing to react first. A subscription to each Publisher will + * occur in the same order as in this Iterable. * @return a Flowable that emits the same sequence as whichever of the source Publishers first * emitted an item or sent a termination notification * @see ReactiveX operators documentation: Amb @@ -106,7 +107,8 @@ public static Flowable amb(Iterable> sou * * @param the common element type * @param sources - * an array of Publisher sources competing to react first + * an array of Publisher sources competing to react first. A subscription to each Publisher will + * occur in the same order as in this Iterable. * @return a Flowable that emits the same sequence as whichever of the source Publishers first * emitted an item or sent a termination notification * @see ReactiveX operators documentation: Amb @@ -5131,7 +5133,8 @@ public final Single all(Predicate predicate) { * * * @param other - * a Publisher competing to react first + * a Publisher competing to react first. A subscription to this provided Publisher will occur after subscribing + * to the current Publisher. * @return a Flowable that emits the same sequence as whichever of the source Publishers first * emitted an item or sent a termination notification * @see ReactiveX operators documentation: Amb diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index 7fef4bf868..a538ef5550 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -54,7 +54,8 @@ public abstract class Maybe implements MaybeSource { *
{@code amb} does not operate by default on a particular {@link Scheduler}.
* * @param the value type - * @param sources the Iterable sequence of sources + * @param sources the Iterable sequence of sources. A subscription to each source will + * occur in the same order as in the Iterable. * @return the new Maybe instance */ @CheckReturnValue @@ -72,7 +73,8 @@ public static Maybe amb(final Iterable *
{@code ambArray} does not operate by default on a particular {@link Scheduler}.
* * @param the value type - * @param sources the array of sources + * @param sources the array of sources. A subscription to each source will + * occur in the same order as in the array. * @return the new Maybe instance */ @CheckReturnValue @@ -1966,7 +1968,8 @@ public static Maybe zipArray(Function z * * * @param other - * a MaybeSource competing to react first + * a MaybeSource competing to react first. A subscription to this provided source will occur after + * subscribing to the current source. * @return a Maybe that emits the same sequence as whichever of the source MaybeSources first * signalled * @see ReactiveX operators documentation: Amb diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 059612d444..6c8c6dadd5 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -69,7 +69,8 @@ public abstract class Observable implements ObservableSource { * * @param the common element type * @param sources - * an Iterable of ObservableSources sources competing to react first + * an Iterable of ObservableSource sources competing to react first. A subscription to each source will + * occur in the same order as in the Iterable. * @return an Observable that emits the same sequence as whichever of the source ObservableSources first * emitted an item or sent a termination notification * @see ReactiveX operators documentation: Amb @@ -93,7 +94,8 @@ public static Observable amb(Iterable the common element type * @param sources - * an array of ObservableSource sources competing to react first + * an array of ObservableSource sources competing to react first. A subscription to each source will + * occur in the same order as in the array. * @return an Observable that emits the same sequence as whichever of the source ObservableSources first * emitted an item or sent a termination notification * @see ReactiveX operators documentation: Amb @@ -4547,7 +4549,8 @@ public final Single all(Predicate predicate) { * * * @param other - * an ObservableSource competing to react first + * an ObservableSource competing to react first. A subscription to this provided source will occur after + * subscribing to the current source. * @return an Observable that emits the same sequence as whichever of the source ObservableSources first * emitted an item or sent a termination notification * @see ReactiveX operators documentation: Amb diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index d8d8ce0406..a15fb2a5b7 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -67,7 +67,8 @@ public abstract class Single implements SingleSource { *
{@code amb} does not operate by default on a particular {@link Scheduler}.
* * @param the value type - * @param sources the Iterable sequence of sources + * @param sources the Iterable sequence of sources. A subscription to each source will + * occur in the same order as in this Iterable. * @return the new Single instance * @since 2.0 */ @@ -86,7 +87,8 @@ public static Single amb(final Iterable{@code ambArray} does not operate by default on a particular {@link Scheduler}. * * @param the value type - * @param sources the array of sources + * @param sources the array of sources. A subscription to each source will + * occur in the same order as in this array. * @return the new Single instance * @since 2.0 */ @@ -1493,7 +1495,8 @@ public static Single zipArray(Function *
{@code ambWith} does not operate by default on a particular {@link Scheduler}.
* * @param other the other SingleSource to race for the first emission of success or error - * @return the new Single instance + * @return the new Single instance. A subscription to this provided source will occur after subscribing + * to the current source. * @since 2.0 */ @CheckReturnValue diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableAmbTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableAmbTest.java index 3cfb1c89b7..b7ea9efc64 100644 --- a/src/test/java/io/reactivex/internal/operators/completable/CompletableAmbTest.java +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableAmbTest.java @@ -145,4 +145,22 @@ public void run() { } } + @Test + public void ambWithOrder() { + Completable error = Completable.error(new RuntimeException()); + Completable.complete().ambWith(error).test().assertComplete(); + } + + @Test + public void ambIterableOrder() { + Completable error = Completable.error(new RuntimeException()); + Completable.amb(Arrays.asList(Completable.complete(), error)).test().assertComplete(); + } + + @Test + public void ambArrayOrder() { + Completable error = Completable.error(new RuntimeException()); + Completable.ambArray(Completable.complete(), error).test().assertComplete(); + } + } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableAmbTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableAmbTest.java index d743a3a9d0..b4dfc11446 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableAmbTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableAmbTest.java @@ -697,4 +697,24 @@ public Flowable apply(Integer v) throws Exception { .test() .assertFailureAndMessage(TestException.class, "next()"); } + + @Test + public void ambWithOrder() { + Flowable error = Flowable.error(new RuntimeException()); + Flowable.just(1).ambWith(error).test().assertValue(1).assertComplete(); + } + + @SuppressWarnings("unchecked") + @Test + public void ambIterableOrder() { + Flowable error = Flowable.error(new RuntimeException()); + Flowable.amb(Arrays.asList(Flowable.just(1), error)).test().assertValue(1).assertComplete(); + } + + @SuppressWarnings("unchecked") + @Test + public void ambArrayOrder() { + Flowable error = Flowable.error(new RuntimeException()); + Flowable.ambArray(Flowable.just(1), error).test().assertValue(1).assertComplete(); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableAmbTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableAmbTest.java index 3179cff4df..5018db3c26 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableAmbTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableAmbTest.java @@ -363,4 +363,24 @@ public void run() { } } } + + @Test + public void ambWithOrder() { + Observable error = Observable.error(new RuntimeException()); + Observable.just(1).ambWith(error).test().assertValue(1).assertComplete(); + } + + @SuppressWarnings("unchecked") + @Test + public void ambIterableOrder() { + Observable error = Observable.error(new RuntimeException()); + Observable.amb(Arrays.asList(Observable.just(1), error)).test().assertValue(1).assertComplete(); + } + + @SuppressWarnings("unchecked") + @Test + public void ambArrayOrder() { + Observable error = Observable.error(new RuntimeException()); + Observable.ambArray(Observable.just(1), error).test().assertValue(1).assertComplete(); + } } diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleAmbTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleAmbTest.java index 7c25a54fda..41aa56255b 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleAmbTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleAmbTest.java @@ -261,4 +261,24 @@ public void manySources() { .test() .assertResult(31); } + + @Test + public void ambWithOrder() { + Single error = Single.error(new RuntimeException()); + Single.just(1).ambWith(error).test().assertValue(1); + } + + @SuppressWarnings("unchecked") + @Test + public void ambIterableOrder() { + Single error = Single.error(new RuntimeException()); + Single.amb(Arrays.asList(Single.just(1), error)).test().assertValue(1); + } + + @SuppressWarnings("unchecked") + @Test + public void ambArrayOrder() { + Single error = Single.error(new RuntimeException()); + Single.ambArray(Single.just(1), error).test().assertValue(1); + } } diff --git a/src/test/java/io/reactivex/maybe/MaybeTest.java b/src/test/java/io/reactivex/maybe/MaybeTest.java index cba1ba4dce..e5e1a1b644 100644 --- a/src/test/java/io/reactivex/maybe/MaybeTest.java +++ b/src/test/java/io/reactivex/maybe/MaybeTest.java @@ -1580,6 +1580,26 @@ public void ambArrayOne() { assertSame(Maybe.never(), Maybe.ambArray(Maybe.never())); } + @Test + public void ambWithOrder() { + Maybe error = Maybe.error(new RuntimeException()); + Maybe.just(1).ambWith(error).test().assertValue(1); + } + + @SuppressWarnings("unchecked") + @Test + public void ambIterableOrder() { + Maybe error = Maybe.error(new RuntimeException()); + Maybe.amb(Arrays.asList(Maybe.just(1), error)).test().assertValue(1); + } + + @SuppressWarnings("unchecked") + @Test + public void ambArrayOrder() { + Maybe error = Maybe.error(new RuntimeException()); + Maybe.ambArray(Maybe.just(1), error).test().assertValue(1); + } + @SuppressWarnings("unchecked") @Test public void ambArray1SignalsSuccess() {