diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java index 0453b45462..372bb36c7c 100644 --- a/src/main/java/io/reactivex/Completable.java +++ b/src/main/java/io/reactivex/Completable.java @@ -929,6 +929,7 @@ public final void blockingAwait() { @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final boolean blockingAwait(long timeout, TimeUnit unit) { + ObjectHelper.requireNonNull(unit, "unit is null"); BlockingMultiObserver observer = new BlockingMultiObserver(); subscribe(observer); return observer.blockingAwait(timeout, unit); diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 743c0ef419..237eac6243 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -1229,6 +1229,8 @@ public static Flowable concat(Publisher> @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) public static Flowable concat(Publisher source1, Publisher source2) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); return concatArray(source1, source2); } @@ -1265,6 +1267,9 @@ public static Flowable concat(Publisher source1, Publisher Flowable concat( Publisher source1, Publisher source2, Publisher source3) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); + ObjectHelper.requireNonNull(source3, "source3 is null"); return concatArray(source1, source2, source3); } @@ -1303,6 +1308,10 @@ public static Flowable concat( public static Flowable concat( Publisher source1, Publisher source2, Publisher source3, Publisher source4) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); + ObjectHelper.requireNonNull(source3, "source3 is null"); + ObjectHelper.requireNonNull(source4, "source4 is null"); return concatArray(source1, source2, source3, source4); } @@ -1426,6 +1435,9 @@ public static Flowable concatArrayEager(Publisher... sources @SchedulerSupport(SchedulerSupport.NONE) @SuppressWarnings({ "rawtypes", "unchecked" }) public static Flowable concatArrayEager(int maxConcurrency, int prefetch, Publisher... sources) { + ObjectHelper.requireNonNull(sources, "sources is null"); + ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); return RxJavaPlugins.onAssembly(new FlowableConcatMapEager(new FlowableFromArray(sources), Functions.identity(), maxConcurrency, prefetch, ErrorMode.IMMEDIATE)); } @@ -1557,6 +1569,9 @@ public static Flowable concatEager(Publisher Flowable concatEager(Publisher> sources, int maxConcurrency, int prefetch) { + ObjectHelper.requireNonNull(sources, "sources is null"); + ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); return RxJavaPlugins.onAssembly(new FlowableConcatMapEagerPublisher(sources, Functions.identity(), maxConcurrency, prefetch, ErrorMode.IMMEDIATE)); } @@ -1613,6 +1628,9 @@ public static Flowable concatEager(Iterable Flowable concatEager(Iterable> sources, int maxConcurrency, int prefetch) { + ObjectHelper.requireNonNull(sources, "sources is null"); + ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); return RxJavaPlugins.onAssembly(new FlowableConcatMapEager(new FlowableFromIterable(sources), Functions.identity(), maxConcurrency, prefetch, ErrorMode.IMMEDIATE)); } @@ -6219,6 +6237,7 @@ public final Flowable> buffer(Publisher boundaryIndicator) { @BackpressureSupport(BackpressureKind.ERROR) @SchedulerSupport(SchedulerSupport.NONE) public final Flowable> buffer(Publisher boundaryIndicator, final int initialCapacity) { + ObjectHelper.verifyPositive(initialCapacity, "initialCapacity"); return buffer(boundaryIndicator, Functions.createArrayList(initialCapacity)); } @@ -6640,6 +6659,7 @@ public final Flowable concatMap(Function Flowable concatMap(Function> mapper, int prefetch) { ObjectHelper.requireNonNull(mapper, "mapper is null"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); if (this instanceof ScalarCallable) { @SuppressWarnings("unchecked") T v = ((ScalarCallable)this).call(); @@ -6648,7 +6668,6 @@ public final Flowable concatMap(Function(this, mapper, prefetch, ErrorMode.IMMEDIATE)); } @@ -6712,6 +6731,7 @@ public final Flowable concatMapDelayError(Function Flowable concatMapDelayError(Function> mapper, int prefetch, boolean tillTheEnd) { ObjectHelper.requireNonNull(mapper, "mapper is null"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); if (this instanceof ScalarCallable) { @SuppressWarnings("unchecked") T v = ((ScalarCallable)this).call(); @@ -6720,7 +6740,6 @@ public final Flowable concatMapDelayError(Function(this, mapper, prefetch, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY)); } @@ -6779,6 +6798,7 @@ public final Flowable concatMapEager(Function Flowable concatMapEager(Function> mapper, int maxConcurrency, int prefetch) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"); ObjectHelper.verifyPositive(prefetch, "prefetch"); return RxJavaPlugins.onAssembly(new FlowableConcatMapEager(this, mapper, maxConcurrency, prefetch, ErrorMode.IMMEDIATE)); @@ -6847,6 +6867,9 @@ public final Flowable concatMapEagerDelayError(Function Flowable concatMapEagerDelayError(Function> mapper, int maxConcurrency, int prefetch, boolean tillTheEnd) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); return RxJavaPlugins.onAssembly(new FlowableConcatMapEager(this, mapper, maxConcurrency, prefetch, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY)); } @@ -8390,6 +8413,8 @@ public final Flowable flatMap(Function Flowable flatMap(Function> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) { ObjectHelper.requireNonNull(mapper, "mapper is null"); + ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); if (this instanceof ScalarCallable) { @SuppressWarnings("unchecked") T v = ((ScalarCallable)this).call(); @@ -8398,8 +8423,6 @@ public final Flowable flatMap(Function(this, mapper, delayErrors, maxConcurrency, bufferSize)); } @@ -8655,6 +8678,8 @@ public final Flowable flatMap(final Function combiner, boolean delayErrors, int maxConcurrency, int bufferSize) { ObjectHelper.requireNonNull(mapper, "mapper is null"); ObjectHelper.requireNonNull(combiner, "combiner is null"); + ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return flatMap(FlowableInternalHelper.flatMapWithCombiner(mapper, combiner), delayErrors, maxConcurrency, bufferSize); } @@ -9378,6 +9403,10 @@ public final Flowable groupJoin( Function> leftEnd, Function> rightEnd, BiFunction, ? extends R> resultSelector) { + ObjectHelper.requireNonNull(other, "other is null"); + ObjectHelper.requireNonNull(leftEnd, "leftEnd is null"); + ObjectHelper.requireNonNull(rightEnd, "rightEnd is null"); + ObjectHelper.requireNonNull(resultSelector, "resultSelector is null"); return RxJavaPlugins.onAssembly(new FlowableGroupJoin( this, other, leftEnd, rightEnd, resultSelector)); } @@ -9495,6 +9524,10 @@ public final Flowable join( Function> leftEnd, Function> rightEnd, BiFunction resultSelector) { + ObjectHelper.requireNonNull(other, "other is null"); + ObjectHelper.requireNonNull(leftEnd, "leftEnd is null"); + ObjectHelper.requireNonNull(rightEnd, "rightEnd is null"); + ObjectHelper.requireNonNull(resultSelector, "resultSelector is null"); return RxJavaPlugins.onAssembly(new FlowableJoin( this, other, leftEnd, rightEnd, resultSelector)); } @@ -10003,6 +10036,7 @@ public final Flowable onBackpressureBuffer(int capacity, boolean delayError, public final Flowable onBackpressureBuffer(int capacity, boolean delayError, boolean unbounded, Action onOverflow) { ObjectHelper.requireNonNull(onOverflow, "onOverflow is null"); + ObjectHelper.verifyPositive(capacity, "capacity"); return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBuffer(this, capacity, unbounded, delayError, onOverflow)); } @@ -10986,6 +11020,7 @@ public final Flowable replay(Function, ? extends Publ @SchedulerSupport(SchedulerSupport.NONE) public final Flowable replay(Function, ? extends Publisher> selector, final int bufferSize) { ObjectHelper.requireNonNull(selector, "selector is null"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return FlowableReplay.multicastSelector(FlowableInternalHelper.replayCallable(this, bufferSize), selector); } @@ -11068,8 +11103,10 @@ public final Flowable replay(Function, ? extends Publ @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.CUSTOM) public final Flowable replay(Function, ? extends Publisher> selector, final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler) { - ObjectHelper.verifyPositive(bufferSize, "bufferSize"); ObjectHelper.requireNonNull(selector, "selector is null"); + ObjectHelper.requireNonNull(unit, "unit is null"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return FlowableReplay.multicastSelector( FlowableInternalHelper.replayCallable(this, bufferSize, time, unit, scheduler), selector); } @@ -11107,6 +11144,9 @@ public final Flowable replay(Function, ? extends Publ @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.CUSTOM) public final Flowable replay(final Function, ? extends Publisher> selector, final int bufferSize, final Scheduler scheduler) { + ObjectHelper.requireNonNull(selector, "selector is null"); + ObjectHelper.requireNonNull(scheduler, "scheduler is null"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return FlowableReplay.multicastSelector(FlowableInternalHelper.replayCallable(this, bufferSize), FlowableInternalHelper.replayFunction(selector, scheduler) ); @@ -11251,6 +11291,7 @@ public final Flowable replay(final Function, ? extend @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) public final ConnectableFlowable replay(final int bufferSize) { + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return FlowableReplay.create(this, bufferSize); } @@ -11323,6 +11364,7 @@ public final ConnectableFlowable replay(int bufferSize, long time, TimeUnit u @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.CUSTOM) public final ConnectableFlowable replay(final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler) { + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); ObjectHelper.requireNonNull(unit, "unit is null"); ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); @@ -12591,6 +12633,7 @@ public final Flowable sorted() { @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) public final Flowable sorted(Comparator sortFunction) { + ObjectHelper.requireNonNull(sortFunction, "sortFunction"); return toList().toFlowable().map(Functions.listSorter(sortFunction)).flatMapIterable(Functions.>identity()); } @@ -13226,6 +13269,7 @@ public final Flowable switchMapDelayError(Function Flowable switchMap0(Function> mapper, int bufferSize, boolean delayError) { ObjectHelper.requireNonNull(mapper, "mapper is null"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); if (this instanceof ScalarCallable) { @SuppressWarnings("unchecked") T v = ((ScalarCallable)this).call(); @@ -13234,7 +13278,6 @@ Flowable switchMap0(Function> } return FlowableScalarXMap.scalarXMap(v, mapper); } - ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new FlowableSwitchMap(this, mapper, bufferSize, delayError)); } @@ -14199,7 +14242,7 @@ public final Flowable timeout(long timeout, TimeUnit timeUnit) { @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.COMPUTATION) - public final Flowable timeout(long timeout, TimeUnit timeUnit, Flowable other) { + public final Flowable timeout(long timeout, TimeUnit timeUnit, Publisher other) { ObjectHelper.requireNonNull(other, "other is null"); return timeout0(timeout, timeUnit, other, Schedulers.computation()); } @@ -14235,7 +14278,7 @@ public final Flowable timeout(long timeout, TimeUnit timeUnit, Flowable timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler, Flowable other) { + public final Flowable timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler, Publisher other) { ObjectHelper.requireNonNull(other, "other is null"); return timeout0(timeout, timeUnit, other, scheduler); } @@ -14360,7 +14403,7 @@ public final Flowable timeout( return timeout0(firstTimeoutIndicator, itemTimeoutIndicator, other); } - private Flowable timeout0(long timeout, TimeUnit timeUnit, Flowable other, + private Flowable timeout0(long timeout, TimeUnit timeUnit, Publisher other, Scheduler scheduler) { ObjectHelper.requireNonNull(timeUnit, "timeUnit is null"); ObjectHelper.requireNonNull(scheduler, "scheduler is null"); @@ -15572,6 +15615,7 @@ public final Flowable> window(Publisher boundaryIndicator) { @SchedulerSupport(SchedulerSupport.NONE) public final Flowable> window(Publisher boundaryIndicator, int bufferSize) { ObjectHelper.requireNonNull(boundaryIndicator, "boundaryIndicator is null"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new FlowableWindowBoundary(this, boundaryIndicator, bufferSize)); } @@ -15650,6 +15694,7 @@ public final Flowable> window( Function> closingIndicator, int bufferSize) { ObjectHelper.requireNonNull(openingIndicator, "openingIndicator is null"); ObjectHelper.requireNonNull(closingIndicator, "closingIndicator is null"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new FlowableWindowBoundarySelector(this, openingIndicator, closingIndicator, bufferSize)); } @@ -15719,6 +15764,7 @@ public final Flowable> window(Callable> b @SchedulerSupport(SchedulerSupport.NONE) public final Flowable> window(Callable> boundaryIndicatorSupplier, int bufferSize) { ObjectHelper.requireNonNull(boundaryIndicatorSupplier, "boundaryIndicatorSupplier is null"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new FlowableWindowBoundarySupplier(this, boundaryIndicatorSupplier, bufferSize)); } @@ -15792,6 +15838,8 @@ public final Flowable withLatestFrom(Publisher other, @SchedulerSupport(SchedulerSupport.NONE) public final Flowable withLatestFrom(Publisher source1, Publisher source2, Function3 combiner) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); Function f = Functions.toFunction(combiner); return withLatestFrom(new Publisher[] { source1, source2 }, f); } @@ -15831,6 +15879,9 @@ public final Flowable withLatestFrom( Publisher source1, Publisher source2, Publisher source3, Function4 combiner) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); + ObjectHelper.requireNonNull(source3, "source3 is null"); Function f = Functions.toFunction(combiner); return withLatestFrom(new Publisher[] { source1, source2, source3 }, f); } @@ -15872,6 +15923,10 @@ public final Flowable withLatestFrom( Publisher source1, Publisher source2, Publisher source3, Publisher source4, Function5 combiner) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); + ObjectHelper.requireNonNull(source3, "source3 is null"); + ObjectHelper.requireNonNull(source4, "source4 is null"); Function f = Functions.toFunction(combiner); return withLatestFrom(new Publisher[] { source1, source2, source3, source4 }, f); } diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index 06a1ede963..1065dc06f0 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -827,6 +827,8 @@ public static Flowable merge(Publisher @SchedulerSupport(SchedulerSupport.NONE) @SuppressWarnings({ "unchecked", "rawtypes" }) public static Flowable merge(Publisher> sources, int maxConcurrency) { + ObjectHelper.requireNonNull(sources, "source is null"); + ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"); return RxJavaPlugins.onAssembly(new FlowableFlatMapPublisher(sources, MaybeToPublisher.instance(), false, maxConcurrency, Flowable.bufferSize())); } @@ -852,6 +854,7 @@ public static Flowable merge(Publisher @SchedulerSupport(SchedulerSupport.NONE) @SuppressWarnings({ "unchecked", "rawtypes" }) public static Maybe merge(MaybeSource> source) { + ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new MaybeFlatten(source, Functions.identity())); } @@ -1028,6 +1031,9 @@ public static Flowable mergeArray(MaybeSource... sources) { @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public static Flowable mergeArrayDelayError(MaybeSource... sources) { + if (sources.length == 0) { + return Flowable.empty(); + } return Flowable.fromArray(sources).flatMap((Function)MaybeToPublisher.instance(), true, sources.length); } @@ -1309,6 +1315,9 @@ public static Single sequenceEqual(MaybeSource source1 @SchedulerSupport(SchedulerSupport.NONE) public static Single sequenceEqual(MaybeSource source1, MaybeSource source2, BiPredicate isEqual) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); + ObjectHelper.requireNonNull(isEqual, "isEqual is null"); return RxJavaPlugins.onAssembly(new MaybeEqualSingle(source1, source2, isEqual)); } @@ -2281,6 +2290,7 @@ public final Maybe delay(long delay, TimeUnit unit, Scheduler scheduler) { @SchedulerSupport(SchedulerSupport.NONE) @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) public final Maybe delay(Publisher delayIndicator) { + ObjectHelper.requireNonNull(delayIndicator, "delayIndicator is null"); return RxJavaPlugins.onAssembly(new MaybeDelayOtherPublisher(this, delayIndicator)); } @@ -2677,6 +2687,8 @@ public final Maybe flatMap( @SchedulerSupport(SchedulerSupport.NONE) public final Maybe flatMap(Function> mapper, BiFunction resultSelector) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + ObjectHelper.requireNonNull(resultSelector, "resultSelector is null"); return RxJavaPlugins.onAssembly(new MaybeFlatMapBiSelector(this, mapper, resultSelector)); } @@ -2704,6 +2716,7 @@ public final Maybe flatMap(Function Flowable flattenAsFlowable(final Function> mapper) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new MaybeFlatMapIterableFlowable(this, mapper)); } @@ -2727,6 +2740,7 @@ public final Flowable flattenAsFlowable(final Function Observable flattenAsObservable(final Function> mapper) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new MaybeFlatMapIterableObservable(this, mapper)); } @@ -3691,6 +3705,9 @@ public final Disposable subscribe(Consumer onSuccess, Consumer onSuccess, Consumer onError, Action onComplete) { + ObjectHelper.requireNonNull(onSuccess, "onSuccess is null"); + ObjectHelper.requireNonNull(onError, "onError is null"); + ObjectHelper.requireNonNull(onComplete, "onComplete is null"); return subscribeWith(new MaybeCallbackObserver(onSuccess, onError, onComplete)); } diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index e1361a22fe..7457fcd2de 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -371,6 +371,8 @@ public static Observable combineLatest(ObservableSource[] public static Observable combineLatest( ObservableSource source1, ObservableSource source2, BiFunction combiner) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2); } @@ -412,6 +414,9 @@ public static Observable combineLatest( ObservableSource source1, ObservableSource source2, ObservableSource source3, Function3 combiner) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); + ObjectHelper.requireNonNull(source3, "source3 is null"); return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3); } @@ -456,6 +461,10 @@ public static Observable combineLatest( ObservableSource source1, ObservableSource source2, ObservableSource source3, ObservableSource source4, Function4 combiner) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); + ObjectHelper.requireNonNull(source3, "source3 is null"); + ObjectHelper.requireNonNull(source4, "source4 is null"); return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3, source4); } @@ -504,6 +513,11 @@ public static Observable combineLatest( ObservableSource source3, ObservableSource source4, ObservableSource source5, Function5 combiner) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); + ObjectHelper.requireNonNull(source3, "source3 is null"); + ObjectHelper.requireNonNull(source4, "source4 is null"); + ObjectHelper.requireNonNull(source5, "source5 is null"); return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3, source4, source5); } @@ -555,6 +569,12 @@ public static Observable combineLatest( ObservableSource source3, ObservableSource source4, ObservableSource source5, ObservableSource source6, Function6 combiner) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); + ObjectHelper.requireNonNull(source3, "source3 is null"); + ObjectHelper.requireNonNull(source4, "source4 is null"); + ObjectHelper.requireNonNull(source5, "source5 is null"); + ObjectHelper.requireNonNull(source6, "source6 is null"); return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3, source4, source5, source6); } @@ -610,6 +630,13 @@ public static Observable combineLatest( ObservableSource source5, ObservableSource source6, ObservableSource source7, Function7 combiner) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); + ObjectHelper.requireNonNull(source3, "source3 is null"); + ObjectHelper.requireNonNull(source4, "source4 is null"); + ObjectHelper.requireNonNull(source5, "source5 is null"); + ObjectHelper.requireNonNull(source6, "source6 is null"); + ObjectHelper.requireNonNull(source7, "source7 is null"); return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3, source4, source5, source6, source7); } @@ -668,6 +695,14 @@ public static Observable combineLatest( ObservableSource source5, ObservableSource source6, ObservableSource source7, ObservableSource source8, Function8 combiner) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); + ObjectHelper.requireNonNull(source3, "source3 is null"); + ObjectHelper.requireNonNull(source4, "source4 is null"); + ObjectHelper.requireNonNull(source5, "source5 is null"); + ObjectHelper.requireNonNull(source6, "source6 is null"); + ObjectHelper.requireNonNull(source7, "source7 is null"); + ObjectHelper.requireNonNull(source8, "source8 is null"); return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3, source4, source5, source6, source7, source8); } @@ -730,6 +765,15 @@ public static Observable combineLates ObservableSource source7, ObservableSource source8, ObservableSource source9, Function9 combiner) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); + ObjectHelper.requireNonNull(source3, "source3 is null"); + ObjectHelper.requireNonNull(source4, "source4 is null"); + ObjectHelper.requireNonNull(source5, "source5 is null"); + ObjectHelper.requireNonNull(source6, "source6 is null"); + ObjectHelper.requireNonNull(source7, "source7 is null"); + ObjectHelper.requireNonNull(source8, "source8 is null"); + ObjectHelper.requireNonNull(source9, "source9 is null"); return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3, source4, source5, source6, source7, source8, source9); } @@ -1012,6 +1056,7 @@ public static Observable concat(ObservableSource Observable concat(ObservableSource> sources, int prefetch) { ObjectHelper.requireNonNull(sources, "sources is null"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); return RxJavaPlugins.onAssembly(new ObservableConcatMap(sources, Functions.identity(), prefetch, ErrorMode.IMMEDIATE)); } @@ -1038,6 +1083,8 @@ public static Observable concat(ObservableSource Observable concat(ObservableSource source1, ObservableSource source2) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); return concatArray(source1, source2); } @@ -1068,6 +1115,9 @@ public static Observable concat(ObservableSource source1, Ob public static Observable concat( ObservableSource source1, ObservableSource source2, ObservableSource source3) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); + ObjectHelper.requireNonNull(source3, "source3 is null"); return concatArray(source1, source2, source3); } @@ -1100,6 +1150,10 @@ public static Observable concat( public static Observable concat( ObservableSource source1, ObservableSource source2, ObservableSource source3, ObservableSource source4) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); + ObjectHelper.requireNonNull(source3, "source3 is null"); + ObjectHelper.requireNonNull(source4, "source4 is null"); return concatArray(source1, source2, source3, source4); } @@ -1268,6 +1322,8 @@ public static Observable concatDelayError(ObservableSource Observable concatDelayError(ObservableSource> sources, int prefetch, boolean tillTheEnd) { + ObjectHelper.requireNonNull(sources, "sources is null"); + ObjectHelper.verifyPositive(prefetch, "prefetch is null"); return RxJavaPlugins.onAssembly(new ObservableConcatMap(sources, Functions.identity(), prefetch, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY)); } @@ -1318,6 +1374,8 @@ public static Observable concatEager(ObservableSource Observable concatEager(ObservableSource> sources, int maxConcurrency, int prefetch) { + ObjectHelper.requireNonNull(maxConcurrency, "maxConcurrency is null"); + ObjectHelper.requireNonNull(prefetch, "prefetch is null"); return wrap(sources).concatMapEager((Function)Functions.identity(), maxConcurrency, prefetch); } @@ -1368,6 +1426,8 @@ public static Observable concatEager(Iterable Observable concatEager(Iterable> sources, int maxConcurrency, int prefetch) { + ObjectHelper.requireNonNull(maxConcurrency, "maxConcurrency is null"); + ObjectHelper.requireNonNull(prefetch, "prefetch is null"); return fromIterable(sources).concatMapEagerDelayError((Function)Functions.identity(), maxConcurrency, prefetch, false); } @@ -2617,6 +2677,7 @@ public static Observable merge(Iterable Observable merge(ObservableSource> sources) { + ObjectHelper.requireNonNull(sources, "sources is null"); return RxJavaPlugins.onAssembly(new ObservableFlatMap(sources, Functions.identity(), false, Integer.MAX_VALUE, bufferSize())); } @@ -2650,6 +2711,8 @@ public static Observable merge(ObservableSource Observable merge(ObservableSource> sources, int maxConcurrency) { + ObjectHelper.requireNonNull(sources, "sources is null"); + ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"); return RxJavaPlugins.onAssembly(new ObservableFlatMap(sources, Functions.identity(), false, maxConcurrency, bufferSize())); } @@ -2943,6 +3006,7 @@ public static Observable mergeDelayError(Iterable Observable mergeDelayError(ObservableSource> sources) { + ObjectHelper.requireNonNull(sources, "sources is null"); return RxJavaPlugins.onAssembly(new ObservableFlatMap(sources, Functions.identity(), true, Integer.MAX_VALUE, bufferSize())); } @@ -2979,6 +3043,8 @@ public static Observable mergeDelayError(ObservableSource Observable mergeDelayError(ObservableSource> sources, int maxConcurrency) { + ObjectHelper.requireNonNull(sources, "sources is null"); + ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"); return RxJavaPlugins.onAssembly(new ObservableFlatMap(sources, Functions.identity(), true, maxConcurrency, bufferSize())); } @@ -3385,6 +3451,7 @@ public static Single sequenceEqual(ObservableSource so @SchedulerSupport(SchedulerSupport.NONE) public static Observable switchOnNext(ObservableSource> sources, int bufferSize) { ObjectHelper.requireNonNull(sources, "sources is null"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableSwitchMap(sources, Functions.identity(), bufferSize, false)); } @@ -3809,6 +3876,8 @@ public static Observable zip(ObservableSource Observable zip( ObservableSource source1, ObservableSource source2, BiFunction zipper) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); return zipArray(Functions.toFunction(zipper), false, bufferSize(), source1, source2); } @@ -3863,6 +3932,8 @@ public static Observable zip( public static Observable zip( ObservableSource source1, ObservableSource source2, BiFunction zipper, boolean delayError) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); return zipArray(Functions.toFunction(zipper), delayError, bufferSize(), source1, source2); } @@ -3918,6 +3989,8 @@ public static Observable zip( public static Observable zip( ObservableSource source1, ObservableSource source2, BiFunction zipper, boolean delayError, int bufferSize) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); return zipArray(Functions.toFunction(zipper), delayError, bufferSize, source1, source2); } @@ -3975,6 +4048,9 @@ public static Observable zip( public static Observable zip( ObservableSource source1, ObservableSource source2, ObservableSource source3, Function3 zipper) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); + ObjectHelper.requireNonNull(source3, "source3 is null"); return zipArray(Functions.toFunction(zipper), false, bufferSize(), source1, source2, source3); } @@ -4036,6 +4112,10 @@ public static Observable zip( ObservableSource source1, ObservableSource source2, ObservableSource source3, ObservableSource source4, Function4 zipper) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); + ObjectHelper.requireNonNull(source3, "source3 is null"); + ObjectHelper.requireNonNull(source4, "source4 is null"); return zipArray(Functions.toFunction(zipper), false, bufferSize(), source1, source2, source3, source4); } @@ -4100,6 +4180,11 @@ public static Observable zip( ObservableSource source1, ObservableSource source2, ObservableSource source3, ObservableSource source4, ObservableSource source5, Function5 zipper) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); + ObjectHelper.requireNonNull(source3, "source3 is null"); + ObjectHelper.requireNonNull(source4, "source4 is null"); + ObjectHelper.requireNonNull(source5, "source5 is null"); return zipArray(Functions.toFunction(zipper), false, bufferSize(), source1, source2, source3, source4, source5); } @@ -4166,6 +4251,12 @@ public static Observable zip( ObservableSource source1, ObservableSource source2, ObservableSource source3, ObservableSource source4, ObservableSource source5, ObservableSource source6, Function6 zipper) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); + ObjectHelper.requireNonNull(source3, "source3 is null"); + ObjectHelper.requireNonNull(source4, "source4 is null"); + ObjectHelper.requireNonNull(source5, "source5 is null"); + ObjectHelper.requireNonNull(source6, "source6 is null"); return zipArray(Functions.toFunction(zipper), false, bufferSize(), source1, source2, source3, source4, source5, source6); } @@ -4236,6 +4327,13 @@ public static Observable zip( ObservableSource source4, ObservableSource source5, ObservableSource source6, ObservableSource source7, Function7 zipper) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); + ObjectHelper.requireNonNull(source3, "source3 is null"); + ObjectHelper.requireNonNull(source4, "source4 is null"); + ObjectHelper.requireNonNull(source5, "source5 is null"); + ObjectHelper.requireNonNull(source6, "source6 is null"); + ObjectHelper.requireNonNull(source7, "source7 is null"); return zipArray(Functions.toFunction(zipper), false, bufferSize(), source1, source2, source3, source4, source5, source6, source7); } @@ -4309,6 +4407,14 @@ public static Observable zip( ObservableSource source4, ObservableSource source5, ObservableSource source6, ObservableSource source7, ObservableSource source8, Function8 zipper) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); + ObjectHelper.requireNonNull(source3, "source3 is null"); + ObjectHelper.requireNonNull(source4, "source4 is null"); + ObjectHelper.requireNonNull(source5, "source5 is null"); + ObjectHelper.requireNonNull(source6, "source6 is null"); + ObjectHelper.requireNonNull(source7, "source7 is null"); + ObjectHelper.requireNonNull(source8, "source8 is null"); return zipArray(Functions.toFunction(zipper), false, bufferSize(), source1, source2, source3, source4, source5, source6, source7, source8); } @@ -4385,6 +4491,15 @@ public static Observable zip( ObservableSource source4, ObservableSource source5, ObservableSource source6, ObservableSource source7, ObservableSource source8, ObservableSource source9, Function9 zipper) { + ObjectHelper.requireNonNull(source1, "source1 is null"); + ObjectHelper.requireNonNull(source2, "source2 is null"); + ObjectHelper.requireNonNull(source3, "source3 is null"); + ObjectHelper.requireNonNull(source4, "source4 is null"); + ObjectHelper.requireNonNull(source5, "source5 is null"); + ObjectHelper.requireNonNull(source6, "source6 is null"); + ObjectHelper.requireNonNull(source7, "source7 is null"); + ObjectHelper.requireNonNull(source8, "source8 is null"); + ObjectHelper.requireNonNull(source9, "source9 is null"); return zipArray(Functions.toFunction(zipper), false, bufferSize(), source1, source2, source3, source4, source5, source6, source7, source8, source9); } @@ -5490,6 +5605,7 @@ public final Observable> buffer(ObservableSource boundary) { @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Observable> buffer(ObservableSource boundary, final int initialCapacity) { + ObjectHelper.verifyPositive(initialCapacity, "initialCapacity"); return buffer(boundary, Functions.createArrayList(initialCapacity)); } @@ -5915,6 +6031,7 @@ public final Observable concatMapDelayError(Function Observable concatMapDelayError(Function> mapper, int prefetch, boolean tillTheEnd) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); ObjectHelper.verifyPositive(prefetch, "prefetch"); if (this instanceof ScalarCallable) { @SuppressWarnings("unchecked") @@ -6034,6 +6151,9 @@ public final Observable concatMapEagerDelayError(Function Observable concatMapEagerDelayError(Function> mapper, int maxConcurrency, int prefetch, boolean tillTheEnd) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); return RxJavaPlugins.onAssembly(new ObservableConcatMapEager(this, mapper, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY, maxConcurrency, prefetch)); } @@ -6087,6 +6207,8 @@ public final Observable concatMapIterable(final Function Observable concatMapIterable(final Function> mapper, int prefetch) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); return concatMap(ObservableInternalHelper.flatMapIntoIterable(mapper), prefetch); } @@ -7681,6 +7803,8 @@ public final Observable flatMapIterable(final Function Observable flatMapIterable(final Function> mapper, BiFunction resultSelector) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + ObjectHelper.requireNonNull(resultSelector, "resultSelector is null"); return flatMap(ObservableInternalHelper.flatMapIntoIterable(mapper), resultSelector, false, bufferSize(), bufferSize()); } @@ -8112,6 +8236,10 @@ public final Observable groupJoin( Function> rightEnd, BiFunction, ? extends R> resultSelector ) { + ObjectHelper.requireNonNull(other, "other is null"); + ObjectHelper.requireNonNull(leftEnd, "leftEnd is null"); + ObjectHelper.requireNonNull(rightEnd, "rightEnd is null"); + ObjectHelper.requireNonNull(resultSelector, "resultSelector is null"); return RxJavaPlugins.onAssembly(new ObservableGroupJoin( this, other, leftEnd, rightEnd, resultSelector)); } @@ -8213,6 +8341,10 @@ public final Observable join( Function> rightEnd, BiFunction resultSelector ) { + ObjectHelper.requireNonNull(other, "other is null"); + ObjectHelper.requireNonNull(leftEnd, "leftEnd is null"); + ObjectHelper.requireNonNull(rightEnd, "rightEnd is null"); + ObjectHelper.requireNonNull(resultSelector, "resultSelector is null"); return RxJavaPlugins.onAssembly(new ObservableJoin( this, other, leftEnd, rightEnd, resultSelector)); } @@ -9046,6 +9178,7 @@ public final Observable replay(Function, ? extends @SchedulerSupport(SchedulerSupport.NONE) public final Observable replay(Function, ? extends ObservableSource> selector, final int bufferSize) { ObjectHelper.requireNonNull(selector, "selector is null"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return ObservableReplay.multicastSelector(ObservableInternalHelper.replayCallable(this, bufferSize), selector); } @@ -9118,8 +9251,10 @@ public final Observable replay(Function, ? extends @CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable replay(Function, ? extends ObservableSource> selector, final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler) { - ObjectHelper.verifyPositive(bufferSize, "bufferSize"); ObjectHelper.requireNonNull(selector, "selector is null"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.requireNonNull(unit, "unit is null"); + ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return ObservableReplay.multicastSelector( ObservableInternalHelper.replayCallable(this, bufferSize, time, unit, scheduler), selector); } @@ -9152,6 +9287,9 @@ public final Observable replay(Function, ? extends @CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable replay(final Function, ? extends ObservableSource> selector, final int bufferSize, final Scheduler scheduler) { + ObjectHelper.requireNonNull(selector, "selector is null"); + ObjectHelper.requireNonNull(scheduler, "scheduler is null"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return ObservableReplay.multicastSelector(ObservableInternalHelper.replayCallable(this, bufferSize), ObservableInternalHelper.replayFunction(selector, scheduler)); } @@ -9275,6 +9413,7 @@ public final Observable replay(final Function, ? ex @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final ConnectableObservable replay(final int bufferSize) { + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return ObservableReplay.create(this, bufferSize); } @@ -9366,6 +9505,7 @@ public final ConnectableObservable replay(final int bufferSize, final long ti @CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final ConnectableObservable replay(final int bufferSize, final Scheduler scheduler) { + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return ObservableReplay.observeOn(replay(bufferSize), scheduler); } @@ -10443,6 +10583,7 @@ public final Observable sorted() { @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Observable sorted(Comparator sortFunction) { + ObjectHelper.requireNonNull(sortFunction, "sortFunction is null"); return toList().toObservable().map(Functions.listSorter(sortFunction)).flatMapIterable(Functions.>identity()); } @@ -12208,6 +12349,7 @@ public final > Single toList(Callable coll @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Single> toMap(final Function keySelector) { + ObjectHelper.requireNonNull(keySelector, "keySelector is null"); return collect(HashMapSupplier.asCallable(), Functions.toMapKeySelector(keySelector)); } @@ -12272,6 +12414,10 @@ public final Single> toMap( final Function keySelector, final Function valueSelector, Callable> mapSupplier) { + ObjectHelper.requireNonNull(keySelector, "keySelector is null"); + ObjectHelper.requireNonNull(keySelector, "keySelector is null"); + ObjectHelper.requireNonNull(valueSelector, "valueSelector is null"); + ObjectHelper.requireNonNull(mapSupplier, "mapSupplier is null"); return collect(mapSupplier, Functions.toMapKeyValueSelector(keySelector, valueSelector)); } @@ -13028,6 +13174,7 @@ public final Observable> window(ObservableSource boundary) @SchedulerSupport(SchedulerSupport.NONE) public final Observable> window(ObservableSource boundary, int bufferSize) { ObjectHelper.requireNonNull(boundary, "boundary is null"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableWindowBoundary(this, boundary, bufferSize)); } @@ -13094,6 +13241,7 @@ public final Observable> window( Function> closingIndicator, int bufferSize) { ObjectHelper.requireNonNull(openingIndicator, "openingIndicator is null"); ObjectHelper.requireNonNull(closingIndicator, "closingIndicator is null"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableWindowBoundarySelector(this, openingIndicator, closingIndicator, bufferSize)); } @@ -13150,6 +13298,7 @@ public final Observable> window(Callable Observable> window(Callable> boundary, int bufferSize) { ObjectHelper.requireNonNull(boundary, "boundary is null"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableWindowBoundarySupplier(this, boundary, bufferSize)); } diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index bb42de50f0..f516143c72 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -142,6 +142,7 @@ public static Flowable concat(Iterable Observable concat(ObservableSource> sources) { + ObjectHelper.requireNonNull(sources, "sources is null"); return RxJavaPlugins.onAssembly(new ObservableConcatMap(sources, SingleInternalHelper.toObservable(), 2, ErrorMode.IMMEDIATE)); } @@ -685,6 +686,7 @@ public static Flowable merge(Iterable @SchedulerSupport(SchedulerSupport.NONE) @SuppressWarnings({ "unchecked", "rawtypes" }) public static Flowable merge(Publisher> sources) { + ObjectHelper.requireNonNull(sources, "sources is null"); return RxJavaPlugins.onAssembly(new FlowableFlatMapPublisher(sources, SingleInternalHelper.toFlowable(), false, Integer.MAX_VALUE, Flowable.bufferSize())); } @@ -1669,6 +1671,7 @@ public final Single delay(final long time, final TimeUnit unit, final Schedul @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Single delaySubscription(CompletableSource other) { + ObjectHelper.requireNonNull(other, "other is null"); return RxJavaPlugins.onAssembly(new SingleDelayWithCompletable(this, other)); } @@ -1690,6 +1693,7 @@ public final Single delaySubscription(CompletableSource other) { @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Single delaySubscription(SingleSource other) { + ObjectHelper.requireNonNull(other, "other is null"); return RxJavaPlugins.onAssembly(new SingleDelayWithSingle(this, other)); } @@ -1711,6 +1715,7 @@ public final Single delaySubscription(SingleSource other) { @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Single delaySubscription(ObservableSource other) { + ObjectHelper.requireNonNull(other, "other is null"); return RxJavaPlugins.onAssembly(new SingleDelayWithObservable(this, other)); } @@ -1737,6 +1742,7 @@ public final Single delaySubscription(ObservableSource other) { @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Single delaySubscription(Publisher other) { + ObjectHelper.requireNonNull(other, "other is null"); return RxJavaPlugins.onAssembly(new SingleDelayWithPublisher(this, other)); } @@ -2059,6 +2065,7 @@ public final Flowable flatMapPublisher(Function Flowable flattenAsFlowable(final Function> mapper) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new SingleFlatMapIterableFlowable(this, mapper)); } @@ -2082,6 +2089,7 @@ public final Flowable flattenAsFlowable(final Function Observable flattenAsObservable(final Function> mapper) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new SingleFlatMapIterableObservable(this, mapper)); } @@ -2197,6 +2205,7 @@ public final Single lift(final SingleOperator lif @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Single map(Function mapper) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new SingleMap(this, mapper)); } @@ -2781,6 +2790,7 @@ public final Single subscribeOn(final Scheduler scheduler) { @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Single takeUntil(final CompletableSource other) { + ObjectHelper.requireNonNull(other, "other is null"); return takeUntil(new CompletableToFlowable(other)); } @@ -2811,6 +2821,7 @@ public final Single takeUntil(final CompletableSource other) { @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Single takeUntil(final Publisher other) { + ObjectHelper.requireNonNull(other, "other is null"); return RxJavaPlugins.onAssembly(new SingleTakeUntil(this, other)); } @@ -2835,6 +2846,7 @@ public final Single takeUntil(final Publisher other) { @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Single takeUntil(final SingleSource other) { + ObjectHelper.requireNonNull(other, "other is null"); return takeUntil(new SingleToFlowable(other)); } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBlockingSubscribe.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBlockingSubscribe.java index 24ece91821..d79c6f3061 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBlockingSubscribe.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBlockingSubscribe.java @@ -18,7 +18,7 @@ import org.reactivestreams.*; import io.reactivex.functions.*; -import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.functions.*; import io.reactivex.internal.subscribers.*; import io.reactivex.internal.util.*; @@ -103,6 +103,9 @@ public static void subscribe(Publisher o) { */ public static void subscribe(Publisher o, final Consumer onNext, final Consumer onError, final Action onComplete) { + ObjectHelper.requireNonNull(onNext, "onNext is null"); + ObjectHelper.requireNonNull(onError, "onError is null"); + ObjectHelper.requireNonNull(onComplete, "onComplete is null"); subscribe(o, new LambdaSubscriber(onNext, onError, onComplete, Functions.REQUEST_MAX)); } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableBlockingSubscribe.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableBlockingSubscribe.java index 8ddd579099..1151c280fc 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableBlockingSubscribe.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableBlockingSubscribe.java @@ -17,7 +17,7 @@ import io.reactivex.*; import io.reactivex.functions.*; -import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.functions.*; import io.reactivex.internal.observers.*; import io.reactivex.internal.util.*; @@ -97,6 +97,9 @@ public static void subscribe(ObservableSource o) { */ public static void subscribe(ObservableSource o, final Consumer onNext, final Consumer onError, final Action onComplete) { + ObjectHelper.requireNonNull(onNext, "onNext is null"); + ObjectHelper.requireNonNull(onError, "onError is null"); + ObjectHelper.requireNonNull(onComplete, "onComplete is null"); subscribe(o, new LambdaObserver(onNext, onError, onComplete, Functions.emptyConsumer())); } } diff --git a/src/main/java/io/reactivex/subscribers/TestSubscriber.java b/src/main/java/io/reactivex/subscribers/TestSubscriber.java index 0a90beb500..e16af8feaf 100644 --- a/src/main/java/io/reactivex/subscribers/TestSubscriber.java +++ b/src/main/java/io/reactivex/subscribers/TestSubscriber.java @@ -118,6 +118,9 @@ public TestSubscriber(Subscriber actual) { */ public TestSubscriber(Subscriber actual, long initialRequest) { super(); + if (initialRequest < 0) { + throw new IllegalArgumentException("Negative initial request not allowed"); + } this.actual = actual; this.subscription = new AtomicReference(); this.missedRequested = new AtomicLong(initialRequest); diff --git a/src/test/java/io/reactivex/ParamValidationCheckerTest.java b/src/test/java/io/reactivex/ParamValidationCheckerTest.java new file mode 100644 index 0000000000..3a69a87565 --- /dev/null +++ b/src/test/java/io/reactivex/ParamValidationCheckerTest.java @@ -0,0 +1,1117 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex; + +import java.lang.reflect.*; +import java.util.*; +import java.util.concurrent.*; + +import org.junit.Test; +import org.reactivestreams.*; + +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; + +/** + * Check that static and instance methods validate their parameters against + * null and invalid values properly. + */ +public class ParamValidationCheckerTest { + + @Test(timeout = 30000) + public void checkFlowable() { + checkClass(Flowable.class); + } + + @Test(timeout = 30000) + public void checkObservable() { + checkClass(Observable.class); + } + + @Test(timeout = 30000) + public void checkSingle() { + checkClass(Single.class); + } + + @Test(timeout = 30000) + public void checkMaybe() { + checkClass(Maybe.class); + } + + @Test(timeout = 30000) + public void checkCompletable() { + checkClass(Completable.class); + } + + // --------------------------------------------------------------------------------------- + // --------------------------------------------------------------------------------------- + + static Map> overrides; + + static Map> ignores; + + static Map, Object> defaultValues; + + static Map, List> defaultInstances; + + static { + overrides = new HashMap>(); + + // *********************************************************************************************************************** + + // zero index allowed + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.NON_NEGATIVE, "elementAt", Long.TYPE)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.NON_NEGATIVE, "elementAt", Long.TYPE, Object.class)); + + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.NON_NEGATIVE, "elementAtOrError", Long.TYPE)); + + // negative skip count is ignored + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "skip", Long.TYPE)); + // negative skip time is considered as zero skip time + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "skip", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "skip", Long.TYPE, TimeUnit.class, Scheduler.class)); + + // can start with zero initial request + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.NON_NEGATIVE, "test", Long.TYPE)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.NON_NEGATIVE, "test", Long.TYPE, Boolean.TYPE)); + + // negative timeout time is considered as zero timeout time + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "timeout", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "timeout", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "timeout", Long.TYPE, TimeUnit.class, Publisher.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "timeout", Long.TYPE, TimeUnit.class, Scheduler.class, Publisher.class)); + + // negative buffer time is considered as zero buffer time + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "buffer", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "buffer", Long.TYPE, TimeUnit.class, Integer.TYPE)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "buffer", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "buffer", Long.TYPE, TimeUnit.class, Scheduler.class, Integer.TYPE)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "buffer", Long.TYPE, TimeUnit.class, Scheduler.class, Integer.TYPE, Callable.class, Boolean.TYPE)); + + // negative time/skip is considered zero time/skip + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "buffer", Long.TYPE, Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "buffer", Long.TYPE, Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "buffer", Long.TYPE, Long.TYPE, TimeUnit.class, Scheduler.class, Callable.class)); + addOverride(new ParamOverride(Flowable.class, 1, ParamMode.ANY, "buffer", Long.TYPE, Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Flowable.class, 1, ParamMode.ANY, "buffer", Long.TYPE, Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Flowable.class, 1, ParamMode.ANY, "buffer", Long.TYPE, Long.TYPE, TimeUnit.class, Scheduler.class, Callable.class)); + + // negative timeout is allowed + addOverride(new ParamOverride(Flowable.class, 1, ParamMode.ANY, "fromFuture", Future.class, Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Flowable.class, 1, ParamMode.ANY, "fromFuture", Future.class, Long.TYPE, TimeUnit.class, Scheduler.class)); + + // null default is allowed + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "blockingLast", Object.class)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "timer", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "timer", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "interval", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "interval", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "interval", Long.TYPE, Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "interval", Long.TYPE, Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Flowable.class, 1, ParamMode.ANY, "interval", Long.TYPE, Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Flowable.class, 1, ParamMode.ANY, "interval", Long.TYPE, Long.TYPE, TimeUnit.class, Scheduler.class)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class, Boolean.TYPE)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class, Scheduler.class, Boolean.TYPE)); + + // null default is allowed + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "blockingMostRecent", Object.class)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "delaySubscription", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "delaySubscription", Long.TYPE, TimeUnit.class, Scheduler.class)); + + // null default is allowed + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "blockingFirst", Object.class)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "debounce", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "debounce", Long.TYPE, TimeUnit.class, Scheduler.class)); + + // null Action allowed + addOverride(new ParamOverride(Flowable.class, 1, ParamMode.ANY, "onBackpressureBuffer", Long.TYPE, Action.class, BackpressureOverflowStrategy.class)); + + // zero repeat is allowed + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.NON_NEGATIVE, "repeat", Long.TYPE)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "replay", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "replay", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Flowable.class, 1, ParamMode.ANY, "replay", Integer.TYPE, Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Flowable.class, 1, ParamMode.ANY, "replay", Integer.TYPE, Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Flowable.class, 1, ParamMode.ANY, "replay", Function.class, Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Flowable.class, 1, ParamMode.ANY, "replay", Function.class, Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Flowable.class, 2, ParamMode.ANY, "replay", Function.class, Integer.TYPE, Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Flowable.class, 2, ParamMode.ANY, "replay", Function.class, Integer.TYPE, Long.TYPE, TimeUnit.class, Scheduler.class)); + + // zero retry is allowed + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.NON_NEGATIVE, "retry", Long.TYPE)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.NON_NEGATIVE, "retry", Long.TYPE, Predicate.class)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "throttleWithTimeout", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "throttleWithTimeout", Long.TYPE, TimeUnit.class, Scheduler.class)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "take", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "take", Long.TYPE, TimeUnit.class, Scheduler.class)); + + // zero retry is allowed + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.NON_NEGATIVE, "take", Long.TYPE)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "sample", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "sample", Long.TYPE, TimeUnit.class, Boolean.TYPE)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "sample", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "sample", Long.TYPE, TimeUnit.class, Scheduler.class, Boolean.TYPE)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "takeLast", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "takeLast", Long.TYPE, TimeUnit.class, Boolean.TYPE)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "takeLast", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "takeLast", Long.TYPE, TimeUnit.class, Scheduler.class, Boolean.TYPE)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "takeLast", Long.TYPE, TimeUnit.class, Scheduler.class, Boolean.TYPE, Integer.TYPE)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.NON_NEGATIVE, "takeLast", Long.TYPE, Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.NON_NEGATIVE, "takeLast", Long.TYPE, Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.NON_NEGATIVE, "takeLast", Long.TYPE, Long.TYPE, TimeUnit.class, Scheduler.class, Boolean.TYPE, Integer.TYPE)); + addOverride(new ParamOverride(Flowable.class, 1, ParamMode.ANY, "takeLast", Long.TYPE, Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Flowable.class, 1, ParamMode.ANY, "takeLast", Long.TYPE, Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Flowable.class, 1, ParamMode.ANY, "takeLast", Long.TYPE, Long.TYPE, TimeUnit.class, Scheduler.class, Boolean.TYPE, Integer.TYPE)); + + // take last 0 is allowed + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.NON_NEGATIVE, "takeLast", Integer.TYPE)); + + // skip last 0 is allowed + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.NON_NEGATIVE, "skipLast", Integer.TYPE)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "skipLast", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "skipLast", Long.TYPE, TimeUnit.class, Boolean.TYPE)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "skipLast", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "skipLast", Long.TYPE, TimeUnit.class, Scheduler.class, Boolean.TYPE)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "skipLast", Long.TYPE, TimeUnit.class, Scheduler.class, Boolean.TYPE, Integer.TYPE)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "throttleFirst", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "throttleFirst", Long.TYPE, TimeUnit.class, Scheduler.class)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "throttleLast", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "throttleLast", Long.TYPE, TimeUnit.class, Scheduler.class)); + + + // negative buffer time is considered as zero buffer time + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "window", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "window", Long.TYPE, TimeUnit.class, Long.TYPE)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "window", Long.TYPE, TimeUnit.class, Long.TYPE, Boolean.TYPE)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "window", Long.TYPE, TimeUnit.class, Scheduler.class, Long.TYPE)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "window", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "window", Long.TYPE, TimeUnit.class, Scheduler.class, Long.TYPE, Boolean.TYPE)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "window", Long.TYPE, TimeUnit.class, Scheduler.class, Long.TYPE, Boolean.TYPE, Integer.TYPE)); + + // *********************************************************************************************************************** + + // negative timeout time is considered as zero timeout time + addOverride(new ParamOverride(Completable.class, 0, ParamMode.ANY, "timeout", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Completable.class, 0, ParamMode.ANY, "timeout", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Completable.class, 0, ParamMode.ANY, "timeout", Long.TYPE, TimeUnit.class, CompletableSource.class)); + addOverride(new ParamOverride(Completable.class, 0, ParamMode.ANY, "timeout", Long.TYPE, TimeUnit.class, Scheduler.class, CompletableSource.class)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Completable.class, 0, ParamMode.ANY, "timer", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Completable.class, 0, ParamMode.ANY, "timer", Long.TYPE, TimeUnit.class, Scheduler.class)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Completable.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Completable.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Completable.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class, Scheduler.class, Boolean.TYPE)); + + // zero repeat is allowed + addOverride(new ParamOverride(Completable.class, 0, ParamMode.NON_NEGATIVE, "repeat", Long.TYPE)); + + // zero retry is allowed + addOverride(new ParamOverride(Completable.class, 0, ParamMode.NON_NEGATIVE, "retry", Long.TYPE)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Completable.class, 0, ParamMode.ANY, "blockingGet", Long.TYPE, TimeUnit.class)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Completable.class, 0, ParamMode.ANY, "blockingAwait", Long.TYPE, TimeUnit.class)); + + // *********************************************************************************************************************** + + // negative timeout time is considered as zero timeout time + addOverride(new ParamOverride(Maybe.class, 0, ParamMode.ANY, "timeout", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Maybe.class, 0, ParamMode.ANY, "timeout", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Maybe.class, 0, ParamMode.ANY, "timeout", Long.TYPE, TimeUnit.class, MaybeSource.class)); + addOverride(new ParamOverride(Maybe.class, 0, ParamMode.ANY, "timeout", Long.TYPE, TimeUnit.class, Scheduler.class, MaybeSource.class)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Maybe.class, 0, ParamMode.ANY, "timer", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Maybe.class, 0, ParamMode.ANY, "timer", Long.TYPE, TimeUnit.class, Scheduler.class)); + + // negative timeout is allowed + addOverride(new ParamOverride(Maybe.class, 1, ParamMode.ANY, "fromFuture", Future.class, Long.TYPE, TimeUnit.class)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Maybe.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Maybe.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class, Scheduler.class)); + + // zero repeat is allowed + addOverride(new ParamOverride(Maybe.class, 0, ParamMode.NON_NEGATIVE, "repeat", Long.TYPE)); + + // zero retry is allowed + addOverride(new ParamOverride(Maybe.class, 0, ParamMode.NON_NEGATIVE, "retry", Long.TYPE)); + addOverride(new ParamOverride(Maybe.class, 0, ParamMode.NON_NEGATIVE, "retry", Long.TYPE, Predicate.class)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Maybe.class, 0, ParamMode.ANY, "delaySubscription", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Maybe.class, 0, ParamMode.ANY, "delaySubscription", Long.TYPE, TimeUnit.class, Scheduler.class)); + + // *********************************************************************************************************************** + + // negative timeout time is considered as zero timeout time + addOverride(new ParamOverride(Single.class, 0, ParamMode.ANY, "timeout", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Single.class, 0, ParamMode.ANY, "timeout", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Single.class, 0, ParamMode.ANY, "timeout", Long.TYPE, TimeUnit.class, SingleSource.class)); + addOverride(new ParamOverride(Single.class, 0, ParamMode.ANY, "timeout", Long.TYPE, TimeUnit.class, Scheduler.class, SingleSource.class)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Single.class, 0, ParamMode.ANY, "timer", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Single.class, 0, ParamMode.ANY, "timer", Long.TYPE, TimeUnit.class, Scheduler.class)); + + // negative timeout is allowed + addOverride(new ParamOverride(Single.class, 1, ParamMode.ANY, "fromFuture", Future.class, Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Single.class, 1, ParamMode.ANY, "fromFuture", Future.class, Long.TYPE, TimeUnit.class, Scheduler.class)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Single.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Single.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class, Scheduler.class)); + + + // zero repeat is allowed + addOverride(new ParamOverride(Single.class, 0, ParamMode.NON_NEGATIVE, "repeat", Long.TYPE)); + + // zero retry is allowed + addOverride(new ParamOverride(Single.class, 0, ParamMode.NON_NEGATIVE, "retry", Long.TYPE)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Single.class, 0, ParamMode.ANY, "delaySubscription", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Single.class, 0, ParamMode.ANY, "delaySubscription", Long.TYPE, TimeUnit.class, Scheduler.class)); + + // *********************************************************************************************************************** + + // zero index allowed + addOverride(new ParamOverride(Observable.class, 0, ParamMode.NON_NEGATIVE, "elementAt", Long.TYPE)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.NON_NEGATIVE, "elementAt", Long.TYPE, Object.class)); + + addOverride(new ParamOverride(Observable.class, 0, ParamMode.NON_NEGATIVE, "elementAtOrError", Long.TYPE)); + + // negative skip count is ignored + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "skip", Long.TYPE)); + // negative skip time is considered as zero skip time + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "skip", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "skip", Long.TYPE, TimeUnit.class, Scheduler.class)); + + // negative timeout time is considered as zero timeout time + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "timeout", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "timeout", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "timeout", Long.TYPE, TimeUnit.class, ObservableSource.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "timeout", Long.TYPE, TimeUnit.class, Scheduler.class, ObservableSource.class)); + + // negative buffer time is considered as zero buffer time + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "buffer", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "buffer", Long.TYPE, TimeUnit.class, Integer.TYPE)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "buffer", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "buffer", Long.TYPE, TimeUnit.class, Scheduler.class, Integer.TYPE)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "buffer", Long.TYPE, TimeUnit.class, Scheduler.class, Integer.TYPE, Callable.class, Boolean.TYPE)); + + // negative time/skip is considered zero time/skip + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "buffer", Long.TYPE, Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "buffer", Long.TYPE, Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "buffer", Long.TYPE, Long.TYPE, TimeUnit.class, Scheduler.class, Callable.class)); + addOverride(new ParamOverride(Observable.class, 1, ParamMode.ANY, "buffer", Long.TYPE, Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Observable.class, 1, ParamMode.ANY, "buffer", Long.TYPE, Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Observable.class, 1, ParamMode.ANY, "buffer", Long.TYPE, Long.TYPE, TimeUnit.class, Scheduler.class, Callable.class)); + + // negative timeout is allowed + addOverride(new ParamOverride(Observable.class, 1, ParamMode.ANY, "fromFuture", Future.class, Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Observable.class, 1, ParamMode.ANY, "fromFuture", Future.class, Long.TYPE, TimeUnit.class, Scheduler.class)); + + // null default is allowed + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "blockingLast", Object.class)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "timer", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "timer", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "interval", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "interval", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "interval", Long.TYPE, Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "interval", Long.TYPE, Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Observable.class, 1, ParamMode.ANY, "interval", Long.TYPE, Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Observable.class, 1, ParamMode.ANY, "interval", Long.TYPE, Long.TYPE, TimeUnit.class, Scheduler.class)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class, Boolean.TYPE)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class, Scheduler.class, Boolean.TYPE)); + + // null default is allowed + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "blockingMostRecent", Object.class)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "delaySubscription", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "delaySubscription", Long.TYPE, TimeUnit.class, Scheduler.class)); + + // null default is allowed + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "blockingFirst", Object.class)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "debounce", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "debounce", Long.TYPE, TimeUnit.class, Scheduler.class)); + + // zero repeat is allowed + addOverride(new ParamOverride(Observable.class, 0, ParamMode.NON_NEGATIVE, "repeat", Long.TYPE)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "replay", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "replay", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Observable.class, 1, ParamMode.ANY, "replay", Integer.TYPE, Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Observable.class, 1, ParamMode.ANY, "replay", Integer.TYPE, Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Observable.class, 1, ParamMode.ANY, "replay", Function.class, Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Observable.class, 1, ParamMode.ANY, "replay", Function.class, Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Observable.class, 2, ParamMode.ANY, "replay", Function.class, Integer.TYPE, Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Observable.class, 2, ParamMode.ANY, "replay", Function.class, Integer.TYPE, Long.TYPE, TimeUnit.class, Scheduler.class)); + + // zero retry is allowed + addOverride(new ParamOverride(Observable.class, 0, ParamMode.NON_NEGATIVE, "retry", Long.TYPE)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.NON_NEGATIVE, "retry", Long.TYPE, Predicate.class)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "throttleWithTimeout", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "throttleWithTimeout", Long.TYPE, TimeUnit.class, Scheduler.class)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "take", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "take", Long.TYPE, TimeUnit.class, Scheduler.class)); + + // zero retry is allowed + addOverride(new ParamOverride(Observable.class, 0, ParamMode.NON_NEGATIVE, "take", Long.TYPE)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "sample", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "sample", Long.TYPE, TimeUnit.class, Boolean.TYPE)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "sample", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "sample", Long.TYPE, TimeUnit.class, Scheduler.class, Boolean.TYPE)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "takeLast", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "takeLast", Long.TYPE, TimeUnit.class, Boolean.TYPE)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "takeLast", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "takeLast", Long.TYPE, TimeUnit.class, Scheduler.class, Boolean.TYPE)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "takeLast", Long.TYPE, TimeUnit.class, Scheduler.class, Boolean.TYPE, Integer.TYPE)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.NON_NEGATIVE, "takeLast", Long.TYPE, Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.NON_NEGATIVE, "takeLast", Long.TYPE, Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.NON_NEGATIVE, "takeLast", Long.TYPE, Long.TYPE, TimeUnit.class, Scheduler.class, Boolean.TYPE, Integer.TYPE)); + addOverride(new ParamOverride(Observable.class, 1, ParamMode.ANY, "takeLast", Long.TYPE, Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Observable.class, 1, ParamMode.ANY, "takeLast", Long.TYPE, Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Observable.class, 1, ParamMode.ANY, "takeLast", Long.TYPE, Long.TYPE, TimeUnit.class, Scheduler.class, Boolean.TYPE, Integer.TYPE)); + + // take last 0 is allowed + addOverride(new ParamOverride(Observable.class, 0, ParamMode.NON_NEGATIVE, "takeLast", Integer.TYPE)); + + // skip last 0 is allowed + addOverride(new ParamOverride(Observable.class, 0, ParamMode.NON_NEGATIVE, "skipLast", Integer.TYPE)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "skipLast", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "skipLast", Long.TYPE, TimeUnit.class, Boolean.TYPE)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "skipLast", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "skipLast", Long.TYPE, TimeUnit.class, Scheduler.class, Boolean.TYPE)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "skipLast", Long.TYPE, TimeUnit.class, Scheduler.class, Boolean.TYPE, Integer.TYPE)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "throttleFirst", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "throttleFirst", Long.TYPE, TimeUnit.class, Scheduler.class)); + + // negative time is considered as zero time + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "throttleLast", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "throttleLast", Long.TYPE, TimeUnit.class, Scheduler.class)); + + + // negative buffer time is considered as zero buffer time + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "window", Long.TYPE, TimeUnit.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "window", Long.TYPE, TimeUnit.class, Long.TYPE)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "window", Long.TYPE, TimeUnit.class, Long.TYPE, Boolean.TYPE)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "window", Long.TYPE, TimeUnit.class, Scheduler.class, Long.TYPE)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "window", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "window", Long.TYPE, TimeUnit.class, Scheduler.class, Long.TYPE, Boolean.TYPE)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "window", Long.TYPE, TimeUnit.class, Scheduler.class, Long.TYPE, Boolean.TYPE, Integer.TYPE)); + + // ----------------------------------------------------------------------------------- + + ignores = new HashMap>(); + + // needs special param validation due to (long)start + end - 1 <= Integer.MAX_VALUE + addIgnore(new ParamIgnore(Flowable.class, "range", Integer.TYPE, Integer.TYPE)); + addIgnore(new ParamIgnore(Flowable.class, "rangeLong", Long.TYPE, Long.TYPE)); + addIgnore(new ParamIgnore(Flowable.class, "intervalRange", Long.TYPE, Long.TYPE, Long.TYPE, TimeUnit.class)); + addIgnore(new ParamIgnore(Flowable.class, "intervalRange", Long.TYPE, Long.TYPE, Long.TYPE, Long.TYPE, TimeUnit.class)); + addIgnore(new ParamIgnore(Flowable.class, "intervalRange", Long.TYPE, Long.TYPE, Long.TYPE, TimeUnit.class, Scheduler.class)); + addIgnore(new ParamIgnore(Flowable.class, "intervalRange", Long.TYPE, Long.TYPE, Long.TYPE, Long.TYPE, TimeUnit.class, Scheduler.class)); + + addIgnore(new ParamIgnore(Flowable.class, "unsafeCreate", Publisher.class)); + + // needs special param validation due to (long)start + end - 1 <= Integer.MAX_VALUE + addIgnore(new ParamIgnore(Observable.class, "range", Integer.TYPE, Integer.TYPE)); + addIgnore(new ParamIgnore(Observable.class, "rangeLong", Long.TYPE, Long.TYPE)); + addIgnore(new ParamIgnore(Observable.class, "intervalRange", Long.TYPE, Long.TYPE, Long.TYPE, TimeUnit.class)); + addIgnore(new ParamIgnore(Observable.class, "intervalRange", Long.TYPE, Long.TYPE, Long.TYPE, Long.TYPE, TimeUnit.class)); + addIgnore(new ParamIgnore(Observable.class, "intervalRange", Long.TYPE, Long.TYPE, Long.TYPE, TimeUnit.class, Scheduler.class)); + addIgnore(new ParamIgnore(Observable.class, "intervalRange", Long.TYPE, Long.TYPE, Long.TYPE, Long.TYPE, TimeUnit.class, Scheduler.class)); + + addIgnore(new ParamIgnore(Observable.class, "unsafeCreate", ObservableSource.class)); + + addIgnore(new ParamIgnore(Maybe.class, "unsafeCreate", MaybeSource.class)); + + addIgnore(new ParamIgnore(Single.class, "unsafeCreate", SingleSource.class)); + + addIgnore(new ParamIgnore(Completable.class, "unsafeCreate", CompletableSource.class)); + + // ----------------------------------------------------------------------------------- + + defaultValues = new HashMap, Object>(); + + defaultValues.put(Publisher.class, new NeverPublisher()); + defaultValues.put(Flowable.class, new NeverPublisher()); + + defaultValues.put(ObservableSource.class, new NeverObservable()); + defaultValues.put(Observable.class, new NeverObservable()); + + defaultValues.put(SingleSource.class, new NeverSingle()); + defaultValues.put(Single.class, new NeverSingle()); + + defaultValues.put(MaybeSource.class, new NeverMaybe()); + defaultValues.put(Maybe.class, new NeverMaybe()); + + defaultValues.put(CompletableSource.class, new NeverCompletable()); + defaultValues.put(Completable.class, new NeverCompletable()); + + defaultValues.put(Action.class, Functions.EMPTY_ACTION); + defaultValues.put(Runnable.class, Functions.EMPTY_RUNNABLE); + defaultValues.put(Consumer.class, Functions.emptyConsumer()); + defaultValues.put(LongConsumer.class, Functions.EMPTY_LONG_CONSUMER); + defaultValues.put(Function.class, Functions.justFunction(1)); + defaultValues.put(Callable.class, Functions.justCallable(1)); + defaultValues.put(Iterable.class, Collections.emptyList()); + defaultValues.put(Object.class, 1); + defaultValues.put(Class.class, Integer.class); + Object af = new AllFunctionals(); + for (Class interfaces : AllFunctionals.class.getInterfaces()) { + defaultValues.put(interfaces, af); + } + defaultValues.put(TimeUnit.class, TimeUnit.SECONDS); + defaultValues.put(Scheduler.class, Schedulers.single()); + defaultValues.put(BackpressureStrategy.class, BackpressureStrategy.MISSING); + defaultValues.put(BackpressureOverflowStrategy.class, BackpressureOverflowStrategy.ERROR); + defaultValues.put(Throwable.class, new TestException()); + + defaultValues.put(Publisher[].class, new Publisher[] { new NeverPublisher(), new NeverPublisher() }); + defaultValues.put(ObservableSource[].class, new ObservableSource[] { new NeverObservable(), new NeverObservable() }); + defaultValues.put(SingleSource[].class, new SingleSource[] { new NeverSingle(), new NeverSingle() }); + defaultValues.put(MaybeSource[].class, new MaybeSource[] { new NeverMaybe(), new NeverMaybe() }); + defaultValues.put(CompletableSource[].class, new CompletableSource[] { new NeverCompletable(), new NeverCompletable() }); + + defaultValues.put(Object[].class, new Object[] { new Object(), new Object() }); + defaultValues.put(Future.class, new FutureTask(Functions.EMPTY_RUNNABLE, 1)); + + // ----------------------------------------------------------------------------------- + + defaultInstances = new HashMap, List>(); + +// addDefaultInstance(Flowable.class, Flowable.empty(), "Empty()"); +// addDefaultInstance(Flowable.class, Flowable.empty().hide(), "Empty().Hide()"); + addDefaultInstance(Flowable.class, Flowable.just(1), "Just(1)"); + addDefaultInstance(Flowable.class, Flowable.just(1).hide(), "Just(1).Hide()"); +// addDefaultInstance(Flowable.class, Flowable.range(1, 3), "Range(1, 3)"); +// addDefaultInstance(Flowable.class, Flowable.range(1, 3).hide(), "Range(1, 3).Hide()"); + +// addDefaultInstance(Observable.class, Observable.empty(), "Empty()"); +// addDefaultInstance(Observable.class, Observable.empty().hide(), "Empty().Hide()"); + addDefaultInstance(Observable.class, Observable.just(1), "Just(1)"); + addDefaultInstance(Observable.class, Observable.just(1).hide(), "Just(1).Hide()"); +// addDefaultInstance(Observable.class, Observable.range(1, 3), "Range(1, 3)"); +// addDefaultInstance(Observable.class, Observable.range(1, 3).hide(), "Range(1, 3).Hide()"); + + addDefaultInstance(Completable.class, Completable.complete(), "Complete()"); + addDefaultInstance(Completable.class, Completable.complete().hide(), "Complete().hide()"); + + addDefaultInstance(Single.class, Single.just(1), "Just(1)"); + addDefaultInstance(Single.class, Single.just(1).hide(), "Just(1).Hide()"); + + addDefaultInstance(Maybe.class, Maybe.just(1), "Just(1)"); + addDefaultInstance(Maybe.class, Maybe.just(1).hide(), "Just(1).Hide()"); + } + + static void addIgnore(ParamIgnore ignore) { + String key = ignore.toString(); + List list = ignores.get(key); + if (list == null) { + list = new ArrayList(); + ignores.put(key, list); + } + list.add(ignore); + } + + static void addOverride(ParamOverride ignore) { + String key = ignore.toString(); + List list = overrides.get(key); + if (list == null) { + list = new ArrayList(); + overrides.put(key, list); + } + list.add(ignore); + } + + static void addDefaultInstance(Class clazz, Object o, String tag) { + List list = defaultInstances.get(clazz); + if (list == null) { + list = new ArrayList(); + defaultInstances.put(clazz, list); + } + list.add(o); + list.add(tag); + } + + Object defaultPrimitive(Class clazz, ParamOverride override) { + if (Integer.TYPE == clazz) { + if (override != null) { + return 0; + } + return 1; + } + + if (Long.TYPE == clazz) { + if (override != null) { + return 0L; + } + return 1L; + } + + if (Boolean.TYPE == clazz) { + return true; + } + + return null; + } + + void addCheckPrimitive(Class clazz, ParamOverride override, List values) { + if (Integer.TYPE == clazz) { + values.add(-2); + values.add(override != null && override.mode == ParamMode.ANY); + values.add(-1); + values.add(override != null && override.mode == ParamMode.ANY); + values.add(0); + values.add(override != null); + values.add(1); + values.add(true); // should succeed + values.add(2); + values.add(true); + } + + if (Long.TYPE == clazz) { + values.add(-2L); + values.add(override != null && override.mode == ParamMode.ANY); + values.add(-1L); + values.add(override != null && override.mode == ParamMode.ANY); + values.add(0L); + values.add(override != null); + values.add(1L); + values.add(true); // should succeed + values.add(2L); + values.add(true); + } + + if (Boolean.TYPE == clazz) { + values.add(false); + values.add(true); + values.add(true); + values.add(true); + } + } + + void checkClass(Class clazz) { + List errors = TestHelper.trackPluginErrors(); + try { + StringBuilder b = new StringBuilder(); + int fail = 0; + + outer: + for (Method m : clazz.getMethods()) { + if (m.getDeclaringClass() != clazz) { + continue; + } + + String key = clazz.getName() + " " + m.getName(); + + List ignoreList = ignores.get(key); + if (ignoreList != null) { + for (ParamIgnore e : ignoreList) { + if (Arrays.equals(e.arguments, m.getParameterTypes())) { + System.out.println("CheckClass - ignore: " + m); + continue outer; + } + } + } + + List overrideList = overrides.get(key); + + List baseObjects = new ArrayList(); + + if ((m.getModifiers() & Modifier.STATIC) != 0) { + baseObjects.add(null); + baseObjects.add("NULL"); + } else { + List defaultInstancesList = defaultInstances.get(clazz); + if (defaultInstancesList == null) { + b.append("\r\nNo default instances for " + clazz); + fail++; + continue outer; + } + baseObjects.addAll(defaultInstancesList); + } + + for (int ii = 0; ii < baseObjects.size(); ii += 2) { + Object baseObject = baseObjects.get(ii); + Object tag = baseObjects.get(ii + 1); + Class[] params = m.getParameterTypes(); + int n = params.length; + + for (int i = 0; i < n; i++) { + ParamOverride overrideEntry = null; + if (overrideList != null) { + for (ParamOverride e : overrideList) { + if (e.index == i && Arrays.equals(e.arguments, params)) { + overrideEntry = e; + break; + } + } + } + + Class entryClass = params[i]; + + Object[] callParams = new Object[n]; + + for (int j = 0; j < n; j++) { + if (j != i) { + if (params[j].isPrimitive()) { + ParamOverride overrideParam = null; + if (overrideList != null) { + for (ParamOverride e : overrideList) { + if (e.index == j && Arrays.equals(e.arguments, params)) { + overrideParam = e; + break; + } + } + } + Object def = defaultPrimitive(params[j], overrideParam); + if (def == null) { + b.append("\r\nMissing default non-null value for " + m + " # " + j + " (" + params[j] + ")"); + fail++; + continue outer; + } + callParams[j] = def; + } else { + Object def = defaultValues.get(params[j]); + if (def == null) { + b.append("\r\nMissing default non-null value for " + m + " # " + j + " (" + params[j] + ")"); + fail++; + continue outer; + } + callParams[j] = def; + } + } + } + + List entryValues = new ArrayList(); + + if (entryClass.isPrimitive()) { + addCheckPrimitive(params[i], overrideEntry, entryValues); + } else { + entryValues.add(null); + entryValues.add(overrideEntry != null && overrideEntry.mode == ParamMode.ANY); + + Object def = defaultValues.get(params[i]); + if (def == null) { + b.append("\r\nMissing default non-null value for " + m + " # " + i + " (" + params[i] + ")"); + fail++; + continue outer; + } + entryValues.add(def); + entryValues.add(true); + } + + for (int k = 0; k < entryValues.size(); k += 2) { + Object[] callParams2 = callParams.clone(); + + Object p = entryValues.get(k); + callParams2[i] = p; + boolean shouldSucceed = (Boolean)entryValues.get(k + 1); + + boolean success = false; + Throwable error = null; + errors.clear(); + try { + m.invoke(baseObject, callParams2); + success = true; + } catch (Throwable ex) { + // let it fail + error = ex; + } + + if (success != shouldSucceed) { + fail++; + if (shouldSucceed) { + b.append("\r\nFailed (should have succeeded): " + m + " # " + i + " = " + p + ", tag = " + tag + ", params = " + Arrays.toString(callParams2)); + b.append("\r\n ").append(error); + if (error.getCause() != null) { + b.append("\r\n ").append(error.getCause()); + } + } else { + b.append("\r\nNo failure (should have failed): " + m + " # " + i + " = " + p + ", tag = " + tag + ", params = " + Arrays.toString(callParams2)); + } + continue outer; + } + if (!errors.isEmpty()) { + fail++; + b.append("\r\nUndeliverable errors:"); + for (Throwable err : errors) { + b.append("\r\n ").append(err); + if (err.getCause() != null) { + b.append("\r\n ").append(err.getCause()); + } + } + continue outer; + } + } + } + } + } + + if (fail != 0) { + throw new AssertionError("Parameter validation problems: " + fail + b.toString()); + } + } finally { + RxJavaPlugins.reset(); + } + } + + @SuppressWarnings("rawtypes") + static final class AllFunctionals + implements BiFunction, BiConsumer, + Predicate, BiPredicate, BooleanSupplier, + Function3, Function4, Function5, Function6, Function7, Function8, Function9, + FlowableOnSubscribe, ObservableOnSubscribe, SingleOnSubscribe, MaybeOnSubscribe, CompletableOnSubscribe, + FlowableTransformer, ObservableTransformer, SingleTransformer, MaybeTransformer, CompletableTransformer, + Subscriber, FlowableSubscriber, Observer, SingleObserver, MaybeObserver, CompletableObserver, + FlowableOperator, ObservableOperator, SingleOperator, MaybeOperator, CompletableOperator, + Comparator + { + + @Override + public Object apply(Object t1, Object t2, Object t3, Object t4, Object t5, Object t6, Object t7, Object t8, + Object t9) throws Exception { + return null; + } + + @Override + public Object apply(Object t1, Object t2, Object t3, Object t4, Object t5, Object t6, Object t7, Object t8) + throws Exception { + return null; + } + + @Override + public Object apply(Object t1, Object t2, Object t3, Object t4, Object t5, Object t6, Object t7) + throws Exception { + return null; + } + + @Override + public Object apply(Object t1, Object t2, Object t3, Object t4, Object t5, Object t6) throws Exception { + return null; + } + + @Override + public Object apply(Object t1, Object t2, Object t3, Object t4, Object t5) throws Exception { + return null; + } + + @Override + public Object apply(Object t1, Object t2, Object t3, Object t4) throws Exception { + return null; + } + + @Override + public Object apply(Object t1, Object t2, Object t3) throws Exception { + return null; + } + + @Override + public void accept(Object t1, Object t2) throws Exception { + } + + @Override + public Object apply(Object t1, Object t2) throws Exception { + return null; + } + + @Override + public void subscribe(CompletableEmitter e) throws Exception { + } + + @Override + public void subscribe(MaybeEmitter e) throws Exception { + } + + @Override + public void subscribe(SingleEmitter e) throws Exception { + } + + @Override + public void subscribe(ObservableEmitter e) throws Exception { + } + + @Override + public void subscribe(FlowableEmitter e) throws Exception { + } + + @Override + public boolean test(Object t1, Object t2) throws Exception { + return false; + } + + @Override + public boolean test(Object t) throws Exception { + return false; + } + + @Override + public void onSuccess(Object t) { + } + + @Override + public void onSubscribe(Disposable d) { + } + + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(Object t) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + + @Override + public CompletableSource apply(Completable upstream) { + return upstream; + } + + @Override + public MaybeSource apply(Maybe upstream) { + return upstream; + } + + @Override + public SingleSource apply(Single upstream) { + return upstream; + } + + @Override + public ObservableSource apply(Observable upstream) { + return upstream; + } + + @Override + public Publisher apply(Flowable upstream) { + return upstream; + } + + @Override + public CompletableObserver apply(CompletableObserver observer) throws Exception { + return observer; + } + + @Override + public MaybeObserver apply(MaybeObserver observer) throws Exception { + return observer; + } + + @Override + public SingleObserver apply(SingleObserver observer) throws Exception { + return observer; + } + + @Override + public Observer apply(Observer observer) throws Exception { + return observer; + } + + @Override + public Subscriber apply(Subscriber observer) throws Exception { + return observer; + } + + @Override + public boolean getAsBoolean() throws Exception { + return false; + } + + @Override + public int compare(Object o1, Object o2) { + return 0; + } + } + + enum ParamMode { + ANY, + NON_NEGATIVE + } + + static final class ParamIgnore { + final Class clazz; + final String name; + final Class[] arguments; + + ParamIgnore(Class clazz, String name, Class... arguments) { + this.clazz = clazz; + this.name = name; + this.arguments = arguments; + } + + @Override + public String toString() { + return clazz.getName() + " " + name; + } + } + + static final class ParamOverride { + final Class clazz; + final int index; + final ParamMode mode; + final String name; + final Class[] arguments; + + ParamOverride(Class clazz, int index, ParamMode mode, String name, Class... arguments) { + this.clazz = clazz; + this.index = index; + this.mode = mode; + this.name = name; + this.arguments = arguments; + + try { + clazz.getMethod(name, arguments); + } catch (Exception ex) { + throw new AssertionError(ex); + } + } + + @Override + public String toString() { + return clazz.getName() + " " + name; + } + } + + static final class NeverPublisher extends Flowable { + + @Override + public void subscribeActual(Subscriber s) { + // not invoked, the class is a placeholder default value + } + + @Override + public String toString() { + return "NeverFlowable"; + } + } + + static final class NeverObservable extends Observable { + + @Override + public void subscribeActual(Observer s) { + // not invoked, the class is a placeholder default value + } + + @Override + public String toString() { + return "NeverFlowable"; + } + } + + static final class NeverSingle extends Single { + + @Override + public void subscribeActual(SingleObserver s) { + // not invoked, the class is a placeholder default value + } + + @Override + public String toString() { + return "NeverSingle"; + } + } + + static final class NeverMaybe extends Maybe { + + @Override + public void subscribeActual(MaybeObserver s) { + // not invoked, the class is a placeholder default value + } + + @Override + public String toString() { + return "NeverMaybe"; + } + } + static final class NeverCompletable extends Completable { + + @Override + public void subscribeActual(CompletableObserver s) { + // not invoked, the class is a placeholder default value + } + + @Override + public String toString() { + return "NeverCompletable"; + } + } +} diff --git a/src/test/java/io/reactivex/observable/ObservableNullTests.java b/src/test/java/io/reactivex/observable/ObservableNullTests.java index e79eee87ff..c39205f8bc 100644 --- a/src/test/java/io/reactivex/observable/ObservableNullTests.java +++ b/src/test/java/io/reactivex/observable/ObservableNullTests.java @@ -2427,8 +2427,8 @@ public void toSortedListNull() { just1.toSortedList(null); } - @Test - public void toMapKeyNullAllowed() { + @Test(expected = NullPointerException.class) + public void toMapKeyNull() { just1.toMap(null); }