diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 27ba8c954d..0dad7ebcc9 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -140,6 +140,11 @@ public static int bufferSize() { * Combines a collection of source Publishers by emitting an item that aggregates the latest values of each of * the source Publishers each time an item is received from any of the source Publishers, where this * aggregation is defined by a specified function. + *

+ * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * *

*
Backpressure:
*
The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s @@ -163,7 +168,7 @@ public static int bufferSize() { */ @SchedulerSupport(SchedulerSupport.NONE) @BackpressureSupport(BackpressureKind.FULL) - public static Flowable combineLatest(Publisher[] sources, Function combiner) { + public static Flowable combineLatest(Publisher[] sources, Function combiner) { return combineLatest(sources, combiner, bufferSize()); } @@ -171,6 +176,11 @@ public static Flowable combineLatest(Publisher[] sources, * Combines a collection of source Publishers by emitting an item that aggregates the latest values of each of * the source Publishers each time an item is received from any of the source Publishers, where this * aggregation is defined by a specified function. + *

+ * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * *

*
Backpressure:
*
The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s @@ -194,7 +204,7 @@ public static Flowable combineLatest(Publisher[] sources, */ @SchedulerSupport(SchedulerSupport.NONE) @BackpressureSupport(BackpressureKind.FULL) - public static Flowable combineLatest(Function combiner, Publisher... sources) { + public static Flowable combineLatest(Function combiner, Publisher... sources) { return combineLatest(sources, combiner, bufferSize()); } @@ -202,6 +212,11 @@ public static Flowable combineLatest(Function + * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * *
*
Backpressure:
*
The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s @@ -227,7 +242,7 @@ public static Flowable combineLatest(Function Flowable combineLatest(Publisher[] sources, Function combiner, int bufferSize) { + public static Flowable combineLatest(Publisher[] sources, Function combiner, int bufferSize) { ObjectHelper.requireNonNull(sources, "sources is null"); if (sources.length == 0) { return empty(); @@ -241,6 +256,11 @@ public static Flowable combineLatest(Publisher[] sources, * Combines a collection of source Publishers by emitting an item that aggregates the latest values of each of * the source Publishers each time an item is received from any of the source Publishers, where this * aggregation is defined by a specified function. + *

+ * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * *

*
Backpressure:
*
The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s @@ -265,7 +285,7 @@ public static Flowable combineLatest(Publisher[] sources, @SchedulerSupport(SchedulerSupport.NONE) @BackpressureSupport(BackpressureKind.FULL) public static Flowable combineLatest(Iterable> sources, - Function combiner) { + Function combiner) { return combineLatest(sources, combiner, bufferSize()); } @@ -273,6 +293,11 @@ public static Flowable combineLatest(Iterable + * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * *
*
Backpressure:
*
The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s @@ -299,7 +324,7 @@ public static Flowable combineLatest(Iterable Flowable combineLatest(Iterable> sources, - Function combiner, int bufferSize) { + Function combiner, int bufferSize) { ObjectHelper.requireNonNull(sources, "sources is null"); ObjectHelper.requireNonNull(combiner, "combiner is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); @@ -310,6 +335,11 @@ public static Flowable combineLatest(Iterable + * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * *
*
Backpressure:
*
The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s @@ -334,7 +364,7 @@ public static Flowable combineLatest(Iterable Flowable combineLatestDelayError(Publisher[] sources, - Function combiner) { + Function combiner) { return combineLatestDelayError(sources, combiner, bufferSize()); } @@ -343,6 +373,10 @@ public static Flowable combineLatestDelayError(Publisher[ * the source Publishers each time an item is received from any of the source Publishers, where this * aggregation is defined by a specified function and delays any error from the sources until * all source Publishers terminate. + *

+ * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. * *

*
Backpressure:
@@ -367,7 +401,7 @@ public static Flowable combineLatestDelayError(Publisher[ */ @SchedulerSupport(SchedulerSupport.NONE) @BackpressureSupport(BackpressureKind.FULL) - public static Flowable combineLatestDelayError(Function combiner, + public static Flowable combineLatestDelayError(Function combiner, Publisher... sources) { return combineLatestDelayError(sources, combiner, bufferSize()); } @@ -377,6 +411,10 @@ public static Flowable combineLatestDelayError(Function + * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. * *
*
Scheduler:
@@ -398,7 +436,7 @@ public static Flowable combineLatestDelayError(FunctionReactiveX operators documentation: CombineLatest */ @SchedulerSupport(SchedulerSupport.NONE) - public static Flowable combineLatestDelayError(Function combiner, + public static Flowable combineLatestDelayError(Function combiner, int bufferSize, Publisher... sources) { return combineLatestDelayError(sources, combiner, bufferSize); } @@ -408,6 +446,10 @@ public static Flowable combineLatestDelayError(Function + * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. * *
*
Backpressure:
@@ -435,7 +477,7 @@ public static Flowable combineLatestDelayError(Function Flowable combineLatestDelayError(Publisher[] sources, - Function combiner, int bufferSize) { + Function combiner, int bufferSize) { ObjectHelper.requireNonNull(sources, "sources is null"); ObjectHelper.requireNonNull(combiner, "combiner is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); @@ -450,6 +492,10 @@ public static Flowable combineLatestDelayError(Publisher[ * the source Publishers each time an item is received from any of the source Publishers, where this * aggregation is defined by a specified function and delays any error from the sources until * all source Publishers terminate. + *

+ * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. * *

*
Backpressure:
@@ -475,7 +521,7 @@ public static Flowable combineLatestDelayError(Publisher[ @SchedulerSupport(SchedulerSupport.NONE) @BackpressureSupport(BackpressureKind.FULL) public static Flowable combineLatestDelayError(Iterable> sources, - Function combiner) { + Function combiner) { return combineLatestDelayError(sources, combiner, bufferSize()); } @@ -484,6 +530,10 @@ public static Flowable combineLatestDelayError(Iterable + * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. * *
*
Backpressure:
@@ -511,7 +561,7 @@ public static Flowable combineLatestDelayError(Iterable Flowable combineLatestDelayError(Iterable> sources, - Function combiner, int bufferSize) { + Function combiner, int bufferSize) { ObjectHelper.requireNonNull(sources, "sources is null"); ObjectHelper.requireNonNull(combiner, "combiner is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); @@ -14955,4 +15005,4 @@ public final TestSubscriber test(long initialRequest, boolean cancel) { // No return ts; } -} \ No newline at end of file +} diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index 85a25fc3e0..cb0a0f0fc0 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -1343,6 +1343,11 @@ public static Maybe wrap(MaybeSource source) { * Returns a Maybe that emits the results of a specified combiner function applied to combinations of * items emitted, in sequence, by an Iterable of other MaybeSources. *

+ * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * + *

* *

This operator terminates eagerly if any of the source MaybeSources signal an onError or onComplete. This * also means it is possible some sources may not get subscribed to at all. @@ -1362,7 +1367,7 @@ public static Maybe wrap(MaybeSource source) { * @see ReactiveX operators documentation: Zip */ @SchedulerSupport(SchedulerSupport.NONE) - public static Maybe zip(Iterable> sources, Function zipper) { + public static Maybe zip(Iterable> sources, Function zipper) { ObjectHelper.requireNonNull(zipper, "zipper is null"); ObjectHelper.requireNonNull(sources, "sources is null"); return RxJavaPlugins.onAssembly(new MaybeZipIterable(sources, zipper)); @@ -1774,6 +1779,11 @@ public static Maybe zip( * Returns a Maybe that emits the results of a specified combiner function applied to combinations of * items emitted, in sequence, by an array of other MaybeSources. *

+ * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * + *

* *

This operator terminates eagerly if any of the source MaybeSources signal an onError or onComplete. This * also means it is possible some sources may not get subscribed to at all. @@ -1796,6 +1806,7 @@ public static Maybe zip( @SchedulerSupport(SchedulerSupport.NONE) public static Maybe zipArray(Function zipper, MaybeSource... sources) { + ObjectHelper.requireNonNull(sources, "sources is null"); if (sources.length == 0) { return empty(); } diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 5996e7180a..74469ee628 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -128,6 +128,11 @@ public static int bufferSize() { * Combines a collection of source ObservableSources by emitting an item that aggregates the latest values of each of * the source ObservableSources each time an item is received from any of the source ObservableSources, where this * aggregation is defined by a specified function. + *

+ * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * *

*
Scheduler:
*
{@code combineLatest} does not operate by default on a particular {@link Scheduler}.
@@ -148,7 +153,7 @@ public static int bufferSize() { * @see ReactiveX operators documentation: CombineLatest */ @SchedulerSupport(SchedulerSupport.NONE) - public static Observable combineLatest(Function combiner, int bufferSize, ObservableSource... sources) { + public static Observable combineLatest(Function combiner, int bufferSize, ObservableSource... sources) { return combineLatest(sources, combiner, bufferSize); } @@ -156,6 +161,11 @@ public static Observable combineLatest(Function + * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * *
*
Scheduler:
*
{@code combineLatest} does not operate by default on a particular {@link Scheduler}.
@@ -175,7 +185,7 @@ public static Observable combineLatest(Function Observable combineLatest(Iterable> sources, - Function combiner) { + Function combiner) { return combineLatest(sources, combiner, bufferSize()); } @@ -184,6 +194,11 @@ public static Observable combineLatest(Iterable + * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * *
*
Scheduler:
*
{@code combineLatest} does not operate by default on a particular {@link Scheduler}.
@@ -205,7 +220,7 @@ public static Observable combineLatest(Iterable Observable combineLatest(Iterable> sources, - Function combiner, int bufferSize) { + Function combiner, int bufferSize) { ObjectHelper.requireNonNull(sources, "sources is null"); ObjectHelper.requireNonNull(combiner, "combiner is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); @@ -219,6 +234,11 @@ public static Observable combineLatest(Iterable + * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * *
*
Scheduler:
*
{@code combineLatest} does not operate by default on a particular {@link Scheduler}.
@@ -238,7 +258,7 @@ public static Observable combineLatest(Iterable Observable combineLatest(ObservableSource[] sources, - Function combiner) { + Function combiner) { return combineLatest(sources, combiner, bufferSize()); } @@ -246,6 +266,11 @@ public static Observable combineLatest(ObservableSource[] * Combines a collection of source ObservableSources by emitting an item that aggregates the latest values of each of * the source ObservableSources each time an item is received from any of the source ObservableSources, where this * aggregation is defined by a specified function. + *

+ * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * *

*
Scheduler:
*
{@code combineLatest} does not operate by default on a particular {@link Scheduler}.
@@ -267,7 +292,7 @@ public static Observable combineLatest(ObservableSource[] */ @SchedulerSupport(SchedulerSupport.NONE) public static Observable combineLatest(ObservableSource[] sources, - Function combiner, int bufferSize) { + Function combiner, int bufferSize) { ObjectHelper.requireNonNull(sources, "sources is null"); if (sources.length == 0) { return empty(); @@ -640,6 +665,11 @@ public static Observable combineLates * Combines a collection of source ObservableSources by emitting an item that aggregates the latest values of each of * the source ObservableSources each time an item is received from any of the source ObservableSources, where this * aggregation is defined by a specified function. + *

+ * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * *

*
Scheduler:
*
{@code combineLatest} does not operate by default on a particular {@link Scheduler}.
@@ -659,7 +689,7 @@ public static Observable combineLates */ @SchedulerSupport(SchedulerSupport.NONE) public static Observable combineLatestDelayError(ObservableSource[] sources, - Function combiner) { + Function combiner) { return combineLatestDelayError(sources, combiner, bufferSize()); } @@ -668,6 +698,10 @@ public static Observable combineLatestDelayError(ObservableSource + * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. * *
*
Scheduler:
@@ -689,7 +723,7 @@ public static Observable combineLatestDelayError(ObservableSourceReactiveX operators documentation: CombineLatest */ @SchedulerSupport(SchedulerSupport.NONE) - public static Observable combineLatestDelayError(Function combiner, + public static Observable combineLatestDelayError(Function combiner, int bufferSize, ObservableSource... sources) { return combineLatestDelayError(sources, combiner, bufferSize); } @@ -699,6 +733,10 @@ public static Observable combineLatestDelayError(Function + * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. * *
*
Scheduler:
@@ -721,7 +759,7 @@ public static Observable combineLatestDelayError(Function Observable combineLatestDelayError(ObservableSource[] sources, - Function combiner, int bufferSize) { + Function combiner, int bufferSize) { ObjectHelper.verifyPositive(bufferSize, "bufferSize"); ObjectHelper.requireNonNull(combiner, "combiner is null"); if (sources.length == 0) { @@ -737,6 +775,10 @@ public static Observable combineLatestDelayError(ObservableSource + * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. * *
*
Scheduler:
@@ -757,7 +799,7 @@ public static Observable combineLatestDelayError(ObservableSource Observable combineLatestDelayError(Iterable> sources, - Function combiner) { + Function combiner) { return combineLatestDelayError(sources, combiner, bufferSize()); } @@ -766,6 +808,10 @@ public static Observable combineLatestDelayError(Iterable + * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. * *
*
Scheduler:
@@ -788,7 +834,7 @@ public static Observable combineLatestDelayError(Iterable Observable combineLatestDelayError(Iterable> sources, - Function combiner, int bufferSize) { + Function combiner, int bufferSize) { ObjectHelper.requireNonNull(sources, "sources is null"); ObjectHelper.requireNonNull(combiner, "combiner is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); @@ -3358,6 +3404,11 @@ public static Observable wrap(ObservableSource source) { * use {@code doOnUnsubscribed()} as well or use {@code using()} to do cleanup in case of completion * or unsubscription. *

+ * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * + *

* *

*
Scheduler:
@@ -3375,7 +3426,7 @@ public static Observable wrap(ObservableSource source) { * @see ReactiveX operators documentation: Zip */ @SchedulerSupport(SchedulerSupport.NONE) - public static Observable zip(Iterable> sources, Function zipper) { + public static Observable zip(Iterable> sources, Function zipper) { ObjectHelper.requireNonNull(zipper, "zipper is null"); ObjectHelper.requireNonNull(sources, "sources is null"); return RxJavaPlugins.onAssembly(new ObservableZip(null, sources, zipper, bufferSize(), false)); @@ -3405,6 +3456,11 @@ public static Observable zip(Iterable + * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * + *

* *

*
Scheduler:
@@ -3423,7 +3479,7 @@ public static Observable zip(Iterable Observable zip(ObservableSource> sources, final Function zipper) { + public static Observable zip(ObservableSource> sources, final Function zipper) { ObjectHelper.requireNonNull(zipper, "zipper is null"); ObjectHelper.requireNonNull(sources, "sources is null"); return RxJavaPlugins.onAssembly(new ObservableToList(sources, 16) @@ -4074,6 +4130,11 @@ public static Observable zip( * use {@code doOnUnsubscribed()} as well or use {@code using()} to do cleanup in case of completion * or unsubscription. *

+ * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * + *

* *

*
Scheduler:
@@ -4095,7 +4156,7 @@ public static Observable zip( * @see ReactiveX operators documentation: Zip */ @SchedulerSupport(SchedulerSupport.NONE) - public static Observable zipArray(Function zipper, + public static Observable zipArray(Function zipper, boolean delayError, int bufferSize, ObservableSource... sources) { if (sources.length == 0) { return empty(); @@ -4129,6 +4190,11 @@ public static Observable zipArray(Function z * use {@code doOnUnsubscribed()} as well or use {@code using()} to do cleanup in case of completion * or unsubscription. *

+ * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * + *

* *

*
Scheduler:
@@ -4152,7 +4218,7 @@ public static Observable zipArray(Function z */ @SchedulerSupport(SchedulerSupport.NONE) public static Observable zipIterable(Iterable> sources, - Function zipper, boolean delayError, + Function zipper, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(zipper, "zipper is null"); ObjectHelper.requireNonNull(sources, "sources is null"); diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index 80ab7d5d77..e22aaabadf 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -877,6 +877,11 @@ public static Single wrap(SingleSource source) { * value and calls a zipper function with an array of these values to return a result * to be emitted to downstream. *

+ * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * + *

* *

* If any of the SingleSources signal an error, all other SingleSources get cancelled and the @@ -1295,6 +1300,11 @@ public static Single zip( * value and calls a zipper function with an array of these values to return a result * to be emitted to downstream. *

+ * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * + *

* *

* If any of the SingleSources signal an error, all other SingleSources get cancelled and the diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java index a6f21df0cf..fa490a8da1 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java @@ -40,14 +40,14 @@ public final class FlowableCombineLatest final Iterable> iterable; - final Function combiner; + final Function combiner; final int bufferSize; final boolean delayErrors; public FlowableCombineLatest(Publisher[] array, - Function combiner, + Function combiner, int bufferSize, boolean delayErrors) { if (bufferSize <= 0) { throw new IllegalArgumentException("BUFFER_SIZE > 0 required but it was " + bufferSize); @@ -61,7 +61,7 @@ public FlowableCombineLatest(Publisher[] array, } public FlowableCombineLatest(Iterable> iterable, - Function combiner, + Function combiner, int bufferSize, boolean delayErrors) { if (bufferSize <= 0) { throw new IllegalArgumentException("BUFFER_SIZE > 0 required but it was " + bufferSize); @@ -173,7 +173,7 @@ static final class CombineLatestCoordinator final Subscriber actual; - final Function combiner; + final Function combiner; final CombineLatestInnerSubscriber[] subscribers; @@ -198,7 +198,7 @@ static final class CombineLatestCoordinator final AtomicReference error; public CombineLatestCoordinator(Subscriber actual, - Function combiner, int n, + Function combiner, int n, int bufferSize, boolean delayErrors) { this.actual = actual; this.combiner = combiner; diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipArray.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipArray.java index aece8cc510..5f58a7e62c 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipArray.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipArray.java @@ -27,9 +27,9 @@ public final class MaybeZipArray extends Maybe { final MaybeSource[] sources; - final Function zipper; + final Function zipper; - public MaybeZipArray(MaybeSource[] sources, Function zipper) { + public MaybeZipArray(MaybeSource[] sources, Function zipper) { this.sources = sources; this.zipper = zipper; } @@ -71,14 +71,14 @@ static final class ZipCoordinator extends AtomicInteger implements Disposa final MaybeObserver actual; - final Function zipper; + final Function zipper; final ZipMaybeObserver[] observers; final Object[] values; @SuppressWarnings("unchecked") - public ZipCoordinator(MaybeObserver observer, int n, Function zipper) { + public ZipCoordinator(MaybeObserver observer, int n, Function zipper) { super(n); this.actual = observer; this.zipper = zipper; diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipIterable.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipIterable.java index 8f89193c33..4ea94f0731 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipIterable.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipIterable.java @@ -25,9 +25,9 @@ public final class MaybeZipIterable extends Maybe { final Iterable> sources; - final Function zipper; + final Function zipper; - public MaybeZipIterable(Iterable> sources, Function zipper) { + public MaybeZipIterable(Iterable> sources, Function zipper) { this.sources = sources; this.zipper = zipper; } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableCombineLatest.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableCombineLatest.java index 3797763783..f0ffecfbb7 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableCombineLatest.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableCombineLatest.java @@ -27,13 +27,13 @@ public final class ObservableCombineLatest extends Observable { final ObservableSource[] sources; final Iterable> sourcesIterable; - final Function combiner; + final Function combiner; final int bufferSize; final boolean delayError; public ObservableCombineLatest(ObservableSource[] sources, Iterable> sourcesIterable, - Function combiner, int bufferSize, + Function combiner, int bufferSize, boolean delayError) { this.sources = sources; this.sourcesIterable = sourcesIterable; @@ -75,7 +75,7 @@ static final class LatestCoordinator extends AtomicInteger implements Disp /** */ private static final long serialVersionUID = 8567835998786448817L; final Observer actual; - final Function combiner; + final Function combiner; final int count; final CombinerSubscriber[] subscribers; final int bufferSize; @@ -94,7 +94,7 @@ static final class LatestCoordinator extends AtomicInteger implements Disp @SuppressWarnings("unchecked") public LatestCoordinator(Observer actual, - Function combiner, + Function combiner, int count, int bufferSize, boolean delayError) { this.actual = actual; this.combiner = combiner; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java index 411bd44218..f9aaac221e 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java @@ -296,9 +296,9 @@ public static Function>, ObservableSource static final class ZipIterableFunction implements Function>, ObservableSource> { - private final Function zipper; + private final Function zipper; - ZipIterableFunction(Function zipper) { + ZipIterableFunction(Function zipper) { this.zipper = zipper; } @@ -308,7 +308,7 @@ public ObservableSource apply(List> l } } - public static Function>, ObservableSource> zipIterable(final Function zipper) { + public static Function>, ObservableSource> zipIterable(final Function zipper) { return new ZipIterableFunction(zipper); } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableZip.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableZip.java index 92b54a110d..e110813703 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableZip.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableZip.java @@ -27,13 +27,13 @@ public final class ObservableZip extends Observable { final ObservableSource[] sources; final Iterable> sourcesIterable; - final Function zipper; + final Function zipper; final int bufferSize; final boolean delayError; public ObservableZip(ObservableSource[] sources, Iterable> sourcesIterable, - Function zipper, + Function zipper, int bufferSize, boolean delayError) { this.sources = sources; @@ -75,7 +75,7 @@ static final class ZipCoordinator extends AtomicInteger implements Disposa /** */ private static final long serialVersionUID = 2983708048395377667L; final Observer actual; - final Function zipper; + final Function zipper; final ZipSubscriber[] subscribers; final T[] row; final boolean delayError; @@ -84,7 +84,7 @@ static final class ZipCoordinator extends AtomicInteger implements Disposa @SuppressWarnings("unchecked") public ZipCoordinator(Observer actual, - Function zipper, + Function zipper, int count, boolean delayError) { this.actual = actual; this.zipper = zipper; diff --git a/src/test/java/io/reactivex/flowable/FlowableTests.java b/src/test/java/io/reactivex/flowable/FlowableTests.java index de01ccac68..fc56cd59de 100644 --- a/src/test/java/io/reactivex/flowable/FlowableTests.java +++ b/src/test/java/io/reactivex/flowable/FlowableTests.java @@ -1060,4 +1060,34 @@ public void toObservableRange() { public void toObservableError() { Flowable.error(new TestException()).toObservable().test().assertFailure(TestException.class); } + + @Test + public void zipIterableObject() { + final List> flowables = Arrays.asList(Flowable.just(1, 2, 3), Flowable.just(1, 2, 3)); + Flowable.zip(flowables, new Function() { + @Override + public Object apply(Object[] o) throws Exception { + int sum = 0; + for (Object i : o) { + sum += (Integer) i; + } + return sum; + } + }).test().assertResult(2, 4, 6); + } + + @Test + public void combineLatestObject() { + final List> flowables = Arrays.asList(Flowable.just(1, 2, 3), Flowable.just(1, 2, 3)); + Flowable.combineLatest(flowables, new Function() { + @Override + public Object apply(final Object[] o) throws Exception { + int sum = 1; + for (Object i : o) { + sum *= (Integer) i; + } + return sum; + } + }).test().assertResult(3, 6, 9); + } } diff --git a/src/test/java/io/reactivex/maybe/MaybeTest.java b/src/test/java/io/reactivex/maybe/MaybeTest.java index 56330ba989..b1e1db85d3 100644 --- a/src/test/java/io/reactivex/maybe/MaybeTest.java +++ b/src/test/java/io/reactivex/maybe/MaybeTest.java @@ -2858,4 +2858,18 @@ public void ambWith2SignalsSuccess() { ts.assertResult(2); } + @Test + public void zipIterableObject() { + final List> maybes = Arrays.asList(Maybe.just(1), Maybe.just(4)); + Maybe.zip(maybes, new Function() { + @Override + public Object apply(final Object[] o) throws Exception { + int sum = 0; + for (Object i : o) { + sum += (Integer) i; + } + return sum; + } + }).test().assertResult(5); + } } diff --git a/src/test/java/io/reactivex/observable/ObservableTest.java b/src/test/java/io/reactivex/observable/ObservableTest.java index 0df7803154..6dcaea8fd2 100644 --- a/src/test/java/io/reactivex/observable/ObservableTest.java +++ b/src/test/java/io/reactivex/observable/ObservableTest.java @@ -1056,4 +1056,34 @@ public void singleDefault() { Observable.empty().toSingle(100).test().assertResult(100); } + + @Test + public void zipIterableObject() { + final List> observables = Arrays.asList(Observable.just(1, 2, 3), Observable.just(1, 2, 3)); + Observable.zip(observables, new Function() { + @Override + public Object apply(Object[] o) throws Exception { + int sum = 0; + for (Object i : o) { + sum += (Integer) i; + } + return sum; + } + }).test().assertResult(2, 4, 6); + } + + @Test + public void combineLatestObject() { + final List> observables = Arrays.asList(Observable.just(1, 2, 3), Observable.just(1, 2, 3)); + Observable.combineLatest(observables, new Function() { + @Override + public Object apply(final Object[] o) throws Exception { + int sum = 1; + for (Object i : o) { + sum *= (Integer) i; + } + return sum; + } + }).test().assertResult(3, 6, 9); + } } diff --git a/src/test/java/io/reactivex/single/SingleTest.java b/src/test/java/io/reactivex/single/SingleTest.java index ee8e52a00f..5a6396c298 100644 --- a/src/test/java/io/reactivex/single/SingleTest.java +++ b/src/test/java/io/reactivex/single/SingleTest.java @@ -515,5 +515,20 @@ public void toFlowableIterableRemove() { iterator.next(); iterator.remove(); } + + @Test + public void zipIterableObject() { + final List> singles = Arrays.asList(Single.just(1), Single.just(4)); + Single.zip(singles, new Function() { + @Override + public Object apply(final Object[] o) throws Exception { + int sum = 0; + for (Object i : o) { + sum += (Integer) i; + } + return sum; + } + }).test().assertResult(5); + } }